Bot

開発記録#161(2025/3/31)「論文ベースのbot開発フローpart.23」

2025年3月31日

前回の記事に引き続き、今回も仮想通貨botの開発状況をまとめていきます。

本記事では「暗号通貨のパンプ&ダンプスキームの検出」に関する論文をベースにbot開発の過程をまとめていきます。

Yodaka

次のステップとして、以下のタスクに取り組みます。

1. 運用ルールの確立

  • Botの稼働スケジュール設計
  • 障害発生時の対応フロー策定
  • 異常検出やリスク回避の優先順位決定

2. 最終パフォーマンスチェック

  • シミュレーション結果と実運用結果の比較
  • 取引成功率、利益率、エラー率の再評価
  • 調整が必要なパラメータの特定

3. デプロイ後の安定性確認

  • Podの状態、リソース使用率の監視
  • APIエラーやデータ欠損の確認
  • Slack通知やログ記録の精度確認
Yodaka

今回は「1. 運用ルールの確立」についてまとめます。

1. 運用ルールの確立

運用ルールは、Botがいつ、どのように稼働し、異常時にどう対応するかを明確に定義するための重要な基盤です。以下に整理していきます。


🔸 1-1. Botの稼働スケジュール設計

時間帯モード説明
平日 9:00〜21:00フル稼働モードデータ収集、P&D検出、取引執行をすべて有効化
深夜 21:00〜3:00モニタリングモードデータ収集とP&D検出のみ実行、取引は停止
土日低頻度モード(手動切替可)相場が薄い時間帯のため監視・ログ記録のみ

📌 Bot起動時に設定するモードによって、モジュールの有効/無効を切り替えるようにします。

Yodaka

1-1. Botの稼働スケジュール設計に基づいて、Botの起動時に時間帯によって自動的にモードを切り替えるコードを実装します。


✅ 目的

  • Botの起動時、現在の時間帯に応じて「フル稼働 / モニタリング / 低頻度モード」のいずれかを自動的に選択
  • 選択されたモードに応じて、どの処理を有効化するか切り替える

✅ ファイル構成の想定(例)

bot/
├── scheduler.py          ← モード切替と管理
├── collector.py          ← データ収集
├── detector.py           ← P&D検出
├── executor.py           ← 取引執行
└── main.py               ← 起動スクリプト(←ここに集約)

✅ scheduler.py(時間帯に応じたモード判定)

from datetime import datetime

def get_bot_mode():
    now = datetime.now()
    hour = now.hour
    weekday = now.weekday()  # 0=月, ..., 6=日

    if weekday >= 5:
        return "low_freq"  # 土日
    elif 9 <= hour < 21:
        return "full"
    elif 21 <= hour or hour < 3:
        return "monitor"
    else:
        return "low_freq"

✅ main.py(モードに応じたモジュール起動)

from scheduler import get_bot_mode

def run_collector():
    print("✅ データ収集開始")
    # from collector import run_collector
    # run_collector()
    pass  # ここに実処理

def run_detector():
    print("✅ P&D検出モジュール起動")
    # from detector import run_detector
    # run_detector()
    pass

def run_executor():
    print("✅ 取引実行モジュール起動")
    # from executor import run_executor
    # run_executor()
    pass

if __name__ == "__main__":
    mode = get_bot_mode()
    print(f"🕒 現在のBotモード: {mode}")

    if mode == "full":
        run_collector()
        run_detector()
        run_executor()
    elif mode == "monitor":
        run_collector()
        run_detector()
        print("🔕 モニタリングモード中(取引実行はスキップ)")
    elif mode == "low_freq":
        run_collector()
        print("💤 低頻度モード中(検出・取引は停止)")

✅ 実行例(平日 10:00)

$ python main.py
🕒 現在のBotモード: full
✅ データ収集開始
✅ P&D検出モジュール起動
✅ 取引実行モジュール起動

✅ 実行例(平日 22:00)

🕒 現在のBotモード: monitor
✅ データ収集開始
✅ P&D検出モジュール起動
🔕 モニタリングモード中(取引実行はスキップ)


✅ 実行例(土曜 15:00)

🕒 現在のBotモード: low_freq
✅ データ収集開始
💤 低頻度モード中(検出・取引は停止)

🔸 1-2. 異常時の対応フロー

異常検出方法対応フロー
APIエラーException ログ、Slack通知3回リトライ → Slack通知 → 再起動フラグ送信
データ欠損異常データのnullチェック欠損をスキップ or 再取得 → ログに記録
BotクラッシュPod終了監視 (K8s)Kubernetes の restartPolicy: Always で自動復旧
P&D検出過剰イベント検出数が閾値超え取引一時停止モードに自動移行 + Slack通知
高負荷 (CPU>80%)Prometheus監視値AutoScalerでPod追加 + Slack通知

1-2. 異常時の対応フロー:リトライ+Slack通知機構

📄 utils/retry_with_alert.py

import time
import requests

SLACK_WEBHOOK_URL = "https://hooks.slack.com/services/xxx/yyy/zzz"

def send_slack_alert(message: str):
    payload = {"text": f"🚨 {message}"}
    try:
        requests.post(SLACK_WEBHOOK_URL, json=payload)
    except Exception as e:
        print(f"Slack通知失敗: {e}")

def retry_with_alert(task_func, retries=3, delay=5, task_name=""):
    for attempt in range(1, retries + 1):
        try:
            return task_func()
        except Exception as e:
            print(f"⚠️ {task_name} 失敗 ({attempt}/{retries}) : {e}")
            if attempt == retries:
                alert_msg = f"{task_name} が {retries} 回連続で失敗しました。即時対応が必要です。"
                send_slack_alert(alert_msg)
            time.sleep(delay)

使用例

from utils.retry_with_alert import retry_with_alert

def risky_api_call():
    # API接続処理 (例:Bybit)
    raise ConnectionError("APIタイムアウト")

# 使用方法
retry_with_alert(risky_api_call, retries=5, delay=3, task_name="Bybit API接続")

🔸 1-3. ログと通知の運用ルール

  • すべての主要アクションは logs/*.log に記録される
  • 異常発生時には Slackに通知
  • 毎日 3:00 に前日分のログを自動バックアップ (S3など)

1-3. ログと通知の運用ルール

📄 utils/logger.py

import logging
from datetime import datetime

def get_logger(name: str, log_file: str = None):
    logger = logging.getLogger(name)
    logger.setLevel(logging.INFO)
    if not logger.handlers:
        formatter = logging.Formatter("[%(asctime)s] %(levelname)s - %(message)s", "%Y-%m-%d %H:%M:%S")

        if log_file is None:
            log_file = f"./logs/{name}_{datetime.now().strftime('%Y%m%d')}.log"
        file_handler = logging.FileHandler(log_file)
        file_handler.setFormatter(formatter)

        console_handler = logging.StreamHandler()
        console_handler.setFormatter(formatter)

        logger.addHandler(file_handler)
        logger.addHandler(console_handler)

    return logger

使用例

from utils.logger import get_logger

logger = get_logger("trade_executor")

logger.info("取引開始:BTCUSDT ロング")
logger.warning("残高が閾値を下回っています")
logger.error("APIキー認証に失敗しました")

🔸 1-4. フォールトトレラント設計

システム構成備考
data_collector再起動しても最新状態を自動再取得するようにする
pnd_detectorイベント履歴をファイルに記録し、復元可能にする
trade_executor同一イベントへの再実行防止のために、UUID付き履歴を記録

1-4. フォールトトレラント設計:イベント再実行防止と状態復元

📄 state/history_tracker.py

import json
import os

HISTORY_FILE = "./state/event_history.json"

def load_history():
    if os.path.exists(HISTORY_FILE):
        with open(HISTORY_FILE, "r") as f:
            return json.load(f)
    return {}

def save_history(history):
    with open(HISTORY_FILE, "w") as f:
        json.dump(history, f, indent=2)

def has_event_been_processed(event_id):
    history = load_history()
    return history.get(event_id, False)

def mark_event_as_processed(event_id):
    history = load_history()
    history[event_id] = True
    save_history(history)

使用例(取引実行時)

from state.history_tracker import has_event_been_processed, mark_event_as_processed

event_id = "2025-03-31_14:30_BTCUSDT"

if has_event_been_processed(event_id):
    print(f"✅ このイベント {event_id} はすでに処理済です。スキップします。")
else:
    print(f"🚀 新規イベント {event_id} を処理中...")
    # 取引ロジック実行...
    mark_event_as_processed(event_id)

🧩 構成案まとめ (図式イメージ)

┌──────────────┐         ┌────────────┐
│  data_collector ├────▶ │  P&D_detector │
└──────┬───────┘         └────┬───────┘
       ▼                        ▼
   [market_data.csv]      [pnd_events.csv]
       ▼                        ▼
    trade_executor <───────────┘
       │
       ▼
 [order_log.csv]  → Slack通知 + Grafana表示

✅ 今後の実装チェックリスト

  • settings.yaml または .env で時間帯に応じたモード切り替え設定を作成
  • 監視モジュール による Pod 状態とログ監視処理の組み込み
  • Slack 通知テンプレートの整備(エラー/成功/重大アラート)

📦 今後の拡張ポイント

  • history_tracker.py を Redis や SQLite で永続化することで冗長性向上
  • Slack通知のテンプレート化
  • logger.py をFluentdやCloud Loggingなどに接続
Yodaka

次回は2. 最終パフォーマンスチェック(シミュレーションと実運用の比較検証) についてまとめます。

関連
開発記録#162(2025/4/1)「論文ベースのbot開発フローpart.24」

続きを見る

-Bot