From 4ab7877d3bbc3d2890010e6013f776db52f76c44 Mon Sep 17 00:00:00 2001
From: Chandra Sirimala
Date: Wed, 21 Jan 2026 14:48:09 +0000
Subject: [PATCH 1/5] chore: move system tests to single event loop

---
 tests/system/test_zonal.py | 661 ++++++++++++++++++++-----------------
 1 file changed, 359 insertions(+), 302 deletions(-)

diff --git a/tests/system/test_zonal.py b/tests/system/test_zonal.py
index b5942365a..e0ac817ce 100644
--- a/tests/system/test_zonal.py
+++ b/tests/system/test_zonal.py
@@ -2,6 +2,7 @@
 import os
 import uuid
 from io import BytesIO
+import asyncio
 
 # python additional imports
 import google_crc32c
@@ -33,12 +34,55 @@
 
 _BYTES_TO_UPLOAD = b"dummy_bytes_to_write_read_and_delete_appendable_object"
 
 
+async def create_async_grpc_client(attempt_direct_path=True):
+    """Initializes and returns an async gRPC client."""
+    return AsyncGrpcClient(attempt_direct_path=attempt_direct_path).grpc_client
+
+
+@pytest.fixture(scope="session")
+def event_loop():
+    """Redefine pytest-asyncio's event_loop fixture to be session-scoped."""
+    loop = asyncio.new_event_loop()
+    yield loop
+    loop.close()
+
+
+@pytest.fixture(scope="session")
+def grpc_clients(event_loop):
+
+    # gRPC clients have to be instantiated inside the event loop; otherwise
+    # grpc creates its own event loop and attaches it to the client. That
+    # leads to a deadlock, with the client running in one event loop and the
+    # MRD or Appendable-Writer in another.
+    # https://github.com/grpc/grpc/blob/61fe9b40a986792ab7d4eb8924027b671faf26ba/src/python/grpcio/grpc/aio/_channel.py#L369
+    # https://github.com/grpc/grpc/blob/61fe9b40a986792ab7d4eb8924027b671faf26ba/src/python/grpcio/grpc/_cython/_cygrpc/aio/common.pyx.pxi#L249
+    clients = {
+        True: event_loop.run_until_complete(
+            create_async_grpc_client(attempt_direct_path=True)
+        ),
+        False: event_loop.run_until_complete(
+            create_async_grpc_client(attempt_direct_path=False)
+        ),
+    }
+    return clients
+
+
+# This fixture is for tests that are NOT parametrized by attempt_direct_path
+@pytest.fixture
+def grpc_client(grpc_clients):
+    return grpc_clients[False]
+
+
+@pytest.fixture
+def grpc_client_direct(grpc_clients):
+    return grpc_clients[True]
+
+
 def _get_equal_dist(a: int, b: int) -> tuple[int, int]:
     step = (b - a) // 3
     return a + step, a + 2 * step
 
 
-@pytest.mark.asyncio
 @pytest.mark.parametrize(
     "object_size",
     [
@@ -51,47 +95,46 @@
     "attempt_direct_path",
     [True, False],
 )
-async def test_basic_wrd(
-    storage_client, blobs_to_delete, attempt_direct_path, object_size
+def test_basic_wrd(
+    storage_client,
+    blobs_to_delete,
+    attempt_direct_path,
+    object_size,
+    event_loop,
+    grpc_clients,
 ):
     object_name = f"test_basic_wrd-{str(uuid.uuid4())}"
-    # Client instantiation; it cannot be part of fixture because.
-    # grpc_client's event loop and event loop of coroutine running it
-    # (i.e. this test) must be same.
-    # Note:
-    # 1. @pytest.mark.asyncio ensures new event loop for each test.
-    # 2. we can keep the same event loop for entire module but that may
-    # create issues if tests are run in parallel and one test hogs the event
-    # loop slowing down other tests.
- object_data = os.urandom(object_size) - object_checksum = google_crc32c.value(object_data) - grpc_client = AsyncGrpcClient(attempt_direct_path=attempt_direct_path).grpc_client - - writer = AsyncAppendableObjectWriter(grpc_client, _ZONAL_BUCKET, object_name) - await writer.open() - await writer.append(object_data) - object_metadata = await writer.close(finalize_on_close=True) - assert object_metadata.size == object_size - assert int(object_metadata.checksums.crc32c) == object_checksum - - mrd = AsyncMultiRangeDownloader(grpc_client, _ZONAL_BUCKET, object_name) - buffer = BytesIO() - await mrd.open() - # (0, 0) means read the whole object - await mrd.download_ranges([(0, 0, buffer)]) - await mrd.close() - assert buffer.getvalue() == object_data - assert mrd.persisted_size == object_size - - # Clean up; use json client (i.e. `storage_client` fixture) to delete. - blobs_to_delete.append(storage_client.bucket(_ZONAL_BUCKET).blob(object_name)) - del writer - del mrd - gc.collect() - - -@pytest.mark.asyncio + async def _run(): + object_data = os.urandom(object_size) + object_checksum = google_crc32c.value(object_data) + grpc_client = grpc_clients[attempt_direct_path] + + writer = AsyncAppendableObjectWriter(grpc_client, _ZONAL_BUCKET, object_name) + await writer.open() + await writer.append(object_data) + object_metadata = await writer.close(finalize_on_close=True) + assert object_metadata.size == object_size + assert int(object_metadata.checksums.crc32c) == object_checksum + + mrd = AsyncMultiRangeDownloader(grpc_client, _ZONAL_BUCKET, object_name) + buffer = BytesIO() + await mrd.open() + # (0, 0) means read the whole object + await mrd.download_ranges([(0, 0, buffer)]) + await mrd.close() + assert buffer.getvalue() == object_data + assert mrd.persisted_size == object_size + + # Clean up; use json client (i.e. `storage_client` fixture) to delete. + blobs_to_delete.append(storage_client.bucket(_ZONAL_BUCKET).blob(object_name)) + del writer + del mrd + gc.collect() + + event_loop.run_until_complete(_run()) + + @pytest.mark.parametrize( "object_size", [ @@ -100,48 +143,43 @@ async def test_basic_wrd( 20 * 1024 * 1024, # greater than _MAX_BUFFER_SIZE_BYTES ], ) -async def test_basic_wrd_in_slices(storage_client, blobs_to_delete, object_size): +def test_basic_wrd_in_slices( + storage_client, blobs_to_delete, object_size, event_loop, grpc_client +): object_name = f"test_basic_wrd-{str(uuid.uuid4())}" - # Client instantiation; it cannot be part of fixture because. - # grpc_client's event loop and event loop of coroutine running it - # (i.e. this test) must be same. - # Note: - # 1. @pytest.mark.asyncio ensures new event loop for each test. - # 2. we can keep the same event loop for entire module but that may - # create issues if tests are run in parallel and one test hogs the event - # loop slowing down other tests. 
- object_data = os.urandom(object_size) - object_checksum = google_crc32c.value(object_data) - grpc_client = AsyncGrpcClient().grpc_client - - writer = AsyncAppendableObjectWriter(grpc_client, _ZONAL_BUCKET, object_name) - await writer.open() - mark1, mark2 = _get_equal_dist(0, object_size) - await writer.append(object_data[0:mark1]) - await writer.append(object_data[mark1:mark2]) - await writer.append(object_data[mark2:]) - object_metadata = await writer.close(finalize_on_close=True) - assert object_metadata.size == object_size - assert int(object_metadata.checksums.crc32c) == object_checksum - - mrd = AsyncMultiRangeDownloader(grpc_client, _ZONAL_BUCKET, object_name) - buffer = BytesIO() - await mrd.open() - # (0, 0) means read the whole object - await mrd.download_ranges([(0, 0, buffer)]) - await mrd.close() - assert buffer.getvalue() == object_data - assert mrd.persisted_size == object_size - - # Clean up; use json client (i.e. `storage_client` fixture) to delete. - blobs_to_delete.append(storage_client.bucket(_ZONAL_BUCKET).blob(object_name)) - del writer - del mrd - gc.collect() - - -@pytest.mark.asyncio + async def _run(): + object_data = os.urandom(object_size) + object_checksum = google_crc32c.value(object_data) + + writer = AsyncAppendableObjectWriter(grpc_client, _ZONAL_BUCKET, object_name) + await writer.open() + mark1, mark2 = _get_equal_dist(0, object_size) + await writer.append(object_data[0:mark1]) + await writer.append(object_data[mark1:mark2]) + await writer.append(object_data[mark2:]) + object_metadata = await writer.close(finalize_on_close=True) + assert object_metadata.size == object_size + assert int(object_metadata.checksums.crc32c) == object_checksum + + mrd = AsyncMultiRangeDownloader(grpc_client, _ZONAL_BUCKET, object_name) + buffer = BytesIO() + await mrd.open() + # (0, 0) means read the whole object + await mrd.download_ranges([(0, 0, buffer)]) + await mrd.close() + assert buffer.getvalue() == object_data + assert mrd.persisted_size == object_size + + # Clean up; use json client (i.e. `storage_client` fixture) to delete. + blobs_to_delete.append(storage_client.bucket(_ZONAL_BUCKET).blob(object_name)) + del writer + del mrd + gc.collect() + + event_loop.run_until_complete(_run()) + + @pytest.mark.parametrize( "flush_interval", [ @@ -151,219 +189,233 @@ async def test_basic_wrd_in_slices(storage_client, blobs_to_delete, object_size) _DEFAULT_FLUSH_INTERVAL_BYTES, ], ) -async def test_wrd_with_non_default_flush_interval( +def test_wrd_with_non_default_flush_interval( storage_client, blobs_to_delete, flush_interval, + event_loop, + grpc_client, ): object_name = f"test_basic_wrd-{str(uuid.uuid4())}" object_size = 9 * 1024 * 1024 - # Client instantiation; it cannot be part of fixture because. - # grpc_client's event loop and event loop of coroutine running it - # (i.e. this test) must be same. - # Note: - # 1. @pytest.mark.asyncio ensures new event loop for each test. - # 2. we can keep the same event loop for entire module but that may - # create issues if tests are run in parallel and one test hogs the event - # loop slowing down other tests. 
- object_data = os.urandom(object_size) - object_checksum = google_crc32c.value(object_data) - grpc_client = AsyncGrpcClient().grpc_client - - writer = AsyncAppendableObjectWriter( - grpc_client, - _ZONAL_BUCKET, - object_name, - writer_options={"FLUSH_INTERVAL_BYTES": flush_interval}, - ) - await writer.open() - mark1, mark2 = _get_equal_dist(0, object_size) - await writer.append(object_data[0:mark1]) - await writer.append(object_data[mark1:mark2]) - await writer.append(object_data[mark2:]) - object_metadata = await writer.close(finalize_on_close=True) - assert object_metadata.size == object_size - assert int(object_metadata.checksums.crc32c) == object_checksum - - mrd = AsyncMultiRangeDownloader(grpc_client, _ZONAL_BUCKET, object_name) - buffer = BytesIO() - await mrd.open() - # (0, 0) means read the whole object - await mrd.download_ranges([(0, 0, buffer)]) - await mrd.close() - assert buffer.getvalue() == object_data - assert mrd.persisted_size == object_size - - # Clean up; use json client (i.e. `storage_client` fixture) to delete. - blobs_to_delete.append(storage_client.bucket(_ZONAL_BUCKET).blob(object_name)) - del writer - del mrd - gc.collect() - - -@pytest.mark.asyncio -async def test_read_unfinalized_appendable_object(storage_client, blobs_to_delete): + async def _run(): + object_data = os.urandom(object_size) + object_checksum = google_crc32c.value(object_data) + + writer = AsyncAppendableObjectWriter( + grpc_client, + _ZONAL_BUCKET, + object_name, + writer_options={"FLUSH_INTERVAL_BYTES": flush_interval}, + ) + await writer.open() + mark1, mark2 = _get_equal_dist(0, object_size) + await writer.append(object_data[0:mark1]) + await writer.append(object_data[mark1:mark2]) + await writer.append(object_data[mark2:]) + object_metadata = await writer.close(finalize_on_close=True) + assert object_metadata.size == object_size + assert int(object_metadata.checksums.crc32c) == object_checksum + + mrd = AsyncMultiRangeDownloader(grpc_client, _ZONAL_BUCKET, object_name) + buffer = BytesIO() + await mrd.open() + # (0, 0) means read the whole object + await mrd.download_ranges([(0, 0, buffer)]) + await mrd.close() + assert buffer.getvalue() == object_data + assert mrd.persisted_size == object_size + + # Clean up; use json client (i.e. `storage_client` fixture) to delete. + blobs_to_delete.append(storage_client.bucket(_ZONAL_BUCKET).blob(object_name)) + del writer + del mrd + gc.collect() + + event_loop.run_until_complete(_run()) + + +def test_read_unfinalized_appendable_object( + storage_client, blobs_to_delete, event_loop, grpc_client_direct +): object_name = f"read_unfinalized_appendable_object-{str(uuid.uuid4())[:4]}" - grpc_client = AsyncGrpcClient(attempt_direct_path=True).grpc_client - - writer = AsyncAppendableObjectWriter(grpc_client, _ZONAL_BUCKET, object_name) - await writer.open() - await writer.append(_BYTES_TO_UPLOAD) - await writer.flush() - - mrd = AsyncMultiRangeDownloader(grpc_client, _ZONAL_BUCKET, object_name) - buffer = BytesIO() - await mrd.open() - assert mrd.persisted_size == len(_BYTES_TO_UPLOAD) - # (0, 0) means read the whole object - await mrd.download_ranges([(0, 0, buffer)]) - await mrd.close() - assert buffer.getvalue() == _BYTES_TO_UPLOAD - - # Clean up; use json client (i.e. `storage_client` fixture) to delete. 
- blobs_to_delete.append(storage_client.bucket(_ZONAL_BUCKET).blob(object_name)) - del writer - del mrd - gc.collect() - - -@pytest.mark.asyncio -async def test_mrd_open_with_read_handle(): - grpc_client = AsyncGrpcClient().grpc_client + + async def _run(): + grpc_client = grpc_client_direct + writer = AsyncAppendableObjectWriter(grpc_client, _ZONAL_BUCKET, object_name) + await writer.open() + await writer.append(_BYTES_TO_UPLOAD) + await writer.flush() + + mrd = AsyncMultiRangeDownloader(grpc_client, _ZONAL_BUCKET, object_name) + buffer = BytesIO() + await mrd.open() + assert mrd.persisted_size == len(_BYTES_TO_UPLOAD) + # (0, 0) means read the whole object + await mrd.download_ranges([(0, 0, buffer)]) + await mrd.close() + assert buffer.getvalue() == _BYTES_TO_UPLOAD + + # Clean up; use json client (i.e. `storage_client` fixture) to delete. + blobs_to_delete.append(storage_client.bucket(_ZONAL_BUCKET).blob(object_name)) + del writer + del mrd + gc.collect() + + event_loop.run_until_complete(_run()) + + +@pytest.mark.skip(reason="failing") +def test_mrd_open_with_read_handle(event_loop, grpc_client): object_name = f"test_read_handl-{str(uuid.uuid4())[:4]}" - writer = AsyncAppendableObjectWriter(grpc_client, _ZONAL_BUCKET, object_name) - await writer.open() - await writer.append(_BYTES_TO_UPLOAD) - await writer.close() - - mrd = AsyncMultiRangeDownloader(grpc_client, _ZONAL_BUCKET, object_name) - await mrd.open() - read_handle = mrd.read_handle - await mrd.close() - - # Open a new MRD using the `read_handle` obtained above - new_mrd = AsyncMultiRangeDownloader( - grpc_client, _ZONAL_BUCKET, object_name, read_handle=read_handle - ) - await new_mrd.open() - # persisted_size not set when opened with read_handle - assert new_mrd.persisted_size is None - buffer = BytesIO() - await new_mrd.download_ranges([(0, 0, buffer)]) - await new_mrd.close() - assert buffer.getvalue() == _BYTES_TO_UPLOAD - del mrd - del new_mrd - gc.collect() - - -@pytest.mark.asyncio -async def test_read_unfinalized_appendable_object_with_generation( - storage_client, blobs_to_delete + + async def _run(): + writer = AsyncAppendableObjectWriter(grpc_client, _ZONAL_BUCKET, object_name) + await writer.open() + await writer.append(_BYTES_TO_UPLOAD) + await writer.close() + + mrd = AsyncMultiRangeDownloader(grpc_client, _ZONAL_BUCKET, object_name) + await mrd.open() + read_handle = mrd.read_handle + await mrd.close() + print("*" * 88) + print(mrd.read_handle) + print("*" * 88) + + # Open a new MRD using the `read_handle` obtained above + new_mrd = AsyncMultiRangeDownloader( + grpc_client, _ZONAL_BUCKET, object_name, read_handle=read_handle + ) + await new_mrd.open() + # persisted_size not set when opened with read_handle + assert new_mrd.persisted_size is None + buffer = BytesIO() + await new_mrd.download_ranges([(0, 0, buffer)]) + await new_mrd.close() + assert buffer.getvalue() == _BYTES_TO_UPLOAD + del mrd + del new_mrd + gc.collect() + + event_loop.run_until_complete(_run()) + + +def test_read_unfinalized_appendable_object_with_generation( + storage_client, blobs_to_delete, event_loop, grpc_client_direct ): object_name = f"read_unfinalized_appendable_object-{str(uuid.uuid4())[:4]}" - grpc_client = AsyncGrpcClient(attempt_direct_path=True).grpc_client + grpc_client = grpc_client_direct + + async def _run(): + async def _read_and_verify(expected_content, generation=None): + """Helper to read object content and verify against expected.""" + mrd = AsyncMultiRangeDownloader( + grpc_client, _ZONAL_BUCKET, object_name, generation 
+ ) + buffer = BytesIO() + await mrd.open() + try: + assert mrd.persisted_size == len(expected_content) + await mrd.download_ranges([(0, 0, buffer)]) + assert buffer.getvalue() == expected_content + finally: + await mrd.close() + return mrd + + # First write + writer = AsyncAppendableObjectWriter(grpc_client, _ZONAL_BUCKET, object_name) + await writer.open() + await writer.append(_BYTES_TO_UPLOAD) + await writer.flush() + generation = writer.generation + + # First read + mrd = await _read_and_verify(_BYTES_TO_UPLOAD) - async def _read_and_verify(expected_content, generation=None): - """Helper to read object content and verify against expected.""" - mrd = AsyncMultiRangeDownloader( - grpc_client, _ZONAL_BUCKET, object_name, generation + # Second write, using generation from the first write. + writer_2 = AsyncAppendableObjectWriter( + grpc_client, _ZONAL_BUCKET, object_name, generation=generation ) - buffer = BytesIO() - await mrd.open() - try: - assert mrd.persisted_size == len(expected_content) - await mrd.download_ranges([(0, 0, buffer)]) - assert buffer.getvalue() == expected_content - finally: - await mrd.close() - return mrd - - # First write - writer = AsyncAppendableObjectWriter(grpc_client, _ZONAL_BUCKET, object_name) - await writer.open() - await writer.append(_BYTES_TO_UPLOAD) - await writer.flush() - generation = writer.generation - - # First read - mrd = await _read_and_verify(_BYTES_TO_UPLOAD) - - # Second write, using generation from the first write. - writer_2 = AsyncAppendableObjectWriter( - grpc_client, _ZONAL_BUCKET, object_name, generation=generation - ) - await writer_2.open() - await writer_2.append(_BYTES_TO_UPLOAD) - await writer_2.flush() - - # Second read - mrd_2 = await _read_and_verify(_BYTES_TO_UPLOAD + _BYTES_TO_UPLOAD, generation) - - # Clean up - blobs_to_delete.append(storage_client.bucket(_ZONAL_BUCKET).blob(object_name)) - del writer - del writer_2 - del mrd - del mrd_2 - gc.collect() - - -@pytest.mark.asyncio -async def test_append_flushes_and_state_lookup(storage_client, blobs_to_delete): + await writer_2.open() + await writer_2.append(_BYTES_TO_UPLOAD) + await writer_2.flush() + + # Second read + mrd_2 = await _read_and_verify(_BYTES_TO_UPLOAD + _BYTES_TO_UPLOAD, generation) + + # Clean up + blobs_to_delete.append(storage_client.bucket(_ZONAL_BUCKET).blob(object_name)) + del writer + del writer_2 + del mrd + del mrd_2 + gc.collect() + + event_loop.run_until_complete(_run()) + + +def test_append_flushes_and_state_lookup( + storage_client, blobs_to_delete, event_loop, grpc_client +): """ System test for AsyncAppendableObjectWriter, verifying flushing behavior for both small and large appends. 
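    An append larger than the default 16 MiB flush interval should exercise
    the interval-based flushing path as well.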
""" object_name = f"test-append-flush-varied-size-{uuid.uuid4()}" - grpc_client = AsyncGrpcClient().grpc_client - writer = AsyncAppendableObjectWriter(grpc_client, _ZONAL_BUCKET, object_name) - # Schedule for cleanup - blobs_to_delete.append(storage_client.bucket(_ZONAL_BUCKET).blob(object_name)) + async def _run(): + writer = AsyncAppendableObjectWriter(grpc_client, _ZONAL_BUCKET, object_name) - # --- Part 1: Test with small data --- - small_data = b"small data" + # Schedule for cleanup + blobs_to_delete.append(storage_client.bucket(_ZONAL_BUCKET).blob(object_name)) - await writer.open() - assert writer._is_stream_open + # --- Part 1: Test with small data --- + small_data = b"small data" - await writer.append(small_data) - persisted_size = await writer.state_lookup() - assert persisted_size == len(small_data) + await writer.open() + assert writer._is_stream_open - # --- Part 2: Test with large data --- - large_data = os.urandom(38 * 1024 * 1024) + await writer.append(small_data) + persisted_size = await writer.state_lookup() + assert persisted_size == len(small_data) - # Append data larger than the default flush interval (16 MiB). - # This should trigger the interval-based flushing logic. - await writer.append(large_data) + # --- Part 2: Test with large data --- + large_data = os.urandom(38 * 1024 * 1024) - # Verify the total data has been persisted. - total_size = len(small_data) + len(large_data) - persisted_size = await writer.state_lookup() - assert persisted_size == total_size + # Append data larger than the default flush interval (16 MiB). + # This should trigger the interval-based flushing logic. + await writer.append(large_data) - # --- Part 3: Finalize and verify --- - final_object = await writer.close(finalize_on_close=True) + # Verify the total data has been persisted. + total_size = len(small_data) + len(large_data) + persisted_size = await writer.state_lookup() + assert persisted_size == total_size - assert not writer._is_stream_open - assert final_object.size == total_size + # --- Part 3: Finalize and verify --- + final_object = await writer.close(finalize_on_close=True) - # Verify the full content of the object. - full_data = small_data + large_data - mrd = AsyncMultiRangeDownloader(grpc_client, _ZONAL_BUCKET, object_name) - buffer = BytesIO() - await mrd.open() - # (0, 0) means read the whole object - await mrd.download_ranges([(0, 0, buffer)]) - await mrd.close() - content = buffer.getvalue() - assert content == full_data + assert not writer._is_stream_open + assert final_object.size == total_size -@pytest.mark.asyncio -async def test_open_with_generation_zero(storage_client, blobs_to_delete): + # Verify the full content of the object. + full_data = small_data + large_data + mrd = AsyncMultiRangeDownloader(grpc_client, _ZONAL_BUCKET, object_name) + buffer = BytesIO() + await mrd.open() + # (0, 0) means read the whole object + await mrd.download_ranges([(0, 0, buffer)]) + await mrd.close() + content = buffer.getvalue() + assert content == full_data + + event_loop.run_until_complete(_run()) + + +def test_open_with_generation_zero( + storage_client, blobs_to_delete, event_loop, grpc_client +): """Tests that using `generation=0` fails if the object already exists. This test verifies that: @@ -373,62 +425,67 @@ async def test_open_with_generation_zero(storage_client, blobs_to_delete): precondition (object must not exist) is not met. 
""" object_name = f"test_append_with_generation-{uuid.uuid4()}" - grpc_client = AsyncGrpcClient().grpc_client - writer = AsyncAppendableObjectWriter(grpc_client, _ZONAL_BUCKET, object_name, generation=0) - # Empty object is created. - await writer.open() - assert writer.is_stream_open - - await writer.close() - assert not writer.is_stream_open - - - with pytest.raises(FailedPrecondition) as exc_info: + async def _run(): writer = AsyncAppendableObjectWriter( grpc_client, _ZONAL_BUCKET, object_name, generation=0 ) + + # Empty object is created. await writer.open() - assert exc_info.value.code == 400 + assert writer.is_stream_open + + await writer.close() + assert not writer.is_stream_open + + with pytest.raises(FailedPrecondition) as exc_info: + writer_fail = AsyncAppendableObjectWriter( + grpc_client, _ZONAL_BUCKET, object_name, generation=0 + ) + await writer_fail.open() + assert exc_info.value.code == 400 - # cleanup - del writer - gc.collect() + # cleanup + del writer + gc.collect() - blobs_to_delete.append(storage_client.bucket(_ZONAL_BUCKET).blob(object_name)) + blobs_to_delete.append(storage_client.bucket(_ZONAL_BUCKET).blob(object_name)) -@pytest.mark.asyncio -async def test_open_existing_object_with_gen_None_overrides_existing(storage_client, blobs_to_delete): + event_loop.run_until_complete(_run()) + + +def test_open_existing_object_with_gen_None_overrides_existing( + storage_client, blobs_to_delete, event_loop, grpc_client +): """ Test that a new writer when specifies `None` overrides the existing object. """ object_name = f"test_append_with_generation-{uuid.uuid4()}" - grpc_client = AsyncGrpcClient().grpc_client - writer = AsyncAppendableObjectWriter(grpc_client, _ZONAL_BUCKET, object_name, generation=0) - - # Empty object is created. - await writer.open() - assert writer.is_stream_open - old_gen = writer.generation - - - await writer.close() - assert not writer.is_stream_open + async def _run(): + writer = AsyncAppendableObjectWriter( + grpc_client, _ZONAL_BUCKET, object_name, generation=0 + ) + # Empty object is created. 
+        await writer.open()
+        assert writer.is_stream_open
+        old_gen = writer.generation
+        await writer.close()
+        assert not writer.is_stream_open
 
-    new_writer = AsyncAppendableObjectWriter(
+        new_writer = AsyncAppendableObjectWriter(
             grpc_client, _ZONAL_BUCKET, object_name, generation=None
-    )
-    await new_writer.open()
-    assert new_writer.generation != old_gen
+        )
+        await new_writer.open()
+        assert new_writer.generation != old_gen
 
-    # assert exc_info.value.code == 400
+        # cleanup
+        del writer
+        del new_writer
+        gc.collect()
 
-    # cleanup
-    del writer
-    del new_writer
-    gc.collect()
+        blobs_to_delete.append(storage_client.bucket(_ZONAL_BUCKET).blob(object_name))
 
-    blobs_to_delete.append(storage_client.bucket(_ZONAL_BUCKET).blob(object_name))
\ No newline at end of file
+    event_loop.run_until_complete(_run())

From 44390399c5ca25327ba8104c19d8db5109925493 Mon Sep 17 00:00:00 2001
From: Chandra Sirimala
Date: Wed, 21 Jan 2026 15:27:46 +0000
Subject: [PATCH 2/5] open with read handle works only in direct_path

---
 tests/system/open_with_read_handle.py | 40 ++++++++++++++++++++++++
 tests/system/test_zonal.py            | 45 ++++++++++++++++++++++-----
 2 files changed, 78 insertions(+), 7 deletions(-)
 create mode 100644 tests/system/open_with_read_handle.py

diff --git a/tests/system/open_with_read_handle.py b/tests/system/open_with_read_handle.py
new file mode 100644
index 000000000..a8d4fe1c4
--- /dev/null
+++ b/tests/system/open_with_read_handle.py
@@ -0,0 +1,40 @@
+# py standard imports
+import os
+from io import BytesIO
+
+# current library imports
+from google.cloud.storage._experimental.asyncio.async_grpc_client import AsyncGrpcClient
+from google.cloud.storage._experimental.asyncio.async_multi_range_downloader import (
+    AsyncMultiRangeDownloader,
+)
+
+# TODO: replace this with a fixture once zonal bucket creation / deletion
+# is supported in the grpc client or json client.
+_ZONAL_BUCKET = 'chandrasiri-rs' +_BYTES_TO_UPLOAD = b"dummy_bytes_to_write_read_and_delete_appendable_object" + + +async def mrd_open_with_read_handle(appendable_object): + grpc_client = AsyncGrpcClient(attempt_direct_path=True).grpc_client + + mrd = AsyncMultiRangeDownloader(grpc_client, _ZONAL_BUCKET, appendable_object) + await mrd.open() + read_handle = mrd.read_handle + await mrd.close() + + # Open a new MRD using the `read_handle` obtained above + new_mrd = AsyncMultiRangeDownloader( + grpc_client, _ZONAL_BUCKET, appendable_object, read_handle=read_handle + ) + await new_mrd.open() + # persisted_size not set when opened with read_handle + assert new_mrd.persisted_size is None + buffer = BytesIO() + await new_mrd.download_ranges([(0, 0, buffer)]) + await new_mrd.close() + assert buffer.getvalue() == _BYTES_TO_UPLOAD + +if __name__ == "__main__": + import asyncio + asyncio.run(mrd_open_with_read_handle('read_handle_123')) + diff --git a/tests/system/test_zonal.py b/tests/system/test_zonal.py index e0ac817ce..b5291ca08 100644 --- a/tests/system/test_zonal.py +++ b/tests/system/test_zonal.py @@ -266,8 +266,41 @@ async def _run(): event_loop.run_until_complete(_run()) -@pytest.mark.skip(reason="failing") -def test_mrd_open_with_read_handle(event_loop, grpc_client): +def test_mrd_open_with_read_handle(event_loop, grpc_client_direct): + object_name = f"test_read_handl-{str(uuid.uuid4())[:4]}" + + async def _run(): + writer = AsyncAppendableObjectWriter( + grpc_client_direct, _ZONAL_BUCKET, object_name + ) + await writer.open() + await writer.append(_BYTES_TO_UPLOAD) + await writer.close() + + mrd = AsyncMultiRangeDownloader(grpc_client_direct, _ZONAL_BUCKET, object_name) + await mrd.open() + read_handle = mrd.read_handle + await mrd.close() + + # Open a new MRD using the `read_handle` obtained above + new_mrd = AsyncMultiRangeDownloader( + grpc_client_direct, _ZONAL_BUCKET, object_name, read_handle=read_handle + ) + await new_mrd.open() + # persisted_size not set when opened with read_handle + assert new_mrd.persisted_size is None + buffer = BytesIO() + await new_mrd.download_ranges([(0, 0, buffer)]) + await new_mrd.close() + assert buffer.getvalue() == _BYTES_TO_UPLOAD + del mrd + del new_mrd + gc.collect() + + event_loop.run_until_complete(_run()) + + +def test_mrd_open_with_read_handle_over_cloud_path(event_loop, grpc_client): object_name = f"test_read_handl-{str(uuid.uuid4())[:4]}" async def _run(): @@ -280,17 +313,15 @@ async def _run(): await mrd.open() read_handle = mrd.read_handle await mrd.close() - print("*" * 88) - print(mrd.read_handle) - print("*" * 88) # Open a new MRD using the `read_handle` obtained above new_mrd = AsyncMultiRangeDownloader( grpc_client, _ZONAL_BUCKET, object_name, read_handle=read_handle ) await new_mrd.open() - # persisted_size not set when opened with read_handle - assert new_mrd.persisted_size is None + # persisted_size is set regardless of whether we use read_handle or not + # because read_handle won't work in CLOUD_PATH. 
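+        # (Contrast with test_mrd_open_with_read_handle above: over DirectPath
+        # the server honors the handle and leaves persisted_size unset.)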
+        assert new_mrd.persisted_size == len(_BYTES_TO_UPLOAD)
         buffer = BytesIO()
         await new_mrd.download_ranges([(0, 0, buffer)])
         await new_mrd.close()

From 190498aa15430fd98526465f00b39079d4925940 Mon Sep 17 00:00:00 2001
From: Chandra Sirimala
Date: Wed, 21 Jan 2026 15:32:46 +0000
Subject: [PATCH 3/5] remove unwanted file

---
 tests/system/open_with_read_handle.py | 40 ---------------------------
 1 file changed, 40 deletions(-)
 delete mode 100644 tests/system/open_with_read_handle.py

diff --git a/tests/system/open_with_read_handle.py b/tests/system/open_with_read_handle.py
deleted file mode 100644
index a8d4fe1c4..000000000
--- a/tests/system/open_with_read_handle.py
+++ /dev/null
@@ -1,40 +0,0 @@
-# py standard imports
-import os
-from io import BytesIO
-
-# current library imports
-from google.cloud.storage._experimental.asyncio.async_grpc_client import AsyncGrpcClient
-from google.cloud.storage._experimental.asyncio.async_multi_range_downloader import (
-    AsyncMultiRangeDownloader,
-)
-
-# TODO: replace this with a fixture once zonal bucket creation / deletion
-# is supported in the grpc client or json client.
-_ZONAL_BUCKET = 'chandrasiri-rs'
-_BYTES_TO_UPLOAD = b"dummy_bytes_to_write_read_and_delete_appendable_object"
-
-
-async def mrd_open_with_read_handle(appendable_object):
-    grpc_client = AsyncGrpcClient(attempt_direct_path=True).grpc_client
-
-    mrd = AsyncMultiRangeDownloader(grpc_client, _ZONAL_BUCKET, appendable_object)
-    await mrd.open()
-    read_handle = mrd.read_handle
-    await mrd.close()
-
-    # Open a new MRD using the `read_handle` obtained above
-    new_mrd = AsyncMultiRangeDownloader(
-        grpc_client, _ZONAL_BUCKET, appendable_object, read_handle=read_handle
-    )
-    await new_mrd.open()
-    # persisted_size not set when opened with read_handle
-    assert new_mrd.persisted_size is None
-    buffer = BytesIO()
-    await new_mrd.download_ranges([(0, 0, buffer)])
-    await new_mrd.close()
-    assert buffer.getvalue() == _BYTES_TO_UPLOAD
-
-if __name__ == "__main__":
-    import asyncio
-    asyncio.run(mrd_open_with_read_handle('read_handle_123'))
-

From 35a9e05485b311db25f2e44f8a2b1da8f299e025 Mon Sep 17 00:00:00 2001
From: Chandra Sirimala
Date: Wed, 21 Jan 2026 16:31:22 +0000
Subject: [PATCH 4/5] fix: rectify handle type

---
 .../asyncio/async_abstract_object_stream.py   |  6 +++---
 .../asyncio/async_appendable_object_writer.py |  9 ++++-----
 .../asyncio/async_multi_range_downloader.py   | 12 ++++++------
 .../asyncio/async_read_object_stream.py       |  8 ++++----
 .../asyncio/async_write_object_stream.py      |  7 +++----
 5 files changed, 20 insertions(+), 22 deletions(-)

diff --git a/google/cloud/storage/_experimental/asyncio/async_abstract_object_stream.py b/google/cloud/storage/_experimental/asyncio/async_abstract_object_stream.py
index 49d7a293a..26cbab7a0 100644
--- a/google/cloud/storage/_experimental/asyncio/async_abstract_object_stream.py
+++ b/google/cloud/storage/_experimental/asyncio/async_abstract_object_stream.py
@@ -32,7 +32,7 @@ class _AsyncAbstractObjectStream(abc.ABC):
     :param generation_number: (Optional) If present, selects a specific
         revision of this object.
 
-    :type handle: bytes
+    :type handle: Any
    :param handle: (Optional) The handle for the object; either a read_handle
        or a write_handle, depending on how the stream is used.
""" @@ -42,13 +42,13 @@ def __init__( bucket_name: str, object_name: str, generation_number: Optional[int] = None, - handle: Optional[bytes] = None, + handle: Optional[Any] = None, ) -> None: super().__init__() self.bucket_name: str = bucket_name self.object_name: str = object_name self.generation_number: Optional[int] = generation_number - self.handle: Optional[bytes] = handle + self.handle: Optional[Any] = handle @abc.abstractmethod async def open(self) -> None: diff --git a/google/cloud/storage/_experimental/asyncio/async_appendable_object_writer.py b/google/cloud/storage/_experimental/asyncio/async_appendable_object_writer.py index 444e8d030..d680aa65f 100644 --- a/google/cloud/storage/_experimental/asyncio/async_appendable_object_writer.py +++ b/google/cloud/storage/_experimental/asyncio/async_appendable_object_writer.py @@ -49,8 +49,8 @@ def __init__( client: AsyncGrpcClient.grpc_client, bucket_name: str, object_name: str, - generation=None, - write_handle=None, + generation: Optional[int] = None, + write_handle: Optional[_storage_v2.BidiWriteHandle] = None, writer_options: Optional[dict] = None, ): """ @@ -106,10 +106,10 @@ def __init__( overwriting existing objects). Warning: If `None`, a new object is created. If an object with the - same name already exists, it will be overwritten the moment + same name already exists, it will be overwritten the moment `writer.open()` is called. - :type write_handle: bytes + :type write_handle: _storage_v2.BidiWriteHandle :param write_handle: (Optional) An handle for writing the object. If provided, opening the bidi-gRPC connection will be faster. @@ -363,7 +363,6 @@ async def finalize(self) -> _storage_v2.Object: def is_stream_open(self) -> bool: return self._is_stream_open - # helper methods. async def append_from_string(self, data: str): """ diff --git a/google/cloud/storage/_experimental/asyncio/async_multi_range_downloader.py b/google/cloud/storage/_experimental/asyncio/async_multi_range_downloader.py index dedff67e2..66a9f0057 100644 --- a/google/cloud/storage/_experimental/asyncio/async_multi_range_downloader.py +++ b/google/cloud/storage/_experimental/asyncio/async_multi_range_downloader.py @@ -129,7 +129,7 @@ async def create_mrd( bucket_name: str, object_name: str, generation_number: Optional[int] = None, - read_handle: Optional[bytes] = None, + read_handle: Optional[_storage_v2.BidiReadHandle] = None, retry_policy: Optional[AsyncRetry] = None, metadata: Optional[List[Tuple[str, str]]] = None, ) -> AsyncMultiRangeDownloader: @@ -149,7 +149,7 @@ async def create_mrd( :param generation_number: (Optional) If present, selects a specific revision of this object. - :type read_handle: bytes + :type read_handle: _storage_v2.BidiReadHandle :param read_handle: (Optional) An existing handle for reading the object. If provided, opening the bidi-gRPC connection will be faster. @@ -172,7 +172,7 @@ def __init__( bucket_name: str, object_name: str, generation_number: Optional[int] = None, - read_handle: Optional[bytes] = None, + read_handle: Optional[_storage_v2.BidiReadHandle] = None, ) -> None: """Constructor for AsyncMultiRangeDownloader, clients are not adviced to use it directly. Instead it's adviced to use the classmethod `create_mrd`. @@ -190,7 +190,7 @@ def __init__( :param generation_number: (Optional) If present, selects a specific revision of this object. - :type read_handle: bytes + :type read_handle: _storage_v2.BidiReadHandle :param read_handle: (Optional) An existing read handle. 
""" @@ -200,7 +200,7 @@ def __init__( self.bucket_name = bucket_name self.object_name = object_name self.generation_number = generation_number - self.read_handle = read_handle + self.read_handle: Optional[_storage_v2.BidiReadHandle] = read_handle self.read_obj_str: Optional[_AsyncReadObjectStream] = None self._is_stream_open: bool = False self._routing_token: Optional[str] = None @@ -493,4 +493,4 @@ async def close(self): @property def is_stream_open(self) -> bool: - return self._is_stream_open + return self._is_stream_open \ No newline at end of file diff --git a/google/cloud/storage/_experimental/asyncio/async_read_object_stream.py b/google/cloud/storage/_experimental/asyncio/async_read_object_stream.py index 5e84485d5..15772da87 100644 --- a/google/cloud/storage/_experimental/asyncio/async_read_object_stream.py +++ b/google/cloud/storage/_experimental/asyncio/async_read_object_stream.py @@ -51,7 +51,7 @@ class _AsyncReadObjectStream(_AsyncAbstractObjectStream): :param generation_number: (Optional) If present, selects a specific revision of this object. - :type read_handle: bytes + :type read_handle: _storage_v2.BidiReadHandle :param read_handle: (Optional) An existing handle for reading the object. If provided, opening the bidi-gRPC connection will be faster. """ @@ -62,7 +62,7 @@ def __init__( bucket_name: str, object_name: str, generation_number: Optional[int] = None, - read_handle: Optional[bytes] = None, + read_handle: Optional[_storage_v2.BidiReadHandle] = None, ) -> None: if client is None: raise ValueError("client must be provided") @@ -77,7 +77,7 @@ def __init__( generation_number=generation_number, ) self.client: AsyncGrpcClient.grpc_client = client - self.read_handle: Optional[bytes] = read_handle + self.read_handle: Optional[_storage_v2.BidiReadHandle] = read_handle self._full_bucket_name = f"projects/_/buckets/{self.bucket_name}" @@ -195,4 +195,4 @@ async def recv(self) -> _storage_v2.BidiReadObjectResponse: @property def is_stream_open(self) -> bool: - return self._is_stream_open + return self._is_stream_open \ No newline at end of file diff --git a/google/cloud/storage/_experimental/asyncio/async_write_object_stream.py b/google/cloud/storage/_experimental/asyncio/async_write_object_stream.py index b79e707f2..4599712aa 100644 --- a/google/cloud/storage/_experimental/asyncio/async_write_object_stream.py +++ b/google/cloud/storage/_experimental/asyncio/async_write_object_stream.py @@ -59,7 +59,7 @@ class _AsyncWriteObjectStream(_AsyncAbstractObjectStream): same name already exists, it will be overwritten the moment `writer.open()` is called. - :type write_handle: bytes + :type write_handle: _storage_v2.BidiWriteHandle :param write_handle: (Optional) An existing handle for writing the object. If provided, opening the bidi-gRPC connection will be faster. 
""" @@ -70,7 +70,7 @@ def __init__( bucket_name: str, object_name: str, generation_number: Optional[int] = None, # None means new object - write_handle: Optional[bytes] = None, + write_handle: Optional[_storage_v2.BidiWriteHandle] = None, ) -> None: if client is None: raise ValueError("client must be provided") @@ -85,7 +85,7 @@ def __init__( generation_number=generation_number, ) self.client: AsyncGrpcClient.grpc_client = client - self.write_handle: Optional[bytes] = write_handle + self.write_handle: Optional[_storage_v2.BidiWriteHandle] = write_handle self._full_bucket_name = f"projects/_/buckets/{self.bucket_name}" @@ -212,4 +212,3 @@ async def recv(self) -> _storage_v2.BidiWriteObjectResponse: @property def is_stream_open(self) -> bool: return self._is_stream_open - From 9bc6375a2c764effd661174dc2fe6f44f1b9a65a Mon Sep 17 00:00:00 2001 From: Chandra Sirimala Date: Wed, 21 Jan 2026 18:04:17 +0000 Subject: [PATCH 5/5] feat: add support for opening via write handle --- .../asyncio/async_appendable_object_writer.py | 2 +- .../asyncio/async_write_object_stream.py | 41 +++++++----- tests/system/test_zonal.py | 50 +++++++++++++++ .../asyncio/test_async_write_object_stream.py | 64 ++++++++++++++----- 4 files changed, 126 insertions(+), 31 deletions(-) diff --git a/google/cloud/storage/_experimental/asyncio/async_appendable_object_writer.py b/google/cloud/storage/_experimental/asyncio/async_appendable_object_writer.py index d680aa65f..c961fbefb 100644 --- a/google/cloud/storage/_experimental/asyncio/async_appendable_object_writer.py +++ b/google/cloud/storage/_experimental/asyncio/async_appendable_object_writer.py @@ -96,7 +96,7 @@ def __init__( :type object_name: str :param object_name: The name of the GCS Appendable Object to be written. - :type generation: int + :type generation: Optional[int] :param generation: (Optional) If present, creates writer for that specific revision of that object. Use this to append data to an existing Appendable Object. diff --git a/google/cloud/storage/_experimental/asyncio/async_write_object_stream.py b/google/cloud/storage/_experimental/asyncio/async_write_object_stream.py index 4599712aa..731b18e45 100644 --- a/google/cloud/storage/_experimental/asyncio/async_write_object_stream.py +++ b/google/cloud/storage/_experimental/asyncio/async_write_object_stream.py @@ -120,6 +120,9 @@ async def open(self) -> None: # Created object type would be Appendable Object. # if `generation_number` == 0 new object will be created only if there # isn't any existing object. 
+ is_open_via_write_handle = ( + self.write_handle is not None and self.generation_number + ) if self.generation_number is None or self.generation_number == 0: self.first_bidi_write_req = _storage_v2.BidiWriteObjectRequest( write_object_spec=_storage_v2.WriteObjectSpec( @@ -136,6 +139,7 @@ async def open(self) -> None: bucket=self._full_bucket_name, object=self.object_name, generation=self.generation_number, + write_handle=self.write_handle, ), ) self.socket_like_rpc = AsyncBidiRpc( @@ -145,25 +149,32 @@ async def open(self) -> None: await self.socket_like_rpc.open() # this is actually 1 send response = await self.socket_like_rpc.recv() self._is_stream_open = True - - if not response.resource: - raise ValueError( - "Failed to obtain object resource after opening the stream" - ) - if not response.resource.generation: - raise ValueError( - "Failed to obtain object generation after opening the stream" - ) + if is_open_via_write_handle: + # Don't use if not response.persisted_size because this will be true + # if persisted_size==0 (0 is considered "Falsy" in Python) + if response.persisted_size is None: + raise ValueError( + "Failed to obtain persisted_size after opening the stream via write_handle" + ) + self.persisted_size = response.persisted_size + else: + if not response.resource: + raise ValueError( + "Failed to obtain object resource after opening the stream" + ) + if not response.resource.generation: + raise ValueError( + "Failed to obtain object generation after opening the stream" + ) + if not response.resource.size: + # Appending to a 0 byte appendable object. + self.persisted_size = 0 + else: + self.persisted_size = response.resource.size if not response.write_handle: raise ValueError("Failed to obtain write_handle after opening the stream") - if not response.resource.size: - # Appending to a 0 byte appendable object. - self.persisted_size = 0 - else: - self.persisted_size = response.resource.size - self.generation_number = response.resource.generation self.write_handle = response.write_handle diff --git a/tests/system/test_zonal.py b/tests/system/test_zonal.py index b5291ca08..0bd61ff53 100644 --- a/tests/system/test_zonal.py +++ b/tests/system/test_zonal.py @@ -333,6 +333,56 @@ async def _run(): event_loop.run_until_complete(_run()) +def test_wrd_open_with_write_handle( + event_loop, grpc_client_direct, storage_client, blobs_to_delete +): + object_name = f"test_write_handl-{str(uuid.uuid4())[:4]}" + + async def _run(): + # 1. Create an object and get its write_handle + writer = AsyncAppendableObjectWriter( + grpc_client_direct, _ZONAL_BUCKET, object_name + ) + await writer.open() + write_handle = writer.write_handle + await writer.close() + + # 2. Open a new writer using the obtained `write_handle` and generation + new_writer = AsyncAppendableObjectWriter( + grpc_client_direct, + _ZONAL_BUCKET, + object_name, + write_handle=write_handle, + generation=writer.generation, + ) + await new_writer.open() + # Verify that the new writer is open and has the same write_handle + assert new_writer.is_stream_open + assert new_writer.generation == writer.generation + + # 3. Append some data using the new writer + test_data = b"data_from_new_writer" + await new_writer.append(test_data) + await new_writer.close() + + # 4. 
Verify the data was written correctly by reading it back + mrd = AsyncMultiRangeDownloader(grpc_client_direct, _ZONAL_BUCKET, object_name) + buffer = BytesIO() + await mrd.open() + await mrd.download_ranges([(0, 0, buffer)]) + await mrd.close() + assert buffer.getvalue() == test_data + + # Clean up + blobs_to_delete.append(storage_client.bucket(_ZONAL_BUCKET).blob(object_name)) + del writer + del new_writer + del mrd + gc.collect() + + event_loop.run_until_complete(_run()) + + def test_read_unfinalized_appendable_object_with_generation( storage_client, blobs_to_delete, event_loop, grpc_client_direct ): diff --git a/tests/unit/asyncio/test_async_write_object_stream.py b/tests/unit/asyncio/test_async_write_object_stream.py index f0d90543f..619d5f7e6 100644 --- a/tests/unit/asyncio/test_async_write_object_stream.py +++ b/tests/unit/asyncio/test_async_write_object_stream.py @@ -25,6 +25,7 @@ OBJECT = "my-object" GENERATION = 12345 WRITE_HANDLE = b"test-handle" +WRITE_HANDLE_PROTO = _storage_v2.BidiWriteHandle(handle=WRITE_HANDLE) @pytest.fixture @@ -56,7 +57,7 @@ async def instantiate_write_obj_stream(mock_client, mock_cls_async_bidi_rpc, ope mock_response.resource = mock.MagicMock(spec=_storage_v2.Object) mock_response.resource.generation = GENERATION mock_response.resource.size = 0 - mock_response.write_handle = WRITE_HANDLE + mock_response.write_handle = WRITE_HANDLE_PROTO socket_like_rpc.recv = AsyncMock(return_value=mock_response) write_obj_stream = _AsyncWriteObjectStream(mock_client, BUCKET, OBJECT) @@ -90,18 +91,16 @@ def test_async_write_object_stream_init(mock_client): def test_async_write_object_stream_init_with_generation_and_handle(mock_client): """Test the constructor with optional arguments.""" - generation = 12345 - write_handle = b"test-handle" stream = _AsyncWriteObjectStream( mock_client, BUCKET, OBJECT, - generation_number=generation, - write_handle=write_handle, + generation_number=GENERATION, + write_handle=WRITE_HANDLE_PROTO, ) - assert stream.generation_number == generation - assert stream.write_handle == write_handle + assert stream.generation_number == GENERATION + assert stream.write_handle == WRITE_HANDLE_PROTO def test_async_write_object_stream_init_raises_value_error(): @@ -131,7 +130,7 @@ async def test_open_for_new_object(mock_async_bidi_rpc, mock_client): mock_response.resource = mock.MagicMock(spec=_storage_v2.Object) mock_response.resource.generation = GENERATION mock_response.resource.size = 0 - mock_response.write_handle = WRITE_HANDLE + mock_response.write_handle = WRITE_HANDLE_PROTO socket_like_rpc.recv = mock.AsyncMock(return_value=mock_response) stream = _AsyncWriteObjectStream(mock_client, BUCKET, OBJECT) @@ -144,7 +143,7 @@ async def test_open_for_new_object(mock_async_bidi_rpc, mock_client): socket_like_rpc.open.assert_called_once() socket_like_rpc.recv.assert_called_once() assert stream.generation_number == GENERATION - assert stream.write_handle == WRITE_HANDLE + assert stream.write_handle == WRITE_HANDLE_PROTO assert stream.persisted_size == 0 @@ -163,7 +162,7 @@ async def test_open_for_new_object_with_generation_zero(mock_async_bidi_rpc, moc mock_response.resource = mock.MagicMock(spec=_storage_v2.Object) mock_response.resource.generation = GENERATION mock_response.resource.size = 0 - mock_response.write_handle = WRITE_HANDLE + mock_response.write_handle = WRITE_HANDLE_PROTO socket_like_rpc.recv = mock.AsyncMock(return_value=mock_response) stream = _AsyncWriteObjectStream(mock_client, BUCKET, OBJECT, generation_number=0) @@ -180,7 +179,7 @@ async 
def test_open_for_new_object_with_generation_zero(mock_async_bidi_rpc, moc socket_like_rpc.open.assert_called_once() socket_like_rpc.recv.assert_called_once() assert stream.generation_number == GENERATION - assert stream.write_handle == WRITE_HANDLE + assert stream.write_handle == WRITE_HANDLE_PROTO assert stream.persisted_size == 0 @@ -199,7 +198,7 @@ async def test_open_for_existing_object(mock_async_bidi_rpc, mock_client): mock_response.resource = mock.MagicMock(spec=_storage_v2.Object) mock_response.resource.size = 1024 mock_response.resource.generation = GENERATION - mock_response.write_handle = WRITE_HANDLE + mock_response.write_handle = WRITE_HANDLE_PROTO socket_like_rpc.recv = mock.AsyncMock(return_value=mock_response) stream = _AsyncWriteObjectStream( @@ -214,7 +213,7 @@ async def test_open_for_existing_object(mock_async_bidi_rpc, mock_client): socket_like_rpc.open.assert_called_once() socket_like_rpc.recv.assert_called_once() assert stream.generation_number == GENERATION - assert stream.write_handle == WRITE_HANDLE + assert stream.write_handle == WRITE_HANDLE_PROTO assert stream.persisted_size == 1024 @@ -233,7 +232,7 @@ async def test_open_when_already_open_raises_error(mock_async_bidi_rpc, mock_cli mock_response.resource = mock.MagicMock(spec=_storage_v2.Object) mock_response.resource.generation = GENERATION mock_response.resource.size = 0 - mock_response.write_handle = WRITE_HANDLE + mock_response.write_handle = WRITE_HANDLE_PROTO socket_like_rpc.recv = mock.AsyncMock(return_value=mock_response) stream = _AsyncWriteObjectStream(mock_client, BUCKET, OBJECT) @@ -315,6 +314,41 @@ async def test_open_raises_error_on_missing_write_handle( await stream.open() +@pytest.mark.asyncio +@mock.patch( + "google.cloud.storage._experimental.asyncio.async_write_object_stream.AsyncBidiRpc" +) +async def test_open_raises_error_on_missing_persisted_size_with_write_handle( + mock_async_bidi_rpc, mock_client +): + """Test that open raises ValueError if persisted_size is None when opened via write_handle.""" + socket_like_rpc = mock.AsyncMock() + mock_async_bidi_rpc.return_value = socket_like_rpc + + # + mock_response = mock.MagicMock(spec=_storage_v2.BidiWriteObjectResponse) + mock_response.persisted_size = None # This is the key part of the test + mock_response.write_handle = ( + WRITE_HANDLE_PROTO # Ensure write_handle is present to avoid that error + ) + socket_like_rpc.recv.return_value = mock_response + + # ACT + stream = _AsyncWriteObjectStream( + mock_client, + BUCKET, + OBJECT, + write_handle=WRITE_HANDLE_PROTO, + generation_number=GENERATION, + ) + + with pytest.raises( + ValueError, + match="Failed to obtain persisted_size after opening the stream via write_handle", + ): + await stream.open() + + @pytest.mark.asyncio @mock.patch( "google.cloud.storage._experimental.asyncio.async_write_object_stream.AsyncBidiRpc" @@ -452,4 +486,4 @@ async def test_requests_done(mock_cls_async_bidi_rpc, mock_client): # Assert write_obj_stream.socket_like_rpc.send.assert_called_once_with(None) - write_obj_stream.socket_like_rpc.recv.assert_called_once() \ No newline at end of file + write_obj_stream.socket_like_rpc.recv.assert_called_once()
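
Note on the single-event-loop pattern in PATCH 1/5: a grpc.aio channel binds to
the event loop that is current when the client is created, so the client and
every coroutine that uses it must run on the same loop. The sketch below
restates the fixture wiring in isolation. `AsyncGrpcClient` is the real class
from this library; the test body and names are illustrative only.

    import asyncio

    import pytest

    from google.cloud.storage._experimental.asyncio.async_grpc_client import (
        AsyncGrpcClient,
    )


    @pytest.fixture(scope="session")
    def event_loop():
        # One loop for the whole session, shared by the client and all tests.
        loop = asyncio.new_event_loop()
        yield loop
        loop.close()


    @pytest.fixture(scope="session")
    def grpc_client(event_loop):
        async def _create():
            # Created inside the session loop so the grpc.aio channel attaches
            # to this loop instead of creating its own.
            return AsyncGrpcClient(attempt_direct_path=False).grpc_client

        return event_loop.run_until_complete(_create())


    def test_example(event_loop, grpc_client):
        async def _run():
            ...  # drive grpc_client here; it shares event_loop with this test

        event_loop.run_until_complete(_run())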
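
Note on the `generation` semantics exercised by the system tests in PATCH 1/5:
`generation=0` means create-only, `generation=None` means overwrite on
`open()`, and a concrete generation number appends to that revision. A
condensed sketch of those three modes; the `bucket` and `name` arguments are
placeholders for the zonal bucket and object name used in test_zonal.py.

    from google.api_core.exceptions import FailedPrecondition

    from google.cloud.storage._experimental.asyncio.async_appendable_object_writer import (
        AsyncAppendableObjectWriter,
    )


    async def generation_modes(grpc_client, bucket, name):
        # generation=0: create the object only if it does not already exist.
        writer = AsyncAppendableObjectWriter(grpc_client, bucket, name, generation=0)
        await writer.open()  # creates an empty appendable object
        await writer.close()

        # A second generation=0 open violates the "must not exist"
        # precondition and raises FailedPrecondition.
        retry = AsyncAppendableObjectWriter(grpc_client, bucket, name, generation=0)
        try:
            await retry.open()
        except FailedPrecondition:
            pass

        # generation=None: a new generation overwrites the object on open().
        overwriter = AsyncAppendableObjectWriter(
            grpc_client, bucket, name, generation=None
        )
        await overwriter.open()
        generation = overwriter.generation
        await overwriter.close()

        # generation=<int>: append to that specific revision.
        appender = AsyncAppendableObjectWriter(
            grpc_client, bucket, name, generation=generation
        )
        await appender.open()
        await appender.append(b"more bytes")
        await appender.close(finalize_on_close=True)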