前回の記事に引き続き、今回も仮想通貨botの開発状況をまとめていきます。
本記事では「暗号通貨のパンプ&ダンプスキームの検出」に関する論文をベースにbot開発の過程をまとめていきます。
 
		今回のテーマは「リアルタイムデータ収集スクリプト」です。
Detecting Crypto Pump-and-Dump Schemes: A Thresholding-Based Approach to Handling Market Noisehttps://t.co/ctCJEV1MBs
— よだか(夜鷹/yodaka) (@yodakablog) March 22, 2025
🚀 Bybitのリアルタイムデータ収集スクリプト
以下は、Bybit APIを使用してリアルタイムのOHLCVデータおよびOrder Bookデータを収集するためのPythonスクリプトです。asyncioとwebsocketを活用し、効率的かつ高速にデータを収集します。
ファイル構成
/crypto_bot ├── data_collector.py # データ収集スクリプト ├── requirements.txt # 必要なライブラリ └── config.json # 設定ファイル
1.requirements.txt
websocket-client asyncio ccxt pandas
2.config.json
{
    "api_url": "wss://stream.bybit.com/realtime_public",
    "symbols": ["BTCUSDT", "ETHUSDT"],
    "timeframe": "1m",
    "data_folder": "./data",
    "log_file": "./logs/data_collector.log"
}
3.data_collector.py
import asyncio
import json
import websockets
import os
import pandas as pd
from datetime import datetime
# 設定ファイルの読み込み
with open("config.json", "r") as file:
    config = json.load(file)
API_URL = config["api_url"]
SYMBOLS = config["symbols"]
TIMEFRAME = config["timeframe"]
DATA_FOLDER = config["data_folder"]
LOG_FILE = config["log_file"]
# データ格納ディレクトリの作成
os.makedirs(DATA_FOLDER, exist_ok=True)
# ログ出力関数
def log_message(message):
    timestamp = datetime.now().strftime("%Y-%m-%d %H:%M:%S")
    log_entry = f"[{timestamp}] {message}"
    print(log_entry)
    with open(LOG_FILE, "a") as log_file:
        log_file.write(log_entry + "\n")
# データフレームの初期化
data_frames = {symbol: pd.DataFrame() for symbol in SYMBOLS}
# Bybit WebSocketサーバーに接続
async def connect_to_bybit():
    async with websockets.connect(API_URL) as websocket:
        # 各シンボルのデータ購読
        for symbol in SYMBOLS:
            subscription_msg = {
                "op": "subscribe",
                "args": [
                    f"kline.{TIMEFRAME}.{symbol}"
                ]
            }
            await websocket.send(json.dumps(subscription_msg))
            log_message(f"Subscribed to {symbol}")
        # データ受信ループ
        while True:
            try:
                response = await websocket.recv()
                data = json.loads(response)
                if 'topic' in data and 'data' in data:
                    symbol = data['topic'].split('.')[-1]
                    kline_data = data['data'][0]
                    # データフレームにデータを追加
                    new_data = pd.DataFrame({
                        "timestamp": [datetime.fromtimestamp(kline_data['start'] / 1000)],
                        "open": [float(kline_data['open'])],
                        "high": [float(kline_data['high'])],
                        "low": [float(kline_data['low'])],
                        "close": [float(kline_data['close'])],
                        "volume": [float(kline_data['volume'])]
                    })
                    # データの保存
                    data_frames[symbol] = pd.concat([data_frames[symbol], new_data], ignore_index=True)
                    data_frames[symbol].to_csv(f"{DATA_FOLDER}/{symbol}_data.csv", index=False)
                    log_message(f"Data saved for {symbol}")
            except Exception as e:
                log_message(f"Error: {e}")
                await asyncio.sleep(5)  # 5秒後に再接続
# メイン関数
async def main():
    while True:
        try:
            await connect_to_bybit()
        except Exception as e:
            log_message(f"Connection failed: {e}. Retrying in 10 seconds...")
            await asyncio.sleep(10)
if __name__ == "__main__":
    asyncio.run(main())
4. 実行方法
1.必要なライブラリをインストール
pip install -r requirements.txt
2.スクリプトの実行
python data_collector.py
5. スクリプトのポイント
✅ 非同期処理 (asyncio) により、複数のシンボルのリアルタイムデータを効率的に取得
✅ データは .csv ファイルとして保存し、定期的なデータ更新に対応
✅ エラーハンドリング により、接続エラー発生時の自動再接続が可能
✅ ログファイル でトラブルシューティングの効率化
data_collector.pyの解説
このコードは、BybitのWebSocket API を使って指定された仮想通貨シンボル(例: BTCUSDTなど)のローソク足(Kline)データをリアルタイムで取得し、CSVファイルとして保存するためのPythonスクリプトです。以下、コードのセクションごとに詳しく解説します。
🔧 1. 設定の読み込みと準備
with open("config.json", "r") as file:
    config = json.load(file)
API_URL = config["api_url"]
SYMBOLS = config["symbols"]
TIMEFRAME = config["timeframe"]
DATA_FOLDER = config["data_folder"]
LOG_FILE = config["log_file"]
- config.jsonから設定(APIのURL、取引ペア、時間足、保存フォルダなど)を読み込みます。
- 例えば config.jsonの中身はこんな感じ:
{
  "api_url": "wss://stream.bybit.com/v5/public/linear",
  "symbols": ["BTCUSDT", "ETHUSDT"],
  "timeframe": "1",
  "data_folder": "data",
  "log_file": "websocket.log"
}
📁 2. フォルダの準備・ログ関数
os.makedirs(DATA_FOLDER, exist_ok=True)
- 保存用のフォルダがなければ作成します。
def log_message(message):
    ...
- ログをコンソールとファイル両方に出力する関数です。
📊 3. データ格納用の辞書を用意
data_frames = {symbol: pd.DataFrame() for symbol in SYMBOLS}
- 各シンボルごとに空のDataFrameを用意します(後でCSVに保存するため)。
🌐 4. WebSocket接続と購読処理
async def connect_to_bybit():
- WebSocketでBybitに接続し、シンボルごとにklineデータを購読します:
for symbol in SYMBOLS:
    subscription_msg = {
        "op": "subscribe",
        "args": [f"kline.{TIMEFRAME}.{symbol}"]
    }
    await websocket.send(json.dumps(subscription_msg))
- 購読成功したらログ出力。
🔄 5. データ受信ループ
while True:
    ...
- websocket.recv()でサーバーからのメッセージを受け取り、以下のような処理を行います:- データに topicとdataが含まれているか確認
- シンボル名を取り出し、klineの情報を整形してDataFrameに変換
- DataFrameを更新し、CSVファイルに保存
- 保存したことをログ出力
 
- データに 
⚠️ 6. エラーハンドリング
except Exception as e:
    log_message(f"Error: {e}")
    await asyncio.sleep(5)
- 何かしらのエラーが出た場合、5秒間待ってから再試行します。
🚀 7. メイン関数と再接続ロジック
async def main():
    while True:
        try:
            await connect_to_bybit()
        except Exception as e:
            log_message(f"Connection failed: {e}. Retrying in 10 seconds...")
            await asyncio.sleep(10)
- connect_to_bybit()が例外で落ちても、10秒後に再接続するようにします。
✅ 8. 実行部分
if __name__ == "__main__":
    asyncio.run(main())
💡 補足ポイント
- 非同期処理(async/await)を使って、リアルタイムでのデータ受信と処理を実現。
- 柔軟な設定ファイルにより、複数シンボル・時間足に対応。
- CSVファイルへの保存で、後からのバックテストにも活用可能。
次のステップ
- 異常検出ロジック(EWMAやボラティリティ)を組み込む
- P&Dイベントの検出モデルの設計
- 取引執行ロジックの実装
 
		次回の記事では、「異常検出ロジックの組み込み」についてまとめます。
補足:klineデータとは?
「klineデータ(ケーライン)」**とは、いわゆる ローソク足(Candlestick)データ のことです。トレーディングでよく見る、あの「棒にヒゲが生えたグラフ」の元データです。
📊 補足:kline(ローソク足)とは?
1本のローソク足は、ある一定時間(例:1分、5分、1時間など)の価格変動を1本で表します。
その1本には次のような情報が含まれます:
| 項目 | 説明 | 
|---|---|
| open | 始値(その時間の最初の価格) | 
| high | 高値(その時間中での一番高い価格) | 
| low | 安値(その時間中での一番低い価格) | 
| close | 終値(その時間の最後の価格) | 
| volume | 出来高(その時間に取引された量) | 
| start | このローソク足の開始時刻(UNIXタイム) | 
🔁 時間足(Timeframe)
ローソク足は時間単位で区切られており、Bybitでは例えば以下のような時間足を指定できます:
- "1"→ 1分足
- "3"→ 3分足
- "5"→ 5分足
- "15"→ 15分足
- "60"→ 1時間足
- "D"→ 日足
- "W"→ 週足
🧠 実際のデータの例(Bybit WebSocket経由)
WebSocketで受け取るklineデータは、こんなJSON形式になっています。
{
  "topic": "kline.1.BTCUSDT",
  "data": [{
    "start": 1640995200000,
    "open": "47200",
    "high": "47350",
    "low": "47100",
    "close": "47280",
    "volume": "125.34"
  }]
}
このデータは:
- 1分間のBTCUSDTのローソク足で、
- 開始時刻が2022年1月1日 00:00:00(UNIXミリ秒)
- 始値 47200、終値 47280、高値 47350、安値 47100
- 出来高(取引量)が125.34
という内容です。
✅ まとめ
- klineデータ = ローソク足データ
- 各ローソク足には「始値、高値、安値、終値、出来高、開始時刻」が含まれている
- トレード戦略やチャート分析に不可欠な基本情報
おまけ「なぜklileと呼ぶのか?」
「なぜ kline と呼ぶのか?」は、多くの人が気になってるけど意外と知られていない話です。
🈶 結論:K-line は「K線(K線チャート)」の略
- 「K-line(ケーライン)」は、英語ではなく 中国発祥の呼び名 です。
- 中国ではローソク足チャートのことを K線(K線图:ケイ・シエン・トゥー) と呼びます。
🔤 じゃあ「K」は何の略なのか?
実は「K」に明確な意味はない、という説が有力です。だけど、いくつかの説があります:
有力説①:日本の「株価」の K?
- 日本の「株価」や「価格」を表す言葉の頭文字「K」から来たのでは?という説。
- ローソク足自体は日本で生まれ、中国に伝わった → その過程で「K線」と呼ばれるようになった。
有力説②:「Karte(独語で図・チャート)」の K?
- ドイツ語の「Karte」(カード、図、チャート)から来た可能性もあるというマニアックな説。
🌍 どうして英語でも「kline」って使うの?
- 中国発の取引所(Bybit、Binance など)では、この「K-line」という表現がそのままAPIでも使われています。
- だから、英語圏の開発者も "candlestick" じゃなく「kline」という語をそのまま使うようになった、という流れ。
✅ まとめ
| 用語 | 意味 | 備考 | 
|---|---|---|
| K-line | ローソク足データ(K線) | 中国語の表現が元になっている | 
| Candlestick | 英語圏での正式な呼び名 | 意味はまったく同じ | 
- 
																																										  
- 
																	開発記録#141(2025/3/24)「論文ベースのbot開発フローpart.3 異常検出ロジックの組み込み」続きを見る 
