Bot

開発記録#141(2025/3/24)「論文ベースのbot開発フローpart.3」

2025年3月24日

前回の記事に引き続き、今回も仮想通貨botの開発状況をまとめていきます。

本記事では「暗号通貨のパンプ&ダンプスキームの検出」に関する論文をベースにbot開発の過程をまとめていきます。

Yodaka

今回のテーマは「異常検出ロジックの組み込み」です。

データ収集スクリプトに異常検出ロジックを追加し、パンプ&ダンプイベントの兆候となる価格急騰取引量の急増を検出します。


1. アプローチ概要「🚨 異常検出ロジックの組み込み

🔎 検出基準

価格の急騰 (Price Surge)

  • 12時間移動平均の90%以上の価格上昇

取引量の急増 (Volume Spike)

  • 400%以上の取引量増加

データノイズ除去 (Noise Filtering)

  • 30日間の総取引量の30%以上
  • 30日間の最大取引量の60%以上

指数加重移動平均 (EWMA)

  • EWMAを用いて短期的な価格変動ノイズを低減し、安定した異常検出を行う

2. 修正後のコード (data_collector.py)

import asyncio
import json
import websockets
import os
import pandas as pd
import numpy as np
from datetime import datetime

# 設定ファイルの読み込み
with open("config.json", "r") as file:
    config = json.load(file)

# API URL の更新
API_URL = config["api_url"]
SYMBOLS = config["symbols"]
TIMEFRAME = config["timeframe"]
DATA_FOLDER = config["data_folder"]
LOG_FILE = config["log_file"]

# データ格納ディレクトリの作成
os.makedirs(DATA_FOLDER, exist_ok=True)

# データフレームの初期化
data_frames = {symbol: pd.DataFrame() for symbol in SYMBOLS}

# 異常検出パラメータ
PRICE_THRESHOLD = 0.9
VOLUME_THRESHOLD = 4.0
EWMA_SPAN = 20  # EWMAの期間 (20日)

# ログ出力関数
def log_message(message):
    timestamp = datetime.now().strftime("%Y-%m-%d %H:%M:%S")
    log_entry = f"[{timestamp}] {message}"
    print(log_entry)
    with open(LOG_FILE, "a") as log_file:
        log_file.write(log_entry + "\n")

# 異常検出関数
def detect_anomalies(df, symbol):
    if len(df) < EWMA_SPAN:
        return None  # データが不足している場合はスキップ

    # EWMA (指数加重移動平均)
    df['ewma'] = df['close'].ewm(span=EWMA_SPAN).mean()

    # 異常判定条件
    df['price_anomaly'] = (df['close'] > df['ewma'] * (1 + PRICE_THRESHOLD))
    df['volume_anomaly'] = (df['volume'] > df['volume'].mean() * VOLUME_THRESHOLD)

    # 両方の条件が同時に満たされた場合にのみ異常と判定
    df['combined_anomaly'] = df['price_anomaly'] & df['volume_anomaly']

    # 異常イベントの検出
    anomalies = df[df['combined_anomaly']]

    if not anomalies.empty:
        log_message(f"🚨 Anomaly detected for {symbol} at {anomalies.iloc[-1]['timestamp']}")
        anomalies.to_csv(f"{DATA_FOLDER}/{symbol}_anomalies.csv", mode='a', header=not os.path.exists(f"{DATA_FOLDER}/{symbol}_anomalies.csv"), index=False)

# Bybit WebSocketサーバーに接続
async def connect_to_bybit():
    async with websockets.connect(API_URL) as websocket:
        for symbol in SYMBOLS:
            subscription_msg = {
                "op": "subscribe",
                "args": [f"kline.{TIMEFRAME}.{symbol}"]
            }
            await websocket.send(json.dumps(subscription_msg))
            log_message(f"Subscribed to {symbol}")

        # データ受信ループ
        while True:
            try:
                response = await websocket.recv()
                data = json.loads(response)

                if 'topic' in data and 'data' in data:
                    symbol = data['topic'].split('.')[-1]
                    kline_data = data['data'][0]

                    # データの追加
                    new_data = pd.DataFrame({
                        "timestamp": [datetime.fromtimestamp(kline_data['start'] / 1000)],
                        "open": [float(kline_data['open'])],
                        "high": [float(kline_data['high'])],
                        "low": [float(kline_data['low'])],
                        "close": [float(kline_data['close'])],
                        "volume": [float(kline_data['volume'])]
                    })

                    # データ更新と異常検出
                    data_frames[symbol] = pd.concat([data_frames[symbol], new_data], ignore_index=True)
                    detect_anomalies(data_frames[symbol], symbol)

                    # データ保存
                    data_frames[symbol].to_csv(f"{DATA_FOLDER}/{symbol}_data.csv", index=False)

            except Exception as e:
                log_message(f"Error: {e}")
                await asyncio.sleep(5)  # 5秒後に再接続

# メイン関数
async def main():
    while True:
        try:
            await connect_to_bybit()
        except Exception as e:
            log_message(f"Connection failed: {e}. Retrying in 10 seconds...")
            await asyncio.sleep(10)

if __name__ == "__main__":
    asyncio.run(main())

このコードは、BybitのWebSocket APIを通じて仮想通貨のリアルタイムkline(ローソク足)データを取得し、異常(アノマリー)を検出してログ・保存するPythonスクリプトです。

機能のポイント

異常検出機能の統合:価格とボリュームの両方の異常が検出された場合にのみ警告
EWMA (指数加重移動平均):短期的なノイズを除去し、パンプ&ダンプ特有のスパイクに対応
データ保存とログ記録:異常イベントは個別の.csvファイルに保存し、後続のモデル学習に活用
耐障害設計:WebSocketエラーやサーバーダウン時も自動再接続


Yodaka

セクションごとにわかりやすく解説していきます👇


🧾 概要

このスクリプトが行う処理の流れは:

  1. config.json から設定を読み込む
  2. WebSocketでBybitに接続し、複数の仮想通貨のローソク足を購読
  3. リアルタイムでklineデータを受信し、DataFrameに追加
  4. 価格と出来高に基づいた異常(アノマリー)を検出
  5. 異常があればログとCSVに保存
  6. データは常にCSVファイルとしても保存(バックアップ)

1. 📁 設定と準備

with open("config.json", "r") as file:
    config = json.load(file)
  • config.json に定義された設定値(APIのURL、取引ペア、時間足、保存先ディレクトリ、ログファイル名)を読み込みます。
os.makedirs(DATA_FOLDER, exist_ok=True)
  • データ保存フォルダがなければ作成します。
data_frames = {symbol: pd.DataFrame() for symbol in SYMBOLS}
  • 各シンボル(例: BTCUSDT, ETHUSDT)ごとに空のDataFrameを用意。

2. 🧠 異常検出のための設定

PRICE_THRESHOLD = 0.9  # 価格が平均の1.9倍以上になったら
VOLUME_THRESHOLD = 4.0  # 出来高が平均の4倍以上だったら
EWMA_SPAN = 20  # 移動平均期間
  • 異常と判定するための基準を設定。
  • EWMA_SPAN は指数加重移動平均(Exponential Weighted Moving Average)に使う期間。

3. 📝 ログ関数

def log_message(message):
    ...
  • 標準出力とログファイルの両方にメッセージを書き込む便利関数。

4. 🚨 異常検出ロジック

def detect_anomalies(df, symbol):
    ...

この関数では:

  • EWMA(指数加重移動平均)と出来高の平均を使って、
    • 価格が異常に高い(平均より90%以上高い)
    • 出来高が異常に大きい(平均の4倍以上)
  • という条件を両方満たしたら、それを異常(アノマリー)として検出します。
  • 検出されたデータはsymbol_anomalies.csvに追記保存されます。

5. 🌐 WebSocket接続とデータ処理

async def connect_to_bybit():

この関数では:

  • websockets.connect() を使ってBybitのWebSocketサーバーに接続。
  • 各シンボルのkline(ローソク足)を購読。
  • WebSocketで受信したデータをDataFrameに整形し、更新。
  • その後、detect_anomalies() で異常チェック。
  • 最新データはCSVにも保存されます。

6. 🔁 再接続とエラーハンドリング

while True:
    try:
        await connect_to_bybit()
    except Exception as e:
        ...
  • エラーや切断が発生した場合、10秒待ってから再接続。
  • 通信やデータエラーに強い構造になってます。

✅ 最後に実行部分

if __name__ == "__main__":
    asyncio.run(main())
  • このコード全体を非同期で実行開始します。

🧠 このコードのポイント

  • リアルタイムで価格データを監視
  • 異常(価格急騰+出来高急増)を自動で検出
  • ログとCSVに記録して後で分析しやすい設計
  • ロバストな再接続対応あり

💡 拡張アイデア

もしこのコードをもっと進化させたいなら、以下の方法が考えられます。

  • 📈 異常を検出したらSlackやLINEに通知
  • 🔍 異常発生後にトリガーとしてBotが自動売買
  • 📊 異常データをダッシュボードで可視化(Streamlitなど)
  • 🧠 異常検出をMLベースに切り替える(例:Isolation Forest)

3. 実行方法

1.依存ライブラリのインストール

pip install -r requirements.txt

2.スクリプトの実行

python data_collector.py

Yodaka

次回の記事では、P&Dイベントの検出モデルの設計についてまとめます。


✅ おまけ:拡張アイデアとその根拠

それぞれの拡張アイデアは、このスクリプトの目的(異常検出)に対する実用性・発展性・トレード応用性を考えて提案したものです。以下、1つずつ根拠付きで詳しく解説します👇


1. 📩 SlackやLINE通知:リアルタイム対応を可能にする

✔️ 根拠:

  • 異常(価格急騰+出来高急増)を検出したタイミングで、人間や別のシステムに即時通知できることで、**「気づけなかった」「見逃した」**を防げます。
  • 通知システムを入れることで、人的判断 or 自動売買のトリガーにも使える。

💡 例:

if not anomalies.empty:
    send_slack_alert(f"🚨 {symbol}で異常検出: {timestamp}")

🔧 使用技術:

  • Slack API / LINE Notify
  • requests モジュール or httpx

2. 🤖 Botによる自動売買トリガー:実際のトレードアクションと連携

✔️ 根拠:

  • 異常が検出された=価格が急に動いたシグナル → トレード戦略に直結する
  • 例えば、「異常が出たら逆張り/順張りで注文出す」など、トレードロジックと直結可能

💡 応用例:

  • 異常検出後にBybit APIでmarket buyを実行
  • 直近のスプレッドや板状況と連携してポジション管理

3. 📊 Streamlitなどで異常イベントを可視化:後からの分析やモニタリングに便利

✔️ 根拠:

  • 検出した異常イベントがCSVに記録されている → それをWeb上で視覚的に表示すれば人間のモニタリング・パターン把握がしやすくなる
  • トレードルールの再設計や、イベント頻度の傾向分析にも使える。

💡 UI例:

  • ローソク足チャート+異常検出ポイントを赤点で表示
  • 直近24時間分の異常イベントリストを表形式で表示

🔧 使用技術:

  • streamlit, plotly, pandas, altair

4. 🧠 異常検出をML(機械学習)ベースに:ルールベースより柔軟・高精度

✔️ 根拠:

  • 今は「価格が1.9倍以上」「出来高が平均の4倍以上」といったルールベースの閾値設定ですが、
    • これは設定次第で「誤検出」や「見逃し」が多くなる
  • 機械学習を使えば、異常のパターンを学習させてより柔軟・高精度な検出が可能になる

💡 ML手法の例:

  • Isolation Forest(異常検知用アルゴリズム)
  • Autoencoder(再構成誤差をもとに異常を判断)
  • LSTM(時系列異常検知)

🔁 それぞれの立ち位置まとめ

拡張案目的タイミング想定ユーザー
Slack/LINE通知即時通知と注意喚起異常発生時人・他Bot
自動売買トリガートレードアクションの開始異常検出時Bot
異常イベントの可視化 (Streamlit)モニタリング・後分析リアルタイム & 後分析トレーダー
機械学習ベースの異常検出精度向上・自動パターン学習検出ロジック時開発者・研究者

どれも「今のルールベース検出をベースにして、さらに効果的に使えるようにする」ための拡張です。

関連
開発記録#142(2025/3/24)「論文ベースのbot開発フローpart.4」

続きを見る

-Bot