This guide explains how to read data from Snowflake using Python Spark (PySpark) and write it to a Hive table.
Prerequisite
When reading data from Snowflake, you need both the appropriate code and the required runtime configuration. All code in this article is written in Python (PySpark).
Creating a runtime is outside the scope of this article, but your runtime must include the additional parameters below; otherwise, your job may fail when executed.
Runtime Spark Configuration
Note: Be aware that your version of Spark might need different settings than those listed below.
spark.jars.packages:
net.snowflake:snowflake-jdbc:3.14.4,net.snowflake:spark-snowflake_2.12:2.12.0-spark_3.3
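If you build the SparkSession yourself rather than relying on a Syntasa runtime, the same connector packages can be supplied through spark.jars.packages when the session is created. A minimal sketch (the application name is illustrative; in a Syntasa runtime the packages are set through the runtime configuration instead):
from pyspark.sql import SparkSession

# Sketch: register the Snowflake JDBC driver and Spark connector as session packages.
spark = (
    SparkSession.builder
    .appName('snowflake_read_example')
    .config('spark.jars.packages',
            'net.snowflake:snowflake-jdbc:3.14.4,net.snowflake:spark-snowflake_2.12:2.12.0-spark_3.3')
    .getOrCreate()
)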
Importing Packages
from zoneinfo import ZoneInfo
from datetime import datetime
from pyspark.sql.functions import from_utc_timestamp, current_timestamp, date_format, col, regexp_replace, to_timestamp, to_date
Set Time Zone Variable
- Purpose: Define which environment to target and set the reporting time zone.
- Usage: Syntasa will dynamically pass the actual environment value.
report_suite_tz = ZoneInfo("America/New_York")
Configure Snowflake Connection Options
- Purpose: Store all connection details needed to communicate with Snowflake.
account = getConnectionParam('@InputConnection1', 'host')
username = getConnectionParam('@InputConnection1', 'username')
password = getConnectionParam('@InputConnection1', 'password')
database = getConnectionParam('@InputConnection1', 'database')
warehouse = getConnectionParam('@InputConnection1', 'virtualwarehouse')
snowflake_options = {
'sfURL': account,
'sfRole': 'example_app_role',
'sfUser': username,
'sfPassword': password,
'sfDatabase': database,
'sfSchema': 'example_schema',
'sfWarehouse': warehouse,
}
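Note: sfURL expects the Snowflake account URL, which typically has the form <account_identifier>.snowflakecomputing.com; in this example it comes from the connection's host parameter.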
Extract Database and Schema Variables
- Purpose: Pull out sfDatabase and sfSchema for later use when building the table identifier.
database = snowflake_options.get('sfDatabase')
schema = snowflake_options.get('sfSchema')
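These two variables are combined with the table name to form the fully qualified identifier passed to the dbtable option in the next step, for example:
# Example: building the fully qualified table identifier (the database and schema values are illustrative).
table_name = 'sample_orders'
full_table_name = f'{database}.{schema}.{table_name}'  # e.g. 'my_database.public.sample_orders'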
Load Data from Snowflake into a Spark DataFrame
- Purpose: Use the Spark Snowflake Connector to read the sample_orders table from Snowflake.
- Key Points:
  - Constructs the full table name using the database, schema, and table name.
df_snowflake = spark.read \
.format('net.snowflake.spark.snowflake') \
.options(**snowflake_options) \
.option('dbtable', f'{database}.{schema}.sample_orders') \
.load()
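If you only need a subset of the data, the connector also accepts a query option in place of dbtable, so Snowflake filters the rows before they reach Spark. A sketch, assuming the same snowflake_options and a hypothetical order_date column:
# Sketch: push a SQL query down to Snowflake instead of reading the whole table.
df_recent_orders = spark.read \
    .format('net.snowflake.spark.snowflake') \
    .options(**snowflake_options) \
    .option('query', f'SELECT * FROM {database}.{schema}.sample_orders WHERE order_date >= CURRENT_DATE - 7') \
    .load()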
Standardize Column Names
- Purpose: Convert all column names in the DataFrame to lowercase for consistency.
df_snowflake = df_snowflake.select([col(c).alias(c.lower()) for c in df_snowflake.columns])
Convert the partition_date_hour Column to Timestamp (Optional)
- Purpose: Clean and convert date-time strings into a proper timestamp format.
- Steps:
  - Replace the character 'T' in the date string with a space.
  - Convert the resulting string to a timestamp using the format 'yyyy-MM-dd HH:mm'.
df_snowflake = df_snowflake.withColumn(
'partition_date_hour',
to_timestamp(regexp_replace(col('partition_date_hour'), 'T', ' '), 'yyyy-MM-dd HH:mm')
)
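For example, a raw value such as '2024-01-15T09:00' becomes the timestamp 2024-01-15 09:00:00. A quick way to sanity-check the conversion is on a throwaway one-row DataFrame (the sample value is hypothetical):
# Sketch: verify the conversion on a single hypothetical value.
demo_df = spark.createDataFrame([('2024-01-15T09:00',)], ['partition_date_hour'])
demo_df = demo_df.withColumn(
    'partition_date_hour',
    to_timestamp(regexp_replace(col('partition_date_hour'), 'T', ' '), 'yyyy-MM-dd HH:mm')
)
demo_df.show(truncate=False)  # shows 2024-01-15 09:00:00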
Add a Fetch Timestamp Column (Optional)
- Purpose: Record the current timestamp, converted to the specified time zone (America/New_York), to indicate when the data was fetched.
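The corresponding transformation (also included in the complete code below) adds an sf_fetch_time column to the DataFrame:
df_snowflake = df_snowflake.withColumn(
    "sf_fetch_time",
    from_utc_timestamp(current_timestamp(), report_suite_tz.key)
)
Complete Code
Putting all of the steps together, the full code is: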
from zoneinfo import ZoneInfo
from datetime import datetime
from pyspark.sql.functions import from_utc_timestamp, current_timestamp, date_format, col, regexp_replace, to_timestamp, to_date
report_suite_tz = ZoneInfo("America/New_York")
account = getConnectionParam('@InputConnection1', 'host')
username = getConnectionParam('@InputConnection1', 'username')
password = getConnectionParam('@InputConnection1', 'password')
database = getConnectionParam('@InputConnection1', 'database')
warehouse = getConnectionParam('@InputConnection1', 'virtualwarehouse')
snowflake_options = {
'sfURL': account,
'sfRole': 'syntasa_app_role',
'sfUser': username,
'sfPassword': password,
'sfDatabase': database,
'sfSchema': 'public',
'sfWarehouse': warehouse,
}
database = snowflake_options.get('sfDatabase')
schema = snowflake_options.get('sfSchema')
df_snowflake = spark.read \
.format('net.snowflake.spark.snowflake') \
.options(**snowflake_options) \
.option('dbtable', f'{database}.{schema}.sample_orders') \
.load()
df_snowflake = df_snowflake.select([col(c).alias(c.lower()) for c in df_snowflake.columns])
# Optional: format the timestamp column
df_snowflake = df_snowflake.withColumn('partition_date_hour', to_timestamp(regexp_replace(col('partition_date_hour'), 'T', ' '), 'yyyy-MM-dd HH:mm'))
# Optional: capture when the data was received from Snowflake
df_snowflake = df_snowflake.withColumn("sf_fetch_time", from_utc_timestamp(current_timestamp(), report_suite_tz.key))
writeToEventStore(df_snowflake, '@OutputTable1')
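Finally, writeToEventStore writes the DataFrame to the output table defined on the process (@OutputTable1), completing the load from Snowflake into the Hive table.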