「WebSocketで取引所のストリームを録画するだけだから、すぐ終わるだろう」
そう思って始めたはずのRecorder(レコーダ)開発。しかし実際には、「録れているように見えて録れていない」「一見動いているが、内部で破損している」「例外が出ないままファイルが壊れている」といった**“静かなバグ”**との闘いが続きました。
このブログでは、取引所のpublic streamを長時間・安定・安全に録画するために、筆者が取り組んだ実装改善の記録を5つのポイントに絞ってまとめます。
具体的には、gzipフッタの整合性、再接続の指数バックオフ、GracefulExitの設計、メモリ・FDの可視化、Prometheus連携による監視体制など、実運用に堪える“本番品質”への強化策を扱います。
同じようにRecorderやクライアントを開発している方にとって、「先に知っておけば避けられた落とし穴」を回避するヒントになれば幸いです。
1. “動いた”は罠──gzip フッタ欠損と仲良くしない方法
一見うまく動いているように見えて、実は壊れている。Recorder開発において最初に直面したのは、この「静かなファイル破損」でした。
Recorderでは、WebSocketから受信したデータを gzip.open()
で圧縮しながら .jsonl.gz
ファイルに逐次書き込んでいきます。しかし、プロセスが SIGINT
(Ctrl-C)などで強制終了されると、gzipファイルの末尾にある“フッタ情報”が書き込まれずに破損してしまうことがあります。
この状態のファイルは見た目上は存在しても、gzip -t
で検証すると「data stream error」などのメッセージとともに正しく展開できないことが判明しました。
解決策①: with構文とWriterの管理
まず採った対応は、gzipの書き込み部分を**with gzip.open()
** で明示的に囲い、flush・close処理が確実に走るようにすることです。また、Writerオブジェクトはキャッシュ管理し、必要に応じて明示的に close()
を呼ぶことで、ローテーションや終了時に後処理が漏れないようにしました。
解決策②: キル猶予の導入
次に重要だったのは、**プロセス終了時の「猶予時間」**を設けることでした。Unix系では gtimeout -k 10s 21600s …
のように、Killシグナルを送るまでに10秒間のグレースピリオドを持たせることで、Recorder内部でWriterをクリーンに閉じる時間を確保します。
解決策③: dry-runとgzip -tによる検証フローの自動化
加えて、5秒だけ接続してすぐ終了する --dry-run
モードを実装し、それと gzip -t
を組み合わせた軽量な検証フローを用意しました。これにより、CIなどでも即座に「ファイルが壊れていないか」をチェックできるようになり、破損系のバグは事前に潰せるようになりました。
「出力されたファイルがある」だけでは、Recorderの動作確認には不十分です。“ちゃんと閉じているか”をどう担保するかを考えることこそが、本番運用に耐えるRecorder設計の第一歩だと痛感しました。
2. 再接続地獄を防ぐ指数バックオフ+ジッター
Recorder開発中、もう一つの大きな問題は**“想定外に元気な再接続ループ”**でした。
WebSocket接続が切れた際、すぐに再接続を試みるようにしていたのですが、ネットワークや取引所側の一時的な不調が発生したとき、1秒間に数十回以上の再接続ログが延々と出続けるという、いわば「再接続地獄」に陥りました。
これはただのログスパムにとどまらず、帯域やCPUを不必要に消費し、最悪の場合サーバー側からDoS(過負荷)と見なされるリスクもあります。
原因:floatをawaitしていた罠
原因の一部は非同期コードの基本的な落とし穴にありました。
具体的には、await backoff
のように float値そのものを await してしまうミスがコード内に潜んでいました。
Pythonの非同期処理では、await
や asyncio.create_task()
に渡すべきはコルーチンオブジェクトであって、数値ではありません。
このミスは TypeError: a coroutine was expected, got 1.0
のような形で現れますが、再接続のたびに例外が発生していたため、エラーログの嵐に気づくまで少し時間がかかりました。
解決策①: 指数バックオフの導入
これを解決するために、まずは**指数バックオフ(exponential backoff)**を導入しました。
接続失敗回数 n
に対して、次のリトライまでの待機時間を 2^n
秒に設定し、最大30秒で上限を設けます。
このようにすれば、失敗直後はすぐ再試行しつつ、連続失敗が続いた場合は自動的に間隔を空けていく挙動になります。
解決策②: ジッター(ランダムゆらぎ)を加える
さらに、待機時間に ±ランダムゆらぎ(ジッター)を加えることで、複数のRecorderが同時に走っていた場合にも再接続タイミングの集中を回避できるようにしました。
backoff = min(2 ** tries, 30) await asyncio.sleep(backoff + random.uniform(0, backoff))
このたった2行の調整で、ログスパムはピタリと止まり、挙動が穏やかになりました。
実際のところ、WebSocketの再接続戦略は「動くかどうか」ではなく、「どう動きすぎないか」が問われる部分です。指数バックオフとジッターの導入は、コードよりもむしろ運用者への配慮に近いと感じています。
3. GracefulExit は “全タスク cancel & finally-close” が鉄則
短時間の動作検証ではうまく動いていたRecorderも、いざ長時間稼働させてCtrl-Cで止めようとしたとき、思いがけない問題に直面しました。
それは、終了時にファイルが壊れる/プロセスが終了しきらない/なぜかWebSocketが再接続してくるといった現象です。これはすべて「終了処理が中途半端」という設計上のミスから来ていました。
症状:止めたつもりが止まっていない
特に問題だったのは、Ctrl-C
を押してRecorderを止めたにもかかわらず、“connected & subscribed → publicTrade.BTCUSDT” という再接続ログがその直後に出てきたことです。
また、.jsonl.gz
のファイルが生成されていたにもかかわらず、gzip -t
でチェックすると**“data stream error”** と表示されるケースもありました。
これは明らかに Writerがきちんとflush・closeされないままプロセスが落ちていたことを意味します。
解決策①: 全タスクをキャンセルする
Recorderは非同期のループ構造でrecord_trade()
とrecord_depth()
という2つのループタスクを動かしています。
これらは明示的にキャンセルしなければ Ctrl-Cを押しても裏で動き続け、再接続したりファイルを開きっぱなしにしたりするのです。
for t in tasks: t.cancel() await asyncio.gather(*tasks, return_exceptions=True)
これで全ての子タスクが確実に終了します。
解決策②: finallyでWriterを閉じる
また、record_trade()
や record_depth()
のような受信ループには、必ず finally:
を設けて recorder.close_writer(...)
を呼ぶようにしました。
try: async for raw in ws: ... finally: recorder.close_writer("trade_raw")
Writerが開かれていない場合でもエラーにならないようにしつつ、“閉じ忘れ” をゼロにする設計です。
これにより、[close] trade_raw closed rows=…
や [close] depth_raw closed rows=…
といったログが毎回確実に出るようになりました。
解決策③: 再接続ループの脱出条件を設ける
再接続ループ (while True
) にも、グローバルフラグ running = False
を用意しておき、GracefulExitを検知したら速やかに脱出できるようにしました。
さらに await asyncio.sleep(0)
を挿入することで、イベントループを一周させて未キャンセルのタスクを整理する工夫も加えました。
シグナルのハンドリングは、「タスクの終了と後始末をどう制御するか」に尽きます。優雅に終了するための設計(GracefulExit)こそが、録画システムの安定性を根本から支えてくれると実感しました。
4. ヘルスログ+ワンライナー可視化でメモリリークを即発見
Recorderが1分や10分だけ動くのであれば、大きな問題は起きにくいかもしれません。
しかし、24時間365日動かし続ける前提になると、わずかなメモリリークやファイルディスクリプタ(FD)の使いっぱなしがやがて致命的なクラッシュにつながります。
この問題に早めに気づけたのは、Recorderにヘルスログ(health log)出力機能を追加していたおかげでした。
--health
オプションで10分ごとに状態を記録
Recorderは、--health 600
のような引数を渡すことで、600秒(10分)ごとに現在の状態をJSON Lines形式でログファイルに追記するようにしています。
記録内容は以下のようなものです:
{"ts": 1720816230000, "rss": 75489280, "fd": 128, "tps_trade": 6.8, "tps_depth": 7.2}
rss
: Resident Set Size(プロセスの実メモリ使用量)fd
: 開いているファイルディスクリプタの数tps
: 各ストリームのトレード/板更新数(Transactions Per Second)
このログは、通常のログファイルとは別に logs/health_2025XXXX.jsonl
として保存されます。
ワンライナーで折れ線グラフ化できる仕組み
このJSONL形式の良い点は、PolarsやPandasを使って簡単にプロットできるということです。
例えばRSS(メモリ使用量)の推移は以下のようなPythonコードで一発です:
import polars as pl import matplotlib.pyplot as plt df = pl.read_ndjson("logs/health_20250612.jsonl") df = df.with_columns(pl.col("ts").cast(pl.Int64) / 1000) # ms → s plt.plot(df["ts"], df["rss"] / (1024**2)) # RSSをMB単位で表示 plt.title("Memory Usage (RSS)") plt.xlabel("Unix Time (s)") plt.ylabel("RSS (MB)") plt.show()
FDの推移も同様に折れ線グラフにできます。
予防策は“視覚化”から始まる
この可視化によって、「10時間動かしてみたら、実はFDが1秒ごとに1つずつ増えていた」といったサイレントリークをすぐに検出できるようになりました。
一見問題のないように見えるコードでも、ファイルを開いたまま再接続を繰り返すような設計になっていれば、FDが累積していきます。
定期的にログをプロットする習慣がついたことで、「この設計だと24時間で壊れるな」と**“事前に気づける仕組み”**を手に入れられました。
ログは情報ですが、グラフは直感です。
長時間プロセスを信頼して走らせたいなら、「視覚的に変化がないこと」を確認できることが、最も確実な安全装置になると実感しました。
5. “動くコード”のまま監視へ — Prometheus Exporter の最短接続
Recorderが安定して長時間動作するようになったとしても、“何か起きた時に気づけない” のであれば、それは依然として危ういままです。
特に、「再接続が想定以上に多発している」「TPSが極端に落ちている」などの**“静かなる異常”**は、ログを読み返すだけでは見逃しがちです。
その解決策が、Prometheus Exporter の導入でした。
Prometheus Exporterとは?
Prometheusは、アプリケーションやサービスの状態を定期的にスクレイプ(収集)して記録する監視システムです。
Exporterはその“入り口”となるモジュールで、Recorder側で指定ポート(たとえば 9102
)を立ち上げ、そこに状態情報をHTTP経由で提供します。
最短3行で導入完了
RecorderにExporterを導入するのは非常に簡単で、以下の3行を追加するだけで動作しました。
from prometheus_client import Counter, Gauge, start_http_server RECONNECTS = Counter("recorder_ws_reconnects_total", "WS reconnects", ["channel"]) TPS_GAUGE = Gauge("recorder_tps", "Trade TPS", ["channel"]) start_http_server(9102) # このポートでPrometheusが読みに来る
あとは、再接続が発生した場所で:
RECONNECTS.labels("trade").inc()
TPSを記録したタイミングで:
TPS_GAUGE.labels("depth").set(current_tps)
といったように、明示的にメトリクスを更新するだけで、すぐにPrometheusの管理画面に反映されるようになります。
AlertmanagerでSlack通知も可能に
Prometheusと組み合わせて使えるAlertmanagerを導入すれば、「再接続が5分間で5回以上発生したらSlack通知する」といった柔軟なルールベース監視が可能になります。
- alert: RecorderReconnectStorm expr: rate(recorder_ws_reconnects_total[5m]) > 0.2 for: 2m labels: severity: warning
これにより、異常はすぐに検知し、開発者の手元に届くようになりました。
エラーは“発生してから対応”するのではなく、“発生しそうな兆候”を検知して未然に対処するのが理想です。
RecorderにPrometheusを導入したことで、「動かしているつもり」が「確実に観測している状態」へと変わったことを実感しました。
おわりに 〜 “ただ録る”を、“ちゃんと録る”へ 〜
WebSocketストリームのRecorderは、一見すると単純なプログラムです。
接続して、データを受け取って、ファイルに書くだけ。
けれど実際には、その“当たり前”を長時間・安定して・安全に維持し続けることが、想像以上に難しいタスクでした。
今回の取り組みでは、以下の5つの観点からRecorderの品質を高めていきました:
- gzipフッタ欠損の防止(with構文+Kill猶予+dry-run)
- 再接続の制御(指数バックオフ+ジッター+floatバグ修正)
- GracefulExitの設計(全タスクcancel+finally-close)
- ヘルスログと可視化(RSS・FDをJSONL+折れ線プロット)
- 監視体制の強化(Prometheus Exporter+Slack通知)
どれも表面的には「なくても動く」ものですが、「確実に録れている」ことを自信をもって言えるための裏付けとして、必要不可欠な要素でした。
この経験を通じて得た実感
Recorderに限らず、“自動で何かを記録・実行する”プロセスは、異常がない時こそ注意が必要です。
壊れたときにはもう手遅れで、ログも欠けていて、ユーザーにも気づかれないまま損失が広がる。
そうならないためには、「沈黙を観測する仕組み」が必要です。
その意味で、今回の開発で取り入れた「dry-run→gzip検証」「10分ごとのヘルスログ」「Prometheus Exporter」は、沈黙を見える化する装置だったと言えるでしょう。
最後に
もしあなたが、自作のRecorderやクライアントを運用に乗せようとしているなら、
この経験の中に見えないバグを先回りして潰すためのヒントがあれば幸いです。
“ただ録る”ではなく、“ちゃんと録る”ために。
このコードは、これからも磨き続けていきます。