Bot プログラミングスキル 環境構築・インフラ 開発ログ

🛠️開発記録#251(2025/6/14)バックテスト・パイプライン構築のリアル:1分ランから24hランまでに学んだ5つの教訓

こんなに短いコードなのに、一日中泥臭くハマるとは思いませんでした。1分、5分、30分、そして1時間……想定どおりにスムーズに動くと思いきや、サブスクライブのフォーマットミスや0バイトファイル、秒単位のズレ、空のFunding CSV、Fillゼロの即落ち…気づけば夕方。

本記事では、「バックテスト・パイプライン構築のリアル」と題して、1分ランから24時間ランまでの道のりで直面した5つの教訓を、トラブルレスキューの視点も交えてまとめます。開発中のみなさんが同じ罠にハマらないよう、軽いノリでザクっとお届けします!

序章:想定は簡単、現実は複雑――なぜ1日で完走できなかったのか?

朝、私はこう思っていました。「1分ラン、5分ラン、30分ラン、1時間ラン…ぜんぶサクッと回せるだろう」と。
ところが現実は甘くなく、夕方までトラブルシューティングに追われる羽目に。やっと見えたのは、「テスト用パイプラインのサイズ感を誤ると、次々と想定外のバグが噴出する」という事実です。

  • サブスクライブのフォーマットミス
    WebSocket 接続は確立できたと思いきや、publicTrade や orderbook の引数が微妙に違っていたせいでデータゼロ。
  • 0 バイトファイルの混入
    途中で録画が途切れたタイミングで中身ゼロの JSONL が生成され、バケット化であっさり失敗。
  • 秒粒度のズレ
    OHLCV と Depth の sec 列が1秒ずれただけで inner join が一切ヒットせず、「No overlapping seconds」で即クラッシュ。
  • 空の Funding CSV
    はじめ Funding を取りに行ったらエンドポイントのパラメータ不備で返ってきたのは空の CSV。max() が None を返して TypeError。
  • Fill=0 の即終了
    データが少ないスモークテストでは Maker Fill が一度も立たず、デフォルトではエラー終了。

このように、“小さく回して動作確認” のつもりが、細かいバリデーションと堅牢化コードが裏目に出て、全部止まってしまったわけです。
本記事では、こうした「想定と現実のギャップ」をいかに早く発見し、軽量テストと本番テストを両立させるかを解説していきます。次章から、一つずつ問題と対策を振り返っていきましょう。

第1章:データ取得で躓く――サブスクライブから空ファイル問題まで

私はまず「録画さえ始まればデータは勝手に溜まるだろう」と高をくくっていました。しかし、WebSocket 周りの些細なミスが連鎖して、あっという間に手詰まりに。

1.1 WS サブスクライブ形式ミス

{"op":"subscribe","args":[{"instId":"BTCUSDT","channel":"publicTrade"}]}

と書くつもりが、"args":["publicTrade.BTCUSDT"] に変更した途端、ACK が帰ってこなくなりデータゼロ。

  • 教訓:まずは小セグメント(1ファイル)だけ取り、その中身を headgrep で必ずチェック。
  • 対策:最初に {"success":true} の ACK を受信しているか、grep '"topic":"publicTrade.BTCUSDT"' 件数を確認。

1.2 0 バイト&壊れ gzip の混入

録画中断や接続再試行のタイミングで、サイズ0 の .jsonl.gz が複数生成。
そのままバケット化に回すと「no trade rows parsed」や「FileNotFoundError」で即死。

  • 教訓:生ファイルは「0 byte → 削除」「gzip エラー → フッタ修正」で健全化する仕組みを最初に入れる。
  • 対策:以下のバッチを make bucketize 前に自動実行。
find data/raw/$DATE -type f -size 0c -delete

以上のように、データ取得フェーズでの「想定外」は、
「まずは小さく動かす→ログを覗く→必ず検証」のループを回さないと見えない問題ばかりでした。
次章では、その後の “秒粒度結合” でのズレ地獄を振り返ります。

第2章:秒粒度結合の罠――バケット化&検証時のズレを防ぐ

データが無事取れたら、次は「秒単位で結合」して OHLCV と板情報を組み合わせるステップ。しかしここでも、ほんの数ミリ秒のズレが命取りでした。

2.1 ts 列の単位問題

  • 原因:Parquet に書き出された ts 列がナノ秒だったりミリ秒だったり混在
  • 症状// 1_000 / // 1_000_000_000 による単位変換ロジックを誤ると、sec がズレて inner join がすべて外れる
  • 対策: 以下のように、最大値 >1e14 をトリガーに「ナノ秒 or ミリ秒」を自動判定。
if dtype == pl.Datetime:
    ts_expr = pl.col(ts_col).dt.timestamp()
else:
    div = 1e14 < max_val and 1_000_000_000 or 1_000
    ts_expr = (pl.col(ts_col) // div)

2.2 inner と left の使い分け

  • inner join:完全に重なる秒だけ残す → 本番精度向上
  • left join:OHLCV を軸に板情報を付ける → テスト時のズレ許容
  • ポイント:開発中は --join-mode left、本番では --join-mode inner を CLI で切り替え、
    ズレによるドロップ数をログで必ず確認する。

2.3 validate_inputs.py の落とし穴

  • 厳格設定:デフォルトで --strict が効いていると、重複秒がゼロなら即エラー終了
  • 軽量テスト--strict オフ or 「重複件数を警告にとどめる」ようにコードを緩和
  • 確認方法
make validate DATA=20250615 || echo "重複合計件数: $(grep overlap logs)"

秒単位の“わずかなズレ”が join をすべて外す――
この罠を回避するには、自動判定による単位変換と、inner/left モードの切り替え設計が必須でした。
次章では、「ライトモード設計」で開発を爆速にする方法を詳しく解説します。

第3章:ライトモード設計――動作確認優先の開発サイクル

開発中にいちいち「本番モード」で止まってしまうと心が折れます。そこで私は「ライトモード」と呼ぶ、**“止まらない・結果を出す”**オプション群を導入しました。

3.1 --skip-funding: Funding を一時スキップ

  • 目的:Funding API の空返りや CSV 欠如で止まるのを防ぐ
  • 挙動fr 列をすべて 0.0 に固定し、計算はスルー
  • 恩恵:Funding 周りの問題切り分けが楽。データ結合や PnL ロジック自体を先に確認可能。

3.2 --allow-zero-fill: Fill=0 を許容

  • 目的:データが少ないテスト段階で “Maker fill が立たずに即終了” しないように
  • 挙動:「フィル件数 0」の場合は損益を 0.0 で出力し、警告だけ表示
  • 恩恵:閾値調整中でもスクリプトが最後まで動き、ログを眺めながら次の改善点を探せる

3.3 --join-mode inner|left: 結合モード切替

  • inner:本番では被り秒のみ抽出。精度重視。
  • left:テスト時は秒ズレを気にせず進める。
  • 使い分け
    • 朝一のスモークテストは left → “だいたい動くか” を確認
    • 本番テストは inner → “ちゃんと重なっているか” を検証

3.4 --seed: 乱数シード固定

  • 目的depth モードでの乱数依存結果を再現しやすく
  • 挙動np.random.seed(<指定値>) を設定
  • 恩恵:同じ設定・同じデータで必ず同じ PnL 結果に。比較テストが安定

これらライトモード用フラグを組み合わせることで、
小さく回して→止まらず動かして→結果を眺める」という高速開発サイクルが実現しました。
次章では、この土台をさらに堅牢&可読にする“Refactor & Logging”をご紹介します。

第4章:堅牢化&可読化――ロギング・関数化・定数化で未来を救う

ライトモードで高速サイクルを得たら、次はコード自体をメンテしやすく・拡張しやすく整備します。ここでは3つの改善ポイントをご紹介します。

4.1 logging モジュールによる可視化

  • なぜ必要かprint() ではログレベルも出力先も一律。バッチジョブやテスト時に情報が埋もれる。
  • 改善例
import logging
logger = logging.getLogger(__name__)
logging.basicConfig(level=logging.INFO,
                    format="%(asctime)s [%(levelname)s] %(message)s")
# 使い方
logger.info("Loaded OHLCV rows=%d", ohlcv.height)
logger.warning("Maker fill = 0 → テストモードで継続")
  • 効果
    • INFO / WARNING / ERROR レベルに応じて表示・抑制可能
    • ファイル出力や回帰テストで差分検知もしやすい

4.2 関数化&モジュール分割でテスト容易性アップ

  • なぜ必要か:スクリプト全体が一塊だと、ユニットテストや部分的な入れ替えが困難。
  • 改善例
def load_ohlcv(path: str) -> pl.DataFrame: …
def load_depth(path: str) -> pl.DataFrame: …
def load_funding(path: Optional[str], skip: bool) -> pl.DataFrame: …
def compute_pnl(df: pl.DataFrame, params: dict) -> dict: …
def main(args): …
if __name__ == "__main__":
    main(parse_args())
  • 効果
    • 各関数を個別にモック/テスト可能
    • 将来の機能追加(例:複数ペア対応、ファイル保存先変更)にスムーズ

4.3 マジックナンバーの定数化

  • なぜ必要か28_8001_000_000_000 が直接コードにあると意味不明。
  • 改善例
FUNDING_INTERVAL_SEC = 8 * 60 * 60  # 8時間
NS_TO_SEC = 1_000_000_000
MS_TO_SEC = 1_000
  • 効果
    • 数字の意図が即座に理解できる
    • 変更時も定数を変えるだけで一元管理

これらを組み込むことで、“今すぐ動く” だけではなく、“明日も半年後も安心して使える” コードベースが出来上がります。

次章では、今日の学びを日々のワークフローに落とし込み、再現性あるチェックパターンを定義します。

終章:再現性あるチェックパターンで1日を取り戻す

開発を進めるうえで最も大切なのは、**「同じ失敗を二度と繰り返さない」**ことです。そこで私は、今日のトラブルシュートを再現可能なチェックリストとテストサイクルに落とし込みました。

5.1 障害パターン&事前チェックリスト

障害パターン事前チェック
サブスクライブ形式ミス小セグメントで ACK & メッセージ件数確認
0Bファイル/壊れ gzipfind … -size 0c -delete 実行
秒単位ズレ(join が外れる)--join-mode left でズレ件数確認
空の Funding CSV--skip-funding テスト → 本番ファイル
Fill=0 で即終了--allow-zero-fill テスト

5.2 テストサイクルの段階的ステップ

  1. 5 分スモーク
    • 短時間録画 → smoke_backtest.py で秒結合のみチェック
  2. 30 分ライトモード
    • 本家スクリプトを --skip-funding --allow-zero-fill --join-mode left で実行
  3. 1 時間本番モード
    • --join-mode inner + Funding 有効 + Fill>0 を確認
  4. 6 h → 24 h ラン
    • 同じパラメータでスケールアップ

5.3 “小→中→大”を意識する心構え

  • 小さく始める:問題の切り分けは小セグメントが最適
  • 中くらいで確認:ライトモードでロジック全体を走査
  • 大きく回す:本番データで精度と性能を最終検証

以上、バックテスト・パイプライン構築の5つの教訓を、実体験ベースでお届けしました。
小さなスモークテストからステップアップし、ライトモード → 本番モードへとフェーズを分けることで、開発効率と信頼性を両立できます。

明日からは、このチェックリストを基に「小→中→大」のサイクルを回し、無駄に1日を費やすことなく着実に前進しましょう!
みなさんの開発ライフにお役立ていただければ幸いです。
ご覧いただき、ありがとうございました。

おまけ:バックテスト用スクリプト(暫定版)

#!/usr/bin/env python
"""
backtester.py – Bybit BTCUSDT Perp 対応(リファクタ済みライトモード付き)

- ロギング、関数化、定数導入、明示的警告を追加
- 24hフルデータでも数分スモークでも同じコードで動作
"""
from __future__ import annotations
import argparse
import glob
import logging
import os
from pathlib import Path

import numpy as np
import polars as pl

# ───────── Constants ─────────
FUNDING_INTERVAL_SEC = 8 * 60 * 60  # 8h in seconds

# ───────── Logging Setup ─────────
logging.basicConfig(
    level=logging.INFO,
    format="%(asctime)s %(levelname)-8s %(message)s",
    datefmt="%Y-%m-%d %H:%M:%S"
)
logger = logging.getLogger(__name__)

# ───────── Argument Parsing ─────────

def parse_args() -> argparse.Namespace:
    cli = argparse.ArgumentParser(
        description="Bybit BTCUSDT Perp バックテスター(ライトモード付き)"
    )
    cli.add_argument("yyyymmdd", help="対象日 (UTC) 例: 20250613")
    cli.add_argument("--ohlcv",   required=True, help="kline_1s Parquet pattern")
    cli.add_argument("--depth",   required=True, help="depth_1s Parquet pattern")
    cli.add_argument(
        "--funding", required=True,
        help="Funding CSV pattern (timestamp,fr) 8h cycle"
    )
    cli.add_argument("--spread",      type=float, default=0.10,
                     help="実効スプレッド閾値 (USDT)")
    cli.add_argument("--lot-btc",     type=float, default=0.005,
                     help="ロットサイズ (BTC)")
    cli.add_argument("--maker-fee",   type=float, default=-0.00020,
                     help="Maker fee 例:-0.00020 (= -0.02%)")
    cli.add_argument("--taker-fee",   type=float, default= 0.00055,
                     help="Taker fee 例: 0.00055 (= 0.055%)")
    cli.add_argument(
        "--fill-prob-mode", choices=["instant", "depth"], default="depth",
        help="Maker fill モデル instant|depth"
    )
    # ライトモード オプション
    cli.add_argument(
        "--skip-funding", action="store_true",
        help="Funding コスト計算をスキップ"
    )
    cli.add_argument(
        "--allow-zero-fill", action="store_true",
        help="Fill=0 でもエラーにしない"
    )
    cli.add_argument(
        "--join-mode", choices=["inner", "left"], default="inner",
        help="OHLCV と Depth の結合方法"
    )
    cli.add_argument(
        "--seed", type=int,
        help="乱数シードを固定 (depth モード再現性用)"
    )
    return cli.parse_args()

# ───────── Helpers ─────────

def resolve_pattern(pattern: str) -> str:
    """Glob でファイルを探し、空でない最大サイズファイルを返す"""
    files = [p for p in glob.glob(pattern) if os.path.getsize(p) > 0]
    if not files:
        logger.error("No non-empty file found for pattern: %s", pattern)
        raise FileNotFoundError(f"No non-empty file for pattern: {pattern}")
    chosen = max(files, key=os.path.getsize)
    logger.debug("Resolved %s -> %s", pattern, chosen)
    return chosen

# ───────── Data Loaders ─────────

def load_ohlcv(path: str) -> pl.DataFrame:
    logger.info("Loading OHLCV from %s", path)
    df = pl.read_parquet(path)
    ts_col = "__index_level_0__" if "__index_level_0__" in df.columns else "ts"
    if df[ts_col].dtype == pl.Datetime:
        sec = df[ts_col].dt.timestamp()
    else:
        div = 1_000_000_000 if df[ts_col].max() > 1e14 else 1_000
        sec = df[ts_col] // div
    return df.with_columns(sec.cast(pl.Int64).alias("sec"))


def load_depth(path: str) -> pl.DataFrame:
    logger.info("Loading Depth from %s", path)
    df = pl.read_parquet(path)
    if df["ts"].dtype == pl.Datetime:
        d_ts = df["ts"].dt.timestamp()
    else:
        div = 1_000_000_000 if df["ts"].max() > 1e14 else 1_000
        d_ts = df["ts"] // div
    df = df.with_columns(d_ts.cast(pl.Int64).alias("sec"))
    return df.select(["sec", "bid_px", "bid_sz", "ask_px", "ask_sz"])


def load_funding(path: str) -> pl.DataFrame:
    logger.info("Loading Funding from %s", path)
    df = pl.read_csv(path)
    if df.height == 0:
        logger.warning("Funding CSV is empty: %s", path)
        return pl.DataFrame({"sec": [], "fr": []})
    if df["timestamp"].dtype == pl.Datetime:
        sec = df["timestamp"].dt.timestamp()
    else:
        div = 1_000_000_000 if df["timestamp"].max() > 1e14 else 1_000
        sec = df["timestamp"] // div
    fund = df.with_columns(sec.cast(pl.Int64).alias("sec")).select(["sec", "fr"]).sort("sec")
    # 前方埋め
    full = pl.DataFrame({"sec": pl.Series(range(int(fund["sec"].min()), int(fund["sec"].max() + FUNDING_INTERVAL_SEC)))})
    return full.join(fund, on="sec", how="left").fill_null(strategy="forward")

# ───────── Merge & Calculate ─────────

def merge_data(
    ohlcv: pl.DataFrame,
    depth: pl.DataFrame,
    fund: pl.DataFrame,
    join_mode: str
) -> pl.DataFrame:
    df = (
        ohlcv.join(depth.groupby("sec").agg([
            pl.col("bid_px").first().alias("best_bid"),
            pl.col("ask_px").first().alias("best_ask"),
            pl.col("bid_sz").first().alias("bid_qty"),
            pl.col("ask_sz").first().alias("ask_qty"),
        ]), on="sec", how=join_mode)
        .join(fund, on="sec", how="left")
        .drop_nulls(["best_bid", "best_ask"])
        .with_columns(pl.col("fr").fill_null(0.0))
        .sort("sec")
    )
    if df.height == 0:
        logger.error("No overlapping seconds after merge.")
        raise SystemExit("❌ No overlapping seconds. Check inputs.")
    return df


def calculate_pnl(
    df: pl.DataFrame,
    spread: float,
    lot_btc: float,
    maker_fee: float,
    taker_fee: float,
    fill_mode: str,
    allow_zero: bool
) -> dict[str, float]:
    total_sec = df.height
    mid = (df["best_bid"] + df["best_ask"]) / 2
    fee_w = mid * (taker_fee - maker_fee)
    df = df.with_columns([
        mid.alias("mid_px"),
        (df["best_ask"] - df["best_bid"] - fee_w).alias("eff_spread"),
        pl.when(pl.col("eff_spread") >= spread)
          .then(pl.col("best_bid") + (pl.col("eff_spread") + fee_w)/2)
          .otherwise(None).alias("quote_px"),
        pl.col("best_ask").shift(-1).alias("exit_px_next"),
        pl.col("bid_qty").alias("liq_qty_now"),
    ])
    if fill_mode == "instant":
        mask = (df["quote_px"].is_not_null() & (df["exit_px_next"] >= df["quote_px"]))
    else:
        prob = (df["liq_qty_now"] / lot_btc).clip_max(1.0)
        rand = pl.Series(np.random.random(total_sec))
        mask = (df["quote_px"].is_not_null() &
                (df["exit_px_next"] >= df["quote_px"]) &
                (rand < prob))
    df = df.with_columns(mask.alias("maker_fill"))
    fills = df.filter(pl.col("maker_fill"))

    n = fills.height
    if n == 0:
        msg = "⚠ No fills generated."
        if allow_zero:
            logger.warning(msg)
        else:
            logger.error(msg)
            raise SystemExit(msg + " Adjust spread or lot size.")

    edge = ((fills["exit_px_next"] - fills["quote_px"]) * lot_btc).sum() if n > 0 else 0.0
    notional = (fills["quote_px"].mean() or 0.0) * lot_btc
    fee_m = max(maker_fee, 0) * notional * n
    rebate = max(-maker_fee, 0) * notional * n
    fee_t = taker_fee * notional * n
    fund_cost = (fills["mid_px"] * lot_btc * fills["fr"] / FUNDING_INTERVAL_SEC).sum() if n > 0 else 0.0
    total_fee = fee_m + fee_t - rebate
    net = edge - total_fee - fund_cost
    missing = 100 * (1 - total_sec / (24 * 3600))

    return {
        "seconds": total_sec,
        "fills": n,
        "fill_ratio": n / total_sec * 100,
        "gross_edge": edge,
        "funding_cost": fund_cost,
        "maker_fee": fee_m,
        "maker_rebate": rebate,
        "taker_fee": fee_t,
        "total_fee": total_fee,
        "net_pnl": net,
        "missing_pct": missing,
    }

# ───────── Main ─────────

def main() -> None:
    args = parse_args()
    if args.seed is not None:
        np.random.seed(args.seed)

    ohlcv_path = resolve_pattern(args.ohlcv)
    depth_path = resolve_pattern(args.depth)
    funding_path = None if args.skip_funding else resolve_pattern(args.funding)

    ohlcv = load_ohlcv(ohlcv_path)
    depth = load_depth(depth_path)
    fund = load_funding(funding_path) if funding_path else pl.DataFrame({"sec": ohlcv["sec"], "fr": 0.0})

    df = merge_data(ohlcv, depth, fund, args.join_mode)
    stats = calculate_pnl(
        df,
        spread=args.spread,
        lot_btc=args.lot_btc,
        maker_fee=args.maker_fee,
        taker_fee=args.taker_fee,
        fill_mode=args.fill_prob_mode,
        allow_zero=args.allow_zero_fill,
    )

    # レポート表示
    logger.info("===== Backtest Report =====")
    for k, v in stats.items():
        logger.info("%s: %s", k, v)


if __name__ == "__main__":
    main()

軽くテストしたい時用

#!/usr/bin/env python
"""
smoke_backtest.py
― とりあえずデータ結合と行数チェックだけをやる超シンプル版
"""
import glob, os
import polars as pl

# 1) ファイルを1本ずつ取ってくる
ohlcv = sorted([p for p in glob.glob("data/ohlcv/1s/kline_*.parquet") if os.path.getsize(p)>0])[-1]
depth = sorted([p for p in glob.glob("data/depth/depth_orderbook_*.parquet") if os.path.getsize(p)>0])[-1]

# 2) 読み込んで秒列だけ整形
df_o = pl.read_parquet(ohlcv).with_columns((pl.col("__index_level_0__")//1e3).cast(pl.Int64).alias("sec"))
df_d = pl.read_parquet(depth).with_columns((pl.col("ts")//1e3).cast(pl.Int64).alias("sec"))

# 3) inner join して重なり秒数を確認
joined = df_o.join(df_d, on="sec", how="inner")
print(f"OK: OHLCV rows={df_o.height}, DEPTH rows={df_d.height}, JOINED rows={joined.height}")

-Bot, プログラミングスキル, 環境構築・インフラ, 開発ログ