When working with data pipelines in Syntasa, the Spark Processor is one of the most important components for data transformation and output generation. In Syntasa, a Spark Processor acts as the engine that connects to your data source (Event Store, GCS, S3, or Azure storage), applies transformations, and writes the results to the desired output. When dealing with data pipelines, one important concept is whether your data is partitioned or non-partitioned.
Before that, let's understand how the spark processor data pipeline looks like.
Spark Processor Data Pipeline
The typical data pipeline for Spark Processor in Syntasa app looks like this:
Input Connection → Spark Processor → Output
- Input Connection: Can be an Event Store (Hive table) or external storage like GCS/S3/Azure containing CSV/Parquet/AVRO files.
- Spark Processor: Runs your transformation logic using Spark DataFrame APIs.
- Output: Data is written back to an Event Store, with or without partitions.
Let’s explore how input and output can both be partitioned or non-partitioned, and what combinations are possible in Spark Processor:
Input Data for Spark Processor
When using a Spark Processor in Syntasa, the input data can come from different sources such as an Event Store or an external storage system (GCS, S3, Azure). Depending on how the data is stored, it may be partitioned or non-partitioned.
Partitioned Input (Event Store)
Event Store(hive tables) in Syntasa may contain either partitioned or non-partitioned datasets. If the dataset contains partitioned data, this means the dataset is physically divided into folders based on a partition column, typically a date field (e.g., 2025-01-01, 2025-01-02).
Non-Partitioned Input (Event Store)
In some cases, the Event Store table is not partitioned, and all records are stored as a single dataset. When working with non-partitioned input, but your output requires partitioning, you must specify which column from the input (for example, file_date or timestamp) should be used for partitioning during the write operation.
File from External Sources (CSV, Avro, etc.)
When Spark Processor is connected to external storage such as GCS, S3, or Azure Blob, the input may come in the form of files like CSV or Avro. These files are generally non-partitioned, but if they include a date-time column (e.g., event_date or order_date), you can instruct Spark to create partitioned output based on that column.
Step by Step Guide to Generate Partitioned and Non-Partitioned Output Data via Spark Processor Using Python
Syntasa provides the writeToEventStore() utility method, which is used to write a DataFrame into an Event Store. The parameters you pass to this method determine whether the output is generated as partitioned or non-partitioned data. We will discuss more on this below.
In this section, we will walk through how to read data from an Event Store, apply transformations, and write it back to an Event Store in either partitioned or non-partitioned format.
Reading and Transforming the data
When a Spark Processor is connected to an Event Store, you can query the dataset directly using SQL. The result is returned as a DataFrame, which you can then transform using PySpark operations. Just click on Spark processor and add the code as shown below:
# Read from Event Store using SQL
df = spark.sql("SELECT * FROM @InputTable1")
# Transformation: filter records where city = 'New York'
df_filtered = df.filter(col("city") == "New York")In the above code:
-
@InputTable1is a parameter provided by Syntasa that automatically maps to your input dataset. - Instead of writing the actual table name (e.g., customers), you can simply use the parameter. Syntasa translates the query internally, so the statement
SELECT * FROM @InputTable1is executed asSELECT * FROM customers. - You can find and manage such parameters in the Parameters section of the processor. (For more details, refer to the article: Parameters in Syntasa Code Processes.)
- The query result is stored in the DataFrame
df. From there, you can apply any Spark transformation. In this example, we filtered records where city = 'New York' and stored the result in a new DataFrame df_filtered.
Now that we have the transformed data, the next step is to write it back to an Event Store(output dataset). This is where the writeToEventStore() method comes in. The parameters you provide to this method determine whether the output will be saved as partitioned or non-partitioned data.
Generating Non-Partitioned Output
If you want your output to be non-partitioned, it does not matter whether the input dataset is partitioned or not. In such cases, you can simply use the writeToEventStore() method with only two parameters, and it will generate a non-partitioned output, as shown below:
# Read from Event Store using SQL
df = spark.sql("SELECT * FROM @InputTable1")
# Transformation: filter records where city = 'New York'
df_filtered = df.filter(col("city") == "New York")
# Write as non-partitioned output
writeToEventStore(df_filtered, @outputTable1)Explanation of parameters used:
-
df_filtered- The transformed DataFrame that you want to write to the output. -
@outputTable1- A Syntasa-provided parameter that maps to the connected output dataset. You don’t need to hardcode the table name; Syntasa automatically resolves it. - The above two parameters are mandatory for this method and will always be passed in the method irrespective of the usage or scenario.
- If there are multiple output dataset, you can find the exact parameter names of each output dataset under the Parameters section of the Spark processor.
- For more details, refer to the article: Parameters in Code Process.
By passing only these two parameters, all the data from the filtered DataFrame will be written into the connected output dataset in a non-partitioned format (i.e., combined as one table, without splitting by date or any column).
Generating Partitioned Output
If you want your output to be partitioned, it does not matter whether the input dataset is partitioned or not. The writeToEventStore() method requires four parameters for partitioned output. The definition of method is:
writeToEventStore(dataFrame: DataFrame, @outputDateSet1: String, filesPerPartition:Int, partitionedDateColumn:String)The first two parameters are the same as those used for non-partitioned output:
- The DataFrame containing the transformed data.
- The output dataset parameter (@outputDataset1) representing the connected Event Store.
The additional two parameters are:
- Output files per partition (n) – specifies how many output files(parquet, avro etc) should be generated within each partition.
- Partition column name – the column in the input dataset to be used to create partitions in the output.
To find name of the partition column:
- If the input dataset is already partitioned, you can check the partition column name and pass it as the parameter.
- If the input dataset is non-partitioned, you can select a date or timestamp column from the dataset to use for partitioning.
Here is the sample code below:
# Read non-partitioned data from Event Store
df = spark.sql("SELECT * FROM @InputTable1")
# Transformation: filter records where city = 'New York'
df_filtered = df.filter(col("city") == "New York")
# Write partitioned output by column 'file_date', with 3 files per partition
writeToEventStore(df_filtered, "@OutputTable1", 2, "order_date")Explanation of parameters used:
-
df_filtered- The DataFrame containing transformed data. -
@OutputTable1- The connected output dataset parameter in Syntasa. -
2- Number of output files(parquet, avro etc) to create per partition. You can adjust this based on dataset size and downstream requirements. -
order_date- Column in the DataFrame to use for partitioning the output. Each unique value will create a separate partition folder.
Key Point:
Even if the input data is non-partitioned, specifying a partition column here ensures that the output is organized into partitions, improving query efficiency and downstream processing.
How to Process Partitioned Data?
Suppose the column order_date contains values ranging from 1st January to 5th January 2025, and you execute the Spark job with a date filter for 1st January to 3rd January 2025. In this case, only the rows in the DataFrame(which is passed in the method as parameter) that satisfy this condition—i.e., where order_date falls between 1st and 3rd January—will be processed and written to the output.
This approach demonstrates how you can leverage a date column for partitioning and filtering efficiently.
For datasets that are incrementally updated on a daily basis, you can schedule the job to process only the previous day's data. The system will automatically select and write only those rows from the DataFrame where order_date corresponds to the last day, reducing unnecessary data processing and improving performance.
Notes for Hourly Partitions
When you want the output to be partitioned on hourly basis instead of daily partition, you need to perform only following two steps:
- In the Output tab of the Spark Processor), you need to choose Hourly as the partition type to ensure the output is shown correctly on the UI.
- When partitioning data on an hourly basis, you must select a column that contains time values.
-
Please note that Syntasa accepts the time format
yyyy-MM-dd-HH. If your input source has a different time format, you can convert it to a Syntasa-compatible format using the following code before writing to event store:df = df.withColumn( "modified_time_syntasa", date_format(to_timestamp("modified_time", "yyyy-MM-dd-HH:mm:ss"), "yyyy-MM-dd-HH") )Explanation:
-
modified_time– Column in the input source containing the original time. -
yyyy-MM-dd-HH:mm:ss– Current time format of themodified_timecolumn. -
yyyy-MM-dd-HH– Target time format accepted by Syntasa (or any format you want to convert to). -
modified_time_syntasa– New column that stores the time in Syntasa-compatible format. This column can be used as input to the writeToEventStore() method.
You can also overwrite the original column by using the same column name (modified_time) instead of creating a new one(modified_time_syntasa). This will update the time format in-place, and you can use the same column in subsequent code.
-