-
Notifications
You must be signed in to change notification settings - Fork 169
provide repro script #1711
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
base: main
Are you sure you want to change the base?
provide repro script #1711
Changes from all commits
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
| Original file line number | Diff line number | Diff line change |
|---|---|---|
| @@ -0,0 +1,191 @@ | ||
| """ | ||
|
|
||
|
|
||
| async def open_file('file_name') | ||
|
|
||
|
|
||
| async def append_to_file('file_name') | ||
|
|
||
|
|
||
| async def close('') | ||
|
|
||
| """ | ||
|
|
||
| import os | ||
| import time | ||
| import asyncio | ||
| from google.cloud import _storage_v2 as storage_v2 | ||
|
Contributor
There was a problem hiding this comment. Choose a reason for hiding this comment. The reason will be displayed to describe this comment to others. Learn more. The script imports |
||
| import argparse | ||
|
|
||
| # from async_get_object_metadata import get_object | ||
|
|
||
| BUCKET_NAME = "chandrasiri-rs" | ||
|
Contributor
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. |
||
| BUCKET_FULLNAME = f"projects/_/buckets/{BUCKET_NAME}" | ||
|
|
||
|
|
||
def generate_random_bytes_os_urandom(length_mib):
    """Return *length_mib* MiB of cryptographically secure random bytes.

    Args:
        length_mib (int): Desired payload size in MiB (mebibytes).

    Returns:
        bytes: Exactly ``length_mib * 1024 * 1024`` random bytes.
    """
    size_in_bytes = length_mib * 1024 * 1024  # MiB -> bytes
    return os.urandom(size_in_bytes)
|
|
||
|
|
||
def create_async_client():
    """Build a StorageAsyncClient over a direct-path gRPC asyncio channel.

    Returns:
        storage_v2.StorageAsyncClient: Client backed by a grpc_asyncio
        transport whose channel was created with ``attempt_direct_path=True``.
    """
    transport_class = storage_v2.StorageClient.get_transport_class(label="grpc_asyncio")
    grpc_channel = transport_class.create_channel(attempt_direct_path=True)
    return storage_v2.StorageAsyncClient(
        transport=transport_class(channel=grpc_channel)
    )
|
|
||
|
|
||
async def open_file(filename, client):
    """Create an appendable object and capture its generation and write handle.

    Sends a single ``BidiWriteObjectRequest`` carrying a ``WriteObjectSpec``
    with ``appendable=True``, then drains the response stream, recording the
    first non-zero generation and the first non-empty write handle reported
    by the server.

    Args:
        filename (str): Object name to create in ``BUCKET_NAME``.
        client: A ``storage_v2.StorageAsyncClient``.

    Returns:
        tuple: ``(generation, write_handle)`` — either may be ``None`` if the
        server never reported it.
    """
    first_request = storage_v2.BidiWriteObjectRequest(
        write_object_spec=storage_v2.WriteObjectSpec(
            resource=storage_v2.Object(name=filename, bucket=BUCKET_FULLNAME),
            appendable=True,
        ),
    )

    def request_generator():
        yield first_request

    req_param = f"bucket={BUCKET_FULLNAME}"
    response_stream = await client.bidi_write_object(
        requests=request_generator(),
        metadata=(
            ("x-goog-request-params", req_param),
            # NOTE(review): hard-coded client/version header, kept for repro
            # fidelity — confirm whether the real header should be sent instead.
            ("x-goog-api-client", "gcloud-python-local/3.8.0"),
        ),
    )

    generation = None
    write_handle = None
    async for response in response_stream:
        # BUG FIX: proto-plus message fields are never None, so the original
        # `response.resource is not None` checks were always true — an early
        # response without a resource latched generation = 0 and prevented the
        # real value from ever being recorded. Test field contents instead.
        if generation is None and response.resource.generation:
            generation = response.resource.generation
        if write_handle is None and response.write_handle.handle:
            write_handle = response.write_handle.handle
    return generation, write_handle
|
|
||
|
|
||
async def append_to_file(filename, generation, client, data, write_handle=None):
    """Append *data* to an existing appendable object via a bidi write stream.

    The request generator first identifies the target object with an
    ``AppendObjectSpec`` (attaching *write_handle* when available), then
    streams *data* in 2 MiB chunks; the final chunk asks the server to flush
    and report persisted state. Per-response latency is printed as the
    response stream is drained.

    Args:
        filename (str): Object name within ``BUCKET_NAME``.
        generation (int): Generation of the object being appended to.
        client: A ``storage_v2.StorageAsyncClient``.
        data (bytes): Payload to append; may be empty.
        write_handle (bytes | None): Handle returned by a prior open, if any.
    """

    def request_generator_for_append():
        # Offset of this append within the object; 0 assumes a fresh object.
        start_offset = 0
        curr_byte = 0
        total_bytes = len(data)
        chunk_size = 2 * 1024 * 1024
        stream_count = 0
        if total_bytes == 0:
            # Zero-byte append: a single request that only names the object.
            yield storage_v2.BidiWriteObjectRequest(
                append_object_spec=storage_v2.AppendObjectSpec(
                    bucket=BUCKET_FULLNAME,
                    object=filename,
                    generation=generation,
                ),
                checksummed_data=storage_v2.ChecksummedData(content=b""),
                write_offset=curr_byte + start_offset,
            )
        while curr_byte < total_bytes:
            curr_chunk_size = min(chunk_size, total_bytes - curr_byte)
            chunked_data = data[curr_byte : curr_byte + curr_chunk_size]
            if stream_count == 0:
                # Only the first request may carry the AppendObjectSpec
                # (and the write handle, when one was provided).
                bidi_request = storage_v2.BidiWriteObjectRequest(
                    append_object_spec=storage_v2.AppendObjectSpec(
                        bucket=BUCKET_FULLNAME,
                        object=filename,
                        generation=generation,
                        write_handle=storage_v2.BidiWriteHandle(handle=write_handle),
                    ),
                    checksummed_data=storage_v2.ChecksummedData(content=chunked_data),
                    write_offset=curr_byte + start_offset,
                )
            else:
                bidi_request = storage_v2.BidiWriteObjectRequest(
                    checksummed_data=storage_v2.ChecksummedData(content=chunked_data),
                    write_offset=curr_byte + start_offset,
                )

            if curr_byte + chunk_size >= total_bytes:
                # Last chunk: flush and ask the server for persisted state.
                bidi_request.flush = True
                bidi_request.state_lookup = True

            yield bidi_request
            curr_byte += curr_chunk_size
            stream_count += 1

    req_param = f"bucket={BUCKET_FULLNAME}"
    append_stream = await client.bidi_write_object(
        requests=request_generator_for_append(),
        metadata=(("x-goog-request-params", req_param),),
    )
    count = 0
    prev_time = time.monotonic_ns()
    total_time = 0
    async for response in append_stream:
        end_time = time.monotonic_ns()
        elapsed_time = end_time - prev_time
        # BUG FIX: advance prev_time each iteration so elapsed_time measures
        # per-response latency instead of time since the stream started.
        prev_time = end_time
        total_time += elapsed_time
        print(f"Response count: {count}:", response)
        count += 1
    # Report the accumulated wait time (total_time was computed but never
    # used in the original).
    print(f"Total response wait time: {total_time} ns over {count} responses")
|
Comment on lines
+151
to
+159
Contributor
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. |
||
|
|
||
|
|
||
async def main():
    """Open an appendable object and append random data to it.

    Reads the module-level ``args`` namespace (parsed at script startup) for
    the object name and the ``--skip_open`` / ``--skip_append`` flags.
    """
    data = generate_random_bytes_os_urandom(10)
    storage_async_client = create_async_client()

    generation = None
    write_handle = None

    filename = args.filename
    # BUG FIX: --skip_open was parsed but never honored.
    if not args.skip_open:
        generation, write_handle = await open_file(filename, storage_async_client)
        print("Opened file for writing, gen:", generation)
    # BUG FIX: the append step — the behavior this repro script exists to
    # exercise — was entirely commented out; re-enable it behind --skip_append.
    if not args.skip_append:
        await append_to_file(
            filename, generation, storage_async_client, data, write_handle
        )
|
Comment on lines
+170
to
+181
Contributor
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. This section contains a lot of commented-out code which seems to be the main logic for appending to the file. The script as it is only opens the file but doesn't perform the append operation. The commented-out |
||
|
|
||
|
|
||
# BUG FIX: guard script execution so importing this module does not parse
# argv or open network connections.
if __name__ == "__main__":
    parser = argparse.ArgumentParser(
        description="Repro script for appendable-object bidi writes."
    )
    parser.add_argument("--filename", required=True, help="Object name to create/append.")
    parser.add_argument("--skip_open", action="store_true", help="Skip the open step.")
    parser.add_argument("--skip_append", action="store_true", help="Skip the append step.")
    # NOTE: ``args`` is intentionally left module-global; main() reads it.
    args = parser.parse_args()
    asyncio.run(main())
|
Comment on lines
+184
to
+191
Contributor
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. It's a Python best practice to place script execution logic within an if __name__ == "__main__":
parser = argparse.ArgumentParser()
parser.add_argument("--filename", required=True)
parser.add_argument("--skip_open", action="store_true")
parser.add_argument("--skip_append", action="store_true")
args = parser.parse_args()
asyncio.run(main(args)) |
||
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
The current block at the top of the file appears to be scratchpad notes rather than a proper module docstring. To improve clarity and adhere to Python conventions, it should be replaced with a concise docstring that explains the script's purpose.
"""A script to demonstrate bidi writes using the google-cloud-storage GAPIC client."""