Bot

開発記録#146(2025/3/25)「論文ベースのbot開発フローpart.8」

2025年3月25日

前回の記事に引き続き、今回も仮想通貨botの開発状況をまとめていきます。

本記事では「暗号通貨のパンプ&ダンプスキームの検出」に関する論文をベースにbot開発の過程をまとめていきます。

🖥️ リアルタイム運用時の監視システム設計

次のステップでは、リアルタイム監視システムを構築し、以下の機能を実装します。


1. 監視システムのアプローチ

🔎 監視対象

リアルタイム価格モニタリング:現在の価格が異常パターンに近づいた場合にアラート
P&Dイベントの即時通知:Slack / Telegram への通知
取引状況のダッシュボード表示:Botの稼働状況、利益、損失などを視覚化


2. システム設計

📊 ダッシュボード
 ├── P&Dイベント検出状況
 ├── 実行中のBotステータス
 ├── 現在の口座残高
 ├── 取引履歴
 └── アラート通知履歴

3. コード実装 (monitoring_system.py)

import asyncio
import json
import websockets
import requests
import pandas as pd
import matplotlib.pyplot as plt
from flask import Flask, render_template

# Flaskアプリケーションの設定
app = Flask(__name__)

# Slack通知用
SLACK_WEBHOOK_URL = "YOUR_SLACK_WEBHOOK_URL"

# データフレームの初期化
price_data = pd.DataFrame(columns=["timestamp", "symbol", "price"])

# WebSocket接続情報 (Bybit)
API_URL = "wss://stream.bybit.com/realtime_public"
SYMBOLS = ["BTCUSDT", "ETHUSDT"]

# 異常検出パラメータ
PRICE_THRESHOLD = 0.05  # 5%以上の急騰/急落でアラート

# Slack通知機能
def send_alert(message):
    payload = {"text": message}
    requests.post(SLACK_WEBHOOK_URL, json=payload)

# 価格モニタリング
async def monitor_prices():
    async with websockets.connect(API_URL) as websocket:
        # シンボルごとにデータ購読
        for symbol in SYMBOLS:
            subscription_msg = {
                "op": "subscribe",
                "args": [f"ticker.{symbol}"]
            }
            await websocket.send(json.dumps(subscription_msg))

        while True:
            try:
                response = await websocket.recv()
                data = json.loads(response)

                if 'topic' in data and 'data' in data:
                    symbol = data['topic'].split('.')[-1]
                    price = float(data['data']['last_price'])
                    timestamp = pd.to_datetime("now")

                    # データ追加
                    global price_data
                    price_data = pd.concat([price_data, pd.DataFrame({
                        "timestamp": [timestamp],
                        "symbol": [symbol],
                        "price": [price]
                    })])

                    # 価格異常の検出
                    recent_data = price_data[price_data['symbol'] == symbol]
                    if len(recent_data) >= 2:
                        price_change = (recent_data.iloc[-1]['price'] / recent_data.iloc[-2]['price']) - 1
                        if abs(price_change) > PRICE_THRESHOLD:
                            alert_message = f"🚨 {symbol}の価格が{price_change*100:.2f}%変動しました"
                            print(alert_message)
                            send_alert(alert_message)

            except Exception as e:
                print(f"Error: {e}")
                await asyncio.sleep(5)

# Flaskダッシュボード
@app.route("/")
def dashboard():
    global price_data
    return render_template("dashboard.html", data=price_data.tail(20).to_dict(orient='records'))

# メイン処理
def start_monitoring():
    loop = asyncio.new_event_loop()
    asyncio.set_event_loop(loop)
    loop.run_until_complete(monitor_prices())

if __name__ == "__main__":
    from threading import Thread
    # Flaskアプリケーションをスレッドで実行
    Thread(target=start_monitoring).start()
    app.run(debug=True, port=5000)

4. Flaskテンプレートファイル (dashboard.html)

<!DOCTYPE html>
<html>
<head>
    <title>リアルタイム監視システム</title>
    <style>
        body { font-family: Arial, sans-serif; margin: 20px; }
        h1 { color: #4CAF50; }
        table { width: 100%; border-collapse: collapse; }
        th, td { border: 1px solid #ddd; padding: 8px; text-align: center; }
        th { background-color: #4CAF50; color: white; }
    </style>
</head>
<body>
    <h1>🚨 リアルタイム監視システム 🚨</h1>
    <h2>価格データ</h2>
    <table>
        <tr>
            <th>タイムスタンプ</th>
            <th>シンボル</th>
            <th>価格 (USD)</th>
        </tr>
        {% for row in data %}
        <tr>
            <td>{{ row['timestamp'] }}</td>
            <td>{{ row['symbol'] }}</td>
            <td>{{ row['price'] }}</td>
        </tr>
        {% endfor %}
    </table>
</body>
</html>

5. 実行方法

1.必要なライブラリのインストール

pip install flask pandas matplotlib websockets requests

2.Slack Webhook URLSLACK_WEBHOOK_URLに設定

3.スクリプトの実行

python monitoring_system.py
  1. ダッシュボード表示
  • ブラウザで以下にアクセス
    👉 http://localhost:5000

6. 機能のポイント

リアルタイム価格監視:Bybit WebSocket APIで即時データを取得
Slack通知:異常が検出されるとアラートを送信
ダッシュボード:Botの状況、価格、取引履歴を視覚的に確認
エラーハンドリング:接続エラーや通信途絶に対応した再接続機能


7. 次のステップ

次の段階では、これまでに構築したシステムの統合、最終テスト、デプロイ手順の整備を行います。
デプロイには Docker + Kubernetes を活用し、Botのスケーラブルな運用環境を構築します。

Yodaka

次回の記事では、「システム統合とデプロイ (Docker + Kubernetes)」についてまとめます。

関連
開発記録#147(2025/3/25)「論文ベースのbot開発フローpart.9」

続きを見る

-Bot