Pre-requisites:
System Parameters
- @database - Event store database.
- @fromDate - From date selected (or first date in dates to process).
- @toDate - To date selected (or last date in dates to process).
- @datesToProcess - Comma-separated string of dates to process (a usage sketch for this and @numPartitions follows this parameter list).
- @numPartitions - Total number of days to process.
- @location - Care needs to be taken when configuring this parameter in the code process; how it is used depends on whether the process writes to a single output table or to more than one, as shown below.
- Only one output table - When the data needs to be written to only one output table, you can use a create table statement as shown below:
spark.sql(s"""
create table if not exists @database.TABLE_NAME
(
mcvisid string,
amount double
)
partitioned by (event_partition date)
stored as parquet
location @location
""") - More than one output table - There are cases where a user might require to write the data to more than one table as a part of a single code process. In such scenarios, you can use create table statements as shown below:
spark.sql(s"""
create table if not exists @database.event_reports
(
hits double,
amount double
)
partitioned by (event_partition date)
stored as parquet
location @location_event_reports
""")
spark.sql(s"""
create table if not exists @database.product_reports
(
product_code string,
records integer
)
partitioned by (event_partition date)
stored as parquet
location @location_product_reports
""")
- @treatmentLocation - Event store location for Treatment Model.
- @learningLocation - Event store location for Learning Model.
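The date parameters are substituted into the code as text, in the same way @database and @location are substituted into the SQL statements above. The sketch below shows one way @datesToProcess and @numPartitions might be used, assuming they are referenced inside string literals; the per-date query is only a placeholder:
// Sketch only: @datesToProcess and @numPartitions are assumed to be substituted as
// text inside the string literals below, like the @ parameters in the SQL examples.
val datesToProcess = "@datesToProcess".split(",").map(_.trim)
val numPartitions = "@numPartitions".toInt
datesToProcess.foreach { date =>
  // Placeholder per-date work: read just this partition from the event store.
  val dailyDf = spark.sql(s"""
    SELECT mcvisid, pagename, event_partition
    FROM @database.tb_event
    WHERE event_partition = '$date'
  """)
  println(s"$date: ${dailyDf.count()} rows (1 of $numPartitions days)")
}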
Sample R code process
This example is meant to show:
- How to generate an R dataframe that has data read from the Syntasa event store.
- Do something in R.
- Save results back to an event store.
- Additionally, how to use other libraries.
Please refer to the attached PDF for code samples showing how to run R code in a Spark process.
Sample Python code process
This example is meant to show:
- How to generate a Pandas dataframe that has data read from the Syntasa event store.
- Do something in Python.
- Save results back to an event store.
- Additionally, how to use other libraries.
Creating a SparkSession in your Python code:
from pyspark.sql import SparkSession
spark = SparkSession.builder.master("yarn").appName("my app").enableHiveSupport().getOrCreate()
If using Pandas, it needs to be installed with pip (pip also needs to be installed):
import subprocess
import sys
def installpip():
    subprocess.call([sys.executable, "-m", "easy_install", "pip"])
def install(package):
    subprocess.call([sys.executable, "-m", "pip", "install", package])
installpip()
install("pandas")
import pandas as pd
- Generating a Pandas dataframe from Syntasa data:
spark.sql("set hive.exec.dynamic.partition.mode=nonstrict")
df = spark.sql("""
SELECT
mcvisid, pagename, event_partition
FROM
@database.tb_event
WHERE
event_partition between @fromDate and @toDate
""")
pdf = df.toPandas()
- Do something with the Pandas dataframe. In this case, filtering rows to only include those with pagename equal to 'us_en:shop:TV':
pdfShopTV = pdf[pdf.pagename == 'us_en:shop:TV']
- Convert the Pandas dataframe back to a Spark dataframe, and then save it to a Syntasa event store:
sparkDf = spark.createDataFrame(pdfShopTV)
sparkDf.createOrReplaceTempView("sparkDf")
spark.sql("""
CREATE TABLE IF NOT EXISTS
@database.TV_Pages (mcvisid string, pagename string)
PARTITIONED BY (event_partition string)
STORED AS parquet
LOCATION @location
""")
spark.sql("""
INSERT OVERWRITE TABLE
@database.TV_Pages
SELECT
*
FROM
sparkDf
""")
Sample Scala Spark code process
This example is meant to show:
- How to generate a Spark dataframe that has data read from the Syntasa event store.
- Do something in Spark.
- Save results back to an event store.
- Additionally, how to use external libraries.
- Generating a Spark dataframe from a Syntasa event store:
spark.sql("set hive.exec.dynamic.partition.mode=nonstrict")
val df = spark.sql("""
SELECT
mcvisid, pagename, event_partition
FROM
@database.tb_event
WHERE
event_partition between @fromDate and @toDate
""") - Do something in spark. In this case, count up the number of distinct pages viewed by each user in each partition:
import org.apache.spark.sql.functions.countDistinct
val df_counts = df.groupBy("mcvisid", "event_partition").agg(countDistinct("pagename").as("unique_pages"))
df_counts.createOrReplaceTempView("df_counts")
- Save results back to an event store:
spark.sql(s"""
CREATE TABLE IF NOT EXISTS
@database.Unique_Pages (mcvisid string, unique_pages int)
PARTITIONED BY (event_partition string)
STORED AS parquet
LOCATION @location
""")
spark.sql(s"""
INSERT OVERWRITE TABLE
@database.Unique_Pages
SELECT
*
FROM
df_counts
""") - Using an external library. In this case, using a window function to rank users by the number of distinct page views in each partition:
import org.apache.spark.sql.expressions.Window
import org.apache.spark.sql.functions.{dense_rank, desc}
val someWindow = Window.partitionBy("event_partition").orderBy(desc("unique_pages"))
val df_ranked = df_counts.withColumn("unique_page_rank", dense_rank().over(someWindow))
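As a follow-up, the ranked dataframe can be written back to the event store using the same pattern as df_counts above; a minimal sketch, where the table name Ranked_Pages and the @location_ranked_pages parameter are hypothetical placeholders (a second output table in the same process needs its own location parameter, as described in the pre-requisites):
// Sketch only: persist the ranked results, following the same create/insert pattern
// used for @database.Unique_Pages. Ranked_Pages and @location_ranked_pages are
// hypothetical placeholders.
df_ranked.createOrReplaceTempView("df_ranked")
spark.sql(s"""
CREATE TABLE IF NOT EXISTS
@database.Ranked_Pages (mcvisid string, unique_pages int, unique_page_rank int)
PARTITIONED BY (event_partition string)
STORED AS parquet
LOCATION @location_ranked_pages
""")
spark.sql(s"""
INSERT OVERWRITE TABLE
@database.Ranked_Pages
SELECT
mcvisid, unique_pages, unique_page_rank, event_partition
FROM
df_ranked
""")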