Skip to content

Conversation

@leavesster
Copy link
Contributor

Summary

  • Replace CPU-spinning while True loop with threading.Event for blocking wait
  • Add optional timeout parameter to prevent indefinite hangs
  • Add unit tests for the new implementation

Problem

The original code used a busy-wait pattern that causes 100% CPU usage:

while True:
    if replay is not None:
        return replay

Solution

Use threading.Event for efficient blocking:

event = threading.Event()
event.wait(timeout=timeout)
return replay

Test Plan

  • Added tests/test_mainframe_notify.py with unit tests
  • All existing tests pass

The previous implementation used a while True loop that would spin the CPU
waiting for a response. This is replaced with threading.Event.wait() which
properly blocks without consuming CPU resources.

Changes:
- Add threading.Event to wait for message response
- Add optional timeout parameter to prevent indefinite blocking
- Raise TimeoutError with descriptive message on timeout
- Properly clean up subscription on timeout
- Add comprehensive unit tests for the new implementation
Copilot AI review requested due to automatic review settings January 31, 2026 09:26
@coderabbitai
Copy link

coderabbitai bot commented Jan 31, 2026

总体概览

本次变更修改了 Mainframe 类的 notify_block_ready 方法,增加了可选的超时参数,使用 threading.Event 实现非阻塞式等待机制,并添加了超时处理逻辑。同时新增了相应的单元测试。

变更汇总

变更类别 / 文件 变更说明
方法签名更新
oocana/oocana/mainframe.py
notify_block_ready 方法增加可选的 timeout 参数,并改为使用 threading.Event 进行事件协调。新增显式的超时错误处理:超时时执行取消订阅、移除回调,然后抛出 TimeoutError 异常。
单元测试套件
oocana/tests/test_mainframe_notify.py
新增完整的单元测试模块,涵盖成功消息处理、超时异常处理、订阅清理、消息发布验证和无自旋等待验证等多个场景。使用模拟对象替代真实 MQTT 客户端。

序列图

sequenceDiagram
    participant Caller as 调用方
    participant Mainframe as Mainframe<br/>(notify_block_ready)
    participant MQTT as MQTT<br/>客户端
    participant Event as threading<br/>.Event
    
    Caller->>Mainframe: notify_block_ready(session_id,<br/>job_id, timeout)
    Mainframe->>Event: 创建Event实例
    Mainframe->>MQTT: 订阅输入消息主题
    Mainframe->>MQTT: 发布BlockReady消息
    Mainframe->>Event: event.wait(timeout)
    
    alt 消息到达(超时前)
        MQTT->>Mainframe: 触发on_message_once<br/>回调
        Mainframe->>Mainframe: 存储消息负载
        Mainframe->>Event: event.set()
        Event-->>Mainframe: wait()返回True
        Mainframe->>MQTT: 取消订阅
        Mainframe->>MQTT: 移除消息回调
        Mainframe-->>Caller: 返回响应dict
    else 等待超时
        Event-->>Mainframe: wait()返回False
        Mainframe->>MQTT: 取消订阅
        Mainframe->>MQTT: 移除消息回调
        Mainframe-->>Caller: 抛出TimeoutError
    end
Loading

代码审查工作量评估

🎯 3 (中等) | ⏱️ ~20 分钟

🚥 Pre-merge checks | ✅ 2
✅ Passed checks (2 passed)
Check name Status Explanation
Description check ✅ Passed The pull request description is directly related to the changeset, providing clear context about the problem, solution, and test plan for the busy-wait replacement.
Title check ✅ Passed PR标题遵循规范的格式 <type>(<scope>): <subject>,其中type为"fix",scope为"oocana",subject清晰描述了主要改动(用threading.Event替代忙等待)。

✏️ Tip: You can configure your own custom pre-merge checks in the settings.


Comment @coderabbitai help to get the list of available commands and usage tips.

Copy link

@coderabbitai coderabbitai bot left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Actionable comments posted: 1

🤖 Fix all issues with AI agents
In `@oocana/oocana/mainframe.py`:
- Around line 127-131: The on_message_once callback currently only calls
self.client.unsubscribe(topic) on success, leaving the MQTT callback registered
and risking memory leaks; update the on_message_once function to also call
self.client.message_callback_remove(topic) (matching the timeout path cleanup)
before setting replay and event so the message callback is fully removed when a
message is received.
🧹 Nitpick comments (1)
oocana/tests/test_mainframe_notify.py (1)

31-58: 测试逻辑正确,建议优化 import 位置。

测试正确模拟了异步回调行为。simplejson 在多个测试方法中重复导入(第 42、83、101、113 行),可以考虑移至文件顶部以提高一致性。

关于静态分析提示的 *args, **kwargs 未使用问题:这些参数是 side_effect 签名匹配所必需的,可以安全忽略。

♻️ 建议将 simplejson 导入移至文件顶部
 import unittest
 from unittest.mock import MagicMock, patch, PropertyMock
 import threading
 import time
+import simplejson
 from oocana import Mainframe

然后在各测试方法中移除 import simplejson 语句。

Comment on lines 127 to +131
def on_message_once(_client, _userdata, message):
nonlocal replay
self.client.unsubscribe(topic)
replay = loads(message.payload)
event.set()
Copy link

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

⚠️ Potential issue | 🟡 Minor

成功路径可能存在回调清理不完整的问题。

on_message_once 回调中,成功接收消息后只调用了 unsubscribe(topic),但没有调用 message_callback_remove(topic)。相比之下,超时路径(第 147-148 行)同时调用了两者。

虽然取消订阅后不会再收到新消息,但残留的回调引用可能导致内存泄漏(如果频繁调用此方法)。建议保持一致性。

🛡️ 建议在成功路径也清理回调
         def on_message_once(_client, _userdata, message):
             nonlocal replay
             self.client.unsubscribe(topic)
+            self.client.message_callback_remove(topic)
             replay = loads(message.payload)
             event.set()
📝 Committable suggestion

‼️ IMPORTANT
Carefully review the code before committing. Ensure that it accurately replaces the highlighted code, contains no missing lines, and has no issues with indentation. Thoroughly test & benchmark the code to ensure it meets the requirements.

Suggested change
def on_message_once(_client, _userdata, message):
nonlocal replay
self.client.unsubscribe(topic)
replay = loads(message.payload)
event.set()
def on_message_once(_client, _userdata, message):
nonlocal replay
self.client.unsubscribe(topic)
self.client.message_callback_remove(topic)
replay = loads(message.payload)
event.set()
🤖 Prompt for AI Agents
In `@oocana/oocana/mainframe.py` around lines 127 - 131, The on_message_once
callback currently only calls self.client.unsubscribe(topic) on success, leaving
the MQTT callback registered and risking memory leaks; update the
on_message_once function to also call self.client.message_callback_remove(topic)
(matching the timeout path cleanup) before setting replay and event so the
message callback is fully removed when a message is received.

Copy link
Contributor

Copilot AI left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Pull request overview

This pull request replaces a busy-wait loop with threading.Event in the notify_block_ready method to eliminate CPU spinning while waiting for MQTT messages. The change adds an optional timeout parameter and includes unit tests to verify the new behavior.

Changes:

  • Modified notify_block_ready to use threading.Event.wait() instead of a busy-wait while True loop
  • Added optional timeout parameter with proper error handling
  • Created comprehensive unit tests in test_mainframe_notify.py

Reviewed changes

Copilot reviewed 2 out of 2 changed files in this pull request and generated 6 comments.

File Description
oocana/oocana/mainframe.py Replaced busy-wait with threading.Event, added timeout parameter and TimeoutError handling
oocana/tests/test_mainframe_notify.py Added new test file with 5 test cases covering timeout, cleanup, message receipt, and CPU usage

💡 Add Copilot custom instructions for smarter, more guided reviews. Learn how to get started.

Comment on lines +75 to +92
def test_notify_block_ready_unsubscribes_on_success(self):
"""Test that the topic is unsubscribed after successful message receipt."""
session_id = 'test-session'
job_id = 'test-job'
expected_topic = f"inputs/{session_id}/{job_id}"

def trigger_callback(*args, **kwargs):
callback = self.mock_client.message_callback_add.call_args[0][1]
import simplejson
mock_message = MockMessage(simplejson.dumps({}).encode())
callback(None, None, mock_message)

self.mock_client.publish.side_effect = trigger_callback

self.mainframe.notify_block_ready(session_id, job_id, timeout=5.0)

# Verify unsubscribe was called with the correct topic
self.mock_client.unsubscribe.assert_called_with(expected_topic)
Copy link

Copilot AI Jan 31, 2026

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The test verifies that unsubscribe is called, but it doesn't verify that message_callback_remove is also called on success. Since the callback in the main code unsubscribes on line 129, and the cleanup code on timeout calls both unsubscribe and message_callback_remove (lines 147-148), there's inconsistency. The test should verify that message_callback_remove is called after successful message receipt to ensure proper cleanup.

Copilot uses AI. Check for mistakes.
Comment on lines 127 to +131
def on_message_once(_client, _userdata, message):
nonlocal replay
self.client.unsubscribe(topic)
replay = loads(message.payload)
event.set()
Copy link

Copilot AI Jan 31, 2026

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

There is a race condition in this implementation. The callback can execute and set the event before event.wait() is called, but more critically, if the callback executes after the timeout (line 142-149), the code will attempt to unsubscribe twice - once in the callback (line 129) and once in the timeout handler (line 147). This can lead to errors or unexpected behavior.

Consider refactoring to ensure cleanup only happens once, or move the unsubscribe logic outside the callback to the cleanup section only.

Copilot uses AI. Check for mistakes.
Comment on lines 129 to +131
self.client.unsubscribe(topic)
replay = loads(message.payload)
event.set()
Copy link

Copilot AI Jan 31, 2026

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

There is inconsistent cleanup behavior. On successful message receipt, the callback unsubscribes (line 129) but does not call message_callback_remove, potentially leaving the callback registered. On timeout, both unsubscribe and message_callback_remove are called (lines 147-148). This inconsistency could lead to memory leaks or unexpected behavior if the callback remains registered after a successful message receipt.

The cleanup should be consistent in both paths - either remove the unsubscribe from the callback and handle all cleanup after the wait, or add message_callback_remove to the callback as well.

Copilot uses AI. Check for mistakes.
Comment on lines +42 to +113
import simplejson
mock_message = MockMessage(simplejson.dumps(expected_payload).encode())
# Trigger the callback in a separate thread
callback(None, None, mock_message)

# Make subscribe trigger the callback after a short delay
def delayed_trigger(*args, **kwargs):
timer = threading.Timer(0.1, trigger_callback)
timer.start()

self.mock_client.publish.side_effect = delayed_trigger

result = self.mainframe.notify_block_ready(session_id, job_id, timeout=5.0)

self.assertEqual(result, expected_payload)
self.mock_client.subscribe.assert_called_once()
self.mock_client.publish.assert_called_once()

def test_notify_block_ready_timeout(self):
"""Test that notify_block_ready raises TimeoutError on timeout."""
session_id = 'test-session'
job_id = 'test-job'

# Don't trigger any callback, let it timeout
with self.assertRaises(TimeoutError) as context:
self.mainframe.notify_block_ready(session_id, job_id, timeout=0.1)

self.assertIn(session_id, str(context.exception))
self.assertIn(job_id, str(context.exception))
# Verify cleanup was called
self.mock_client.unsubscribe.assert_called()
self.mock_client.message_callback_remove.assert_called()

def test_notify_block_ready_unsubscribes_on_success(self):
"""Test that the topic is unsubscribed after successful message receipt."""
session_id = 'test-session'
job_id = 'test-job'
expected_topic = f"inputs/{session_id}/{job_id}"

def trigger_callback(*args, **kwargs):
callback = self.mock_client.message_callback_add.call_args[0][1]
import simplejson
mock_message = MockMessage(simplejson.dumps({}).encode())
callback(None, None, mock_message)

self.mock_client.publish.side_effect = trigger_callback

self.mainframe.notify_block_ready(session_id, job_id, timeout=5.0)

# Verify unsubscribe was called with the correct topic
self.mock_client.unsubscribe.assert_called_with(expected_topic)

def test_notify_block_ready_publishes_correct_message(self):
"""Test that the correct BlockReady message is published."""
session_id = 'test-session'
job_id = 'test-job'

def trigger_callback(*args, **kwargs):
callback = self.mock_client.message_callback_add.call_args[0][1]
import simplejson
mock_message = MockMessage(simplejson.dumps({}).encode())
callback(None, None, mock_message)

self.mock_client.publish.side_effect = trigger_callback

self.mainframe.notify_block_ready(session_id, job_id, timeout=5.0)

# Check that publish was called with correct topic and payload
publish_call = self.mock_client.publish.call_args
self.assertEqual(publish_call[0][0], f"session/{session_id}")

import simplejson
Copy link

Copilot AI Jan 31, 2026

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The simplejson import is repeated inside multiple function scopes. Since simplejson is already used in the module (lines 42, 83, 101, 113), it would be more conventional and cleaner to import it at the module level at the top of the file, similar to how other test files in the project import dependencies.

Copilot uses AI. Check for mistakes.
@@ -0,0 +1,140 @@
import unittest
from unittest.mock import MagicMock, patch, PropertyMock
Copy link

Copilot AI Jan 31, 2026

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

PropertyMock is imported but never used in the test file. Consider removing this unused import to keep the code clean.

Suggested change
from unittest.mock import MagicMock, patch, PropertyMock
from unittest.mock import MagicMock, patch

Copilot uses AI. Check for mistakes.
# Use a short timeout to verify it actually waits
try:
self.mainframe.notify_block_ready(session_id, job_id, timeout=0.2)
except TimeoutError:
Copy link

Copilot AI Jan 31, 2026

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

'except' clause does nothing but pass and there is no explanatory comment.

Suggested change
except TimeoutError:
except TimeoutError:
# TimeoutError is expected here; we only care about measuring elapsed time.

Copilot uses AI. Check for mistakes.
@leavesster leavesster changed the title fix: replace busy-wait with threading.Event in notify_block_ready fix(oocana): replace busy-wait with threading.Event in notify_block_ready Jan 31, 2026
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment

Labels

None yet

Projects

None yet

Development

Successfully merging this pull request may close these issues.

2 participants