試験録画で Mac がフリーズし、ログも残らず、Parquet 生成はエラー祭り──。
「とりあえず動くスクリプト」を Bybit の 24 時間連続レコーダーへ昇華させるまでには、小さなつまずきの連続です。
本記事では、WebSocket ストリームの非同期設計、ファイルローテーション、メモリ圧迫の真犯人探し、そして Makefile での一発自動化──という一連の格闘を、実際に詰まったログと修正版スクリプト付きで振り返ります。
「過学習しがちなバックテスター」「固まる Mac に怯えるトレーダー」「24h 録画を安定運用したいエンジニア」──そんな自分(たち)への備忘録としても読んでいただければ幸いです。
“実戦配備” までの学びを、章立てで追っていきましょう。
1. WebSocket 録画は何が重い?──Mac が固まった原因分析
1-1. “固まった” と感じた瞬間に起こっていたこと
観測項目 | 症状 | 補足 |
---|---|---|
メモリ (RSS) | 50 MB → 3 GB へ急膨張 | ps で確認。swap は 0 だったため純粋な RAM 圧迫。 |
CPU | 単一プロセスが瞬間的に 400 % | 4 コアを飽和。top -o cpu で検知。 |
ディスク I/O | 書き込みレイテンシ > 300 ms | fs_usage で write 呼び出しが詰まりまくり。 |
ログ | 出力されず | StreamHandler のみ → 端末が固まれば見えない。 |
結論
- CPU スパイク … 1 秒間に数千メッセージを直列処理(JSON →
write()
)- I/O バックプレッシャ … gzip 圧縮をメインスレッドで実行
- メモリ肥大 … Writer キューが無制限で溜まり、GC が間に合わない
1-2. ボトルネックをあぶり出す 3 コマンド
# ① CPU / メモリを 1 秒間隔で観測(終了は Ctrl-C) while true; do ps -o pid,%mem,%cpu,command -p $PID; sleep 1; done # ② ディスク書き込みのシステムコールを可視化(macOS) sudo fs_usage -w -f filesys $PID # ③ キュー長をログに吐く(サブスレッド内) logger.info("queue=%d", self._queue.qsize())
1-3. “根本治療” につながった 3 つの設計変更
Before | After | 効果 |
---|---|---|
メインスレッドで write() | asyncio.Queue + 専用 Writer Task | CPU/I-O を分離しスパイク緩和 |
gzip を その場で実行 | Writer 側の終了時に実行(キューが空になってから) | I/O が他処理をブロックしなくなった |
キュー 無制限 | maxsize=10000 +ドロップ警告 | メモリが天井知らず → 数十 MB で安定 |
改修ポイント抜粋
# token-bucket で 1 秒あたりの処理件数を平滑化 if MSG_CAP_PER_SEC: now = time.monotonic() self._tokens += (now - self._last_refill) * MSG_CAP_PER_SEC self._tokens = min(self._tokens, MSG_CAP_PER_SEC) self._last_refill = now if self._tokens < 1: await asyncio.sleep(0) continue self._tokens -= 1 # キューに入れるだけ(Writer が非同期 flush) try: self._queue.put_nowait(msg) except asyncio.QueueFull: logger.warning("⚠️ writer queue full – dropping message")
1-4. “動かしてみた” ベンチマーク
テスト条件 | 旧版 | 改修後 |
---|---|---|
50 Mbps のローカル WS リプレイ | CPU 380 % / RSS 1.9 GB | CPU 45 % / RSS 120 MB |
本番 WS(Bybit、50 depth + trade) | Mac 操作不能(数分) | 連続 24 h 録画可、マシン常用可 |
1-5. ここまでのチェックリスト
- CPU スパイク対策:トークンバケット
- I/O 分離:Writer Task + 非同期キュー
- メモリ制御:
maxsize
& ドロップ検知 - gzip 圧縮はキュー drain 後に実行
- ログは
FileHandler
で永続化
次章では、この安定化した Recorder を Makefile ひとコマンドで回す 自動化パイプラインを組み立てていきます。
2. 改修 Recorder のコード全公開──キュー/トークンバケット/ローテートの実装
目的
“CPU・I/O スパイクで Mac が固まる” を根絶するため、ボトルネック 3 点 をコードで直接つぶす。
- ① メインスレッド I/O → Writer Task
- ② 無制限メッセージ処理 → Token-Bucket
- ③ run 中 gzip → ローテート後 gzip
2-1. 全体構造(鳥瞰図)
flowchart TD Start["Recorder.run\n(stop by timer)"] Recv["_recv_loop"] Stats["_stats_loop\n(log CPU/RSS every 30 s)"] Q["asyncio.Queue (max 10k)\n(drop -> WARNING)"] Writer["_writer_loop\n(async write & rotate)"] Start --> Recv Start --> Stats Recv -- "put(msg)" --> Q Q --> Writer
2-2. 非同期 Writer Task ―― CPU と I/O を分離
self._queue: asyncio.Queue[str] = asyncio.Queue(maxsize=QUEUE_MAX) self._writer_task = asyncio.create_task(self._writer_loop()) async def _writer_loop(self) -> None: """ Queue を drain しながらファイルへ書き込み。 Sentinel を受け取ったら安全に終了。 """ try: while True: line = await self._queue.get() if line is _SENTINEL: self._queue.task_done() break self._raw_fh.write(line + "\n") # ← ファイル I/O はここだけ self._queue.task_done() finally: if self._raw_fh: self._raw_fh.flush()
- ポイント
- Producer (
_recv_loop
) と Consumer (_writer_loop
) を分離。 - キュー枯渇/飽和をロギングして可観測化。
_queue.task_done()
を忘れずに。
- Producer (
2-3. Token-Bucket ―― 1 秒あたりの処理量を平滑化
MSG_CAP_PER_SEC = int(os.getenv("MSG_CAP_PER_SEC", "2500")) self._tokens = float(MSG_CAP_PER_SEC) or float("inf") self._last_refill = time.monotonic() async def _recv_loop(self): async for msg in self.ws: # --- refill --- now = time.monotonic() self._tokens = min(self._tokens + (now - self._last_refill) * MSG_CAP_PER_SEC, MSG_CAP_PER_SEC) self._last_refill = now # --- guard --- if self._tokens < 1: await asyncio.sleep(0) # cooperative yield continue self._tokens -= 1 # --- enqueue --- try: self._queue.put_nowait(msg) except asyncio.QueueFull: logger.warning("⚠️ writer queue full – dropping message")
- 結果
- Bybit 本番ストリーム(depth+trade)で CPU 400 % → 45 % に低減。
- 背景:急に来る delta ラッシュでもキューが吸収、JSON パースが暴走しない。
2-4. 安全ローテート & 後段 gzip
async def _rotate(self): logger.info("🔄 rotating file (interval %ss)", self.rotate_sec) await self._close_writer() # キュー flush & ファイル close await self._compress_last() # gzip はここで実行 await self._open_writer() # 次ファイルを開く
async def _compress_last(self): if not (self._raw_path and self._compressed_path): return try: with open(self._raw_path, "rb") as fin, \ gzip.open(self._compressed_path, "wb", compresslevel=1) as fout: shutil.copyfileobj(fin, fout) except Exception as e: logger.error("gzip failed: %s", e) with suppress(FileNotFoundError): os.remove(self._compressed_path) else: os.remove(self._raw_path) logger.info("🗜 compressed → %s", self._compressed_path) finally: self._raw_path = self._compressed_path = None
- 工夫点
- gzip 失敗時に壊れた .gz を必ず削除 → 後段パイプラインがコケない。
- Writer キューを
join()
で完全ドレインしてから close。
2-5. FileHandler で永続ログ ―― 端末が固まっても追跡可
log_dir = Path(__file__).resolve().parent.parent / "logs" / "recorder" log_dir.mkdir(parents=True, exist_ok=True) fh = logging.FileHandler(log_dir / "recorder.log", encoding="utf-8") fh.setFormatter(logging.Formatter(fmt, datefmt="%Y-%m-%d %H:%M:%S")) logger.addHandler(fh)
実際は StreamHandler + FileHandler
の二刀流。
Makefile の log-recorder
ターゲットで tail -F
するだけで OK。
log-recorder: @echo "📑 tailing recorder log … <Ctrl-C で終了>" @tail -F $(shell ls -1tr $(ROOT)/backtest24/logs/recorder/*.log | tail -1)
3.Recorderアーキテクチャ徹底解剖 ――「落とさず、詰まらず、眠らせる」
3-1 全体フローを 1 枚で俯瞰
flowchart TD A["Recorder.run()<br/>⏰ タイマー停止"] -->|spawn| B["_recv_loop"] A -->|spawn| C["_stats_loop"] B -- "put(msg)" --> Q["asyncio.Queue<br/>maxlen = 10 000"] Q -->|drain| D["_writer_loop"] subgraph WRITE direction LR D -- "I/O & rotate" --> E["raw *.jsonl"] E -- gzip --> F["*.jsonl.gz"] end C -- "30 s 毎" --> G["CPU / RSS ログ"]
- _recv_loop:
- Bybit WS を async for でストリーム。
- token-bucket で
MSG_CAP_PER_SEC
を消費し、突発バーストを平滑化。 QueueFull
を検知すると WARN を吐き捨て、メインループをブロックさせない。
- asyncio.Queue:
- キュー長は 1 万行。<br>「落としても 0.1 % 未満」の経験則から逆算。
- Sentinel
_SENTINEL
経由でクリーンシャットダウン。
- _writer_loop:
- 書き込みは 非同期 で、WS 受信とは完全分離。
rotate_sec
経過ごとにファイルを閉じて gzip 圧縮。queue.task_done()
を忘れず呼び、queue.join()
がちゃんと機能するよう保証。
- _stats_loop:
psutil.Process().memory_info().rss
とcpu_percent(interval=1.0)
を 30 秒おきにログ。- RSS ≈ 25 MB → 15 MB に落ちるのは GZip 後に OS がページアウトした証拠。
3-2 落ちないための 4 つの防波堤
防波堤 | 実装ポイント | 効果 |
---|---|---|
Token Bucket | _tokens & _last_refill | CPUスパイク平滑化。1 秒あたり処理量を論理的に制限 |
Async Queue | Queue(maxsize=10 000) | 受信・書き込みスレッドを分離し I/O 待ちを無害化 |
Sentinel Drain | _SENTINEL + queue.join() | 強制終了しても 書きかけファイルを残さない |
Back-pressure Log | QueueFull → logger.warning | ドロップ発生を 即座に検知、上流へ閾値調整フィードバック |
3-3 ボトルネック別チューニング Tips
症状 | 原因候補 | ワンポイント処方 |
---|---|---|
Queue が常に満杯 | MSG_CAP_PER_SEC が小さすぎ | 上限を段階的に +20 % ずつ増やして観測 |
RSS が右肩上がり | gzip 前の raw が溜まる | rotate_sec を短くする / tmpfs をやめローカルSSDへ |
CPU が 100 % 張り付き | GZip 圧縮負荷 | compresslevel=1 〜3 でトレードオフ調整 |
ドロップ急増 | 回線断・WS リトライ | Bybit Public ではなく Private エンドポイントも検討 |
3-4 Recorder の“死活監視”を一行で
# dropped 行数 ≒ WARN のカウント grep -c "writer queue full" logs/recorder/recorder.log
0 が理想。1 日あたり 10 行以下なら実戦投入でもまず問題なし。
3-5 まとめ ――「最小コストで最大信頼性」
- 非同期 + キュー分離 だけで MacBook-Air でも 1 秒バケットを余裕で賄えることを実証。
- 落ちてもすぐ起きる を優先し、複雑な再送ロジックは実装しないポリシー。
- この“録る仕組み”が固まれば、あとは 解析系(backtest) と 可視化系(BI) を乗せるだけ。
プロセス分離&ログ整備の王道は、やっぱりコスパ最強だった。
5.チューニング実践録 ――「1 GB → 40 MB」への道
「速さは正義。でも“速いだけ”は罪」
ここでは Recorder を“触りながら痩せさせた”生ログを公開する。
4-1 メモリダイエット 3ステップ
Step | Before | After | 削減幅 |
---|---|---|---|
Buffer サイズ調整 | buffering=8 MB 固定 | 1 MB に縮小 | -5 MB |
gzip ASAP | 15 分ごと | 5 分ごと | -10 MB |
Delta フィルター | snapshot + delta 全保存 | delta だけ で圧縮率↑ | -42 % |
4-2 CPU プロファイル & Hot-Path 対策
# py-spy で 20 秒サンプリング py-spy top --pid $(pgrep -f recorder.core) --duration 20
Hot Func | %CPU | 施策 |
---|---|---|
json.dumps() | 38 % | 受信は“文字列”で保持、ダンプしない |
gzip.write() | 27 % | compresslevel=1 & batched copy |
psutil.cpu_percent() | 12 % | interval=1.0 に変更 → 回数 1/30 |
4-3 ネットワーク回線で詰まった件
- 現象:Starlink 回線で WS ping-pong が 5 s 超え、
asyncio.TimeoutError
連発。 - 対策:
ping_interval=20 -> 40
に伸ばし、close_timeout=5
に緩和。 - 教訓:“クラウド再現テスト”は必須。自宅 Wi-Fi だけで OK と思うと痛い目を見る。
4-4 CI / CD に組み込むヘルスチェック
# .github/workflows/health.yml の核心 - name: Recorder health run: | python -m recorder.core \ --channels publicTrade.BTCUSDT \ --out-dir /tmp \ --prefix ci_test \ --seconds 10 test $(ls /tmp/ci_test*.jsonl | wc -l) -gt 0
10 秒だけ起動し、ファイルが出来れば PASS。シンプルだが破壊力抜群。
4-5 まとめ ――“動いたあと”こそゴールデンタイム
- メトリクス可視化 → ボトルネック特定 → ピンポイント修正 のサイクルが最短コスト。
- Recorder の最適化は バックテスターの待ち時間短縮 に直結し、開発サイクル全体を加速。
- “あと 10 %” は 掘れば掘るほどコスパ低下。8 割ラインで止め、次フェーズ(S3 化 etc.)へ。
5.今後の展望 ―––「取るだけ」から“育てる”データ基盤へ
5-1 1年先を見据えた課題リスト
分類 | 現状 | 今後やりたいこと |
---|---|---|
耐障害性 | シングルノードで動作。突発的なネットワーク断に弱い | ① Recorder を systemd + Restart=always で常駐化 ② 書き込み先を NFS/S3 などリモートにして冗長化 |
スケーラビリティ | BTC/USDT 1ペアのみ | ① ポリシーを絞った マルチシンボル対応 ② Queue の back-pressure を Prometheus/Grafana で可視化 |
低レイテンシ | 1 s OHLCV で十分 | 将来的に 100 ms バケット を試し、板アルゴのバックテストへ |
ガバナンス | 手元ノートにメモ | GitHub Actions + PR テンプレートでレビューを標準化 |
コスト | Mac 1台で完結 | Lightsail/EC2 t4g.small へ載せ替え、電気代を 1/4 に |
5-2 「データレイク化」という選択肢
- Why:Parquet を貯めているだけでは “死蔵” しやすい。
- How:
- S3 (互換 OSS: MinIO) に置き換え、
- AWS Glue / DuckDB / polars で サーバレス SQL、
- Athena + QuickSight でダッシュボード自動更新。
“取るデータ <br>⇢ 回すクエリ <br>⇢ 見えるインサイト”
—— このループが1クリックで回ると、裁量判断も俄然スピードアップする。
5-3 MLOps との橋渡し
- 特徴量ストア:VWAP やスプレッドを Feature Store に反映。
- オンライン推論:Bot のポジションサイズを 軽量 GBDT で自動調整。
- CI/CD:モデル更新を GitHub Actions で nightly retrain → Canary Deploy。
5-4 「Learning Loop」を回すための KPI
KPI | 目標値 | 理由 |
---|---|---|
End-to-End Latency | ≤ 3 s | 1 min 頭で検証できれば手動オペ損失を最小化 |
Dropped Msgs / day | 0.1 % 以下 | 市場急変時ほど価値があるデータを失わない |
Recorder CPU Util | 25 % 未満 | Bot・バックテスターと共存できる余力確保 |
Backtest Turnaround | ≤ 8 min / 日 | 朝一の “ざっくり検証” を出社後に即確認 |
5-5 まとめ
- Recorder → Queue → Writer の3段構えは、小規模でも安定運用できる骨格。
- そこに ログ監視・自動ローテート・Makefile を重ね、〝人が眠れる〟運用へ一歩前進。
- 次フェーズは ストレージのリモート化+メタデータ管理。
🏷️ おまけコーナー — 仕上げの微修正ログ
「ここまでやっとけば明日また固まっても“あ、あそこ弄れば良かった”で済むはず」
追加したモノ | ファイル/場所 | ひと言メモ |
---|---|---|
FileHandler で常時ファイルログ | recorder/core.py 冒頭の logging 構成 | ~/MMBot/backtest24/logs/recorder/recorder.log に吐き出し。tail -F で追えるようになった。 |
log-recorder ターゲット | Makefile | コマンド一発 make log-recorder → log をカラー付きで追従 (tail -F ) |
録画時間ワンライナー | record-1h ターゲット | REC_DURATION_SEC を 3600 へ差し替えて 1 h だけ試運転。 |
キュー・サイズ表示 | _stats_loop | rss / cpu % / q=<len> に加えて raw=123 KB (現在開いている JSONL のサイズ)も出力可能に(サンプル実装 ↓) |
ダブル Ctrl-C 落とし | SIGINT →グレースフルシャットダウン→2 発目で即終了 | 誤爆でファイルが壊れるのを防止。 |
Queue join タイムアウト警告 | _close_writer() | 5 s 以内に排出できなければ WARNING を吐いて続行(ハング防止)。 |
# --- sample patch for file size in _stats_loop --- size_kb = 0 if self._raw_path and self._raw_path.exists(): size_kb = self._raw_path.stat().st_size // 1024 logger.info("📊 rss=%.1f MB cpu=%.1f %% q=%d raw=%d KB", rss, cpu, qlen, size_kb)
使い方まとめ
# 1時間だけ録画して動作を確認 $ make record-1h # 録画中のログを追いかける $ make log-recorder # ← もう tail -F を毎回書かなくてOK # 後工程(バケット化 → funding 取得 → validate) $ make daily VERBOSE=1 # 途中経過も全部表示
これで 「録る → 見る → こける前に気付く」 の三点セットが完成です。