-
Notifications
You must be signed in to change notification settings - Fork 1
fix(oocana): replace busy-wait with threading.Event in notify_block_ready #452
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
base: main
Are you sure you want to change the base?
Changes from all commits
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
| Original file line number | Diff line number | Diff line change |
|---|---|---|
|
|
@@ -4,6 +4,7 @@ | |
| import operator | ||
| from urllib.parse import urlparse | ||
| import uuid | ||
| import threading | ||
| from .data import BlockDict, JobDict, dumps, EXECUTOR_NAME | ||
| import logging | ||
| from typing import Optional, Callable, Any | ||
|
|
@@ -104,15 +105,30 @@ def notify_executor_ready(self, session_id: str, package: str | None, identifier | |
| "debug_port": debug_port, | ||
| }), qos=1) | ||
|
|
||
| def notify_block_ready(self, session_id: str, job_id: str) -> dict: | ||
| def notify_block_ready(self, session_id: str, job_id: str, timeout: Optional[float] = None) -> dict: | ||
| """ | ||
| Notify that a block is ready and wait for input message. | ||
|
|
||
| Args: | ||
| session_id: The session ID | ||
| job_id: The job ID | ||
| timeout: Optional timeout in seconds. If None, wait indefinitely. | ||
|
|
||
| Returns: | ||
| The input message payload as a dict | ||
|
|
||
| Raises: | ||
| TimeoutError: If timeout is specified and no message is received within the timeout | ||
| """ | ||
| topic = f"inputs/{session_id}/{job_id}" | ||
| replay = None | ||
| event = threading.Event() | ||
|
|
||
| def on_message_once(_client, _userdata, message): | ||
| nonlocal replay | ||
| self.client.unsubscribe(topic) | ||
| replay = loads(message.payload) | ||
| event.set() | ||
|
Comment on lines
127
to
+131
|
||
|
|
||
| self.client.subscribe(topic, qos=1) | ||
| self.client.message_callback_add(topic, on_message_once) | ||
|
|
@@ -123,10 +139,14 @@ def on_message_once(_client, _userdata, message): | |
| "job_id": job_id, | ||
| }), qos=1) | ||
|
|
||
| while True: | ||
| if replay is not None: | ||
| self._logger.info("notify ready success in {} {}".format(session_id, job_id)) | ||
| return replay | ||
| if event.wait(timeout=timeout): | ||
| self._logger.info("notify ready success in {} {}".format(session_id, job_id)) | ||
| return replay # type: ignore | ||
| else: | ||
| # Timeout occurred, clean up subscription | ||
| self.client.unsubscribe(topic) | ||
| self.client.message_callback_remove(topic) | ||
| raise TimeoutError(f"Timeout waiting for block ready response in session {session_id}, job {job_id}") | ||
|
|
||
| def add_request_response_callback(self, session_id: str, request_id: str, callback: Callable[[Any], Any]): | ||
| """Add a callback to be called when an error occurs while running a block.""" | ||
|
|
||
| Original file line number | Diff line number | Diff line change | ||||||
|---|---|---|---|---|---|---|---|---|
| @@ -0,0 +1,140 @@ | ||||||||
| import unittest | ||||||||
| from unittest.mock import MagicMock, patch, PropertyMock | ||||||||
|
||||||||
| from unittest.mock import MagicMock, patch, PropertyMock | |
| from unittest.mock import MagicMock, patch |
Copilot
AI
Jan 31, 2026
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
The test verifies that unsubscribe is called, but it doesn't verify that message_callback_remove is also called on success. Since the callback in the main code unsubscribes on line 129, and the cleanup code on timeout calls both unsubscribe and message_callback_remove (lines 147-148), there's inconsistency. The test should verify that message_callback_remove is called after successful message receipt to ensure proper cleanup.
Copilot
AI
Jan 31, 2026
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
The simplejson import is repeated inside multiple function scopes. Since simplejson is already used in the module (lines 42, 83, 101, 113), it would be more conventional and cleaner to import it at the module level at the top of the file, similar to how other test files in the project import dependencies.
Copilot
AI
Jan 31, 2026
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
'except' clause does nothing but pass and there is no explanatory comment.
| except TimeoutError: | |
| except TimeoutError: | |
| # TimeoutError is expected here; we only care about measuring elapsed time. |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
成功路径可能存在回调清理不完整的问题。
在
on_message_once回调中,成功接收消息后只调用了unsubscribe(topic),但没有调用message_callback_remove(topic)。相比之下,超时路径(第 147-148 行)同时调用了两者。虽然取消订阅后不会再收到新消息,但残留的回调引用可能导致内存泄漏(如果频繁调用此方法)。建议保持一致性。
🛡️ 建议在成功路径也清理回调
def on_message_once(_client, _userdata, message): nonlocal replay self.client.unsubscribe(topic) + self.client.message_callback_remove(topic) replay = loads(message.payload) event.set()📝 Committable suggestion
🤖 Prompt for AI Agents