Bot プログラミングスキル

仮想通貨botの開発記録#76(2024/5/19)「約定履歴の蓄積(サンプルコードあり)」

2024年5月19日

前回の記事に引き続き、今回も仮想通貨botの開発状況をまとめていきます。

今回は、「約定履歴を記録・蓄積するコード」を書きました。

約定履歴のデータはロジックのバックテストに使ったり、データから市場の動向を分析したりするために役立ちます。

約定履歴は「取引所における事実」なので、データ分析の土台としても非常に有用性が高いです。

約定履歴があれば、それを使って出来高やローソク足の生成なども可能です。

自前のデータを蓄積することで独自のロジックの検証検証に活かすことができるので、以下のコードを参考にしてみて下さい。

サンプルコード

Yodaka

以下のコードは、bitFlyerFXのBCT/JPYの約定履歴を蓄積するためのコードです。

import requests
import time
import json

def fetch_execution_history(product_code='FX_BTC_JPY', count=500, before=0):
    url = 'https://api.bitflyer.com/v1/getexecutions'
    params = {
        'product_code': product_code,
        'count': count,
        'before': before
    }
    response = requests.get(url, params=params)
    if response.status_code == 200:
        return response.json()
    else:
        raise Exception(f"API request failed with status code {response.status_code} and response {response.text}")

def save_executions_to_file(data, filename='executions.json'):
    try:
        with open(filename, 'r+') as file:
            # ファイルから既存のデータを読み込む
            try:
                existing_data = json.load(file)
            except json.JSONDecodeError as json_err:
                existing_data = []
                print(f"JSON decode error: {json_err}")

            # 新しいデータを追加(重複チェック)
            new_data = [exec for exec in data if exec not in existing_data]
            if new_data:
                existing_data.extend(new_data)
                # ファイルに書き戻す
                file.seek(0)
                json.dump(existing_data, file, indent=4)
                file.truncate()
                print(f"Successfully wrote to {filename} at {time.strftime('%Y-%m-%d %H:%M:%S')}")
            else:
                print(f"No new data to write at {time.strftime('%Y-%m-%d %H:%M:%S')}")
    except FileNotFoundError as fnf_err:
        # ファイルが存在しない場合は新規作成
        print(f"File not found: {fnf_err}. Creating a new file.")
        with open(filename, 'w') as file:
            json.dump(data, file, indent=4)
            print(f"Successfully wrote to {filename} at {time.strftime('%Y-%m-%d %H:%M:%S')}")
    except IOError as io_err:
        print(f"IO error: {io_err}")
    except Exception as e:
        print(f"An unexpected error occurred while saving data to file: {e}")

def main():
    try:
        # 最初のデータ取得
        data = fetch_execution_history()
        save_executions_to_file(data)

        last_id = data[-1]['id']  # 取得したデータの中で最も古いIDを記録

        # 過去のデータを更に取得する例(必要に応じて繰り返し処理を行う)
        while True:
            more_data = fetch_execution_history(before=last_id)
            if not more_data or more_data[-1]['id'] == last_id:
                break  # 新たに取得したデータがないか前回の最後と同じなら終了
            save_executions_to_file(more_data)
            last_id = more_data[-1]['id']
            time.sleep(1)  # APIのレート制限に注意

    except requests.RequestException as req_err:
        print(f"Request error: {req_err}")
    except Exception as e:
        print(f"An unexpected error occurred: {e}")

if __name__ == '__main__':
    main()

実行結果は以下の通り。

まとめ

今回は「bitFlyerFXのBCT/JPYの約定履歴を記録・蓄積する」コードを書きました。

他の取引所のデータ取得にも応用できるコードなので、参考にしてみて下さい。

Yodaka

この調子で開発のペースを上げていきます。

-Bot, プログラミングスキル