From 4ab7877d3bbc3d2890010e6013f776db52f76c44 Mon Sep 17 00:00:00 2001
From: Chandra Sirimala
Date: Wed, 21 Jan 2026 14:48:09 +0000
Subject: [PATCH 1/3] chore: move system tests to a single event loop

---
 tests/system/test_zonal.py | 661 ++++++++++++++++++++-----------------
 1 file changed, 359 insertions(+), 302 deletions(-)

diff --git a/tests/system/test_zonal.py b/tests/system/test_zonal.py
index b5942365a..e0ac817ce 100644
--- a/tests/system/test_zonal.py
+++ b/tests/system/test_zonal.py
@@ -2,6 +2,7 @@
 import os
 import uuid
 from io import BytesIO
+import asyncio
 
 # python additional imports
 import google_crc32c
@@ -33,12 +34,55 @@
 _BYTES_TO_UPLOAD = b"dummy_bytes_to_write_read_and_delete_appendable_object"
 
 
+async def create_async_grpc_client(attempt_direct_path=True):
+    """Initializes and returns an async gRPC client."""
+    return AsyncGrpcClient(attempt_direct_path=attempt_direct_path).grpc_client
+
+
+@pytest.fixture(scope="session")
+def event_loop():
+    """Redefine pytest-asyncio's event_loop fixture to be session-scoped."""
+    loop = asyncio.new_event_loop()
+    yield loop
+    loop.close()
+
+
+@pytest.fixture(scope="session")
+def grpc_clients(event_loop):
+
+    # grpc clients have to be instantiated in the event loop; otherwise grpc
+    # creates its own event loop and attaches it to the client. That leads to
+    # a deadlock, because the client runs in one event loop while the MRD or
+    # Appendable-Writer runs in another.
+    # https://github.com/grpc/grpc/blob/61fe9b40a986792ab7d4eb8924027b671faf26ba/src/python/grpcio/grpc/aio/_channel.py#L369
+    # https://github.com/grpc/grpc/blob/61fe9b40a986792ab7d4eb8924027b671faf26ba/src/python/grpcio/grpc/_cython/_cygrpc/aio/common.pyx.pxi#L249
+    clients = {
+        True: event_loop.run_until_complete(
+            create_async_grpc_client(attempt_direct_path=True)
+        ),
+        False: event_loop.run_until_complete(
+            create_async_grpc_client(attempt_direct_path=False)
+        ),
+    }
+    return clients
+
+
+# This fixture is for tests that are NOT parametrized by attempt_direct_path
+@pytest.fixture
+def grpc_client(grpc_clients):
+    return grpc_clients[False]
+
+
+@pytest.fixture
+def grpc_client_direct(grpc_clients):
+    return grpc_clients[True]
+
+
 def _get_equal_dist(a: int, b: int) -> tuple[int, int]:
     step = (b - a) // 3
     return a + step, a + 2 * step
 
 
-@pytest.mark.asyncio
 @pytest.mark.parametrize(
     "object_size",
     [
@@ -51,47 +95,46 @@
     "attempt_direct_path",
     [True, False],
 )
-async def test_basic_wrd(
-    storage_client, blobs_to_delete, attempt_direct_path, object_size
+def test_basic_wrd(
+    storage_client,
+    blobs_to_delete,
+    attempt_direct_path,
+    object_size,
+    event_loop,
+    grpc_clients,
 ):
     object_name = f"test_basic_wrd-{str(uuid.uuid4())}"
-    # Client instantiation; it cannot be part of fixture because.
-    # grpc_client's event loop and event loop of coroutine running it
-    # (i.e. this test) must be same.
-    # Note:
-    # 1. @pytest.mark.asyncio ensures new event loop for each test.
-    # 2. we can keep the same event loop for entire module but that may
-    # create issues if tests are run in parallel and one test hogs the event
-    # loop slowing down other tests.
- object_data = os.urandom(object_size) - object_checksum = google_crc32c.value(object_data) - grpc_client = AsyncGrpcClient(attempt_direct_path=attempt_direct_path).grpc_client - - writer = AsyncAppendableObjectWriter(grpc_client, _ZONAL_BUCKET, object_name) - await writer.open() - await writer.append(object_data) - object_metadata = await writer.close(finalize_on_close=True) - assert object_metadata.size == object_size - assert int(object_metadata.checksums.crc32c) == object_checksum - - mrd = AsyncMultiRangeDownloader(grpc_client, _ZONAL_BUCKET, object_name) - buffer = BytesIO() - await mrd.open() - # (0, 0) means read the whole object - await mrd.download_ranges([(0, 0, buffer)]) - await mrd.close() - assert buffer.getvalue() == object_data - assert mrd.persisted_size == object_size - - # Clean up; use json client (i.e. `storage_client` fixture) to delete. - blobs_to_delete.append(storage_client.bucket(_ZONAL_BUCKET).blob(object_name)) - del writer - del mrd - gc.collect() - - -@pytest.mark.asyncio + async def _run(): + object_data = os.urandom(object_size) + object_checksum = google_crc32c.value(object_data) + grpc_client = grpc_clients[attempt_direct_path] + + writer = AsyncAppendableObjectWriter(grpc_client, _ZONAL_BUCKET, object_name) + await writer.open() + await writer.append(object_data) + object_metadata = await writer.close(finalize_on_close=True) + assert object_metadata.size == object_size + assert int(object_metadata.checksums.crc32c) == object_checksum + + mrd = AsyncMultiRangeDownloader(grpc_client, _ZONAL_BUCKET, object_name) + buffer = BytesIO() + await mrd.open() + # (0, 0) means read the whole object + await mrd.download_ranges([(0, 0, buffer)]) + await mrd.close() + assert buffer.getvalue() == object_data + assert mrd.persisted_size == object_size + + # Clean up; use json client (i.e. `storage_client` fixture) to delete. + blobs_to_delete.append(storage_client.bucket(_ZONAL_BUCKET).blob(object_name)) + del writer + del mrd + gc.collect() + + event_loop.run_until_complete(_run()) + + @pytest.mark.parametrize( "object_size", [ @@ -100,48 +143,43 @@ async def test_basic_wrd( 20 * 1024 * 1024, # greater than _MAX_BUFFER_SIZE_BYTES ], ) -async def test_basic_wrd_in_slices(storage_client, blobs_to_delete, object_size): +def test_basic_wrd_in_slices( + storage_client, blobs_to_delete, object_size, event_loop, grpc_client +): object_name = f"test_basic_wrd-{str(uuid.uuid4())}" - # Client instantiation; it cannot be part of fixture because. - # grpc_client's event loop and event loop of coroutine running it - # (i.e. this test) must be same. - # Note: - # 1. @pytest.mark.asyncio ensures new event loop for each test. - # 2. we can keep the same event loop for entire module but that may - # create issues if tests are run in parallel and one test hogs the event - # loop slowing down other tests. 
- object_data = os.urandom(object_size) - object_checksum = google_crc32c.value(object_data) - grpc_client = AsyncGrpcClient().grpc_client - - writer = AsyncAppendableObjectWriter(grpc_client, _ZONAL_BUCKET, object_name) - await writer.open() - mark1, mark2 = _get_equal_dist(0, object_size) - await writer.append(object_data[0:mark1]) - await writer.append(object_data[mark1:mark2]) - await writer.append(object_data[mark2:]) - object_metadata = await writer.close(finalize_on_close=True) - assert object_metadata.size == object_size - assert int(object_metadata.checksums.crc32c) == object_checksum - - mrd = AsyncMultiRangeDownloader(grpc_client, _ZONAL_BUCKET, object_name) - buffer = BytesIO() - await mrd.open() - # (0, 0) means read the whole object - await mrd.download_ranges([(0, 0, buffer)]) - await mrd.close() - assert buffer.getvalue() == object_data - assert mrd.persisted_size == object_size - - # Clean up; use json client (i.e. `storage_client` fixture) to delete. - blobs_to_delete.append(storage_client.bucket(_ZONAL_BUCKET).blob(object_name)) - del writer - del mrd - gc.collect() - - -@pytest.mark.asyncio + async def _run(): + object_data = os.urandom(object_size) + object_checksum = google_crc32c.value(object_data) + + writer = AsyncAppendableObjectWriter(grpc_client, _ZONAL_BUCKET, object_name) + await writer.open() + mark1, mark2 = _get_equal_dist(0, object_size) + await writer.append(object_data[0:mark1]) + await writer.append(object_data[mark1:mark2]) + await writer.append(object_data[mark2:]) + object_metadata = await writer.close(finalize_on_close=True) + assert object_metadata.size == object_size + assert int(object_metadata.checksums.crc32c) == object_checksum + + mrd = AsyncMultiRangeDownloader(grpc_client, _ZONAL_BUCKET, object_name) + buffer = BytesIO() + await mrd.open() + # (0, 0) means read the whole object + await mrd.download_ranges([(0, 0, buffer)]) + await mrd.close() + assert buffer.getvalue() == object_data + assert mrd.persisted_size == object_size + + # Clean up; use json client (i.e. `storage_client` fixture) to delete. + blobs_to_delete.append(storage_client.bucket(_ZONAL_BUCKET).blob(object_name)) + del writer + del mrd + gc.collect() + + event_loop.run_until_complete(_run()) + + @pytest.mark.parametrize( "flush_interval", [ @@ -151,219 +189,233 @@ async def test_basic_wrd_in_slices(storage_client, blobs_to_delete, object_size) _DEFAULT_FLUSH_INTERVAL_BYTES, ], ) -async def test_wrd_with_non_default_flush_interval( +def test_wrd_with_non_default_flush_interval( storage_client, blobs_to_delete, flush_interval, + event_loop, + grpc_client, ): object_name = f"test_basic_wrd-{str(uuid.uuid4())}" object_size = 9 * 1024 * 1024 - # Client instantiation; it cannot be part of fixture because. - # grpc_client's event loop and event loop of coroutine running it - # (i.e. this test) must be same. - # Note: - # 1. @pytest.mark.asyncio ensures new event loop for each test. - # 2. we can keep the same event loop for entire module but that may - # create issues if tests are run in parallel and one test hogs the event - # loop slowing down other tests. 
- object_data = os.urandom(object_size) - object_checksum = google_crc32c.value(object_data) - grpc_client = AsyncGrpcClient().grpc_client - - writer = AsyncAppendableObjectWriter( - grpc_client, - _ZONAL_BUCKET, - object_name, - writer_options={"FLUSH_INTERVAL_BYTES": flush_interval}, - ) - await writer.open() - mark1, mark2 = _get_equal_dist(0, object_size) - await writer.append(object_data[0:mark1]) - await writer.append(object_data[mark1:mark2]) - await writer.append(object_data[mark2:]) - object_metadata = await writer.close(finalize_on_close=True) - assert object_metadata.size == object_size - assert int(object_metadata.checksums.crc32c) == object_checksum - - mrd = AsyncMultiRangeDownloader(grpc_client, _ZONAL_BUCKET, object_name) - buffer = BytesIO() - await mrd.open() - # (0, 0) means read the whole object - await mrd.download_ranges([(0, 0, buffer)]) - await mrd.close() - assert buffer.getvalue() == object_data - assert mrd.persisted_size == object_size - - # Clean up; use json client (i.e. `storage_client` fixture) to delete. - blobs_to_delete.append(storage_client.bucket(_ZONAL_BUCKET).blob(object_name)) - del writer - del mrd - gc.collect() - - -@pytest.mark.asyncio -async def test_read_unfinalized_appendable_object(storage_client, blobs_to_delete): + async def _run(): + object_data = os.urandom(object_size) + object_checksum = google_crc32c.value(object_data) + + writer = AsyncAppendableObjectWriter( + grpc_client, + _ZONAL_BUCKET, + object_name, + writer_options={"FLUSH_INTERVAL_BYTES": flush_interval}, + ) + await writer.open() + mark1, mark2 = _get_equal_dist(0, object_size) + await writer.append(object_data[0:mark1]) + await writer.append(object_data[mark1:mark2]) + await writer.append(object_data[mark2:]) + object_metadata = await writer.close(finalize_on_close=True) + assert object_metadata.size == object_size + assert int(object_metadata.checksums.crc32c) == object_checksum + + mrd = AsyncMultiRangeDownloader(grpc_client, _ZONAL_BUCKET, object_name) + buffer = BytesIO() + await mrd.open() + # (0, 0) means read the whole object + await mrd.download_ranges([(0, 0, buffer)]) + await mrd.close() + assert buffer.getvalue() == object_data + assert mrd.persisted_size == object_size + + # Clean up; use json client (i.e. `storage_client` fixture) to delete. + blobs_to_delete.append(storage_client.bucket(_ZONAL_BUCKET).blob(object_name)) + del writer + del mrd + gc.collect() + + event_loop.run_until_complete(_run()) + + +def test_read_unfinalized_appendable_object( + storage_client, blobs_to_delete, event_loop, grpc_client_direct +): object_name = f"read_unfinalized_appendable_object-{str(uuid.uuid4())[:4]}" - grpc_client = AsyncGrpcClient(attempt_direct_path=True).grpc_client - - writer = AsyncAppendableObjectWriter(grpc_client, _ZONAL_BUCKET, object_name) - await writer.open() - await writer.append(_BYTES_TO_UPLOAD) - await writer.flush() - - mrd = AsyncMultiRangeDownloader(grpc_client, _ZONAL_BUCKET, object_name) - buffer = BytesIO() - await mrd.open() - assert mrd.persisted_size == len(_BYTES_TO_UPLOAD) - # (0, 0) means read the whole object - await mrd.download_ranges([(0, 0, buffer)]) - await mrd.close() - assert buffer.getvalue() == _BYTES_TO_UPLOAD - - # Clean up; use json client (i.e. `storage_client` fixture) to delete. 
- blobs_to_delete.append(storage_client.bucket(_ZONAL_BUCKET).blob(object_name)) - del writer - del mrd - gc.collect() - - -@pytest.mark.asyncio -async def test_mrd_open_with_read_handle(): - grpc_client = AsyncGrpcClient().grpc_client + + async def _run(): + grpc_client = grpc_client_direct + writer = AsyncAppendableObjectWriter(grpc_client, _ZONAL_BUCKET, object_name) + await writer.open() + await writer.append(_BYTES_TO_UPLOAD) + await writer.flush() + + mrd = AsyncMultiRangeDownloader(grpc_client, _ZONAL_BUCKET, object_name) + buffer = BytesIO() + await mrd.open() + assert mrd.persisted_size == len(_BYTES_TO_UPLOAD) + # (0, 0) means read the whole object + await mrd.download_ranges([(0, 0, buffer)]) + await mrd.close() + assert buffer.getvalue() == _BYTES_TO_UPLOAD + + # Clean up; use json client (i.e. `storage_client` fixture) to delete. + blobs_to_delete.append(storage_client.bucket(_ZONAL_BUCKET).blob(object_name)) + del writer + del mrd + gc.collect() + + event_loop.run_until_complete(_run()) + + +@pytest.mark.skip(reason="failing") +def test_mrd_open_with_read_handle(event_loop, grpc_client): object_name = f"test_read_handl-{str(uuid.uuid4())[:4]}" - writer = AsyncAppendableObjectWriter(grpc_client, _ZONAL_BUCKET, object_name) - await writer.open() - await writer.append(_BYTES_TO_UPLOAD) - await writer.close() - - mrd = AsyncMultiRangeDownloader(grpc_client, _ZONAL_BUCKET, object_name) - await mrd.open() - read_handle = mrd.read_handle - await mrd.close() - - # Open a new MRD using the `read_handle` obtained above - new_mrd = AsyncMultiRangeDownloader( - grpc_client, _ZONAL_BUCKET, object_name, read_handle=read_handle - ) - await new_mrd.open() - # persisted_size not set when opened with read_handle - assert new_mrd.persisted_size is None - buffer = BytesIO() - await new_mrd.download_ranges([(0, 0, buffer)]) - await new_mrd.close() - assert buffer.getvalue() == _BYTES_TO_UPLOAD - del mrd - del new_mrd - gc.collect() - - -@pytest.mark.asyncio -async def test_read_unfinalized_appendable_object_with_generation( - storage_client, blobs_to_delete + + async def _run(): + writer = AsyncAppendableObjectWriter(grpc_client, _ZONAL_BUCKET, object_name) + await writer.open() + await writer.append(_BYTES_TO_UPLOAD) + await writer.close() + + mrd = AsyncMultiRangeDownloader(grpc_client, _ZONAL_BUCKET, object_name) + await mrd.open() + read_handle = mrd.read_handle + await mrd.close() + print("*" * 88) + print(mrd.read_handle) + print("*" * 88) + + # Open a new MRD using the `read_handle` obtained above + new_mrd = AsyncMultiRangeDownloader( + grpc_client, _ZONAL_BUCKET, object_name, read_handle=read_handle + ) + await new_mrd.open() + # persisted_size not set when opened with read_handle + assert new_mrd.persisted_size is None + buffer = BytesIO() + await new_mrd.download_ranges([(0, 0, buffer)]) + await new_mrd.close() + assert buffer.getvalue() == _BYTES_TO_UPLOAD + del mrd + del new_mrd + gc.collect() + + event_loop.run_until_complete(_run()) + + +def test_read_unfinalized_appendable_object_with_generation( + storage_client, blobs_to_delete, event_loop, grpc_client_direct ): object_name = f"read_unfinalized_appendable_object-{str(uuid.uuid4())[:4]}" - grpc_client = AsyncGrpcClient(attempt_direct_path=True).grpc_client + grpc_client = grpc_client_direct + + async def _run(): + async def _read_and_verify(expected_content, generation=None): + """Helper to read object content and verify against expected.""" + mrd = AsyncMultiRangeDownloader( + grpc_client, _ZONAL_BUCKET, object_name, generation 
+ ) + buffer = BytesIO() + await mrd.open() + try: + assert mrd.persisted_size == len(expected_content) + await mrd.download_ranges([(0, 0, buffer)]) + assert buffer.getvalue() == expected_content + finally: + await mrd.close() + return mrd + + # First write + writer = AsyncAppendableObjectWriter(grpc_client, _ZONAL_BUCKET, object_name) + await writer.open() + await writer.append(_BYTES_TO_UPLOAD) + await writer.flush() + generation = writer.generation + + # First read + mrd = await _read_and_verify(_BYTES_TO_UPLOAD) - async def _read_and_verify(expected_content, generation=None): - """Helper to read object content and verify against expected.""" - mrd = AsyncMultiRangeDownloader( - grpc_client, _ZONAL_BUCKET, object_name, generation + # Second write, using generation from the first write. + writer_2 = AsyncAppendableObjectWriter( + grpc_client, _ZONAL_BUCKET, object_name, generation=generation ) - buffer = BytesIO() - await mrd.open() - try: - assert mrd.persisted_size == len(expected_content) - await mrd.download_ranges([(0, 0, buffer)]) - assert buffer.getvalue() == expected_content - finally: - await mrd.close() - return mrd - - # First write - writer = AsyncAppendableObjectWriter(grpc_client, _ZONAL_BUCKET, object_name) - await writer.open() - await writer.append(_BYTES_TO_UPLOAD) - await writer.flush() - generation = writer.generation - - # First read - mrd = await _read_and_verify(_BYTES_TO_UPLOAD) - - # Second write, using generation from the first write. - writer_2 = AsyncAppendableObjectWriter( - grpc_client, _ZONAL_BUCKET, object_name, generation=generation - ) - await writer_2.open() - await writer_2.append(_BYTES_TO_UPLOAD) - await writer_2.flush() - - # Second read - mrd_2 = await _read_and_verify(_BYTES_TO_UPLOAD + _BYTES_TO_UPLOAD, generation) - - # Clean up - blobs_to_delete.append(storage_client.bucket(_ZONAL_BUCKET).blob(object_name)) - del writer - del writer_2 - del mrd - del mrd_2 - gc.collect() - - -@pytest.mark.asyncio -async def test_append_flushes_and_state_lookup(storage_client, blobs_to_delete): + await writer_2.open() + await writer_2.append(_BYTES_TO_UPLOAD) + await writer_2.flush() + + # Second read + mrd_2 = await _read_and_verify(_BYTES_TO_UPLOAD + _BYTES_TO_UPLOAD, generation) + + # Clean up + blobs_to_delete.append(storage_client.bucket(_ZONAL_BUCKET).blob(object_name)) + del writer + del writer_2 + del mrd + del mrd_2 + gc.collect() + + event_loop.run_until_complete(_run()) + + +def test_append_flushes_and_state_lookup( + storage_client, blobs_to_delete, event_loop, grpc_client +): """ System test for AsyncAppendableObjectWriter, verifying flushing behavior for both small and large appends. 
""" object_name = f"test-append-flush-varied-size-{uuid.uuid4()}" - grpc_client = AsyncGrpcClient().grpc_client - writer = AsyncAppendableObjectWriter(grpc_client, _ZONAL_BUCKET, object_name) - # Schedule for cleanup - blobs_to_delete.append(storage_client.bucket(_ZONAL_BUCKET).blob(object_name)) + async def _run(): + writer = AsyncAppendableObjectWriter(grpc_client, _ZONAL_BUCKET, object_name) - # --- Part 1: Test with small data --- - small_data = b"small data" + # Schedule for cleanup + blobs_to_delete.append(storage_client.bucket(_ZONAL_BUCKET).blob(object_name)) - await writer.open() - assert writer._is_stream_open + # --- Part 1: Test with small data --- + small_data = b"small data" - await writer.append(small_data) - persisted_size = await writer.state_lookup() - assert persisted_size == len(small_data) + await writer.open() + assert writer._is_stream_open - # --- Part 2: Test with large data --- - large_data = os.urandom(38 * 1024 * 1024) + await writer.append(small_data) + persisted_size = await writer.state_lookup() + assert persisted_size == len(small_data) - # Append data larger than the default flush interval (16 MiB). - # This should trigger the interval-based flushing logic. - await writer.append(large_data) + # --- Part 2: Test with large data --- + large_data = os.urandom(38 * 1024 * 1024) - # Verify the total data has been persisted. - total_size = len(small_data) + len(large_data) - persisted_size = await writer.state_lookup() - assert persisted_size == total_size + # Append data larger than the default flush interval (16 MiB). + # This should trigger the interval-based flushing logic. + await writer.append(large_data) - # --- Part 3: Finalize and verify --- - final_object = await writer.close(finalize_on_close=True) + # Verify the total data has been persisted. + total_size = len(small_data) + len(large_data) + persisted_size = await writer.state_lookup() + assert persisted_size == total_size - assert not writer._is_stream_open - assert final_object.size == total_size + # --- Part 3: Finalize and verify --- + final_object = await writer.close(finalize_on_close=True) - # Verify the full content of the object. - full_data = small_data + large_data - mrd = AsyncMultiRangeDownloader(grpc_client, _ZONAL_BUCKET, object_name) - buffer = BytesIO() - await mrd.open() - # (0, 0) means read the whole object - await mrd.download_ranges([(0, 0, buffer)]) - await mrd.close() - content = buffer.getvalue() - assert content == full_data + assert not writer._is_stream_open + assert final_object.size == total_size -@pytest.mark.asyncio -async def test_open_with_generation_zero(storage_client, blobs_to_delete): + # Verify the full content of the object. + full_data = small_data + large_data + mrd = AsyncMultiRangeDownloader(grpc_client, _ZONAL_BUCKET, object_name) + buffer = BytesIO() + await mrd.open() + # (0, 0) means read the whole object + await mrd.download_ranges([(0, 0, buffer)]) + await mrd.close() + content = buffer.getvalue() + assert content == full_data + + event_loop.run_until_complete(_run()) + + +def test_open_with_generation_zero( + storage_client, blobs_to_delete, event_loop, grpc_client +): """Tests that using `generation=0` fails if the object already exists. This test verifies that: @@ -373,62 +425,67 @@ async def test_open_with_generation_zero(storage_client, blobs_to_delete): precondition (object must not exist) is not met. 
""" object_name = f"test_append_with_generation-{uuid.uuid4()}" - grpc_client = AsyncGrpcClient().grpc_client - writer = AsyncAppendableObjectWriter(grpc_client, _ZONAL_BUCKET, object_name, generation=0) - # Empty object is created. - await writer.open() - assert writer.is_stream_open - - await writer.close() - assert not writer.is_stream_open - - - with pytest.raises(FailedPrecondition) as exc_info: + async def _run(): writer = AsyncAppendableObjectWriter( grpc_client, _ZONAL_BUCKET, object_name, generation=0 ) + + # Empty object is created. await writer.open() - assert exc_info.value.code == 400 + assert writer.is_stream_open + + await writer.close() + assert not writer.is_stream_open + + with pytest.raises(FailedPrecondition) as exc_info: + writer_fail = AsyncAppendableObjectWriter( + grpc_client, _ZONAL_BUCKET, object_name, generation=0 + ) + await writer_fail.open() + assert exc_info.value.code == 400 - # cleanup - del writer - gc.collect() + # cleanup + del writer + gc.collect() - blobs_to_delete.append(storage_client.bucket(_ZONAL_BUCKET).blob(object_name)) + blobs_to_delete.append(storage_client.bucket(_ZONAL_BUCKET).blob(object_name)) -@pytest.mark.asyncio -async def test_open_existing_object_with_gen_None_overrides_existing(storage_client, blobs_to_delete): + event_loop.run_until_complete(_run()) + + +def test_open_existing_object_with_gen_None_overrides_existing( + storage_client, blobs_to_delete, event_loop, grpc_client +): """ Test that a new writer when specifies `None` overrides the existing object. """ object_name = f"test_append_with_generation-{uuid.uuid4()}" - grpc_client = AsyncGrpcClient().grpc_client - writer = AsyncAppendableObjectWriter(grpc_client, _ZONAL_BUCKET, object_name, generation=0) - - # Empty object is created. - await writer.open() - assert writer.is_stream_open - old_gen = writer.generation - - - await writer.close() - assert not writer.is_stream_open + async def _run(): + writer = AsyncAppendableObjectWriter( + grpc_client, _ZONAL_BUCKET, object_name, generation=0 + ) + # Empty object is created. 
+        await writer.open()
+        assert writer.is_stream_open
+        old_gen = writer.generation
+        await writer.close()
+        assert not writer.is_stream_open
 
-    new_writer = AsyncAppendableObjectWriter(
+        new_writer = AsyncAppendableObjectWriter(
             grpc_client, _ZONAL_BUCKET, object_name, generation=None
-    )
-    await new_writer.open()
-    assert new_writer.generation != old_gen
+        )
+        await new_writer.open()
+        assert new_writer.generation != old_gen
 
-    # assert exc_info.value.code == 400
-
-    # cleanup
-    del writer
-    del new_writer
-    gc.collect()
+        # cleanup
+        del writer
+        del new_writer
+        gc.collect()
 
-    blobs_to_delete.append(storage_client.bucket(_ZONAL_BUCKET).blob(object_name))
\ No newline at end of file
+        blobs_to_delete.append(storage_client.bucket(_ZONAL_BUCKET).blob(object_name))
+
+    event_loop.run_until_complete(_run())

From 44390399c5ca25327ba8104c19d8db5109925493 Mon Sep 17 00:00:00 2001
From: Chandra Sirimala
Date: Wed, 21 Jan 2026 15:27:46 +0000
Subject: [PATCH 2/3] open with read handle works only in direct_path

---
 tests/system/open_with_read_handle.py | 40 ++++++++++++++++++++++++
 tests/system/test_zonal.py            | 45 ++++++++++++++++++++++-----
 2 files changed, 78 insertions(+), 7 deletions(-)
 create mode 100644 tests/system/open_with_read_handle.py

diff --git a/tests/system/open_with_read_handle.py b/tests/system/open_with_read_handle.py
new file mode 100644
index 000000000..a8d4fe1c4
--- /dev/null
+++ b/tests/system/open_with_read_handle.py
@@ -0,0 +1,40 @@
+# py standard imports
+import os
+from io import BytesIO
+
+# current library imports
+from google.cloud.storage._experimental.asyncio.async_grpc_client import AsyncGrpcClient
+from google.cloud.storage._experimental.asyncio.async_multi_range_downloader import (
+    AsyncMultiRangeDownloader,
+)
+
+# TODO: replace this with a fixture once zonal bucket creation / deletion
+# is supported in the grpc client or the json client.
+_ZONAL_BUCKET = 'chandrasiri-rs' +_BYTES_TO_UPLOAD = b"dummy_bytes_to_write_read_and_delete_appendable_object" + + +async def mrd_open_with_read_handle(appendable_object): + grpc_client = AsyncGrpcClient(attempt_direct_path=True).grpc_client + + mrd = AsyncMultiRangeDownloader(grpc_client, _ZONAL_BUCKET, appendable_object) + await mrd.open() + read_handle = mrd.read_handle + await mrd.close() + + # Open a new MRD using the `read_handle` obtained above + new_mrd = AsyncMultiRangeDownloader( + grpc_client, _ZONAL_BUCKET, appendable_object, read_handle=read_handle + ) + await new_mrd.open() + # persisted_size not set when opened with read_handle + assert new_mrd.persisted_size is None + buffer = BytesIO() + await new_mrd.download_ranges([(0, 0, buffer)]) + await new_mrd.close() + assert buffer.getvalue() == _BYTES_TO_UPLOAD + +if __name__ == "__main__": + import asyncio + asyncio.run(mrd_open_with_read_handle('read_handle_123')) + diff --git a/tests/system/test_zonal.py b/tests/system/test_zonal.py index e0ac817ce..b5291ca08 100644 --- a/tests/system/test_zonal.py +++ b/tests/system/test_zonal.py @@ -266,8 +266,41 @@ async def _run(): event_loop.run_until_complete(_run()) -@pytest.mark.skip(reason="failing") -def test_mrd_open_with_read_handle(event_loop, grpc_client): +def test_mrd_open_with_read_handle(event_loop, grpc_client_direct): + object_name = f"test_read_handl-{str(uuid.uuid4())[:4]}" + + async def _run(): + writer = AsyncAppendableObjectWriter( + grpc_client_direct, _ZONAL_BUCKET, object_name + ) + await writer.open() + await writer.append(_BYTES_TO_UPLOAD) + await writer.close() + + mrd = AsyncMultiRangeDownloader(grpc_client_direct, _ZONAL_BUCKET, object_name) + await mrd.open() + read_handle = mrd.read_handle + await mrd.close() + + # Open a new MRD using the `read_handle` obtained above + new_mrd = AsyncMultiRangeDownloader( + grpc_client_direct, _ZONAL_BUCKET, object_name, read_handle=read_handle + ) + await new_mrd.open() + # persisted_size not set when opened with read_handle + assert new_mrd.persisted_size is None + buffer = BytesIO() + await new_mrd.download_ranges([(0, 0, buffer)]) + await new_mrd.close() + assert buffer.getvalue() == _BYTES_TO_UPLOAD + del mrd + del new_mrd + gc.collect() + + event_loop.run_until_complete(_run()) + + +def test_mrd_open_with_read_handle_over_cloud_path(event_loop, grpc_client): object_name = f"test_read_handl-{str(uuid.uuid4())[:4]}" async def _run(): @@ -280,17 +313,15 @@ async def _run(): await mrd.open() read_handle = mrd.read_handle await mrd.close() - print("*" * 88) - print(mrd.read_handle) - print("*" * 88) # Open a new MRD using the `read_handle` obtained above new_mrd = AsyncMultiRangeDownloader( grpc_client, _ZONAL_BUCKET, object_name, read_handle=read_handle ) await new_mrd.open() - # persisted_size not set when opened with read_handle - assert new_mrd.persisted_size is None + # persisted_size is set regardless of whether we use read_handle or not + # because read_handle won't work in CLOUD_PATH. 
+        assert new_mrd.persisted_size == len(_BYTES_TO_UPLOAD)
         buffer = BytesIO()
         await new_mrd.download_ranges([(0, 0, buffer)])
         await new_mrd.close()

From 190498aa15430fd98526465f00b39079d4925940 Mon Sep 17 00:00:00 2001
From: Chandra Sirimala
Date: Wed, 21 Jan 2026 15:32:46 +0000
Subject: [PATCH 3/3] remove unwanted file

---
 tests/system/open_with_read_handle.py | 40 ---------------------------
 1 file changed, 40 deletions(-)
 delete mode 100644 tests/system/open_with_read_handle.py

diff --git a/tests/system/open_with_read_handle.py b/tests/system/open_with_read_handle.py
deleted file mode 100644
index a8d4fe1c4..000000000
--- a/tests/system/open_with_read_handle.py
+++ /dev/null
@@ -1,40 +0,0 @@
-# py standard imports
-import os
-from io import BytesIO
-
-# current library imports
-from google.cloud.storage._experimental.asyncio.async_grpc_client import AsyncGrpcClient
-from google.cloud.storage._experimental.asyncio.async_multi_range_downloader import (
-    AsyncMultiRangeDownloader,
-)
-
-# TODO: replace this with a fixture once zonal bucket creation / deletion
-# is supported in the grpc client or the json client.
-_ZONAL_BUCKET = 'chandrasiri-rs'
-_BYTES_TO_UPLOAD = b"dummy_bytes_to_write_read_and_delete_appendable_object"
-
-
-async def mrd_open_with_read_handle(appendable_object):
-    grpc_client = AsyncGrpcClient(attempt_direct_path=True).grpc_client
-
-    mrd = AsyncMultiRangeDownloader(grpc_client, _ZONAL_BUCKET, appendable_object)
-    await mrd.open()
-    read_handle = mrd.read_handle
-    await mrd.close()
-
-    # Open a new MRD using the `read_handle` obtained above
-    new_mrd = AsyncMultiRangeDownloader(
-        grpc_client, _ZONAL_BUCKET, appendable_object, read_handle=read_handle
-    )
-    await new_mrd.open()
-    # persisted_size not set when opened with read_handle
-    assert new_mrd.persisted_size is None
-    buffer = BytesIO()
-    await new_mrd.download_ranges([(0, 0, buffer)])
-    await new_mrd.close()
-    assert buffer.getvalue() == _BYTES_TO_UPLOAD
-
-if __name__ == "__main__":
-    import asyncio
-    asyncio.run(mrd_open_with_read_handle('read_handle_123'))
-
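
The fixture layout introduced in PATCH 1/3 can be reduced to a self-contained
sketch. This is a minimal illustration of the single-session-loop pattern, not
part of the diffs above; it assumes pytest is installed, and `make_client` is
an illustrative stand-in for the real `AsyncGrpcClient` factory:

    import asyncio

    import pytest


    @pytest.fixture(scope="session")
    def event_loop():
        # One loop for the entire session, shared by clients and tests alike.
        loop = asyncio.new_event_loop()
        yield loop
        loop.close()


    @pytest.fixture(scope="session")
    def client(event_loop):
        # Build the async client *inside* the session loop, so grpc.aio would
        # bind its channel to the same loop the tests later drive.
        async def make_client():
            # Stand-in for AsyncGrpcClient(...).grpc_client
            return object()

        return event_loop.run_until_complete(make_client())


    def test_shares_the_session_loop(event_loop, client):
        # Tests stay synchronous and drive the shared loop explicitly,
        # mirroring the _run() wrappers used throughout test_zonal.py.
        async def _run():
            await asyncio.sleep(0)  # await client RPCs here

        event_loop.run_until_complete(_run())

Because both fixtures are session-scoped, the client is created exactly once,
on the same loop every test coroutine runs on, which is what removes the
cross-loop deadlock described in the grpc_clients fixture comment.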