When working with Spark Processors in Syntasa, you may need to extend your Scala code using external libraries. These libraries can help perform advanced operations such as JSON parsing, complex string transformations, or custom analytics. Unlike Python libraries, Scala libraries cannot be installed directly from the Spark Processor UI. Instead, they are managed through Spark runtime configurations. This ensures that all executors in the Spark cluster have access to the same dependencies at runtime.
How to Install Scala Libraries in Spark Processors
Scala libraries can be installed in two main ways:
- Using Maven, which automatically downloads the library and its dependencies from online repositories.
- Using JAR files uploaded to the /deps/jars folder in your environment, which are then made available to Spark during execution.
Both approaches allow you to integrate external Scala functionality seamlessly into your Spark Processor workflows.
Option 1: Install Using Maven (Recommended if Internet Access is Available)
If your Spark runtime has access to the internet, the easiest way to install a Scala library is by using Maven coordinates. Spark will automatically download the specified library and its transitive dependencies from Maven Central or another configured repository.
Steps:
- Navigate to your Runtime Template configuration in Syntasa.
- Add the following Spark configuration property:
spark.jars.packages=org.json4s:json4s-native_2.12:4.0.6
(This tells Spark to download and include the json4s-native library version 4.0.6 for Scala 2.12.)
- Save the configuration and restart your runtime.
When the cluster initializes, Spark automatically downloads the JSON4S library and all its required dependencies. Once the job starts, you can directly import and use it in your Spark Processor code.
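If you need more than one library, you can list several Maven coordinates in the same property, separated by commas, and point Spark at an additional repository with the standard spark.jars.repositories setting. The second coordinate and the repository URL below are illustrative placeholders only:
spark.jars.packages=org.json4s:json4s-native_2.12:4.0.6,org.scalaj:scalaj-http_2.12:2.4.2
spark.jars.repositories=https://artifacts.example.com/maven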
Option 2: Install Using JAR Files from /deps/jars Folder
In environments without internet access (such as private cloud setups), you can manually upload the required JAR files to the /deps/jars folder within your environment. This directory is checked by Spark at runtime when dependency loading is enabled.
Steps:
- Upload your JAR files (for example, json4s-jackson_2.12-4.0.7.jar) to the following path in your environment:
/syn-cluster-config/deps/jars/
- Go to your runtime configuration and add the following two settings:
syntasa.jar.enable.dependencies=true - This tells Spark to load libraries from the /deps/jars folder.
syntasa.jar.dependencies.names=json4s-jackson_2.12-4.0.7.jar - This specifies the exact JAR files to include. You can list multiple files separated by commas (see the example after the note below).
- Save the configuration and restart your runtime.
Note: If only the first configuration (syntasa.jar.enable.dependencies=true) is added without the second (syntasa.jar.dependencies.names=<jar names>), then all JAR files under /deps/jars are loaded automatically.
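If you instead want to load only specific JARs, list each file name explicitly in a comma-separated string. The second file name below is purely illustrative and must match a JAR actually uploaded to /deps/jars:
syntasa.jar.enable.dependencies=true
syntasa.jar.dependencies.names=json4s-jackson_2.12-4.0.7.jar,json4s-core_2.12-4.0.7.jar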
Unlike Maven, this method does not automatically download dependencies. You must manually ensure that all required dependency JARs are available in the same folder. If any dependency is missing, Spark compilation or execution may fail. For libraries with multiple dependencies, you can either upload all the required JARs or create a single fat (standalone) JAR using tools like Maven Shade or SBT Assembly.
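As a rough sketch of the fat-JAR route, the build.sbt below uses the sbt-assembly plugin to bundle json4s-jackson and its transitive dependencies into a single JAR that can then be uploaded to /deps/jars. The project name, versions, and plugin version are illustrative assumptions; adjust them to your own project.
// build.sbt - minimal fat-JAR sketch (project name and versions are illustrative)
name := "processor-deps"
version := "0.1.0"
scalaVersion := "2.12.18"

// Library to bundle; Spark itself is provided by the cluster and should not be included
libraryDependencies += "org.json4s" %% "json4s-jackson" % "4.0.7"

// project/plugins.sbt (separate file):
// addSbtPlugin("com.eed3si9n" % "sbt-assembly" % "2.1.5")

// Running `sbt assembly` produces target/scala-2.12/processor-deps-assembly-0.1.0.jar,
// which can then be uploaded to /syn-cluster-config/deps/jars/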
Example: Using JSON4S Library in Spark Processor
Once your library is installed (via Maven or uploaded JAR), you can use it in your Spark Processor code. The example below demonstrates how to use the JSON4S library to parse JSON data from an input dataset and extract specific fields for transformation.
The input data contains a column named user_details that holds JSON strings, as shown below:
Here is the complete code:
import org.apache.spark.sql.functions._
import org.json4s._
import org.json4s.jackson.JsonMethods._

// Read data from input Event Store
val df = spark.sql("SELECT * FROM @InputTable1")
println(s"Loaded input DataFrame with ${df.count()} rows")
df.printSchema()

// Define UDFs using JSON4S to extract fields from JSON
val getName = udf((jsonString: String) => {
  implicit val formats: DefaultFormats.type = DefaultFormats
  try {
    val parsed = parse(jsonString)
    (parsed \\ "Name").extractOrElse[String]("Unknown")
  } catch {
    case _: Exception => "Unknown"
  }
})

val getCountry = udf((jsonString: String) => {
  implicit val formats: DefaultFormats.type = DefaultFormats
  try {
    val parsed = parse(jsonString)
    (parsed \\ "country").extractOrElse[String]("Unknown")
  } catch {
    case _: Exception => "Unknown"
  }
})

val getCity = udf((jsonString: String) => {
  implicit val formats: DefaultFormats.type = DefaultFormats
  try {
    val parsed = parse(jsonString)
    (parsed \\ "city").extractOrElse[String]("Unknown")
  } catch {
    case _: Exception => "Unknown"
  }
})

// Apply transformations — add new columns from JSON
val transformedDF = df
  .withColumn("name", getName(col("user_details")))
  .withColumn("country", getCountry(col("user_details")))
  .withColumn("city", getCity(col("user_details")))

// Select both original and extracted columns
val finalDF = transformedDF.select(
  col("id"),
  col("created_at"),
  col("name"),
  col("country"),
  col("city")
)

// Display transformed results
println("Transformed Data:")
finalDF.show(10, truncate = false)

// Write to output Event Store
writeToEventStore(finalDF, "@OutputTable1")
- The processor starts by reading data from an input Event Store using spark.sql("SELECT * FROM @InputTable1"). The dataset includes a column named user_details containing JSON strings.
- Using the JSON4S library, the code defines three UDFs: getName, getCountry, and getCity, each parsing the JSON string to extract specific fields.
- New columns (name, country, city) are added to the DataFrame using these UDFs.
- The resulting DataFrame is then displayed and written back to an Event Store using writeToEventStore(finalDF, "@OutputTable1").
Here is the output generated by running the above code in a Spark Processor using Scala:
This example demonstrates how to combine the power of Spark transformations with external Scala libraries to enrich your datasets and extract structured insights from semi-structured data.
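As a variation on the same technique, the JSON string can also be parsed once per row instead of once per UDF by returning all three fields together as a struct. The sketch below is illustrative only; it reuses the df, the user_details column, and the imports from the example above, and parseUserDetails and altDF are hypothetical names.
// Illustrative variation: parse the JSON once and return (name, country, city) together
// Assumes the imports and the df DataFrame defined in the example above
val parseUserDetails = udf((jsonString: String) => {
  implicit val formats: DefaultFormats.type = DefaultFormats
  try {
    val parsed = parse(jsonString)
    (
      (parsed \\ "Name").extractOrElse[String]("Unknown"),
      (parsed \\ "country").extractOrElse[String]("Unknown"),
      (parsed \\ "city").extractOrElse[String]("Unknown")
    )
  } catch {
    case _: Exception => ("Unknown", "Unknown", "Unknown")
  }
})

// A tuple returned from a UDF becomes a struct column with fields _1, _2, _3
val altDF = df
  .withColumn("details", parseUserDetails(col("user_details")))
  .select(
    col("id"),
    col("created_at"),
    col("details._1").as("name"),
    col("details._2").as("country"),
    col("details._3").as("city")
  )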