Bot CEX DEX 戦略ログ 開発ログ

🛠️開発記録#274(2025/8/8)実運用で生き残るCEX/DEXアービトラージ監視――250msと1秒の壁を越える設計

裁定(アービトラージ)の監視は「価格を取って引き算する」だけでは機能しません。実運用で勝負を決めるのは“ミリ秒”と“約定可能性”です。
本記事では、CEX/DEXアービトラージ監視を≤250msの時刻差
と**≤1sの監視サイクル**で回すための設計指針を、実装の勘所と一緒にまとめます。
結論から言うと、集約APIの平均価格を捨てて、Executable Price(実行可能価格)に全てを寄せるのが正解です。

何がダメだったのか:集約APIの平均値 vs 実行可能価格

「まずはCoinGeckoで価格を…」などの実装は、この手の監視プロダクションでは地雷です。

  • 平均値は実行できない
    集約APIは複数市場の平均/中間を返しますが、あなたがその価格で今この瞬間に約定できる保証はない。板の厚さも手数料もスリッページも反映されません。
  • 遅延が平気で数百ms〜数秒
    集約はどうしても更新周期が長い。CEX板(WS)とDEX見積(quote)を同時刻で比較しないと、幻のスプレッドを量産します。
  • 通貨建てがバラバラ
    CEXはUSDT建て、DEXはUSDC建て——1.0000と思い込みで比較すると平気で数十bpの誤差が出ます。
  • サイズ依存性を無視
    「0サイズのmid」での比較は、実トレードの意思決定と無関係。指定ノーション($1k/$5k/$20k…)での実行可能価格を比較する必要があります。

要するに、**「見かけの価格」ではなく「その場で実際に約定できる価格」**に全てを合わせる設計が必要です。


本記事のゴール:≤250ms時刻差 / ≤1sサイクルの監視

達成したいSLO(サービス目標)はシンプルです。

  • 比較時刻差(DEX vs CEX):p95 ≤ 250ms
    これを超える比較は棄却(uncomparable)。ズレた比較は誤検知の源泉です。
  • 監視サイクル時間:p95 ≤ 1s
    HTTPポーリングでダラダラ取らない。CEXはWSのL2板DEXは見積API(例:Jupiter /quote)TTLキャッシュ(≤300ms)で並列取得が基本。
  • 幻スプレッド撲滅
    価格はノーション指定実効単価に変換し、手数料・滑り・優先手数料(tip)まで含んだコスト後スプレッドで評価します。

原則(結論)

価格は“約定できるか”で定義する(Executable Price)

  • DEX:ルーティング込み見積(/quote)で手数料・価格インパクト込み受取量を取得し、実効単価(投入額 ÷ 受取量)へ変換。
  • CEXL2板からVWAP(ノーション別)を算出し、自口座の手数料まで含めて実効単価化。
  • 比較は常に「実行可能価格 vs 実行可能価格」。集約平均・0サイズmidは参考値に降格。

ノーション指定・通貨建て統一(USDC/USDT FX)・時刻差ゲート

  • ノーション指定N ∈ {1k, 5k, 20k, ...}買い/売りを両方向計測。サイズ依存の滑りを無視しない。
  • 通貨建て統一:CEX(USDT建て)⇄DEX(USDC建て)を常時換算USDC/USDTの実測FXをWS板で取り、**乖離(bps)**をメトリクス化。
  • 時刻差ゲート|ts_dex - ts_cex| ≤ 250ms を満たさない比較は棄却。鮮度の合わない比較はしない。

静的閾値は禁止:コストモデル+マージンでトリガ

「常に1%超えたらシグナル」は雑です。銘柄・ノーション・市況でコストは変わるから。

  • コストモデル(bp)
    cost_bp = dex_fee_bp + cex_fee_bp + est_slip_bp(N) + priority_tip_bp + infra_bp
  • dex_fee_bp:DEX手数料の合計(ルート依存)
  • cex_fee_bp:自口座のメイカー/テイカー手数料(VIP/リベート反映)
  • est_slip_bp(N):ノーションNでの滑りの経験則(p95)
  • priority_tip_bp:優先手数料・tipの実績(Solana等)
  • infra_bp:失敗・再送・撤退コストの移動平均
  • トリガ条件
    exec_spread_bp > cost_bp + margin_bp(symbol, N, regime)
    • exec_spread_bp:実行可能スプレッド(bp)
    • margin_bp:銘柄/ノーション/相場レジーム別の上乗せ(初期は5–15bp程度)
  • 運用の型
    • 比較できないときは棄却(stale/ts_skew/liquidity)
    • コスト負け(exec_spread_bp <= cost_bp)が続けば自動で停止(サーキットブレーカ)

この原則に従うだけで、「数字は出ているのに全然取れない」監視から卒業できます。次章以降では、WS板×TTLキャッシュ×並列取得で**≤250ms / ≤1s**を実現するための具体アーキテクチャと実装の勘所を紹介します。

アーキテクチャ概観

本設計は「DEXは見積APIで実効価格」「CEXはWS板からVWAP」「USDC/USDTを実測で換算」の三本柱に、I/O層の速度分離を重ねたものです。以下は全体の流れ。

sequenceDiagram
  autonumber
  participant CEX as CEX WS(L2)
  participant FX as USDC/USDT 板
  participant DEX as Jupiter /quote
  participant ARB as Arb Monitor

  par 並列取得
    CEX->>ARB: VWAP(N)(USDT建て)
    DEX->>ARB: /quote(outAmount)(USDC建て)
  and
    FX->>ARB: fx 実測(USDC/USDT)+ fx_stable_bp ログ
  end

  ARB->>ARB: ts_diff 測定(≤250msか?)
  alt 超過
    ARB-->>ARB: 比較棄却(uncomparable: ts_skew)
  else 以内
    ARB->>ARB: USDT→USDC 換算(fx適用)
    ARB->>ARB: spread_exec_bp 計算
    ARB->>ARB: cost_bp = dex_fee + cex_fee + est_slip(N) + tip + infra
    ARB->>ARB: edge_bp = spread - cost
    alt edge_bp > margin_bp
      ARB-->>ARB: Trigger 発火
    else
      ARB-->>ARB: Skip
    end
  end

DEX:Jupiter /quote→/swap(同一ルート・minOut・TTL)

目的:手数料・価格インパクトを含む“実行可能価格”をミリ秒精度で取得。
やること

  • /quote で見積
    • 入力は整数 amount(mint decimals 適用後)slippageBps を明示。
    • 300ms TTLのメモリキャッシュで同一キーの多重呼び出しをデドゥープ
    • キー例:(inputMint, outputMint, amount_int, slippageBps, onlyDirect)
  • 同一ルート厳守で /swap(将来の実行段)
    • /quote の route_id / routePlan をそのまま /swap に渡す。
    • minOutoutAmount × (1 - slippageBps/1e4 - safety_bps/1e4) で計算して埋める。
    • TTLblockhash/lastValidBlockHeight による内部有効期限“quote→submit ≤ 250ms” の運用SLO。
  • ログに残す最小項目
    route_id, pool_ids, total_fee_bp, inAmount, outAmount, minOut, quote_latency_ms, quote_ttl_ms

ポイント:集約サイトの平均値は使わない。Jupiter見積の受取量(outAmount)→実効単価に変換して比較する。


CEX:Bybit v5 WebSocket L2 → ノーション別VWAP

目的HTTPポーリングを捨てて、常時同期された板からその場で約定できる単価を出す。

  • v5 Public WS に接続(wss://stream.bybit.com/v5/public/{spot|linear}
    • orderbook.50.SYMBOL を購読し、L2を常駐保持(辞書でOK)。
    • スナップショット→デルタを反映。ping/再接続はバックオフ。
  • ノーション別VWAP
    • 買い:最良売りから**指定ノーション($1k/$5k/$20k…)**分だけ積み上げ、平均約定単価を算出。
    • 売り:最良買いから同様に。
    • 自口座の手数料(maker/taker)はbpで上乗せ/控除して“実効単価”へ。
  • ログ
    vwap_price, filled_base, levels_used, l2_age_ms, compute_ms

ポイント:0サイズmidではなくVWAP指定ノーションで“実行できる価格”を取りに行く。


FX:USDC/USDT を板経由で実測 → fx_stable_bp を常時計測

目的:CEXはUSDT建て、DEXはUSDC建て——1.0000前提はNG。安定通貨の微妙なズレbp単位で効いてくる。

  • USDCUSDT(spot)の板を同じく WS で購読
    • 小ノーション(例:$10k)でVWAPを取り、USDT→USDC のレートを実測
    • 変換は CEX価格(USDT建て) × fx(USDC/USDT)USDC建てに統一。
  • fx_stable_bp をメトリクス化
    • fx_stable_bp = |fx - 1.0| × 1e4
    • 閾値超え時(例:>10–15bp)は比較棄却 or マージン上乗せなどの対策を自動適用。

ポイント:通貨建ての統一は必須。微差でも裁定の可否が変わる。


I/O:FastHTTP(1s/2s/1retry) と SlowHTTP の分離

目的裁定パスを遅い呼び出しから隔離し、p95 ≤ 1sを保証する。

  • FastHTTP(裁定用)
    • connect ≤ 1s / read ≤ 2s / retry = 1(指数バックオフ+ジッター)
    • 用途:Jupiter /quote、必要最小のメタAPI
    • 失敗は即棄却(比較不能として扱う)——“遅い成功”より“速い不成立”
  • SlowHTTP(観測用)
    • TVL、ステーブルフロー、バックフィル等は別クライアントで 10–30秒級のタイムアウト・多段リトライOK。
    • 名前(DI)で明示的に注入して混入を防止する。
  • 並列取得 & 時刻差ゲート
    • DEX見積(FastHTTP)と CEX VWAP(WS→即時計算)を同時に走らせ、取得完了の差分 ≤ 250msのみ採用。
    • 超過した比較は**uncomparable(reason=ts_skew)**でカウント・可視化。

ポイント:**速さを“設計として担保”**する。遅い処理を別のレーンに追いやるだけで、SLOは一気に安定する。


これで満たせるSLO

  • 比較時刻差:p95 ≤ 250ms(DEX/CEX同時取得+WS常時維持+TTLキャッシュ)
  • サイクル時間:p95 ≤ 1s(FastHTTPと並列化で待ち時間を圧縮)
  • 誤検知の激減ノーション別VWAP × 実測FX × コストモデルにより**“見かけのスプレッド”**を排除

次章では、このアーキテクチャをコード断片メトリクス設計に落とし込み、実際に250ms/1sの壁を超える手順を解説します。

実装A:Bybit v5 WS→VWAP

orderbook.50.SYMBOLを常時購読/L2を辞書で保持し、VWAP時のみ上位Nソート/買い・売りのノーション別VWAP

このセクションでは、HTTPポーリングを捨てて Bybit v5 のPublic WebSocketからL2板を常時同期し、指定ノーションでの実行可能VWAPを数百ミリ秒で返す最小実装を示します。方針はシンプル:

  • WSで orderbook.50.SYMBOL を購読
  • L2は辞書で常駐保持(更新はO(1))
  • VWAP計算時だけ上位Nをソート(必要時のみ O(K log K))
  • 指定ノーション(クォート建て)でのVWAPを返す

1) L2常駐:スナップショット+デルタ

  • スナップショットで全置換 → デルタで増減・削除(size=0で削除)
  • bids: Dict[price,float] / asks: Dict[price,float](どちらも base数量 を保持)
  • ハートビート&再接続は最低限でOK(指数バックオフ)
# arbbot/cex/bybit_ws_vwap.py(抜粋)
import aiohttp, asyncio, time, json
from typing import Dict, List, Optional, Tuple, Literal

Side = Literal["buy","sell"]

class Orderbook:
    def __init__(self):
        self.bids: Dict[float, float] = {}  # price → size(base)
        self.asks: Dict[float, float] = {}
        self.ts = 0.0  # last update time (sec)

    def apply_snapshot(self, bids: List[List[str]], asks: List[List[str]]):
        self.bids.clear(); self.asks.clear()
        for p, q in bids: self.bids[float(p)] = float(q)
        for p, q in asks: self.asks[float(p)] = float(q)
        self.ts = time.time()

    def apply_delta(self, bids: List[List[str]], asks: List[List[str]]):
        for p, q in bids:
            price, size = float(p), float(q)
            if size <= 0: self.bids.pop(price, None)
            else:         self.bids[price] = size
        for p, q in asks:
            price, size = float(p), float(q)
            if size <= 0: self.asks.pop(price, None)
            else:         self.asks[price] = size
        self.ts = time.time()

2) ノーション別VWAP:買いはASK、売りはBID

買い(USDT/USDCを支払ってベースを得る)

  • ASK側安い順に積み上げ、**指定ノーション(クォート建て)**を使い切るまで取得
  • 有効単価 = 支払ったクォート合計 / 取得できたベース合計

売り(ベースを売ってクォートを得る)

  • BID側高い順に積み上げ、同様に計算
class Orderbook:
    # ...前掲...

    def vwap(self, side: Side, notional_quote: float) -> Optional[Tuple[float, float]]:
        if notional_quote <= 0: return None
        if side == "buy":
            if not self.asks: return None
            ladder = sorted(self.asks.items(), key=lambda kv: kv[0])  # ask: price↑
            remain_q = notional_quote; got_base = spent_q = 0.0
            for price, size_base in ladder:
                cap_q = price * size_base
                if remain_q <= 0: break
                if remain_q >= cap_q:
                    got_base += size_base; spent_q += cap_q; remain_q -= cap_q
                else:
                    take_base = remain_q / price
                    got_base += take_base; spent_q += remain_q; remain_q = 0.0; break
            if got_base == 0: return None
            return (spent_q / got_base, got_base)  # (有効単価=クォート/ベース, 取得ベース量)
        else:
            if not self.bids: return None
            ladder = sorted(self.bids.items(), key=lambda kv: kv[0], reverse=True)  # bid: price↓
            remain_q = notional_quote; sold_base = recv_q = 0.0
            for price, size_base in ladder:
                cap_q = price * size_base
                if remain_q <= 0: break
                if remain_q >= cap_q:
                    sold_base += size_base; recv_q += cap_q; remain_q -= cap_q
                else:
                    part_base = remain_q / price
                    sold_base += part_base; recv_q += remain_q; remain_q = 0.0; break
            if sold_base == 0: return None
            return (recv_q / sold_base, sold_base)

ここではクォート建てのノーション(例:$1k, $5k, $20k)を入力に統一しています。板がUSDT建てならnotional_quoteはUSDT額、USDC建てならUSDC額。あとでUSDT→USDC換算を掛けて比較します。


3) WebSocket接続:購読・心拍・再接続

class BybitWSVWAP:
    PING_INTERVAL = 15

    def __init__(self, ws_url: str, topic: str, session: aiohttp.ClientSession | None = None):
        self.ws_url = ws_url; self.topic = topic
        self.session = session; self._ext = session is not None
        self.ob = Orderbook()

    async def __aenter__(self):
        if not self._ext: self.session = aiohttp.ClientSession()
        return self

    async def __aexit__(self, *exc):
        if not self._ext and self.session: await self.session.close()

    async def run(self, stop: asyncio.Event):
        backoff = 1.0
        while not stop.is_set():
            try:
                await self._loop(stop); backoff = 1.0
            except Exception:
                await asyncio.sleep(min(backoff, 10.0)); backoff *= 1.7

    async def _loop(self, stop: asyncio.Event):
        assert self.session
        async with self.session.ws_connect(self.ws_url, heartbeat=self.PING_INTERVAL) as ws:
            await ws.send_json({"op": "subscribe", "args": [self.topic]})
            async for msg in ws:
                if stop.is_set(): break
                if msg.type.name == "TEXT":
                    payload = msg.json()
                    t = payload.get("type"); d = payload.get("data", {})
                    if t == "snapshot": self.ob.apply_snapshot(d.get("b", []), d.get("a", []))
                    elif t == "delta":  self.ob.apply_delta   (d.get("b", []), d.get("a", []))
                elif msg.type.name == "ERROR":
                    raise RuntimeError("WS error")

    def vwap(self, side: Side, notional_quote: float):
        return self.ob.vwap(side, notional_quote)

4) ステール検知と品質ゲート

VWAPを返す前に次をチェックすると誤検知が激減します。

  • L2鮮度now - orderbook.ts ≤ 500ms(超過ならNone
  • 深さ不足:指定ノーションに届かない場合はNone
  • 時刻差ゲート:DEX見積との取得完了差 ≤ 250ms(超過は比較棄却)
def vwap_safe(client: BybitWSVWAP, side: Side, notional_quote: float, max_age_ms: int = 500):
    if (time.time() - client.ob.ts) * 1000 > max_age_ms:
        return None
    return client.vwap(side, notional_quote)

5) よくある落とし穴(回避策つき)

  • 線形/無期限(linear)とスポットの混同
    先物は契約サイズ資金調達が絡む。まずはspotでVWAP、perpはcontract→base換算を入れてから。
  • 数量の単位
    Bybitの板数量はbase数量。VWAPのノーションはクォート額。混ぜない。
  • 片側Onlyでの手数料
    監視段階でもテイカーfeeをbpで上乗せ/控除して“実効単価”へ。メイカー運用は例外(遅延リスク大)。
  • ソート最適化の過剰実装
    まずは「辞書保持+必要時ソート」で十分速い。最適化はp95>1sになってからでOK。

6) スモーク(30秒)

# 例:SOLUSDT(spot)を購読して $5k の買いVWAPを1秒おきに出す
async def smoke():
    stop = asyncio.Event()
    async with BybitWSVWAP(
        ws_url="wss://stream.bybit.com/v5/public/spot",
        topic="orderbook.50.SOLUSDT"
    ) as c:
        task = asyncio.create_task(c.run(stop))
        await asyncio.sleep(1.0)  # 少し溜める
        for _ in range(30):
            print("BUY 5k:", c.vwap("buy", 5_000))
            await asyncio.sleep(1.0)
        stop.set(); await task

このWS→VWAPの最小実装をCEX側の標準にすると、HTTPポーリング由来の秒遅延が消え、DEX見積(Jupiter /quote)との同時比較(≤250ms)が現実になります。次章ではJupiter /quote TTLキャッシュとの合わせ技で1秒サイクルを安定させるポイントを解説します。

実装B:Jupiter /quote TTLキャッシュ

キー:(inputMint, outputMint, amount_int, slippageBps, direct)/TTL≦300msでデドゥープ/outAmountは整数(decimals適用済)・minOut計算フック

DEX側はJupiterの /quoteを“真実の窓口”にします。ただし生の呼び出しを連打するとレイテンシ悪化&レート制限の温床。そこで極小TTLキャッシュ(≤300ms)で同一クエリをデドゥープし、見積の揺れを抑えつつミリ秒級で返します。


設計ポイント

  • キー(inputMint, outputMint, amount_int, slippageBps, direct)
    • amount_intmintの最小単位(整数)。小数は送らない。
    • directonlyDirectRoutes の真偽。ルート最適化の仕様差をキーに含める。
  • TTL ≦ 300ms:その間に同一キーで来た要求は同一レスポンスを返す(per-keyロックで同時実行も一発に)。
  • outAmount は整数:Jupiterはdecimals適用済みの整数で返す。実効単価に変換するときは自分で小数に戻す
  • minOut 計算フックoutAmountslippage + safety を差し引いて**/swap**へ渡す値を出す(将来の実行段にもそのまま使える)。

最小実装(そのまま使える)

# arbbot/dex/jupiter_quote_cache.py
from __future__ import annotations
import asyncio, time
from dataclasses import dataclass
from typing import Optional, Dict
import aiohttp

JUP_QUOTE_URL = "https://quote-api.jup.ag/v6/quote"

@dataclass(frozen=True)
class QuoteKey:
    input_mint: str
    output_mint: str
    amount_int: int        # 最小単位(整数)
    slippage_bps: int
    only_direct: bool

@dataclass
class CachedQuote:
    ts_ms: float
    data: dict

class JupiterQuoteCache:
    """TTL≦300msの極小キャッシュ。per-keyロックで同時呼び出しをデドゥープ。"""
    def __init__(self, session: Optional[aiohttp.ClientSession] = None, ttl_ms: int = 300):
        self.ttl_ms = ttl_ms
        self.session = session
        self._external_session = session is not None
        self._cache: Dict[QuoteKey, CachedQuote] = {}
        self._locks: Dict[QuoteKey, asyncio.Lock] = {}
        self._locks_guard = asyncio.Lock()

    async def __aenter__(self):
        if not self._external_session:
            # FastHTTP 相当:短いタイムアウトで固める
            timeout = aiohttp.ClientTimeout(total=3.0, connect=1.0, sock_read=2.0)
            self.session = aiohttp.ClientSession(timeout=timeout)
        return self

    async def __aexit__(self, *exc):
        if not self._external_session and self.session:
            await self.session.close()

    async def get_quote(self, key: QuoteKey) -> dict:
        now = time.time() * 1000
        hit = self._cache.get(key)
        if hit and (now - hit.ts_ms) <= self.ttl_ms:
            return hit.data

        # per-key ロックで同一キーのスパムを1発にまとめる
        lock = await self._get_lock(key)
        async with lock:
            now = time.time() * 1000
            hit = self._cache.get(key)
            if hit and (now - hit.ts_ms) <= self.ttl_ms:
                return hit.data

            assert self.session is not None
            params = {
                "inputMint": key.input_mint,
                "outputMint": key.output_mint,
                "amount": str(key.amount_int),            # 整数
                "slippageBps": str(key.slippage_bps),
                "onlyDirectRoutes": "true" if key.only_direct else "false",
            }
            async with self.session.get(JUP_QUOTE_URL, params=params) as r:
                r.raise_for_status()
                data = await r.json()

            self._cache[key] = CachedQuote(ts_ms=time.time()*1000, data=data)
            return data

    async def _get_lock(self, key: QuoteKey) -> asyncio.Lock:
        async with self._locks_guard:
            if key not in self._locks:
                self._locks[key] = asyncio.Lock()
            return self._locks[key]

decimalsヘルパと minOut フック

# arbbot/dex/jupiter_utils.py
def to_amount_int(amount_units: float, decimals: int) -> int:
    """人間系の数量(小数)→ mint最小単位(整数)"""
    return int(round(amount_units * (10 ** decimals)))

def from_amount_int(amount_int: int, decimals: int) -> float:
    """mint最小単位(整数)→ 人間系の数量(小数)"""
    return amount_int / (10 ** decimals)

def calc_min_out(out_amount_int: int, slippage_bps: int, safety_bps: int = 5) -> int:
    """
    JupiterのoutAmount(整数)から /swap に渡す minOut(整数) を算出。
    slippageBps に更に safety を上乗せして保守的に。
    """
    total_bps = slippage_bps + safety_bps
    # 小数点を使わず整数演算で安全に
    return max(0, (out_amount_int * (10_000 - total_bps)) // 10_000)

使い方(USDC→SOLの“$5k買い”を見積+実効単価)

# arbbot/dex/example_quote.py
import asyncio
from arbbot.dex.jupiter_quote_cache import JupiterQuoteCache, QuoteKey
from arbbot.dex.jupiter_utils import to_amount_int, from_amount_int, calc_min_out

USDC_MINT = "EPjFWdd5AufqSSqeM2qN1xzybapC8G4wEGGkZwyTDt1v"  # 6
SOL_MINT  = "So11111111111111111111111111111111111111112"  # 9

async def main():
    async with JupiterQuoteCache(ttl_ms=300) as cache:
        key = QuoteKey(
            input_mint=USDC_MINT,
            output_mint=SOL_MINT,
            amount_int=to_amount_int(5_000.0, 6),  # 5,000 USDC を整数化
            slippage_bps=30,
            only_direct=False
        )
        q = await cache.get_quote(key)

        # outAmount(整数, decimals適用済) → SOL数量(小数)
        out_int = int(q["outAmount"])
        out_sol = from_amount_int(out_int, 9)

        # 実効単価(USDC/SOL) = 投入USDC / 受取SOL
        effective_px = 5_000.0 / out_sol

        # /swap 用の minOut(安全寄り)
        min_out_int = calc_min_out(out_int, slippage_bps=30, safety_bps=5)

        print({
            "route": q.get("routePlan", [])[:1],
            "out_sol": out_sol,
            "effective_px_usdc_per_sol": effective_px,
            "min_out_int": min_out_int
        })

# asyncio.run(main())

運用で効く“ちょい足し”Tips

  • TTLは短く(200–300ms)
    ミリ秒の壁を越えるには新鮮さ>キャッシュヒット率。ヒット率は“副産物”でOK。
  • 429/5xxは即棄却
    裁定パスは**“速い不成立”が正義**。遅い成功は比較の鮮度を壊す。必要なら別レーン(SlowHTTP)で再試行。
  • ログを残す
    quote_latency_ms, route_id, pool_ids, total_fee_bps, outAmount を必ず出し、異常時にルートを辿れるように。
  • キーの揺れを抑える
    amount_int が毎回微妙に違うとキャッシュが効かない。ノーションは刻み(例:$100刻み)で丸めても良い。

よくある落とし穴

  • amountを小数で送る400 or 予期せぬ丸め必ず整数に。
  • decimals取り違えoutAmount のスケールが合わず単価が化ける。mintのdecimalsをトークンレジストリで固定
  • slippageとminOutの二重適用ミス → /quoteのslippageBpsに加えてsafety_bpsを上乗せする形が安全。

このTTLキャッシュにより、/quoteのデドゥープレイテンシ安定化が実現します。CEXのWS-VWAP並列取得すれば、時刻差≤250ms・サイクル≤1sのSLOを現実的に満たせます。次は、高速比較パスでの同時取得→通貨統一→スプレッド判定の実装を仕上げます。

実装C:高速比較パス

asyncio.gatherでDEX/CEX同時取得/時刻差ゲート≤250ms・staleは棄却/USDT→USDC換算&fx_stable_bpログ/コストモデル加味で判定

目的ミリ秒級で“実行可能価格どうし”を比較し、コスト後でも勝てる時だけシグナルを出す。
ここでは DEX(Jupiter /quote)とCEX(Bybit WS→VWAP)を同時並行で取り、時刻差≤250msを満たした比較のみ採用、USDT↔USDC換算コストモデルまで入れて判定する“高速比較パス”を実装します。


全体フロー(1回の比較)

  1. 同時取得
    • DEX見積 /quote(USDC→トークン、ノーション=USDC)
    • CEX VWAP(Bybit WS L2、ノーション=USDT)
    • USDC/USDTのFX(WSでVWAP)
  2. ゲート処理
    • 時刻差 ts_diff_ms ≤ 250 でない → 棄却
    • 鮮度(L2 age ≤ 500ms、quote TTL ≤ 300ms)を満たさない → 棄却
    • 流動性不足(ノーション吸えない) → 棄却
  3. 通貨建て統一
    • cex_px_usdc = cex_px_usdt × fx(USDC/USDT)
    • fx_stable_bp = |fx - 1.0| × 1e4 をログ
  4. スプレッド→コスト後判定
    • exec_spread_bp = (cex_px_usdc - dex_px_usdc)/mid × 1e4(DEX買い→CEX売り 方向の例)
    • cost_bp = dex_fee + cex_fee + est_slip(N) + tip + infra
    • トリガexec_spread_bp > cost_bp + margin_bp

コード:最小“高速比較”関数

ここでは前章までのコンポーネント(BybitWSManager, JupiterQuoteCache, JupiterAdapter, StableFX)を利用します。

# arbbot/arb/fast_compare.py
from __future__ import annotations
import asyncio, time
from dataclasses import dataclass
from typing import Optional, Dict
from arbbot.cex.bybit_ws_manager import BybitWSManager
from arbbot.dex.jupiter_quote_cache import JupiterQuoteCache
from arbbot.dex.jupiter_adapter import JupiterAdapter
from arbbot.registry.tokens import USDC, SOL
from arbbot.dex.jupiter_utils import from_amount_int
from arbbot.util.stable_fx import StableFX

def monotonic_ms() -> float:
    return time.perf_counter() * 1e3

@dataclass
class CostParams:
    dex_fee_bp: float      # 例: 30 (0.30%)
    cex_fee_bp: float      # 例: 10 (0.10%) テイカー
    est_slip_bp: float     # 例: 10 (0.10%) Nに依存
    priority_tip_bp: float # 例: 3–8 (Solana 優先手数料・tip)
    infra_bp: float        # 例: 2–5 (失敗/再送/撤退コストの移動平均)
    margin_bp: float       # 例: 10 (安全マージン)

def cost_model_bp(p: CostParams) -> float:
    return p.dex_fee_bp + p.cex_fee_bp + p.est_slip_bp + p.priority_tip_bp + p.infra_bp

async def compare_once_fast(bybit: BybitWSManager,
                            jup_cache: JupiterQuoteCache,
                            fx: StableFX,
                            token=SOL,
                            notional_usd: float = 5_000.0,
                            cost: Optional[CostParams] = None) -> Dict:
    """
    DEX(USDC→TOKEN)買い vs CEX(spot)売りで比較(片方向の例)。
    反対方向は同様に“CEX買い vs DEX売り”を別関数で計算する。
    """
    if cost is None:
        cost = CostParams(30, 10, 10, 4, 3, 10)

    # セットアップ
    jup = JupiterAdapter(jup_cache, usdc_mint=USDC.mint,
                         token_mint=token.mint, token_decimals=token.decimals, usdc_decimals=USDC.decimals)

    t0 = monotonic_ms()
    # 1) DEX quote を非同期でキック
    dex_task = asyncio.create_task(jup.quote_exec_price("buy", notional_usd, slippage_bps=30))
    # 2) CEX VWAP と FX は同期I/O不要(WSから即時計算)
    #    とはいえ gather 形にしておくと拡張が楽
    async def _cex_side():
        vwap_sell = bybit.vwap("spot", "SOLUSDT", side="sell", notional_quote=notional_usd)  # CEXで売る単価
        return vwap_sell

    async def _fx_side():
        return fx.usdc_per_usdt()  # (fx, fx_stable_bp)

    cex_px, (fx_usdc_per_usdt, fx_bp) = await asyncio.gather(_cex_side(), _fx_side())
    dex_quote = await dex_task
    t1 = monotonic_ms()
    ts_diff = t1 - t0

    # --- ゲート: 時刻差/鮮度/流動性 ---
    if ts_diff > 250:
        return {"status":"uncomparable","reason":"ts_skew","ts_diff_ms":round(ts_diff,1)}

    if cex_px is None:
        return {"status":"uncomparable","reason":"cex_liquidity","ts_diff_ms":round(ts_diff,1)}

    if "outAmount" not in dex_quote:
        return {"status":"uncomparable","reason":"dex_quote","ts_diff_ms":round(ts_diff,1)}

    # --- 実効単価の算出 ---
    # DEX: 受取トークン量(整数→小数)→ 実効単価(USDC/BASE)
    out_base = from_amount_int(int(dex_quote["outAmount"]), token.decimals)
    if out_base <= 0:
        return {"status":"uncomparable","reason":"dex_zero_out","ts_diff_ms":round(ts_diff,1)}
    dex_px_usdc = notional_usd / out_base

    # CEX: VWAPはUSDT建て → FXでUSDC建てへ
    cex_px_usdt = cex_px[0]  # (price, filled_base)
    cex_px_usdc = cex_px_usdt * fx_usdc_per_usdt

    # --- スプレッド(DEX買い→CEX売り 方向の例) ---
    mid = (dex_px_usdc + cex_px_usdc) / 2.0
    exec_spread_bp = (cex_px_usdc - dex_px_usdc) / mid * 1e4

    # --- コスト後判定 ---
    total_cost_bp = cost_model_bp(cost)
    edge_bp = exec_spread_bp - total_cost_bp
    decision = "trigger" if edge_bp > cost.margin_bp else "skip"

    return {
        "status": "ok",
        "ts_diff_ms": round(ts_diff,1),
        "fx_stable_bp": round(fx_bp,2),
        "dex_px_usdc": dex_px_usdc,
        "cex_px_usdc": cex_px_usdc,
        "exec_spread_bp": exec_spread_bp,
        "total_cost_bp": total_cost_bp,
        "edge_bp": edge_bp,
        "decision": decision,
        "notional_usd": notional_usd
    }

片方向(DEX買い→CEX売り)の例を示しました。実運用では反対方向(CEX買い→DEX売り)も同時に計算し、有利な方のみ評価・発火します。


stale棄却・時刻差ゲートの実務メモ

  • 時刻差は**“取得完了の差”でOK(t0→await gather→t1 の差)。より厳密にやるならL2のagequote作成時刻をログに出して両者の最大差**でゲート。
  • 鮮度(age)
    • L2:now - orderbook.ts ≤ 500ms
    • quote:cache_ttl ≤ 300ms(TTLキャッシュのヒット内)
  • 棄却は失敗ではないstatus=uncomparable として理由ラベルts_skew|stale|liq|fx)をメトリクスに積む。

USDT→USDC換算と fx_stable_bp の扱い

  • 換算レートは板から実測USDCUSDT の**WS VWAP(例:$10k)**で求める。
  • fx_stable_bp を常時計測し、**閾値超え(例:>10–15bp)**時は
    • 比較棄却するか、
    • margin_bp に上乗せして保守的に判定する。

コストモデルの詰め方(最初の値→学習)

  • 初期値(例)
    • dex_fee_bp=30(0.30%)
    • cex_fee_bp=10(VIPやBNB割引を口座に合わせて)
    • est_slip_bp=10(Nの大小で分岐。実測から p95 を採用)
    • priority_tip_bp=3–8(Solanaの混雑に応じて可変)
    • infra_bp=2–5(リトライ・撤退の平均コスト)
  • 運用で実測ログ→移動平均に置き換える:
    est_slip_bp(N)priority_tip_bpinfra_bp直近X分/日の実績から更新。静的値のままはNG

ログとメトリクス(最小)

  • 比較SLOts_diff_ms(p95/p99)
  • 棄却率uncomparable_reason{ts_skew|stale|liq|fx}
  • FX品質fx_stable_bp(平均・p95)
  • スプレッドexec_spread_bptotal_cost_bpedge_bp(ヒストグラム)
  • ノーション別notional_usd ラベルで断面を分ける

スモーク(30回ループ)

# arbbot/arb/smoke_fast_compare.py
import asyncio
from arbbot.arb.fast_compare import compare_once_fast, CostParams
from arbbot.cex.bybit_ws_manager import BybitWSManager
from arbbot.dex.jupiter_quote_cache import JupiterQuoteCache
from arbbot.util.stable_fx import StableFX

async def main():
    bybit = BybitWSManager()
    await bybit.start({"SOLUSDT": "spot", "USDCUSDT": "spot"})
    async with JupiterQuoteCache(ttl_ms=300) as cache:
        fx = StableFX(bybit_mgr=bybit, market="spot", symbol="USDCUSDT")
        for _ in range(30):
            res = await compare_once_fast(bybit, cache, fx, notional_usd=5_000.0,
                                          cost=CostParams(30,10,10,4,3,10))
            print(res)
            await asyncio.sleep(0.5)
    await bybit.stop()

# asyncio.run(main())

見どころ

  • ts_diff_ms~80–200ms に収束(p95 ≤ 250ms)
  • status="ok"≥80%(初期目標)
  • edge_bp のサンプルがそこそこ出る(市況依存)

よくある落とし穴(回避策)

  • USDT=USDCだと思い込むfx_stable_bp を常に見よ。
  • サイズ不一致 → DEX/CEXとも同じノーションで評価。
  • 片方向だけで判定両方向(DEX買い→CEX売り/CEX買い→DEX売り)を回し、有利側のみ判定。
  • 静的コスト → 実測で動的更新est_slip_bppriority_tip_bp の固定は事故のもと。

この高速比較パスを心臓部に据えるだけで、“≤250ms / ≤1sサイクル”の監視が現実になります。あとは実行レイヤ(/swap+Jito送信、CEX発注)と結合し、**“見積=実行”**の契約を守れば、誤検知に振り回されない裁定運用に到達できます。

コストモデル&動的トリガ

結論:シグナルは“見かけのスプレッド”では出さない。
コストをbpで見積もり、コストを上回る“余剰bp(エッジ)”にマージンを足してトリガする。

数式(運用で使う形)

cost_bp
 = dex_fee_bp               # ルート合計のDEX手数料
 + cex_fee_bp               # 自口座のテイカー/メイカー手数料
 + est_slip_bp(N)           # 指定ノーションNの滑り(p95)
 + jito_tip_bp              # 優先手数料/チップ(Solana等)
 + infra_bp                 # 失敗・再送・撤退コストの移動平均

trigger_bp = cost_bp + margin_bp(symbol, N, regime)

exec_spread_bp
 = (cex_px_usdc - dex_px_usdc)/mid * 1e4      # 例: DEX買い→CEX売り方向

意思決定
exec_spread_bp > trigger_bp のときだけ 発火。それ以外は skip

ポイント:est_slip_bp(N)jito_tip_bp固定値NG実測から常に更新する。


銘柄別・ノーション別の初期値(例)と学習

初期パラメータ(たたき台)

  • Tier1(SOL/ETH/BTC)
    • dex_fee_bp=20〜35cex_fee_bp=6〜12est_slip_bp(5k)=6〜12jito_tip_bp=3〜8infra_bp=2〜4margin_bp=8〜12
  • Tier2(RAY/JUP/PYTH)
    • dex_fee_bp=25〜45cex_fee_bp=8〜14est_slip_bp(5k)=12〜25jito_tip_bp=5〜10infra_bp=3〜6margin_bp=12〜18
  • Tier3(BONK/WIF/POPCAT)
    • dex_fee_bp=30〜60cex_fee_bp=10〜16est_slip_bp(1k)=25〜60jito_tip_bp=6〜12infra_bp=4〜8margin_bp=18〜30

目安なので、あなたの口座手数料とルート特性に合わせて上書き。

設定イメージ(YAML/JSON)

symbols:
  SOL:
    tiers:
      "1k":  { est_slip_bp: 6,  margin_bp: 8 }
      "5k":  { est_slip_bp: 10, margin_bp: 10 }
      "20k": { est_slip_bp: 18, margin_bp: 12 }
    cex_fee_bp: 8
    jito_tip_bp: 5
    dex_fee_bp: 25
    infra_bp: 3

学習の仕組み(運用で必ず回す)

  • est_slip_bp(N)
    Jupiter /quotesimulate/実測との差分を p95 で更新(滑りが跳ねたら即反映)。
  • jito_tip_bp
    実際に払ったtip/priorityFee移動平均(短期は指数移動平均、長期は日次平均)。
  • infra_bp
    失敗・再送・撤退で失ったbpを累計/件数から算出(ローリング1〜7日)。
  • margin_bp(symbol,N,regime)
    市況レジーム(例:quiet/volatile)で切替。FX乖離・時刻差逸脱が増えたら自動で上積み。

ルール:“固定”は劣化の温床実測→更新を自動化し、乖離すればサーキットで止める


SLOとメトリクス

SLO(目標値)

  • 比較時刻差 compare_age_diff_ms: p95 ≤ 250ms(p99 ≤ 500ms)
  • 比較不能率 uncomparable_ratio: ≤ 20%(初期の上限)
  • サイクル時間(裁定パス): p95 ≤ 1s

必須メトリクス(名前と意味)

レイテンシ系

  • quote_to_submit_ms{dex=}
    • /quote 取得→(将来の /swap 送信)まで。監視段階では見積完了→比較完了の擬似で代替。
  • compare_age_diff_ms
    • DEX/CEX取得完了の差。**ゲート(≤250ms)**の健康診断。

比較の健全性

  • uncomparable_reason{ts_skew|stale|liq|fx|error}Counter
    • 理由別の棄却数。ts_skew(時刻差超過)、stale(鮮度切れ)、liq(流動性不足)、fx(換算不可/大乖離)。

価格とFX

  • spread_exec_bpHistogram
    • 可約定スプレッド(方向別・ノーション別でラベル付け)。
  • cost_bp_total / cost_bp_{dex_fee,cex_fee,slip,tip,infra}Gauge/Histogram
    • 内訳を分解して可視化。モデルの当たり外れが見える。
  • edge_bp = spread_exec_bp - cost_bp_totalHistogram
    • 実効エッジp50>0 を狙い、p10 > -5bp を下限に。
  • fx_stable_bpHistogram
    • |USDC/USDT - 1|×1e410〜15bp超はリスクシグナル。

付帯

  • 429_rate{source} / retry_after_msGauge/Counter
    • レート制限の健康診断(Jupiter/Bybit REST 等)。

推奨ダッシュボード(最小)

  1. SLOパネルcompare_age_diff_ms p95/p99uncomparable_ratiocycle_time p95
  2. エッジ分布spread_exec_bpcost_bp_totaledge_bp(ノーション別の箱ひげ)
  3. コスト内訳dex_fee/cex_fee/slip/tip/infra(時系列)
  4. FXと棄却理由fx_stable_bp(時系列)、uncomparable_reason スタック棒

アラート(初期閾値)

  • compare_age_diff_ms p95 > 250ms5分連続要因切り分け(WS遅延?TTL?並列不足?)
  • uncomparable_ratio > 20%15分連続閾値/TTL/並列/速い回線へ
  • edge_bp p50 < 01時間トリガ停止+コストモデル再学習

実装ワンポイント

  • メトリクスは“比較前に増やす”:棄却時も必ず理由カウント
  • 単位はbpに統一:費用・スプレッド・エッジを同次元で扱う。
  • JSONログ併用:意思決定の1レコードに以下のようなものが残れば事後検証が劇的に楽になる。
{
  "ts": 1723098000123,
  "symbol": "SOL/USDC",
  "N": 5000,
  "dex_px": 174.21,
  "cex_px": 174.28,
  "spread_exec_bp": 39.8,
  "cost": {"dex_fee":25,"cex_fee":8,"slip":10,"tip":5,"infra":3,"total":51},
  "edge_bp": -11.2,
  "fx_stable_bp": 3.1,
  "ts_diff_ms": 142,
  "decision": "skip",
  "reason": "edge<=margin"
}

これで、“コストを跨いで余剰bpが出た時だけ鳴らす”仕組みが完成します。
SLOで速度と鮮度を担保し、メトリクスでコストを学習する。
この2枚看板こそが、実運用で生き残る裁定監視のコアです。

フェイルセーフ&在庫管理

裁定は必ず失敗する前提で設計します。片足だけ約定、送信遅延、見積ズレ、レート制限——全部“日常”。壊れ方を想定して、即撤退・在庫抑制・自動停止までを仕組みにします。


片足失敗時の撤退ルール/段階ヘッジ

状態機械(最小)

IDLE
└─trigger→ SEND_BOTH (best-effort同時)
├─ DEX_FILLED & CEX_FILLED → FLAT(成功)
├─ DEX_FILLED & CEX_FAIL → HEDGE_CEX(IOC/MKTで即反転)
├─ CEX_FILLED & DEX_FAIL → HEDGE_DEX(最短ルートで即反転)
└─ BOTH_FAIL → NOFILL(在庫0で終了)

ルール(実務)

  • タイムボックス:片足 fill 後 t_hedge ≤ 500ms で相手脚送信。超過なら段階撤退(サイズを1/2→1/4で刻む)。
  • 価格ガード:ヘッジはVWAPで上限/下限を付ける(max_slip_bp)。ガード超過で即中止→“在庫として保有”戦略に切替(上限Δの範囲内)。
  • 段階ヘッジ
    1. 50% を最短IOCでヘッジ
    2. 残り 25%/25% を価格条件付きで追撃(最大 2–3 ステップ)
    3. 追撃失敗時は在庫キープ→撤退価格(損切り)を自動設定

擬似コード:

if leg_A_filled and not leg_B_filled_within(500_ms):
    hedge_sizes = [0.5, 0.25, 0.25]
    for s in hedge_sizes:
        ok = send_hedge(size=s*filled, guard_bp=max_slip_bp, tif="IOC")
        if ok and position_delta≈0: break
    if residual_delta > 0:
        place_stop_out(residual_delta, stop_guard_bp)

在庫上限・Δ上限/サーキットブレーカ

在庫・Δ

  • 銘柄別在庫上限(例:SOL: 2,000, Tier3: 0
  • Δ上限(USD):ネットエクスポージャの総量(例:$50k
  • 時間あたり回転数上限:スパイク時の過剰取引を抑制

サーキットブレーカ(自動停止)

  • 連続負エッジedge_bp < 0N回/5分銘柄停止(30分)
  • SLO逸脱compare_age_diff_ms p95 > 250ms5分継続全体停止(5分)
  • FX乖離fx_stable_bp > 152分継続USDT→USDC換算を強化 or 比較棄却

運用フラグ:

risk:
  inv_cap_per_symbol: { SOL: 2000, ETH: 500, TIER3: 0 }
  delta_cap_usd: 50000
  circuit_breaker:
    edge_negative_streak: { count: 5, window_sec: 300, cooldown_sec: 1800 }
    ts_skew_p95:          { limit_ms: 250, window_sec: 300, cooldown_sec: 300 }
    fx_stable_bp:         { limit_bp: 15, window_sec: 120, cooldown_sec: 300 }

429/5xx対策:トークンバケット+短期キャッシュ

落ちるのは前提落ちても壊れないのが正解。

  • トークンバケット(ソース別QPS)
    • Jupiter/REST系は QPS上限を設定(例:5 req/s
    • バーストは300ms TTLキャッシュで吸収(同一キーのデドゥープ)
  • 指数バックオフ+ジッター100ms, 200ms, 400ms
  • フェイルファスト:裁定パスはリトライ回数=1(遅い成功より速い不成立)
  • CBと統合:429比率がしきい値超自動で頻度を落とす/一時停止

擬似コード:

if rate_limit_bucket.empty():
    return STALE  # 比較棄却
resp = fast_http.get(..., timeout=(1,2), retries=1)
if resp.status in {429, 502, 503}:
    backoff()
    mark_rate_limited(source)
    return STALE

運用:シャドー→本番

15分スモーク → 24hシャドー

  1. 15分スモーク
    • 監視のみ(注文は発生させない)。
    • 指標:ts_diff_ms p95 ≤ 250uncomparable ≤ 20%fx_stable_bp ≤ 10
  2. 24hシャドー
    • ノーション別に両方向を連続観測。
    • 指標:edge_bp p50 > 0p10 > -5、CB未発火。

朝イチ自動サマリ(毎朝固定フォーマット)

  • 棄却率(理由別)ts_skew / stale / liq / fx
  • SLO逸脱compare_age_diff_ms p95/p99cycle_time p95
  • 発生源ランキング:エラー・429のソース別上位(Jupiter/Bybit/自前RPC)
  • スプレッドとコストspread_exec_bp vs cost_bp_total の箱ひげ(ノーション別)

例(テキスト出力):

[Daily Summary] 2025-08-08 UTC
- SLO: ts_diff p95=184ms / p99=392ms, cycle p95=0.82s
- Uncomparable: 17.3% (ts_skew 9.2%, stale 5.1%, liq 2.6%, fx 0.4%)
- FX stable: mean 3.2bp / p95 8.7bp
- Top sources (errors): Jupiter 38%, Bybit 34%, RPC 21%
- Edge vs Cost (5k): p50 +12.4bp / p10 -3.1bp / triggers 47

逸脱時の“一行ポストモーテム”

短く、事実と対策だけ。SlackやOpsノートにコピペできる形で。

[PM] 2025-08-08 03:12Z | SOL/USDC 5k | ts_skew>250ms 連続 | Bybit WS遅延spike
対策: WS再接→地域エンドポイント切替・Jupiter TTL200msへ | 再発監視: ts_diff_ms p95

テンプレ:

[PM] <UTC> | <symbol>/<notional> | <現象> | <直接原因> | 対策: <具体> | 再発監視: <KPI>

まとめ

  • 片足失敗は“正常系”:タイムボックス+段階ヘッジ+撤退価格。
  • 在庫とΔは数値で封じる:上限とCBで自動停止。
  • レート制限は構造で回避:バケット+TTL+フェイルファスト。
  • 運用はリズム:スモーク→シャドー→朝イチサマリ→一行PM。

ここまで入れれば、“壊れ方ごとに壊れない”監視運用が回り始めます。

よくある落とし穴

amountの整数化忘れ/mint違い/decimalsミス

  • Jupiterのamountは整数(mintの最小単位)必須。 小数を投げると400や謎丸めで地獄行き。
  • mint IDが正義。 “SOL/USDC”みたいなシンボル依存は排除して、mint+decimalsをレジストリで固定
  • outAmountも整数(decimals適用済)。自分で小数に戻して実効単価を出す。

USDC/USDTを1.0固定扱い

  • CEXはUSDT建て、DEXはUSDC建てが普通。1.0000前提はNG
  • USDCUSDT板のVWAPで実測fx_stable_bp = |fx-1|×1e4 をログ。10–15bp超は比較棄却 or マージン上乗せ。

v2 Bybit APIやHTTP板ポーリングの混入

  • Bybitはv5 WS L2が基本。HTTPのticker/ポーリングは遅延要因
  • 市場区分(spot/linear)を間違えると404/変な値になる。シンボルごとにcategoryを決め打ち

付録:最小コード断片

どれも aiohttp + asyncio 前提。雑に貼っても動く“最小・実用”版。

Bybit WS → VWAPスニペット

# bybit_ws_vwap_min.py
import aiohttp, asyncio, json, time
from typing import Dict, List, Optional, Tuple

class OB:
    def __init__(self): self.bids={}, self.asks={}, self.ts=0.0
    def snap(self, b:List[List[str]], a:List[List[str]]):
        self.bids={float(p):float(q) for p,q in b}; self.asks={float(p):float(q) for p,q in a}; self.ts=time.time()
    def delta(self, b:List[List[str]], a:List[List[str]]):
        for p,q in b: q=float(q); p=float(p); (self.bids.pop(p,None) if q<=0 else self.bids.__setitem__(p,q))
        for p,q in a: q=float(q); p=float(p); (self.asks.pop(p,None) if q<=0 else self.asks.__setitem__(p,q))
        self.ts=time.time()
    def vwap(self, side:str, notional:float)->Optional[Tuple[float,float]]:
        if notional<=0: return None
        if side=="buy":  # consume asks low→high
            lv=sorted(self.asks.items(),key=lambda x:x[0])
        else:            # consume bids high→low
            lv=sorted(self.bids.items(),key=lambda x:x[0],reverse=True)
        rem=notional; base=0.0; quote=0.0
        for price,size in lv:
            cap=price*size
            if rem<=0: break
            if rem>=cap: base+=size; quote+=cap; rem-=cap
            else: take=rem/price; base+=take; quote+=rem; rem=0; break
        return (quote/base, base) if base>0 else None

class BybitWSVWAP:
    def __init__(self, ws_url:str, symbol:str, depth:int=50):
        self.ws_url=ws_url; self.topic=f"orderbook.{depth}.{symbol}"; self.ob=OB()
    async def run(self, stop:asyncio.Event):
        async with aiohttp.ClientSession() as s:
            async with s.ws_connect(self.ws_url, heartbeat=15) as ws:
                await ws.send_json({"op":"subscribe","args":[self.topic]})
                async for msg in ws:
                    if stop.is_set(): break
                    if msg.type==aiohttp.WSMsgType.TEXT:
                        p=msg.json(loads=json.loads); d=p.get("data",{})
                        if p.get("type")=="snapshot": self.ob.snap(d.get("b",[]), d.get("a",[]))
                        elif p.get("type")=="delta": self.ob.delta(d.get("b",[]), d.get("a",[]))
    def vwap(self, side:str, notional:float, max_age_ms:int=500):
        if (time.time()-self.ob.ts)*1000>max_age_ms: return None
        return self.ob.vwap(side, notional)

# 使用例:
# stop=asyncio.Event()
# client=BybitWSVWAP("wss://stream.bybit.com/v5/public/spot","SOLUSDT")
# asyncio.create_task(client.run(stop)); await asyncio.sleep(1.0); print(client.vwap("buy",5000))

Jupiter /quote TTLキャッシュ

# jupiter_quote_cache_min.py
import aiohttp, asyncio, time
from dataclasses import dataclass
from typing import Optional, Dict

JUP="https://quote-api.jup.ag/v6/quote"

@dataclass(frozen=True)
class Key: input_mint:str; output_mint:str; amount_int:int; slippage_bps:int; direct:bool
@dataclass class Entry: ts_ms:float; data:dict

class JupCache:
    def __init__(self, ttl_ms:int=300): self.ttl=ttl_ms; self.c:Dict[Key,Entry]={}; self.locks:Dict[Key,asyncio.Lock]={}; self.guard=asyncio.Lock()
    async def __aenter__(self): self.s=aiohttp.ClientSession(timeout=aiohttp.ClientTimeout(total=3.0)); return self
    async def __aexit__(self,*_): await self.s.close()
    async def get(self, k:Key)->dict:
        now=time.time()*1000; e=self.c.get(k)
        if e and now-e.ts_ms<=self.ttl: return e.data
        lock=await self._lock(k); async with lock:
            now=time.time()*1000; e=self.c.get(k)
            if e and now-e.ts_ms<=self.ttl: return e.data
            p={"inputMint":k.input_mint,"outputMint":k.output_mint,"amount":str(k.amount_int),
               "slippageBps":str(k.slippage_bps),"onlyDirectRoutes":"true" if k.direct else "false"}
            async with self.s.get(JUP,params=p) as r:
                r.raise_for_status(); data=await r.json()
            self.c[k]=Entry(now,data); return data
    async def _lock(self,k:Key):
        async with self.guard:
            if k not in self.locks: self.locks[k]=asyncio.Lock()
            return self.locks[k]

def to_amount_int(x:float,dec:int)->int: return int(round(x*(10**dec)))
def from_amount_int(x:int,dec:int)->float: return x/(10**dec)
def calc_min_out(out_int:int, slippage_bps:int, safety_bps:int=5)->int:
    return max(0,(out_int*(10_000-(slippage_bps+safety_bps)))//10_000)

# 使用例:
# async with JupCache() as jc:
#   k=Key(USDC_MINT,SOL_MINT,to_amount_int(5000,6),30,False)
#   q=await jc.get(k); out_sol=from_amount_int(int(q["outAmount"]),9)

高速比較パス(250msゲート)

# fast_compare_min.py
import asyncio, time
from typing import Dict, Tuple
from jupiter_quote_cache_min import JupCache, Key, to_amount_int, from_amount_int

def ms(): return time.perf_counter()*1e3

async def compare_once(bybit_client, jup_cache:JupCache, usdc_mint:str, token_mint:str, token_dec:int,
                       notional_usd:float=5000.0, fx_func=None) -> Dict:
    """
    bybit_client: 前出WSクライアント(.vwap(side, notional)を持つ)
    fx_func: USDC/USDT レート関数 → (fx, fx_bp)
    """
    t0=ms()
    # DEX見積(並列)開始
    k=Key(usdc_mint, token_mint, to_amount_int(notional_usd,6), 30, False)
    dex_fut=asyncio.create_task(jup_cache.get(k))
    # CEX VWAP & FX はWSから即時計算
    cex = bybit_client.vwap("sell", notional_usd)  # CEXで売る想定(USDT建て)
    fx, fx_bp = (1.0, 0.0) if fx_func is None else fx_func()
    dex = await dex_fut
    t1=ms(); ts_diff=t1-t0
    if ts_diff>250: return {"status":"uncomparable","reason":"ts_skew","ts_diff_ms":round(ts_diff,1)}
    if cex is None or "outAmount" not in dex: return {"status":"uncomparable","reason":"stale_or_liq","ts_diff_ms":round(ts_diff,1)}
    # 単価へ
    out_base = from_amount_int(int(dex["outAmount"]), token_dec)
    if out_base<=0: return {"status":"uncomparable","reason":"dex_zero","ts_diff_ms":round(ts_diff,1)}
    dex_px_usdc = notional_usd / out_base
    cex_px_usdc = cex[0] * fx
    mid=(dex_px_usdc+cex_px_usdc)/2
    spread_bp=(cex_px_usdc-dex_px_usdc)/mid*1e4
    return {"status":"ok","ts_diff_ms":round(ts_diff,1),"fx_stable_bp":round(fx_bp,2),
            "dex_px":dex_px_usdc,"cex_px":cex_px_usdc,"spread_exec_bp":spread_bp}

# 使用例(擬似):
# fx = lambda: (bybit_fx.vwap("buy",10_000)[0]**-1, abs((bybit_fx.vwap("buy",10_000)[0]**-1-1)*1e4))
# res = await compare_once(bybit_sol_client, jc, USDC_MINT, SOL_MINT, 9, 5000.0, fx)

チェックポイント

  • ts_diff_ms常に≤250ms以内か(p95基準)。
  • fx_stable_bp必ず記録し、二桁bpに膨らんだら棄却やマージン上乗せ
  • spread_exec_bpコストモデル(fee/滑り/tip/infra)を差し引いて判定(ここでは割愛)。

結び

この記事でやったのは、「価格を引き算する監視」から**“約定できる価格を比較する監視”への移行です。
鍵はシンプルでした――ミリ秒(≤250ms)と1秒(≤1s)を守る設計、そして
コストを跨いだ余剰bpしか信じない**という姿勢。

  • Executable Price:DEXは/quote→実効単価、CEXはWS L2→ノーション別VWAP。
  • 鮮度の担保:TTL≤300msのキャッシュ、DEX/CEXを並列取得、時刻差ゲート≤250ms
  • 通貨建ての統一:USDC/USDTは板で実測、fx_stable_bpを常時計測。
  • 動的トリガcost_bp = fee + slip(N) + tip + infraを越えた余剰bpだけで鳴らす。
  • 運用耐性:片足失敗の撤退/段階ヘッジ、在庫・Δ上限、429/5xxは構造で回避、SLOとメトリクスで可視化
  • 導入手順:15分スモーク → 24hシャドー → 朝イチ自動サマリ → 逸脱は“一行PM”。

この型に乗せれば、「数字は出てるのに取れない」から卒業できます。あとは現場の条件(口座手数料、対象銘柄、回線品質)に合わせてコストモデルを学習させ、SLOを育てていくだけ。
“見かけのスプレッド”を捨て、実行可能性で勝つ――それが、実運用で生き残る唯一の道です。

-Bot, CEX, DEX, 戦略ログ, 開発ログ