Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
10 changes: 7 additions & 3 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -7,14 +7,17 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0

## [Unreleased]

## [0.48.0] - 2026-01-29

### Added

`tilebox-storage`: Added a `LocalFileSystemStorageClient` to access data on a local file system, a mounted network file
- `tilebox-storage`: Added a `LocalFileSystemStorageClient` to access data on a local file system, a mounted network file
system or a syncified directory with a remote file system (e.g. Dropbox, Google Drive, etc.).
- `tilebox-workflows`: Added an `ObstoreCache` implementation for the task cache powered by `obstore`.

### Changed

`tilebox-storage`: Renamed the existing `StorageClient` base class in `tilebox.storage.aio` to `CachingStorageClient`
- `tilebox-storage`: Renamed the existing `StorageClient` base class in `tilebox.storage.aio` to `CachingStorageClient`
to accomodate the new `StorageClient` base class that does not provide caching, since `LocalFileSystemStorageClient` is
the first client that does not cache data (since it's already on the local file system).

Expand Down Expand Up @@ -321,7 +324,8 @@ the first client that does not cache data (since it's already on the local file
- Released under the [MIT](https://opensource.org/license/mit) license.
- Released packages: `tilebox-datasets`, `tilebox-workflows`, `tilebox-storage`, `tilebox-grpc`

[Unreleased]: https://github.com/tilebox/tilebox-python/compare/v0.47.0...HEAD
[Unreleased]: https://github.com/tilebox/tilebox-python/compare/v0.48.0...HEAD
[0.48.0]: https://github.com/tilebox/tilebox-python/compare/v0.47.0...v0.48.0
[0.47.0]: https://github.com/tilebox/tilebox-python/compare/v0.46.0...v0.47.0
[0.46.0]: https://github.com/tilebox/tilebox-python/compare/v0.45.0...v0.46.0
[0.45.0]: https://github.com/tilebox/tilebox-python/compare/v0.44.0...v0.45.0
Expand Down
64 changes: 58 additions & 6 deletions tilebox-workflows/tilebox/workflows/cache.py
Original file line number Diff line number Diff line change
@@ -1,13 +1,16 @@
import contextlib
import warnings
from abc import ABC, abstractmethod
from collections.abc import Iterator
from io import BytesIO
from pathlib import Path
from pathlib import PurePosixPath as ObjectPath

import boto3
from botocore.exceptions import ClientError
from google.cloud.exceptions import NotFound
from google.cloud.storage import Blob, Bucket
from obstore.store import ObjectStore


class JobCache(ABC):
Expand Down Expand Up @@ -62,6 +65,53 @@ def group(self, key: str) -> "NoCache":
return self


class ObstoreCache(JobCache):
def __init__(self, store: ObjectStore, prefix: str | ObjectPath = ObjectPath(".")) -> None:
"""A cache implementation backed by an obstore ObjectStore.

This cache implementation is the recommended way of working with the cache, as it provides a unified interface
for working with different object stores, while also providing a way to transparently work with local files
as well.

Args:
store: The object store to use for the cache.
prefix: A path prefix to append to all objects stored in the cache. Defaults to no prefix.
"""
self.store = store
self.prefix = ObjectPath(prefix)

def __contains__(self, key: str) -> bool:
with contextlib.suppress(OSError):
self.store.get(str(self.prefix / key))
return True # if get is successful, we know the key is in the cache

return False

def __setitem__(self, key: str, value: bytes) -> None:
self.store.put(str(self.prefix / key), value)

def __delitem__(self, key: str) -> None:
try:
self.store.delete(str(self.prefix / key))
except OSError:
raise KeyError(f"{key} is not cached!") from None

def __getitem__(self, key: str) -> bytes:
try:
entry = self.store.get(str(self.prefix / key))
return bytes(entry.bytes())
except OSError:
raise KeyError(f"{key} is not cached!") from None

def __iter__(self) -> Iterator[str]:
for obj in self.store.list_with_delimiter(str(self.prefix))["objects"]:
path: str = obj["path"]
yield path.removeprefix(str(self.prefix) + "/")

def group(self, key: str) -> "ObstoreCache":
return ObstoreCache(self.store, prefix=str(self.prefix / key))


class InMemoryCache(JobCache):
def __init__(self) -> None:
"""A simple in-memory cache implementation.
Expand Down Expand Up @@ -153,7 +203,7 @@ def __init__(self, root: Path | str = Path("cache")) -> None:
Args:
root: File system path where the cache will be stored. Defaults to "cache" in the current working directory.
"""
self.root = root if isinstance(root, Path) else Path(root)
self.root = Path(root)

def __contains__(self, key: str) -> bool:
return (self.root / key).exists()
Expand Down Expand Up @@ -184,15 +234,17 @@ def group(self, key: str) -> "LocalFileSystemCache":


class GoogleStorageCache(JobCache):
def __init__(self, bucket: Bucket, prefix: str = "jobs") -> None:
def __init__(self, bucket: Bucket, prefix: str | ObjectPath = "jobs") -> None:
"""A cache implementation that stores data in Google Cloud Storage.

Args:
bucket: The Google Cloud Storage bucket to use for the cache.
prefix: A path prefix to append to all objects stored in the cache. Defaults to "jobs".
"""
self.bucket = bucket
self.prefix = Path(prefix) # we still use pathlib here, because it's easier to work with when joining paths
self.prefix = ObjectPath(
prefix
) # we still use pathlib here, because it's easier to work with when joining paths

def _blob(self, key: str) -> Blob:
return self.bucket.blob(str(self.prefix / key))
Expand Down Expand Up @@ -228,22 +280,22 @@ def __iter__(self) -> Iterator[str]:

# make the names relative to the cache prefix (but including the key in the name)
for blob in blobs:
yield str(Path(blob.name).relative_to(self.prefix))
yield str(ObjectPath(blob.name).relative_to(self.prefix))

def group(self, key: str) -> "GoogleStorageCache":
return GoogleStorageCache(self.bucket, prefix=str(self.prefix / key))


class AmazonS3Cache(JobCache):
def __init__(self, bucket: str, prefix: str = "jobs") -> None:
def __init__(self, bucket: str, prefix: str | ObjectPath = "jobs") -> None:
"""A cache implementation that stores data in Amazon S3.

Args:
bucket: The Amazon S3 bucket to use for the cache.
prefix: A path prefix to append to all objects stored in the cache. Defaults to "jobs".
"""
self.bucket = bucket
self.prefix = Path(prefix)
self.prefix = ObjectPath(prefix)
with warnings.catch_warnings():
# https://github.com/boto/boto3/issues/3889
warnings.filterwarnings("ignore", category=DeprecationWarning, message=".*datetime.utcnow.*")
Expand Down
Loading