前回の記事に引き続き、今回も仮想通貨botの開発状況をまとめていきます。
今回は「asyncioによる高速化」のコードを書いてみました。
async/await練習中。板情報を元にリアルタイムのスプレッドと出来高を算出。もっと高速で回せるように改良して、速度で勝てるようにする。 pic.twitter.com/K8UUjB5Jex
— よだか(夜鷹/yodaka) (@yodakablog) April 30, 2024
 
		高頻度bot開発のトピックとして、記録しておきます。
実際のコード
 
		以下のコードは、bitFlyerFXのBTC_JPYの板情報を獲得してスプレッドと出来高を出力する処理に非同期処理を組み込んだものです。
import aiohttp
import asyncio
async def fetch_order_book(product_code='FX_BTC_JPY'):
    url = 'https://api.bitflyer.com/v1/getboard'
    params = {'product_code': product_code}
    async with aiohttp.ClientSession() as session:
        async with session.get(url, params=params) as response:
            if response.status == 200:
                return await response.json()
            else:
                raise Exception(f"API request failed with status {response.status}")
def calculate_spread(order_book):
    best_ask = order_book['asks'][0]['price']
    best_bid = order_book['bids'][0]['price']
    return best_ask - best_bid
def calculate_total_volume(order_book):
    total_ask_volume = sum(ask['size'] for ask in order_book['asks'])
    total_bid_volume = sum(bid['size'] for bid in order_book['bids'])
    return total_ask_volume, total_bid_volume
async def main():
    try:
        while True:  # 無限ループで繰り返し処理を行う
            # 板情報を非同期で取得
            order_book = await fetch_order_book()
            # スプレッドを計算
            spread = calculate_spread(order_book)
            print(f"Current spread: {spread} JPY")
            # 合計出来高を計算
            total_ask_volume, total_bid_volume = calculate_total_volume(order_book)
            print(f"Total ask volume: {total_ask_volume} BTC")
            print(f"Total bid volume: {total_bid_volume} BTC")
            await asyncio.sleep(1)  # 1秒待機
    except Exception as e:
        print(f"Error occurred: {e}")
# この部分を変更
if __name__ == '__main__':
    loop = asyncio.get_event_loop()
    if not loop.is_running():
        loop.run_until_complete(main())
    else:
        asyncio.create_task(main())
 
		Spyderエディタで実行した結果は、以下の通り。

取り敢えず問題なく動いている。コードを長くしていった時にもこれぐらい早く動いてくれれば良いのですが,,, pic.twitter.com/Tpff4phyCM
— よだか(夜鷹/yodaka) (@yodakablog) April 30, 2024
イベントループ対策
 
		Spyderなどのエディタでは、内部でイベントループが起動されているため、asyncio.run(main())だけでは、コードを実行できないことがあります。
そこで、既存のイベントループを取得し、そのイベントループを使用して main() をスケジュールすることで、コードを実行することができるようになります。
# 既存のイベントループを取得
loop = asyncio.get_event_loop()
# main() がまだスケジュールされていない場合は、ここでスケジュールする
if not loop.is_running():
    loop.run_until_complete(main())
else:
    # 既に実行中のイベントループがある場合は、そのループにタスクをスケジュールする
    asyncio.ensure_future(main())
一部の開発環境でイベントループが実行されている場合は、こんな感じで対策できるのか。 pic.twitter.com/EbiCM6ZALH
— よだか(夜鷹/yodaka) (@yodakablog) May 6, 2024
おまけ:約定履歴の取得と記録を行うコード
 
		約定履歴を取得して、それを記録するコードも載せておきます。こちらは、エラーが出ると止まるので改良が必要です。
import requests
import time
import json
def fetch_execution_history(product_code='FX_BTC_JPY', count=500, before=0):
    url = 'https://api.bitflyer.com/v1/getexecutions'
    params = {
        'product_code': product_code,
        'count': count,
        'before': before
    }
    response = requests.get(url, params=params)
    if response.status_code == 200:
        return response.json()
    else:
        raise Exception(f"API request failed with status code {response.status_code}")
def save_executions_to_file(data, filename='executions.json'):
    try:
        with open(filename, 'r+') as file:
            # ファイルから既存のデータを読み込む
            try:
                existing_data = json.load(file)
            except json.JSONDecodeError:
                existing_data = []
            # 新しいデータを追加(重複チェック)
            new_data = [exec for exec in data if exec not in existing_data]
            existing_data.extend(new_data)
            # ファイルに書き戻す
            file.seek(0)
            json.dump(existing_data, file, indent=4)
    except FileNotFoundError:
        # ファイルが存在しない場合は新規作成
        with open(filename, 'w') as file:
            json.dump(data, file, indent=4)
def main():
    try:
        # 最初のデータ取得
        data = fetch_execution_history()
        save_executions_to_file(data)
        last_id = data[-1]['id']  # 取得したデータの中で最も古いIDを記録
        # 過去のデータを更に取得する例(必要に応じて繰り返し処理を行う)
        while True:
            more_data = fetch_execution_history(before=last_id)
            if not more_data or more_data[-1]['id'] == last_id:
                break  # 新たに取得したデータがないか前回の最後と同じなら終了
            save_executions_to_file(more_data)
            last_id = more_data[-1]['id']
            time.sleep(1)  # APIのレート制限に注意
    except Exception as e:
        print(f"Error occurred: {e}")
if __name__ == '__main__':
    main()
まとめ
今回は「リアルタイムの板情報」に注目して、それを加工するコードを書いてみました。
高頻度Botのロジックとして、如何に早くデータ処理を済ませるかやどれだけ無駄な挙動を減らすことができるかが重要になってくるので、基本的なことを抑えつつ必要な技術を身につけていきます。
 
		今後もこの調子で開発を進めていきます。
宿題
- イベントループについて学ぶ
- コルーチンについて学ぶ
- 個別のask/bid/sizeを獲得して、それらに対して様々な加工や処理をするコードを書く
 →高頻度Bot開発のロジックを参考にする
- そのコードをasyncioを使って書き換える
- 
																																										  
- 
																	仮想通貨botの開発記録#73(2024/4/29)「高頻度取引のロジック案まとめ【基本30種】&【応用15種】」続きを見る 
- 
																																										  
- 
																	仮想通貨botの開発記録#71(2024/4/19)「書籍:アルゴリズム取引まとめ」【高頻度bot開発のTips】続きを見る 
- 
																																										  
- 
																	仮想通貨botの開発記録#70(2024/4/14)「高頻度Bot開発と機械学習に役立つ書籍」続きを見る 
