This article explains the architectural redesign and modularization of an automated systematic trading system built on Python’s asyncio framework. The system is designed to interface with the Kiwoom Open API (a hybrid REST and WebSocket client) to execute systematic trading strategies. Specifically, it implements a trend-following methodology that combines ATR (Average True Range) volatility-based position sizing, entries based on Mark Minervini’s Trend Template, dynamic pyramiding, and trailing stops.
1. Architectural Overview and Refactoring Objectives
In the conventional monolithic engine, order execution, state tracking, risk management, and logging were tightly coupled, posing challenges for maintainability and scalability. In this redesign, modularization was implemented to achieve the following four objectives:
💡 Decoupling of Concerns: Transition each component to a loosely coupled, event-driven architecture.
🛠️ Robust State Persistence: Introduce a two-tier state recovery mechanism combining local CSV files (positions.csv, trades.csv, capital_log.csv) with real-time, server-side balance synchronization.
🔄 Asynchronous Event Loop Integration: Execute order placement, real-time WebSocket quote processing, and dynamic trailing stop calculations in a non-blocking manner.
⚠️ Systematic Risk Management: Apply a strict risk budget rule with a maximum risk tolerance of 1% of total capital, dynamically determining position sizes based on ATR.
2. Directory Structure and Module Configuration
The project has been restructured from a single-file configuration into a package structure where each component has an independent role. This ensures that changes to API specifications or logging formats do not affect other modules.
project/
│
├── main.py
├── config.py
│
├── position/
│ ├── __init__.py
│ └── position_manager.py
│
├── logs/
│ ├── __init__.py
│ ├── logger.py
│ ├── positions.csv
│ ├── trades.csv
│ └── capital_log.csv
│
└── trading/
├── __init__.py
├── order.py
└── trailing_stop.py
3. Design and Implementation Code of Each Module
3.1. Position Management (position/position_manager.py)
The PositionManager is responsible for managing the system’s active positions and total capital, and dynamically calculating risk parameters.
import os
import csv
import logging
class PositionManager:
def __init__(self, initial_capital=10000000, risk_ratio=0.01):
self.total_capital = initial_capital
self.risk_ratio = risk_ratio
self.active_positions = {}
self.csv_path = "logs/positions.csv"
self.load_positions()
def load_positions(self):
if os.path.exists(self.csv_path):
try:
with open(self.csv_path, mode='r', encoding='utf-8') as f:
reader = csv.DictReader(f)
for row in reader:
symbol = row['symbol']
self.active_positions[symbol] = {
'entry_price': float(row['entry_price']),
'highest_price': float(row['highest_price']),
'stop_loss': float(row['stop_loss']),
'unit_count': int(row['unit_count']),
'quantity': int(row['quantity'])
}
except Exception as e:
logging.error(f"Failed to load positions from CSV: {e}")
def save_positions(self):
os.makedirs(os.path.dirname(self.csv_path), exist_ok=True)
try:
with open(self.csv_path, mode='w', newline='', encoding='utf-8') as f:
fieldnames = ['symbol', 'entry_price', 'highest_price', 'stop_loss', 'unit_count', 'quantity']
writer = csv.DictWriter(f, fieldnames=fieldnames)
writer.writeheader()
for symbol, pos in self.active_positions.items():
writer.writerow({
'symbol': symbol,
'entry_price': pos['entry_price'],
'highest_price': pos['highest_price'],
'stop_loss': pos['stop_loss'],
'unit_count': pos['unit_count'],
'quantity': pos['quantity']
})
except Exception as e:
logging.error(f"Failed to save positions to CSV: {e}")
def calculate_position_size(self, atr, entry_price):
risk_budget = self.total_capital * self.risk_ratio
stop_loss_range = 2 * atr
if stop_loss_range <= 0:
return 0
quantity = int(risk_budget / stop_loss_range)
return quantity
3.2. Trade Logger (logs/logger.py)
The TradeLogger appends trade history to trades.csv and records equity curve data in capital_log.csv. It also provides history analysis methods to prevent duplicate entries on the same day.
import os
import csv
from datetime import datetime
class TradeLogger:
def __init__(self):
self.trades_csv = "logs/trades.csv"
self.capital_csv = "logs/capital_log.csv"
os.makedirs("logs", exist_ok=True)
def log_trade(self, symbol, action, price, quantity, pnl=0.0):
timestamp = datetime.now().strftime("%Y-%m-%d %H:%M:%S")
file_exists = os.path.exists(self.trades_csv)
with open(self.trades_csv, mode='a', newline='', encoding='utf-8') as f:
writer = csv.writer(f)
if not file_exists:
writer.writerow(['timestamp', 'symbol', 'action', 'price', 'quantity', 'pnl'])
writer.writerow([timestamp, symbol, action, price, quantity, pnl])
def log_capital(self, total_capital):
timestamp = datetime.now().strftime("%Y-%m-%d %H:%M:%S")
file_exists = os.path.exists(self.capital_csv)
with open(self.capital_csv, mode='a', newline='', encoding='utf-8') as f:
writer = csv.writer(f)
if not file_exists:
writer.writerow(['timestamp', 'total_capital'])
writer.writerow([timestamp, total_capital])
def has_traded_today(self, symbol):
if not os.path.exists(self.trades_csv):
return False
today_str = datetime.now().strftime("%Y-%m-%d")
with open(self.trades_csv, mode='r', encoding='utf-8') as f:
reader = csv.DictReader(f)
for row in reader:
if row['symbol'] == symbol and row['timestamp'].startswith(today_str):
return True
return False
3.3. Order Execution Engine (trading/order.py)
The OrderExecutor acts as an intermediary between strategy decisions and the API client, asynchronously handling order submission, execution confirmation, and reflecting state changes in the position manager and logger.
import asyncio
import logging
class OrderExecutor:
def __init__(self, api_client, position_manager, logger):
self.api_client = api_client
self.position_manager = position_manager
self.logger = logger
async def execute_order(self, symbol, action, quantity, price=0):
try:
logging.info(f"Executing order: {action} {symbol} Qty: {quantity}")
response = await self.api_client.send_order(symbol, action, quantity, price)
if response.get('status') == 'success':
execution_price = response.get('price', price)
await self._handle_execution_success(symbol, action, quantity, execution_price)
return True
except Exception as e:
logging.error(f"Order execution failed for {symbol}: {e}")
return False
async def _handle_execution_success(self, symbol, action, quantity, price):
if action == "BUY":
if symbol not in self.position_manager.active_positions:
self.position_manager.active_positions[symbol] = {
'entry_price': price,
'highest_price': price,
'stop_loss': price - (2 * 100),
'unit_count': 1,
'quantity': quantity
}
else:
pos = self.position_manager.active_positions[symbol]
pos['quantity'] += quantity
pos['unit_count'] += 1
self.logger.log_trade(symbol, "BUY", price, quantity)
elif action == "SELL":
if symbol in self.position_manager.active_positions:
pnl = (price - self.position_manager.active_positions[symbol]['entry_price']) * quantity
self.position_manager.total_capital += pnl
del self.position_manager.active_positions[symbol]
self.logger.log_trade(symbol, "SELL", price, quantity, pnl)
self.logger.log_capital(self.position_manager.total_capital)
self.position_manager.save_positions()
3.4. Trailing Stop Management (trading/trailing_stop.py)
The TrailingStopManager receives real-time price update events and provides a pipeline to sequentially execute stop-loss determination, evaluate pyramiding conditions, and raise trailing stops.
import asyncio
import logging
class TrailingStopManager:
def __init__(self, position_manager, order_executor, atr_provider):
self.position_manager = position_manager
self.order_executor = order_executor
self.atr_provider = atr_provider
async def on_price_update(self, symbol, current_price):
pos = self.position_manager.active_positions.get(symbol)
if not pos:
return
atr = self.atr_provider.get_atr(symbol)
if current_price > pos['highest_price']:
pos['highest_price'] = current_price
new_stop_loss = current_price - (2 * atr)
if new_stop_loss > pos['stop_loss']:
pos['stop_loss'] = new_stop_loss
logging.info(f"Trailing stop updated for {symbol} to {new_stop_loss}")
self.position_manager.save_positions()
if current_price <= pos['stop_loss']:
logging.warning(f"Stop loss triggered for {symbol} at {current_price}")
await self.order_executor.execute_order(symbol, "SELL", pos['quantity'], current_price)
return
if pos['unit_count'] < 4:
next_trigger = pos['entry_price'] + (pos['unit_count'] * 0.5 * atr)
if current_price >= next_trigger:
logging.info(f"Pyramidding triggered for {symbol} at {current_price}")
add_quantity = self.position_manager.calculate_position_size(atr, current_price)
if add_quantity > 0:
await self.order_executor.execute_order(symbol, "BUY", add_quantity, current_price)
4. State Recovery and Server Synchronization Lifecycle
The synchronization flow between the local cache and the brokerage server during system startup and shutdown is as follows:
[System Startup]
│
▼
[Load Local Cache] ──► Read positions.csv & trades.csv
│
▼
[Server Sync] ───────► Request Balance (kt00018) via REST API
│
├─► Match active positions with server holdings
│ ├─ If match: Keep local state & update current prices
│ └─ If mismatch: Log warning & trigger reconciliation
│
▼
[Initialize WebSocket] ──► Subscribe to Real-time Quotes (REAL, 0C)
│
▼
[Event Loop Active] ──► Non-blocking Trailing Stop & Pyramidding
Technical Analysis of Asset Valuation Discrepancies
During initial synchronization, discrepancies may arise between the logged total capital (total_capital) and the total valuation of held positions.
⚠️ Cause: The balance inquiry (kt00018) retrieved from the brokerage server does not include historical realized profit and loss. Therefore, if the local total_capital is restored at its initial setting value (e.g., 10,000,000 KRW), unrealized gains may cause the current valuation of held assets to exceed the initial capital.
💡 Mitigation: This discrepancy is temporary during the startup initialization phase. Once the system is running and new trades or liquidations are executed, the PositionManager and TradeLogger dynamically reflect realized profits and losses, synchronizing total_capital with the actual account net asset value.
5. Verification Protocol for Production Deployment
Verification of the refactored system’s operation is conducted according to the following steps:
Real-Time Feed Connectivity Verification: Confirm via logs that after establishing the WebSocket connection,
trnm: REAL,type: 0C(real-time quotes) are received in a non-blocking manner and dispatched toTrailingStopManager.on_price_updatewithout delay.Trailing Stop Tracking Test: Verify that as the price of held symbols rises,
highest_priceandstop_lossinpositions.csvare dynamically updated.Pyramiding Trigger Verification: Verify that when the price reaches $+0.5 \times \text{ATR}$ from the entry price, additional orders are successfully placed and
unit_countis incremented.Forced Liquidation Operation Verification: Verify that when a tick falling below the stop-loss price is received, a market liquidation order is sent immediately, and the corresponding symbol is removed from the local
active_positions.
Lessons Learned
💡 I/O Separation in Asynchronous Event Loops: In real-time WebSocket processing, synchronous writing to CSV (blocking I/O) can become a bottleneck. In high-frequency trading environments, it is necessary to consider designs that use asynchronous libraries such as aiofiles or offload write operations to a separate thread (run_in_executor).
⚠️ Ensuring State Consistency: Discrepancies between the local CSV cache and the brokerage server’s state can cause erroneous orders. Incorporating a background task into the event loop to perform position reconciliation at regular intervals, rather than just at startup, improves operational stability.