前回の記事に引き続き、今回も仮想通貨botの開発状況をまとめていきます。
🎯 今回の目的
前回までに完成していた最小構成のBybit自動取引Botに、
常時スプレッドを監視して発注判断を自動化するループ処理を組み込みました。
🔁 追加した処理:常時監視ループ
これまでは1回限りの判断→発注だけの構成でしたが、while True: で処理を囲み、スプレッドの条件を常に監視し続けるBotに進化。
while True:
bid, ask = orderbook.get("bid"), orderbook.get("ask")
spread = (ask - bid) / bid
if spread > s_entry:
# 発注処理
await place_limit_order(...)
await notify_slack(...)
await asyncio.sleep(interval) # 次の発注まで少し休憩
else:
print("❌ Spread not wide enough. Waiting...")
await asyncio.sleep(1) # 毎秒チェック継続
✅ 構造のポイント
- 毎秒スプレッドをチェック
- 条件を満たしたときだけBUY/SELL指値を実行
- 発注が終わったら再び監視モードへ戻る
🔁 常時監視ループとは?
これまでは「1回だけスプレッドを判定して、条件を満たしていれば注文する」という単発型のBotでした。
今回のアップデートでは、while True: という無限ループ構文を使って、
スプレッドの状況を常に監視し続ける構成に進化させました。
🔧 処理の流れ
while True:
bid, ask = orderbook.get("bid"), orderbook.get("ask")
spread = (ask - bid) / bid
- 現在の**買値(bid)と売値(ask)**を取得し、
- その差からスプレッドを毎秒計算します。
✅ 条件を満たしたら発注
if spread > s_entry:
await place_limit_order(...)
await notify_slack(...)
await asyncio.sleep(interval)
- スプレッドが指定したしきい値(
s_entry)を超えたら、
BUYとSELLの指値注文を同時に実行。 - 発注後は少し休憩してから次の監視へ。
❌ 条件未達ならスキップ
else:
print("❌ Spread not wide enough. Waiting...")
await asyncio.sleep(1)
- スプレッドが狭すぎる場合は注文せず、1秒待って再チェック。
- これにより、ムダな発注を避けつつ、チャンスを逃さない構造になっています。
📌 ポイントまとめ
- ✅ 板の価格を毎秒監視
- ✅ チャンスが来たときだけ発注
- ✅ 注文後は待機→再監視へ戻る
このように、シンプルだけど実戦的な構造で、
Botが「ずっと市場を見張り続けてくれる」仕組みが完成しました。
実運用に向けた基礎として、とても大切なステップです。
🔍 実行結果のログ抜粋
⏳ waiting for best bid/ask... 📡 spread = 0.00000, bid = 84738.8, ask = 84738.9 ❌ Spread not wide enough. Waiting... 📡 spread = 0.00000, bid = 84738.8, ask = 84738.9 ❌ Spread not wide enough. Waiting... 📡 spread = 0.00000, bid = 84738.8, ask = 84738.9 ❌ Spread not wide enough. Waiting...
✅ 状況分析
- WebSocketで板情報取得 → 成功
- spread計算 → 正常(ただし狭すぎて条件未達)
- 条件に合わないため発注せず → 健全な動作!
- Ctrl+Cで中断した際の
CancelledError→ 想定通りの終了処理
🧠 なぜスプレッドが「0.00000」なのか?
これは BidとAskの差が0.1(=最小tick)しかない状態だから。
- bid: 84738.8
- ask: 84738.9
- spread = (ask - bid) / bid = 0.1 ÷ 84738.8 ≒
0.00000118
→ これは --s_entry に 0.0015 を設定していたら、ぜんぜん届いていません
✅ 試すべきこと:発注&Slack通知まで確認したいなら?
python MMbotbybit.py --s_entry 0.000001
とかにして、スプレッド条件を極端に緩くすれば即発注&Slack通知まで確認できます。
✅ ここまでで実現できたこと
| 項目 | 状態 |
|---|---|
| 板監視 | ✅ 毎秒自動更新で取得中 |
| spread判定 | ✅ 計算&条件チェック |
| 発注判断 | ✅ spread条件を超えたときだけ発注 |
| Slack通知 | ✅ 発注時に通知(spread成立時のみ) |
| 安定稼働 | ✅ 無限ループ監視で安定実行中 |
🔜 次に取り組む予定の強化ポイント
| フェーズ | 内容 |
|---|---|
| ✅ 検知強化 | 約定イベントの監視を追加(通ったか確認) |
| ✅ リスク管理 | 約定しない注文の自動キャンセル機能 |
| ✅ 記録 | PnL(損益)のログ出力&CSV記録 |
✍️ まとめ
常時監視ループを組み込むことで、Botが「ずっと板を見張り続ける」状態を実現できました。
手動で何度も実行し直す必要がなくなり、実運用に向けた下地としては十分な完成度に到達。
次のステップでは、「約定監視」や「失敗時のリカバリ」「ログ記録」などを加えて、
より堅牢で実戦的なBot構築へ進んでいきます。
MMbot、常時監視ループの実装まで達成。あとは約定イベントの監視、失敗注文自動キャンセル、約定履歴と損益のログ記録まで積んだらひとまず動かせそう。 pic.twitter.com/Rghmve3Rny
— よだか(夜鷹/yodaka) (@yodakablog) April 17, 2025
👇ラジオで話したこと
今回の開発記録は【Bybit自動取引Bot】シリーズの続編になります。
タイトルは――
**「常時監視ループを実装してみた」**です。
🎯 今回の目的
前回までで、**最小構成のMM Bot(マーケットメイキングBot)**はすでに完成していました。
でもその構成では、「一回だけスプレッドをチェック → 条件を満たしてたら注文」っていう単発動作だったんですね。
そこで今回は、Botが“ずっと板を見張り続ける”構成にアップデートしました。
🔁 どう変わったのか?
キーワードは 「while True:」。
これ、Pythonで無限ループを作るときによく使う構文です。
これを使って、スプレッドの状態を毎秒チェックして、
「条件を満たしたら即発注 → また監視に戻る」という動きを実現しました。
🔧 ざっくり構造を言うと:
while True:
bid, ask = orderbook.get("bid"), orderbook.get("ask")
spread = (ask - bid) / bid
if spread > s_entry:
await place_limit_order(...)
await notify_slack(...)
await asyncio.sleep(interval)
else:
print("❌ Spread not wide enough. Waiting...")
await asyncio.sleep(1)
──って感じです。
これでBotが板情報を監視→スプレッド計算→条件を満たせば発注→また監視に戻るという循環を自動でこなしてくれます。
📡 実際に動かしてみると…
⏳ waiting for best bid/ask... 📡 spread = 0.00000, bid = 84738.8, ask = 84738.9 ❌ Spread not wide enough. Waiting... 📡 spread = 0.00000, bid = 84738.8, ask = 84738.9 ❌ Spread not wide enough. Waiting...
というように、毎秒spreadを計算して、しきい値を超えるまで“待ち”の状態が続くわけです。
🤔 なんで spread が 0.00000 なの?
これは、BidとAskの差が小さすぎるから。
たとえば Bidが84738.8、Askが84738.9 だと、その差は0.1ドルしかありません。
(ask - bid) / bid ≒ 0.00000118
設定している --s_entry が 0.0015 だった場合は、全然足りないわけです。
🧪 実験したいときは?
すぐにSlack通知まで見たい場合は、--s_entry 0.000001 みたいに条件をめっちゃ緩くして試すと、即発注されて通知も飛んできます。
開発中はこの方法で動作確認しておくと安心です。
📌 今回実現できたこと
| 機能 | 状態 |
|---|---|
| 板の監視 | ✅ 毎秒自動更新で取得中 |
| spreadの計算 | ✅ 正常動作中 |
| 発注判断 | ✅ 条件を超えたときだけ実行 |
| Slack通知 | ✅ 発注時に通知あり |
| 安定稼働 | ✅ 無限ループ構成で安定 |
つまり、Botが“市場を監視し続ける”状態を完成させたってことですね。
🔜 次のステップは?
Botの動作をもっと信頼できるように、以下を強化していきます:
- ✅ 約定イベントの監視(注文が実際に通ったかの確認)
- ✅ 失敗注文の自動キャンセル
- ✅ 成約履歴やPnLのログ記録
このへんを実装していくことで、本番運用にも耐えられるBotの地盤が固まっていくはずです。
✍️ まとめ
というわけで、今回は「常時監視ループ」をテーマにお届けしました。
このループ処理があることで、Botは「人間が操作しなくても、チャンスが来たら勝手に動く」状態になります。
Botが板を見て、条件を判断して、必要な時だけ動く。
これ、めちゃくちゃ実用的なステップです。
今後の強化に向けても、この土台があることで応用が効く構成になりました。
「asyncioでループを書くとき、どこに気をつけるべきか?」
Pythonで非同期処理をするための仕組みが asyncio。
そして、今回みたいに while True: を使って「ずっと監視し続けるBot」を書くときに超重要になるのが、“ブロッキングを避ける”ことなんです。
☠️ やってはいけないこと:time.sleep()
非同期コードの中に、time.sleep() を書いてしまうと、それは**「完全に同期的なブロック」**になります。
つまり、その間は他の処理が一切動かない!
🔴 悪い例:
while True:
do_something()
time.sleep(1) # ここで他の処理が止まる
✅ 正しい例:
import asyncio
while True:
do_something()
await asyncio.sleep(1) # ノンブロッキングで待つ
🧠 注意1:ちゃんと await をつけよう
async関数の中で、await を忘れると非同期じゃなくなるので注意。
特に、複数のAPI呼び出しやIO処理(ファイル、ネットワークなど)を扱うときに、
await client.post(...)await notify_slack(...)await asyncio.sleep(...)
みたいに、ちゃんと待たせる処理には await をつけるのがルールです。
🔁 注意2:無限ループは“暴走”に注意
while True: って便利だけど、うっかりするとCPUを無限に使い続ける暴走ループになっちゃうこともあります。
なので、中には必ず「休憩処理」= await asyncio.sleep(x) を挟むのが鉄則。
# ダメな例(CPU爆走注意)
while True:
do_something() # めちゃ速で回り続ける
# OKな例
while True:
do_something()
await asyncio.sleep(1)
⚠️ 注意3:例外処理を忘れずに
非同期ループ中にエラーが起きると、Botがサイレントに止まってしまうことがあります。
それを防ぐために、最低限でも try / except を入れておきましょう。
while True:
try:
await do_something_async()
except Exception as e:
print(f"⚠️ エラー発生: {e}")
await asyncio.sleep(5) # 休憩してリトライ
これで、何か異常が起きてもBotが落ちずにリカバリできます。
🎯 まとめ:asyncioループで意識したい3つのこと
| 注意点 | 説明 |
|---|---|
awaitを忘れない | IOやsleepなど非同期処理には必ず await をつける |
| ループは“休ませる” | await asyncio.sleep() を使ってCPU暴走を防ぐ |
| 例外処理で守る | try/exceptで落ちないBotにする |
📌 このルールを守ることで、Botは「軽くて安定してて落ちにくい」構成になります。
実運用では1日中動かすことになるので、こういう基本の積み重ねが将来的な“信頼感”につながりますね。
現在のコード(ログ用)
import asyncio
import json
import aiohttp
import pybotters
from argparse import ArgumentParser
async def place_limit_order(client, symbol, side, qty, price):
"""指値注文を出し、orderIdを返す。"""
res = await client.post("/v5/order/create", data={
"category": "linear",
"symbol": symbol,
"side": side,
"orderType": "Limit",
"qty": str(qty),
"price": str(price),
"timeInForce": "GTC"
})
data = await res.json()
if data["retCode"] != 0:
raise RuntimeError(f"Order failed: {data}")
return data["result"]["orderId"]
async def wait_for_fill(client, symbol, order_id, timeout=30, interval=1):
"""注文の約定を一定時間ポーリングで待つ。FilledならTrue、timeoutでFalseを返す。"""
for _ in range(int(timeout / interval)):
res = await client.get("/v5/order/realtime", params={
"symbol": symbol,
"orderId": order_id
})
d = await res.json()
status = d["result"]["data"][0]["orderStatus"]
if status == "Filled":
return True
await asyncio.sleep(interval)
return False
async def notify_slack(webhook_url: str, message: str):
"""Slack Incoming Webhookへ通知を送信。"""
async with aiohttp.ClientSession() as session:
await session.post(webhook_url, json={"text": message})
async def get_orderbook(symbol, api_key_json, orderbook):
"""WebSocketでorderbookを更新し続ける。戻り値はRESTクライアント。"""
def onmessage(msg, ws):
if "data" not in msg:
return
data = msg["data"]
if data.get("a"):
orderbook["ask"] = float(data["a"][0][0])
if data.get("b"):
orderbook["bid"] = float(data["b"][0][0])
async with pybotters.Client(apis=api_key_json) as client:
await client.ws_connect(
"wss://stream.bybit.com/v5/public/linear",
send_json=[
{"op": "subscribe", "args": [f"orderbook.1.{symbol}"]}
],
hdlr_json=onmessage
)
# 板情報が揃うまで待機
while "bid" not in orderbook or "ask" not in orderbook:
await asyncio.sleep(1)
return client
async def run_loop(symbol, qty, s_entry, config, interval=10):
orderbook = {}
client = await get_orderbook(symbol, config["bybit"], orderbook)
while True:
bid, ask = orderbook.get("bid"), orderbook.get("ask")
if not bid or not ask:
await asyncio.sleep(1)
continue
spread = (ask - bid) / bid
print(f"📡 spread = {spread:.5f}, bid = {bid}, ask = {ask}")
if spread > s_entry:
print("🚀 spread condition met, sending orders...")
# 1) 指値発注
buy_price, sell_price = int(bid - 1), int(ask + 1)
try:
buy_id = await place_limit_order(client, symbol, "Buy", qty, buy_price)
sell_id = await place_limit_order(client, symbol, "Sell", qty, sell_price)
except Exception as e:
err_msg = f"❌ Order error: {e}"
await notify_slack(config["slack_webhook_url"], err_msg)
await asyncio.sleep(interval)
continue
# 2) 約定待ち
filled_buy = await wait_for_fill(client, symbol, buy_id)
filled_sell = await wait_for_fill(client, symbol, sell_id)
# 3) Slack通知
msg = (
f"📈 *MMBot Execution Report*\n"
f"> Pair: `{symbol}`\n"
f"> Spread: `{spread:.5f}`\n"
f"> 🟢 Buy @{buy_price} ({'Filled' if filled_buy else 'Pending'})\n"
f"> 🔴 Sell @{sell_price} ({'Filled' if filled_sell else 'Pending'})\n"
f"> Lot: `{qty}`"
)
await notify_slack(config["slack_webhook_url"], msg)
print("✅ Orders processed. Sleeping before next check...\n")
await asyncio.sleep(interval)
else:
print("❌ Spread not wide enough. Waiting...\n")
await asyncio.sleep(1)
def cli():
parser = ArgumentParser()
parser.add_argument("--config", default="config.json")
parser.add_argument("--symbol", default="BTCUSDT")
parser.add_argument("--lot", default=0.01, type=float)
parser.add_argument("--s_entry", default=0.000001C, type=float)
parser.add_argument("--interval", default=10, type=int)
return parser.parse_args()
if __name__ == "__main__":
args = cli()
with open(args.config) as f:
config = json.load(f)
asyncio.run(run_loop(
args.symbol,
args.lot,
args.s_entry,
config,
args.interval
))