Bot

開発記録#130(2025/3/9)「MMbotの戦略強化」

前回の記事に引き続き、今回も仮想通貨botの開発状況をまとめていきます。

Yodaka

今回は「MMbot(マーケットメイキングボット)のロジックを発展させる」ことについてまとめました。本記事では、「雛形の紹介」「改良型の提案」をします。

関連
仮想通貨botの開発を本格的に始めてみる#30(2023/11/13)「MMBotの開発に着手する:ソースコードの構造解析:準備編」

続きを見る

1.スプレッドの動的調整:

  • 市場のボラティリティや取引量に基づいて、スプレッド(買値と売値の差)をリアルタイムで調整する。これにより、市場環境が変化しても利益を最大化し、リスクを管理できる。

2.裁定取引の組み込み:

  • 複数の取引所の価格差を利用して利益を出す裁定取引のロジックを組み込む。これにより、一方の取引所で低く買い、他方で高く売ることができる。

3.機械学習を利用した予測モデルの導入:

  • 価格動向や注文の流れを分析する機械学習モデルを導入して、より精度の高いトレーディング判断を行うことができる。たとえば、LSTMやDQNなどのモデルを使って市場のトレンドを予測し、それに基づいて注文を出すタイミングを決める。

4.マルチアセット対応:

  • 複数の通貨ペアや資産クラスに対応し、より多くの市場でマーケットメイキングを展開する。これによりリスク分散が図れ、利益機会も増加する。

5.サーキットブレーカーの導入:

  • 極端な市場状況下での自動取引停止機能を導入し、大きな損失を防ぐ。価格が一定範囲以上に急変した場合に自動的に取引を一時停止し、状況を再評価する。
Yodaka

実際には、これらの戦略を組み合わせてbot開発を進めていくことになります。

1.スプレッドの動的調整

class DynamicSpreadMMBot:
    def __init__(self, api_client, volatility_threshold):
        self.api_client = api_client
        self.volatility_threshold = volatility_threshold

    def fetch_market_data(self):
        # マーケットデータを取得
        return self.api_client.get_recent_trades()

    def adjust_spread_based_on_volatility(self, market_data):
        # ボラティリティに基づいてスプレッドを調整
        volatility = self.calculate_volatility(market_data)
        if volatility > self.volatility_threshold:
            return 0.01  # ボラティリティが高い場合は広いスプレッド
        else:
            return 0.005  # ボラティリティが低い場合は狭いスプレッド

    def calculate_volatility(self, market_data):
        # ボラティリティ計算ロジック
        return market_data.std()  # 標準偏差を使用

    def place_orders(self, spread):
        # 注文を配置
        bid_price = self.api_client.get_current_price() - spread
        ask_price = self.api_client.get_current_price() + spread
        self.api_client.place_order('buy', bid_price)
        self.api_client.place_order('sell', ask_price)

    def run(self):
        market_data = self.fetch_market_data()
        spread = self.adjust_spread_based_on_volatility(market_data)
        self.place_orders(spread)

こちらのコードは、DynamicSpreadMMBot クラスとして定義されたマーケットメイキングボット(MMBot)の実装です。このボットは、市場のボラティリティに基づいて取引のスプレッドを動的に調整することで効率的な取引を目指しています。以下に、各部分の機能を説明します。

クラスの構造

  • __init__メソッド:
    • api_client: 取引を行うAPIクライアント。このクライアントを通じて市場データの取得や注文の発行を行います。
    • volatility_threshold: ボラティリティの閾値。この値を基にスプレッドの調整が行われます。

メソッドの説明

  • fetch_market_dataメソッド:
    • 市場の取引データを取得します。このデータはボラティリティの計算に使用されます。
  • adjust_spread_based_on_volatilityメソッド:
    • 取得した市場データからボラティリティを計算し、その値に基づいてスプレッドを調整します。ボラティリティが設定された閾値より高い場合、より大きなスプレッドを設定し(0.01)、低い場合はより狭いスプレッドを設定します(0.005)。これにより、市場の不安定さに応じてリスクを管理します。
  • calculate_volatilityメソッド:
    • 市場データの標準偏差を計算してボラティリティとしています。標準偏差は、価格変動の幅を数値化する一般的な方法で、この値が大きいほど価格の動きが大きいと判断されます。
  • place_ordersメソッド:
    • 計算されたスプレッドを元に、買い注文(bid)と売り注文(ask)を市場に出します。このスプレッドは、現在価格から上下に調整された価格で注文が設定されます。
  • runメソッド:
    • ボットの主要な実行ロジックです。市場データを取得し、それに基づいてスプレッドを調整後、注文を行います。このメソッドが周期的に呼ばれることで、ボットは連続して市場に対応した取引を行うことができます。

このボットは市場の状態に応じてスプレッドを動的に調整することで、リスクを適切に管理しつつ利益を最大化する設計になっています。実際の使用ではAPIクライアントの実装やエラーハンドリング、さらに詳細な市場分析機能などを追加する必要があります。

改良型

Yodaka

以下は、DynamicSpreadMMBot クラスをより実践的に改善するための提案です。主な改善点は以下の通りです。

  1. 非同期処理の導入:APIからのデータ取得と注文の発行を非同期で行い、レスポンス時間の短縮と効率の向上を図ります。
  2. エラーハンドリングの強化:API呼び出しの際のエラーを適切に処理し、ロバストなシステムを構築します。
  3. 取引手数料の考慮:実際のスプレッド計算に取引手数料を考慮することで、より正確な利益計算を行います。
  4. ログ出力の追加:重要な操作のログを出力し、デバッグとモニタリングを容易にします。
Yodaka

改良版のコードは以下のようになります。

import asyncio
import logging

class DynamicSpreadMMBot:
    def __init__(self, api_client, volatility_threshold, transaction_fee=0.0005):
        self.api_client = api_client
        self.volatility_threshold = volatility_threshold
        self.transaction_fee = transaction_fee
        logging.basicConfig(level=logging.INFO)

    async def fetch_market_data(self):
        # マーケットデータを非同期で取得
        try:
            return await self.api_client.get_recent_trades()
        except Exception as e:
            logging.error(f"Failed to fetch market data: {str(e)}")
            return None

    def adjust_spread_based_on_volatility(self, market_data):
        if market_data is not None:
            volatility = self.calculate_volatility(market_data)
            base_spread = 0.01 if volatility > self.volatility_threshold else 0.005
            return base_spread + self.transaction_fee
        else:
            return 0.005 + self.transaction_fee  # デフォルトスプレッド

    def calculate_volatility(self, market_data):
        # ボラティリティ計算ロジック
        return market_data.std()  # 標準偏差を使用

    async def place_orders(self, spread):
        try:
            current_price = await self.api_client.get_current_price()
            bid_price = current_price - spread
            ask_price = current_price + spread
            await self.api_client.place_order('buy', bid_price)
            await self.api_client.place_order('sell', ask_price)
            logging.info(f"Orders placed at bid: {bid_price}, ask: {ask_price}")
        except Exception as e:
            logging.error(f"Failed to place orders: {str(e)}")

    async def run(self):
        while True:
            market_data = await self.fetch_market_data()
            if market_data:
                spread = self.adjust_spread_based_on_volatility(market_data)
                await self.place_orders(spread)
            await asyncio.sleep(1)  # ループの待機時間

# APIクライアントのインスタンス化とボットの起動
api_client = YourAPIClient()
bot = DynamicSpreadMMBot(api_client, volatility_threshold=0.02)
asyncio.run(bot.run())

コードの解説

  • 非同期処理asyncio モジュールを使用して、市場データの取得と注文の配置を非同期で実行します。これにより、APIレスポンスの待ち時間を有効に活用し、システム全体の反応速度を向上させます。
  • エラーハンドリング:APIからのデータ取得や注文配置の際にエラーが発生した場合、それをログに記録し、処理を続けることができます。
  • 取引手数料の考慮:実際のスプレッド計算に取引手数料を加算し、収益性のある取引を確保します。
  • ログ出力:重要な操作やエラー情報をログに記録し、問題発生時の追跡やシステムの状態監視を容易にします。
Yodaka

このコードは実際の取引環境で利用するための基礎型ですが、実際に運用する前にはAPIクライアントの実装や市場データの詳細に基づいてさらにカスタマイズする必要があります。

関連
仮想通貨botの開発記録#75(2024/5/19)「高頻度Bot開発:マーケットメイカーの振る舞い」

続きを見る

関連
仮想通貨botの開発記録#78(2024/6/29)「高頻度取引の一般的な戦略と勉強方法まとめ」

続きを見る

2.裁定取引の組み込み

class ArbitrageMMBot:
    def __init__(self, api_client1, api_client2):
        self.api_client1 = api_client1  # 取引所1用API
        self.api_client2 = api_client2  # 取引所2用API

    def find_arbitrage_opportunity(self):
        price1 = self.api_client1.get_current_price()
        price2 = self.api_client2.get_current_price()
        if price1 < price2:
            return ('buy', price1, 'sell', price2)
        elif price1 > price2:
            return ('sell', price1, 'buy', price2)
        else:
            return None

    def execute_trades(self, opportunity):
        if opportunity:
            self.api_client1.place_order(opportunity[0], opportunity[1])
            self.api_client2.place_order(opportunity[2], opportunity[3])

    def run(self):
        opportunity = self.find_arbitrage_opportunity()
        self.execute_trades(opportunity)

こちらのコードは、ArbitrageMMBot クラスとして定義された裁定取引を行うマーケットメイキングボット(MMBot)です。このボットは、異なる二つの取引所の価格差を利用して利益を得ることを目的としています。

Yodaka

以下に各部分の機能を説明します。

クラスの構造

  • __init__メソッド:
    • api_client1: 取引所1にアクセスするためのAPIクライアントです。
    • api_client2: 取引所2にアクセスするためのAPIクライアントです。
    • この二つのAPIクライアントを用いて、それぞれの取引所から価格情報を取得し、注文を発行します。

メソッドの説明

  • find_arbitrage_opportunityメソッド:
    • 二つの取引所から現在の価格情報を取得します。
    • 取得した価格を比較し、価格差が存在する場合(一方が他方より低い場合)、裁定取引の機会があると判断します。
    • 価格1が価格2より低い場合は、価格1の取引所で買い(buy)注文を、価格2の取引所で売り(sell)注文を行います。逆に価格1が価格2より高い場合は、価格1の取引所で売り注文を、価格2で買い注文を行います。
    • 価格が等しい場合は、裁定の機会がないと判断し、None を返します。
  • execute_tradesメソッド:
    • find_arbitrage_opportunityから裁定の機会が返された場合(opportunityNoneでない場合)、その情報に基づいて実際に取引を実行します。
    • opportunityの情報を用いて、対応する取引所に対して注文を出します。例えば、価格1で買い注文を出し、価格2で売り注文を出すことで、価格差の利益を得ることができます。
  • runメソッド:
    • ボットの主要な実行ループです。裁定の機会を探し、見つかれば取引を実行します。
    • このメソッドが周期的に呼ばれることで、ボットは連続して市場を監視し、裁定取引のチャンスを探ります。

このボットは、異なる取引所間の価格差を利用して、リスクの少ない裁定取引を通じて利益を得ることを目指しています。実際の使用には、APIのレイテンシーや取引手数料、価格の更新頻度などの要因も考慮する必要があります。また、市場の動きが速いため、迅速な取引実行が求められるでしょう。

改良型

Yodaka

裁定取引ボットのコードをより実践的に改良するために、以下の点を改善します。

  1. 価格取得と注文の非同期実行:APIレスポンスの遅延を最小限に抑える。
  2. 取引手数料を考慮:実際の利益計算に手数料を含める。
  3. 価格更新の高速化:より頻繁に価格を更新し、機会を逃さないようにする。
  4. エラーハンドリングの強化:API呼び出しの失敗などの際に適切に対応する。
  5. 取引ログの記録:取引の詳細を記録し、後で分析可能にする。
Yodaka

以下は、これらの改良を取り入れた実践的なコードの例です。

import asyncio
import logging

class ArbitrageMMBot:
    def __init__(self, api_client1, api_client2, fee=0.001):
        self.api_client1 = api_client1
        self.api_client2 = api_client2
        self.fee = fee  # 取引手数料
        logging.basicConfig(level=logging.INFO)

    async def get_prices(self):
        # 非同期で価格を取得
        price1 = await self.api_client1.get_current_price()
        price2 = await self.api_client2.get_current_price()
        return price1, price2

    def calculate_profit(self, price1, price2):
        # 取引手数料を考慮した利益計算
        if price1 < price2 * (1 - self.fee):
            return ('buy', price1, 'sell', price2)
        elif price1 * (1 - self.fee) > price2:
            return ('sell', price1, 'buy', price2)
        return None

    async def execute_trades(self, opportunity):
        if opportunity:
            try:
                await self.api_client1.place_order(opportunity[0], opportunity[1])
                await self.api_client2.place_order(opportunity[2], opportunity[3])
                logging.info(f"Executed trades on opportunity: {opportunity}")
            except Exception as e:
                logging.error(f"Failed to execute trades: {str(e)}")

    async def run(self):
        while True:
            try:
                price1, price2 = await self.get_prices()
                opportunity = self.calculate_profit(price1, price2)
                await self.execute_trades(opportunity)
                await asyncio.sleep(1)  # 価格更新の頻度(秒)
            except Exception as e:
                logging.error(f"Error in running bot: {str(e)}")
                await asyncio.sleep(10)  # エラー発生時のスリープ時間

# APIクライアントのインスタンス化とボットの実行
api_client1 = YourAPIClient(exchange='Exchange1')
api_client2 = YourAPIClient(exchange='Exchange2')
bot = ArbitrageMMBot(api_client1, api_client2)
asyncio.run(bot.run())

主要な変更点

  • 非同期処理asyncio を使って、価格取得と注文処理を非同期で行うことで、APIレスポンスの待ち時間を最適化し、高速に取引を行うことが可能になります。
  • 取引手数料の考慮:実際の利益計算に手数料を考慮し、手数料を差し引いた後の価格で裁定取引の条件を判断します。
  • エラーハンドリング:API呼び出しの失敗やその他のエラーをキャッチし、ログに記録します。これにより、何が原因で問題が発生したかを追跡しやすくなります。
  • ログ記録:取引の実行詳細をログに記録し、分析やデバッグに役立てます。
Yodaka

このコードは、実際のAPIクライアントの実装と詳細な市場状況によってさらに調整が必要です。

関連
仮想通貨botの開発記録#51(2024/2/3)「仮想通貨Botterの必読書シリーズ②アービトラージ入門」

続きを見る

3.機械学習を利用した予測モデルの導入

import numpy as np
from sklearn.linear_model import LinearRegression

class PredictiveMMBot:
    def __init__(self, api_client, model):
        self.api_client = api_client
        self.model = model  # 予測モデル (例: LinearRegression)

    def fetch_historical_data(self):
        return np.array(self.api_client.get_historical_data())

    def predict_future_price(self, historical_data):
        X = historical_data[:-1].reshape(-1, 1)
        y = historical_data[1:].reshape(-1, 1)
        self.model.fit(X, y)
        return self.model.predict([historical_data[-1]])[0][0]

    def place_orders_based_on_prediction(self, future_price):
        current_price = self.api_client.get_current_price()
        if future_price > current_price:
            self.api_client.place_order('buy', current_price)
        else:
            self.api_client.place_order('sell', current_price)

    def run(self):
        historical_data = self.fetch_historical_data()
        future_price = self.predict_future_price(historical_data)
        self.place_orders_based_on_prediction(future_price)

こちらのコードは、価格予測に基づいて取引を行うPredictiveMMBot(予測マーケットメイキングボット)の実装です。クラスはnumpysklearnLinearRegressionモデルを使用しています。

Yodaka

各メソッドの役割と処理の流れを説明します。

クラスの構造

  • __init__メソッド:
    • api_client: APIクライアントを受け取り、このクライアントを通じて市場データの取得や注文の発行を行います。
    • model: 予測モデルで、ここではLinearRegressionを使用。これは価格データに基づいて将来の価格を予測するために使われます。

メソッドの説明

  • fetch_historical_dataメソッド:
    • APIクライアントを使って過去の市場データを取得します。このデータはnumpyの配列に変換され、価格予測モデルの学習に使用されます。
  • predict_future_priceメソッド:
    • 取得した歴史データを使用して、未来の価格を予測します。
    • 入力データXは、最後のデータポイントを除く全てのデータポイントです。目標変数yは、1つ後のデータポイントです。
    • LinearRegressionモデルを使ってXyの関係を学習し、最後のデータポイントを用いて未来の価格を予測します。予測された価格は、次の取引判断に利用されます。
  • place_orders_based_on_predictionメソッド:
    • 予測された未来価格と現在価格を比較します。未来価格が現在価格より高ければ「買い」注文、低ければ「売り」注文を行います。
    • 注文はAPIクライアントを通じて市場に出されます。
  • runメソッド:
    • ボットの主要な実行ロジックです。過去の市場データを取得し、未来価格を予測した後、その予測に基づいて注文を行います。

実行フロー

  1. ボットが起動されると、runメソッドが呼び出されます。
  2. fetch_historical_dataで過去データを取得し、numpy配列に変換。
  3. predict_future_priceで次の価格を予測。
  4. place_orders_based_on_predictionで予測価格を基に注文を実行。

このボットのコードは、基本的な予測ベースの取引戦略を示していますが、実際のトレーディングではモデルの精度、過学習、市場のノイズなどの要因を考慮する必要があります。また、市場条件の変化に応じてモデルの再学習やパラメータの調整が必要になる場合もあります。そのため、リアルタイムの市場データに基づく動的なモデル更新機能の追加も検討する必要があります。

改良型

Yodaka

以下の改良を行ったコードは、実装に向けた問題点を考慮し、より実用的なトレーディングボットに進化させるための提案です。主な改善点は以下の通りです。

  1. データの正規化:モデルの学習効率と予測精度を向上させるために、入力データを正規化します。
  2. 動的なモデル更新:市場の変動に対応するため、定期的にモデルを再学習します。
  3. エラーハンドリングの強化:API呼び出しやデータ処理の際のエラーを捉え、適切に対応します。
  4. 取引ロジックの改善:注文のタイミングや価格の決定をより洗練させ、リスク管理を強化します。
  5. ロギングの追加:システムの動作状態やエラー情報をログに記録し、デバッグや監視を容易にします。
Yodaka

以下は改良版のコードです。

import numpy as np
from sklearn.linear_model import LinearRegression
from sklearn.preprocessing import StandardScaler
import logging
import asyncio

class PredictiveMMBot:
    def __init__(self, api_client, retrain_interval=60):
        self.api_client = api_client
        self.model = LinearRegression()
        self.scaler = StandardScaler()
        self.retrain_interval = retrain_interval
        self.iteration_count = 0
        logging.basicConfig(level=logging.INFO)

    async def fetch_historical_data(self):
        try:
            return np.array(await self.api_client.get_historical_data())
        except Exception as e:
            logging.error(f"Failed to fetch historical data: {str(e)}")
            return np.array([])

    def train_model(self, data):
        if len(data) > 1:  # Ensure there is enough data to train
            X = data[:-1].reshape(-1, 1)
            y = data[1:].reshape(-1, 1)
            X_scaled = self.scaler.fit_transform(X)
            self.model.fit(X_scaled, y)
            logging.info("Model retrained")

    def predict_future_price(self, historical_data):
        if len(historical_data) > 0:
            last_point_scaled = self.scaler.transform([historical_data[-1:].reshape(-1, 1)])
            prediction = self.model.predict(last_point_scaled)
            return prediction[0][0]
        return None

    async def place_orders_based_on_prediction(self, future_price):
        try:
            current_price = await self.api_client.get_current_price()
            if future_price and current_price:
                if future_price > current_price:
                    await self.api_client.place_order('buy', current_price)
                else:
                    await self.api_client.place_order('sell', current_price)
        except Exception as e:
            logging.error(f"Failed to place order: {str(e)}")

    async def run(self):
        while True:
            historical_data = await self.fetch_historical_data()
            if len(historical_data) > 0:
                if self.iteration_count % self.retrain_interval == 0:
                    self.train_model(historical_data)
                future_price = self.predict_future_price(historical_data)
                await self.place_orders_based_on_prediction(future_price)
            else:
                logging.warning("No historical data available for training")
            self.iteration_count += 1
            await asyncio.sleep(1)  # Sleep to mimic continuous operation

# APIクライアントのインスタンス化とボットの起動
api_client = YourAPIClient()  # APIクライアントを適切に定義
bot = PredictiveMMBot(api_client)
asyncio.run(bot.run())

主要な改善点の詳細

  • データの正規化StandardScalerを使用して、モデルの入力データを正規化します。これにより、モデルの収束速度と予測の安定性が向上します。
  • 動的なモデル更新:定期的にモデルを再学習し、市場の最新の動向に適応させます。これにより、モデルの時代遅れを防ぎます。
  • エラーハンドリング:データ取得や注文配置のプロセスで発生する可能性のあるエラーを捕捉し、ログに記録します。
  • ロギング:システムの重要な動作やエラーをログに記録し、問題解析や監視の効率を高めます。
Yodaka

このコードは、リアルタイムの市場条件に対応するための基本的なフレームワークであり、さらなるカスタマイズや機能追加に柔軟に対応できるように設計されています。

関連
開発記録#117(2024/10/21)MMbot開発「LSTMとの組み合わせ6種」

続きを見る

4.マルチアセット対応

class MultiAssetMMBot:
    def __init__(self, api_clients):
        self.api_clients = api_clients  # 複数の資産クラスに対応するAPIクライアントの辞書

    def fetch_best_spread(self, asset):
        # 各資産に対して最適なスプレッドを決定
        return 0.005 + 0.01 * np.random.random()  # ランダムなスプレッドを追加で示す例

    def place_orders_for_asset(self, asset, spread):
        bid_price = self.api_clients[asset].get_current_price() - spread
        ask_price = self.api_clients[asset].get_current_price() + spread
        self.api_clients[asset].place_order('buy', bid_price)
        self.api_clients[asset].place_order('sell', ask_price)

    def run(self):
        for asset, api_client in self.api_clients.items():
            spread = self.fetch_best_spread(asset)
            self.place_orders_for_asset(asset, spread)

このコードは、MultiAssetMMBotクラスとして定義されており、複数の資産クラスを管理するマーケットメイキングボットの実装です。クラスは複数のAPIクライアントを管理し、それぞれ異なる資産に対して最適なスプレッドを計算し、買い注文と売り注文を配置します。

Yodaka

以下に各部分の機能と役割を説明します。

クラスの構造

  • __init__メソッド:
    • api_clients: これは辞書型で、キーとして資産の種類、値として各資産に対応するAPIクライアントを保持します。このAPIクライアントを通じて、資産の現在価格の取得や注文の配置が行われます。

メソッドの説明

  • fetch_best_spreadメソッド:
    • このメソッドは、与えられた資産に対して最適なスプレッドを決定します。ここではランダムにスプレッドを決定するための例として、基本スプレッド(0.005)に乱数(0から0.01の範囲)を加えています。実際の取引環境では、市場のボラティリティや流動性データに基づいてこのスプレッドを計算することが一般的です。
  • place_orders_for_assetメソッド:
    • 与えられた資産とスプレッドを用いて、買い注文(bid)と売り注文(ask)を配置します。注文の価格は、APIクライアントを使用して取得した現在価格からスプレッドを引いたり加えたりして決定されます。このメソッドでは、それぞれの注文をAPIクライアントに渡して実際に市場に注文を出します。
  • runメソッド:
    • ボットの主要な実行ルーチンです。登録されている全資産に対してループを行い、各資産に対してスプレッドを計算し、そのスプレッドを用いて注文を配置します。このメソッドはボットが起動されたときに呼び出され、定期的に全資産の注文を更新します。

実用性に向けた考慮事項

このコードはシンプルな実装を示していますが、実際のトレーディング環境に適用するためには、いくつかの改良が必要です。例えば、スプレッドの計算方法をより洗練させる必要があり、市場データをリアルタイムで分析してスプレッドを動的に調整するアルゴリズムの導入が考えられます。また、注文の成功率を高めるために、オーダーブックの深さや他の市場参加者の行動を考慮に入れた複雑なロジックが必要になるかもしれません。

このボットは複数の資産を扱うため、各資産ごとの特性(ボラティリティ、取引量、市場のニュース影響度)に応じて個別にパラメータを調整する機能も有効です。全体として、このコードは多資産を対象としたマーケットメイキング戦略の基本的な枠組みを提供し、さらなるカスタマイズと最適化に向けた出発点となります。

改良型

Yodaka

上記で述べた考慮事項に基づき、MultiAssetMMBotクラスを実際のトレーディング環境でより有効に機能するように改良したコードです。ここでは、スプレッドの動的調整、市場データの詳細分析、およびエラーハンドリングの強化を行います。

import numpy as np
import logging

class MultiAssetMMBot:
    def __init__(self, api_clients, volatility_factors):
        self.api_clients = api_clients  # 複数の資産クラスに対応するAPIクライアントの辞書
        self.volatility_factors = volatility_factors  # 各資産のボラティリティに基づくスプレッド調整ファクター
        logging.basicConfig(level=logging.INFO)

    def fetch_current_price(self, asset):
        try:
            return self.api_clients[asset].get_current_price()
        except Exception as e:
            logging.error(f"Error fetching current price for {asset}: {str(e)}")
            return None

    def fetch_best_spread(self, asset, current_price):
        # 市場の流動性やボラティリティに基づいてスプレッドを決定
        if current_price is None:
            return None
        base_spread = 0.005
        volatility_index = self.volatility_factors.get(asset, 1)
        adjusted_spread = base_spread + (0.01 * np.random.random() * volatility_index)
        return adjusted_spread

    def place_orders_for_asset(self, asset, spread, current_price):
        if current_price is None or spread is None:
            logging.warning(f"Cannot place order for {asset} due to lack of price or spread data.")
            return
        bid_price = current_price - spread
        ask_price = current_price + spread
        try:
            self.api_clients[asset].place_order('buy', bid_price)
            self.api_clients[asset].place_order('sell', ask_price)
            logging.info(f"Orders placed for {asset} at bid: {bid_price}, ask: {ask_price}")
        except Exception as e:
            logging.error(f"Failed to place orders for {asset}: {str(e)}")

    def run(self):
        for asset, api_client in self.api_clients.items():
            current_price = self.fetch_current_price(asset)
            spread = self.fetch_best_spread(asset, current_price)
            self.place_orders_for_asset(asset, spread, current_price)

# APIクライアントのインスタンス化とボットの起動
api_clients = {
    "Asset1": YourAPIClient1(),
    "Asset2": YourAPIClient2(),
    # 他の資産APIクライアント
}
volatility_factors = {
    "Asset1": 1.2,
    "Asset2": 0.8,
    # 他の資産のボラティリティファクター
}

bot = MultiAssetMMBot(api_clients, volatility_factors)
bot.run()

主な改良点

  1. エラーハンドリング:各API呼び出しに対するエラーハンドリングを強化し、エラー時には適切なログ出力を行い、処理を中断せずに次のステップに進めるようにしました。
  2. 動的スプレッド計算:各資産のボラティリティファクターを考慮してスプレッドを計算し、市場の状況に応じてスプレッドを調整します。これにより、市場の変動が大きい資産ではより広いスプレッドを設定し、リスクを管理します。
  3. ロギング:操作の各段階で情報をログに記録し、デバッグと監視を容易にします。
Yodaka

このコードは、複数の資産に対する注文を効率的かつ安全に管理するための基本的なフレームであり、市場の動向に基づいて柔軟に対応できます。

関連
仮想通貨botの開発記録#73(2024/4/29)「高頻度取引のロジック案まとめ【基本30種】&【応用15種】」

続きを見る

関連
仮想通貨botの開発記録#82(2024/7/29)「基本的な取引ロジックの雛形まとめ②」

続きを見る

5.サーキットブレーカーの導入

class CircuitBreakerMMBot:
    def __init__(self, api_client, threshold):
        self.api_client = api_client
        self.threshold = threshold  # 市場の価格変動がこの閾値を超えた場合に取引を停止

    def check_market_conditions(self):
        # 市場状況を確認し、大きな価格変動を検出する
        recent_prices = self.api_client.get_recent_prices()
        if max(recent_prices) - min(recent_prices) > self.threshold:
            return True
        return False

    def place_orders(self):
        if not self.check_market_conditions():
            bid_price = self.api_client.get_current_price() - 0.005
            ask_price = self.api_client.get_current_price() + 0.005
            self.api_client.place_order('buy', bid_price)
            self.api_client.place_order('sell', ask_price)
        else:
            print("Trading paused due to high market volatility")

    def run(self):
        self.place_orders()

このコードは、CircuitBreakerMMBotクラスを定義しており、マーケットメイキングボットの一種であることを示しています。このボットは「サーキットブレーカー」機能を備えており、市場の価格変動が特定の閾値を超えた場合に取引を一時停止することができます。

Yodaka

具体的な機能と役割について詳しく説明します。

クラスの構造

  • __init__メソッド:
    • api_client: このAPIクライアントを通じて市場データの取得や注文の発行が行われます。
    • threshold: これはサーキットブレーカーが作動する市場価格変動の閾値です。この値を超える価格変動があった場合、ボットは取引を停止します。

メソッドの説明

  • check_market_conditionsメソッド:
    • このメソッドは、APIクライアントを使用して最近の価格データを取得し、その価格変動が設定された閾値を超えているかどうかを確認します。価格の最大値と最小値の差が閾値より大きい場合は、Trueを返して取引を停止するシグナルとなります。そうでない場合はFalseを返して通常通りの取引が可能です。
  • place_ordersメソッド:
    • check_market_conditionsメソッドを呼び出して市場状況を確認し、取引停止条件が満たされていない場合(Falseが返された場合)にのみ、買い注文と売り注文を配置します。この時、現在の市場価格から一定のスプレッド(ここでは±0.005)を設定して注文を行います。もし取引停止条件が満たされていれば(Trueが返された場合)、"Trading paused due to high market volatility"というメッセージを出力して取引を一時停止します。
  • runメソッド:
    • このメソッドはボットの主要な実行ロジックを担います。place_ordersメソッドを呼び出し、市場状況に応じて注文を配置するか取引を停止します。

実用性に向けた考慮事項

Yodaka

このボットは特に市場の大きな変動時にリスクを管理するために設計されていますが、実際の取引環境で使用するには以下のような追加の機能が考えられます。

  • 頻繁な市場監視: runメソッドを周期的に実行するために、スケジューリングやループ処理を導入することが望ましいです。
  • 高度な価格分析: より複雑な価格分析アルゴリズムを導入して、閾値を動的に調整するか、異なる市場条件での閾値の設定を行うことができます。
  • エラーハンドリングの強化: API呼び出しの失敗やその他の予期せぬエラーに対処するためのエラーハンドリングを強化することが重要です。

改良型

Yodaka

市場の大きな変動に対処し、より高度なエラーハンドリングと定期的な市場監視を実現するために、CircuitBreakerMMBot クラスを改良します。以下のコードは、非同期プログラミングを利用して周期的に市場状況をチェックし、動的に閾値を調整する機能を含めたものです。

import asyncio
import logging

class CircuitBreakerMMBot:
    def __init__(self, api_client, threshold):
        self.api_client = api_client
        self.base_threshold = threshold
        self.current_threshold = threshold
        logging.basicConfig(level=logging.INFO)

    async def fetch_recent_prices(self):
        try:
            return await self.api_client.get_recent_prices()
        except Exception as e:
            logging.error(f"Failed to fetch recent prices: {str(e)}")
            return []

    async def check_market_conditions(self):
        recent_prices = await self.fetch_recent_prices()
        if not recent_prices:
            logging.warning("No price data available.")
            return True  # Assume high volatility if data fetch fails
        price_range = max(recent_prices) - min(recent_prices)
        if price_range > self.current_threshold:
            logging.info(f"Market volatility too high: {price_range} exceeds threshold {self.current_threshold}")
            return True
        return False

    async def adjust_threshold(self):
        # Dynamically adjust the threshold based on market conditions
        avg_recent_price = sum(recent_prices) / len(recent_prices)
        self.current_threshold = self.base_threshold + 0.1 * avg_recent_price
        logging.info(f"Threshold adjusted to {self.current_threshold}")

    async def place_orders(self):
        if not await self.check_market_conditions():
            try:
                current_price = await self.api_client.get_current_price()
                bid_price = current_price - 0.005
                ask_price = current_price + 0.005
                await self.api_client.place_order('buy', bid_price)
                await self.api_client.place_order('sell', ask_price)
                logging.info(f"Orders placed: Buy at {bid_price}, Sell at {ask_price}")
            except Exception as e:
                logging.error(f"Failed to place orders: {str(e)}")
        else:
            logging.info("Trading paused due to high market volatility")

    async def run(self):
        while True:
            await self.place_orders()
            await asyncio.sleep(10)  # Check market conditions every 10 seconds

# Example usage
api_client = YourAPIClient()  # YourAPIClient should be an instance that matches the expected interface
bot = CircuitBreakerMMBot(api_client, threshold=50)
asyncio.run(bot.run())

改良点の詳細説明

  1. 非同期プログラミング: asyncio ライブラリを利用して、API呼び出しを非同期で行います。これにより、リアルタイムでの市場監視と迅速な反応が可能になります。
  2. エラーハンドリングの強化: 各API呼び出しにエラーハンドリングを追加し、問題発生時に適切なログを出力し、プログラムが停止しないようにします。
  3. 動的な閾値調整: 市場の平均価格に基づいて閾値を動的に調整する機能を追加し、市場の変動による影響を考慮します。これにより、市場の安定性に応じてサーキットブレーカーの感度を調整できます。
  4. 定期的な市場監視: 一定間隔で市場の状態を確認し、必要に応じて取引を行います。これにより、市場の急激な変動に迅速に対応することが可能です。
Yodaka

この改良されたコードは、市場の大きな変動時にも柔軟に対応できるように設計されており、リスク管理をより効果的に行うことができます。

関連
仮想通貨botの開発を本格的に始めてみる#34(2023/11/23)「サーキットブレーカーBotを作ってみる①」

続きを見る

まとめ

今回は「MMbotの戦略強化」という観点で5種類のフレームワークを紹介しました。

実装に向けて調整は必要ですが、それぞれのbotを走らせながら効果検証を進めていきます。

Yodaka

最近はアイデアを検証するための時間が足りていないを感じるので、こういう時こそひとつずつ丁寧に実装・検証していきます。

今後もこの調子でbot開発の状況を発信していきます。

関連
仮想通貨botの開発を本格的に始めてみる#39(2023/12/20)「MMBotの開発④MMBotとは?/マーケットメイキング(MM)戦略のコスト分析」

続きを見る

-Bot