diff --git a/tests/test_stream_ops.py b/tests/test_stream_ops.py index 7792e24..a603df7 100644 --- a/tests/test_stream_ops.py +++ b/tests/test_stream_ops.py @@ -17,7 +17,6 @@ from streamstore.utils import metered_bytes -@pytest.mark.asyncio(loop_scope="session") @pytest.mark.stream class TestStreamOperations: async def test_check_tail_empty_stream(self, stream: Stream): @@ -64,24 +63,24 @@ async def test_append_with_match_seq_num(self, stream: Stream): assert output_1.next_seq_num == 2 async def test_append_with_timestamp(self, stream: Stream): - timestamp_1 = int(time.time()) + timestamp_0 = int(time.time()) await asyncio.sleep(0.1) - timestamp_2 = int(time.time()) + timestamp_1 = int(time.time()) input = AppendInput( records=[ - Record(body=b"record-0", timestamp=timestamp_1), - Record(body=b"record-1", timestamp=timestamp_2), + Record(body=b"record-0", timestamp=timestamp_0), + Record(body=b"record-1", timestamp=timestamp_1), ] ) output = await stream.append(input) assert output.start_seq_num == 0 - assert output.start_timestamp == timestamp_1 + assert output.start_timestamp == timestamp_0 assert output.end_seq_num == 2 - assert output.end_timestamp == timestamp_2 + assert output.end_timestamp == timestamp_1 assert output.next_seq_num == 2 - assert output.last_timestamp == timestamp_2 + assert output.last_timestamp == timestamp_1 async def test_read_from_seq_num_zero(self, stream: Stream): await stream.append( @@ -133,6 +132,29 @@ async def test_read_from_tail_offset(self, stream: Stream): assert records[0].body == b"record-3" assert records[1].body == b"record-4" + async def test_read_until_timestamp(self, stream: Stream): + timestamp_0 = int(time.time() * 1000) + await asyncio.sleep(0.2) + timestamp_1 = int(time.time() * 1000) + await asyncio.sleep(0.2) + timestamp_2 = int(time.time() * 1000) + + await stream.append( + AppendInput( + records=[ + Record(body=b"record-0", timestamp=timestamp_0), + Record(body=b"record-1", timestamp=timestamp_1), + Record(body=b"record-2", timestamp=timestamp_2), + ] + ) + ) + + records = await stream.read(start=Timestamp(timestamp_0), until=timestamp_2) + assert isinstance(records, list) + assert len(records) == 2 + assert records[0].timestamp == timestamp_0 + assert records[1].timestamp == timestamp_1 + async def test_read_beyond_tail(self, stream: Stream): await stream.append( AppendInput(records=[Record(body=f"record-{i}".encode()) for i in range(5)]) @@ -188,7 +210,9 @@ async def producer(): producer_task = asyncio.create_task(producer()) try: - async for output in stream.read_session(start=SeqNum(tail.next_seq_num)): + async for output in stream.read_session( + start=SeqNum(tail.next_seq_num), clamp=True + ): if isinstance(output, list) and len(output) > 0: assert output[0].body == b"record-0" break