前回の記事に引き続き、今回も仮想通貨botの開発状況をまとめていきます。
今回は「ニューラルネットワークの構築と学習」についてまとめました。
実際に取引所のデータを使って、機械学習モデルのトレーニングを行います。
解決したかったこと
・機械学習モジュールの使用に慣れる
・精度の高いモデルを作る
・モデルの精度を上げるために必要な要素を探る
価格予測を行うプログラム
このコードは、仮想通貨の価格データを取得し、ニューラルネットワークを用いて価格予測を行うプログラムです。以下に各部分の詳細な解説を示します。
概要
このコードは、Bybit取引所から取得したビットコインの過去データを使い、ニューラルネットワークで価格予測を行うプログラムです。エラーハンドリングとロギング機能を備えており、データ取得や予測を自動化するためのリトライ機能も組み込まれています。
import ccxt import pandas as pd from datetime import datetime, timedelta from sklearn.model_selection import train_test_split from sklearn.preprocessing import StandardScaler import tensorflow as tf from tensorflow.keras.models import Sequential from tensorflow.keras.layers import Dense import time import logging # ロギングの設定 logging.basicConfig(level=logging.INFO, format='%(asctime)s - %(levelname)s - %(message)s') logger = logging.getLogger(__name__) # エラーハンドリングとリトライ機能 def retry_on_exception(max_retries=3, delay=5): def decorator(func): def wrapper(*args, **kwargs): for attempt in range(max_retries): try: return func(*args, **kwargs) except Exception as e: logger.error(f"Error occurred: {str(e)}. Attempt {attempt + 1} of {max_retries}") if attempt < max_retries - 1: logger.info(f"Retrying in {delay} seconds...") time.sleep(delay) else: logger.error("Max retries reached. Giving up.") raise return wrapper return decorator # Bybitのインスタンス(APIキーなし) exchange = ccxt.bybit() # データ取得モジュール @retry_on_exception(max_retries=3, delay=5) def fetch_data(): now = datetime.now() since = now - timedelta(days=30) ohlcv = exchange.fetch_ohlcv('BTC/USDT', timeframe='1h', since=int(since.timestamp() * 1000)) df = pd.DataFrame(ohlcv, columns=['timestamp', 'open', 'high', 'low', 'close', 'volume']) df['timestamp'] = pd.to_datetime(df['timestamp'], unit='ms') return optimize_memory_usage(df) # メモリ使用量の最適化 def optimize_memory_usage(df): for col in df.columns: if df[col].dtype == 'float64': df[col] = df[col].astype('float32') return df # モデル構築モジュール def build_model(input_shape): model = Sequential([ Dense(64, activation='relu', input_shape=input_shape), Dense(32, activation='relu'), Dense(1) # 価格予測 ]) model.compile(optimizer='adam', loss='mean_squared_error') return model # メインロジック def main(): try: df = fetch_data() # 特徴量とラベル X = df[['open', 'high', 'low', 'close', 'volume']].values y = df['close'].shift(-1).fillna(df['close']).values # 訓練・テストデータ分割 X_train, X_test, y_train, y_test = train_test_split(X, y, test_size=0.2, shuffle=False) # データの標準化 scaler = StandardScaler() X_train_scaled = scaler.fit_transform(X_train) X_test_scaled = scaler.transform(X_test) # モデルの構築 model = build_model((X_train_scaled.shape[1],)) model.fit(X_train_scaled, y_train, epochs=10, batch_size=32, validation_data=(X_test_scaled, y_test)) # 最新のデータを取得して予測 df = fetch_data() X_live = scaler.transform(df[['open', 'high', 'low', 'close', 'volume']].values[-1].reshape(1, -1)) predicted_price = model.predict(X_live)[0][0] print(f"Predicted Price: {predicted_price}") except Exception as e: logger.error(f"メインロジックでエラーが発生しました: {str(e)}") if __name__ == "__main__": main()
ライブラリのインポート
import ccxt import pandas as pd from datetime import datetime, timedelta from sklearn.model_selection import train_test_split from sklearn.preprocessing import StandardScaler import tensorflow as tf from tensorflow.keras.models import Sequential from tensorflow.keras.layers import Dense import time import logging
ccxt
: 仮想通貨取引所のAPIを扱うためのライブラリ。ここではBybitのデータ取得に使用。pandas
: データ処理に使用するライブラリ。データをDataFrame
形式で扱います。datetime
: 日付と時間を操作するために使用します。scikit-learn
のtrain_test_split
とStandardScaler
: データの分割と標準化に使用します。tensorflow
とkeras
: 機械学習ライブラリで、ニューラルネットワークの構築と学習に使用します。time
: 再試行の遅延処理に使用。logging
: 実行のログを出力するためのライブラリ。エラーや情報を記録します。
ロギングの設定
logging.basicConfig(level=logging.INFO, format='%(asctime)s - %(levelname)s - %(message)s') logger = logging.getLogger(__name__)
ログのフォーマットと出力レベルを設定しています。エラーや情報メッセージが実行時に記録されます。
エラーハンドリングとリトライ機能
def retry_on_exception(max_retries=3, delay=5): def decorator(func): def wrapper(*args, **kwargs): for attempt in range(max_retries): try: return func(*args, **kwargs) except Exception as e: logger.error(f"Error occurred: {str(e)}. Attempt {attempt + 1} of {max_retries}") if attempt < max_retries - 1: logger.info(f"Retrying in {delay} seconds...") time.sleep(delay) else: logger.error("Max retries reached. Giving up.") raise return wrapper return decorator
リトライ機能を提供するデコレーターです。指定された回数(デフォルトでは3回)まで再試行し、失敗した場合には遅延(デフォルトでは5秒)を挟んで再実行します。最終的にすべての試行が失敗した場合、エラーが記録されます。
Bybit取引所のインスタンス
exchange = ccxt.bybit()
Bybit取引所のAPIインスタンスを作成しています。このインスタンスを使用して、仮想通貨のデータを取得します。
データ取得モジュール
@retry_on_exception(max_retries=3, delay=5) def fetch_data(): now = datetime.now() since = now - timedelta(days=30) ohlcv = exchange.fetch_ohlcv('BTC/USDT', timeframe='1h', since=int(since.timestamp() * 1000)) df = pd.DataFrame(ohlcv, columns=['timestamp', 'open', 'high', 'low', 'close', 'volume']) df['timestamp'] = pd.to_datetime(df['timestamp'], unit='ms') return optimize_memory_usage(df)
Bybit取引所からビットコイン/USDTの1時間ごとのOHLCVデータ(Open, High, Low, Close, Volume)を過去30日分取得し、pandas.DataFrame
に変換します。エラーが発生した場合は、リトライ機能が働きます。
メモリ使用量の最適化
def optimize_memory_usage(df): for col in df.columns: if df[col].dtype == 'float64': df[col] = df[col].astype('float32') return df
メモリ効率を向上させるため、データフレーム内のfloat64
型の列をfloat32
に変換します。
モデル構築モジュール
def build_model(input_shape): model = Sequential([ Dense(64, activation='relu', input_shape=input_shape), Dense(32, activation='relu'), Dense(1) # 価格予測 ]) model.compile(optimizer='adam', loss='mean_squared_error') return model
TensorFlow/Kerasを使って、シンプルなニューラルネットワークを構築します。入力層に64ユニット、隠れ層に32ユニット、出力層に1ユニットを持ちます。目的は価格予測で、mean_squared_error
を損失関数として使用します。
メインロジック
def main(): try: df = fetch_data() # 特徴量とラベル X = df[['open', 'high', 'low', 'close', 'volume']].values y = df['close'].shift(-1).fillna(df['close']).values # 訓練・テストデータ分割 X_train, X_test, y_train, y_test = train_test_split(X, y, test_size=0.2, shuffle=False) # データの標準化 scaler = StandardScaler() X_train_scaled = scaler.fit_transform(X_train) X_test_scaled = scaler.transform(X_test) # モデルの構築 model = build_model((X_train_scaled.shape[1],)) model.fit(X_train_scaled, y_train, epochs=10, batch_size=32, validation_data=(X_test_scaled, y_test)) # 最新のデータを取得して予測 df = fetch_data() X_live = scaler.transform(df[['open', 'high', 'low', 'close', 'volume']].values[-1].reshape(1, -1)) predicted_price = model.predict(X_live)[0][0] print(f"Predicted Price: {predicted_price}") except Exception as e: logger.error(f"メインロジックでエラーが発生しました: {str(e)}")
データ取得、前処理、モデルの学習、予測の一連の流れを処理します。
- データの取得: 過去30日分のビットコインデータを取得。
- 特徴量とラベルの作成: 予測モデルの入力(特徴量)は
open
,high
,low
,close
,volume
の各値で、ラベル(予測対象)はclose
(次の時間の価格)です。 - データの分割と標準化: データを訓練データとテストデータに分割し、
StandardScaler
を使ってデータを標準化。 - モデルの訓練: ニューラルネットワークを10エポックで訓練。
- 価格予測: 訓練済みモデルを使用して最新のデータから価格を予測し、結果を表示。
実行部分
if __name__ == "__main__": main()
main
関数を実行します。プログラムを実行すると、自動的にデータを取得し、モデルを訓練し、予測を行います。
改良型のプログラム
Cursorを使ってより高速に稼働するように修正しました。
import ccxt import pandas as pd from datetime import datetime, timedelta from sklearn.model_selection import train_test_split from sklearn.preprocessing import StandardScaler import tensorflow as tf from tensorflow.keras.models import Sequential from tensorflow.keras.layers import Dense, Input import time import logging # ロギングの設定 logging.basicConfig(level=logging.WARNING, format='%(asctime)s - %(levelname)s - %(message)s') logger = logging.getLogger(__name__) # エラーハンドリングとリトライ機能 def retry_on_exception(max_retries=3, delay=5): def decorator(func): def wrapper(*args, **kwargs): for attempt in range(max_retries): try: return func(*args, **kwargs) except Exception as e: logger.error(f"Error occurred: {str(e)}. Attempt {attempt + 1} of {max_retries}") if attempt < max_retries - 1: logger.info(f"Retrying in {delay} seconds...") time.sleep(delay) else: logger.error("Max retries reached. Giving up.") raise return wrapper return decorator # Bybitのインスタンス(APIキーなし) exchange = ccxt.bybit() # データ取得モジュール @retry_on_exception(max_retries=3, delay=5) def fetch_data(): now = datetime.now() since = now - timedelta(days=30) ohlcv = exchange.fetch_ohlcv('BTC/USDT', timeframe='1h', since=int(since.timestamp() * 1000)) df = pd.DataFrame(ohlcv, columns=['timestamp', 'open', 'high', 'low', 'close', 'volume']) df['timestamp'] = pd.to_datetime(df['timestamp'], unit='ms') return optimize_memory_usage(df) # メモリ使用量の最適化 def optimize_memory_usage(df): for col in df.columns: if df[col].dtype == 'float64': df[col] = df[col].astype('float32') return df # モデル構築モジュール def build_model(input_shape): model = Sequential([ Input(shape=input_shape), Dense(32, activation='relu'), # ノード数を減らして軽量化 Dense(16, activation='relu'), Dense(1) # 価格予測 ]) model.compile(optimizer='adam', loss='mean_squared_error') return model # メインロジック def main(): try: df = fetch_data() # 特徴量とラベル X = df[['open', 'high', 'low', 'close', 'volume']].values y = df['close'].shift(-1).fillna(df['close']).values # 訓練・テストデータ分割 X_train, X_test, y_train, y_test = train_test_split(X, y, test_size=0.2, shuffle=False) # データの標準化 scaler = StandardScaler() X_train_scaled = scaler.fit_transform(X_train) X_test_scaled = scaler.transform(X_test) # モデルの構築 model = build_model((X_train_scaled.shape[1],)) model.fit(X_train_scaled, y_train, epochs=5, batch_size=64, validation_data=(X_test_scaled, y_test)) # エポック数とバッチサイズを調整 # 最新のデータを取得して予測 df = fetch_data() X_live = scaler.transform(df[['open', 'high', 'low', 'close', 'volume']].values[-1].reshape(1, -1)) predicted_price = model.predict(X_live)[0][0] print(f"Predicted Price: {predicted_price}") except Exception as e: logger.error(f"メインロジックでエラーが発生しました: {str(e)}") if __name__ == "__main__": main()
改良のポイント
1. データ取得の効率化:データ取得の際に必要なデータだけを取得するようにし、不要なデータの取得を避けます。
2.モデルの訓練の最適化:エポック数を減らすか、バッチサイズを調整して訓練時間を短縮します。モデルの複雑さを減らす(例:レイヤー数やノード数を減らす)ことで、訓練時間を短縮します。
3. データの前処理の効率化:データの標準化やスケーリングを効率的に行うために、必要な部分だけを処理します。
4.GPUの活用:TensorFlowがGPUを使用できる環境であれば、GPUを活用することで訓練を高速化できます。
5. ログレベルの調整:ログレベルをINFOからWARNINGに変更することで、ログ出力を減らし、パフォーマンスを向上させます。
変更点
- モデルの軽量化: レイヤーのノード数を減らして、モデルの訓練を高速化。
- エポック数とバッチサイズの調整: エポック数を減らし、バッチサイズを増やして訓練を高速化。
- ログレベルの調整: ログレベルをWARNINGに設定して、ログ出力を減らし、パフォーマンスを向上。
まとめ
今回は「ニューラルネットワークの構築と学習」についてまとめました。
モデルの高速化についても様々なアプローチがあることが分かりました。
今後もこの調子で開発の経過を発信していきます。