前回の記事に引き続き、今回も仮想通貨botの開発状況をまとめていきます。
🎯 今回の目的
前回までに完成していた最小構成のBybit自動取引Botに、
常時スプレッドを監視して発注判断を自動化するループ処理を組み込みました。
🔁 追加した処理:常時監視ループ
これまでは1回限りの判断→発注だけの構成でしたが、while True:
で処理を囲み、スプレッドの条件を常に監視し続けるBotに進化。
while True: bid, ask = orderbook.get("bid"), orderbook.get("ask") spread = (ask - bid) / bid if spread > s_entry: # 発注処理 await place_limit_order(...) await notify_slack(...) await asyncio.sleep(interval) # 次の発注まで少し休憩 else: print("❌ Spread not wide enough. Waiting...") await asyncio.sleep(1) # 毎秒チェック継続
✅ 構造のポイント
- 毎秒スプレッドをチェック
- 条件を満たしたときだけBUY/SELL指値を実行
- 発注が終わったら再び監視モードへ戻る
🔁 常時監視ループとは?
これまでは「1回だけスプレッドを判定して、条件を満たしていれば注文する」という単発型のBotでした。
今回のアップデートでは、while True:
という無限ループ構文を使って、
スプレッドの状況を常に監視し続ける構成に進化させました。
🔧 処理の流れ
while True: bid, ask = orderbook.get("bid"), orderbook.get("ask") spread = (ask - bid) / bid
- 現在の**買値(bid)と売値(ask)**を取得し、
- その差からスプレッドを毎秒計算します。
✅ 条件を満たしたら発注
if spread > s_entry: await place_limit_order(...) await notify_slack(...) await asyncio.sleep(interval)
- スプレッドが指定したしきい値(
s_entry
)を超えたら、
BUYとSELLの指値注文を同時に実行。 - 発注後は少し休憩してから次の監視へ。
❌ 条件未達ならスキップ
else: print("❌ Spread not wide enough. Waiting...") await asyncio.sleep(1)
- スプレッドが狭すぎる場合は注文せず、1秒待って再チェック。
- これにより、ムダな発注を避けつつ、チャンスを逃さない構造になっています。
📌 ポイントまとめ
- ✅ 板の価格を毎秒監視
- ✅ チャンスが来たときだけ発注
- ✅ 注文後は待機→再監視へ戻る
このように、シンプルだけど実戦的な構造で、
Botが「ずっと市場を見張り続けてくれる」仕組みが完成しました。
実運用に向けた基礎として、とても大切なステップです。
🔍 実行結果のログ抜粋
⏳ waiting for best bid/ask... 📡 spread = 0.00000, bid = 84738.8, ask = 84738.9 ❌ Spread not wide enough. Waiting... 📡 spread = 0.00000, bid = 84738.8, ask = 84738.9 ❌ Spread not wide enough. Waiting... 📡 spread = 0.00000, bid = 84738.8, ask = 84738.9 ❌ Spread not wide enough. Waiting...
✅ 状況分析
- WebSocketで板情報取得 → 成功
- spread計算 → 正常(ただし狭すぎて条件未達)
- 条件に合わないため発注せず → 健全な動作!
- Ctrl+Cで中断した際の
CancelledError
→ 想定通りの終了処理
🧠 なぜスプレッドが「0.00000」なのか?
これは BidとAskの差が0.1(=最小tick)しかない状態だから。
- bid: 84738.8
- ask: 84738.9
- spread = (ask - bid) / bid = 0.1 ÷ 84738.8 ≒
0.00000118
→ これは --s_entry
に 0.0015
を設定していたら、ぜんぜん届いていません
✅ 試すべきこと:発注&Slack通知まで確認したいなら?
python MMbotbybit.py --s_entry 0.000001
とかにして、スプレッド条件を極端に緩くすれば即発注&Slack通知まで確認できます。
✅ ここまでで実現できたこと
項目 | 状態 |
---|---|
板監視 | ✅ 毎秒自動更新で取得中 |
spread判定 | ✅ 計算&条件チェック |
発注判断 | ✅ spread条件を超えたときだけ発注 |
Slack通知 | ✅ 発注時に通知(spread成立時のみ) |
安定稼働 | ✅ 無限ループ監視で安定実行中 |
🔜 次に取り組む予定の強化ポイント
フェーズ | 内容 |
---|---|
✅ 検知強化 | 約定イベントの監視を追加(通ったか確認) |
✅ リスク管理 | 約定しない注文の自動キャンセル機能 |
✅ 記録 | PnL(損益)のログ出力&CSV記録 |
✍️ まとめ
常時監視ループを組み込むことで、Botが「ずっと板を見張り続ける」状態を実現できました。
手動で何度も実行し直す必要がなくなり、実運用に向けた下地としては十分な完成度に到達。
次のステップでは、「約定監視」や「失敗時のリカバリ」「ログ記録」などを加えて、
より堅牢で実戦的なBot構築へ進んでいきます。
MMbot、常時監視ループの実装まで達成。あとは約定イベントの監視、失敗注文自動キャンセル、約定履歴と損益のログ記録まで積んだらひとまず動かせそう。 pic.twitter.com/Rghmve3Rny
— よだか(夜鷹/yodaka) (@yodakablog) April 17, 2025
👇ラジオで話したこと
今回の開発記録は【Bybit自動取引Bot】シリーズの続編になります。
タイトルは――
**「常時監視ループを実装してみた」**です。
🎯 今回の目的
前回までで、**最小構成のMM Bot(マーケットメイキングBot)**はすでに完成していました。
でもその構成では、「一回だけスプレッドをチェック → 条件を満たしてたら注文」っていう単発動作だったんですね。
そこで今回は、Botが“ずっと板を見張り続ける”構成にアップデートしました。
🔁 どう変わったのか?
キーワードは 「while True:」。
これ、Pythonで無限ループを作るときによく使う構文です。
これを使って、スプレッドの状態を毎秒チェックして、
「条件を満たしたら即発注 → また監視に戻る」という動きを実現しました。
🔧 ざっくり構造を言うと:
while True: bid, ask = orderbook.get("bid"), orderbook.get("ask") spread = (ask - bid) / bid if spread > s_entry: await place_limit_order(...) await notify_slack(...) await asyncio.sleep(interval) else: print("❌ Spread not wide enough. Waiting...") await asyncio.sleep(1)
──って感じです。
これでBotが板情報を監視→スプレッド計算→条件を満たせば発注→また監視に戻るという循環を自動でこなしてくれます。
📡 実際に動かしてみると…
⏳ waiting for best bid/ask... 📡 spread = 0.00000, bid = 84738.8, ask = 84738.9 ❌ Spread not wide enough. Waiting... 📡 spread = 0.00000, bid = 84738.8, ask = 84738.9 ❌ Spread not wide enough. Waiting...
というように、毎秒spreadを計算して、しきい値を超えるまで“待ち”の状態が続くわけです。
🤔 なんで spread が 0.00000 なの?
これは、BidとAskの差が小さすぎるから。
たとえば Bidが84738.8、Askが84738.9 だと、その差は0.1ドルしかありません。
(ask - bid) / bid ≒ 0.00000118
設定している --s_entry
が 0.0015 だった場合は、全然足りないわけです。
🧪 実験したいときは?
すぐにSlack通知まで見たい場合は、--s_entry 0.000001
みたいに条件をめっちゃ緩くして試すと、即発注されて通知も飛んできます。
開発中はこの方法で動作確認しておくと安心です。
📌 今回実現できたこと
機能 | 状態 |
---|---|
板の監視 | ✅ 毎秒自動更新で取得中 |
spreadの計算 | ✅ 正常動作中 |
発注判断 | ✅ 条件を超えたときだけ実行 |
Slack通知 | ✅ 発注時に通知あり |
安定稼働 | ✅ 無限ループ構成で安定 |
つまり、Botが“市場を監視し続ける”状態を完成させたってことですね。
🔜 次のステップは?
Botの動作をもっと信頼できるように、以下を強化していきます:
- ✅ 約定イベントの監視(注文が実際に通ったかの確認)
- ✅ 失敗注文の自動キャンセル
- ✅ 成約履歴やPnLのログ記録
このへんを実装していくことで、本番運用にも耐えられるBotの地盤が固まっていくはずです。
✍️ まとめ
というわけで、今回は「常時監視ループ」をテーマにお届けしました。
このループ処理があることで、Botは「人間が操作しなくても、チャンスが来たら勝手に動く」状態になります。
Botが板を見て、条件を判断して、必要な時だけ動く。
これ、めちゃくちゃ実用的なステップです。
今後の強化に向けても、この土台があることで応用が効く構成になりました。
「asyncioでループを書くとき、どこに気をつけるべきか?」
Pythonで非同期処理をするための仕組みが asyncio
。
そして、今回みたいに while True:
を使って「ずっと監視し続けるBot」を書くときに超重要になるのが、“ブロッキングを避ける”ことなんです。
☠️ やってはいけないこと:time.sleep()
非同期コードの中に、time.sleep()
を書いてしまうと、それは**「完全に同期的なブロック」**になります。
つまり、その間は他の処理が一切動かない!
🔴 悪い例:
while True: do_something() time.sleep(1) # ここで他の処理が止まる
✅ 正しい例:
import asyncio while True: do_something() await asyncio.sleep(1) # ノンブロッキングで待つ
🧠 注意1:ちゃんと await
をつけよう
async関数の中で、await
を忘れると非同期じゃなくなるので注意。
特に、複数のAPI呼び出しやIO処理(ファイル、ネットワークなど)を扱うときに、
await client.post(...)
await notify_slack(...)
await asyncio.sleep(...)
みたいに、ちゃんと待たせる処理には await
をつけるのがルールです。
🔁 注意2:無限ループは“暴走”に注意
while True:
って便利だけど、うっかりするとCPUを無限に使い続ける暴走ループになっちゃうこともあります。
なので、中には必ず「休憩処理」= await asyncio.sleep(x)
を挟むのが鉄則。
# ダメな例(CPU爆走注意) while True: do_something() # めちゃ速で回り続ける # OKな例 while True: do_something() await asyncio.sleep(1)
⚠️ 注意3:例外処理を忘れずに
非同期ループ中にエラーが起きると、Botがサイレントに止まってしまうことがあります。
それを防ぐために、最低限でも try
/ except
を入れておきましょう。
while True: try: await do_something_async() except Exception as e: print(f"⚠️ エラー発生: {e}") await asyncio.sleep(5) # 休憩してリトライ
これで、何か異常が起きてもBotが落ちずにリカバリできます。
🎯 まとめ:asyncioループで意識したい3つのこと
注意点 | 説明 |
---|---|
await を忘れない | IOやsleepなど非同期処理には必ず await をつける |
ループは“休ませる” | await asyncio.sleep() を使ってCPU暴走を防ぐ |
例外処理で守る | try/exceptで落ちないBotにする |
📌 このルールを守ることで、Botは「軽くて安定してて落ちにくい」構成になります。
実運用では1日中動かすことになるので、こういう基本の積み重ねが将来的な“信頼感”につながりますね。
現在のコード(ログ用)
import asyncio import json import aiohttp import pybotters from argparse import ArgumentParser async def place_limit_order(client, symbol, side, qty, price): """指値注文を出し、orderIdを返す。""" res = await client.post("/v5/order/create", data={ "category": "linear", "symbol": symbol, "side": side, "orderType": "Limit", "qty": str(qty), "price": str(price), "timeInForce": "GTC" }) data = await res.json() if data["retCode"] != 0: raise RuntimeError(f"Order failed: {data}") return data["result"]["orderId"] async def wait_for_fill(client, symbol, order_id, timeout=30, interval=1): """注文の約定を一定時間ポーリングで待つ。FilledならTrue、timeoutでFalseを返す。""" for _ in range(int(timeout / interval)): res = await client.get("/v5/order/realtime", params={ "symbol": symbol, "orderId": order_id }) d = await res.json() status = d["result"]["data"][0]["orderStatus"] if status == "Filled": return True await asyncio.sleep(interval) return False async def notify_slack(webhook_url: str, message: str): """Slack Incoming Webhookへ通知を送信。""" async with aiohttp.ClientSession() as session: await session.post(webhook_url, json={"text": message}) async def get_orderbook(symbol, api_key_json, orderbook): """WebSocketでorderbookを更新し続ける。戻り値はRESTクライアント。""" def onmessage(msg, ws): if "data" not in msg: return data = msg["data"] if data.get("a"): orderbook["ask"] = float(data["a"][0][0]) if data.get("b"): orderbook["bid"] = float(data["b"][0][0]) async with pybotters.Client(apis=api_key_json) as client: await client.ws_connect( "wss://stream.bybit.com/v5/public/linear", send_json=[ {"op": "subscribe", "args": [f"orderbook.1.{symbol}"]} ], hdlr_json=onmessage ) # 板情報が揃うまで待機 while "bid" not in orderbook or "ask" not in orderbook: await asyncio.sleep(1) return client async def run_loop(symbol, qty, s_entry, config, interval=10): orderbook = {} client = await get_orderbook(symbol, config["bybit"], orderbook) while True: bid, ask = orderbook.get("bid"), orderbook.get("ask") if not bid or not ask: await asyncio.sleep(1) continue spread = (ask - bid) / bid print(f"📡 spread = {spread:.5f}, bid = {bid}, ask = {ask}") if spread > s_entry: print("🚀 spread condition met, sending orders...") # 1) 指値発注 buy_price, sell_price = int(bid - 1), int(ask + 1) try: buy_id = await place_limit_order(client, symbol, "Buy", qty, buy_price) sell_id = await place_limit_order(client, symbol, "Sell", qty, sell_price) except Exception as e: err_msg = f"❌ Order error: {e}" await notify_slack(config["slack_webhook_url"], err_msg) await asyncio.sleep(interval) continue # 2) 約定待ち filled_buy = await wait_for_fill(client, symbol, buy_id) filled_sell = await wait_for_fill(client, symbol, sell_id) # 3) Slack通知 msg = ( f"📈 *MMBot Execution Report*\n" f"> Pair: `{symbol}`\n" f"> Spread: `{spread:.5f}`\n" f"> 🟢 Buy @{buy_price} ({'Filled' if filled_buy else 'Pending'})\n" f"> 🔴 Sell @{sell_price} ({'Filled' if filled_sell else 'Pending'})\n" f"> Lot: `{qty}`" ) await notify_slack(config["slack_webhook_url"], msg) print("✅ Orders processed. Sleeping before next check...\n") await asyncio.sleep(interval) else: print("❌ Spread not wide enough. Waiting...\n") await asyncio.sleep(1) def cli(): parser = ArgumentParser() parser.add_argument("--config", default="config.json") parser.add_argument("--symbol", default="BTCUSDT") parser.add_argument("--lot", default=0.01, type=float) parser.add_argument("--s_entry", default=0.000001C, type=float) parser.add_argument("--interval", default=10, type=int) return parser.parse_args() if __name__ == "__main__": args = cli() with open(args.config) as f: config = json.load(f) asyncio.run(run_loop( args.symbol, args.lot, args.s_entry, config, args.interval ))