前回の記事に引き続き、今回も仮想通貨botの開発状況をまとめていきます。
バックテストのコードを修正する
以下のコードは、ビットフライヤーFXでのドンチャンブレイクアウトの検証用コードです。
ChatGPTに指示を出して、コードの修正をしました。
出した指示は「ビットフライヤーのAPI」である「https://api.bitflyer.com/v1/getexecutions?product_code=FX_BTC_JPY&count=500を指定すること」と「文字列形式の日時をパースしてUNIXタイムスタンプに変換すること」です。
import requests
from datetime import datetime
import time
import numpy as np
#-----設定項目
chart_sec = 3600 # 1時間足を使用
term = 30 # 過去n日の設定
wait = 0 # ループの待機時間
lot = 1 # BTCの注文枚数
slippage = 0.001 # 手数料・スリッページ
api_endpoint = "https://api.bitflyer.com/v1/getexecutions?product_code=FX_BTC_JPY&count=500"
# 新しいAPIを使用する関数
def get_price(chart_sec, before=0, after=0):
params = {
"product_code": "FX_BTC_JPY",
"count": 500
}
response = requests.get(api_endpoint, params=params)
data = response.json()
price = []
for item in data:
# 文字列形式の日時をパースしてUNIXタイムスタンプに変換
timestamp = datetime.strptime(item["exec_date"], "%Y-%m-%dT%H:%M:%S.%f").timestamp()
price.append({
"close_time": int(timestamp), # UNIXタイムスタンプを整数に変換
"close_time_dt": datetime.fromtimestamp(timestamp).strftime('%Y/%m/%d %H:%M'),
"open_price": float(item["price"]),
"high_price": float(item["price"]),
"low_price": float(item["price"]),
"close_price": float(item["price"])
})
return price
# 時間と高値・安値をログに記録する関数
def log_price( data,flag ):
log = "時間: " + datetime.fromtimestamp(data["close_time"]).strftime('%Y/%m/%d %H:%M') + " 高値: " + str(data["high_price"]) + " 安値: " + str(data["low_price"]) + "\n"
flag["records"]["log"].append(log)
return flag
# ドンチャンブレイクを判定する関数
def donchian( data,last_data ):
highest = max(i["high_price"] for i in last_data)
if data["high_price"] > highest:
return {"side":"BUY","price":highest}
lowest = min(i["low_price"] for i in last_data)
if data["low_price"] < lowest:
return {"side":"SELL","price":lowest}
return {"side" : None , "price":0}
# ドンチャンブレイクを判定してエントリー注文を出す関数
def entry_signal( data,last_data,flag ):
signal = donchian( data,last_data )
if signal["side"] == "BUY":
flag["records"]["log"].append("過去{0}足の最高値{1}円を、直近の高値が{2}円でブレイクしました\n".format(term,signal["price"],data["high_price"]))
flag["records"]["log"].append(str(data["close_price"]) + "円で買いの指値注文を出します\n")
# ここに買い注文のコードを入れる
flag["order"]["exist"] = True
flag["order"]["side"] = "BUY"
flag["order"]["price"] = round(data["close_price"] * lot)
if signal["side"] == "SELL":
flag["records"]["log"].append("過去{0}足の最安値{1}円を、直近の安値が{2}円でブレイクしました\n".format(term,signal["price"],data["low_price"]))
flag["records"]["log"].append(str(data["close_price"]) + "円で売りの指値注文を出します\n")
# ここに売り注文のコードを入れる
flag["order"]["exist"] = True
flag["order"]["side"] = "SELL"
flag["order"]["price"] = round(data["close_price"] * lot)
return flag
# サーバーに出した注文が約定したか確認する関数
def check_order( flag ):
# 注文状況を確認して通っていたら以下を実行
# 一定時間で注文が通っていなければキャンセルする
flag["order"]["exist"] = False
flag["order"]["count"] = 0
flag["position"]["exist"] = True
flag["position"]["side"] = flag["order"]["side"]
flag["position"]["price"] = flag["order"]["price"]
return flag
# 手仕舞いのシグナルが出たら決済の成行注文 + ドテン注文 を出す関数
def close_position( data,last_data,flag ):
flag["position"]["count"] += 1
signal = donchian( data,last_data )
if flag["position"]["side"] == "BUY":
if signal["side"] == "SELL":
flag["records"]["log"].append("過去{0}足の最安値{1}円を、直近の安値が{2}円でブレイクしました\n".format(term,signal["price"],data["low_price"]))
flag["records"]["log"].append(str(data["close_price"]) + "円あたりで成行注文を出してポジションを決済します\n")
# 決済の成行注文コードを入れる
records( flag,data )
flag["position"]["exist"] = False
flag["position"]["count"] = 0
flag["records"]["log"].append("さらに" + str(data["close_price"]) + "円で売りの指値注文を入れてドテンします\n")
# ここに売り注文のコードを入れる
flag["order"]["exist"] = True
flag["order"]["side"] = "SELL"
flag["order"]["price"] = round(data["close_price"] * lot)
if flag["position"]["side"] == "SELL":
if signal["side"] == "BUY":
flag["records"]["log"].append("過去{0}足の最高値{1}円を、直近の高値が{2}円でブレイクしました\n".format(term,signal["price"],data["high_price"]))
flag["records"]["log"].append(str(data["close_price"]) + "円あたりで成行注文を出してポジションを決済します\n")
# 決済の成行注文コードを入れる
records( flag,data )
flag["position"]["exist"] = False
flag["position"]["count"] = 0
flag["records"]["log"].append("さらに" + str(data["close_price"]) + "円で買いの指値注文を入れてドテンします\n")
# ここに買い注文のコードを入れる
flag["order"]["exist"] = True
flag["order"]["side"] = "BUY"
flag["order"]["price"] = round(data["close_price"] * lot)
return flag
# 各トレードのパフォーマンスを記録する関数
def records(flag,data):
# 取引手数料等の計算
entry_price = flag["position"]["price"]
exit_price = round(data["close_price"] * lot)
trade_cost = round( exit_price * slippage )
log = "スリッページ・手数料として " + str(trade_cost) + "円を考慮します\n"
flag["records"]["log"].append(log)
flag["records"]["slippage"].append(trade_cost)
# 値幅の計算
buy_profit = exit_price - entry_price - trade_cost
sell_profit = entry_price - exit_price - trade_cost
# 利益が出てるかの計算
if flag["position"]["side"] == "BUY":
flag["records"]["buy-count"] += 1
flag["records"]["buy-profit"].append( buy_profit )
flag["records"]["buy-return"].append( round( buy_profit / entry_price * 100, 4 ))
flag["records"]["buy-holding-periods"].append( flag["position"]["count"] )
if buy_profit > 0:
flag["records"]["buy-winning"] += 1
log = str(buy_profit) + "円の利益です\n"
flag["records"]["log"].append(log)
else:
log = str(buy_profit) + "円の損失です\n"
flag["records"]["log"].append(log)
if flag["position"]["side"] == "SELL":
flag["records"]["sell-count"] += 1
flag["records"]["sell-profit"].append( sell_profit )
flag["records"]["sell-return"].append( round( sell_profit / entry_price * 100, 4 ))
flag["records"]["sell-holding-periods"].append( flag["position"]["count"] )
if sell_profit > 0:
flag["records"]["sell-winning"] += 1
log = str(sell_profit) + "円の利益です\n"
flag["records"]["log"].append(log)
else:
log = str(sell_profit) + "円の損失です\n"
flag["records"]["log"].append(log)
return flag
# バックテストの集計用の関数
def backtest(flag):
buy_gross_profit = np.sum(flag["records"]["buy-profit"])
sell_gross_profit = np.sum(flag["records"]["sell-profit"])
print("バックテストの結果")
print("--------------------------")
print("買いエントリの成績")
print("--------------------------")
print("トレード回数 : {}回".format(flag["records"]["buy-count"] ))
print("勝率 : {}%".format(round(flag["records"]["buy-winning"] / flag["records"]["buy-count"] * 100,1)))
print("平均リターン : {}%".format(round(np.average(flag["records"]["buy-return"]),2)))
print("総損益 : {}円".format( np.sum(flag["records"]["buy-profit"]) ))
print("平均保有期間 : {}足分".format( round(np.average(flag["records"]["buy-holding-periods"]),1) ))
print("--------------------------")
print("売りエントリの成績")
print("--------------------------")
print("トレード回数 : {}回".format(flag["records"]["sell-count"] ))
print("勝率 : {}%".format(round(flag["records"]["sell-winning"] / flag["records"]["sell-count"] * 100,1)))
print("平均リターン : {}%".format(round(np.average(flag["records"]["sell-return"]),2)))
print("総損益 : {}円".format( np.sum(flag["records"]["sell-profit"]) ))
print("平均保有期間 : {}足分".format( round(np.average(flag["records"]["sell-holding-periods"]),1) ))
print("--------------------------")
print("総合の成績")
print("--------------------------")
print("総損益 : {}円".format( np.sum(flag["records"]["sell-profit"]) + np.sum(flag["records"]["buy-profit"]) ))
print("手数料合計 : {}円".format( np.sum(flag["records"]["slippage"]) ))
# ログファイルの出力
file = open("./{0}-log.txt".format(datetime.now().strftime("%Y-%m-%d-%H-%M")),'wt',encoding='utf-8')
file.writelines(flag["records"]["log"])
# ここからメイン処理
# 価格チャートを取得
price = get_price(chart_sec, after=0)
flag = {
"order":{
"exist" : False,
"side" : "",
"price" : 0,
"count" : 0
},
"position":{
"exist" : False,
"side" : "",
"price": 0,
"count":0
},
"records":{
"buy-count": 0,
"buy-winning" : 0,
"buy-return":[],
"buy-profit": [],
"buy-holding-periods":[],
"sell-count": 0,
"sell-winning" : 0,
"sell-return":[],
"sell-profit":[],
"sell-holding-periods":[],
"slippage":[],
"log":[]
}
}
last_data = []
i = 0
while i < len(price):
# ドンチャンの判定に使う過去30日分の安値・高値データを準備する
if len(last_data) < term:
last_data.append(price[i])
flag = log_price(price[i],flag)
time.sleep(wait)
i += 1
continue
data = price[i]
flag = log_price(data,flag)
if flag["order"]["exist"]:
flag = check_order( flag )
elif flag["position"]["exist"]:
flag = close_position( data,last_data,flag )
else:
flag = entry_signal( data,last_data,flag )
# 過去データを30個に保つために先頭を削除
del last_data[0]
last_data.append( data )
i += 1
time.sleep(wait)
print("--------------------------")
print("テスト期間:")
print("開始時点 : " + str(price[0]["close_time_dt"]))
print("終了時点 : " + str(price[-1]["close_time_dt"]))
print(str(len(price)) + "件のローソク足データで検証")
print("--------------------------")
backtest(flag)
上記のコードを実行すると、以下の結果が得られました。

このパラメータ設定では、まだ利益をだすことはできません。
しかし、バックテスト用のコードとして稼働することが分かったので、この時点ではOKです。
細々とした部分はまだ修正が必要ですが、ChatGPTでのコード修正は有効です。
取引結果の視覚化
下記のコードを用いて、バックテストの結果を視覚化できるようにしました。
import requests
from datetime import datetime
import time
import numpy as np
import pandas as pd
import matplotlib.pyplot as plt
#-----設定項目
chart_sec = 60 # 1時間足を使用
term = 1 # 過去n日の設定
wait = 0 # ループの待機時間
lot = 0.01 # BTCの注文枚数
slippage = 0.001 # 手数料・スリッページ
api_endpoint = "https://api.bitflyer.com/v1/getexecutions?product_code=FX_BTC_JPY&count=500"
# 新しいAPIを使用する関数
def get_price(chart_sec, before=0, after=0):
params = {
"product_code": "FX_BTC_JPY",
"count": 500
}
response = requests.get(api_endpoint, params=params)
data = response.json()
price = []
for item in data:
# 文字列形式の日時をパースしてUNIXタイムスタンプに変換
timestamp = datetime.strptime(item["exec_date"], "%Y-%m-%dT%H:%M:%S.%f").timestamp()
price.append({
"close_time": int(timestamp), # UNIXタイムスタンプを整数に変換
"close_time_dt": datetime.fromtimestamp(timestamp).strftime('%Y/%m/%d %H:%M'),
"open_price": float(item["price"]),
"high_price": float(item["price"]),
"low_price": float(item["price"]),
"close_price": float(item["price"])
})
return price
# 時間と高値・安値をログに記録する関数
def log_price( data,flag ):
log = "時間: " + datetime.fromtimestamp(data["close_time"]).strftime('%Y/%m/%d %H:%M') + " 高値: " + str(data["high_price"]) + " 安値: " + str(data["low_price"]) + "\n"
flag["records"]["log"].append(log)
return flag
# ドンチャンブレイクを判定する関数
def donchian( data,last_data ):
highest = max(i["high_price"] for i in last_data)
if data["high_price"] > highest:
return {"side":"BUY","price":highest}
lowest = min(i["low_price"] for i in last_data)
if data["low_price"] < lowest:
return {"side":"SELL","price":lowest}
return {"side" : None , "price":0}
# ドンチャンブレイクを判定してエントリー注文を出す関数
def entry_signal( data,last_data,flag ):
signal = donchian( data,last_data )
if signal["side"] == "BUY":
flag["records"]["log"].append("過去{0}足の最高値{1}円を、直近の高値が{2}円でブレイクしました\n".format(term,signal["price"],data["high_price"]))
flag["records"]["log"].append(str(data["close_price"]) + "円で買いの指値注文を出します\n")
# ここに買い注文のコードを入れる
flag["order"]["exist"] = True
flag["order"]["side"] = "BUY"
flag["order"]["price"] = round(data["close_price"] * lot)
if signal["side"] == "SELL":
flag["records"]["log"].append("過去{0}足の最安値{1}円を、直近の安値が{2}円でブレイクしました\n".format(term,signal["price"],data["low_price"]))
flag["records"]["log"].append(str(data["close_price"]) + "円で売りの指値注文を出します\n")
# ここに売り注文のコードを入れる
flag["order"]["exist"] = True
flag["order"]["side"] = "SELL"
flag["order"]["price"] = round(data["close_price"] * lot)
return flag
# サーバーに出した注文が約定したか確認する関数
def check_order( flag ):
# 注文状況を確認して通っていたら以下を実行
# 一定時間で注文が通っていなければキャンセルする
flag["order"]["exist"] = False
flag["order"]["count"] = 0
flag["position"]["exist"] = True
flag["position"]["side"] = flag["order"]["side"]
flag["position"]["price"] = flag["order"]["price"]
return flag
# 手仕舞いのシグナルが出たら決済の成行注文 + ドテン注文 を出す関数
def close_position( data,last_data,flag ):
flag["position"]["count"] += 1
signal = donchian( data,last_data )
if flag["position"]["side"] == "BUY":
if signal["side"] == "SELL":
flag["records"]["log"].append("過去{0}足の最安値{1}円を、直近の安値が{2}円でブレイクしました\n".format(term,signal["price"],data["low_price"]))
flag["records"]["log"].append(str(data["close_price"]) + "円あたりで成行注文を出してポジションを決済します\n")
# 決済の成行注文コードを入れる
records( flag,data )
flag["position"]["exist"] = False
flag["position"]["count"] = 0
flag["records"]["log"].append("さらに" + str(data["close_price"]) + "円で売りの指値注文を入れてドテンします\n")
# ここに売り注文のコードを入れる
flag["order"]["exist"] = True
flag["order"]["side"] = "SELL"
flag["order"]["price"] = round(data["close_price"] * lot)
if flag["position"]["side"] == "SELL":
if signal["side"] == "BUY":
flag["records"]["log"].append("過去{0}足の最高値{1}円を、直近の高値が{2}円でブレイクしました\n".format(term,signal["price"],data["high_price"]))
flag["records"]["log"].append(str(data["close_price"]) + "円あたりで成行注文を出してポジションを決済します\n")
# 決済の成行注文コードを入れる
records( flag,data )
flag["position"]["exist"] = False
flag["position"]["count"] = 0
flag["records"]["log"].append("さらに" + str(data["close_price"]) + "円で買いの指値注文を入れてドテンします\n")
# ここに買い注文のコードを入れる
flag["order"]["exist"] = True
flag["order"]["side"] = "BUY"
flag["order"]["price"] = round(data["close_price"] * lot)
return flag
# 各トレードのパフォーマンスを記録する関数
def records(flag,data):
# 取引手数料等の計算
entry_price = flag["position"]["price"]
exit_price = round(data["close_price"] * lot)
trade_cost = round( exit_price * slippage )
log = "スリッページ・手数料として " + str(trade_cost) + "円を考慮します\n"
flag["records"]["log"].append(log)
flag["records"]["slippage"].append(trade_cost)
# 値幅の計算
buy_profit = exit_price - entry_price - trade_cost
sell_profit = entry_price - exit_price - trade_cost
# 利益が出てるかの計算
if flag["position"]["side"] == "BUY":
flag["records"]["buy-count"] += 1
flag["records"]["buy-profit"].append( buy_profit )
flag["records"]["buy-return"].append( round( buy_profit / entry_price * 100, 4 ))
flag["records"]["buy-holding-periods"].append( flag["position"]["count"] )
if buy_profit > 0:
flag["records"]["buy-winning"] += 1
log = str(buy_profit) + "円の利益です\n"
flag["records"]["log"].append(log)
else:
log = str(buy_profit) + "円の損失です\n"
flag["records"]["log"].append(log)
if flag["position"]["side"] == "SELL":
flag["records"]["sell-count"] += 1
flag["records"]["sell-profit"].append( sell_profit )
flag["records"]["sell-return"].append( round( sell_profit / entry_price * 100, 4 ))
flag["records"]["sell-holding-periods"].append( flag["position"]["count"] )
if sell_profit > 0:
flag["records"]["sell-winning"] += 1
log = str(sell_profit) + "円の利益です\n"
flag["records"]["log"].append(log)
else:
log = str(sell_profit) + "円の損失です\n"
flag["records"]["log"].append(log)
return flag
# バックテストの集計用の関数
def backtest(flag):
buy_gross_profit = np.sum(flag["records"]["buy-profit"])
sell_gross_profit = np.sum(flag["records"]["sell-profit"])
print("バックテストの結果")
print("--------------------------")
print("買いエントリの成績")
print("--------------------------")
print("トレード回数 : {}回".format(flag["records"]["buy-count"] ))
print("勝率 : {}%".format(round(flag["records"]["buy-winning"] / flag["records"]["buy-count"] * 100,1)))
print("平均リターン : {}%".format(round(np.average(flag["records"]["buy-return"]),2)))
print("総損益 : {}円".format( np.sum(flag["records"]["buy-profit"]) ))
print("平均保有期間 : {}足分".format( round(np.average(flag["records"]["buy-holding-periods"]),1) ))
print("--------------------------")
print("売りエントリの成績")
print("--------------------------")
print("トレード回数 : {}回".format(flag["records"]["sell-count"] ))
print("勝率 : {}%".format(round(flag["records"]["sell-winning"] / flag["records"]["sell-count"] * 100,1)))
print("平均リターン : {}%".format(round(np.average(flag["records"]["sell-return"]),2)))
print("総損益 : {}円".format( np.sum(flag["records"]["sell-profit"]) ))
print("平均保有期間 : {}足分".format( round(np.average(flag["records"]["sell-holding-periods"]),1) ))
print("--------------------------")
print("総合の成績")
print("--------------------------")
print("総損益 : {}円".format( np.sum(flag["records"]["sell-profit"]) + np.sum(flag["records"]["buy-profit"]) ))
print("手数料合計 : {}円".format( np.sum(flag["records"]["slippage"]) ))
# ログファイルの出力
file = open("./{0}-log.txt".format(datetime.now().strftime("%Y-%m-%d-%H-%M")),'wt',encoding='utf-8')
file.writelines(flag["records"]["log"])
# ここからメイン処理
# 価格チャートを取得
price = get_price(chart_sec, after=0)
flag = {
"order":{
"exist" : False,
"side" : "",
"price" : 0,
"count" : 0
},
"position":{
"exist" : False,
"side" : "",
"price": 0,
"count":0
},
"records":{
"buy-count": 0,
"buy-winning" : 0,
"buy-return":[],
"buy-profit": [],
"buy-holding-periods":[],
"sell-count": 0,
"sell-winning" : 0,
"sell-return":[],
"sell-profit":[],
"sell-holding-periods":[],
"slippage":[],
"log":[]
}
}
last_data = []
i = 0
while i < len(price):
# ドンチャンの判定に使う過去30日分の安値・高値データを準備する
if len(last_data) < term:
last_data.append(price[i])
flag = log_price(price[i],flag)
time.sleep(wait)
i += 1
continue
data = price[i]
flag = log_price(data,flag)
if flag["order"]["exist"]:
flag = check_order( flag )
elif flag["position"]["exist"]:
flag = close_position( data,last_data,flag )
else:
flag = entry_signal( data,last_data,flag )
# 過去データを30個に保つために先頭を削除
del last_data[0]
last_data.append( data )
i += 1
time.sleep(wait)
print("--------------------------")
print("テスト期間:")
print("開始時点 : " + str(price[0]["close_time_dt"]))
print("終了時点 : " + str(price[-1]["close_time_dt"]))
print(str(len(price)) + "件のローソク足データで検証")
print("--------------------------")
# バックテストの結果をPandas DataFrameに変換
result_data = {
"Date": [item["close_time_dt"] for item in price],
"Profit": [np.sum(flag["records"]["buy-profit"][:i+1]) + np.sum(flag["records"]["sell-profit"][:i+1]) for i in range(len(price))],
}
df = pd.DataFrame(result_data)
df["Date"] = pd.to_datetime(df["Date"])
backtest(flag)
# 折れ線グラフでバックテストの結果を視覚化
plt.figure(figsize=(12, 6))
plt.plot(df["Date"], df["Profit"], label="Profit", color="green")
plt.xlabel("Date")
plt.ylabel("Profit")
plt.title("Backtest Results")
plt.legend()
plt.grid(True)
plt.show()
コードの実行結果は以下の通り。

バックテストの結果をグラフ化することに成功しました。
こちらも修正をかけながら使いやすくしていきます。
まとめ
Pandasを使って、ビットフライヤーのAPIから取得したデータを用いたbotの運用結果をグラフに表示させることができるようになりました。
現在参照しているデータは、ビットフライヤーから取得できる現在のデータです。
今後は、現在貯めているヒストリカルデータに対応するようにコードを書き換えていきます。
そうすれば、もっと多くのデータを使用して戦略検証ができるようになるはずです。