This guide explains how to extract data using Python Spark (PySpark) SQL, transform the data, and write it to a Snowflake table.
Prerequisites
When writing data to Snowflake, you need to consider both the necessary Python code and the required runtime configuration. The code used in this article is Python (PySpark).
How to create a runtime is out of scope for this article, but be sure to add the additional parameters listed below to your runtime; otherwise, your job may fail when executed.
- Snowflake Account - You need a valid Snowflake account, warehouse, database, schema, user, and role with SELECT privileges. Note: Syntasa does not manage the Snowflake database/table or its schema; therefore, we expect that the relevant warehouse/database and table schema already exist before any data is sent from Syntasa to Snowflake.
- Connection - A connection for Snowflake should be defined within Syntasa, allowing the connection to be used in the code.
Runtime Spark Configuration
Below is the configuration that should be added to the Spark configuration of your runtime:
Note: Be aware that your version of Spark might need different settings than those listed below.
spark.jars.packages: net.snowflake:snowflake-jdbc:3.14.4,net.snowflake:spark-snowflake_2.12:2.12.0-spark_3.3
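If you are experimenting outside of a Syntasa runtime, the same connector packages can be attached when building the Spark session yourself. The following is a minimal sketch for illustration only (the app name is hypothetical, and the package versions are the ones listed above, which may need to change for your Spark version); within Syntasa, the runtime configuration above is what applies.
from pyspark.sql import SparkSession

# Minimal sketch: attach the Snowflake JDBC driver and Spark connector when
# building the session manually (a Syntasa runtime supplies this via the
# spark.jars.packages setting shown above).
spark = (
    SparkSession.builder
    .appName('snowflake-write-example')  # hypothetical app name
    .config(
        'spark.jars.packages',
        'net.snowflake:snowflake-jdbc:3.14.4,'
        'net.snowflake:spark-snowflake_2.12:2.12.0-spark_3.3',
    )
    .getOrCreate()
)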
Data Extraction with Spark SQL
What It Does:
Executes a SQL query on your Spark session, selecting all columns from the table represented by @InputTable1, filtered by a date range between @fromDate and @toDate. These are Syntasa parameters that dynamically pass the table and date range being queried into the code. Here, @InputTable1 serves as an input source, which can be either the output table from a preceding process or an output table from an existing app.
df_daily = spark.sql("""select * from @InputTable1 where event_partition between @fromDate and @toDate """)
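As an optional sanity check before transforming the data, you can confirm that the extraction returned rows and the columns you expect (this assumes the Syntasa parameters resolved to a valid table and date range):
# Optional check: inspect the extracted DataFrame's schema and row count.
df_daily.printSchema()
print('Rows extracted:', df_daily.count())  # count() triggers a Spark job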
Data Transformation: Converting Date String to Date Format
What It Does:
Uses Spark's to_date() function to convert the event_partition column from a string to a date type using the format 'yyyy-MM-dd'.
This conversion ensures that the date values are in the correct data type and format, which is essential for downstream filtering, querying, or partitioning in Snowflake.
from pyspark.sql.functions import to_date
df_daily = df_daily.withColumn("event_partition", to_date(df_daily["event_partition"], 'yyyy-MM-dd'))
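Keep in mind that to_date() returns null for any value that does not match the supplied format, so malformed dates become null rather than raising an error. A minimal optional check:
from pyspark.sql.functions import col

# Count nulls after the conversion: rows whose event_partition string did not
# match 'yyyy-MM-dd' (or were already null) end up null after to_date().
unparsed = df_daily.filter(col('event_partition').isNull()).count()
if unparsed:
    print('Warning:', unparsed, 'rows have a null event_partition after conversion')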
Snowflake Write Mode & Connection Credential Options
Write Mode:
- 'append': Data is added to the existing table.
- 'overwrite': Existing data in the table is replaced with new data.
Retrieving Credentials:
The getConnectionParam() function is a Syntasa utility used to securely obtain connection details, such as the username and password, from your connection settings.
Connection Details:
Note: Ensure these settings match your Snowflake instance details and the environment you're writing to.
- sfURL: The endpoint of your Snowflake instance.
- sfRole: The role assigned for the connection (ensuring proper permissions).
- sfUser & sfPassword: Credential variables defined below.
- sfDatabase & sfSchema: Designate the target database and schema in Snowflake.
- sfWarehouse: The compute warehouse that Snowflake will use for this operation.
snowflake_write_mode = 'append' # Can either be 'overwrite' OR 'append'
account = getConnectionParam('@InputConnection1', 'host')
username = getConnectionParam('@InputConnection1', 'username')
password = getConnectionParam('@InputConnection1', 'password')
database = getConnectionParam('@InputConnection1', 'database')
warehouse = getConnectionParam('@InputConnection1', 'virtualwarehouse')
snowflake_options = {
'sfURL': account,
'sfRole': 'example_app_role',
'sfUser': username,
'sfPassword': password,
'sfDatabase': database,
'sfSchema': 'example_schema',
'sfWarehouse': warehouse,
}
Write the DataFrame to Snowflake
Spark Write Method:
- .format('net.snowflake.spark.snowflake'): Tells Spark to use the Snowflake connector.
- .options(**snowflake_options): Applies the connection configuration defined earlier.
- .option('dbtable', '@OutputTable1'.split('.')[1]): Retrieves the table name portion from @OutputTable1. If @OutputTable1 is in a format like SCHEMA.TABLE, splitting on the dot (.) extracts the table name for writing.
- .mode(snowflake_write_mode): Uses the specified write mode (append or overwrite).
- .save(): Initiates the write operation, sending the DataFrame's data to the specified Snowflake table.
How to Use:
Update @OutputTable1 with your target table name reference. Ensure that your Snowflake table structure matches the DataFrame's schema or that you have configured the table accordingly.
df_daily.write \
.format('net.snowflake.spark.snowflake') \
.options(**snowflake_options) \
.option('dbtable', '@OutputTable1'.split('.')[1]) \
.mode(snowflake_write_mode) \
.save()
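To confirm the write from within the same job, the connector can also read the target table back. This is a minimal optional sketch, assuming the same snowflake_options and output table reference used above:
# Optional verification: read the target table back through the same connector
# and check the row count (assumes the same snowflake_options and table name).
written = (
    spark.read
    .format('net.snowflake.spark.snowflake')
    .options(**snowflake_options)
    .option('dbtable', '@OutputTable1'.split('.')[1])
    .load()
)
print('Rows now in Snowflake table:', written.count())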
Full code for writing data to Snowflake
from pyspark.sql.functions import to_date

# Extract the data for the requested date range using Syntasa parameters
df_daily = spark.sql("""select * from @InputTable1 where event_partition between @fromDate and @toDate """)

# Convert event_partition from a string to a date type
df_daily = df_daily.withColumn("event_partition", to_date(df_daily["event_partition"], 'yyyy-MM-dd'))

# Snowflake write mode and connection details retrieved from the Syntasa connection
snowflake_write_mode = 'append' # Can either be 'overwrite' OR 'append'
account = getConnectionParam('@InputConnection1', 'host')
username = getConnectionParam('@InputConnection1', 'username')
password = getConnectionParam('@InputConnection1', 'password')
database = getConnectionParam('@InputConnection1', 'database')
warehouse = getConnectionParam('@InputConnection1', 'virtualwarehouse')
snowflake_options = {
'sfURL': account,
'sfRole': 'example_app_role',
'sfUser': username,
'sfPassword': password,
'sfDatabase': database,
'sfSchema': 'example_schema',
'sfWarehouse': warehouse,
}

# Write the tables to Snowflake
df_daily.write \
.format('net.snowflake.spark.snowflake') \
.options(**snowflake_options) \
.option('dbtable', '@OutputTable1'.split('.')[1]) \
.mode(snowflake_write_mode) \
.save()