Syntasa enables you to write, test, and run data transformation logic using custom code processors such as the Spark, Code Container, and BQ processes.
This guide walks you through the end-to-end process of moving from notebook-tested code to production-ready execution in Syntasa.
Jupyter Notebook for Testing the Code
Before building your data pipeline in Syntasa, you can use Jupyter Notebook to prototype and test your transformation logic. Once validated, this code can be easily moved to a Spark or Code Container processor in your Syntasa app. Syntasa provides utility functions and parameter handling that simplify this transition. To know more about Syntasa Notebook, how to use it, and sample use cases, please refer to the article Getting started with the Notebook.
Below is the sample code we use in the notebook to read CSV files from an S3 bucket. In the next step, we will show how to convert the same code for a code process.
from pyspark.sql import SparkSession
# Replace these with your AWS credentials and S3 bucket name
aws_access_key_id = "YOUR_AWS_ACCESS_KEY"
aws_secret_access_key = "YOUR_AWS_SECRET_KEY"
bucket = "your-bucket-name"
# Create Spark session with S3 support
spark = SparkSession.builder \
    .appName("S3 Join Example") \
    .config("spark.hadoop.fs.s3a.access.key", aws_access_key_id) \
    .config("spark.hadoop.fs.s3a.secret.key", aws_secret_access_key) \
    .config("spark.hadoop.fs.s3a.endpoint", "s3.amazonaws.com") \
    .config("spark.hadoop.fs.s3a.impl", "org.apache.hadoop.fs.s3a.S3AFileSystem") \
    .getOrCreate()
# Read CSVs from S3
browser_csv = spark.read.csv(f"s3a://{bucket}/syn-demo/Browser.csv", header=True)
country_csv = spark.read.csv(f"s3a://{bucket}/syn-demo/country.csv", header=True)
# Filter DataFrames
browser_csv = browser_csv.filter(browser_csv['browser'] == 'safari')
country_csv = country_csv.filter(country_csv['country'] == 'usa')
# Show filtered data
print("Filtered Browser Data:")
browser_csv.show(10, False)
print("Filtered Country Data:")
country_csv.show(10, False)
# Join the data on visitor_id
joined_df = browser_csv.join(country_csv, on='visitor_id', how='left')
# Show sample of the joined data
print("Joined Data:")
joined_df.show(10, False)
# Save the joined output back to S3 (coalesce to 1 file if needed)
output_path = f"s3a://{bucket}/syn-demo/test_joined_output"
joined_df.coalesce(1).write.mode("overwrite").option("header", "true").csv(output_path)
print("✅ Done! Joined data written to S3.")
Step-by-Step Guide to Converting Notebook Code to a Code Process
Let’s explore how to convert this notebook code into a Syntasa code process by leveraging the Library section, built-in Syntasa functions, and parameter support.
Prerequisites
Before configuring the code processor, make sure you have the following components ready:
- Connection: A connection is required to access the data source that your code will read from. Syntasa supports various input sources such as S3, GCS, BigQuery, and others. In this example, we will use an S3 connection with Basic credentials. To learn more about connections and how to create one, please refer to the article Getting Started with Connections.
- Event Store: An event store is required to store the output of your transformation logic. After processing, the resulting dataset will be written to this destination. To understand event stores in more detail, see the article Event Store Overview.
Step 1: Create an App
Create an app using the event store configured earlier. Once the app is ready, you can begin building a data pipeline that reads data, performs transformations, and writes the output to the event store. Refer to the article Creating a new App for more information.
Step 2: Configure the Connection
The S3 connection created earlier must be added to the workflow to enable data access. Follow the steps below to configure the connection:
- Open the app.
- Navigate to the Development Workflow.
- Unlock the workflow.
- Drag and drop the S3 Connection from the list onto the workflow canvas.
- Click the connection block and select the connection name (S3 connection that was created).
- Click the tick icon to confirm the configuration.
- Click Save and Lock to save the workflow.
Step 3: Add and Configure a Code Processor
After adding the connection to the workflow, you can drag and drop a code processor to write your transformation logic. In this example, we will use a Spark Processor. Follow these steps to add and configure it:
- Unlock the Development Workflow.
- Drag and drop the Spark Processor onto the development workflow canvas.
- Connect the S3 Connection to the Spark Processor. (Note: You cannot add code until the processor is connected to a data source.)
- Click the Spark Processor to open the code editor and enter your transformation code that was used in the notebook for testing.
- Update the code by incorporating Syntasa utilities, as described in the next steps.
- Click the tick icon to confirm the processor configuration.
- Click Save and Lock to save the workflow.
Note: Every Spark processor includes an output node, which represents a dataset (such as an event store) where the processed results will be saved.
Step 4: Mapping Notebook Structure to Code Process Format
Syntasa offers a variety of built-in features that streamline the transition from a Jupyter notebook to a code processor. These utilities enhance maintainability, support dynamic configuration, and reduce boilerplate. Below are key features that help map notebook-based code into a production-ready format within a Syntasa code processor:
- Using Built-in Parameters
  - In Syntasa code processors, parameters allow you to dynamically reference configured connections, datasets, and runtime values, removing the need for hardcoded values in your code. This makes your pipelines reusable, secure, and easier to maintain.
  - For example, instead of hardcoding a bucket name or table name, you can use a parameter like @InputConnection1 in your code, which automatically references the connection configured in the app (see the short sketch below).
  - To learn more about built-in parameters, please refer to the article Parameters in Code Processes.
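As a minimal sketch (assuming an S3 connection is attached to the processor as @InputConnection1 and exposes a bucketName attribute, as in the full example later in this article), the hardcoded bucket from the notebook becomes a parameter lookup:
# Notebook style (hardcoded): bucket = "your-bucket-name"
# Code process style: resolve the bucket from the configured connection
bucket = str(getConnectionParam("@InputConnection1", "bucketName"))
browser_csv = spark.read.csv(f"s3a://{bucket}/syn-demo/Browser.csv", header=True)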
- Installing and Importing Libraries
  - Starting from version 8.2.0, Syntasa code processors support library installation via the UI. Instead of manually installing packages within your script, you can now:
    - Go to the Library section of the code processor.
    - Add the required Python package and version (e.g., google-cloud-storage==2.14.0).
  - The system handles installation automatically at runtime.
  - Once installed, you can simply import the library into your code as usual, as shown in the sketch below.
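For example, here is a minimal sketch assuming google-cloud-storage==2.14.0 has been added in the Library section (the bucket name below is illustrative):
# The package added in the Library section is available at runtime and can be imported as usual
from google.cloud import storage

client = storage.Client()  # uses the environment's default GCP credentials
for blob in client.list_blobs("your-bucket-name", prefix="syn-demo/"):
    print(blob.name)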
- Leveraging Built-in Utility Functions
  - Syntasa provides several built-in utility functions that are not available in Jupyter notebooks but greatly simplify operational tasks in production pipelines. These functions abstract away environment-specific logic, allowing you to focus on the core transformation tasks while ensuring integration with Syntasa’s runtime infrastructure. Examples:
    - writeToEventStore(dataframe, "@OutputTable1") saves a DataFrame directly to a Syntasa-managed event store.
    - getConnectionParam("@InputConnection1", "bucketName") retrieves values from a configured connection (e.g., bucket name, access key, host).
  - To learn more about the Syntasa utilities, please refer to the section Code Utilities.
Let's understand the above utilities with a code example:
# Get the detail from Connection Param
accessTokenKey = str(getConnectionParam("@InputConnection1","awsAccessKey"))
accessTokenSecretKey = str(getConnectionParam("@InputConnection1","awsSecretKey"))
#writing to Event store
writeToEventStore(df, "@OutputTable1")
- @InputConnection1 is an input parameter referring to a connection configured in the Syntasa app (e.g., an S3 connection).
- The getConnectionParam() function is used to dynamically retrieve attributes from that connection, such as the awsAccessKey and awsSecretKey.
- @OutputTable1 is an output parameter representing a target dataset (typically an Event Store) defined in the workflow.
- The writeToEventStore() function writes the Spark DataFrame df directly to that dataset. This allows you to control where the output goes from the UI, without modifying the code.
This approach ensures your code remains reusable across apps and environments and aligns with best practices for configuration and credential management.
Here’s how the complete code looks within a code process after incorporating Syntasa utilities, such as parameters and built-in functions:
# Get the detail from Connection Param
accessTokenKey = str(getConnectionParam("@InputConnection1","awsAccessKey"))
accessTokenSecretKey = str(getConnectionParam("@InputConnection1","awsSecretKey"))
bucket = str(getConnectionParam("@InputConnection1","bucketName"))
# Get the Hadoop configuration from Spark context
hadoop_conf = spark.sparkContext._jsc.hadoopConfiguration()
# Set the credential configuration for external bucket
hadoop_conf.set(f"fs.s3a.bucket.{bucket}.access.key", accessTokenKey)
hadoop_conf.set(f"fs.s3a.bucket.{bucket}.secret.key", accessTokenSecretKey)
#Reading Browser.csv file from S3 bucket
browser_csv = spark.read.csv(f"s3a://{bucket}/syn-demo/Browser.csv", header=True)
#Filter results:
browser_csv = browser_csv.filter(browser_csv['browser'] == 'safari')
# 10 is for number of rows to be shown in the logs and False so that results aren't trimmed.
browser_csv.show(10,False)
#Reading country.csv file from S3 bucket
country_csv = spark.read.csv(f"s3a://{bucket}/syn-demo/country.csv", header=True)
#Filter results:
country_csv = country_csv.filter(country_csv['country'] == 'usa')
# 10 is for number of rows to be shown in the logs and False so that results aren't trimmed.
country_csv.show(10,False)
# Doing a left join on the data
#joined_df = browser_csv.join(country_csv, browser_csv['visitor_id'] == country_csv['visitor_id'], 'left')
joined_df = browser_csv.join(country_csv, on='visitor_id', how='left')
#writing to Event store
writeToEventStore(joined_df, "@OutputTable1")
#writing to csv file in S3 location
joined_df = joined_df.coalesce(1)
output_path = f"s3a://{bucket}/syn-demo/test_joined_output"
joined_df.write.mode("overwrite").option("header","true").csv(output_path)
You can check the examples below for the complete code, which reads a CSV file and writes to the event store (a brief sketch of the instance-profile-enabled variant follows the list):
- Reading CSV file from S3 bucket and writing to Event Store & CSV file (Instance profile enabled)
- Reading CSV file from S3 bucket and writing to Event Store & CSV File (Instance profile disabled)
- Reading CSV file from GCS bucket and writing to Event Store & CSV file
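As a rough sketch of the instance-profile-enabled variant (the linked article above is authoritative): when the cluster's IAM role already grants access to the bucket, the explicit access/secret key configuration can be dropped and only the bucket name needs to be read from the connection.
# Instance profile enabled: the cluster's IAM role provides S3 access,
# so no access/secret keys need to be set on the Hadoop configuration.
bucket = str(getConnectionParam("@InputConnection1", "bucketName"))

browser_csv = spark.read.csv(f"s3a://{bucket}/syn-demo/Browser.csv", header=True)
browser_csv = browser_csv.filter(browser_csv['browser'] == 'safari')

country_csv = spark.read.csv(f"s3a://{bucket}/syn-demo/country.csv", header=True)
country_csv = country_csv.filter(country_csv['country'] == 'usa')

joined_df = browser_csv.join(country_csv, on='visitor_id', how='left')

# Write the result to the Syntasa-managed event store
writeToEventStore(joined_df, "@OutputTable1")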
Step 5: Testing the Code
Once your code is ready, you can test it by creating and executing a job within the Syntasa platform. To learn how to do this, refer to the related articles on creating and running jobs.
For iterative development and testing, Syntasa offers Interactive Mode. This feature allows you to start a cluster that remains active, enabling you to test and debug your code directly within the development workflow. Interactive Mode is particularly useful for rapid development and troubleshooting.
To learn more about enabling and using Interactive Mode, refer to the article: Interactive Mode.
Step 6: Deploying the Workflow to Production
After thoroughly testing your code and validating the output, the next step is to deploy the workflow to production. Deployment finalizes the development workflow and prepares it for scheduled execution.
Once the workflow is deployed, you can create and schedule jobs to run automatically based on your desired frequency and triggers. To learn more about deploying workflows, refer to the article: Deploy.