Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 3 additions & 1 deletion .github/workflows/ci.yml
Original file line number Diff line number Diff line change
Expand Up @@ -13,12 +13,14 @@ jobs:
uses: astral-sh/setup-uv@v6
with:
version: "0.8.2"
- name: Sync dependencies
run: |
uv sync --all-groups
- name: Static code check
run: uv run poe ci_checker
- name: Check docs build
working-directory: ./docs
run: |
uv sync --group docs
make html
- name: Check PR title style
uses: actions/github-script@v7
Expand Down
12 changes: 11 additions & 1 deletion pyproject.toml
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,12 @@ dependencies = [

[dependency-groups]
dev = ["mypy>=1.14.1", "poethepoet>=0.36.0", "ruff>=0.9.1"]
test = [
"pytest>=8.0.0",
"pytest-asyncio>=0.23.0",
"pytest-timeout>=2.3.0",
"pytest-xdist>=3.5.0",
]
docs = [
"enum-tools[sphinx]>=0.12.0",
"furo>=2024.8.6",
Expand All @@ -27,7 +33,7 @@ requires = ["hatchling"]
build-backend = "hatchling.build"

[tool.mypy]
files = ["src/"]
files = ["src/", "tests/", "examples/"]

[tool.ruff]
exclude = [
Expand All @@ -51,3 +57,7 @@ ci_linter = "uv run ruff check"
ci_formatter = "uv run ruff format --check"
checker = ["linter", "formatter", "type_checker"]
ci_checker = ["ci_linter", "ci_formatter", "type_checker"]
e2e_tests = "uv run pytest tests/ -v -s"
e2e_account_tests = "uv run pytest tests/ -v -s -m account"
e2e_basin_tests = "uv run pytest tests/ -v -s -m basin"
e2e_stream_tests = "uv run pytest tests/ -v -s -m stream"
18 changes: 18 additions & 0 deletions pytest.ini
Original file line number Diff line number Diff line change
@@ -0,0 +1,18 @@
[pytest]

testpaths = tests
python_files = test_*.py
python_classes = Test*
python_functions = test_*

asyncio_mode = auto
asyncio_default_fixture_loop_scope = session
asyncio_default_test_loop_scope = session

timeout = 300
timeout_method = thread

markers =
account: tests for account operations
basin: tests for basin operations
stream: tests for stream operations
Empty file added tests/__init__.py
Empty file.
99 changes: 99 additions & 0 deletions tests/conftest.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,99 @@
import os
import uuid
from typing import AsyncGenerator, Final

import pytest
import pytest_asyncio

from streamstore import S2, Basin, Stream

pytest_plugins = ["pytest_asyncio"]


BASIN_PREFIX: Final[str] = "test-py-sdk"


@pytest.fixture(scope="session")
def access_token() -> str:
token = os.getenv("S2_ACCESS_TOKEN")
if not token:
pytest.fail("S2_ACCESS_TOKEN environment variable not set")
return token


@pytest.fixture(scope="session")
def basin_prefix() -> str:
return BASIN_PREFIX


@pytest_asyncio.fixture(scope="session")
async def s2(access_token: str) -> AsyncGenerator[S2, None]:
async with S2(access_token=access_token) as client:
yield client


@pytest.fixture
def basin_name() -> str:
return _basin_name()


@pytest.fixture
def basin_names() -> list[str]:
return [_basin_name() for _ in range(3)]


@pytest.fixture
def stream_name() -> str:
return _stream_name()


@pytest.fixture
def stream_names() -> list[str]:
return [_stream_name() for _ in range(3)]


@pytest.fixture
def token_id() -> str:
return f"token-{uuid.uuid4().hex[:8]}"


@pytest_asyncio.fixture
async def basin(s2: S2, basin_name: str) -> AsyncGenerator[Basin, None]:
await s2.create_basin(
name=basin_name,
)

try:
yield s2.basin(basin_name)
finally:
await s2.delete_basin(basin_name)


@pytest_asyncio.fixture(scope="class")
async def shared_basin(s2: S2) -> AsyncGenerator[Basin, None]:
basin_name = _basin_name()
await s2.create_basin(name=basin_name)

try:
yield s2.basin(basin_name)
finally:
await s2.delete_basin(basin_name)


@pytest_asyncio.fixture
async def stream(shared_basin: Basin, stream_name: str) -> AsyncGenerator[Stream, None]:
basin = shared_basin
await basin.create_stream(name=stream_name)

try:
yield basin.stream(stream_name)
finally:
await basin.delete_stream(stream_name)


def _basin_name() -> str:
return f"{BASIN_PREFIX}-{uuid.uuid4().hex[:8]}"


def _stream_name() -> str:
return f"stream-{uuid.uuid4().hex[:8]}"
211 changes: 211 additions & 0 deletions tests/test_account_ops.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,211 @@
import time

import pytest

from streamstore import S2, Basin
from streamstore.schemas import (
AccessTokenScope,
BasinConfig,
BasinScope,
BasinState,
Operation,
OperationGroupPermissions,
Permission,
ResourceMatchOp,
ResourceMatchRule,
StorageClass,
StreamConfig,
Timestamping,
TimestampingMode,
)


@pytest.mark.account
class TestAccountOperations:
async def test_create_basin(self, s2: S2, basin_name: str):
basin_info = await s2.create_basin(name=basin_name)

try:
assert basin_info.name == basin_name
assert basin_info.scope == BasinScope.AWS_US_EAST_1
assert basin_info.state in (BasinState.ACTIVE, BasinState.CREATING)
finally:
await s2.delete_basin(basin_name)

async def test_create_basin_with_config(self, s2: S2, basin_name: str):
config = BasinConfig(
default_stream_config=StreamConfig(
storage_class=StorageClass.STANDARD,
retention_age=86400 * 7,
timestamping=Timestamping(
mode=TimestampingMode.CLIENT_REQUIRE,
uncapped=True,
),
delete_on_empty_min_age=3600,
),
create_stream_on_append=True,
)

basin_info = await s2.create_basin(name=basin_name, config=config)

try:
assert basin_info.name == basin_name

retrieved_config = await s2.get_basin_config(basin_name)
assert config == retrieved_config
finally:
await s2.delete_basin(basin_name)

async def test_reconfigure_basin(self, s2: S2, basin: Basin):
config = BasinConfig(
default_stream_config=StreamConfig(
storage_class=StorageClass.STANDARD,
retention_age=3600,
),
create_stream_on_append=True,
)

updated_config = await s2.reconfigure_basin(basin.name, config)

assert config.default_stream_config is not None
assert (
updated_config.default_stream_config.storage_class
== config.default_stream_config.storage_class
)
assert (
updated_config.default_stream_config.retention_age
== config.default_stream_config.retention_age
)
assert updated_config.create_stream_on_append == config.create_stream_on_append

assert (
updated_config.default_stream_config.timestamping.mode
== TimestampingMode.UNSPECIFIED
)

assert updated_config.default_stream_config.delete_on_empty_min_age == 0

async def test_list_basins(self, s2: S2, basin_names: list[str]):
basin_infos = []
try:
for basin_name in basin_names:
stream_info = await s2.create_basin(name=basin_name)
basin_infos.append(stream_info)

page = await s2.list_basins()

retrieved_basin_names = [b.name for b in page.items]
assert set(basin_names).issubset(retrieved_basin_names)

finally:
for basin_info in basin_infos:
await s2.delete_basin(basin_info.name)

async def test_list_basins_with_limit(self, s2: S2, basin_names: list[str]):
basin_infos = []
try:
for basin_name in basin_names:
stream_info = await s2.create_basin(name=basin_name)
basin_infos.append(stream_info)

page = await s2.list_basins(limit=1)

assert len(page.items) == 1

finally:
for basin_info in basin_infos:
await s2.delete_basin(basin_info.name)

async def test_list_basins_with_prefix(self, s2: S2, basin_name: str):
await s2.create_basin(name=basin_name)

try:
prefix = basin_name[:5]
page = await s2.list_basins(prefix=prefix)

basin_names = [b.name for b in page.items]
assert basin_name in basin_names

for name in basin_names:
assert name.startswith(prefix)

finally:
await s2.delete_basin(basin_name)

async def test_issue_access_token(self, s2: S2, token_id: str, basin_prefix: str):
scope = AccessTokenScope(
basins=ResourceMatchRule(
match_op=ResourceMatchOp.PREFIX, value=basin_prefix
),
streams=ResourceMatchRule(match_op=ResourceMatchOp.PREFIX, value=""),
op_group_perms=OperationGroupPermissions(
basin=Permission.READ,
stream=Permission.READ,
),
)

token = await s2.issue_access_token(id=token_id, scope=scope)

try:
assert isinstance(token, str)
assert len(token) > 0
finally:
token_info = await s2.revoke_access_token(token_id)
assert token_info.scope == scope

async def test_issue_access_token_with_expiry(self, s2: S2, token_id: str):
expires_at = int(time.time()) + 3600

scope = AccessTokenScope(
streams=ResourceMatchRule(match_op=ResourceMatchOp.PREFIX, value=""),
ops=[Operation.READ, Operation.CHECK_TAIL],
)

token = await s2.issue_access_token(
id=token_id,
scope=scope,
expires_at=expires_at,
)

try:
assert isinstance(token, str)
assert len(token) > 0

page = await s2.list_access_tokens(prefix=token_id)

token_info = next((t for t in page.items if t.id == token_id), None)
assert token_info is not None
assert token_info.expires_at == expires_at
assert token_info.scope.streams == scope.streams
assert set(token_info.scope.ops) == set(scope.ops)

finally:
await s2.revoke_access_token(token_id)

async def test_issue_access_token_with_auto_prefix(self, s2: S2, token_id: str):
scope = AccessTokenScope(
streams=ResourceMatchRule(match_op=ResourceMatchOp.PREFIX, value="prefix/"),
op_group_perms=OperationGroupPermissions(stream=Permission.READ_WRITE),
)

token = await s2.issue_access_token(
id=token_id,
scope=scope,
auto_prefix_streams=True,
)

try:
assert isinstance(token, str)
assert len(token) > 0

page = await s2.list_access_tokens(prefix=token_id, limit=1)

assert len(page.items) == 1

token_info = page.items[0]
assert token_info is not None
assert token_info.scope == scope
assert token_info.auto_prefix_streams is True

finally:
await s2.revoke_access_token(token_id)
Loading