Bot

開発記録#182(2025/4/17)「Bybit自動取引Botに常時監視ループを実装してみた【Python×pybotters】」

2025年4月17日

前回の記事に引き続き、今回も仮想通貨botの開発状況をまとめていきます。

🎯 今回の目的

前回までに完成していた最小構成のBybit自動取引Botに、
常時スプレッドを監視して発注判断を自動化するループ処理を組み込みました。


🔁 追加した処理:常時監視ループ

これまでは1回限りの判断→発注だけの構成でしたが、
while True: で処理を囲み、スプレッドの条件を常に監視し続けるBotに進化。

while True:
    bid, ask = orderbook.get("bid"), orderbook.get("ask")
    spread = (ask - bid) / bid

    if spread > s_entry:
        # 発注処理
        await place_limit_order(...)
        await notify_slack(...)
        await asyncio.sleep(interval)  # 次の発注まで少し休憩
    else:
        print("❌ Spread not wide enough. Waiting...")
        await asyncio.sleep(1)  # 毎秒チェック継続

✅ 構造のポイント

  • 毎秒スプレッドをチェック
  • 条件を満たしたときだけBUY/SELL指値を実行
  • 発注が終わったら再び監視モードへ戻る

🔁 常時監視ループとは?

これまでは「1回だけスプレッドを判定して、条件を満たしていれば注文する」という単発型のBotでした。

今回のアップデートでは、while True: という無限ループ構文を使って、
スプレッドの状況を常に監視し続ける構成に進化させました。


🔧 処理の流れ

while True:
    bid, ask = orderbook.get("bid"), orderbook.get("ask")
    spread = (ask - bid) / bid
  • 現在の**買値(bid)と売値(ask)**を取得し、
  • その差からスプレッドを毎秒計算します。

✅ 条件を満たしたら発注

    if spread > s_entry:
        await place_limit_order(...)
        await notify_slack(...)
        await asyncio.sleep(interval)
  • スプレッドが指定したしきい値(s_entry)を超えたら、
    BUYとSELLの指値注文を同時に実行。
  • 発注後は少し休憩してから次の監視へ。

❌ 条件未達ならスキップ

    else:
        print("❌ Spread not wide enough. Waiting...")
        await asyncio.sleep(1)
  • スプレッドが狭すぎる場合は注文せず、1秒待って再チェック
  • これにより、ムダな発注を避けつつ、チャンスを逃さない構造になっています。

📌 ポイントまとめ

  • ✅ 板の価格を毎秒監視
  • ✅ チャンスが来たときだけ発注
  • ✅ 注文後は待機→再監視へ戻る

このように、シンプルだけど実戦的な構造で、
Botが「ずっと市場を見張り続けてくれる」仕組みが完成しました。
実運用に向けた基礎として、とても大切なステップです。

🔍 実行結果のログ抜粋

⏳ waiting for best bid/ask...
📡 spread = 0.00000, bid = 84738.8, ask = 84738.9
❌ Spread not wide enough. Waiting...
📡 spread = 0.00000, bid = 84738.8, ask = 84738.9
❌ Spread not wide enough. Waiting...
📡 spread = 0.00000, bid = 84738.8, ask = 84738.9
❌ Spread not wide enough. Waiting...

✅ 状況分析

  • WebSocketで板情報取得 → 成功
  • spread計算 → 正常(ただし狭すぎて条件未達)
  • 条件に合わないため発注せず → 健全な動作!
  • Ctrl+Cで中断した際の CancelledError → 想定通りの終了処理

🧠 なぜスプレッドが「0.00000」なのか?

これは BidとAskの差が0.1(=最小tick)しかない状態だから。

  • bid: 84738.8
  • ask: 84738.9
  • spread = (ask - bid) / bid = 0.1 ÷ 84738.8 ≒ 0.00000118

→ これは --s_entry0.0015 を設定していたら、ぜんぜん届いていません


✅ 試すべきこと:発注&Slack通知まで確認したいなら?

python MMbotbybit.py --s_entry 0.000001

とかにして、スプレッド条件を極端に緩くすれば即発注&Slack通知まで確認できます

✅ ここまでで実現できたこと

項目状態
板監視✅ 毎秒自動更新で取得中
spread判定✅ 計算&条件チェック
発注判断✅ spread条件を超えたときだけ発注
Slack通知✅ 発注時に通知(spread成立時のみ)
安定稼働✅ 無限ループ監視で安定実行中

🔜 次に取り組む予定の強化ポイント

フェーズ内容
✅ 検知強化約定イベントの監視を追加(通ったか確認)
✅ リスク管理約定しない注文の自動キャンセル機能
✅ 記録PnL(損益)のログ出力&CSV記録

✍️ まとめ

常時監視ループを組み込むことで、Botが「ずっと板を見張り続ける」状態を実現できました。
手動で何度も実行し直す必要がなくなり、実運用に向けた下地としては十分な完成度に到達。

次のステップでは、「約定監視」や「失敗時のリカバリ」「ログ記録」などを加えて、
より堅牢で実戦的なBot構築へ進んでいきます。

👇ラジオで話したこと

今回の開発記録は【Bybit自動取引Bot】シリーズの続編になります。
タイトルは――

**「常時監視ループを実装してみた」**です。


🎯 今回の目的

前回までで、**最小構成のMM Bot(マーケットメイキングBot)**はすでに完成していました。
でもその構成では、「一回だけスプレッドをチェック → 条件を満たしてたら注文」っていう単発動作だったんですね。

そこで今回は、Botが“ずっと板を見張り続ける”構成にアップデートしました。


🔁 どう変わったのか?

キーワードは 「while True:」
これ、Pythonで無限ループを作るときによく使う構文です。

これを使って、スプレッドの状態を毎秒チェックして、
「条件を満たしたら即発注 → また監視に戻る」という動きを実現しました。


🔧 ざっくり構造を言うと:

while True:
    bid, ask = orderbook.get("bid"), orderbook.get("ask")
    spread = (ask - bid) / bid

    if spread > s_entry:
        await place_limit_order(...)
        await notify_slack(...)
        await asyncio.sleep(interval)
    else:
        print("❌ Spread not wide enough. Waiting...")
        await asyncio.sleep(1)

──って感じです。
これでBotが板情報を監視→スプレッド計算→条件を満たせば発注→また監視に戻るという循環を自動でこなしてくれます。


📡 実際に動かしてみると…

⏳ waiting for best bid/ask...
📡 spread = 0.00000, bid = 84738.8, ask = 84738.9
❌ Spread not wide enough. Waiting...
📡 spread = 0.00000, bid = 84738.8, ask = 84738.9
❌ Spread not wide enough. Waiting...

というように、毎秒spreadを計算して、しきい値を超えるまで“待ち”の状態が続くわけです。


🤔 なんで spread が 0.00000 なの?

これは、BidとAskの差が小さすぎるから。
たとえば Bidが84738.8、Askが84738.9 だと、その差は0.1ドルしかありません。

(ask - bid) / bid ≒ 0.00000118

設定している --s_entry が 0.0015 だった場合は、全然足りないわけです。


🧪 実験したいときは?

すぐにSlack通知まで見たい場合は、
--s_entry 0.000001 みたいに条件をめっちゃ緩くして試すと、即発注されて通知も飛んできます。

開発中はこの方法で動作確認しておくと安心です。


📌 今回実現できたこと

機能状態
板の監視✅ 毎秒自動更新で取得中
spreadの計算✅ 正常動作中
発注判断✅ 条件を超えたときだけ実行
Slack通知✅ 発注時に通知あり
安定稼働✅ 無限ループ構成で安定

つまり、Botが“市場を監視し続ける”状態を完成させたってことですね。


🔜 次のステップは?

Botの動作をもっと信頼できるように、以下を強化していきます:

  • ✅ 約定イベントの監視(注文が実際に通ったかの確認)
  • ✅ 失敗注文の自動キャンセル
  • ✅ 成約履歴やPnLのログ記録

このへんを実装していくことで、本番運用にも耐えられるBotの地盤が固まっていくはずです。

✍️ まとめ

というわけで、今回は「常時監視ループ」をテーマにお届けしました。

このループ処理があることで、Botは「人間が操作しなくても、チャンスが来たら勝手に動く」状態になります。

Botが板を見て、条件を判断して、必要な時だけ動く。
これ、めちゃくちゃ実用的なステップです。
今後の強化に向けても、この土台があることで応用が効く構成になりました。

「asyncioでループを書くとき、どこに気をつけるべきか?」

Pythonで非同期処理をするための仕組みが asyncio
そして、今回みたいに while True: を使って「ずっと監視し続けるBot」を書くときに超重要になるのが、“ブロッキングを避ける”ことなんです。


☠️ やってはいけないこと:time.sleep()

非同期コードの中に、time.sleep() を書いてしまうと、それは**「完全に同期的なブロック」**になります。

つまり、その間は他の処理が一切動かない!

🔴 悪い例:

while True:
    do_something()
    time.sleep(1)  # ここで他の処理が止まる

✅ 正しい例:

import asyncio

while True:
    do_something()
    await asyncio.sleep(1)  # ノンブロッキングで待つ

🧠 注意1:ちゃんと await をつけよう

async関数の中で、await を忘れると非同期じゃなくなるので注意。
特に、複数のAPI呼び出しやIO処理(ファイル、ネットワークなど)を扱うときに、

  • await client.post(...)
  • await notify_slack(...)
  • await asyncio.sleep(...)

みたいに、ちゃんと待たせる処理には await をつけるのがルールです。


🔁 注意2:無限ループは“暴走”に注意

while True: って便利だけど、うっかりするとCPUを無限に使い続ける暴走ループになっちゃうこともあります。

なので、中には必ず「休憩処理」= await asyncio.sleep(x) を挟むのが鉄則。

# ダメな例(CPU爆走注意)
while True:
    do_something()  # めちゃ速で回り続ける

# OKな例
while True:
    do_something()
    await asyncio.sleep(1)

⚠️ 注意3:例外処理を忘れずに

非同期ループ中にエラーが起きると、Botがサイレントに止まってしまうことがあります。
それを防ぐために、最低限でも try / except を入れておきましょう。

while True:
    try:
        await do_something_async()
    except Exception as e:
        print(f"⚠️ エラー発生: {e}")
        await asyncio.sleep(5)  # 休憩してリトライ

これで、何か異常が起きてもBotが落ちずにリカバリできます。


🎯 まとめ:asyncioループで意識したい3つのこと

注意点説明
awaitを忘れないIOやsleepなど非同期処理には必ず await をつける
ループは“休ませる”await asyncio.sleep() を使ってCPU暴走を防ぐ
例外処理で守るtry/exceptで落ちないBotにする

📌 このルールを守ることで、Botは「軽くて安定してて落ちにくい」構成になります。

実運用では1日中動かすことになるので、こういう基本の積み重ねが将来的な“信頼感”につながりますね。

現在のコード(ログ用)

import asyncio
import json
import aiohttp
import pybotters
from argparse import ArgumentParser


async def place_limit_order(client, symbol, side, qty, price):
    """指値注文を出し、orderIdを返す。"""
    res = await client.post("/v5/order/create", data={
        "category": "linear",
        "symbol": symbol,
        "side": side,
        "orderType": "Limit",
        "qty": str(qty),
        "price": str(price),
        "timeInForce": "GTC"
    })
    data = await res.json()
    if data["retCode"] != 0:
        raise RuntimeError(f"Order failed: {data}")
    return data["result"]["orderId"]


async def wait_for_fill(client, symbol, order_id, timeout=30, interval=1):
    """注文の約定を一定時間ポーリングで待つ。FilledならTrue、timeoutでFalseを返す。"""
    for _ in range(int(timeout / interval)):
        res = await client.get("/v5/order/realtime", params={
            "symbol": symbol,
            "orderId": order_id
        })
        d = await res.json()
        status = d["result"]["data"][0]["orderStatus"]
        if status == "Filled":
            return True
        await asyncio.sleep(interval)
    return False


async def notify_slack(webhook_url: str, message: str):
    """Slack Incoming Webhookへ通知を送信。"""
    async with aiohttp.ClientSession() as session:
        await session.post(webhook_url, json={"text": message})


async def get_orderbook(symbol, api_key_json, orderbook):
    """WebSocketでorderbookを更新し続ける。戻り値はRESTクライアント。"""
    def onmessage(msg, ws):
        if "data" not in msg:
            return
        data = msg["data"]
        if data.get("a"):
            orderbook["ask"] = float(data["a"][0][0])
        if data.get("b"):
            orderbook["bid"] = float(data["b"][0][0])

    async with pybotters.Client(apis=api_key_json) as client:
        await client.ws_connect(
            "wss://stream.bybit.com/v5/public/linear",
            send_json=[
                {"op": "subscribe", "args": [f"orderbook.1.{symbol}"]}
            ],
            hdlr_json=onmessage
        )

        # 板情報が揃うまで待機
        while "bid" not in orderbook or "ask" not in orderbook:
            await asyncio.sleep(1)

        return client


async def run_loop(symbol, qty, s_entry, config, interval=10):
    orderbook = {}
    client = await get_orderbook(symbol, config["bybit"], orderbook)

    while True:
        bid, ask = orderbook.get("bid"), orderbook.get("ask")
        if not bid or not ask:
            await asyncio.sleep(1)
            continue

        spread = (ask - bid) / bid
        print(f"📡 spread = {spread:.5f}, bid = {bid}, ask = {ask}")

        if spread > s_entry:
            print("🚀 spread condition met, sending orders...")

            # 1) 指値発注
            buy_price, sell_price = int(bid - 1), int(ask + 1)
            try:
                buy_id = await place_limit_order(client, symbol, "Buy", qty, buy_price)
                sell_id = await place_limit_order(client, symbol, "Sell", qty, sell_price)
            except Exception as e:
                err_msg = f"❌ Order error: {e}"
                await notify_slack(config["slack_webhook_url"], err_msg)
                await asyncio.sleep(interval)
                continue

            # 2) 約定待ち
            filled_buy = await wait_for_fill(client, symbol, buy_id)
            filled_sell = await wait_for_fill(client, symbol, sell_id)

            # 3) Slack通知
            msg = (
                f"📈 *MMBot Execution Report*\n"
                f"> Pair: `{symbol}`\n"
                f"> Spread: `{spread:.5f}`\n"
                f"> 🟢 Buy @{buy_price} ({'Filled' if filled_buy else 'Pending'})\n"
                f"> 🔴 Sell @{sell_price} ({'Filled' if filled_sell else 'Pending'})\n"
                f"> Lot: `{qty}`"
            )
            await notify_slack(config["slack_webhook_url"], msg)

            print("✅ Orders processed. Sleeping before next check...\n")
            await asyncio.sleep(interval)
        else:
            print("❌ Spread not wide enough. Waiting...\n")
            await asyncio.sleep(1)


def cli():
    parser = ArgumentParser()
    parser.add_argument("--config", default="config.json")
    parser.add_argument("--symbol", default="BTCUSDT")
    parser.add_argument("--lot", default=0.01, type=float)
    parser.add_argument("--s_entry", default=0.000001C, type=float)
    parser.add_argument("--interval", default=10, type=int)
    return parser.parse_args()


if __name__ == "__main__":
    args = cli()
    with open(args.config) as f:
        config = json.load(f)
    asyncio.run(run_loop(
        args.symbol,
        args.lot,
        args.s_entry,
        config,
        args.interval
    ))

-Bot