From 190d169864223a4c8ad7d988290893e94a388d9c Mon Sep 17 00:00:00 2001 From: Andrew Duffy Date: Fri, 16 Jan 2026 16:48:25 -0500 Subject: [PATCH 1/4] feat[python]: support full read/write from object storage Signed-off-by: Andrew Duffy --- Cargo.lock | 39 ++ docs/api/python/index.rst | 1 + docs/api/python/io.rst | 1 + docs/api/python/store.rst | 17 + vortex-io/src/write.rs | 2 +- vortex-python/Cargo.toml | 1 + vortex-python/python/vortex/__init__.py | 7 +- vortex-python/python/vortex/_lib/__init__.pyi | 1 + vortex-python/python/vortex/_lib/io.pyi | 13 +- .../python/vortex/_lib/store/LICENSE | 21 + .../python/vortex/_lib/store/__init__.pyi | 205 +++++++ .../python/vortex/_lib/store/_aws.pyi | 547 ++++++++++++++++++ .../python/vortex/_lib/store/_azure.pyi | 415 +++++++++++++ .../python/vortex/_lib/store/_client.pyi | 87 +++ .../python/vortex/_lib/store/_gcs.pyi | 222 +++++++ .../python/vortex/_lib/store/_http.pyi | 58 ++ .../python/vortex/_lib/store/_retry.pyi | 97 ++++ vortex-python/python/vortex/file.py | 7 +- vortex-python/src/dataset.rs | 41 +- vortex-python/src/file.rs | 24 +- vortex-python/src/io.rs | 132 ++++- vortex-python/src/iter/python.rs | 1 + vortex-python/src/lib.rs | 2 + vortex-python/src/object_store_urls.rs | 95 ++- vortex-python/src/store.rs | 14 + vortex-python/test/test_store.py | 27 + 26 files changed, 2023 insertions(+), 54 deletions(-) create mode 100644 docs/api/python/store.rst create mode 100644 vortex-python/python/vortex/_lib/store/LICENSE create mode 100644 vortex-python/python/vortex/_lib/store/__init__.pyi create mode 100644 vortex-python/python/vortex/_lib/store/_aws.pyi create mode 100644 vortex-python/python/vortex/_lib/store/_azure.pyi create mode 100644 vortex-python/python/vortex/_lib/store/_client.pyi create mode 100644 vortex-python/python/vortex/_lib/store/_gcs.pyi create mode 100644 vortex-python/python/vortex/_lib/store/_http.pyi create mode 100644 vortex-python/python/vortex/_lib/store/_retry.pyi create mode 100644 vortex-python/src/store.rs create mode 100644 vortex-python/test/test_store.py diff --git a/Cargo.lock b/Cargo.lock index 25c73dfe189..8d67be87990 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -7500,6 +7500,8 @@ version = "0.27.2" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "ab53c047fcd1a1d2a8820fe84f05d6be69e9526be40cb03b73f86b6b03e6d87d" dependencies = [ + "chrono", + "indexmap", "indoc", "libc", "memoffset", @@ -7511,6 +7513,19 @@ dependencies = [ "unindent", ] +[[package]] +name = "pyo3-async-runtimes" +version = "0.27.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "57ddb5b570751e93cc6777e81fee8087e59cd53b5043292f2a6d59d5bd80fdfd" +dependencies = [ + "futures", + "once_cell", + "pin-project-lite", + "pyo3", + "tokio", +] + [[package]] name = "pyo3-build-config" version = "0.27.2" @@ -7576,6 +7591,29 @@ dependencies = [ "syn 2.0.114", ] +[[package]] +name = "pyo3-object_store" +version = "0.7.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "5ef5552f108a4d65b78c924b27513471a9ba425341ada4be5ea0ca53806ae316" +dependencies = [ + "async-trait", + "bytes", + "chrono", + "futures", + "http 1.4.0", + "humantime", + "itertools 0.14.0", + "object_store", + "percent-encoding", + "pyo3", + "pyo3-async-runtimes", + "serde", + "thiserror 1.0.69", + "tokio", + "url", +] + [[package]] name = "quick-xml" version = "0.37.5" @@ -10758,6 +10796,7 @@ dependencies = [ "pyo3", "pyo3-bytes", "pyo3-log", + "pyo3-object_store", "tokio", "tracing", "url", diff --git a/docs/api/python/index.rst b/docs/api/python/index.rst index 29112cd5a05..5d4efa2b79c 100644 --- a/docs/api/python/index.rst +++ b/docs/api/python/index.rst @@ -10,5 +10,6 @@ Python API expr compress io + store dataset type_aliases diff --git a/docs/api/python/io.rst b/docs/api/python/io.rst index aaee91789fc..1c34003d7d6 100644 --- a/docs/api/python/io.rst +++ b/docs/api/python/io.rst @@ -28,3 +28,4 @@ HTTP, S3, Google Cloud Storage, and Azure Blob Storage. .. automodule:: vortex.io :members: :imported-members: + diff --git a/docs/api/python/store.rst b/docs/api/python/store.rst new file mode 100644 index 00000000000..bd2c2eb4892 --- /dev/null +++ b/docs/api/python/store.rst @@ -0,0 +1,17 @@ +Object Store support +==================== + +Vortex arrays support reading and writing to object storage systems such as, S3, Google Cloud Storage, and +Azure Blob Storage. + +.. autosummary:: + :nosignatures: + +.. raw:: html + +
+ +.. automodule:: vortex.store + :members: + :imported-members: + diff --git a/vortex-io/src/write.rs b/vortex-io/src/write.rs index 57048ce66ef..9afc0a31015 100644 --- a/vortex-io/src/write.rs +++ b/vortex-io/src/write.rs @@ -62,7 +62,7 @@ where } fn shutdown(&mut self) -> impl Future> { - ready(Ok(())) + ready(Write::flush(self)) } } diff --git a/vortex-python/Cargo.toml b/vortex-python/Cargo.toml index 47986dc7fb1..18f26f00691 100644 --- a/vortex-python/Cargo.toml +++ b/vortex-python/Cargo.toml @@ -39,6 +39,7 @@ parking_lot = { workspace = true } pyo3 = { workspace = true, features = ["abi3", "abi3-py311"] } pyo3-bytes = { workspace = true } pyo3-log = { workspace = true } +pyo3-object_store = { version = "0.7" } tokio = { workspace = true, features = ["fs", "rt-multi-thread"] } # This feature makes the underlying tracing logs to be emitted as `log` events tracing = { workspace = true, features = ["std", "log"] } diff --git a/vortex-python/python/vortex/__init__.py b/vortex-python/python/vortex/__init__.py index 6b5b10f07c3..57aa3b7b64e 100644 --- a/vortex-python/python/vortex/__init__.py +++ b/vortex-python/python/vortex/__init__.py @@ -1,9 +1,8 @@ # SPDX-License-Identifier: Apache-2.0 # SPDX-FileCopyrightText: Copyright the Vortex contributors -from importlib import metadata as _metadata - from . import _lib, arrays, dataset, expr, file, io, ray, registry, scan +from ._lib import exceptions, store from ._lib.arrays import ( # pyright: ignore[reportMissingModuleSource] AlpArray, AlpRdArray, @@ -86,6 +85,8 @@ # Resolve the installed distribution version so it is available as vortex.__version__. __version__ = "unknown" try: + from importlib import metadata as _metadata + # Try to read the installed distribution version for the Python package name. __version__ = _metadata.version("vortex-data") except _metadata.PackageNotFoundError: @@ -96,12 +97,14 @@ # --- Modules --- "arrays", "dataset", + "exceptions", "expr", "file", "scan", "io", "registry", "ray", + "store", # --- Objects and Functions --- "array", "compress", diff --git a/vortex-python/python/vortex/_lib/__init__.pyi b/vortex-python/python/vortex/_lib/__init__.pyi index ca194192153..79f62c4521f 100644 --- a/vortex-python/python/vortex/_lib/__init__.pyi +++ b/vortex-python/python/vortex/_lib/__init__.pyi @@ -1,2 +1,3 @@ # SPDX-License-Identifier: Apache-2.0 # SPDX-FileCopyrightText: Copyright the Vortex contributors + diff --git a/vortex-python/python/vortex/_lib/io.pyi b/vortex-python/python/vortex/_lib/io.pyi index 609bb31fc7c..22ea8eb7b3f 100644 --- a/vortex-python/python/vortex/_lib/io.pyi +++ b/vortex-python/python/vortex/_lib/io.pyi @@ -4,15 +4,22 @@ from ..type_aliases import IntoArrayIterator from .arrays import Array from .expr import Expr +from .store import ( + AzureStore, + GCSStore, + HTTPStore, + S3Store, +) def read_url( url: str, *, + store=None, projection: list[str] | list[int] | None = None, row_filter: Expr | None = None, indices: Array | None = None, ) -> Array: ... -def write(iter: IntoArrayIterator, path: str) -> None: ... +def write(iter: IntoArrayIterator, path: str, *, store: AzureStore | HTTPStore | GCSStore | S3Store | None) -> None: ... class VortexWriteOptions: @staticmethod @@ -20,4 +27,6 @@ class VortexWriteOptions: @staticmethod def compact() -> VortexWriteOptions: ... @staticmethod - def write_path(iter: IntoArrayIterator, path: str) -> VortexWriteOptions: ... + def write( + iter: IntoArrayIterator, path: str, *, store: AzureStore | HTTPStore | GCSStore | S3Store | None + ) -> VortexWriteOptions: ... diff --git a/vortex-python/python/vortex/_lib/store/LICENSE b/vortex-python/python/vortex/_lib/store/LICENSE new file mode 100644 index 00000000000..7bca320a280 --- /dev/null +++ b/vortex-python/python/vortex/_lib/store/LICENSE @@ -0,0 +1,21 @@ +MIT License + +Copyright (c) 2024 Development Seed + +Permission is hereby granted, free of charge, to any person obtaining a copy +of this software and associated documentation files (the "Software"), to deal +in the Software without restriction, including without limitation the rights +to use, copy, modify, merge, publish, distribute, sublicense, and/or sell +copies of the Software, and to permit persons to whom the Software is +furnished to do so, subject to the following conditions: + +The above copyright notice and this permission notice shall be included in all +copies or substantial portions of the Software. + +THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR +IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, +FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE +AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER +LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, +OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE +SOFTWARE. diff --git a/vortex-python/python/vortex/_lib/store/__init__.pyi b/vortex-python/python/vortex/_lib/store/__init__.pyi new file mode 100644 index 00000000000..8b05aec394c --- /dev/null +++ b/vortex-python/python/vortex/_lib/store/__init__.pyi @@ -0,0 +1,205 @@ +# TODO: move to reusable types package +from collections.abc import Callable +from pathlib import Path +from typing import Any, Self, TypeAlias, Unpack, overload + +from ._aws import S3Config as S3Config +from ._aws import S3Credential as S3Credential +from ._aws import S3CredentialProvider as S3CredentialProvider +from ._aws import S3Store as S3Store +from ._azure import AzureAccessKey as AzureAccessKey +from ._azure import AzureBearerToken as AzureBearerToken +from ._azure import AzureConfig as AzureConfig +from ._azure import AzureCredential as AzureCredential +from ._azure import AzureCredentialProvider as AzureCredentialProvider +from ._azure import AzureSASToken as AzureSASToken +from ._azure import AzureStore as AzureStore +from ._client import ClientConfig as ClientConfig +from ._gcs import GCSConfig as GCSConfig +from ._gcs import GCSCredential as GCSCredential +from ._gcs import GCSCredentialProvider as GCSCredentialProvider +from ._gcs import GCSStore as GCSStore +from ._http import HTTPStore as HTTPStore +from ._retry import BackoffConfig as BackoffConfig +from ._retry import RetryConfig as RetryConfig + +@overload +def from_url( + url: str, + *, + config: S3Config | None = None, + client_options: ClientConfig | None = None, + retry_config: RetryConfig | None = None, + credential_provider: S3CredentialProvider | None = None, + **kwargs: Unpack[S3Config], +) -> ObjectStore: ... +@overload +def from_url( + url: str, + *, + config: GCSConfig | None = None, + client_options: ClientConfig | None = None, + retry_config: RetryConfig | None = None, + credential_provider: GCSCredentialProvider | None = None, + **kwargs: Unpack[GCSConfig], +) -> ObjectStore: ... +@overload +def from_url( + url: str, + *, + config: AzureConfig | None = None, + client_options: ClientConfig | None = None, + retry_config: RetryConfig | None = None, + credential_provider: AzureCredentialProvider | None = None, + **kwargs: Unpack[AzureConfig], +) -> ObjectStore: ... +@overload +def from_url( + url: str, + *, + config: None = None, + client_options: None = None, + retry_config: None = None, + automatic_cleanup: bool = False, + mkdir: bool = False, +) -> ObjectStore: ... +def from_url( # type: ignore[misc] # docstring in pyi file + url: str, + *, + config: S3Config | GCSConfig | AzureConfig | None = None, + client_options: ClientConfig | None = None, + retry_config: RetryConfig | None = None, + credential_provider: Callable | None = None, + **kwargs: Any, +) -> ObjectStore: + """Easy construction of store by URL, identifying the relevant store. + + This will defer to a store-specific `from_url` constructor based on the provided + `url`. E.g. passing `"s3://bucket/path"` will defer to + [`S3Store.from_url`][vortex.store.S3Store.from_url]. + + Supported formats: + + - `file:///path/to/my/file` -> [`LocalStore`][vortex.store.LocalStore] + - `memory:///` -> [`MemoryStore`][vortex.store.MemoryStore] + - `s3://bucket/path` -> [`S3Store`][vortex.store.S3Store] (also supports `s3a`) + - `gs://bucket/path` -> [`GCSStore`][vortex.store.GCSStore] + - `az://account/container/path` -> [`AzureStore`][vortex.store.AzureStore] (also + supports `adl`, `azure`, `abfs`, `abfss`) + - `http://mydomain/path` -> [`HTTPStore`][vortex.store.HTTPStore] + - `https://mydomain/path` -> [`HTTPStore`][vortex.store.HTTPStore] + + There are also special cases for AWS and Azure for `https://{host?}/path` paths: + + - `dfs.core.windows.net`, `blob.core.windows.net`, `dfs.fabric.microsoft.com`, + `blob.fabric.microsoft.com` -> [`AzureStore`][vortex.store.AzureStore] + - `amazonaws.com` -> [`S3Store`][vortex.store.S3Store] + - `r2.cloudflarestorage.com` -> [`S3Store`][vortex.store.S3Store] + + !!! note + For best static typing, use the constructors on individual store classes + directly. + + Args: + url: well-known storage URL. + + Keyword Args: + config: per-store Configuration. Values in this config will override values + inferred from the url. Defaults to None. + client_options: HTTP Client options. Defaults to None. + retry_config: Retry configuration. Defaults to None. + credential_provider: A callback to provide custom credentials to the underlying store classes. + kwargs: per-store configuration passed down to store-specific builders. + + """ + +class LocalStore: + """An ObjectStore interface to local filesystem storage. + + Can optionally be created with a directory prefix. + + ```py + from pathlib import Path + + store = LocalStore() + store = LocalStore(prefix="/path/to/directory") + store = LocalStore(prefix=Path(".")) + ``` + """ + + def __init__( + self, + prefix: str | Path | None = None, + *, + automatic_cleanup: bool = False, + mkdir: bool = False, + ) -> None: + """Create a new LocalStore. + + Args: + prefix: Use the specified prefix applied to all paths. Defaults to `None`. + + Keyword Args: + automatic_cleanup: if `True`, enables automatic cleanup of empty directories + when deleting files. Defaults to False. + mkdir: if `True` and `prefix` is not `None`, the directory at `prefix` will + attempt to be created. Note that this root directory will not be cleaned + up, even if `automatic_cleanup` is `True`. + + """ + @classmethod + def from_url( + cls, + url: str, + *, + automatic_cleanup: bool = False, + mkdir: bool = False, + ) -> Self: + """Construct a new LocalStore from a `file://` URL. + + **Examples:** + + Construct a new store pointing to the root of your filesystem: + ```py + url = "file:///" + store = LocalStore.from_url(url) + ``` + + Construct a new store with a directory prefix: + ```py + url = "file:///Users/kyle/" + store = LocalStore.from_url(url) + ``` + """ + + def __eq__(self, value: object) -> bool: ... + def __getnewargs_ex__(self): ... + @property + def prefix(self) -> Path | None: + """Get the prefix applied to all operations in this store, if any.""" + +class MemoryStore: + """A fully in-memory implementation of ObjectStore. + + Create a new in-memory store: + ```py + store = MemoryStore() + ``` + """ + + def __init__(self) -> None: ... + +ObjectStore: TypeAlias = AzureStore | GCSStore | HTTPStore | S3Store | LocalStore | MemoryStore +"""All supported ObjectStore implementations. + +!!! warning "Not importable at runtime" + + To use this type hint in your code, import it within a `TYPE_CHECKING` block: + + ```py + from __future__ import annotations + from typing import TYPE_CHECKING + if TYPE_CHECKING: + from vortex.store import ObjectStore + ``` +""" diff --git a/vortex-python/python/vortex/_lib/store/_aws.pyi b/vortex-python/python/vortex/_lib/store/_aws.pyi new file mode 100644 index 00000000000..600385ab2ef --- /dev/null +++ b/vortex-python/python/vortex/_lib/store/_aws.pyi @@ -0,0 +1,547 @@ +from collections.abc import Coroutine +from datetime import datetime +from typing import Any, Literal, NotRequired, Protocol, Self, TypeAlias, TypedDict, Unpack + +from ._client import ClientConfig +from ._retry import RetryConfig + +S3Regions: TypeAlias = Literal[ + "af-south-1", + "ap-east-1", + "ap-northeast-1", + "ap-northeast-2", + "ap-northeast-3", + "ap-south-1", + "ap-south-2", + "ap-southeast-1", + "ap-southeast-2", + "ap-southeast-3", + "ap-southeast-4", + "ap-southeast-5", + "ap-southeast-7", + "ca-central-1", + "ca-west-1", + "eu-central-1", + "eu-central-2", + "eu-north-1", + "eu-south-1", + "eu-south-2", + "eu-west-1", + "eu-west-2", + "eu-west-3", + "il-central-1", + "me-central-1", + "me-south-1", + "mx-central-1", + "sa-east-1", + "us-east-1", + "us-east-2", + "us-gov-east-1", + "us-gov-west-1", + "us-west-1", + "us-west-2", +] +"""AWS regions.""" + +S3ChecksumAlgorithm: TypeAlias = Literal["SHA256"] +"""S3 Checksum algorithms + +From https://docs.aws.amazon.com/AmazonS3/latest/userguide/checking-object-integrity.html#using-additional-checksums +""" + +S3EncryptionAlgorithm: TypeAlias = Literal[ + "AES256", + "aws:kms", + "aws:kms:dsse", + "sse-c", +] + +class S3Config(TypedDict, total=False): + """Configuration parameters for S3Store. + + !!! warning "Not importable at runtime" + + To use this type hint in your code, import it within a `TYPE_CHECKING` block: + + ```py + from __future__ import annotations + from typing import TYPE_CHECKING + if TYPE_CHECKING: + from vortex.store import S3Config + ``` + """ + + access_key_id: str + """AWS Access Key. + + **Environment variable**: `AWS_ACCESS_KEY_ID`. + """ + bucket: str + """Bucket name (required). + + **Environment variables**: + + - `AWS_BUCKET` + - `AWS_BUCKET_NAME` + """ + checksum_algorithm: S3ChecksumAlgorithm | str + """ + Sets the [checksum algorithm] which has to be used for object integrity check during upload. + + [checksum algorithm]: https://docs.aws.amazon.com/AmazonS3/latest/userguide/checking-object-integrity.html + + **Environment variable**: `AWS_CHECKSUM_ALGORITHM`. + """ + conditional_put: str + """Configure how to provide conditional put support + + Supported values: + + - `"etag"` (default): Supported for S3-compatible stores that support conditional + put using the standard [HTTP precondition] headers `If-Match` and + `If-None-Match`. + + [HTTP precondition]: https://datatracker.ietf.org/doc/html/rfc9110#name-preconditions + + - `"dynamo:"` or `"dynamo::"`: The name of a DynamoDB table to + use for coordination. + + This will use the same region, credentials and endpoint as configured for S3. + + **Environment variable**: `AWS_CONDITIONAL_PUT`. + """ + container_credentials_relative_uri: str + """Set the container credentials relative URI + + + + **Environment variable**: `AWS_CONTAINER_CREDENTIALS_RELATIVE_URI`. + """ + copy_if_not_exists: Literal["multipart"] | str + """Configure how to provide "copy if not exists". + + Supported values: + + - `"multipart"`: + + Native Amazon S3 supports copy if not exists through a multipart upload + where the upload copies an existing object and is completed only if the + new object does not already exist. + + !!! warning + When using this mode, `copy_if_not_exists` does not copy tags + or attributes from the source object. + + !!! warning + When using this mode, `copy_if_not_exists` makes only a best + effort attempt to clean up the multipart upload if the copy operation + fails. Consider using a lifecycle rule to automatically clean up + abandoned multipart uploads. + + - `"header::"`: + + Some S3-compatible stores, such as Cloudflare R2, support copy if not exists + semantics through custom headers. + + If set, `copy_if_not_exists` will perform a normal copy operation with the + provided header pair, and expect the store to fail with `412 Precondition + Failed` if the destination file already exists. + + For example `header: cf-copy-destination-if-none-match: *`, would set + the header `cf-copy-destination-if-none-match` to `*`. + + - `"header-with-status:::"`: + + The same as the header variant above but allows custom status code checking, for + object stores that return values other than 412. + + - `"dynamo:"` or `"dynamo::"`: + + The name of a DynamoDB table to use for coordination. + + The default timeout is used if not specified. This will use the same region, + credentials and endpoint as configured for S3. + + **Environment variable**: `AWS_COPY_IF_NOT_EXISTS`. + """ + default_region: S3Regions | str + """Default region. + + **Environment variable**: `AWS_DEFAULT_REGION`. + """ + disable_tagging: bool + """Disable tagging objects. This can be desirable if not supported by the backing store. + + **Environment variable**: `AWS_DISABLE_TAGGING`. + """ + endpoint: str + """The endpoint for communicating with AWS S3. + + Defaults to the [region endpoint]. + + For example, this might be set to `"http://localhost:4566:` for testing against a + localstack instance. + + The `endpoint` field should be consistent with `with_virtual_hosted_style_request`, + i.e. if `virtual_hosted_style_request` is set to `True` then `endpoint` should have + the bucket name included. + + By default, only HTTPS schemes are enabled. To connect to an HTTP endpoint, enable + `allow_http` in the client options. + + [region endpoint]: https://docs.aws.amazon.com/general/latest/gr/s3.html + + **Environment variables**: + + - `AWS_ENDPOINT_URL` + - `AWS_ENDPOINT` + """ + imdsv1_fallback: bool + """Fall back to ImdsV1. + + By default instance credentials will only be fetched over [IMDSv2], as AWS + recommends against having IMDSv1 enabled on EC2 instances as it is vulnerable to + [SSRF attack] + + However, certain deployment environments, such as those running old versions of + kube2iam, may not support IMDSv2. This option will enable automatic fallback to + using IMDSv1 if the token endpoint returns a 403 error indicating that IMDSv2 is not + supported. + + This option has no effect if not using instance credentials. + + [IMDSv2]: https://docs.aws.amazon.com/AWSEC2/latest/UserGuide/configuring-instance-metadata-service.html + [SSRF attack]: https://aws.amazon.com/blogs/security/defense-in-depth-open-firewalls-reverse-proxies-ssrf-vulnerabilities-ec2-instance-metadata-service/ + + **Environment variable**: `AWS_IMDSV1_FALLBACK`. + """ + metadata_endpoint: str + """Set the [instance metadata endpoint], used primarily within AWS EC2. + + This defaults to the IPv4 endpoint: `http://169.254.169.254`. One can alternatively + use the IPv6 endpoint `http://fd00:ec2::254`. + + **Environment variable**: `AWS_METADATA_ENDPOINT`. + """ + region: S3Regions | str + """The region, defaults to `us-east-1` + + **Environment variable**: `AWS_REGION`. + """ + request_payer: bool + """If `True`, enable operations on requester-pays buckets. + + + + **Environment variable**: `AWS_REQUEST_PAYER`. + """ + s3_express: bool + """Enable Support for S3 Express One Zone. + + **Environment variable**: `AWS_S3_EXPRESS`. + """ + secret_access_key: str + """Secret Access Key. + + **Environment variable**: `AWS_SECRET_ACCESS_KEY`. + """ + server_side_encryption: S3EncryptionAlgorithm | str + """Type of encryption to use. + + If set, must be one of: + + - `"AES256"` (SSE-S3) + - `"aws:kms"` (SSE-KMS) + - `"aws:kms:dsse"` (DSSE-KMS) + - `"sse-c"` + + **Environment variable**: `AWS_SERVER_SIDE_ENCRYPTION`. + """ + session_token: str + """Token to use for requests (passed to underlying provider). + + **Environment variables**: + + - `AWS_SESSION_TOKEN` + - `AWS_TOKEN` + """ + skip_signature: bool + """If `True`, S3Store will not fetch credentials and will not sign requests. + + This can be useful when interacting with public S3 buckets that deny authorized requests. + + **Environment variable**: `AWS_SKIP_SIGNATURE`. + """ + sse_bucket_key_enabled: bool + """Set whether to enable bucket key for server side encryption. + + This overrides the bucket default setting for bucket keys. + + - When `False`, each object is encrypted with a unique data key. + - When `True`, a single data key is used for the entire bucket, + reducing overhead of encryption. + + **Environment variable**: `AWS_SSE_BUCKET_KEY_ENABLED`. + """ + sse_customer_key_base64: str + """ + The base64 encoded, 256-bit customer encryption key to use for server-side + encryption. If set, the server side encryption config value must be `"sse-c"`. + + **Environment variable**: `AWS_SSE_CUSTOMER_KEY_BASE64`. + """ + sse_kms_key_id: str + """ + The KMS key ID to use for server-side encryption. + + If set, the server side encryption config value must be `"aws:kms"` or `"aws:kms:dsse"`. + + **Environment variable**: `AWS_SSE_KMS_KEY_ID`. + """ + unsigned_payload: bool + """Avoid computing payload checksum when calculating signature. + + See [unsigned payload option](https://docs.aws.amazon.com/AmazonS3/latest/API/sig-v4-header-based-auth.html). + + - `False` (default): Signed payload option is used, where the checksum for the request body is computed + and included when constructing a canonical request. + - `True`: Unsigned payload option is used. `UNSIGNED-PAYLOAD` literal is included when constructing a + canonical request, + + **Environment variable**: `AWS_UNSIGNED_PAYLOAD`. + """ + virtual_hosted_style_request: bool + """If virtual hosted style request has to be used. + + If `virtual_hosted_style_request` is: + + - `False` (default): Path style request is used + - `True`: Virtual hosted style request is used + + If the `endpoint` is provided then it should be consistent with + `virtual_hosted_style_request`. i.e. if `virtual_hosted_style_request` is set to + `True` then `endpoint` should have bucket name included. + + **Environment variable**: `AWS_VIRTUAL_HOSTED_STYLE_REQUEST`. + """ + +class S3Credential(TypedDict): + """An S3 credential. + + !!! warning "Not importable at runtime" + + To use this type hint in your code, import it within a `TYPE_CHECKING` block: + + ```py + from __future__ import annotations + from typing import TYPE_CHECKING + if TYPE_CHECKING: + from vortex.store import S3Credential + ``` + """ + + access_key_id: str + """AWS access key ID.""" + + secret_access_key: str + """AWS secret access key""" + + token: NotRequired[str | None] + """AWS token.""" + + expires_at: datetime | None + """Expiry datetime of credential. The datetime should have time zone set. + + If None, the credential will never expire. + """ + +class S3CredentialProvider(Protocol): + """A type hint for a synchronous or asynchronous callback to provide custom S3 credentials. + + This should be passed into the `credential_provider` parameter of `S3Store`. + + **Examples:** + + Return static credentials that don't expire: + ```py + def get_credentials() -> S3Credential: + return { + "access_key_id": "...", + "secret_access_key": "...", + "token": None, + "expires_at": None, + } + ``` + + Return static credentials that are valid for 5 minutes: + ```py + from datetime import datetime, timedelta, UTC + + async def get_credentials() -> S3Credential: + return { + "access_key_id": "...", + "secret_access_key": "...", + "token": None, + "expires_at": datetime.now(UTC) + timedelta(minutes=5), + } + ``` + + A class-based credential provider with state: + + ```py + from __future__ import annotations + + from typing import TYPE_CHECKING + + import boto3 + import botocore.credentials + + if TYPE_CHECKING: + from vortex.store import S3Credential + + + class Boto3CredentialProvider: + credentials: botocore.credentials.Credentials + + def __init__(self, session: boto3.session.Session) -> None: + credentials = session.get_credentials() + if credentials is None: + raise ValueError("Received None from session.get_credentials") + + self.credentials = credentials + + def __call__(self) -> S3Credential: + frozen_credentials = self.credentials.get_frozen_credentials() + return { + "access_key_id": frozen_credentials.access_key, + "secret_access_key": frozen_credentials.secret_key, + "token": frozen_credentials.token, + "expires_at": None, + } + ``` + + !!! warning "Not importable at runtime" + + To use this type hint in your code, import it within a `TYPE_CHECKING` block: + + ```py + from __future__ import annotations + from typing import TYPE_CHECKING + if TYPE_CHECKING: + from vortex.store import S3CredentialProvider + ``` + """ + + def __call__(self) -> S3Credential | Coroutine[Any, Any, S3Credential]: + """Return an `S3Credential`.""" + +class S3Store: + """Interface to an Amazon S3 bucket. + + All constructors will check for environment variables. Refer to + [`S3Config`][vortex.store.S3Config] for valid environment variables. + + **Examples**: + + **Using requester-pays buckets**: + + Pass `request_payer=True` as a keyword argument or have `AWS_REQUESTER_PAYS=True` + set in the environment. + + **Anonymous requests**: + + Pass `skip_signature=True` as a keyword argument or have `AWS_SKIP_SIGNATURE=True` + set in the environment. + """ + + def __init__( # type: ignore[misc] # Overlap between argument names and ** TypedDict items: "bucket" + self, + bucket: str | None = None, + *, + prefix: str | None = None, + config: S3Config | None = None, + client_options: ClientConfig | None = None, + retry_config: RetryConfig | None = None, + credential_provider: S3CredentialProvider | None = None, + **kwargs: Unpack[S3Config], # type: ignore # noqa: PGH003 (bucket key overlaps with positional arg) + ) -> None: + """Create a new S3Store. + + Args: + bucket: The AWS bucket to use. + + Keyword Args: + prefix: A prefix within the bucket to use for all operations. + config: AWS configuration. Values in this config will override values inferred from the + environment. Defaults to None. + client_options: HTTP Client options. Defaults to None. + retry_config: Retry configuration. Defaults to None. + credential_provider: A callback to provide custom S3 credentials. + kwargs: AWS configuration values. Supports the same values as `config`, but as named keyword + args. + + Returns: + S3Store + + """ + @classmethod + def from_url( + cls, + url: str, + *, + config: S3Config | None = None, + client_options: ClientConfig | None = None, + retry_config: RetryConfig | None = None, + credential_provider: S3CredentialProvider | None = None, + **kwargs: Unpack[S3Config], + ) -> Self: + """Parse available connection info from a well-known storage URL. + + Any path on the URL will be assigned as the `prefix` for the store. So if you + pass `s3://bucket/path/to/directory`, the store will be created with a prefix of + `path/to/directory`, and all further operations will use paths relative to that + prefix. + + The supported url schemes are: + + - `s3:///` + - `s3a:///` + - `https://s3..amazonaws.com/` + - `https://.s3..amazonaws.com` + - `https://ACCOUNT_ID.r2.cloudflarestorage.com/bucket` + + Args: + url: well-known storage URL. + + Keyword Args: + config: AWS Configuration. Values in this config will override values inferred from the url. + Defaults to None. + client_options: HTTP Client options. Defaults to None. + retry_config: Retry configuration. Defaults to None. + credential_provider: A callback to provide custom S3 credentials. + kwargs: AWS configuration values. Supports the same values as `config`, but as named keyword + args. + + + Returns: + S3Store + + """ + + def __eq__(self, value: object) -> bool: ... + def __getnewargs_ex__(self): ... + @property + def prefix(self) -> str | None: + """Get the prefix applied to all operations in this store, if any.""" + @property + def config(self) -> S3Config: + """Get the underlying S3 config parameters.""" + @property + def client_options(self) -> ClientConfig | None: + """Get the store's client configuration.""" + @property + def credential_provider(self) -> S3CredentialProvider | None: + """Get the store's credential provider.""" + @property + def retry_config(self) -> RetryConfig | None: + """Get the store's retry configuration.""" diff --git a/vortex-python/python/vortex/_lib/store/_azure.pyi b/vortex-python/python/vortex/_lib/store/_azure.pyi new file mode 100644 index 00000000000..851dab69cb5 --- /dev/null +++ b/vortex-python/python/vortex/_lib/store/_azure.pyi @@ -0,0 +1,415 @@ +from collections.abc import Coroutine +from datetime import datetime +from typing import Any, Protocol, Self, TypeAlias, TypedDict, Unpack + +from ._client import ClientConfig +from ._retry import RetryConfig + +class AzureConfig(TypedDict, total=False): + """Configuration parameters for AzureStore. + + !!! warning "Not importable at runtime" + + To use this type hint in your code, import it within a `TYPE_CHECKING` block: + + ```py + from __future__ import annotations + from typing import TYPE_CHECKING + if TYPE_CHECKING: + from vortex.store import AzureConfig + ``` + """ + + account_name: str + """The name of the azure storage account. (Required.) + + **Environment variable**: `AZURE_STORAGE_ACCOUNT_NAME`. + """ + account_key: str + """Master key for accessing storage account. + + **Environment variables**: + + - `AZURE_STORAGE_ACCOUNT_KEY` + - `AZURE_STORAGE_ACCESS_KEY` + - `AZURE_STORAGE_MASTER_KEY` + """ + client_id: str + """The client id for use in client secret or k8s federated credential flow. + + **Environment variables**: + + - `AZURE_STORAGE_CLIENT_ID` + - `AZURE_CLIENT_ID` + """ + client_secret: str + """The client secret for use in client secret flow. + + **Environment variables**: + + - `AZURE_STORAGE_CLIENT_SECRET` + - `AZURE_CLIENT_SECRET` + """ + tenant_id: str + """The tenant id for use in client secret or k8s federated credential flow. + + **Environment variables**: + + - `AZURE_STORAGE_TENANT_ID` + - `AZURE_STORAGE_AUTHORITY_ID` + - `AZURE_TENANT_ID` + - `AZURE_AUTHORITY_ID` + """ + authority_host: str + """Sets an alternative authority host for OAuth based authorization. + + Defaults to `https://login.microsoftonline.com`. + + Common hosts for azure clouds are: + + - Azure China: `"https://login.chinacloudapi.cn"` + - Azure Germany: `"https://login.microsoftonline.de"` + - Azure Government: `"https://login.microsoftonline.us"` + - Azure Public: `"https://login.microsoftonline.com"` + + **Environment variables**: + + - `AZURE_STORAGE_AUTHORITY_HOST` + - `AZURE_AUTHORITY_HOST` + """ + sas_key: str + """ + Shared access signature. + + The signature is expected to be percent-encoded, `much `like they are provided in + the azure storage explorer or azure portal. + + **Environment variables**: + + - `AZURE_STORAGE_SAS_KEY` + - `AZURE_STORAGE_SAS_TOKEN` + """ + token: str + """A static bearer token to be used for authorizing requests. + + **Environment variable**: `AZURE_STORAGE_TOKEN`. + """ + use_emulator: bool + """Set if the Azure emulator should be used (defaults to `False`). + + **Environment variable**: `AZURE_STORAGE_USE_EMULATOR`. + """ + use_fabric_endpoint: bool + """Set if Microsoft Fabric url scheme should be used (defaults to `False`). + + When disabled the url scheme used is `https://{account}.blob.core.windows.net`. + When enabled the url scheme used is `https://{account}.dfs.fabric.microsoft.com`. + + !!! note + + `endpoint` will take precedence over this option. + """ + endpoint: str + """Override the endpoint used to communicate with blob storage. + + Defaults to `https://{account}.blob.core.windows.net`. + + By default, only HTTPS schemes are enabled. To connect to an HTTP endpoint, enable + `allow_http` in the client options. + + **Environment variables**: + + - `AZURE_STORAGE_ENDPOINT` + - `AZURE_ENDPOINT` + """ + msi_endpoint: str + """Endpoint to request a imds managed identity token. + + **Environment variables**: + + - `AZURE_MSI_ENDPOINT` + - `AZURE_IDENTITY_ENDPOINT` + """ + object_id: str + """Object id for use with managed identity authentication. + + **Environment variable**: `AZURE_OBJECT_ID`. + """ + msi_resource_id: str + """Msi resource id for use with managed identity authentication. + + **Environment variable**: `AZURE_MSI_RESOURCE_ID`. + """ + federated_token_file: str + """Sets a file path for acquiring azure federated identity token in k8s. + + Requires `client_id` and `tenant_id` to be set. + + **Environment variable**: `AZURE_FEDERATED_TOKEN_FILE`. + """ + use_azure_cli: bool + """Set if the Azure Cli should be used for acquiring access token. + + . + + **Environment variable**: `AZURE_USE_AZURE_CLI`. + """ + skip_signature: bool + """If enabled, `AzureStore` will not fetch credentials and will not sign requests. + + This can be useful when interacting with public containers. + + **Environment variable**: `AZURE_SKIP_SIGNATURE`. + """ + container_name: str + """Container name. + + **Environment variable**: `AZURE_CONTAINER_NAME`. + """ + disable_tagging: bool + """If set to `True` will ignore any tags provided to uploads. + + **Environment variable**: `AZURE_DISABLE_TAGGING`. + """ + fabric_token_service_url: str + """Service URL for Fabric OAuth2 authentication. + + **Environment variable**: `AZURE_FABRIC_TOKEN_SERVICE_URL`. + """ + fabric_workload_host: str + """Workload host for Fabric OAuth2 authentication. + + **Environment variable**: `AZURE_FABRIC_WORKLOAD_HOST`. + """ + fabric_session_token: str + """Session token for Fabric OAuth2 authentication. + + **Environment variable**: `AZURE_FABRIC_SESSION_TOKEN`. + """ + fabric_cluster_identifier: str + """Cluster identifier for Fabric OAuth2 authentication. + + **Environment variable**: `AZURE_FABRIC_CLUSTER_IDENTIFIER`. + """ + +class AzureAccessKey(TypedDict): + """A shared Azure Storage Account Key. + + + + !!! warning "Not importable at runtime" + + To use this type hint in your code, import it within a `TYPE_CHECKING` block: + + ```py + from __future__ import annotations + from typing import TYPE_CHECKING + if TYPE_CHECKING: + from vortex.store import AzureAccessKey + ``` + """ + + access_key: str + """Access key value.""" + + expires_at: datetime | None + """Expiry datetime of credential. The datetime should have time zone set. + + If None, the credential will never expire. + """ + +class AzureSASToken(TypedDict): + """A shared access signature. + + + + !!! warning "Not importable at runtime" + + To use this type hint in your code, import it within a `TYPE_CHECKING` block: + + ```py + from __future__ import annotations + from typing import TYPE_CHECKING + if TYPE_CHECKING: + from vortex.store import AzureSASToken + ``` + """ + + sas_token: str | list[tuple[str, str]] + """SAS token.""" + + expires_at: datetime | None + """Expiry datetime of credential. The datetime should have time zone set. + + If None, the credential will never expire. + """ + +class AzureBearerToken(TypedDict): + """An authorization token. + + + + !!! warning "Not importable at runtime" + + To use this type hint in your code, import it within a `TYPE_CHECKING` block: + + ```py + from __future__ import annotations + from typing import TYPE_CHECKING + if TYPE_CHECKING: + from vortex.store import AzureBearerToken + ``` + """ + + token: str + """Bearer token.""" + + expires_at: datetime | None + """Expiry datetime of credential. The datetime should have time zone set. + + If None, the credential will never expire. + """ + +AzureCredential: TypeAlias = AzureAccessKey | AzureSASToken | AzureBearerToken +"""A type alias for supported azure credentials to be returned from `AzureCredentialProvider`. + +!!! warning "Not importable at runtime" + + To use this type hint in your code, import it within a `TYPE_CHECKING` block: + + ```py + from __future__ import annotations + from typing import TYPE_CHECKING + if TYPE_CHECKING: + from vortex.store import AzureCredential + ``` +""" + +class AzureCredentialProvider(Protocol): + """A type hint for a synchronous or asynchronous callback to provide custom Azure credentials. + + This should be passed into the `credential_provider` parameter of `AzureStore`. + + !!! warning "Not importable at runtime" + + To use this type hint in your code, import it within a `TYPE_CHECKING` block: + + ```py + from __future__ import annotations + from typing import TYPE_CHECKING + if TYPE_CHECKING: + from vortex.store import AzureCredentialProvider + ``` + """ + + def __call__(self) -> AzureCredential | Coroutine[Any, Any, AzureCredential]: + """Return an `AzureCredential`.""" + +class AzureStore: + """Interface to a Microsoft Azure Blob Storage container. + + All constructors will check for environment variables. Refer to + [`AzureConfig`][vortex.store.AzureConfig] for valid environment variables. + """ + + def __init__( # type: ignore[misc] # Overlap between argument names and ** TypedDict items: "container_name" + self, + container_name: str | None = None, + *, + prefix: str | None = None, + config: AzureConfig | None = None, + client_options: ClientConfig | None = None, + retry_config: RetryConfig | None = None, + credential_provider: AzureCredentialProvider | None = None, + **kwargs: Unpack[AzureConfig], # type: ignore # noqa: PGH003 (container_name key overlaps with positional arg) + ) -> None: + """Construct a new AzureStore. + + Args: + container_name: the name of the container. + + Keyword Args: + prefix: A prefix within the bucket to use for all operations. + config: Azure Configuration. Values in this config will override values inferred from + the url. Defaults to None. + client_options: HTTP Client options. Defaults to None. + retry_config: Retry configuration. Defaults to None. + credential_provider: A callback to provide custom Azure credentials. + kwargs: Azure configuration values. Supports the same values as `config`, but as named + keyword args. + + Returns: + AzureStore + + """ + + @classmethod + def from_url( + cls, + url: str, + *, + prefix: str | None = None, + config: AzureConfig | None = None, + client_options: ClientConfig | None = None, + retry_config: RetryConfig | None = None, + credential_provider: AzureCredentialProvider | None = None, + **kwargs: Unpack[AzureConfig], + ) -> Self: + """Construct a new AzureStore with values populated from a well-known storage URL. + + Any path on the URL will be assigned as the `prefix` for the store. So if you + pass `https://.blob.core.windows.net//path/to/directory`, + the store will be created with a prefix of `path/to/directory`, and all further + operations will use paths relative to that prefix. + + The supported url schemes are: + + - `abfs[s]:///` (according to [fsspec](https://github.com/fsspec/adlfs)) + - `abfs[s]://@.dfs.core.windows.net/` + - `abfs[s]://@.dfs.fabric.microsoft.com/` + - `az:///` (according to [fsspec](https://github.com/fsspec/adlfs)) + - `adl:///` (according to [fsspec](https://github.com/fsspec/adlfs)) + - `azure:///` (custom) + - `https://.dfs.core.windows.net` + - `https://.blob.core.windows.net` + - `https://.blob.core.windows.net/` + - `https://.dfs.fabric.microsoft.com` + - `https://.dfs.fabric.microsoft.com/` + - `https://.blob.fabric.microsoft.com` + - `https://.blob.fabric.microsoft.com/` + + Args: + url: well-known storage URL. + + Keyword Args: + prefix: A prefix within the bucket to use for all operations. + config: Azure Configuration. Values in this config will override values inferred from the + url. Defaults to None. + client_options: HTTP Client options. Defaults to None. + retry_config: Retry configuration. Defaults to None. + credential_provider: A callback to provide custom Azure credentials. + kwargs: Azure configuration values. Supports the same values as `config`, but as named keyword + args. + + Returns: + AzureStore + + """ + + def __eq__(self, value: object) -> bool: ... + def __getnewargs_ex__(self): ... + @property + def prefix(self) -> str | None: + """Get the prefix applied to all operations in this store, if any.""" + @property + def config(self) -> AzureConfig: + """Get the underlying Azure config parameters.""" + @property + def client_options(self) -> ClientConfig | None: + """Get the store's client configuration.""" + @property + def credential_provider(self) -> AzureCredentialProvider | None: + """Get the store's credential provider.""" + @property + def retry_config(self) -> RetryConfig | None: + """Get the store's retry configuration.""" diff --git a/vortex-python/python/vortex/_lib/store/_client.pyi b/vortex-python/python/vortex/_lib/store/_client.pyi new file mode 100644 index 00000000000..a622caad434 --- /dev/null +++ b/vortex-python/python/vortex/_lib/store/_client.pyi @@ -0,0 +1,87 @@ +from datetime import timedelta +from typing import TypedDict + +class ClientConfig(TypedDict, total=False): + """HTTP client configuration. + + For timeout values (`connect_timeout`, `http2_keep_alive_timeout`, + `pool_idle_timeout`, and `timeout`), values can either be Python `timedelta` + objects, or they can be "human-readable duration strings". + + The human-readable duration string is a concatenation of time spans. Where each time + span is an integer number and a suffix. Supported suffixes: + + - `nsec`, `ns` -- nanoseconds + - `usec`, `us` -- microseconds + - `msec`, `ms` -- milliseconds + - `seconds`, `second`, `sec`, `s` + - `minutes`, `minute`, `min`, `m` + - `hours`, `hour`, `hr`, `h` + - `days`, `day`, `d` + - `weeks`, `week`, `w` + - `months`, `month`, `M` -- defined as 30.44 days + - `years`, `year`, `y` -- defined as 365.25 days + + For example: + + - `"2h 37min"` + - `"32ms"` + + !!! warning "Not importable at runtime" + + To use this type hint in your code, import it within a `TYPE_CHECKING` block: + + ```py + from __future__ import annotations + from typing import TYPE_CHECKING + if TYPE_CHECKING: + from vortex.store import ClientConfig + ``` + """ + + allow_http: bool + """Allow non-TLS, i.e. non-HTTPS connections.""" + allow_invalid_certificates: bool + """Skip certificate validation on https connections. + + !!! warning + + You should think very carefully before using this method. If + invalid certificates are trusted, *any* certificate for *any* site + will be trusted for use. This includes expired certificates. This + introduces significant vulnerabilities, and should only be used + as a last resort or for testing + """ + connect_timeout: str | timedelta + """Timeout for only the connect phase of a Client""" + default_content_type: str + """Default `CONTENT_TYPE` for uploads""" + default_headers: dict[str, str] | dict[str, bytes] + """Default headers to be sent with each request""" + http1_only: bool + """Only use http1 connections.""" + http2_keep_alive_interval: str + """Interval for HTTP2 Ping frames should be sent to keep a connection alive.""" + http2_keep_alive_timeout: str | timedelta + """Timeout for receiving an acknowledgement of the keep-alive ping.""" + http2_keep_alive_while_idle: str + """Enable HTTP2 keep alive pings for idle connections""" + http2_only: bool + """Only use http2 connections""" + pool_idle_timeout: str | timedelta + """The pool max idle timeout. + + This is the length of time an idle connection will be kept alive. + """ + pool_max_idle_per_host: str + """Maximum number of idle connections per host.""" + proxy_url: str + """HTTP proxy to use for requests.""" + timeout: str | timedelta + """Request timeout. + + The timeout is applied from when the request starts connecting until the + response body has finished. + """ + user_agent: str + """User-Agent header to be used by this client.""" diff --git a/vortex-python/python/vortex/_lib/store/_gcs.pyi b/vortex-python/python/vortex/_lib/store/_gcs.pyi new file mode 100644 index 00000000000..133314ff9a1 --- /dev/null +++ b/vortex-python/python/vortex/_lib/store/_gcs.pyi @@ -0,0 +1,222 @@ +from collections.abc import Coroutine +from datetime import datetime +from typing import Any, Protocol, Self, TypedDict, Unpack + +from ._client import ClientConfig +from ._retry import RetryConfig + +class GCSConfig(TypedDict, total=False): + """Configuration parameters for GCSStore. + + !!! warning "Not importable at runtime" + + To use this type hint in your code, import it within a `TYPE_CHECKING` block: + + ```py + from __future__ import annotations + from typing import TYPE_CHECKING + if TYPE_CHECKING: + from vortex.store import GCSConfig + ``` + """ + + service_account: str + """Path to the service account file. + + This or `service_account_key` must be set. + + Example value `"/tmp/gcs.json"`. Example contents of `gcs.json`: + + ```json + { + "gcs_base_url": "https://localhost:4443", + "disable_oauth": true, + "client_email": "", + "private_key": "" + } + ``` + + **Environment variables**: + + - `GOOGLE_SERVICE_ACCOUNT` + - `GOOGLE_SERVICE_ACCOUNT_PATH` + """ + + service_account_key: str + """The serialized service account key. + + The service account must be in the JSON format. This or `with_service_account_path` + must be set. + + **Environment variable**: `GOOGLE_SERVICE_ACCOUNT_KEY`. + """ + + bucket: str + """Bucket name. (required) + + **Environment variables**: + + - `GOOGLE_BUCKET` + - `GOOGLE_BUCKET_NAME` + """ + + application_credentials: str + """Application credentials path. + + See . + + **Environment variable**: `GOOGLE_APPLICATION_CREDENTIALS`. + """ + + skip_signature: bool + """If `True`, GCSStore will not fetch credentials and will not sign requests. + + This can be useful when interacting with public GCS buckets that deny authorized requests. + + **Environment variable**: `GOOGLE_SKIP_SIGNATURE`. + """ + +class GCSCredential(TypedDict): + """A Google Cloud Storage Credential. + + !!! warning "Not importable at runtime" + + To use this type hint in your code, import it within a `TYPE_CHECKING` block: + + ```py + from __future__ import annotations + from typing import TYPE_CHECKING + if TYPE_CHECKING: + from vortex.store import GCSCredential + ``` + """ + + token: str + """An HTTP bearer token.""" + + expires_at: datetime | None + """Expiry datetime of credential. The datetime should have time zone set. + + If None, the credential will never expire. + """ + +class GCSCredentialProvider(Protocol): + """A type hint for a synchronous or asynchronous callback to provide custom Google Cloud Storage credentials. + + This should be passed into the `credential_provider` parameter of `GCSStore`. + + !!! warning "Not importable at runtime" + + To use this type hint in your code, import it within a `TYPE_CHECKING` block: + + ```py + from __future__ import annotations + from typing import TYPE_CHECKING + if TYPE_CHECKING: + from vortex.store import GCSCredentialProvider + ``` + """ + + def __call__(self) -> GCSCredential | Coroutine[Any, Any, GCSCredential]: + """Return a `GCSCredential`.""" + +class GCSStore: + """Interface to Google Cloud Storage. + + All constructors will check for environment variables. Refer to + [`GCSConfig`][vortex.store.GCSConfig] for valid environment variables. + + If no credentials are explicitly provided, they will be sourced from the environment + as documented + [here](https://cloud.google.com/docs/authentication/application-default-credentials). + """ + + def __init__( # type: ignore[misc] # Overlap between argument names and ** TypedDict items: "bucket" + self, + bucket: str | None = None, + *, + prefix: str | None = None, + config: GCSConfig | None = None, + client_options: ClientConfig | None = None, + retry_config: RetryConfig | None = None, + credential_provider: GCSCredentialProvider | None = None, + **kwargs: Unpack[GCSConfig], # type: ignore # noqa: PGH003 (bucket key overlaps with positional arg) + ) -> None: + """Construct a new GCSStore. + + Args: + bucket: The GCS bucket to use. + + Keyword Args: + prefix: A prefix within the bucket to use for all operations. + config: GCS Configuration. Values in this config will override values inferred from the + environment. Defaults to None. + client_options: HTTP Client options. Defaults to None. + retry_config: Retry configuration. Defaults to None. + credential_provider: A callback to provide custom Google credentials. + kwargs: GCS configuration values. Supports the same values as `config`, + but as named keyword args. + + Returns: + GCSStore + + """ + + @classmethod + def from_url( + cls, + url: str, + *, + prefix: str | None = None, + config: GCSConfig | None = None, + client_options: ClientConfig | None = None, + retry_config: RetryConfig | None = None, + credential_provider: GCSCredentialProvider | None = None, + **kwargs: Unpack[GCSConfig], + ) -> Self: + """Construct a new GCSStore with values populated from a well-known storage URL. + + Any path on the URL will be assigned as the `prefix` for the store. So if you + pass `gs:///path/to/directory`, the store will be created with a prefix + of `path/to/directory`, and all further operations will use paths relative to + that prefix. + + The supported url schemes are: + + - `gs:///` + + Args: + url: well-known storage URL. + + Keyword Args: + prefix: A prefix within the bucket to use for all operations. + config: GCS Configuration. Values in this config will override values inferred from the + url. Defaults to None. + client_options: HTTP Client options. Defaults to None. + retry_config: Retry configuration. Defaults to None. + credential_provider: A callback to provide custom Google credentials. + kwargs: GCS configuration values. Supports the same values as `config`, but as named keyword + args. + + Returns: + GCSStore + + """ + + def __eq__(self, value: object) -> bool: ... + def __getnewargs_ex__(self): ... + @property + def prefix(self) -> str | None: + """Get the prefix applied to all operations in this store, if any.""" + @property + def config(self) -> GCSConfig: + """Get the underlying GCS config parameters.""" + @property + def client_options(self) -> ClientConfig | None: + """Get the store's client configuration.""" + @property + def credential_provider(self) -> GCSCredentialProvider | None: + """Get the store's credential provider.""" + @property + def retry_config(self) -> RetryConfig | None: + """Get the store's retry configuration.""" diff --git a/vortex-python/python/vortex/_lib/store/_http.pyi b/vortex-python/python/vortex/_lib/store/_http.pyi new file mode 100644 index 00000000000..f4ad000018e --- /dev/null +++ b/vortex-python/python/vortex/_lib/store/_http.pyi @@ -0,0 +1,58 @@ +from typing import Self + +from ._client import ClientConfig +from ._retry import RetryConfig + +class HTTPStore: + """Configure a connection to a generic HTTP server.""" + + def __init__( + self, + url: str, + *, + client_options: ClientConfig | None = None, + retry_config: RetryConfig | None = None, + ) -> None: + """Construct a new HTTPStore from a URL. + + Any path on the URL will be assigned as the `prefix` for the store. So if you + pass `https://example.com/path/to/directory`, the store will be created with a + prefix of `path/to/directory`, and all further operations will use paths + relative to that prefix. + + Args: + url: The base URL to use for the store. + + Keyword Args: + client_options: HTTP Client options. Defaults to None. + retry_config: Retry configuration. Defaults to None. + + Returns: + HTTPStore + + """ + + @classmethod + def from_url( + cls, + url: str, + *, + client_options: ClientConfig | None = None, + retry_config: RetryConfig | None = None, + ) -> Self: + """Construct a new HTTPStore from a URL. + + This is an alias of [`HTTPStore.__init__`][vortex.store.HTTPStore.__init__]. + """ + + def __eq__(self, value: object) -> bool: ... + def __getnewargs_ex__(self): ... + @property + def url(self) -> str: + """Get the base url of this store.""" + @property + def client_options(self) -> ClientConfig | None: + """Get the store's client configuration.""" + @property + def retry_config(self) -> RetryConfig | None: + """Get the store's retry configuration.""" diff --git a/vortex-python/python/vortex/_lib/store/_retry.pyi b/vortex-python/python/vortex/_lib/store/_retry.pyi new file mode 100644 index 00000000000..e8c2ed0c4bd --- /dev/null +++ b/vortex-python/python/vortex/_lib/store/_retry.pyi @@ -0,0 +1,97 @@ +from datetime import timedelta +from typing import TypedDict + +class BackoffConfig(TypedDict, total=False): + """Exponential backoff with jitter. + + See + + !!! warning "Not importable at runtime" + + To use this type hint in your code, import it within a `TYPE_CHECKING` block: + + ```py + from __future__ import annotations + from typing import TYPE_CHECKING + if TYPE_CHECKING: + from vortex.store import BackoffConfig + ``` + """ + + init_backoff: timedelta + """The initial backoff duration. + + Defaults to 100 milliseconds. + """ + + max_backoff: timedelta + """The maximum backoff duration. + + Defaults to 15 seconds. + """ + + base: int | float + """The base of the exponential to use. + + Defaults to `2`. + """ + +class RetryConfig(TypedDict, total=False): + """The configuration for how to respond to request errors. + + The following categories of error will be retried: + + * 5xx server errors + * Connection errors + * Dropped connections + * Timeouts for [safe] / read-only requests + + Requests will be retried up to some limit, using exponential + backoff with jitter. See [`BackoffConfig`][vortex.store.BackoffConfig] for + more information + + [safe]: https://datatracker.ietf.org/doc/html/rfc7231#section-4.2.1 + + !!! warning "Not importable at runtime" + + To use this type hint in your code, import it within a `TYPE_CHECKING` block: + + ```py + from __future__ import annotations + from typing import TYPE_CHECKING + if TYPE_CHECKING: + from vortex.store import RetryConfig + ``` + """ + + backoff: BackoffConfig + """The backoff configuration. + + Defaults to the values listed above if not provided. + """ + + max_retries: int + """ + The maximum number of times to retry a request + + Set to 0 to disable retries. + + Defaults to 10. + """ + + retry_timeout: timedelta + """ + The maximum length of time from the initial request + after which no further retries will be attempted + + This not only bounds the length of time before a server + error will be surfaced to the application, but also bounds + the length of time a request's credentials must remain valid. + + As requests are retried without renewing credentials or + regenerating request payloads, this number should be kept + below 5 minutes to avoid errors due to expired credentials + and/or request payloads. + + Defaults to 3 minutes. + """ diff --git a/vortex-python/python/vortex/file.py b/vortex-python/python/vortex/file.py index acd29763712..4a9f9ea8818 100644 --- a/vortex-python/python/vortex/file.py +++ b/vortex-python/python/vortex/file.py @@ -21,7 +21,7 @@ import polars -def open(path: str, *, without_segment_cache: bool = False) -> VortexFile: +def open(path: str, *, store=None, without_segment_cache: bool = False) -> VortexFile: """ Lazily open a Vortex file located at the given path or URL. @@ -29,6 +29,9 @@ def open(path: str, *, without_segment_cache: bool = False) -> VortexFile: ---------- path : :class:`str` A local path or URL to the Vortex file. + store : + An object store created from the `vortex.store` package. By default + the store is inferred based on the path without_segment_cache : :class:`bool` If true, disable the segment cache for this file, useful when memory is constrained. @@ -43,7 +46,7 @@ def open(path: str, *, without_segment_cache: bool = False) -> VortexFile: See also: :class:`vortex.dataset.VortexDataset` """ - return VortexFile(_file.open(path, without_segment_cache=without_segment_cache)) + return VortexFile(_file.open(path, store=store, without_segment_cache=without_segment_cache)) @final diff --git a/vortex-python/src/dataset.rs b/vortex-python/src/dataset.rs index 4608edd6c64..12b831cbd96 100644 --- a/vortex-python/src/dataset.rs +++ b/vortex-python/src/dataset.rs @@ -32,7 +32,8 @@ use crate::arrow::ToPyArrow; use crate::error::PyVortexResult; use crate::expr::PyExpr; use crate::install_module; -use crate::object_store_urls::object_store_from_url; +use crate::object_store_urls::ResolvedStore; +use crate::object_store_urls::resolve_store; pub(crate) fn init(py: Python, parent: &Bound) -> PyResult<()> { let m = PyModule::new(py, "dataset")?; @@ -110,14 +111,20 @@ impl PyVortexDataset { Ok(Self { vxf, schema }) } - pub async fn from_url(url: &str) -> VortexResult { - let (object_store, path) = object_store_from_url(url)?; - PyVortexDataset::try_new( - SESSION - .open_options() - .open_object_store(&object_store, path.as_ref()) - .await?, - ) + pub async fn from_url( + url: &str, + store: Option>, + ) -> VortexResult { + let vxf = match resolve_store(url, store)? { + ResolvedStore::ObjectStore(store, path) => { + SESSION + .open_options() + .open_object_store(&store, path.as_ref()) + .await? + } + ResolvedStore::Path(path) => SESSION.open_options().open_path(path).await?, + }; + PyVortexDataset::try_new(vxf) } } @@ -221,6 +228,18 @@ impl PyVortexDataset { } #[pyfunction] -pub fn dataset_from_url(py: Python, url: &str) -> PyVortexResult { - Ok(py.detach(|| TOKIO_RUNTIME.block_on(PyVortexDataset::from_url(url)))?) +#[pyo3(signature = (url, *, store = None))] +pub fn dataset_from_url( + py: Python, + url: &str, + store: Option>, +) -> PyVortexResult { + let store_arc = if let Some(store_obj) = store { + let py_store: pyo3_object_store::PyObjectStore = store_obj.extract()?; + Some(py_store.into_inner()) + } else { + None + }; + + Ok(py.detach(|| TOKIO_RUNTIME.block_on(PyVortexDataset::from_url(url, store_arc)))?) } diff --git a/vortex-python/src/file.rs b/vortex-python/src/file.rs index 4ba481bc3e0..ff090129826 100644 --- a/vortex-python/src/file.rs +++ b/vortex-python/src/file.rs @@ -7,6 +7,7 @@ use arrow_array::RecordBatchReader; use pyo3::exceptions::PyTypeError; use pyo3::prelude::*; use pyo3::types::PyList; +use pyo3_object_store::PyObjectStore; use vortex::array::ArrayRef; use vortex::array::ToCanonical; use vortex::compute::cast; @@ -35,6 +36,8 @@ use crate::error::PyVortexResult; use crate::expr::PyExpr; use crate::install_module; use crate::iter::PyArrayIterator; +use crate::object_store_urls::ResolvedStore; +use crate::object_store_urls::resolve_store; use crate::scan::PyRepeatedScan; pub(crate) fn init(py: Python, parent: &Bound) -> PyResult<()> { @@ -48,9 +51,18 @@ pub(crate) fn init(py: Python, parent: &Bound) -> PyResult<()> { Ok(()) } +/// Open a Vortex file for reading. +/// +/// Callers can optionally configure an object store to build from using one of the definitions +/// in the `vortex.store` crate. #[pyfunction] -#[pyo3(signature = (path, *, without_segment_cache = false))] -pub fn open(py: Python, path: &str, without_segment_cache: bool) -> PyVortexResult { +#[pyo3(signature = (path, *, store = None, without_segment_cache = false))] +pub fn open( + py: Python, + path: &str, + store: Option, + without_segment_cache: bool, +) -> PyVortexResult { let vxf = py.detach(|| { TOKIO_RUNTIME.block_on(async move { let mut options = SESSION.open_options(); @@ -60,7 +72,13 @@ pub fn open(py: Python, path: &str, without_segment_cache: bool) -> PyVortexResu // TODO(ngates): use a globally shared segment cache for all files options = options.with_segment_cache(Arc::new(MokaSegmentCache::new(256 << 20))); } - options.open_path(path).await + + match resolve_store(path, store.map(|x| x.into_inner()))? { + ResolvedStore::ObjectStore(store, path) => { + options.open_object_store(&store, path.as_ref()).await + } + ResolvedStore::Path(path) => options.open_path(path).await, + } }) })?; diff --git a/vortex-python/src/io.rs b/vortex-python/src/io.rs index 8dbadc2db17..a3f67f93482 100644 --- a/vortex-python/src/io.rs +++ b/vortex-python/src/io.rs @@ -6,6 +6,7 @@ use arrow_array::ffi_stream::ArrowArrayStreamReader; use pyo3::exceptions::PyTypeError; use pyo3::prelude::*; use pyo3::pyfunction; +use pyo3_object_store::PyObjectStore; use tokio::fs::File; use vortex::array::ArrayRef; use vortex::array::Canonical; @@ -21,6 +22,8 @@ use vortex::error::VortexError; use vortex::error::VortexResult; use vortex::file::WriteOptionsSessionExt; use vortex::file::WriteStrategyBuilder; +use vortex::io::ObjectStoreWriter; +use vortex::io::VortexWrite; use crate::PyVortex; use crate::SESSION; @@ -33,6 +36,8 @@ use crate::error::PyVortexResult; use crate::expr::PyExpr; use crate::install_module; use crate::iter::PyArrayIterator; +use crate::object_store_urls::ResolvedStore; +use crate::object_store_urls::resolve_store; pub(crate) fn init(py: Python, parent: &Bound) -> PyResult<()> { let m = PyModule::new(py, "io")?; @@ -53,6 +58,11 @@ pub(crate) fn init(py: Python, parent: &Bound) -> PyResult<()> { /// ---------- /// url : str /// The URL to read from. +/// store : vortex.store.ObjectStore | None, optional +/// Pre-configured object store with credentials and settings. +/// If provided, uses this store's configuration. +/// If None, checks session registry for matching URL pattern. +/// If not found, raises VortexError. /// projection : list[str | int] | None /// The columns to read identified either by their index or name. /// row_filter : Expr | None @@ -100,24 +110,46 @@ pub(crate) fn init(py: Python, parent: &Bound) -> PyResult<()> { /// Read an array from a local file URL: /// /// ```python -/// >>> a = vx.io.read_url("file:/path/to/dataset.vortex") # doctest: +SKIP +/// >>> a = vx.io.read_url("file:///path/to/dataset.vortex") # doctest: +SKIP +/// ``` +/// +/// Read from S3 with explicit credentials: +/// +/// ```python +/// >>> from vortex import store as S +/// >>> store = S.S3Store( +/// ... bucket="my-bucket", +/// ... region="us-east-1", +/// ... access_key_id="AKIA...", +/// ... secret_access_key="..." +/// ... ) +/// >>> a = vx.io.read_url("s3://my-bucket/data.vortex", store=store) # doctest: +SKIP /// ``` /// #[pyfunction] -#[pyo3(signature = (url, *, projection = None, row_filter = None, indices = None, row_range = None))] +#[pyo3(signature = (url, *, store = None, projection = None, row_filter = None, indices = None, row_range = None))] pub fn read_url<'py>( py: Python<'py>, url: &str, + store: Option>, projection: Option>>, row_filter: Option<&Bound<'py, PyExpr>>, indices: Option, row_range: Option<(u64, u64)>, ) -> PyVortexResult { - let dataset = py.detach(|| TOKIO_RUNTIME.block_on(PyVortexDataset::from_url(url)))?; + let store_arc = if let Some(store_obj) = store { + let py_store: PyObjectStore = store_obj.extract()?; + Some(py_store.into_inner()) + } else { + None + }; + + let dataset = + py.detach(|| TOKIO_RUNTIME.block_on(PyVortexDataset::from_url(url, store_arc)))?; dataset.to_array(projection, row_filter, indices, row_range) } -/// Write a vortex struct array to the local filesystem. +/// Write an array to a Vortex file. /// /// Parameters /// ---------- @@ -128,6 +160,9 @@ pub fn read_url<'py>( /// path : str /// The file path. /// +/// store : vortex.store.AzureStore | vortex.store.GCSStore | vortex.store.HTTPStore | vortex.store.LocalStore | vortex.store.MemoryStore | vortex.store.S3Store | None +/// An optional object store configuration to use for writing the output. +/// /// Examples /// -------- /// @@ -168,15 +203,35 @@ pub fn read_url<'py>( /// /// :func:`vortex.io.VortexWriteOptions` #[pyfunction] -#[pyo3(signature = (iter, path))] -pub fn write(py: Python, iter: PyIntoArrayIterator, path: &str) -> PyVortexResult<()> { +#[pyo3(signature = (iter, path, *, store = None))] +pub fn write( + py: Python, + iter: PyIntoArrayIterator, + path: &str, + store: Option, +) -> PyVortexResult<()> { py.detach(|| { TOKIO_RUNTIME.block_on(async move { - let file = File::create(path).await?; - SESSION - .write_options() - .write(file, iter.into_inner().into_array_stream()) - .await + match resolve_store(path, store.map(|x| x.into_inner()))? { + ResolvedStore::ObjectStore(store, path) => { + let mut store = ObjectStoreWriter::new(store, &path).await?; + SESSION + .write_options() + .write(&mut store, iter.into_inner().into_array_stream()) + .await?; + store.shutdown().await?; + VortexResult::Ok(()) + } + ResolvedStore::Path(path) => { + let mut w = File::create(path).await?; + SESSION + .write_options() + .write(&mut w, iter.into_inner().into_array_stream()) + .await?; + w.shutdown().await?; + VortexResult::Ok(()) + } + } }) })?; @@ -220,7 +275,7 @@ impl PyVortexWriteOptions { /// also used by :func:`vortex.io.write`): /// /// ```python - /// >>> vx.io.VortexWriteOptions.default().write_path(sprl, "chonky.vortex") + /// >>> vx.io.VortexWriteOptions.default().write(sprl, "chonky.vortex") /// >>> import os /// >>> os.path.getsize('chonky.vortex') /// 216156 @@ -233,7 +288,7 @@ impl PyVortexWriteOptions { /// We sure can. /// /// ```python - /// >>> vx.io.VortexWriteOptions.compact().write_path(sprl, "tiny.vortex") + /// >>> vx.io.VortexWriteOptions.compact().write(sprl, "tiny.vortex") /// >>> os.path.getsize('tiny.vortex') /// 55116 /// ``` @@ -246,7 +301,7 @@ impl PyVortexWriteOptions { } } - /// Write an array or iterator of arrays into a local file. + /// Write an array or iterator of arrays to a file. /// /// /// Parameters @@ -258,6 +313,9 @@ impl PyVortexWriteOptions { /// path : str /// The file path. /// + /// store : vortex.store.AzureStore | vortex.store.GCSStore | vortex.store.HTTPStore | vortex.store.LocalStore | vortex.store.MemoryStore | vortex.store.S3Store | None + /// An optional object store configuration to use for writing the output. + /// /// Examples /// -------- /// @@ -267,7 +325,7 @@ impl PyVortexWriteOptions { /// >>> import vortex as vx /// >>> import random /// >>> a = vx.array([0, 1, 2, 3, None, 4]) - /// >>> vx.io.VortexWriteOptions.default().write_path(a, "a.vortex") # doctest: +SKIP + /// >>> vx.io.VortexWriteOptions.default().write(a, "a.vortex") # doctest: +SKIP /// ``` /// /// Write the same array while preferring small file sizes over read-throughput and @@ -275,34 +333,50 @@ impl PyVortexWriteOptions { /// /// ```python /// >>> import vortex as vx - /// >>> vx.io.VortexWriteOptions.compact().write_path(a, "a.vortex") # doctest: +SKIP + /// >>> vx.io.VortexWriteOptions.compact().write(a, "a.vortex") # doctest: +SKIP /// ``` /// /// See also /// -------- /// /// :func:`vortex.io.write` - #[pyo3(signature = (iter, path))] - pub fn write_path( + #[pyo3(signature = (iter, path, *, store = None))] + pub fn write( &self, py: Python, iter: PyIntoArrayIterator, path: &str, + store: Option, ) -> PyVortexResult<()> { py.detach(|| { + let mut strategy = WriteStrategyBuilder::new(); + if let Some(compressor) = self.compressor.as_ref() { + strategy = strategy.with_compressor(compressor.clone()) + } + let strategy = strategy.build(); TOKIO_RUNTIME.block_on(async move { - let file = File::create(path).await?; - - let mut strategy = WriteStrategyBuilder::new(); - if let Some(compressor) = self.compressor.as_ref() { - strategy = strategy.with_compressor(compressor.clone()) + match resolve_store(path, store.map(|x| x.into_inner()))? { + ResolvedStore::ObjectStore(store, path) => { + let mut store = ObjectStoreWriter::new(store, &path).await?; + SESSION + .write_options() + .with_strategy(strategy) + .write(&mut store, iter.into_inner().into_array_stream()) + .await?; + store.shutdown().await?; + VortexResult::Ok(()) + } + ResolvedStore::Path(path) => { + let mut w = File::create(path).await?; + SESSION + .write_options() + .with_strategy(strategy) + .write(&mut w, iter.into_inner().into_array_stream()) + .await?; + w.shutdown().await?; + VortexResult::Ok(()) + } } - - SESSION - .write_options() - .with_strategy(strategy.build()) - .write(file, iter.into_inner().into_array_stream()) - .await }) })?; diff --git a/vortex-python/src/iter/python.rs b/vortex-python/src/iter/python.rs index 2c946f70367..1b5aafc98ab 100644 --- a/vortex-python/src/iter/python.rs +++ b/vortex-python/src/iter/python.rs @@ -35,6 +35,7 @@ impl Iterator for PythonArrayIterator { type Item = VortexResult; fn next(&mut self) -> Option { + // Check for any signals on this chunk. Python::attach(|py| { let mut iter = self.iter.clone_ref(py).into_bound(py); iter.next().map(|array| { diff --git a/vortex-python/src/lib.rs b/vortex-python/src/lib.rs index 043ca43a031..54c5b00eee4 100644 --- a/vortex-python/src/lib.rs +++ b/vortex-python/src/lib.rs @@ -23,6 +23,7 @@ mod registry; pub mod scalar; mod scan; mod serde; +mod store; use log::LevelFilter; use pyo3_log::Caching; @@ -66,6 +67,7 @@ fn _lib(py: Python, m: &Bound) -> PyResult<()> { file::init(py, m)?; io::init(py, m)?; iter::init(py, m)?; + store::init(py, m)?; registry::init(py, m)?; scalar::init(py, m)?; serde::init(py, m)?; diff --git a/vortex-python/src/object_store_urls.rs b/vortex-python/src/object_store_urls.rs index 2e5b5999975..95d741931ed 100644 --- a/vortex-python/src/object_store_urls.rs +++ b/vortex-python/src/object_store_urls.rs @@ -1,6 +1,7 @@ // SPDX-License-Identifier: Apache-2.0 // SPDX-FileCopyrightText: Copyright the Vortex contributors +use std::path::PathBuf; use std::sync::Arc; use std::sync::LazyLock; @@ -14,8 +15,94 @@ use vortex::error::VortexResult; static REGISTRY: LazyLock = LazyLock::new(DefaultObjectStoreRegistry::new); -pub(crate) fn object_store_from_url(url_str: &str) -> VortexResult<(Arc, Path)> { - let parsed_url = Url::parse(url_str)?; - let (store, path) = REGISTRY.resolve(&parsed_url)?; - Ok((store, path)) +/// Resolve a path to either a local file system path, or a registered object store. +/// +/// An explicit `ObjectStore` can be provided optionally, in which case the path is resolved +/// against the store's prefix. +/// +/// If the store is provided, it is carried along, otherwise we look up an appropriate store +/// in the default registry. +pub(crate) fn resolve_store( + url_or_path: &str, + store: Option>, +) -> VortexResult { + match store { + // If explicit store is provided use that + Some(store) => Ok(ResolvedStore::ObjectStore(store, Path::from(url_or_path))), + None => { + // If the URL does not parse + match Url::parse(url_or_path) { + Ok(url) => { + let (store, path) = REGISTRY.resolve(&url)?; + Ok(ResolvedStore::ObjectStore(store, path)) + } + Err(_) => { + // Treat the input string as a local file system path, which may be + Ok(ResolvedStore::Path(PathBuf::from(url_or_path))) + } + } + } + } +} + +#[derive(Debug)] +pub(crate) enum ResolvedStore { + ObjectStore(Arc, Path), + Path(PathBuf), +} + +impl ResolvedStore { + #[cfg(test)] + fn unwrap_store(self) -> (Arc, Path) { + match self { + ResolvedStore::ObjectStore(store, path) => (store, path), + ResolvedStore::Path(_) => { + panic!("cannot unwrap ResolvedStore::Path as store") + } + } + } + + #[cfg(test)] + pub fn unwrap_path(self) -> PathBuf { + match self { + ResolvedStore::ObjectStore(..) => { + panic!("cannot unwrap ResolvedStore::ObjectStore as path") + } + ResolvedStore::Path(path_buf) => path_buf, + } + } +} + +#[cfg(test)] +mod test { + use std::path::PathBuf; + use std::sync::Arc; + + use object_store::local::LocalFileSystem; + use object_store::path::Path; + + use crate::object_store_urls::resolve_store; + + #[test] + fn test_resolve() { + assert_eq!( + resolve_store("/my/absolute/path", None) + .unwrap() + .unwrap_path(), + PathBuf::from("/my/absolute/path") + ); + + let (_store, path) = resolve_store("s3://my-bucket/first/second/third/", None) + .unwrap() + .unwrap_store(); + + assert_eq!(path, Path::from("first/second/third")); + + let local_store = Arc::new(LocalFileSystem::default()); + let (_store, path) = resolve_store("/root/test", Some(local_store)) + .unwrap() + .unwrap_store(); + + assert_eq!(path, Path::from("root/test")); + } } diff --git a/vortex-python/src/store.rs b/vortex-python/src/store.rs new file mode 100644 index 00000000000..20aeef6c375 --- /dev/null +++ b/vortex-python/src/store.rs @@ -0,0 +1,14 @@ +// SPDX-License-Identifier: Apache-2.0 +// SPDX-FileCopyrightText: Copyright the Vortex contributors + +//! PyO3 bindings for object store registry functions. + +use pyo3::prelude::*; + +/// Add the `_store` module to the path for downstream consumers to use. +pub(crate) fn init(py: Python, parent: &Bound) -> PyResult<()> { + pyo3_object_store::register_store_module(py, parent, "vortex._lib", "store")?; + pyo3_object_store::register_exceptions_module(py, parent, "vortex._lib", "exceptions")?; + + Ok(()) +} diff --git a/vortex-python/test/test_store.py b/vortex-python/test/test_store.py new file mode 100644 index 00000000000..ef82a512d3f --- /dev/null +++ b/vortex-python/test/test_store.py @@ -0,0 +1,27 @@ +from vortex.store import LocalStore + +import vortex as vx + + +def test_store_roundtrip(tmpdir_factory): + data_dir = tmpdir_factory.mktemp("data") + + # create a local store to write into + local = LocalStore(prefix=str(data_dir)) + + records = vx.array([dict(name="Alice", salary=10), dict(name="Bob", salary=20), dict(name="Carol", salary=30)]) + + assert len(records) == 3 + + # write to the local store + vx.io.write(records, "people.vortex", store=local) + + # verify file got written to correct location + assert (data_dir / "people.vortex").exists() + + # test vx.read for eager full-scan + people = vx.io.read_url("people.vortex", store=local) + + print(people.to_pylist()) + print(records.to_pylist()) + assert people.to_pylist() == records.to_pylist() From ac3aff7d6a4dfe55670bd30f31e283f1395148d5 Mon Sep 17 00:00:00 2001 From: Andrew Duffy Date: Thu, 22 Jan 2026 13:27:40 -0500 Subject: [PATCH 2/4] fixup Signed-off-by: Andrew Duffy --- pyproject.toml | 3 + vortex-python/pyproject.toml | 3 + vortex-python/python/vortex/__init__.py | 10 ++-- vortex-python/python/vortex/_lib/file.pyi | 15 ++++- vortex-python/python/vortex/_lib/io.pyi | 16 ++++- .../python/vortex/_lib/store/__init__.pyi | 10 ++-- vortex-python/python/vortex/file.py | 15 ++++- vortex-python/python/vortex/store.py | 59 +++++++++++++++++++ vortex-python/test/test_store.py | 12 ++-- 9 files changed, 121 insertions(+), 22 deletions(-) create mode 100644 vortex-python/python/vortex/store.py diff --git a/pyproject.toml b/pyproject.toml index e2e96707a10..9c4c166872b 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -68,3 +68,6 @@ known-first-party = [ log_cli = true log_cli_level = "INFO" xfail_strict = true + +[tool.basedpyright] +exclude = ["vortex-python/python/vortex/_lib/store/**.pyi"] diff --git a/vortex-python/pyproject.toml b/vortex-python/pyproject.toml index 5d9e1264930..ad8bffb5f53 100644 --- a/vortex-python/pyproject.toml +++ b/vortex-python/pyproject.toml @@ -64,6 +64,9 @@ include = [ { path = "python/vortex/py.typed", format = "sdist" }, ] +[tool.basedpyright] +exclude = ["python/vortex/_lib/store/**.pyi"] + [dependency-groups] dev = [ "basedpyright>=1.31", diff --git a/vortex-python/python/vortex/__init__.py b/vortex-python/python/vortex/__init__.py index 57aa3b7b64e..05d92f335b2 100644 --- a/vortex-python/python/vortex/__init__.py +++ b/vortex-python/python/vortex/__init__.py @@ -2,7 +2,7 @@ # SPDX-FileCopyrightText: Copyright the Vortex contributors from . import _lib, arrays, dataset, expr, file, io, ray, registry, scan -from ._lib import exceptions, store +from ._lib import store # pyright: ignore[reportMissingModuleSource] from ._lib.arrays import ( # pyright: ignore[reportMissingModuleSource] AlpArray, AlpRdArray, @@ -83,13 +83,14 @@ assert _lib, "Ensure we eagerly import the Vortex native library" # Resolve the installed distribution version so it is available as vortex.__version__. + __version__ = "unknown" try: - from importlib import metadata as _metadata + import importlib.metadata # Try to read the installed distribution version for the Python package name. - __version__ = _metadata.version("vortex-data") -except _metadata.PackageNotFoundError: + __version__ = importlib.metadata.version("vortex-data") +except importlib.metadata.PackageNotFoundError: # If the distribution is not installed, keep the unknown fallback. pass @@ -97,7 +98,6 @@ # --- Modules --- "arrays", "dataset", - "exceptions", "expr", "file", "scan", diff --git a/vortex-python/python/vortex/_lib/file.pyi b/vortex-python/python/vortex/_lib/file.pyi index acb79ee28dd..20fe942f25b 100644 --- a/vortex-python/python/vortex/_lib/file.pyi +++ b/vortex-python/python/vortex/_lib/file.pyi @@ -14,6 +14,14 @@ from .dtype import DType from .expr import Expr from .iter import ArrayIterator from .scan import RepeatedScan +from .store import ( + AzureStore, + GCSStore, + HTTPStore, + LocalStore, + MemoryStore, + S3Store, +) @final class VortexFile: @@ -47,4 +55,9 @@ class VortexFile: def to_polars(self) -> pl.LazyFrame: ... def splits(self) -> list[tuple[int, int]]: ... -def open(path: str, *, without_segment_cache: bool = False) -> VortexFile: ... +def open( + path: str, + *, + store: AzureStore | GCSStore | HTTPStore | LocalStore | MemoryStore | S3Store | None = None, + without_segment_cache: bool = False, +) -> VortexFile: ... diff --git a/vortex-python/python/vortex/_lib/io.pyi b/vortex-python/python/vortex/_lib/io.pyi index 22ea8eb7b3f..aeadb54beb0 100644 --- a/vortex-python/python/vortex/_lib/io.pyi +++ b/vortex-python/python/vortex/_lib/io.pyi @@ -8,18 +8,25 @@ from .store import ( AzureStore, GCSStore, HTTPStore, + LocalStore, + MemoryStore, S3Store, ) def read_url( url: str, *, - store=None, + store: AzureStore | GCSStore | HTTPStore | LocalStore | MemoryStore | S3Store | None = None, projection: list[str] | list[int] | None = None, row_filter: Expr | None = None, indices: Array | None = None, ) -> Array: ... -def write(iter: IntoArrayIterator, path: str, *, store: AzureStore | HTTPStore | GCSStore | S3Store | None) -> None: ... +def write( + iter: IntoArrayIterator, + path: str, + *, + store: AzureStore | GCSStore | HTTPStore | LocalStore | MemoryStore | S3Store | None = None, +) -> None: ... class VortexWriteOptions: @staticmethod @@ -28,5 +35,8 @@ class VortexWriteOptions: def compact() -> VortexWriteOptions: ... @staticmethod def write( - iter: IntoArrayIterator, path: str, *, store: AzureStore | HTTPStore | GCSStore | S3Store | None + iter: IntoArrayIterator, + path: str, + *, + store: AzureStore | GCSStore | HTTPStore | LocalStore | MemoryStore | S3Store | None = None, ) -> VortexWriteOptions: ... diff --git a/vortex-python/python/vortex/_lib/store/__init__.pyi b/vortex-python/python/vortex/_lib/store/__init__.pyi index 8b05aec394c..f5d91f6b317 100644 --- a/vortex-python/python/vortex/_lib/store/__init__.pyi +++ b/vortex-python/python/vortex/_lib/store/__init__.pyi @@ -1,7 +1,7 @@ # TODO: move to reusable types package from collections.abc import Callable from pathlib import Path -from typing import Any, Self, TypeAlias, Unpack, overload +from typing import Self, TypeAlias, Unpack, overload from ._aws import S3Config as S3Config from ._aws import S3Credential as S3Credential @@ -69,8 +69,8 @@ def from_url( # type: ignore[misc] # docstring in pyi file config: S3Config | GCSConfig | AzureConfig | None = None, client_options: ClientConfig | None = None, retry_config: RetryConfig | None = None, - credential_provider: Callable | None = None, - **kwargs: Any, + credential_provider: Callable[..., object] | None = None, + **kwargs: object, ) -> ObjectStore: """Easy construction of store by URL, identifying the relevant store. @@ -172,8 +172,8 @@ class LocalStore: ``` """ - def __eq__(self, value: object) -> bool: ... - def __getnewargs_ex__(self): ... + def __eq__(self, value: object, /) -> bool: ... # pyright: ignore[reportImplicitOverride] + def __getnewargs_ex__(self) -> tuple[tuple[()], dict[str, object]]: ... @property def prefix(self) -> Path | None: """Get the prefix applied to all operations in this store, if any.""" diff --git a/vortex-python/python/vortex/file.py b/vortex-python/python/vortex/file.py index 4a9f9ea8818..9632d851b57 100644 --- a/vortex-python/python/vortex/file.py +++ b/vortex-python/python/vortex/file.py @@ -15,13 +15,26 @@ from ._lib.iter import ArrayIterator # pyright: ignore[reportMissingModuleSource] from .dataset import VortexDataset from .scan import RepeatedScan +from .store import ( + AzureStore, + GCSStore, + HTTPStore, + LocalStore, + MemoryStore, + S3Store, +) from .type_aliases import IntoProjection, RecordBatchReader if TYPE_CHECKING: import polars -def open(path: str, *, store=None, without_segment_cache: bool = False) -> VortexFile: +def open( + path: str, + *, + store: AzureStore | GCSStore | HTTPStore | LocalStore | MemoryStore | S3Store | None = None, + without_segment_cache: bool = False, +) -> VortexFile: """ Lazily open a Vortex file located at the given path or URL. diff --git a/vortex-python/python/vortex/store.py b/vortex-python/python/vortex/store.py new file mode 100644 index 00000000000..b742ad32e69 --- /dev/null +++ b/vortex-python/python/vortex/store.py @@ -0,0 +1,59 @@ +# SPDX-License-Identifier: Apache-2.0 +# SPDX-FileCopyrightText: Copyright the Vortex contributors + +from vortex._lib.store import ( # pyright: ignore[reportMissingModuleSource] + AzureAccessKey, + AzureBearerToken, + AzureConfig, + AzureCredential, + AzureCredentialProvider, + AzureSASToken, + AzureStore, + BackoffConfig, + ClientConfig, + GCSConfig, + GCSCredential, + GCSCredentialProvider, + GCSStore, + HTTPStore, + LocalStore, + MemoryStore, + RetryConfig, + S3Config, + S3Credential, + S3CredentialProvider, + S3Store, + from_url, +) + +__all__ = [ + # Azure + "AzureAccessKey", + "AzureBearerToken", + "AzureConfig", + "AzureCredential", + "AzureCredentialProvider", + "AzureSASToken", + "AzureStore", + # Client + "BackoffConfig", + "ClientConfig", + "RetryConfig", + # GCS + "GCSConfig", + "GCSCredential", + "GCSCredentialProvider", + "GCSStore", + # HTTP + "HTTPStore", + # Local + "LocalStore", + "MemoryStore", + # S3 + "S3Config", + "S3Credential", + "S3CredentialProvider", + "S3Store", + # Utility + "from_url", +] diff --git a/vortex-python/test/test_store.py b/vortex-python/test/test_store.py index ef82a512d3f..795a72805a4 100644 --- a/vortex-python/test/test_store.py +++ b/vortex-python/test/test_store.py @@ -1,13 +1,13 @@ +from pathlib import Path + from vortex.store import LocalStore import vortex as vx -def test_store_roundtrip(tmpdir_factory): - data_dir = tmpdir_factory.mktemp("data") - +def test_store_roundtrip(tmp_path: Path) -> None: # create a local store to write into - local = LocalStore(prefix=str(data_dir)) + local = LocalStore(prefix=tmp_path) records = vx.array([dict(name="Alice", salary=10), dict(name="Bob", salary=20), dict(name="Carol", salary=30)]) @@ -17,11 +17,9 @@ def test_store_roundtrip(tmpdir_factory): vx.io.write(records, "people.vortex", store=local) # verify file got written to correct location - assert (data_dir / "people.vortex").exists() + assert (tmp_path / "people.vortex").exists() # test vx.read for eager full-scan people = vx.io.read_url("people.vortex", store=local) - print(people.to_pylist()) - print(records.to_pylist()) assert people.to_pylist() == records.to_pylist() From 70d3743f3d989839ec5fd056abe6fc77789a71e8 Mon Sep 17 00:00:00 2001 From: Andrew Duffy Date: Thu, 22 Jan 2026 14:15:31 -0500 Subject: [PATCH 3/4] fix more Signed-off-by: Andrew Duffy --- vortex-python/python/vortex/__init__.py | 6 ++-- vortex-python/python/vortex/_lib/file.pyi | 16 ++-------- vortex-python/python/vortex/_lib/io.pyi | 15 +++------ vortex-python/python/vortex/store.py | 38 +++++++++++++---------- 4 files changed, 30 insertions(+), 45 deletions(-) diff --git a/vortex-python/python/vortex/__init__.py b/vortex-python/python/vortex/__init__.py index 05d92f335b2..93306b82d35 100644 --- a/vortex-python/python/vortex/__init__.py +++ b/vortex-python/python/vortex/__init__.py @@ -1,8 +1,9 @@ # SPDX-License-Identifier: Apache-2.0 # SPDX-FileCopyrightText: Copyright the Vortex contributors +import importlib.metadata + from . import _lib, arrays, dataset, expr, file, io, ray, registry, scan -from ._lib import store # pyright: ignore[reportMissingModuleSource] from ._lib.arrays import ( # pyright: ignore[reportMissingModuleSource] AlpArray, AlpRdArray, @@ -86,8 +87,6 @@ __version__ = "unknown" try: - import importlib.metadata - # Try to read the installed distribution version for the Python package name. __version__ = importlib.metadata.version("vortex-data") except importlib.metadata.PackageNotFoundError: @@ -104,7 +103,6 @@ "io", "registry", "ray", - "store", # --- Objects and Functions --- "array", "compress", diff --git a/vortex-python/python/vortex/_lib/file.pyi b/vortex-python/python/vortex/_lib/file.pyi index 20fe942f25b..34645df3195 100644 --- a/vortex-python/python/vortex/_lib/file.pyi +++ b/vortex-python/python/vortex/_lib/file.pyi @@ -14,14 +14,7 @@ from .dtype import DType from .expr import Expr from .iter import ArrayIterator from .scan import RepeatedScan -from .store import ( - AzureStore, - GCSStore, - HTTPStore, - LocalStore, - MemoryStore, - S3Store, -) +from .store import ObjectStore @final class VortexFile: @@ -55,9 +48,4 @@ class VortexFile: def to_polars(self) -> pl.LazyFrame: ... def splits(self) -> list[tuple[int, int]]: ... -def open( - path: str, - *, - store: AzureStore | GCSStore | HTTPStore | LocalStore | MemoryStore | S3Store | None = None, - without_segment_cache: bool = False, -) -> VortexFile: ... +def open(path: str, *, store: ObjectStore | None = None, without_segment_cache: bool = False) -> VortexFile: ... diff --git a/vortex-python/python/vortex/_lib/io.pyi b/vortex-python/python/vortex/_lib/io.pyi index aeadb54beb0..d47afbd362a 100644 --- a/vortex-python/python/vortex/_lib/io.pyi +++ b/vortex-python/python/vortex/_lib/io.pyi @@ -4,19 +4,12 @@ from ..type_aliases import IntoArrayIterator from .arrays import Array from .expr import Expr -from .store import ( - AzureStore, - GCSStore, - HTTPStore, - LocalStore, - MemoryStore, - S3Store, -) +from .store import ObjectStore def read_url( url: str, *, - store: AzureStore | GCSStore | HTTPStore | LocalStore | MemoryStore | S3Store | None = None, + store: ObjectStore | None = None, projection: list[str] | list[int] | None = None, row_filter: Expr | None = None, indices: Array | None = None, @@ -25,7 +18,7 @@ def write( iter: IntoArrayIterator, path: str, *, - store: AzureStore | GCSStore | HTTPStore | LocalStore | MemoryStore | S3Store | None = None, + store: ObjectStore | None = None, ) -> None: ... class VortexWriteOptions: @@ -38,5 +31,5 @@ class VortexWriteOptions: iter: IntoArrayIterator, path: str, *, - store: AzureStore | GCSStore | HTTPStore | LocalStore | MemoryStore | S3Store | None = None, + store: ObjectStore | None = None, ) -> VortexWriteOptions: ... diff --git a/vortex-python/python/vortex/store.py b/vortex-python/python/vortex/store.py index b742ad32e69..b7b7bc04b3c 100644 --- a/vortex-python/python/vortex/store.py +++ b/vortex-python/python/vortex/store.py @@ -1,31 +1,37 @@ # SPDX-License-Identifier: Apache-2.0 # SPDX-FileCopyrightText: Copyright the Vortex contributors -from vortex._lib.store import ( # pyright: ignore[reportMissingModuleSource] - AzureAccessKey, - AzureBearerToken, - AzureConfig, - AzureCredential, - AzureCredentialProvider, - AzureSASToken, +from typing import TYPE_CHECKING + +from ._lib.store import ( # pyright: ignore[reportMissingModuleSource] AzureStore, - BackoffConfig, - ClientConfig, - GCSConfig, - GCSCredential, - GCSCredentialProvider, GCSStore, HTTPStore, LocalStore, MemoryStore, - RetryConfig, - S3Config, - S3Credential, - S3CredentialProvider, S3Store, from_url, ) +if TYPE_CHECKING: + from ._lib.store import ( # pyright: ignore[reportMissingModuleSource] + AzureAccessKey, + AzureBearerToken, + AzureConfig, + AzureCredential, + AzureCredentialProvider, + AzureSASToken, + BackoffConfig, + ClientConfig, + GCSConfig, + GCSCredential, + GCSCredentialProvider, + RetryConfig, + S3Config, + S3Credential, + S3CredentialProvider, + ) + __all__ = [ # Azure "AzureAccessKey", From 2d94025f9fc7f560b4628a5d04f3e9d679803e5c Mon Sep 17 00:00:00 2001 From: Andrew Duffy Date: Thu, 22 Jan 2026 14:39:33 -0500 Subject: [PATCH 4/4] this? Signed-off-by: Andrew Duffy --- vortex-python/python/vortex/store.py | 2 ++ vortex-python/src/io.rs | 2 +- 2 files changed, 3 insertions(+), 1 deletion(-) diff --git a/vortex-python/python/vortex/store.py b/vortex-python/python/vortex/store.py index b7b7bc04b3c..3819ef86042 100644 --- a/vortex-python/python/vortex/store.py +++ b/vortex-python/python/vortex/store.py @@ -26,6 +26,7 @@ GCSConfig, GCSCredential, GCSCredentialProvider, + ObjectStore, RetryConfig, S3Config, S3Credential, @@ -62,4 +63,5 @@ "S3Store", # Utility "from_url", + "ObjectStore", ] diff --git a/vortex-python/src/io.rs b/vortex-python/src/io.rs index a3f67f93482..39eaf5dd207 100644 --- a/vortex-python/src/io.rs +++ b/vortex-python/src/io.rs @@ -58,7 +58,7 @@ pub(crate) fn init(py: Python, parent: &Bound) -> PyResult<()> { /// ---------- /// url : str /// The URL to read from. -/// store : vortex.store.ObjectStore | None, optional +/// store : vortex.store.AzureStore | vortex.store.GCSStore | vortex.store.HTTPStore | vortex.store.LocalStore | vortex.store.MemoryStore | vortex.store.S3Store | None /// Pre-configured object store with credentials and settings. /// If provided, uses this store's configuration. /// If None, checks session registry for matching URL pattern.