diff --git a/oocana/oocana/context.py b/oocana/oocana/context.py index 40f4cac..dee5d4b 100644 --- a/oocana/oocana/context.py +++ b/oocana/oocana/context.py @@ -434,6 +434,23 @@ async def oomol_token(self) -> str: """ return os.getenv("OOMOL_TOKEN", "") + def _wrap_output_with_warning(self, key: str, value: Any) -> tuple[Any, bool]: + """ + Wrap output value with error handling. + + Args: + key: The output handle key + value: The value to wrap + + Returns: + A tuple of (wrapped_value, success). If success is False, wrapped_value is None. + """ + try: + return self.__wrap_output_value(key, value), True + except (ValueError, IOError) as e: + self.send_warning(f"{e}") + return None, False + def output(self, key: str, value: Any, *, to_node: list[ToNode] | None = None, to_flow: list[ToFlow] | None = None): """ output the value to the next block @@ -444,29 +461,15 @@ def output(self, key: str, value: Any, *, to_node: list[ToNode] | None = None, t :param to_flow: list[ToFlow] | None, the target flow(with output handle) to send the output if both to_node and to_flow are None, the output will be sent to all connected nodes and flows. """ - - try: - wrap_value = self.__wrap_output_value(key, value) - except ValueError as e: - self.send_warning( - f"{e}" - ) - return - except IOError as e: - self.send_warning( - f"{e}" - ) + wrap_value, success = self._wrap_output_with_warning(key, value) + if not success: return + target = None if to_node is not None or to_flow is not None: - target = { - "to_node": to_node, - "to_flow": to_flow, - } - else: - target = None + target = {"to_node": to_node, "to_flow": to_flow} - payload = { + payload: Dict[str, Any] = { "type": "BlockOutput", "handle": key, "output": wrap_value, @@ -474,27 +477,19 @@ def output(self, key: str, value: Any, *, to_node: list[ToNode] | None = None, t if target is not None: payload["options"] = {"target": target} self.__mainframe.send(self.job_info, payload) - + def outputs(self, outputs: Dict[str, Any]): """ output the value to the next block map: Dict[str, Any], the key of the output, should be defined in the block schema output defs, the field name is handle """ - values = {} for key, value in outputs.items(): - try: - wrap_value = self.__wrap_output_value(key, value) + wrap_value, success = self._wrap_output_with_warning(key, value) + if success: values[key] = wrap_value - except ValueError as e: - self.send_warning( - f"{e}" - ) - except IOError as e: - self.send_warning( - f"{e}" - ) + self.__mainframe.send(self.job_info, { "type": "BlockOutputs", "outputs": values, @@ -516,16 +511,9 @@ def finish(self, *, result: Dict[str, Any] | None = None, error: str | None = No wrap_result = {} if isinstance(result, dict): for key, value in result.items(): - try: - wrap_result[key] = self.__wrap_output_value(key, value) - except ValueError as e: - self.send_warning( - f"Output handle key: [{key}] is not defined in Block outputs schema. {e}" - ) - except IOError as e: - self.send_warning( - f"Output handle key: [{key}] is not defined in Block outputs schema. {e}" - ) + wrap_value, success = self._wrap_output_with_warning(key, value) + if success: + wrap_result[key] = wrap_value self.__mainframe.send(self.job_info, {"type": "BlockFinished", "result": wrap_result}) else: