22 changes: 19 additions & 3 deletions executor/python_executor/executor.py
@@ -14,10 +14,12 @@
from typing import Literal
from .topic import prepare_report_topic, service_config_topic, run_action_topic, ServiceTopicParams, ReportStatusPayload, exit_report_topic, status_report_topic
import uuid
import threading

logger = logging.getLogger(EXECUTOR_NAME)
service_store: dict[str, Literal["launching", "running"]] = {}
job_set = set()
service_events: dict[str, threading.Event] = {}
job_set: set[str] = set()

# Log directory: ~/.oocana/sessions/{session_id}
# All executor logs are written to [python-executor-{identifier}.log | python-executor.log]
@@ -122,11 +124,15 @@ def service_exit(message: ReportStatusPayload):
service_hash = message.get("service_hash")
if service_hash in service_store:
del service_store[service_hash]
if service_hash in service_events:
del service_events[service_hash]

def service_status(message: ReportStatusPayload):
service_hash = message.get("service_hash")
if service_hash in service_store:
service_store[service_hash] = "running"
if service_hash in service_events:
service_events[service_hash].set()
Comment on lines +134 to +135

⚠️ Potential issue | 🟠 Major

send_service_config is missing an event.set() call, which can cause waiters to time out.

The event is correctly set here when the service state becomes "running". However, the send_service_config function (line 186) also sets service_store[service_hash] to "running", with no corresponding event.set() call.

If the service transitions to "running" via the prepare_report_topic callback, the waiting thread is never woken and hits the 30-second timeout.

🐛 Suggested fix
         def send_service_config(params: ServiceTopicParams, message: ServiceExecutePayload):

             async def run():
                 mainframe.publish(service_config_topic(params), message)
                 service_store[service_hash] = "running"
+                if service_hash in service_events:
+                    service_events[service_hash].set()
             run_in_new_thread(run)
🤖 Prompt for AI Agents
In `@executor/python_executor/executor.py` around lines 134-135: the
send_service_config path sets service_store[service_hash] to "running" but never
notifies waiting threads; update send_service_config (and any callbacks like
prepare_report_topic) to check service_events for service_hash and call
service_events[service_hash].set() immediately after setting the state to
"running" so waiters are woken; ensure you access service_events and
service_store using the same locking protocol already used elsewhere to avoid
race conditions.

Comment on lines +127 to +135

Copilot AI Jan 30, 2026


TOCTOU race condition: between checking if service_hash is in service_events (line 134) and setting the event (line 135), another thread could delete the event via service_exit. This could raise a KeyError when trying to access service_events[service_hash].

Consider using service_events.get(service_hash) and check if it's not None before calling .set(), or protect both service_store and service_events accesses with a lock.

Suggested change
-    if service_hash in service_events:
-        del service_events[service_hash]
+    # Safely remove the service event without raising KeyError
+    service_events.pop(service_hash, None)

 def service_status(message: ReportStatusPayload):
     service_hash = message.get("service_hash")
     if service_hash in service_store:
         service_store[service_hash] = "running"
-        if service_hash in service_events:
-            service_events[service_hash].set()
+        event = service_events.get(service_hash)
+        if event is not None:
+            event.set()


def report_message(message):
type = message.get("type")
@@ -152,6 +158,7 @@ def report_message(message):
async def spawn_service(message: ServiceExecutePayload, service_hash: str):
logger.info(f"create new service {message.get('dir')}")
service_store[service_hash] = "launching"
service_events[service_hash] = threading.Event()
Comment on lines 160 to +161

Copilot AI Jan 30, 2026


The service_store and service_events dictionaries are accessed from multiple threads without synchronization. service_store is modified from:

  1. The main loop (line 160)
  2. The send_service_config callback thread (line 186)
  3. The service_exit callback
  4. The service_status callback

Similarly, service_events is accessed from multiple threads. This creates race conditions that could lead to:

  • Incorrect status checks
  • Missing or duplicate event notifications
  • Dictionary iteration errors during modification

Consider using threading.Lock to protect all accesses to these shared dictionaries.
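A minimal, self-contained sketch of that locking suggestion. The lock and helper names here are illustrative, not part of the PR; the point is that both dictionaries are read and written under one lock, and the event is set after the lock is released:

```python
import threading
from typing import Literal

# Illustrative module-level lock; the PR itself does not define one.
_state_lock = threading.Lock()
service_store: dict[str, Literal["launching", "running"]] = {}
service_events: dict[str, threading.Event] = {}

def mark_running(service_hash: str) -> None:
    """Update the state and fetch the event under a single lock."""
    event = None
    with _state_lock:
        if service_hash in service_store:
            service_store[service_hash] = "running"
            event = service_events.get(service_hash)
    if event is not None:
        event.set()  # Event.set is thread-safe and idempotent

def drop_service(service_hash: str) -> None:
    """Remove both entries atomically; pop(..., None) never raises KeyError."""
    with _state_lock:
        service_store.pop(service_hash, None)
        service_events.pop(service_hash, None)
```

Setting the event outside the critical section keeps the lock hold time short; waiters that observe the event also re-check service_store, so the ordering stays safe.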


parent_dir = os.path.dirname(os.path.dirname(os.path.realpath(__file__)))

@@ -205,8 +212,17 @@ def run_service_block(message: ServiceExecutePayload):
elif status == "running":
run_service_block(message)
elif status == "launching":
logger.info(f"service {service_hash} is launching, set message back to fs to wait next time")
fs.put(message)
event = service_events.get(service_hash)
if event:
logger.info(f"service {service_hash} is launching, waiting for ready")
event.wait(timeout=30)

Copilot AI Jan 30, 2026


Blocking the main event loop with event.wait will prevent processing of other messages during the wait period. This means:

  1. Regular block executions will be delayed
  2. Multiple service launches will be serialized instead of handled concurrently
  3. The entire executor becomes unresponsive for up to 30 seconds per service launch

Consider moving the service block execution to a separate thread to avoid blocking the main message processing loop, or use a non-blocking approach with callbacks.

Suggested change
-            event.wait(timeout=30)
+            loop = asyncio.get_running_loop()
+            await loop.run_in_executor(None, event.wait, 30)
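A self-contained sketch of this non-blocking wait pattern; the helper name, demo timing, and timeout value are illustrative:

```python
import asyncio
import threading

async def wait_for_ready(event: threading.Event, timeout: float) -> bool:
    """Wait on a threading.Event without blocking the asyncio event loop."""
    loop = asyncio.get_running_loop()
    # Event.wait returns True if the event was set, False on timeout.
    return await loop.run_in_executor(None, event.wait, timeout)

def demo() -> bool:
    ready = threading.Event()
    # A worker thread marks the service ready shortly after launch.
    threading.Timer(0.05, ready.set).start()
    return asyncio.run(wait_for_ready(ready, timeout=2.0))
```

While the executor thread blocks in event.wait, the loop stays free to process other messages, so concurrent service launches are no longer serialized.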

if service_store.get(service_hash) == "running":
run_service_block(message)
else:
logger.warning(f"service {service_hash} launch timeout or failed")

Copilot AI Jan 30, 2026


Memory leak: when the service launch times out or fails (line 222), the event is never removed from service_events dictionary. This event will remain in memory indefinitely, even though it will never be used again.

Add cleanup code to remove the event from service_events after timeout or failure.
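One way to guarantee that cleanup is a try/finally around the wait, assuming a single waiter per service (the helper name is illustrative):

```python
import threading

service_events: dict[str, threading.Event] = {}

def wait_for_launch(service_hash: str, timeout: float = 30.0) -> bool:
    """Wait for a launching service and always drop its event afterwards.

    pop(..., None) avoids a KeyError if service_exit already removed the
    entry from another thread.
    """
    event = service_events.get(service_hash)
    if event is None:
        return False
    try:
        return event.wait(timeout)
    finally:
        service_events.pop(service_hash, None)
```

The finally branch runs on success, timeout, and exceptions alike, so the event can never linger in the dictionary.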

else:
logger.info(f"service {service_hash} is launching, set message back to fs to wait next time")
fs.put(message)
Comment on lines 214 to +225

⚠️ Potential issue | 🟡 Minor

Messages are lost after a service launch timeout; consider adding retries or more explicit error handling.

When the service launch times out or fails (line 222), the current implementation only logs a warning and the message is discarded. Service block requests can therefore fail silently, with the caller never learning the execution result.

Suggestions to consider:

  1. Report the execution failure status to mainframe
  2. Or requeue the message for a bounded number of retries
💡 Possible directions for improvement
                 if event:
                     logger.info(f"service {service_hash} is launching, waiting for ready")
                     event.wait(timeout=30)
                     if service_store.get(service_hash) == "running":
                         run_service_block(message)
                     else:
                         logger.warning(f"service {service_hash} launch timeout or failed")
+                        # TODO: consider reporting the execution failure to mainframe, or implement a bounded retry mechanism
🤖 Prompt for AI Agents
In `@executor/python_executor/executor.py` around lines 214-225: the branch
handling a "launching" service currently logs a warning and drops the message on
timeout/failure; update the block in executor.py (the section that checks
service_events.get(service_hash) and then service_store.get(service_hash) ==
"running") to either requeue the message with a bounded retry counter
(attach/increment a retry field on message and call fs.put(message) when retry <
MAX_RETRIES) or call a failure-reporting helper (e.g., send_failure_to_mainframe
or similar) to notify mainframe of the failed execution; ensure you reference
the existing symbols service_events, service_store, run_service_block, fs.put,
logger.warning and message, implement a MAX_RETRIES constant, and handle the
terminal case (exceeded retries) by logging and reporting the failure rather
than silently dropping the message.
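A minimal sketch of the bounded-retry idea; the `_retries` field, MAX_RETRIES value, and the list standing in for the PR's `fs` queue are all illustrative:

```python
from typing import Any

MAX_RETRIES = 3  # assumed bound; not defined in the PR

def requeue_with_retry(message: dict[str, Any], queue: list[dict[str, Any]]) -> bool:
    """Requeue a service-block message up to MAX_RETRIES times.

    A retry counter is attached to the message itself, as the review
    suggests. Returns False once retries are exhausted, at which point
    the caller should report the failure instead of dropping the message.
    """
    retries = message.get("_retries", 0)
    if retries >= MAX_RETRIES:
        return False  # terminal case: report failure to mainframe
    message["_retries"] = retries + 1
    queue.append(message)
    return True
```

Because the counter travels with the message, the retry budget survives the round trip through the queue without any extra bookkeeping in the executor.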

else:
if not_current_session(message):
continue