-
Notifications
You must be signed in to change notification settings - Fork 1
fix(oocana): replace busy-wait with threading.Event in notify_block_ready #452
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
base: main
Are you sure you want to change the base?
Conversation
The previous implementation used a while True loop that would spin the CPU waiting for a response. This is replaced with threading.Event.wait() which properly blocks without consuming CPU resources. Changes: - Add threading.Event to wait for message response - Add optional timeout parameter to prevent indefinite blocking - Raise TimeoutError with descriptive message on timeout - Properly clean up subscription on timeout - Add comprehensive unit tests for the new implementation
总体概览本次变更修改了 变更汇总
序列图sequenceDiagram
participant Caller as 调用方
participant Mainframe as Mainframe<br/>(notify_block_ready)
participant MQTT as MQTT<br/>客户端
participant Event as threading<br/>.Event
Caller->>Mainframe: notify_block_ready(session_id,<br/>job_id, timeout)
Mainframe->>Event: 创建Event实例
Mainframe->>MQTT: 订阅输入消息主题
Mainframe->>MQTT: 发布BlockReady消息
Mainframe->>Event: event.wait(timeout)
alt 消息到达(超时前)
MQTT->>Mainframe: 触发on_message_once<br/>回调
Mainframe->>Mainframe: 存储消息负载
Mainframe->>Event: event.set()
Event-->>Mainframe: wait()返回True
Mainframe->>MQTT: 取消订阅
Mainframe->>MQTT: 移除消息回调
Mainframe-->>Caller: 返回响应dict
else 等待超时
Event-->>Mainframe: wait()返回False
Mainframe->>MQTT: 取消订阅
Mainframe->>MQTT: 移除消息回调
Mainframe-->>Caller: 抛出TimeoutError
end
代码审查工作量评估🎯 3 (中等) | ⏱️ ~20 分钟 🚥 Pre-merge checks | ✅ 2✅ Passed checks (2 passed)
✏️ Tip: You can configure your own custom pre-merge checks in the settings. Comment |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Actionable comments posted: 1
🤖 Fix all issues with AI agents
In `@oocana/oocana/mainframe.py`:
- Around line 127-131: The on_message_once callback currently only calls
self.client.unsubscribe(topic) on success, leaving the MQTT callback registered
and risking memory leaks; update the on_message_once function to also call
self.client.message_callback_remove(topic) (matching the timeout path cleanup)
before setting replay and event so the message callback is fully removed when a
message is received.
🧹 Nitpick comments (1)
oocana/tests/test_mainframe_notify.py (1)
31-58: 测试逻辑正确,建议优化 import 位置。测试正确模拟了异步回调行为。
simplejson在多个测试方法中重复导入(第 42、83、101、113 行),可以考虑移至文件顶部以提高一致性。关于静态分析提示的
*args, **kwargs未使用问题:这些参数是side_effect签名匹配所必需的,可以安全忽略。♻️ 建议将 simplejson 导入移至文件顶部
import unittest from unittest.mock import MagicMock, patch, PropertyMock import threading import time +import simplejson from oocana import Mainframe然后在各测试方法中移除
import simplejson语句。
| def on_message_once(_client, _userdata, message): | ||
| nonlocal replay | ||
| self.client.unsubscribe(topic) | ||
| replay = loads(message.payload) | ||
| event.set() |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
成功路径可能存在回调清理不完整的问题。
在 on_message_once 回调中,成功接收消息后只调用了 unsubscribe(topic),但没有调用 message_callback_remove(topic)。相比之下,超时路径(第 147-148 行)同时调用了两者。
虽然取消订阅后不会再收到新消息,但残留的回调引用可能导致内存泄漏(如果频繁调用此方法)。建议保持一致性。
🛡️ 建议在成功路径也清理回调
def on_message_once(_client, _userdata, message):
nonlocal replay
self.client.unsubscribe(topic)
+ self.client.message_callback_remove(topic)
replay = loads(message.payload)
event.set()📝 Committable suggestion
‼️ IMPORTANT
Carefully review the code before committing. Ensure that it accurately replaces the highlighted code, contains no missing lines, and has no issues with indentation. Thoroughly test & benchmark the code to ensure it meets the requirements.
| def on_message_once(_client, _userdata, message): | |
| nonlocal replay | |
| self.client.unsubscribe(topic) | |
| replay = loads(message.payload) | |
| event.set() | |
| def on_message_once(_client, _userdata, message): | |
| nonlocal replay | |
| self.client.unsubscribe(topic) | |
| self.client.message_callback_remove(topic) | |
| replay = loads(message.payload) | |
| event.set() |
🤖 Prompt for AI Agents
In `@oocana/oocana/mainframe.py` around lines 127 - 131, The on_message_once
callback currently only calls self.client.unsubscribe(topic) on success, leaving
the MQTT callback registered and risking memory leaks; update the
on_message_once function to also call self.client.message_callback_remove(topic)
(matching the timeout path cleanup) before setting replay and event so the
message callback is fully removed when a message is received.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Pull request overview
This pull request replaces a busy-wait loop with threading.Event in the notify_block_ready method to eliminate CPU spinning while waiting for MQTT messages. The change adds an optional timeout parameter and includes unit tests to verify the new behavior.
Changes:
- Modified
notify_block_readyto usethreading.Event.wait()instead of a busy-waitwhile Trueloop - Added optional
timeoutparameter with proper error handling - Created comprehensive unit tests in
test_mainframe_notify.py
Reviewed changes
Copilot reviewed 2 out of 2 changed files in this pull request and generated 6 comments.
| File | Description |
|---|---|
| oocana/oocana/mainframe.py | Replaced busy-wait with threading.Event, added timeout parameter and TimeoutError handling |
| oocana/tests/test_mainframe_notify.py | Added new test file with 5 test cases covering timeout, cleanup, message receipt, and CPU usage |
💡 Add Copilot custom instructions for smarter, more guided reviews. Learn how to get started.
| def test_notify_block_ready_unsubscribes_on_success(self): | ||
| """Test that the topic is unsubscribed after successful message receipt.""" | ||
| session_id = 'test-session' | ||
| job_id = 'test-job' | ||
| expected_topic = f"inputs/{session_id}/{job_id}" | ||
|
|
||
| def trigger_callback(*args, **kwargs): | ||
| callback = self.mock_client.message_callback_add.call_args[0][1] | ||
| import simplejson | ||
| mock_message = MockMessage(simplejson.dumps({}).encode()) | ||
| callback(None, None, mock_message) | ||
|
|
||
| self.mock_client.publish.side_effect = trigger_callback | ||
|
|
||
| self.mainframe.notify_block_ready(session_id, job_id, timeout=5.0) | ||
|
|
||
| # Verify unsubscribe was called with the correct topic | ||
| self.mock_client.unsubscribe.assert_called_with(expected_topic) |
Copilot
AI
Jan 31, 2026
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
The test verifies that unsubscribe is called, but it doesn't verify that message_callback_remove is also called on success. Since the callback in the main code unsubscribes on line 129, and the cleanup code on timeout calls both unsubscribe and message_callback_remove (lines 147-148), there's inconsistency. The test should verify that message_callback_remove is called after successful message receipt to ensure proper cleanup.
| def on_message_once(_client, _userdata, message): | ||
| nonlocal replay | ||
| self.client.unsubscribe(topic) | ||
| replay = loads(message.payload) | ||
| event.set() |
Copilot
AI
Jan 31, 2026
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
There is a race condition in this implementation. The callback can execute and set the event before event.wait() is called, but more critically, if the callback executes after the timeout (line 142-149), the code will attempt to unsubscribe twice - once in the callback (line 129) and once in the timeout handler (line 147). This can lead to errors or unexpected behavior.
Consider refactoring to ensure cleanup only happens once, or move the unsubscribe logic outside the callback to the cleanup section only.
| self.client.unsubscribe(topic) | ||
| replay = loads(message.payload) | ||
| event.set() |
Copilot
AI
Jan 31, 2026
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
There is inconsistent cleanup behavior. On successful message receipt, the callback unsubscribes (line 129) but does not call message_callback_remove, potentially leaving the callback registered. On timeout, both unsubscribe and message_callback_remove are called (lines 147-148). This inconsistency could lead to memory leaks or unexpected behavior if the callback remains registered after a successful message receipt.
The cleanup should be consistent in both paths - either remove the unsubscribe from the callback and handle all cleanup after the wait, or add message_callback_remove to the callback as well.
| import simplejson | ||
| mock_message = MockMessage(simplejson.dumps(expected_payload).encode()) | ||
| # Trigger the callback in a separate thread | ||
| callback(None, None, mock_message) | ||
|
|
||
| # Make subscribe trigger the callback after a short delay | ||
| def delayed_trigger(*args, **kwargs): | ||
| timer = threading.Timer(0.1, trigger_callback) | ||
| timer.start() | ||
|
|
||
| self.mock_client.publish.side_effect = delayed_trigger | ||
|
|
||
| result = self.mainframe.notify_block_ready(session_id, job_id, timeout=5.0) | ||
|
|
||
| self.assertEqual(result, expected_payload) | ||
| self.mock_client.subscribe.assert_called_once() | ||
| self.mock_client.publish.assert_called_once() | ||
|
|
||
| def test_notify_block_ready_timeout(self): | ||
| """Test that notify_block_ready raises TimeoutError on timeout.""" | ||
| session_id = 'test-session' | ||
| job_id = 'test-job' | ||
|
|
||
| # Don't trigger any callback, let it timeout | ||
| with self.assertRaises(TimeoutError) as context: | ||
| self.mainframe.notify_block_ready(session_id, job_id, timeout=0.1) | ||
|
|
||
| self.assertIn(session_id, str(context.exception)) | ||
| self.assertIn(job_id, str(context.exception)) | ||
| # Verify cleanup was called | ||
| self.mock_client.unsubscribe.assert_called() | ||
| self.mock_client.message_callback_remove.assert_called() | ||
|
|
||
| def test_notify_block_ready_unsubscribes_on_success(self): | ||
| """Test that the topic is unsubscribed after successful message receipt.""" | ||
| session_id = 'test-session' | ||
| job_id = 'test-job' | ||
| expected_topic = f"inputs/{session_id}/{job_id}" | ||
|
|
||
| def trigger_callback(*args, **kwargs): | ||
| callback = self.mock_client.message_callback_add.call_args[0][1] | ||
| import simplejson | ||
| mock_message = MockMessage(simplejson.dumps({}).encode()) | ||
| callback(None, None, mock_message) | ||
|
|
||
| self.mock_client.publish.side_effect = trigger_callback | ||
|
|
||
| self.mainframe.notify_block_ready(session_id, job_id, timeout=5.0) | ||
|
|
||
| # Verify unsubscribe was called with the correct topic | ||
| self.mock_client.unsubscribe.assert_called_with(expected_topic) | ||
|
|
||
| def test_notify_block_ready_publishes_correct_message(self): | ||
| """Test that the correct BlockReady message is published.""" | ||
| session_id = 'test-session' | ||
| job_id = 'test-job' | ||
|
|
||
| def trigger_callback(*args, **kwargs): | ||
| callback = self.mock_client.message_callback_add.call_args[0][1] | ||
| import simplejson | ||
| mock_message = MockMessage(simplejson.dumps({}).encode()) | ||
| callback(None, None, mock_message) | ||
|
|
||
| self.mock_client.publish.side_effect = trigger_callback | ||
|
|
||
| self.mainframe.notify_block_ready(session_id, job_id, timeout=5.0) | ||
|
|
||
| # Check that publish was called with correct topic and payload | ||
| publish_call = self.mock_client.publish.call_args | ||
| self.assertEqual(publish_call[0][0], f"session/{session_id}") | ||
|
|
||
| import simplejson |
Copilot
AI
Jan 31, 2026
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
The simplejson import is repeated inside multiple function scopes. Since simplejson is already used in the module (lines 42, 83, 101, 113), it would be more conventional and cleaner to import it at the module level at the top of the file, similar to how other test files in the project import dependencies.
| @@ -0,0 +1,140 @@ | |||
| import unittest | |||
| from unittest.mock import MagicMock, patch, PropertyMock | |||
Copilot
AI
Jan 31, 2026
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
PropertyMock is imported but never used in the test file. Consider removing this unused import to keep the code clean.
| from unittest.mock import MagicMock, patch, PropertyMock | |
| from unittest.mock import MagicMock, patch |
| # Use a short timeout to verify it actually waits | ||
| try: | ||
| self.mainframe.notify_block_ready(session_id, job_id, timeout=0.2) | ||
| except TimeoutError: |
Copilot
AI
Jan 31, 2026
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
'except' clause does nothing but pass and there is no explanatory comment.
| except TimeoutError: | |
| except TimeoutError: | |
| # TimeoutError is expected here; we only care about measuring elapsed time. |
Summary
while Trueloop withthreading.Eventfor blocking waittimeoutparameter to prevent indefinite hangsProblem
The original code used a busy-wait pattern that causes 100% CPU usage:
Solution
Use
threading.Eventfor efficient blocking:Test Plan
tests/test_mainframe_notify.pywith unit tests