encode_utils.transfer_to_gcp.py

Contains a Transfer class that encapsulates working with the Google Storage Transfer Service to transfer files from AWS S3 to GCP buckets. If you want to run this on a GCP VM, then in the command used to launch the VM you should specify an appropriate service account and the cloud-platform scope, as the following example demonstrates:

```
gcloud compute instances create myinstance --service-account="test-819@sigma-night-206802.iam.gserviceaccount.com" --scopes=cloud-platform --zone us-central1-a
```

Google implements OAuth 2 scopes for requesting access to specific Google APIs, and in our case it's the cloud-platform scope that we need, which is associated with the Storage Transfer API, amongst others. See the documentation in the Transfer class below for more details. The Storage Transfer API documentation is available at https://developers.google.com/resources/api-libraries/documentation/storagetransfer/v1/python/latest/, and authentication is covered at https://cloud.google.com/docs/authentication/production#auth-cloud-explicit-python.
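
For orientation, here is a minimal sketch (not part of this module) of how a Storage Transfer client can be built with the cloud-platform scope using google-auth and google-api-python-client:

```
# Minimal sketch, assuming google-auth and google-api-python-client are installed.
import google.auth
from googleapiclient import discovery

# On a GCP VM launched with the cloud-platform scope, google.auth.default()
# picks up the VM's service account; elsewhere it falls back to the JSON key
# file pointed to by GOOGLE_APPLICATION_CREDENTIALS.
credentials, project = google.auth.default(
    scopes=["https://www.googleapis.com/auth/cloud-platform"])
client = discovery.build("storagetransfer", "v1", credentials=credentials)
```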

If running this in Google Cloud Composer, you must use specific versions of several Google libraries so that the tasks running in the environment can properly use the credentials and scope that you delegated to the environment when creating it. This is a work-around, as the Composer environment is buggy at present in this regard. You'll need a requirements.txt file with the following (thanks to danxmoran for pointing this out):

```
google-api-core==1.5.0
google-api-python-client==1.7.4
google-auth==1.5.1
google-auth-httplib2==0.0.3
google-cloud-core==0.28.1
```

Then use the following commands to create and set up your Cloud Composer environment:

```
gcloud beta composer environments create test-cc-env3 --python-version=3 --location=us-central1 --zone=us-central1-a --disk-size=20GB --service-account=fasdf-29@sigma-night-206802.iam.gserviceaccount.com

gcloud composer environments update env3 --location us-central1 --update-pypi-packages-from-file requirements.txt
```

exception encode_utils.transfer_to_gcp.AwsCredentialsMissing[source]

Bases: Exception

Raised when a method needs AWS credentials but can’t find them.

class encode_utils.transfer_to_gcp.Transfer(gcp_project, aws_creds=())[source]

Bases: object

See example at https://cloud.google.com/storage-transfer/docs/create-client and https://github.com/GoogleCloudPlatform/python-docs-samples/blob/master/storage/transfer_service/aws_request.py.

Encapsulates the transfer of files from AWS S3 storage to GCP storage by using the Google Storage Transfer Service (STS). The create() method is used to create a transfer job (termed transferJob in the STS API). A transferJob either runs once (a one-off job) or is scheduled to run repeatedly, depending on how the job schedule is specified.

Any transfer event of a transferJob is termed a transferOperation in the STS API. There are a few utility methods in this class that work with transferOperations.
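
For reference, a one-off transferJob request body in the STS API roughly takes the shape sketched below (based on the Google sample linked above; the exact dictionary built by this class may differ, and the project, bucket, and file names are hypothetical):

```
# Hedged sketch of a one-off transferJob request body per the STS API;
# the exact dictionary built by Transfer.from_s3() may differ.
transfer_job = {
    "description": "my one-off transfer",
    "status": "ENABLED",
    "projectId": "sigma-night-206802",          # hypothetical project
    "schedule": {
        # Same start and end date => the job runs exactly once.
        "scheduleStartDate": {"year": 2018, "month": 10, "day": 1},
        "scheduleEndDate": {"year": 2018, "month": 10, "day": 1},
    },
    "transferSpec": {
        "awsS3DataSource": {
            "bucketName": "my-s3-bucket",       # hypothetical bucket
            "awsAccessKey": {
                "accessKeyId": "value of AWS_ACCESS_KEY_ID",
                "secretAccessKey": "value of AWS_SECRET_ACCESS_KEY",
            },
        },
        "gcsDataSink": {"bucketName": "my-gcp-bucket"},
        "objectConditions": {"includePrefixes": ["path/to/file.fastq.gz"]},
        "transferOptions": {"overwriteObjectsAlreadyExistingInSink": False},
    },
}
# client.transferJobs().create(body=transfer_job).execute() would submit it.
```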

You’ll need to have a Google service account set up with at least the two roles below:

  1. Project role with access level of Editor or greater.
  2. Storage role with access level of Storage Object Creator or greater.

If running on a non-GCP VM, the service account credentials are fetched from the environment via the variable GOOGLE_APPLICATION_CREDENTIALS. This should be set to the JSON file provided to you by the GCP Console when you create a service account; see https://cloud.google.com/docs/authentication/getting-started for more details.

If instead you are running this on a GCP VM, then you should specify the service account and OAuth 2 scope when launching the VM as described at the beginning; there is no need to use the service account file itself.

Note1: if this is the first time that you are using the Google STS on your GCP bucket, it won’t work just yet as you’ll get an error that reads:

Failed to obtain the location of the destination Google Cloud Storage (GCS) bucket due to insufficient permissions. Please verify that the necessary permissions have been granted. (Google::Apis::ClientError)

To resolve this, I recommend that you go into the GCP Console and run a manual transfer there, as this adds the missing permission that you need. I personally don't know how to add it otherwise, or even know what it is that's being added, but there you go!

Note2: If you try to transfer a file that is mistyped or doesn't exist in the source bucket, this will not set a failed status on the transferJob. If you really need to know whether a file was transferred via the API, you need to query the transferOperation; see the method get_transfers_from_job().

Parameters:
  • gcp_project: str. The GCP project that contains your GCP bucket(s). Can be given in either integer form or the user-friendly name form (e.g. sigma-night-207122).
  • aws_creds: tuple. Ideally, your AWS credentials will be stored in the environment. For additional flexibility, though, you can specify them here as well in the form (AWS_ACCESS_KEY_ID, AWS_SECRET_ACCESS_KEY).
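
A minimal usage sketch (the project name is the example from above; credentials are hypothetical):

```
# Hedged usage sketch; the project name and credentials are hypothetical.
from encode_utils.transfer_to_gcp import Transfer

# AWS credentials are read from AWS_ACCESS_KEY_ID / AWS_SECRET_ACCESS_KEY
# in the environment when aws_creds is not given:
transfer = Transfer(gcp_project="sigma-night-207122")

# Or pass them explicitly:
# transfer = Transfer(gcp_project="sigma-night-207122",
#                     aws_creds=("AKIA...", "wJalr..."))
```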
from_s3(s3_bucket, s3_paths, gcp_bucket, overwrite_existing=False, description='')[source]

Schedules a one-off transferJob that runs immediately to copy the specified file(s) from s3_bucket to gcp_bucket. AWS keys are required and must have the following permissions granted in the source bucket policy:

  1. s3:GetBucketLocation
  2. s3:ListBucket
  3. s3:GetObject

AWS Credentials are fetched from the environment via the variables AWS_ACCESS_KEY_ID and AWS_SECRET_ACCESS_KEY, unless passed explicitly to the aws_creds argument when instantiating the Transfer class.

Parameters:
  • s3_bucket: str. The name of the AWS S3 bucket.
  • s3_paths: list. The paths to S3 objects in s3_bucket. Don't include a leading '/' (it will be removed if present anyway). Up to 1000 files can be transferred in a given transfer job, per the Storage Transfer API transferJobs documentation. If you only need to transfer a single file, it may be given as a string.
  • gcp_bucket: str. The name of the GCP bucket.
  • overwrite_existing: bool. True means that files in GCP get overwritten by any files being transferred with the same name (key).
  • description: str. The description to show when querying transfers via the Google Storage Transfer API, or via the GCP Console. May be left empty, in which case the default description will be the value of the first S3 file name to transfer.
Returns:

The JSON response representing the newly created transferJob.

Return type:

dict
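
For example, continuing with the transfer object from the class-level sketch above (bucket names, paths, and description are hypothetical):

```
# Hedged sketch; bucket names, paths, and description are hypothetical.
response = transfer.from_s3(
    s3_bucket="my-s3-bucket",
    s3_paths=["reads/sample1.fastq.gz", "reads/sample2.fastq.gz"],
    gcp_bucket="my-gcp-bucket",
    overwrite_existing=False,
    description="sample fastq transfer")
transferjob_name = response["name"]  # used by the transferOperation helpers below
```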

from_urllist(urllist, gcp_bucket, overwrite_existing=False, description='')[source]

Schedules a one-off transferJob that runs immediately to copy the files specified in the URL list to GCS. AWS keys are not used, and all URIs must be publicly accessible.

Parameters:
  • gcp_bucket: str. The name of the GCP bucket.
  • overwrite_existing: bool. True means that files in GCP get overwritten by any files being transferred with the same name (key).
  • description: str. The description to show when querying transfers via the Google Storage Transfer API, or via the GCP Console. May be left empty, in which case the default description will be the value of the first file name to transfer.
Returns:

The JSON response representing the newly created transferJob.

Return type:

dict
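
A hedged usage sketch follows. The exact form expected for urllist (a list of public URLs versus the URL of an STS-format TSV list file) is not spelled out in the parameter list above; a list of URLs is assumed here:

```
# Hedged sketch; the URL and bucket are hypothetical, and urllist is assumed
# to be a list of publicly accessible URLs.
response = transfer.from_urllist(
    urllist=["https://example.org/data/sample1.fastq.gz"],
    gcp_bucket="my-gcp-bucket",
    description="public URL transfer")
```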

get_transfers_from_job(transferjob_name)[source]

Fetches descriptions in JSON format of any realized transfers under the specified transferJob. These are called transferOperations in the Google Storage Transfer API terminology.

See Google API example at https://cloud.google.com/storage-transfer/docs/create-manage-transfer-program?hl=ja in the section called “Check transfer operation status”. See API details at https://cloud.google.com/storage-transfer/docs/reference/rest/v1/transferOperations.

Parameters: transferjob_name: str. The value of the name key in the dictionary that is returned by self.from_s3() or self.from_urllist().
Returns:
A list of the transferOperations belonging to the specified transferJob. This will be a list of only a single element if the transferJob is a one-off transfer, but if it is a repeating transferJob, then there could be several transferOperations in the list.
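
For example, using the transferjob_name captured from the from_s3() sketch above:

```
# Hedged sketch; each element is the JSON description of one transferOperation.
operations = transfer.get_transfers_from_job(transferjob_name)
for op in operations:
    print(op)
```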
get_transfer_status(transferjob_name)[source]

Returns the transfer status of the first transferOperation that is returned for the given transferJob. Thus, this function really only makes sense for one-off transferJobs that don’t repeat.

Note: if a transferJob attempts to transfer a non-existing file from the source bucket, this has no effect on the transferOperation status (it will not cause a FAILED status). Moreover, the transferOperation status doesn't look at which files were and were not transferred; it is only concerned with the execution status of the transferOperation job itself.

Parameters: transferjob_name: str. The value of the name key in the dictionary that is returned by create().
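
A hedged usage sketch for a one-off transferJob, again using the transferjob_name from the from_s3() sketch above:

```
# Hedged sketch; the possible status values (e.g. IN_PROGRESS, SUCCESS, FAILED)
# follow the Storage Transfer API's transferOperation status enum.
status = transfer.get_transfer_status(transferjob_name)
print(status)
```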