161 changes: 95 additions & 66 deletions oocana/oocana/context.py
@@ -344,87 +344,116 @@ def __store_ref(self, handle: str):
            job_id=self.job_id,
            session_id=self.session_id,
        )

    def __is_basic_type(self, value: Any) -> bool:
        return isinstance(value, (int, float, str, bool))

-    def __wrap_output_value(self, handle: str, value: Any):
-        """
-        wrap the output value:
-        if the value is a var handle, store it in the store and return the reference.
-        if the value is a bin handle, store it in the store and return the reference.
-        if the handle is not defined in the block outputs schema, raise an ValueError.
-        otherwise, return the value.
-        :param handle: the handle of the output
-        :param value: the value of the output
-        :return: the wrapped value
-        """
+    def __is_dataframe_like(self, value: Any) -> bool:
+        """Check if value is DataFrame-like by duck-typing.
+        Supports pandas DataFrame and subclasses (GeoDataFrame, etc.)
+        """
+        return (
+            hasattr(value, 'to_pickle') and callable(getattr(value, 'to_pickle', None)) and
+            hasattr(value, 'copy') and callable(getattr(value, 'copy', None))
Comment on lines +356 to +357

Copilot AI Jan 31, 2026

The redundant check callable(getattr(value, 'to_pickle', None)) after hasattr(value, 'to_pickle') is unnecessary. If hasattr(value, 'to_pickle') returns True, then getattr(value, 'to_pickle', None) will never be None (it will be the attribute itself). The callable check could be simplified to checking the attribute directly: callable(value.to_pickle). The same applies to the 'copy' check on line 357.

Suggested change:
-            hasattr(value, 'to_pickle') and callable(getattr(value, 'to_pickle', None)) and
-            hasattr(value, 'copy') and callable(getattr(value, 'copy', None))
+            hasattr(value, 'to_pickle') and callable(value.to_pickle) and
+            hasattr(value, 'copy') and callable(value.copy)
+        )
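To check that this simplification is behavior-preserving even in the odd case where the attribute exists but is literally None, here is a quick standalone sketch (the Frame class is hypothetical, not part of the PR):

    class Frame:
        """Hypothetical stand-in for a DataFrame-like object."""
        def to_pickle(self, path, compression=None):
            pass

    f = Frame()
    # Once hasattr() is True, getattr() can no longer hit the None default,
    # so the two spellings of the callable() check are interchangeable:
    assert hasattr(f, 'to_pickle')
    assert callable(getattr(f, 'to_pickle', None)) == callable(f.to_pickle)

    # Corner case the comment glosses over: an attribute that is literally None.
    # hasattr() is still True, but both spellings still agree, because
    # callable(None) is False either way.
    f.to_pickle = None
    assert callable(getattr(f, 'to_pickle', None)) is False
    assert callable(f.to_pickle) is False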
Comment on lines +355 to +358

Copilot AI Jan 31, 2026

The DataFrame detection logic has changed behavior. The original code checked value.__class__.__name__ == 'DataFrame', which only matched objects with the exact class name "DataFrame". The new code uses duck-typing with hasattr(value, 'to_pickle') and hasattr(value, 'copy'), which will match any object with these methods (like GeoDataFrame, Dask DataFrame, etc.). While the PR description mentions supporting subclasses like GeoDataFrame, this is actually a behavioral change, not a pure refactoring. The new implementation is more flexible but differs from the original behavior.
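To make the behavioral difference concrete, a minimal sketch (the GeoFrame subclass is hypothetical, standing in for something like geopandas' GeoDataFrame):

    import pandas as pd

    class GeoFrame(pd.DataFrame):
        """Hypothetical DataFrame subclass."""
        pass

    df = GeoFrame({"a": [1, 2]})

    # Old check: exact class-name match rejects any subclass.
    print(df.__class__.__name__ == 'DataFrame')  # False

    # New check: duck-typing accepts anything with the two methods.
    print(hasattr(df, 'to_pickle') and callable(df.to_pickle)
          and hasattr(df, 'copy') and callable(df.copy))  # True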

    def __serialize_dataframe_for_cache(self, handle: str, value: Any) -> Optional[str]:
        """Serialize DataFrame to cache file in background thread.
        Returns the serialize_path if successful, None otherwise.
        """
        from .serialization import compression_suffix, compression_options
        from .internal import string_hash
Copilot AI Jan 31, 2026

The import from .internal import string_hash will fail because string_hash is not defined in the internal.py module. The string_hash function is actually defined at the module level in context.py itself (lines 22-26). Either move the function to internal.py or use the local string_hash function directly without importing it.

Suggested change:
-        from .internal import string_hash
⚠️ Potential issue | 🔴 Critical

Critical bug: this imports the nonexistent symbol string_hash.

The string_hash function is already defined at line 22 of this file, yet this line tries to import it from the .internal module. This is the direct cause of the CI pipeline failure.

🐛 Fix the import error

Suggested change:
-        from .internal import string_hash

Simply use the string_hash function already defined in this file; no import is needed.

🧰 Tools
🪛 GitHub Actions: layer

[error] 365-365: "string_hash" is unknown import symbol (reportAttributeAccessIssue)

🪛 GitHub Actions: pr

[error] 365-365: pyright: 'string_hash' is an unknown import symbol (reportAttributeAccessIssue).

🤖 Prompt for AI Agents
In `@oocana/oocana/context.py` at line 365: the import statement pulls the nonexistent symbol string_hash from .internal while string_hash is already defined in this file (around line 22); remove the erroneous import ("from .internal import string_hash") and make sure all references use the local string_hash function (avoiding re-import or shadowing), keeping existing usages intact and adjusting any tests or callers that expected an external symbol.
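For orientation only: the actual definition of string_hash at the top of context.py is not visible in this diff. A typical shape for such a helper, shown purely as a hypothetical sketch (the real implementation may differ):

    import hashlib

    def string_hash(s: str) -> str:
        # Hypothetical sketch; the actual code at lines 22-26 of context.py
        # is not shown in this diff. The idea is a short, stable,
        # filesystem-safe digest for building cache directory names, e.g.
        # f"{pkg_data_dir}/.cache/{string_hash(flow_node)}/{handle}{suffix}"
        return hashlib.sha256(s.encode("utf-8")).hexdigest()[:16]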

        import threading
Copilot AI Jan 31, 2026

The import threading statement is placed inside the function at line 366. While this is valid Python, it is unconventional. The import should ideally be at the module level with other imports, or, if it must be local, it should come before the other from imports of the package. The current placement between two from imports is unusual and could be confusing.

        suffix = compression_suffix(context=self)
        compression = compression_options(context=self)
        flow_node = self.__block_info.stacks[-1].get("flow", "unknown") + "-" + self.node_id
        serialize_path = f"{self.pkg_data_dir}/.cache/{string_hash(flow_node)}/{handle}{suffix}"
        os.makedirs(os.path.dirname(serialize_path), exist_ok=True)

        try:
            copy_value = value.copy()
Copilot AI Jan 31, 2026

The call to value.copy() at line 375 is outside the try-except block, but in the original code it was inside the try-except. If copy() raises an exception (e.g., MemoryError or AttributeError), it will now propagate up and potentially crash the serialization, whereas the original code would silently handle it. This breaks the original behavior, where all exceptions during the caching attempt were silently ignored.
            def write_pickle():
                copy_value.to_pickle(serialize_path, compression=compression)
            thread = threading.Thread(target=write_pickle)
            thread.start()
            return serialize_path
        except IOError:
Copilot AI Jan 31, 2026

The exception handling has changed from catching a broad exception and passing silently to only catching IOError. The original code had except IOError as e: pass, which captured the exception variable but didn't use it. The new code catches only IOError and returns None. However, other exceptions like AttributeError (if copy() or to_pickle() fail) will now propagate up instead of being silently ignored. This could be a breaking change if the original code relied on silently ignoring all exceptions during serialization.

Suggested change:
-        except IOError:
+        except Exception:
+            logging.getLogger(__name__).exception(
+                "Failed to serialize DataFrame for cache for handle %s", handle
+            )
            return None
Comment on lines +374 to +382
⚠️ Potential issue | 🟠 Major

The background thread is not tracked, which can lead to data loss.

Once the write_pickle thread starts, serialize_path is returned immediately, but nothing guarantees the thread completes. If the program exits before serialization finishes, the cache file may be incomplete or corrupted.

In addition, only IOError is currently caught, yet value.copy() and to_pickle() can raise other exceptions (such as MemoryError or PicklingError).

♻️ Suggestion: improve the exception handling and consider thread management

         try:
             copy_value = value.copy()
             def write_pickle():
-                copy_value.to_pickle(serialize_path, compression=compression)
+                try:
+                    copy_value.to_pickle(serialize_path, compression=compression)
+                except Exception:
+                    pass  # background serialization failures should not affect the main flow
             thread = threading.Thread(target=write_pickle)
             thread.start()
             return serialize_path
-        except IOError:
+        except Exception:
             return None

🧰 Tools
🪛 Ruff (0.14.14)

[warning] 380-380: Consider moving this statement to an else block (TRY300)

🤖 Prompt for AI Agents
In `@oocana/oocana/context.py` around lines 374-382: the background thread started for write_pickle (which calls value.copy() and copy_value.to_pickle(serialize_path, compression=...)) is not tracked, and the code only catches IOError, risking incomplete or corrupt files. Change the logic so the serialization is either performed synchronously or the thread is joined before returning (e.g., create the thread via threading.Thread(target=write_pickle), start it, then call thread.join(), or return a handle/future instead of immediately returning serialize_path), and broaden the exception handling to catch Exception around value.copy() and to_pickle so the actual error is logged or propagated (including the exception object) rather than only catching IOError.
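One way to address the untracked-thread concern, sketched under the assumption that blocking briefly at shutdown is acceptable: submit the write to a module-level ThreadPoolExecutor and keep the Future, so callers or an atexit hook can wait for pending writes. The executor, list, and helper names below are illustrative, not part of oocana's API:

    import atexit
    from concurrent.futures import Future, ThreadPoolExecutor, wait

    # Hypothetical module-level executor; not part of oocana's actual API.
    _cache_executor = ThreadPoolExecutor(max_workers=1)
    _pending: list[Future] = []

    def _serialize_in_background(copy_value, serialize_path, compression):
        # Submit instead of spawning a bare Thread, so the write is tracked.
        future = _cache_executor.submit(
            copy_value.to_pickle, serialize_path, compression=compression
        )
        _pending.append(future)
        return future

    @atexit.register
    def _drain_pending_writes():
        # Give in-flight cache writes a chance to finish before interpreter
        # exit, so partially written cache files are less likely.
        wait(_pending, timeout=30)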


    def __handle_var_output(self, handle: str, value: Any, output_def: OutputHandleDef) -> VarValueDict:
        """Handle var type output: store in store and optionally cache DataFrame."""
        ref = self.__store_ref(handle)
        self.__store[ref] = value

        serialize_path = None
        # Only cache root flow DataFrames
        is_root_flow = len(self.__block_info.stacks) < 2
        should_cache = output_def.need_serialize_var_for_cache() and self.__is_dataframe_like(value)

        if is_root_flow and should_cache:
            serialize_path = self.__serialize_dataframe_for_cache(handle, value)

        return {
            "__OOMOL_TYPE__": "oomol/var",
            "value": asdict(ref),
            "serialize_path": serialize_path,
        }

    def __handle_bin_output(self, handle: str, value: Any) -> BinValueDict:
        """Handle binary type output: write to file and return reference."""
        if not isinstance(value, bytes):
            self.send_warning(
                f"Output handle key: [{handle}] is defined as binary, but the value is not bytes."
            )
            raise ValueError("Binary handle requires bytes value")

        bin_file = f"{self.session_dir}/binary/{self.session_id}/{self.job_id}/{handle}"
        os.makedirs(os.path.dirname(bin_file), exist_ok=True)

        try:
            with open(bin_file, "wb") as f:
                f.write(value)
        except IOError as e:
            raise IOError(
                f"Output handle key: [{handle}] is defined as binary, but an error occurred while writing the file: {e}"
            )

        if not os.path.exists(bin_file):
            raise IOError(
                f"Output handle key: [{handle}] is defined as binary, but the file is not written."
            )

        return {
            "__OOMOL_TYPE__": "oomol/bin",
            "value": bin_file,
        }

    def __wrap_output_value(self, handle: str, value: Any):
        """Wrap the output value based on handle type.

        - var handle: store in store and return reference
        - bin handle: write to file and return reference
        - other: return value as-is
Copilot AI Jan 31, 2026

The docstring for __wrap_output_value has been updated but is now less detailed than the original. The original docstring explained what happens when the handle is not defined in the block outputs schema (raises ValueError) and mentioned the return value ("return the wrapped value"). The new docstring omits these details. Consider adding back the information about the ValueError being raised and what the return value is.

Suggested change:
-        - other: return value as-is
+        - other: return value as-is
+
+        Raises:
+            ValueError: If the given handle is not defined in the block outputs schema.
+        Returns:
+            The wrapped value for var/bin handles (as a VarValueDict or BinValueDict),
+            or the original value if no wrapping is applied or if binary handling fails.
"""
# __outputs_def should never be None
if self.__outputs_def is None:
return value

output_def = self.__outputs_def.get(handle)
if output_def is None:
raise ValueError(
f"Output handle key: [{handle}] is not defined in Block outputs schema."
)

if output_def.is_var_handle() and not self.__is_basic_type(value):
-            ref = self.__store_ref(handle)
-            self.__store[ref] = value
-
-            serialize_path = None
-            # only cache root flow
-            if len(self.__block_info.stacks) < 2 and output_def.need_serialize_var_for_cache() and value.__class__.__name__ == 'DataFrame' and callable(getattr(value, 'to_pickle', None)):
-                from .serialization import compression_suffix, compression_options
-                suffix = compression_suffix(context=self)
-                compression = compression_options(context=self)
-                flow_node = self.__block_info.stacks[-1].get("flow", "unknown") + "-" + self.node_id
-                serialize_path = f"{self.pkg_data_dir}/.cache/{string_hash(flow_node)}/{handle}{suffix}"
-                os.makedirs(os.path.dirname(serialize_path), exist_ok=True)
-                try:
-                    copy_value = value.copy() # copy the value to avoid blocking the main thread
-                    import threading
-                    def write_pickle():
-                        copy_value.to_pickle(serialize_path, compression=compression)
-                    thread = threading.Thread(target=write_pickle)
-                    thread.start()
-                except IOError as e:
-                    pass
-            var: VarValueDict = {
-                "__OOMOL_TYPE__": "oomol/var",
-                "value": asdict(ref),
-                "serialize_path": serialize_path,
-            }
-            return var
+            return self.__handle_var_output(handle, value, output_def)

        if output_def.is_bin_handle():
-            if not isinstance(value, bytes):
-                self.send_warning(
-                    f"Output handle key: [{handle}] is defined as binary, but the value is not bytes."
-                )
-                return value
-
-            bin_file = f"{self.session_dir}/binary/{self.session_id}/{self.job_id}/{handle}"
-            os.makedirs(os.path.dirname(bin_file), exist_ok=True)
-            try:
-                with open(bin_file, "wb") as f:
-                    f.write(value)
-            except IOError as e:
-                raise IOError(
-                    f"Output handle key: [{handle}] is defined as binary, but an error occurred while writing the file: {e}"
-                )
+            try:
+                return self.__handle_bin_output(handle, value)
+            except ValueError:
+                return value  # Already warned, return original value
Comment on lines 452 to +455

Copilot AI Jan 31, 2026

The error handling flow for non-bytes binary values has changed. The original code sent a warning and returned the original value directly. The new code raises a ValueError in __handle_bin_output(), which is then caught in __wrap_output_value() to return the original value. While the end result is the same, this approach uses exceptions for control flow, which is generally considered an anti-pattern. The original, simpler approach of returning the value directly after warning was clearer.
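If the exceptions-as-control-flow concern were to be addressed, one option is a sentinel return instead of a raise. A sketch only, reusing names from this diff, not the PR's actual code:

    from typing import Any, Optional

    def __handle_bin_output(self, handle: str, value: Any) -> Optional[BinValueDict]:
        # Sketch: return None as a sentinel instead of raising ValueError.
        if not isinstance(value, bytes):
            self.send_warning(
                f"Output handle key: [{handle}] is defined as binary, but the value is not bytes."
            )
            return None  # caller falls back to the original value
        ...  # file writing unchanged

    # and in __wrap_output_value:
    # if output_def.is_bin_handle():
    #     wrapped = self.__handle_bin_output(handle, value)
    #     return value if wrapped is None else wrapped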

-            if os.path.exists(bin_file):
-                bin_value: BinValueDict = {
-                    "__OOMOL_TYPE__": "oomol/bin",
-                    "value": bin_file,
-                }
-                return bin_value
-            else:
-                raise IOError(
-                    f"Output handle key: [{handle}] is defined as binary, but the file is not written."
-                )

        return value

    async def oomol_token(self) -> str: