diff --git a/google/cloud/storage/_experimental/asyncio/async_abstract_object_stream.py b/google/cloud/storage/_experimental/asyncio/async_abstract_object_stream.py index 49d7a293a..26cbab7a0 100644 --- a/google/cloud/storage/_experimental/asyncio/async_abstract_object_stream.py +++ b/google/cloud/storage/_experimental/asyncio/async_abstract_object_stream.py @@ -32,7 +32,7 @@ class _AsyncAbstractObjectStream(abc.ABC): :param generation_number: (Optional) If present, selects a specific revision of this object. - :type handle: bytes + :type handle: Any :param handle: (Optional) The handle for the object, could be read_handle or write_handle, based on how the stream is used. """ @@ -42,13 +42,13 @@ def __init__( bucket_name: str, object_name: str, generation_number: Optional[int] = None, - handle: Optional[bytes] = None, + handle: Optional[Any] = None, ) -> None: super().__init__() self.bucket_name: str = bucket_name self.object_name: str = object_name self.generation_number: Optional[int] = generation_number - self.handle: Optional[bytes] = handle + self.handle: Optional[Any] = handle @abc.abstractmethod async def open(self) -> None: diff --git a/google/cloud/storage/_experimental/asyncio/async_appendable_object_writer.py b/google/cloud/storage/_experimental/asyncio/async_appendable_object_writer.py index 444e8d030..c961fbefb 100644 --- a/google/cloud/storage/_experimental/asyncio/async_appendable_object_writer.py +++ b/google/cloud/storage/_experimental/asyncio/async_appendable_object_writer.py @@ -49,8 +49,8 @@ def __init__( client: AsyncGrpcClient.grpc_client, bucket_name: str, object_name: str, - generation=None, - write_handle=None, + generation: Optional[int] = None, + write_handle: Optional[_storage_v2.BidiWriteHandle] = None, writer_options: Optional[dict] = None, ): """ @@ -96,7 +96,7 @@ def __init__( :type object_name: str :param object_name: The name of the GCS Appendable Object to be written. - :type generation: int + :type generation: Optional[int] :param generation: (Optional) If present, creates writer for that specific revision of that object. Use this to append data to an existing Appendable Object. @@ -106,10 +106,10 @@ def __init__( overwriting existing objects). Warning: If `None`, a new object is created. If an object with the - same name already exists, it will be overwritten the moment + same name already exists, it will be overwritten the moment `writer.open()` is called. - :type write_handle: bytes + :type write_handle: _storage_v2.BidiWriteHandle :param write_handle: (Optional) An handle for writing the object. If provided, opening the bidi-gRPC connection will be faster. @@ -363,7 +363,6 @@ async def finalize(self) -> _storage_v2.Object: def is_stream_open(self) -> bool: return self._is_stream_open - # helper methods. async def append_from_string(self, data: str): """ diff --git a/google/cloud/storage/_experimental/asyncio/async_multi_range_downloader.py b/google/cloud/storage/_experimental/asyncio/async_multi_range_downloader.py index dedff67e2..66a9f0057 100644 --- a/google/cloud/storage/_experimental/asyncio/async_multi_range_downloader.py +++ b/google/cloud/storage/_experimental/asyncio/async_multi_range_downloader.py @@ -129,7 +129,7 @@ async def create_mrd( bucket_name: str, object_name: str, generation_number: Optional[int] = None, - read_handle: Optional[bytes] = None, + read_handle: Optional[_storage_v2.BidiReadHandle] = None, retry_policy: Optional[AsyncRetry] = None, metadata: Optional[List[Tuple[str, str]]] = None, ) -> AsyncMultiRangeDownloader: @@ -149,7 +149,7 @@ async def create_mrd( :param generation_number: (Optional) If present, selects a specific revision of this object. - :type read_handle: bytes + :type read_handle: _storage_v2.BidiReadHandle :param read_handle: (Optional) An existing handle for reading the object. If provided, opening the bidi-gRPC connection will be faster. @@ -172,7 +172,7 @@ def __init__( bucket_name: str, object_name: str, generation_number: Optional[int] = None, - read_handle: Optional[bytes] = None, + read_handle: Optional[_storage_v2.BidiReadHandle] = None, ) -> None: """Constructor for AsyncMultiRangeDownloader, clients are not adviced to use it directly. Instead it's adviced to use the classmethod `create_mrd`. @@ -190,7 +190,7 @@ def __init__( :param generation_number: (Optional) If present, selects a specific revision of this object. - :type read_handle: bytes + :type read_handle: _storage_v2.BidiReadHandle :param read_handle: (Optional) An existing read handle. """ @@ -200,7 +200,7 @@ def __init__( self.bucket_name = bucket_name self.object_name = object_name self.generation_number = generation_number - self.read_handle = read_handle + self.read_handle: Optional[_storage_v2.BidiReadHandle] = read_handle self.read_obj_str: Optional[_AsyncReadObjectStream] = None self._is_stream_open: bool = False self._routing_token: Optional[str] = None @@ -493,4 +493,4 @@ async def close(self): @property def is_stream_open(self) -> bool: - return self._is_stream_open + return self._is_stream_open \ No newline at end of file diff --git a/google/cloud/storage/_experimental/asyncio/async_read_object_stream.py b/google/cloud/storage/_experimental/asyncio/async_read_object_stream.py index 5e84485d5..15772da87 100644 --- a/google/cloud/storage/_experimental/asyncio/async_read_object_stream.py +++ b/google/cloud/storage/_experimental/asyncio/async_read_object_stream.py @@ -51,7 +51,7 @@ class _AsyncReadObjectStream(_AsyncAbstractObjectStream): :param generation_number: (Optional) If present, selects a specific revision of this object. - :type read_handle: bytes + :type read_handle: _storage_v2.BidiReadHandle :param read_handle: (Optional) An existing handle for reading the object. If provided, opening the bidi-gRPC connection will be faster. """ @@ -62,7 +62,7 @@ def __init__( bucket_name: str, object_name: str, generation_number: Optional[int] = None, - read_handle: Optional[bytes] = None, + read_handle: Optional[_storage_v2.BidiReadHandle] = None, ) -> None: if client is None: raise ValueError("client must be provided") @@ -77,7 +77,7 @@ def __init__( generation_number=generation_number, ) self.client: AsyncGrpcClient.grpc_client = client - self.read_handle: Optional[bytes] = read_handle + self.read_handle: Optional[_storage_v2.BidiReadHandle] = read_handle self._full_bucket_name = f"projects/_/buckets/{self.bucket_name}" @@ -195,4 +195,4 @@ async def recv(self) -> _storage_v2.BidiReadObjectResponse: @property def is_stream_open(self) -> bool: - return self._is_stream_open + return self._is_stream_open \ No newline at end of file diff --git a/google/cloud/storage/_experimental/asyncio/async_write_object_stream.py b/google/cloud/storage/_experimental/asyncio/async_write_object_stream.py index b79e707f2..731b18e45 100644 --- a/google/cloud/storage/_experimental/asyncio/async_write_object_stream.py +++ b/google/cloud/storage/_experimental/asyncio/async_write_object_stream.py @@ -59,7 +59,7 @@ class _AsyncWriteObjectStream(_AsyncAbstractObjectStream): same name already exists, it will be overwritten the moment `writer.open()` is called. - :type write_handle: bytes + :type write_handle: _storage_v2.BidiWriteHandle :param write_handle: (Optional) An existing handle for writing the object. If provided, opening the bidi-gRPC connection will be faster. """ @@ -70,7 +70,7 @@ def __init__( bucket_name: str, object_name: str, generation_number: Optional[int] = None, # None means new object - write_handle: Optional[bytes] = None, + write_handle: Optional[_storage_v2.BidiWriteHandle] = None, ) -> None: if client is None: raise ValueError("client must be provided") @@ -85,7 +85,7 @@ def __init__( generation_number=generation_number, ) self.client: AsyncGrpcClient.grpc_client = client - self.write_handle: Optional[bytes] = write_handle + self.write_handle: Optional[_storage_v2.BidiWriteHandle] = write_handle self._full_bucket_name = f"projects/_/buckets/{self.bucket_name}" @@ -120,6 +120,9 @@ async def open(self) -> None: # Created object type would be Appendable Object. # if `generation_number` == 0 new object will be created only if there # isn't any existing object. + is_open_via_write_handle = ( + self.write_handle is not None and self.generation_number + ) if self.generation_number is None or self.generation_number == 0: self.first_bidi_write_req = _storage_v2.BidiWriteObjectRequest( write_object_spec=_storage_v2.WriteObjectSpec( @@ -136,6 +139,7 @@ async def open(self) -> None: bucket=self._full_bucket_name, object=self.object_name, generation=self.generation_number, + write_handle=self.write_handle, ), ) self.socket_like_rpc = AsyncBidiRpc( @@ -145,25 +149,32 @@ async def open(self) -> None: await self.socket_like_rpc.open() # this is actually 1 send response = await self.socket_like_rpc.recv() self._is_stream_open = True - - if not response.resource: - raise ValueError( - "Failed to obtain object resource after opening the stream" - ) - if not response.resource.generation: - raise ValueError( - "Failed to obtain object generation after opening the stream" - ) + if is_open_via_write_handle: + # Don't use if not response.persisted_size because this will be true + # if persisted_size==0 (0 is considered "Falsy" in Python) + if response.persisted_size is None: + raise ValueError( + "Failed to obtain persisted_size after opening the stream via write_handle" + ) + self.persisted_size = response.persisted_size + else: + if not response.resource: + raise ValueError( + "Failed to obtain object resource after opening the stream" + ) + if not response.resource.generation: + raise ValueError( + "Failed to obtain object generation after opening the stream" + ) + if not response.resource.size: + # Appending to a 0 byte appendable object. + self.persisted_size = 0 + else: + self.persisted_size = response.resource.size if not response.write_handle: raise ValueError("Failed to obtain write_handle after opening the stream") - if not response.resource.size: - # Appending to a 0 byte appendable object. - self.persisted_size = 0 - else: - self.persisted_size = response.resource.size - self.generation_number = response.resource.generation self.write_handle = response.write_handle @@ -212,4 +223,3 @@ async def recv(self) -> _storage_v2.BidiWriteObjectResponse: @property def is_stream_open(self) -> bool: return self._is_stream_open - diff --git a/tests/system/test_zonal.py b/tests/system/test_zonal.py index b5291ca08..0bd61ff53 100644 --- a/tests/system/test_zonal.py +++ b/tests/system/test_zonal.py @@ -333,6 +333,56 @@ async def _run(): event_loop.run_until_complete(_run()) +def test_wrd_open_with_write_handle( + event_loop, grpc_client_direct, storage_client, blobs_to_delete +): + object_name = f"test_write_handl-{str(uuid.uuid4())[:4]}" + + async def _run(): + # 1. Create an object and get its write_handle + writer = AsyncAppendableObjectWriter( + grpc_client_direct, _ZONAL_BUCKET, object_name + ) + await writer.open() + write_handle = writer.write_handle + await writer.close() + + # 2. Open a new writer using the obtained `write_handle` and generation + new_writer = AsyncAppendableObjectWriter( + grpc_client_direct, + _ZONAL_BUCKET, + object_name, + write_handle=write_handle, + generation=writer.generation, + ) + await new_writer.open() + # Verify that the new writer is open and has the same write_handle + assert new_writer.is_stream_open + assert new_writer.generation == writer.generation + + # 3. Append some data using the new writer + test_data = b"data_from_new_writer" + await new_writer.append(test_data) + await new_writer.close() + + # 4. Verify the data was written correctly by reading it back + mrd = AsyncMultiRangeDownloader(grpc_client_direct, _ZONAL_BUCKET, object_name) + buffer = BytesIO() + await mrd.open() + await mrd.download_ranges([(0, 0, buffer)]) + await mrd.close() + assert buffer.getvalue() == test_data + + # Clean up + blobs_to_delete.append(storage_client.bucket(_ZONAL_BUCKET).blob(object_name)) + del writer + del new_writer + del mrd + gc.collect() + + event_loop.run_until_complete(_run()) + + def test_read_unfinalized_appendable_object_with_generation( storage_client, blobs_to_delete, event_loop, grpc_client_direct ): diff --git a/tests/unit/asyncio/test_async_write_object_stream.py b/tests/unit/asyncio/test_async_write_object_stream.py index f0d90543f..619d5f7e6 100644 --- a/tests/unit/asyncio/test_async_write_object_stream.py +++ b/tests/unit/asyncio/test_async_write_object_stream.py @@ -25,6 +25,7 @@ OBJECT = "my-object" GENERATION = 12345 WRITE_HANDLE = b"test-handle" +WRITE_HANDLE_PROTO = _storage_v2.BidiWriteHandle(handle=WRITE_HANDLE) @pytest.fixture @@ -56,7 +57,7 @@ async def instantiate_write_obj_stream(mock_client, mock_cls_async_bidi_rpc, ope mock_response.resource = mock.MagicMock(spec=_storage_v2.Object) mock_response.resource.generation = GENERATION mock_response.resource.size = 0 - mock_response.write_handle = WRITE_HANDLE + mock_response.write_handle = WRITE_HANDLE_PROTO socket_like_rpc.recv = AsyncMock(return_value=mock_response) write_obj_stream = _AsyncWriteObjectStream(mock_client, BUCKET, OBJECT) @@ -90,18 +91,16 @@ def test_async_write_object_stream_init(mock_client): def test_async_write_object_stream_init_with_generation_and_handle(mock_client): """Test the constructor with optional arguments.""" - generation = 12345 - write_handle = b"test-handle" stream = _AsyncWriteObjectStream( mock_client, BUCKET, OBJECT, - generation_number=generation, - write_handle=write_handle, + generation_number=GENERATION, + write_handle=WRITE_HANDLE_PROTO, ) - assert stream.generation_number == generation - assert stream.write_handle == write_handle + assert stream.generation_number == GENERATION + assert stream.write_handle == WRITE_HANDLE_PROTO def test_async_write_object_stream_init_raises_value_error(): @@ -131,7 +130,7 @@ async def test_open_for_new_object(mock_async_bidi_rpc, mock_client): mock_response.resource = mock.MagicMock(spec=_storage_v2.Object) mock_response.resource.generation = GENERATION mock_response.resource.size = 0 - mock_response.write_handle = WRITE_HANDLE + mock_response.write_handle = WRITE_HANDLE_PROTO socket_like_rpc.recv = mock.AsyncMock(return_value=mock_response) stream = _AsyncWriteObjectStream(mock_client, BUCKET, OBJECT) @@ -144,7 +143,7 @@ async def test_open_for_new_object(mock_async_bidi_rpc, mock_client): socket_like_rpc.open.assert_called_once() socket_like_rpc.recv.assert_called_once() assert stream.generation_number == GENERATION - assert stream.write_handle == WRITE_HANDLE + assert stream.write_handle == WRITE_HANDLE_PROTO assert stream.persisted_size == 0 @@ -163,7 +162,7 @@ async def test_open_for_new_object_with_generation_zero(mock_async_bidi_rpc, moc mock_response.resource = mock.MagicMock(spec=_storage_v2.Object) mock_response.resource.generation = GENERATION mock_response.resource.size = 0 - mock_response.write_handle = WRITE_HANDLE + mock_response.write_handle = WRITE_HANDLE_PROTO socket_like_rpc.recv = mock.AsyncMock(return_value=mock_response) stream = _AsyncWriteObjectStream(mock_client, BUCKET, OBJECT, generation_number=0) @@ -180,7 +179,7 @@ async def test_open_for_new_object_with_generation_zero(mock_async_bidi_rpc, moc socket_like_rpc.open.assert_called_once() socket_like_rpc.recv.assert_called_once() assert stream.generation_number == GENERATION - assert stream.write_handle == WRITE_HANDLE + assert stream.write_handle == WRITE_HANDLE_PROTO assert stream.persisted_size == 0 @@ -199,7 +198,7 @@ async def test_open_for_existing_object(mock_async_bidi_rpc, mock_client): mock_response.resource = mock.MagicMock(spec=_storage_v2.Object) mock_response.resource.size = 1024 mock_response.resource.generation = GENERATION - mock_response.write_handle = WRITE_HANDLE + mock_response.write_handle = WRITE_HANDLE_PROTO socket_like_rpc.recv = mock.AsyncMock(return_value=mock_response) stream = _AsyncWriteObjectStream( @@ -214,7 +213,7 @@ async def test_open_for_existing_object(mock_async_bidi_rpc, mock_client): socket_like_rpc.open.assert_called_once() socket_like_rpc.recv.assert_called_once() assert stream.generation_number == GENERATION - assert stream.write_handle == WRITE_HANDLE + assert stream.write_handle == WRITE_HANDLE_PROTO assert stream.persisted_size == 1024 @@ -233,7 +232,7 @@ async def test_open_when_already_open_raises_error(mock_async_bidi_rpc, mock_cli mock_response.resource = mock.MagicMock(spec=_storage_v2.Object) mock_response.resource.generation = GENERATION mock_response.resource.size = 0 - mock_response.write_handle = WRITE_HANDLE + mock_response.write_handle = WRITE_HANDLE_PROTO socket_like_rpc.recv = mock.AsyncMock(return_value=mock_response) stream = _AsyncWriteObjectStream(mock_client, BUCKET, OBJECT) @@ -315,6 +314,41 @@ async def test_open_raises_error_on_missing_write_handle( await stream.open() +@pytest.mark.asyncio +@mock.patch( + "google.cloud.storage._experimental.asyncio.async_write_object_stream.AsyncBidiRpc" +) +async def test_open_raises_error_on_missing_persisted_size_with_write_handle( + mock_async_bidi_rpc, mock_client +): + """Test that open raises ValueError if persisted_size is None when opened via write_handle.""" + socket_like_rpc = mock.AsyncMock() + mock_async_bidi_rpc.return_value = socket_like_rpc + + # + mock_response = mock.MagicMock(spec=_storage_v2.BidiWriteObjectResponse) + mock_response.persisted_size = None # This is the key part of the test + mock_response.write_handle = ( + WRITE_HANDLE_PROTO # Ensure write_handle is present to avoid that error + ) + socket_like_rpc.recv.return_value = mock_response + + # ACT + stream = _AsyncWriteObjectStream( + mock_client, + BUCKET, + OBJECT, + write_handle=WRITE_HANDLE_PROTO, + generation_number=GENERATION, + ) + + with pytest.raises( + ValueError, + match="Failed to obtain persisted_size after opening the stream via write_handle", + ): + await stream.open() + + @pytest.mark.asyncio @mock.patch( "google.cloud.storage._experimental.asyncio.async_write_object_stream.AsyncBidiRpc" @@ -452,4 +486,4 @@ async def test_requests_done(mock_cls_async_bidi_rpc, mock_client): # Assert write_obj_stream.socket_like_rpc.send.assert_called_once_with(None) - write_obj_stream.socket_like_rpc.recv.assert_called_once() \ No newline at end of file + write_obj_stream.socket_like_rpc.recv.assert_called_once()