Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
30 changes: 25 additions & 5 deletions oocana/oocana/mainframe.py
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@
import operator
from urllib.parse import urlparse
import uuid
import threading
from .data import BlockDict, JobDict, dumps, EXECUTOR_NAME
import logging
from typing import Optional, Callable, Any
Expand Down Expand Up @@ -104,15 +105,30 @@ def notify_executor_ready(self, session_id: str, package: str | None, identifier
"debug_port": debug_port,
}), qos=1)

def notify_block_ready(self, session_id: str, job_id: str) -> dict:
def notify_block_ready(self, session_id: str, job_id: str, timeout: Optional[float] = None) -> dict:
"""
Notify that a block is ready and wait for input message.

Args:
session_id: The session ID
job_id: The job ID
timeout: Optional timeout in seconds. If None, wait indefinitely.

Returns:
The input message payload as a dict

Raises:
TimeoutError: If timeout is specified and no message is received within the timeout
"""
topic = f"inputs/{session_id}/{job_id}"
replay = None
event = threading.Event()

def on_message_once(_client, _userdata, message):
nonlocal replay
self.client.unsubscribe(topic)
replay = loads(message.payload)
event.set()
Comment on lines 127 to +131
Copy link

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

⚠️ Potential issue | 🟡 Minor

成功路径可能存在回调清理不完整的问题。

on_message_once 回调中,成功接收消息后只调用了 unsubscribe(topic),但没有调用 message_callback_remove(topic)。相比之下,超时路径(第 147-148 行)同时调用了两者。

虽然取消订阅后不会再收到新消息,但残留的回调引用可能导致内存泄漏(如果频繁调用此方法)。建议保持一致性。

🛡️ 建议在成功路径也清理回调
         def on_message_once(_client, _userdata, message):
             nonlocal replay
             self.client.unsubscribe(topic)
+            self.client.message_callback_remove(topic)
             replay = loads(message.payload)
             event.set()
📝 Committable suggestion

‼️ IMPORTANT
Carefully review the code before committing. Ensure that it accurately replaces the highlighted code, contains no missing lines, and has no issues with indentation. Thoroughly test & benchmark the code to ensure it meets the requirements.

Suggested change
def on_message_once(_client, _userdata, message):
nonlocal replay
self.client.unsubscribe(topic)
replay = loads(message.payload)
event.set()
def on_message_once(_client, _userdata, message):
nonlocal replay
self.client.unsubscribe(topic)
self.client.message_callback_remove(topic)
replay = loads(message.payload)
event.set()
🤖 Prompt for AI Agents
In `@oocana/oocana/mainframe.py` around lines 127 - 131, The on_message_once
callback currently only calls self.client.unsubscribe(topic) on success, leaving
the MQTT callback registered and risking memory leaks; update the
on_message_once function to also call self.client.message_callback_remove(topic)
(matching the timeout path cleanup) before setting replay and event so the
message callback is fully removed when a message is received.

Comment on lines 127 to +131
Copy link

Copilot AI Jan 31, 2026

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

There is a race condition in this implementation. The callback can execute and set the event before event.wait() is called, but more critically, if the callback executes after the timeout (line 142-149), the code will attempt to unsubscribe twice - once in the callback (line 129) and once in the timeout handler (line 147). This can lead to errors or unexpected behavior.

Consider refactoring to ensure cleanup only happens once, or move the unsubscribe logic outside the callback to the cleanup section only.

Copilot uses AI. Check for mistakes.
Comment on lines 129 to +131
Copy link

Copilot AI Jan 31, 2026

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

There is inconsistent cleanup behavior. On successful message receipt, the callback unsubscribes (line 129) but does not call message_callback_remove, potentially leaving the callback registered. On timeout, both unsubscribe and message_callback_remove are called (lines 147-148). This inconsistency could lead to memory leaks or unexpected behavior if the callback remains registered after a successful message receipt.

The cleanup should be consistent in both paths - either remove the unsubscribe from the callback and handle all cleanup after the wait, or add message_callback_remove to the callback as well.

Copilot uses AI. Check for mistakes.

self.client.subscribe(topic, qos=1)
self.client.message_callback_add(topic, on_message_once)
Expand All @@ -123,10 +139,14 @@ def on_message_once(_client, _userdata, message):
"job_id": job_id,
}), qos=1)

while True:
if replay is not None:
self._logger.info("notify ready success in {} {}".format(session_id, job_id))
return replay
if event.wait(timeout=timeout):
self._logger.info("notify ready success in {} {}".format(session_id, job_id))
return replay # type: ignore
else:
# Timeout occurred, clean up subscription
self.client.unsubscribe(topic)
self.client.message_callback_remove(topic)
raise TimeoutError(f"Timeout waiting for block ready response in session {session_id}, job {job_id}")

def add_request_response_callback(self, session_id: str, request_id: str, callback: Callable[[Any], Any]):
"""Add a callback to be called when an error occurs while running a block."""
Expand Down
140 changes: 140 additions & 0 deletions oocana/tests/test_mainframe_notify.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,140 @@
import unittest
from unittest.mock import MagicMock, patch, PropertyMock
Copy link

Copilot AI Jan 31, 2026

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

PropertyMock is imported but never used in the test file. Consider removing this unused import to keep the code clean.

Suggested change
from unittest.mock import MagicMock, patch, PropertyMock
from unittest.mock import MagicMock, patch

Copilot uses AI. Check for mistakes.
import threading
import time
from oocana import Mainframe


class MockMessage:
"""Mock MQTT message for testing."""
def __init__(self, payload: bytes):
self.payload = payload


class TestNotifyBlockReady(unittest.TestCase):
"""Test cases for Mainframe.notify_block_ready method."""

def setUp(self):
# Patch the mqtt client to avoid real network connections
self.mock_client_patcher = patch('paho.mqtt.client.Client')
self.mock_client_class = self.mock_client_patcher.start()
self.mock_client = MagicMock()
self.mock_client_class.return_value = self.mock_client
self.mock_client.is_connected.return_value = True

self.mainframe = Mainframe('mqtt://localhost:1883')
self.mainframe.client = self.mock_client

def tearDown(self):
self.mock_client_patcher.stop()

def test_notify_block_ready_receives_response(self):
"""Test that notify_block_ready correctly waits for and returns response."""
session_id = 'test-session'
job_id = 'test-job'
expected_payload = {'inputs': {'key': 'value'}}

# Simulate message callback being triggered
def trigger_callback(*args, **kwargs):
# Get the callback that was registered
callback = self.mock_client.message_callback_add.call_args[0][1]
# Create a mock message
import simplejson
mock_message = MockMessage(simplejson.dumps(expected_payload).encode())
# Trigger the callback in a separate thread
callback(None, None, mock_message)

# Make subscribe trigger the callback after a short delay
def delayed_trigger(*args, **kwargs):
timer = threading.Timer(0.1, trigger_callback)
timer.start()

self.mock_client.publish.side_effect = delayed_trigger

result = self.mainframe.notify_block_ready(session_id, job_id, timeout=5.0)

self.assertEqual(result, expected_payload)
self.mock_client.subscribe.assert_called_once()
self.mock_client.publish.assert_called_once()

def test_notify_block_ready_timeout(self):
"""Test that notify_block_ready raises TimeoutError on timeout."""
session_id = 'test-session'
job_id = 'test-job'

# Don't trigger any callback, let it timeout
with self.assertRaises(TimeoutError) as context:
self.mainframe.notify_block_ready(session_id, job_id, timeout=0.1)

self.assertIn(session_id, str(context.exception))
self.assertIn(job_id, str(context.exception))
# Verify cleanup was called
self.mock_client.unsubscribe.assert_called()
self.mock_client.message_callback_remove.assert_called()

def test_notify_block_ready_unsubscribes_on_success(self):
"""Test that the topic is unsubscribed after successful message receipt."""
session_id = 'test-session'
job_id = 'test-job'
expected_topic = f"inputs/{session_id}/{job_id}"

def trigger_callback(*args, **kwargs):
callback = self.mock_client.message_callback_add.call_args[0][1]
import simplejson
mock_message = MockMessage(simplejson.dumps({}).encode())
callback(None, None, mock_message)

self.mock_client.publish.side_effect = trigger_callback

self.mainframe.notify_block_ready(session_id, job_id, timeout=5.0)

# Verify unsubscribe was called with the correct topic
self.mock_client.unsubscribe.assert_called_with(expected_topic)
Comment on lines +75 to +92
Copy link

Copilot AI Jan 31, 2026

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The test verifies that unsubscribe is called, but it doesn't verify that message_callback_remove is also called on success. Since the callback in the main code unsubscribes on line 129, and the cleanup code on timeout calls both unsubscribe and message_callback_remove (lines 147-148), there's inconsistency. The test should verify that message_callback_remove is called after successful message receipt to ensure proper cleanup.

Copilot uses AI. Check for mistakes.

def test_notify_block_ready_publishes_correct_message(self):
"""Test that the correct BlockReady message is published."""
session_id = 'test-session'
job_id = 'test-job'

def trigger_callback(*args, **kwargs):
callback = self.mock_client.message_callback_add.call_args[0][1]
import simplejson
mock_message = MockMessage(simplejson.dumps({}).encode())
callback(None, None, mock_message)

self.mock_client.publish.side_effect = trigger_callback

self.mainframe.notify_block_ready(session_id, job_id, timeout=5.0)

# Check that publish was called with correct topic and payload
publish_call = self.mock_client.publish.call_args
self.assertEqual(publish_call[0][0], f"session/{session_id}")

import simplejson
Comment on lines +42 to +113
Copy link

Copilot AI Jan 31, 2026

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The simplejson import is repeated inside multiple function scopes. Since simplejson is already used in the module (lines 42, 83, 101, 113), it would be more conventional and cleaner to import it at the module level at the top of the file, similar to how other test files in the project import dependencies.

Copilot uses AI. Check for mistakes.
payload = simplejson.loads(publish_call[0][1])
self.assertEqual(payload['type'], 'BlockReady')
self.assertEqual(payload['session_id'], session_id)
self.assertEqual(payload['job_id'], job_id)

def test_notify_block_ready_no_cpu_spin(self):
"""Test that notify_block_ready does not spin CPU while waiting."""
session_id = 'test-session'
job_id = 'test-job'

start_time = time.time()

# Use a short timeout to verify it actually waits
try:
self.mainframe.notify_block_ready(session_id, job_id, timeout=0.2)
except TimeoutError:
Copy link

Copilot AI Jan 31, 2026

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

'except' clause does nothing but pass and there is no explanatory comment.

Suggested change
except TimeoutError:
except TimeoutError:
# TimeoutError is expected here; we only care about measuring elapsed time.

Copilot uses AI. Check for mistakes.
pass

elapsed_time = time.time() - start_time

# If it was busy-waiting, it would return almost immediately
# With Event.wait(), it should wait close to the timeout
self.assertGreaterEqual(elapsed_time, 0.15)


if __name__ == '__main__':
unittest.main()
Loading