Bot CEX DEX 戦略ログ 開発ログ

🛠️開発記録#267(2025/7/24)「アビトラエッジの探し方」を読んでからやっていることまとめ

2025年7月24日

先日、Xで大変興味深いポストを見かけました。現状、私ができることをアイデアレベルでまとめて取り組んでいるので、自身の定点観測&ロードマップとして記録しておきます。

いまから手を動かすなら──5つの “はじめの一歩”

下に行くほど難易度UP。まず①②の 「差分ウォッチ系」 を回し、データ取得・ストレージ・可視化の土台を固めてから③〜⑤へ進む。


1. TVL 急変アラート 🔍

ねらい:プロトコル/チェーンの“資金流入超過”に最速で気づく。

  1. データ取り
    • GET https://api.llama.fi/protocols で全プロトコルの最新 TVL と 1 d / 7 d 変化率を取得 GitHub
    • 24 h 毎にスナップショットを保存(SQLite → DuckDB で OK)。
  2. 異常検知
    • 「前日比 +30 % 以上」を閾値に outlier を抽出。
    • 急伸したチェーン or カテゴリ (Lending/Perps 等) を絞り込む。
  3. 深掘り
    • 該当プロトコルの /tvl/{protocol} でヒストリカル推移を取得し “出来高 vs TVL” の乖離をチェック api-docs.defillama.com
    • 出来高が伴わず TVL だけ跳ねているなら入出金サヤやステーキング・アンステーキングに伴う裁定が隠れていることが多い。

2. ステーブルコイン流入/流出マップ 🌊

ねらい:資金の「入口」「出口」を視覚化し、先回りで流動性シフトを捕捉。

  1. チェーン別循環量
    • DeFiLlama の stablecoins/chains ダッシュボードを API で定期取得(HTML スクレイピングでも JSON 化可) DefiLlama
  2. 1 d / 7 d 差分 をヒートマップ描画し 流入上位チェーン を毎朝 Slack 通知。
  3. ブリッジ TVL も合わせて見ると、どのブリッジ経由で資金が動いたか推測しやすい DefiLlama
  4. 流入チェーン上位3つを DEX‐CEX 価格差スキャン(③で解説) の対象に追加。

3. DEX ボリューム急増 × CEX 価格乖離 レーダー ⚡

ねらい:板の薄い草トークンや新規上場銘柄で“瞬間アービトラ”を狙う。

  1. リアルタイム価格源
    • Solana 系:Birdeye GET /defi/history-price?address= など(無料枠あり) docs.birdeye.so
    • マルチチェーン:Dex Screener GET /token-profiles/latest/v1 でペア一覧と価格・流動性を取得 docs.dexscreener.com
  2. CEX 価格
    • Bybit / Binance の REST v5/market/tickers などで同一銘柄のスポット・パーペツ比を取得。
  3. ロジック
    • |P_dex − P_cex| / P_cex > 1 % かつ 5 min ボリューム > 1 σ でアラート。
    • スプレッド検出後は 部分約定対策 として深さ 2〜3 tick までの板厚を同時チェック。

4. ブリッジ・CEX 出入口クラスタリング 🕵️‍♂️

ねらい:上位アービトラ勢の“行動パターン”を盗み見る。

  1. ウォレット抽出
    • ③で拾ったイベント時刻に Birdeye / Dex Screener から Tx ハッシュ → solscan / ethscan で呼び出し元アドレスを列挙。
  2. CEX Deposit 追跡
    • アウトバウンド Tx の to が既知 CEX アドレスなら、その CEX の 入金用ラベル をキーにアドレスを横展開。
    • CEX 側はアドレス使い回し率が高いため、クラスター化が容易。
  3. ブリッジ追跡
    • Tx にブリッジコントラクトが絡む場合は DeFiLlama Bridged TVL ページの 対象チェーン を手掛かりに反対側を探索。

5. “イベント前兆” の統計モデル化 📈

ねらい:①〜④で得たメタデータを特徴量に、勝率の高いパターン を回帰/クラスタリングで抽出。

  1. 特徴例
    • ΔTVL%ΔStablecoinFlow%DEX Volume Spike σToken AgeBridged TVL Ratio など。
  2. 手順
    • 90 日間データを学習 → ラベルは “エッジ発生後 15 min で 0.5 % 以上利幅が取れたか”。
    • ランダムフォレスト or CatBoost で SHAP 値 を確認し「効いている指標」だけ残す。
  3. 運用
    • スコア上位 n 件をShadow Mode で紙トレードし、真の正例/負例を追加学習。

最小セットアップ例(Python)

pip install requests pandas duckdb
import requests, pandas as pd, duckdb, datetime as dt

# 1) TVL snapshot
today = dt.date.today().isoformat()
data = requests.get("https://api.llama.fi/protocols").json()
df = pd.json_normalize(data)[["name","chain","tvl","change_1d","change_7d"]]
duckdb.sql("CREATE TABLE IF NOT EXISTS tvl AS SELECT * FROM df WHERE 1=0")
duckdb.sql("INSERT INTO tvl SELECT * FROM df")

まずはこれで “毎日 TVL を溜める” パイプラインを cron で回すだけでも十分価値があります。


どこから始める?

  1. TVL スナップショット → 急変ヒートマップ(最速で ROI が見える)
  2. Stablecoin Flow ヒートマップ(資金の流入口を可視化)
  3. DEX/CEX スプレッドスキャナ(即金チャンス)

上記3つが動き出したら、ウォレットクラスタリング → 機械学習による予兆検知 へ発展させる。

次のアクション

優先度TODO目標所要
★★★DeFiLlama TVL Cron ジョブ & DuckDB 保存0.5 d
★★☆Stablecoin Flow 差分ヒートマップ1 d
★★☆Dex Screener/Birdeye × CEX スプレッド比較スクリプト1 d
★☆☆ウォレットクラスタリング PoC2 d
★☆☆統計モデルの特徴量設計2 d

「まず 1 件でいいから“数字の変化”を捕まえる」 ─ そこから洞察と改良のループを回すことがスタートライン。「アビトラbotで私の主力bot群の補完をしたり、新たな稼ぎの柱を増やしたりする」という狙いがあるので、以下のようなロードマップを大まかな指針としてマイペースにやっています。

【Phase1】TVL 急変アラート ― 実装ガイド

(Python×DuckDB×Cron+任意で Slack 通知)


0. 前提環境

要素推奨
OSLinux / macOS (ARM/Intel)
Python≥ 3.10
主要ライブラリrequests pandas duckdb pyyaml
デプロイ1 GB RAM VM で充分(スナップショットは数 MB/日)
python -m venv tvl_env && source tvl_env/bin/activate
pip install requests pandas duckdb pyyaml

1. データ取得スクリプト(fetch_tvl.py

import requests, duckdb, datetime as dt, pandas as pd, pathlib, time

DB_PATH = pathlib.Path("data/tvl_history.duckdb")

def snapshot_today():
    url = "https://api.llama.fi/protocols"      # 一覧エンドポイント :contentReference[oaicite:0]{index=0}
    resp = requests.get(url, timeout=30)
    resp.raise_for_status()
    df = pd.json_normalize(resp.json())

    # 欲しい列だけ残す
    keep = ["id", "name", "chain", "category", "tvl", "change_1d", "change_7d"]
    df = df[keep].assign(snapshot_date=dt.date.today().isoformat())

    with duckdb.connect(DB_PATH) as con:
        con.sql("""
            CREATE TABLE IF NOT EXISTS tvl_snapshots AS
            SELECT * FROM df WHERE 1=0
        """)
        con.register("df", df)
        con.sql("INSERT INTO tvl_snapshots SELECT * FROM df")
    print("✅  Saved", len(df), "rows")

if __name__ == "__main__":
    t0 = time.time()
    snapshot_today()
    print("⏱️", round(time.time() - t0, 2), "s")

ポイント

  • API は無認証・無料枠(現行 /protocols は ~4 000 B/req)
  • スナップショットは 毎日 1 レコード × プロトコル数 → 1 年でも <30 MB
  • id はドキュメント非公開のことが多いため、name で JOIN しても OK(ただし重複に注意)

2. スケジューリング

Cron(ローカル/EC2 いずれでも)

0 3 * * * /path/to/tvl_env/bin/python /path/to/fetch_tvl.py >> /var/log/tvl_fetch.log 2>&1
  • UTC 0 3:00日本時間 12:00 を例示。API 制限ほぼ無しだが混雑時間帯は避けると安心。
  • GitHub Actions で回す場合は on: schedule: cron: '0 3 * * *' で同様。

3. 異常検知クエリ(detect_outliers.py

import duckdb, datetime as dt

THRESHOLD = 30        # +30 % 以上
LOOKBACK = 1          # 前日比

with duckdb.connect("data/tvl_history.duckdb") as con:
    today = dt.date.today().isoformat()
    prev  = (dt.date.today() - dt.timedelta(days=LOOKBACK)).isoformat()

    query = f"""
    WITH today AS (
        SELECT id, name, chain, category, tvl
        FROM tvl_snapshots WHERE snapshot_date = '{today}'
    ), prev AS (
        SELECT id, tvl AS tvl_prev
        FROM tvl_snapshots WHERE snapshot_date = '{prev}'
    )
    SELECT t.name, t.chain, t.category,
           ROUND( (t.tvl - p.tvl_prev)*100.0/p.tvl_prev, 2 ) AS pct_change,
           t.tvl
    FROM today t
    JOIN prev p USING(id)
    WHERE (t.tvl - p.tvl_prev)*100.0/p.tvl_prev >= {THRESHOLD}
    ORDER BY pct_change DESC
    """
    alerts = con.sql(query).df()

*返り値 alerts が空でなければ急伸銘柄
フォールバック:前日データが無いときは COALESCE で 0 割り込みを防ぐ。

Slack 通知(任意)

import os, json, requests
if not alerts.empty:
    text = "*🚨 TVL spike alert*\n" + alerts.to_markdown(index=False)
    requests.post(
        os.environ["SLACK_WEBHOOK"],
        headers={"Content-Type": "application/json"},
        data=json.dumps({"text": text})
    )

4. 急伸チェーン/カテゴリでフィルタリング

agg = alerts.groupby(["chain","category"]).size().reset_index(name="count")
print(agg.sort_values("count", ascending=False).head())
  • ここで Lending / Perps など特定カテゴリが固まって急増 → “狙い目”。

5. 深掘り:ヒストリカル TVL 取得

import requests, pandas as pd, matplotlib.pyplot as plt, datetime as dt

slug = "abracadabra"                     # 例:急伸プロトコル名
hist = requests.get(f"https://api.llama.fi/tvl/{slug}").json()   # :contentReference[oaicite:1]{index=1}
df = pd.DataFrame(hist["tvl"])
df["date"] = pd.to_datetime(df["date"], unit="s")
df.plot(x="date", y="totalLiquidityUSD", title=f"TVL History – {slug}")
plt.show()

「出来高 vs TVL」 乖離チェック

  • 出来高は /summary/dexs/{protocolSlug} など Volume API を利用 GitHub
  • 直近 24 h TVL 比で Volume/TVL < 5 % なら 純入金 > 取引 の可能性
  • 取引伴わず TVL だけ膨張 → ブリッジ入金 or ステーキング移行 の匂い

6. ディレクトリ構成例

tvl_monitor/
├── data/                      # DuckDB ファイル
├── fetch_tvl.py
├── detect_outliers.py
├── charts/                    # 画像出力 (optional)
├── .env                       # SLACK_WEBHOOK 等
└── cron.txt                   # crontab 用

7. ハードニング & 運用 Tips

項目推奨
API 落ち対策try/except+リトライ (time.sleep(5))
DB VACUUM30 d に一度 DuckDBoptimize
テーブル肥大化tvl_snapshots をパーティション (snapshot_date 列) で Z-order
閾値チューニング四分位距離 IQR や Z-score で自動補正にすると β→本番 の移行が楽

8. 次のステップ

  1. Stablecoin 流入ヒートマップ(手法 2)
  2. DEX/CEX スプレッドスキャナ(手法 3)
  3. 検知→自動バックテスト Pipeline(手法 5 への布石)

“まず毎日 1 枚のスナップショットを貯め始める”―ここから エッジの霧 が晴れていく。

【Phase2】ステーブルコイン流入/流出マップ ─ 実装ステップ

(Python × DuckDB × matplotlib + Slack 通知)

ゴール

  • チェーン別のステーブルコイン循環量を毎日スナップショット → 1 d/7 d 差分をヒートマップ化
  • 流入上位チェーンを朝イチで Slack 通知
  • 併せてブリッジ TVL を照合し、どのブリッジ経由で資金が動いたかを推測
  • 上位 3 チェーンを ③ DEX‑CEX スプレッドスキャナ の監視対象に自動追加

0. 事前セットアップ

python -m venv stable_env && source stable_env/bin/activate
pip install requests pandas duckdb matplotlib pyyaml slack_sdk
依存推奨バージョン
Python≥ 3.10
DuckDB0.9 系以降(pip install duckdb==0.10.0 でも可)

1. データ取得 (fetch_stables.py)

# coding: utf-8
"""
毎日 1 回、各チェーンのステーブルコイン循環量を取得して DuckDB に追記
"""

import requests, pandas as pd, duckdb, datetime as dt, pathlib, time

DB = pathlib.Path("data/stables.duckdb")
URL = "https://stablecoins.llama.fi/stablecoinchains"  # JSON:チェーン別循環量
TODAY = dt.date.today().isoformat()

def main():
    js = requests.get(URL, timeout=30).json()  # ← 無認証・無料エンドポイント
    df = pd.DataFrame(js["chains"])            # key: chains

    keep = ["name","peggedUsd","change_1h","change_1d","change_7d"]
    df = df[keep].assign(snapshot_date=TODAY)

    with duckdb.connect(DB) as con:
        con.sql("""CREATE TABLE IF NOT EXISTS stable_snaps AS
                   SELECT * FROM df WHERE 1=0""")
        con.register("df", df)
        con.sql("INSERT INTO stable_snaps SELECT * FROM df")
    print(f"✅  {len(df)} rows saved for {TODAY}")

if __name__ == "__main__":
    t0 = time.time(); main(); print("⏱️", round(time.time()-t0,2), "s")

チェーン別循環量は DefiLlama ダッシュボード “Stablecoins > Chains” で公開中 DefiLlama


2. 差分計算+ヒートマップ (diff_heatmap.py)

import duckdb, pandas as pd, seaborn as sns, matplotlib.pyplot as plt, datetime as dt

DB = "data/stables.duckdb"
today = dt.date.today()
prev1 = today - dt.timedelta(days=1)
prev7 = today - dt.timedelta(days=7)

query = f"""
WITH t AS (SELECT name, peggedUsd FROM stable_snaps
           WHERE snapshot_date = '{today.isoformat()}'),
     y1 AS (SELECT name, peggedUsd AS usd_1d FROM stable_snaps
            WHERE snapshot_date = '{prev1.isoformat()}'),
     y7 AS (SELECT name, peggedUsd AS usd_7d FROM stable_snaps
            WHERE snapshot_date = '{prev7.isoformat()}')
SELECT t.name,
       ROUND(100*(t.peggedUsd - y1.usd_1d)/y1.usd_1d ,2) AS pct_1d,
       ROUND(100*(t.peggedUsd - y7.usd_7d)/y7.usd_7d ,2) AS pct_7d
FROM t JOIN y1 USING(name) JOIN y7 USING(name)
"""

df = duckdb.sql(query).df().set_index("name")
top = df.sort_values("pct_1d", ascending=False).head(10)   # 通知用

# ヒートマップ
plt.figure(figsize=(10,6))
sns.heatmap(df[["pct_1d","pct_7d"]], annot=False, center=0)
plt.title(f"Stablecoin Flow % Change ({today})")
plt.xticks(rotation=0)
plt.tight_layout()
plt.savefig(f"charts/stable_heat_{today}.png")

Slack 通知 (任意)

from slack_sdk.webhook import WebhookClient, SlackWebhookError
hook = WebhookClient("https://hooks.slack.com/services/XXX/YYY/ZZZ")
text = "*Stablecoin inflow top 5 (1d %)*\n" + top.head(5).to_markdown()
hook.send(text=text)

3. ブリッジTVL照合 (bridge_lookup.py)

import requests, pandas as pd, duckdb

BRIDGE_URL = "https://bridges.llama.fi/bridges?excludeTotal=false"
br = pd.DataFrame(requests.get(BRIDGE_URL).json()["bridges"])

# ① 上位 inflow チェーン
chains = duckdb.sql("SELECT name FROM stable_snaps WHERE snapshot_date = CURRENT_DATE ORDER BY peggedUsd DESC LIMIT 3").df()["name"].tolist()

# ② 各チェーンで TVL が増えたブリッジ抽出
res = (br[br["chain"].isin(chains)]
          .sort_values("change_1d", ascending=False)
          .head(10)[["displayName","chain","change_1d","tvl"]])

print(res)

ブリッジ TVL ダッシュボードは /bridges & /bridges/chains で公開 DefiLlama


4. ③ DEX‑CEX スキャナへの自動連携

# config.yaml などに監視チェーンを書き出す
import yaml
cfg = dict(watch_chains=chains)
yaml.safe_dump(cfg, open("config/dex_cex_chains.yaml","w"))

スキャナ側では watch_chains を読んで
各チェーンの上位トークンペアを監視 → スプレッドが 1 % 超えたらアラート、の流れで OK。


5. スケジュール例(cron)

# ❶ 03:05 JST でスナップショット
5 3 * * * /path/stable_env/bin/python /path/fetch_stables.py

# ❷ 03:10 JST で差分計算&ヒートマップ&通知
10 3 * * * /path/stable_env/bin/python /path/diff_heatmap.py

# ❸ 03:20 JST でブリッジ照合+DEX‑CEX 設定更新
20 3 * * * /path/stable_env/bin/python /path/bridge_lookup.py

6. ディレクトリ構成イメージ

stable_flow_monitor/
├── data/                  # DuckDB
├── charts/                # PNG Heatmaps
├── config/
│   └── dex_cex_chains.yaml
├── fetch_stables.py
├── diff_heatmap.py
├── bridge_lookup.py
└── .env                   # SLACK_WEBHOOK など

運用 Tips

チェックポイント補足
API 落ちrequests.get(..., timeout=30) + for _ in range(3): retry
Missing data祝日などに 0 レコードになる場合、前日値を ffill() で補完
色覚対応ヒートマップsns.heatmap(..., cmap="coolwarm") などで調整
閾値チューニング30 % → 四分位距離 IQR ベースの動的閾値にしても良い

次にやると良いこと

  1. イン/アウト額を “USD 絶対値” でもランキング
  2. 1 h 変化率で intra‑day 速報(ガス代高騰で USDC が逃げる瞬間など)
  3. 流入チェーンのブリッジ手数料 × ガス代と併せて ROI 見積もり
  4. 出来高低迷チェーンなのに資金だけ流入 → “次のエアドロ” マーク

これで 「資金の入口/出口を毎朝可視化 → 即トレード候補へ回す」 ための最小パイプラインが完成します。

【Phase3】DEX ボリューム急増 × CEX 価格乖離レーダー ─ 実装フルガイド ⚡

狙い
板の薄い草トークン/新規上場銘柄で「DEX が盛り上がるが CEX がまだ追随していない」瞬間を検知してアービトラを仕掛ける。

データフロー   DEX Price & Volume → 正規化 → CEX Price → 乖離&ボリューム検知 → オーダーブック深さチェック → 通知 & Bot へルーティング


0. 必須ライブラリ

pip install requests websockets aiohttp pandas numpy duckdb pyyaml slack_sdk
モジュール用途
requests / aiohttpREST & WS  ※Birdeye は WS も可
pandas + numpy5 min ローリング σ 計算
duckdb軽量ストア(出来高ベースライン保持)
slack_sdkアラート送信

1. データソース

種別エンドポイント備考
Solana DEXhttps://public-api.birdeye.so/defi/history_price?address=<TOKEN>&from=<ts>&to=<ts> (Price, Volume) docs.birdeye.soX-API-KEY ヘッダ必須・Free 枠あり
マルチチェーン DEXhttps://api.dexscreener.com/token-profiles/latest/v1(Pair list & metadata) docs.dexscreener.com + latest/dex/pairs/{chainId}/{pairId}(ticker & 24 h vol)無料・60 r/m
CEX BybitGET /v5/market/tickers?category=spot&symbol=XXXX or linear bybit-exchange.github.io公開 API・key 不要
CEX BinanceGET /api/v3/ticker/price?symbol=XXXXUSDT(スポット) binance-docs.github.io公開 API
Orderbook depthBybit /v5/market/orderbook?symbol=XXXX&limit=50 Spot / Perps bybit-exchange.github.io50 level で十分
 Binance /api/v3/depth?symbol=XXXXUSDT&limit=100

トークンアドレス ↔ シンボルのマッピング
DexScreener の baseToken.symbol を優先 → 失敗時は独自 YAML を上書き。


2. アーキテクチャ

flowchart TD
    %% nodes
    DEX[DEX Feeder]
    CEX[CEX Feeder]
    NORM["Normalizer (symbol, price, volume)"]
    SPREAD[Spread Engine]
    DEPTH[Depth Check]
    ACTION["Slack / Bot Action"]

    %% edges
    DEX -- poll/websocket --> NORM
    NORM -- join on symbol --> SPREAD
    CEX -- REST 1 s --> SPREAD
    SPREAD -- spread > 1% and vol > sigma --> DEPTH
    DEPTH -- depth ok --> ACTION

3. コアロジック (spread_watch.py 抜粋)

import time, asyncio, requests, pandas as pd, numpy as np
from slack_sdk.webhook import WebhookClient

SPREAD = 0.01        # 1 %
VOL_LOOKBACK = 12    # 12 * 5 s = 1 min 例
alert_hook = WebhookClient("<SLACK_WEBHOOK_URL>")

# ---- tiny rolling buffer for volume baseline ----
class VolBaseline:
    def __init__(self, size=60):  # 5 s × 60 = 5 min
        self.size = size
        self.vols = []

    def update(self, v):
        self.vols.append(v)
        if len(self.vols) > self.size:
            self.vols.pop(0)
        return np.std(self.vols) if len(self.vols) > 10 else 0

vol_map = {}      # symbol -> VolBaseline

async def poll_dex(symbol, pair_id, chain_id):
    while True:
        j = requests.get(f"https://api.dexscreener.com/latest/dex/pairs/{chain_id}/{pair_id}",
                         timeout=5).json()
        p = float(j["pairs"][0]["priceUsd"])
        v = float(j["pairs"][0]["volume"]["h24"]) / 288   # 5 min ≒ 1/288
        ts = time.time()
        yield {"symbol": symbol, "price": p, "vol5": v, "ts": ts}
        await asyncio.sleep(5)        # 5 s poll

async def poll_cex(symbol):
    while True:
        j = requests.get("https://api.bybit.com/v5/market/tickers",
                         params={"category":"spot","symbol":symbol},
                         timeout=5).json()
        p = float(j["result"]["list"][0]["lastPrice"])
        ts = time.time()
        yield {"symbol": symbol, "price": p, "ts": ts}
        await asyncio.sleep(5)

async def watcher():
    dex_q, cex_q = asyncio.Queue(), asyncio.Queue()

    # 例:WEN‐USDC(Solana)を監視
    asyncio.create_task(poll_dex("WENUSDC","8jwv...pairAddr","solana"))
    asyncio.create_task(poll_cex("WENUSDC"))

    books = {}   # latest prices
    while True:
        for q in (dex_q, cex_q):
            if not q.empty():
                d = q.get_nowait()
                key = (d["symbol"], "dex" if "vol5" in d else "cex")
                books[key] = d
        for sym in set(k[0] for k in books):
            try:
                dex = books[(sym,"dex")]; cex = books[(sym,"cex")]
            except KeyError: continue

            spread = abs(dex["price"]-cex["price"])/cex["price"]
            std = vol_map.setdefault(sym, VolBaseline()).update(dex["vol5"])
            if spread > SPREAD and dex["vol5"] > std:
                if await depth_ok(sym, dex["price"]):
                    alert_hook.send(text=f"*ARBITRA* {sym}\n"
                                          f"DEX ${dex['price']:.4f} vs CEX ${cex['price']:.4f}"
                                          f" | Spread {spread*100:.2f}%\n"
                                          f"5mVol {dex['vol5']:.2f} > σ {std:.2f}")
        await asyncio.sleep(1)

注目ポイント

  • Volume の基準値:直近 5 分の rolling σ。草トークンは閾値を上げ過ぎない。
  • Latency:poll 5 s 例。WebSocket (wss://public-api.birdeye.so/socket) への置き換えで sub‑sec 化可 docs.birdeye.so
  • Spread 1 %:流動性が薄いほど ask–bid 開きを埋めるコストが大きいので 1 % 以上を推奨。

4. 部分約定対策 — 深さ 2〜3 tick の板厚

def depth_ok(symbol, p_dex, max_slippage=0.005):
    # Bybit depth
    ob = requests.get("https://api.bybit.com/v5/market/orderbook",
                      params={"symbol":symbol,"limit":50}, timeout=5).json()
    asks = [(float(pr), float(sz)) for pr,sz,*_ in ob["result"]["a"][:3]]
    size_needed = 500   # USD
    cost = 0
    for pr,sz in asks:
        slice_sz = min(sz, size_needed/(pr))
        cost += slice_sz*pr
        size_needed -= slice_sz*pr
        if size_needed <= 0:
            break
    slip = (cost/(500) - p_dex)/p_dex
    return slip <= max_slippage

DEX 側はオーダーブック API が無い場合が多いので 流動性 (liquidityUSD) で代用。


5. ハイパーパラメータ & 運用

変数初期値チューニング指針
SPREAD0.01 (1 %)草コインなら 2〜3 % でもヒットする
VOL_LOOKBACK60 (5 min)暑い相場で 30、閑散なら 120
depth slippage0.5 %CEX 側 Maker Rebate を考慮して調整
  • False‑positive 減spread & vol の二重条件+depth_ok でほぼ撲滅。
  • Rate Limit:DexScreener 60 r/m、Birdeye /history_price 100 r/s (だが API Key枠) → 5 s poll × 20 pair で安全。
  • Time Sync:秒単位の Arbi は NTP sync 推奨 (chrony).

6. ディレクトリ例

dex_cex_spread/
├── data/volume.duckdb
├── src/
│   ├── feeders.py
│   ├── spread_watch.py
│   └── depth_check.py
├── config/tokens.yaml
├── charts/           # optional可視化
└── .env              # API keys

7. 次フェーズ🚀

  1. Multi‑thread / asyncio gather:50 pair 同時監視でも CPU < 20 %.
  2. Cross‑DEX Spread:同チェーン DEX‑間 乖離も同ロジックで横展開。
  3. Auto‑Hedge Bot:Slack → Function URL → Trader コンテナへ POST。
  4. ML Ranking:前手法 2 の Stable Flow 指標を特徴量に統合し “事前スコア” を付与。

まとめ

BirdeyeDexScreener → DEX 価格 / VolumeBybitBinance → CEX 価格 を同時ストリーム。
Spread > 1 %Vol(5 m) > σ を満たしたら orderbook 深さチェック → 通知/自動エントリ。
これで 「誰よりも早く気づいて、でも無駄撃ちせずに入る」 最小アビトラ・レーダーが完成します。

【Phase4】ブリッジ/CEX 出入口クラスタリング 🕵️‍♂️― 実装ロードマップ

目的
① DEX⇄CEX アービトラの主戦プレイヤーを特定し、② 彼らの資金移動ルート(ブリッジ・入金先)をグラフで可視化、③ 再現性の高い「次の動き」を予測できるデータセットを得る。


1. 必要なデータソース & 前準備

区分エンドポイント/データ用途
DEX 取引詳細Birdeye:/defi/transactions?address=<TOKEN>&from=<ts>&to=<ts>(Solana)  ※WS も可 docs.birdeye.so該当時刻の Tx ハッシュ取得
 Dex Screener:/token-profiles/latest/v1pairId を取得し /latest/dex/pairs/{chainId}/{pairId} で Tx リスト docs.dexscreener.com
チェーン ExplorerSolscan API / Etherscan module=account&action=txlist&address= でフル Tx データ Etherscanfrom / to / Logs 抽出
CEX ラベル自前 JSON(Binance/Bybit/OKX 等ホット & Deposit アドレス)+公開リスト BinanceDeposit 判定キー
ブリッジ辞書https://defillama.com/bridges + API /bridge 一覧で contract ↔ dest chain マップ DefiLlama
保管 DBDuckDB(軽量) or Postgres / Neo4j(後でグラフ分析するなら)

2. ワークフロー全体像

flowchart TD
    %% Nodes
    EVT[Event step3 timestamp]
    S1["(1) DEX Tx hash 取得"]
    S2["(2) Chain Tx 詳細取得"]
    S3["(3) CEX or Bridge 判定"]
    S4["(4) Address graph 生成"]
    S5["(5) Clustering and visualization"]

    %% Edges
    EVT --> S1
    S1  --> S2
    S1  --> S3
    S2  --> S4
    S3  --> S4
    S4  --> S5

3. ステップ別実装詳細

3‑1. DEX Tx ハッシュ取得

import requests, time
def get_dex_txs_sol(token_addr, t0, t1):
    url = ("https://public-api.birdeye.so/defi/transactions"
           f"?address={token_addr}&from={t0}&to={t1}&limit=500")
    j = requests.get(url, headers={"X-API-KEY": "..."}).json()
    return [tx["txHash"] for tx in j["data"]["transactions"]]
  • t0 / t1:イベント時刻±90 s が目安。

3‑2. チェーン Tx 詳細取得

import aiohttp, asyncio
ETHERSCAN_KEY="..."
async def fetch_tx_eth(txhash):
    url=("https://api.etherscan.io/api"
         f"?module=proxy&action=eth_getTransactionByHash&txhash={txhash}"
         f"&apikey={ETHERSCAN_KEY}")
    async with aiohttp.ClientSession() as s:
        async with s.get(url,timeout=10) as r:
            j=await r.json(); return j["result"]

取るべきフィールド

{
  "hash": "...",
  "from": "0x...",     // 発信者
  "to": "0x...",       // コントラクト or EO
  "value": "0x...",    // Native amount
  "input": "0x...",    // ブリッジ判定に使う
  "logs": [...]        // ブリッジ転送 or Deposit Event
}

3‑3. CEX Deposit / ブリッジ判定

CEX_MAP = json.load(open("cex_labels.json"))   # addr → {"cex":"Binance", ...}
BRIDGE_MAP = json.load(open("bridge_contracts.json"))  # addr → {"bridge":"Wormhole","dst":"Solana"}

def classify(tx):
    if tx["to"] in CEX_MAP:
        tag = ("CEX", CEX_MAP[tx["to"]]["cex"])
    elif tx["to"] in BRIDGE_MAP:
        dst = BRIDGE_MAP[tx["to"]]["dst"]
        tag = ("BRIDGE", dst)
    else:
        tag = ("OTHER", None)
    return tag

Bridging の場合、ログ内に destinationChainIdrecipient があるので抽出して「反対側アドレス」を追跡 queue に追加。

3‑4. グラフ構築 & クラスタリング

import networkx as nx, duckdb
G = nx.MultiDiGraph()
for tx in parsed_txs:
    src = tx["from"]; dst = tx["to"]; ttag, meta = classify(tx)
    G.add_edge(src, dst, label=ttag, ts=tx["timestamp"], meta=meta)

# ❶ CEX Deposit クラスタ → 入金アドレス単位で Union‑Find
cex_clusters = {}
for u,v,d in G.edges(data=True):
    if d["label"] == "CEX":
        cex_addr = v     # deposit
        cex_clusters.setdefault(cex_addr, set()).add(u)

# ❷ Bridge ペア → src ↔ dstAddress を 2‑hop 合流で束ねる
for u,v,d in G.edges(data=True):
    if d["label"] == "BRIDGE":
        dst_addr = d["meta"]["dst_wallet"]
        G.add_edge(u, dst_addr, label="BRIDGED_TO", ts=d["ts"])

# Louvain / Connected Components
clusters = 

3‑5. 可視化 & レポート

import matplotlib.pyplot as plt, networkx as nx
for i,cl in enumerate(clusters):
    sub = G.subgraph(cl)
    nx.draw(sub, with_labels=False, node_size=50, edge_color="grey")
    plt.title(f"Cluster {i} ({len(sub.nodes())} wallets)")
    plt.savefig(f"charts/cluster_{i}.png")

Slack 送付:slack_sdkfiles.upload.


4. 実運用 Tips

テーマ推奨
CEX アドレス更新DeBank / Arkham / Chainabuse が公開する “Exchange Tag JSON” を月次で pull → diff
新ブリッジ対応DeFiLlama /bridge エンドポイントを毎週 Snapshot → 新規 contract が増えたら自動追登録 DefiLlama
Gas & Rate LimitRPC→Alchemy/Infura の batch & multicall でコスト削減。Etherscan free は 5 r/sec → backoff。
ミラー入金同一 Tx 内で memo/tag を持つ CEX (XRP/ADA 等) は Memo ID までキーにすると誤結合を防げる。
グラフ DBNeo4j+Bloom を使うと WebGUI で資金ルートのアニメ再生が可能。

5. 次ステップ

  1. 時間窓集計:クラスタごとに「PoC → Bridge → CEX」完了までの平均レイテンシを計測。
  2. 行動シグネチャ:入金額・ブリッジ先・チェーン滞在時間を特徴量に k‑Means → “高速狩人” vs “遅延リスクヘッジャ” を分類。
  3. リアルタイム Hook:③のスプレッド検知→該当ウォレットが出現→“Follow 取引” を自動トリガー。

これで 「イベント検知 → アビトラ勢ウォッチ → 出入口クラスタリング」 の基盤が完成します。

【Phase5】“イベント前兆” モデル化 📈 — フル実装ガイド

ゴールの再確認
* 90 日分のイベントメタデータ (手法 ①〜④) を学習
* ラベル = 「検知から 15 分以内に 0.5 % 以上の利幅 (gross edge) が取れたか」<br>⇒ 2 値分類
* CatBoost or RandomForest を使い、SHAP で効いている特徴を確認 → モデル縮小
* 推論スコア上位 N 件を Shadow Mode で紙トレードし、結果を再学習にフィードバック


0. ディレクトリ雛形

edge_model/
├── data/               # DuckDBファイル & parquet
│   ├── events.duckdb   # ①〜④で生成
│   └── price_ticks.parquet
├── notebooks/
├── src/
│   ├── build_dataset.py
│   ├── train_model.py
│   └── score_live.py
├── models/
│   └── cat_v1.cbm
└── config.yaml

DuckDB & Parquet で I/O 軽量化、モデルは catboost.CatBoostClassifier を保存


1. イベント & ラベルテーブル を作る

-- events.duckdb に集約済みと仮定
CREATE OR REPLACE TABLE edge_events AS
SELECT  event_id,
        ts_event,                       -- 秒精度
        symbol,
        -- 特徴例
        delta_tvl_pct,
        delta_stable_pct,
        vol_sigma,
        token_age_days,
        bridged_tvl_ratio
FROM tvl_snap  -- ← 手法①
JOIN stable_snaps USING(snapshot_date)
JOIN dex_vol_snaps USING(symbol, snapshot_date)
JOIN bridge_stats USING(symbol, snapshot_date);

1‑A. ラベル付け

CREATE OR REPLACE TABLE labels AS
SELECT  e.event_id,
        CASE WHEN abs(p15.price - p0.price)/p0.price >= 0.005
             THEN 1 ELSE 0 END AS y
FROM edge_events e
LEFT JOIN LATERAL (
       SELECT price FROM price_ticks
       WHERE symbol = e.symbol AND ts BETWEEN e.ts_event AND e.ts_event + 60
       ORDER BY ts LIMIT 1) p0
LEFT JOIN LATERAL (
       SELECT price FROM price_ticks
       WHERE symbol = e.symbol AND ts BETWEEN e.ts_event+900 AND e.ts_event+960
       ORDER BY ts LIMIT 1) p15

15 min 後 (±60 s バッファ) の価格で 0.5 % 以上 動いていれば y=1


2. データセット生成スクリプト (build_dataset.py)

import duckdb, pandas as pd, pathlib
con = duckdb.connect("data/events.duckdb")
df = con.sql("""
    SELECT e.*, l.y
    FROM edge_events e JOIN labels l USING(event_id)
    WHERE ts_event >= date '2025-04-24' - INTERVAL 90 DAY
""").df()

# カテゴリ→string、欠損→0、対数変換など
cat_cols = ["symbol"]
num_cols = 
df[num_cols] = df[num_cols].fillna(0).astype("float32")
df.to_parquet("data/train_90d.parquet", index=False)

3. 時系列ウォークフォワードCV で検証

“未来リーク” を防ぐため TimeSeriesSplit / Walk‑Forward を利用 Kaggle

from sklearn.model_selection import TimeSeriesSplit
import pandas as pd, numpy as np, catboost as cb, shap, joblib, json

df = pd.read_parquet("data/train_90d.parquet")
X = df.drop(columns=["y","event_id","ts_event"])
y = df["y"]

cv = TimeSeriesSplit(n_splits=5, test_size= int(len(df)*0.1))
scores = []
for i, (tr, te) in enumerate(cv.split(X)):
    model = cb.CatBoostClassifier(
        depth=6, learning_rate=0.05, iterations=400,
        cat_features=[X.columns.get_loc(c) for c in ["symbol"]],
        eval_metric="AUC", verbose=False, random_state=42
    )
    model.fit(X.iloc[tr], y.iloc[tr], eval_set=(X.iloc[te], y.iloc[te]), verbose=False)
    auc = model.get_best_score()["validation"]["AUC"]
    scores.append(auc)
print("Walk‑forward AUC:", np.mean(scores).round(3))
model.save_model("models/cat_v1.cbm")

4. SHAP で特徴寄与を確認 → 削減

explainer = shap.TreeExplainer(model)          # CatBoostは直接OK :contentReference[oaicite:1]{index=1}
shap_vals  = explainer.shap_values(X.sample(1000))
shap.summary_plot(shap_vals, X)                # Notebookで確認

# 重要度が低い (<0.01) 特徴を除外
imp = np.abs(shap_vals).mean(axis=0)
keep = X.columns[imp > 0.01]
json.dump(keep.tolist(), open("config/feature_whitelist.json","w"))

5. 再学習 & 軽量モデル

whitelist = json.load(open("config/feature_whitelist.json"))
X_reduced = X[whitelist]
model_small = cb.CatBoostClassifier(
    depth=5, iterations=300, learning_rate=0.08,
    cat_features=[whitelist.index("symbol")],
    verbose=False, random_state=42)
model_small.fit(X_reduced, y)
model_small.save_model("models/cat_v2.cbm")

6. リアルタイム推論 & Shadow Mode (score_live.py)

import catboost as cb, yaml, time, duckdb
model = cb.CatBoostClassifier()
model.load_model("models/cat_v2.cbm")
watch_chains = yaml.safe_load(open("config.yaml"))["watch_chains"]

def gen_features(event_id):
    # ①〜④のストリームから同等の特徴量を構築して返す
    ...

while True:
    events = poll_new_events(chains=watch_chains)   # 手法③が出力
    for ev in events:
        x = gen_features(ev["id"])[whitelist]
        score = model.predict_proba(x)[0,1]
        if score > 0.65:            # 適宜閾値調整
            enqueue_shadow_trade(ev, score)
    time.sleep(1)

Shadow trade 結果 (Fill・PnL) を duckdb.table('shadow_results') へ格納 → 次回学習で y 列を更新


7. 運用 & 継続学習パイプライン

フェーズスケジュール内容
データ更新毎 5 min手法①〜④ のメタテーブルに追記
ラベル確定+16 min バッチ15 min 後の利幅を計算し y を確定
モデル再学習週次(月〜金累積)直近 90 d データで cat_vN+1 再学習 ➜ SHAP ➜ Feature Pruning
閾値最適化毎再学習ROC Youden / Precision‑Recall で β を決定
Shadow 評価常時Top N=10 イベントを紙トレード → PnL 補正を DB に追加
本番投入累積 PnL > 閾値 & hit率 > X%Trader コンテナへ自動ルーティング

MLflow や Weights & Biases の offline=True でメタデータを残すと効果検証が楽


8. モデル運用の注意点

リスク対策
クラス不均衡scale_pos_weight (CatBoost) ≈ (neg/pos) で補正
未来リーク“特徴計算に後続データが混入していないか” を定期チェック
データシフトvol_sigmadelta_tvl_pct の分布を KS‑test → 逸脱で閾値↑
実装ドリフトモデルの Feature List (JSON) と 推論側 mapping を Git hash で照合

9. 成果物まとめ

ファイル役割
data/train_90d.parquet学習用データセット
models/cat_v2.cbmSHAP 後の軽量モデル
config/feature_whitelist.json採用特徴リスト
charts/shap_summary.png重要度可視化
shadow_results.duckdb紙トレード検証ログ

次の拡張アイデア
* CatBoost → LightGBM GPU でレイテンシ短縮
* Meta‑feature: 「同一ウォレットの直近 24 h ROI」
* バンディット (ALG = Thompson Sampling) で 閾値 adapt

これで イベント前兆を“数字”で語るモデル基盤 が完成します。

-Bot, CEX, DEX, 戦略ログ, 開発ログ