前回の記事に引き続き、今回も仮想通貨botの開発状況をまとめていきます。
本記事では「暗号通貨のパンプ&ダンプスキームの検出」に関する論文をベースにbot開発の過程をまとめていきます。
今回のテーマは「リアルタイムデータ収集スクリプト」です。
Detecting Crypto Pump-and-Dump Schemes: A Thresholding-Based Approach to Handling Market Noisehttps://t.co/ctCJEV1MBs
— よだか(夜鷹/yodaka) (@yodakablog) March 22, 2025
🚀 Bybitのリアルタイムデータ収集スクリプト
以下は、Bybit APIを使用してリアルタイムのOHLCVデータおよびOrder Bookデータを収集するためのPythonスクリプトです。asyncioとwebsocketを活用し、効率的かつ高速にデータを収集します。
ファイル構成
/crypto_bot ├── data_collector.py # データ収集スクリプト ├── requirements.txt # 必要なライブラリ └── config.json # 設定ファイル
1.requirements.txt
websocket-client asyncio ccxt pandas
2.config.json
{
"api_url": "wss://stream.bybit.com/realtime_public",
"symbols": ["BTCUSDT", "ETHUSDT"],
"timeframe": "1m",
"data_folder": "./data",
"log_file": "./logs/data_collector.log"
}
3.data_collector.py
import asyncio
import json
import websockets
import os
import pandas as pd
from datetime import datetime
# 設定ファイルの読み込み
with open("config.json", "r") as file:
config = json.load(file)
API_URL = config["api_url"]
SYMBOLS = config["symbols"]
TIMEFRAME = config["timeframe"]
DATA_FOLDER = config["data_folder"]
LOG_FILE = config["log_file"]
# データ格納ディレクトリの作成
os.makedirs(DATA_FOLDER, exist_ok=True)
# ログ出力関数
def log_message(message):
timestamp = datetime.now().strftime("%Y-%m-%d %H:%M:%S")
log_entry = f"[{timestamp}] {message}"
print(log_entry)
with open(LOG_FILE, "a") as log_file:
log_file.write(log_entry + "\n")
# データフレームの初期化
data_frames = {symbol: pd.DataFrame() for symbol in SYMBOLS}
# Bybit WebSocketサーバーに接続
async def connect_to_bybit():
async with websockets.connect(API_URL) as websocket:
# 各シンボルのデータ購読
for symbol in SYMBOLS:
subscription_msg = {
"op": "subscribe",
"args": [
f"kline.{TIMEFRAME}.{symbol}"
]
}
await websocket.send(json.dumps(subscription_msg))
log_message(f"Subscribed to {symbol}")
# データ受信ループ
while True:
try:
response = await websocket.recv()
data = json.loads(response)
if 'topic' in data and 'data' in data:
symbol = data['topic'].split('.')[-1]
kline_data = data['data'][0]
# データフレームにデータを追加
new_data = pd.DataFrame({
"timestamp": [datetime.fromtimestamp(kline_data['start'] / 1000)],
"open": [float(kline_data['open'])],
"high": [float(kline_data['high'])],
"low": [float(kline_data['low'])],
"close": [float(kline_data['close'])],
"volume": [float(kline_data['volume'])]
})
# データの保存
data_frames[symbol] = pd.concat([data_frames[symbol], new_data], ignore_index=True)
data_frames[symbol].to_csv(f"{DATA_FOLDER}/{symbol}_data.csv", index=False)
log_message(f"Data saved for {symbol}")
except Exception as e:
log_message(f"Error: {e}")
await asyncio.sleep(5) # 5秒後に再接続
# メイン関数
async def main():
while True:
try:
await connect_to_bybit()
except Exception as e:
log_message(f"Connection failed: {e}. Retrying in 10 seconds...")
await asyncio.sleep(10)
if __name__ == "__main__":
asyncio.run(main())
4. 実行方法
1.必要なライブラリをインストール
pip install -r requirements.txt
2.スクリプトの実行
python data_collector.py
5. スクリプトのポイント
✅ 非同期処理 (asyncio) により、複数のシンボルのリアルタイムデータを効率的に取得
✅ データは .csv ファイルとして保存し、定期的なデータ更新に対応
✅ エラーハンドリング により、接続エラー発生時の自動再接続が可能
✅ ログファイル でトラブルシューティングの効率化
data_collector.pyの解説
このコードは、BybitのWebSocket API を使って指定された仮想通貨シンボル(例: BTCUSDTなど)のローソク足(Kline)データをリアルタイムで取得し、CSVファイルとして保存するためのPythonスクリプトです。以下、コードのセクションごとに詳しく解説します。
🔧 1. 設定の読み込みと準備
with open("config.json", "r") as file:
config = json.load(file)
API_URL = config["api_url"]
SYMBOLS = config["symbols"]
TIMEFRAME = config["timeframe"]
DATA_FOLDER = config["data_folder"]
LOG_FILE = config["log_file"]
config.jsonから設定(APIのURL、取引ペア、時間足、保存フォルダなど)を読み込みます。- 例えば
config.jsonの中身はこんな感じ:
{
"api_url": "wss://stream.bybit.com/v5/public/linear",
"symbols": ["BTCUSDT", "ETHUSDT"],
"timeframe": "1",
"data_folder": "data",
"log_file": "websocket.log"
}
📁 2. フォルダの準備・ログ関数
os.makedirs(DATA_FOLDER, exist_ok=True)
- 保存用のフォルダがなければ作成します。
def log_message(message):
...
- ログをコンソールとファイル両方に出力する関数です。
📊 3. データ格納用の辞書を用意
data_frames = {symbol: pd.DataFrame() for symbol in SYMBOLS}
- 各シンボルごとに空のDataFrameを用意します(後でCSVに保存するため)。
🌐 4. WebSocket接続と購読処理
async def connect_to_bybit():
- WebSocketでBybitに接続し、シンボルごとにklineデータを購読します:
for symbol in SYMBOLS:
subscription_msg = {
"op": "subscribe",
"args": [f"kline.{TIMEFRAME}.{symbol}"]
}
await websocket.send(json.dumps(subscription_msg))
- 購読成功したらログ出力。
🔄 5. データ受信ループ
while True:
...
websocket.recv()でサーバーからのメッセージを受け取り、以下のような処理を行います:- データに
topicとdataが含まれているか確認 - シンボル名を取り出し、
klineの情報を整形してDataFrameに変換 - DataFrameを更新し、CSVファイルに保存
- 保存したことをログ出力
- データに
⚠️ 6. エラーハンドリング
except Exception as e:
log_message(f"Error: {e}")
await asyncio.sleep(5)
- 何かしらのエラーが出た場合、5秒間待ってから再試行します。
🚀 7. メイン関数と再接続ロジック
async def main():
while True:
try:
await connect_to_bybit()
except Exception as e:
log_message(f"Connection failed: {e}. Retrying in 10 seconds...")
await asyncio.sleep(10)
connect_to_bybit()が例外で落ちても、10秒後に再接続するようにします。
✅ 8. 実行部分
if __name__ == "__main__":
asyncio.run(main())
💡 補足ポイント
- 非同期処理(
async/await)を使って、リアルタイムでのデータ受信と処理を実現。 - 柔軟な設定ファイルにより、複数シンボル・時間足に対応。
- CSVファイルへの保存で、後からのバックテストにも活用可能。
次のステップ
- 異常検出ロジック(EWMAやボラティリティ)を組み込む
- P&Dイベントの検出モデルの設計
- 取引執行ロジックの実装
次回の記事では、「異常検出ロジックの組み込み」についてまとめます。
補足:klineデータとは?
「klineデータ(ケーライン)」**とは、いわゆる ローソク足(Candlestick)データ のことです。トレーディングでよく見る、あの「棒にヒゲが生えたグラフ」の元データです。
📊 補足:kline(ローソク足)とは?
1本のローソク足は、ある一定時間(例:1分、5分、1時間など)の価格変動を1本で表します。
その1本には次のような情報が含まれます:
| 項目 | 説明 |
|---|---|
open | 始値(その時間の最初の価格) |
high | 高値(その時間中での一番高い価格) |
low | 安値(その時間中での一番低い価格) |
close | 終値(その時間の最後の価格) |
volume | 出来高(その時間に取引された量) |
start | このローソク足の開始時刻(UNIXタイム) |
🔁 時間足(Timeframe)
ローソク足は時間単位で区切られており、Bybitでは例えば以下のような時間足を指定できます:
"1"→ 1分足"3"→ 3分足"5"→ 5分足"15"→ 15分足"60"→ 1時間足"D"→ 日足"W"→ 週足
🧠 実際のデータの例(Bybit WebSocket経由)
WebSocketで受け取るklineデータは、こんなJSON形式になっています。
{
"topic": "kline.1.BTCUSDT",
"data": [{
"start": 1640995200000,
"open": "47200",
"high": "47350",
"low": "47100",
"close": "47280",
"volume": "125.34"
}]
}
このデータは:
- 1分間のBTCUSDTのローソク足で、
- 開始時刻が2022年1月1日 00:00:00(UNIXミリ秒)
- 始値 47200、終値 47280、高値 47350、安値 47100
- 出来高(取引量)が125.34
という内容です。
✅ まとめ
- klineデータ = ローソク足データ
- 各ローソク足には「始値、高値、安値、終値、出来高、開始時刻」が含まれている
- トレード戦略やチャート分析に不可欠な基本情報
おまけ「なぜklileと呼ぶのか?」
「なぜ kline と呼ぶのか?」は、多くの人が気になってるけど意外と知られていない話です。
🈶 結論:K-line は「K線(K線チャート)」の略
- 「K-line(ケーライン)」は、英語ではなく 中国発祥の呼び名 です。
- 中国ではローソク足チャートのことを K線(K線图:ケイ・シエン・トゥー) と呼びます。
🔤 じゃあ「K」は何の略なのか?
実は「K」に明確な意味はない、という説が有力です。だけど、いくつかの説があります:
有力説①:日本の「株価」の K?
- 日本の「株価」や「価格」を表す言葉の頭文字「K」から来たのでは?という説。
- ローソク足自体は日本で生まれ、中国に伝わった → その過程で「K線」と呼ばれるようになった。
有力説②:「Karte(独語で図・チャート)」の K?
- ドイツ語の「Karte」(カード、図、チャート)から来た可能性もあるというマニアックな説。
🌍 どうして英語でも「kline」って使うの?
- 中国発の取引所(Bybit、Binance など)では、この「K-line」という表現がそのままAPIでも使われています。
- だから、英語圏の開発者も "candlestick" じゃなく「kline」という語をそのまま使うようになった、という流れ。
✅ まとめ
| 用語 | 意味 | 備考 |
|---|---|---|
| K-line | ローソク足データ(K線) | 中国語の表現が元になっている |
| Candlestick | 英語圏での正式な呼び名 | 意味はまったく同じ |
-
-
開発記録#141(2025/3/24)「論文ベースのbot開発フローpart.3 異常検出ロジックの組み込み」
続きを見る