Bot 機械学習・データサイエンス

仮想通貨botの開発記録#110(2024/10/3)「MLbot開発③ニューラルネットワークの構築と学習」

2024年10月4日

前回の記事に引き続き、今回も仮想通貨botの開発状況をまとめていきます。

今回は「ニューラルネットワークの構築と学習」についてまとめました。

Yodaka

実際に取引所のデータを使って、機械学習モデルのトレーニングを行います。

解決したかったこと

・機械学習モジュールの使用に慣れる

・精度の高いモデルを作る

・モデルの精度を上げるために必要な要素を探る

価格予測を行うプログラム

Yodaka

このコードは、仮想通貨の価格データを取得し、ニューラルネットワークを用いて価格予測を行うプログラムです。以下に各部分の詳細な解説を示します。

概要

このコードは、Bybit取引所から取得したビットコインの過去データを使い、ニューラルネットワークで価格予測を行うプログラムです。エラーハンドリングとロギング機能を備えており、データ取得や予測を自動化するためのリトライ機能も組み込まれています。

import ccxt
import pandas as pd
from datetime import datetime, timedelta
from sklearn.model_selection import train_test_split
from sklearn.preprocessing import StandardScaler
import tensorflow as tf
from tensorflow.keras.models import Sequential
from tensorflow.keras.layers import Dense
import time
import logging

# ロギングの設定
logging.basicConfig(level=logging.INFO, format='%(asctime)s - %(levelname)s - %(message)s')
logger = logging.getLogger(__name__)

# エラーハンドリングとリトライ機能
def retry_on_exception(max_retries=3, delay=5):
    def decorator(func):
        def wrapper(*args, **kwargs):
            for attempt in range(max_retries):
                try:
                    return func(*args, **kwargs)
                except Exception as e:
                    logger.error(f"Error occurred: {str(e)}. Attempt {attempt + 1} of {max_retries}")
                    if attempt < max_retries - 1:
                        logger.info(f"Retrying in {delay} seconds...")
                        time.sleep(delay)
                    else:
                        logger.error("Max retries reached. Giving up.")
                        raise
        return wrapper
    return decorator

# Bybitのインスタンス(APIキーなし)
exchange = ccxt.bybit()

# データ取得モジュール
@retry_on_exception(max_retries=3, delay=5)
def fetch_data():
    now = datetime.now()
    since = now - timedelta(days=30)
    ohlcv = exchange.fetch_ohlcv('BTC/USDT', timeframe='1h', since=int(since.timestamp() * 1000))
    df = pd.DataFrame(ohlcv, columns=['timestamp', 'open', 'high', 'low', 'close', 'volume'])
    df['timestamp'] = pd.to_datetime(df['timestamp'], unit='ms')
    return optimize_memory_usage(df)

# メモリ使用量の最適化
def optimize_memory_usage(df):
    for col in df.columns:
        if df[col].dtype == 'float64':
            df[col] = df[col].astype('float32')
    return df

# モデル構築モジュール
def build_model(input_shape):
    model = Sequential([
        Dense(64, activation='relu', input_shape=input_shape),
        Dense(32, activation='relu'),
        Dense(1)  # 価格予測
    ])
    model.compile(optimizer='adam', loss='mean_squared_error')
    return model

# メインロジック
def main():
    try:
        df = fetch_data()
        
        # 特徴量とラベル
        X = df[['open', 'high', 'low', 'close', 'volume']].values
        y = df['close'].shift(-1).fillna(df['close']).values
        
        # 訓練・テストデータ分割
        X_train, X_test, y_train, y_test = train_test_split(X, y, test_size=0.2, shuffle=False)
        
        # データの標準化
        scaler = StandardScaler()
        X_train_scaled = scaler.fit_transform(X_train)
        X_test_scaled = scaler.transform(X_test)
        
        # モデルの構築
        model = build_model((X_train_scaled.shape[1],))
        model.fit(X_train_scaled, y_train, epochs=10, batch_size=32, validation_data=(X_test_scaled, y_test))
        
        # 最新のデータを取得して予測
        df = fetch_data()
        X_live = scaler.transform(df[['open', 'high', 'low', 'close', 'volume']].values[-1].reshape(1, -1))
        predicted_price = model.predict(X_live)[0][0]
        
        print(f"Predicted Price: {predicted_price}")

    except Exception as e:
        logger.error(f"メインロジックでエラーが発生しました: {str(e)}")

if __name__ == "__main__":
    main()

ライブラリのインポート

import ccxt
import pandas as pd
from datetime import datetime, timedelta
from sklearn.model_selection import train_test_split
from sklearn.preprocessing import StandardScaler
import tensorflow as tf
from tensorflow.keras.models import Sequential
from tensorflow.keras.layers import Dense
import time
import logging

ccxt: 仮想通貨取引所のAPIを扱うためのライブラリ。ここではBybitのデータ取得に使用。
pandas: データ処理に使用するライブラリ。データをDataFrame形式で扱います。
datetime: 日付と時間を操作するために使用します。
scikit-learntrain_test_splitStandardScaler: データの分割と標準化に使用します。
tensorflowkeras: 機械学習ライブラリで、ニューラルネットワークの構築と学習に使用します。
time: 再試行の遅延処理に使用。
logging: 実行のログを出力するためのライブラリ。エラーや情報を記録します。

ロギングの設定

logging.basicConfig(level=logging.INFO, format='%(asctime)s - %(levelname)s - %(message)s')
logger = logging.getLogger(__name__)

ログのフォーマットと出力レベルを設定しています。エラーや情報メッセージが実行時に記録されます。

エラーハンドリングとリトライ機能

def retry_on_exception(max_retries=3, delay=5):
    def decorator(func):
        def wrapper(*args, **kwargs):
            for attempt in range(max_retries):
                try:
                    return func(*args, **kwargs)
                except Exception as e:
                    logger.error(f"Error occurred: {str(e)}. Attempt {attempt + 1} of {max_retries}")
                    if attempt < max_retries - 1:
                        logger.info(f"Retrying in {delay} seconds...")
                        time.sleep(delay)
                    else:
                        logger.error("Max retries reached. Giving up.")
                        raise
        return wrapper
    return decorator

リトライ機能を提供するデコレーターです。指定された回数(デフォルトでは3回)まで再試行し、失敗した場合には遅延(デフォルトでは5秒)を挟んで再実行します。最終的にすべての試行が失敗した場合、エラーが記録されます。

Bybit取引所のインスタンス

exchange = ccxt.bybit()

Bybit取引所のAPIインスタンスを作成しています。このインスタンスを使用して、仮想通貨のデータを取得します。

データ取得モジュール

@retry_on_exception(max_retries=3, delay=5)
def fetch_data():
    now = datetime.now()
    since = now - timedelta(days=30)
    ohlcv = exchange.fetch_ohlcv('BTC/USDT', timeframe='1h', since=int(since.timestamp() * 1000))
    df = pd.DataFrame(ohlcv, columns=['timestamp', 'open', 'high', 'low', 'close', 'volume'])
    df['timestamp'] = pd.to_datetime(df['timestamp'], unit='ms')
    return optimize_memory_usage(df)

Bybit取引所からビットコイン/USDTの1時間ごとのOHLCVデータ(Open, High, Low, Close, Volume)を過去30日分取得し、pandas.DataFrameに変換します。エラーが発生した場合は、リトライ機能が働きます。

メモリ使用量の最適化

def optimize_memory_usage(df):
    for col in df.columns:
        if df[col].dtype == 'float64':
            df[col] = df[col].astype('float32')
    return df

メモリ効率を向上させるため、データフレーム内のfloat64型の列をfloat32に変換します。

モデル構築モジュール

def build_model(input_shape):
    model = Sequential([
        Dense(64, activation='relu', input_shape=input_shape),
        Dense(32, activation='relu'),
        Dense(1)  # 価格予測
    ])
    model.compile(optimizer='adam', loss='mean_squared_error')
    return model

TensorFlow/Kerasを使って、シンプルなニューラルネットワークを構築します。入力層に64ユニット、隠れ層に32ユニット、出力層に1ユニットを持ちます。目的は価格予測で、mean_squared_errorを損失関数として使用します。

メインロジック

def main():
    try:
        df = fetch_data()
        
        # 特徴量とラベル
        X = df[['open', 'high', 'low', 'close', 'volume']].values
        y = df['close'].shift(-1).fillna(df['close']).values
        
        # 訓練・テストデータ分割
        X_train, X_test, y_train, y_test = train_test_split(X, y, test_size=0.2, shuffle=False)
        
        # データの標準化
        scaler = StandardScaler()
        X_train_scaled = scaler.fit_transform(X_train)
        X_test_scaled = scaler.transform(X_test)
        
        # モデルの構築
        model = build_model((X_train_scaled.shape[1],))
        model.fit(X_train_scaled, y_train, epochs=10, batch_size=32, validation_data=(X_test_scaled, y_test))
        
        # 最新のデータを取得して予測
        df = fetch_data()
        X_live = scaler.transform(df[['open', 'high', 'low', 'close', 'volume']].values[-1].reshape(1, -1))
        predicted_price = model.predict(X_live)[0][0]
        
        print(f"Predicted Price: {predicted_price}")

    except Exception as e:
        logger.error(f"メインロジックでエラーが発生しました: {str(e)}")

データ取得、前処理、モデルの学習、予測の一連の流れを処理します。

  1. データの取得: 過去30日分のビットコインデータを取得。
  2. 特徴量とラベルの作成: 予測モデルの入力(特徴量)はopen, high, low, close, volumeの各値で、ラベル(予測対象)はclose(次の時間の価格)です。
  3. データの分割と標準化: データを訓練データとテストデータに分割し、StandardScalerを使ってデータを標準化。
  4. モデルの訓練: ニューラルネットワークを10エポックで訓練。
  5. 価格予測: 訓練済みモデルを使用して最新のデータから価格を予測し、結果を表示。

実行部分

if __name__ == "__main__":
    main()

main関数を実行します。プログラムを実行すると、自動的にデータを取得し、モデルを訓練し、予測を行います。

改良型のプログラム

Yodaka

Cursorを使ってより高速に稼働するように修正しました。

import ccxt
import pandas as pd
from datetime import datetime, timedelta
from sklearn.model_selection import train_test_split
from sklearn.preprocessing import StandardScaler
import tensorflow as tf
from tensorflow.keras.models import Sequential
from tensorflow.keras.layers import Dense, Input
import time
import logging

# ロギングの設定
logging.basicConfig(level=logging.WARNING, format='%(asctime)s - %(levelname)s - %(message)s')
logger = logging.getLogger(__name__)

# エラーハンドリングとリトライ機能
def retry_on_exception(max_retries=3, delay=5):
    def decorator(func):
        def wrapper(*args, **kwargs):
            for attempt in range(max_retries):
                try:
                    return func(*args, **kwargs)
                except Exception as e:
                    logger.error(f"Error occurred: {str(e)}. Attempt {attempt + 1} of {max_retries}")
                    if attempt < max_retries - 1:
                        logger.info(f"Retrying in {delay} seconds...")
                        time.sleep(delay)
                    else:
                        logger.error("Max retries reached. Giving up.")
                        raise
        return wrapper
    return decorator

# Bybitのインスタンス(APIキーなし)
exchange = ccxt.bybit()

# データ取得モジュール
@retry_on_exception(max_retries=3, delay=5)
def fetch_data():
    now = datetime.now()
    since = now - timedelta(days=30)
    ohlcv = exchange.fetch_ohlcv('BTC/USDT', timeframe='1h', since=int(since.timestamp() * 1000))
    df = pd.DataFrame(ohlcv, columns=['timestamp', 'open', 'high', 'low', 'close', 'volume'])
    df['timestamp'] = pd.to_datetime(df['timestamp'], unit='ms')
    return optimize_memory_usage(df)

# メモリ使用量の最適化
def optimize_memory_usage(df):
    for col in df.columns:
        if df[col].dtype == 'float64':
            df[col] = df[col].astype('float32')
    return df

# モデル構築モジュール
def build_model(input_shape):
    model = Sequential([
        Input(shape=input_shape),
        Dense(32, activation='relu'),  # ノード数を減らして軽量化
        Dense(16, activation='relu'),
        Dense(1)  # 価格予測
    ])
    model.compile(optimizer='adam', loss='mean_squared_error')
    return model

# メインロジック
def main():
    try:
        df = fetch_data()
        
        # 特徴量とラベル
        X = df[['open', 'high', 'low', 'close', 'volume']].values
        y = df['close'].shift(-1).fillna(df['close']).values
        
        # 訓練・テストデータ分割
        X_train, X_test, y_train, y_test = train_test_split(X, y, test_size=0.2, shuffle=False)
        
        # データの標準化
        scaler = StandardScaler()
        X_train_scaled = scaler.fit_transform(X_train)
        X_test_scaled = scaler.transform(X_test)
        
        # モデルの構築
        model = build_model((X_train_scaled.shape[1],))
        model.fit(X_train_scaled, y_train, epochs=5, batch_size=64, validation_data=(X_test_scaled, y_test))  # エポック数とバッチサイズを調整
        
        # 最新のデータを取得して予測
        df = fetch_data()
        X_live = scaler.transform(df[['open', 'high', 'low', 'close', 'volume']].values[-1].reshape(1, -1))
        predicted_price = model.predict(X_live)[0][0]
        
        print(f"Predicted Price: {predicted_price}")

    except Exception as e:
        logger.error(f"メインロジックでエラーが発生しました: {str(e)}")

if __name__ == "__main__":
    main()

改良のポイント

1. データ取得の効率化:データ取得の際に必要なデータだけを取得するようにし、不要なデータの取得を避けます。

2.モデルの訓練の最適化:エポック数を減らすか、バッチサイズを調整して訓練時間を短縮します。モデルの複雑さを減らす(例:レイヤー数やノード数を減らす)ことで、訓練時間を短縮します。

3. データの前処理の効率化:データの標準化やスケーリングを効率的に行うために、必要な部分だけを処理します。

4.GPUの活用:TensorFlowがGPUを使用できる環境であれば、GPUを活用することで訓練を高速化できます。

5. ログレベルの調整:ログレベルをINFOからWARNINGに変更することで、ログ出力を減らし、パフォーマンスを向上させます。

変更点

  • モデルの軽量化: レイヤーのノード数を減らして、モデルの訓練を高速化。
  • エポック数とバッチサイズの調整: エポック数を減らし、バッチサイズを増やして訓練を高速化。
  • ログレベルの調整: ログレベルをWARNINGに設定して、ログ出力を減らし、パフォーマンスを向上。

まとめ

今回は「ニューラルネットワークの構築と学習」についてまとめました。

Yodaka

モデルの高速化についても様々なアプローチがあることが分かりました。

今後もこの調子で開発の経過を発信していきます。

-Bot, 機械学習・データサイエンス