Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion protos
70 changes: 36 additions & 34 deletions src/streamstore/_lib/s2/v1alpha/s2_pb2.py

Large diffs are not rendered by default.

9 changes: 8 additions & 1 deletion src/streamstore/_lib/s2/v1alpha/s2_pb2.pyi
Original file line number Diff line number Diff line change
Expand Up @@ -637,7 +637,7 @@ class ReadSessionResponse(_message.Message):
) -> None: ...

class StreamConfig(_message.Message):
__slots__ = ("storage_class", "age", "timestamping", "delete_on_empty")
__slots__ = ("storage_class", "age", "infinite", "timestamping", "delete_on_empty")
class Timestamping(_message.Message):
__slots__ = ("mode", "uncapped")
MODE_FIELD_NUMBER: _ClassVar[int]
Expand All @@ -656,18 +656,25 @@ class StreamConfig(_message.Message):
min_age_secs: int
def __init__(self, min_age_secs: _Optional[int] = ...) -> None: ...

class InfiniteRetention(_message.Message):
__slots__ = ()
def __init__(self) -> None: ...

STORAGE_CLASS_FIELD_NUMBER: _ClassVar[int]
AGE_FIELD_NUMBER: _ClassVar[int]
INFINITE_FIELD_NUMBER: _ClassVar[int]
TIMESTAMPING_FIELD_NUMBER: _ClassVar[int]
DELETE_ON_EMPTY_FIELD_NUMBER: _ClassVar[int]
storage_class: StorageClass
age: int
infinite: StreamConfig.InfiniteRetention
timestamping: StreamConfig.Timestamping
delete_on_empty: StreamConfig.DeleteOnEmpty
def __init__(
self,
storage_class: _Optional[_Union[StorageClass, str]] = ...,
age: _Optional[int] = ...,
infinite: _Optional[_Union[StreamConfig.InfiniteRetention, _Mapping]] = ...,
timestamping: _Optional[_Union[StreamConfig.Timestamping, _Mapping]] = ...,
delete_on_empty: _Optional[_Union[StreamConfig.DeleteOnEmpty, _Mapping]] = ...,
) -> None: ...
Expand Down
23 changes: 18 additions & 5 deletions src/streamstore/_mappers.py
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
from datetime import datetime
from typing import cast
from typing import Literal, cast

from google.protobuf.internal.containers import RepeatedCompositeFieldContainer

Expand Down Expand Up @@ -127,15 +127,18 @@ def stream_config_message(
stream_config = msgs.StreamConfig()
if config:
storage_class = config.storage_class
retention_age = config.retention_age
retention_policy = config.retention_policy
timestamping = config.timestamping
delete_on_empty_min_age = config.delete_on_empty_min_age
if storage_class is not None:
paths.append(f"{mask_path_prefix}storage_class")
stream_config.storage_class = storage_class.value
if retention_age is not None:
if retention_policy is not None:
paths.append(f"{mask_path_prefix}retention_policy")
stream_config.age = retention_age
if retention_policy == "infinite":
stream_config.infinite.CopyFrom(msgs.StreamConfig.InfiniteRetention())
else:
stream_config.age = retention_policy
if timestamping is not None:
paths.append(f"{mask_path_prefix}timestamping")
if timestamping.mode is not None:
Expand Down Expand Up @@ -183,9 +186,19 @@ def basin_config_message(


def stream_config_schema(config: msgs.StreamConfig) -> StreamConfig:
retention_policy: int | Literal["infinite"]
match config.WhichOneof("retention_policy"):
case "age":
retention_policy = config.age
case "infinite":
retention_policy = "infinite"
case _:
raise RuntimeError(
"StreamConfig retention_policy doesn't match any of the expected values"
)
return StreamConfig(
StorageClass(config.storage_class),
config.age,
retention_policy,
Timestamping(
mode=TimestampingMode(config.timestamping.mode),
uncapped=config.timestamping.uncapped,
Expand Down
12 changes: 7 additions & 5 deletions src/streamstore/schemas.py
Original file line number Diff line number Diff line change
Expand Up @@ -35,7 +35,7 @@
from dataclasses import dataclass, field
from datetime import datetime
from enum import Enum
from typing import Generic, TypeVar
from typing import Generic, Literal, TypeVar

from streamstore._exceptions import fallible

Expand Down Expand Up @@ -294,13 +294,15 @@ class StreamConfig:
#:
#: If not specified, the default is :attr:`.StorageClass.EXPRESS`.
storage_class: StorageClass | None = None
#: Age in seconds for automatic trimming of records older than this threshold.
#: Retention policy for records in this stream.
#:
#: If not specified, the default is to retain records for 7 days.
#: Retention duration in seconds to automatically trim records older than this duration.
#:
#: If set to ``0``, the stream will have infinite retention.
#: ``'infinite'`` to retain records indefinitely.
#: (While S2 is in public preview, this is capped at 28 days. Let us know if you'd like the cap removed.)
retention_age: int | None = None
#:
#: If not specified, the default is to retain records for 7 days.
retention_policy: int | Literal["infinite"] | None = None
#: Timestamping behavior for appends to this stream, which influences how timestamps are handled.
timestamping: Timestamping | None = None
#: Minimum age in seconds before this stream can be automatically deleted if empty.
Expand Down
8 changes: 4 additions & 4 deletions tests/test_account_ops.py
Original file line number Diff line number Diff line change
Expand Up @@ -36,7 +36,7 @@ async def test_create_basin_with_config(self, s2: S2, basin_name: str):
config = BasinConfig(
default_stream_config=StreamConfig(
storage_class=StorageClass.STANDARD,
retention_age=86400 * 7,
retention_policy=86400 * 7,
timestamping=Timestamping(
mode=TimestampingMode.CLIENT_REQUIRE,
uncapped=True,
Expand All @@ -60,7 +60,7 @@ async def test_reconfigure_basin(self, s2: S2, basin: Basin):
config = BasinConfig(
default_stream_config=StreamConfig(
storage_class=StorageClass.STANDARD,
retention_age=3600,
retention_policy=3600,
),
create_stream_on_append=True,
)
Expand All @@ -73,8 +73,8 @@ async def test_reconfigure_basin(self, s2: S2, basin: Basin):
== config.default_stream_config.storage_class
)
assert (
updated_config.default_stream_config.retention_age
== config.default_stream_config.retention_age
updated_config.default_stream_config.retention_policy
== config.default_stream_config.retention_policy
)
assert updated_config.create_stream_on_append == config.create_stream_on_append

Expand Down
18 changes: 14 additions & 4 deletions tests/test_basin_ops.py
Original file line number Diff line number Diff line change
Expand Up @@ -30,7 +30,7 @@ async def test_create_stream_with_config(

config = StreamConfig(
storage_class=StorageClass.STANDARD,
retention_age=86400 * 3,
retention_policy=86400 * 3,
timestamping=Timestamping(
mode=TimestampingMode.ARRIVAL,
uncapped=False,
Expand All @@ -53,14 +53,13 @@ async def test_default_stream_config(self, shared_basin: Basin, stream: Stream):

config = await basin.get_stream_config(stream.name)
assert config.storage_class == StorageClass.EXPRESS
assert config.retention_age == 86400 * 7
assert config.retention_policy == 86400 * 7

async def test_reconfigure_stream(self, shared_basin: Basin, stream: Stream):
basin = shared_basin

config = StreamConfig(
storage_class=StorageClass.STANDARD,
retention_age=86400 * 21,
retention_policy="infinite",
timestamping=Timestamping(
mode=TimestampingMode.CLIENT_REQUIRE, uncapped=True
),
Expand All @@ -70,6 +69,17 @@ async def test_reconfigure_stream(self, shared_basin: Basin, stream: Stream):
updated_config = await basin.reconfigure_stream(stream.name, config)
assert updated_config == config

config = StreamConfig(
storage_class=StorageClass.EXPRESS,
retention_policy=86400 * 90,
timestamping=Timestamping(
mode=TimestampingMode.CLIENT_PREFER, uncapped=False
),
delete_on_empty_min_age=3600,
)
updated_config = await basin.reconfigure_stream(stream.name, config)
assert updated_config == config

async def test_list_streams(self, shared_basin: Basin, stream_names: list[str]):
basin = shared_basin

Expand Down