Bot

仮想通貨botの開発記録#74(2024/5/4)「高頻度bot開発:高速化①」

2024年5月4日

前回の記事に引き続き、今回も仮想通貨botの開発状況をまとめていきます。

今回は「asyncioによる高速化」のコードを書いてみました。

Yodaka

高頻度bot開発のトピックとして、記録しておきます。

実際のコード

Yodaka

以下のコードは、bitFlyerFXのBTC_JPYの板情報を獲得してスプレッドと出来高を出力する処理に非同期処理を組み込んだものです。

import aiohttp
import asyncio

async def fetch_order_book(product_code='FX_BTC_JPY'):
    url = 'https://api.bitflyer.com/v1/getboard'
    params = {'product_code': product_code}
    async with aiohttp.ClientSession() as session:
        async with session.get(url, params=params) as response:
            if response.status == 200:
                return await response.json()
            else:
                raise Exception(f"API request failed with status {response.status}")

def calculate_spread(order_book):
    best_ask = order_book['asks'][0]['price']
    best_bid = order_book['bids'][0]['price']
    return best_ask - best_bid

def calculate_total_volume(order_book):
    total_ask_volume = sum(ask['size'] for ask in order_book['asks'])
    total_bid_volume = sum(bid['size'] for bid in order_book['bids'])
    return total_ask_volume, total_bid_volume

async def main():
    try:
        while True:  # 無限ループで繰り返し処理を行う
            # 板情報を非同期で取得
            order_book = await fetch_order_book()

            # スプレッドを計算
            spread = calculate_spread(order_book)
            print(f"Current spread: {spread} JPY")

            # 合計出来高を計算
            total_ask_volume, total_bid_volume = calculate_total_volume(order_book)
            print(f"Total ask volume: {total_ask_volume} BTC")
            print(f"Total bid volume: {total_bid_volume} BTC")

            await asyncio.sleep(1)  # 1秒待機
    except Exception as e:
        print(f"Error occurred: {e}")

# この部分を変更
if __name__ == '__main__':
    loop = asyncio.get_event_loop()
    if not loop.is_running():
        loop.run_until_complete(main())
    else:
        asyncio.create_task(main())
Yodaka

Spyderエディタで実行した結果は、以下の通り。

イベントループ対策

Yodaka

Spyderなどのエディタでは、内部でイベントループが起動されているため、asyncio.run(main())だけでは、コードを実行できないことがあります。

そこで、既存のイベントループを取得し、そのイベントループを使用して main() をスケジュールすることで、コードを実行することができるようになります。

# 既存のイベントループを取得
loop = asyncio.get_event_loop()

# main() がまだスケジュールされていない場合は、ここでスケジュールする
if not loop.is_running():
    loop.run_until_complete(main())
else:
    # 既に実行中のイベントループがある場合は、そのループにタスクをスケジュールする
    asyncio.ensure_future(main())

おまけ:約定履歴の取得と記録を行うコード

Yodaka

約定履歴を取得して、それを記録するコードも載せておきます。こちらは、エラーが出ると止まるので改良が必要です。

import requests
import time
import json

def fetch_execution_history(product_code='FX_BTC_JPY', count=500, before=0):
    url = 'https://api.bitflyer.com/v1/getexecutions'
    params = {
        'product_code': product_code,
        'count': count,
        'before': before
    }
    response = requests.get(url, params=params)
    if response.status_code == 200:
        return response.json()
    else:
        raise Exception(f"API request failed with status code {response.status_code}")

def save_executions_to_file(data, filename='executions.json'):
    try:
        with open(filename, 'r+') as file:
            # ファイルから既存のデータを読み込む
            try:
                existing_data = json.load(file)
            except json.JSONDecodeError:
                existing_data = []

            # 新しいデータを追加(重複チェック)
            new_data = [exec for exec in data if exec not in existing_data]
            existing_data.extend(new_data)

            # ファイルに書き戻す
            file.seek(0)
            json.dump(existing_data, file, indent=4)
    except FileNotFoundError:
        # ファイルが存在しない場合は新規作成
        with open(filename, 'w') as file:
            json.dump(data, file, indent=4)

def main():
    try:
        # 最初のデータ取得
        data = fetch_execution_history()
        save_executions_to_file(data)

        last_id = data[-1]['id']  # 取得したデータの中で最も古いIDを記録

        # 過去のデータを更に取得する例(必要に応じて繰り返し処理を行う)
        while True:
            more_data = fetch_execution_history(before=last_id)
            if not more_data or more_data[-1]['id'] == last_id:
                break  # 新たに取得したデータがないか前回の最後と同じなら終了
            save_executions_to_file(more_data)
            last_id = more_data[-1]['id']
            time.sleep(1)  # APIのレート制限に注意

    except Exception as e:
        print(f"Error occurred: {e}")

if __name__ == '__main__':
    main()

まとめ

今回は「リアルタイムの板情報」に注目して、それを加工するコードを書いてみました。

高頻度Botのロジックとして、如何に早くデータ処理を済ませるかどれだけ無駄な挙動を減らすことができるかが重要になってくるので、基本的なことを抑えつつ必要な技術を身につけていきます。

Yodaka

今後もこの調子で開発を進めていきます。

宿題

関連
仮想通貨botの開発記録#73(2024/4/29)「高頻度取引のロジック案まとめ」

続きを見る

関連
仮想通貨botの開発記録#71(2024/4/19)「書籍:アルゴリズム取引まとめ」【高頻度bot開発のTips】

続きを見る

関連
仮想通貨botの開発記録#70(2024/4/14)「高頻度Bot開発と機械学習に役立つ書籍」

続きを見る

-Bot