前回の記事に引き続き、今回も仮想通貨botの開発状況をまとめていきます。
ソースコードはこちら。
本記事では「メソッド」と「関数」を同じような意味の言葉として使っている部分もありますが、厳密には定義が異なるため、より詳細に理解したい方はこちらの記事を参考にしてください。「関数とメソッドの違い」
1.モジュールのインポート
以下のコードは、冒頭部分です。まずは、それぞれのインポート文について説明します。
from __future__ import annotations from typing import TYPE_CHECKING, Callable if TYPE_CHECKING: from pybotters.store import DataStore, Item import asyncio import loguru import pybotters from argparse import ArgumentParser from functools import partial
from __future__ import annotations
: この文は、Python 3.7以降で型アノテーションを行う際の挙動を変更するためのものです。annotations
フューチャーは、型の循環参照に対処するためのものです。詳細な理由や背景については、公式ドキュメントを参照。from typing import TYPE_CHECKING, Callable
:typing
モジュールからTYPE_CHECKING
とCallable
をインポートしています。TYPE_CHECKING
: これは、型ヒントをチェックするための特殊な値です。通常はランタイムで実行されないようになっています。型ヒントの中で循環参照を避けるために使用されます。Callable
: 関数の型ヒントとして使用され、引数と戻り値の型を指定するために利用されます。
if TYPE_CHECKING: ...
: 前述のTYPE_CHECKING
の値がTrue
の場合、以下のブロックが実行されます。通常、この中では実行時には利用できない型ヒントを扱うための特殊な処理を行うことがあります。上のコードでは、pybotters.store
モジュールからDataStore
とItem
をインポートしています。import asyncio
: 非同期処理をサポートするためのasyncio
モジュールをインポートしています。非同期プログラミングにおいては、非同期イベントループを管理するためにasyncio
が利用されます。import loguru
: ロギングを行うためのサードパーティのモジュールloguru
をインポートしています。loguru
は、シンプルで柔軟なログの取得を可能にするためのツールです。このモジュールはPythonの標準ライブラリではないので、別途インストールする必要があります。Loguruの使い方(Pythonログ出力ライブラリ)import pybotters
: この行では、pybotters
というモジュールをインポートしています。これはプロジェクト独自のモジュールで、その中でstore
モジュールが定義されています。インストールの手順はこちらから。from argparse import ArgumentParser
: コマンドライン引数を処理するための標準ライブラリのargparse
モジュールからArgumentParser
クラスをインポートしています。これを使用することで、スクリプトにコマンドライン引数を追加し、それらを処理することができます。from functools import partial
:functools
モジュールからpartial
関数をインポートしています。partial
関数は、他の関数を一部適用して新しい関数を作成するために使用されます。
2.class Status(状態管理のクラス)
このクラスは、特定の仮想通貨取引所(bitFlyer)のデータを処理するためのものです。
class Status: def __init__( self, store: pybotters.bitFlyerDataStore, max_position: int = 1, ): self._store: pybotters.bitFlyerDataStore = store self._asks = None self._bids = None self._max_position = max_position
このクラスは、主にbitFlyer取引所のデータを管理するためのものであり、特に板情報を定期的に更新する非同期タスクを含んでいます。クラスのコンストラクタでは、取引所データの格納、ポジションの最大許容量などが初期化されます。
インポートとの関連
import asyncio import pybotters
asyncio
: 非同期処理をサポートするPythonの組み込みモジュール。pybotters
: 外部ライブラリまたはモジュールで、bitFlyer取引所のデータを処理するためのツールや関数が含まれています。
Statusクラスの定義
class Status:
クラス Status
を定義しています。
コンストラクタの定義
def __init__( self, store: pybotters.bitFlyerDataStore, max_position: int = 1, ):
- クラスのコンストラクタ(
__init__
メソッド)を定義しています。コンストラクタはクラスのインスタンスが作成されるときに呼び出されます。 store
: bitFlyerデータを取得するためのpybotters.bitFlyerDataStore
オブジェクト。型注釈により、この引数がどの型であるべきか示しています。max_position
: ポジション(取引の保有量)の最大許容量。デフォルトでは1に設定されています。
インスタンス変数の初期化
self._store: pybotters.bitFlyerDataStore = store self._asks = None self._bids = None self._max_position = max_position
- インスタンス変数
_store
に、渡されたstore
引数を格納します。 - インスタンス変数
_asks
と_bids
をNone
で初期化します。 - インスタンス変数
_max_position
に、渡されたmax_position
引数を格納します。
ボード(板情報)の自動更新タスクの作成
asyncio.create_task(self.auto_update_board())
asyncio.create_task()
を使用して、auto_update_board()
メソッドを非同期タスクとして作成します。このメソッドはボード(板情報)のデータを定期的に更新するためのものです。
板情報の自動更新
このコードは、非同期(async
)で動作するボード(板)情報の自動更新機能を提供する関数です。
async def auto_update_board(self): """板情報の自動更新タスク""" with self._store.board.watch() as stream: async for msg in stream: self._asks, self._bids = self._store.board.sorted().values()
この関数は、bitFlyer
取引所のボード情報を非同期的に監視し、変更があった場合にそれを取得してクラスのインスタンス変数に格納するものです。非同期プログラミングの概念や、asyncio
モジュールにおける非同期イベントの取り扱いが含まれています。
非同期関数の定義
async def auto_update_board(self):
- 非同期関数(
async
キーワードがついている)としてauto_update_board
関数が定義されています。 self
パラメータがあり、クラスのメソッドとして使われることを示しています。
板情報の非同期的な監視の開始
with self._store.board.watch() as stream:
self._store.board.watch()
は、self._store
が持つboard
オブジェクトの監視を開始します。with
ステートメントを使用して、監視が終了した時点でクリーンアップ処理を行います。stream
は監視ストリーム(stream)で、取引所からのデータ更新を待ち受けるためのものです。
非同期イベントの待機と処理
async for msg in stream: self._asks, self._bids = self._store.board.sorted().values()
async for
文は非同期イベントの待機と処理を行います。stream
からイベントが到着するたびにループが実行されます。msg
には取引所からのメッセージが入りますが、このコードでは使われていません。self._store.board.sorted().values()
は、ボードの売り注文(asks)と買い注文(bids)を取得し、それぞれself._asks
とself._bids
に代入します。
指値注文を出す関数
このコードは、注文板のデータから指定された累積注文サイズの閾値を超えた位置に指値注文を出すための関数です。
def get_limit_price(self, side: str, t: float, d: int = 1): """注文サイズの累積量が``t``を超えたところに``d``だけ離して指値をだす。 :param str side: ask or bid :param float t: 累積注文サイズの閾値 :param int d: 参照注文からのマージン :return: 指値 """ items = self._asks if side == "ask" else self._bids cum_size, price = items[0]["size"], items[0]["price"] for i in items: if cum_size >= t: return int(price + d if side == "ask" else price - d) price = i["price"] cum_size += i["size"] # 最後までthresholdを満たさなかった場合、一番後ろの注文と同じ額 return int(items[-1]["price"])
この関数は、指定された注文板から特定の条件を満たす位置に指値注文を出すためのものです。注文板から注文を順に見ていき、指定された閾値を超えた時点で指値価格を計算して返します。初心者が理解する上でのポイントは、注文板の概念やループ処理の流れです。
関数の定義
def get_limit_price(self, side: str, t: float, d: int = 1):
get_limit_price
関数が定義されています。side
パラメータは文字列で、"ask"(売り注文)または"bid"(買い注文)を指定します。t
パラメータは浮動小数点数で、累積注文サイズの閾値を示します。d
パラメータは整数で、指定された閾値を超えた位置から離れるマージンを表します。デフォルトでは1に設定されています。
注文ブックの選択
items = self._asks if side == "ask" else self._bids
side
パラメータに応じて、指定された方向の注文ブックを選択します。self._asks
は売り注文のリスト、self._bids
は買い注文のリストを仮定しています。
累積注文サイズの計算と指値価格の取得
cum_size, price = items[0]["size"], items[0]["price"] for i in items: if cum_size >= t: return int(price + d if side == "ask" else price - d) price = i["price"] cum_size += i["size"]
- 注文ブックを順番に見ていき、累積注文サイズが指定された閾値
t
を超えた時点で、指定されたマージンd
だけ離した位置の指値価格を計算して返します。 cum_size
: 累積注文サイズを累積していく変数。price
: 注文の価格。
閾値を満たさなかった場合の処理
# 最後までthresholdを満たさなかった場合、一番後ろの注文と同じ額 return int(items[-1]["price"])
注文ブックを最後まで探しても閾値 t
を満たさなかった場合、一番後ろの注文の価格を指定された形式に変換して返します。
保有ポジションのリストを取得する関数
このコードは、特定の取引方向(BUYまたはSELL)に関連する保有ポジションのリストを取得するためのメソッドです。
def positions(self, side: str): """保有ポジションリスト。 :param str side: BUY or SELL :return: ポジションのlist """ positions = self._store.positions.find({"side": side}) assert len(positions) <= self._max_position return positions
このメソッドは、指定された取引方向に関連するポジションを取得し、最大ポジション数の条件を満たすかどうかを検証してから、ポジションのリストを返すものです。ここでも、メソッドの目的や各ステップで行われる処理の流れを理解することが重要です。
メソッドの定義
def positions(self, side: str):
positions
メソッドが定義されています。self
パラメータがあり、クラスのメソッドとして使われることを示しています。side
パラメータは文字列で、"BUY"または"SELL"を指定します。
ポジションの取得
positions = self._store.positions.find({"side": side})
self._store.positions.find({"side": side})
は、self._store
が持つpositions
オブジェクトから、指定された取引方向("BUY"または"SELL")に関連するポジションを検索します。- ポジションのデータはおそらく辞書(dictionary)のリストとして返されます。
ポジション数の制約
assert len(positions) <= self._max_position
assert
文は、指定された条件が満たされているかどうかを検証するためのものです。条件が偽の場合、AssertionError
が発生します。- この行では、取得したポジションの数が設定された最大ポジション数(
self._max_position
)以下であることを検証しています。
結果の返却
return positions
最後に、取得したポジションのリストを返します。
保有ポジションのサイズを確認する関数
このコードは、指定された取引方向(BUYまたはSELL)に関連する保有ポジションのサイズを取得するためのメソッドです。
def remaining_size(self, side): """保有ポジションサイズ。 :param str side: BUY or SELL :return: ポジションサイズ """ positions = self.positions(side) if len(positions): return sum([p["size"] for p in positions]) else: return 0
このメソッドは、指定された取引方向に関連する保有ポジションのサイズを取得するためのものです。取得したポジションが存在する場合は、そのサイズを合計して返し、ポジションが存在しない場合は0を返します。初心者が理解する上でのポイントは、他のメソッドの呼び出しや条件文によるフローの制御です。
メソッドの定義
def remaining_size(self, side):
remaining_size
メソッドが定義されています。self
パラメータがあり、おそらくクラスのメソッドとして使われることを示しています。side
パラメータは文字列で、"BUY"または"SELL"を指定します。
他のメソッドの呼び出し
positions = self.positions(side)
self.positions(side)
は、同じクラス内のpositions
メソッドを呼び出して、指定された取引方向に関連するポジションのリストを取得します。- 取得したポジションの情報は
positions
変数に格納されます。
ポジションサイズの計算
if len(positions): return sum([p["size"] for p in positions])
if len(positions):
は、取得したポジションが存在するかどうかを検証しています。len(positions)
が0でない(ポジションが存在する)場合に、以下の処理を行います。sum([p["size"] for p in positions])
は、取得したポジションのサイズ("size"
フィールドの値)を合計してポジションサイズを計算します。
ポジションが存在しない場合の処理
else: return 0
ポジションが存在しない場合(len(positions)
が0の場合)、0を返します。
3つのプロパティ
このコードは、3つのプロパティ(best_ask
、best_bid
、spread
)を持つクラスの一部です。
'@'で書き始めることによって、関数に特殊な振る舞いをさせることができます。
@property def best_ask(self): return int(self._asks[0]["price"]) @property def best_bid(self): return int(self._bids[0]["price"]) @property def spread(self): return (self.best_ask - self.best_bid) / self.best_bid
このクラスは、最良の売り注文価格、最良の買い注文価格、およびそれらの差であるスプレッドを提供する3つのプロパティを持っています。それぞれのプロパティは、注文ブックから必要な情報を抽出し、適切な形式に変換して返します。ここを理解する上でのポイントは、プロパティが属性のようにアクセスできること、デコレータ(関数に特殊な振る舞いをさせる機能)、およびスプレッドの概念です。
best_ask
プロパティ
@property def best_ask(self): return int(self._asks[0]["price"])
best_ask
プロパティは、売り注文ブック(asks)から最も価格が低い注文の価格を取得します。@property
デコレータを使用して、このメソッドを属性のようにアクセスできるようにしています。self._asks[0]["price"]
は売り注文ブックの一番上にある注文の価格を取得します。
best_bid
プロパティ
@property def best_bid(self): return int(self._bids[0]["price"])
best_bid
プロパティは、買い注文ブック(bids)から最も価格が高い注文の価格を取得します。@property
デコレータを使用して、このメソッドを属性のようにアクセスできるようにしています。self._bids[0]["price"]
は買い注文ブックの一番上にある注文の価格を取得します。
spread
プロパティ
@property def spread(self): return (self.best_ask - self.best_bid) / self.best_bid
spread
プロパティは、best_ask
(最良の売り注文価格)とbest_bid
(最良の買い注文価格)の差を計算し、その差をbest_bid
で割った値を返します。- スプレッドは、取引所の売りと買いの最良価格との差を表す指標で、取引所での流動性を示す一つの尺度です。
class EventWatcher(Event監視のクラス)
このコードは、イベントを監視するためのクラス EventWatcher
を定義しています。
class EventWatcher: def __init__(self, store: DataStore, trigger_fn: Callable[[Item], bool] = None): self._store = store self._trigger_fn = trigger_fn self._task = asyncio.create_task(self._watch())
このクラスは、外部から提供されたデータストアを用いてイベントを監視し、トリガー条件が満たされた場合に指定されたアクションを実行するためのものです。理解を深めるポイントは、外部から注入される関数や非同期タスクの概念、そして __init__
メソッドの使い方です。
クラスの定義
class EventWatcher:
クラス EventWatcher
を定義しています。
クラスのコンストラクタ(__init__
メソッド)
def __init__(self, store: DataStore, trigger_fn: Callable[[Item], bool] = None): self._store = store self._trigger_fn = trigger_fn self._task = asyncio.create_task(self._watch())
- クラスのコンストラクタ (
__init__
メソッド) が定義されています。 store
パラメータはDataStore
クラスのオブジェクトを受け取ります。これは、外部から提供されるデータストアを指定するものです。trigger_fn
パラメータは、イベントをトリガーするための関数です。Callable[[Item], bool]
は、引数がItem
型で、戻り値がbool
型の関数であることを示しています。もしtrigger_fn
が指定されなかった場合、デフォルトでNone
になります。self._store
には外部から提供されたデータストアが格納されます。self._trigger_fn
には外部から提供されたトリガー関数が格納されます。self._task
には、_watch
メソッドを非同期タスクとして実行するためのタスクが格納されます。
_watch
メソッド
self._task = asyncio.create_task(self._watch())
イベント監視のための非同期タスクを開始するために、asyncio.create_task
を使用して _watch
メソッドを非同期タスクとして実行しています。
非同期関数_watch
このコードは非同期関数 _watch
を定義しています。この関数は、DataStore
を監視し、指定された条件が満たされるまでイベントを待機するものです。
この非同期関数 _watch
は、DataStore
を監視し、指定された条件が満たされるまでイベントを待機するものです。指定された条件が満たされた場合、そのイベントメッセージを返し、関数が終了します。理解する上でのポイントは、非同期プログラミングの概念や async for
ループ、with
ステートメントの使い方です。
async def _watch(self): """`_is_target_event(msg.data)`がTrueを返すまでDataStoreをwatchし続ける。""" with self._store.watch() as stream: async for msg in stream: if self._is_trigger(msg.data): return msg.data
関数の定義
_watch
関数は非同期関数として定義されています。- ドキュメンテーション文字列により、関数の目的が説明されています。この関数は、指定された条件が満たされるまで
DataStore
を監視し続けるものです。 with self._store.watch() as stream:
は、DataStore
のwatch
メソッドを使用して、イベントのストリームを開始します。このwith
ステートメントにより、監視が終了した時点でクリーンアップが自動的に行われます。async for msg in stream:
は、stream
からイベントメッセージを非同期的に待ち受けるループです。
イベントトリガー条件の判定
if self._is_trigger(msg.data): return msg.data
self._is_trigger(msg.data)
がTrue
を返すかどうかを判定します。- もし
True
を返すならば、そのイベントメッセージmsg.data
を返し、関数を終了します。これにより、指定された条件が満たされた時点で関数が終了します。
_is_trigger
(イベントのトリガー条件を判定するメソッド)
このコードは、イベントのトリガー条件を判定するための非常に簡潔なメソッド _is_trigger
を定義しています。
def _is_trigger(self, d: Item): """socketメッセージを受け取って、イベント発火の有無を判定する。 子クラスこの関数をオーバーライドしてもいいし、`trigger_fn`として与えてもいい。 :param Item d: socketメッセージ。 :return: """ if self._trigger_fn is None: raise NotImplementedError return self._trigger_fn(d)
このメソッドは、指定されたデータに対してイベントのトリガー条件を判定するためのものです。self._trigger_fn
が未設定の場合はエラーを発生させ、設定されている場合はそれを呼び出してトリガー条件を確認します。理解する上でのポイントは、メソッドの目的やドキュメンテーション文字列、および例外の処理です。
_is_trigger
メソッドの定義
_is_trigger
メソッドは、指定されたItem
型のデータd
に対してイベントのトリガー条件を判定する役割を持っています。- ドキュメンテーション文字列により、関数の目的や使い方が説明されています。
:param Item d:
は、このメソッドの引数d
の型と説明を示しています。Item
はおそらく特定の型を表しているものと考えられます。:return:
は、このメソッドの戻り値についての説明です。
イベントトリガー条件の判定
if self._trigger_fn is None: raise NotImplementedError return self._trigger_fn(d)
self._trigger_fn
がNone
(未設定)の場合、NotImplementedError
を発生させます。これは、派生クラスでこのメソッドが適切に実装されていない場合のエラーです。self._trigger_fn(d)
は、実際にイベントのトリガー条件を判定する関数を呼び出しています。この関数は、d
という引数を取り、特定の条件を満たすかどうかを返します。
非同期処理を扱うためのメソッド群
このコードは非同期処理を扱うためのクラスのメソッド群です。
async def wait(self): await self._task def done(self): return self._task.done() def result(self): return self._task.result()
このクラスは非同期処理を管理するためのもので、wait
メソッドを使って非同期処理の完了を待ち、done
メソッドを使って非同期処理の完了状態を確認し、result
メソッドを使って非同期処理の結果を取得します。理解する上でのポイントは、非同期処理の待機と結果取得の仕組みに焦点を当てることです。
wait
メソッド
async def wait(self): await self._task
wait
メソッドは、非同期処理が完了するまで待機するためのものです。await self._task
は、_task
が完了するまで非同期的に待機します。他の非同期処理が進行する一方で、このメソッドは_task
の完了を待ちます。
done
メソッド
def done(self): return self._task.done()
done
メソッドは、非同期処理が完了しているかどうかを判定するためのものです。self._task.done()
は、非同期処理が完了している場合はTrue
を、まだ未完了の場合はFalse
を返します。
result
メソッド
def result(self): return self._task.result()
result
メソッドは、非同期処理が完了した後にその結果を取得するためのものです。self._task.result()
は、非同期処理が完了している場合はその結果を返します。ただし、非同期処理がまだ完了していない場合は適切な方法で待機することが期待されます。
3.class ChildOrderEventWatcher(EventWatcher)「特定の注文に関連するイベントを監視する」
このコードは、EventWatcher
クラスを継承した ChildOrderEventWatcher
クラスを定義しています。
class ChildOrderEventWatcher(EventWatcher): def __init__(self, store, order_id, **kwargs): self._order_id = order_id self._cond = kwargs super(ChildOrderEventWatcher, self).__init__(store) def _is_trigger(self, d): return d["child_order_acceptance_id"] == self._order_id and all( [v == d[k] for (k, v) in self._cond.items()] ) def replace_order_id(self, order_id): self._order_id = order_id
このクラスは、特定の注文に関連するイベントを監視するためのものであり、指定された注文IDと条件がイベントデータと一致したときにトリガーされます。理解する上でのポイントは、クラスの初期化や条件の判定のメカニズム、メソッドの目的などです。
__init__
メソッド
def __init__(self, store, order_id, **kwargs): self._order_id = order_id self._cond = kwargs super(ChildOrderEventWatcher, self).__init__(store)
__init__
メソッドは、クラスの初期化を行います。store
パラメータは、DataStore
のような外部から提供されるデータストアオブジェクトです。order_id
パラメータは、特定の注文を識別するための注文IDです。**kwargs
は可変長のキーワード引数で、任意の数のキーと値を持つ辞書として受け取ります。これは条件を表します。self._order_id
には注文IDが格納されます。self._cond
には条件が格納されます。super(ChildOrderEventWatcher, self).__init__(store)
は親クラスのコンストラクタを呼び出して、親クラスの初期化を行います。
_is_trigger
メソッド
def _is_trigger(self, d): return d["child_order_acceptance_id"] == self._order_id and all( [v == d[k] for (k, v) in self._cond.items()] )
_is_trigger
メソッドは、特定の条件に基づいてイベントのトリガーを判定します。d
パラメータは、受け取ったイベントデータです。- メソッドは、イベントデータの
"child_order_acceptance_id"
がself._order_id
と等しく、かつself._cond
に指定された条件がすべて満たされている場合にTrue
を返します。
replace_order_id
メソッド
def replace_order_id(self, order_id): self._order_id = order_id
replace_order_id
メソッドは、保持している注文IDを更新するためのものです。order_id
パラメータで新しい注文IDを指定し、それをself._order_id
に格納します。
4.class ExecutionWatcher(ChildOrderEventWatcher)「特定の注文に関連する "EXECUTION" イベントを監視する」
このコードは、ChildOrderEventWatcher
クラスを継承して新しいクラス ExecutionWatcher
を定義しています。
class ExecutionWatcher(ChildOrderEventWatcher): def __init__(self, store, order_id): super(ExecutionWatcher, self).__init__(store, order_id, event_type="EXECUTION")
ExecutionWatcher
クラスは、ChildOrderEventWatcher
クラスを継承しつつ、特定の注文に関連する "EXECUTION" イベントを監視するために初期化されます。理解する上でのポイントは、クラスの継承や初期化の仕組み、および親クラスの設定に注目することです。
ExecutionWatcher
クラスの定義
class ExecutionWatcher(ChildOrderEventWatcher): def __init__(self, store, order_id): super(ExecutionWatcher, self).__init__(store, order_id, event_type="EXECUTION")
ExecutionWatcher
クラスはChildOrderEventWatcher
クラスを継承しています。これは、ChildOrderEventWatcher
クラスの機能を継承しつつ、新しい機能を追加するものです。__init__
メソッドは、クラスの初期化を行います。親クラスのコンストラクタを呼び出して初期化するとともに、追加の情報(event_type
)を設定しています。
__init__
メソッド
def __init__(self, store, order_id): super(ExecutionWatcher, self).__init__(store, order_id, event_type="EXECUTION")
__init__
メソッドはクラスの初期化を行います。store
パラメータは外部から提供されるデータストアオブジェクトです。order_id
パラメータは特定の注文を識別するための注文IDです。super(ExecutionWatcher, self).__init__(store, order_id, event_type="EXECUTION")
は親クラス(ChildOrderEventWatcher
)のコンストラクタを呼び出して初期化します。ここで、event_type
パラメータに"EXECUTION"
を指定しています。これは、ChildOrderEventWatcher
クラスがイベントを監視する条件の一部です。具体的には、このクラスは "EXECUTION" イベントに対して監視を行うように設定されています。
5.class CancelWatcher(ChildOrderEventWatcher)「特定の注文に関連する "CANCEL" または "CANCEL_FAILED" イベントを監視する」
このコードは、ChildOrderEventWatcher
クラスを継承した新しいクラス CancelWatcher
を定義しています。
class CancelWatcher(ChildOrderEventWatcher): def __init__(self, store, order_id): super(CancelWatcher, self).__init__(store, order_id) def _is_trigger(self, d): return d["child_order_acceptance_id"] == self._order_id and d["event_type"] in [ "CANCEL", "CANCEL_FAILED", ]
CancelWatcher
クラスは、ChildOrderEventWatcher
クラスを継承して特定の注文に関連する "CANCEL" または "CANCEL_FAILED" イベントを監視するために初期化されます。理解する上でのポイントは、クラスの継承や初期化の仕組み、そしてイベントのトリガー条件に注目することです。
CancelWatcher
クラスの定義
class CancelWatcher(ChildOrderEventWatcher): def __init__(self, store, order_id): super(CancelWatcher, self).__init__(store, order_id)
CancelWatcher
クラスはChildOrderEventWatcher
クラスを継承しています。これは、ChildOrderEventWatcher
クラスの機能を利用しつつ、新しいクラスを作成するものです。__init__
メソッドはクラスの初期化を行います。親クラスのコンストラクタを呼び出して初期化を行っています。
__init__
メソッド
def __init__(self, store, order_id): super(CancelWatcher, self).__init__(store, order_id)
__init__
メソッドは、クラスの初期化を行います。store
パラメータは外部から提供されるデータストアオブジェクトです。order_id
パラメータは特定の注文を識別するための注文IDです。super(CancelWatcher, self).__init__(store, order_id)
は親クラス(ChildOrderEventWatcher
)のコンストラクタを呼び出して初期化を行います。これにより、CancelWatcher
クラスもChildOrderEventWatcher
クラスの機能を利用できるようになります。
_is_trigger
メソッド
def _is_trigger(self, d): return d["child_order_acceptance_id"] == self._order_id and d["event_type"] in [ "CANCEL", "CANCEL_FAILED", ]
is_trigger
メソッドは、特定の条件に基づいてイベントのトリガーを判定します。d
パラメータは、受け取ったイベントデータです。- メソッドは、イベントデータの
"child_order_acceptance_id"
がself._order_id
と等しく、かつ"event_type"
が指定されたイベントのいずれかである場合にTrue
を返します。このクラスでは "CANCEL" または "CANCEL_FAILED" イベントがトリガー条件となっています。
6.注文ヘルパー①「指し値注文を行うための非同期関数」
このコードは、指定された条件で制限付き注文(リミットオーダー)を行うための非同期関数 limit_order
を定義しています。
async def limit_order(client, symbol, side, size, price, time_in_force="GTC"): assert side in ["BUY", "SELL"] res = await client.post( "/v1/me/sendchildorder", data={ "product_code": symbol, "side": side, "size": size, "child_order_type": "LIMIT", "price": int(price), "time_in_force": time_in_force, }, ) data = await res.json() if res.status != 200: raise RuntimeError(f"Invalid request: {data}") else: return data["child_order_acceptance_id"]
limit_order
関数は、指定された条件で制限付き注文を行い、注文受付IDを返す非同期関数です。理解する上でのポイントは、非同期処理、APIリクエストの作成、レスポンスの処理です。
limit_order
関数の定義
async def limit_order(client, symbol, side, size, price, time_in_force="GTC"):
limit_order
関数は非同期関数で、指定された条件で制限付き注文を行います。- パラメータ:
client
: 注文を送信するクライアントオブジェクト(例えば、APIクライアントなど)。symbol
: トレードする商品のシンボル(例: BTC/USD)。side
: 取引の方向("BUY" または "SELL")。size
: 注文の数量。price
: 注文の価格。time_in_force
: 注文の有効期間(デフォルトは "GTC")。
条件のアサーション
assert side in ["BUY", "SELL"]
assert side in ["BUY", "SELL"]
は、side
が "BUY" または "SELL" のいずれかであることを確認します。それ以外の場合はエラーが発生します。
注文の送信と結果の取得
res = await client.post( "/v1/me/sendchildorder", data={ "product_code": symbol, "side": side, "size": size, "child_order_type": "LIMIT", "price": int(price), "time_in_force": time_in_force, }, )
client.post
メソッドを使用して、注文を送信し、結果を取得します。- APIのエンドポイント
/v1/me/sendchildorder
に対して POST リクエストを行います。 - リクエストのデータは、注文に関する情報(商品コード、取引方向、数量、注文タイプ、価格、有効期間など)です。
結果のJSONデータの取得
data = await res.json()
res.json()
メソッドを使用して、APIからのレスポンスを JSON 形式で取得します。
注文の確認と注文受付IDの返却
if res.status != 200: raise RuntimeError(f"Invalid request: {data}") else: return data["child_order_acceptance_id"]
- レスポンスのステータスコードが 200 でない場合、エラーメッセージを含んだ RuntimeError 例外が発生します。
- そうでない場合、注文が正常に受け付けられたことを示すので、注文受付IDを返します。
7.注文ヘルパー②「注文をキャンセルするための非同期関数」
このコードは、指定された条件で注文をキャンセルするための非同期関数 cancel_order
を定義しています。
async def cancel_order(client, symbol, order_id): order_id_key = "child_order_id" if order_id.startswith("JRF"): order_id_key = order_id_key.replace("_id", "_acceptance_id") res = await client.post( "/v1/me/cancelchildorder", data={"product_code": symbol, order_id_key: order_id} ) return res.status == 200
cancel_order
関数は、指定された条件で注文をキャンセルし、その結果を True
または False
で返す非同期関数です。理解する上でのポイントは、非同期処理、APIリクエストの作成、レスポンスの処理などが含まれます。
cancel_order
関数の定義
async def cancel_order(client, symbol, order_id):
cancel_order
関数は非同期関数で、指定された条件で注文をキャンセルします。- パラメータ:
client
: 注文を送信するクライアントオブジェクト。symbol
: トレードする商品のシンボル(例: BTC/USD)。order_id
: キャンセル対象の注文ID。
注文IDの調整
order_id_key = "child_order_id" if order_id.startswith("JRF"): order_id_key = order_id_key.replace("_id", "_acceptance_id")
order_id
が "JRF" で始まる場合、order_id_key
の一部を置換しています。- 例えば、
order_id
が "JRF12345" の場合、order_id_key
は "child_order_acceptance_id" に変更されます。これは、APIによって注文IDが "_acceptance_id" で終わる場合があるため、それに対応しています。
キャンセルリクエストの送信と結果の取得
res = await client.post( "/v1/me/cancelchildorder", data={"product_code": symbol, order_id_key: order_id} )
client.post
メソッドを使用して、キャンセルリクエストを送信し、結果を取得します。- APIのエンドポイント
/v1/me/cancelchildorder
に対して POST リクエストを行います。 - リクエストのデータは、注文キャンセルに関する情報(商品コード、注文IDなど)です。
キャンセル結果の判定と返却
return res.status == 200
レスポンスのステータスコードが 200 であれば、注文が正常にキャンセルされたことを示すため、True
を返します。それ以外の場合は False
を返します。
8.マーケットメイキングのロジック
ここからは、マーケットメイキングのロジックを作り上げている部分の解説です。戦略実装の要となる部分ですので、マーケットメイキング戦略への理解があることが土台です。
market_making
関数の定義
このコードは、マーケットメイキングアルゴリズムを実装する非同期関数 market_making
を定義しています。
async def market_making( client, store: pybotters.bitFlyerDataStore, status: Status, symbol: str, t: float, d: int, s_entry: float, s_update: float, size: float, logger: loguru.Logger, ):
market_making
関数は、マーケットメイキングアルゴリズムを実装する非同期関数です。- パラメータ:
client
: 注文を送信するクライアントオブジェクト。store
: データのストア(保存庫)オブジェクト。status
: トレードの状態を管理するオブジェクト。symbol
: トレードする商品のシンボル(例: BTC/USD)。t
: 注文サイズの累積量の閾値。d
: 注文価格を更新するマージン。s_entry
: スプレッドの基準となるエントリー価格の閾値。s_update
: スプレッドの更新に使用される閾値。size
: 注文の数量。logger
: ログを出力するための Logger オブジェクト。
_oneside_loop
(片側の注文(BUYまたはSELL)からキャンセルして再注文を繰り返す非同期関数 _
)
このコードは、片側の注文(BUYまたはSELL)からキャンセルして再注文を繰り返す非同期関数 _oneside_loop
を実装しています。
async def _oneside_loop(side: str, size: float, pricer: Callable[[], int]): """片サイドの注文→キャンセル→再注文ループ。 :param side: "BUY" or "SELL" :param size: 注文サイズ :param pricer: 指値関数 :return: """ # エントリー price = pricer() order_id = await limit_order(client, symbol, side, size, price) logger.info(f"[{side} ENTRY] {order_id} / {price} / {size:.5f}") # 約定監視ループ execution_watcher = ExecutionWatcher(store.childorderevents, order_id) while not execution_watcher.done(): # 指値更新間隔 # 1ループでキャンセルと指値更新を最大2回x2(両サイド)行う。API制限が500/5minなので、 # 300 / 3.5 * 4 = 342.85... とちょっと余裕あるくらいに設定しておく。 # (余談)途中でcontinueとかよくするのでsleepはwhileの直下で実行するのが個人的に好き await asyncio.sleep(3.5) # spreadが閾値以上なので指値更新 if status.spread > s_update: new_price = pricer() if price != new_price: # 前の注文をキャンセル。childorvereventsをwatchしてCANCEL or # CANCEL_FAILEDのステータスを確認してから次の注文を入れる cancel_watcher = CancelWatcher(store.childorderevents, order_id) is_canceled = await cancel_order(client, symbol, order_id) await cancel_watcher.wait() cancel_result = cancel_watcher.result() if cancel_result["event_type"] == "CANCEL": # キャンセル成功→再注文 logger.info(f"[{side} CANCELED] {order_id}") new_order_id = await limit_order( client, symbol, side, size, new_price ) # 監視する注文番号・指値を更新 execution_watcher.replace_order_id(new_order_id) order_id = new_order_id price = new_price logger.info(f"[{side} UPDATE] {order_id} / {price}") elif cancel_result["event_type"] == "CANCEL_FAILED": # キャンセル失敗→約定しているはずなので次のループでexecution_watcherがdoneになる logger.info( f"[{side} CANCEL FAILED] {order_id} (should be executed)" ) continue # 約定 loop_result = execution_watcher.result() logger.info(f"[{side} FINISH] {loop_result}") return loop_result
理解する上でのポイントは、非同期処理、注文の発行とキャンセル、注文の監視です。
_oneside_loop
関数の定義
async def _oneside_loop(side: str, size: float, pricer: Callable[[], int]):
_oneside_loop
関数は、片側の注文からキャンセルして再注文を繰り返す非同期関数です。- パラメータ:
side
: 注文の方向("BUY" または "SELL")。size
: 注文サイズ。pricer
: 指値関数(注文価格を決定するための関数)。
エントリー(新規注文)の処理
price = pricer() order_id = await limit_order(client, symbol, side, size, price) logger.info(f"[{side} ENTRY] {order_id} / {price} / {size:.5f}")
pricer
関数を使用して指値価格を取得し、それを使って新規注文を行います。limit_order
関数は、指定された条件で制限付き注文を行います。- ログに注文ID、注文価格、注文サイズを表示します。
約定監視ループ
execution_watcher = ExecutionWatcher(store.childorderevents, order_id) while not execution_watcher.done():
ExecutionWatcher
オブジェクトを作成し、注文の約定を監視するループを開始します。
指値更新とキャンセルの処理
await asyncio.sleep(3.5) if status.spread > s_update: new_price = pricer() # 以下、指値の更新とキャンセルの処理
- 指定の条件(スプレッドが閾値以上)の場合、指値を新しい価格に更新します。
- 注文のキャンセル処理を行います。キャンセルが成功した場合、再注文を行います。
キャンセルが成功した場合の処理
logger.info(f"[{side} CANCELED] {order_id}") new_order_id = await limit_order( client, symbol, side, size, new_price ) execution_watcher.replace_order_id(new_order_id) order_id = new_order_id price = new_price logger.info(f"[{side} UPDATE] {order_id} / {price}")
- キャンセルが成功した場合、ログにキャンセルの成功と更新後の注文情報を表示します。
- 新しい注文を作成し、
ExecutionWatcher
オブジェクトの監視対象の注文IDを更新します。
キャンセルが失敗した場合の処理
elif cancel_result["event_type"] == "CANCEL_FAILED": logger.info( f"[{side} CANCEL FAILED] {order_id} (should be executed)" ) continue
キャンセルが失敗した場合、ログにキャンセルの失敗を表示し、次のループで約定したものとして処理を継続します。
約定が完了した場合の処理
loop_result = execution_watcher.result() logger.info(f"[{side} FINISH] {loop_result}") return loop_result
約定が完了した場合、ログに約定の結果を表示し、その結果を返します。
結果をログに表示するプログラム
このコードは、無限ループでスプレッドが特定の閾値を超えた場合に、指定された条件で買い注文と売り注文を同時に実行して、その結果をログに表示するプログラムです。
while True: sp = status.spread if sp > s_entry: # 現在のポジションから端数を取得 buy_remaining_size = status.remaining_size("BUY") sell_remaining_size = status.remaining_size("SELL") logger.info( f"[START]\n" f"\tspread: {sp}\n" f"\tsymbol: {symbol}\n" f"\tt: {t}\n" f"\td: {d}\n" f"\ts_entry: {s_entry}\n" f"\ts_update: {s_update}\n" f"\tbuy_size: {size + sell_remaining_size}\n" f"\tsell_size: {size + buy_remaining_size}" ) buy_result, sell_result = await asyncio.gather( _oneside_loop( "BUY", size + sell_remaining_size, partial(status.get_limit_price, "bid", t, d), ), _oneside_loop( "SELL", size + buy_remaining_size, partial(status.get_limit_price, "ask", t, d), ), ) logger.info(f"[FINISH] {sell_result['price'] - buy_result['price']}") break else: logger.info( f"[WAITING CHANCE] {status.best_ask} - ({status.spread:.4f}) - {status.best_bid}" ) await asyncio.sleep(0.1)
メインの無限ループ
while True: sp = status.spread if sp > s_entry: # 中略 else: # 中略 await asyncio.sleep(0.1)
- 無限ループを開始しています。
- スプレッド(最良の売り注文価格と買い注文価格の差)が指定のエントリー閾値
s_entry
を超えた場合とそれ以外の場合に分かれています。 await asyncio.sleep(0.1)
によって、次のループまで0.1秒待機します。これにより、無駄な処理負荷を抑えながら定期的にスプレッドを確認します。
スプレッドが閾値を超えた場合の処理
if sp > s_entry: # 中略 else: # 中略
スプレッドがエントリー閾値 s_entry
を超えた場合の処理が行われます。
ログと注文サイズの計算
buy_remaining_size = status.remaining_size("BUY") sell_remaining_size = status.remaining_size("SELL") logger.info( f"[START]\n" f"\tspread: {sp}\n" f"\tsymbol: {symbol}\n" f"\tt: {t}\n" f"\td: {d}\n" f"\ts_entry: {s_entry}\n" f"\ts_update: {s_update}\n" f"\tbuy_size: {size + sell_remaining_size}\n" f"\tsell_size: {size + buy_remaining_size}" )
- 現在のポジションから端数を取得して、ログに表示するための情報を計算しています。
remaining_size
関数は、指定された方向("BUY"または"SELL")のポジションの合計サイズを取得する関数です。
買い注文と売り注文の同時実行
buy_result, sell_result = await asyncio.gather( _oneside_loop( "BUY", size + sell_remaining_size, partial(status.get_limit_price, "bid", t, d), ), _oneside_loop( "SELL", size + buy_remaining_size, partial(status.get_limit_price, "ask", t, d), ), )
asyncio.gather
を使用して、買い注文と売り注文を同時に実行します。_oneside_loop
関数は、片側の注文からキャンセルして再注文を繰り返す関数です。
ログに実行結果を表示
logger.info(f"[FINISH] {sell_result['price'] - buy_result['price']}") break
- 実行結果(買い注文と売り注文の約定価格の差)をログに表示します。
break
によって、無限ループを終了させます。
スプレッドが閾値を超えていない場合のログ
else: logger.info( f"[WAITING CHANCE] {status.best_ask} - ({status.spread:.4f}) - {status.best_bid}" ) await asyncio.sleep(0.1)
- スプレッドがエントリー閾値を超えていない場合、ログに待機メッセージを表示します。
await asyncio.sleep(0.1)
によって、次のループまで0.1秒待機します。
9.メインの処理
このコードは、BitflyerのAPIを使用して、マーケットメイキングアルゴリズムを実装したプログラムです。
async def main(args): # ロガーの設定 logger = loguru.logger logger.add("log.txt", rotation="10MB", retention=3) # Bitflyerのクライアントとデータストアを初期化 async with pybotters.Client( apis=args.api_key_json, base_url="https://api.bitflyer.com" ) as client: store = pybotters.bitFlyerDataStore() status = Status(store) # WebSocket接続とサブスクライブ wstask = await client.ws_connect( "wss://ws.lightstream.bitflyer.com/json-rpc", send_json=[ { "method": "subscribe", "params": {"channel": "lightning_board_snapshot_FX_BTC_JPY"}, "id": 1, }, { "method": "subscribe", "params": {"channel": "lightning_board_FX_BTC_JPY"}, "id": 2, }, { "method": "subscribe", "params": {"channel": "child_order_events"}, "id": 3, }, ], hdlr_json=store.onmessage, ) # WebSocketの初期化が完了するまで待機 while not all([len(w) for w in [store.board]]): logger.debug("[WAITING SOCKET RESPONSE]") await store.wait() # メインの無限ループ while True: await market_making( client, store, status, args.symbol, args.t, args.d, args.s_entry, args.s_update, args.lot, logger, ) await asyncio.sleep(args.interval)
main
関数の定義
async def main(args):
- メインのプログラムが実行される関数です。
- ロガーの設定やBitflyerのクライアントの初期化、WebSocketの接続やサブスクライブ、メインの無限ループが含まれています。
await asyncio.sleep(args.interval)
によって、指定されたインターバルで無限ループが回ります。
ロギングの設定
logger.add("log.txt", rotation="10MB", retention=3)
log.txt
という名前のログファイルを設定しています。rotation="10MB"
で10メガバイトごとにログファイルをローテーション(新しいファイルを作成)します。retention=3
で古いログファイルを3つまで保持します。
Bitflyerのクライアントとデータストアの初期化
async with pybotters.Client( apis=args.api_key_json, base_url="https://api.bitflyer.com" ) as client: store = pybotters.bitFlyerDataStore() status = Status(store)
- BitflyerのAPIを使用するためのクライアントを初期化します。
- データストア(
pybotters.bitFlyerDataStore()
)とトレードのステータスを管理するためのオブジェクトを初期化します。
WebSocket接続とサブスクライブ
wstask = await client.ws_connect( "wss://ws.lightstream.bitflyer.com/json-rpc", send_json=[ {"method": "subscribe", "params": {"channel": "lightning_board_snapshot_FX_BTC_JPY"}, "id": 1}, {"method": "subscribe", "params": {"channel": "lightning_board_FX_BTC_JPY"}, "id": 2}, {"method": "subscribe", "params": {"channel": "child_order_events"}, "id": 3}, ], hdlr_json=store.onmessage, )
- BitflyerのWebSocketに接続し、指定のチャンネルにサブスクライブします。
send_json
でサブスクライブするチャンネルとそのパラメータを指定します。hdlr_json=store.onmessage
でWebSocketから受信したデータをデータストアに処理させます。
WebSocketの初期化が完了するまで待機
while not all([len(w) for w in [store.board]]): logger.debug("[WAITING SOCKET RESPONSE]") await store.wait()
- WebSocketの初期化が完了するまで待機します。
store.wait()
はデータストア内の特定のデータが到着するまで待機する関数です。
メインの無限ループ
while True: await market_making( client, store, status, args.symbol, args.t, args.d, args.s_entry, args.s_update, args.lot, logger, ) await asyncio.sleep(args.interval)
- メインの無限ループです。
market_making
関数を呼び出して市場メイキングアルゴリズムを実行します。- 指定されたインターバルだけ待機します。
10.main関数に渡す引数の設定
このコードは、コマンドライン引数を受け取り、指定された条件でマーケットメイキングアルゴリズムを実行するプログラムです。
コマンドラインから実行する際に様々な設定や条件を指定できるようになっており、asyncio
モジュールを使用して非同期処理を行っています。
if __name__ == "__main__":
if __name__ == "__main__":
- Python スクリプトが直接実行されるときに、そのスクリプトがエントリーポイントとして使われるブロックです。
- この中に記述されたコードが、他のモジュールからインポートされたときには実行されません。
この部分は特殊な構文なので、具体例を挙げて詳しく解説します。
詳しく解説
if __name__ == "__main__":
ブロックは Python スクリプトが直接実行されるときに、そのスクリプトがエントリーポイントとして動作するための特殊な構文です。ここで、その詳細な説明を提供します。
Python スクリプトが他のスクリプトやモジュールからインポートされると、そのスクリプトの中に書かれているコードが即座に実行されます。しかし、実行したいコードが他のファイルからインポートされたときには、この動作は望ましくありません。
例えば、以下のようなスクリプトがあるとします。
# my_module.py print("Hello from my_module!")
そして、これを別のスクリプトからインポートしようとします。
# main_script.py import my_module
この場合、main_script.py
を実行すると、my_module.py
の中の print
文も実行されてしまいます。しかし、my_module.py
をモジュールとして使うとき、通常はモジュールの中の関数やクラスだけを使いたいと思うでしょう。
if __name__ == "__main__":
ブロックは、スクリプトが直接実行された場合のみコードが実行されるようにするための仕組みです。例を用いて説明します。
# my_module.py def greet(): print("Hello from my_module!") if __name__ == "__main__": greet()
この場合、my_module.py
を直接実行すると、greet()
関数が呼び出されます。しかし、他のスクリプトから my_module
をインポートした場合には greet()
は実行されません。なぜなら、__name__
はスクリプトが直接実行された場合には "__main__"
となり、それ以外の場合にはモジュール名になるからです。
この仕組みにより、モジュールを作成するときに不要なコードの実行を避けることができ、モジュールとして期待通りに使えるようになります。
コマンドライン引数のパース
parser = ArgumentParser(description="pybotters x asyncio x magito MM") parser.add_argument("--api_key_json", help="apiキーが入ったJSONファイル", required=True) parser.add_argument("--symbol", default="FX_BTC_JPY", help="取引通過") parser.add_argument("--lot", default=0.01, type=float, help="注文サイズ") parser.add_argument( "--t", default=0.01, type=float, help="板上での累積注文量に対する閾値(この閾値を超えた時点での注文価格が参照価格となる)", ) parser.add_argument("--d", default=1, type=int, help="参照価格と指値のマージン(指値=参照価格±d)") parser.add_argument( "--s_entry", default=0.0003, type=float, help="エントリー用のスプレッド閾値(スプレッドがこの閾値以上の時にマーケットメイキングを開始する)", ) parser.add_argument( "--s_update", default=0.0001, type=float, help="指値更新用のスプレッド閾値(スプレッドがこの閾値以上の時に指値を更新する)", ) parser.add_argument("--interval", default=5, type=int, help="マーケットメイキングサイクルの間隔") args = parser.parse_args()
- コマンドライン引数をパースするための
ArgumentParser
クラスを使用しています。 --api_key_json
から--interval
までの各引数が定義されています。- 引数のデフォルト値や型、ヘルプメッセージが指定されています。
メイン関数の呼び出し
try: asyncio.run(main(args)) except KeyboardInterrupt as e: pass
- コマンドライン引数を元にして定義された
main
関数を呼び出します。 asyncio.run
関数を使用して非同期のメイン関数main
を同期的に呼び出します。- キーボード割り込み (
KeyboardInterrupt
) が発生した場合、プログラムの実行を終了します。
例外処理
except KeyboardInterrupt as e: pass
- キーボード割り込みが発生した場合の例外処理です。
- プログラムの実行を終了します。
まとめ
長大なコードも部分ごとに見ていけば、各部分の機能やそれぞれのつながりが見えてきます。
分かりにくい部分は実際に手を動かして、コードを書いてみることが機能を理解する近道です。
引き続き、Botの作成と実践を積んでいきます。
今後は、イナゴbotの開発も進めていきます。