In the previous article, we explored how to read a CSV file from an S3 bucket located within the same AWS environment, using an instance profile for authentication. In some cases, the S3 bucket you want to access is not hosted within the same AWS environment as your Syntasa application. This typically happens when the S3 bucket belongs to another account or region, or when you're working across environments.
In such situations, you cannot rely on instance profiles for authentication. Instead, you’ll need to use an S3 connection configured with basic AWS credentials (Access Key and Secret Key).
This article explains how to modify your Spark code within a Syntasa Code Processor to authenticate and read files from an S3 bucket using explicit AWS credentials.
Before running this Spark code within the Syntasa application, it’s common to first test and validate the logic using a Syntasa/JupyterLab notebook. If you’d like to first explore or run the notebook version of this process, you can refer to the following guide: Reading & Writing a CSV File
This article demonstrates how you can take working code from a Syntasa/JupyterLab notebook and adapt it into a Spark Code Processor within Syntasa, particularly when reading CSV files from an S3 bucket, applying filters, performing joins, and writing the results to both the event store and S3 storage.
Before we walk through the code step by step, let’s first understand the prerequisites and the objective of what this code is designed to accomplish.
Prerequisites
- Sample CSV Files with a Common Column
- To replicate this example, you’ll need two CSV files stored in an S3 bucket, each containing at least one common column that can be used to join the data.
- In our example, we’ve used the following files:
- Browser.csv — includes sample fields such as visitor_id, browser, and browser_type
- Country.csv — includes sample fields such as visitor_id, country, and city
Both files contain the visitor_id column, which we use as the key for the join operation (an optional schema sketch for these files follows after this prerequisites list).
- S3 Connection with Basic Credentials
- The S3 connection with basic credentials (Access Key and Secret Key) is already configured within the application.
- This allows the application to read from and write to S3 using these access keys rather than the role-based access provided by an instance profile.
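Before moving on: if you prefer not to rely solely on header inference when reading the two CSV files described above, you can optionally define explicit schemas. The sketch below is illustrative only; the column names come from the example files, and treating every column as a string is an assumption.
# Optional, illustrative only: explicit schemas for the two example files
# (column names taken from the files above; all types assumed to be strings)
from pyspark.sql.types import StructType, StructField, StringType

browser_schema = StructType([
    StructField("visitor_id", StringType(), True),
    StructField("browser", StringType(), True),
    StructField("browser_type", StringType(), True),
])

country_schema = StructType([
    StructField("visitor_id", StringType(), True),
    StructField("country", StringType(), True),
    StructField("city", StringType(), True),
])
If you use these, pass them to the read call, for example spark.read.csv(path, header=True, schema=browser_schema), instead of letting Spark infer the types.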
What does the code do?
This code demonstrates a data pipeline pattern where we:
- Ingest data by reading multiple CSV files from an S3 bucket into Spark DataFrames.
- Transform the data by:
- Applying filters to include only relevant records (e.g., specific browsers or countries).
- Joining datasets on a shared key (e.g., visitor_id) to enrich the data with related attributes.
- Output the final results to:
- A Syntasa event dataset (via writeToEventStore) for use in downstream workflows or visualizations.
- A CSV file in S3 for further analysis, export, or archival purposes.
Step-by-Step Code Explanation
Authenticating to S3 with Access & Secret Key
# Retrieve the credentials and bucket name from the Syntasa-managed connection
accessTokenKey = str(getConnectionParam("@InputConnection1", "awsAccessKey"))
accessTokenSecretKey = str(getConnectionParam("@InputConnection1", "awsSecretKey"))
bucket = str(getConnectionParam("@InputConnection1", "bucketName"))
- getConnectionParam(...) pulls the credentials and bucket name from the Syntasa-managed connection.
- With this setup, you don’t need to hard-code credentials in your script or config files. Syntasa keeps them secure and accessible via the connection layer.
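As an optional sanity check (not part of the original processor code), you can confirm that the connection returned non-empty values without ever printing the secret itself:
# Optional sanity check: fail fast if the connection did not return credentials
if not accessTokenKey or not accessTokenSecretKey:
    raise ValueError("Connection '@InputConnection1' did not return AWS credentials")

# Avoid logging secrets; the last 4 characters of the access key are enough to confirm which key is in use
print(f"Using AWS access key ending in ...{accessTokenKey[-4:]}")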
# Get the Hadoop configuration from Spark context
hadoop_conf = spark.sparkContext._jsc.hadoopConfiguration()
- Accesses the underlying Hadoop configuration object used by the Spark context.
- This is required because Spark’s S3 integration uses Hadoop’s file system APIs, and authentication with S3 is done through Hadoop config settings.
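For reference, the simplest way to pass credentials through this configuration is the global form sketched below, which applies the keys to every s3a:// path the job touches. The next step uses the bucket-scoped variant instead, which is generally preferable because it leaves access to other buckets (for example, those covered by the instance profile) untouched.
# Global form, shown for comparison only: these credentials would be used for ALL s3a:// access
hadoop_conf.set("fs.s3a.access.key", accessTokenKey)
hadoop_conf.set("fs.s3a.secret.key", accessTokenSecretKey)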
# Set the credential configuration for external bucket
hadoop_conf.set(f"fs.s3a.bucket.{bucket}.access.key", accessTokenKey)
hadoop_conf.set(f"fs.s3a.bucket.{bucket}.secret.key", accessTokenSecretKey)
- These lines set bucket-specific credentials for the s3a:// file system.
- They tell Spark to use these credentials when accessing this particular bucket (instead of the default instance profile credentials).
- This is especially useful when:
- The Spark job runs on an EC2 instance/profile that doesn't have access to this external bucket.
- You're accessing a cross-account S3 bucket or one located in a different AWS region.
This setup overrides the default authentication method (e.g., instance profile or environment variables) and injects explicit credentials into Spark’s Hadoop configuration, enabling secure access to S3 buckets outside the current AWS environment.
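Depending on your Hadoop (hadoop-aws) version, you can optionally make the override even more explicit by pinning the credentials provider for that bucket, and by setting the bucket's region when it differs from the cluster's. Both settings below are optional and version-dependent; treat them as a hedged sketch rather than required configuration, and note that the region value shown is only a placeholder.
# Optional: force simple (access/secret key) authentication for this bucket only,
# so S3A never falls back to the instance profile for it
hadoop_conf.set(
    f"fs.s3a.bucket.{bucket}.aws.credentials.provider",
    "org.apache.hadoop.fs.s3a.SimpleAWSCredentialsProvider"
)

# Optional (newer hadoop-aws versions): specify the bucket's region if it differs
# from the cluster's; "us-west-2" below is only a placeholder
# hadoop_conf.set(f"fs.s3a.bucket.{bucket}.endpoint.region", "us-west-2")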
The remaining code follows the same structure as outlined in the article Reading CSV Files from S3 Bucket (Instance Profile Enabled). The only addition is the two authentication lines mentioned above. You can refer to that article for a detailed explanation of each step. For convenience, the complete code is provided below in a single block:
# Get the connection details (credentials and bucket name) from the connection parameters
accessTokenKey = str(getConnectionParam("@InputConnection1", "awsAccessKey"))
accessTokenSecretKey = str(getConnectionParam("@InputConnection1", "awsSecretKey"))
bucket = str(getConnectionParam("@InputConnection1", "bucketName"))

# Get the Hadoop configuration from the Spark context
hadoop_conf = spark.sparkContext._jsc.hadoopConfiguration()

# Set the credential configuration for the external bucket
hadoop_conf.set(f"fs.s3a.bucket.{bucket}.access.key", accessTokenKey)
hadoop_conf.set(f"fs.s3a.bucket.{bucket}.secret.key", accessTokenSecretKey)

# Read the Browser.csv file from the S3 bucket
browser_csv = spark.read.csv(f"s3a://{bucket}/syn-qa/Browser.csv", header=True)

# Filter to rows where browser is 'safari'
browser_csv = browser_csv.filter(browser_csv['browser'] == 'safari')

# Show 10 rows in the logs; False ensures the output isn't truncated
browser_csv.show(10, False)

# Read the Country.csv file from the S3 bucket
country_csv = spark.read.csv(f"s3a://{bucket}/syn-qa/Country.csv", header=True)

# Filter to rows where country is 'usa'
country_csv = country_csv.filter(country_csv['country'] == 'usa')

# Show 10 rows in the logs; False ensures the output isn't truncated
country_csv.show(10, False)

# Left join the two DataFrames on visitor_id
# Alternative explicit form (note: it keeps both visitor_id columns in the result):
# joined_df = browser_csv.join(country_csv, browser_csv['visitor_id'] == country_csv['visitor_id'], 'left')
joined_df = browser_csv.join(country_csv, on='visitor_id', how='left')

# Write the joined result to the event store
writeToEventStore(joined_df, "@OutputTable1")

# Write the joined result to a single CSV file in the S3 location
joined_df = joined_df.coalesce(1)
output_path = f"s3a://{bucket}/syn-qa/test_joined_output"
joined_df.write.mode("overwrite").option("header", "true").csv(output_path)
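If you want to confirm the S3 write succeeded, a simple read-back of the output location (optional and purely illustrative) is an easy check:
# Optional verification: read the joined output back from S3 and inspect it
verify_df = spark.read.csv(output_path, header=True)
verify_df.show(10, False)
print(f"Rows in joined output: {verify_df.count()}")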