先日、Xで大変興味深いポストを見かけました。現状、私ができることをアイデアレベルでまとめて取り組んでいるので、自身の定点観測&ロードマップとして記録しておきます。
【アビトラエッジの探し方】
DEG鯖アビトライベントが良かったので僕も少しGiveしてみる。エッジってどうやって探すの?って質問書いてる人がいたのでそのアンサーの一端を書いてみる😇・なんとか1個とっかかりを見つけよう
・見つけたとっかかりを掘ろう
・ChatGPTも活用しよう続きはスレへ📝
— くりぷとべあー🍻 (@cryptoo_bear) July 21, 2025
いまから手を動かすなら──5つの “はじめの一歩”
下に行くほど難易度UP。まず①②の 「差分ウォッチ系」 を回し、データ取得・ストレージ・可視化の土台を固めてから③〜⑤へ進む。
1. TVL 急変アラート 🔍
ねらい:プロトコル/チェーンの“資金流入超過”に最速で気づく。
- データ取り
GET https://api.llama.fi/protocolsで全プロトコルの最新 TVL と 1 d / 7 d 変化率を取得 GitHub- 24 h 毎にスナップショットを保存(SQLite → DuckDB で OK)。
- 異常検知
- 「前日比 +30 % 以上」を閾値に outlier を抽出。
- 急伸したチェーン or カテゴリ (Lending/Perps 等) を絞り込む。
- 深掘り
- 該当プロトコルの
/tvl/{protocol}でヒストリカル推移を取得し “出来高 vs TVL” の乖離をチェック api-docs.defillama.com - 出来高が伴わず TVL だけ跳ねているなら入出金サヤやステーキング・アンステーキングに伴う裁定が隠れていることが多い。
- 該当プロトコルの
2. ステーブルコイン流入/流出マップ 🌊
ねらい:資金の「入口」「出口」を視覚化し、先回りで流動性シフトを捕捉。
- チェーン別循環量
- DeFiLlama の
stablecoins/chainsダッシュボードを API で定期取得(HTML スクレイピングでも JSON 化可) DefiLlama
- DeFiLlama の
- 1 d / 7 d 差分 をヒートマップ描画し 流入上位チェーン を毎朝 Slack 通知。
- ブリッジ TVL も合わせて見ると、どのブリッジ経由で資金が動いたか推測しやすい DefiLlama
- 流入チェーン上位3つを DEX‐CEX 価格差スキャン(③で解説) の対象に追加。
3. DEX ボリューム急増 × CEX 価格乖離 レーダー ⚡
ねらい:板の薄い草トークンや新規上場銘柄で“瞬間アービトラ”を狙う。
- リアルタイム価格源
- Solana 系:Birdeye
GET /defi/history-price?address=など(無料枠あり) docs.birdeye.so - マルチチェーン:Dex Screener
GET /token-profiles/latest/v1でペア一覧と価格・流動性を取得 docs.dexscreener.com
- Solana 系:Birdeye
- CEX 価格
- Bybit / Binance の REST
v5/market/tickersなどで同一銘柄のスポット・パーペツ比を取得。
- Bybit / Binance の REST
- ロジック
|P_dex − P_cex| / P_cex > 1 %かつ5 min ボリューム > 1 σでアラート。- スプレッド検出後は 部分約定対策 として深さ 2〜3 tick までの板厚を同時チェック。
4. ブリッジ・CEX 出入口クラスタリング 🕵️♂️
ねらい:上位アービトラ勢の“行動パターン”を盗み見る。
- ウォレット抽出
- ③で拾ったイベント時刻に Birdeye / Dex Screener から Tx ハッシュ →
solscan / ethscanで呼び出し元アドレスを列挙。
- ③で拾ったイベント時刻に Birdeye / Dex Screener から Tx ハッシュ →
- CEX Deposit 追跡
- アウトバウンド Tx の
toが既知 CEX アドレスなら、その CEX の 入金用ラベル をキーにアドレスを横展開。 - CEX 側はアドレス使い回し率が高いため、クラスター化が容易。
- アウトバウンド Tx の
- ブリッジ追跡
- Tx にブリッジコントラクトが絡む場合は DeFiLlama Bridged TVL ページの 対象チェーン を手掛かりに反対側を探索。
5. “イベント前兆” の統計モデル化 📈
ねらい:①〜④で得たメタデータを特徴量に、勝率の高いパターン を回帰/クラスタリングで抽出。
- 特徴例
ΔTVL%、ΔStablecoinFlow%、DEX Volume Spike σ、Token Age、Bridged TVL Ratioなど。
- 手順
- 90 日間データを学習 → ラベルは “エッジ発生後 15 min で 0.5 % 以上利幅が取れたか”。
- ランダムフォレスト or CatBoost で SHAP 値 を確認し「効いている指標」だけ残す。
- 運用
- スコア上位 n 件をShadow Mode で紙トレードし、真の正例/負例を追加学習。
最小セットアップ例(Python)
pip install requests pandas duckdb
import requests, pandas as pd, duckdb, datetime as dt
# 1) TVL snapshot
today = dt.date.today().isoformat()
data = requests.get("https://api.llama.fi/protocols").json()
df = pd.json_normalize(data)[["name","chain","tvl","change_1d","change_7d"]]
duckdb.sql("CREATE TABLE IF NOT EXISTS tvl AS SELECT * FROM df WHERE 1=0")
duckdb.sql("INSERT INTO tvl SELECT * FROM df")
まずはこれで “毎日 TVL を溜める” パイプラインを cron で回すだけでも十分価値があります。
どこから始める?
- TVL スナップショット → 急変ヒートマップ(最速で ROI が見える)
- Stablecoin Flow ヒートマップ(資金の流入口を可視化)
- DEX/CEX スプレッドスキャナ(即金チャンス)
上記3つが動き出したら、ウォレットクラスタリング → 機械学習による予兆検知 へ発展させる。
次のアクション
| 優先度 | TODO | 目標所要 |
|---|---|---|
| ★★★ | DeFiLlama TVL Cron ジョブ & DuckDB 保存 | 0.5 d |
| ★★☆ | Stablecoin Flow 差分ヒートマップ | 1 d |
| ★★☆ | Dex Screener/Birdeye × CEX スプレッド比較スクリプト | 1 d |
| ★☆☆ | ウォレットクラスタリング PoC | 2 d |
| ★☆☆ | 統計モデルの特徴量設計 | 2 d |
「まず 1 件でいいから“数字の変化”を捕まえる」 ─ そこから洞察と改良のループを回すことがスタートライン。「アビトラbotで私の主力bot群の補完をしたり、新たな稼ぎの柱を増やしたりする」という狙いがあるので、以下のようなロードマップを大まかな指針としてマイペースにやっています。
【Phase1】TVL 急変アラート ― 実装ガイド
(Python×DuckDB×Cron+任意で Slack 通知)
0. 前提環境
| 要素 | 推奨 |
|---|---|
| OS | Linux / macOS (ARM/Intel) |
| Python | ≥ 3.10 |
| 主要ライブラリ | requests pandas duckdb pyyaml |
| デプロイ | 1 GB RAM VM で充分(スナップショットは数 MB/日) |
python -m venv tvl_env && source tvl_env/bin/activate pip install requests pandas duckdb pyyaml
1. データ取得スクリプト(fetch_tvl.py)
import requests, duckdb, datetime as dt, pandas as pd, pathlib, time
DB_PATH = pathlib.Path("data/tvl_history.duckdb")
def snapshot_today():
url = "https://api.llama.fi/protocols" # 一覧エンドポイント :contentReference[oaicite:0]{index=0}
resp = requests.get(url, timeout=30)
resp.raise_for_status()
df = pd.json_normalize(resp.json())
# 欲しい列だけ残す
keep = ["id", "name", "chain", "category", "tvl", "change_1d", "change_7d"]
df = df[keep].assign(snapshot_date=dt.date.today().isoformat())
with duckdb.connect(DB_PATH) as con:
con.sql("""
CREATE TABLE IF NOT EXISTS tvl_snapshots AS
SELECT * FROM df WHERE 1=0
""")
con.register("df", df)
con.sql("INSERT INTO tvl_snapshots SELECT * FROM df")
print("✅ Saved", len(df), "rows")
if __name__ == "__main__":
t0 = time.time()
snapshot_today()
print("⏱️", round(time.time() - t0, 2), "s")
ポイント
- API は無認証・無料枠(現行
/protocolsは ~4 000 B/req) - スナップショットは 毎日 1 レコード × プロトコル数 → 1 年でも <30 MB
idはドキュメント非公開のことが多いため、nameで JOIN しても OK(ただし重複に注意)
2. スケジューリング
Cron(ローカル/EC2 いずれでも)
0 3 * * * /path/to/tvl_env/bin/python /path/to/fetch_tvl.py >> /var/log/tvl_fetch.log 2>&1
- UTC 0 3:00 ≒ 日本時間 12:00 を例示。API 制限ほぼ無しだが混雑時間帯は避けると安心。
- GitHub Actions で回す場合は
on: schedule: cron: '0 3 * * *'で同様。
3. 異常検知クエリ(detect_outliers.py)
import duckdb, datetime as dt
THRESHOLD = 30 # +30 % 以上
LOOKBACK = 1 # 前日比
with duckdb.connect("data/tvl_history.duckdb") as con:
today = dt.date.today().isoformat()
prev = (dt.date.today() - dt.timedelta(days=LOOKBACK)).isoformat()
query = f"""
WITH today AS (
SELECT id, name, chain, category, tvl
FROM tvl_snapshots WHERE snapshot_date = '{today}'
), prev AS (
SELECT id, tvl AS tvl_prev
FROM tvl_snapshots WHERE snapshot_date = '{prev}'
)
SELECT t.name, t.chain, t.category,
ROUND( (t.tvl - p.tvl_prev)*100.0/p.tvl_prev, 2 ) AS pct_change,
t.tvl
FROM today t
JOIN prev p USING(id)
WHERE (t.tvl - p.tvl_prev)*100.0/p.tvl_prev >= {THRESHOLD}
ORDER BY pct_change DESC
"""
alerts = con.sql(query).df()
*返り値 alerts が空でなければ急伸銘柄。
フォールバック:前日データが無いときは COALESCE で 0 割り込みを防ぐ。
Slack 通知(任意)
import os, json, requests
if not alerts.empty:
text = "*🚨 TVL spike alert*\n" + alerts.to_markdown(index=False)
requests.post(
os.environ["SLACK_WEBHOOK"],
headers={"Content-Type": "application/json"},
data=json.dumps({"text": text})
)
4. 急伸チェーン/カテゴリでフィルタリング
agg = alerts.groupby(["chain","category"]).size().reset_index(name="count")
print(agg.sort_values("count", ascending=False).head())
- ここで Lending / Perps など特定カテゴリが固まって急増 → “狙い目”。
5. 深掘り:ヒストリカル TVL 取得
import requests, pandas as pd, matplotlib.pyplot as plt, datetime as dt
slug = "abracadabra" # 例:急伸プロトコル名
hist = requests.get(f"https://api.llama.fi/tvl/{slug}").json() # :contentReference[oaicite:1]{index=1}
df = pd.DataFrame(hist["tvl"])
df["date"] = pd.to_datetime(df["date"], unit="s")
df.plot(x="date", y="totalLiquidityUSD", title=f"TVL History – {slug}")
plt.show()
「出来高 vs TVL」 乖離チェック
- 出来高は
/summary/dexs/{protocolSlug}など Volume API を利用 GitHub - 直近 24 h TVL 比で Volume/TVL < 5 % なら 純入金 > 取引 の可能性
- 取引伴わず TVL だけ膨張 → ブリッジ入金 or ステーキング移行 の匂い
6. ディレクトリ構成例
tvl_monitor/ ├── data/ # DuckDB ファイル ├── fetch_tvl.py ├── detect_outliers.py ├── charts/ # 画像出力 (optional) ├── .env # SLACK_WEBHOOK 等 └── cron.txt # crontab 用
7. ハードニング & 運用 Tips
| 項目 | 推奨 |
|---|---|
| API 落ち対策 | try/except+リトライ (time.sleep(5)) |
| DB VACUUM | 30 d に一度 DuckDB の optimize |
| テーブル肥大化 | tvl_snapshots をパーティション (snapshot_date 列) で Z-order |
| 閾値チューニング | 四分位距離 IQR や Z-score で自動補正にすると β→本番 の移行が楽 |
8. 次のステップ
- Stablecoin 流入ヒートマップ(手法 2)
- DEX/CEX スプレッドスキャナ(手法 3)
- 検知→自動バックテスト Pipeline(手法 5 への布石)
“まず毎日 1 枚のスナップショットを貯め始める”―ここから エッジの霧 が晴れていく。
【Phase2】ステーブルコイン流入/流出マップ ─ 実装ステップ
(Python × DuckDB × matplotlib + Slack 通知)
ゴール
- チェーン別のステーブルコイン循環量を毎日スナップショット → 1 d/7 d 差分をヒートマップ化
- 流入上位チェーンを朝イチで Slack 通知
- 併せてブリッジ TVL を照合し、どのブリッジ経由で資金が動いたかを推測
- 上位 3 チェーンを ③ DEX‑CEX スプレッドスキャナ の監視対象に自動追加
0. 事前セットアップ
python -m venv stable_env && source stable_env/bin/activate pip install requests pandas duckdb matplotlib pyyaml slack_sdk
| 依存 | 推奨バージョン |
|---|---|
| Python | ≥ 3.10 |
| DuckDB | 0.9 系以降(pip install duckdb==0.10.0 でも可) |
1. データ取得 (fetch_stables.py)
# coding: utf-8
"""
毎日 1 回、各チェーンのステーブルコイン循環量を取得して DuckDB に追記
"""
import requests, pandas as pd, duckdb, datetime as dt, pathlib, time
DB = pathlib.Path("data/stables.duckdb")
URL = "https://stablecoins.llama.fi/stablecoinchains" # JSON:チェーン別循環量
TODAY = dt.date.today().isoformat()
def main():
js = requests.get(URL, timeout=30).json() # ← 無認証・無料エンドポイント
df = pd.DataFrame(js["chains"]) # key: chains
keep = ["name","peggedUsd","change_1h","change_1d","change_7d"]
df = df[keep].assign(snapshot_date=TODAY)
with duckdb.connect(DB) as con:
con.sql("""CREATE TABLE IF NOT EXISTS stable_snaps AS
SELECT * FROM df WHERE 1=0""")
con.register("df", df)
con.sql("INSERT INTO stable_snaps SELECT * FROM df")
print(f"✅ {len(df)} rows saved for {TODAY}")
if __name__ == "__main__":
t0 = time.time(); main(); print("⏱️", round(time.time()-t0,2), "s")
チェーン別循環量は DefiLlama ダッシュボード “Stablecoins > Chains” で公開中 DefiLlama
2. 差分計算+ヒートマップ (diff_heatmap.py)
import duckdb, pandas as pd, seaborn as sns, matplotlib.pyplot as plt, datetime as dt
DB = "data/stables.duckdb"
today = dt.date.today()
prev1 = today - dt.timedelta(days=1)
prev7 = today - dt.timedelta(days=7)
query = f"""
WITH t AS (SELECT name, peggedUsd FROM stable_snaps
WHERE snapshot_date = '{today.isoformat()}'),
y1 AS (SELECT name, peggedUsd AS usd_1d FROM stable_snaps
WHERE snapshot_date = '{prev1.isoformat()}'),
y7 AS (SELECT name, peggedUsd AS usd_7d FROM stable_snaps
WHERE snapshot_date = '{prev7.isoformat()}')
SELECT t.name,
ROUND(100*(t.peggedUsd - y1.usd_1d)/y1.usd_1d ,2) AS pct_1d,
ROUND(100*(t.peggedUsd - y7.usd_7d)/y7.usd_7d ,2) AS pct_7d
FROM t JOIN y1 USING(name) JOIN y7 USING(name)
"""
df = duckdb.sql(query).df().set_index("name")
top = df.sort_values("pct_1d", ascending=False).head(10) # 通知用
# ヒートマップ
plt.figure(figsize=(10,6))
sns.heatmap(df[["pct_1d","pct_7d"]], annot=False, center=0)
plt.title(f"Stablecoin Flow % Change ({today})")
plt.xticks(rotation=0)
plt.tight_layout()
plt.savefig(f"charts/stable_heat_{today}.png")
Slack 通知 (任意)
from slack_sdk.webhook import WebhookClient, SlackWebhookError
hook = WebhookClient("https://hooks.slack.com/services/XXX/YYY/ZZZ")
text = "*Stablecoin inflow top 5 (1d %)*\n" + top.head(5).to_markdown()
hook.send(text=text)
3. ブリッジTVL照合 (bridge_lookup.py)
import requests, pandas as pd, duckdb
BRIDGE_URL = "https://bridges.llama.fi/bridges?excludeTotal=false"
br = pd.DataFrame(requests.get(BRIDGE_URL).json()["bridges"])
# ① 上位 inflow チェーン
chains = duckdb.sql("SELECT name FROM stable_snaps WHERE snapshot_date = CURRENT_DATE ORDER BY peggedUsd DESC LIMIT 3").df()["name"].tolist()
# ② 各チェーンで TVL が増えたブリッジ抽出
res = (br[br["chain"].isin(chains)]
.sort_values("change_1d", ascending=False)
.head(10)[["displayName","chain","change_1d","tvl"]])
print(res)
ブリッジ TVL ダッシュボードは /bridges & /bridges/chains で公開 DefiLlama
4. ③ DEX‑CEX スキャナへの自動連携
# config.yaml などに監視チェーンを書き出す
import yaml
cfg = dict(watch_chains=chains)
yaml.safe_dump(cfg, open("config/dex_cex_chains.yaml","w"))
スキャナ側では watch_chains を読んで
各チェーンの上位トークンペアを監視 → スプレッドが 1 % 超えたらアラート、の流れで OK。
5. スケジュール例(cron)
# ❶ 03:05 JST でスナップショット 5 3 * * * /path/stable_env/bin/python /path/fetch_stables.py # ❷ 03:10 JST で差分計算&ヒートマップ&通知 10 3 * * * /path/stable_env/bin/python /path/diff_heatmap.py # ❸ 03:20 JST でブリッジ照合+DEX‑CEX 設定更新 20 3 * * * /path/stable_env/bin/python /path/bridge_lookup.py
6. ディレクトリ構成イメージ
stable_flow_monitor/ ├── data/ # DuckDB ├── charts/ # PNG Heatmaps ├── config/ │ └── dex_cex_chains.yaml ├── fetch_stables.py ├── diff_heatmap.py ├── bridge_lookup.py └── .env # SLACK_WEBHOOK など
運用 Tips
| チェックポイント | 補足 |
|---|---|
| API 落ち | requests.get(..., timeout=30) + for _ in range(3): retry |
| Missing data | 祝日などに 0 レコードになる場合、前日値を ffill() で補完 |
| 色覚対応ヒートマップ | sns.heatmap(..., cmap="coolwarm") などで調整 |
| 閾値チューニング | 30 % → 四分位距離 IQR ベースの動的閾値にしても良い |
次にやると良いこと
- イン/アウト額を “USD 絶対値” でもランキング
- 1 h 変化率で intra‑day 速報(ガス代高騰で USDC が逃げる瞬間など)
- 流入チェーンのブリッジ手数料 × ガス代と併せて ROI 見積もり
- 出来高低迷チェーンなのに資金だけ流入 → “次のエアドロ” マーク
これで 「資金の入口/出口を毎朝可視化 → 即トレード候補へ回す」 ための最小パイプラインが完成します。
【Phase3】DEX ボリューム急増 × CEX 価格乖離レーダー ─ 実装フルガイド ⚡
狙い
板の薄い草トークン/新規上場銘柄で「DEX が盛り上がるが CEX がまだ追随していない」瞬間を検知してアービトラを仕掛ける。データフロー DEX Price & Volume → 正規化 → CEX Price → 乖離&ボリューム検知 → オーダーブック深さチェック → 通知 & Bot へルーティング
0. 必須ライブラリ
pip install requests websockets aiohttp pandas numpy duckdb pyyaml slack_sdk
| モジュール | 用途 |
|---|---|
requests / aiohttp | REST & WS ※Birdeye は WS も可 |
pandas + numpy | 5 min ローリング σ 計算 |
duckdb | 軽量ストア(出来高ベースライン保持) |
slack_sdk | アラート送信 |
1. データソース
| 種別 | エンドポイント | 備考 |
|---|---|---|
| Solana DEX | https://public-api.birdeye.so/defi/history_price?address=<TOKEN>&from=<ts>&to=<ts> (Price, Volume) docs.birdeye.so | X-API-KEY ヘッダ必須・Free 枠あり |
| マルチチェーン DEX | https://api.dexscreener.com/token-profiles/latest/v1(Pair list & metadata) docs.dexscreener.com + latest/dex/pairs/{chainId}/{pairId}(ticker & 24 h vol) | 無料・60 r/m |
| CEX Bybit | GET /v5/market/tickers?category=spot&symbol=XXXX or linear bybit-exchange.github.io | 公開 API・key 不要 |
| CEX Binance | GET /api/v3/ticker/price?symbol=XXXXUSDT(スポット) binance-docs.github.io | 公開 API |
| Orderbook depth | Bybit /v5/market/orderbook?symbol=XXXX&limit=50 Spot / Perps bybit-exchange.github.io | 50 level で十分 |
Binance /api/v3/depth?symbol=XXXXUSDT&limit=100 |
トークンアドレス ↔ シンボルのマッピング
DexScreener のbaseToken.symbolを優先 → 失敗時は独自 YAML を上書き。
2. アーキテクチャ
flowchart TD
%% nodes
DEX[DEX Feeder]
CEX[CEX Feeder]
NORM["Normalizer (symbol, price, volume)"]
SPREAD[Spread Engine]
DEPTH[Depth Check]
ACTION["Slack / Bot Action"]
%% edges
DEX -- poll/websocket --> NORM
NORM -- join on symbol --> SPREAD
CEX -- REST 1 s --> SPREAD
SPREAD -- spread > 1% and vol > sigma --> DEPTH
DEPTH -- depth ok --> ACTION
3. コアロジック (spread_watch.py 抜粋)
import time, asyncio, requests, pandas as pd, numpy as np
from slack_sdk.webhook import WebhookClient
SPREAD = 0.01 # 1 %
VOL_LOOKBACK = 12 # 12 * 5 s = 1 min 例
alert_hook = WebhookClient("<SLACK_WEBHOOK_URL>")
# ---- tiny rolling buffer for volume baseline ----
class VolBaseline:
def __init__(self, size=60): # 5 s × 60 = 5 min
self.size = size
self.vols = []
def update(self, v):
self.vols.append(v)
if len(self.vols) > self.size:
self.vols.pop(0)
return np.std(self.vols) if len(self.vols) > 10 else 0
vol_map = {} # symbol -> VolBaseline
async def poll_dex(symbol, pair_id, chain_id):
while True:
j = requests.get(f"https://api.dexscreener.com/latest/dex/pairs/{chain_id}/{pair_id}",
timeout=5).json()
p = float(j["pairs"][0]["priceUsd"])
v = float(j["pairs"][0]["volume"]["h24"]) / 288 # 5 min ≒ 1/288
ts = time.time()
yield {"symbol": symbol, "price": p, "vol5": v, "ts": ts}
await asyncio.sleep(5) # 5 s poll
async def poll_cex(symbol):
while True:
j = requests.get("https://api.bybit.com/v5/market/tickers",
params={"category":"spot","symbol":symbol},
timeout=5).json()
p = float(j["result"]["list"][0]["lastPrice"])
ts = time.time()
yield {"symbol": symbol, "price": p, "ts": ts}
await asyncio.sleep(5)
async def watcher():
dex_q, cex_q = asyncio.Queue(), asyncio.Queue()
# 例:WEN‐USDC(Solana)を監視
asyncio.create_task(poll_dex("WENUSDC","8jwv...pairAddr","solana"))
asyncio.create_task(poll_cex("WENUSDC"))
books = {} # latest prices
while True:
for q in (dex_q, cex_q):
if not q.empty():
d = q.get_nowait()
key = (d["symbol"], "dex" if "vol5" in d else "cex")
books[key] = d
for sym in set(k[0] for k in books):
try:
dex = books[(sym,"dex")]; cex = books[(sym,"cex")]
except KeyError: continue
spread = abs(dex["price"]-cex["price"])/cex["price"]
std = vol_map.setdefault(sym, VolBaseline()).update(dex["vol5"])
if spread > SPREAD and dex["vol5"] > std:
if await depth_ok(sym, dex["price"]):
alert_hook.send(text=f"*ARBITRA* {sym}\n"
f"DEX ${dex['price']:.4f} vs CEX ${cex['price']:.4f}"
f" | Spread {spread*100:.2f}%\n"
f"5mVol {dex['vol5']:.2f} > σ {std:.2f}")
await asyncio.sleep(1)
注目ポイント
- Volume の基準値:直近 5 分の rolling σ。草トークンは閾値を上げ過ぎない。
- Latency:poll 5 s 例。WebSocket (
wss://public-api.birdeye.so/socket) への置き換えで sub‑sec 化可 docs.birdeye.so - Spread 1 %:流動性が薄いほど ask–bid 開きを埋めるコストが大きいので 1 % 以上を推奨。
4. 部分約定対策 — 深さ 2〜3 tick の板厚
def depth_ok(symbol, p_dex, max_slippage=0.005):
# Bybit depth
ob = requests.get("https://api.bybit.com/v5/market/orderbook",
params={"symbol":symbol,"limit":50}, timeout=5).json()
asks = [(float(pr), float(sz)) for pr,sz,*_ in ob["result"]["a"][:3]]
size_needed = 500 # USD
cost = 0
for pr,sz in asks:
slice_sz = min(sz, size_needed/(pr))
cost += slice_sz*pr
size_needed -= slice_sz*pr
if size_needed <= 0:
break
slip = (cost/(500) - p_dex)/p_dex
return slip <= max_slippage
DEX 側はオーダーブック API が無い場合が多いので 流動性 (liquidityUSD) で代用。
5. ハイパーパラメータ & 運用
| 変数 | 初期値 | チューニング指針 |
|---|---|---|
SPREAD | 0.01 (1 %) | 草コインなら 2〜3 % でもヒットする |
VOL_LOOKBACK | 60 (5 min) | 暑い相場で 30、閑散なら 120 |
depth slippage | 0.5 % | CEX 側 Maker Rebate を考慮して調整 |
- False‑positive 減:
spread & volの二重条件+depth_okでほぼ撲滅。 - Rate Limit:DexScreener 60 r/m、Birdeye
/history_price100 r/s (だが API Key枠) → 5 s poll × 20 pair で安全。 - Time Sync:秒単位の Arbi は NTP sync 推奨 (
chrony).
6. ディレクトリ例
dex_cex_spread/ ├── data/volume.duckdb ├── src/ │ ├── feeders.py │ ├── spread_watch.py │ └── depth_check.py ├── config/tokens.yaml ├── charts/ # optional可視化 └── .env # API keys
7. 次フェーズ🚀
- Multi‑thread / asyncio gather:50 pair 同時監視でも CPU < 20 %.
- Cross‑DEX Spread:同チェーン DEX‑間 乖離も同ロジックで横展開。
- Auto‑Hedge Bot:Slack → Function URL → Trader コンテナへ POST。
- ML Ranking:前手法 2 の Stable Flow 指標を特徴量に統合し “事前スコア” を付与。
まとめ
Birdeye・DexScreener → DEX 価格 / Volume、Bybit・Binance → CEX 価格 を同時ストリーム。Spread > 1 % & Vol(5 m) > σ を満たしたら orderbook 深さチェック → 通知/自動エントリ。
これで 「誰よりも早く気づいて、でも無駄撃ちせずに入る」 最小アビトラ・レーダーが完成します。
【Phase4】ブリッジ/CEX 出入口クラスタリング 🕵️♂️― 実装ロードマップ
目的
① DEX⇄CEX アービトラの主戦プレイヤーを特定し、② 彼らの資金移動ルート(ブリッジ・入金先)をグラフで可視化、③ 再現性の高い「次の動き」を予測できるデータセットを得る。
1. 必要なデータソース & 前準備
| 区分 | エンドポイント/データ | 用途 |
|---|---|---|
| DEX 取引詳細 | Birdeye:/defi/transactions?address=<TOKEN>&from=<ts>&to=<ts>(Solana) ※WS も可 docs.birdeye.so | 該当時刻の Tx ハッシュ取得 |
Dex Screener:/token-profiles/latest/v1 → pairId を取得し /latest/dex/pairs/{chainId}/{pairId} で Tx リスト docs.dexscreener.com | ||
| チェーン Explorer | Solscan API / Etherscan module=account&action=txlist&address= でフル Tx データ Etherscan | from / to / Logs 抽出 |
| CEX ラベル | 自前 JSON(Binance/Bybit/OKX 等ホット & Deposit アドレス)+公開リスト Binance | Deposit 判定キー |
| ブリッジ辞書 | https://defillama.com/bridges + API /bridge 一覧で contract ↔ dest chain マップ DefiLlama | |
| 保管 DB | DuckDB(軽量) or Postgres / Neo4j(後でグラフ分析するなら) |
2. ワークフロー全体像
flowchart TD
%% Nodes
EVT[Event step3 timestamp]
S1["(1) DEX Tx hash 取得"]
S2["(2) Chain Tx 詳細取得"]
S3["(3) CEX or Bridge 判定"]
S4["(4) Address graph 生成"]
S5["(5) Clustering and visualization"]
%% Edges
EVT --> S1
S1 --> S2
S1 --> S3
S2 --> S4
S3 --> S4
S4 --> S5
3. ステップ別実装詳細
3‑1. DEX Tx ハッシュ取得
import requests, time
def get_dex_txs_sol(token_addr, t0, t1):
url = ("https://public-api.birdeye.so/defi/transactions"
f"?address={token_addr}&from={t0}&to={t1}&limit=500")
j = requests.get(url, headers={"X-API-KEY": "..."}).json()
return [tx["txHash"] for tx in j["data"]["transactions"]]
t0 / t1:イベント時刻±90 s が目安。
3‑2. チェーン Tx 詳細取得
import aiohttp, asyncio
ETHERSCAN_KEY="..."
async def fetch_tx_eth(txhash):
url=("https://api.etherscan.io/api"
f"?module=proxy&action=eth_getTransactionByHash&txhash={txhash}"
f"&apikey={ETHERSCAN_KEY}")
async with aiohttp.ClientSession() as s:
async with s.get(url,timeout=10) as r:
j=await r.json(); return j["result"]
取るべきフィールド
{
"hash": "...",
"from": "0x...", // 発信者
"to": "0x...", // コントラクト or EO
"value": "0x...", // Native amount
"input": "0x...", // ブリッジ判定に使う
"logs": [...] // ブリッジ転送 or Deposit Event
}
3‑3. CEX Deposit / ブリッジ判定
CEX_MAP = json.load(open("cex_labels.json")) # addr → {"cex":"Binance", ...}
BRIDGE_MAP = json.load(open("bridge_contracts.json")) # addr → {"bridge":"Wormhole","dst":"Solana"}
def classify(tx):
if tx["to"] in CEX_MAP:
tag = ("CEX", CEX_MAP[tx["to"]]["cex"])
elif tx["to"] in BRIDGE_MAP:
dst = BRIDGE_MAP[tx["to"]]["dst"]
tag = ("BRIDGE", dst)
else:
tag = ("OTHER", None)
return tag
Bridging の場合、ログ内に destinationChainId や recipient があるので抽出して「反対側アドレス」を追跡 queue に追加。
3‑4. グラフ構築 & クラスタリング
import networkx as nx, duckdb
G = nx.MultiDiGraph()
for tx in parsed_txs:
src = tx["from"]; dst = tx["to"]; ttag, meta = classify(tx)
G.add_edge(src, dst, label=ttag, ts=tx["timestamp"], meta=meta)
# ❶ CEX Deposit クラスタ → 入金アドレス単位で Union‑Find
cex_clusters = {}
for u,v,d in G.edges(data=True):
if d["label"] == "CEX":
cex_addr = v # deposit
cex_clusters.setdefault(cex_addr, set()).add(u)
# ❷ Bridge ペア → src ↔ dstAddress を 2‑hop 合流で束ねる
for u,v,d in G.edges(data=True):
if d["label"] == "BRIDGE":
dst_addr = d["meta"]["dst_wallet"]
G.add_edge(u, dst_addr, label="BRIDGED_TO", ts=d["ts"])
# Louvain / Connected Components
clusters =
3‑5. 可視化 & レポート
import matplotlib.pyplot as plt, networkx as nx
for i,cl in enumerate(clusters):
sub = G.subgraph(cl)
nx.draw(sub, with_labels=False, node_size=50, edge_color="grey")
plt.title(f"Cluster {i} ({len(sub.nodes())} wallets)")
plt.savefig(f"charts/cluster_{i}.png")
Slack 送付:slack_sdk で files.upload.
4. 実運用 Tips
| テーマ | 推奨 |
|---|---|
| CEX アドレス更新 | DeBank / Arkham / Chainabuse が公開する “Exchange Tag JSON” を月次で pull → diff |
| 新ブリッジ対応 | DeFiLlama /bridge エンドポイントを毎週 Snapshot → 新規 contract が増えたら自動追登録 DefiLlama |
| Gas & Rate Limit | RPC→Alchemy/Infura の batch & multicall でコスト削減。Etherscan free は 5 r/sec → backoff。 |
| ミラー入金 | 同一 Tx 内で memo/tag を持つ CEX (XRP/ADA 等) は Memo ID までキーにすると誤結合を防げる。 |
| グラフ DB | Neo4j+Bloom を使うと WebGUI で資金ルートのアニメ再生が可能。 |
5. 次ステップ
- 時間窓集計:クラスタごとに「PoC → Bridge → CEX」完了までの平均レイテンシを計測。
- 行動シグネチャ:入金額・ブリッジ先・チェーン滞在時間を特徴量に k‑Means → “高速狩人” vs “遅延リスクヘッジャ” を分類。
- リアルタイム Hook:③のスプレッド検知→該当ウォレットが出現→“Follow 取引” を自動トリガー。
これで 「イベント検知 → アビトラ勢ウォッチ → 出入口クラスタリング」 の基盤が完成します。
【Phase5】“イベント前兆” モデル化 📈 — フル実装ガイド
ゴールの再確認
* 90 日分のイベントメタデータ (手法 ①〜④) を学習
* ラベル = 「検知から 15 分以内に 0.5 % 以上の利幅 (gross edge) が取れたか」<br>⇒ 2 値分類
* CatBoost or RandomForest を使い、SHAP で効いている特徴を確認 → モデル縮小
* 推論スコア上位 N 件を Shadow Mode で紙トレードし、結果を再学習にフィードバック
0. ディレクトリ雛形
edge_model/ ├── data/ # DuckDBファイル & parquet │ ├── events.duckdb # ①〜④で生成 │ └── price_ticks.parquet ├── notebooks/ ├── src/ │ ├── build_dataset.py │ ├── train_model.py │ └── score_live.py ├── models/ │ └── cat_v1.cbm └── config.yaml
DuckDB & Parquet で I/O 軽量化、モデルは catboost.CatBoostClassifier を保存
1. イベント & ラベルテーブル を作る
-- events.duckdb に集約済みと仮定
CREATE OR REPLACE TABLE edge_events AS
SELECT event_id,
ts_event, -- 秒精度
symbol,
-- 特徴例
delta_tvl_pct,
delta_stable_pct,
vol_sigma,
token_age_days,
bridged_tvl_ratio
FROM tvl_snap -- ← 手法①
JOIN stable_snaps USING(snapshot_date)
JOIN dex_vol_snaps USING(symbol, snapshot_date)
JOIN bridge_stats USING(symbol, snapshot_date);
1‑A. ラベル付け
CREATE OR REPLACE TABLE labels AS
SELECT e.event_id,
CASE WHEN abs(p15.price - p0.price)/p0.price >= 0.005
THEN 1 ELSE 0 END AS y
FROM edge_events e
LEFT JOIN LATERAL (
SELECT price FROM price_ticks
WHERE symbol = e.symbol AND ts BETWEEN e.ts_event AND e.ts_event + 60
ORDER BY ts LIMIT 1) p0
LEFT JOIN LATERAL (
SELECT price FROM price_ticks
WHERE symbol = e.symbol AND ts BETWEEN e.ts_event+900 AND e.ts_event+960
ORDER BY ts LIMIT 1) p15
15 min 後 (±60 s バッファ) の価格で 0.5 % 以上 動いていれば y=1
2. データセット生成スクリプト (build_dataset.py)
import duckdb, pandas as pd, pathlib
con = duckdb.connect("data/events.duckdb")
df = con.sql("""
SELECT e.*, l.y
FROM edge_events e JOIN labels l USING(event_id)
WHERE ts_event >= date '2025-04-24' - INTERVAL 90 DAY
""").df()
# カテゴリ→string、欠損→0、対数変換など
cat_cols = ["symbol"]
num_cols =
df[num_cols] = df[num_cols].fillna(0).astype("float32")
df.to_parquet("data/train_90d.parquet", index=False)
3. 時系列ウォークフォワードCV で検証
“未来リーク” を防ぐため TimeSeriesSplit / Walk‑Forward を利用 Kaggle
from sklearn.model_selection import TimeSeriesSplit
import pandas as pd, numpy as np, catboost as cb, shap, joblib, json
df = pd.read_parquet("data/train_90d.parquet")
X = df.drop(columns=["y","event_id","ts_event"])
y = df["y"]
cv = TimeSeriesSplit(n_splits=5, test_size= int(len(df)*0.1))
scores = []
for i, (tr, te) in enumerate(cv.split(X)):
model = cb.CatBoostClassifier(
depth=6, learning_rate=0.05, iterations=400,
cat_features=[X.columns.get_loc(c) for c in ["symbol"]],
eval_metric="AUC", verbose=False, random_state=42
)
model.fit(X.iloc[tr], y.iloc[tr], eval_set=(X.iloc[te], y.iloc[te]), verbose=False)
auc = model.get_best_score()["validation"]["AUC"]
scores.append(auc)
print("Walk‑forward AUC:", np.mean(scores).round(3))
model.save_model("models/cat_v1.cbm")
4. SHAP で特徴寄与を確認 → 削減
explainer = shap.TreeExplainer(model) # CatBoostは直接OK :contentReference[oaicite:1]{index=1}
shap_vals = explainer.shap_values(X.sample(1000))
shap.summary_plot(shap_vals, X) # Notebookで確認
# 重要度が低い (<0.01) 特徴を除外
imp = np.abs(shap_vals).mean(axis=0)
keep = X.columns[imp > 0.01]
json.dump(keep.tolist(), open("config/feature_whitelist.json","w"))
5. 再学習 & 軽量モデル
whitelist = json.load(open("config/feature_whitelist.json"))
X_reduced = X[whitelist]
model_small = cb.CatBoostClassifier(
depth=5, iterations=300, learning_rate=0.08,
cat_features=[whitelist.index("symbol")],
verbose=False, random_state=42)
model_small.fit(X_reduced, y)
model_small.save_model("models/cat_v2.cbm")
6. リアルタイム推論 & Shadow Mode (score_live.py)
import catboost as cb, yaml, time, duckdb
model = cb.CatBoostClassifier()
model.load_model("models/cat_v2.cbm")
watch_chains = yaml.safe_load(open("config.yaml"))["watch_chains"]
def gen_features(event_id):
# ①〜④のストリームから同等の特徴量を構築して返す
...
while True:
events = poll_new_events(chains=watch_chains) # 手法③が出力
for ev in events:
x = gen_features(ev["id"])[whitelist]
score = model.predict_proba(x)[0,1]
if score > 0.65: # 適宜閾値調整
enqueue_shadow_trade(ev, score)
time.sleep(1)
Shadow trade 結果 (Fill・PnL) を duckdb.table('shadow_results') へ格納 → 次回学習で y 列を更新
7. 運用 & 継続学習パイプライン
| フェーズ | スケジュール | 内容 |
|---|---|---|
| データ更新 | 毎 5 min | 手法①〜④ のメタテーブルに追記 |
| ラベル確定 | +16 min バッチ | 15 min 後の利幅を計算し y を確定 |
| モデル再学習 | 週次(月〜金累積) | 直近 90 d データで cat_vN+1 再学習 ➜ SHAP ➜ Feature Pruning |
| 閾値最適化 | 毎再学習 | ROC Youden / Precision‑Recall で β を決定 |
| Shadow 評価 | 常時 | Top N=10 イベントを紙トレード → PnL 補正を DB に追加 |
| 本番投入 | 累積 PnL > 閾値 & hit率 > X% | Trader コンテナへ自動ルーティング |
MLflow や Weights & Biases の offline=True でメタデータを残すと効果検証が楽
8. モデル運用の注意点
| リスク | 対策 |
|---|---|
| クラス不均衡 | scale_pos_weight (CatBoost) ≈ (neg/pos) で補正 |
| 未来リーク | “特徴計算に後続データが混入していないか” を定期チェック |
| データシフト | vol_sigma や delta_tvl_pct の分布を KS‑test → 逸脱で閾値↑ |
| 実装ドリフト | モデルの Feature List (JSON) と 推論側 mapping を Git hash で照合 |
9. 成果物まとめ
| ファイル | 役割 |
|---|---|
data/train_90d.parquet | 学習用データセット |
models/cat_v2.cbm | SHAP 後の軽量モデル |
config/feature_whitelist.json | 採用特徴リスト |
charts/shap_summary.png | 重要度可視化 |
shadow_results.duckdb | 紙トレード検証ログ |
次の拡張アイデア
* CatBoost → LightGBM GPU でレイテンシ短縮
* Meta‑feature: 「同一ウォレットの直近 24 h ROI」
* バンディット (ALG = Thompson Sampling) で 閾値 adapt
これで イベント前兆を“数字”で語るモデル基盤 が完成します。