Applicable to Syntasa platforms installed in an Amazon AWS environment, this sample notebook provides examples of how to download files from S3 to the runtimes attached to your notebook. Downloading files from sources other than S3 follows a similar pattern.
Helper Methods
%%spark
import urllib3
from urllib3.exceptions import InsecureRequestWarning
from syn_utils.syn_notebook.lib import AmazonClusterPyPackageInstaller
import requests

requests.packages.urllib3.disable_warnings(InsecureRequestWarning)
urllib3.disable_warnings(urllib3.exceptions.InsecureRequestWarning)

TYPE_YUM = 'yum'
TYPE_PIP = 'pip'
INSTALL = 'install'
UNINSTALL = 'uninstall'

def run_cmd(cmd):
    # Run a shell command on every node of the attached cluster
    amazon_installer = AmazonClusterPyPackageInstaller()
    for ip in amazon_installer.node_ip_addresses:
        amazon_installer.execute_command(cmd, ip)

def custom_pkg_install(packages_arr, run_type, inst_uninst):
    # Install or uninstall packages on every node, via yum or pip
    pkgs = ' '.join(packages_arr)
    cmd_to_run = ''
    if run_type == TYPE_YUM:
        cmd_to_run = f'sudo yum {inst_uninst} {pkgs} -y'
    elif run_type == TYPE_PIP:
        if inst_uninst == INSTALL:
            cmd_to_run = f'sudo python3 -m pip {inst_uninst} {pkgs}'
        elif inst_uninst == UNINSTALL:
            cmd_to_run = f'sudo python3 -m pip {inst_uninst} {pkgs} -y'
    print(f'Running Command [{cmd_to_run}]')
    run_cmd(cmd_to_run)
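As noted in the introduction, sources other than S3 can be handled in much the same way. The cell below is a hypothetical sketch (the URL and file name are placeholders, not a real dataset) showing how the run_cmd helper can pull a file from any HTTP(S) source onto every node using curl.
%%spark
# Hypothetical example: fetch a file from an HTTP(S) source onto every node.
# The URL below is a placeholder only.
cmd = 'sudo curl -sS -o /tmp/sample_download.csv https://example.com/sample.csv'
run_cmd(cmd)
# Confirm the file exists on all the nodes
run_cmd('ls -lah /tmp/sample_download.csv')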
Download files onto the cluster using AWS CLI Utils
This will download the file noted in the "object_key" variable to the path noted by the "local_des_path" variable on all the nodes in the cluster.
%%spark
import shutil
import subprocess
bucket_name = 'syntasa-example-01'
object_prefix = 'other/sample-data/csv_test'
object_key = '20220701.export.CSV'
obj_in_s3_full_path = f's3://{bucket_name}/{object_prefix}/{object_key}'
local_des_path = '/tmp/my_files'
# Please note ::: If you are trying to create a folder or download a file to a folder other than /tmp/<subfolder>, you will have to run the commands below as the sudo user.
# You can prepend 'sudo' to each of the cmd, cmd1, and cmd2 variables (as done below).
# Let's delete all the files/folders that we will create (start from scratch)
cmd = f'sudo rm -rf {local_des_path}'
run_cmd(cmd)
# Let's create a temporary folder locally to hold our files
cmd1 = f'sudo mkdir -p {local_des_path}'
run_cmd(cmd1)
# Now download the file from S3
cmd2 = f'sudo aws s3 cp {obj_in_s3_full_path} {local_des_path}/'
run_cmd(cmd2)
# Now validate the file was downloaded to all the nodes
cmd3 = f'sudo ls -lah {local_des_path}/'
run_cmd(cmd3)
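If you only need the file on the driver node (for example, for a quick local inspection rather than distributed processing), a driver-side download also works. The cell below is a sketch, assuming boto3 is available on the cluster and the instance profile grants S3 read access; it reuses the variables defined above and does not copy the file to the worker nodes.
%%spark
# Sketch (assumes boto3 and instance-profile credentials are available on the cluster).
# This downloads the object to the driver node only, not to every node.
import os
import boto3

os.makedirs(local_des_path, exist_ok=True)
s3_client = boto3.client('s3')
s3_client.download_file(bucket_name, f'{object_prefix}/{object_key}',
                        f'{local_des_path}/{object_key}')
print(os.listdir(local_des_path))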
Download the entire folder onto the cluster using AWS CLI Utils
This will download an entire folder from S3 onto your remote cluster (all nodes of the cluster).
%%spark
import shutil
import subprocess
bucket_name = 'syntasa-example-01'
object_prefix = 'other/sample-data/geomesa_dataset/green_nyc_tripdata'
obj_in_s3_full_path = f's3://{bucket_name}/{object_prefix}/'
local_des_path = '/tmp/my_files/green_nyc_tripdata'
# Please note ::: If you are trying to create a folder or download a file to a folder other than /tmp/<subfolder>, you will have to run the commands below as the sudo user.
# You can prepend 'sudo' to each of the cmd, cmd1, and cmd2 variables (as done below).
# Let's delete all the files/folders that we will create (start from scratch)
cmd = f'sudo rm -rf {local_des_path}'
run_cmd(cmd)
# Let's create a temporary folder locally to hold our files
cmd1 = f'sudo mkdir -p {local_des_path}'
run_cmd(cmd1)
# Now download the folder from S3
cmd2 = f'sudo aws s3 sync {obj_in_s3_full_path} {local_des_path}/ --quiet'  # add the --quiet flag if you don't want the full S3 sync output (useful to avoid printing every file copied)
run_cmd(cmd2)
# Now validate the files were downloaded to all the nodes
cmd3 = f'sudo ls -lah {local_des_path}/'
run_cmd(cmd3)
Reading Files into a Spark Dataframe
Reading a single file into a data frame (that was downloaded earlier)
%%spark
spark.conf.set("spark.sql.repl.eagerEval.enabled", True)
file_name = '20220701.export.CSV'
local_folder_path = '/tmp/my_files'
spark_df = spark.read.option('header', False).csv(f'file://{local_folder_path}/{file_name}')
record_count = spark_df.count()
print(f'Total Number of Records :: {record_count}')
spark_df.show(2, truncate=False)
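Because the CSV is read without a header row, Spark assigns generic column names (_c0, _c1, ...). If you prefer explicit names, you can rename them; the sketch below simply generates placeholder names rather than the dataset's real field names.
%%spark
# Optional sketch: replace the auto-generated _c0, _c1, ... names with placeholders
renamed_df = spark_df.toDF(*[f'col_{i}' for i in range(len(spark_df.columns))])
renamed_df.show(2, truncate=False)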
Read the Entire folder of Parquet files (that were downloaded earlier)
%%spark
spark.conf.set("spark.sql.repl.eagerEval.enabled", True)
folder_path = '/tmp/my_files/green_nyc_tripdata'
spark_df1 = spark.read.parquet(f'file://{folder_path}/')
record_count = spark_df1.count()
print(f'Total Number of Records :: {record_count}')
spark_df1.show(2, truncate=False)
Read files directly from S3 into a Dataframe
%%spark
spark.conf.set("spark.sql.repl.eagerEval.enabled", True)
bucket_name = 'syntasa-example-01'
object_prefix = 'other/sample-data/geomesa_dataset/green_nyc_tripdata'
obj_in_s3_full_path = f's3a://{bucket_name}/{object_prefix}/'
spark_df2 = spark.read.parquet(obj_in_s3_full_path)
record_count = spark_df2.count()
print(f'Total Number of Records :: {record_count}')
spark_df2.show(2, truncate=False)
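The same approach works for the single CSV file used earlier: it can be read straight from S3 without downloading it to the cluster first. The path below simply mirrors the bucket, prefix, and file name from the earlier example.
%%spark
# Read the earlier CSV directly from S3 (no local download required)
csv_s3_path = 's3a://syntasa-example-01/other/sample-data/csv_test/20220701.export.CSV'
spark_df3 = spark.read.option('header', False).csv(csv_s3_path)
print(f'Total Number of Records :: {spark_df3.count()}')
spark_df3.show(2, truncate=False)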
Reading Files into a Pandas DF
Before we begin, let's install some libraries on the cluster. We need pandas, but we'll also install python3-devel and python3-tools from yum so that our cluster has the latest gcc and other binaries (these are not needed for pandas specifically, but they are required by other libraries such as pycocotools).
%%spark
# Let's install some custom packages (we will install python3 tools as well as pandas)
custom_pkg_install(['python3-devel','python3-tools'], TYPE_YUM, INSTALL)
custom_pkg_install(['pandas'], TYPE_PIP, INSTALL)
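When a pip package is no longer needed, the same helper can remove it again using the UNINSTALL constant defined in the helper cell. A quick clean-up sketch:
%%spark
# Optional clean-up sketch: uninstall pandas again using the same helper
custom_pkg_install(['pandas'], TYPE_PIP, UNINSTALL)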
Now let's convert our above data frame to a pandas data frame
%%spark
import pandas as pd
from pyspark.sql.functions import col
# Set the maxResultSize (this is not needed for smaller datasets)
spark.sparkContext._conf.set("spark.driver.maxResultSize", "2g")
pandasDF = spark_df.toPandas()
# Do some cool stuff with pandas here.
# Note: when renaming columns, the list must contain one name per column in the data frame.
pandasDF.columns = ['hey', 'this', 'is', 'pandas']
pandasDF
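If the pandas-side work needs to feed back into Spark (for example, to write the result out at scale), the data frame can be converted back. A minimal sketch, reusing pandasDF from the cell above:
%%spark
# Convert the pandas data frame back to a Spark data frame for distributed processing
spark_df_from_pandas = spark.createDataFrame(pandasDF)
spark_df_from_pandas.show(2, truncate=False)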
The best way to understand and learn these steps is through hands-on experience. Follow the steps below to create the sample notebook in your Syntasa environment:
- Download the sample notebook .ipynb file from this article.
- Create a new notebook in your Syntasa environment using the import notebook option.