裁定(アービトラージ)の監視は「価格を取って引き算する」だけでは機能しません。実運用で勝負を決めるのは“ミリ秒”と“約定可能性”です。
本記事では、CEX/DEXアービトラージ監視を≤250msの時刻差と**≤1sの監視サイクル**で回すための設計指針を、実装の勘所と一緒にまとめます。
結論から言うと、集約APIの平均価格を捨てて、Executable Price(実行可能価格)に全てを寄せるのが正解です。
CEX/DEXarbbotの開発もピンポイントな知見が溜まって良いな。開発の優先度高めで取り組んでいるのだが、正しい判断だった。
・価格は“約定できるか”で定義する
→ズレた比較は誤検知の源泉
・サイズ依存の滑りを無視してはいけない
→CEXはWSのL2板、DEXは見積API+TTLキャッシュ(≤300ms)で並列取得— よだか(夜鷹/yodaka) (@yodakablog) August 8, 2025
何がダメだったのか:集約APIの平均値 vs 実行可能価格
「まずはCoinGeckoで価格を…」などの実装は、この手の監視プロダクションでは地雷です。
- 平均値は実行できない
集約APIは複数市場の平均/中間を返しますが、あなたがその価格で今この瞬間に約定できる保証はない。板の厚さも手数料もスリッページも反映されません。 - 遅延が平気で数百ms〜数秒
集約はどうしても更新周期が長い。CEX板(WS)とDEX見積(quote)を同時刻で比較しないと、幻のスプレッドを量産します。 - 通貨建てがバラバラ
CEXはUSDT建て、DEXはUSDC建て——1.0000と思い込みで比較すると平気で数十bpの誤差が出ます。 - サイズ依存性を無視
「0サイズのmid」での比較は、実トレードの意思決定と無関係。指定ノーション($1k/$5k/$20k…)での実行可能価格を比較する必要があります。
要するに、**「見かけの価格」ではなく「その場で実際に約定できる価格」**に全てを合わせる設計が必要です。
本記事のゴール:≤250ms時刻差 / ≤1sサイクルの監視
達成したいSLO(サービス目標)はシンプルです。
- 比較時刻差(DEX vs CEX):p95 ≤ 250ms
これを超える比較は棄却(uncomparable)。ズレた比較は誤検知の源泉です。 - 監視サイクル時間:p95 ≤ 1s
HTTPポーリングでダラダラ取らない。CEXはWSのL2板、DEXは見積API(例:Jupiter /quote)+TTLキャッシュ(≤300ms)で並列取得が基本。 - 幻スプレッド撲滅
価格はノーション指定で実効単価に変換し、手数料・滑り・優先手数料(tip)まで含んだコスト後スプレッドで評価します。
原則(結論)
価格は“約定できるか”で定義する(Executable Price)
- DEX:ルーティング込み見積(
/quote
)で手数料・価格インパクト込みの受取量を取得し、実効単価(投入額 ÷ 受取量)へ変換。 - CEX:L2板からVWAP(ノーション別)を算出し、自口座の手数料まで含めて実効単価化。
- 比較は常に「実行可能価格 vs 実行可能価格」。集約平均・0サイズmidは参考値に降格。
ノーション指定・通貨建て統一(USDC/USDT FX)・時刻差ゲート
- ノーション指定:
N ∈ {1k, 5k, 20k, ...}
で買い/売りを両方向計測。サイズ依存の滑りを無視しない。 - 通貨建て統一:CEX(USDT建て)⇄DEX(USDC建て)を常時換算。USDC/USDTの実測FXをWS板で取り、**乖離(bps)**をメトリクス化。
- 時刻差ゲート:
|ts_dex - ts_cex| ≤ 250ms
を満たさない比較は棄却。鮮度の合わない比較はしない。
静的閾値は禁止:コストモデル+マージンでトリガ
「常に1%超えたらシグナル」は雑です。銘柄・ノーション・市況でコストは変わるから。
- コストモデル(bp)
cost_bp = dex_fee_bp + cex_fee_bp + est_slip_bp(N) + priority_tip_bp + infra_bp
- dex_fee_bp:DEX手数料の合計(ルート依存)
- cex_fee_bp:自口座のメイカー/テイカー手数料(VIP/リベート反映)
- est_slip_bp(N):ノーションNでの滑りの経験則(p95)
- priority_tip_bp:優先手数料・tipの実績(Solana等)
- infra_bp:失敗・再送・撤退コストの移動平均
- トリガ条件
exec_spread_bp > cost_bp + margin_bp(symbol, N, regime)
- exec_spread_bp:実行可能スプレッド(bp)
- margin_bp:銘柄/ノーション/相場レジーム別の上乗せ(初期は5–15bp程度)
- 運用の型
- 比較できないときは棄却(stale/ts_skew/liquidity)
- コスト負け(
exec_spread_bp <= cost_bp
)が続けば自動で停止(サーキットブレーカ)
この原則に従うだけで、「数字は出ているのに全然取れない」監視から卒業できます。次章以降では、WS板×TTLキャッシュ×並列取得で**≤250ms / ≤1s**を実現するための具体アーキテクチャと実装の勘所を紹介します。
トークンのDEX価格観測は本当に多彩な手段があるので、"自分が何をしようとしているのか"に応じて変えていく必要がある。
これも一つずつ自分で試して最適なものを見つけるしかない。
幸いなことに時間だけは十分にあるので総当たりで潰していく。
初めから最適解を考えていると行動量減るし。— よだか(夜鷹/yodaka) (@yodakablog) August 8, 2025
アーキテクチャ概観
本設計は「DEXは見積APIで実効価格」「CEXはWS板からVWAP」「USDC/USDTを実測で換算」の三本柱に、I/O層の速度分離を重ねたものです。以下は全体の流れ。
sequenceDiagram autonumber participant CEX as CEX WS(L2) participant FX as USDC/USDT 板 participant DEX as Jupiter /quote participant ARB as Arb Monitor par 並列取得 CEX->>ARB: VWAP(N)(USDT建て) DEX->>ARB: /quote(outAmount)(USDC建て) and FX->>ARB: fx 実測(USDC/USDT)+ fx_stable_bp ログ end ARB->>ARB: ts_diff 測定(≤250msか?) alt 超過 ARB-->>ARB: 比較棄却(uncomparable: ts_skew) else 以内 ARB->>ARB: USDT→USDC 換算(fx適用) ARB->>ARB: spread_exec_bp 計算 ARB->>ARB: cost_bp = dex_fee + cex_fee + est_slip(N) + tip + infra ARB->>ARB: edge_bp = spread - cost alt edge_bp > margin_bp ARB-->>ARB: Trigger 発火 else ARB-->>ARB: Skip end end
DEX:Jupiter /quote→/swap(同一ルート・minOut・TTL)
目的:手数料・価格インパクトを含む“実行可能価格”をミリ秒精度で取得。
やること:
- /quote で見積
- 入力は整数 amount(mint decimals 適用後)、
slippageBps
を明示。 - 300ms TTLのメモリキャッシュで同一キーの多重呼び出しをデドゥープ。
- キー例:
(inputMint, outputMint, amount_int, slippageBps, onlyDirect)
- 入力は整数 amount(mint decimals 適用後)、
- 同一ルート厳守で /swap(将来の実行段)
- /quote の route_id / routePlan をそのまま /swap に渡す。
- minOut は
outAmount × (1 - slippageBps/1e4 - safety_bps/1e4)
で計算して埋める。 - TTL:
blockhash/lastValidBlockHeight
による内部有効期限+“quote→submit ≤ 250ms” の運用SLO。
- ログに残す最小項目
route_id, pool_ids, total_fee_bp, inAmount, outAmount, minOut, quote_latency_ms, quote_ttl_ms
ポイント:集約サイトの平均値は使わない。Jupiter見積の受取量(outAmount)→実効単価に変換して比較する。
CEX:Bybit v5 WebSocket L2 → ノーション別VWAP
目的:HTTPポーリングを捨てて、常時同期された板からその場で約定できる単価を出す。
- v5 Public WS に接続(
wss://stream.bybit.com/v5/public/{spot|linear}
)orderbook.50.SYMBOL
を購読し、L2を常駐保持(辞書でOK)。- スナップショット→デルタを反映。ping/再接続はバックオフ。
- ノーション別VWAP
- 買い:最良売りから**指定ノーション($1k/$5k/$20k…)**分だけ積み上げ、平均約定単価を算出。
- 売り:最良買いから同様に。
- 自口座の手数料(maker/taker)はbpで上乗せ/控除して“実効単価”へ。
- ログ
vwap_price, filled_base, levels_used, l2_age_ms, compute_ms
ポイント:0サイズmidではなくVWAP。指定ノーションで“実行できる価格”を取りに行く。
FX:USDC/USDT を板経由で実測 → fx_stable_bp
を常時計測
目的:CEXはUSDT建て、DEXはUSDC建て——1.0000前提はNG。安定通貨の微妙なズレがbp単位で効いてくる。
- USDCUSDT(spot)の板を同じく WS で購読
- 小ノーション(例:$10k)でVWAPを取り、
USDT→USDC
のレートを実測。 - 変換は CEX価格(USDT建て) × fx(USDC/USDT) → USDC建てに統一。
- 小ノーション(例:$10k)でVWAPを取り、
fx_stable_bp
をメトリクス化fx_stable_bp = |fx - 1.0| × 1e4
- 閾値超え時(例:>10–15bp)は比較棄却 or マージン上乗せなどの対策を自動適用。
ポイント:通貨建ての統一は必須。微差でも裁定の可否が変わる。
I/O:FastHTTP(1s/2s/1retry) と SlowHTTP の分離
目的:裁定パスを遅い呼び出しから隔離し、p95 ≤ 1sを保証する。
- FastHTTP(裁定用)
- connect ≤ 1s / read ≤ 2s / retry = 1(指数バックオフ+ジッター)
- 用途:Jupiter
/quote
、必要最小のメタAPI - 失敗は即棄却(比較不能として扱う)——“遅い成功”より“速い不成立”
- SlowHTTP(観測用)
- TVL、ステーブルフロー、バックフィル等は別クライアントで 10–30秒級のタイムアウト・多段リトライOK。
- 名前(DI)で明示的に注入して混入を防止する。
- 並列取得 & 時刻差ゲート
- DEX見積(FastHTTP)と CEX VWAP(WS→即時計算)を同時に走らせ、取得完了の差分 ≤ 250msのみ採用。
- 超過した比較は**
uncomparable(reason=ts_skew)
**でカウント・可視化。
ポイント:**速さを“設計として担保”**する。遅い処理を別のレーンに追いやるだけで、SLOは一気に安定する。
これで満たせるSLO
- 比較時刻差:p95 ≤ 250ms(DEX/CEX同時取得+WS常時維持+TTLキャッシュ)
- サイクル時間:p95 ≤ 1s(FastHTTPと並列化で待ち時間を圧縮)
- 誤検知の激減:ノーション別VWAP × 実測FX × コストモデルにより**“見かけのスプレッド”**を排除
次章では、このアーキテクチャをコード断片とメトリクス設計に落とし込み、実際に250ms/1sの壁を超える手順を解説します。
実装A:Bybit v5 WS→VWAP
orderbook.50.SYMBOLを常時購読/L2を辞書で保持し、VWAP時のみ上位Nソート/買い・売りのノーション別VWAP
このセクションでは、HTTPポーリングを捨てて Bybit v5 のPublic WebSocketからL2板を常時同期し、指定ノーションでの実行可能VWAPを数百ミリ秒で返す最小実装を示します。方針はシンプル:
- WSで
orderbook.50.SYMBOL
を購読 - L2は辞書で常駐保持(更新はO(1))
- VWAP計算時だけ上位Nをソート(必要時のみ O(K log K))
- 指定ノーション(クォート建て)でのVWAPを返す
1) L2常駐:スナップショット+デルタ
- スナップショットで全置換 → デルタで増減・削除(size=0で削除)
bids: Dict[price,float]
/asks: Dict[price,float]
(どちらも base数量 を保持)- ハートビート&再接続は最低限でOK(指数バックオフ)
# arbbot/cex/bybit_ws_vwap.py(抜粋) import aiohttp, asyncio, time, json from typing import Dict, List, Optional, Tuple, Literal Side = Literal["buy","sell"] class Orderbook: def __init__(self): self.bids: Dict[float, float] = {} # price → size(base) self.asks: Dict[float, float] = {} self.ts = 0.0 # last update time (sec) def apply_snapshot(self, bids: List[List[str]], asks: List[List[str]]): self.bids.clear(); self.asks.clear() for p, q in bids: self.bids[float(p)] = float(q) for p, q in asks: self.asks[float(p)] = float(q) self.ts = time.time() def apply_delta(self, bids: List[List[str]], asks: List[List[str]]): for p, q in bids: price, size = float(p), float(q) if size <= 0: self.bids.pop(price, None) else: self.bids[price] = size for p, q in asks: price, size = float(p), float(q) if size <= 0: self.asks.pop(price, None) else: self.asks[price] = size self.ts = time.time()
2) ノーション別VWAP:買いはASK、売りはBID
買い(USDT/USDCを支払ってベースを得る)
- ASK側を安い順に積み上げ、**指定ノーション(クォート建て)**を使い切るまで取得
- 有効単価 =
支払ったクォート合計 / 取得できたベース合計
売り(ベースを売ってクォートを得る)
- BID側を高い順に積み上げ、同様に計算
class Orderbook: # ...前掲... def vwap(self, side: Side, notional_quote: float) -> Optional[Tuple[float, float]]: if notional_quote <= 0: return None if side == "buy": if not self.asks: return None ladder = sorted(self.asks.items(), key=lambda kv: kv[0]) # ask: price↑ remain_q = notional_quote; got_base = spent_q = 0.0 for price, size_base in ladder: cap_q = price * size_base if remain_q <= 0: break if remain_q >= cap_q: got_base += size_base; spent_q += cap_q; remain_q -= cap_q else: take_base = remain_q / price got_base += take_base; spent_q += remain_q; remain_q = 0.0; break if got_base == 0: return None return (spent_q / got_base, got_base) # (有効単価=クォート/ベース, 取得ベース量) else: if not self.bids: return None ladder = sorted(self.bids.items(), key=lambda kv: kv[0], reverse=True) # bid: price↓ remain_q = notional_quote; sold_base = recv_q = 0.0 for price, size_base in ladder: cap_q = price * size_base if remain_q <= 0: break if remain_q >= cap_q: sold_base += size_base; recv_q += cap_q; remain_q -= cap_q else: part_base = remain_q / price sold_base += part_base; recv_q += remain_q; remain_q = 0.0; break if sold_base == 0: return None return (recv_q / sold_base, sold_base)
ここではクォート建てのノーション(例:$1k, $5k, $20k)を入力に統一しています。板がUSDT建てなら
notional_quote
はUSDT額、USDC建てならUSDC額。あとでUSDT→USDC換算を掛けて比較します。
3) WebSocket接続:購読・心拍・再接続
class BybitWSVWAP: PING_INTERVAL = 15 def __init__(self, ws_url: str, topic: str, session: aiohttp.ClientSession | None = None): self.ws_url = ws_url; self.topic = topic self.session = session; self._ext = session is not None self.ob = Orderbook() async def __aenter__(self): if not self._ext: self.session = aiohttp.ClientSession() return self async def __aexit__(self, *exc): if not self._ext and self.session: await self.session.close() async def run(self, stop: asyncio.Event): backoff = 1.0 while not stop.is_set(): try: await self._loop(stop); backoff = 1.0 except Exception: await asyncio.sleep(min(backoff, 10.0)); backoff *= 1.7 async def _loop(self, stop: asyncio.Event): assert self.session async with self.session.ws_connect(self.ws_url, heartbeat=self.PING_INTERVAL) as ws: await ws.send_json({"op": "subscribe", "args": [self.topic]}) async for msg in ws: if stop.is_set(): break if msg.type.name == "TEXT": payload = msg.json() t = payload.get("type"); d = payload.get("data", {}) if t == "snapshot": self.ob.apply_snapshot(d.get("b", []), d.get("a", [])) elif t == "delta": self.ob.apply_delta (d.get("b", []), d.get("a", [])) elif msg.type.name == "ERROR": raise RuntimeError("WS error") def vwap(self, side: Side, notional_quote: float): return self.ob.vwap(side, notional_quote)
4) ステール検知と品質ゲート
VWAPを返す前に次をチェックすると誤検知が激減します。
- L2鮮度:
now - orderbook.ts ≤ 500ms
(超過ならNone
) - 深さ不足:指定ノーションに届かない場合は
None
- 時刻差ゲート:DEX見積との取得完了差 ≤ 250ms(超過は比較棄却)
def vwap_safe(client: BybitWSVWAP, side: Side, notional_quote: float, max_age_ms: int = 500): if (time.time() - client.ob.ts) * 1000 > max_age_ms: return None return client.vwap(side, notional_quote)
5) よくある落とし穴(回避策つき)
- 線形/無期限(linear)とスポットの混同
先物は契約サイズや資金調達が絡む。まずはspotでVWAP、perpはcontract→base換算を入れてから。 - 数量の単位
Bybitの板数量はbase数量。VWAPのノーションはクォート額。混ぜない。 - 片側Onlyでの手数料
監視段階でもテイカーfeeをbpで上乗せ/控除して“実効単価”へ。メイカー運用は例外(遅延リスク大)。 - ソート最適化の過剰実装
まずは「辞書保持+必要時ソート」で十分速い。最適化はp95>1sになってからでOK。
6) スモーク(30秒)
# 例:SOLUSDT(spot)を購読して $5k の買いVWAPを1秒おきに出す async def smoke(): stop = asyncio.Event() async with BybitWSVWAP( ws_url="wss://stream.bybit.com/v5/public/spot", topic="orderbook.50.SOLUSDT" ) as c: task = asyncio.create_task(c.run(stop)) await asyncio.sleep(1.0) # 少し溜める for _ in range(30): print("BUY 5k:", c.vwap("buy", 5_000)) await asyncio.sleep(1.0) stop.set(); await task
このWS→VWAPの最小実装をCEX側の標準にすると、HTTPポーリング由来の秒遅延が消え、DEX見積(Jupiter /quote
)との同時比較(≤250ms)が現実になります。次章ではJupiter /quote
TTLキャッシュとの合わせ技で1秒サイクルを安定させるポイントを解説します。
実装B:Jupiter /quote TTLキャッシュ
キー:(inputMint, outputMint, amount_int, slippageBps, direct)/TTL≦300msでデドゥープ/outAmountは整数(decimals適用済)・minOut計算フック
DEX側はJupiterの /quote
を“真実の窓口”にします。ただし生の呼び出しを連打するとレイテンシ悪化&レート制限の温床。そこで極小TTLキャッシュ(≤300ms)で同一クエリをデドゥープし、見積の揺れを抑えつつミリ秒級で返します。
設計ポイント
- キー=
(inputMint, outputMint, amount_int, slippageBps, direct)
amount_int
は mintの最小単位(整数)。小数は送らない。direct
は onlyDirectRoutes の真偽。ルート最適化の仕様差をキーに含める。
- TTL ≦ 300ms:その間に同一キーで来た要求は同一レスポンスを返す(per-keyロックで同時実行も一発に)。
- outAmount は整数:Jupiterはdecimals適用済みの整数で返す。実効単価に変換するときは自分で小数に戻す。
- minOut 計算フック:
outAmount
に slippage + safety を差し引いて**/swap**へ渡す値を出す(将来の実行段にもそのまま使える)。
最小実装(そのまま使える)
# arbbot/dex/jupiter_quote_cache.py from __future__ import annotations import asyncio, time from dataclasses import dataclass from typing import Optional, Dict import aiohttp JUP_QUOTE_URL = "https://quote-api.jup.ag/v6/quote" @dataclass(frozen=True) class QuoteKey: input_mint: str output_mint: str amount_int: int # 最小単位(整数) slippage_bps: int only_direct: bool @dataclass class CachedQuote: ts_ms: float data: dict class JupiterQuoteCache: """TTL≦300msの極小キャッシュ。per-keyロックで同時呼び出しをデドゥープ。""" def __init__(self, session: Optional[aiohttp.ClientSession] = None, ttl_ms: int = 300): self.ttl_ms = ttl_ms self.session = session self._external_session = session is not None self._cache: Dict[QuoteKey, CachedQuote] = {} self._locks: Dict[QuoteKey, asyncio.Lock] = {} self._locks_guard = asyncio.Lock() async def __aenter__(self): if not self._external_session: # FastHTTP 相当:短いタイムアウトで固める timeout = aiohttp.ClientTimeout(total=3.0, connect=1.0, sock_read=2.0) self.session = aiohttp.ClientSession(timeout=timeout) return self async def __aexit__(self, *exc): if not self._external_session and self.session: await self.session.close() async def get_quote(self, key: QuoteKey) -> dict: now = time.time() * 1000 hit = self._cache.get(key) if hit and (now - hit.ts_ms) <= self.ttl_ms: return hit.data # per-key ロックで同一キーのスパムを1発にまとめる lock = await self._get_lock(key) async with lock: now = time.time() * 1000 hit = self._cache.get(key) if hit and (now - hit.ts_ms) <= self.ttl_ms: return hit.data assert self.session is not None params = { "inputMint": key.input_mint, "outputMint": key.output_mint, "amount": str(key.amount_int), # 整数 "slippageBps": str(key.slippage_bps), "onlyDirectRoutes": "true" if key.only_direct else "false", } async with self.session.get(JUP_QUOTE_URL, params=params) as r: r.raise_for_status() data = await r.json() self._cache[key] = CachedQuote(ts_ms=time.time()*1000, data=data) return data async def _get_lock(self, key: QuoteKey) -> asyncio.Lock: async with self._locks_guard: if key not in self._locks: self._locks[key] = asyncio.Lock() return self._locks[key]
decimalsヘルパと minOut
フック
# arbbot/dex/jupiter_utils.py def to_amount_int(amount_units: float, decimals: int) -> int: """人間系の数量(小数)→ mint最小単位(整数)""" return int(round(amount_units * (10 ** decimals))) def from_amount_int(amount_int: int, decimals: int) -> float: """mint最小単位(整数)→ 人間系の数量(小数)""" return amount_int / (10 ** decimals) def calc_min_out(out_amount_int: int, slippage_bps: int, safety_bps: int = 5) -> int: """ JupiterのoutAmount(整数)から /swap に渡す minOut(整数) を算出。 slippageBps に更に safety を上乗せして保守的に。 """ total_bps = slippage_bps + safety_bps # 小数点を使わず整数演算で安全に return max(0, (out_amount_int * (10_000 - total_bps)) // 10_000)
使い方(USDC→SOLの“$5k買い”を見積+実効単価)
# arbbot/dex/example_quote.py import asyncio from arbbot.dex.jupiter_quote_cache import JupiterQuoteCache, QuoteKey from arbbot.dex.jupiter_utils import to_amount_int, from_amount_int, calc_min_out USDC_MINT = "EPjFWdd5AufqSSqeM2qN1xzybapC8G4wEGGkZwyTDt1v" # 6 SOL_MINT = "So11111111111111111111111111111111111111112" # 9 async def main(): async with JupiterQuoteCache(ttl_ms=300) as cache: key = QuoteKey( input_mint=USDC_MINT, output_mint=SOL_MINT, amount_int=to_amount_int(5_000.0, 6), # 5,000 USDC を整数化 slippage_bps=30, only_direct=False ) q = await cache.get_quote(key) # outAmount(整数, decimals適用済) → SOL数量(小数) out_int = int(q["outAmount"]) out_sol = from_amount_int(out_int, 9) # 実効単価(USDC/SOL) = 投入USDC / 受取SOL effective_px = 5_000.0 / out_sol # /swap 用の minOut(安全寄り) min_out_int = calc_min_out(out_int, slippage_bps=30, safety_bps=5) print({ "route": q.get("routePlan", [])[:1], "out_sol": out_sol, "effective_px_usdc_per_sol": effective_px, "min_out_int": min_out_int }) # asyncio.run(main())
運用で効く“ちょい足し”Tips
- TTLは短く(200–300ms)
ミリ秒の壁を越えるには新鮮さ>キャッシュヒット率。ヒット率は“副産物”でOK。 - 429/5xxは即棄却
裁定パスは**“速い不成立”が正義**。遅い成功は比較の鮮度を壊す。必要なら別レーン(SlowHTTP)で再試行。 - ログを残す
quote_latency_ms
,route_id
,pool_ids
,total_fee_bps
,outAmount
を必ず出し、異常時にルートを辿れるように。 - キーの揺れを抑える
amount_int
が毎回微妙に違うとキャッシュが効かない。ノーションは刻み(例:$100刻み)で丸めても良い。
よくある落とし穴
- amountを小数で送る → 400 or 予期せぬ丸め。必ず整数に。
- decimals取り違え →
outAmount
のスケールが合わず単価が化ける。mintのdecimalsをトークンレジストリで固定。 - slippageとminOutの二重適用ミス → /quoteの
slippageBps
に加えてsafety_bpsを上乗せする形が安全。
このTTLキャッシュにより、/quoteのデドゥープとレイテンシ安定化が実現します。CEXのWS-VWAPと並列取得すれば、時刻差≤250ms・サイクル≤1sのSLOを現実的に満たせます。次は、高速比較パスでの同時取得→通貨統一→スプレッド判定の実装を仕上げます。
実装C:高速比較パス
asyncio.gatherでDEX/CEX同時取得/時刻差ゲート≤250ms・staleは棄却/USDT→USDC換算&fx_stable_bpログ/コストモデル加味で判定
目的:ミリ秒級で“実行可能価格どうし”を比較し、コスト後でも勝てる時だけシグナルを出す。
ここでは DEX(Jupiter /quote)とCEX(Bybit WS→VWAP)を同時並行で取り、時刻差≤250msを満たした比較のみ採用、USDT↔USDC換算とコストモデルまで入れて判定する“高速比較パス”を実装します。
全体フロー(1回の比較)
- 同時取得:
- DEX見積
/quote
(USDC→トークン、ノーション=USDC) - CEX VWAP(Bybit WS L2、ノーション=USDT)
- USDC/USDTのFX(WSでVWAP)
- DEX見積
- ゲート処理:
- 時刻差
ts_diff_ms ≤ 250
でない → 棄却 - 鮮度(L2 age ≤ 500ms、quote TTL ≤ 300ms)を満たさない → 棄却
- 流動性不足(ノーション吸えない) → 棄却
- 時刻差
- 通貨建て統一:
cex_px_usdc = cex_px_usdt × fx(USDC/USDT)
fx_stable_bp = |fx - 1.0| × 1e4
をログ
- スプレッド→コスト後判定:
exec_spread_bp = (cex_px_usdc - dex_px_usdc)/mid × 1e4
(DEX買い→CEX売り 方向の例)cost_bp = dex_fee + cex_fee + est_slip(N) + tip + infra
- トリガ:
exec_spread_bp > cost_bp + margin_bp
コード:最小“高速比較”関数
ここでは前章までのコンポーネント(
BybitWSManager
,JupiterQuoteCache
,JupiterAdapter
,StableFX
)を利用します。
# arbbot/arb/fast_compare.py from __future__ import annotations import asyncio, time from dataclasses import dataclass from typing import Optional, Dict from arbbot.cex.bybit_ws_manager import BybitWSManager from arbbot.dex.jupiter_quote_cache import JupiterQuoteCache from arbbot.dex.jupiter_adapter import JupiterAdapter from arbbot.registry.tokens import USDC, SOL from arbbot.dex.jupiter_utils import from_amount_int from arbbot.util.stable_fx import StableFX def monotonic_ms() -> float: return time.perf_counter() * 1e3 @dataclass class CostParams: dex_fee_bp: float # 例: 30 (0.30%) cex_fee_bp: float # 例: 10 (0.10%) テイカー est_slip_bp: float # 例: 10 (0.10%) Nに依存 priority_tip_bp: float # 例: 3–8 (Solana 優先手数料・tip) infra_bp: float # 例: 2–5 (失敗/再送/撤退コストの移動平均) margin_bp: float # 例: 10 (安全マージン) def cost_model_bp(p: CostParams) -> float: return p.dex_fee_bp + p.cex_fee_bp + p.est_slip_bp + p.priority_tip_bp + p.infra_bp async def compare_once_fast(bybit: BybitWSManager, jup_cache: JupiterQuoteCache, fx: StableFX, token=SOL, notional_usd: float = 5_000.0, cost: Optional[CostParams] = None) -> Dict: """ DEX(USDC→TOKEN)買い vs CEX(spot)売りで比較(片方向の例)。 反対方向は同様に“CEX買い vs DEX売り”を別関数で計算する。 """ if cost is None: cost = CostParams(30, 10, 10, 4, 3, 10) # セットアップ jup = JupiterAdapter(jup_cache, usdc_mint=USDC.mint, token_mint=token.mint, token_decimals=token.decimals, usdc_decimals=USDC.decimals) t0 = monotonic_ms() # 1) DEX quote を非同期でキック dex_task = asyncio.create_task(jup.quote_exec_price("buy", notional_usd, slippage_bps=30)) # 2) CEX VWAP と FX は同期I/O不要(WSから即時計算) # とはいえ gather 形にしておくと拡張が楽 async def _cex_side(): vwap_sell = bybit.vwap("spot", "SOLUSDT", side="sell", notional_quote=notional_usd) # CEXで売る単価 return vwap_sell async def _fx_side(): return fx.usdc_per_usdt() # (fx, fx_stable_bp) cex_px, (fx_usdc_per_usdt, fx_bp) = await asyncio.gather(_cex_side(), _fx_side()) dex_quote = await dex_task t1 = monotonic_ms() ts_diff = t1 - t0 # --- ゲート: 時刻差/鮮度/流動性 --- if ts_diff > 250: return {"status":"uncomparable","reason":"ts_skew","ts_diff_ms":round(ts_diff,1)} if cex_px is None: return {"status":"uncomparable","reason":"cex_liquidity","ts_diff_ms":round(ts_diff,1)} if "outAmount" not in dex_quote: return {"status":"uncomparable","reason":"dex_quote","ts_diff_ms":round(ts_diff,1)} # --- 実効単価の算出 --- # DEX: 受取トークン量(整数→小数)→ 実効単価(USDC/BASE) out_base = from_amount_int(int(dex_quote["outAmount"]), token.decimals) if out_base <= 0: return {"status":"uncomparable","reason":"dex_zero_out","ts_diff_ms":round(ts_diff,1)} dex_px_usdc = notional_usd / out_base # CEX: VWAPはUSDT建て → FXでUSDC建てへ cex_px_usdt = cex_px[0] # (price, filled_base) cex_px_usdc = cex_px_usdt * fx_usdc_per_usdt # --- スプレッド(DEX買い→CEX売り 方向の例) --- mid = (dex_px_usdc + cex_px_usdc) / 2.0 exec_spread_bp = (cex_px_usdc - dex_px_usdc) / mid * 1e4 # --- コスト後判定 --- total_cost_bp = cost_model_bp(cost) edge_bp = exec_spread_bp - total_cost_bp decision = "trigger" if edge_bp > cost.margin_bp else "skip" return { "status": "ok", "ts_diff_ms": round(ts_diff,1), "fx_stable_bp": round(fx_bp,2), "dex_px_usdc": dex_px_usdc, "cex_px_usdc": cex_px_usdc, "exec_spread_bp": exec_spread_bp, "total_cost_bp": total_cost_bp, "edge_bp": edge_bp, "decision": decision, "notional_usd": notional_usd }
片方向(DEX買い→CEX売り)の例を示しました。実運用では反対方向(CEX買い→DEX売り)も同時に計算し、有利な方のみ評価・発火します。
stale棄却・時刻差ゲートの実務メモ
- 時刻差は**“取得完了の差”でOK(
t0→await gather→t1
の差)。より厳密にやるならL2のageとquote作成時刻をログに出して両者の最大差**でゲート。 - 鮮度(age):
- L2:
now - orderbook.ts ≤ 500ms
- quote:
cache_ttl ≤ 300ms
(TTLキャッシュのヒット内)
- L2:
- 棄却は失敗ではない:
status=uncomparable
として理由ラベル(ts_skew|stale|liq|fx
)をメトリクスに積む。
USDT→USDC換算と fx_stable_bp
の扱い
- 換算レートは板から実測:
USDCUSDT
の**WS VWAP(例:$10k)**で求める。 fx_stable_bp
を常時計測し、**閾値超え(例:>10–15bp)**時は- 比較棄却するか、
margin_bp
に上乗せして保守的に判定する。
コストモデルの詰め方(最初の値→学習)
- 初期値(例)
dex_fee_bp=30
(0.30%)cex_fee_bp=10
(VIPやBNB割引を口座に合わせて)est_slip_bp=10
(Nの大小で分岐。実測から p95 を採用)priority_tip_bp=3–8
(Solanaの混雑に応じて可変)infra_bp=2–5
(リトライ・撤退の平均コスト)
- 運用で実測ログ→移動平均に置き換える:
est_slip_bp(N)
とpriority_tip_bp
、infra_bp
は直近X分/日の実績から更新。静的値のままはNG。
ログとメトリクス(最小)
- 比較SLO:
ts_diff_ms
(p95/p99) - 棄却率:
uncomparable_reason{ts_skew|stale|liq|fx}
- FX品質:
fx_stable_bp
(平均・p95) - スプレッド:
exec_spread_bp
、total_cost_bp
、edge_bp
(ヒストグラム) - ノーション別:
notional_usd
ラベルで断面を分ける
スモーク(30回ループ)
# arbbot/arb/smoke_fast_compare.py import asyncio from arbbot.arb.fast_compare import compare_once_fast, CostParams from arbbot.cex.bybit_ws_manager import BybitWSManager from arbbot.dex.jupiter_quote_cache import JupiterQuoteCache from arbbot.util.stable_fx import StableFX async def main(): bybit = BybitWSManager() await bybit.start({"SOLUSDT": "spot", "USDCUSDT": "spot"}) async with JupiterQuoteCache(ttl_ms=300) as cache: fx = StableFX(bybit_mgr=bybit, market="spot", symbol="USDCUSDT") for _ in range(30): res = await compare_once_fast(bybit, cache, fx, notional_usd=5_000.0, cost=CostParams(30,10,10,4,3,10)) print(res) await asyncio.sleep(0.5) await bybit.stop() # asyncio.run(main())
見どころ
ts_diff_ms
が ~80–200ms に収束(p95 ≤ 250ms)status="ok"
が ≥80%(初期目標)edge_bp
が 正 のサンプルがそこそこ出る(市況依存)
よくある落とし穴(回避策)
- USDT=USDCだと思い込む →
fx_stable_bp
を常に見よ。 - サイズ不一致 → DEX/CEXとも同じノーションで評価。
- 片方向だけで判定 → 両方向(DEX買い→CEX売り/CEX買い→DEX売り)を回し、有利側のみ判定。
- 静的コスト → 実測で動的更新。
est_slip_bp
とpriority_tip_bp
の固定は事故のもと。
この高速比較パスを心臓部に据えるだけで、“≤250ms / ≤1sサイクル”の監視が現実になります。あとは実行レイヤ(/swap+Jito送信、CEX発注)と結合し、**“見積=実行”**の契約を守れば、誤検知に振り回されない裁定運用に到達できます。
コストモデル&動的トリガ
結論:シグナルは“見かけのスプレッド”では出さない。
コストをbpで見積もり、コストを上回る“余剰bp(エッジ)”にマージンを足してトリガする。
数式(運用で使う形)
cost_bp = dex_fee_bp # ルート合計のDEX手数料 + cex_fee_bp # 自口座のテイカー/メイカー手数料 + est_slip_bp(N) # 指定ノーションNの滑り(p95) + jito_tip_bp # 優先手数料/チップ(Solana等) + infra_bp # 失敗・再送・撤退コストの移動平均 trigger_bp = cost_bp + margin_bp(symbol, N, regime) exec_spread_bp = (cex_px_usdc - dex_px_usdc)/mid * 1e4 # 例: DEX買い→CEX売り方向
意思決定:exec_spread_bp > trigger_bp
のときだけ 発火。それ以外は skip。
ポイント:
est_slip_bp(N)
やjito_tip_bp
は固定値NG。実測から常に更新する。
銘柄別・ノーション別の初期値(例)と学習
初期パラメータ(たたき台)
- Tier1(SOL/ETH/BTC)
dex_fee_bp=20〜35
、cex_fee_bp=6〜12
、est_slip_bp(5k)=6〜12
、jito_tip_bp=3〜8
、infra_bp=2〜4
、margin_bp=8〜12
- Tier2(RAY/JUP/PYTH)
dex_fee_bp=25〜45
、cex_fee_bp=8〜14
、est_slip_bp(5k)=12〜25
、jito_tip_bp=5〜10
、infra_bp=3〜6
、margin_bp=12〜18
- Tier3(BONK/WIF/POPCAT)
dex_fee_bp=30〜60
、cex_fee_bp=10〜16
、est_slip_bp(1k)=25〜60
、jito_tip_bp=6〜12
、infra_bp=4〜8
、margin_bp=18〜30
目安なので、あなたの口座手数料とルート特性に合わせて上書き。
設定イメージ(YAML/JSON)
symbols: SOL: tiers: "1k": { est_slip_bp: 6, margin_bp: 8 } "5k": { est_slip_bp: 10, margin_bp: 10 } "20k": { est_slip_bp: 18, margin_bp: 12 } cex_fee_bp: 8 jito_tip_bp: 5 dex_fee_bp: 25 infra_bp: 3
学習の仕組み(運用で必ず回す)
- est_slip_bp(N):
Jupiter/quote
のsimulate
/実測との差分を p95 で更新(滑りが跳ねたら即反映)。 - jito_tip_bp:
実際に払ったtip/priorityFee を 移動平均(短期は指数移動平均、長期は日次平均)。 - infra_bp:
失敗・再送・撤退で失ったbpを累計/件数から算出(ローリング1〜7日)。 - margin_bp(symbol,N,regime):
市況レジーム(例:quiet/volatile
)で切替。FX乖離・時刻差逸脱が増えたら自動で上積み。
ルール:“固定”は劣化の温床。実測→更新を自動化し、乖離すればサーキットで止める。
SLOとメトリクス
SLO(目標値)
- 比較時刻差
compare_age_diff_ms
: p95 ≤ 250ms(p99 ≤ 500ms) - 比較不能率
uncomparable_ratio
: ≤ 20%(初期の上限) - サイクル時間(裁定パス): p95 ≤ 1s
必須メトリクス(名前と意味)
レイテンシ系
quote_to_submit_ms{dex=}
/quote
取得→(将来の/swap
送信)まで。監視段階では見積完了→比較完了の擬似で代替。
compare_age_diff_ms
- DEX/CEX取得完了の差。**ゲート(≤250ms)**の健康診断。
比較の健全性
uncomparable_reason{ts_skew|stale|liq|fx|error}
(Counter)- 理由別の棄却数。ts_skew(時刻差超過)、stale(鮮度切れ)、liq(流動性不足)、fx(換算不可/大乖離)。
価格とFX
spread_exec_bp
(Histogram)- 可約定スプレッド(方向別・ノーション別でラベル付け)。
cost_bp_total
/cost_bp_{dex_fee,cex_fee,slip,tip,infra}
(Gauge/Histogram)- 内訳を分解して可視化。モデルの当たり外れが見える。
edge_bp = spread_exec_bp - cost_bp_total
(Histogram)- 実効エッジ。p50>0 を狙い、p10 > -5bp を下限に。
fx_stable_bp
(Histogram)|USDC/USDT - 1|×1e4
。10〜15bp超はリスクシグナル。
付帯
429_rate{source}
/retry_after_ms
(Gauge/Counter)- レート制限の健康診断(Jupiter/Bybit REST 等)。
推奨ダッシュボード(最小)
- SLOパネル:
compare_age_diff_ms p95/p99
、uncomparable_ratio
、cycle_time p95
- エッジ分布:
spread_exec_bp
、cost_bp_total
、edge_bp
(ノーション別の箱ひげ) - コスト内訳:
dex_fee
/cex_fee
/slip
/tip
/infra
(時系列) - FXと棄却理由:
fx_stable_bp
(時系列)、uncomparable_reason
スタック棒
アラート(初期閾値)
compare_age_diff_ms p95 > 250ms
が 5分連続 → 要因切り分け(WS遅延?TTL?並列不足?)uncomparable_ratio > 20%
が 15分連続 → 閾値/TTL/並列/速い回線へedge_bp p50 < 0
が 1時間 → トリガ停止+コストモデル再学習
実装ワンポイント
- メトリクスは“比較前に増やす”:棄却時も必ず理由カウント。
- 単位はbpに統一:費用・スプレッド・エッジを同次元で扱う。
- JSONログ併用:意思決定の1レコードに以下のようなものが残れば事後検証が劇的に楽になる。
{ "ts": 1723098000123, "symbol": "SOL/USDC", "N": 5000, "dex_px": 174.21, "cex_px": 174.28, "spread_exec_bp": 39.8, "cost": {"dex_fee":25,"cex_fee":8,"slip":10,"tip":5,"infra":3,"total":51}, "edge_bp": -11.2, "fx_stable_bp": 3.1, "ts_diff_ms": 142, "decision": "skip", "reason": "edge<=margin" }
これで、“コストを跨いで余剰bpが出た時だけ鳴らす”仕組みが完成します。
SLOで速度と鮮度を担保し、メトリクスでコストを学習する。
この2枚看板こそが、実運用で生き残る裁定監視のコアです。
フェイルセーフ&在庫管理
裁定は必ず失敗する前提で設計します。片足だけ約定、送信遅延、見積ズレ、レート制限——全部“日常”。壊れ方を想定して、即撤退・在庫抑制・自動停止までを仕組みにします。
片足失敗時の撤退ルール/段階ヘッジ
状態機械(最小)
IDLE
└─trigger→ SEND_BOTH (best-effort同時)
├─ DEX_FILLED & CEX_FILLED → FLAT(成功)
├─ DEX_FILLED & CEX_FAIL → HEDGE_CEX(IOC/MKTで即反転)
├─ CEX_FILLED & DEX_FAIL → HEDGE_DEX(最短ルートで即反転)
└─ BOTH_FAIL → NOFILL(在庫0で終了)
ルール(実務)
- タイムボックス:片足 fill 後 t_hedge ≤ 500ms で相手脚送信。超過なら段階撤退(サイズを1/2→1/4で刻む)。
- 価格ガード:ヘッジはVWAPで上限/下限を付ける(
max_slip_bp
)。ガード超過で即中止→“在庫として保有”戦略に切替(上限Δの範囲内)。 - 段階ヘッジ:
- 50% を最短IOCでヘッジ
- 残り 25%/25% を価格条件付きで追撃(最大 2–3 ステップ)
- 追撃失敗時は在庫キープ→撤退価格(損切り)を自動設定
擬似コード:
if leg_A_filled and not leg_B_filled_within(500_ms): hedge_sizes = [0.5, 0.25, 0.25] for s in hedge_sizes: ok = send_hedge(size=s*filled, guard_bp=max_slip_bp, tif="IOC") if ok and position_delta≈0: break if residual_delta > 0: place_stop_out(residual_delta, stop_guard_bp)
在庫上限・Δ上限/サーキットブレーカ
在庫・Δ
- 銘柄別在庫上限(例:
SOL: 2,000
,Tier3: 0
) - Δ上限(USD):ネットエクスポージャの総量(例:
$50k
) - 時間あたり回転数上限:スパイク時の過剰取引を抑制
サーキットブレーカ(自動停止)
- 連続負エッジ:
edge_bp < 0
が N回/5分 → 銘柄停止(30分) - SLO逸脱:
compare_age_diff_ms p95 > 250ms
が 5分継続 → 全体停止(5分) - FX乖離:
fx_stable_bp > 15
が 2分継続 → USDT→USDC換算を強化 or 比較棄却
運用フラグ:
risk: inv_cap_per_symbol: { SOL: 2000, ETH: 500, TIER3: 0 } delta_cap_usd: 50000 circuit_breaker: edge_negative_streak: { count: 5, window_sec: 300, cooldown_sec: 1800 } ts_skew_p95: { limit_ms: 250, window_sec: 300, cooldown_sec: 300 } fx_stable_bp: { limit_bp: 15, window_sec: 120, cooldown_sec: 300 }
429/5xx対策:トークンバケット+短期キャッシュ
落ちるのは前提。落ちても壊れないのが正解。
- トークンバケット(ソース別QPS)
- Jupiter/REST系は QPS上限を設定(例:
5 req/s
) - バーストは300ms TTLキャッシュで吸収(同一キーのデドゥープ)
- Jupiter/REST系は QPS上限を設定(例:
- 指数バックオフ+ジッター:
100ms, 200ms, 400ms
- フェイルファスト:裁定パスはリトライ回数=1(遅い成功より速い不成立)
- CBと統合:429比率がしきい値超→自動で頻度を落とす/一時停止
擬似コード:
if rate_limit_bucket.empty(): return STALE # 比較棄却 resp = fast_http.get(..., timeout=(1,2), retries=1) if resp.status in {429, 502, 503}: backoff() mark_rate_limited(source) return STALE
運用:シャドー→本番
15分スモーク → 24hシャドー
- 15分スモーク
- 監視のみ(注文は発生させない)。
- 指標:
ts_diff_ms p95 ≤ 250
、uncomparable ≤ 20%
、fx_stable_bp ≤ 10
。
- 24hシャドー
- ノーション別に両方向を連続観測。
- 指標:
edge_bp p50 > 0
、p10 > -5
、CB未発火。
朝イチ自動サマリ(毎朝固定フォーマット)
- 棄却率(理由別):
ts_skew / stale / liq / fx
- SLO逸脱:
compare_age_diff_ms p95/p99
、cycle_time p95
- 発生源ランキング:エラー・429のソース別上位(Jupiter/Bybit/自前RPC)
- スプレッドとコスト:
spread_exec_bp
vscost_bp_total
の箱ひげ(ノーション別)
例(テキスト出力):
[Daily Summary] 2025-08-08 UTC - SLO: ts_diff p95=184ms / p99=392ms, cycle p95=0.82s - Uncomparable: 17.3% (ts_skew 9.2%, stale 5.1%, liq 2.6%, fx 0.4%) - FX stable: mean 3.2bp / p95 8.7bp - Top sources (errors): Jupiter 38%, Bybit 34%, RPC 21% - Edge vs Cost (5k): p50 +12.4bp / p10 -3.1bp / triggers 47
逸脱時の“一行ポストモーテム”
短く、事実と対策だけ。SlackやOpsノートにコピペできる形で。
[PM] 2025-08-08 03:12Z | SOL/USDC 5k | ts_skew>250ms 連続 | Bybit WS遅延spike 対策: WS再接→地域エンドポイント切替・Jupiter TTL200msへ | 再発監視: ts_diff_ms p95
テンプレ:
[PM] <UTC> | <symbol>/<notional> | <現象> | <直接原因> | 対策: <具体> | 再発監視: <KPI>
まとめ
- 片足失敗は“正常系”:タイムボックス+段階ヘッジ+撤退価格。
- 在庫とΔは数値で封じる:上限とCBで自動停止。
- レート制限は構造で回避:バケット+TTL+フェイルファスト。
- 運用はリズム:スモーク→シャドー→朝イチサマリ→一行PM。
ここまで入れれば、“壊れ方ごとに壊れない”監視運用が回り始めます。
よくある落とし穴
amountの整数化忘れ/mint違い/decimalsミス
- Jupiterの
amount
は整数(mintの最小単位)必須。 小数を投げると400や謎丸めで地獄行き。 - mint IDが正義。 “SOL/USDC”みたいなシンボル依存は排除して、mint+decimalsをレジストリで固定。
outAmount
も整数(decimals適用済)。自分で小数に戻して実効単価を出す。
USDC/USDTを1.0固定扱い
- CEXはUSDT建て、DEXはUSDC建てが普通。1.0000前提はNG。
- USDCUSDT板のVWAPで実測→
fx_stable_bp = |fx-1|×1e4
をログ。10–15bp超は比較棄却 or マージン上乗せ。
v2 Bybit APIやHTTP板ポーリングの混入
- Bybitはv5 WS L2が基本。HTTPのticker/ポーリングは遅延要因。
- 市場区分(spot/linear)を間違えると404/変な値になる。シンボルごとにcategoryを決め打ち。
付録:最小コード断片
どれも
aiohttp
+asyncio
前提。雑に貼っても動く“最小・実用”版。
Bybit WS → VWAPスニペット
# bybit_ws_vwap_min.py import aiohttp, asyncio, json, time from typing import Dict, List, Optional, Tuple class OB: def __init__(self): self.bids={}, self.asks={}, self.ts=0.0 def snap(self, b:List[List[str]], a:List[List[str]]): self.bids={float(p):float(q) for p,q in b}; self.asks={float(p):float(q) for p,q in a}; self.ts=time.time() def delta(self, b:List[List[str]], a:List[List[str]]): for p,q in b: q=float(q); p=float(p); (self.bids.pop(p,None) if q<=0 else self.bids.__setitem__(p,q)) for p,q in a: q=float(q); p=float(p); (self.asks.pop(p,None) if q<=0 else self.asks.__setitem__(p,q)) self.ts=time.time() def vwap(self, side:str, notional:float)->Optional[Tuple[float,float]]: if notional<=0: return None if side=="buy": # consume asks low→high lv=sorted(self.asks.items(),key=lambda x:x[0]) else: # consume bids high→low lv=sorted(self.bids.items(),key=lambda x:x[0],reverse=True) rem=notional; base=0.0; quote=0.0 for price,size in lv: cap=price*size if rem<=0: break if rem>=cap: base+=size; quote+=cap; rem-=cap else: take=rem/price; base+=take; quote+=rem; rem=0; break return (quote/base, base) if base>0 else None class BybitWSVWAP: def __init__(self, ws_url:str, symbol:str, depth:int=50): self.ws_url=ws_url; self.topic=f"orderbook.{depth}.{symbol}"; self.ob=OB() async def run(self, stop:asyncio.Event): async with aiohttp.ClientSession() as s: async with s.ws_connect(self.ws_url, heartbeat=15) as ws: await ws.send_json({"op":"subscribe","args":[self.topic]}) async for msg in ws: if stop.is_set(): break if msg.type==aiohttp.WSMsgType.TEXT: p=msg.json(loads=json.loads); d=p.get("data",{}) if p.get("type")=="snapshot": self.ob.snap(d.get("b",[]), d.get("a",[])) elif p.get("type")=="delta": self.ob.delta(d.get("b",[]), d.get("a",[])) def vwap(self, side:str, notional:float, max_age_ms:int=500): if (time.time()-self.ob.ts)*1000>max_age_ms: return None return self.ob.vwap(side, notional) # 使用例: # stop=asyncio.Event() # client=BybitWSVWAP("wss://stream.bybit.com/v5/public/spot","SOLUSDT") # asyncio.create_task(client.run(stop)); await asyncio.sleep(1.0); print(client.vwap("buy",5000))
Jupiter /quote TTLキャッシュ
# jupiter_quote_cache_min.py import aiohttp, asyncio, time from dataclasses import dataclass from typing import Optional, Dict JUP="https://quote-api.jup.ag/v6/quote" @dataclass(frozen=True) class Key: input_mint:str; output_mint:str; amount_int:int; slippage_bps:int; direct:bool @dataclass class Entry: ts_ms:float; data:dict class JupCache: def __init__(self, ttl_ms:int=300): self.ttl=ttl_ms; self.c:Dict[Key,Entry]={}; self.locks:Dict[Key,asyncio.Lock]={}; self.guard=asyncio.Lock() async def __aenter__(self): self.s=aiohttp.ClientSession(timeout=aiohttp.ClientTimeout(total=3.0)); return self async def __aexit__(self,*_): await self.s.close() async def get(self, k:Key)->dict: now=time.time()*1000; e=self.c.get(k) if e and now-e.ts_ms<=self.ttl: return e.data lock=await self._lock(k); async with lock: now=time.time()*1000; e=self.c.get(k) if e and now-e.ts_ms<=self.ttl: return e.data p={"inputMint":k.input_mint,"outputMint":k.output_mint,"amount":str(k.amount_int), "slippageBps":str(k.slippage_bps),"onlyDirectRoutes":"true" if k.direct else "false"} async with self.s.get(JUP,params=p) as r: r.raise_for_status(); data=await r.json() self.c[k]=Entry(now,data); return data async def _lock(self,k:Key): async with self.guard: if k not in self.locks: self.locks[k]=asyncio.Lock() return self.locks[k] def to_amount_int(x:float,dec:int)->int: return int(round(x*(10**dec))) def from_amount_int(x:int,dec:int)->float: return x/(10**dec) def calc_min_out(out_int:int, slippage_bps:int, safety_bps:int=5)->int: return max(0,(out_int*(10_000-(slippage_bps+safety_bps)))//10_000) # 使用例: # async with JupCache() as jc: # k=Key(USDC_MINT,SOL_MINT,to_amount_int(5000,6),30,False) # q=await jc.get(k); out_sol=from_amount_int(int(q["outAmount"]),9)
高速比較パス(250msゲート)
# fast_compare_min.py import asyncio, time from typing import Dict, Tuple from jupiter_quote_cache_min import JupCache, Key, to_amount_int, from_amount_int def ms(): return time.perf_counter()*1e3 async def compare_once(bybit_client, jup_cache:JupCache, usdc_mint:str, token_mint:str, token_dec:int, notional_usd:float=5000.0, fx_func=None) -> Dict: """ bybit_client: 前出WSクライアント(.vwap(side, notional)を持つ) fx_func: USDC/USDT レート関数 → (fx, fx_bp) """ t0=ms() # DEX見積(並列)開始 k=Key(usdc_mint, token_mint, to_amount_int(notional_usd,6), 30, False) dex_fut=asyncio.create_task(jup_cache.get(k)) # CEX VWAP & FX はWSから即時計算 cex = bybit_client.vwap("sell", notional_usd) # CEXで売る想定(USDT建て) fx, fx_bp = (1.0, 0.0) if fx_func is None else fx_func() dex = await dex_fut t1=ms(); ts_diff=t1-t0 if ts_diff>250: return {"status":"uncomparable","reason":"ts_skew","ts_diff_ms":round(ts_diff,1)} if cex is None or "outAmount" not in dex: return {"status":"uncomparable","reason":"stale_or_liq","ts_diff_ms":round(ts_diff,1)} # 単価へ out_base = from_amount_int(int(dex["outAmount"]), token_dec) if out_base<=0: return {"status":"uncomparable","reason":"dex_zero","ts_diff_ms":round(ts_diff,1)} dex_px_usdc = notional_usd / out_base cex_px_usdc = cex[0] * fx mid=(dex_px_usdc+cex_px_usdc)/2 spread_bp=(cex_px_usdc-dex_px_usdc)/mid*1e4 return {"status":"ok","ts_diff_ms":round(ts_diff,1),"fx_stable_bp":round(fx_bp,2), "dex_px":dex_px_usdc,"cex_px":cex_px_usdc,"spread_exec_bp":spread_bp} # 使用例(擬似): # fx = lambda: (bybit_fx.vwap("buy",10_000)[0]**-1, abs((bybit_fx.vwap("buy",10_000)[0]**-1-1)*1e4)) # res = await compare_once(bybit_sol_client, jc, USDC_MINT, SOL_MINT, 9, 5000.0, fx)
チェックポイント
ts_diff_ms
が常に≤250ms以内か(p95基準)。fx_stable_bp
を必ず記録し、二桁bpに膨らんだら棄却やマージン上乗せ。spread_exec_bp
はコストモデル(fee/滑り/tip/infra)を差し引いて判定(ここでは割愛)。
結び
この記事でやったのは、「価格を引き算する監視」から**“約定できる価格を比較する監視”への移行です。
鍵はシンプルでした――ミリ秒(≤250ms)と1秒(≤1s)を守る設計、そしてコストを跨いだ余剰bpしか信じない**という姿勢。
- Executable Price:DEXは
/quote
→実効単価、CEXはWS L2→ノーション別VWAP。 - 鮮度の担保:TTL≤300msのキャッシュ、DEX/CEXを並列取得、時刻差ゲート≤250ms。
- 通貨建ての統一:USDC/USDTは板で実測、fx_stable_bpを常時計測。
- 動的トリガ:
cost_bp = fee + slip(N) + tip + infra
を越えた余剰bpだけで鳴らす。 - 運用耐性:片足失敗の撤退/段階ヘッジ、在庫・Δ上限、429/5xxは構造で回避、SLOとメトリクスで可視化。
- 導入手順:15分スモーク → 24hシャドー → 朝イチ自動サマリ → 逸脱は“一行PM”。
この型に乗せれば、「数字は出てるのに取れない」から卒業できます。あとは現場の条件(口座手数料、対象銘柄、回線品質)に合わせてコストモデルを学習させ、SLOを育てていくだけ。
“見かけのスプレッド”を捨て、実行可能性で勝つ――それが、実運用で生き残る唯一の道です。