Skip to content

Conversation

@leavesster
Copy link
Contributor

Summary

  • Replaces busy-wait polling with threading.Event for service launch synchronization
  • When service is "launching", now blocks on event instead of re-queuing message
  • Adds 30-second timeout for service launch
  • Cleans up events when service exits

Before

elif status == "launching":
    fs.put(message)  # Re-queue, causes busy-wait loop

After

elif status == "launching":
    event.wait(timeout=30)  # Block until service ready
    if service_store.get(service_hash) == "running":
        run_service_block(message)

Test plan

  • pyright type check passes
  • Python syntax validation passes
  • Verify service launch does not spam logs with re-queue messages
  • Verify concurrent service block requests are handled correctly

Instead of re-queuing messages when service is launching (busy-wait),
use threading.Event to block until service is ready. This eliminates
unnecessary CPU usage from polling and reduces log spam.

- Add service_events dict to track launch events
- Wait up to 30s for service ready signal
- Clean up events on service exit
Copilot AI review requested due to automatic review settings January 30, 2026 16:02
@coderabbitai
Copy link

coderabbitai bot commented Jan 30, 2026

Summary by CodeRabbit

改进

  • 服务启动稳定性增强
    • 优化服务启动等待机制,添加超时保护确保系统响应性
    • 改进服务状态同步管理,增强整体可靠性和故障恢复能力

✏️ Tip: You can customize this high-level summary in your review settings.

概览

在 executor.py 中引入了线程支持和基于事件的服务生命周期同步机制。使用全局 service_events 字典存储每个服务的 threading.Event 实例,在服务启动、状态变更和退出时管理事件生命周期,改进了服务就绪状态的检测流程。

变更内容

队列 / 文件 摘要
服务生命周期事件同步
executor/python_executor/executor.py
引入 threading.Event 全局映射以同步服务状态。在 spawn_service 中为每个服务创建事件实例,在服务报告 "running" 状态时设置事件,服务退出时清理事件与服务存储。修改消息处理的启动等待逻辑,使用事件阻塞等待(30秒超时),未就绪时重新排队消息到文件系统。

序列图

sequenceDiagram
    participant Msg as 消息处理
    participant Spawn as 服务生成
    participant Event as 事件管理
    participant Status as 状态报告
    participant Svc as 服务实例

    Msg->>Spawn: 请求启动服务
    Spawn->>Event: 创建 threading.Event
    Spawn->>Svc: 启动服务进程
    Spawn-->>Msg: 返回(状态: launching)
    Msg->>Event: 等待事件(超时 30s)
    Svc->>Status: 报告状态 "running"
    Status->>Event: 设置事件信号
    Event-->>Msg: 事件触发,继续处理
    Svc->>Spawn: 服务退出
    Spawn->>Event: 清理事件与服务记录
Loading

预估代码审查工作量

🎯 3 (中等) | ⏱️ ~20 分钟

🚥 Pre-merge checks | ✅ 2
✅ Passed checks (2 passed)
Check name Status Explanation
Title check ✅ Passed The title follows the required format <type>(<scope>): <subject> with type 'fix' and clearly describes the main change of replacing polling with event-based waiting.
Description check ✅ Passed The description is directly related to the changeset, providing a detailed summary of the polling-to-event replacement, before/after code examples, and test plan aligned with the actual changes.

✏️ Tip: You can configure your own custom pre-merge checks in the settings.


Comment @coderabbitai help to get the list of available commands and usage tips.

Copy link

@coderabbitai coderabbitai bot left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Actionable comments posted: 2

Caution

Some comments are outside the diff and can’t be posted inline due to platform limitations.

⚠️ Outside diff range comments (1)
executor/python_executor/executor.py (1)

191-193: ⚠️ Potential issue | 🟡 Minor

spawn_service 结束时未清理 service_events,与 service_exit 回调不一致。

process.wait() 完成后,这里只删除了 service_store[service_hash],但没有清理 service_events。而 service_exit 回调(第 123-128 行)会同时清理两者。

这可能导致 service_events 中残留无用的 Event 对象。

🧹 建议的修复
         await process.wait()
         logger.info(f"service {service_hash} exit")
-        del service_store[service_hash]
+        service_store.pop(service_hash, None)
+        service_events.pop(service_hash, None)
🤖 Fix all issues with AI agents
In `@executor/python_executor/executor.py`:
- Around line 134-135: The send_service_config path sets
service_store[service_hash] to "running" but never notifies waiting threads;
update send_service_config (and any callbacks like prepare_report_topic) to
check service_events for service_hash and call
service_events[service_hash].set() immediately after setting the state to
"running" so waiters are woken; ensure you access service_events and
service_store using the same locking protocol already used elsewhere to avoid
race conditions.
- Around line 214-225: The branch handling a "launching" service currently logs
a warning and drops the message on timeout/failure; update the block in
executor.py (the section that checks service_events.get(service_hash) and then
service_store.get(service_hash) == "running") to either requeue the message with
a bounded retry counter (attach/increment a retry field on message and call
fs.put(message) when retry < MAX_RETRIES) or call a failure-reporting helper
(e.g., send_failure_to_mainframe or similar) to notify mainframe of the failed
execution; ensure you reference the existing symbols service_events,
service_store, run_service_block, fs.put, logger.warning and message, implement
a MAX_RETRIES constant, and handle the terminal case (exceeded retries) by
logging and reporting the failure rather than silently dropping the message.
🧹 Nitpick comments (1)
executor/python_executor/executor.py (1)

123-128: 建议使用 pop 简化代码。

根据静态分析工具提示,可以用 pop(..., None) 替代 if key in dict 后跟 del dict[key] 的模式。

♻️ 建议的重构
 def service_exit(message: ReportStatusPayload):
     service_hash = message.get("service_hash")
-    if service_hash in service_store:
-        del service_store[service_hash]
-    if service_hash in service_events:
-        del service_events[service_hash]
+    service_store.pop(service_hash, None)
+    service_events.pop(service_hash, None)

Comment on lines +134 to +135
if service_hash in service_events:
service_events[service_hash].set()
Copy link

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

⚠️ Potential issue | 🟠 Major

send_service_config 中缺少 event.set() 调用,可能导致等待方超时。

这里正确地在服务状态变为 "running" 时设置了 event。但是,send_service_config 函数(第 186 行)也会将 service_store[service_hash] 设置为 "running",却没有对应的 event.set() 调用。

如果服务通过 prepare_report_topic 回调变为 "running",等待的线程将不会被唤醒,导致 30 秒超时。

🐛 建议的修复
         def send_service_config(params: ServiceTopicParams, message: ServiceExecutePayload):

             async def run():
                 mainframe.publish(service_config_topic(params), message)
                 service_store[service_hash] = "running"
+                if service_hash in service_events:
+                    service_events[service_hash].set()
             run_in_new_thread(run)
🤖 Prompt for AI Agents
In `@executor/python_executor/executor.py` around lines 134 - 135, The
send_service_config path sets service_store[service_hash] to "running" but never
notifies waiting threads; update send_service_config (and any callbacks like
prepare_report_topic) to check service_events for service_hash and call
service_events[service_hash].set() immediately after setting the state to
"running" so waiters are woken; ensure you access service_events and
service_store using the same locking protocol already used elsewhere to avoid
race conditions.

Comment on lines 214 to +225
elif status == "launching":
logger.info(f"service {service_hash} is launching, set message back to fs to wait next time")
fs.put(message)
event = service_events.get(service_hash)
if event:
logger.info(f"service {service_hash} is launching, waiting for ready")
event.wait(timeout=30)
if service_store.get(service_hash) == "running":
run_service_block(message)
else:
logger.warning(f"service {service_hash} launch timeout or failed")
else:
logger.info(f"service {service_hash} is launching, set message back to fs to wait next time")
fs.put(message)
Copy link

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

⚠️ Potential issue | 🟡 Minor

服务启动超时后消息会丢失,考虑是否需要重试或更明确的错误处理。

当服务启动超时或失败时(第 222 行),当前实现只记录警告日志,消息被丢弃。这可能导致服务块请求静默失败,调用方无法得知执行结果。

建议考虑:

  1. 向 mainframe 报告执行失败状态
  2. 或者将消息重新放回队列进行有限次数的重试
💡 可能的改进方向
                 if event:
                     logger.info(f"service {service_hash} is launching, waiting for ready")
                     event.wait(timeout=30)
                     if service_store.get(service_hash) == "running":
                         run_service_block(message)
                     else:
                         logger.warning(f"service {service_hash} launch timeout or failed")
+                        # TODO: 考虑向 mainframe 报告执行失败,或实现有限次数的重试机制
🤖 Prompt for AI Agents
In `@executor/python_executor/executor.py` around lines 214 - 225, The branch
handling a "launching" service currently logs a warning and drops the message on
timeout/failure; update the block in executor.py (the section that checks
service_events.get(service_hash) and then service_store.get(service_hash) ==
"running") to either requeue the message with a bounded retry counter
(attach/increment a retry field on message and call fs.put(message) when retry <
MAX_RETRIES) or call a failure-reporting helper (e.g., send_failure_to_mainframe
or similar) to notify mainframe of the failed execution; ensure you reference
the existing symbols service_events, service_store, run_service_block, fs.put,
logger.warning and message, implement a MAX_RETRIES constant, and handle the
terminal case (exceeded retries) by logging and reporting the failure rather
than silently dropping the message.

Copy link
Contributor

Copilot AI left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Pull request overview

This PR replaces a busy-wait polling mechanism with an event-based synchronization approach for service launch coordination. Instead of re-queuing messages when a service is launching, the code now blocks on a threading.Event with a 30-second timeout.

Changes:

  • Introduces threading.Event-based synchronization to wait for service launch completion
  • Adds cleanup of events when services exit
  • Improves type annotations for global state variables

💡 Add Copilot custom instructions for smarter, more guided reviews. Learn how to get started.

Comment on lines +127 to +135
if service_hash in service_events:
del service_events[service_hash]

def service_status(message: ReportStatusPayload):
service_hash = message.get("service_hash")
if service_hash in service_store:
service_store[service_hash] = "running"
if service_hash in service_events:
service_events[service_hash].set()
Copy link

Copilot AI Jan 30, 2026

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

TOCTOU race condition: between checking if service_hash is in service_events (line 134) and setting the event (line 135), another thread could delete the event via service_exit. This could raise a KeyError when trying to access service_events[service_hash].

Consider using service_events.get(service_hash) and check if it's not None before calling .set(), or protect both service_store and service_events accesses with a lock.

Suggested change
if service_hash in service_events:
del service_events[service_hash]
def service_status(message: ReportStatusPayload):
service_hash = message.get("service_hash")
if service_hash in service_store:
service_store[service_hash] = "running"
if service_hash in service_events:
service_events[service_hash].set()
# Safely remove the service event without raising KeyError
service_events.pop(service_hash, None)
def service_status(message: ReportStatusPayload):
service_hash = message.get("service_hash")
if service_hash in service_store:
service_store[service_hash] = "running"
event = service_events.get(service_hash)
if event is not None:
event.set()

Copilot uses AI. Check for mistakes.
if service_store.get(service_hash) == "running":
run_service_block(message)
else:
logger.warning(f"service {service_hash} launch timeout or failed")
Copy link

Copilot AI Jan 30, 2026

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Memory leak: when the service launch times out or fails (line 222), the event is never removed from service_events dictionary. This event will remain in memory indefinitely, even though it will never be used again.

Add cleanup code to remove the event from service_events after timeout or failure.

Copilot uses AI. Check for mistakes.
event = service_events.get(service_hash)
if event:
logger.info(f"service {service_hash} is launching, waiting for ready")
event.wait(timeout=30)
Copy link

Copilot AI Jan 30, 2026

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Blocking the main event loop with event.wait will prevent processing of other messages during the wait period. This means:

  1. Regular block executions will be delayed
  2. Multiple service launches will be serialized instead of handled concurrently
  3. The entire executor becomes unresponsive for up to 30 seconds per service launch

Consider moving the service block execution to a separate thread to avoid blocking the main message processing loop, or use a non-blocking approach with callbacks.

Suggested change
event.wait(timeout=30)
loop = asyncio.get_running_loop()
await loop.run_in_executor(None, event.wait, 30)

Copilot uses AI. Check for mistakes.
Comment on lines 160 to +161
service_store[service_hash] = "launching"
service_events[service_hash] = threading.Event()
Copy link

Copilot AI Jan 30, 2026

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The service_store and service_events dictionaries are accessed from multiple threads without synchronization. service_store is modified from:

  1. The main loop (line 160)
  2. The send_service_config callback thread (line 186)
  3. The service_exit callback
  4. The service_status callback

Similarly, service_events is accessed from multiple threads. This creates race conditions that could lead to:

  • Incorrect status checks
  • Missing or duplicate event notifications
  • Dictionary iteration errors during modification

Consider using threading.Lock to protect all accesses to these shared dictionaries.

Copilot uses AI. Check for mistakes.
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment

Labels

None yet

Projects

None yet

Development

Successfully merging this pull request may close these issues.

2 participants