This article demonstrates how to use PySpark in a Jupyter Notebook to read data from Amazon S3, perform transformations (filtering and joining), and write the results back to S3. This workflow is commonly used for quick data exploration, validation, or prototyping before integrating the logic into a production pipeline.
Before we walk through the code step by step, let's first cover the prerequisites and what the code is designed to accomplish.
Prerequisites
- Sample CSV Files with a Common Column
  - To replicate this example, you'll need two CSV files stored in an S3 bucket, each containing at least one common column that can be used to join the data.
  - In our code example, we've used the following files (purely illustrative contents are sketched after this list):
    - Browser.csv — includes sample fields such as visitor_id, browser, and browser_type
    - Country.csv — includes sample fields such as visitor_id, country, and city
  - Both files contain the visitor_id column, which we use as the key for the join operation.
- S3 Connection with Basic Credentials
  - The S3 credentials (Access Key, Secret Key, etc.) needed to connect to the bucket from the notebook.
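Purely for illustration (these rows and values are hypothetical, not taken from any real dataset), the two files might look like this:
Browser.csv
visitor_id,browser,browser_type
101,safari,mobile
102,chrome,desktop
103,safari,desktop
Country.csv
visitor_id,country,city
101,usa,new york
102,usa,chicago
104,canada,toronto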
What does the code do?
This code demonstrates a typical data pipeline pattern where we:
- Ingest data by reading two CSV files from an S3 bucket into Spark DataFrames.
- Transform the data by:
- Applying filters to include only relevant records (e.g., specific browsers or countries).
- Joining datasets on a shared key (e.g., visitor_id) to enrich the data with related attributes.
- Output the final results to:
- A CSV file in S3 for further analysis, export, or archival purposes.
Let's understand the code step by step:
Setting up Spark with S3 configurations
from pyspark.sql import SparkSession
# Replace these with your AWS credentials and S3 bucket name
aws_access_key_id = "YOUR_AWS_ACCESS_KEY"
aws_secret_access_key = "YOUR_AWS_SECRET_KEY"
bucket = "your-bucket-name"
# Create Spark session with S3 support
spark = SparkSession.builder \
    .appName("S3 Join Example") \
    .config("spark.hadoop.fs.s3a.access.key", aws_access_key_id) \
    .config("spark.hadoop.fs.s3a.secret.key", aws_secret_access_key) \
    .config("spark.hadoop.fs.s3a.endpoint", "s3.amazonaws.com") \
    .config("spark.hadoop.fs.s3a.impl", "org.apache.hadoop.fs.s3a.S3AFileSystem") \
    .getOrCreate()
This block initializes a Spark session with support for reading from and writing to Amazon S3. By configuring the session with your AWS credentials and specifying the s3a implementation, Spark can communicate directly with the S3 bucket. The session uses the s3a:// URI scheme, which provides Hadoop-compatible access to S3.
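If you prefer not to hardcode credentials in a notebook, one option is to read them from environment variables instead. This is a minimal sketch, assuming AWS_ACCESS_KEY_ID and AWS_SECRET_ACCESS_KEY are set in the environment where the notebook runs:
import os
from pyspark.sql import SparkSession
# Pull credentials from environment variables instead of embedding them in the notebook
aws_access_key_id = os.environ.get("AWS_ACCESS_KEY_ID", "")
aws_secret_access_key = os.environ.get("AWS_SECRET_ACCESS_KEY", "")
bucket = "your-bucket-name"
spark = SparkSession.builder \
    .appName("S3 Join Example") \
    .config("spark.hadoop.fs.s3a.access.key", aws_access_key_id) \
    .config("spark.hadoop.fs.s3a.secret.key", aws_secret_access_key) \
    .config("spark.hadoop.fs.s3a.endpoint", "s3.amazonaws.com") \
    .config("spark.hadoop.fs.s3a.impl", "org.apache.hadoop.fs.s3a.S3AFileSystem") \
    .getOrCreate()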
Reading CSV files from S3
browser_csv = spark.read.csv(f"s3a://{bucket}/syn-demo/Browser.csv", header=True)
country_csv = spark.read.csv(f"s3a://{bucket}/syn-demo/country.csv", header=True)
This code reads two CSV files from the specified S3 bucket and loads them into Spark DataFrames. The header=True option tells Spark to treat the first row of each CSV as column headers. At this point, browser_csv and country_csv contain structured data that can be transformed using Spark APIs.
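Because inferSchema is not set, Spark loads every column as a string. A quick way to confirm what was read is to inspect the schema and a few rows; the optional re-read with inferSchema below is a sketch, not part of the original example:
# Inspect the column names and types Spark assigned
browser_csv.printSchema()
country_csv.printSchema()
# Peek at a few rows without truncating values
browser_csv.show(5, truncate=False)
# Optional: let Spark infer numeric types instead of reading everything as strings
browser_typed = spark.read.csv(f"s3a://{bucket}/syn-demo/Browser.csv", header=True, inferSchema=True)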
Filtering the Data
browser_csv = browser_csv.filter(browser_csv['browser'] == 'safari')
country_csv = country_csv.filter(country_csv['country'] == 'usa')
Here, the data is filtered to focus on specific subsets. The browser_csv DataFrame is filtered to include only rows where the browser type is "safari", while the country_csv DataFrame is filtered for records where the country is "usa". This reduces the dataset to only the relevant entries needed for further analysis.
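Note that these equality checks are case-sensitive. If your files might contain mixed-case values such as "Safari" or "USA" (an assumption, not something the sample data requires), an equivalent case-insensitive version looks like this:
from pyspark.sql.functions import col, lower
# Case-insensitive variants of the same filters
browser_csv = browser_csv.filter(lower(col("browser")) == "safari")
country_csv = country_csv.filter(lower(col("country")) == "usa")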
Joining the DataFrames
joined_df = browser_csv.join(country_csv, on='visitor_id', how='left')
This line performs a left join on the two DataFrames using the visitor_id column as the key. The result, stored in joined_df, contains all records from the filtered browser_csv along with matching records from country_csv where the visitor IDs align. This effectively combines the browser and country data.
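The join type is easy to change depending on what you need. As a sketch using the same DataFrames: an inner join keeps only visitors present in both filtered sets, and counting nulls in the left-joined result shows how many Safari visitors had no matching USA record.
from pyspark.sql.functions import col
# Inner join: keep only visitor_ids that appear in both filtered DataFrames
inner_df = browser_csv.join(country_csv, on="visitor_id", how="inner")
# In the left join, unmatched rows have NULLs in the columns that came from country_csv
unmatched = joined_df.filter(col("country").isNull()).count()
print(f"Safari visitors without a matching USA record: {unmatched}")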
Writing the Output to File in S3
output_path = f"s3a://{bucket}/syn-demo/test_joined_output"
joined_df.coalesce(1).write.mode("overwrite").option("header", "true").csv(output_path)
print("✅ Done! Joined data written to S3.")
Finally, the joined DataFrame is written back to the S3 bucket as CSV. The coalesce(1) call reduces the DataFrame to a single partition, so Spark writes one part file inside the output directory instead of many, which is convenient for small to medium-sized datasets. The overwrite mode replaces any existing data at that path, and the header option includes column names in the output file. A confirmation message is printed upon successful completion.
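For larger results you would typically skip coalesce(1) and let Spark write multiple part files, or switch to a columnar format. The snippet below is an illustrative alternative (the Parquet path is hypothetical, not part of the original example):
# Write the joined data as Parquet, partitioned by country, without forcing a single file
parquet_path = f"s3a://{bucket}/syn-demo/test_joined_output_parquet"
joined_df.write.mode("overwrite").partitionBy("country").parquet(parquet_path)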
Here is the complete code that reads the two CSV files, transforms the data, and writes the final results to another file in S3:
from pyspark.sql import SparkSession
# Replace these with your AWS credentials and S3 bucket name
aws_access_key_id = "YOUR_AWS_ACCESS_KEY"
aws_secret_access_key = "YOUR_AWS_SECRET_KEY"
bucket = "your-bucket-name"
# Create Spark session with S3 support
spark = SparkSession.builder \
    .appName("S3 Join Example") \
    .config("spark.hadoop.fs.s3a.access.key", aws_access_key_id) \
    .config("spark.hadoop.fs.s3a.secret.key", aws_secret_access_key) \
    .config("spark.hadoop.fs.s3a.endpoint", "s3.amazonaws.com") \
    .config("spark.hadoop.fs.s3a.impl", "org.apache.hadoop.fs.s3a.S3AFileSystem") \
    .getOrCreate()
# Read CSVs from S3
browser_csv = spark.read.csv(f"s3a://{bucket}/syn-demo/Browser.csv", header=True)
country_csv = spark.read.csv(f"s3a://{bucket}/syn-demo/country.csv", header=True)
# Filter DataFrames
browser_csv = browser_csv.filter(browser_csv['browser'] == 'safari')
country_csv = country_csv.filter(country_csv['country'] == 'usa')
# Show filtered data
print("Filtered Browser Data:")
browser_csv.show(10, False)
print("Filtered Country Data:")
country_csv.show(10, False)
# Join the data on visitor_id
joined_df = browser_csv.join(country_csv, on='visitor_id', how='left')
# Show sample of the joined data
print("Joined Data:")
joined_df.show(10, False)
# Save the joined output back to S3 (coalesce to 1 file if needed)
output_path = f"s3a://{bucket}/syn-demo/test_joined_output"
joined_df.coalesce(1).write.mode("overwrite").option("header", "true").csv(output_path)
print("✅ Done! Joined data written to S3.")
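Once the run completes, you can sanity-check the result directly from the notebook by reading the written files back into a DataFrame. A quick verification sketch using the same output_path:
# Read the joined output back from S3 to confirm it was written as expected
result_check = spark.read.csv(output_path, header=True)
print(f"Rows written: {result_check.count()}")
result_check.show(5, False)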
The best way to understand this workflow is through hands-on experience. Follow the steps below to create the sample notebook in your Syntasa environment:
- Download the sample notebook (.ipynb) file from this article.
- Create a new notebook in your Syntasa environment using the import notebook option.