前回の記事に引き続き、今回も仮想通貨botの開発状況をまとめていきます。
ソースコードはこちら。
本記事では「メソッド」と「関数」を同じような意味の言葉として使っている部分もありますが、厳密には定義が異なるため、より詳細に理解したい方はこちらの記事を参考にしてください。「関数とメソッドの違い」
1.モジュールのインポート
以下のコードは、冒頭部分です。まずは、それぞれのインポート文について説明します。
from __future__ import annotations
from typing import TYPE_CHECKING, Callable
if TYPE_CHECKING:
from pybotters.store import DataStore, Item
import asyncio
import loguru
import pybotters
from argparse import ArgumentParser
from functools import partial
from __future__ import annotations: この文は、Python 3.7以降で型アノテーションを行う際の挙動を変更するためのものです。annotationsフューチャーは、型の循環参照に対処するためのものです。詳細な理由や背景については、公式ドキュメントを参照。from typing import TYPE_CHECKING, Callable:typingモジュールからTYPE_CHECKINGとCallableをインポートしています。TYPE_CHECKING: これは、型ヒントをチェックするための特殊な値です。通常はランタイムで実行されないようになっています。型ヒントの中で循環参照を避けるために使用されます。Callable: 関数の型ヒントとして使用され、引数と戻り値の型を指定するために利用されます。
if TYPE_CHECKING: ...: 前述のTYPE_CHECKINGの値がTrueの場合、以下のブロックが実行されます。通常、この中では実行時には利用できない型ヒントを扱うための特殊な処理を行うことがあります。上のコードでは、pybotters.storeモジュールからDataStoreとItemをインポートしています。import asyncio: 非同期処理をサポートするためのasyncioモジュールをインポートしています。非同期プログラミングにおいては、非同期イベントループを管理するためにasyncioが利用されます。import loguru: ロギングを行うためのサードパーティのモジュールloguruをインポートしています。loguruは、シンプルで柔軟なログの取得を可能にするためのツールです。このモジュールはPythonの標準ライブラリではないので、別途インストールする必要があります。Loguruの使い方(Pythonログ出力ライブラリ)import pybotters: この行では、pybottersというモジュールをインポートしています。これはプロジェクト独自のモジュールで、その中でstoreモジュールが定義されています。インストールの手順はこちらから。from argparse import ArgumentParser: コマンドライン引数を処理するための標準ライブラリのargparseモジュールからArgumentParserクラスをインポートしています。これを使用することで、スクリプトにコマンドライン引数を追加し、それらを処理することができます。from functools import partial:functoolsモジュールからpartial関数をインポートしています。partial関数は、他の関数を一部適用して新しい関数を作成するために使用されます。
2.class Status(状態管理のクラス)
このクラスは、特定の仮想通貨取引所(bitFlyer)のデータを処理するためのものです。
class Status:
def __init__(
self,
store: pybotters.bitFlyerDataStore,
max_position: int = 1,
):
self._store: pybotters.bitFlyerDataStore = store
self._asks = None
self._bids = None
self._max_position = max_position
このクラスは、主にbitFlyer取引所のデータを管理するためのものであり、特に板情報を定期的に更新する非同期タスクを含んでいます。クラスのコンストラクタでは、取引所データの格納、ポジションの最大許容量などが初期化されます。
インポートとの関連
import asyncio import pybotters
asyncio: 非同期処理をサポートするPythonの組み込みモジュール。pybotters: 外部ライブラリまたはモジュールで、bitFlyer取引所のデータを処理するためのツールや関数が含まれています。
Statusクラスの定義
class Status:
クラス Status を定義しています。
コンストラクタの定義
def __init__(
self,
store: pybotters.bitFlyerDataStore,
max_position: int = 1,
):
- クラスのコンストラクタ(
__init__メソッド)を定義しています。コンストラクタはクラスのインスタンスが作成されるときに呼び出されます。 store: bitFlyerデータを取得するためのpybotters.bitFlyerDataStoreオブジェクト。型注釈により、この引数がどの型であるべきか示しています。max_position: ポジション(取引の保有量)の最大許容量。デフォルトでは1に設定されています。
インスタンス変数の初期化
self._store: pybotters.bitFlyerDataStore = store
self._asks = None
self._bids = None
self._max_position = max_position
- インスタンス変数
_storeに、渡されたstore引数を格納します。 - インスタンス変数
_asksと_bidsをNoneで初期化します。 - インスタンス変数
_max_positionに、渡されたmax_position引数を格納します。
ボード(板情報)の自動更新タスクの作成
asyncio.create_task(self.auto_update_board())
asyncio.create_task() を使用して、auto_update_board() メソッドを非同期タスクとして作成します。このメソッドはボード(板情報)のデータを定期的に更新するためのものです。
板情報の自動更新
このコードは、非同期(async)で動作するボード(板)情報の自動更新機能を提供する関数です。
async def auto_update_board(self):
"""板情報の自動更新タスク"""
with self._store.board.watch() as stream:
async for msg in stream:
self._asks, self._bids = self._store.board.sorted().values()
この関数は、bitFlyer 取引所のボード情報を非同期的に監視し、変更があった場合にそれを取得してクラスのインスタンス変数に格納するものです。非同期プログラミングの概念や、asyncio モジュールにおける非同期イベントの取り扱いが含まれています。
非同期関数の定義
async def auto_update_board(self):
- 非同期関数(
asyncキーワードがついている)としてauto_update_board関数が定義されています。 selfパラメータがあり、クラスのメソッドとして使われることを示しています。
板情報の非同期的な監視の開始
with self._store.board.watch() as stream:
self._store.board.watch()は、self._storeが持つboardオブジェクトの監視を開始します。withステートメントを使用して、監視が終了した時点でクリーンアップ処理を行います。streamは監視ストリーム(stream)で、取引所からのデータ更新を待ち受けるためのものです。
非同期イベントの待機と処理
async for msg in stream:
self._asks, self._bids = self._store.board.sorted().values()
async for文は非同期イベントの待機と処理を行います。streamからイベントが到着するたびにループが実行されます。msgには取引所からのメッセージが入りますが、このコードでは使われていません。self._store.board.sorted().values()は、ボードの売り注文(asks)と買い注文(bids)を取得し、それぞれself._asksとself._bidsに代入します。
指値注文を出す関数
このコードは、注文板のデータから指定された累積注文サイズの閾値を超えた位置に指値注文を出すための関数です。
def get_limit_price(self, side: str, t: float, d: int = 1):
"""注文サイズの累積量が``t``を超えたところに``d``だけ離して指値をだす。
:param str side: ask or bid
:param float t: 累積注文サイズの閾値
:param int d: 参照注文からのマージン
:return: 指値
"""
items = self._asks if side == "ask" else self._bids
cum_size, price = items[0]["size"], items[0]["price"]
for i in items:
if cum_size >= t:
return int(price + d if side == "ask" else price - d)
price = i["price"]
cum_size += i["size"]
# 最後までthresholdを満たさなかった場合、一番後ろの注文と同じ額
return int(items[-1]["price"])
この関数は、指定された注文板から特定の条件を満たす位置に指値注文を出すためのものです。注文板から注文を順に見ていき、指定された閾値を超えた時点で指値価格を計算して返します。初心者が理解する上でのポイントは、注文板の概念やループ処理の流れです。
関数の定義
def get_limit_price(self, side: str, t: float, d: int = 1):
get_limit_price関数が定義されています。sideパラメータは文字列で、"ask"(売り注文)または"bid"(買い注文)を指定します。tパラメータは浮動小数点数で、累積注文サイズの閾値を示します。dパラメータは整数で、指定された閾値を超えた位置から離れるマージンを表します。デフォルトでは1に設定されています。
注文ブックの選択
items = self._asks if side == "ask" else self._bids
sideパラメータに応じて、指定された方向の注文ブックを選択します。self._asksは売り注文のリスト、self._bidsは買い注文のリストを仮定しています。
累積注文サイズの計算と指値価格の取得
cum_size, price = items[0]["size"], items[0]["price"]
for i in items:
if cum_size >= t:
return int(price + d if side == "ask" else price - d)
price = i["price"]
cum_size += i["size"]
- 注文ブックを順番に見ていき、累積注文サイズが指定された閾値
tを超えた時点で、指定されたマージンdだけ離した位置の指値価格を計算して返します。 cum_size: 累積注文サイズを累積していく変数。price: 注文の価格。
閾値を満たさなかった場合の処理
# 最後までthresholdを満たさなかった場合、一番後ろの注文と同じ額
return int(items[-1]["price"])
注文ブックを最後まで探しても閾値 t を満たさなかった場合、一番後ろの注文の価格を指定された形式に変換して返します。
保有ポジションのリストを取得する関数
このコードは、特定の取引方向(BUYまたはSELL)に関連する保有ポジションのリストを取得するためのメソッドです。
def positions(self, side: str):
"""保有ポジションリスト。
:param str side: BUY or SELL
:return: ポジションのlist
"""
positions = self._store.positions.find({"side": side})
assert len(positions) <= self._max_position
return positions
このメソッドは、指定された取引方向に関連するポジションを取得し、最大ポジション数の条件を満たすかどうかを検証してから、ポジションのリストを返すものです。ここでも、メソッドの目的や各ステップで行われる処理の流れを理解することが重要です。
メソッドの定義
def positions(self, side: str):
positionsメソッドが定義されています。selfパラメータがあり、クラスのメソッドとして使われることを示しています。sideパラメータは文字列で、"BUY"または"SELL"を指定します。
ポジションの取得
positions = self._store.positions.find({"side": side})
self._store.positions.find({"side": side})は、self._storeが持つpositionsオブジェクトから、指定された取引方向("BUY"または"SELL")に関連するポジションを検索します。- ポジションのデータはおそらく辞書(dictionary)のリストとして返されます。
ポジション数の制約
assert len(positions) <= self._max_position
assert文は、指定された条件が満たされているかどうかを検証するためのものです。条件が偽の場合、AssertionErrorが発生します。- この行では、取得したポジションの数が設定された最大ポジション数(
self._max_position)以下であることを検証しています。
結果の返却
return positions
最後に、取得したポジションのリストを返します。
保有ポジションのサイズを確認する関数
このコードは、指定された取引方向(BUYまたはSELL)に関連する保有ポジションのサイズを取得するためのメソッドです。
def remaining_size(self, side):
"""保有ポジションサイズ。
:param str side: BUY or SELL
:return: ポジションサイズ
"""
positions = self.positions(side)
if len(positions):
return sum([p["size"] for p in positions])
else:
return 0
このメソッドは、指定された取引方向に関連する保有ポジションのサイズを取得するためのものです。取得したポジションが存在する場合は、そのサイズを合計して返し、ポジションが存在しない場合は0を返します。初心者が理解する上でのポイントは、他のメソッドの呼び出しや条件文によるフローの制御です。
メソッドの定義
def remaining_size(self, side):
remaining_sizeメソッドが定義されています。selfパラメータがあり、おそらくクラスのメソッドとして使われることを示しています。sideパラメータは文字列で、"BUY"または"SELL"を指定します。
他のメソッドの呼び出し
positions = self.positions(side)
self.positions(side)は、同じクラス内のpositionsメソッドを呼び出して、指定された取引方向に関連するポジションのリストを取得します。- 取得したポジションの情報は
positions変数に格納されます。
ポジションサイズの計算
if len(positions):
return sum([p["size"] for p in positions])
if len(positions):は、取得したポジションが存在するかどうかを検証しています。len(positions)が0でない(ポジションが存在する)場合に、以下の処理を行います。sum([p["size"] for p in positions])は、取得したポジションのサイズ("size"フィールドの値)を合計してポジションサイズを計算します。
ポジションが存在しない場合の処理
else:
return 0
ポジションが存在しない場合(len(positions) が0の場合)、0を返します。
3つのプロパティ
このコードは、3つのプロパティ(best_ask、best_bid、spread)を持つクラスの一部です。
'@'で書き始めることによって、関数に特殊な振る舞いをさせることができます。
@property
def best_ask(self):
return int(self._asks[0]["price"])
@property
def best_bid(self):
return int(self._bids[0]["price"])
@property
def spread(self):
return (self.best_ask - self.best_bid) / self.best_bid
このクラスは、最良の売り注文価格、最良の買い注文価格、およびそれらの差であるスプレッドを提供する3つのプロパティを持っています。それぞれのプロパティは、注文ブックから必要な情報を抽出し、適切な形式に変換して返します。ここを理解する上でのポイントは、プロパティが属性のようにアクセスできること、デコレータ(関数に特殊な振る舞いをさせる機能)、およびスプレッドの概念です。
best_ask プロパティ
@property
def best_ask(self):
return int(self._asks[0]["price"])
best_askプロパティは、売り注文ブック(asks)から最も価格が低い注文の価格を取得します。@propertyデコレータを使用して、このメソッドを属性のようにアクセスできるようにしています。self._asks[0]["price"]は売り注文ブックの一番上にある注文の価格を取得します。
best_bid プロパティ
@property
def best_bid(self):
return int(self._bids[0]["price"])
best_bidプロパティは、買い注文ブック(bids)から最も価格が高い注文の価格を取得します。@propertyデコレータを使用して、このメソッドを属性のようにアクセスできるようにしています。self._bids[0]["price"]は買い注文ブックの一番上にある注文の価格を取得します。
spread プロパティ
@property
def spread(self):
return (self.best_ask - self.best_bid) / self.best_bid
spreadプロパティは、best_ask(最良の売り注文価格)とbest_bid(最良の買い注文価格)の差を計算し、その差をbest_bidで割った値を返します。- スプレッドは、取引所の売りと買いの最良価格との差を表す指標で、取引所での流動性を示す一つの尺度です。
class EventWatcher(Event監視のクラス)
このコードは、イベントを監視するためのクラス EventWatcher を定義しています。
class EventWatcher:
def __init__(self, store: DataStore, trigger_fn: Callable[[Item], bool] = None):
self._store = store
self._trigger_fn = trigger_fn
self._task = asyncio.create_task(self._watch())
このクラスは、外部から提供されたデータストアを用いてイベントを監視し、トリガー条件が満たされた場合に指定されたアクションを実行するためのものです。理解を深めるポイントは、外部から注入される関数や非同期タスクの概念、そして __init__ メソッドの使い方です。
クラスの定義
class EventWatcher:
クラス EventWatcher を定義しています。
クラスのコンストラクタ(__init__ メソッド)
def __init__(self, store: DataStore, trigger_fn: Callable[[Item], bool] = None):
self._store = store
self._trigger_fn = trigger_fn
self._task = asyncio.create_task(self._watch())
- クラスのコンストラクタ (
__init__メソッド) が定義されています。 storeパラメータはDataStoreクラスのオブジェクトを受け取ります。これは、外部から提供されるデータストアを指定するものです。trigger_fnパラメータは、イベントをトリガーするための関数です。Callable[[Item], bool]は、引数がItem型で、戻り値がbool型の関数であることを示しています。もしtrigger_fnが指定されなかった場合、デフォルトでNoneになります。self._storeには外部から提供されたデータストアが格納されます。self._trigger_fnには外部から提供されたトリガー関数が格納されます。self._taskには、_watchメソッドを非同期タスクとして実行するためのタスクが格納されます。
_watch メソッド
self._task = asyncio.create_task(self._watch())
イベント監視のための非同期タスクを開始するために、asyncio.create_task を使用して _watch メソッドを非同期タスクとして実行しています。
非同期関数_watch
このコードは非同期関数 _watch を定義しています。この関数は、DataStore を監視し、指定された条件が満たされるまでイベントを待機するものです。
この非同期関数 _watch は、DataStore を監視し、指定された条件が満たされるまでイベントを待機するものです。指定された条件が満たされた場合、そのイベントメッセージを返し、関数が終了します。理解する上でのポイントは、非同期プログラミングの概念や async for ループ、with ステートメントの使い方です。
async def _watch(self):
"""`_is_target_event(msg.data)`がTrueを返すまでDataStoreをwatchし続ける。"""
with self._store.watch() as stream:
async for msg in stream:
if self._is_trigger(msg.data):
return msg.data
関数の定義
_watch関数は非同期関数として定義されています。- ドキュメンテーション文字列により、関数の目的が説明されています。この関数は、指定された条件が満たされるまで
DataStoreを監視し続けるものです。 with self._store.watch() as stream:は、DataStoreのwatchメソッドを使用して、イベントのストリームを開始します。このwithステートメントにより、監視が終了した時点でクリーンアップが自動的に行われます。async for msg in stream:は、streamからイベントメッセージを非同期的に待ち受けるループです。
イベントトリガー条件の判定
if self._is_trigger(msg.data):
return msg.data
self._is_trigger(msg.data)がTrueを返すかどうかを判定します。- もし
Trueを返すならば、そのイベントメッセージmsg.dataを返し、関数を終了します。これにより、指定された条件が満たされた時点で関数が終了します。
_is_trigger(イベントのトリガー条件を判定するメソッド)
このコードは、イベントのトリガー条件を判定するための非常に簡潔なメソッド _is_trigger を定義しています。
def _is_trigger(self, d: Item):
"""socketメッセージを受け取って、イベント発火の有無を判定する。
子クラスこの関数をオーバーライドしてもいいし、`trigger_fn`として与えてもいい。
:param Item d: socketメッセージ。
:return:
"""
if self._trigger_fn is None:
raise NotImplementedError
return self._trigger_fn(d)
このメソッドは、指定されたデータに対してイベントのトリガー条件を判定するためのものです。self._trigger_fn が未設定の場合はエラーを発生させ、設定されている場合はそれを呼び出してトリガー条件を確認します。理解する上でのポイントは、メソッドの目的やドキュメンテーション文字列、および例外の処理です。
_is_trigger メソッドの定義
_is_triggerメソッドは、指定されたItem型のデータdに対してイベントのトリガー条件を判定する役割を持っています。- ドキュメンテーション文字列により、関数の目的や使い方が説明されています。
:param Item d:は、このメソッドの引数dの型と説明を示しています。Itemはおそらく特定の型を表しているものと考えられます。:return:は、このメソッドの戻り値についての説明です。
イベントトリガー条件の判定
if self._trigger_fn is None:
raise NotImplementedError
return self._trigger_fn(d)
self._trigger_fnがNone(未設定)の場合、NotImplementedErrorを発生させます。これは、派生クラスでこのメソッドが適切に実装されていない場合のエラーです。self._trigger_fn(d)は、実際にイベントのトリガー条件を判定する関数を呼び出しています。この関数は、dという引数を取り、特定の条件を満たすかどうかを返します。
非同期処理を扱うためのメソッド群
このコードは非同期処理を扱うためのクラスのメソッド群です。
async def wait(self):
await self._task
def done(self):
return self._task.done()
def result(self):
return self._task.result()
このクラスは非同期処理を管理するためのもので、wait メソッドを使って非同期処理の完了を待ち、done メソッドを使って非同期処理の完了状態を確認し、result メソッドを使って非同期処理の結果を取得します。理解する上でのポイントは、非同期処理の待機と結果取得の仕組みに焦点を当てることです。
wait メソッド
async def wait(self):
await self._task
waitメソッドは、非同期処理が完了するまで待機するためのものです。await self._taskは、_taskが完了するまで非同期的に待機します。他の非同期処理が進行する一方で、このメソッドは_taskの完了を待ちます。
done メソッド
def done(self):
return self._task.done()
doneメソッドは、非同期処理が完了しているかどうかを判定するためのものです。self._task.done()は、非同期処理が完了している場合はTrueを、まだ未完了の場合はFalseを返します。
result メソッド
def result(self):
return self._task.result()
resultメソッドは、非同期処理が完了した後にその結果を取得するためのものです。self._task.result()は、非同期処理が完了している場合はその結果を返します。ただし、非同期処理がまだ完了していない場合は適切な方法で待機することが期待されます。
3.class ChildOrderEventWatcher(EventWatcher)「特定の注文に関連するイベントを監視する」
このコードは、EventWatcher クラスを継承した ChildOrderEventWatcher クラスを定義しています。
class ChildOrderEventWatcher(EventWatcher):
def __init__(self, store, order_id, **kwargs):
self._order_id = order_id
self._cond = kwargs
super(ChildOrderEventWatcher, self).__init__(store)
def _is_trigger(self, d):
return d["child_order_acceptance_id"] == self._order_id and all(
[v == d[k] for (k, v) in self._cond.items()]
)
def replace_order_id(self, order_id):
self._order_id = order_id
このクラスは、特定の注文に関連するイベントを監視するためのものであり、指定された注文IDと条件がイベントデータと一致したときにトリガーされます。理解する上でのポイントは、クラスの初期化や条件の判定のメカニズム、メソッドの目的などです。
__init__ メソッド
def __init__(self, store, order_id, **kwargs):
self._order_id = order_id
self._cond = kwargs
super(ChildOrderEventWatcher, self).__init__(store)
__init__メソッドは、クラスの初期化を行います。storeパラメータは、DataStoreのような外部から提供されるデータストアオブジェクトです。order_idパラメータは、特定の注文を識別するための注文IDです。**kwargsは可変長のキーワード引数で、任意の数のキーと値を持つ辞書として受け取ります。これは条件を表します。self._order_idには注文IDが格納されます。self._condには条件が格納されます。super(ChildOrderEventWatcher, self).__init__(store)は親クラスのコンストラクタを呼び出して、親クラスの初期化を行います。
_is_trigger メソッド
def _is_trigger(self, d):
return d["child_order_acceptance_id"] == self._order_id and all(
[v == d[k] for (k, v) in self._cond.items()]
)
_is_triggerメソッドは、特定の条件に基づいてイベントのトリガーを判定します。dパラメータは、受け取ったイベントデータです。- メソッドは、イベントデータの
"child_order_acceptance_id"がself._order_idと等しく、かつself._condに指定された条件がすべて満たされている場合にTrueを返します。
replace_order_id メソッド
def replace_order_id(self, order_id):
self._order_id = order_id
replace_order_idメソッドは、保持している注文IDを更新するためのものです。order_idパラメータで新しい注文IDを指定し、それをself._order_idに格納します。
4.class ExecutionWatcher(ChildOrderEventWatcher)「特定の注文に関連する "EXECUTION" イベントを監視する」
このコードは、ChildOrderEventWatcher クラスを継承して新しいクラス ExecutionWatcher を定義しています。
class ExecutionWatcher(ChildOrderEventWatcher):
def __init__(self, store, order_id):
super(ExecutionWatcher, self).__init__(store, order_id, event_type="EXECUTION")
ExecutionWatcher クラスは、ChildOrderEventWatcher クラスを継承しつつ、特定の注文に関連する "EXECUTION" イベントを監視するために初期化されます。理解する上でのポイントは、クラスの継承や初期化の仕組み、および親クラスの設定に注目することです。
ExecutionWatcher クラスの定義
class ExecutionWatcher(ChildOrderEventWatcher):
def __init__(self, store, order_id):
super(ExecutionWatcher, self).__init__(store, order_id, event_type="EXECUTION")
ExecutionWatcherクラスはChildOrderEventWatcherクラスを継承しています。これは、ChildOrderEventWatcherクラスの機能を継承しつつ、新しい機能を追加するものです。__init__メソッドは、クラスの初期化を行います。親クラスのコンストラクタを呼び出して初期化するとともに、追加の情報(event_type)を設定しています。
__init__ メソッド
def __init__(self, store, order_id):
super(ExecutionWatcher, self).__init__(store, order_id, event_type="EXECUTION")
__init__メソッドはクラスの初期化を行います。storeパラメータは外部から提供されるデータストアオブジェクトです。order_idパラメータは特定の注文を識別するための注文IDです。super(ExecutionWatcher, self).__init__(store, order_id, event_type="EXECUTION")は親クラス(ChildOrderEventWatcher)のコンストラクタを呼び出して初期化します。ここで、event_typeパラメータに"EXECUTION"を指定しています。これは、ChildOrderEventWatcherクラスがイベントを監視する条件の一部です。具体的には、このクラスは "EXECUTION" イベントに対して監視を行うように設定されています。
5.class CancelWatcher(ChildOrderEventWatcher)「特定の注文に関連する "CANCEL" または "CANCEL_FAILED" イベントを監視する」
このコードは、ChildOrderEventWatcher クラスを継承した新しいクラス CancelWatcher を定義しています。
class CancelWatcher(ChildOrderEventWatcher):
def __init__(self, store, order_id):
super(CancelWatcher, self).__init__(store, order_id)
def _is_trigger(self, d):
return d["child_order_acceptance_id"] == self._order_id and d["event_type"] in [
"CANCEL",
"CANCEL_FAILED",
]
CancelWatcher クラスは、ChildOrderEventWatcher クラスを継承して特定の注文に関連する "CANCEL" または "CANCEL_FAILED" イベントを監視するために初期化されます。理解する上でのポイントは、クラスの継承や初期化の仕組み、そしてイベントのトリガー条件に注目することです。
CancelWatcher クラスの定義
class CancelWatcher(ChildOrderEventWatcher):
def __init__(self, store, order_id):
super(CancelWatcher, self).__init__(store, order_id)
CancelWatcherクラスはChildOrderEventWatcherクラスを継承しています。これは、ChildOrderEventWatcherクラスの機能を利用しつつ、新しいクラスを作成するものです。__init__メソッドはクラスの初期化を行います。親クラスのコンストラクタを呼び出して初期化を行っています。
__init__ メソッド
def __init__(self, store, order_id):
super(CancelWatcher, self).__init__(store, order_id)
__init__メソッドは、クラスの初期化を行います。storeパラメータは外部から提供されるデータストアオブジェクトです。order_idパラメータは特定の注文を識別するための注文IDです。super(CancelWatcher, self).__init__(store, order_id)は親クラス(ChildOrderEventWatcher)のコンストラクタを呼び出して初期化を行います。これにより、CancelWatcherクラスもChildOrderEventWatcherクラスの機能を利用できるようになります。
_is_trigger メソッド
def _is_trigger(self, d):
return d["child_order_acceptance_id"] == self._order_id and d["event_type"] in [
"CANCEL",
"CANCEL_FAILED",
]
is_triggerメソッドは、特定の条件に基づいてイベントのトリガーを判定します。dパラメータは、受け取ったイベントデータです。- メソッドは、イベントデータの
"child_order_acceptance_id"がself._order_idと等しく、かつ"event_type"が指定されたイベントのいずれかである場合にTrueを返します。このクラスでは "CANCEL" または "CANCEL_FAILED" イベントがトリガー条件となっています。
6.注文ヘルパー①「指し値注文を行うための非同期関数」
このコードは、指定された条件で制限付き注文(リミットオーダー)を行うための非同期関数 limit_order を定義しています。
async def limit_order(client, symbol, side, size, price, time_in_force="GTC"):
assert side in ["BUY", "SELL"]
res = await client.post(
"/v1/me/sendchildorder",
data={
"product_code": symbol,
"side": side,
"size": size,
"child_order_type": "LIMIT",
"price": int(price),
"time_in_force": time_in_force,
},
)
data = await res.json()
if res.status != 200:
raise RuntimeError(f"Invalid request: {data}")
else:
return data["child_order_acceptance_id"]
limit_order 関数は、指定された条件で制限付き注文を行い、注文受付IDを返す非同期関数です。理解する上でのポイントは、非同期処理、APIリクエストの作成、レスポンスの処理です。
limit_order 関数の定義
async def limit_order(client, symbol, side, size, price, time_in_force="GTC"):
limit_order関数は非同期関数で、指定された条件で制限付き注文を行います。- パラメータ:
client: 注文を送信するクライアントオブジェクト(例えば、APIクライアントなど)。symbol: トレードする商品のシンボル(例: BTC/USD)。side: 取引の方向("BUY" または "SELL")。size: 注文の数量。price: 注文の価格。time_in_force: 注文の有効期間(デフォルトは "GTC")。
条件のアサーション
assert side in ["BUY", "SELL"]
assert side in ["BUY", "SELL"] は、side が "BUY" または "SELL" のいずれかであることを確認します。それ以外の場合はエラーが発生します。
注文の送信と結果の取得
res = await client.post(
"/v1/me/sendchildorder",
data={
"product_code": symbol,
"side": side,
"size": size,
"child_order_type": "LIMIT",
"price": int(price),
"time_in_force": time_in_force,
},
)
client.postメソッドを使用して、注文を送信し、結果を取得します。- APIのエンドポイント
/v1/me/sendchildorderに対して POST リクエストを行います。 - リクエストのデータは、注文に関する情報(商品コード、取引方向、数量、注文タイプ、価格、有効期間など)です。
結果のJSONデータの取得
data = await res.json()
res.json() メソッドを使用して、APIからのレスポンスを JSON 形式で取得します。
注文の確認と注文受付IDの返却
if res.status != 200:
raise RuntimeError(f"Invalid request: {data}")
else:
return data["child_order_acceptance_id"]
- レスポンスのステータスコードが 200 でない場合、エラーメッセージを含んだ RuntimeError 例外が発生します。
- そうでない場合、注文が正常に受け付けられたことを示すので、注文受付IDを返します。
7.注文ヘルパー②「注文をキャンセルするための非同期関数」
このコードは、指定された条件で注文をキャンセルするための非同期関数 cancel_order を定義しています。
async def cancel_order(client, symbol, order_id):
order_id_key = "child_order_id"
if order_id.startswith("JRF"):
order_id_key = order_id_key.replace("_id", "_acceptance_id")
res = await client.post(
"/v1/me/cancelchildorder", data={"product_code": symbol, order_id_key: order_id}
)
return res.status == 200
cancel_order 関数は、指定された条件で注文をキャンセルし、その結果を True または False で返す非同期関数です。理解する上でのポイントは、非同期処理、APIリクエストの作成、レスポンスの処理などが含まれます。
cancel_order 関数の定義
async def cancel_order(client, symbol, order_id):
cancel_order関数は非同期関数で、指定された条件で注文をキャンセルします。- パラメータ:
client: 注文を送信するクライアントオブジェクト。symbol: トレードする商品のシンボル(例: BTC/USD)。order_id: キャンセル対象の注文ID。
注文IDの調整
order_id_key = "child_order_id"
if order_id.startswith("JRF"):
order_id_key = order_id_key.replace("_id", "_acceptance_id")
order_idが "JRF" で始まる場合、order_id_keyの一部を置換しています。- 例えば、
order_idが "JRF12345" の場合、order_id_keyは "child_order_acceptance_id" に変更されます。これは、APIによって注文IDが "_acceptance_id" で終わる場合があるため、それに対応しています。
キャンセルリクエストの送信と結果の取得
res = await client.post(
"/v1/me/cancelchildorder", data={"product_code": symbol, order_id_key: order_id}
)
client.postメソッドを使用して、キャンセルリクエストを送信し、結果を取得します。- APIのエンドポイント
/v1/me/cancelchildorderに対して POST リクエストを行います。 - リクエストのデータは、注文キャンセルに関する情報(商品コード、注文IDなど)です。
キャンセル結果の判定と返却
return res.status == 200
レスポンスのステータスコードが 200 であれば、注文が正常にキャンセルされたことを示すため、True を返します。それ以外の場合は False を返します。
8.マーケットメイキングのロジック
ここからは、マーケットメイキングのロジックを作り上げている部分の解説です。戦略実装の要となる部分ですので、マーケットメイキング戦略への理解があることが土台です。
market_making 関数の定義
このコードは、マーケットメイキングアルゴリズムを実装する非同期関数 market_making を定義しています。
async def market_making(
client,
store: pybotters.bitFlyerDataStore,
status: Status,
symbol: str,
t: float,
d: int,
s_entry: float,
s_update: float,
size: float,
logger: loguru.Logger,
):
market_making関数は、マーケットメイキングアルゴリズムを実装する非同期関数です。- パラメータ:
client: 注文を送信するクライアントオブジェクト。store: データのストア(保存庫)オブジェクト。status: トレードの状態を管理するオブジェクト。symbol: トレードする商品のシンボル(例: BTC/USD)。t: 注文サイズの累積量の閾値。d: 注文価格を更新するマージン。s_entry: スプレッドの基準となるエントリー価格の閾値。s_update: スプレッドの更新に使用される閾値。size: 注文の数量。logger: ログを出力するための Logger オブジェクト。
_oneside_loop(片側の注文(BUYまたはSELL)からキャンセルして再注文を繰り返す非同期関数 _)
このコードは、片側の注文(BUYまたはSELL)からキャンセルして再注文を繰り返す非同期関数 _oneside_loop を実装しています。
async def _oneside_loop(side: str, size: float, pricer: Callable[[], int]):
"""片サイドの注文→キャンセル→再注文ループ。
:param side: "BUY" or "SELL"
:param size: 注文サイズ
:param pricer: 指値関数
:return:
"""
# エントリー
price = pricer()
order_id = await limit_order(client, symbol, side, size, price)
logger.info(f"[{side} ENTRY] {order_id} / {price} / {size:.5f}")
# 約定監視ループ
execution_watcher = ExecutionWatcher(store.childorderevents, order_id)
while not execution_watcher.done():
# 指値更新間隔
# 1ループでキャンセルと指値更新を最大2回x2(両サイド)行う。API制限が500/5minなので、
# 300 / 3.5 * 4 = 342.85... とちょっと余裕あるくらいに設定しておく。
# (余談)途中でcontinueとかよくするのでsleepはwhileの直下で実行するのが個人的に好き
await asyncio.sleep(3.5)
# spreadが閾値以上なので指値更新
if status.spread > s_update:
new_price = pricer()
if price != new_price:
# 前の注文をキャンセル。childorvereventsをwatchしてCANCEL or
# CANCEL_FAILEDのステータスを確認してから次の注文を入れる
cancel_watcher = CancelWatcher(store.childorderevents, order_id)
is_canceled = await cancel_order(client, symbol, order_id)
await cancel_watcher.wait()
cancel_result = cancel_watcher.result()
if cancel_result["event_type"] == "CANCEL":
# キャンセル成功→再注文
logger.info(f"[{side} CANCELED] {order_id}")
new_order_id = await limit_order(
client, symbol, side, size, new_price
)
# 監視する注文番号・指値を更新
execution_watcher.replace_order_id(new_order_id)
order_id = new_order_id
price = new_price
logger.info(f"[{side} UPDATE] {order_id} / {price}")
elif cancel_result["event_type"] == "CANCEL_FAILED":
# キャンセル失敗→約定しているはずなので次のループでexecution_watcherがdoneになる
logger.info(
f"[{side} CANCEL FAILED] {order_id} (should be executed)"
)
continue
# 約定
loop_result = execution_watcher.result()
logger.info(f"[{side} FINISH] {loop_result}")
return loop_result
理解する上でのポイントは、非同期処理、注文の発行とキャンセル、注文の監視です。
_oneside_loop 関数の定義
async def _oneside_loop(side: str, size: float, pricer: Callable[[], int]):
_oneside_loop関数は、片側の注文からキャンセルして再注文を繰り返す非同期関数です。- パラメータ:
side: 注文の方向("BUY" または "SELL")。size: 注文サイズ。pricer: 指値関数(注文価格を決定するための関数)。
エントリー(新規注文)の処理
price = pricer()
order_id = await limit_order(client, symbol, side, size, price)
logger.info(f"[{side} ENTRY] {order_id} / {price} / {size:.5f}")
pricer関数を使用して指値価格を取得し、それを使って新規注文を行います。limit_order関数は、指定された条件で制限付き注文を行います。- ログに注文ID、注文価格、注文サイズを表示します。
約定監視ループ
execution_watcher = ExecutionWatcher(store.childorderevents, order_id)
while not execution_watcher.done():
ExecutionWatcher オブジェクトを作成し、注文の約定を監視するループを開始します。
指値更新とキャンセルの処理
await asyncio.sleep(3.5)
if status.spread > s_update:
new_price = pricer()
# 以下、指値の更新とキャンセルの処理
- 指定の条件(スプレッドが閾値以上)の場合、指値を新しい価格に更新します。
- 注文のキャンセル処理を行います。キャンセルが成功した場合、再注文を行います。
キャンセルが成功した場合の処理
logger.info(f"[{side} CANCELED] {order_id}")
new_order_id = await limit_order(
client, symbol, side, size, new_price
)
execution_watcher.replace_order_id(new_order_id)
order_id = new_order_id
price = new_price
logger.info(f"[{side} UPDATE] {order_id} / {price}")
- キャンセルが成功した場合、ログにキャンセルの成功と更新後の注文情報を表示します。
- 新しい注文を作成し、
ExecutionWatcherオブジェクトの監視対象の注文IDを更新します。
キャンセルが失敗した場合の処理
elif cancel_result["event_type"] == "CANCEL_FAILED":
logger.info(
f"[{side} CANCEL FAILED] {order_id} (should be executed)"
)
continue
キャンセルが失敗した場合、ログにキャンセルの失敗を表示し、次のループで約定したものとして処理を継続します。
約定が完了した場合の処理
loop_result = execution_watcher.result()
logger.info(f"[{side} FINISH] {loop_result}")
return loop_result
約定が完了した場合、ログに約定の結果を表示し、その結果を返します。
結果をログに表示するプログラム
このコードは、無限ループでスプレッドが特定の閾値を超えた場合に、指定された条件で買い注文と売り注文を同時に実行して、その結果をログに表示するプログラムです。
while True:
sp = status.spread
if sp > s_entry:
# 現在のポジションから端数を取得
buy_remaining_size = status.remaining_size("BUY")
sell_remaining_size = status.remaining_size("SELL")
logger.info(
f"[START]\n"
f"\tspread: {sp}\n"
f"\tsymbol: {symbol}\n"
f"\tt: {t}\n"
f"\td: {d}\n"
f"\ts_entry: {s_entry}\n"
f"\ts_update: {s_update}\n"
f"\tbuy_size: {size + sell_remaining_size}\n"
f"\tsell_size: {size + buy_remaining_size}"
)
buy_result, sell_result = await asyncio.gather(
_oneside_loop(
"BUY",
size + sell_remaining_size,
partial(status.get_limit_price, "bid", t, d),
),
_oneside_loop(
"SELL",
size + buy_remaining_size,
partial(status.get_limit_price, "ask", t, d),
),
)
logger.info(f"[FINISH] {sell_result['price'] - buy_result['price']}")
break
else:
logger.info(
f"[WAITING CHANCE] {status.best_ask} - ({status.spread:.4f}) - {status.best_bid}"
)
await asyncio.sleep(0.1)
メインの無限ループ
while True:
sp = status.spread
if sp > s_entry:
# 中略
else:
# 中略
await asyncio.sleep(0.1)
- 無限ループを開始しています。
- スプレッド(最良の売り注文価格と買い注文価格の差)が指定のエントリー閾値
s_entryを超えた場合とそれ以外の場合に分かれています。 await asyncio.sleep(0.1)によって、次のループまで0.1秒待機します。これにより、無駄な処理負荷を抑えながら定期的にスプレッドを確認します。
スプレッドが閾値を超えた場合の処理
if sp > s_entry:
# 中略
else:
# 中略
スプレッドがエントリー閾値 s_entry を超えた場合の処理が行われます。
ログと注文サイズの計算
buy_remaining_size = status.remaining_size("BUY")
sell_remaining_size = status.remaining_size("SELL")
logger.info(
f"[START]\n"
f"\tspread: {sp}\n"
f"\tsymbol: {symbol}\n"
f"\tt: {t}\n"
f"\td: {d}\n"
f"\ts_entry: {s_entry}\n"
f"\ts_update: {s_update}\n"
f"\tbuy_size: {size + sell_remaining_size}\n"
f"\tsell_size: {size + buy_remaining_size}"
)
- 現在のポジションから端数を取得して、ログに表示するための情報を計算しています。
remaining_size関数は、指定された方向("BUY"または"SELL")のポジションの合計サイズを取得する関数です。
買い注文と売り注文の同時実行
buy_result, sell_result = await asyncio.gather(
_oneside_loop(
"BUY",
size + sell_remaining_size,
partial(status.get_limit_price, "bid", t, d),
),
_oneside_loop(
"SELL",
size + buy_remaining_size,
partial(status.get_limit_price, "ask", t, d),
),
)
asyncio.gatherを使用して、買い注文と売り注文を同時に実行します。_oneside_loop関数は、片側の注文からキャンセルして再注文を繰り返す関数です。
ログに実行結果を表示
logger.info(f"[FINISH] {sell_result['price'] - buy_result['price']}")
break
- 実行結果(買い注文と売り注文の約定価格の差)をログに表示します。
breakによって、無限ループを終了させます。
スプレッドが閾値を超えていない場合のログ
else:
logger.info(
f"[WAITING CHANCE] {status.best_ask} - ({status.spread:.4f}) - {status.best_bid}"
)
await asyncio.sleep(0.1)
- スプレッドがエントリー閾値を超えていない場合、ログに待機メッセージを表示します。
await asyncio.sleep(0.1)によって、次のループまで0.1秒待機します。
9.メインの処理
このコードは、BitflyerのAPIを使用して、マーケットメイキングアルゴリズムを実装したプログラムです。
async def main(args):
# ロガーの設定
logger = loguru.logger
logger.add("log.txt", rotation="10MB", retention=3)
# Bitflyerのクライアントとデータストアを初期化
async with pybotters.Client(
apis=args.api_key_json, base_url="https://api.bitflyer.com"
) as client:
store = pybotters.bitFlyerDataStore()
status = Status(store)
# WebSocket接続とサブスクライブ
wstask = await client.ws_connect(
"wss://ws.lightstream.bitflyer.com/json-rpc",
send_json=[
{
"method": "subscribe",
"params": {"channel": "lightning_board_snapshot_FX_BTC_JPY"},
"id": 1,
},
{
"method": "subscribe",
"params": {"channel": "lightning_board_FX_BTC_JPY"},
"id": 2,
},
{
"method": "subscribe",
"params": {"channel": "child_order_events"},
"id": 3,
},
],
hdlr_json=store.onmessage,
)
# WebSocketの初期化が完了するまで待機
while not all([len(w) for w in [store.board]]):
logger.debug("[WAITING SOCKET RESPONSE]")
await store.wait()
# メインの無限ループ
while True:
await market_making(
client,
store,
status,
args.symbol,
args.t,
args.d,
args.s_entry,
args.s_update,
args.lot,
logger,
)
await asyncio.sleep(args.interval)
main 関数の定義
async def main(args):
- メインのプログラムが実行される関数です。
- ロガーの設定やBitflyerのクライアントの初期化、WebSocketの接続やサブスクライブ、メインの無限ループが含まれています。
await asyncio.sleep(args.interval)によって、指定されたインターバルで無限ループが回ります。
ロギングの設定
logger.add("log.txt", rotation="10MB", retention=3)
log.txtという名前のログファイルを設定しています。rotation="10MB"で10メガバイトごとにログファイルをローテーション(新しいファイルを作成)します。retention=3で古いログファイルを3つまで保持します。
Bitflyerのクライアントとデータストアの初期化
async with pybotters.Client(
apis=args.api_key_json, base_url="https://api.bitflyer.com"
) as client:
store = pybotters.bitFlyerDataStore()
status = Status(store)
- BitflyerのAPIを使用するためのクライアントを初期化します。
- データストア(
pybotters.bitFlyerDataStore())とトレードのステータスを管理するためのオブジェクトを初期化します。
WebSocket接続とサブスクライブ
wstask = await client.ws_connect(
"wss://ws.lightstream.bitflyer.com/json-rpc",
send_json=[
{"method": "subscribe", "params": {"channel": "lightning_board_snapshot_FX_BTC_JPY"}, "id": 1},
{"method": "subscribe", "params": {"channel": "lightning_board_FX_BTC_JPY"}, "id": 2},
{"method": "subscribe", "params": {"channel": "child_order_events"}, "id": 3},
],
hdlr_json=store.onmessage,
)
- BitflyerのWebSocketに接続し、指定のチャンネルにサブスクライブします。
send_jsonでサブスクライブするチャンネルとそのパラメータを指定します。hdlr_json=store.onmessageでWebSocketから受信したデータをデータストアに処理させます。
WebSocketの初期化が完了するまで待機
while not all([len(w) for w in [store.board]]):
logger.debug("[WAITING SOCKET RESPONSE]")
await store.wait()
- WebSocketの初期化が完了するまで待機します。
store.wait()はデータストア内の特定のデータが到着するまで待機する関数です。
メインの無限ループ
while True:
await market_making(
client,
store,
status,
args.symbol,
args.t,
args.d,
args.s_entry,
args.s_update,
args.lot,
logger,
)
await asyncio.sleep(args.interval)
- メインの無限ループです。
market_making関数を呼び出して市場メイキングアルゴリズムを実行します。- 指定されたインターバルだけ待機します。
10.main関数に渡す引数の設定
このコードは、コマンドライン引数を受け取り、指定された条件でマーケットメイキングアルゴリズムを実行するプログラムです。
コマンドラインから実行する際に様々な設定や条件を指定できるようになっており、asyncio モジュールを使用して非同期処理を行っています。
if __name__ == "__main__":
if __name__ == "__main__":
- Python スクリプトが直接実行されるときに、そのスクリプトがエントリーポイントとして使われるブロックです。
- この中に記述されたコードが、他のモジュールからインポートされたときには実行されません。
この部分は特殊な構文なので、具体例を挙げて詳しく解説します。
詳しく解説
if __name__ == "__main__": ブロックは Python スクリプトが直接実行されるときに、そのスクリプトがエントリーポイントとして動作するための特殊な構文です。ここで、その詳細な説明を提供します。
Python スクリプトが他のスクリプトやモジュールからインポートされると、そのスクリプトの中に書かれているコードが即座に実行されます。しかし、実行したいコードが他のファイルからインポートされたときには、この動作は望ましくありません。
例えば、以下のようなスクリプトがあるとします。
# my_module.py
print("Hello from my_module!")
そして、これを別のスクリプトからインポートしようとします。
# main_script.py import my_module
この場合、main_script.py を実行すると、my_module.py の中の print 文も実行されてしまいます。しかし、my_module.py をモジュールとして使うとき、通常はモジュールの中の関数やクラスだけを使いたいと思うでしょう。
if __name__ == "__main__": ブロックは、スクリプトが直接実行された場合のみコードが実行されるようにするための仕組みです。例を用いて説明します。
# my_module.py
def greet():
print("Hello from my_module!")
if __name__ == "__main__":
greet()
この場合、my_module.py を直接実行すると、greet() 関数が呼び出されます。しかし、他のスクリプトから my_module をインポートした場合には greet() は実行されません。なぜなら、__name__ はスクリプトが直接実行された場合には "__main__" となり、それ以外の場合にはモジュール名になるからです。
この仕組みにより、モジュールを作成するときに不要なコードの実行を避けることができ、モジュールとして期待通りに使えるようになります。
コマンドライン引数のパース
parser = ArgumentParser(description="pybotters x asyncio x magito MM")
parser.add_argument("--api_key_json", help="apiキーが入ったJSONファイル", required=True)
parser.add_argument("--symbol", default="FX_BTC_JPY", help="取引通過")
parser.add_argument("--lot", default=0.01, type=float, help="注文サイズ")
parser.add_argument(
"--t",
default=0.01,
type=float,
help="板上での累積注文量に対する閾値(この閾値を超えた時点での注文価格が参照価格となる)",
)
parser.add_argument("--d", default=1, type=int, help="参照価格と指値のマージン(指値=参照価格±d)")
parser.add_argument(
"--s_entry",
default=0.0003,
type=float,
help="エントリー用のスプレッド閾値(スプレッドがこの閾値以上の時にマーケットメイキングを開始する)",
)
parser.add_argument(
"--s_update",
default=0.0001,
type=float,
help="指値更新用のスプレッド閾値(スプレッドがこの閾値以上の時に指値を更新する)",
)
parser.add_argument("--interval", default=5, type=int, help="マーケットメイキングサイクルの間隔")
args = parser.parse_args()
- コマンドライン引数をパースするための
ArgumentParserクラスを使用しています。 --api_key_jsonから--intervalまでの各引数が定義されています。- 引数のデフォルト値や型、ヘルプメッセージが指定されています。
メイン関数の呼び出し
try:
asyncio.run(main(args))
except KeyboardInterrupt as e:
pass
- コマンドライン引数を元にして定義された
main関数を呼び出します。 asyncio.run関数を使用して非同期のメイン関数mainを同期的に呼び出します。- キーボード割り込み (
KeyboardInterrupt) が発生した場合、プログラムの実行を終了します。
例外処理
except KeyboardInterrupt as e:
pass
- キーボード割り込みが発生した場合の例外処理です。
- プログラムの実行を終了します。
まとめ
長大なコードも部分ごとに見ていけば、各部分の機能やそれぞれのつながりが見えてきます。
分かりにくい部分は実際に手を動かして、コードを書いてみることが機能を理解する近道です。
引き続き、Botの作成と実践を積んでいきます。
今後は、イナゴbotの開発も進めていきます。