前回の記事に引き続き、今回も仮想通貨botの開発状況をまとめていきます。
本記事では「暗号通貨のパンプ&ダンプスキームの検出」に関する論文をベースにbot開発の過程をまとめていきます。
Detecting Crypto Pump-and-Dump Schemes: A Thresholding-Based Approach to Handling Market Noisehttps://t.co/ctCJEV1MBs
— よだか(夜鷹/yodaka) (@yodakablog) March 22, 2025
🖥️ リアルタイム運用時の監視システム設計
次のステップでは、リアルタイム監視システムを構築し、以下の機能を実装します。
1. 監視システムのアプローチ
🔎 監視対象
✅ リアルタイム価格モニタリング:現在の価格が異常パターンに近づいた場合にアラート
✅ P&Dイベントの即時通知:Slack / Telegram への通知
✅ 取引状況のダッシュボード表示:Botの稼働状況、利益、損失などを視覚化
2. システム設計
📊 ダッシュボード ├── P&Dイベント検出状況 ├── 実行中のBotステータス ├── 現在の口座残高 ├── 取引履歴 └── アラート通知履歴
3. コード実装 (monitoring_system.py)
import asyncio import json import websockets import requests import pandas as pd import matplotlib.pyplot as plt from flask import Flask, render_template # Flaskアプリケーションの設定 app = Flask(__name__) # Slack通知用 SLACK_WEBHOOK_URL = "YOUR_SLACK_WEBHOOK_URL" # データフレームの初期化 price_data = pd.DataFrame(columns=["timestamp", "symbol", "price"]) # WebSocket接続情報 (Bybit) API_URL = "wss://stream.bybit.com/realtime_public" SYMBOLS = ["BTCUSDT", "ETHUSDT"] # 異常検出パラメータ PRICE_THRESHOLD = 0.05 # 5%以上の急騰/急落でアラート # Slack通知機能 def send_alert(message): payload = {"text": message} requests.post(SLACK_WEBHOOK_URL, json=payload) # 価格モニタリング async def monitor_prices(): async with websockets.connect(API_URL) as websocket: # シンボルごとにデータ購読 for symbol in SYMBOLS: subscription_msg = { "op": "subscribe", "args": [f"ticker.{symbol}"] } await websocket.send(json.dumps(subscription_msg)) while True: try: response = await websocket.recv() data = json.loads(response) if 'topic' in data and 'data' in data: symbol = data['topic'].split('.')[-1] price = float(data['data']['last_price']) timestamp = pd.to_datetime("now") # データ追加 global price_data price_data = pd.concat([price_data, pd.DataFrame({ "timestamp": [timestamp], "symbol": [symbol], "price": [price] })]) # 価格異常の検出 recent_data = price_data[price_data['symbol'] == symbol] if len(recent_data) >= 2: price_change = (recent_data.iloc[-1]['price'] / recent_data.iloc[-2]['price']) - 1 if abs(price_change) > PRICE_THRESHOLD: alert_message = f"🚨 {symbol}の価格が{price_change*100:.2f}%変動しました" print(alert_message) send_alert(alert_message) except Exception as e: print(f"Error: {e}") await asyncio.sleep(5) # Flaskダッシュボード @app.route("/") def dashboard(): global price_data return render_template("dashboard.html", data=price_data.tail(20).to_dict(orient='records')) # メイン処理 def start_monitoring(): loop = asyncio.new_event_loop() asyncio.set_event_loop(loop) loop.run_until_complete(monitor_prices()) if __name__ == "__main__": from threading import Thread # Flaskアプリケーションをスレッドで実行 Thread(target=start_monitoring).start() app.run(debug=True, port=5000)
4. Flaskテンプレートファイル (dashboard.html)
<!DOCTYPE html> <html> <head> <title>リアルタイム監視システム</title> <style> body { font-family: Arial, sans-serif; margin: 20px; } h1 { color: #4CAF50; } table { width: 100%; border-collapse: collapse; } th, td { border: 1px solid #ddd; padding: 8px; text-align: center; } th { background-color: #4CAF50; color: white; } </style> </head> <body> <h1>🚨 リアルタイム監視システム 🚨</h1> <h2>価格データ</h2> <table> <tr> <th>タイムスタンプ</th> <th>シンボル</th> <th>価格 (USD)</th> </tr> {% for row in data %} <tr> <td>{{ row['timestamp'] }}</td> <td>{{ row['symbol'] }}</td> <td>{{ row['price'] }}</td> </tr> {% endfor %} </table> </body> </html>
5. 実行方法
1.必要なライブラリのインストール
pip install flask pandas matplotlib websockets requests
2.Slack Webhook URLをSLACK_WEBHOOK_URL
に設定
3.スクリプトの実行
python monitoring_system.py
- ダッシュボード表示
- ブラウザで以下にアクセス
👉http://localhost:5000
6. 機能のポイント
✅ リアルタイム価格監視:Bybit WebSocket APIで即時データを取得
✅ Slack通知:異常が検出されるとアラートを送信
✅ ダッシュボード:Botの状況、価格、取引履歴を視覚的に確認
✅ エラーハンドリング:接続エラーや通信途絶に対応した再接続機能
7. 次のステップ
次の段階では、これまでに構築したシステムの統合、最終テスト、デプロイ手順の整備を行います。
デプロイには Docker + Kubernetes を活用し、Botのスケーラブルな運用環境を構築します。

次回の記事では、「システム統合とデプロイ (Docker + Kubernetes)」についてまとめます。
-
-
開発記録#147(2025/3/25)「論文ベースのbot開発フローpart.9」
続きを見る