前回の記事に引き続き、今回も仮想通貨botの開発状況をまとめていきます。
本記事では「暗号通貨のパンプ&ダンプスキームの検出」に関する論文をベースにbot開発の過程をまとめていきます。

今回のテーマは「リアルタイムデータ収集スクリプト」です。
Detecting Crypto Pump-and-Dump Schemes: A Thresholding-Based Approach to Handling Market Noisehttps://t.co/ctCJEV1MBs
— よだか(夜鷹/yodaka) (@yodakablog) March 22, 2025
🚀 Bybitのリアルタイムデータ収集スクリプト
以下は、Bybit APIを使用してリアルタイムのOHLCVデータおよびOrder Bookデータを収集するためのPythonスクリプトです。asyncio
とwebsocket
を活用し、効率的かつ高速にデータを収集します。
ファイル構成
/crypto_bot ├── data_collector.py # データ収集スクリプト ├── requirements.txt # 必要なライブラリ └── config.json # 設定ファイル
1.requirements.txt
websocket-client asyncio ccxt pandas
2.config.json
{ "api_url": "wss://stream.bybit.com/realtime_public", "symbols": ["BTCUSDT", "ETHUSDT"], "timeframe": "1m", "data_folder": "./data", "log_file": "./logs/data_collector.log" }
3.data_collector.py
import asyncio import json import websockets import os import pandas as pd from datetime import datetime # 設定ファイルの読み込み with open("config.json", "r") as file: config = json.load(file) API_URL = config["api_url"] SYMBOLS = config["symbols"] TIMEFRAME = config["timeframe"] DATA_FOLDER = config["data_folder"] LOG_FILE = config["log_file"] # データ格納ディレクトリの作成 os.makedirs(DATA_FOLDER, exist_ok=True) # ログ出力関数 def log_message(message): timestamp = datetime.now().strftime("%Y-%m-%d %H:%M:%S") log_entry = f"[{timestamp}] {message}" print(log_entry) with open(LOG_FILE, "a") as log_file: log_file.write(log_entry + "\n") # データフレームの初期化 data_frames = {symbol: pd.DataFrame() for symbol in SYMBOLS} # Bybit WebSocketサーバーに接続 async def connect_to_bybit(): async with websockets.connect(API_URL) as websocket: # 各シンボルのデータ購読 for symbol in SYMBOLS: subscription_msg = { "op": "subscribe", "args": [ f"kline.{TIMEFRAME}.{symbol}" ] } await websocket.send(json.dumps(subscription_msg)) log_message(f"Subscribed to {symbol}") # データ受信ループ while True: try: response = await websocket.recv() data = json.loads(response) if 'topic' in data and 'data' in data: symbol = data['topic'].split('.')[-1] kline_data = data['data'][0] # データフレームにデータを追加 new_data = pd.DataFrame({ "timestamp": [datetime.fromtimestamp(kline_data['start'] / 1000)], "open": [float(kline_data['open'])], "high": [float(kline_data['high'])], "low": [float(kline_data['low'])], "close": [float(kline_data['close'])], "volume": [float(kline_data['volume'])] }) # データの保存 data_frames[symbol] = pd.concat([data_frames[symbol], new_data], ignore_index=True) data_frames[symbol].to_csv(f"{DATA_FOLDER}/{symbol}_data.csv", index=False) log_message(f"Data saved for {symbol}") except Exception as e: log_message(f"Error: {e}") await asyncio.sleep(5) # 5秒後に再接続 # メイン関数 async def main(): while True: try: await connect_to_bybit() except Exception as e: log_message(f"Connection failed: {e}. Retrying in 10 seconds...") await asyncio.sleep(10) if __name__ == "__main__": asyncio.run(main())
4. 実行方法
1.必要なライブラリをインストール
pip install -r requirements.txt
2.スクリプトの実行
python data_collector.py
5. スクリプトのポイント
✅ 非同期処理 (asyncio
) により、複数のシンボルのリアルタイムデータを効率的に取得
✅ データは .csv
ファイルとして保存し、定期的なデータ更新に対応
✅ エラーハンドリング により、接続エラー発生時の自動再接続が可能
✅ ログファイル でトラブルシューティングの効率化
data_collector.pyの解説
このコードは、BybitのWebSocket API を使って指定された仮想通貨シンボル(例: BTCUSDTなど)のローソク足(Kline)データをリアルタイムで取得し、CSVファイルとして保存するためのPythonスクリプトです。以下、コードのセクションごとに詳しく解説します。
🔧 1. 設定の読み込みと準備
with open("config.json", "r") as file: config = json.load(file) API_URL = config["api_url"] SYMBOLS = config["symbols"] TIMEFRAME = config["timeframe"] DATA_FOLDER = config["data_folder"] LOG_FILE = config["log_file"]
config.json
から設定(APIのURL、取引ペア、時間足、保存フォルダなど)を読み込みます。- 例えば
config.json
の中身はこんな感じ:
{ "api_url": "wss://stream.bybit.com/v5/public/linear", "symbols": ["BTCUSDT", "ETHUSDT"], "timeframe": "1", "data_folder": "data", "log_file": "websocket.log" }
📁 2. フォルダの準備・ログ関数
os.makedirs(DATA_FOLDER, exist_ok=True)
- 保存用のフォルダがなければ作成します。
def log_message(message): ...
- ログをコンソールとファイル両方に出力する関数です。
📊 3. データ格納用の辞書を用意
data_frames = {symbol: pd.DataFrame() for symbol in SYMBOLS}
- 各シンボルごとに空のDataFrameを用意します(後でCSVに保存するため)。
🌐 4. WebSocket接続と購読処理
async def connect_to_bybit():
- WebSocketでBybitに接続し、シンボルごとにklineデータを購読します:
for symbol in SYMBOLS: subscription_msg = { "op": "subscribe", "args": [f"kline.{TIMEFRAME}.{symbol}"] } await websocket.send(json.dumps(subscription_msg))
- 購読成功したらログ出力。
🔄 5. データ受信ループ
while True: ...
websocket.recv()
でサーバーからのメッセージを受け取り、以下のような処理を行います:- データに
topic
とdata
が含まれているか確認 - シンボル名を取り出し、
kline
の情報を整形してDataFrameに変換 - DataFrameを更新し、CSVファイルに保存
- 保存したことをログ出力
- データに
⚠️ 6. エラーハンドリング
except Exception as e: log_message(f"Error: {e}") await asyncio.sleep(5)
- 何かしらのエラーが出た場合、5秒間待ってから再試行します。
🚀 7. メイン関数と再接続ロジック
async def main(): while True: try: await connect_to_bybit() except Exception as e: log_message(f"Connection failed: {e}. Retrying in 10 seconds...") await asyncio.sleep(10)
connect_to_bybit()
が例外で落ちても、10秒後に再接続するようにします。
✅ 8. 実行部分
if __name__ == "__main__": asyncio.run(main())
💡 補足ポイント
- 非同期処理(
async/await
)を使って、リアルタイムでのデータ受信と処理を実現。 - 柔軟な設定ファイルにより、複数シンボル・時間足に対応。
- CSVファイルへの保存で、後からのバックテストにも活用可能。
次のステップ
- 異常検出ロジック(EWMAやボラティリティ)を組み込む
- P&Dイベントの検出モデルの設計
- 取引執行ロジックの実装

次回の記事では、「異常検出ロジックの組み込み」についてまとめます。
補足:klineデータとは?
「klineデータ(ケーライン)」**とは、いわゆる ローソク足(Candlestick)データ のことです。トレーディングでよく見る、あの「棒にヒゲが生えたグラフ」の元データです。
📊 補足:kline(ローソク足)とは?
1本のローソク足は、ある一定時間(例:1分、5分、1時間など)の価格変動を1本で表します。
その1本には次のような情報が含まれます:
項目 | 説明 |
---|---|
open | 始値(その時間の最初の価格) |
high | 高値(その時間中での一番高い価格) |
low | 安値(その時間中での一番低い価格) |
close | 終値(その時間の最後の価格) |
volume | 出来高(その時間に取引された量) |
start | このローソク足の開始時刻(UNIXタイム) |
🔁 時間足(Timeframe)
ローソク足は時間単位で区切られており、Bybitでは例えば以下のような時間足を指定できます:
"1"
→ 1分足"3"
→ 3分足"5"
→ 5分足"15"
→ 15分足"60"
→ 1時間足"D"
→ 日足"W"
→ 週足
🧠 実際のデータの例(Bybit WebSocket経由)
WebSocketで受け取るklineデータは、こんなJSON形式になっています。
{ "topic": "kline.1.BTCUSDT", "data": [{ "start": 1640995200000, "open": "47200", "high": "47350", "low": "47100", "close": "47280", "volume": "125.34" }] }
このデータは:
- 1分間のBTCUSDTのローソク足で、
- 開始時刻が2022年1月1日 00:00:00(UNIXミリ秒)
- 始値 47200、終値 47280、高値 47350、安値 47100
- 出来高(取引量)が125.34
という内容です。
✅ まとめ
- klineデータ = ローソク足データ
- 各ローソク足には「始値、高値、安値、終値、出来高、開始時刻」が含まれている
- トレード戦略やチャート分析に不可欠な基本情報
おまけ「なぜklileと呼ぶのか?」
「なぜ kline と呼ぶのか?」は、多くの人が気になってるけど意外と知られていない話です。
🈶 結論:K-line は「K線(K線チャート)」の略
- 「K-line(ケーライン)」は、英語ではなく 中国発祥の呼び名 です。
- 中国ではローソク足チャートのことを K線(K線图:ケイ・シエン・トゥー) と呼びます。
🔤 じゃあ「K」は何の略なのか?
実は「K」に明確な意味はない、という説が有力です。だけど、いくつかの説があります:
有力説①:日本の「株価」の K?
- 日本の「株価」や「価格」を表す言葉の頭文字「K」から来たのでは?という説。
- ローソク足自体は日本で生まれ、中国に伝わった → その過程で「K線」と呼ばれるようになった。
有力説②:「Karte(独語で図・チャート)」の K?
- ドイツ語の「Karte」(カード、図、チャート)から来た可能性もあるというマニアックな説。
🌍 どうして英語でも「kline」って使うの?
- 中国発の取引所(Bybit、Binance など)では、この「K-line」という表現がそのままAPIでも使われています。
- だから、英語圏の開発者も "candlestick" じゃなく「kline」という語をそのまま使うようになった、という流れ。
✅ まとめ
用語 | 意味 | 備考 |
---|---|---|
K-line | ローソク足データ(K線) | 中国語の表現が元になっている |
Candlestick | 英語圏での正式な呼び名 | 意味はまったく同じ |
-
-
開発記録#141(2025/3/24)「論文ベースのbot開発フローpart.3」
続きを見る