Bot

開発記録#161(2025/3/31)「論文ベースのbot開発フローpart.23 運用ルールの確立」

2025年3月31日

前回の記事に引き続き、今回も仮想通貨botの開発状況をまとめていきます。

本記事では「暗号通貨のパンプ&ダンプスキームの検出」に関する論文をベースにbot開発の過程をまとめていきます。

Yodaka

次のステップとして、以下のタスクに取り組みます。

1. 運用ルールの確立

  • Botの稼働スケジュール設計
  • 障害発生時の対応フロー策定
  • 異常検出やリスク回避の優先順位決定

2. 最終パフォーマンスチェック

  • シミュレーション結果と実運用結果の比較
  • 取引成功率、利益率、エラー率の再評価
  • 調整が必要なパラメータの特定

3. デプロイ後の安定性確認

  • Podの状態、リソース使用率の監視
  • APIエラーやデータ欠損の確認
  • Slack通知やログ記録の精度確認
Yodaka

今回は「1. 運用ルールの確立」についてまとめます。

1. 運用ルールの確立

運用ルールは、Botがいつ、どのように稼働し、異常時にどう対応するかを明確に定義するための重要な基盤です。以下に整理していきます。


🔸 1-1. Botの稼働スケジュール設計

時間帯モード説明
平日 9:00〜21:00フル稼働モードデータ収集、P&D検出、取引執行をすべて有効化
深夜 21:00〜3:00モニタリングモードデータ収集とP&D検出のみ実行、取引は停止
土日低頻度モード(手動切替可)相場が薄い時間帯のため監視・ログ記録のみ

📌 Bot起動時に設定するモードによって、モジュールの有効/無効を切り替えるようにします。

Yodaka

1-1. Botの稼働スケジュール設計に基づいて、Botの起動時に時間帯によって自動的にモードを切り替えるコードを実装します。


✅ 目的

  • Botの起動時、現在の時間帯に応じて「フル稼働 / モニタリング / 低頻度モード」のいずれかを自動的に選択
  • 選択されたモードに応じて、どの処理を有効化するか切り替える

✅ ファイル構成の想定(例)

bot/
├── scheduler.py          ← モード切替と管理
├── collector.py          ← データ収集
├── detector.py           ← P&D検出
├── executor.py           ← 取引執行
└── main.py               ← 起動スクリプト(←ここに集約)

✅ scheduler.py(時間帯に応じたモード判定)

from datetime import datetime

def get_bot_mode():
    now = datetime.now()
    hour = now.hour
    weekday = now.weekday()  # 0=月, ..., 6=日

    if weekday >= 5:
        return "low_freq"  # 土日
    elif 9 <= hour < 21:
        return "full"
    elif 21 <= hour or hour < 3:
        return "monitor"
    else:
        return "low_freq"

✅ main.py(モードに応じたモジュール起動)

from scheduler import get_bot_mode

def run_collector():
    print("✅ データ収集開始")
    # from collector import run_collector
    # run_collector()
    pass  # ここに実処理

def run_detector():
    print("✅ P&D検出モジュール起動")
    # from detector import run_detector
    # run_detector()
    pass

def run_executor():
    print("✅ 取引実行モジュール起動")
    # from executor import run_executor
    # run_executor()
    pass

if __name__ == "__main__":
    mode = get_bot_mode()
    print(f"🕒 現在のBotモード: {mode}")

    if mode == "full":
        run_collector()
        run_detector()
        run_executor()
    elif mode == "monitor":
        run_collector()
        run_detector()
        print("🔕 モニタリングモード中(取引実行はスキップ)")
    elif mode == "low_freq":
        run_collector()
        print("💤 低頻度モード中(検出・取引は停止)")

✅ 実行例(平日 10:00)

$ python main.py
🕒 現在のBotモード: full
✅ データ収集開始
✅ P&D検出モジュール起動
✅ 取引実行モジュール起動

✅ 実行例(平日 22:00)

🕒 現在のBotモード: monitor
✅ データ収集開始
✅ P&D検出モジュール起動
🔕 モニタリングモード中(取引実行はスキップ)


✅ 実行例(土曜 15:00)

🕒 現在のBotモード: low_freq
✅ データ収集開始
💤 低頻度モード中(検出・取引は停止)

🔸 1-2. 異常時の対応フロー

異常検出方法対応フロー
APIエラーException ログ、Slack通知3回リトライ → Slack通知 → 再起動フラグ送信
データ欠損異常データのnullチェック欠損をスキップ or 再取得 → ログに記録
BotクラッシュPod終了監視 (K8s)Kubernetes の restartPolicy: Always で自動復旧
P&D検出過剰イベント検出数が閾値超え取引一時停止モードに自動移行 + Slack通知
高負荷 (CPU>80%)Prometheus監視値AutoScalerでPod追加 + Slack通知

1-2. 異常時の対応フロー:リトライ+Slack通知機構

📄 utils/retry_with_alert.py

import time
import requests

SLACK_WEBHOOK_URL = "https://hooks.slack.com/services/xxx/yyy/zzz"

def send_slack_alert(message: str):
    payload = {"text": f"🚨 {message}"}
    try:
        requests.post(SLACK_WEBHOOK_URL, json=payload)
    except Exception as e:
        print(f"Slack通知失敗: {e}")

def retry_with_alert(task_func, retries=3, delay=5, task_name=""):
    for attempt in range(1, retries + 1):
        try:
            return task_func()
        except Exception as e:
            print(f"⚠️ {task_name} 失敗 ({attempt}/{retries}) : {e}")
            if attempt == retries:
                alert_msg = f"{task_name} が {retries} 回連続で失敗しました。即時対応が必要です。"
                send_slack_alert(alert_msg)
            time.sleep(delay)

使用例

from utils.retry_with_alert import retry_with_alert

def risky_api_call():
    # API接続処理 (例:Bybit)
    raise ConnectionError("APIタイムアウト")

# 使用方法
retry_with_alert(risky_api_call, retries=5, delay=3, task_name="Bybit API接続")

🔸 1-3. ログと通知の運用ルール

  • すべての主要アクションは logs/*.log に記録される
  • 異常発生時には Slackに通知
  • 毎日 3:00 に前日分のログを自動バックアップ (S3など)

1-3. ログと通知の運用ルール

📄 utils/logger.py

import logging
from datetime import datetime

def get_logger(name: str, log_file: str = None):
    logger = logging.getLogger(name)
    logger.setLevel(logging.INFO)
    if not logger.handlers:
        formatter = logging.Formatter("[%(asctime)s] %(levelname)s - %(message)s", "%Y-%m-%d %H:%M:%S")

        if log_file is None:
            log_file = f"./logs/{name}_{datetime.now().strftime('%Y%m%d')}.log"
        file_handler = logging.FileHandler(log_file)
        file_handler.setFormatter(formatter)

        console_handler = logging.StreamHandler()
        console_handler.setFormatter(formatter)

        logger.addHandler(file_handler)
        logger.addHandler(console_handler)

    return logger

使用例

from utils.logger import get_logger

logger = get_logger("trade_executor")

logger.info("取引開始:BTCUSDT ロング")
logger.warning("残高が閾値を下回っています")
logger.error("APIキー認証に失敗しました")

🔸 1-4. フォールトトレラント設計

システム構成備考
data_collector再起動しても最新状態を自動再取得するようにする
pnd_detectorイベント履歴をファイルに記録し、復元可能にする
trade_executor同一イベントへの再実行防止のために、UUID付き履歴を記録

1-4. フォールトトレラント設計:イベント再実行防止と状態復元

📄 state/history_tracker.py

import json
import os

HISTORY_FILE = "./state/event_history.json"

def load_history():
    if os.path.exists(HISTORY_FILE):
        with open(HISTORY_FILE, "r") as f:
            return json.load(f)
    return {}

def save_history(history):
    with open(HISTORY_FILE, "w") as f:
        json.dump(history, f, indent=2)

def has_event_been_processed(event_id):
    history = load_history()
    return history.get(event_id, False)

def mark_event_as_processed(event_id):
    history = load_history()
    history[event_id] = True
    save_history(history)

使用例(取引実行時)

from state.history_tracker import has_event_been_processed, mark_event_as_processed

event_id = "2025-03-31_14:30_BTCUSDT"

if has_event_been_processed(event_id):
    print(f"✅ このイベント {event_id} はすでに処理済です。スキップします。")
else:
    print(f"🚀 新規イベント {event_id} を処理中...")
    # 取引ロジック実行...
    mark_event_as_processed(event_id)

🧩 構成案まとめ (図式イメージ)

┌──────────────┐         ┌────────────┐
│  data_collector ├────▶ │  P&D_detector │
└──────┬───────┘         └────┬───────┘
       ▼                        ▼
   [market_data.csv]      [pnd_events.csv]
       ▼                        ▼
    trade_executor <───────────┘
       │
       ▼
 [order_log.csv]  → Slack通知 + Grafana表示

✅ 今後の実装チェックリスト

  • settings.yaml または .env で時間帯に応じたモード切り替え設定を作成
  • 監視モジュール による Pod 状態とログ監視処理の組み込み
  • Slack 通知テンプレートの整備(エラー/成功/重大アラート)

📦 今後の拡張ポイント

  • history_tracker.py を Redis や SQLite で永続化することで冗長性向上
  • Slack通知のテンプレート化
  • logger.py をFluentdやCloud Loggingなどに接続
Yodaka

次回は2. 最終パフォーマンスチェック(シミュレーションと実運用の比較検証) についてまとめます。

関連
開発記録#162(2025/4/1)「論文ベースのbot開発フローpart.24 最終パフォーマンスチェック― バックテストと実運用を⼀括検証 ―」

続きを見る

👇ラジオで話したこと

🎙 「Bot運用ルールの確立とは?」

こんにちは、よだかです。
本日もラジオをお聴きいただきありがとうございます。

今回は、仮想通貨の自動売買Botの開発記録の中から、**「運用ルールの確立」**というテーマでお届けしていきます。


🔍 そもそも運用ルールって何のためにあるの?

自動売買Botというのは、名前のとおり、あるルールに従って人の代わりに取引を自動で実行するプログラムのことです。

じゃあ、「そのBot自身のルールはどうやって決まっているの?」
ここがポイントです。

  • いつ動くか?
  • どんな条件で止まるか?
  • 異常が起きたらどうするか?

これをあらかじめ決めておかないと、たとえば、 「深夜にエラー出てるのに誰も気づかない」
「週末の薄い相場で暴走していた」
みたいなことが起こるわけです。


✅ 本日のテーマ:Bot運用ルールをどう整えるか?

今日は4つのポイントに分けてお話ししていきます。

  1. 稼働スケジュールの設計
  2. 異常時の対応フロー
  3. ログと通知のルール整備
  4. フォールトトレラント設計(障害に強くする)

⏰ 1. 稼働スケジュールの設計

Botには「働かせすぎてもダメ、サボらせすぎてもダメ」な理想の時間帯があります。

たとえば、以下のようなモードで運用します:

  • 平日 9時〜21時:フル稼働(取引も検出も全部やる)
  • 深夜 21時〜3時:モニタリングのみ(取引はやらない)
  • 土日は超低頻度モード(ログだけ記録)

これに応じてBotの動作を切り替えるコードが自動で実装されていて、起動時に現在時刻を見て、自動で「今はこのモード」と判断してくれます。


🧯 2. 異常時の対応フロー

どんなに優れたBotでもエラーや異常は避けられません。だからこそ「どうリカバリするか」が大事。

たとえば:

異常タイプ対応
APIが落ちた3回リトライ → Slack通知 → 再起動フラグ
データ欠損再取得してスキップ、エラーログに記録
過剰検出取引を一時停止し、通知して安全第一に

Slack通知機構も備えているので、運営側にリアルタイムでエラーが飛ぶようになっています。


📝 3. ログと通知のルール整備

「記録がなければ、検証も反省もできない」

ということで、ログはとても重要です。すべての主要アクションは、

  • 日付ごとのログファイルに記録され、
  • 重大なエラーはSlack通知、
  • 毎日深夜3時に自動バックアップ(例:S3保存)

これで証拠と証明が残る環境が整います。


💪 4. フォールトトレラント設計(壊れても再起動)

「倒れても立ち上がるBot」が理想です。

Botの各モジュールは、次のような仕組みを持っています:

  • data_collector:落ちても最新の状態から再取得
  • detector:イベント履歴をファイルに記録
  • executor:同じ取引を繰り返さないよう、イベントにIDを付与して実行履歴を保存

イベントIDは「2025-03-31_14:30_BTCUSDT」みたいな感じで、過去に処理したかどうかを自動で判断してくれます。

これがあるおかげで「同じことを2回繰り返すミス」が防げます。


🧩 Botの構造はこんな感じ

collector.py → detector.py → executor.py
   ↓               ↓                  ↓
[データ]      [検出結果]    [取引ログ + 通知]

この3層構造で、

  • 市場データを集め、
  • P&Dイベントを検出し、
  • 実際に売買し、ログと通知を出す。

という流れになっています。


🧪 まとめ:このルールが「勝ち方」を支える

「Botが勝てるか?」はロジックの良し悪しだけでなく、

  • ルールが整っているか?
  • 壊れても自動で復旧するか?
  • 失敗を見える形で記録できるか?

この仕組みづくりが根幹になります。



🎁 おまけコーナー:リアルな運用の話、ぶっちゃけどうなの?

さて、ここからはちょっとだけリアルな「稼いでるbotter目線」のお話をしましょう。

さっきまでは“理想の設計”の話をしてきましたよね?
でも現場では、そんなに綺麗にはいかないことも多いんです。


🧨 ツッコミその1:「スケジュール切り替え」って、そこまで単純じゃない

スケジュールによってモードを変えるって、確かに基本設計としては良いです。
でも実際には──

  • 深夜の海外市場でチャンスが来ることもある
  • Botが停止中に大きなイベントが起きると、チャンスを逃す

つまり、**完全なスケジューリングには“機会損失リスク”**があるんです。

そのため、私自身は「ベーススケジュール+マーケットイベントトリガーの併用」が基本です。
特定条件で強制的にフル稼働モードに移行するスイッチを入れています。


🧯 ツッコミその2:「異常対応」は自動化しても、人間の目は必要

APIエラーのリトライ、Slack通知、Podの自動再起動──

これ、理論上は安心ですが、現場では「通知に気づかない」ことが一番の事故原因だったりします。

Slackが鳴っても「通知の嵐で埋もれる」とか、「休日でスマホ見てなかった」とか。

だから私は「重大系通知だけはLINE+電話通知に切り替え」てます。
Slackはいいけど、それだけに頼るのはリスクです。


🧠 ツッコミその3:「ログと復元」も万能じゃない

たとえばBotがクラッシュして再起動したとき、「前回どこまでやったか?」っていうのをhistory_trackerで判定するじゃないですか。

でも、これBot自体のミスじゃなく、データ提供元のズレや欠損だった場合
“処理済みだけど異常だったデータ”をスキップしてしまうこともあります。

要するに、履歴判定も100%じゃない。信じすぎると危険。

私は、イベントごとに**「確認モード」っていう人間による二重チェックフェーズ**を用意して、そこだけレビュー入れる設計にしています。


⚠️ ツッコミその4:「過剰設計」しすぎると動かない

これが一番伝えたい。

理想を積みすぎると「重すぎて動かない」「ロジックが複雑化しすぎて修正が困難」ってことが起きる。

実際、**「まずは取引実行まで動かす」**ことの方がずっと重要なんです。
Slack通知なんて後からでもいい。
最初の1円を稼ぐ方が、どんな仕様書より強い。


🎤 まとめ:理想設計は“方向”、現場は“変化球”

Bot開発って、完璧な理論を描いてからスタートするより、まず雑でも動かしてみることが圧倒的に大事です。

今日紹介した運用ルールの設計は、あくまで「地図」であって、「道そのもの」ではありません。

実際の現場は、

  • 思わぬイベントが起きる
  • APIが落ちる
  • 板がバグる
  • 取引が滑る

そんな“変化球”に対して、柔軟に対応できるかどうか──
そこに勝てるBotと負けるBotの違いが出てくると思っています。


ということで、今日は「Bot運用ルールの確立」と、「それを稼ぐ現場から見たリアル」という2つの視点でお届けしました。

🔜 次回予告:「パフォーマンス評価フェーズ」へ

次回は、Botが実際にどれだけ成果を出せているのか──
「シミュレーションと実運用の結果比較」をテーマに、もっと数字に踏み込んでいきます。
**「シミュレーション vs 実運用」**の結果を比較しながら、パフォーマンスの見極め方について掘り下げていきます。

「Botが本当に機能しているか?」
その答えを見つける回になりますので、どうぞお楽しみに!


それでは、またお会いしましょう。
仮想通貨botterのよだかでした。

-Bot