When developing data pipelines with the Spark Processor, Scala is a natural choice: it is Spark's native language and offers the most direct and optimized way to work with large-scale distributed data. The Spark Processor connects to various input sources (such as Event Stores, GCS, S3, or Azure storage), reads the data as Spark DataFrames, applies transformations using native Scala Spark APIs, and writes the processed output back to an Event Store.
Before diving into the code, let's understand what the Spark Processor data pipeline looks like.
Spark Processor Data Pipeline
The typical data pipeline for Spark Processor in Syntasa app looks like this:
Input Connection → Spark Processor → Output
- Input Connection: Can be an Event Store (Hive table) or external storage like GCS/S3/Azure containing CSV/Parquet/AVRO files.
- Spark Processor: Runs your transformation logic using Spark DataFrame APIs.
- Output: Data is written back to an Event Store, with or without partitions.
Let’s explore how input and output can both be partitioned or non-partitioned, and what combinations are possible in Spark Processor:
Input Data for Spark Processor
When using a Spark Processor in Syntasa, the input data can come from different sources such as an Event Store or an external storage system (GCS, S3, Azure). Depending on how the data is stored, it may be partitioned or non-partitioned.
Partitioned Input (Event Store)
An Event Store (Hive table) in Syntasa may contain either a partitioned or a non-partitioned dataset. If the dataset is partitioned, it is physically divided into folders based on a partition column, typically a date field (e.g., 2025-01-01, 2025-01-02).
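When you read a partitioned table, the partition column appears as a regular column in the DataFrame, so you can filter on it to read only the dates you need. Below is a minimal sketch, assuming the input is exposed through the @InputTable1 parameter and the partition column is named event_date (both are assumptions for illustration):
// Illustrative sketch: read a partitioned Event Store table via @InputTable1.
// The partition column name (event_date) is an assumption for this example.
import org.apache.spark.sql.functions._
val events = spark.sql("SELECT * FROM @InputTable1")
// Filtering on the partition column lets Spark read only the matching folders.
val singleDay = events.filter(col("event_date") === "2025-01-01")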
Non-Partitioned Input (Event Store)
In some cases, the Event Store table is not partitioned, and all records are stored as a single dataset. When the input is non-partitioned but your output requires partitioning, you must specify which column from the input (for example, file_date or timestamp) should be used for partitioning during the write operation.
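As a minimal sketch, assuming the non-partitioned input has a timestamp column and that you want to derive a file_date column to partition the output by (both column names are assumptions for illustration), the derivation could look like this:
// Illustrative sketch: derive a partition key from a non-partitioned input.
// The column names timestamp and file_date are assumptions for this example.
import org.apache.spark.sql.functions._
val raw = spark.sql("SELECT * FROM @InputTable1")
// to_date() truncates the timestamp to a date, which can then be used as the
// partition column when writing the output.
val withPartitionKey = raw.withColumn("file_date", to_date(col("timestamp")))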
File from External Sources (CSV, Avro, etc.)
When the Spark Processor is connected to external storage such as GCS, S3, or Azure Blob, the input may come in the form of files such as CSV or Avro. These files are generally non-partitioned, but if they include a date-time column (e.g., event_date or order_date), you can instruct Spark to create partitioned output based on that column, as sketched below.
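For illustration only (the bucket path and column names are assumptions, and in practice the file location is supplied by the connected input connection), reading a CSV file and preparing a date column for partitioned output might look like this:
// Illustrative sketch: read a CSV file from external storage (path assumed).
import org.apache.spark.sql.functions._
val orders = spark.read
  .option("header", "true")
  .option("inferSchema", "true")
  .csv("gs://example-bucket/orders/")
// Cast the date-time column (order_date is an assumed name) so it can be
// used as the partition column when the output is written.
val ordersWithDate = orders.withColumn("order_date", to_date(col("order_date")))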
Step-by-Step Guide to Generate Partitioned and Non-Partitioned Output Data via Spark Processor using Scala
Syntasa provides the writeToEventStore() utility method, which is used to write a DataFrame into an Event Store. The parameters you pass to this method determine whether the output is generated as partitioned or non-partitioned data. We will discuss this in more detail below.
In this section, we will walk through how to read data from an Event Store, apply transformations, and write it back to an Event Store in either partitioned or non-partitioned format.
Reading and transforming the data
When using the Spark Processor (with Scala) connected to an Event Store, you can query the dataset directly using Spark SQL. However, the transformations are performed using native Scala Spark APIs, which are strongly typed and compile-time checked, making them efficient and less error-prone.
Here’s how you can read from an Event Store and apply a filter transformation:
// Read from Event Store using SQL
val df = spark.sql("SELECT * FROM @InputTable1")
// Transformation: Filter records where country = 'USA'
import org.apache.spark.sql.functions._
val df_filtered = df.filter(col("country") === "USA")
Explanation of parameters used:
- @InputTable1 is a parameter provided by Syntasa that automatically maps to your input dataset.
- Instead of writing the actual table name (e.g., customers), you can simply use the parameter. Syntasa translates the query internally, so the statement SELECT * FROM @InputTable1 is executed as SELECT * FROM customers.
- You can find and manage such parameters in the Parameters section of the processor. (For more details, refer to the article: Parameters in Syntasa Code Processes.)
- import org.apache.spark.sql.functions._: This line imports common transformation functions such as col(), lit(), concat(), upper(), etc. These functions are used to manipulate columns in a DataFrame (see the short sketch after this list).
- The query result is stored in the DataFrame df. From there, you can apply any Spark transformation. In this example, we filtered records where country = 'USA' and stored the result in a new DataFrame df_filtered.
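For example, building on df_filtered and the functions import from the snippet above (the first_name and last_name columns are assumptions for illustration), these functions can be combined like this:
// Illustrative sketch: common column functions applied to df_filtered.
// upper() changes case, lit() adds a constant, concat() joins columns.
// first_name and last_name are assumed column names for this example.
val df_enriched = df_filtered
  .withColumn("country_upper", upper(col("country")))
  .withColumn("load_source", lit("event_store"))
  .withColumn("full_name", concat(col("first_name"), lit(" "), col("last_name")))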
Now that we have the transformed data, the next step is to write it back to an Event Store (the output dataset). This is where the writeToEventStore() method comes in. The parameters you provide to this method determine whether the output will be saved as partitioned or non-partitioned data.
We have already discussed all scenarios for generating partitioned and non-partitioned output in the article Handling Partitioned and Non-Partitioned Data in Spark Processor using Python. The same logic applies to Scala, since both languages use the same Syntasa utility functions. The only key difference is syntax: when writing Scala code, parameters like @outputTable1 must be enclosed in double quotes (" "), whereas Python uses single quotes (' '), as illustrated in the sketch below.
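As a rough, hypothetical sketch only (the actual parameter list of writeToEventStore() is documented in the Python article referenced above and may differ from the shape shown here), the Scala calls might look like this, with the Syntasa parameter in double quotes:
// Hypothetical sketch only: the real writeToEventStore() parameters are
// covered in the Python article referenced above and may differ.
// Note the double quotes around the Syntasa parameter in Scala.
// Non-partitioned output: no partition column is supplied.
writeToEventStore(df_filtered, "@outputTable1")
// Partitioned output: an assumed partition-column argument (event_date).
writeToEventStore(df_filtered, "@outputTable1", "event_date")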