This guide explains how to read data from Snowflake using PySpark SQL and write it to a Hive table.
Prerequisites
When reading data from Snowflake, you need to consider both the necessary Python code and the required runtime configuration. The code used in this article is Python.
Creating a runtime is out of the scope of this article, but make sure your runtime includes the additional parameters listed below; otherwise, your job may fail when executed.
All credentials should be stored securely in a Syntasa Connection. You will first need to create a connection in Syntasa; the getConnectionParam utility is then used to pull the credentials from it.
- Snowflake Account: You need a valid Snowflake account, warehouse, database, schema, user, and role with SELECT privileges.
- Runtime Spark Configuration: The configuration below should be added to your runtime's Spark configuration.
Note: Be aware that your version of Spark might need different settings than those listed below.
spark.jars.packages:
net.snowflake:snowflake-jdbc:3.14.4,net.snowflake:spark-snowflake_2.12:2.12.0-spark_3.3
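Within Syntasa, this property is set in the runtime's Spark configuration. If you want to verify the same dependencies outside a Syntasa runtime (for example, in a local test session), the same property can be supplied when the SparkSession is built. The sketch below is a minimal example under that assumption; the app name is illustrative.
from pyspark.sql import SparkSession

# Assumption: running outside a Syntasa runtime (e.g. a local test).
# Inside Syntasa, set spark.jars.packages in the runtime's Spark configuration instead.
spark = (
    SparkSession.builder
    .appName('snowflake_read_example')
    .config('spark.jars.packages',
            'net.snowflake:snowflake-jdbc:3.14.4,net.snowflake:spark-snowflake_2.12:2.12.0-spark_3.3')
    .getOrCreate()
)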
Importing Packages
from pyspark.sql.functions import col
Configure Snowflake Connection Options
- Purpose: Store all connection details needed to communicate with Snowflake.
account = getConnectionParam('@InputConnection1', 'host')
username = getConnectionParam('@InputConnection1', 'username')
password = getConnectionParam('@InputConnection1', 'password')
database = getConnectionParam('@InputConnection1', 'database')
warehouse = getConnectionParam('@InputConnection1', 'virtualwarehouse')
snowflake_options = {
    'sfURL': account,
    'sfRole': 'example_app_role',
    'sfUser': username,
    'sfPassword': password,
    'sfDatabase': database,
    'sfSchema': 'example_schema',
    'sfWarehouse': warehouse,
}
Extract Database and Schema Variables
- Purpose: Pull out sfDatabase and sfSchema for later use when building the table identifier.
database = snowflake_options.get('sfDatabase')
schema = snowflake_options.get('sfSchema')
Load Data from Snowflake into a Spark DataFrame
- Purpose: Use the Spark Snowflake Connector to read a table called sf_example_hourly from Snowflake.
- Key Points:
  - Constructs the full table name using the database, schema, and table name.
df_snowflake = spark.read \
    .format('net.snowflake.spark.snowflake') \
    .options(**snowflake_options) \
    .option('dbtable', f'{database}.{schema}.sf_example_hourly') \
    .load()
Standardize Column Names
- Purpose: Convert all column names in the DataFrame to lowercase for consistency.
df_snowflake = df_snowflake.select([col(c).alias(c.lower()) for c in df_snowflake.columns])
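Write the DataFrame to the Hive Table
- Purpose: Write the standardized DataFrame to the output Hive table referenced by '@OutputTable1' using the writeToEventStore utility.
writeToEventStore(df_snowflake, '@OutputTable1')
Full Code
The complete code for the process is shown below: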
from pyspark.sql.functions import col
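# Pull the Snowflake credentials from the Syntasa connection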
account = getConnectionParam('@InputConnection1', 'host')
username = getConnectionParam('@InputConnection1', 'username')
password = getConnectionParam('@InputConnection1', 'password')
database = getConnectionParam('@InputConnection1', 'database')
warehouse = getConnectionParam('@InputConnection1', 'virtualwarehouse')
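# Connection options for the Spark Snowflake Connector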
snowflake_options = {
    'sfURL': account,
    'sfRole': 'syntasa_app_role',
    'sfUser': username,
    'sfPassword': password,
    'sfDatabase': database,
    'sfSchema': 'public',
    'sfWarehouse': warehouse,
}
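# Extract database and schema for building the fully qualified table name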
database = snowflake_options.get('sfDatabase')
schema = snowflake_options.get('sfSchema')
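# Read the Snowflake table into a Spark DataFrame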
df_snowflake = spark.read \
    .format('net.snowflake.spark.snowflake') \
    .options(**snowflake_options) \
    .option('dbtable', f'{database}.{schema}.example_table') \
    .load()
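# Convert all column names to lowercase for consistency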
df_snowflake = df_snowflake.select([col(c).alias(c.lower()) for c in df_snowflake.columns])
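# Write the DataFrame to the output Hive table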
writeToEventStore(df_snowflake, '@OutputTable1')