Bot プログラミングスキル 環境構築・インフラ 開発ログ

🛠️開発記録#252(2025/6/16)WSレコーダーを“実戦配備”するまで:Mac が固まるまでに学んだこと

試験録画で Mac がフリーズし、ログも残らず、Parquet 生成はエラー祭り──。
「とりあえず動くスクリプト」を Bybit の 24 時間連続レコーダーへ昇華させるまでには、小さなつまずきの連続です。

本記事では、WebSocket ストリームの非同期設計、ファイルローテーション、メモリ圧迫の真犯人探し、そして Makefile での一発自動化──という一連の格闘を、実際に詰まったログと修正版スクリプト付きで振り返ります。

「過学習しがちなバックテスター」「固まる Mac に怯えるトレーダー」「24h 録画を安定運用したいエンジニア」──そんな自分(たち)への備忘録としても読んでいただければ幸いです。
“実戦配備” までの学びを、章立てで追っていきましょう。

1. WebSocket 録画は何が重い?──Mac が固まった原因分析

1-1. “固まった” と感じた瞬間に起こっていたこと

観測項目症状補足
メモリ (RSS)50 MB → 3 GB へ急膨張ps で確認。swap は 0 だったため純粋な RAM 圧迫。
CPU単一プロセスが瞬間的に 400 %4 コアを飽和。top -o cpu で検知。
ディスク I/O書き込みレイテンシ > 300 msfs_usage で write 呼び出しが詰まりまくり。
ログ出力されずStreamHandler のみ → 端末が固まれば見えない。

結論

  • CPU スパイク … 1 秒間に数千メッセージを直列処理(JSON → write()
  • I/O バックプレッシャ … gzip 圧縮をメインスレッドで実行
  • メモリ肥大 … Writer キューが無制限で溜まり、GC が間に合わない

1-2. ボトルネックをあぶり出す 3 コマンド

# ① CPU / メモリを 1 秒間隔で観測(終了は Ctrl-C)
while true; do ps -o pid,%mem,%cpu,command -p $PID; sleep 1; done

# ② ディスク書き込みのシステムコールを可視化(macOS)
sudo fs_usage -w -f filesys $PID

# ③ キュー長をログに吐く(サブスレッド内)
logger.info("queue=%d", self._queue.qsize())

1-3. “根本治療” につながった 3 つの設計変更

BeforeAfter効果
メインスレッドで write()asyncio.Queue + 専用 Writer TaskCPU/I-O を分離しスパイク緩和
gzip を その場で実行Writer 側の終了時に実行(キューが空になってから)I/O が他処理をブロックしなくなった
キュー 無制限maxsize=10000+ドロップ警告メモリが天井知らず → 数十 MB で安定

改修ポイント抜粋

# token-bucket で 1 秒あたりの処理件数を平滑化
if MSG_CAP_PER_SEC:
    now = time.monotonic()
    self._tokens += (now - self._last_refill) * MSG_CAP_PER_SEC
    self._tokens = min(self._tokens, MSG_CAP_PER_SEC)
    self._last_refill = now
    if self._tokens < 1:
        await asyncio.sleep(0)
        continue
    self._tokens -= 1

# キューに入れるだけ(Writer が非同期 flush)
try:
    self._queue.put_nowait(msg)
except asyncio.QueueFull:
    logger.warning("⚠️  writer queue full – dropping message")

1-4. “動かしてみた” ベンチマーク

テスト条件旧版改修後
50 Mbps のローカル WS リプレイCPU 380 % / RSS 1.9 GBCPU 45 % / RSS 120 MB
本番 WS(Bybit、50 depth + trade)Mac 操作不能(数分)連続 24 h 録画可、マシン常用可

1-5. ここまでのチェックリスト

  • CPU スパイク対策:トークンバケット
  • I/O 分離:Writer Task + 非同期キュー
  • メモリ制御:maxsize & ドロップ検知
  • gzip 圧縮はキュー drain 後に実行
  • ログは FileHandler で永続化

次章では、この安定化した Recorder を Makefile ひとコマンドで回す 自動化パイプラインを組み立てていきます。

2. 改修 Recorder のコード全公開──キュー/トークンバケット/ローテートの実装

目的
“CPU・I/O スパイクで Mac が固まる” を根絶するため、ボトルネック 3 点 をコードで直接つぶす。

  • メインスレッド I/OWriter Task
  • 無制限メッセージ処理Token-Bucket
  • run 中 gzipローテート後 gzip

2-1. 全体構造(鳥瞰図)

flowchart TD
    Start["Recorder.run\n(stop by timer)"]

    Recv["_recv_loop"]
    Stats["_stats_loop\n(log CPU/RSS every 30 s)"]

    Q["asyncio.Queue (max 10k)\n(drop -> WARNING)"]
    Writer["_writer_loop\n(async write & rotate)"]

    Start --> Recv
    Start --> Stats
    Recv  -- "put(msg)" --> Q
    Q --> Writer

2-2. 非同期 Writer Task ―― CPU と I/O を分離

self._queue: asyncio.Queue[str] = asyncio.Queue(maxsize=QUEUE_MAX)
self._writer_task = asyncio.create_task(self._writer_loop())

async def _writer_loop(self) -> None:
    """
    Queue を drain しながらファイルへ書き込み。
    Sentinel を受け取ったら安全に終了。
    """
    try:
        while True:
            line = await self._queue.get()
            if line is _SENTINEL:
                self._queue.task_done()
                break
            self._raw_fh.write(line + "\n")       # ← ファイル I/O はここだけ
            self._queue.task_done()
    finally:
        if self._raw_fh:
            self._raw_fh.flush()
  • ポイント
    • Producer (_recv_loop) と Consumer (_writer_loop) を分離。
    • キュー枯渇/飽和をロギングして可観測化。
    • _queue.task_done() を忘れずに。

2-3. Token-Bucket ―― 1 秒あたりの処理量を平滑化

MSG_CAP_PER_SEC = int(os.getenv("MSG_CAP_PER_SEC", "2500"))
self._tokens = float(MSG_CAP_PER_SEC) or float("inf")
self._last_refill = time.monotonic()

async def _recv_loop(self):
    async for msg in self.ws:
        # --- refill ---
        now = time.monotonic()
        self._tokens = min(self._tokens + (now - self._last_refill) * MSG_CAP_PER_SEC,
                           MSG_CAP_PER_SEC)
        self._last_refill = now

        # --- guard ---
        if self._tokens < 1:
            await asyncio.sleep(0)      # cooperative yield
            continue
        self._tokens -= 1

        # --- enqueue ---
        try:
            self._queue.put_nowait(msg)
        except asyncio.QueueFull:
            logger.warning("⚠️  writer queue full – dropping message")
  • 結果
    • Bybit 本番ストリーム(depth+trade)で CPU 400 % → 45 % に低減。
    • 背景:急に来る delta ラッシュでもキューが吸収、JSON パースが暴走しない。

2-4. 安全ローテート & 後段 gzip

async def _rotate(self):
    logger.info("🔄 rotating file (interval %ss)", self.rotate_sec)
    await self._close_writer()          # キュー flush & ファイル close
    await self._compress_last()         # gzip はここで実行
    await self._open_writer()           # 次ファイルを開く
async def _compress_last(self):
    if not (self._raw_path and self._compressed_path):
        return
    try:
        with open(self._raw_path, "rb") as fin, \
             gzip.open(self._compressed_path, "wb", compresslevel=1) as fout:
            shutil.copyfileobj(fin, fout)
    except Exception as e:
        logger.error("gzip failed: %s", e)
        with suppress(FileNotFoundError):
            os.remove(self._compressed_path)
    else:
        os.remove(self._raw_path)
        logger.info("🗜  compressed → %s", self._compressed_path)
    finally:
        self._raw_path = self._compressed_path = None
  • 工夫点
    • gzip 失敗時に壊れた .gz を必ず削除 → 後段パイプラインがコケない。
    • Writer キューを join() で完全ドレインしてから close。

2-5. FileHandler で永続ログ ―― 端末が固まっても追跡可

log_dir = Path(__file__).resolve().parent.parent / "logs" / "recorder"
log_dir.mkdir(parents=True, exist_ok=True)
fh = logging.FileHandler(log_dir / "recorder.log", encoding="utf-8")
fh.setFormatter(logging.Formatter(fmt, datefmt="%Y-%m-%d %H:%M:%S"))
logger.addHandler(fh)

実際は StreamHandler + FileHandler の二刀流。
Makefile の log-recorder ターゲットで tail -F するだけで OK。

log-recorder:
	@echo "📑 tailing recorder log …  <Ctrl-C で終了>"
	@tail -F $(shell ls -1tr $(ROOT)/backtest24/logs/recorder/*.log | tail -1)

3.Recorderアーキテクチャ徹底解剖 ――「落とさず、詰まらず、眠らせる」

3-1 全体フローを 1 枚で俯瞰

flowchart TD
    A["Recorder.run()<br/>⏰ タイマー停止"] -->|spawn| B["_recv_loop"]
    A -->|spawn| C["_stats_loop"]

    B -- "put(msg)" --> Q["asyncio.Queue<br/>maxlen = 10 000"]
    Q -->|drain| D["_writer_loop"]

    subgraph WRITE
        direction LR
        D -- "I/O & rotate" --> E["raw *.jsonl"]
        E -- gzip --> F["*.jsonl.gz"]
    end

    C -- "30 s 毎" --> G["CPU / RSS ログ"]
  • _recv_loop
    • Bybit WS を async for でストリーム。
    • token-bucketMSG_CAP_PER_SEC を消費し、突発バーストを平滑化。
    • QueueFull を検知すると WARN を吐き捨て、メインループをブロックさせない。
  • asyncio.Queue
    • キュー長は 1 万行。<br>「落としても 0.1 % 未満」の経験則から逆算。
    • Sentinel _SENTINEL 経由でクリーンシャットダウン。
  • _writer_loop
    • 書き込みは 非同期 で、WS 受信とは完全分離。
    • rotate_sec 経過ごとにファイルを閉じて gzip 圧縮。
    • queue.task_done() を忘れず呼び、queue.join() がちゃんと機能するよう保証。
  • _stats_loop
    • psutil.Process().memory_info().rsscpu_percent(interval=1.0) を 30 秒おきにログ。
    • RSS ≈ 25 MB → 15 MB に落ちるのは GZip 後に OS がページアウトした証拠。

3-2 落ちないための 4 つの防波堤

防波堤実装ポイント効果
Token Bucket_tokens & _last_refillCPUスパイク平滑化。1 秒あたり処理量を論理的に制限
Async QueueQueue(maxsize=10 000)受信・書き込みスレッドを分離し I/O 待ちを無害化
Sentinel Drain_SENTINEL + queue.join()強制終了しても 書きかけファイルを残さない
Back-pressure LogQueueFull → logger.warningドロップ発生を 即座に検知、上流へ閾値調整フィードバック

3-3 ボトルネック別チューニング Tips

症状原因候補ワンポイント処方
Queue が常に満杯MSG_CAP_PER_SEC が小さすぎ上限を段階的に +20 % ずつ増やして観測
RSS が右肩上がりgzip 前の raw が溜まるrotate_sec を短くする / tmpfs をやめローカルSSDへ
CPU が 100 % 張り付きGZip 圧縮負荷compresslevel=13 でトレードオフ調整
ドロップ急増回線断・WS リトライBybit Public ではなく Private エンドポイントも検討

3-4 Recorder の“死活監視”を一行で

# dropped 行数 ≒ WARN のカウント
grep -c "writer queue full" logs/recorder/recorder.log

0 が理想。1 日あたり 10 行以下なら実戦投入でもまず問題なし。


3-5 まとめ ――「最小コストで最大信頼性」

  • 非同期 + キュー分離 だけで MacBook-Air でも 1 秒バケットを余裕で賄えることを実証。
  • 落ちてもすぐ起きる を優先し、複雑な再送ロジックは実装しないポリシー。
  • この“録る仕組み”が固まれば、あとは 解析系(backtest)可視化系(BI) を乗せるだけ。
    プロセス分離&ログ整備の王道は、やっぱりコスパ最強だった。

5.チューニング実践録 ――「1 GB → 40 MB」への道

「速さは正義。でも“速いだけ”は罪」
ここでは Recorder を“触りながら痩せさせた”生ログを公開する。

4-1 メモリダイエット 3ステップ

StepBeforeAfter削減幅
Buffer サイズ調整buffering=8 MB 固定1 MB に縮小-5 MB
gzip ASAP15 分ごと5 分ごと-10 MB
Delta フィルターsnapshot + delta 全保存delta だけ で圧縮率↑-42 %

4-2 CPU プロファイル & Hot-Path 対策

# py-spy で 20 秒サンプリング
py-spy top --pid $(pgrep -f recorder.core) --duration 20
Hot Func%CPU施策
json.dumps()38 %受信は“文字列”で保持、ダンプしない
gzip.write()27 %compresslevel=1 & batched copy
psutil.cpu_percent()12 %interval=1.0 に変更 → 回数 1/30

4-3 ネットワーク回線で詰まった件

  • 現象:Starlink 回線で WS ping-pong が 5 s 超え、asyncio.TimeoutError 連発。
  • 対策ping_interval=20 -> 40 に伸ばし、close_timeout=5 に緩和。
  • 教訓“クラウド再現テスト”は必須。自宅 Wi-Fi だけで OK と思うと痛い目を見る。

4-4 CI / CD に組み込むヘルスチェック

# .github/workflows/health.yml の核心
- name: Recorder health
  run: |
    python -m recorder.core \
      --channels publicTrade.BTCUSDT \
      --out-dir /tmp \
      --prefix ci_test \
      --seconds 10
    test $(ls /tmp/ci_test*.jsonl | wc -l) -gt 0

10 秒だけ起動し、ファイルが出来れば PASS。シンプルだが破壊力抜群。

4-5 まとめ ――“動いたあと”こそゴールデンタイム

  1. メトリクス可視化 → ボトルネック特定 → ピンポイント修正 のサイクルが最短コスト。
  2. Recorder の最適化は バックテスターの待ち時間短縮 に直結し、開発サイクル全体を加速。
  3. “あと 10 %” は 掘れば掘るほどコスパ低下。8 割ラインで止め、次フェーズ(S3 化 etc.)へ。

5.今後の展望 ―––「取るだけ」から“育てる”データ基盤へ

5-1 1年先を見据えた課題リスト

分類現状今後やりたいこと
耐障害性シングルノードで動作。突発的なネットワーク断に弱い① Recorder を systemd + Restart=always で常駐化
② 書き込み先を NFS/S3 などリモートにして冗長化
スケーラビリティBTC/USDT 1ペアのみ① ポリシーを絞った マルチシンボル対応
② Queue の back-pressure を Prometheus/Grafana で可視化
低レイテンシ1 s OHLCV で十分将来的に 100 ms バケット を試し、板アルゴのバックテストへ
ガバナンス手元ノートにメモGitHub Actions + PR テンプレートでレビューを標準化
コストMac 1台で完結Lightsail/EC2 t4g.small へ載せ替え、電気代を 1/4 に

5-2 「データレイク化」という選択肢

  • Why:Parquet を貯めているだけでは “死蔵” しやすい。
  • How
    1. S3 (互換 OSS: MinIO) に置き換え、
    2. AWS Glue / DuckDB / polars で サーバレス SQL
    3. Athena + QuickSight でダッシュボード自動更新。

“取るデータ <br>⇢ 回すクエリ <br>⇢ 見えるインサイト”
—— このループが1クリックで回ると、裁量判断も俄然スピードアップする。

5-3 MLOps との橋渡し

  1. 特徴量ストア:VWAP やスプレッドを Feature Store に反映。
  2. オンライン推論:Bot のポジションサイズを 軽量 GBDT で自動調整。
  3. CI/CD:モデル更新を GitHub Actions で nightly retrain → Canary Deploy。

5-4 「Learning Loop」を回すための KPI

KPI目標値理由
End-to-End Latency≤ 3 s1 min 頭で検証できれば手動オペ損失を最小化
Dropped Msgs / day0.1 % 以下市場急変時ほど価値があるデータを失わない
Recorder CPU Util25 % 未満Bot・バックテスターと共存できる余力確保
Backtest Turnaround≤ 8 min / 日朝一の “ざっくり検証” を出社後に即確認

5-5 まとめ

  • Recorder → Queue → Writer の3段構えは、小規模でも安定運用できる骨格。
  • そこに ログ監視・自動ローテート・Makefile を重ね、〝人が眠れる〟運用へ一歩前進。
  • 次フェーズは ストレージのリモート化+メタデータ管理

🏷️ おまけコーナー — 仕上げの微修正ログ

「ここまでやっとけば明日また固まっても“あ、あそこ弄れば良かった”で済むはず」

追加したモノファイル/場所ひと言メモ
FileHandler で常時ファイルログrecorder/core.py 冒頭の logging 構成~/MMBot/backtest24/logs/recorder/recorder.log に吐き出し。tail -F で追えるようになった。
log-recorder ターゲットMakefileコマンド一発 make log-recorder → log をカラー付きで追従 (tail -F)
録画時間ワンライナーrecord-1h ターゲットREC_DURATION_SEC を 3600 へ差し替えて 1 h だけ試運転。
キュー・サイズ表示_stats_looprss / cpu % / q=<len> に加えて raw=123 KB(現在開いている JSONL のサイズ)も出力可能に(サンプル実装 ↓)
ダブル Ctrl-C 落としSIGINT→グレースフルシャットダウン→2 発目で即終了誤爆でファイルが壊れるのを防止。
Queue join タイムアウト警告_close_writer()5 s 以内に排出できなければ WARNING を吐いて続行(ハング防止)。
# --- sample patch for file size in _stats_loop ---
size_kb = 0
if self._raw_path and self._raw_path.exists():
    size_kb = self._raw_path.stat().st_size // 1024
logger.info("📊 rss=%.1f MB  cpu=%.1f %%  q=%d  raw=%d KB",
            rss, cpu, qlen, size_kb)

使い方まとめ

# 1時間だけ録画して動作を確認
$ make record-1h

# 録画中のログを追いかける
$ make log-recorder       # ← もう tail -F を毎回書かなくてOK

# 後工程(バケット化 → funding 取得 → validate)
$ make daily VERBOSE=1    # 途中経過も全部表示

これで 「録る → 見る → こける前に気付く」 の三点セットが完成です。

-Bot, プログラミングスキル, 環境構築・インフラ, 開発ログ