22 changes: 18 additions & 4 deletions google/cloud/storage/_media/_upload.py
@@ -1253,7 +1253,8 @@ class XMLMPUPart(UploadBase):
Args:
upload_url (str): The URL of the object (without query parameters).
upload_id (str): The ID of the upload from the initialization response.
filename (str): The name (path) of the file to upload.
filename (str): The name (path) of the file to upload. Can be None if
file_obj is provided.
start (int): The byte index of the beginning of the part.
end (int): The byte index of the end of the part.
part_number (int): The part number. Part numbers will be assembled in
@@ -1274,6 +1275,8 @@ class XMLMPUPart(UploadBase):
See the retry.py source code and docstrings in this package
(google.cloud.storage.retry) for information on retry types and how
to configure them.
file_obj (IO[bytes]): file-like object to upload from. Can be None if
filename is provided.

Attributes:
upload_url (str): The URL of the object (without query parameters).
@@ -1297,9 +1300,16 @@ def __init__(
headers=None,
checksum="auto",
retry=DEFAULT_RETRY,
file_obj=None,
):
super().__init__(upload_url, headers=headers, retry=retry)
if (filename is None and file_obj is None) or (
filename is not None and file_obj is not None
):
Comment on lines +1306 to +1308 (Contributor, severity: medium):

This condition can be simplified for better readability. A more Pythonic way to express that exactly one of filename or file_obj must be provided is if (filename is None) == (file_obj is None):.
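A minimal sketch of the suggested check, reusing the existing error message:

# Raise unless exactly one of filename / file_obj was provided.
if (filename is None) == (file_obj is None):
    raise ValueError("Exactly one of filename or file_obj must be provided.")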

raise ValueError("Exactly one of filename or file_obj must be provided.")

self._filename = filename
self._file_obj = file_obj
self._start = start
self._end = end
self._upload_id = upload_id
@@ -1364,9 +1374,13 @@ def _prepare_upload_request(self):
if self.finished:
raise ValueError("This part has already been uploaded.")

with open(self._filename, "br") as f:
f.seek(self._start)
payload = f.read(self._end - self._start)
if self._file_obj:
self._file_obj.seek(self._start)
payload = self._file_obj.read(self._end - self._start)
else:
with open(self._filename, "br") as f:
f.seek(self._start)
payload = f.read(self._end - self._start)

self._checksum_object = _helpers._get_checksum_object(self._checksum_type)
if self._checksum_object is not None:
175 changes: 160 additions & 15 deletions google/cloud/storage/blob.py
@@ -16,11 +16,15 @@

"""Create / interact with Google Cloud Storage blobs."""

import time
import base64
from concurrent.futures import ThreadPoolExecutor
import concurrent.futures
Contributor comment (severity: medium):

This import is redundant. ThreadPoolExecutor is already imported from concurrent.futures on the previous line, and concurrent.futures itself is not used directly in this file. You can remove this line.

import copy
import hashlib
from io import BytesIO
from io import TextIOWrapper
import itertools
import logging
import mimetypes
import os
@@ -33,12 +37,15 @@
from urllib.parse import urlunsplit
import warnings

from google.cloud.storage import retry
from google.cloud.storage._media.requests import ChunkedDownload
from google.cloud.storage._media.requests import Download
from google.cloud.storage._media.requests import RawDownload
from google.cloud.storage._media.requests import RawChunkedDownload
from google.cloud.storage._media.requests import MultipartUpload
from google.cloud.storage._media.requests import ResumableUpload
from google.cloud.storage._media.requests import XMLMPUContainer
from google.cloud.storage._media.requests import XMLMPUPart

from google.api_core.iam import Policy
from google.cloud import exceptions
@@ -81,6 +88,16 @@
from google.cloud.storage.fileio import BlobReader
from google.cloud.storage.fileio import BlobWriter

METADATA_HEADER_TRANSLATION = {
"cacheControl": "Cache-Control",
"contentDisposition": "Content-Disposition",
"contentEncoding": "Content-Encoding",
"contentLanguage": "Content-Language",
"customTime": "x-goog-custom-time",
"storageClass": "x-goog-storage-class",
}

XML_CHUNK_SIZE = 100 * 1024 * 1024 # 8 MiB
Contributor comment (severity: medium):

The comment here is incorrect. 100 * 1024 * 1024 is 100 MiB, not 8 MiB. Please update the comment to reflect the correct size.

Suggested change:
XML_CHUNK_SIZE = 100 * 1024 * 1024  # 8 MiB
XML_CHUNK_SIZE = 100 * 1024 * 1024  # 100 MiB


_DEFAULT_CONTENT_TYPE = "application/octet-stream"
_DOWNLOAD_URL_TEMPLATE = "{hostname}/download/storage/{api_version}{path}?alt=media"
@@ -1889,6 +1906,131 @@ def _get_upload_arguments(self, client, content_type, filename=None, command=None)
object_metadata = self._get_writable_metadata()
return headers, object_metadata, content_type

def _headers_from_metadata(self, metadata):
"""Helper function to translate object metadata into a header dictionary."""

headers = {}
# Handle standard writable metadata
for key, value in metadata.items():
if key in METADATA_HEADER_TRANSLATION:
headers[METADATA_HEADER_TRANSLATION[key]] = value
# Handle custom metadata
if "metadata" in metadata:
for key, value in metadata["metadata"].items():
headers["x-goog-meta-" + key] = value
return headers

def _do_xml_multipart_upload(
self, file_obj, retry=None, content_type=None, num_of_threads=6
):
"""
1. This should initialize XMLMPUContainer, container.initate(), you can keep filename as None.
2. read chunks of data from stream, read at most `n` chunks (even if the file_stream is more, hold the stream there)
Where each `chunk_size` is provided as `XML_CHUNK_SIZE`
3. Spawn multiple threads to upload each chunk using
part = XMLMPUPart()
part.upload() ->
each part upload should return (part_number, etag)
store these part numbers in a list/dictionary
using `container.register_part(part_number, etag)`

4. read further chunks from stream and repeat step 3 until stream is exhausted



5. Once all parts are uploaded, call
`container.finalize(blob._get_transport(client))`
to complete the multipart upload

"""
Comment on lines +1926 to +1945 (Contributor, severity: medium):

The docstring for this method appears to be a set of implementation notes. It should be converted into a proper docstring that explains what the method does, its parameters (file_obj, retry, content_type, num_of_threads), and what it returns. This will improve maintainability and make the code easier to understand for other developers.
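A possible shape for such a docstring (a sketch only; the descriptions are inferred from the method body and are assumptions, not the author's wording):

"""Upload data from a file-like object using the XML multipart upload API.

The stream is read in `XML_CHUNK_SIZE` chunks, each chunk is uploaded as a
separate part by a pool of `num_of_threads` worker threads, and the upload
is finalized once the stream is exhausted.

Args:
    file_obj (IO[bytes]): The stream to upload from.
    retry (google.api_core.retry.Retry): Retry policy applied to the container
        and part requests.
    content_type (str): Content type of the object, if known.
    num_of_threads (int): Number of parts to upload concurrently.

Returns:
    dict: The reloaded object resource (self._properties).
"""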

bucket = self.bucket
client = self.client
transport = self._get_transport(client)

hostname = _get_host_name(client._connection)
url = "{hostname}/{bucket}/{blob}".format(
hostname=hostname, bucket=bucket.name, blob=_quote(self.name)
)

base_headers, object_metadata, content_type = self._get_upload_arguments(
client, content_type, filename=None, command="tm.upload_sharded"
)
headers = {**base_headers, **self._headers_from_metadata(object_metadata)}

if self.user_project is not None:
headers["x-goog-user-project"] = self.user_project

if (
self.kms_key_name is not None
and "cryptoKeyVersions" not in self.kms_key_name
):
headers["x-goog-encryption-kms-key-name"] = self.kms_key_name

container = XMLMPUContainer(url, filename=None, headers=headers, retry=retry)
container.initiate(transport=transport, content_type=content_type)
upload_id = container.upload_id

def _upload_part_from_data(data, part_number, checksum="auto"):
data_stream = BytesIO(data)
part = XMLMPUPart(
url,
upload_id,
filename=None,
file_obj=data_stream,
start=0,
end=len(data),
part_number=part_number,
checksum=checksum,
headers=headers.copy(),
retry=retry,
)
part.upload(transport)
return (part_number, part.etag)

def read_chunks(stream, chunk_size):
while True:
data = stream.read(chunk_size)
if not data:
break
yield data

chunk_iterator = read_chunks(file_obj, XML_CHUNK_SIZE)
part_number = 1

try:
with ThreadPoolExecutor(max_workers=num_of_threads) as executor:
while True:
# Read a batch of chunks to be processed concurrently.
chunk_batch = list(itertools.islice(chunk_iterator, num_of_threads))
if not chunk_batch:
break

futures = []
# Submit upload tasks for the current batch of chunks.
for i, chunk_data in enumerate(chunk_batch):
current_part_number = part_number + i
future = executor.submit(
_upload_part_from_data, chunk_data, current_part_number
)
futures.append(future)

# Wait for the current batch to complete.
for future in futures:
part_num, etag = future.result()
container.register_part(part_num, etag)

part_number += len(chunk_batch)
print("num parts uploaded:", part_number - 1)
Contributor comment (severity: high):

This print statement appears to be for debugging. It should be removed from production code. Consider using the logging module if this information is valuable for diagnostics.
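A minimal sketch of the logging alternative (blob.py already imports logging; the logger name below is an assumption):

import logging

_logger = logging.getLogger(__name__)

# Replacement for the debug print inside the upload loop:
_logger.debug("Uploaded %d parts so far.", part_number - 1)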


res = container.finalize(transport)
print("MPU Complete Response:", res)
Contributor comment (severity: high):

This print statement appears to be for debugging. It should be removed from production code. Consider using the logging module if this information is valuable for diagnostics.

self.reload(client=client)
return self._properties

except Exception:
container.cancel(transport)
raise

def _do_multipart_upload(
self,
client,
@@ -2483,6 +2625,7 @@ def _do_upload(
retry=None,
command=None,
crc32c_checksum_value=None,
perform_xml_mpu=True,
):
"""Determine an upload strategy and then perform the upload.

@@ -2626,23 +2769,20 @@ def _do_upload(
}
retry = retry.get_retry_policy_if_conditions_met(query_params=query_params)

if size is not None and size <= _MAX_MULTIPART_SIZE:
response = self._do_multipart_upload(
client,
stream,
content_type,
size,
predefined_acl,
if_generation_match,
if_generation_not_match,
if_metageneration_match,
if_metageneration_not_match,
timeout=timeout,
checksum=checksum,
retry=retry,
command=command,
st_time = time.monotonic_ns()
if perform_xml_mpu:
print("Performing XML MPU .")
response = self._do_xml_multipart_upload(
stream, retry=None, content_type=None, num_of_threads=1
)
print(
"Performed XMLMPU in ",
(time.monotonic_ns() - st_time) / 1_000_000,
response,
)
return response
else:
print("Performing Resumable Upload!!!! .")
response = self._do_resumable_upload(
client,
stream,
@@ -2659,6 +2799,11 @@
command=command,
crc32c_checksum_value=crc32c_checksum_value,
)
print(
"Performed Resumable upload.",
response,
(time.monotonic_ns() - st_time) / 1_000_000,
)

return response.json()

50 changes: 43 additions & 7 deletions samples/snippets/storage_upload_from_memory.py
@@ -14,13 +14,15 @@
# See the License for the specific language governing permissions and
# limitations under the License.

import os
import sys
import time

# [START storage_file_upload_from_memory]
from google.cloud import storage


def upload_blob_from_memory(bucket_name, contents, destination_blob_name):
def upload_blob_from_memory(bucket_name, destination_blob_name, size_in_mb=1):
"""Uploads a file to the bucket."""

# The ID of your GCS bucket
@@ -36,18 +38,52 @@ def upload_blob_from_memory(bucket_name, contents, destination_blob_name):
bucket = storage_client.bucket(bucket_name)
blob = bucket.blob(destination_blob_name)

bytes_to_upload = int(size_in_mb * 1024 * 1024)
contents = os.urandom(bytes_to_upload)

start_time = time.time_ns()
blob.upload_from_string(contents)
end_time = time.time_ns()

total_bytes_uploaded = len(contents)
# Time is in nanoseconds, convert to seconds for printing
total_time_taken_ns = end_time - start_time
total_time_taken_s = total_time_taken_ns / 1_000_000_000

if total_time_taken_ns > 0:
# Throughput calculation using nanoseconds
throughput_mb_s = (
total_bytes_uploaded / (total_time_taken_ns / 1_000_000_000)
) / (1024 * 1024)
else:
throughput_mb_s = float("inf") # Avoid division by zero

print(f"Uploaded {total_bytes_uploaded} bytes in {total_time_taken_s:.9f} seconds.")
print(f"Throughput: {throughput_mb_s:.2f} MB/s")

print(
f"{destination_blob_name} ({total_bytes_uploaded} bytes) uploaded to {bucket_name}."
)

# [END storage_file_upload_from_memory]


if __name__ == "__main__":
if len(sys.argv) < 3 or len(sys.argv) > 4:
print(
f"Usage: {sys.argv[0]} <bucket_name> <destination_blob_name> [size_in_mb]"
)
sys.exit(1)

bucket_name = sys.argv[1]
destination_blob_name = sys.argv[2]
size_mb = 1
if len(sys.argv) == 4:
try:
size_mb = float(sys.argv[3])
except ValueError:
print("Please provide a valid number for size_in_mb.")
sys.exit(1)

upload_blob_from_memory(
bucket_name=sys.argv[1],
contents=sys.argv[2],
destination_blob_name=sys.argv[3],
bucket_name=bucket_name,
destination_blob_name=destination_blob_name,
size_in_mb=size_mb,
)