From 862c23eb659d90c8f9c31eb6bcedb2f668126191 Mon Sep 17 00:00:00 2001 From: Jan Janssen Date: Thu, 15 Jan 2026 16:06:58 +0100 Subject: [PATCH 1/3] [Feature] Store resource dict in cache --- src/executorlib/standalone/hdf.py | 5 +++++ src/executorlib/task_scheduler/file/spawner_pysqa.py | 6 ++++-- src/executorlib/task_scheduler/file/spawner_subprocess.py | 4 +++- src/executorlib/task_scheduler/interactive/shared.py | 8 +++++--- 4 files changed, 17 insertions(+), 6 deletions(-) diff --git a/src/executorlib/standalone/hdf.py b/src/executorlib/standalone/hdf.py index 5e8970f4..ce5da62a 100644 --- a/src/executorlib/standalone/hdf.py +++ b/src/executorlib/standalone/hdf.py @@ -11,6 +11,7 @@ "kwargs": "input_kwargs", "output": "output", "error": "error", + "resource_dict": "resource_dict", "runtime": "runtime", "queue_id": "queue_id", "error_log_file": "error_log_file", @@ -61,6 +62,10 @@ def load(file_name: str) -> dict: data_dict["kwargs"] = cloudpickle.loads(np.void(hdf["/input_kwargs"])) else: data_dict["kwargs"] = {} + if "resource_dict" in hdf: + data_dict["resource_dict"] = cloudpickle.loads(np.void(hdf["/resource_dict"])) + else: + data_dict["resource_dict"] = {} if "error_log_file" in hdf: data_dict["error_log_file"] = cloudpickle.loads( np.void(hdf["/error_log_file"]) diff --git a/src/executorlib/task_scheduler/file/spawner_pysqa.py b/src/executorlib/task_scheduler/file/spawner_pysqa.py index 66c5e4f4..7b4cd99d 100644 --- a/src/executorlib/task_scheduler/file/spawner_pysqa.py +++ b/src/executorlib/task_scheduler/file/spawner_pysqa.py @@ -45,13 +45,15 @@ def execute_with_pysqa( execute_command=pysqa_execute_command, ) queue_id = get_queue_id(file_name=file_name) + store_dict = data_dict.copy() + store_dict["resource_dict"] = resource_dict if resource_dict is not None else {} if os.path.exists(file_name) and ( queue_id is None or qa.get_status_of_job(process_id=queue_id) is None ): os.remove(file_name) - dump(file_name=file_name, data_dict=data_dict) + dump(file_name=file_name, data_dict=store_dict) elif not os.path.exists(file_name): - dump(file_name=file_name, data_dict=data_dict) + dump(file_name=file_name, data_dict=store_dict) check_file_exists(file_name=file_name) if queue_id is None or qa.get_status_of_job(process_id=queue_id) is None: if resource_dict is None: diff --git a/src/executorlib/task_scheduler/file/spawner_subprocess.py b/src/executorlib/task_scheduler/file/spawner_subprocess.py index c1b2157f..aede6944 100644 --- a/src/executorlib/task_scheduler/file/spawner_subprocess.py +++ b/src/executorlib/task_scheduler/file/spawner_subprocess.py @@ -41,7 +41,9 @@ def execute_in_subprocess( task_dependent_lst = [] if os.path.exists(file_name): os.remove(file_name) - dump(file_name=file_name, data_dict=data_dict) + store_dict = data_dict.copy() + store_dict["resource_dict"] = resource_dict if resource_dict is not None else {} + dump(file_name=file_name, data_dict=store_dict) check_file_exists(file_name=file_name) while len(task_dependent_lst) > 0: task_dependent_lst = [ diff --git a/src/executorlib/task_scheduler/interactive/shared.py b/src/executorlib/task_scheduler/interactive/shared.py index e4084222..dd2523f8 100644 --- a/src/executorlib/task_scheduler/interactive/shared.py +++ b/src/executorlib/task_scheduler/interactive/shared.py @@ -144,9 +144,11 @@ def _execute_task_with_cache( try: time_start = time.time() result = interface.send_and_receive_dict(input_dict=task_dict) - data_dict["output"] = result - data_dict["runtime"] = time.time() - time_start - dump(file_name=file_name, data_dict=data_dict) + store_dict = data_dict.copy() + store_dict["output"] = result + store_dict["runtime"] = time.time() - time_start + store_dict["resource_dict"] = task_dict.get("resource_dict", {}) + dump(file_name=file_name, data_dict=store_dict) future_obj.set_result(result) except Exception as thread_exception: if isinstance(thread_exception, ExecutorlibSocketError): From 359a887753958951120f7c732d838cfc2220737a Mon Sep 17 00:00:00 2001 From: "pre-commit-ci[bot]" <66853113+pre-commit-ci[bot]@users.noreply.github.com> Date: Thu, 15 Jan 2026 15:07:39 +0000 Subject: [PATCH 2/3] [pre-commit.ci] auto fixes from pre-commit.com hooks for more information, see https://pre-commit.ci --- src/executorlib/standalone/hdf.py | 4 +++- 1 file changed, 3 insertions(+), 1 deletion(-) diff --git a/src/executorlib/standalone/hdf.py b/src/executorlib/standalone/hdf.py index ce5da62a..14ffe4ad 100644 --- a/src/executorlib/standalone/hdf.py +++ b/src/executorlib/standalone/hdf.py @@ -63,7 +63,9 @@ def load(file_name: str) -> dict: else: data_dict["kwargs"] = {} if "resource_dict" in hdf: - data_dict["resource_dict"] = cloudpickle.loads(np.void(hdf["/resource_dict"])) + data_dict["resource_dict"] = cloudpickle.loads( + np.void(hdf["/resource_dict"]) + ) else: data_dict["resource_dict"] = {} if "error_log_file" in hdf: From 5c75ae016f2b558095395e25a810ce3f4477240e Mon Sep 17 00:00:00 2001 From: Jan Janssen Date: Thu, 15 Jan 2026 16:21:42 +0100 Subject: [PATCH 3/3] revert --- src/executorlib/task_scheduler/file/spawner_pysqa.py | 6 ++---- src/executorlib/task_scheduler/file/spawner_subprocess.py | 4 +--- src/executorlib/task_scheduler/interactive/shared.py | 8 +++----- 3 files changed, 6 insertions(+), 12 deletions(-) diff --git a/src/executorlib/task_scheduler/file/spawner_pysqa.py b/src/executorlib/task_scheduler/file/spawner_pysqa.py index 7b4cd99d..66c5e4f4 100644 --- a/src/executorlib/task_scheduler/file/spawner_pysqa.py +++ b/src/executorlib/task_scheduler/file/spawner_pysqa.py @@ -45,15 +45,13 @@ def execute_with_pysqa( execute_command=pysqa_execute_command, ) queue_id = get_queue_id(file_name=file_name) - store_dict = data_dict.copy() - store_dict["resource_dict"] = resource_dict if resource_dict is not None else {} if os.path.exists(file_name) and ( queue_id is None or qa.get_status_of_job(process_id=queue_id) is None ): os.remove(file_name) - dump(file_name=file_name, data_dict=store_dict) + dump(file_name=file_name, data_dict=data_dict) elif not os.path.exists(file_name): - dump(file_name=file_name, data_dict=store_dict) + dump(file_name=file_name, data_dict=data_dict) check_file_exists(file_name=file_name) if queue_id is None or qa.get_status_of_job(process_id=queue_id) is None: if resource_dict is None: diff --git a/src/executorlib/task_scheduler/file/spawner_subprocess.py b/src/executorlib/task_scheduler/file/spawner_subprocess.py index aede6944..c1b2157f 100644 --- a/src/executorlib/task_scheduler/file/spawner_subprocess.py +++ b/src/executorlib/task_scheduler/file/spawner_subprocess.py @@ -41,9 +41,7 @@ def execute_in_subprocess( task_dependent_lst = [] if os.path.exists(file_name): os.remove(file_name) - store_dict = data_dict.copy() - store_dict["resource_dict"] = resource_dict if resource_dict is not None else {} - dump(file_name=file_name, data_dict=store_dict) + dump(file_name=file_name, data_dict=data_dict) check_file_exists(file_name=file_name) while len(task_dependent_lst) > 0: task_dependent_lst = [ diff --git a/src/executorlib/task_scheduler/interactive/shared.py b/src/executorlib/task_scheduler/interactive/shared.py index dd2523f8..e4084222 100644 --- a/src/executorlib/task_scheduler/interactive/shared.py +++ b/src/executorlib/task_scheduler/interactive/shared.py @@ -144,11 +144,9 @@ def _execute_task_with_cache( try: time_start = time.time() result = interface.send_and_receive_dict(input_dict=task_dict) - store_dict = data_dict.copy() - store_dict["output"] = result - store_dict["runtime"] = time.time() - time_start - store_dict["resource_dict"] = task_dict.get("resource_dict", {}) - dump(file_name=file_name, data_dict=store_dict) + data_dict["output"] = result + data_dict["runtime"] = time.time() - time_start + dump(file_name=file_name, data_dict=data_dict) future_obj.set_result(result) except Exception as thread_exception: if isinstance(thread_exception, ExecutorlibSocketError):