In this article, we will walk through the process of reading a CSV file stored in a Google Cloud Storage (GCS) bucket from an application running in an AWS environment using PySpark. This is particularly useful in multi-cloud architectures where data may reside in GCS while processing is done on AWS infrastructure.
Just like reading files from S3, reading from GCS in Spark requires the appropriate authentication and configuration. We assume the GCS bucket is registered in Syntasa through a connection that includes the GCP service account key.
Prerequisites
- Spark application running on AWS
- GCS bucket with the target CSV files
- GCP service account key available as part of a Syntasa connection
- gcs-connector-hadoop3-latest.jar added to the Spark runtime classpath
To ensure the required GCS Hadoop connector is available to Spark, add this line to the custom runtime script:
sudo aws s3 cp s3://syntasa-qa-kube/syn-cluster-config/deps/jars/gcs-connector-hadoop3-latest.jar /usr/lib/hadoop-lzo/lib/
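Note that the S3 path above points to an example deps bucket; replace the bucket and prefix with the location where the connector jar is staged in your own environment, and make sure the destination directory is on the cluster's Hadoop classpath.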
Step-by-Step Code Explanation
Import Utility to Run Shell Commands on Executor Nodes
from syn_utils.syn_notebook.lib.amazon_py_package_installer import *
- This utility helps execute shell commands (such as writing a keyfile) on all Spark executor nodes in an AWS EMR cluster.
Retrieve GCP Key File from Syntasa Connection
key_file = synutils.connection().getConnectionParam("@InputConnection1","keyfile")
- Reads the base64-encoded or JSON key file for GCS authentication from the connection stored under the @InputConnection1 parameter.
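If the connection returns the key as a base64-encoded string rather than raw JSON, decode it before writing it to disk. A minimal sketch, assuming the value either starts with { (raw JSON) or is base64-encoded; the check below is an assumption about how the key is stored, not part of the Syntasa API:
import base64
# Assumption: the connection may return raw JSON or a base64-encoded string.
# Decode only when the value does not already look like a JSON document.
if not key_file.lstrip().startswith("{"):
    key_file = base64.b64decode(key_file).decode("utf-8")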
Define Local Path to Store the Key File
key_file_path = '/home/hadoop/keyfile.json'
- Specifies the path where the key file will be stored, both on the driver and executor nodes.
Write the Key File to the Executor Nodes
shell_executor = AmazonClusterPyPackageInstaller()
tasks = shell_executor.execute_tasks(f"echo '{key_file}' > {key_file_path}")
- Executes the shell command on all executor nodes to ensure they have access to the same key file during execution.
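If the key JSON contains single quotes or embedded newlines, the echo command above can mangle it; in that case, transporting the value base64-encoded and decoding it on each node is a safer option.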
Write the Key File to the Local Path on the Driver Node
with open(key_file_path, 'w') as file:
    file.write(key_file)
- Ensures the key file is available on the driver node for Spark configuration.
Set Spark Configuration for GCS
spark.conf.set("fs.gs.impl", "com.google.cloud.hadoop.fs.gcs.GoogleHadoopFileSystem")
spark.conf.set("fs.AbstractFileSystem.gs.impl", "com.google.cloud.hadoop.fs.gcs.GoogleHadoopFS")
spark.conf.set("google.cloud.auth.service.account.enable", "true")
spark.conf.set("google.cloud.auth.service.account.json.keyfile", key_file_path)
spark.conf.set("fs.gs.status.parallel.enable", "true")
- These configurations enable Spark to access GCS as a file system and authenticate using the provided key file.
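If file-system properties set through spark.conf.set are not picked up (this can depend on the Spark version and on when the session was created), the same values can be applied directly to the Hadoop configuration. A minimal fallback sketch using the same property names:
# Alternative: apply the same GCS properties directly to the Hadoop configuration
hadoop_conf = spark.sparkContext._jsc.hadoopConfiguration()
hadoop_conf.set("fs.gs.impl", "com.google.cloud.hadoop.fs.gcs.GoogleHadoopFileSystem")
hadoop_conf.set("fs.AbstractFileSystem.gs.impl", "com.google.cloud.hadoop.fs.gcs.GoogleHadoopFS")
hadoop_conf.set("google.cloud.auth.service.account.enable", "true")
hadoop_conf.set("google.cloud.auth.service.account.json.keyfile", key_file_path)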
Reading CSV Files from GCS and Filtering the Records
bucket = str(synutils.connection().getConnectionParam("@InputConnection1","bucketName"))
browser_csv = spark.read.csv(f"gs://{bucket}/demo/random/Browser.csv", header=True)
browser_csv = browser_csv.filter(browser_csv['browser'] == 'safari')
browser_csv.show(10, False)
country_csv = spark.read.csv(f"gs://{bucket}/demo/random/country.csv", header=True)
country_csv = country_csv.filter(country_csv['country'] == 'usa')
country_csv.show(10, False)
- Loads the data from GCS directly into Spark dataframes, applies filters, and previews the results.
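The reads above rely on the default comma delimiter and load every column as a string. If you want typed columns without the extra pass over the file that schema inference requires, an explicit schema can be supplied. A sketch, assuming the demo files contain visitor_id and browser columns; the variable name below is illustrative and not used in the rest of the pipeline:
from pyspark.sql.types import StructType, StructField, StringType
# Illustrative schema; adjust the fields to match the actual CSV layout
browser_schema = StructType([
    StructField("visitor_id", StringType(), True),
    StructField("browser", StringType(), True),
])
browser_typed = (
    spark.read
         .schema(browser_schema)
         .option("header", True)
         .csv(f"gs://{bucket}/demo/random/Browser.csv")
)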
Joining the Two Datasets and Writing the Results to the Event Store
joined_df = browser_csv.join(country_csv, on='visitor_id', how='left')
writeToEventStore(joined_df, "@OutputTable1")
- Performs a left join on the filtered datasets and writes the final result to the configured output table.
Save the Final Output to the GCS Bucket as a CSV File
joined_df = joined_df.coalesce(1)
output_path = f"gs://{bucket}/demo/random/gcs_direct_read_output"
joined_df.write.mode("overwrite").option("header", "true").csv(output_path)
- The final dataset is saved back to the same GCS bucket, demonstrating a complete read-transform-write cycle.
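Note that coalesce(1) funnels the whole result through a single task so that GCS receives one part file; for larger datasets it is usually better to drop the coalesce and let Spark write multiple part files in parallel.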
Here is the complete code:
# Add the line below to the custom runtime script to copy the GCS connector jar
#sudo aws s3 cp s3://syntasa-qa-kube/syn-cluster-config/deps/jars/gcs-connector-hadoop3-latest.jar /usr/lib/hadoop-lzo/lib/
from syn_utils.syn_notebook.lib.amazon_py_package_installer import *
# Get keyfile from connection
key_file = synutils.connection().getConnectionParam("@InputConnection1","keyfile")
# Tmp local key file path
key_file_path = '/home/hadoop/keyfile.json'
# Write the keyfile to executor nodes using synutils
shell_executor = AmazonClusterPyPackageInstaller()
tasks = shell_executor.execute_tasks(f"echo '{key_file}' > {key_file_path}")
# Write the keyfile to local path in driver node
with open(key_file_path, 'w') as file:
    file.write(key_file)
# Set gcs properties
spark.conf.set("fs.gs.impl", "com.google.cloud.hadoop.fs.gcs.GoogleHadoopFileSystem")
spark.conf.set("fs.AbstractFileSystem.gs.impl", "com.google.cloud.hadoop.fs.gcs.GoogleHadoopFS")
spark.conf.set("google.cloud.auth.service.account.enable", "true")
spark.conf.set("google.cloud.auth.service.account.json.keyfile", key_file_path)
spark.conf.set("fs.gs.status.parallel.enable", "true")
# Get bucketname from connection
bucket = str(synutils.connection().getConnectionParam("@InputConnection1","bucketName"))
browser_csv = spark.read.csv(f"gs://{bucket}/demo/random/Browser.csv", header=True)
# Filter results
browser_csv = browser_csv.filter(browser_csv['browser'] == 'safari')
# Show 10 rows in the logs; False so that column values aren't truncated
browser_csv.show(10, False)
#Country Data
country_csv = spark.read.csv(f"gs://{bucket}/demo/random/country.csv", header=True)
# Filter results
country_csv = country_csv.filter(country_csv['country'] == 'usa')
# Show 10 rows in the logs; False so that column values aren't truncated
country_csv.show(10, False)
# Left join on visitor_id; the commented line below is an alternative with an explicit join condition
# joined_df = browser_csv.join(country_csv, browser_csv['visitor_id'] == country_csv['visitor_id'], 'left')
joined_df = browser_csv.join(country_csv, on='visitor_id', how='left')
# Write to the event store
writeToEventStore(joined_df, "@OutputTable1")
# Write the results to a CSV file in the GCS bucket
joined_df = joined_df.coalesce(1)
output_path = f"gs://{bucket}/qa/demo/gcs_direct_read_output"
#"s3://bucket/Pathto/my_file/output"
joined_df.write.mode("overwrite").option("header","true").csv(output_path)
With the correct authentication and configuration in place, reading from and writing to GCS with PySpark on AWS is straightforward. This approach lets you leverage data stored in GCP from your AWS-based Spark jobs, enabling flexible and scalable multi-cloud data processing pipelines.
Use this setup when your data lake resides in GCS, but your processing and analytics workloads are hosted on AWS EMR or similar Spark infrastructure.