From 795986fefabcf02cdb26854b6fefd8d4a722b81f Mon Sep 17 00:00:00 2001 From: leavesster <11785335+leavesster@users.noreply.github.com> Date: Sat, 31 Jan 2026 17:14:11 +0800 Subject: [PATCH] refactor: simplify __wrap_output_value with extracted helper methods - Extract __is_dataframe_like() for duck-typing DataFrame detection - Extract __serialize_dataframe_for_cache() for cache serialization - Extract __handle_var_output() for var handle processing - Extract __handle_bin_output() for bin handle processing - Reduce max indentation from 5 levels to 2 levels - Improve code readability and testability --- oocana/oocana/context.py | 161 +++++++++++++++++++++++---------------- 1 file changed, 95 insertions(+), 66 deletions(-) diff --git a/oocana/oocana/context.py b/oocana/oocana/context.py index 40f4cac..eb4e0f1 100644 --- a/oocana/oocana/context.py +++ b/oocana/oocana/context.py @@ -344,87 +344,116 @@ def __store_ref(self, handle: str): job_id=self.job_id, session_id=self.session_id, ) - + def __is_basic_type(self, value: Any) -> bool: return isinstance(value, (int, float, str, bool)) - - def __wrap_output_value(self, handle: str, value: Any): + + def __is_dataframe_like(self, value: Any) -> bool: + """Check if value is DataFrame-like by duck-typing. + Supports pandas DataFrame and subclasses (GeoDataFrame, etc.) """ - wrap the output value: - if the value is a var handle, store it in the store and return the reference. - if the value is a bin handle, store it in the store and return the reference. - if the handle is not defined in the block outputs schema, raise an ValueError. - otherwise, return the value. - :param handle: the handle of the output - :param value: the value of the output - :return: the wrapped value + return ( + hasattr(value, 'to_pickle') and callable(getattr(value, 'to_pickle', None)) and + hasattr(value, 'copy') and callable(getattr(value, 'copy', None)) + ) + + def __serialize_dataframe_for_cache(self, handle: str, value: Any) -> Optional[str]: + """Serialize DataFrame to cache file in background thread. + Returns the serialize_path if successful, None otherwise. + """ + from .serialization import compression_suffix, compression_options + from .internal import string_hash + import threading + + suffix = compression_suffix(context=self) + compression = compression_options(context=self) + flow_node = self.__block_info.stacks[-1].get("flow", "unknown") + "-" + self.node_id + serialize_path = f"{self.pkg_data_dir}/.cache/{string_hash(flow_node)}/{handle}{suffix}" + os.makedirs(os.path.dirname(serialize_path), exist_ok=True) + + try: + copy_value = value.copy() + def write_pickle(): + copy_value.to_pickle(serialize_path, compression=compression) + thread = threading.Thread(target=write_pickle) + thread.start() + return serialize_path + except IOError: + return None + + def __handle_var_output(self, handle: str, value: Any, output_def: OutputHandleDef) -> VarValueDict: + """Handle var type output: store in store and optionally cache DataFrame.""" + ref = self.__store_ref(handle) + self.__store[ref] = value + + serialize_path = None + # Only cache root flow DataFrames + is_root_flow = len(self.__block_info.stacks) < 2 + should_cache = output_def.need_serialize_var_for_cache() and self.__is_dataframe_like(value) + + if is_root_flow and should_cache: + serialize_path = self.__serialize_dataframe_for_cache(handle, value) + + return { + "__OOMOL_TYPE__": "oomol/var", + "value": asdict(ref), + "serialize_path": serialize_path, + } + + def __handle_bin_output(self, handle: str, value: Any) -> BinValueDict: + """Handle binary type output: write to file and return reference.""" + if not isinstance(value, bytes): + self.send_warning( + f"Output handle key: [{handle}] is defined as binary, but the value is not bytes." + ) + raise ValueError("Binary handle requires bytes value") + + bin_file = f"{self.session_dir}/binary/{self.session_id}/{self.job_id}/{handle}" + os.makedirs(os.path.dirname(bin_file), exist_ok=True) + + try: + with open(bin_file, "wb") as f: + f.write(value) + except IOError as e: + raise IOError( + f"Output handle key: [{handle}] is defined as binary, but an error occurred while writing the file: {e}" + ) + + if not os.path.exists(bin_file): + raise IOError( + f"Output handle key: [{handle}] is defined as binary, but the file is not written." + ) + + return { + "__OOMOL_TYPE__": "oomol/bin", + "value": bin_file, + } + + def __wrap_output_value(self, handle: str, value: Any): + """Wrap the output value based on handle type. + + - var handle: store in store and return reference + - bin handle: write to file and return reference + - other: return value as-is """ - # __outputs_def should never be None if self.__outputs_def is None: return value - + output_def = self.__outputs_def.get(handle) if output_def is None: raise ValueError( f"Output handle key: [{handle}] is not defined in Block outputs schema." ) - + if output_def.is_var_handle() and not self.__is_basic_type(value): - ref = self.__store_ref(handle) - self.__store[ref] = value - - serialize_path = None - # only cache root flow - if len(self.__block_info.stacks) < 2 and output_def.need_serialize_var_for_cache() and value.__class__.__name__ == 'DataFrame' and callable(getattr(value, 'to_pickle', None)): - from .serialization import compression_suffix, compression_options - suffix = compression_suffix(context=self) - compression = compression_options(context=self) - flow_node = self.__block_info.stacks[-1].get("flow", "unknown") + "-" + self.node_id - serialize_path = f"{self.pkg_data_dir}/.cache/{string_hash(flow_node)}/{handle}{suffix}" - os.makedirs(os.path.dirname(serialize_path), exist_ok=True) - try: - copy_value = value.copy() # copy the value to avoid blocking the main thread - import threading - def write_pickle(): - copy_value.to_pickle(serialize_path, compression=compression) - thread = threading.Thread(target=write_pickle) - thread.start() - except IOError as e: - pass - var: VarValueDict = { - "__OOMOL_TYPE__": "oomol/var", - "value": asdict(ref), - "serialize_path": serialize_path, - } - return var - + return self.__handle_var_output(handle, value, output_def) + if output_def.is_bin_handle(): - if not isinstance(value, bytes): - self.send_warning( - f"Output handle key: [{handle}] is defined as binary, but the value is not bytes." - ) - return value - - bin_file = f"{self.session_dir}/binary/{self.session_id}/{self.job_id}/{handle}" - os.makedirs(os.path.dirname(bin_file), exist_ok=True) try: - with open(bin_file, "wb") as f: - f.write(value) - except IOError as e: - raise IOError( - f"Output handle key: [{handle}] is defined as binary, but an error occurred while writing the file: {e}" - ) + return self.__handle_bin_output(handle, value) + except ValueError: + return value # Already warned, return original value - if os.path.exists(bin_file): - bin_value: BinValueDict = { - "__OOMOL_TYPE__": "oomol/bin", - "value": bin_file, - } - return bin_value - else: - raise IOError( - f"Output handle key: [{handle}] is defined as binary, but the file is not written." - ) return value async def oomol_token(self) -> str: