fix: replace service launch polling with event-based waiting #449
base: main
@@ -14,10 +14,12 @@
 from typing import Literal
 from .topic import prepare_report_topic, service_config_topic, run_action_topic, ServiceTopicParams, ReportStatusPayload, exit_report_topic, status_report_topic
 import uuid
+import threading

 logger = logging.getLogger(EXECUTOR_NAME)
 service_store: dict[str, Literal["launching", "running"]] = {}
-job_set = set()
+service_events: dict[str, threading.Event] = {}
+job_set: set[str] = set()

 # Log directory: ~/.oocana/sessions/{session_id}
 # Executor logs are written to [python-executor-{identifier}.log | python-executor.log]
@@ -122,11 +124,15 @@ def service_exit(message: ReportStatusPayload):
     service_hash = message.get("service_hash")
     if service_hash in service_store:
         del service_store[service_hash]
+    if service_hash in service_events:
+        del service_events[service_hash]

 def service_status(message: ReportStatusPayload):
     service_hash = message.get("service_hash")
     if service_hash in service_store:
         service_store[service_hash] = "running"
+    if service_hash in service_events:
+        service_events[service_hash].set()
Comment on lines +127 to +135
Suggested change
-    if service_hash in service_events:
-        del service_events[service_hash]
-def service_status(message: ReportStatusPayload):
-    service_hash = message.get("service_hash")
-    if service_hash in service_store:
-        service_store[service_hash] = "running"
-    if service_hash in service_events:
-        service_events[service_hash].set()
+    # Safely remove the service event without raising KeyError
+    service_events.pop(service_hash, None)
+def service_status(message: ReportStatusPayload):
+    service_hash = message.get("service_hash")
+    if service_hash in service_store:
+        service_store[service_hash] = "running"
+    event = service_events.get(service_hash)
+    if event is not None:
+        event.set()
Copilot
AI
Jan 30, 2026
The service_store and service_events dictionaries are accessed from multiple threads without synchronization. service_store is modified from:
- The main loop (line 160)
- The send_service_config callback thread (line 186)
- The service_exit callback
- The service_status callback
Similarly, service_events is accessed from multiple threads. This creates race conditions that could lead to:
- Incorrect status checks
- Missing or duplicate event notifications
- Dictionary iteration errors during modification
Consider using threading.Lock to protect all accesses to these shared dictionaries.
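A minimal sketch of the locking suggestion above, assuming the two dictionaries can be folded into one object. `ServiceRegistry` and its method names are hypothetical and do not appear in the PR; the point is only that a single `threading.Lock` guards every read and write, and that the event is set wherever the state becomes "running".

```python
import threading
from typing import Literal, Optional

ServiceState = Literal["launching", "running"]


class ServiceRegistry:
    """Hypothetical wrapper: one lock guards both the state map and the event map."""

    def __init__(self) -> None:
        self._lock = threading.Lock()
        self._states: dict[str, ServiceState] = {}
        self._events: dict[str, threading.Event] = {}

    def mark_launching(self, service_hash: str) -> threading.Event:
        with self._lock:
            self._states[service_hash] = "launching"
            # Reuse an existing event so concurrent waiters share one signal.
            return self._events.setdefault(service_hash, threading.Event())

    def mark_running(self, service_hash: str) -> None:
        with self._lock:
            if service_hash not in self._states:
                return
            self._states[service_hash] = "running"
            event = self._events.get(service_hash)
        # Wake waiters outside the lock; Event.set() is itself thread-safe.
        if event is not None:
            event.set()

    def remove(self, service_hash: str) -> None:
        with self._lock:
            self._states.pop(service_hash, None)
            self._events.pop(service_hash, None)

    def state(self, service_hash: str) -> Optional[ServiceState]:
        with self._lock:
            return self._states.get(service_hash)
```

With this shape, every code path that flips a service to "running" would call `mark_running`, so the state change and the wake-up cannot drift apart.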
Copilot
AI
Jan 30, 2026
Blocking the main event loop with event.wait will prevent processing of other messages during the wait period. This means:
- Regular block executions will be delayed
- Multiple service launches will be serialized instead of handled concurrently
- The entire executor becomes unresponsive for up to 30 seconds per service launch
Consider moving the service block execution to a separate thread to avoid blocking the main message processing loop, or use a non-blocking approach with callbacks.
Suggested change
-event.wait(timeout=30)
+loop = asyncio.get_running_loop()
+await loop.run_in_executor(None, event.wait, 30)
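The suggestion above only works if the waiting code already runs inside an asyncio coroutine, which this page does not confirm. Under that assumption, the following self-contained sketch shows the blocking `threading.Event.wait` being handed to the default thread pool so other tasks keep running; `wait_for_event`, `demo`, and `ticker` are illustrative names, not part of the executor.

```python
import asyncio
import threading


async def wait_for_event(event: threading.Event, timeout: float) -> bool:
    """Wait on a threading.Event without blocking the asyncio event loop."""
    loop = asyncio.get_running_loop()
    # Event.wait blocks, so it runs in the loop's default thread pool.
    return await loop.run_in_executor(None, event.wait, timeout)


async def demo() -> tuple[bool, int]:
    ready = threading.Event()
    ticks = 0

    async def ticker() -> None:
        # Stand-in for other messages being processed while we wait.
        nonlocal ticks
        while not ready.is_set():
            ticks += 1
            await asyncio.sleep(0.01)

    # Stand-in for the status callback firing from another thread.
    threading.Timer(0.1, ready.set).start()
    ticker_task = asyncio.create_task(ticker())
    became_ready = await wait_for_event(ready, timeout=5)
    await ticker_task
    return became_ready, ticks
```

Running `asyncio.run(demo())` returns `True` for readiness together with a nonzero tick count, which shows the loop stayed responsive during the wait.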
Copilot
AI
Jan 30, 2026
Memory leak: when the service launch times out or fails (line 222), the event is never removed from service_events dictionary. This event will remain in memory indefinitely, even though it will never be used again.
Add cleanup code to remove the event from service_events after timeout or failure.
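One way to get the cleanup described above is a `try`/`finally` around the wait. This is a sketch only: `wait_until_ready` is a hypothetical helper, and it removes the entry on success as well as on timeout, which assumes later callers check the service state before looking for an event.

```python
import threading

service_events: dict[str, threading.Event] = {}


def wait_until_ready(service_hash: str, timeout: float) -> bool:
    """Wait for the service event and always drop it from the map afterwards."""
    event = service_events.setdefault(service_hash, threading.Event())
    try:
        # Returns True if the event was set, False if the timeout elapsed.
        return event.wait(timeout=timeout)
    finally:
        # Only remove the entry if it is still the event we waited on,
        # so a newer launch attempt is not clobbered.
        if service_events.get(service_hash) is event:
            service_events.pop(service_hash, None)
```

Because the removal sits in `finally`, the entry is also dropped if the wait is interrupted by an exception.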
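After the service launch times out, the message is lost. Consider whether a retry or more explicit error handling is needed.
When the service launch times out or fails (line 222), the current implementation only logs a warning and drops the message. This can make service block requests fail silently, leaving the caller with no way to learn the execution result.
Consider either:
- Reporting the failed execution to mainframe
- Or putting the message back on the queue for a bounded number of retries
💡 Possible improvement
 if event:
     logger.info(f"service {service_hash} is launching, waiting for ready")
     event.wait(timeout=30)
     if service_store.get(service_hash) == "running":
         run_service_block(message)
     else:
         logger.warning(f"service {service_hash} launch timeout or failed")
+        # TODO: consider reporting the failure to mainframe, or implementing a bounded retry mechanism
🤖 Prompt for AI Agents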
In `@executor/python_executor/executor.py` around lines 214 - 225, The branch
handling a "launching" service currently logs a warning and drops the message on
timeout/failure; update the block in executor.py (the section that checks
service_events.get(service_hash) and then service_store.get(service_hash) ==
"running") to either requeue the message with a bounded retry counter
(attach/increment a retry field on message and call fs.put(message) when retry <
MAX_RETRIES) or call a failure-reporting helper (e.g., send_failure_to_mainframe
or similar) to notify mainframe of the failed execution; ensure you reference
the existing symbols service_events, service_store, run_service_block, fs.put,
logger.warning and message, implement a MAX_RETRIES constant, and handle the
terminal case (exceeded retries) by logging and reporting the failure rather
than silently dropping the message.
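The bounded-retry branch described in the prompt could look roughly like the sketch below. Everything here is an assumption made for illustration: the `retry` field on the message, the value of `MAX_RETRIES`, and the `requeue` and `report_failure` callables, which stand in for `fs.put` and a failure-reporting helper whose real signatures are not shown on this page.

```python
import queue
from typing import Any, Callable

MAX_RETRIES = 3  # hypothetical bound, not taken from the PR

Message = dict[str, Any]


def handle_launch_timeout(
    message: Message,
    requeue: Callable[[Message], None],
    report_failure: Callable[[Message], None],
) -> bool:
    """Requeue the message while retries remain, otherwise report the failure.

    Returns True if the message was requeued, False if it was reported as failed.
    """
    retries = message.get("retry", 0)
    if retries < MAX_RETRIES:
        # Copy the message so the caller's dict is not mutated.
        requeue({**message, "retry": retries + 1})
        return True
    report_failure(message)
    return False


# Usage with stand-ins for the real queue and the mainframe reporter.
pending: "queue.Queue[Message]" = queue.Queue()
failures: list[Message] = []
handle_launch_timeout({"service_hash": "abc"}, pending.put, failures.append)
```

Keeping the retry counter on the message itself means no extra bookkeeping structure has to be synchronized across threads.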
send_service_config is missing an event.set() call, which can cause waiters to time out.
The event is correctly set here when the service status becomes "running". However, the send_service_config function (line 186) also sets service_store[service_hash] to "running" without a corresponding event.set() call. If the service becomes "running" through the prepare_report_topic callback, the waiting thread is never woken and hits the 30-second timeout.
🐛 Suggested fix
 def send_service_config(params: ServiceTopicParams, message: ServiceExecutePayload):
     async def run():
         mainframe.publish(service_config_topic(params), message)
         service_store[service_hash] = "running"
+        if service_hash in service_events:
+            service_events[service_hash].set()
     run_in_new_thread(run)
🤖 Prompt for AI Agents