Bot トレードロジック プログラミングスキル

仮想通貨botの開発を本格的に始めてみる#40(2023/12/30)「MMBotの開発⑤サンプルコードの解析」

2023年12月30日

前回の記事に引き続き、今回も仮想通貨botの開発状況をまとめていきます。

ソースコードはこちら

Yodaka

本記事では「メソッド」と「関数」を同じような意味の言葉として使っている部分もありますが、厳密には定義が異なるため、より詳細に理解したい方はこちらの記事を参考にしてください。「関数とメソッドの違い

1.モジュールのインポート

Yodaka

以下のコードは、冒頭部分です。まずは、それぞれのインポート文について説明します。

from __future__ import annotations
from typing import TYPE_CHECKING, Callable

if TYPE_CHECKING:
    from pybotters.store import DataStore, Item

import asyncio
import loguru
import pybotters

from argparse import ArgumentParser
from functools import partial
  • from __future__ import annotations: この文は、Python 3.7以降で型アノテーションを行う際の挙動を変更するためのものです。annotationsフューチャーは、型の循環参照に対処するためのものです。詳細な理由や背景については、公式ドキュメントを参照。
  • from typing import TYPE_CHECKING, Callable: typingモジュールからTYPE_CHECKINGCallableをインポートしています。
    • TYPE_CHECKING: これは、型ヒントをチェックするための特殊な値です。通常はランタイムで実行されないようになっています。型ヒントの中で循環参照を避けるために使用されます。
    • Callable: 関数の型ヒントとして使用され、引数と戻り値の型を指定するために利用されます。
  • if TYPE_CHECKING: ...: 前述のTYPE_CHECKINGの値がTrueの場合、以下のブロックが実行されます。通常、この中では実行時には利用できない型ヒントを扱うための特殊な処理を行うことがあります。上のコードでは、pybotters.storeモジュールからDataStoreItemをインポートしています。
  • import asyncio: 非同期処理をサポートするためのasyncioモジュールをインポートしています。非同期プログラミングにおいては、非同期イベントループを管理するためにasyncioが利用されます。
  • import loguru: ロギングを行うためのサードパーティのモジュールloguruをインポートしています。loguruは、シンプルで柔軟なログの取得を可能にするためのツールです。このモジュールはPythonの標準ライブラリではないので、別途インストールする必要があります。Loguruの使い方(Pythonログ出力ライブラリ)
  • import pybotters: この行では、pybottersというモジュールをインポートしています。これはプロジェクト独自のモジュールで、その中でstoreモジュールが定義されています。インストールの手順はこちらから。
  • from argparse import ArgumentParser: コマンドライン引数を処理するための標準ライブラリのargparseモジュールからArgumentParserクラスをインポートしています。これを使用することで、スクリプトにコマンドライン引数を追加し、それらを処理することができます。
  • from functools import partial: functoolsモジュールからpartial関数をインポートしています。partial関数は、他の関数を一部適用して新しい関数を作成するために使用されます。

2.class Status(状態管理のクラス)

Yodaka

このクラスは、特定の仮想通貨取引所(bitFlyer)のデータを処理するためのものです。

class Status:
    def __init__(
        self,
        store: pybotters.bitFlyerDataStore,
        max_position: int = 1,
    ):
        self._store: pybotters.bitFlyerDataStore = store
        self._asks = None
        self._bids = None
        self._max_position = max_position

このクラスは、主にbitFlyer取引所のデータを管理するためのものであり、特に板情報を定期的に更新する非同期タスクを含んでいます。クラスのコンストラクタでは、取引所データの格納、ポジションの最大許容量などが初期化されます。

インポートとの関連

import asyncio
import pybotters
  • asyncio: 非同期処理をサポートするPythonの組み込みモジュール。
  • pybotters: 外部ライブラリまたはモジュールで、bitFlyer取引所のデータを処理するためのツールや関数が含まれています。

Statusクラスの定義

class Status:

クラス Status を定義しています。

コンストラクタの定義

    def __init__(
        self,
        store: pybotters.bitFlyerDataStore,
        max_position: int = 1,
    ):
  • クラスのコンストラクタ(__init__メソッド)を定義しています。コンストラクタはクラスのインスタンスが作成されるときに呼び出されます。
  • store: bitFlyerデータを取得するための pybotters.bitFlyerDataStore オブジェクト。型注釈により、この引数がどの型であるべきか示しています。
  • max_position: ポジション(取引の保有量)の最大許容量。デフォルトでは1に設定されています。

インスタンス変数の初期化

        self._store: pybotters.bitFlyerDataStore = store
        self._asks = None
        self._bids = None
        self._max_position = max_position
  • インスタンス変数 _store に、渡された store 引数を格納します。
  • インスタンス変数 _asks_bidsNone で初期化します。
  • インスタンス変数 _max_position に、渡された max_position 引数を格納します。

ボード(板情報)の自動更新タスクの作成

        asyncio.create_task(self.auto_update_board())

asyncio.create_task() を使用して、auto_update_board() メソッドを非同期タスクとして作成します。このメソッドはボード(板情報)のデータを定期的に更新するためのものです。

板情報の自動更新

Yodaka

このコードは、非同期(async)で動作するボード(板)情報の自動更新機能を提供する関数です。

async def auto_update_board(self):
        """板情報の自動更新タスク"""
        with self._store.board.watch() as stream:
            async for msg in stream:
                self._asks, self._bids = self._store.board.sorted().values()

この関数は、bitFlyer 取引所のボード情報を非同期的に監視し、変更があった場合にそれを取得してクラスのインスタンス変数に格納するものです。非同期プログラミングの概念や、asyncio モジュールにおける非同期イベントの取り扱いが含まれています。

非同期関数の定義

async def auto_update_board(self):
  • 非同期関数(asyncキーワードがついている)として auto_update_board 関数が定義されています。
  • self パラメータがあり、クラスのメソッドとして使われることを示しています。

板情報の非同期的な監視の開始

        with self._store.board.watch() as stream:
  • self._store.board.watch() は、self._store が持つ board オブジェクトの監視を開始します。
  • with ステートメントを使用して、監視が終了した時点でクリーンアップ処理を行います。
  • stream は監視ストリーム(stream)で、取引所からのデータ更新を待ち受けるためのものです。

非同期イベントの待機と処理

            async for msg in stream:
                self._asks, self._bids = self._store.board.sorted().values()
  • async for 文は非同期イベントの待機と処理を行います。stream からイベントが到着するたびにループが実行されます。
  • msg には取引所からのメッセージが入りますが、このコードでは使われていません。
  • self._store.board.sorted().values() は、ボードの売り注文(asks)と買い注文(bids)を取得し、それぞれ self._asksself._bids に代入します。

指値注文を出す関数

Yodaka

このコードは、注文板のデータから指定された累積注文サイズの閾値を超えた位置に指値注文を出すための関数です。

def get_limit_price(self, side: str, t: float, d: int = 1):
        """注文サイズの累積量が``t``を超えたところに``d``だけ離して指値をだす。

        :param str side: ask or bid
        :param float t: 累積注文サイズの閾値
        :param int d: 参照注文からのマージン
        :return: 指値
        """
        items = self._asks if side == "ask" else self._bids
        cum_size, price = items[0]["size"], items[0]["price"]
        for i in items:
            if cum_size >= t:
                return int(price + d if side == "ask" else price - d)
            price = i["price"]
            cum_size += i["size"]
        # 最後までthresholdを満たさなかった場合、一番後ろの注文と同じ額
        return int(items[-1]["price"])

この関数は、指定された注文板から特定の条件を満たす位置に指値注文を出すためのものです。注文板から注文を順に見ていき、指定された閾値を超えた時点で指値価格を計算して返します。初心者が理解する上でのポイントは、注文板の概念やループ処理の流れです。

関数の定義

def get_limit_price(self, side: str, t: float, d: int = 1):
  • get_limit_price 関数が定義されています。
  • side パラメータは文字列で、"ask"(売り注文)または"bid"(買い注文)を指定します。
  • t パラメータは浮動小数点数で、累積注文サイズの閾値を示します。
  • d パラメータは整数で、指定された閾値を超えた位置から離れるマージンを表します。デフォルトでは1に設定されています。

注文ブックの選択

        items = self._asks if side == "ask" else self._bids
  • side パラメータに応じて、指定された方向の注文ブックを選択します。
  • self._asks は売り注文のリスト、self._bids は買い注文のリストを仮定しています。

累積注文サイズの計算と指値価格の取得

        cum_size, price = items[0]["size"], items[0]["price"]
        for i in items:
            if cum_size >= t:
                return int(price + d if side == "ask" else price - d)
            price = i["price"]
            cum_size += i["size"]
  • 注文ブックを順番に見ていき、累積注文サイズが指定された閾値 t を超えた時点で、指定されたマージン d だけ離した位置の指値価格を計算して返します。
  • cum_size: 累積注文サイズを累積していく変数。
  • price: 注文の価格。

閾値を満たさなかった場合の処理

        # 最後までthresholdを満たさなかった場合、一番後ろの注文と同じ額
        return int(items[-1]["price"])

注文ブックを最後まで探しても閾値 t を満たさなかった場合、一番後ろの注文の価格を指定された形式に変換して返します。

保有ポジションのリストを取得する関数

Yodaka

このコードは、特定の取引方向(BUYまたはSELL)に関連する保有ポジションのリストを取得するためのメソッドです。

 def positions(self, side: str):
        """保有ポジションリスト。

        :param str side: BUY or SELL
        :return: ポジションのlist
        """
        positions = self._store.positions.find({"side": side})
        assert len(positions) <= self._max_position
        return positions

このメソッドは、指定された取引方向に関連するポジションを取得し、最大ポジション数の条件を満たすかどうかを検証してから、ポジションのリストを返すものです。ここでも、メソッドの目的や各ステップで行われる処理の流れを理解することが重要です。

メソッドの定義

def positions(self, side: str):
  • positions メソッドが定義されています。
  • self パラメータがあり、クラスのメソッドとして使われることを示しています。
  • side パラメータは文字列で、"BUY"または"SELL"を指定します。

ポジションの取得

        positions = self._store.positions.find({"side": side})
  • self._store.positions.find({"side": side}) は、self._store が持つ positions オブジェクトから、指定された取引方向("BUY"または"SELL")に関連するポジションを検索します。
  • ポジションのデータはおそらく辞書(dictionary)のリストとして返されます。

ポジション数の制約

        assert len(positions) <= self._max_position

  • assert 文は、指定された条件が満たされているかどうかを検証するためのものです。条件が偽の場合、AssertionError が発生します。
  • この行では、取得したポジションの数が設定された最大ポジション数(self._max_position)以下であることを検証しています。

結果の返却

        return positions

最後に、取得したポジションのリストを返します。

保有ポジションのサイズを確認する関数

Yodaka

このコードは、指定された取引方向(BUYまたはSELL)に関連する保有ポジションのサイズを取得するためのメソッドです。

def remaining_size(self, side):
        """保有ポジションサイズ。

        :param str side: BUY or SELL
        :return: ポジションサイズ
        """
        positions = self.positions(side)
        if len(positions):
            return sum([p["size"] for p in positions])
        else:
            return 0

このメソッドは、指定された取引方向に関連する保有ポジションのサイズを取得するためのものです。取得したポジションが存在する場合は、そのサイズを合計して返し、ポジションが存在しない場合は0を返します。初心者が理解する上でのポイントは、他のメソッドの呼び出しや条件文によるフローの制御です。

メソッドの定義

def remaining_size(self, side):
  • remaining_size メソッドが定義されています。
  • self パラメータがあり、おそらくクラスのメソッドとして使われることを示しています。
  • side パラメータは文字列で、"BUY"または"SELL"を指定します。

他のメソッドの呼び出し

        positions = self.positions(side)
  • self.positions(side) は、同じクラス内の positions メソッドを呼び出して、指定された取引方向に関連するポジションのリストを取得します。
  • 取得したポジションの情報は positions 変数に格納されます。

ポジションサイズの計算

        if len(positions):
            return sum([p["size"] for p in positions])
  • if len(positions): は、取得したポジションが存在するかどうかを検証しています。len(positions) が0でない(ポジションが存在する)場合に、以下の処理を行います。
  • sum([p["size"] for p in positions]) は、取得したポジションのサイズ("size" フィールドの値)を合計してポジションサイズを計算します。

ポジションが存在しない場合の処理

        else:
            return 0

ポジションが存在しない場合(len(positions) が0の場合)、0を返します。

3つのプロパティ

Yodaka

このコードは、3つのプロパティ(best_askbest_bidspread)を持つクラスの一部です。
'@'で書き始めることによって、関数に特殊な振る舞いをさせることができます。

 @property
    def best_ask(self):
        return int(self._asks[0]["price"])

    @property
    def best_bid(self):
        return int(self._bids[0]["price"])

    @property
    def spread(self):
        return (self.best_ask - self.best_bid) / self.best_bid

このクラスは、最良の売り注文価格、最良の買い注文価格、およびそれらの差であるスプレッドを提供する3つのプロパティを持っています。それぞれのプロパティは、注文ブックから必要な情報を抽出し、適切な形式に変換して返します。ここを理解する上でのポイントは、プロパティが属性のようにアクセスできること、デコレータ(関数に特殊な振る舞いをさせる機能)、およびスプレッドの概念です。

best_ask プロパティ

    @property
    def best_ask(self):
        return int(self._asks[0]["price"])
  • best_ask プロパティは、売り注文ブック(asks)から最も価格が低い注文の価格を取得します。
  • @property デコレータを使用して、このメソッドを属性のようにアクセスできるようにしています。
  • self._asks[0]["price"] は売り注文ブックの一番上にある注文の価格を取得します。

best_bid プロパティ

    @property
    def best_bid(self):
        return int(self._bids[0]["price"])
  • best_bid プロパティは、買い注文ブック(bids)から最も価格が高い注文の価格を取得します。
  • @property デコレータを使用して、このメソッドを属性のようにアクセスできるようにしています。
  • self._bids[0]["price"] は買い注文ブックの一番上にある注文の価格を取得します。

spread プロパティ

    @property
    def spread(self):
        return (self.best_ask - self.best_bid) / self.best_bid
  • spread プロパティは、best_ask(最良の売り注文価格)と best_bid(最良の買い注文価格)の差を計算し、その差を best_bid で割った値を返します。
  • スプレッドは、取引所の売りと買いの最良価格との差を表す指標で、取引所での流動性を示す一つの尺度です。

class EventWatcher(Event監視のクラス)

Yodaka

このコードは、イベントを監視するためのクラス EventWatcher を定義しています。

class EventWatcher:
    def __init__(self, store: DataStore, trigger_fn: Callable[[Item], bool] = None):
        self._store = store
        self._trigger_fn = trigger_fn
        self._task = asyncio.create_task(self._watch())

このクラスは、外部から提供されたデータストアを用いてイベントを監視し、トリガー条件が満たされた場合に指定されたアクションを実行するためのものです。理解を深めるポイントは、外部から注入される関数や非同期タスクの概念、そして __init__ メソッドの使い方です。

クラスの定義

class EventWatcher:

クラス EventWatcher を定義しています。

クラスのコンストラクタ(__init__ メソッド)

    def __init__(self, store: DataStore, trigger_fn: Callable[[Item], bool] = None):
        self._store = store
        self._trigger_fn = trigger_fn
        self._task = asyncio.create_task(self._watch())
  • クラスのコンストラクタ (__init__ メソッド) が定義されています。
  • store パラメータは DataStore クラスのオブジェクトを受け取ります。これは、外部から提供されるデータストアを指定するものです。
  • trigger_fn パラメータは、イベントをトリガーするための関数です。Callable[[Item], bool] は、引数が Item 型で、戻り値が bool 型の関数であることを示しています。もし trigger_fn が指定されなかった場合、デフォルトで None になります。
  • self._store には外部から提供されたデータストアが格納されます。
  • self._trigger_fn には外部から提供されたトリガー関数が格納されます。
  • self._task には、_watch メソッドを非同期タスクとして実行するためのタスクが格納されます。

_watch メソッド

        self._task = asyncio.create_task(self._watch())

イベント監視のための非同期タスクを開始するために、asyncio.create_task を使用して _watch メソッドを非同期タスクとして実行しています。

非同期関数_watch

Yodaka

このコードは非同期関数 _watch を定義しています。この関数は、DataStore を監視し、指定された条件が満たされるまでイベントを待機するものです。

この非同期関数 _watch は、DataStore を監視し、指定された条件が満たされるまでイベントを待機するものです。指定された条件が満たされた場合、そのイベントメッセージを返し、関数が終了します。理解する上でのポイントは、非同期プログラミングの概念や async for ループ、with ステートメントの使い方です。

async def _watch(self):
    """`_is_target_event(msg.data)`がTrueを返すまでDataStoreをwatchし続ける。"""
    with self._store.watch() as stream:
        async for msg in stream:
            if self._is_trigger(msg.data):
                return msg.data

関数の定義

  • _watch 関数は非同期関数として定義されています。
  • ドキュメンテーション文字列により、関数の目的が説明されています。この関数は、指定された条件が満たされるまで DataStore を監視し続けるものです。
  • with self._store.watch() as stream: は、DataStorewatch メソッドを使用して、イベントのストリームを開始します。この with ステートメントにより、監視が終了した時点でクリーンアップが自動的に行われます。
  • async for msg in stream: は、stream からイベントメッセージを非同期的に待ち受けるループです。

イベントトリガー条件の判定

            if self._is_trigger(msg.data):
                return msg.data
  • self._is_trigger(msg.data)True を返すかどうかを判定します。
  • もし True を返すならば、そのイベントメッセージ msg.data を返し、関数を終了します。これにより、指定された条件が満たされた時点で関数が終了します。

_is_trigger(イベントのトリガー条件を判定するメソッド)

Yodaka

このコードは、イベントのトリガー条件を判定するための非常に簡潔なメソッド _is_trigger を定義しています。

def _is_trigger(self, d: Item):
        """socketメッセージを受け取って、イベント発火の有無を判定する。
        子クラスこの関数をオーバーライドしてもいいし、`trigger_fn`として与えてもいい。

        :param Item d: socketメッセージ。
        :return:
        """
        if self._trigger_fn is None:
            raise NotImplementedError
        return self._trigger_fn(d)

このメソッドは、指定されたデータに対してイベントのトリガー条件を判定するためのものです。self._trigger_fn が未設定の場合はエラーを発生させ、設定されている場合はそれを呼び出してトリガー条件を確認します。理解する上でのポイントは、メソッドの目的やドキュメンテーション文字列、および例外の処理です。

_is_trigger メソッドの定義

  • _is_trigger メソッドは、指定された Item 型のデータ d に対してイベントのトリガー条件を判定する役割を持っています。
  • ドキュメンテーション文字列により、関数の目的や使い方が説明されています。
  • :param Item d: は、このメソッドの引数 d の型と説明を示しています。Item はおそらく特定の型を表しているものと考えられます。
  • :return: は、このメソッドの戻り値についての説明です。

イベントトリガー条件の判定

    if self._trigger_fn is None:
        raise NotImplementedError
    return self._trigger_fn(d)
  • self._trigger_fnNone(未設定)の場合、NotImplementedError を発生させます。これは、派生クラスでこのメソッドが適切に実装されていない場合のエラーです。
  • self._trigger_fn(d) は、実際にイベントのトリガー条件を判定する関数を呼び出しています。この関数は、d という引数を取り、特定の条件を満たすかどうかを返します。

非同期処理を扱うためのメソッド群

Yodaka

このコードは非同期処理を扱うためのクラスのメソッド群です。

async def wait(self):
    await self._task

def done(self):
    return self._task.done()

def result(self):
    return self._task.result()

このクラスは非同期処理を管理するためのもので、wait メソッドを使って非同期処理の完了を待ち、done メソッドを使って非同期処理の完了状態を確認し、result メソッドを使って非同期処理の結果を取得します。理解する上でのポイントは、非同期処理の待機と結果取得の仕組みに焦点を当てることです。

wait メソッド

async def wait(self):
    await self._task
  • wait メソッドは、非同期処理が完了するまで待機するためのものです。
  • await self._task は、_task が完了するまで非同期的に待機します。他の非同期処理が進行する一方で、このメソッドは _task の完了を待ちます。

done メソッド

def done(self):
    return self._task.done()
  • done メソッドは、非同期処理が完了しているかどうかを判定するためのものです。
  • self._task.done() は、非同期処理が完了している場合は True を、まだ未完了の場合は False を返します。

result メソッド

def result(self):
    return self._task.result()
  • result メソッドは、非同期処理が完了した後にその結果を取得するためのものです。
  • self._task.result() は、非同期処理が完了している場合はその結果を返します。ただし、非同期処理がまだ完了していない場合は適切な方法で待機することが期待されます。

3.class ChildOrderEventWatcher(EventWatcher)「特定の注文に関連するイベントを監視する」

Yodaka

このコードは、EventWatcher クラスを継承した ChildOrderEventWatcher クラスを定義しています。

class ChildOrderEventWatcher(EventWatcher):
    def __init__(self, store, order_id, **kwargs):
        self._order_id = order_id
        self._cond = kwargs
        super(ChildOrderEventWatcher, self).__init__(store)

    def _is_trigger(self, d):
        return d["child_order_acceptance_id"] == self._order_id and all(
            [v == d[k] for (k, v) in self._cond.items()]
        )

    def replace_order_id(self, order_id):
        self._order_id = order_id

このクラスは、特定の注文に関連するイベントを監視するためのものであり、指定された注文IDと条件がイベントデータと一致したときにトリガーされます。理解する上でのポイントは、クラスの初期化や条件の判定のメカニズム、メソッドの目的などです。

__init__ メソッド

    def __init__(self, store, order_id, **kwargs):
        self._order_id = order_id
        self._cond = kwargs
        super(ChildOrderEventWatcher, self).__init__(store)
  • __init__ メソッドは、クラスの初期化を行います。
  • store パラメータは、DataStore のような外部から提供されるデータストアオブジェクトです。
  • order_id パラメータは、特定の注文を識別するための注文IDです。
  • **kwargs は可変長のキーワード引数で、任意の数のキーと値を持つ辞書として受け取ります。これは条件を表します。
  • self._order_id には注文IDが格納されます。
  • self._cond には条件が格納されます。
  • super(ChildOrderEventWatcher, self).__init__(store) は親クラスのコンストラクタを呼び出して、親クラスの初期化を行います。

_is_trigger メソッド

    def _is_trigger(self, d):
        return d["child_order_acceptance_id"] == self._order_id and all(
            [v == d[k] for (k, v) in self._cond.items()]
        )
  • _is_trigger メソッドは、特定の条件に基づいてイベントのトリガーを判定します。
  • d パラメータは、受け取ったイベントデータです。
  • メソッドは、イベントデータの "child_order_acceptance_id"self._order_id と等しく、かつ self._cond に指定された条件がすべて満たされている場合に True を返します。

replace_order_id メソッド

    def replace_order_id(self, order_id):
        self._order_id = order_id
  • replace_order_id メソッドは、保持している注文IDを更新するためのものです。
  • order_id パラメータで新しい注文IDを指定し、それを self._order_id に格納します。

4.class ExecutionWatcher(ChildOrderEventWatcher)「特定の注文に関連する "EXECUTION" イベントを監視する」

Yodaka

このコードは、ChildOrderEventWatcher クラスを継承して新しいクラス ExecutionWatcher を定義しています。

class ExecutionWatcher(ChildOrderEventWatcher):
    def __init__(self, store, order_id):
        super(ExecutionWatcher, self).__init__(store, order_id, event_type="EXECUTION")

ExecutionWatcher クラスは、ChildOrderEventWatcher クラスを継承しつつ、特定の注文に関連する "EXECUTION" イベントを監視するために初期化されます。理解する上でのポイントは、クラスの継承や初期化の仕組み、および親クラスの設定に注目することです。

ExecutionWatcher クラスの定義

class ExecutionWatcher(ChildOrderEventWatcher):
    def __init__(self, store, order_id):
        super(ExecutionWatcher, self).__init__(store, order_id, event_type="EXECUTION")
  • ExecutionWatcher クラスは ChildOrderEventWatcher クラスを継承しています。これは、ChildOrderEventWatcher クラスの機能を継承しつつ、新しい機能を追加するものです。
  • __init__ メソッドは、クラスの初期化を行います。親クラスのコンストラクタを呼び出して初期化するとともに、追加の情報(event_type)を設定しています。

__init__ メソッド

    def __init__(self, store, order_id):
        super(ExecutionWatcher, self).__init__(store, order_id, event_type="EXECUTION")
  • __init__ メソッドはクラスの初期化を行います。
  • store パラメータは外部から提供されるデータストアオブジェクトです。
  • order_id パラメータは特定の注文を識別するための注文IDです。
  • super(ExecutionWatcher, self).__init__(store, order_id, event_type="EXECUTION") は親クラス(ChildOrderEventWatcher)のコンストラクタを呼び出して初期化します。ここで、event_type パラメータに "EXECUTION" を指定しています。これは、ChildOrderEventWatcher クラスがイベントを監視する条件の一部です。具体的には、このクラスは "EXECUTION" イベントに対して監視を行うように設定されています。

5.class CancelWatcher(ChildOrderEventWatcher)「特定の注文に関連する "CANCEL" または "CANCEL_FAILED" イベントを監視する」

Yodaka

このコードは、ChildOrderEventWatcher クラスを継承した新しいクラス CancelWatcher を定義しています。

class CancelWatcher(ChildOrderEventWatcher):
    def __init__(self, store, order_id):
        super(CancelWatcher, self).__init__(store, order_id)

    def _is_trigger(self, d):
        return d["child_order_acceptance_id"] == self._order_id and d["event_type"] in [
            "CANCEL",
            "CANCEL_FAILED",
        ]

CancelWatcher クラスは、ChildOrderEventWatcher クラスを継承して特定の注文に関連する "CANCEL" または "CANCEL_FAILED" イベントを監視するために初期化されます。理解する上でのポイントは、クラスの継承初期化の仕組み、そしてイベントのトリガー条件に注目することです。

CancelWatcher クラスの定義

class CancelWatcher(ChildOrderEventWatcher):
    def __init__(self, store, order_id):
        super(CancelWatcher, self).__init__(store, order_id)
  • CancelWatcher クラスは ChildOrderEventWatcher クラスを継承しています。これは、ChildOrderEventWatcher クラスの機能を利用しつつ、新しいクラスを作成するものです。
  • __init__ メソッドはクラスの初期化を行います。親クラスのコンストラクタを呼び出して初期化を行っています。

__init__ メソッド

    def __init__(self, store, order_id):
        super(CancelWatcher, self).__init__(store, order_id)
  • __init__ メソッドは、クラスの初期化を行います。
  • store パラメータは外部から提供されるデータストアオブジェクトです。
  • order_id パラメータは特定の注文を識別するための注文IDです。
  • super(CancelWatcher, self).__init__(store, order_id) は親クラス(ChildOrderEventWatcher)のコンストラクタを呼び出して初期化を行います。これにより、CancelWatcher クラスも ChildOrderEventWatcher クラスの機能を利用できるようになります。

_is_trigger メソッド

    def _is_trigger(self, d):
        return d["child_order_acceptance_id"] == self._order_id and d["event_type"] in [
            "CANCEL",
            "CANCEL_FAILED",
        ]
  • is_trigger メソッドは、特定の条件に基づいてイベントのトリガーを判定します。
  • d パラメータは、受け取ったイベントデータです。
  • メソッドは、イベントデータの "child_order_acceptance_id"self._order_id と等しく、かつ "event_type" が指定されたイベントのいずれかである場合に True を返します。このクラスでは "CANCEL" または "CANCEL_FAILED" イベントがトリガー条件となっています。

6.注文ヘルパー①「指し値注文を行うための非同期関数」

Yodaka

このコードは、指定された条件で制限付き注文(リミットオーダー)を行うための非同期関数 limit_order を定義しています。

async def limit_order(client, symbol, side, size, price, time_in_force="GTC"):
    assert side in ["BUY", "SELL"]
    res = await client.post(
        "/v1/me/sendchildorder",
        data={
            "product_code": symbol,
            "side": side,
            "size": size,
            "child_order_type": "LIMIT",
            "price": int(price),
            "time_in_force": time_in_force,
        },
    )

    data = await res.json()

    if res.status != 200:
        raise RuntimeError(f"Invalid request: {data}")
    else:
        return data["child_order_acceptance_id"]

limit_order 関数は、指定された条件で制限付き注文を行い、注文受付IDを返す非同期関数です。理解する上でのポイントは、非同期処理、APIリクエストの作成、レスポンスの処理です。

limit_order 関数の定義

async def limit_order(client, symbol, side, size, price, time_in_force="GTC"):
  • limit_order 関数は非同期関数で、指定された条件で制限付き注文を行います。
  • パラメータ:
    • client: 注文を送信するクライアントオブジェクト(例えば、APIクライアントなど)。
    • symbol: トレードする商品のシンボル(例: BTC/USD)。
    • side: 取引の方向("BUY" または "SELL")。
    • size: 注文の数量。
    • price: 注文の価格。
    • time_in_force: 注文の有効期間(デフォルトは "GTC")。

条件のアサーション

    assert side in ["BUY", "SELL"]

assert side in ["BUY", "SELL"] は、side が "BUY" または "SELL" のいずれかであることを確認します。それ以外の場合はエラーが発生します。

注文の送信と結果の取得

    res = await client.post(
        "/v1/me/sendchildorder",
        data={
            "product_code": symbol,
            "side": side,
            "size": size,
            "child_order_type": "LIMIT",
            "price": int(price),
            "time_in_force": time_in_force,
        },
    )
  • client.post メソッドを使用して、注文を送信し、結果を取得します。
  • APIのエンドポイント /v1/me/sendchildorder に対して POST リクエストを行います。
  • リクエストのデータは、注文に関する情報(商品コード、取引方向、数量、注文タイプ、価格、有効期間など)です。

結果のJSONデータの取得

    data = await res.json()

res.json() メソッドを使用して、APIからのレスポンスを JSON 形式で取得します。

注文の確認と注文受付IDの返却

    if res.status != 200:
        raise RuntimeError(f"Invalid request: {data}")
    else:
        return data["child_order_acceptance_id"]
  • レスポンスのステータスコードが 200 でない場合、エラーメッセージを含んだ RuntimeError 例外が発生します。
  • そうでない場合、注文が正常に受け付けられたことを示すので、注文受付IDを返します。

7.注文ヘルパー②「注文をキャンセルするための非同期関数」

Yodaka

このコードは、指定された条件で注文をキャンセルするための非同期関数 cancel_order を定義しています。

async def cancel_order(client, symbol, order_id):
    order_id_key = "child_order_id"
    if order_id.startswith("JRF"):
        order_id_key = order_id_key.replace("_id", "_acceptance_id")

    res = await client.post(
        "/v1/me/cancelchildorder", data={"product_code": symbol, order_id_key: order_id}
    )

    return res.status == 200

cancel_order 関数は、指定された条件で注文をキャンセルし、その結果を True または False で返す非同期関数です。理解する上でのポイントは、非同期処理、APIリクエストの作成、レスポンスの処理などが含まれます。

cancel_order 関数の定義

async def cancel_order(client, symbol, order_id):
  • cancel_order 関数は非同期関数で、指定された条件で注文をキャンセルします。
  • パラメータ:
    • client: 注文を送信するクライアントオブジェクト。
    • symbol: トレードする商品のシンボル(例: BTC/USD)。
    • order_id: キャンセル対象の注文ID。

注文IDの調整

    order_id_key = "child_order_id"
    if order_id.startswith("JRF"):
        order_id_key = order_id_key.replace("_id", "_acceptance_id")
  • order_id が "JRF" で始まる場合、order_id_key の一部を置換しています。
  • 例えば、order_id が "JRF12345" の場合、order_id_key は "child_order_acceptance_id" に変更されます。これは、APIによって注文IDが "_acceptance_id" で終わる場合があるため、それに対応しています。

キャンセルリクエストの送信と結果の取得

    res = await client.post(
        "/v1/me/cancelchildorder", data={"product_code": symbol, order_id_key: order_id}
    )
  • client.post メソッドを使用して、キャンセルリクエストを送信し、結果を取得します。
  • APIのエンドポイント /v1/me/cancelchildorder に対して POST リクエストを行います。
  • リクエストのデータは、注文キャンセルに関する情報(商品コード、注文IDなど)です。

キャンセル結果の判定と返却

    return res.status == 200

レスポンスのステータスコードが 200 であれば、注文が正常にキャンセルされたことを示すため、True を返します。それ以外の場合は False を返します。

8.マーケットメイキングのロジック

Yodaka

ここからは、マーケットメイキングのロジックを作り上げている部分の解説です。戦略実装の要となる部分ですので、マーケットメイキング戦略への理解があることが土台です。

market_making 関数の定義

Yodaka

このコードは、マーケットメイキングアルゴリズムを実装する非同期関数 market_making を定義しています。

async def market_making(
    client,
    store: pybotters.bitFlyerDataStore,
    status: Status,
    symbol: str,
    t: float,
    d: int,
    s_entry: float,
    s_update: float,
    size: float,
    logger: loguru.Logger,
):
  • market_making 関数は、マーケットメイキングアルゴリズムを実装する非同期関数です。
  • パラメータ:
    • client: 注文を送信するクライアントオブジェクト。
    • store: データのストア(保存庫)オブジェクト。
    • status: トレードの状態を管理するオブジェクト。
    • symbol: トレードする商品のシンボル(例: BTC/USD)。
    • t: 注文サイズの累積量の閾値。
    • d: 注文価格を更新するマージン。
    • s_entry: スプレッドの基準となるエントリー価格の閾値。
    • s_update: スプレッドの更新に使用される閾値。
    • size: 注文の数量。
    • logger: ログを出力するための Logger オブジェクト。

_oneside_loop(片側の注文(BUYまたはSELL)からキャンセルして再注文を繰り返す非同期関数 _)

Yodaka

このコードは、片側の注文(BUYまたはSELL)からキャンセルして再注文を繰り返す非同期関数 _oneside_loop を実装しています。

    async def _oneside_loop(side: str, size: float, pricer: Callable[[], int]):
        """片サイドの注文→キャンセル→再注文ループ。

        :param side: "BUY" or "SELL"
        :param size: 注文サイズ
        :param pricer: 指値関数
        :return:
        """
        # エントリー
        price = pricer()
        order_id = await limit_order(client, symbol, side, size, price)
        logger.info(f"[{side} ENTRY] {order_id} / {price} / {size:.5f}")

        # 約定監視ループ
        execution_watcher = ExecutionWatcher(store.childorderevents, order_id)
        while not execution_watcher.done():

            # 指値更新間隔
            # 1ループでキャンセルと指値更新を最大2回x2(両サイド)行う。API制限が500/5minなので、
            # 300 / 3.5 * 4 = 342.85... とちょっと余裕あるくらいに設定しておく。
            # (余談)途中でcontinueとかよくするのでsleepはwhileの直下で実行するのが個人的に好き
            await asyncio.sleep(3.5)

            # spreadが閾値以上なので指値更新
            if status.spread > s_update:
                new_price = pricer()
                if price != new_price:
                    # 前の注文をキャンセル。childorvereventsをwatchしてCANCEL or
                    # CANCEL_FAILEDのステータスを確認してから次の注文を入れる
                    cancel_watcher = CancelWatcher(store.childorderevents, order_id)
                    is_canceled = await cancel_order(client, symbol, order_id)
                    await cancel_watcher.wait()

                    cancel_result = cancel_watcher.result()

                    if cancel_result["event_type"] == "CANCEL":
                        # キャンセル成功→再注文
                        logger.info(f"[{side} CANCELED] {order_id}")
                        new_order_id = await limit_order(
                            client, symbol, side, size, new_price
                        )

                        # 監視する注文番号・指値を更新
                        execution_watcher.replace_order_id(new_order_id)
                        order_id = new_order_id
                        price = new_price
                        logger.info(f"[{side} UPDATE] {order_id} / {price}")

                    elif cancel_result["event_type"] == "CANCEL_FAILED":
                        # キャンセル失敗→約定しているはずなので次のループでexecution_watcherがdoneになる
                        logger.info(
                            f"[{side} CANCEL FAILED] {order_id} (should be executed)"
                        )
                        continue

        # 約定
        loop_result = execution_watcher.result()
        logger.info(f"[{side} FINISH] {loop_result}")
        return loop_result

理解する上でのポイントは、非同期処理、注文の発行とキャンセル、注文の監視です。

_oneside_loop 関数の定義

async def _oneside_loop(side: str, size: float, pricer: Callable[[], int]):
  • _oneside_loop 関数は、片側の注文からキャンセルして再注文を繰り返す非同期関数です。
  • パラメータ:
    • side: 注文の方向("BUY" または "SELL")。
    • size: 注文サイズ。
    • pricer: 指値関数(注文価格を決定するための関数)。

エントリー(新規注文)の処理

        price = pricer()
        order_id = await limit_order(client, symbol, side, size, price)
        logger.info(f"[{side} ENTRY] {order_id} / {price} / {size:.5f}")
  • pricer 関数を使用して指値価格を取得し、それを使って新規注文を行います。
  • limit_order 関数は、指定された条件で制限付き注文を行います。
  • ログに注文ID、注文価格、注文サイズを表示します。

約定監視ループ

        execution_watcher = ExecutionWatcher(store.childorderevents, order_id)
        while not execution_watcher.done():

ExecutionWatcher オブジェクトを作成し、注文の約定を監視するループを開始します。

指値更新とキャンセルの処理

            await asyncio.sleep(3.5)

            if status.spread > s_update:
                new_price = pricer()

                # 以下、指値の更新とキャンセルの処理
  • 指定の条件(スプレッドが閾値以上)の場合、指値を新しい価格に更新します。
  • 注文のキャンセル処理を行います。キャンセルが成功した場合、再注文を行います。

キャンセルが成功した場合の処理

                    logger.info(f"[{side} CANCELED] {order_id}")
                    new_order_id = await limit_order(
                        client, symbol, side, size, new_price
                    )
                    execution_watcher.replace_order_id(new_order_id)
                    order_id = new_order_id
                    price = new_price
                    logger.info(f"[{side} UPDATE] {order_id} / {price}")
  • キャンセルが成功した場合、ログにキャンセルの成功と更新後の注文情報を表示します。
  • 新しい注文を作成し、ExecutionWatcher オブジェクトの監視対象の注文IDを更新します。

キャンセルが失敗した場合の処理

                elif cancel_result["event_type"] == "CANCEL_FAILED":
                    logger.info(
                        f"[{side} CANCEL FAILED] {order_id} (should be executed)"
                    )
                    continue

キャンセルが失敗した場合、ログにキャンセルの失敗を表示し、次のループで約定したものとして処理を継続します。

約定が完了した場合の処理

        loop_result = execution_watcher.result()
        logger.info(f"[{side} FINISH] {loop_result}")
        return loop_result

約定が完了した場合、ログに約定の結果を表示し、その結果を返します。

結果をログに表示するプログラム

Yodaka

このコードは、無限ループでスプレッドが特定の閾値を超えた場合に、指定された条件で買い注文と売り注文を同時に実行して、その結果をログに表示するプログラムです。

   while True:
        sp = status.spread
        if sp > s_entry:
            # 現在のポジションから端数を取得
            buy_remaining_size = status.remaining_size("BUY")
            sell_remaining_size = status.remaining_size("SELL")

            logger.info(
                f"[START]\n"
                f"\tspread: {sp}\n"
                f"\tsymbol: {symbol}\n"
                f"\tt: {t}\n"
                f"\td: {d}\n"
                f"\ts_entry: {s_entry}\n"
                f"\ts_update: {s_update}\n"
                f"\tbuy_size: {size + sell_remaining_size}\n"
                f"\tsell_size: {size + buy_remaining_size}"
            )

            buy_result, sell_result = await asyncio.gather(
                _oneside_loop(
                    "BUY",
                    size + sell_remaining_size,
                    partial(status.get_limit_price, "bid", t, d),
                ),
                _oneside_loop(
                    "SELL",
                    size + buy_remaining_size,
                    partial(status.get_limit_price, "ask", t, d),
                ),
            )

            logger.info(f"[FINISH] {sell_result['price'] - buy_result['price']}")
            break
        else:
            logger.info(
                f"[WAITING CHANCE] {status.best_ask} - ({status.spread:.4f}) - {status.best_bid}"
            )

            await asyncio.sleep(0.1)

メインの無限ループ

while True:
    sp = status.spread
    if sp > s_entry:
        # 中略
    else:
        # 中略
        await asyncio.sleep(0.1)
  • 無限ループを開始しています。
  • スプレッド(最良の売り注文価格と買い注文価格の差)が指定のエントリー閾値 s_entry を超えた場合とそれ以外の場合に分かれています。
  • await asyncio.sleep(0.1) によって、次のループまで0.1秒待機します。これにより、無駄な処理負荷を抑えながら定期的にスプレッドを確認します。

スプレッドが閾値を超えた場合の処理

if sp > s_entry:
    # 中略
else:
    # 中略

スプレッドがエントリー閾値 s_entry を超えた場合の処理が行われます。

ログと注文サイズの計算

buy_remaining_size = status.remaining_size("BUY")
sell_remaining_size = status.remaining_size("SELL")

logger.info(
    f"[START]\n"
    f"\tspread: {sp}\n"
    f"\tsymbol: {symbol}\n"
    f"\tt: {t}\n"
    f"\td: {d}\n"
    f"\ts_entry: {s_entry}\n"
    f"\ts_update: {s_update}\n"
    f"\tbuy_size: {size + sell_remaining_size}\n"
    f"\tsell_size: {size + buy_remaining_size}"
)
  • 現在のポジションから端数を取得して、ログに表示するための情報を計算しています。
  • remaining_size 関数は、指定された方向("BUY"または"SELL")のポジションの合計サイズを取得する関数です。

買い注文と売り注文の同時実行

buy_result, sell_result = await asyncio.gather(
    _oneside_loop(
        "BUY",
        size + sell_remaining_size,
        partial(status.get_limit_price, "bid", t, d),
    ),
    _oneside_loop(
        "SELL",
        size + buy_remaining_size,
        partial(status.get_limit_price, "ask", t, d),
    ),
)
  • asyncio.gather を使用して、買い注文と売り注文を同時に実行します。
  • _oneside_loop 関数は、片側の注文からキャンセルして再注文を繰り返す関数です。

ログに実行結果を表示

logger.info(f"[FINISH] {sell_result['price'] - buy_result['price']}")
break
  • 実行結果(買い注文と売り注文の約定価格の差)をログに表示します。
  • break によって、無限ループを終了させます。

スプレッドが閾値を超えていない場合のログ

else:
    logger.info(
        f"[WAITING CHANCE] {status.best_ask} - ({status.spread:.4f}) - {status.best_bid}"
    )
    await asyncio.sleep(0.1)
  • スプレッドがエントリー閾値を超えていない場合、ログに待機メッセージを表示します。
  • await asyncio.sleep(0.1) によって、次のループまで0.1秒待機します。

9.メインの処理

Yodaka

このコードは、BitflyerのAPIを使用して、マーケットメイキングアルゴリズムを実装したプログラムです。

async def main(args):
    # ロガーの設定
    logger = loguru.logger
    logger.add("log.txt", rotation="10MB", retention=3)

    # Bitflyerのクライアントとデータストアを初期化
    async with pybotters.Client(
        apis=args.api_key_json, base_url="https://api.bitflyer.com"
    ) as client:
        store = pybotters.bitFlyerDataStore()
        status = Status(store)

        # WebSocket接続とサブスクライブ
        wstask = await client.ws_connect(
            "wss://ws.lightstream.bitflyer.com/json-rpc",
            send_json=[
                {
                    "method": "subscribe",
                    "params": {"channel": "lightning_board_snapshot_FX_BTC_JPY"},
                    "id": 1,
                },
                {
                    "method": "subscribe",
                    "params": {"channel": "lightning_board_FX_BTC_JPY"},
                    "id": 2,
                },
                {
                    "method": "subscribe",
                    "params": {"channel": "child_order_events"},
                    "id": 3,
                },
            ],
            hdlr_json=store.onmessage,
        )

        # WebSocketの初期化が完了するまで待機
        while not all([len(w) for w in [store.board]]):
            logger.debug("[WAITING SOCKET RESPONSE]")
            await store.wait()

        # メインの無限ループ
        while True:
            await market_making(
                client,
                store,
                status,
                args.symbol,
                args.t,
                args.d,
                args.s_entry,
                args.s_update,
                args.lot,
                logger,
            )

            await asyncio.sleep(args.interval)

main 関数の定義

async def main(args):
  • メインのプログラムが実行される関数です。
  • ロガーの設定やBitflyerのクライアントの初期化、WebSocketの接続やサブスクライブ、メインの無限ループが含まれています。
  • await asyncio.sleep(args.interval) によって、指定されたインターバルで無限ループが回ります。

ロギングの設定

logger.add("log.txt", rotation="10MB", retention=3)
  • log.txt という名前のログファイルを設定しています。
  • rotation="10MB" で10メガバイトごとにログファイルをローテーション(新しいファイルを作成)します。
  • retention=3 で古いログファイルを3つまで保持します。

Bitflyerのクライアントとデータストアの初期化

async with pybotters.Client(
    apis=args.api_key_json, base_url="https://api.bitflyer.com"
) as client:
    store = pybotters.bitFlyerDataStore()
    status = Status(store)
  • BitflyerのAPIを使用するためのクライアントを初期化します。
  • データストア(pybotters.bitFlyerDataStore())とトレードのステータスを管理するためのオブジェクトを初期化します。

WebSocket接続とサブスクライブ

wstask = await client.ws_connect(
    "wss://ws.lightstream.bitflyer.com/json-rpc",
    send_json=[
        {"method": "subscribe", "params": {"channel": "lightning_board_snapshot_FX_BTC_JPY"}, "id": 1},
        {"method": "subscribe", "params": {"channel": "lightning_board_FX_BTC_JPY"}, "id": 2},
        {"method": "subscribe", "params": {"channel": "child_order_events"}, "id": 3},
    ],
    hdlr_json=store.onmessage,
)
  • BitflyerのWebSocketに接続し、指定のチャンネルにサブスクライブします。
  • send_json でサブスクライブするチャンネルとそのパラメータを指定します。
  • hdlr_json=store.onmessage でWebSocketから受信したデータをデータストアに処理させます。

WebSocketの初期化が完了するまで待機

while not all([len(w) for w in [store.board]]):
    logger.debug("[WAITING SOCKET RESPONSE]")
    await store.wait()
  • WebSocketの初期化が完了するまで待機します。
  • store.wait() はデータストア内の特定のデータが到着するまで待機する関数です。

メインの無限ループ

while True:
    await market_making(
        client,
        store,
        status,
        args.symbol,
        args.t,
        args.d,
        args.s_entry,
        args.s_update,
        args.lot,
        logger,
    )

    await asyncio.sleep(args.interval)
  • メインの無限ループです。
  • market_making 関数を呼び出して市場メイキングアルゴリズムを実行します。
  • 指定されたインターバルだけ待機します。

10.main関数に渡す引数の設定

Yodaka

このコードは、コマンドライン引数を受け取り、指定された条件でマーケットメイキングアルゴリズムを実行するプログラムです。
コマンドラインから実行する際に様々な設定や条件を指定できるようになっており、asyncio モジュールを使用して非同期処理を行っています。

if __name__ == "__main__":

if __name__ == "__main__":
  • Python スクリプトが直接実行されるときに、そのスクリプトがエントリーポイントとして使われるブロックです。
  • この中に記述されたコードが、他のモジュールからインポートされたときには実行されません。
Yodaka

この部分は特殊な構文なので、具体例を挙げて詳しく解説します。

詳しく解説

if __name__ == "__main__": ブロックは Python スクリプトが直接実行されるときに、そのスクリプトがエントリーポイントとして動作するための特殊な構文です。ここで、その詳細な説明を提供します。

Python スクリプトが他のスクリプトやモジュールからインポートされると、そのスクリプトの中に書かれているコードが即座に実行されます。しかし、実行したいコードが他のファイルからインポートされたときには、この動作は望ましくありません。

例えば、以下のようなスクリプトがあるとします。

# my_module.py
print("Hello from my_module!")

そして、これを別のスクリプトからインポートしようとします。

# main_script.py
import my_module

この場合、main_script.py を実行すると、my_module.py の中の print 文も実行されてしまいます。しかし、my_module.py をモジュールとして使うとき、通常はモジュールの中の関数やクラスだけを使いたいと思うでしょう。

if __name__ == "__main__": ブロックは、スクリプトが直接実行された場合のみコードが実行されるようにするための仕組みです。例を用いて説明します。

# my_module.py
def greet():
    print("Hello from my_module!")

if __name__ == "__main__":
    greet()

この場合、my_module.py を直接実行すると、greet() 関数が呼び出されます。しかし、他のスクリプトから my_module をインポートした場合には greet() は実行されません。なぜなら、__name__ はスクリプトが直接実行された場合には "__main__" となり、それ以外の場合にはモジュール名になるからです。

この仕組みにより、モジュールを作成するときに不要なコードの実行を避けることができ、モジュールとして期待通りに使えるようになります。

コマンドライン引数のパース

parser = ArgumentParser(description="pybotters x asyncio x magito MM")
parser.add_argument("--api_key_json", help="apiキーが入ったJSONファイル", required=True)
parser.add_argument("--symbol", default="FX_BTC_JPY", help="取引通過")
parser.add_argument("--lot", default=0.01, type=float, help="注文サイズ")
parser.add_argument(
    "--t",
    default=0.01,
    type=float,
    help="板上での累積注文量に対する閾値(この閾値を超えた時点での注文価格が参照価格となる)",
)
parser.add_argument("--d", default=1, type=int, help="参照価格と指値のマージン(指値=参照価格±d)")
parser.add_argument(
    "--s_entry",
    default=0.0003,
    type=float,
    help="エントリー用のスプレッド閾値(スプレッドがこの閾値以上の時にマーケットメイキングを開始する)",
)
parser.add_argument(
    "--s_update",
    default=0.0001,
    type=float,
    help="指値更新用のスプレッド閾値(スプレッドがこの閾値以上の時に指値を更新する)",
)
parser.add_argument("--interval", default=5, type=int, help="マーケットメイキングサイクルの間隔")

args = parser.parse_args()
  • コマンドライン引数をパースするための ArgumentParser クラスを使用しています。
  • --api_key_json から --interval までの各引数が定義されています。
  • 引数のデフォルト値や型、ヘルプメッセージが指定されています。

メイン関数の呼び出し

try:
    asyncio.run(main(args))
except KeyboardInterrupt as e:
    pass
  • コマンドライン引数を元にして定義された main 関数を呼び出します。
  • asyncio.run 関数を使用して非同期のメイン関数 main を同期的に呼び出します。
  • キーボード割り込み (KeyboardInterrupt) が発生した場合、プログラムの実行を終了します。

例外処理

except KeyboardInterrupt as e:
    pass
  • キーボード割り込みが発生した場合の例外処理です。
  • プログラムの実行を終了します。

まとめ

長大なコードも部分ごとに見ていけば、各部分の機能やそれぞれのつながりが見えてきます。

分かりにくい部分は実際に手を動かして、コードを書いてみることが機能を理解する近道です。

引き続き、Botの作成と実践を積んでいきます。

今後は、イナゴbotの開発も進めていきます。

-Bot, トレードロジック, プログラミングスキル