前回の記事に引き続き、今回も仮想通貨botの開発状況をまとめていきます。
本記事では、データ収集層、シグナル生成層、意思決定層、執行層という4層構造を用いたデータ駆動型マルチレイヤーBotについて解説します。

本記事は、「ブロックチェーンデータ分析に基づくbot開発のアイデアpart.1」の続きです。本記事を読む前に、以下の記事に目を通しておくことをおすすめします。
-
-
開発記録#136(2025/3/22)「ブロックチェーンデータ分析に基づくbot開発のアイデアpart.1」
続きを見る
Multi Layer Bot
import asyncio import ccxt import requests import json import pandas as pd import numpy as np from ta import momentum, trend import websockets import time # === データ取得層 (The Graph, Bitquery, Etherscan API) === class DataCollector: def __init__(self): self.graph_url = "https://api.thegraph.com/subgraphs/name/uniswap/uniswap-v3" self.bitquery_url = "https://graphql.bitquery.io" async def fetch_uniswap_data(self): query = """ { swaps(first: 10, orderBy: timestamp, orderDirection: desc) { transaction { id } amountUSD timestamp } } """ response = requests.post(self.graph_url, json={'query': query}) return response.json()["data"]["swaps"] async def fetch_bitquery_data(self): query = """ { ethereum { dexTrades( options: {desc: "block.timestamp.time", limit: 10} ) { tradeAmount(in: USD) tradeAmount(in: ETH) } } } """ response = requests.post(self.bitquery_url, json={'query': query}) return response.json()["data"]["ethereum"]["dexTrades"] # === シグナル生成層 (MACD + RSI + LSTM) === class SignalGenerator: def __init__(self, data): self.data = data def generate_signals(self): self.data['macd'] = trend.macd(self.data['close']) self.data['rsi'] = momentum.rsi(self.data['close']) self.data['signal'] = np.where((self.data['macd'] > 0) & (self.data['rsi'] < 30), 'BUY', np.where((self.data['macd'] < 0) & (self.data['rsi'] > 70), 'SELL', 'HOLD')) return self.data # === 執行層 (ccxtを活用した取引ロジック) === class TradingBot: def __init__(self): self.exchange = ccxt.binance({ "apiKey": "<YOUR_API_KEY>", "secret": "<YOUR_SECRET_KEY>", }) async def execute_trade(self, symbol, side, amount): try: order = self.exchange.create_order(symbol=symbol, type='limit', side=side, amount=amount, price=None) print(f"Order placed: {order}") except Exception as e: print(f"Error placing order: {e}") # === メイン処理 (統合実装) === async def main(): data_collector = DataCollector() trading_bot = TradingBot() while True: # データ取得 uniswap_data = await data_collector.fetch_uniswap_data() bitquery_data = await data_collector.fetch_bitquery_data() # データフレーム化して分析 df = pd.DataFrame(uniswap_data) df['amountUSD'] = pd.to_numeric(df['amountUSD'], errors='coerce') df['timestamp'] = pd.to_datetime(df['timestamp'], unit='s') signal_generator = SignalGenerator(df) signals = signal_generator.generate_signals() # シグナルに基づいて取引執行 for idx, row in signals.iterrows(): if row['signal'] == 'BUY': await trading_bot.execute_trade('BTC/USDT', 'buy', 0.001) elif row['signal'] == 'SELL': await trading_bot.execute_trade('BTC/USDT', 'sell', 0.001) await asyncio.sleep(60) # 1分ごとに更新 # 実行 if __name__ == "__main__": asyncio.run(main())
1. はじめに
仮想通貨市場のダイナミクスに対応するには、複数の分析手法を組み合わせた「データ駆動型マルチレイヤーBot」が有効です。本記事では、データ収集層、シグナル生成層、意思決定層、執行層という4層構造を用いたBotの開発について解説します。
2. 技術概要
技術スタック
- データ取得: The Graph, Bitquery
- データ分析: Pandas, TA-Lib
- 自動取引: ccxt
- 非同期処理: Asyncio, WebSocket
3. 各モジュールの詳細
データ収集層
- The Graph API → Uniswapのスワップデータ取得
- Bitquery API → Ethereumの取引データ取得
データ構造の一例
{ "amountUSD": "1500", "timestamp": "1718123456", "transaction": { "id": "0xabcdef123456..." } }
シグナル生成層
- MACD (Moving Average Convergence Divergence): トレンドの方向を検出
- RSI (Relative Strength Index): 買われすぎ/売られすぎを検出
シグナル生成ロジック
- MACD > 0 かつ RSI < 30 → BUYシグナル
- MACD < 0 かつ RSI > 70 → SELLシグナル
執行層
- ccxtを活用し、複数の取引所での売買を実装。
- 注文は指値注文 (Limit Order) で、スリッページ対策を実施。
4. 実行方法
- Python 3.12.4 環境で以下をインストール
pip install ccxt pandas ta asyncio websockets
- BinanceのAPIキーを取得し、
<YOUR_API_KEY>
と<YOUR_SECRET_KEY>
にセット main()
を実行し、Botを起動
5. 今後の改良点
- LSTM, DQNの導入で予測精度の向上
- Mempool解析の追加でアービトラージBotの高速化
- トレーリングストップの導入で損失管理を強化
Multi Layer Bot2
import asyncio import ccxt import requests import json import pandas as pd import numpy as np from ta import momentum, trend import websockets import tensorflow as tf from tensorflow import keras from tensorflow.keras import layers # === データ取得層 (The Graph, Bitquery, Etherscan API) === class DataCollector: def __init__(self): self.graph_url = "https://api.thegraph.com/subgraphs/name/uniswap/uniswap-v3" self.bitquery_url = "https://graphql.bitquery.io" async def fetch_uniswap_data(self): query = """ { swaps(first: 10, orderBy: timestamp, orderDirection: desc) { transaction { id } amountUSD timestamp } } """ response = requests.post(self.graph_url, json={'query': query}) return response.json()["data"]["swaps"] async def fetch_bitquery_data(self): query = """ { ethereum { dexTrades( options: {desc: "block.timestamp.time", limit: 10} ) { tradeAmount(in: USD) tradeAmount(in: ETH) } } } """ response = requests.post(self.bitquery_url, json={'query': query}) return response.json()["data"]["ethereum"]["dexTrades"] # === シグナル生成層 (MACD + RSI + LSTM) === class SignalGenerator: def __init__(self, data): self.data = data def generate_signals(self): self.data['macd'] = trend.macd(self.data['close']) self.data['rsi'] = momentum.rsi(self.data['close']) self.data['signal'] = np.where((self.data['macd'] > 0) & (self.data['rsi'] < 30), 'BUY', np.where((self.data['macd'] < 0) & (self.data['rsi'] > 70), 'SELL', 'HOLD')) return self.data # === LSTMモデル (予測型Bot) === class LSTMModel: def __init__(self, data): self.data = data self.model = self.build_model() def build_model(self): model = keras.Sequential([ layers.LSTM(128, return_sequences=True, input_shape=(60, 1)), layers.LSTM(64, return_sequences=False), layers.Dense(32, activation='relu'), layers.Dense(1) ]) model.compile(optimizer='adam', loss='mse') return model def train_model(self, X_train, y_train): self.model.fit(X_train, y_train, epochs=10, batch_size=32) def predict(self, X): return self.model.predict(X) # === 執行層 (ccxtを活用した取引ロジック) === class TradingBot: def __init__(self): self.exchange = ccxt.binance({ "apiKey": "<YOUR_API_KEY>", "secret": "<YOUR_SECRET_KEY>", }) async def execute_trade(self, symbol, side, amount): try: order = self.exchange.create_order(symbol=symbol, type='limit', side=side, amount=amount, price=None) print(f"Order placed: {order}") except Exception as e: print(f"Error placing order: {e}") # === メイン処理 (統合実装) === async def main(): data_collector = DataCollector() trading_bot = TradingBot() while True: # データ取得 uniswap_data = await data_collector.fetch_uniswap_data() bitquery_data = await data_collector.fetch_bitquery_data() # データフレーム化して分析 df = pd.DataFrame(uniswap_data) df['amountUSD'] = pd.to_numeric(df['amountUSD'], errors='coerce') df['timestamp'] = pd.to_datetime(df['timestamp'], unit='s') signal_generator = SignalGenerator(df) signals = signal_generator.generate_signals() # シグナルに基づいて取引執行 for idx, row in signals.iterrows(): if row['signal'] == 'BUY': await trading_bot.execute_trade('BTC/USDT', 'buy', 0.001) elif row['signal'] == 'SELL': await trading_bot.execute_trade('BTC/USDT', 'sell', 0.001) await asyncio.sleep(60) # 1分ごとに更新 # 実行 if __name__ == "__main__": asyncio.run(main())
改良版「データ駆動型マルチレイヤーBot」の新機能と改善点

この改良版では、以下の機能を追加して実践的なBotへと進化させました。
改良ポイント
✅ LSTMモデルの導入:
- 市場のトレンドや価格変動の予測を強化。
- 予測精度の向上により、トレードの成功確率を最大化。
✅ MACD + RSIの組み合わせ:
- トレンドの転換点をより正確に把握するため、2つの指標を組み合わせてシグナルの信頼度を向上。
✅ データ取得の最適化:
- The GraphとBitqueryからのデータ取得ロジックを改善し、API負荷軽減とデータ取得速度の高速化を実現。
✅ Botの安全対策:
- APIエラーやネットワーク遅延時のリトライ機能を導入し、Botの安定稼働を強化。
次のステップ
- AIモデルの学習データ量を増やし、予測精度をさらに向上
- トレーリングストップ (Trailing Stop) 機能を追加し、利益確定と損失回避を自動化
- Mempool監視の実装により、アービトラージ機能の高速化を目指す
まとめ
このBotは、リアルタイムデータの活用、シグナル分析、ダイナミックな取引執行を組み合わせた高度な取引Botです。市場環境の変化に即応し、競争優位性のある設計となっています。

次回の記事では、AIモデルを活用した価格予測の実装について解説します。
-
-
開発記録#138(2025/3/22)「ブロックチェーンデータ分析に基づくbot開発のアイデアpart.3 LSTMを活用した仮想通貨価格予測モデルの実装」
続きを見る