Bot 環境構築・インフラ 開発ログ

🛠️開発記録#248(2025/6/8)バックテストを回し切るまでの学び

セクション1|はじめに — 秒レベルで精度を上げたかった話

トレード戦略の調整を続ける中で、どうしても無視できなくなってきたものがあった。
それは「時間分解能(どれくらい細かく時間軸で物事を測定できるか)」だ。

私は現在、板寄せ系の自動取引戦略を継続開発している。
本番環境での稼働も視野に入れつつ、パフォーマンスチューニングを繰り返している段階だが、
とある時点でバックテストと実運用の数値が乖離していることに気づいた。

バックテストでは、「このロジックなら Maker fill 比率は高くなるはずだ」と思ったものが、
実戦では fill されていなかったり、逆にポジションが不自然に膨らんでいたり。

なぜ?

理由はすぐにわかった。
1分足データでは精度が足りなかったのだ。

例えば、tick_thr(価格差し引き幅)を ±0.1 USDT にしていたとしても、
1分足の open/high/low/close では、実際にどこで値がついたかが正確に見えない。
数秒内に上下に触れて戻るような “ヒゲだけで終わる” 動きは、1分足では潰れてしまう。
その結果、「本当は fill されていたはずの注文」が、バックテスト上では fill されない。

「もっと細かく値動きを追わないと、これはチューニングできないな」

こうして私は、秒足レベルのデータでロジックを評価する仕組みを、自分で作ることにした。

セクション2|公式 API に 1 s 足が無い!→ 自前で作る決断

バックテストの粒度を上げようと考えたとき、最初に確認したのは当然、取引所の公式 APIだった。
私が使っているのは、Bybit の V5 WebSocket API だが、
探してみても 1 秒足の Kline データは提供されていない

  • REST API → 最小粒度は 1 分
  • WebSocket の kline チャンネル → こちらも 1 分が最小
  • markPriceKline.1s という “それっぽい” チャンネルもあるが、試してみると中身が無い

「えっ、1 秒足ないの……?」

最初は信じられなかった。
でも現実として、実用レベルで秒足の Kline データを取得する手段は存在しなかった。

それでも必要なのだ。
であれば、作るしかない。

目をつけたのは、publicTrade チャンネル
これは 全約定(tick)をリアルタイムで送ってくれる WebSocket ストリームで、
1件ごとの価格・数量・売買方向(Buy/Sell)などをミリ秒単位で受け取ることができる。

この tick データを1秒単位でまとめれば、自分で1秒足(OHLCV)を再構築できるはずだ。


✔ この段階で「秒足は自前で再構築する方針」が決まった。

セクション3|アーキテクチャ概要図(Recorder → Bucketizer → Backtester)

方針が決まったら、次は構造の整理だ。
「秒足を自前で生成し、それを使ってロジックの評価まで回す」という目的を実現するために、私は以下のような3ステージ構成を組んだ。

flowchart TD
    subgraph "Recorder (publicTrade)"
        R1[Recorder] --> RAW["raw JSONL.gz<br/>data/raw/trade_YYYYMMDD.jsonl.gz"]
    end

    RAW --> BKT
    subgraph "bucketizer.py"
        BKT["1秒バケット化(OHLCV生成)"] --> PQ["Parquet<br/>data/ohlcv/1s/kline_1s_*.parquet"]
    end

    PQ --> LDR["bt_io.load_1s_parquet()"]

    LDR --> BT
    subgraph "backtest24h_ws.py (Backtester)"
        BT["run_simulation()<br/>tick_thr / lot / timeout"] --> KPI["calculate_kpi()<br/>Maker% / Δ×時間"]
    end

    KPI --> OPT["パラメータ最適化<br/>CI / Slack 通知"]

各パーツの簡単な説明

パーツ役割
RecorderWebSocket から publicTrade データを24時間録音。1行ずつ gzip 圧縮で .jsonl.gz に追記。
BucketizerJSONL を読み込み、1秒ごとに OHLCV + buy/sell volume に集計し Parquet ファイルに変換。
bt_io.pyParquet を Polars DataFrame に読み込むローダ。欠損補完の ON/OFF もここで制御。
Backtester秒足データに対してバックテストを実行。Maker fill 判定や inventory delta を計算し、KPI を出力。
最終段階KPI に基づいてパラメータを調整。目標に届かなければ tick_thr や timeout を再検討。将来的には CI + Slack 通知で自動化予定。

✔ 録画・前処理・評価を完全に分離しておくことで、障害切り分けが簡単になるし、処理の再利用性も高くなる。

この構造が完成したことで、私はようやく**「秒足でブレずに評価できる土台」**を手に入れた。

セクション4|Recorder 実装ポイント — publicTrade購読と自動再接続

秒足を作るにはまず、秒ごとのティック情報が必要だ。
それを得るための最前線が Recorder だった。

この Recorder の役割はただ一つ、
Bybit の publicTrade ストリームを「止まらずに」取り続けること
ただ、これが思った以上に手強かった。


🧩 初期実装の問題

最初のバージョンは、素直に websockets.connect() で接続して
await ws.recv() を回しながらデータを貯めていく、というシンプルな構成だった。

が、40〜60分ほど動かすと突然こうなる:

Error: no close frame received or sent

Bybit 側からの定期切断、ローカルの Wi-Fi スリープ、Pong 応答の遅延、原因は様々。
ただ一つだけ確かなのは、放っておくと止まるということだった。


🔄 自動再接続の導入

そこで、以下のような構成に変更した:

  • 接続ループを while True: にして、何かエラーが起きたら 指数バックオフでリトライ
  • Ping/Pong のインターバルを指定(ping_interval=10
  • 異常終了しても、受信済みデータは gzip 圧縮で1行ずつ即書き込み → ロス無し
  • 録画ファイルは UTC 日付ベースで自動で切り替える設計に
async def record_forever():
    backoff = 1
    while True:
        try:
            async with websockets.connect(WS_ENDPOINT, ping_interval=10) as ws:
                await ws.send(json.dumps({"op":"subscribe","args":[CHANNEL]}))
                async for raw in ws:
                    msg = json.loads(raw)
                    if "data" not in msg:
                        continue
                    with gzip.open(raw_path(), "at") as f:
                        f.write(raw + "\n")
        except Exception as e:
            sleep = backoff + random.uniform(0, backoff)
            await asyncio.sleep(sleep)
            backoff = min(backoff * 2, 60)

✔ これで「落ちたら勝手に繋ぎ直す」状態を実現できた。
日をまたげば trade_YYYYMMDD.jsonl.gz も自動で切り替わる。
もう、朝起きてログが静かになっているのを見る不安とはお別れ。

セクション5|秒足バケット化の落とし穴 — 三項演算子エラーと dtype 固定

Recorder が安定して動きはじめると、次の工程はバケット化だ。
秒足に変換するために、1秒ごとに以下の値をまとめる必要がある:

  • open:その秒で最初に約定した価格
  • high / low:その秒の最大・最小価格
  • close:最後に約定した価格
  • volume:その秒間の総取引量
  • buy_vol / sell_vol:買いと売りの合計量(オプション)

この処理は自作の bucketizer.py で行っている。


💥 落とし穴その1:三項演算子と +=

書き始めた当初、私は以下のようなコードを書いた。

(b["buy_vol"] if side == "Buy" else b["sell_vol"]) += v

そして、怒られた。

SyntaxError: 'conditional expression' is an illegal expression for augmented assignment

つまり、Python では三項演算子を直接 += の左辺に使えないのだ。
これに気づかず、数分間ずっと from_dicts() のせいにしていた。

→ 正解は素直に if-else で分岐:

if side == "Buy":
    b["buy_vol"] += v
else:
    b["sell_vol"] += v

💥 落とし穴その2:pl.from_dicts(dict_values) で TypeError

Polars の from_dicts() を使って集計結果を DataFrame にしようとしたとき、
またもや落とし穴にハマった。

return pl.from_dicts(buckets.values())  # ← これが TypeError

これは、buckets.values()dict_values 型(イテレータ)であるため、
Polars のバージョンによっては直接受け取れない。

→ 解決策は .values() を明示的に list() に変換すること:

return pl.DataFrame(list(buckets.values())).sort("ts")

これでようやく、1秒単位の OHLCV + buy/sell volume を安定して生成できるようになった


✔ 最終的に、bucketizer は「生ティックをきれいな秒足に整形する」確実な中間処理になった。
ここまで来れば、ようやくバックテストにかけられる素材が整う

セクション6|バックテスター改修 — 標準 io 衝突をリネームで回避

秒足の Parquet データができたら、いよいよバックテストフェーズに入る。
ここでは、1 秒足をもとに以下のような処理を行っている:

  • tick_thr(価格幅)をもとに仮想的な Maker 注文を出す
  • high/low を見て fill されたか判定
  • fill されたら delta(在庫量)を増減
  • timeout を超えたら inventory をヘッジ(在庫を 0 に戻す)
  • その推移から KPI(Maker%・Δ×時間)を算出

秒ごとの fill 判定 → 在庫推移の積分 → KPI へ
という、わりとシンプルな構造だ。


🧨 ここで出た意外なバグ:ImportError from io

Parquet を読み込む関数を io.py に切り出して、
backtest24h_ws.py からこう書いた:

from io import load_1s_parquet

そして出たのがこのエラー:

ImportError: cannot import name 'load_1s_parquet' from 'io'

は?何が悪い?

原因は、「Python の標準ライブラリに io という名前のモジュールが存在する」こと。
自作の io.py よりも、Python 標準の io が先に読み込まれてしまったのだ。


✅ 解決策:ファイル名を bt_io.py にリネーム

標準モジュールと名前がかぶらないようにして、

from bt_io import load_1s_parquet

と読み込むことで解決。
たったこれだけのことで、モジュールの衝突が回避できる。


📦 bt_io.py の責務

このモジュールは今後の中核になる。
例えば:

  • 欠損秒を drop する or 補完する(drop_missing フラグ)
  • 他の銘柄にも対応(例:ETHUSDT)
  • 分足にダウングレードする処理も可能

つまり、バックテストに渡す “綺麗な DataFrame” を作る責任者bt_io.py だ。
この切り出しができたことで、以降の処理がかなり見通しよくなった。


✔ この段階で「録画 → 整形 → テスト」が完全に関心分離された。
バグってもどこが悪いか切り分けしやすくなったのが大きい。

セクション7|録画 → バックテストが完走! 初日の KPI と今後の課題

Recorder・Bucketizer・Backtester。
3つのモジュールをつなぎ合わせ、ようやく私は実際のティックデータを使っての秒足バックテストを完走させることができた。

最初に動かしたときの Parquet ファイルは、ほんの10分程度しか入っていないごく小さなものだった。
それでも、Recorder が正しく録れていて、Bucketizer が秒足に変換し、Backtester が KPI を出力する。
この「最後まで通る」という体験は大きかった。


✅ 初回テストのKPI出力例(※値は参考)

===== 24 h KPI (simulation) =====
Maker fill ratio : 96.4%
Δ×時間 積分       : 218.73  USDT·s
総取引量         : 2,880.11  BTC
価格変動         : -33.50  USDT

✔ fill ratio が 97% 近く出ている
✔ Δ×時間も許容ライン内
✔ 価格変動や出来高が大きすぎない(=偏った相場ではない)

もちろんこれは仮の数字で、実際にはまだ24時間分のデータを溜めきれていない状態だった。
それでも、KPIが数値として出る状態まで持っていけたことが非常に重要だった。


🛠 次の課題が見えた

この時点で、今後取り組むべきことがクリアになった。

  • tick_thr / lot / timeout の最適化
    → まだ仮置きの値なので、グリッドサーチして目標KPIを安定的に出せるパラメータを探す必要がある
  • 欠損秒の扱いの再検討
    → 今はドロップしているが、fill率が低下するようなら FFILL などの補完を試す
  • KPI の閾値テストを pytest 化
    → 毎日パラメータが劣化していないかをテストで検知し、Slack 通知へつなげたい
  • CI/CD パイプラインとの統合
    → bucketizer & backtest を GitHub Actions で回して、数字が悪化していたら Slack へ通知する構成を目指す

✔ 機能が通ることで「課題が明確になる」フェーズに入った。
これからは精度を上げるための改善サイクルが回っていく。

セクション8|失敗から得た学びと次の一手

今回の取り組みで、私は多くの“小さな壁”にぶつかり、それを一つずつ乗り越えていった。
その中で見えてきたことは、単なるテクニック以上の「設計感覚」だった。


❌ こういうミス、やりがちだった

  • 標準モジュールとの名前衝突
    io.py というありがちな名前が、まさかの ImportError を引き起こす
    → 今後は、標準名と被るかどうかのチェックは習慣にしたい
  • 録画が途中で止まってるのに気づかない
    websockets が何も言わずに落ちてる
    → Ping/Pong や reconnect ロジックの重要性を実感
  • 「何もエラーが出てない=正しく動いている」と思い込む
    KPI が 0.000% なのに安心していた
    → 入力データの量や質をちゃんと見てから判断すべき
  • 細かいエラーを“なんとなく”で潰そうとする
    三項演算子 + += のエラーに数分ハマった
    → 仕様を1回ちゃんと調べるだけで一発で解決できた

✅ 得られた実感値と学び

  • 「構造」を先に作ると実装が楽になる
    → Recorder / Bucketizer / Backtester を分けておいたことで、どこが壊れてるかの切り分けが非常にしやすかった。
  • まず1日分を安定して録れることが全ての出発点
    → 分析とかモデル以前に、そもそも「ちゃんと取れてるか?」が一番大事。
  • バックテストが通るだけで信頼感が段違いになる
    → 秒足の fill 判定は見た目以上に繊細で、ロジックの土台として必要不可欠だった。

🎯 次の一手(明日以降)

  • 24 h 分の Recorder を回しきる(すでに放置中)
  • グリッドサーチで最適パラメータを探索
  • pytest を書いて KPI 劣化を自動検知
  • Slack 通知と CI 統合で常時監視フローへ
  • 最終的には FRMMbot に実パラメータを反映し、本番稼働へ

おわりに|“ロジックの精度”は“時間の精度”から生まれる

今回の学びをひとことでまとめるなら、

秒で動く戦略は、秒で測ってはじめて評価できる

ということだと思う。

今まで「なんとなく」で通していた fill 判定も、
秒足に変えるだけでずいぶんとロジックがシャープになった。
やっぱり、データと向き合う精度が、そのまま戦略の強さにつながる。
そう実感した一日だった。

-Bot, 環境構築・インフラ, 開発ログ