From 8bc0ec09bf9a52728b67b3af06c78e09461f4d7c Mon Sep 17 00:00:00 2001 From: leavesster <11785335+leavesster@users.noreply.github.com> Date: Sat, 31 Jan 2026 00:02:21 +0800 Subject: [PATCH] fix: replace service launch polling with event-based waiting Instead of re-queuing messages while a service is launching (busy-wait), use threading.Event to block until the service is ready. This eliminates unnecessary CPU usage from polling and reduces log spam. - Add service_events dict to track launch events - Wait up to 30s for the service-ready signal - On timeout or failed launch, log a warning without re-queuing the message - Fall back to re-queuing when no launch event is registered - Clean up events on service exit --- executor/python_executor/executor.py | 22 +++++++++++++++++++--- 1 file changed, 19 insertions(+), 3 deletions(-) diff --git a/executor/python_executor/executor.py b/executor/python_executor/executor.py index caca1d53..3d63caeb 100644 --- a/executor/python_executor/executor.py +++ b/executor/python_executor/executor.py @@ -14,10 +14,12 @@ from typing import Literal from .topic import prepare_report_topic, service_config_topic, run_action_topic, ServiceTopicParams, ReportStatusPayload, exit_report_topic, status_report_topic import uuid +import threading logger = logging.getLogger(EXECUTOR_NAME) service_store: dict[str, Literal["launching", "running"]] = {} -job_set = set() +service_events: dict[str, threading.Event] = {} +job_set: set[str] = set() # 日志目录 ~/.oocana/sessions/{session_id} # executor 的日志都会记录在 [python-executor-{identifier}.log | python-executor.log] @@ -122,11 +124,15 @@ def service_exit(message: ReportStatusPayload): service_hash = message.get("service_hash") if service_hash in service_store: del service_store[service_hash] + if service_hash in service_events: + del service_events[service_hash] def service_status(message: ReportStatusPayload): service_hash = message.get("service_hash") if service_hash in service_store: service_store[service_hash] = "running" + if service_hash in service_events: + service_events[service_hash].set() def report_message(message): type = message.get("type") @@ 
-152,6 +158,7 @@ def report_message(message): async def spawn_service(message: ServiceExecutePayload, service_hash: str): logger.info(f"create new service {message.get('dir')}") service_store[service_hash] = "launching" + service_events[service_hash] = threading.Event() parent_dir = os.path.dirname(os.path.dirname(os.path.realpath(__file__))) @@ -205,8 +212,17 @@ def run_service_block(message: ServiceExecutePayload): elif status == "running": run_service_block(message) elif status == "launching": - logger.info(f"service {service_hash} is launching, set message back to fs to wait next time") - fs.put(message) + event = service_events.get(service_hash) + if event: + logger.info(f"service {service_hash} is launching, waiting for ready") + event.wait(timeout=30) + if service_store.get(service_hash) == "running": + run_service_block(message) + else: + logger.warning(f"service {service_hash} launch timeout or failed") + else: + logger.info(f"service {service_hash} is launching, set message back to fs to wait next time") + fs.put(message) else: if not_current_session(message): continue