191 changes: 191 additions & 0 deletions samples/snippets/zonal_buckets/bidi_writes_from_gapic.py
@@ -0,0 +1,191 @@
"""


async def open_file('file_name')


async def append_to_file('file_name')


async def close('')

"""
Comment on lines +1 to +12

medium

The current block at the top of the file appears to be scratchpad notes rather than a proper module docstring. To improve clarity and adhere to Python conventions, it should be replaced with a concise docstring that explains the script's purpose.

"""A script to demonstrate bidi writes using the google-cloud-storage GAPIC client."""


import os
import time
import asyncio
from google.cloud import _storage_v2 as storage_v2

critical

The script imports _storage_v2, which is a private module as indicated by the leading underscore. Public samples should only use the public API of the library. Please use the public google.cloud.storage module if possible. If this feature is only available in the private module, this script might not be suitable as a public sample.

import argparse

# from async_get_object_metadata import get_object

BUCKET_NAME = "chandrasiri-rs"

medium

The bucket name is hardcoded, which limits the script's reusability. Consider making it a command-line argument, similar to filename.

You can add the following to your ArgumentParser setup:

parser.add_argument("--bucket_name", required=True)

And then use args.bucket_name here.
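A standalone sketch of the suggested change (argument names taken from the review, the argv list is illustrative):

```python
import argparse

# Hypothetical parser mirroring the reviewer's suggestion: bucket_name
# becomes a required CLI argument alongside filename.
parser = argparse.ArgumentParser()
parser.add_argument("--filename", required=True)
parser.add_argument("--bucket_name", required=True)

# Parsing an explicit argv list makes the sketch runnable without a shell.
args = parser.parse_args(["--filename", "demo.txt", "--bucket_name", "my-bucket"])
bucket_fullname = f"projects/_/buckets/{args.bucket_name}"
```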

BUCKET_FULLNAME = f"projects/_/buckets/{BUCKET_NAME}"


def generate_random_bytes_os_urandom(length_mib):
"""
Generates cryptographically secure random bytes of a specified length in MiB.

Args:
length_mib (int): The desired length of random bytes in MiB (mebibytes).

Returns:
bytes: A bytes object containing the random bytes.
"""
length_bytes = length_mib * 1024 * 1024 # Convert MiB to bytes
random_bytes = os.urandom(length_bytes)
return random_bytes


def create_async_client():
transport_cls = storage_v2.StorageClient.get_transport_class(label="grpc_asyncio")
channel = transport_cls.create_channel(attempt_direct_path=True)
transport = transport_cls(channel=channel)
async_client = storage_v2.StorageAsyncClient(transport=transport)

return async_client


async def open_file(filename, client):
first_request = storage_v2.BidiWriteObjectRequest(
write_object_spec=storage_v2.WriteObjectSpec(
resource=storage_v2.Object(name=filename, bucket=BUCKET_FULLNAME),
appendable=True,
),
)

def request_generator():
for request in [first_request]:
yield request
Comment on lines +58 to +60

medium

The request_generator function can be simplified. The for loop over a single-element list is redundant. A simple yield is sufficient.

    def request_generator():
        yield first_request
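Both forms produce the same one-element stream; a minimal check with a stand-in for the real request object:

```python
# Stand-in for BidiWriteObjectRequest; only object identity matters here.
first_request = object()

def request_generator_loop():
    # Original form: loop over a single-element list.
    for request in [first_request]:
        yield request

def request_generator_simple():
    # Suggested form: a single yield.
    yield first_request

assert list(request_generator_loop()) == list(request_generator_simple())
```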


req_param = f"bucket={BUCKET_FULLNAME}"
response_stream = await client.bidi_write_object(
requests=request_generator(),
metadata=(
("x-goog-request-params", req_param),
("x-goog-api-client", "gcloud-python-local/3.8.0"),

medium

The x-goog-api-client header is hardcoded. This is likely for specific debugging purposes. For a general-purpose script, it's better to remove this line and let the client library set its default user-agent header.

),
)
generation = None
write_handle = None
count = 0
async for response in response_stream:
# print("stream count:", count, "*" * 20)
# print(response)
# print("time elapsed", time_elapsed)
# print("stream count:", count, "*" * 20)
if response.resource is not None and (generation is None):

generation = response.resource.generation

# print("genration = ", generation)
if response.write_handle is not None and (write_handle is None):
write_handle = response.write_handle.handle
# print("write_handle = ", write_handle)

# if
Comment on lines +73 to +87

high

This block contains a significant amount of commented-out print statements and a stray # if which seems to be leftover from development. This dead code should be removed to improve readability and maintainability.

    async for response in response_stream:
        if response.resource is not None and (generation is None):
            generation = response.resource.generation
        if response.write_handle is not None and (write_handle is None):
            write_handle = response.write_handle.handle

return generation, write_handle


async def append_to_file(filename, generation, client, data, write_handle=None):
# print("generation in append ", generation)

def request_generator_for_append():
# current persisted size of object
# start_offset = 1 * 1024 * 1024
start_offset = 0
curr_byte = 0
total_bytes = len(data)
chunk_size = 2 * 1024 * 1024
stream_count = 0
if total_bytes == 0:
# print("@@" * 20)
yield storage_v2.BidiWriteObjectRequest(
append_object_spec=storage_v2.AppendObjectSpec(
bucket=BUCKET_FULLNAME,
object=filename,
generation=generation,
),
checksummed_data=storage_v2.ChecksummedData(content=b""),
write_offset=curr_byte + start_offset,
# flush=True,
# state_lookup=True,
# finish_write=True,
)
while curr_byte < total_bytes:
curr_chunk_size = min(chunk_size, total_bytes - curr_byte)
chunked_data = data[curr_byte : curr_byte + curr_chunk_size]
# create req
if stream_count == 0:
bidi_request = storage_v2.BidiWriteObjectRequest(
append_object_spec=storage_v2.AppendObjectSpec(
bucket=BUCKET_FULLNAME,
object=filename,
generation=generation,
write_handle=storage_v2.BidiWriteHandle(handle=write_handle),
),
checksummed_data=storage_v2.ChecksummedData(content=chunked_data),
write_offset=curr_byte + start_offset,
)
else:
bidi_request = storage_v2.BidiWriteObjectRequest(
checksummed_data=storage_v2.ChecksummedData(content=chunked_data),
write_offset=curr_byte + start_offset,
)

if curr_byte + chunk_size >= total_bytes:
bidi_request.flush = True
bidi_request.state_lookup = True

# yield req
yield bidi_request
curr_byte += curr_chunk_size
stream_count += 1

req_param = f"bucket={BUCKET_FULLNAME}"
append_stream = await client.bidi_write_object(
requests=request_generator_for_append(),
metadata=(("x-goog-request-params", req_param),),
)
count = 0
prev_time = time.monotonic_ns()
total_time = 0
async for response in append_stream:
end_time = time.monotonic_ns()
elapsed_time = end_time - prev_time
total_time += elapsed_time
print(f"Response count: {count}:", response)
count += 1
Comment on lines +151 to +159

medium

This block appears to be for debugging and performance measurement, printing each response and timing information. For a clean, reusable script, this should be removed. If you just need to consume the stream, an empty async for loop is sufficient.

    async for _ in append_stream:
        pass



async def main():

medium

To make this function more self-contained and easier to test, it should receive the parsed command-line arguments as a parameter instead of relying on a global args variable.

async def main(args):
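A runnable sketch of that refactor, using `argparse.Namespace` as a stand-in for parsed CLI arguments (the stub body is illustrative, not the script's real logic):

```python
import argparse
import asyncio

async def main(args: argparse.Namespace) -> str:
    # The real script would open/append here; this stub only shows the
    # args-as-parameter shape the review suggests.
    return f"would open {args.filename}"

result = asyncio.run(main(argparse.Namespace(filename="demo.txt")))
```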

data = generate_random_bytes_os_urandom(10)
storage_async_client = create_async_client()

generation = None
write_handle = None

filename = args.filename
# if not args.skip_open:
generation, write_handle = await open_file(filename, storage_async_client)
print("Opened file for writing, gen:", generation)
# if not args.skip_append:
# if generation is None:
# print("generation is none requesting it")
# object_metadata = await get_object(
# bucket_fullname=BUCKET_FULLNAME, object_name=filename
# )
# generation = object_metadata.generation
# print("generation is ", generation)
# await append_to_file(filename, generation, storage_async_client, data, write_handle)
Comment on lines +170 to +181

high

This section contains a lot of commented-out code which seems to be the main logic for appending to the file. The script as it is only opens the file but doesn't perform the append operation. The commented-out if conditions for skip_open and skip_append also suggest that the script is intended to be more configurable than it currently is. This commented code should be cleaned up and the intended logic should be implemented.



parser = argparse.ArgumentParser()
parser.add_argument("--filename", required=True)
parser.add_argument("--skip_open", action="store_true")
parser.add_argument("--skip_append", action="store_true")
args = parser.parse_args()
# print(args.skip_open, args.skip_append)
# print("yo")
asyncio.run(main())
Comment on lines +184 to +191

medium

It's a Python best practice to place script execution logic within an if __name__ == "__main__": block. This prevents the code from being executed when the module is imported elsewhere. This also provides a good place to call main with the parsed args.

if __name__ == "__main__":
    parser = argparse.ArgumentParser()
    parser.add_argument("--filename", required=True)
    parser.add_argument("--skip_open", action="store_true")
    parser.add_argument("--skip_append", action="store_true")
    args = parser.parse_args()
    asyncio.run(main(args))