前回の記事に引き続き、今回も仮想通貨botの開発状況をまとめていきます。
本記事では「暗号通貨のパンプ&ダンプスキームの検出」に関する論文をベースにbot開発の過程をまとめていきます。
今回のテーマは「異常検出ロジックの組み込み」です。
データ収集スクリプトに異常検出ロジックを追加し、パンプ&ダンプイベントの兆候となる価格急騰や取引量の急増を検出します。
Detecting Crypto Pump-and-Dump Schemes: A Thresholding-Based Approach to Handling Market Noisehttps://t.co/ctCJEV1MBs
— よだか(夜鷹/yodaka) (@yodakablog) March 22, 2025
1. アプローチ概要「🚨 異常検出ロジックの組み込み」
🔎 検出基準
✅ 価格の急騰 (Price Surge)
- 12時間移動平均の90%以上の価格上昇
✅ 取引量の急増 (Volume Spike)
- 400%以上の取引量増加
✅ データノイズ除去 (Noise Filtering)
- 30日間の総取引量の30%以上
- 30日間の最大取引量の60%以上
✅ 指数加重移動平均 (EWMA)
- EWMAを用いて短期的な価格変動ノイズを低減し、安定した異常検出を行う
2. 修正後のコード (data_collector.py)
import asyncio
import json
import websockets
import os
import pandas as pd
import numpy as np
from datetime import datetime
# 設定ファイルの読み込み
with open("config.json", "r") as file:
config = json.load(file)
# API URL の更新
API_URL = config["api_url"]
SYMBOLS = config["symbols"]
TIMEFRAME = config["timeframe"]
DATA_FOLDER = config["data_folder"]
LOG_FILE = config["log_file"]
# データ格納ディレクトリの作成
os.makedirs(DATA_FOLDER, exist_ok=True)
# データフレームの初期化
data_frames = {symbol: pd.DataFrame() for symbol in SYMBOLS}
# 異常検出パラメータ
PRICE_THRESHOLD = 0.9
VOLUME_THRESHOLD = 4.0
EWMA_SPAN = 20 # EWMAの期間 (20日)
# ログ出力関数
def log_message(message):
timestamp = datetime.now().strftime("%Y-%m-%d %H:%M:%S")
log_entry = f"[{timestamp}] {message}"
print(log_entry)
with open(LOG_FILE, "a") as log_file:
log_file.write(log_entry + "\n")
# 異常検出関数
def detect_anomalies(df, symbol):
if len(df) < EWMA_SPAN:
return None # データが不足している場合はスキップ
# EWMA (指数加重移動平均)
df['ewma'] = df['close'].ewm(span=EWMA_SPAN).mean()
# 異常判定条件
df['price_anomaly'] = (df['close'] > df['ewma'] * (1 + PRICE_THRESHOLD))
df['volume_anomaly'] = (df['volume'] > df['volume'].mean() * VOLUME_THRESHOLD)
# 両方の条件が同時に満たされた場合にのみ異常と判定
df['combined_anomaly'] = df['price_anomaly'] & df['volume_anomaly']
# 異常イベントの検出
anomalies = df[df['combined_anomaly']]
if not anomalies.empty:
log_message(f"🚨 Anomaly detected for {symbol} at {anomalies.iloc[-1]['timestamp']}")
anomalies.to_csv(f"{DATA_FOLDER}/{symbol}_anomalies.csv", mode='a', header=not os.path.exists(f"{DATA_FOLDER}/{symbol}_anomalies.csv"), index=False)
# Bybit WebSocketサーバーに接続
async def connect_to_bybit():
async with websockets.connect(API_URL) as websocket:
for symbol in SYMBOLS:
subscription_msg = {
"op": "subscribe",
"args": [f"kline.{TIMEFRAME}.{symbol}"]
}
await websocket.send(json.dumps(subscription_msg))
log_message(f"Subscribed to {symbol}")
# データ受信ループ
while True:
try:
response = await websocket.recv()
data = json.loads(response)
if 'topic' in data and 'data' in data:
symbol = data['topic'].split('.')[-1]
kline_data = data['data'][0]
# データの追加
new_data = pd.DataFrame({
"timestamp": [datetime.fromtimestamp(kline_data['start'] / 1000)],
"open": [float(kline_data['open'])],
"high": [float(kline_data['high'])],
"low": [float(kline_data['low'])],
"close": [float(kline_data['close'])],
"volume": [float(kline_data['volume'])]
})
# データ更新と異常検出
data_frames[symbol] = pd.concat([data_frames[symbol], new_data], ignore_index=True)
detect_anomalies(data_frames[symbol], symbol)
# データ保存
data_frames[symbol].to_csv(f"{DATA_FOLDER}/{symbol}_data.csv", index=False)
except Exception as e:
log_message(f"Error: {e}")
await asyncio.sleep(5) # 5秒後に再接続
# メイン関数
async def main():
while True:
try:
await connect_to_bybit()
except Exception as e:
log_message(f"Connection failed: {e}. Retrying in 10 seconds...")
await asyncio.sleep(10)
if __name__ == "__main__":
asyncio.run(main())
このコードは、BybitのWebSocket APIを通じて仮想通貨のリアルタイムkline(ローソク足)データを取得し、異常(アノマリー)を検出してログ・保存するPythonスクリプトです。
機能のポイント
✅ 異常検出機能の統合:価格とボリュームの両方の異常が検出された場合にのみ警告
✅ EWMA (指数加重移動平均):短期的なノイズを除去し、パンプ&ダンプ特有のスパイクに対応
✅ データ保存とログ記録:異常イベントは個別の.csvファイルに保存し、後続のモデル学習に活用
✅ 耐障害設計:WebSocketエラーやサーバーダウン時も自動再接続
セクションごとにわかりやすく解説していきます👇
🧾 概要
このスクリプトが行う処理の流れは:
config.jsonから設定を読み込む- WebSocketでBybitに接続し、複数の仮想通貨のローソク足を購読
- リアルタイムでklineデータを受信し、DataFrameに追加
- 価格と出来高に基づいた異常(アノマリー)を検出
- 異常があればログとCSVに保存
- データは常にCSVファイルとしても保存(バックアップ)
1. 📁 設定と準備
with open("config.json", "r") as file:
config = json.load(file)
config.jsonに定義された設定値(APIのURL、取引ペア、時間足、保存先ディレクトリ、ログファイル名)を読み込みます。
os.makedirs(DATA_FOLDER, exist_ok=True)
- データ保存フォルダがなければ作成します。
data_frames = {symbol: pd.DataFrame() for symbol in SYMBOLS}
- 各シンボル(例: BTCUSDT, ETHUSDT)ごとに空の
DataFrameを用意。
2. 🧠 異常検出のための設定
PRICE_THRESHOLD = 0.9 # 価格が平均の1.9倍以上になったら VOLUME_THRESHOLD = 4.0 # 出来高が平均の4倍以上だったら EWMA_SPAN = 20 # 移動平均期間
- 異常と判定するための基準を設定。
EWMA_SPANは指数加重移動平均(Exponential Weighted Moving Average)に使う期間。
3. 📝 ログ関数
def log_message(message):
...
- 標準出力とログファイルの両方にメッセージを書き込む便利関数。
4. 🚨 異常検出ロジック
def detect_anomalies(df, symbol):
...
この関数では:
EWMA(指数加重移動平均)と出来高の平均を使って、- 価格が異常に高い(平均より90%以上高い)
- 出来高が異常に大きい(平均の4倍以上)
- という条件を両方満たしたら、それを異常(アノマリー)として検出します。
- 検出されたデータは
symbol_anomalies.csvに追記保存されます。
5. 🌐 WebSocket接続とデータ処理
async def connect_to_bybit():
この関数では:
websockets.connect()を使ってBybitのWebSocketサーバーに接続。- 各シンボルの
kline(ローソク足)を購読。 - WebSocketで受信したデータをDataFrameに整形し、更新。
- その後、
detect_anomalies()で異常チェック。 - 最新データはCSVにも保存されます。
6. 🔁 再接続とエラーハンドリング
while True:
try:
await connect_to_bybit()
except Exception as e:
...
- エラーや切断が発生した場合、10秒待ってから再接続。
- 通信やデータエラーに強い構造になってます。
✅ 最後に実行部分
if __name__ == "__main__":
asyncio.run(main())
- このコード全体を非同期で実行開始します。
🧠 このコードのポイント
- リアルタイムで価格データを監視
- 異常(価格急騰+出来高急増)を自動で検出
- ログとCSVに記録して後で分析しやすい設計
- ロバストな再接続対応あり
💡 拡張アイデア
もしこのコードをもっと進化させたいなら、以下の方法が考えられます。
- 📈 異常を検出したらSlackやLINEに通知
- 🔍 異常発生後にトリガーとしてBotが自動売買
- 📊 異常データをダッシュボードで可視化(Streamlitなど)
- 🧠 異常検出をMLベースに切り替える(例:Isolation Forest)
3. 実行方法
1.依存ライブラリのインストール
pip install -r requirements.txt
2.スクリプトの実行
python data_collector.py
次回の記事では、P&Dイベントの検出モデルの設計についてまとめます。
✅ おまけ:拡張アイデアとその根拠
それぞれの拡張アイデアは、このスクリプトの目的(異常検出)に対する実用性・発展性・トレード応用性を考えて提案したものです。以下、1つずつ根拠付きで詳しく解説します👇
1. 📩 SlackやLINE通知:リアルタイム対応を可能にする
✔️ 根拠:
- 異常(価格急騰+出来高急増)を検出したタイミングで、人間や別のシステムに即時通知できることで、**「気づけなかった」「見逃した」**を防げます。
- 通知システムを入れることで、人的判断 or 自動売買のトリガーにも使える。
💡 例:
if not anomalies.empty:
send_slack_alert(f"🚨 {symbol}で異常検出: {timestamp}")
🔧 使用技術:
- Slack API / LINE Notify
requestsモジュール orhttpx
2. 🤖 Botによる自動売買トリガー:実際のトレードアクションと連携
✔️ 根拠:
- 異常が検出された=価格が急に動いたシグナル → トレード戦略に直結する
- 例えば、「異常が出たら逆張り/順張りで注文出す」など、トレードロジックと直結可能
💡 応用例:
- 異常検出後にBybit APIで
market buyを実行 - 直近のスプレッドや板状況と連携してポジション管理
3. 📊 Streamlitなどで異常イベントを可視化:後からの分析やモニタリングに便利
✔️ 根拠:
- 検出した異常イベントがCSVに記録されている → それをWeb上で視覚的に表示すれば人間のモニタリング・パターン把握がしやすくなる
- トレードルールの再設計や、イベント頻度の傾向分析にも使える。
💡 UI例:
- ローソク足チャート+異常検出ポイントを赤点で表示
- 直近24時間分の異常イベントリストを表形式で表示
🔧 使用技術:
streamlit,plotly,pandas,altair
4. 🧠 異常検出をML(機械学習)ベースに:ルールベースより柔軟・高精度
✔️ 根拠:
- 今は「価格が1.9倍以上」「出来高が平均の4倍以上」といったルールベースの閾値設定ですが、
- これは設定次第で「誤検出」や「見逃し」が多くなる
- 機械学習を使えば、異常のパターンを学習させてより柔軟・高精度な検出が可能になる
💡 ML手法の例:
Isolation Forest(異常検知用アルゴリズム)Autoencoder(再構成誤差をもとに異常を判断)LSTM(時系列異常検知)
🔁 それぞれの立ち位置まとめ
| 拡張案 | 目的 | タイミング | 想定ユーザー |
|---|---|---|---|
| Slack/LINE通知 | 即時通知と注意喚起 | 異常発生時 | 人・他Bot |
| 自動売買トリガー | トレードアクションの開始 | 異常検出時 | Bot |
| 異常イベントの可視化 (Streamlit) | モニタリング・後分析 | リアルタイム & 後分析 | トレーダー |
| 機械学習ベースの異常検出 | 精度向上・自動パターン学習 | 検出ロジック時 | 開発者・研究者 |
どれも「今のルールベース検出をベースにして、さらに効果的に使えるようにする」ための拡張です。
-
-
開発記録#142(2025/3/24)「論文ベースのbot開発フローpart.4 P&Dイベントの最終判定ロジックを構築する」
続きを見る