Skip to content
Open
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
19 changes: 11 additions & 8 deletions parsons/google/google_cloud_storage.py
Original file line number Diff line number Diff line change
@@ -1,21 +1,22 @@
import datetime
import gzip
import logging
import time
import uuid
import zipfile
from typing import Optional, Union

from collections import OrderedDict
import google
import petl
from google.cloud import storage, storage_transfer
from google.oauth2.credentials import Credentials
from google.protobuf import json_format

from parsons.google.utilities import (
load_google_application_credentials,
setup_google_application_credentials,
)
from parsons.utilities import files

Check failure on line 19 in parsons/google/google_cloud_storage.py

View workflow job for this annotation

GitHub Actions / ruff-check

Ruff (I001)

parsons/google/google_cloud_storage.py:1:1: I001 Import block is un-sorted or un-formatted

logger = logging.getLogger(__name__)

Expand Down Expand Up @@ -406,7 +407,7 @@
source_path: str = "",
aws_access_key_id: Optional[str] = None,
aws_secret_access_key: Optional[str] = None,
):
) -> OrderedDict:
"""
Creates a one-time transfer job from Amazon S3 to Google Cloud
Storage. Copies all blobs within the bucket unless a key or prefix
Expand All @@ -426,6 +427,10 @@
Access key to authenticate storage transfer
aws_secret_access_key (str):
Secret key to authenticate storage transfer
`Returns:`
OrderedDict:
An ordered dict containing the metadata of
the finished transfer operation
"""
if source not in ["gcs", "s3"]:
raise ValueError(f"Blob transfer only supports gcs and s3 sources [source={source}]")
Expand Down Expand Up @@ -510,18 +515,16 @@
logger.debug("Operation still running...")

else:
operation_metadata = storage_transfer.TransferOperation.deserialize(
operation.metadata.value
)
error_output = operation_metadata.error_breakdowns
if len(error_output) != 0:
operation_metadata = json_format.MessageToDict(operation.metadata)
error_output = operation_metadata.get("errorBreakdowns", None)
if error_output:
raise Exception(
f"""{blob_storage} to GCS Transfer Job
{create_result.name} failed with error: {error_output}"""
)
else:
logger.info(f"TransferJob: {create_result.name} succeeded.")
return
return operation_metadata

else:
logger.info("Waiting to kickoff operation...")
Expand Down
Loading