裁定(アービトラージ)の監視は「価格を取って引き算する」だけでは機能しません。実運用で勝負を決めるのは“ミリ秒”と“約定可能性”です。
本記事では、CEX/DEXアービトラージ監視を≤250msの時刻差と**≤1sの監視サイクル**で回すための設計指針を、実装の勘所と一緒にまとめます。
結論から言うと、集約APIの平均価格を捨てて、Executable Price(実行可能価格)に全てを寄せるのが正解です。
CEX/DEXarbbotの開発もピンポイントな知見が溜まって良いな。開発の優先度高めで取り組んでいるのだが、正しい判断だった。
・価格は“約定できるか”で定義する
→ズレた比較は誤検知の源泉
・サイズ依存の滑りを無視してはいけない
→CEXはWSのL2板、DEXは見積API+TTLキャッシュ(≤300ms)で並列取得— よだか(夜鷹/yodaka) (@yodakablog) August 8, 2025
何がダメだったのか:集約APIの平均値 vs 実行可能価格
「まずはCoinGeckoで価格を…」などの実装は、この手の監視プロダクションでは地雷です。
- 平均値は実行できない
集約APIは複数市場の平均/中間を返しますが、あなたがその価格で今この瞬間に約定できる保証はない。板の厚さも手数料もスリッページも反映されません。 - 遅延が平気で数百ms〜数秒
集約はどうしても更新周期が長い。CEX板(WS)とDEX見積(quote)を同時刻で比較しないと、幻のスプレッドを量産します。 - 通貨建てがバラバラ
CEXはUSDT建て、DEXはUSDC建て——1.0000と思い込みで比較すると平気で数十bpの誤差が出ます。 - サイズ依存性を無視
「0サイズのmid」での比較は、実トレードの意思決定と無関係。指定ノーション($1k/$5k/$20k…)での実行可能価格を比較する必要があります。
要するに、**「見かけの価格」ではなく「その場で実際に約定できる価格」**に全てを合わせる設計が必要です。
本記事のゴール:≤250ms時刻差 / ≤1sサイクルの監視
達成したいSLO(サービス目標)はシンプルです。
- 比較時刻差(DEX vs CEX):p95 ≤ 250ms
これを超える比較は棄却(uncomparable)。ズレた比較は誤検知の源泉です。 - 監視サイクル時間:p95 ≤ 1s
HTTPポーリングでダラダラ取らない。CEXはWSのL2板、DEXは見積API(例:Jupiter /quote)+TTLキャッシュ(≤300ms)で並列取得が基本。 - 幻スプレッド撲滅
価格はノーション指定で実効単価に変換し、手数料・滑り・優先手数料(tip)まで含んだコスト後スプレッドで評価します。
原則(結論)
価格は“約定できるか”で定義する(Executable Price)
- DEX:ルーティング込み見積(
/quote)で手数料・価格インパクト込みの受取量を取得し、実効単価(投入額 ÷ 受取量)へ変換。 - CEX:L2板からVWAP(ノーション別)を算出し、自口座の手数料まで含めて実効単価化。
- 比較は常に「実行可能価格 vs 実行可能価格」。集約平均・0サイズmidは参考値に降格。
ノーション指定・通貨建て統一(USDC/USDT FX)・時刻差ゲート
- ノーション指定:
N ∈ {1k, 5k, 20k, ...}で買い/売りを両方向計測。サイズ依存の滑りを無視しない。 - 通貨建て統一:CEX(USDT建て)⇄DEX(USDC建て)を常時換算。USDC/USDTの実測FXをWS板で取り、**乖離(bps)**をメトリクス化。
- 時刻差ゲート:
|ts_dex - ts_cex| ≤ 250msを満たさない比較は棄却。鮮度の合わない比較はしない。
静的閾値は禁止:コストモデル+マージンでトリガ
「常に1%超えたらシグナル」は雑です。銘柄・ノーション・市況でコストは変わるから。
- コストモデル(bp)
cost_bp = dex_fee_bp + cex_fee_bp + est_slip_bp(N) + priority_tip_bp + infra_bp - dex_fee_bp:DEX手数料の合計(ルート依存)
- cex_fee_bp:自口座のメイカー/テイカー手数料(VIP/リベート反映)
- est_slip_bp(N):ノーションNでの滑りの経験則(p95)
- priority_tip_bp:優先手数料・tipの実績(Solana等)
- infra_bp:失敗・再送・撤退コストの移動平均
- トリガ条件
exec_spread_bp > cost_bp + margin_bp(symbol, N, regime)- exec_spread_bp:実行可能スプレッド(bp)
- margin_bp:銘柄/ノーション/相場レジーム別の上乗せ(初期は5–15bp程度)
- 運用の型
- 比較できないときは棄却(stale/ts_skew/liquidity)
- コスト負け(
exec_spread_bp <= cost_bp)が続けば自動で停止(サーキットブレーカ)
この原則に従うだけで、「数字は出ているのに全然取れない」監視から卒業できます。次章以降では、WS板×TTLキャッシュ×並列取得で**≤250ms / ≤1s**を実現するための具体アーキテクチャと実装の勘所を紹介します。
トークンのDEX価格観測は本当に多彩な手段があるので、"自分が何をしようとしているのか"に応じて変えていく必要がある。
これも一つずつ自分で試して最適なものを見つけるしかない。
幸いなことに時間だけは十分にあるので総当たりで潰していく。
初めから最適解を考えていると行動量減るし。— よだか(夜鷹/yodaka) (@yodakablog) August 8, 2025
アーキテクチャ概観
本設計は「DEXは見積APIで実効価格」「CEXはWS板からVWAP」「USDC/USDTを実測で換算」の三本柱に、I/O層の速度分離を重ねたものです。以下は全体の流れ。
sequenceDiagram
autonumber
participant CEX as CEX WS(L2)
participant FX as USDC/USDT 板
participant DEX as Jupiter /quote
participant ARB as Arb Monitor
par 並列取得
CEX->>ARB: VWAP(N)(USDT建て)
DEX->>ARB: /quote(outAmount)(USDC建て)
and
FX->>ARB: fx 実測(USDC/USDT)+ fx_stable_bp ログ
end
ARB->>ARB: ts_diff 測定(≤250msか?)
alt 超過
ARB-->>ARB: 比較棄却(uncomparable: ts_skew)
else 以内
ARB->>ARB: USDT→USDC 換算(fx適用)
ARB->>ARB: spread_exec_bp 計算
ARB->>ARB: cost_bp = dex_fee + cex_fee + est_slip(N) + tip + infra
ARB->>ARB: edge_bp = spread - cost
alt edge_bp > margin_bp
ARB-->>ARB: Trigger 発火
else
ARB-->>ARB: Skip
end
end
DEX:Jupiter /quote→/swap(同一ルート・minOut・TTL)
目的:手数料・価格インパクトを含む“実行可能価格”をミリ秒精度で取得。
やること:
- /quote で見積
- 入力は整数 amount(mint decimals 適用後)、
slippageBpsを明示。 - 300ms TTLのメモリキャッシュで同一キーの多重呼び出しをデドゥープ。
- キー例:
(inputMint, outputMint, amount_int, slippageBps, onlyDirect)
- 入力は整数 amount(mint decimals 適用後)、
- 同一ルート厳守で /swap(将来の実行段)
- /quote の route_id / routePlan をそのまま /swap に渡す。
- minOut は
outAmount × (1 - slippageBps/1e4 - safety_bps/1e4)で計算して埋める。 - TTL:
blockhash/lastValidBlockHeightによる内部有効期限+“quote→submit ≤ 250ms” の運用SLO。
- ログに残す最小項目
route_id, pool_ids, total_fee_bp, inAmount, outAmount, minOut, quote_latency_ms, quote_ttl_ms
ポイント:集約サイトの平均値は使わない。Jupiter見積の受取量(outAmount)→実効単価に変換して比較する。
CEX:Bybit v5 WebSocket L2 → ノーション別VWAP
目的:HTTPポーリングを捨てて、常時同期された板からその場で約定できる単価を出す。
- v5 Public WS に接続(
wss://stream.bybit.com/v5/public/{spot|linear})orderbook.50.SYMBOLを購読し、L2を常駐保持(辞書でOK)。- スナップショット→デルタを反映。ping/再接続はバックオフ。
- ノーション別VWAP
- 買い:最良売りから**指定ノーション($1k/$5k/$20k…)**分だけ積み上げ、平均約定単価を算出。
- 売り:最良買いから同様に。
- 自口座の手数料(maker/taker)はbpで上乗せ/控除して“実効単価”へ。
- ログ
vwap_price, filled_base, levels_used, l2_age_ms, compute_ms
ポイント:0サイズmidではなくVWAP。指定ノーションで“実行できる価格”を取りに行く。
FX:USDC/USDT を板経由で実測 → fx_stable_bp を常時計測
目的:CEXはUSDT建て、DEXはUSDC建て——1.0000前提はNG。安定通貨の微妙なズレがbp単位で効いてくる。
- USDCUSDT(spot)の板を同じく WS で購読
- 小ノーション(例:$10k)でVWAPを取り、
USDT→USDCのレートを実測。 - 変換は CEX価格(USDT建て) × fx(USDC/USDT) → USDC建てに統一。
- 小ノーション(例:$10k)でVWAPを取り、
fx_stable_bpをメトリクス化fx_stable_bp = |fx - 1.0| × 1e4- 閾値超え時(例:>10–15bp)は比較棄却 or マージン上乗せなどの対策を自動適用。
ポイント:通貨建ての統一は必須。微差でも裁定の可否が変わる。
I/O:FastHTTP(1s/2s/1retry) と SlowHTTP の分離
目的:裁定パスを遅い呼び出しから隔離し、p95 ≤ 1sを保証する。
- FastHTTP(裁定用)
- connect ≤ 1s / read ≤ 2s / retry = 1(指数バックオフ+ジッター)
- 用途:Jupiter
/quote、必要最小のメタAPI - 失敗は即棄却(比較不能として扱う)——“遅い成功”より“速い不成立”
- SlowHTTP(観測用)
- TVL、ステーブルフロー、バックフィル等は別クライアントで 10–30秒級のタイムアウト・多段リトライOK。
- 名前(DI)で明示的に注入して混入を防止する。
- 並列取得 & 時刻差ゲート
- DEX見積(FastHTTP)と CEX VWAP(WS→即時計算)を同時に走らせ、取得完了の差分 ≤ 250msのみ採用。
- 超過した比較は**
uncomparable(reason=ts_skew)**でカウント・可視化。
ポイント:**速さを“設計として担保”**する。遅い処理を別のレーンに追いやるだけで、SLOは一気に安定する。
これで満たせるSLO
- 比較時刻差:p95 ≤ 250ms(DEX/CEX同時取得+WS常時維持+TTLキャッシュ)
- サイクル時間:p95 ≤ 1s(FastHTTPと並列化で待ち時間を圧縮)
- 誤検知の激減:ノーション別VWAP × 実測FX × コストモデルにより**“見かけのスプレッド”**を排除
次章では、このアーキテクチャをコード断片とメトリクス設計に落とし込み、実際に250ms/1sの壁を超える手順を解説します。
実装A:Bybit v5 WS→VWAP
orderbook.50.SYMBOLを常時購読/L2を辞書で保持し、VWAP時のみ上位Nソート/買い・売りのノーション別VWAP
このセクションでは、HTTPポーリングを捨てて Bybit v5 のPublic WebSocketからL2板を常時同期し、指定ノーションでの実行可能VWAPを数百ミリ秒で返す最小実装を示します。方針はシンプル:
- WSで
orderbook.50.SYMBOLを購読 - L2は辞書で常駐保持(更新はO(1))
- VWAP計算時だけ上位Nをソート(必要時のみ O(K log K))
- 指定ノーション(クォート建て)でのVWAPを返す
1) L2常駐:スナップショット+デルタ
- スナップショットで全置換 → デルタで増減・削除(size=0で削除)
bids: Dict[price,float]/asks: Dict[price,float](どちらも base数量 を保持)- ハートビート&再接続は最低限でOK(指数バックオフ)
# arbbot/cex/bybit_ws_vwap.py(抜粋)
import aiohttp, asyncio, time, json
from typing import Dict, List, Optional, Tuple, Literal
Side = Literal["buy","sell"]
class Orderbook:
def __init__(self):
self.bids: Dict[float, float] = {} # price → size(base)
self.asks: Dict[float, float] = {}
self.ts = 0.0 # last update time (sec)
def apply_snapshot(self, bids: List[List[str]], asks: List[List[str]]):
self.bids.clear(); self.asks.clear()
for p, q in bids: self.bids[float(p)] = float(q)
for p, q in asks: self.asks[float(p)] = float(q)
self.ts = time.time()
def apply_delta(self, bids: List[List[str]], asks: List[List[str]]):
for p, q in bids:
price, size = float(p), float(q)
if size <= 0: self.bids.pop(price, None)
else: self.bids[price] = size
for p, q in asks:
price, size = float(p), float(q)
if size <= 0: self.asks.pop(price, None)
else: self.asks[price] = size
self.ts = time.time()
2) ノーション別VWAP:買いはASK、売りはBID
買い(USDT/USDCを支払ってベースを得る)
- ASK側を安い順に積み上げ、**指定ノーション(クォート建て)**を使い切るまで取得
- 有効単価 =
支払ったクォート合計 / 取得できたベース合計
売り(ベースを売ってクォートを得る)
- BID側を高い順に積み上げ、同様に計算
class Orderbook:
# ...前掲...
def vwap(self, side: Side, notional_quote: float) -> Optional[Tuple[float, float]]:
if notional_quote <= 0: return None
if side == "buy":
if not self.asks: return None
ladder = sorted(self.asks.items(), key=lambda kv: kv[0]) # ask: price↑
remain_q = notional_quote; got_base = spent_q = 0.0
for price, size_base in ladder:
cap_q = price * size_base
if remain_q <= 0: break
if remain_q >= cap_q:
got_base += size_base; spent_q += cap_q; remain_q -= cap_q
else:
take_base = remain_q / price
got_base += take_base; spent_q += remain_q; remain_q = 0.0; break
if got_base == 0: return None
return (spent_q / got_base, got_base) # (有効単価=クォート/ベース, 取得ベース量)
else:
if not self.bids: return None
ladder = sorted(self.bids.items(), key=lambda kv: kv[0], reverse=True) # bid: price↓
remain_q = notional_quote; sold_base = recv_q = 0.0
for price, size_base in ladder:
cap_q = price * size_base
if remain_q <= 0: break
if remain_q >= cap_q:
sold_base += size_base; recv_q += cap_q; remain_q -= cap_q
else:
part_base = remain_q / price
sold_base += part_base; recv_q += remain_q; remain_q = 0.0; break
if sold_base == 0: return None
return (recv_q / sold_base, sold_base)
ここではクォート建てのノーション(例:$1k, $5k, $20k)を入力に統一しています。板がUSDT建てなら
notional_quoteはUSDT額、USDC建てならUSDC額。あとでUSDT→USDC換算を掛けて比較します。
3) WebSocket接続:購読・心拍・再接続
class BybitWSVWAP:
PING_INTERVAL = 15
def __init__(self, ws_url: str, topic: str, session: aiohttp.ClientSession | None = None):
self.ws_url = ws_url; self.topic = topic
self.session = session; self._ext = session is not None
self.ob = Orderbook()
async def __aenter__(self):
if not self._ext: self.session = aiohttp.ClientSession()
return self
async def __aexit__(self, *exc):
if not self._ext and self.session: await self.session.close()
async def run(self, stop: asyncio.Event):
backoff = 1.0
while not stop.is_set():
try:
await self._loop(stop); backoff = 1.0
except Exception:
await asyncio.sleep(min(backoff, 10.0)); backoff *= 1.7
async def _loop(self, stop: asyncio.Event):
assert self.session
async with self.session.ws_connect(self.ws_url, heartbeat=self.PING_INTERVAL) as ws:
await ws.send_json({"op": "subscribe", "args": [self.topic]})
async for msg in ws:
if stop.is_set(): break
if msg.type.name == "TEXT":
payload = msg.json()
t = payload.get("type"); d = payload.get("data", {})
if t == "snapshot": self.ob.apply_snapshot(d.get("b", []), d.get("a", []))
elif t == "delta": self.ob.apply_delta (d.get("b", []), d.get("a", []))
elif msg.type.name == "ERROR":
raise RuntimeError("WS error")
def vwap(self, side: Side, notional_quote: float):
return self.ob.vwap(side, notional_quote)
4) ステール検知と品質ゲート
VWAPを返す前に次をチェックすると誤検知が激減します。
- L2鮮度:
now - orderbook.ts ≤ 500ms(超過ならNone) - 深さ不足:指定ノーションに届かない場合は
None - 時刻差ゲート:DEX見積との取得完了差 ≤ 250ms(超過は比較棄却)
def vwap_safe(client: BybitWSVWAP, side: Side, notional_quote: float, max_age_ms: int = 500):
if (time.time() - client.ob.ts) * 1000 > max_age_ms:
return None
return client.vwap(side, notional_quote)
5) よくある落とし穴(回避策つき)
- 線形/無期限(linear)とスポットの混同
先物は契約サイズや資金調達が絡む。まずはspotでVWAP、perpはcontract→base換算を入れてから。 - 数量の単位
Bybitの板数量はbase数量。VWAPのノーションはクォート額。混ぜない。 - 片側Onlyでの手数料
監視段階でもテイカーfeeをbpで上乗せ/控除して“実効単価”へ。メイカー運用は例外(遅延リスク大)。 - ソート最適化の過剰実装
まずは「辞書保持+必要時ソート」で十分速い。最適化はp95>1sになってからでOK。
6) スモーク(30秒)
# 例:SOLUSDT(spot)を購読して $5k の買いVWAPを1秒おきに出す
async def smoke():
stop = asyncio.Event()
async with BybitWSVWAP(
ws_url="wss://stream.bybit.com/v5/public/spot",
topic="orderbook.50.SOLUSDT"
) as c:
task = asyncio.create_task(c.run(stop))
await asyncio.sleep(1.0) # 少し溜める
for _ in range(30):
print("BUY 5k:", c.vwap("buy", 5_000))
await asyncio.sleep(1.0)
stop.set(); await task
このWS→VWAPの最小実装をCEX側の標準にすると、HTTPポーリング由来の秒遅延が消え、DEX見積(Jupiter /quote)との同時比較(≤250ms)が現実になります。次章ではJupiter /quote TTLキャッシュとの合わせ技で1秒サイクルを安定させるポイントを解説します。
実装B:Jupiter /quote TTLキャッシュ
キー:(inputMint, outputMint, amount_int, slippageBps, direct)/TTL≦300msでデドゥープ/outAmountは整数(decimals適用済)・minOut計算フック
DEX側はJupiterの /quoteを“真実の窓口”にします。ただし生の呼び出しを連打するとレイテンシ悪化&レート制限の温床。そこで極小TTLキャッシュ(≤300ms)で同一クエリをデドゥープし、見積の揺れを抑えつつミリ秒級で返します。
設計ポイント
- キー=
(inputMint, outputMint, amount_int, slippageBps, direct)amount_intは mintの最小単位(整数)。小数は送らない。directは onlyDirectRoutes の真偽。ルート最適化の仕様差をキーに含める。
- TTL ≦ 300ms:その間に同一キーで来た要求は同一レスポンスを返す(per-keyロックで同時実行も一発に)。
- outAmount は整数:Jupiterはdecimals適用済みの整数で返す。実効単価に変換するときは自分で小数に戻す。
- minOut 計算フック:
outAmountに slippage + safety を差し引いて**/swap**へ渡す値を出す(将来の実行段にもそのまま使える)。
最小実装(そのまま使える)
# arbbot/dex/jupiter_quote_cache.py
from __future__ import annotations
import asyncio, time
from dataclasses import dataclass
from typing import Optional, Dict
import aiohttp
JUP_QUOTE_URL = "https://quote-api.jup.ag/v6/quote"
@dataclass(frozen=True)
class QuoteKey:
input_mint: str
output_mint: str
amount_int: int # 最小単位(整数)
slippage_bps: int
only_direct: bool
@dataclass
class CachedQuote:
ts_ms: float
data: dict
class JupiterQuoteCache:
"""TTL≦300msの極小キャッシュ。per-keyロックで同時呼び出しをデドゥープ。"""
def __init__(self, session: Optional[aiohttp.ClientSession] = None, ttl_ms: int = 300):
self.ttl_ms = ttl_ms
self.session = session
self._external_session = session is not None
self._cache: Dict[QuoteKey, CachedQuote] = {}
self._locks: Dict[QuoteKey, asyncio.Lock] = {}
self._locks_guard = asyncio.Lock()
async def __aenter__(self):
if not self._external_session:
# FastHTTP 相当:短いタイムアウトで固める
timeout = aiohttp.ClientTimeout(total=3.0, connect=1.0, sock_read=2.0)
self.session = aiohttp.ClientSession(timeout=timeout)
return self
async def __aexit__(self, *exc):
if not self._external_session and self.session:
await self.session.close()
async def get_quote(self, key: QuoteKey) -> dict:
now = time.time() * 1000
hit = self._cache.get(key)
if hit and (now - hit.ts_ms) <= self.ttl_ms:
return hit.data
# per-key ロックで同一キーのスパムを1発にまとめる
lock = await self._get_lock(key)
async with lock:
now = time.time() * 1000
hit = self._cache.get(key)
if hit and (now - hit.ts_ms) <= self.ttl_ms:
return hit.data
assert self.session is not None
params = {
"inputMint": key.input_mint,
"outputMint": key.output_mint,
"amount": str(key.amount_int), # 整数
"slippageBps": str(key.slippage_bps),
"onlyDirectRoutes": "true" if key.only_direct else "false",
}
async with self.session.get(JUP_QUOTE_URL, params=params) as r:
r.raise_for_status()
data = await r.json()
self._cache[key] = CachedQuote(ts_ms=time.time()*1000, data=data)
return data
async def _get_lock(self, key: QuoteKey) -> asyncio.Lock:
async with self._locks_guard:
if key not in self._locks:
self._locks[key] = asyncio.Lock()
return self._locks[key]
decimalsヘルパと minOut フック
# arbbot/dex/jupiter_utils.py
def to_amount_int(amount_units: float, decimals: int) -> int:
"""人間系の数量(小数)→ mint最小単位(整数)"""
return int(round(amount_units * (10 ** decimals)))
def from_amount_int(amount_int: int, decimals: int) -> float:
"""mint最小単位(整数)→ 人間系の数量(小数)"""
return amount_int / (10 ** decimals)
def calc_min_out(out_amount_int: int, slippage_bps: int, safety_bps: int = 5) -> int:
"""
JupiterのoutAmount(整数)から /swap に渡す minOut(整数) を算出。
slippageBps に更に safety を上乗せして保守的に。
"""
total_bps = slippage_bps + safety_bps
# 小数点を使わず整数演算で安全に
return max(0, (out_amount_int * (10_000 - total_bps)) // 10_000)
使い方(USDC→SOLの“$5k買い”を見積+実効単価)
# arbbot/dex/example_quote.py
import asyncio
from arbbot.dex.jupiter_quote_cache import JupiterQuoteCache, QuoteKey
from arbbot.dex.jupiter_utils import to_amount_int, from_amount_int, calc_min_out
USDC_MINT = "EPjFWdd5AufqSSqeM2qN1xzybapC8G4wEGGkZwyTDt1v" # 6
SOL_MINT = "So11111111111111111111111111111111111111112" # 9
async def main():
async with JupiterQuoteCache(ttl_ms=300) as cache:
key = QuoteKey(
input_mint=USDC_MINT,
output_mint=SOL_MINT,
amount_int=to_amount_int(5_000.0, 6), # 5,000 USDC を整数化
slippage_bps=30,
only_direct=False
)
q = await cache.get_quote(key)
# outAmount(整数, decimals適用済) → SOL数量(小数)
out_int = int(q["outAmount"])
out_sol = from_amount_int(out_int, 9)
# 実効単価(USDC/SOL) = 投入USDC / 受取SOL
effective_px = 5_000.0 / out_sol
# /swap 用の minOut(安全寄り)
min_out_int = calc_min_out(out_int, slippage_bps=30, safety_bps=5)
print({
"route": q.get("routePlan", [])[:1],
"out_sol": out_sol,
"effective_px_usdc_per_sol": effective_px,
"min_out_int": min_out_int
})
# asyncio.run(main())
運用で効く“ちょい足し”Tips
- TTLは短く(200–300ms)
ミリ秒の壁を越えるには新鮮さ>キャッシュヒット率。ヒット率は“副産物”でOK。 - 429/5xxは即棄却
裁定パスは**“速い不成立”が正義**。遅い成功は比較の鮮度を壊す。必要なら別レーン(SlowHTTP)で再試行。 - ログを残す
quote_latency_ms,route_id,pool_ids,total_fee_bps,outAmountを必ず出し、異常時にルートを辿れるように。 - キーの揺れを抑える
amount_intが毎回微妙に違うとキャッシュが効かない。ノーションは刻み(例:$100刻み)で丸めても良い。
よくある落とし穴
- amountを小数で送る → 400 or 予期せぬ丸め。必ず整数に。
- decimals取り違え →
outAmountのスケールが合わず単価が化ける。mintのdecimalsをトークンレジストリで固定。 - slippageとminOutの二重適用ミス → /quoteの
slippageBpsに加えてsafety_bpsを上乗せする形が安全。
このTTLキャッシュにより、/quoteのデドゥープとレイテンシ安定化が実現します。CEXのWS-VWAPと並列取得すれば、時刻差≤250ms・サイクル≤1sのSLOを現実的に満たせます。次は、高速比較パスでの同時取得→通貨統一→スプレッド判定の実装を仕上げます。
実装C:高速比較パス
asyncio.gatherでDEX/CEX同時取得/時刻差ゲート≤250ms・staleは棄却/USDT→USDC換算&fx_stable_bpログ/コストモデル加味で判定
目的:ミリ秒級で“実行可能価格どうし”を比較し、コスト後でも勝てる時だけシグナルを出す。
ここでは DEX(Jupiter /quote)とCEX(Bybit WS→VWAP)を同時並行で取り、時刻差≤250msを満たした比較のみ採用、USDT↔USDC換算とコストモデルまで入れて判定する“高速比較パス”を実装します。
全体フロー(1回の比較)
- 同時取得:
- DEX見積
/quote(USDC→トークン、ノーション=USDC) - CEX VWAP(Bybit WS L2、ノーション=USDT)
- USDC/USDTのFX(WSでVWAP)
- DEX見積
- ゲート処理:
- 時刻差
ts_diff_ms ≤ 250でない → 棄却 - 鮮度(L2 age ≤ 500ms、quote TTL ≤ 300ms)を満たさない → 棄却
- 流動性不足(ノーション吸えない) → 棄却
- 時刻差
- 通貨建て統一:
cex_px_usdc = cex_px_usdt × fx(USDC/USDT)fx_stable_bp = |fx - 1.0| × 1e4をログ
- スプレッド→コスト後判定:
exec_spread_bp = (cex_px_usdc - dex_px_usdc)/mid × 1e4(DEX買い→CEX売り 方向の例)cost_bp = dex_fee + cex_fee + est_slip(N) + tip + infra- トリガ:
exec_spread_bp > cost_bp + margin_bp
コード:最小“高速比較”関数
ここでは前章までのコンポーネント(
BybitWSManager,JupiterQuoteCache,JupiterAdapter,StableFX)を利用します。
# arbbot/arb/fast_compare.py
from __future__ import annotations
import asyncio, time
from dataclasses import dataclass
from typing import Optional, Dict
from arbbot.cex.bybit_ws_manager import BybitWSManager
from arbbot.dex.jupiter_quote_cache import JupiterQuoteCache
from arbbot.dex.jupiter_adapter import JupiterAdapter
from arbbot.registry.tokens import USDC, SOL
from arbbot.dex.jupiter_utils import from_amount_int
from arbbot.util.stable_fx import StableFX
def monotonic_ms() -> float:
return time.perf_counter() * 1e3
@dataclass
class CostParams:
dex_fee_bp: float # 例: 30 (0.30%)
cex_fee_bp: float # 例: 10 (0.10%) テイカー
est_slip_bp: float # 例: 10 (0.10%) Nに依存
priority_tip_bp: float # 例: 3–8 (Solana 優先手数料・tip)
infra_bp: float # 例: 2–5 (失敗/再送/撤退コストの移動平均)
margin_bp: float # 例: 10 (安全マージン)
def cost_model_bp(p: CostParams) -> float:
return p.dex_fee_bp + p.cex_fee_bp + p.est_slip_bp + p.priority_tip_bp + p.infra_bp
async def compare_once_fast(bybit: BybitWSManager,
jup_cache: JupiterQuoteCache,
fx: StableFX,
token=SOL,
notional_usd: float = 5_000.0,
cost: Optional[CostParams] = None) -> Dict:
"""
DEX(USDC→TOKEN)買い vs CEX(spot)売りで比較(片方向の例)。
反対方向は同様に“CEX買い vs DEX売り”を別関数で計算する。
"""
if cost is None:
cost = CostParams(30, 10, 10, 4, 3, 10)
# セットアップ
jup = JupiterAdapter(jup_cache, usdc_mint=USDC.mint,
token_mint=token.mint, token_decimals=token.decimals, usdc_decimals=USDC.decimals)
t0 = monotonic_ms()
# 1) DEX quote を非同期でキック
dex_task = asyncio.create_task(jup.quote_exec_price("buy", notional_usd, slippage_bps=30))
# 2) CEX VWAP と FX は同期I/O不要(WSから即時計算)
# とはいえ gather 形にしておくと拡張が楽
async def _cex_side():
vwap_sell = bybit.vwap("spot", "SOLUSDT", side="sell", notional_quote=notional_usd) # CEXで売る単価
return vwap_sell
async def _fx_side():
return fx.usdc_per_usdt() # (fx, fx_stable_bp)
cex_px, (fx_usdc_per_usdt, fx_bp) = await asyncio.gather(_cex_side(), _fx_side())
dex_quote = await dex_task
t1 = monotonic_ms()
ts_diff = t1 - t0
# --- ゲート: 時刻差/鮮度/流動性 ---
if ts_diff > 250:
return {"status":"uncomparable","reason":"ts_skew","ts_diff_ms":round(ts_diff,1)}
if cex_px is None:
return {"status":"uncomparable","reason":"cex_liquidity","ts_diff_ms":round(ts_diff,1)}
if "outAmount" not in dex_quote:
return {"status":"uncomparable","reason":"dex_quote","ts_diff_ms":round(ts_diff,1)}
# --- 実効単価の算出 ---
# DEX: 受取トークン量(整数→小数)→ 実効単価(USDC/BASE)
out_base = from_amount_int(int(dex_quote["outAmount"]), token.decimals)
if out_base <= 0:
return {"status":"uncomparable","reason":"dex_zero_out","ts_diff_ms":round(ts_diff,1)}
dex_px_usdc = notional_usd / out_base
# CEX: VWAPはUSDT建て → FXでUSDC建てへ
cex_px_usdt = cex_px[0] # (price, filled_base)
cex_px_usdc = cex_px_usdt * fx_usdc_per_usdt
# --- スプレッド(DEX買い→CEX売り 方向の例) ---
mid = (dex_px_usdc + cex_px_usdc) / 2.0
exec_spread_bp = (cex_px_usdc - dex_px_usdc) / mid * 1e4
# --- コスト後判定 ---
total_cost_bp = cost_model_bp(cost)
edge_bp = exec_spread_bp - total_cost_bp
decision = "trigger" if edge_bp > cost.margin_bp else "skip"
return {
"status": "ok",
"ts_diff_ms": round(ts_diff,1),
"fx_stable_bp": round(fx_bp,2),
"dex_px_usdc": dex_px_usdc,
"cex_px_usdc": cex_px_usdc,
"exec_spread_bp": exec_spread_bp,
"total_cost_bp": total_cost_bp,
"edge_bp": edge_bp,
"decision": decision,
"notional_usd": notional_usd
}
片方向(DEX買い→CEX売り)の例を示しました。実運用では反対方向(CEX買い→DEX売り)も同時に計算し、有利な方のみ評価・発火します。
stale棄却・時刻差ゲートの実務メモ
- 時刻差は**“取得完了の差”でOK(
t0→await gather→t1の差)。より厳密にやるならL2のageとquote作成時刻をログに出して両者の最大差**でゲート。 - 鮮度(age):
- L2:
now - orderbook.ts ≤ 500ms - quote:
cache_ttl ≤ 300ms(TTLキャッシュのヒット内)
- L2:
- 棄却は失敗ではない:
status=uncomparableとして理由ラベル(ts_skew|stale|liq|fx)をメトリクスに積む。
USDT→USDC換算と fx_stable_bp の扱い
- 換算レートは板から実測:
USDCUSDTの**WS VWAP(例:$10k)**で求める。 fx_stable_bpを常時計測し、**閾値超え(例:>10–15bp)**時は- 比較棄却するか、
margin_bpに上乗せして保守的に判定する。
コストモデルの詰め方(最初の値→学習)
- 初期値(例)
dex_fee_bp=30(0.30%)cex_fee_bp=10(VIPやBNB割引を口座に合わせて)est_slip_bp=10(Nの大小で分岐。実測から p95 を採用)priority_tip_bp=3–8(Solanaの混雑に応じて可変)infra_bp=2–5(リトライ・撤退の平均コスト)
- 運用で実測ログ→移動平均に置き換える:
est_slip_bp(N)とpriority_tip_bp、infra_bpは直近X分/日の実績から更新。静的値のままはNG。
ログとメトリクス(最小)
- 比較SLO:
ts_diff_ms(p95/p99) - 棄却率:
uncomparable_reason{ts_skew|stale|liq|fx} - FX品質:
fx_stable_bp(平均・p95) - スプレッド:
exec_spread_bp、total_cost_bp、edge_bp(ヒストグラム) - ノーション別:
notional_usdラベルで断面を分ける
スモーク(30回ループ)
# arbbot/arb/smoke_fast_compare.py
import asyncio
from arbbot.arb.fast_compare import compare_once_fast, CostParams
from arbbot.cex.bybit_ws_manager import BybitWSManager
from arbbot.dex.jupiter_quote_cache import JupiterQuoteCache
from arbbot.util.stable_fx import StableFX
async def main():
bybit = BybitWSManager()
await bybit.start({"SOLUSDT": "spot", "USDCUSDT": "spot"})
async with JupiterQuoteCache(ttl_ms=300) as cache:
fx = StableFX(bybit_mgr=bybit, market="spot", symbol="USDCUSDT")
for _ in range(30):
res = await compare_once_fast(bybit, cache, fx, notional_usd=5_000.0,
cost=CostParams(30,10,10,4,3,10))
print(res)
await asyncio.sleep(0.5)
await bybit.stop()
# asyncio.run(main())
見どころ
ts_diff_msが ~80–200ms に収束(p95 ≤ 250ms)status="ok"が ≥80%(初期目標)edge_bpが 正 のサンプルがそこそこ出る(市況依存)
よくある落とし穴(回避策)
- USDT=USDCだと思い込む →
fx_stable_bpを常に見よ。 - サイズ不一致 → DEX/CEXとも同じノーションで評価。
- 片方向だけで判定 → 両方向(DEX買い→CEX売り/CEX買い→DEX売り)を回し、有利側のみ判定。
- 静的コスト → 実測で動的更新。
est_slip_bpとpriority_tip_bpの固定は事故のもと。
この高速比較パスを心臓部に据えるだけで、“≤250ms / ≤1sサイクル”の監視が現実になります。あとは実行レイヤ(/swap+Jito送信、CEX発注)と結合し、**“見積=実行”**の契約を守れば、誤検知に振り回されない裁定運用に到達できます。
コストモデル&動的トリガ
結論:シグナルは“見かけのスプレッド”では出さない。
コストをbpで見積もり、コストを上回る“余剰bp(エッジ)”にマージンを足してトリガする。
数式(運用で使う形)
cost_bp = dex_fee_bp # ルート合計のDEX手数料 + cex_fee_bp # 自口座のテイカー/メイカー手数料 + est_slip_bp(N) # 指定ノーションNの滑り(p95) + jito_tip_bp # 優先手数料/チップ(Solana等) + infra_bp # 失敗・再送・撤退コストの移動平均 trigger_bp = cost_bp + margin_bp(symbol, N, regime) exec_spread_bp = (cex_px_usdc - dex_px_usdc)/mid * 1e4 # 例: DEX買い→CEX売り方向
意思決定:exec_spread_bp > trigger_bp のときだけ 発火。それ以外は skip。
ポイント:
est_slip_bp(N)やjito_tip_bpは固定値NG。実測から常に更新する。
銘柄別・ノーション別の初期値(例)と学習
初期パラメータ(たたき台)
- Tier1(SOL/ETH/BTC)
dex_fee_bp=20〜35、cex_fee_bp=6〜12、est_slip_bp(5k)=6〜12、jito_tip_bp=3〜8、infra_bp=2〜4、margin_bp=8〜12
- Tier2(RAY/JUP/PYTH)
dex_fee_bp=25〜45、cex_fee_bp=8〜14、est_slip_bp(5k)=12〜25、jito_tip_bp=5〜10、infra_bp=3〜6、margin_bp=12〜18
- Tier3(BONK/WIF/POPCAT)
dex_fee_bp=30〜60、cex_fee_bp=10〜16、est_slip_bp(1k)=25〜60、jito_tip_bp=6〜12、infra_bp=4〜8、margin_bp=18〜30
目安なので、あなたの口座手数料とルート特性に合わせて上書き。
設定イメージ(YAML/JSON)
symbols:
SOL:
tiers:
"1k": { est_slip_bp: 6, margin_bp: 8 }
"5k": { est_slip_bp: 10, margin_bp: 10 }
"20k": { est_slip_bp: 18, margin_bp: 12 }
cex_fee_bp: 8
jito_tip_bp: 5
dex_fee_bp: 25
infra_bp: 3
学習の仕組み(運用で必ず回す)
- est_slip_bp(N):
Jupiter/quoteのsimulate/実測との差分を p95 で更新(滑りが跳ねたら即反映)。 - jito_tip_bp:
実際に払ったtip/priorityFee を 移動平均(短期は指数移動平均、長期は日次平均)。 - infra_bp:
失敗・再送・撤退で失ったbpを累計/件数から算出(ローリング1〜7日)。 - margin_bp(symbol,N,regime):
市況レジーム(例:quiet/volatile)で切替。FX乖離・時刻差逸脱が増えたら自動で上積み。
ルール:“固定”は劣化の温床。実測→更新を自動化し、乖離すればサーキットで止める。
SLOとメトリクス
SLO(目標値)
- 比較時刻差
compare_age_diff_ms: p95 ≤ 250ms(p99 ≤ 500ms) - 比較不能率
uncomparable_ratio: ≤ 20%(初期の上限) - サイクル時間(裁定パス): p95 ≤ 1s
必須メトリクス(名前と意味)
レイテンシ系
quote_to_submit_ms{dex=}/quote取得→(将来の/swap送信)まで。監視段階では見積完了→比較完了の擬似で代替。
compare_age_diff_ms- DEX/CEX取得完了の差。**ゲート(≤250ms)**の健康診断。
比較の健全性
uncomparable_reason{ts_skew|stale|liq|fx|error}(Counter)- 理由別の棄却数。ts_skew(時刻差超過)、stale(鮮度切れ)、liq(流動性不足)、fx(換算不可/大乖離)。
価格とFX
spread_exec_bp(Histogram)- 可約定スプレッド(方向別・ノーション別でラベル付け)。
cost_bp_total/cost_bp_{dex_fee,cex_fee,slip,tip,infra}(Gauge/Histogram)- 内訳を分解して可視化。モデルの当たり外れが見える。
edge_bp = spread_exec_bp - cost_bp_total(Histogram)- 実効エッジ。p50>0 を狙い、p10 > -5bp を下限に。
fx_stable_bp(Histogram)|USDC/USDT - 1|×1e4。10〜15bp超はリスクシグナル。
付帯
429_rate{source}/retry_after_ms(Gauge/Counter)- レート制限の健康診断(Jupiter/Bybit REST 等)。
推奨ダッシュボード(最小)
- SLOパネル:
compare_age_diff_ms p95/p99、uncomparable_ratio、cycle_time p95 - エッジ分布:
spread_exec_bp、cost_bp_total、edge_bp(ノーション別の箱ひげ) - コスト内訳:
dex_fee/cex_fee/slip/tip/infra(時系列) - FXと棄却理由:
fx_stable_bp(時系列)、uncomparable_reasonスタック棒
アラート(初期閾値)
compare_age_diff_ms p95 > 250msが 5分連続 → 要因切り分け(WS遅延?TTL?並列不足?)uncomparable_ratio > 20%が 15分連続 → 閾値/TTL/並列/速い回線へedge_bp p50 < 0が 1時間 → トリガ停止+コストモデル再学習
実装ワンポイント
- メトリクスは“比較前に増やす”:棄却時も必ず理由カウント。
- 単位はbpに統一:費用・スプレッド・エッジを同次元で扱う。
- JSONログ併用:意思決定の1レコードに以下のようなものが残れば事後検証が劇的に楽になる。
{
"ts": 1723098000123,
"symbol": "SOL/USDC",
"N": 5000,
"dex_px": 174.21,
"cex_px": 174.28,
"spread_exec_bp": 39.8,
"cost": {"dex_fee":25,"cex_fee":8,"slip":10,"tip":5,"infra":3,"total":51},
"edge_bp": -11.2,
"fx_stable_bp": 3.1,
"ts_diff_ms": 142,
"decision": "skip",
"reason": "edge<=margin"
}
これで、“コストを跨いで余剰bpが出た時だけ鳴らす”仕組みが完成します。
SLOで速度と鮮度を担保し、メトリクスでコストを学習する。
この2枚看板こそが、実運用で生き残る裁定監視のコアです。
フェイルセーフ&在庫管理
裁定は必ず失敗する前提で設計します。片足だけ約定、送信遅延、見積ズレ、レート制限——全部“日常”。壊れ方を想定して、即撤退・在庫抑制・自動停止までを仕組みにします。
片足失敗時の撤退ルール/段階ヘッジ
状態機械(最小)
IDLE
└─trigger→ SEND_BOTH (best-effort同時)
├─ DEX_FILLED & CEX_FILLED → FLAT(成功)
├─ DEX_FILLED & CEX_FAIL → HEDGE_CEX(IOC/MKTで即反転)
├─ CEX_FILLED & DEX_FAIL → HEDGE_DEX(最短ルートで即反転)
└─ BOTH_FAIL → NOFILL(在庫0で終了)
ルール(実務)
- タイムボックス:片足 fill 後 t_hedge ≤ 500ms で相手脚送信。超過なら段階撤退(サイズを1/2→1/4で刻む)。
- 価格ガード:ヘッジはVWAPで上限/下限を付ける(
max_slip_bp)。ガード超過で即中止→“在庫として保有”戦略に切替(上限Δの範囲内)。 - 段階ヘッジ:
- 50% を最短IOCでヘッジ
- 残り 25%/25% を価格条件付きで追撃(最大 2–3 ステップ)
- 追撃失敗時は在庫キープ→撤退価格(損切り)を自動設定
擬似コード:
if leg_A_filled and not leg_B_filled_within(500_ms):
hedge_sizes = [0.5, 0.25, 0.25]
for s in hedge_sizes:
ok = send_hedge(size=s*filled, guard_bp=max_slip_bp, tif="IOC")
if ok and position_delta≈0: break
if residual_delta > 0:
place_stop_out(residual_delta, stop_guard_bp)
在庫上限・Δ上限/サーキットブレーカ
在庫・Δ
- 銘柄別在庫上限(例:
SOL: 2,000,Tier3: 0) - Δ上限(USD):ネットエクスポージャの総量(例:
$50k) - 時間あたり回転数上限:スパイク時の過剰取引を抑制
サーキットブレーカ(自動停止)
- 連続負エッジ:
edge_bp < 0が N回/5分 → 銘柄停止(30分) - SLO逸脱:
compare_age_diff_ms p95 > 250msが 5分継続 → 全体停止(5分) - FX乖離:
fx_stable_bp > 15が 2分継続 → USDT→USDC換算を強化 or 比較棄却
運用フラグ:
risk:
inv_cap_per_symbol: { SOL: 2000, ETH: 500, TIER3: 0 }
delta_cap_usd: 50000
circuit_breaker:
edge_negative_streak: { count: 5, window_sec: 300, cooldown_sec: 1800 }
ts_skew_p95: { limit_ms: 250, window_sec: 300, cooldown_sec: 300 }
fx_stable_bp: { limit_bp: 15, window_sec: 120, cooldown_sec: 300 }
429/5xx対策:トークンバケット+短期キャッシュ
落ちるのは前提。落ちても壊れないのが正解。
- トークンバケット(ソース別QPS)
- Jupiter/REST系は QPS上限を設定(例:
5 req/s) - バーストは300ms TTLキャッシュで吸収(同一キーのデドゥープ)
- Jupiter/REST系は QPS上限を設定(例:
- 指数バックオフ+ジッター:
100ms, 200ms, 400ms - フェイルファスト:裁定パスはリトライ回数=1(遅い成功より速い不成立)
- CBと統合:429比率がしきい値超→自動で頻度を落とす/一時停止
擬似コード:
if rate_limit_bucket.empty():
return STALE # 比較棄却
resp = fast_http.get(..., timeout=(1,2), retries=1)
if resp.status in {429, 502, 503}:
backoff()
mark_rate_limited(source)
return STALE
運用:シャドー→本番
15分スモーク → 24hシャドー
- 15分スモーク
- 監視のみ(注文は発生させない)。
- 指標:
ts_diff_ms p95 ≤ 250、uncomparable ≤ 20%、fx_stable_bp ≤ 10。
- 24hシャドー
- ノーション別に両方向を連続観測。
- 指標:
edge_bp p50 > 0、p10 > -5、CB未発火。
朝イチ自動サマリ(毎朝固定フォーマット)
- 棄却率(理由別):
ts_skew / stale / liq / fx - SLO逸脱:
compare_age_diff_ms p95/p99、cycle_time p95 - 発生源ランキング:エラー・429のソース別上位(Jupiter/Bybit/自前RPC)
- スプレッドとコスト:
spread_exec_bpvscost_bp_totalの箱ひげ(ノーション別)
例(テキスト出力):
[Daily Summary] 2025-08-08 UTC - SLO: ts_diff p95=184ms / p99=392ms, cycle p95=0.82s - Uncomparable: 17.3% (ts_skew 9.2%, stale 5.1%, liq 2.6%, fx 0.4%) - FX stable: mean 3.2bp / p95 8.7bp - Top sources (errors): Jupiter 38%, Bybit 34%, RPC 21% - Edge vs Cost (5k): p50 +12.4bp / p10 -3.1bp / triggers 47
逸脱時の“一行ポストモーテム”
短く、事実と対策だけ。SlackやOpsノートにコピペできる形で。
[PM] 2025-08-08 03:12Z | SOL/USDC 5k | ts_skew>250ms 連続 | Bybit WS遅延spike 対策: WS再接→地域エンドポイント切替・Jupiter TTL200msへ | 再発監視: ts_diff_ms p95
テンプレ:
[PM] <UTC> | <symbol>/<notional> | <現象> | <直接原因> | 対策: <具体> | 再発監視: <KPI>
まとめ
- 片足失敗は“正常系”:タイムボックス+段階ヘッジ+撤退価格。
- 在庫とΔは数値で封じる:上限とCBで自動停止。
- レート制限は構造で回避:バケット+TTL+フェイルファスト。
- 運用はリズム:スモーク→シャドー→朝イチサマリ→一行PM。
ここまで入れれば、“壊れ方ごとに壊れない”監視運用が回り始めます。
よくある落とし穴
amountの整数化忘れ/mint違い/decimalsミス
- Jupiterの
amountは整数(mintの最小単位)必須。 小数を投げると400や謎丸めで地獄行き。 - mint IDが正義。 “SOL/USDC”みたいなシンボル依存は排除して、mint+decimalsをレジストリで固定。
outAmountも整数(decimals適用済)。自分で小数に戻して実効単価を出す。
USDC/USDTを1.0固定扱い
- CEXはUSDT建て、DEXはUSDC建てが普通。1.0000前提はNG。
- USDCUSDT板のVWAPで実測→
fx_stable_bp = |fx-1|×1e4をログ。10–15bp超は比較棄却 or マージン上乗せ。
v2 Bybit APIやHTTP板ポーリングの混入
- Bybitはv5 WS L2が基本。HTTPのticker/ポーリングは遅延要因。
- 市場区分(spot/linear)を間違えると404/変な値になる。シンボルごとにcategoryを決め打ち。
付録:最小コード断片
どれも
aiohttp+asyncio前提。雑に貼っても動く“最小・実用”版。
Bybit WS → VWAPスニペット
# bybit_ws_vwap_min.py
import aiohttp, asyncio, json, time
from typing import Dict, List, Optional, Tuple
class OB:
def __init__(self): self.bids={}, self.asks={}, self.ts=0.0
def snap(self, b:List[List[str]], a:List[List[str]]):
self.bids={float(p):float(q) for p,q in b}; self.asks={float(p):float(q) for p,q in a}; self.ts=time.time()
def delta(self, b:List[List[str]], a:List[List[str]]):
for p,q in b: q=float(q); p=float(p); (self.bids.pop(p,None) if q<=0 else self.bids.__setitem__(p,q))
for p,q in a: q=float(q); p=float(p); (self.asks.pop(p,None) if q<=0 else self.asks.__setitem__(p,q))
self.ts=time.time()
def vwap(self, side:str, notional:float)->Optional[Tuple[float,float]]:
if notional<=0: return None
if side=="buy": # consume asks low→high
lv=sorted(self.asks.items(),key=lambda x:x[0])
else: # consume bids high→low
lv=sorted(self.bids.items(),key=lambda x:x[0],reverse=True)
rem=notional; base=0.0; quote=0.0
for price,size in lv:
cap=price*size
if rem<=0: break
if rem>=cap: base+=size; quote+=cap; rem-=cap
else: take=rem/price; base+=take; quote+=rem; rem=0; break
return (quote/base, base) if base>0 else None
class BybitWSVWAP:
def __init__(self, ws_url:str, symbol:str, depth:int=50):
self.ws_url=ws_url; self.topic=f"orderbook.{depth}.{symbol}"; self.ob=OB()
async def run(self, stop:asyncio.Event):
async with aiohttp.ClientSession() as s:
async with s.ws_connect(self.ws_url, heartbeat=15) as ws:
await ws.send_json({"op":"subscribe","args":[self.topic]})
async for msg in ws:
if stop.is_set(): break
if msg.type==aiohttp.WSMsgType.TEXT:
p=msg.json(loads=json.loads); d=p.get("data",{})
if p.get("type")=="snapshot": self.ob.snap(d.get("b",[]), d.get("a",[]))
elif p.get("type")=="delta": self.ob.delta(d.get("b",[]), d.get("a",[]))
def vwap(self, side:str, notional:float, max_age_ms:int=500):
if (time.time()-self.ob.ts)*1000>max_age_ms: return None
return self.ob.vwap(side, notional)
# 使用例:
# stop=asyncio.Event()
# client=BybitWSVWAP("wss://stream.bybit.com/v5/public/spot","SOLUSDT")
# asyncio.create_task(client.run(stop)); await asyncio.sleep(1.0); print(client.vwap("buy",5000))
Jupiter /quote TTLキャッシュ
# jupiter_quote_cache_min.py
import aiohttp, asyncio, time
from dataclasses import dataclass
from typing import Optional, Dict
JUP="https://quote-api.jup.ag/v6/quote"
@dataclass(frozen=True)
class Key: input_mint:str; output_mint:str; amount_int:int; slippage_bps:int; direct:bool
@dataclass class Entry: ts_ms:float; data:dict
class JupCache:
def __init__(self, ttl_ms:int=300): self.ttl=ttl_ms; self.c:Dict[Key,Entry]={}; self.locks:Dict[Key,asyncio.Lock]={}; self.guard=asyncio.Lock()
async def __aenter__(self): self.s=aiohttp.ClientSession(timeout=aiohttp.ClientTimeout(total=3.0)); return self
async def __aexit__(self,*_): await self.s.close()
async def get(self, k:Key)->dict:
now=time.time()*1000; e=self.c.get(k)
if e and now-e.ts_ms<=self.ttl: return e.data
lock=await self._lock(k); async with lock:
now=time.time()*1000; e=self.c.get(k)
if e and now-e.ts_ms<=self.ttl: return e.data
p={"inputMint":k.input_mint,"outputMint":k.output_mint,"amount":str(k.amount_int),
"slippageBps":str(k.slippage_bps),"onlyDirectRoutes":"true" if k.direct else "false"}
async with self.s.get(JUP,params=p) as r:
r.raise_for_status(); data=await r.json()
self.c[k]=Entry(now,data); return data
async def _lock(self,k:Key):
async with self.guard:
if k not in self.locks: self.locks[k]=asyncio.Lock()
return self.locks[k]
def to_amount_int(x:float,dec:int)->int: return int(round(x*(10**dec)))
def from_amount_int(x:int,dec:int)->float: return x/(10**dec)
def calc_min_out(out_int:int, slippage_bps:int, safety_bps:int=5)->int:
return max(0,(out_int*(10_000-(slippage_bps+safety_bps)))//10_000)
# 使用例:
# async with JupCache() as jc:
# k=Key(USDC_MINT,SOL_MINT,to_amount_int(5000,6),30,False)
# q=await jc.get(k); out_sol=from_amount_int(int(q["outAmount"]),9)
高速比較パス(250msゲート)
# fast_compare_min.py
import asyncio, time
from typing import Dict, Tuple
from jupiter_quote_cache_min import JupCache, Key, to_amount_int, from_amount_int
def ms(): return time.perf_counter()*1e3
async def compare_once(bybit_client, jup_cache:JupCache, usdc_mint:str, token_mint:str, token_dec:int,
notional_usd:float=5000.0, fx_func=None) -> Dict:
"""
bybit_client: 前出WSクライアント(.vwap(side, notional)を持つ)
fx_func: USDC/USDT レート関数 → (fx, fx_bp)
"""
t0=ms()
# DEX見積(並列)開始
k=Key(usdc_mint, token_mint, to_amount_int(notional_usd,6), 30, False)
dex_fut=asyncio.create_task(jup_cache.get(k))
# CEX VWAP & FX はWSから即時計算
cex = bybit_client.vwap("sell", notional_usd) # CEXで売る想定(USDT建て)
fx, fx_bp = (1.0, 0.0) if fx_func is None else fx_func()
dex = await dex_fut
t1=ms(); ts_diff=t1-t0
if ts_diff>250: return {"status":"uncomparable","reason":"ts_skew","ts_diff_ms":round(ts_diff,1)}
if cex is None or "outAmount" not in dex: return {"status":"uncomparable","reason":"stale_or_liq","ts_diff_ms":round(ts_diff,1)}
# 単価へ
out_base = from_amount_int(int(dex["outAmount"]), token_dec)
if out_base<=0: return {"status":"uncomparable","reason":"dex_zero","ts_diff_ms":round(ts_diff,1)}
dex_px_usdc = notional_usd / out_base
cex_px_usdc = cex[0] * fx
mid=(dex_px_usdc+cex_px_usdc)/2
spread_bp=(cex_px_usdc-dex_px_usdc)/mid*1e4
return {"status":"ok","ts_diff_ms":round(ts_diff,1),"fx_stable_bp":round(fx_bp,2),
"dex_px":dex_px_usdc,"cex_px":cex_px_usdc,"spread_exec_bp":spread_bp}
# 使用例(擬似):
# fx = lambda: (bybit_fx.vwap("buy",10_000)[0]**-1, abs((bybit_fx.vwap("buy",10_000)[0]**-1-1)*1e4))
# res = await compare_once(bybit_sol_client, jc, USDC_MINT, SOL_MINT, 9, 5000.0, fx)
チェックポイント
ts_diff_msが常に≤250ms以内か(p95基準)。fx_stable_bpを必ず記録し、二桁bpに膨らんだら棄却やマージン上乗せ。spread_exec_bpはコストモデル(fee/滑り/tip/infra)を差し引いて判定(ここでは割愛)。
結び
この記事でやったのは、「価格を引き算する監視」から**“約定できる価格を比較する監視”への移行です。
鍵はシンプルでした――ミリ秒(≤250ms)と1秒(≤1s)を守る設計、そしてコストを跨いだ余剰bpしか信じない**という姿勢。
- Executable Price:DEXは
/quote→実効単価、CEXはWS L2→ノーション別VWAP。 - 鮮度の担保:TTL≤300msのキャッシュ、DEX/CEXを並列取得、時刻差ゲート≤250ms。
- 通貨建ての統一:USDC/USDTは板で実測、fx_stable_bpを常時計測。
- 動的トリガ:
cost_bp = fee + slip(N) + tip + infraを越えた余剰bpだけで鳴らす。 - 運用耐性:片足失敗の撤退/段階ヘッジ、在庫・Δ上限、429/5xxは構造で回避、SLOとメトリクスで可視化。
- 導入手順:15分スモーク → 24hシャドー → 朝イチ自動サマリ → 逸脱は“一行PM”。
この型に乗せれば、「数字は出てるのに取れない」から卒業できます。あとは現場の条件(口座手数料、対象銘柄、回線品質)に合わせてコストモデルを学習させ、SLOを育てていくだけ。
“見かけのスプレッド”を捨て、実行可能性で勝つ――それが、実運用で生き残る唯一の道です。