fix: replace service launch polling with event-based waiting #449
base: main
Instead of re-queuing messages when a service is launching (busy-wait), use threading.Event to block until the service is ready. This eliminates unnecessary CPU usage from polling and reduces log spam.

- Add service_events dict to track launch events
- Wait up to 30s for the service ready signal
- Clean up events on service exit
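The core pattern the PR adopts can be shown in a minimal, self-contained sketch (the names `ready` and `launch_service` are illustrative, not code from the PR): the consumer blocks on `Event.wait` with a timeout instead of polling, and the launcher signals readiness with `Event.set`.

```python
import threading
import time

ready = threading.Event()

def launch_service():
    # simulate a slow service startup, then signal readiness
    time.sleep(0.1)
    ready.set()

t = threading.Thread(target=launch_service)
t.start()

# instead of polling in a loop, block until ready (bounded by a timeout);
# wait() returns True if the event was set, False on timeout
ok = ready.wait(timeout=30)
t.join()
print("service ready" if ok else "launch timeout")
```

`Event.wait` sleeps inside the interpreter rather than spinning, which is what removes the CPU cost and log spam of the previous re-queue loop.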
Summary by CodeRabbit

Improvements

Overview: introduces threading support and an event-based service lifecycle synchronization mechanism in executor.py, using global …

Changes
Sequence diagram

```mermaid
sequenceDiagram
    participant Msg as Message handling
    participant Spawn as Service spawn
    participant Event as Event management
    participant Status as Status report
    participant Svc as Service instance
    Msg->>Spawn: request service launch
    Spawn->>Event: create threading.Event
    Spawn->>Svc: start service process
    Spawn-->>Msg: return (status: launching)
    Msg->>Event: wait for event (30s timeout)
    Svc->>Status: report status "running"
    Status->>Event: set event
    Event-->>Msg: event fires, resume processing
    Svc->>Spawn: service exits
    Spawn->>Event: clean up event and service record
```
Estimated code review effort: 🎯 3 (Moderate) | ⏱️ ~20 minutes
🚥 Pre-merge checks: ✅ 2 passed
Actionable comments posted: 2
Caution
Some comments are outside the diff and can’t be posted inline due to platform limitations.
⚠️ Outside diff range comments (1)
executor/python_executor/executor.py (1)
191-193: ⚠️ Potential issue | 🟡 Minor

spawn_service does not clean up service_events when it finishes, which is inconsistent with the service_exit callback. After process.wait() completes, only service_store[service_hash] is deleted; service_events is left untouched, whereas the service_exit callback (lines 123-128) cleans up both. Stale Event objects can therefore accumulate in service_events.

🧹 Suggested fix
```diff
 await process.wait()
 logger.info(f"service {service_hash} exit")
-del service_store[service_hash]
+service_store.pop(service_hash, None)
+service_events.pop(service_hash, None)
```
🤖 Fix all issues with AI agents
In `@executor/python_executor/executor.py`:
- Around line 134-135: The send_service_config path sets
service_store[service_hash] to "running" but never notifies waiting threads;
update send_service_config (and any callbacks like prepare_report_topic) to
check service_events for service_hash and call
service_events[service_hash].set() immediately after setting the state to
"running" so waiters are woken; ensure you access service_events and
service_store using the same locking protocol already used elsewhere to avoid
race conditions.
- Around line 214-225: The branch handling a "launching" service currently logs
a warning and drops the message on timeout/failure; update the block in
executor.py (the section that checks service_events.get(service_hash) and then
service_store.get(service_hash) == "running") to either requeue the message with
a bounded retry counter (attach/increment a retry field on message and call
fs.put(message) when retry < MAX_RETRIES) or call a failure-reporting helper
(e.g., send_failure_to_mainframe or similar) to notify mainframe of the failed
execution; ensure you reference the existing symbols service_events,
service_store, run_service_block, fs.put, logger.warning and message, implement
a MAX_RETRIES constant, and handle the terminal case (exceeded retries) by
logging and reporting the failure rather than silently dropping the message.
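The requeue-with-bounded-retries option described in the prompt above can be sketched as follows. Everything here is an assumption, not existing code: `MAX_RETRIES`, the `retry` field on the message, and the `queue.Queue` stand-in for `fs` are all hypothetical; the real executor would report the terminal failure to mainframe instead of printing.

```python
import queue

MAX_RETRIES = 3          # hypothetical bound; the PR does not define one
fs = queue.Queue()       # stand-in for the executor's message queue

def handle_launch_timeout(message: dict) -> None:
    # requeue with a bounded retry counter instead of silently dropping
    retries = message.get("retry", 0)
    if retries < MAX_RETRIES:
        message["retry"] = retries + 1
        fs.put(message)
    else:
        # terminal case: report the failure rather than dropping the message
        print(f"launch failed after {retries} retries: {message['service_hash']}")

msg = {"service_hash": "abc123"}
for _ in range(4):
    handle_launch_timeout(msg)
    if not fs.empty():
        msg = fs.get()
print(msg["retry"])
```

After three requeues the fourth timeout hits the terminal branch, so the message is never retried unboundedly.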
🧹 Nitpick comments (1)
executor/python_executor/executor.py (1)
123-128: Consider using pop to simplify the code. As the static-analysis hint suggests, the `if key in dict` followed by `del dict[key]` pattern can be replaced with `pop(..., None)`.

♻️ Suggested refactor
```diff
 def service_exit(message: ReportStatusPayload):
     service_hash = message.get("service_hash")
-    if service_hash in service_store:
-        del service_store[service_hash]
-    if service_hash in service_events:
-        del service_events[service_hash]
+    service_store.pop(service_hash, None)
+    service_events.pop(service_hash, None)
```
```python
if service_hash in service_events:
    service_events[service_hash].set()
```
send_service_config is missing an event.set() call, which can cause waiters to time out.
The event is correctly set here when the service state becomes "running". However, the send_service_config function (line 186) also sets service_store[service_hash] to "running" without a corresponding event.set() call.
If the service transitions to "running" via the prepare_report_topic callback, waiting threads will never be woken and will hit the 30-second timeout.
🐛 Suggested fix

```diff
 def send_service_config(params: ServiceTopicParams, message: ServiceExecutePayload):
     async def run():
         mainframe.publish(service_config_topic(params), message)
         service_store[service_hash] = "running"
+        if service_hash in service_events:
+            service_events[service_hash].set()

     run_in_new_thread(run)
```
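The failure mode this comment describes is easy to reproduce in isolation: if no thread ever calls `set()`, `Event.wait(timeout)` returns `False` after the timeout, which is exactly what a waiter blocked on a `send_service_config`-launched service would experience. A minimal demonstration (short timeouts stand in for the real 30s):

```python
import threading

event = threading.Event()

# no thread calls event.set(), mirroring the missing-set bug:
# the waiter burns the whole timeout and gets False back
first = event.wait(timeout=0.2)

event.set()
# once set, wait() returns True immediately, no matter the timeout
second = event.wait(timeout=0.2)

print(first, second)
```

The boolean return value of `wait` is what the executor's `launching` branch checks indirectly via `service_store.get(service_hash) == "running"`.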
```diff
 elif status == "launching":
-    logger.info(f"service {service_hash} is launching, set message back to fs to wait next time")
-    fs.put(message)
+    event = service_events.get(service_hash)
+    if event:
+        logger.info(f"service {service_hash} is launching, waiting for ready")
+        event.wait(timeout=30)
+        if service_store.get(service_hash) == "running":
+            run_service_block(message)
+        else:
+            logger.warning(f"service {service_hash} launch timeout or failed")
+    else:
+        logger.info(f"service {service_hash} is launching, set message back to fs to wait next time")
+        fs.put(message)
```
Messages are lost after a service launch timeout; consider retries or more explicit error handling.
When the launch times out or fails (line 222), the current implementation only logs a warning and the message is dropped. Service block requests can therefore fail silently, and the caller never learns the execution result.
Suggestions:
- Report the execution failure to the mainframe
- Or put the message back on the queue for a bounded number of retries

💡 Possible improvement
```diff
 if event:
     logger.info(f"service {service_hash} is launching, waiting for ready")
     event.wait(timeout=30)
     if service_store.get(service_hash) == "running":
         run_service_block(message)
     else:
         logger.warning(f"service {service_hash} launch timeout or failed")
+        # TODO: consider reporting the failure to mainframe, or a bounded retry mechanism
```
Pull request overview
This PR replaces a busy-wait polling mechanism with an event-based synchronization approach for service launch coordination. Instead of re-queuing messages when a service is launching, the code now blocks on a threading.Event with a 30-second timeout.
Changes:
- Introduces threading.Event-based synchronization to wait for service launch completion
- Adds cleanup of events when services exit
- Improves type annotations for global state variables
```python
    if service_hash in service_events:
        del service_events[service_hash]


def service_status(message: ReportStatusPayload):
    service_hash = message.get("service_hash")
    if service_hash in service_store:
        service_store[service_hash] = "running"
    if service_hash in service_events:
        service_events[service_hash].set()
```
Copilot AI · Jan 30, 2026
TOCTOU race condition: between checking if service_hash is in service_events (line 134) and setting the event (line 135), another thread could delete the event via service_exit. This could raise a KeyError when trying to access service_events[service_hash].
Consider using service_events.get(service_hash) and check if it's not None before calling .set(), or protect both service_store and service_events accesses with a lock.
```diff
-    if service_hash in service_events:
-        del service_events[service_hash]
+    # Safely remove the service event without raising KeyError
+    service_events.pop(service_hash, None)

 def service_status(message: ReportStatusPayload):
     service_hash = message.get("service_hash")
     if service_hash in service_store:
         service_store[service_hash] = "running"
-    if service_hash in service_events:
-        service_events[service_hash].set()
+    event = service_events.get(service_hash)
+    if event is not None:
+        event.set()
```
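The `.get()`-then-check idiom suggested here closes the TOCTOU window because it takes a single snapshot of the dictionary entry: once the local variable holds the Event, a concurrent `pop` can no longer cause a KeyError. A small standalone sketch (names mirror the PR's globals, but the surrounding dict is illustrative):

```python
import threading

service_events: dict[str, threading.Event] = {}

def notify_ready(service_hash: str) -> bool:
    # one atomic dict lookup; a concurrent pop() after this line
    # only affects the dict, not the Event we already hold
    event = service_events.get(service_hash)
    if event is not None:
        event.set()
        return True
    return False

service_events["abc"] = threading.Event()
r1 = notify_ready("abc")    # sets the event
r2 = notify_ready("gone")   # no entry: returns False, no KeyError
print(r1, r2)
```

This narrows the race but does not eliminate all interleavings across `service_store` and `service_events`; a shared lock (as the later Copilot comment suggests) is the stronger fix.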
```python
if service_store.get(service_hash) == "running":
    run_service_block(message)
else:
    logger.warning(f"service {service_hash} launch timeout or failed")
```
Copilot AI · Jan 30, 2026
Memory leak: when the service launch times out or fails (line 222), the event is never removed from service_events dictionary. This event will remain in memory indefinitely, even though it will never be used again.
Add cleanup code to remove the event from service_events after timeout or failure.
```python
event = service_events.get(service_hash)
if event:
    logger.info(f"service {service_hash} is launching, waiting for ready")
    event.wait(timeout=30)
```
Copilot AI · Jan 30, 2026
Blocking the main event loop with event.wait will prevent processing of other messages during the wait period. This means:
- Regular block executions will be delayed
- Multiple service launches will be serialized instead of handled concurrently
- The entire executor becomes unresponsive for up to 30 seconds per service launch
Consider moving the service block execution to a separate thread to avoid blocking the main message processing loop, or use a non-blocking approach with callbacks.
```diff
-event.wait(timeout=30)
+loop = asyncio.get_running_loop()
+await loop.run_in_executor(None, event.wait, 30)
```
```python
service_store[service_hash] = "launching"
service_events[service_hash] = threading.Event()
```
Copilot AI · Jan 30, 2026
The service_store and service_events dictionaries are accessed from multiple threads without synchronization. service_store is modified from:
- The main loop (line 160)
- The send_service_config callback thread (line 186)
- The service_exit callback
- The service_status callback
Similarly, service_events is accessed from multiple threads. This creates race conditions that could lead to:
- Incorrect status checks
- Missing or duplicate event notifications
- Dictionary iteration errors during modification
Consider using threading.Lock to protect all accesses to these shared dictionaries.
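One way to apply the suggested `threading.Lock` is to funnel every mutation of the two dicts through small helper functions guarded by a single lock. This is a sketch under assumptions, not the PR's code: the helper names (`mark_launching`, `mark_running`, `clear`) are invented, and the real executor would also need its read paths to use the same lock.

```python
import threading

_state_lock = threading.Lock()
service_store: dict[str, str] = {}
service_events: dict[str, threading.Event] = {}

def mark_launching(service_hash: str) -> threading.Event:
    with _state_lock:
        service_store[service_hash] = "launching"
        return service_events.setdefault(service_hash, threading.Event())

def mark_running(service_hash: str) -> None:
    with _state_lock:
        service_store[service_hash] = "running"
        event = service_events.get(service_hash)
    if event is not None:
        event.set()  # Event.set is itself thread-safe; no need to hold the lock

def clear(service_hash: str) -> None:
    with _state_lock:
        service_store.pop(service_hash, None)
        service_events.pop(service_hash, None)

ev = mark_launching("abc")
mark_running("abc")
print(ev.is_set(), service_store["abc"])
clear("abc")
print(service_store)
```

Holding one lock for both dicts keeps the "status" and "event" views consistent with each other, which the per-dict check-then-act code cannot guarantee.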
Summary

- Use threading.Event for service launch synchronization

Before

After

Test plan