Bot

開発記録#140(2025/3/24)「論文ベースのbot開発フローpart.2」

2025年3月24日

前回の記事に引き続き、今回も仮想通貨botの開発状況をまとめていきます。

本記事では「暗号通貨のパンプ&ダンプスキームの検出」に関する論文をベースにbot開発の過程をまとめていきます。

Yodaka

今回のテーマは「リアルタイムデータ収集スクリプト」です。

🚀 Bybitのリアルタイムデータ収集スクリプト

以下は、Bybit APIを使用してリアルタイムのOHLCVデータおよびOrder Bookデータを収集するためのPythonスクリプトです。asynciowebsocketを活用し、効率的かつ高速にデータを収集します。

ファイル構成

/crypto_bot
 ├── data_collector.py  # データ収集スクリプト
 ├── requirements.txt   # 必要なライブラリ
 └── config.json        # 設定ファイル

1.requirements.txt

websocket-client
asyncio
ccxt
pandas

2.config.json

{
    "api_url": "wss://stream.bybit.com/realtime_public",
    "symbols": ["BTCUSDT", "ETHUSDT"],
    "timeframe": "1m",
    "data_folder": "./data",
    "log_file": "./logs/data_collector.log"
}

3.data_collector.py

import asyncio
import json
import websockets
import os
import pandas as pd
from datetime import datetime

# 設定ファイルの読み込み
with open("config.json", "r") as file:
    config = json.load(file)

API_URL = config["api_url"]
SYMBOLS = config["symbols"]
TIMEFRAME = config["timeframe"]
DATA_FOLDER = config["data_folder"]
LOG_FILE = config["log_file"]

# データ格納ディレクトリの作成
os.makedirs(DATA_FOLDER, exist_ok=True)

# ログ出力関数
def log_message(message):
    timestamp = datetime.now().strftime("%Y-%m-%d %H:%M:%S")
    log_entry = f"[{timestamp}] {message}"
    print(log_entry)
    with open(LOG_FILE, "a") as log_file:
        log_file.write(log_entry + "\n")

# データフレームの初期化
data_frames = {symbol: pd.DataFrame() for symbol in SYMBOLS}

# Bybit WebSocketサーバーに接続
async def connect_to_bybit():
    async with websockets.connect(API_URL) as websocket:
        # 各シンボルのデータ購読
        for symbol in SYMBOLS:
            subscription_msg = {
                "op": "subscribe",
                "args": [
                    f"kline.{TIMEFRAME}.{symbol}"
                ]
            }
            await websocket.send(json.dumps(subscription_msg))
            log_message(f"Subscribed to {symbol}")

        # データ受信ループ
        while True:
            try:
                response = await websocket.recv()
                data = json.loads(response)

                if 'topic' in data and 'data' in data:
                    symbol = data['topic'].split('.')[-1]
                    kline_data = data['data'][0]

                    # データフレームにデータを追加
                    new_data = pd.DataFrame({
                        "timestamp": [datetime.fromtimestamp(kline_data['start'] / 1000)],
                        "open": [float(kline_data['open'])],
                        "high": [float(kline_data['high'])],
                        "low": [float(kline_data['low'])],
                        "close": [float(kline_data['close'])],
                        "volume": [float(kline_data['volume'])]
                    })

                    # データの保存
                    data_frames[symbol] = pd.concat([data_frames[symbol], new_data], ignore_index=True)
                    data_frames[symbol].to_csv(f"{DATA_FOLDER}/{symbol}_data.csv", index=False)

                    log_message(f"Data saved for {symbol}")

            except Exception as e:
                log_message(f"Error: {e}")
                await asyncio.sleep(5)  # 5秒後に再接続

# メイン関数
async def main():
    while True:
        try:
            await connect_to_bybit()
        except Exception as e:
            log_message(f"Connection failed: {e}. Retrying in 10 seconds...")
            await asyncio.sleep(10)

if __name__ == "__main__":
    asyncio.run(main())

4. 実行方法

1.必要なライブラリをインストール

pip install -r requirements.txt

2.スクリプトの実行

python data_collector.py

5. スクリプトのポイント

非同期処理 (asyncio) により、複数のシンボルのリアルタイムデータを効率的に取得
✅ データは .csv ファイルとして保存し、定期的なデータ更新に対応
エラーハンドリング により、接続エラー発生時の自動再接続が可能
ログファイル でトラブルシューティングの効率化


data_collector.pyの解説

このコードは、BybitのWebSocket API を使って指定された仮想通貨シンボル(例: BTCUSDTなど)のローソク足(Kline)データをリアルタイムで取得し、CSVファイルとして保存するためのPythonスクリプトです。以下、コードのセクションごとに詳しく解説します。


🔧 1. 設定の読み込みと準備

with open("config.json", "r") as file:
    config = json.load(file)
API_URL = config["api_url"]
SYMBOLS = config["symbols"]
TIMEFRAME = config["timeframe"]
DATA_FOLDER = config["data_folder"]
LOG_FILE = config["log_file"]
  • config.json から設定(APIのURL、取引ペア、時間足、保存フォルダなど)を読み込みます。
  • 例えば config.json の中身はこんな感じ:
{
  "api_url": "wss://stream.bybit.com/v5/public/linear",
  "symbols": ["BTCUSDT", "ETHUSDT"],
  "timeframe": "1",
  "data_folder": "data",
  "log_file": "websocket.log"
}

📁 2. フォルダの準備・ログ関数

os.makedirs(DATA_FOLDER, exist_ok=True)
  • 保存用のフォルダがなければ作成します。
def log_message(message):
    ...
  • ログをコンソールとファイル両方に出力する関数です。

📊 3. データ格納用の辞書を用意

data_frames = {symbol: pd.DataFrame() for symbol in SYMBOLS}
  • 各シンボルごとに空のDataFrameを用意します(後でCSVに保存するため)。

🌐 4. WebSocket接続と購読処理

async def connect_to_bybit():
  • WebSocketでBybitに接続し、シンボルごとにklineデータを購読します:
for symbol in SYMBOLS:
    subscription_msg = {
        "op": "subscribe",
        "args": [f"kline.{TIMEFRAME}.{symbol}"]
    }
    await websocket.send(json.dumps(subscription_msg))
  • 購読成功したらログ出力。


🔄 5. データ受信ループ

while True:
    ...
  • websocket.recv() でサーバーからのメッセージを受け取り、以下のような処理を行います:
    • データに topicdata が含まれているか確認
    • シンボル名を取り出し、kline の情報を整形してDataFrameに変換
    • DataFrameを更新し、CSVファイルに保存
    • 保存したことをログ出力

⚠️ 6. エラーハンドリング

except Exception as e:
    log_message(f"Error: {e}")
    await asyncio.sleep(5)
  • 何かしらのエラーが出た場合、5秒間待ってから再試行します。

🚀 7. メイン関数と再接続ロジック

async def main():
    while True:
        try:
            await connect_to_bybit()
        except Exception as e:
            log_message(f"Connection failed: {e}. Retrying in 10 seconds...")
            await asyncio.sleep(10)
  • connect_to_bybit() が例外で落ちても、10秒後に再接続するようにします。

✅ 8. 実行部分

if __name__ == "__main__":
    asyncio.run(main())

💡 補足ポイント

  • 非同期処理async/await)を使って、リアルタイムでのデータ受信と処理を実現。
  • 柔軟な設定ファイルにより、複数シンボル・時間足に対応。
  • CSVファイルへの保存で、後からのバックテストにも活用可能。

次のステップ

  • 異常検出ロジック(EWMAやボラティリティ)を組み込む
  • P&Dイベントの検出モデルの設計
  • 取引執行ロジックの実装
Yodaka

次回の記事では、「異常検出ロジックの組み込み」についてまとめます。

補足:klineデータとは?

「klineデータ(ケーライン)」**とは、いわゆる ローソク足(Candlestick)データ のことです。トレーディングでよく見る、あの「棒にヒゲが生えたグラフ」の元データです。


📊 補足:kline(ローソク足)とは?

1本のローソク足は、ある一定時間(例:1分、5分、1時間など)の価格変動を1本で表します。
その1本には次のような情報が含まれます:

項目説明
open始値(その時間の最初の価格)
high高値(その時間中での一番高い価格)
low安値(その時間中での一番低い価格)
close終値(その時間の最後の価格)
volume出来高(その時間に取引された量)
startこのローソク足の開始時刻(UNIXタイム)

🔁 時間足(Timeframe)

ローソク足は時間単位で区切られており、Bybitでは例えば以下のような時間足を指定できます:

  • "1" → 1分足
  • "3" → 3分足
  • "5" → 5分足
  • "15" → 15分足
  • "60" → 1時間足
  • "D" → 日足
  • "W" → 週足

🧠 実際のデータの例(Bybit WebSocket経由)

WebSocketで受け取るklineデータは、こんなJSON形式になっています。

{
  "topic": "kline.1.BTCUSDT",
  "data": [{
    "start": 1640995200000,
    "open": "47200",
    "high": "47350",
    "low": "47100",
    "close": "47280",
    "volume": "125.34"
  }]
}

このデータは:

  • 1分間のBTCUSDTのローソク足で、
  • 開始時刻が2022年1月1日 00:00:00(UNIXミリ秒)
  • 始値 47200、終値 47280、高値 47350、安値 47100
  • 出来高(取引量)が125.34

という内容です。


✅ まとめ

  • klineデータ = ローソク足データ
  • 各ローソク足には「始値、高値、安値、終値、出来高、開始時刻」が含まれている
  • トレード戦略やチャート分析に不可欠な基本情報

おまけ「なぜklileと呼ぶのか?」

なぜ kline と呼ぶのか?」は、多くの人が気になってるけど意外と知られていない話です。


🈶 結論:K-line は「K線(K線チャート)」の略

  • K-line(ケーライン)」は、英語ではなく 中国発祥の呼び名 です。
  • 中国ではローソク足チャートのことを K線(K線图:ケイ・シエン・トゥー) と呼びます。

🔤 じゃあ「K」は何の略なのか?

実は「K」に明確な意味はない、という説が有力です。だけど、いくつかの説があります:

有力説①:日本の「株価」の K?

  • 日本の「株価」や「価格」を表す言葉の頭文字「K」から来たのでは?という説。
  • ローソク足自体は日本で生まれ、中国に伝わった → その過程で「K線」と呼ばれるようになった。

有力説②:「Karte(独語で図・チャート)」の K?

  • ドイツ語の「Karte」(カード、図、チャート)から来た可能性もあるというマニアックな説。

🌍 どうして英語でも「kline」って使うの?

  • 中国発の取引所(Bybit、Binance など)では、この「K-line」という表現がそのままAPIでも使われています。
  • だから、英語圏の開発者も "candlestick" じゃなく「kline」という語をそのまま使うようになった、という流れ。

✅ まとめ

用語意味備考
K-lineローソク足データ(K線)中国語の表現が元になっている
Candlestick英語圏での正式な呼び名意味はまったく同じ

関連
開発記録#141(2025/3/24)「論文ベースのbot開発フローpart.3」

続きを見る

-Bot