In Syntasa, Spark processors can be configured to access S3 buckets without hardcoding credentials by leveraging instance profiles. Before running this Spark code within the Syntasa application, it's common to first test and validate the logic in a Syntasa/JupyterLab notebook. If you'd like to explore or run the notebook version of this process first, refer to the following guide: Reading & Writing a CSV File
This article demonstrates how to take working code from a Syntasa/Jupyter notebook and adapt it into a Spark Code Processor within Syntasa, particularly when reading CSV files from an S3 bucket, applying filters, performing joins, and writing the results to both the event store and S3 storage.
No Credentials Required For S3 Connection with Instance Profile Enabled
When your Syntasa application is hosted in an AWS environment and connected to an S3 bucket using an S3 connection, credentials like access keys are not needed in your Spark code. This is because:
- Your application is running within an EC2 instance or container that has been assigned an IAM role.
- This IAM role has permissions to access the S3 bucket.
- The Spark processor uses the instance profile assigned to the AWS instance.
- In the S3 connection settings within Syntasa, you only need to provide the S3 endpoint and bucket name.
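Under the hood, Spark's Hadoop S3A connector picks up these role-based credentials automatically. Purely for context, here is a minimal sketch of how the credentials provider could be pointed at the instance profile explicitly; in Syntasa this is handled by the platform, and the exact provider class depends on your Hadoop/AWS SDK versions, so treat this as an assumption rather than required configuration.
# Sketch only (assumption): Syntasa normally configures this for you
hadoop_conf = spark.sparkContext._jsc.hadoopConfiguration()
hadoop_conf.set(
    "fs.s3a.aws.credentials.provider",
    "com.amazonaws.auth.InstanceProfileCredentialsProvider"  # credentials come from the instance's IAM role
)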
Before we walk through the code step by step, let’s first understand the prerequisites and the objective of what this code is designed to accomplish.
Prerequisites
1. Sample CSV Files with a Common Column
- To replicate this example, you'll need two CSV files stored in an S3 bucket, each containing at least one common column that can be used to join the data.
- In our example, we've used the following files (illustrative contents are shown after this list):
- Browser.csv: includes sample fields such as visitor_id, browser, and browser_type
- Country.csv: includes sample fields such as visitor_id, country, and city
- Both files contain the visitor_id column, which we use as the key for performing the join operation.
2. S3 Connection with Instance Profile Enabled
- No AWS credentials are needed in the code because:
- The Spark Code Processor is running in an AWS environment with an instance profile.
- The S3 connection is already configured within the application; it only requires the bucket name and endpoint.
- This allows the application to read from and write to S3 using the role-based access provided by AWS.
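For reference, the two files used in this walkthrough could look something like the following. These rows are purely illustrative and are not taken from a real dataset; only the column names matter for the steps below.
Browser.csv
visitor_id,browser,browser_type
101,safari,mobile
102,chrome,desktop
103,safari,desktop
Country.csv
visitor_id,country,city
101,usa,new york
102,uk,london
103,usa,chicago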
What does the code do?
This code demonstrates a typical data pipeline pattern where we:
- Ingest data by reading multiple CSV files from an S3 bucket into Spark DataFrames.
- Transform the data by:
- Applying filters to include only relevant records (e.g., specific browsers or countries).
- Joining datasets on a shared key (e.g., visitor_id) to enrich the data with related attributes.
- Output the final results to:
- A Syntasa event dataset (via writeToEventStore) for use in downstream workflows or visualizations.
- A CSV file in S3 for further analysis, export, or archival purposes.
Step-by-Step Code Explanation
Getting Bucket Name from Associated Connection
bucket = str(getConnectionParam("@InputConnection1","bucketName"))
- Fetches the S3 bucket name from a pre-configured Syntasa connection.
- @InputConnection1 is the input parameter defined under the Parameters section of the Spark processor.
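As a small optional safeguard, you could verify that the parameter was resolved before using it. This is not part of the original code, and it assumes that a missing parameter comes back as None (stringified to "None") by getConnectionParam.
# Optional sanity check (sketch): fail fast if the bucket name was not resolved
if not bucket or bucket == "None":
    raise ValueError("bucketName could not be read from @InputConnection1; check the processor's Parameters section")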
Reading the browser.csv file
browser_csv = spark.read.csv(f"s3a://{bucket}/demo/browser.csv", header=True)
- This line reads the Browser.csv file from the specified path in the S3 bucket using the Spark CSV reader.
- The s3a:// prefix tells Spark to use the Hadoop S3A connector, the recommended way to read from and write to S3.
- header=True tells Spark to treat the first line of the file as column headers.
- The loaded data is stored in a Spark DataFrame named browser_csv.
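Note that with this call every column is read as a string. If you need typed columns, one option (not shown in the original code) is to let Spark infer the schema, at the cost of an extra pass over the file.
# Optional variation (sketch): infer column types instead of reading everything as strings
browser_csv = spark.read.csv(f"s3a://{bucket}/demo/browser.csv", header=True, inferSchema=True)
browser_csv.printSchema()  # verify the detected types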
Filtering Data
browser_csv = browser_csv.filter(browser_csv['browser'] == 'safari')
- This applies a filter transformation on the browser_csv DataFrame to keep only the records where the browser column has the value "safari".
- The result overwrites the original browser_csv DataFrame with the filtered data.
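If you need to keep more than one browser, the same pattern extends to a list of values; the browser names below are only examples.
# Variation (sketch): keep several browser values instead of a single one
browser_csv = browser_csv.filter(browser_csv['browser'].isin('safari', 'chrome'))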
Reading the country.csv file and filtering the records
country_csv = spark.read.csv(f"s3a://{bucket}/demo/country.csv", header=True)
country_csv = country_csv.filter(country_csv['country'] == 'usa')
- The first line reads the country.csv file from the same S3 bucket into a new DataFrame called country_csv.
- The second line filters the country_csv DataFrame to keep only rows where the country column equals "usa", reducing the dataset to relevant records only.
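If the country values are not consistently lower case, a case-insensitive comparison is a common defensive tweak. This is an optional variation, not part of the original code.
from pyspark.sql.functions import lower
# Variation (sketch): match "USA", "Usa", "usa", etc.
country_csv = country_csv.filter(lower(country_csv['country']) == 'usa')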
Joining browser_csv and country_csv on the visitor_id column
joined_df = browser_csv.join(country_csv, on='visitor_id', how='left')
- This line performs a left join on the two filtered DataFrames (browser_csv and country_csv) using the shared column visitor_id.
- The left join ensures that all rows from browser_csv are kept, and matching data from country_csv is included when available.
- The result is stored in a new DataFrame called joined_df.
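Depending on the analysis, other join types or a quick check of unmatched rows can be useful. The snippet below is an optional illustration rather than part of the processor code.
# Variation (sketch): an inner join keeps only visitors present in both files
inner_df = browser_csv.join(country_csv, on='visitor_id', how='inner')
# After the left join, rows with no match in country_csv have NULL country/city;
# they can be inspected like this:
joined_df.filter(joined_df['country'].isNull()).show(5, False)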
Writing to Event Store
writeToEventStore(joined_df, "@OutputTable1")
- This custom Syntasa function writes the joined_df DataFrame to a configured event dataset (@OutputTable1).
- @OutputTable1 is the output parameter defined under the Parameters section of the Spark processor.
- This allows the output to be used in visual apps, further processing steps, or data exports within the Syntasa platform.
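Before writing, it can be worth previewing the joined DataFrame with standard Spark calls to confirm the columns and values look as expected (optional, not in the original code).
# Optional preview (sketch) before writing to the event store
joined_df.printSchema()    # confirm the expected columns are present
joined_df.show(10, False)  # preview a few rows without truncating values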
Writing to a CSV File in S3
joined_df = joined_df.coalesce(1)
output_path = f"s3a://{bucket}/demo/test_output"
joined_df.write.mode("overwrite").option("header", "true").csv(output_path)
- These lines write the joined and filtered data back to S3 as a new CSV file.
- .coalesce(1) reduces the output to a single partition, producing one CSV file instead of multiple part files.
- write.mode("overwrite") ensures any previous output at the target location is replaced.
- .option("header", "true") includes the column names in the output.
- .csv(output_path) writes the file to the specified S3 path.
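Keep in mind that coalesce(1) forces all output through a single task, which is fine for small results but can become a bottleneck for large ones. For bigger outputs you might skip the coalesce so Spark writes in parallel, or write a columnar format instead; the path below is illustrative.
# Alternative (sketch): parallel write as Parquet for larger outputs
parquet_path = f"s3a://{bucket}/demo/test_output_parquet"
joined_df.write.mode("overwrite").parquet(parquet_path)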
Here is the entire code in a single block:
#fetching bucket name from the associated connection
bucket = str(getConnectionParam("@InputConnection1","bucketName"))
#reading browser.csv file and filtering the data
browser_csv = spark.read.csv(f"s3a://{bucket}/demo/browser.csv", header=True)
browser_csv = browser_csv.filter(browser_csv['browser'] == 'safari')
browser_csv.show(10,False)
#reading country.csv file and filtering the data
country_csv = spark.read.csv(f"s3a://{bucket}/demo/country.csv", header=True)
country_csv = country_csv.filter(country_csv['country'] == 'usa')
country_csv.show(10,False)
#left join on the data
joined_df = browser_csv.join(country_csv, on='visitor_id', how='left')
joined_df.show(1)
#writing to event store
writeToEventStore(joined_df, "@OutputTable1")
#Writing to csv file in S3 location
joined_df = joined_df.coalesce(1)
output_path = f"s3a://{bucket}/demo/test_output"
joined_df.write.mode("overwrite").option("header","true").csv(output_path)
If the S3 connection you intend to use is not associated with the same AWS instance where the application is hosted, you can configure the connection using basic AWS credentials (Access Key and Secret Key) instead. For details on how to read a CSV file from an S3 bucket using basic credentials, refer to the article: Reading CSV File From S3 Bucket (Instance Profile Disabled)
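For completeness, with that setup the S3A connector relies on explicit credentials rather than the instance profile. The snippet below is only a rough sketch of the idea; in Syntasa the keys are supplied through the S3 connection settings (see the referenced article), not hardcoded in the processor.
# Sketch only (assumption): explicit S3A credentials when no instance profile is available.
# Never hardcode real keys in processor code; use the connection settings instead.
hadoop_conf = spark.sparkContext._jsc.hadoopConfiguration()
hadoop_conf.set("fs.s3a.access.key", "<YOUR_ACCESS_KEY>")   # placeholder value
hadoop_conf.set("fs.s3a.secret.key", "<YOUR_SECRET_KEY>")   # placeholder value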