To load data from Amazon S3 into GCP, you can follow these steps:
- First, create a Google Cloud Storage (GCS) bucket to receive your S3 data. You can create one with the following code:
from google.cloud import storage
# Set your GCP project ID and bucket name
project_id = "your-project-id"
bucket_name = "your-bucket-name"
# Authenticate using a service account key file, scoped to your project
client = storage.Client.from_service_account_json(
    "path/to/service/account/key.json", project=project_id)
# Create the bucket (bucket names must be globally unique)
bucket = client.create_bucket(bucket_name)
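If the bucket might already exist, creation will fail on the name collision, so you can look it up first and only create it when it is missing. A minimal sketch, reusing the client and bucket_name above; the "US" location is just an example value:
# Reuse the bucket if it already exists; otherwise create it in a chosen location
bucket = client.lookup_bucket(bucket_name)
if bucket is None:
    bucket = client.create_bucket(bucket_name, location="US")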
- After creating the bucket, you can use the boto3 library to download the data from S3 and upload it to GCS. Here's an example:
import os
import boto3
from google.cloud import storage
# Set your AWS credentials and the source (S3) and destination (GCS) bucket names
aws_access_key_id = "your-aws-access-key-id"
aws_secret_access_key = "your-aws-secret-access-key"
s3_bucket_name = "your-s3-bucket-name"
gcs_bucket_name = "your-bucket-name"
# Authenticate to S3
s3 = boto3.client('s3', aws_access_key_id=aws_access_key_id, aws_secret_access_key=aws_secret_access_key)
# Authenticate to GCS
client = storage.Client.from_service_account_json("path/to/service/account/key.json")
bucket = client.bucket(gcs_bucket_name)
# Download each S3 object to a local temp file, then upload it to GCS under the same key
# (list_objects_v2 returns at most 1000 keys per call; use a paginator for larger buckets)
for obj in s3.list_objects_v2(Bucket=s3_bucket_name).get('Contents', []):
    key = obj['Key']
    local_path = os.path.join("/tmp", key)
    os.makedirs(os.path.dirname(local_path), exist_ok=True)
    s3.download_file(s3_bucket_name, key, local_path)
    bucket.blob(key).upload_from_filename(local_path)
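If writing to local disk is not an option (for example in a constrained or serverless environment), you can buffer each object in memory and upload it directly instead. A minimal sketch, reusing the s3 client and GCS bucket from above; it assumes each object fits comfortably in memory. For very large buckets, Google's Storage Transfer Service can copy S3 data to GCS without any intermediate step.
# Copy each object via memory instead of a local temp file
for obj in s3.list_objects_v2(Bucket=s3_bucket_name).get('Contents', []):
    key = obj['Key']
    data = s3.get_object(Bucket=s3_bucket_name, Key=key)['Body'].read()
    bucket.blob(key).upload_from_string(data)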
- After uploading the data to GCS, you can use Google Cloud Dataflow to transform and load the data into BigQuery. Here's an example:
import apache_beam as beam
from apache_beam.options.pipeline_options import PipelineOptions
# Set your GCP project ID, bucket name, and BigQuery dataset and table names
project_id = "your-project-id"
bucket_name = "your-bucket-name"
dataset_name = "your-dataset-name"
table_name = "your-table-name"
# Define the schema of the BigQuery table as a "name:type" schema string
schema = "field1:STRING,field2:INTEGER,field3:FLOAT"
# Define the transform function: parse one CSV line into a dict keyed by column name
def transform_data(line):
    fields = line.split(',')
    # Do any additional transformation here
    return {'field1': fields[0], 'field2': int(fields[1]), 'field3': float(fields[2])}
# Define the pipeline
options = PipelineOptions()
p = beam.Pipeline(options=options)
(p | "Read data from GCS" >> beam.io.ReadFromText("gs://{}/data.csv".format(bucket_name))
   | "Transform data" >> beam.Map(transform_data)
   | "Write data to BigQuery" >> beam.io.WriteToBigQuery(
       "{}:{}.{}".format(project_id, dataset_name, table_name),
       schema=schema,
       create_disposition=beam.io.BigQueryDisposition.CREATE_IF_NEEDED,
       write_disposition=beam.io.BigQueryDisposition.WRITE_TRUNCATE))
# Run the pipeline and wait for it to finish
result = p.run()
result.wait_until_finish()
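As written, the pipeline above runs on Beam's local DirectRunner. To execute it on the managed Dataflow service instead, pass Dataflow-specific options when constructing PipelineOptions; a minimal sketch, where the region and the temp path under your bucket are example values you would adjust:
options = PipelineOptions(
    runner="DataflowRunner",                            # run on the Dataflow service instead of locally
    project=project_id,
    region="us-central1",                               # example region; choose one near your data
    temp_location="gs://{}/temp".format(bucket_name))   # staging area Dataflow uses for temporary files
p = beam.Pipeline(options=options)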
This copies the data from S3 to GCS and then transforms and loads it into BigQuery with Dataflow.
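Once the pipeline finishes, you can spot-check the result with the BigQuery client library. A minimal sketch, assuming the same project, dataset, and table names as above:
from google.cloud import bigquery
# Count the rows that landed in the destination table
bq_client = bigquery.Client(project=project_id)
query = "SELECT COUNT(*) AS row_count FROM `{}.{}.{}`".format(project_id, dataset_name, table_name)
for row in bq_client.query(query).result():
    print("Loaded {} rows".format(row.row_count))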