Skip to content
Merged
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
42 changes: 33 additions & 9 deletions tests/test_stream_ops.py
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,6 @@
from streamstore.utils import metered_bytes


@pytest.mark.asyncio(loop_scope="session")
@pytest.mark.stream
class TestStreamOperations:
async def test_check_tail_empty_stream(self, stream: Stream):
Expand Down Expand Up @@ -64,24 +63,24 @@ async def test_append_with_match_seq_num(self, stream: Stream):
assert output_1.next_seq_num == 2

async def test_append_with_timestamp(self, stream: Stream):
timestamp_1 = int(time.time())
timestamp_0 = int(time.time())
await asyncio.sleep(0.1)
timestamp_2 = int(time.time())
timestamp_1 = int(time.time())

input = AppendInput(
records=[
Record(body=b"record-0", timestamp=timestamp_1),
Record(body=b"record-1", timestamp=timestamp_2),
Record(body=b"record-0", timestamp=timestamp_0),
Record(body=b"record-1", timestamp=timestamp_1),
]
)
output = await stream.append(input)

assert output.start_seq_num == 0
assert output.start_timestamp == timestamp_1
assert output.start_timestamp == timestamp_0
assert output.end_seq_num == 2
assert output.end_timestamp == timestamp_2
assert output.end_timestamp == timestamp_1
assert output.next_seq_num == 2
assert output.last_timestamp == timestamp_2
assert output.last_timestamp == timestamp_1

async def test_read_from_seq_num_zero(self, stream: Stream):
await stream.append(
Expand Down Expand Up @@ -133,6 +132,29 @@ async def test_read_from_tail_offset(self, stream: Stream):
assert records[0].body == b"record-3"
assert records[1].body == b"record-4"

async def test_read_until_timestamp(self, stream: Stream):
timestamp_0 = int(time.time() * 1000)
await asyncio.sleep(0.2)
timestamp_1 = int(time.time() * 1000)
await asyncio.sleep(0.2)
timestamp_2 = int(time.time() * 1000)

await stream.append(
AppendInput(
records=[
Record(body=b"record-0", timestamp=timestamp_0),
Record(body=b"record-1", timestamp=timestamp_1),
Record(body=b"record-2", timestamp=timestamp_2),
]
)
)

records = await stream.read(start=Timestamp(timestamp_0), until=timestamp_2)
assert isinstance(records, list)
assert len(records) == 2
assert records[0].timestamp == timestamp_0
assert records[1].timestamp == timestamp_1

async def test_read_beyond_tail(self, stream: Stream):
await stream.append(
AppendInput(records=[Record(body=f"record-{i}".encode()) for i in range(5)])
Expand Down Expand Up @@ -188,7 +210,9 @@ async def producer():
producer_task = asyncio.create_task(producer())

try:
async for output in stream.read_session(start=SeqNum(tail.next_seq_num)):
async for output in stream.read_session(
start=SeqNum(tail.next_seq_num), clamp=True
):
if isinstance(output, list) and len(output) > 0:
assert output[0].body == b"record-0"
break
Expand Down