Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -32,7 +32,7 @@ class _AsyncAbstractObjectStream(abc.ABC):
:param generation_number: (Optional) If present, selects a specific revision of
this object.

:type handle: bytes
:type handle: Any
:param handle: (Optional) The handle for the object, could be read_handle or
write_handle, based on how the stream is used.
"""
Expand All @@ -42,13 +42,13 @@ def __init__(
bucket_name: str,
object_name: str,
generation_number: Optional[int] = None,
handle: Optional[bytes] = None,
handle: Optional[Any] = None,
) -> None:
super().__init__()
self.bucket_name: str = bucket_name
self.object_name: str = object_name
self.generation_number: Optional[int] = generation_number
self.handle: Optional[bytes] = handle
self.handle: Optional[Any] = handle

@abc.abstractmethod
async def open(self) -> None:
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -49,8 +49,8 @@ def __init__(
client: AsyncGrpcClient.grpc_client,
bucket_name: str,
object_name: str,
generation=None,
write_handle=None,
generation: Optional[int] = None,
write_handle: Optional[_storage_v2.BidiWriteHandle] = None,
writer_options: Optional[dict] = None,
):
"""
Expand Down Expand Up @@ -96,7 +96,7 @@ def __init__(
:type object_name: str
:param object_name: The name of the GCS Appendable Object to be written.

:type generation: int
:type generation: Optional[int]
:param generation: (Optional) If present, creates writer for that
specific revision of that object. Use this to append data to an
existing Appendable Object.
Expand All @@ -106,10 +106,10 @@ def __init__(
overwriting existing objects).

Warning: If `None`, a new object is created. If an object with the
same name already exists, it will be overwritten the moment
same name already exists, it will be overwritten the moment
`writer.open()` is called.

:type write_handle: bytes
:type write_handle: _storage_v2.BidiWriteHandle
:param write_handle: (Optional) A handle for writing the object.
If provided, opening the bidi-gRPC connection will be faster.

Expand Down Expand Up @@ -363,7 +363,6 @@ async def finalize(self) -> _storage_v2.Object:
def is_stream_open(self) -> bool:
return self._is_stream_open


# helper methods.
async def append_from_string(self, data: str):
"""
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -129,7 +129,7 @@ async def create_mrd(
bucket_name: str,
object_name: str,
generation_number: Optional[int] = None,
read_handle: Optional[bytes] = None,
read_handle: Optional[_storage_v2.BidiReadHandle] = None,
retry_policy: Optional[AsyncRetry] = None,
metadata: Optional[List[Tuple[str, str]]] = None,
) -> AsyncMultiRangeDownloader:
Expand All @@ -149,7 +149,7 @@ async def create_mrd(
:param generation_number: (Optional) If present, selects a specific
revision of this object.

:type read_handle: bytes
:type read_handle: _storage_v2.BidiReadHandle
:param read_handle: (Optional) An existing handle for reading the object.
If provided, opening the bidi-gRPC connection will be faster.

Expand All @@ -172,7 +172,7 @@ def __init__(
bucket_name: str,
object_name: str,
generation_number: Optional[int] = None,
read_handle: Optional[bytes] = None,
read_handle: Optional[_storage_v2.BidiReadHandle] = None,
) -> None:
"""Constructor for AsyncMultiRangeDownloader, clients are not adviced to
use it directly. Instead it's adviced to use the classmethod `create_mrd`.
Expand All @@ -190,7 +190,7 @@ def __init__(
:param generation_number: (Optional) If present, selects a specific revision of
this object.

:type read_handle: bytes
:type read_handle: _storage_v2.BidiReadHandle
:param read_handle: (Optional) An existing read handle.
"""

Expand All @@ -200,7 +200,7 @@ def __init__(
self.bucket_name = bucket_name
self.object_name = object_name
self.generation_number = generation_number
self.read_handle = read_handle
self.read_handle: Optional[_storage_v2.BidiReadHandle] = read_handle
self.read_obj_str: Optional[_AsyncReadObjectStream] = None
self._is_stream_open: bool = False
self._routing_token: Optional[str] = None
Expand Down Expand Up @@ -493,4 +493,4 @@ async def close(self):

@property
def is_stream_open(self) -> bool:
return self._is_stream_open
return self._is_stream_open
Original file line number Diff line number Diff line change
Expand Up @@ -51,7 +51,7 @@ class _AsyncReadObjectStream(_AsyncAbstractObjectStream):
:param generation_number: (Optional) If present, selects a specific revision of
this object.

:type read_handle: bytes
:type read_handle: _storage_v2.BidiReadHandle
:param read_handle: (Optional) An existing handle for reading the object.
If provided, opening the bidi-gRPC connection will be faster.
"""
Expand All @@ -62,7 +62,7 @@ def __init__(
bucket_name: str,
object_name: str,
generation_number: Optional[int] = None,
read_handle: Optional[bytes] = None,
read_handle: Optional[_storage_v2.BidiReadHandle] = None,
) -> None:
if client is None:
raise ValueError("client must be provided")
Expand All @@ -77,7 +77,7 @@ def __init__(
generation_number=generation_number,
)
self.client: AsyncGrpcClient.grpc_client = client
self.read_handle: Optional[bytes] = read_handle
self.read_handle: Optional[_storage_v2.BidiReadHandle] = read_handle

self._full_bucket_name = f"projects/_/buckets/{self.bucket_name}"

Expand Down Expand Up @@ -195,4 +195,4 @@ async def recv(self) -> _storage_v2.BidiReadObjectResponse:

@property
def is_stream_open(self) -> bool:
return self._is_stream_open
return self._is_stream_open
Original file line number Diff line number Diff line change
Expand Up @@ -59,7 +59,7 @@ class _AsyncWriteObjectStream(_AsyncAbstractObjectStream):
same name already exists, it will be overwritten the moment
`writer.open()` is called.

:type write_handle: bytes
:type write_handle: _storage_v2.BidiWriteHandle
:param write_handle: (Optional) An existing handle for writing the object.
If provided, opening the bidi-gRPC connection will be faster.
"""
Expand All @@ -70,7 +70,7 @@ def __init__(
bucket_name: str,
object_name: str,
generation_number: Optional[int] = None, # None means new object
write_handle: Optional[bytes] = None,
write_handle: Optional[_storage_v2.BidiWriteHandle] = None,
) -> None:
if client is None:
raise ValueError("client must be provided")
Expand All @@ -85,7 +85,7 @@ def __init__(
generation_number=generation_number,
)
self.client: AsyncGrpcClient.grpc_client = client
self.write_handle: Optional[bytes] = write_handle
self.write_handle: Optional[_storage_v2.BidiWriteHandle] = write_handle

self._full_bucket_name = f"projects/_/buckets/{self.bucket_name}"

Expand Down Expand Up @@ -120,6 +120,9 @@ async def open(self) -> None:
# Created object type would be Appendable Object.
# if `generation_number` == 0 new object will be created only if there
# isn't any existing object.
is_open_via_write_handle = (
self.write_handle is not None and self.generation_number
)
if self.generation_number is None or self.generation_number == 0:
self.first_bidi_write_req = _storage_v2.BidiWriteObjectRequest(
write_object_spec=_storage_v2.WriteObjectSpec(
Expand All @@ -136,6 +139,7 @@ async def open(self) -> None:
bucket=self._full_bucket_name,
object=self.object_name,
generation=self.generation_number,
write_handle=self.write_handle,
),
)
self.socket_like_rpc = AsyncBidiRpc(
Expand All @@ -145,25 +149,32 @@ async def open(self) -> None:
await self.socket_like_rpc.open() # this is actually 1 send
response = await self.socket_like_rpc.recv()
self._is_stream_open = True

if not response.resource:
raise ValueError(
"Failed to obtain object resource after opening the stream"
)
if not response.resource.generation:
raise ValueError(
"Failed to obtain object generation after opening the stream"
)
if is_open_via_write_handle:
# Don't use if not response.persisted_size because this will be true
# if persisted_size==0 (0 is considered "Falsy" in Python)
if response.persisted_size is None:
raise ValueError(
"Failed to obtain persisted_size after opening the stream via write_handle"
)
self.persisted_size = response.persisted_size
else:
if not response.resource:
raise ValueError(
"Failed to obtain object resource after opening the stream"
)
if not response.resource.generation:
raise ValueError(
"Failed to obtain object generation after opening the stream"
)
if not response.resource.size:
# Appending to a 0 byte appendable object.
self.persisted_size = 0
else:
self.persisted_size = response.resource.size

if not response.write_handle:
raise ValueError("Failed to obtain write_handle after opening the stream")

if not response.resource.size:
# Appending to a 0 byte appendable object.
self.persisted_size = 0
else:
self.persisted_size = response.resource.size

self.generation_number = response.resource.generation
self.write_handle = response.write_handle

Expand Down Expand Up @@ -212,4 +223,3 @@ async def recv(self) -> _storage_v2.BidiWriteObjectResponse:
@property
def is_stream_open(self) -> bool:
return self._is_stream_open

50 changes: 50 additions & 0 deletions tests/system/test_zonal.py
Original file line number Diff line number Diff line change
Expand Up @@ -333,6 +333,56 @@ async def _run():
event_loop.run_until_complete(_run())


def test_wrd_open_with_write_handle(
    event_loop, grpc_client_direct, storage_client, blobs_to_delete
):
    """Verify an appendable-object write stream can be reopened via write_handle.

    Flow: create an object and capture its ``write_handle``; open a second
    writer with that handle plus the object's generation; append data through
    the second writer; read the object back to confirm the append succeeded.
    """
    object_name = f"test_write_handl-{str(uuid.uuid4())[:4]}"
    # Register cleanup immediately so the fixture deletes the object even if
    # an assertion below fails (previously this ran only after all asserts,
    # leaking the object on test failure).
    blobs_to_delete.append(storage_client.bucket(_ZONAL_BUCKET).blob(object_name))

    async def _run():
        # 1. Create an object and get its write_handle.
        writer = AsyncAppendableObjectWriter(
            grpc_client_direct, _ZONAL_BUCKET, object_name
        )
        await writer.open()
        write_handle = writer.write_handle
        await writer.close()

        # 2. Open a new writer using the obtained `write_handle` and generation.
        new_writer = AsyncAppendableObjectWriter(
            grpc_client_direct,
            _ZONAL_BUCKET,
            object_name,
            write_handle=write_handle,
            generation=writer.generation,
        )
        await new_writer.open()
        # Verify the new writer opened and targets the same object revision.
        # (The server may refresh the handle on open, so we do not assert
        # handle equality here.)
        assert new_writer.is_stream_open
        assert new_writer.generation == writer.generation

        # 3. Append some data using the new writer.
        test_data = b"data_from_new_writer"
        await new_writer.append(test_data)
        await new_writer.close()

        # 4. Verify the data was written correctly by reading it back.
        # (0, 0, buffer): offset 0 with length 0 — presumably "read to end";
        # TODO(review): confirm against download_ranges semantics.
        mrd = AsyncMultiRangeDownloader(grpc_client_direct, _ZONAL_BUCKET, object_name)
        buffer = BytesIO()
        await mrd.open()
        await mrd.download_ranges([(0, 0, buffer)])
        await mrd.close()
        assert buffer.getvalue() == test_data

        # Drop references and force a collection so gRPC resources are
        # released before the event loop is torn down.
        del writer
        del new_writer
        del mrd
        gc.collect()

    event_loop.run_until_complete(_run())


def test_read_unfinalized_appendable_object_with_generation(
storage_client, blobs_to_delete, event_loop, grpc_client_direct
):
Expand Down
Loading