
先日、Xで大変興味深いポストを見かけました。現状、私ができることをアイデアレベルでまとめて取り組んでいるので、自身の定点観測&ロードマップとして記録しておきます。
【アビトラエッジの探し方】
DEG鯖アビトライベントが良かったので僕も少しGiveしてみる。エッジってどうやって探すの?って質問書いてる人がいたのでそのアンサーの一端を書いてみる😇・なんとか1個とっかかりを見つけよう
・見つけたとっかかりを掘ろう
・ChatGPTも活用しよう続きはスレへ📝
— くりぷとべあー🍻 (@cryptoo_bear) July 21, 2025
いまから手を動かすなら──5つの “はじめの一歩”
下に行くほど難易度UP。まず①②の 「差分ウォッチ系」 を回し、データ取得・ストレージ・可視化の土台を固めてから③〜⑤へ進む。
1. TVL 急変アラート 🔍
ねらい:プロトコル/チェーンの“資金流入超過”に最速で気づく。
- データ取り
GET https://api.llama.fi/protocols
で全プロトコルの最新 TVL と 1 d / 7 d 変化率を取得 GitHub- 24 h 毎にスナップショットを保存(SQLite → DuckDB で OK)。
- 異常検知
- 「前日比 +30 % 以上」を閾値に outlier を抽出。
- 急伸したチェーン or カテゴリ (Lending/Perps 等) を絞り込む。
- 深掘り
- 該当プロトコルの
/tvl/{protocol}
でヒストリカル推移を取得し “出来高 vs TVL” の乖離をチェック api-docs.defillama.com - 出来高が伴わず TVL だけ跳ねているなら入出金サヤやステーキング・アンステーキングに伴う裁定が隠れていることが多い。
- 該当プロトコルの
2. ステーブルコイン流入/流出マップ 🌊
ねらい:資金の「入口」「出口」を視覚化し、先回りで流動性シフトを捕捉。
- チェーン別循環量
- DeFiLlama の
stablecoins/chains
ダッシュボードを API で定期取得(HTML スクレイピングでも JSON 化可) DefiLlama
- DeFiLlama の
- 1 d / 7 d 差分 をヒートマップ描画し 流入上位チェーン を毎朝 Slack 通知。
- ブリッジ TVL も合わせて見ると、どのブリッジ経由で資金が動いたか推測しやすい DefiLlama
- 流入チェーン上位3つを DEX‐CEX 価格差スキャン(③で解説) の対象に追加。
3. DEX ボリューム急増 × CEX 価格乖離 レーダー ⚡
ねらい:板の薄い草トークンや新規上場銘柄で“瞬間アービトラ”を狙う。
- リアルタイム価格源
- Solana 系:Birdeye
GET /defi/history-price?address=
など(無料枠あり) docs.birdeye.so - マルチチェーン:Dex Screener
GET /token-profiles/latest/v1
でペア一覧と価格・流動性を取得 docs.dexscreener.com
- Solana 系:Birdeye
- CEX 価格
- Bybit / Binance の REST
v5/market/tickers
などで同一銘柄のスポット・パーペツ比を取得。
- Bybit / Binance の REST
- ロジック
|P_dex − P_cex| / P_cex > 1 %
かつ5 min ボリューム > 1 σ
でアラート。- スプレッド検出後は 部分約定対策 として深さ 2〜3 tick までの板厚を同時チェック。
4. ブリッジ・CEX 出入口クラスタリング 🕵️♂️
ねらい:上位アービトラ勢の“行動パターン”を盗み見る。
- ウォレット抽出
- ③で拾ったイベント時刻に Birdeye / Dex Screener から Tx ハッシュ →
solscan / ethscan
で呼び出し元アドレスを列挙。
- ③で拾ったイベント時刻に Birdeye / Dex Screener から Tx ハッシュ →
- CEX Deposit 追跡
- アウトバウンド Tx の
to
が既知 CEX アドレスなら、その CEX の 入金用ラベル をキーにアドレスを横展開。 - CEX 側はアドレス使い回し率が高いため、クラスター化が容易。
- アウトバウンド Tx の
- ブリッジ追跡
- Tx にブリッジコントラクトが絡む場合は DeFiLlama Bridged TVL ページの 対象チェーン を手掛かりに反対側を探索。
5. “イベント前兆” の統計モデル化 📈
ねらい:①〜④で得たメタデータを特徴量に、勝率の高いパターン を回帰/クラスタリングで抽出。
- 特徴例
ΔTVL%
、ΔStablecoinFlow%
、DEX Volume Spike σ
、Token Age
、Bridged TVL Ratio
など。
- 手順
- 90 日間データを学習 → ラベルは “エッジ発生後 15 min で 0.5 % 以上利幅が取れたか”。
- ランダムフォレスト or CatBoost で SHAP 値 を確認し「効いている指標」だけ残す。
- 運用
- スコア上位 n 件をShadow Mode で紙トレードし、真の正例/負例を追加学習。
最小セットアップ例(Python)
pip install requests pandas duckdb
import requests, pandas as pd, duckdb, datetime as dt # 1) TVL snapshot today = dt.date.today().isoformat() data = requests.get("https://api.llama.fi/protocols").json() df = pd.json_normalize(data)[["name","chain","tvl","change_1d","change_7d"]] duckdb.sql("CREATE TABLE IF NOT EXISTS tvl AS SELECT * FROM df WHERE 1=0") duckdb.sql("INSERT INTO tvl SELECT * FROM df")

まずはこれで “毎日 TVL を溜める” パイプラインを cron で回すだけでも十分価値があります。
どこから始める?
- TVL スナップショット → 急変ヒートマップ(最速で ROI が見える)
- Stablecoin Flow ヒートマップ(資金の流入口を可視化)
- DEX/CEX スプレッドスキャナ(即金チャンス)
上記3つが動き出したら、ウォレットクラスタリング → 機械学習による予兆検知 へ発展させる。
次のアクション
優先度 | TODO | 目標所要 |
---|---|---|
★★★ | DeFiLlama TVL Cron ジョブ & DuckDB 保存 | 0.5 d |
★★☆ | Stablecoin Flow 差分ヒートマップ | 1 d |
★★☆ | Dex Screener/Birdeye × CEX スプレッド比較スクリプト | 1 d |
★☆☆ | ウォレットクラスタリング PoC | 2 d |
★☆☆ | 統計モデルの特徴量設計 | 2 d |

「まず 1 件でいいから“数字の変化”を捕まえる」 ─ そこから洞察と改良のループを回すことがスタートライン。「アビトラbotで私の主力bot群の補完をしたり、新たな稼ぎの柱を増やしたりする」という狙いがあるので、以下のようなロードマップを大まかな指針としてマイペースにやっています。
【Phase1】TVL 急変アラート ― 実装ガイド
(Python×DuckDB×Cron+任意で Slack 通知)
0. 前提環境
要素 | 推奨 |
---|---|
OS | Linux / macOS (ARM/Intel) |
Python | ≥ 3.10 |
主要ライブラリ | requests pandas duckdb pyyaml |
デプロイ | 1 GB RAM VM で充分(スナップショットは数 MB/日) |
python -m venv tvl_env && source tvl_env/bin/activate pip install requests pandas duckdb pyyaml
1. データ取得スクリプト(fetch_tvl.py
)
import requests, duckdb, datetime as dt, pandas as pd, pathlib, time DB_PATH = pathlib.Path("data/tvl_history.duckdb") def snapshot_today(): url = "https://api.llama.fi/protocols" # 一覧エンドポイント :contentReference[oaicite:0]{index=0} resp = requests.get(url, timeout=30) resp.raise_for_status() df = pd.json_normalize(resp.json()) # 欲しい列だけ残す keep = ["id", "name", "chain", "category", "tvl", "change_1d", "change_7d"] df = df[keep].assign(snapshot_date=dt.date.today().isoformat()) with duckdb.connect(DB_PATH) as con: con.sql(""" CREATE TABLE IF NOT EXISTS tvl_snapshots AS SELECT * FROM df WHERE 1=0 """) con.register("df", df) con.sql("INSERT INTO tvl_snapshots SELECT * FROM df") print("✅ Saved", len(df), "rows") if __name__ == "__main__": t0 = time.time() snapshot_today() print("⏱️", round(time.time() - t0, 2), "s")
ポイント
- API は無認証・無料枠(現行
/protocols
は ~4 000 B/req) - スナップショットは 毎日 1 レコード × プロトコル数 → 1 年でも <30 MB
id
はドキュメント非公開のことが多いため、name
で JOIN しても OK(ただし重複に注意)
2. スケジューリング
Cron(ローカル/EC2 いずれでも)
0 3 * * * /path/to/tvl_env/bin/python /path/to/fetch_tvl.py >> /var/log/tvl_fetch.log 2>&1
- UTC 0 3:00 ≒ 日本時間 12:00 を例示。API 制限ほぼ無しだが混雑時間帯は避けると安心。
- GitHub Actions で回す場合は
on: schedule: cron: '0 3 * * *'
で同様。
3. 異常検知クエリ(detect_outliers.py
)
import duckdb, datetime as dt THRESHOLD = 30 # +30 % 以上 LOOKBACK = 1 # 前日比 with duckdb.connect("data/tvl_history.duckdb") as con: today = dt.date.today().isoformat() prev = (dt.date.today() - dt.timedelta(days=LOOKBACK)).isoformat() query = f""" WITH today AS ( SELECT id, name, chain, category, tvl FROM tvl_snapshots WHERE snapshot_date = '{today}' ), prev AS ( SELECT id, tvl AS tvl_prev FROM tvl_snapshots WHERE snapshot_date = '{prev}' ) SELECT t.name, t.chain, t.category, ROUND( (t.tvl - p.tvl_prev)*100.0/p.tvl_prev, 2 ) AS pct_change, t.tvl FROM today t JOIN prev p USING(id) WHERE (t.tvl - p.tvl_prev)*100.0/p.tvl_prev >= {THRESHOLD} ORDER BY pct_change DESC """ alerts = con.sql(query).df()
*返り値 alerts
が空でなければ急伸銘柄。
フォールバック:前日データが無いときは COALESCE
で 0 割り込みを防ぐ。
Slack 通知(任意)
import os, json, requests if not alerts.empty: text = "*🚨 TVL spike alert*\n" + alerts.to_markdown(index=False) requests.post( os.environ["SLACK_WEBHOOK"], headers={"Content-Type": "application/json"}, data=json.dumps({"text": text}) )
4. 急伸チェーン/カテゴリでフィルタリング
agg = alerts.groupby(["chain","category"]).size().reset_index(name="count") print(agg.sort_values("count", ascending=False).head())
- ここで Lending / Perps など特定カテゴリが固まって急増 → “狙い目”。
5. 深掘り:ヒストリカル TVL 取得
import requests, pandas as pd, matplotlib.pyplot as plt, datetime as dt slug = "abracadabra" # 例:急伸プロトコル名 hist = requests.get(f"https://api.llama.fi/tvl/{slug}").json() # :contentReference[oaicite:1]{index=1} df = pd.DataFrame(hist["tvl"]) df["date"] = pd.to_datetime(df["date"], unit="s") df.plot(x="date", y="totalLiquidityUSD", title=f"TVL History – {slug}") plt.show()
「出来高 vs TVL」 乖離チェック
- 出来高は
/summary/dexs/{protocolSlug}
など Volume API を利用 GitHub - 直近 24 h TVL 比で Volume/TVL < 5 % なら 純入金 > 取引 の可能性
- 取引伴わず TVL だけ膨張 → ブリッジ入金 or ステーキング移行 の匂い
6. ディレクトリ構成例
tvl_monitor/ ├── data/ # DuckDB ファイル ├── fetch_tvl.py ├── detect_outliers.py ├── charts/ # 画像出力 (optional) ├── .env # SLACK_WEBHOOK 等 └── cron.txt # crontab 用
7. ハードニング & 運用 Tips
項目 | 推奨 |
---|---|
API 落ち対策 | try/except +リトライ (time.sleep(5) ) |
DB VACUUM | 30 d に一度 DuckDB の optimize |
テーブル肥大化 | tvl_snapshots をパーティション (snapshot_date 列) で Z-order |
閾値チューニング | 四分位距離 IQR や Z-score で自動補正にすると β→本番 の移行が楽 |
8. 次のステップ
- Stablecoin 流入ヒートマップ(手法 2)
- DEX/CEX スプレッドスキャナ(手法 3)
- 検知→自動バックテスト Pipeline(手法 5 への布石)

“まず毎日 1 枚のスナップショットを貯め始める”―ここから エッジの霧 が晴れていく。
【Phase2】ステーブルコイン流入/流出マップ ─ 実装ステップ
(Python × DuckDB × matplotlib + Slack 通知)
ゴール
- チェーン別のステーブルコイン循環量を毎日スナップショット → 1 d/7 d 差分をヒートマップ化
- 流入上位チェーンを朝イチで Slack 通知
- 併せてブリッジ TVL を照合し、どのブリッジ経由で資金が動いたかを推測
- 上位 3 チェーンを ③ DEX‑CEX スプレッドスキャナ の監視対象に自動追加
0. 事前セットアップ
python -m venv stable_env && source stable_env/bin/activate pip install requests pandas duckdb matplotlib pyyaml slack_sdk
依存 | 推奨バージョン |
---|---|
Python | ≥ 3.10 |
DuckDB | 0.9 系以降(pip install duckdb==0.10.0 でも可) |
1. データ取得 (fetch_stables.py
)
# coding: utf-8 """ 毎日 1 回、各チェーンのステーブルコイン循環量を取得して DuckDB に追記 """ import requests, pandas as pd, duckdb, datetime as dt, pathlib, time DB = pathlib.Path("data/stables.duckdb") URL = "https://stablecoins.llama.fi/stablecoinchains" # JSON:チェーン別循環量 TODAY = dt.date.today().isoformat() def main(): js = requests.get(URL, timeout=30).json() # ← 無認証・無料エンドポイント df = pd.DataFrame(js["chains"]) # key: chains keep = ["name","peggedUsd","change_1h","change_1d","change_7d"] df = df[keep].assign(snapshot_date=TODAY) with duckdb.connect(DB) as con: con.sql("""CREATE TABLE IF NOT EXISTS stable_snaps AS SELECT * FROM df WHERE 1=0""") con.register("df", df) con.sql("INSERT INTO stable_snaps SELECT * FROM df") print(f"✅ {len(df)} rows saved for {TODAY}") if __name__ == "__main__": t0 = time.time(); main(); print("⏱️", round(time.time()-t0,2), "s")
チェーン別循環量は DefiLlama ダッシュボード “Stablecoins > Chains” で公開中 DefiLlama
2. 差分計算+ヒートマップ (diff_heatmap.py
)
import duckdb, pandas as pd, seaborn as sns, matplotlib.pyplot as plt, datetime as dt DB = "data/stables.duckdb" today = dt.date.today() prev1 = today - dt.timedelta(days=1) prev7 = today - dt.timedelta(days=7) query = f""" WITH t AS (SELECT name, peggedUsd FROM stable_snaps WHERE snapshot_date = '{today.isoformat()}'), y1 AS (SELECT name, peggedUsd AS usd_1d FROM stable_snaps WHERE snapshot_date = '{prev1.isoformat()}'), y7 AS (SELECT name, peggedUsd AS usd_7d FROM stable_snaps WHERE snapshot_date = '{prev7.isoformat()}') SELECT t.name, ROUND(100*(t.peggedUsd - y1.usd_1d)/y1.usd_1d ,2) AS pct_1d, ROUND(100*(t.peggedUsd - y7.usd_7d)/y7.usd_7d ,2) AS pct_7d FROM t JOIN y1 USING(name) JOIN y7 USING(name) """ df = duckdb.sql(query).df().set_index("name") top = df.sort_values("pct_1d", ascending=False).head(10) # 通知用 # ヒートマップ plt.figure(figsize=(10,6)) sns.heatmap(df[["pct_1d","pct_7d"]], annot=False, center=0) plt.title(f"Stablecoin Flow % Change ({today})") plt.xticks(rotation=0) plt.tight_layout() plt.savefig(f"charts/stable_heat_{today}.png")
Slack 通知 (任意)
from slack_sdk.webhook import WebhookClient, SlackWebhookError hook = WebhookClient("https://hooks.slack.com/services/XXX/YYY/ZZZ") text = "*Stablecoin inflow top 5 (1d %)*\n" + top.head(5).to_markdown() hook.send(text=text)
3. ブリッジTVL照合 (bridge_lookup.py
)
import requests, pandas as pd, duckdb BRIDGE_URL = "https://bridges.llama.fi/bridges?excludeTotal=false" br = pd.DataFrame(requests.get(BRIDGE_URL).json()["bridges"]) # ① 上位 inflow チェーン chains = duckdb.sql("SELECT name FROM stable_snaps WHERE snapshot_date = CURRENT_DATE ORDER BY peggedUsd DESC LIMIT 3").df()["name"].tolist() # ② 各チェーンで TVL が増えたブリッジ抽出 res = (br[br["chain"].isin(chains)] .sort_values("change_1d", ascending=False) .head(10)[["displayName","chain","change_1d","tvl"]]) print(res)
ブリッジ TVL ダッシュボードは /bridges
& /bridges/chains
で公開 DefiLlama
4. ③ DEX‑CEX スキャナへの自動連携
# config.yaml などに監視チェーンを書き出す import yaml cfg = dict(watch_chains=chains) yaml.safe_dump(cfg, open("config/dex_cex_chains.yaml","w"))
スキャナ側では watch_chains
を読んで
各チェーンの上位トークンペアを監視 → スプレッドが 1 % 超えたらアラート、の流れで OK。
5. スケジュール例(cron)
# ❶ 03:05 JST でスナップショット 5 3 * * * /path/stable_env/bin/python /path/fetch_stables.py # ❷ 03:10 JST で差分計算&ヒートマップ&通知 10 3 * * * /path/stable_env/bin/python /path/diff_heatmap.py # ❸ 03:20 JST でブリッジ照合+DEX‑CEX 設定更新 20 3 * * * /path/stable_env/bin/python /path/bridge_lookup.py
6. ディレクトリ構成イメージ
stable_flow_monitor/ ├── data/ # DuckDB ├── charts/ # PNG Heatmaps ├── config/ │ └── dex_cex_chains.yaml ├── fetch_stables.py ├── diff_heatmap.py ├── bridge_lookup.py └── .env # SLACK_WEBHOOK など
運用 Tips
チェックポイント | 補足 |
---|---|
API 落ち | requests.get(..., timeout=30) + for _ in range(3): retry |
Missing data | 祝日などに 0 レコードになる場合、前日値を ffill() で補完 |
色覚対応ヒートマップ | sns.heatmap(..., cmap="coolwarm") などで調整 |
閾値チューニング | 30 % → 四分位距離 IQR ベースの動的閾値にしても良い |
次にやると良いこと
- イン/アウト額を “USD 絶対値” でもランキング
- 1 h 変化率で intra‑day 速報(ガス代高騰で USDC が逃げる瞬間など)
- 流入チェーンのブリッジ手数料 × ガス代と併せて ROI 見積もり
- 出来高低迷チェーンなのに資金だけ流入 → “次のエアドロ” マーク

これで 「資金の入口/出口を毎朝可視化 → 即トレード候補へ回す」 ための最小パイプラインが完成します。
【Phase3】DEX ボリューム急増 × CEX 価格乖離レーダー ─ 実装フルガイド ⚡
狙い
板の薄い草トークン/新規上場銘柄で「DEX が盛り上がるが CEX がまだ追随していない」瞬間を検知してアービトラを仕掛ける。データフロー DEX Price & Volume → 正規化 → CEX Price → 乖離&ボリューム検知 → オーダーブック深さチェック → 通知 & Bot へルーティング
0. 必須ライブラリ
pip install requests websockets aiohttp pandas numpy duckdb pyyaml slack_sdk
モジュール | 用途 |
---|---|
requests / aiohttp | REST & WS ※Birdeye は WS も可 |
pandas + numpy | 5 min ローリング σ 計算 |
duckdb | 軽量ストア(出来高ベースライン保持) |
slack_sdk | アラート送信 |
1. データソース
種別 | エンドポイント | 備考 |
---|---|---|
Solana DEX | https://public-api.birdeye.so/defi/history_price?address=<TOKEN>&from=<ts>&to=<ts> (Price, Volume) docs.birdeye.so | X-API-KEY ヘッダ必須・Free 枠あり |
マルチチェーン DEX | https://api.dexscreener.com/token-profiles/latest/v1 (Pair list & metadata) docs.dexscreener.com + latest/dex/pairs/{chainId}/{pairId} (ticker & 24 h vol) | 無料・60 r/m |
CEX Bybit | GET /v5/market/tickers?category=spot&symbol=XXXX or linear bybit-exchange.github.io | 公開 API・key 不要 |
CEX Binance | GET /api/v3/ticker/price?symbol=XXXXUSDT (スポット) binance-docs.github.io | 公開 API |
Orderbook depth | Bybit /v5/market/orderbook?symbol=XXXX&limit=50 Spot / Perps bybit-exchange.github.io | 50 level で十分 |
Binance /api/v3/depth?symbol=XXXXUSDT&limit=100 |
トークンアドレス ↔ シンボルのマッピング
DexScreener のbaseToken.symbol
を優先 → 失敗時は独自 YAML を上書き。
2. アーキテクチャ
flowchart TD %% nodes DEX[DEX Feeder] CEX[CEX Feeder] NORM["Normalizer (symbol, price, volume)"] SPREAD[Spread Engine] DEPTH[Depth Check] ACTION["Slack / Bot Action"] %% edges DEX -- poll/websocket --> NORM NORM -- join on symbol --> SPREAD CEX -- REST 1 s --> SPREAD SPREAD -- spread > 1% and vol > sigma --> DEPTH DEPTH -- depth ok --> ACTION
3. コアロジック (spread_watch.py
抜粋)
import time, asyncio, requests, pandas as pd, numpy as np from slack_sdk.webhook import WebhookClient SPREAD = 0.01 # 1 % VOL_LOOKBACK = 12 # 12 * 5 s = 1 min 例 alert_hook = WebhookClient("<SLACK_WEBHOOK_URL>") # ---- tiny rolling buffer for volume baseline ---- class VolBaseline: def __init__(self, size=60): # 5 s × 60 = 5 min self.size = size self.vols = [] def update(self, v): self.vols.append(v) if len(self.vols) > self.size: self.vols.pop(0) return np.std(self.vols) if len(self.vols) > 10 else 0 vol_map = {} # symbol -> VolBaseline async def poll_dex(symbol, pair_id, chain_id): while True: j = requests.get(f"https://api.dexscreener.com/latest/dex/pairs/{chain_id}/{pair_id}", timeout=5).json() p = float(j["pairs"][0]["priceUsd"]) v = float(j["pairs"][0]["volume"]["h24"]) / 288 # 5 min ≒ 1/288 ts = time.time() yield {"symbol": symbol, "price": p, "vol5": v, "ts": ts} await asyncio.sleep(5) # 5 s poll async def poll_cex(symbol): while True: j = requests.get("https://api.bybit.com/v5/market/tickers", params={"category":"spot","symbol":symbol}, timeout=5).json() p = float(j["result"]["list"][0]["lastPrice"]) ts = time.time() yield {"symbol": symbol, "price": p, "ts": ts} await asyncio.sleep(5) async def watcher(): dex_q, cex_q = asyncio.Queue(), asyncio.Queue() # 例:WEN‐USDC(Solana)を監視 asyncio.create_task(poll_dex("WENUSDC","8jwv...pairAddr","solana")) asyncio.create_task(poll_cex("WENUSDC")) books = {} # latest prices while True: for q in (dex_q, cex_q): if not q.empty(): d = q.get_nowait() key = (d["symbol"], "dex" if "vol5" in d else "cex") books[key] = d for sym in set(k[0] for k in books): try: dex = books[(sym,"dex")]; cex = books[(sym,"cex")] except KeyError: continue spread = abs(dex["price"]-cex["price"])/cex["price"] std = vol_map.setdefault(sym, VolBaseline()).update(dex["vol5"]) if spread > SPREAD and dex["vol5"] > std: if await depth_ok(sym, dex["price"]): alert_hook.send(text=f"*ARBITRA* {sym}\n" f"DEX ${dex['price']:.4f} vs CEX ${cex['price']:.4f}" f" | Spread {spread*100:.2f}%\n" f"5mVol {dex['vol5']:.2f} > σ {std:.2f}") await asyncio.sleep(1)
注目ポイント
- Volume の基準値:直近 5 分の rolling σ。草トークンは閾値を上げ過ぎない。
- Latency:poll 5 s 例。WebSocket (
wss://public-api.birdeye.so/socket
) への置き換えで sub‑sec 化可 docs.birdeye.so - Spread 1 %:流動性が薄いほど ask–bid 開きを埋めるコストが大きいので 1 % 以上を推奨。
4. 部分約定対策 — 深さ 2〜3 tick の板厚
def depth_ok(symbol, p_dex, max_slippage=0.005): # Bybit depth ob = requests.get("https://api.bybit.com/v5/market/orderbook", params={"symbol":symbol,"limit":50}, timeout=5).json() asks = [(float(pr), float(sz)) for pr,sz,*_ in ob["result"]["a"][:3]] size_needed = 500 # USD cost = 0 for pr,sz in asks: slice_sz = min(sz, size_needed/(pr)) cost += slice_sz*pr size_needed -= slice_sz*pr if size_needed <= 0: break slip = (cost/(500) - p_dex)/p_dex return slip <= max_slippage
DEX 側はオーダーブック API が無い場合が多いので 流動性 (liquidityUSD) で代用。
5. ハイパーパラメータ & 運用
変数 | 初期値 | チューニング指針 |
---|---|---|
SPREAD | 0.01 (1 %) | 草コインなら 2〜3 % でもヒットする |
VOL_LOOKBACK | 60 (5 min) | 暑い相場で 30、閑散なら 120 |
depth slippage | 0.5 % | CEX 側 Maker Rebate を考慮して調整 |
- False‑positive 減:
spread & vol
の二重条件+depth_ok
でほぼ撲滅。 - Rate Limit:DexScreener 60 r/m、Birdeye
/history_price
100 r/s (だが API Key枠) → 5 s poll × 20 pair で安全。 - Time Sync:秒単位の Arbi は NTP sync 推奨 (
chrony
).
6. ディレクトリ例
dex_cex_spread/ ├── data/volume.duckdb ├── src/ │ ├── feeders.py │ ├── spread_watch.py │ └── depth_check.py ├── config/tokens.yaml ├── charts/ # optional可視化 └── .env # API keys
7. 次フェーズ🚀
- Multi‑thread / asyncio gather:50 pair 同時監視でも CPU < 20 %.
- Cross‑DEX Spread:同チェーン DEX‑間 乖離も同ロジックで横展開。
- Auto‑Hedge Bot:Slack → Function URL → Trader コンテナへ POST。
- ML Ranking:前手法 2 の Stable Flow 指標を特徴量に統合し “事前スコア” を付与。
まとめ
Birdeye・DexScreener → DEX 価格 / Volume、Bybit・Binance → CEX 価格 を同時ストリーム。Spread > 1 %
& Vol(5 m) > σ
を満たしたら orderbook 深さチェック → 通知/自動エントリ。
これで 「誰よりも早く気づいて、でも無駄撃ちせずに入る」 最小アビトラ・レーダーが完成します。
【Phase4】ブリッジ/CEX 出入口クラスタリング 🕵️♂️― 実装ロードマップ
目的
① DEX⇄CEX アービトラの主戦プレイヤーを特定し、② 彼らの資金移動ルート(ブリッジ・入金先)をグラフで可視化、③ 再現性の高い「次の動き」を予測できるデータセットを得る。
1. 必要なデータソース & 前準備
区分 | エンドポイント/データ | 用途 |
---|---|---|
DEX 取引詳細 | Birdeye:/defi/transactions?address=<TOKEN>&from=<ts>&to=<ts> (Solana) ※WS も可 docs.birdeye.so | 該当時刻の Tx ハッシュ取得 |
Dex Screener:/token-profiles/latest/v1 → pairId を取得し /latest/dex/pairs/{chainId}/{pairId} で Tx リスト docs.dexscreener.com | ||
チェーン Explorer | Solscan API / Etherscan module=account&action=txlist&address= でフル Tx データ Etherscan | from / to / Logs 抽出 |
CEX ラベル | 自前 JSON(Binance/Bybit/OKX 等ホット & Deposit アドレス)+公開リスト Binance | Deposit 判定キー |
ブリッジ辞書 | https://defillama.com/bridges + API /bridge 一覧で contract ↔ dest chain マップ DefiLlama | |
保管 DB | DuckDB(軽量) or Postgres / Neo4j(後でグラフ分析するなら) |
2. ワークフロー全体像
flowchart TD %% Nodes EVT[Event step3 timestamp] S1["(1) DEX Tx hash 取得"] S2["(2) Chain Tx 詳細取得"] S3["(3) CEX or Bridge 判定"] S4["(4) Address graph 生成"] S5["(5) Clustering and visualization"] %% Edges EVT --> S1 S1 --> S2 S1 --> S3 S2 --> S4 S3 --> S4 S4 --> S5
3. ステップ別実装詳細
3‑1. DEX Tx ハッシュ取得
import requests, time def get_dex_txs_sol(token_addr, t0, t1): url = ("https://public-api.birdeye.so/defi/transactions" f"?address={token_addr}&from={t0}&to={t1}&limit=500") j = requests.get(url, headers={"X-API-KEY": "..."}).json() return [tx["txHash"] for tx in j["data"]["transactions"]]
t0 / t1
:イベント時刻±90 s が目安。
3‑2. チェーン Tx 詳細取得
import aiohttp, asyncio ETHERSCAN_KEY="..." async def fetch_tx_eth(txhash): url=("https://api.etherscan.io/api" f"?module=proxy&action=eth_getTransactionByHash&txhash={txhash}" f"&apikey={ETHERSCAN_KEY}") async with aiohttp.ClientSession() as s: async with s.get(url,timeout=10) as r: j=await r.json(); return j["result"]
取るべきフィールド
{ "hash": "...", "from": "0x...", // 発信者 "to": "0x...", // コントラクト or EO "value": "0x...", // Native amount "input": "0x...", // ブリッジ判定に使う "logs": [...] // ブリッジ転送 or Deposit Event }
3‑3. CEX Deposit / ブリッジ判定
CEX_MAP = json.load(open("cex_labels.json")) # addr → {"cex":"Binance", ...} BRIDGE_MAP = json.load(open("bridge_contracts.json")) # addr → {"bridge":"Wormhole","dst":"Solana"} def classify(tx): if tx["to"] in CEX_MAP: tag = ("CEX", CEX_MAP[tx["to"]]["cex"]) elif tx["to"] in BRIDGE_MAP: dst = BRIDGE_MAP[tx["to"]]["dst"] tag = ("BRIDGE", dst) else: tag = ("OTHER", None) return tag
Bridging の場合、ログ内に destinationChainId
や recipient
があるので抽出して「反対側アドレス」を追跡 queue に追加。
3‑4. グラフ構築 & クラスタリング
import networkx as nx, duckdb G = nx.MultiDiGraph() for tx in parsed_txs: src = tx["from"]; dst = tx["to"]; ttag, meta = classify(tx) G.add_edge(src, dst, label=ttag, ts=tx["timestamp"], meta=meta) # ❶ CEX Deposit クラスタ → 入金アドレス単位で Union‑Find cex_clusters = {} for u,v,d in G.edges(data=True): if d["label"] == "CEX": cex_addr = v # deposit cex_clusters.setdefault(cex_addr, set()).add(u) # ❷ Bridge ペア → src ↔ dstAddress を 2‑hop 合流で束ねる for u,v,d in G.edges(data=True): if d["label"] == "BRIDGE": dst_addr = d["meta"]["dst_wallet"] G.add_edge(u, dst_addr, label="BRIDGED_TO", ts=d["ts"]) # Louvain / Connected Components clusters =
3‑5. 可視化 & レポート
import matplotlib.pyplot as plt, networkx as nx for i,cl in enumerate(clusters): sub = G.subgraph(cl) nx.draw(sub, with_labels=False, node_size=50, edge_color="grey") plt.title(f"Cluster {i} ({len(sub.nodes())} wallets)") plt.savefig(f"charts/cluster_{i}.png")
Slack 送付:slack_sdk
で files.upload
.
4. 実運用 Tips
テーマ | 推奨 |
---|---|
CEX アドレス更新 | DeBank / Arkham / Chainabuse が公開する “Exchange Tag JSON” を月次で pull → diff |
新ブリッジ対応 | DeFiLlama /bridge エンドポイントを毎週 Snapshot → 新規 contract が増えたら自動追登録 DefiLlama |
Gas & Rate Limit | RPC→Alchemy/Infura の batch & multicall でコスト削減。Etherscan free は 5 r/sec → backoff。 |
ミラー入金 | 同一 Tx 内で memo/tag を持つ CEX (XRP/ADA 等) は Memo ID までキーにすると誤結合を防げる。 |
グラフ DB | Neo4j+Bloom を使うと WebGUI で資金ルートのアニメ再生が可能。 |
5. 次ステップ
- 時間窓集計:クラスタごとに「PoC → Bridge → CEX」完了までの平均レイテンシを計測。
- 行動シグネチャ:入金額・ブリッジ先・チェーン滞在時間を特徴量に k‑Means → “高速狩人” vs “遅延リスクヘッジャ” を分類。
- リアルタイム Hook:③のスプレッド検知→該当ウォレットが出現→“Follow 取引” を自動トリガー。

これで 「イベント検知 → アビトラ勢ウォッチ → 出入口クラスタリング」 の基盤が完成します。
【Phase5】“イベント前兆” モデル化 📈 — フル実装ガイド
ゴールの再確認
* 90 日分のイベントメタデータ (手法 ①〜④) を学習
* ラベル = 「検知から 15 分以内に 0.5 % 以上の利幅 (gross edge) が取れたか」<br>⇒ 2 値分類
* CatBoost or RandomForest を使い、SHAP で効いている特徴を確認 → モデル縮小
* 推論スコア上位 N 件を Shadow Mode で紙トレードし、結果を再学習にフィードバック
0. ディレクトリ雛形
edge_model/ ├── data/ # DuckDBファイル & parquet │ ├── events.duckdb # ①〜④で生成 │ └── price_ticks.parquet ├── notebooks/ ├── src/ │ ├── build_dataset.py │ ├── train_model.py │ └── score_live.py ├── models/ │ └── cat_v1.cbm └── config.yaml
DuckDB & Parquet で I/O 軽量化、モデルは catboost.CatBoostClassifier
を保存
1. イベント & ラベルテーブル を作る
-- events.duckdb に集約済みと仮定 CREATE OR REPLACE TABLE edge_events AS SELECT event_id, ts_event, -- 秒精度 symbol, -- 特徴例 delta_tvl_pct, delta_stable_pct, vol_sigma, token_age_days, bridged_tvl_ratio FROM tvl_snap -- ← 手法① JOIN stable_snaps USING(snapshot_date) JOIN dex_vol_snaps USING(symbol, snapshot_date) JOIN bridge_stats USING(symbol, snapshot_date);
1‑A. ラベル付け
CREATE OR REPLACE TABLE labels AS SELECT e.event_id, CASE WHEN abs(p15.price - p0.price)/p0.price >= 0.005 THEN 1 ELSE 0 END AS y FROM edge_events e LEFT JOIN LATERAL ( SELECT price FROM price_ticks WHERE symbol = e.symbol AND ts BETWEEN e.ts_event AND e.ts_event + 60 ORDER BY ts LIMIT 1) p0 LEFT JOIN LATERAL ( SELECT price FROM price_ticks WHERE symbol = e.symbol AND ts BETWEEN e.ts_event+900 AND e.ts_event+960 ORDER BY ts LIMIT 1) p15
15 min 後 (±60 s バッファ) の価格で 0.5 % 以上 動いていれば y=1
2. データセット生成スクリプト (build_dataset.py
)
import duckdb, pandas as pd, pathlib con = duckdb.connect("data/events.duckdb") df = con.sql(""" SELECT e.*, l.y FROM edge_events e JOIN labels l USING(event_id) WHERE ts_event >= date '2025-04-24' - INTERVAL 90 DAY """).df() # カテゴリ→string、欠損→0、対数変換など cat_cols = ["symbol"] num_cols = df[num_cols] = df[num_cols].fillna(0).astype("float32") df.to_parquet("data/train_90d.parquet", index=False)
3. 時系列ウォークフォワードCV で検証
“未来リーク” を防ぐため TimeSeriesSplit / Walk‑Forward を利用 Kaggle
from sklearn.model_selection import TimeSeriesSplit import pandas as pd, numpy as np, catboost as cb, shap, joblib, json df = pd.read_parquet("data/train_90d.parquet") X = df.drop(columns=["y","event_id","ts_event"]) y = df["y"] cv = TimeSeriesSplit(n_splits=5, test_size= int(len(df)*0.1)) scores = [] for i, (tr, te) in enumerate(cv.split(X)): model = cb.CatBoostClassifier( depth=6, learning_rate=0.05, iterations=400, cat_features=[X.columns.get_loc(c) for c in ["symbol"]], eval_metric="AUC", verbose=False, random_state=42 ) model.fit(X.iloc[tr], y.iloc[tr], eval_set=(X.iloc[te], y.iloc[te]), verbose=False) auc = model.get_best_score()["validation"]["AUC"] scores.append(auc) print("Walk‑forward AUC:", np.mean(scores).round(3)) model.save_model("models/cat_v1.cbm")
4. SHAP で特徴寄与を確認 → 削減
explainer = shap.TreeExplainer(model) # CatBoostは直接OK :contentReference[oaicite:1]{index=1} shap_vals = explainer.shap_values(X.sample(1000)) shap.summary_plot(shap_vals, X) # Notebookで確認 # 重要度が低い (<0.01) 特徴を除外 imp = np.abs(shap_vals).mean(axis=0) keep = X.columns[imp > 0.01] json.dump(keep.tolist(), open("config/feature_whitelist.json","w"))
5. 再学習 & 軽量モデル
whitelist = json.load(open("config/feature_whitelist.json")) X_reduced = X[whitelist] model_small = cb.CatBoostClassifier( depth=5, iterations=300, learning_rate=0.08, cat_features=[whitelist.index("symbol")], verbose=False, random_state=42) model_small.fit(X_reduced, y) model_small.save_model("models/cat_v2.cbm")
6. リアルタイム推論 & Shadow Mode (score_live.py
)
import catboost as cb, yaml, time, duckdb model = cb.CatBoostClassifier() model.load_model("models/cat_v2.cbm") watch_chains = yaml.safe_load(open("config.yaml"))["watch_chains"] def gen_features(event_id): # ①〜④のストリームから同等の特徴量を構築して返す ... while True: events = poll_new_events(chains=watch_chains) # 手法③が出力 for ev in events: x = gen_features(ev["id"])[whitelist] score = model.predict_proba(x)[0,1] if score > 0.65: # 適宜閾値調整 enqueue_shadow_trade(ev, score) time.sleep(1)
Shadow trade 結果 (Fill・PnL) を duckdb.table('shadow_results')
へ格納 → 次回学習で y
列を更新
7. 運用 & 継続学習パイプライン
フェーズ | スケジュール | 内容 |
---|---|---|
データ更新 | 毎 5 min | 手法①〜④ のメタテーブルに追記 |
ラベル確定 | +16 min バッチ | 15 min 後の利幅を計算し y を確定 |
モデル再学習 | 週次(月〜金累積) | 直近 90 d データで cat_vN+1 再学習 ➜ SHAP ➜ Feature Pruning |
閾値最適化 | 毎再学習 | ROC Youden / Precision‑Recall で β を決定 |
Shadow 評価 | 常時 | Top N=10 イベントを紙トレード → PnL 補正を DB に追加 |
本番投入 | 累積 PnL > 閾値 & hit率 > X% | Trader コンテナへ自動ルーティング |
MLflow や Weights & Biases の offline=True
でメタデータを残すと効果検証が楽
8. モデル運用の注意点
リスク | 対策 |
---|---|
クラス不均衡 | scale_pos_weight (CatBoost) ≈ (neg/pos) で補正 |
未来リーク | “特徴計算に後続データが混入していないか” を定期チェック |
データシフト | vol_sigma や delta_tvl_pct の分布を KS‑test → 逸脱で閾値↑ |
実装ドリフト | モデルの Feature List (JSON) と 推論側 mapping を Git hash で照合 |
9. 成果物まとめ
ファイル | 役割 |
---|---|
data/train_90d.parquet | 学習用データセット |
models/cat_v2.cbm | SHAP 後の軽量モデル |
config/feature_whitelist.json | 採用特徴リスト |
charts/shap_summary.png | 重要度可視化 |
shadow_results.duckdb | 紙トレード検証ログ |
次の拡張アイデア
* CatBoost → LightGBM GPU でレイテンシ短縮
* Meta‑feature: 「同一ウォレットの直近 24 h ROI」
* バンディット (ALG = Thompson Sampling) で 閾値 adapt
これで イベント前兆を“数字”で語るモデル基盤 が完成します。