Streamlining Data Transfers: Moving data from S3 to Snowflake

One of our clients had a large number of very large CSV files in Amazon S3, some of them packed into zip archives, and wanted their Snowflake database populated with tables that map to these CSV files.

This blog post covers the basics of the process as well as some of the roadblocks we hit along the way. I chose to do this in Python, although another language would have worked too. Language choice can matter, since end-to-end latency and the total time taken by the transfer may depend on it. I say "may" because I did not measure it in this particular setup. However, as you will see in my other article, I found Java to be many times faster than Python in an ETL process similar to this one.

Reading CSV Files from S3 using Python

Amazon S3 is a popular cloud storage service that provides a scalable and secure storage option for large files. It is commonly used to store and distribute files, including data in the CSV (Comma Separated Values) format. In this tutorial, we will show you how to read CSV files from an S3 bucket using Python.

Prerequisites:

  • An AWS account and an S3 bucket
  • AWS CLI and boto3 library installed on your system
  • Python 3 installed

Step 1: Set up the AWS CLI. To use the AWS CLI, you need to configure it with your AWS credentials. To do this, run the following command:

aws configure

You will be prompted to enter your AWS access key ID, secret access key, default region name, and default output format.

Alternatively, if you prefer to work with config or YAML files, create a config class that reads the credentials from a YAML file and exposes each value you need as a property. This requires the PyYAML package (pip install pyyaml).

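import yaml

class Config:
    def __init__(self, configyamlfile):
        self.settings = dict()
        with open(configyamlfile, "r") as stream:
            try:
                # safe_load parses plain YAML without constructing arbitrary objects
                self.settings = yaml.safe_load(stream)
            except yaml.YAMLError as exception:
                # On a parse error the settings stay empty and the error is printed
                print(exception)

    # Add one property per setting; "user" is a key in the YAML file
    @property
    def User(self):
        return self.settings["user"]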

Step 2: Install the boto3 library. To read files from an S3 bucket, we will use the boto3 library, which is the Amazon Web Services (AWS) SDK for Python. To install boto3, run the following command:

pip install boto3

Step 3: Read the CSV file from the S3 bucket. We will create a class CsvReader that reads CSV files from S3. We can add different modes of reading to this class, such as reading an individual CSV file, reading a zip containing multiple CSV files, processing a local CSV file that was downloaded in a previous run, and more. We will start by importing the necessary libraries and initializing a boto3 session and a boto3 client.

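import boto3

class CsvReader:
    def __init__(self, config):
        self.config = config
        # The session holds the credentials and the region
        self.session = boto3.Session(
            aws_access_key_id=self.config.ACCESS_KEY,
            aws_secret_access_key=self.config.SECRET_ACCESS_KEY,
            region_name=self.config.REGION)
        # Low-level S3 client used for listing and downloading files
        self.client = self.session.client('s3')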

Next, we list the files under a folder in the bucket using a prefix and process them one by one. For each file, we derive a local file name from the key and download the file to it.

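# A single list_objects call returns at most 1000 keys
response = self.client.list_objects(Bucket=bucket_name, Prefix=prefix)
for content in response.get('Contents', []):
    s3filepath = content.get('Key')
    # Local file name: the last path component of the key
    name = s3filepath.rsplit('/', 1)[-1]
    if not name:
        continue  # "folder" placeholder keys end with a slash and have no file name
    self.client.download_file(bucket_name, s3filepath, name)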
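
Because one listing call returns at most 1000 keys, a folder with more files needs pagination. The following is a minimal sketch, not the code used in the project: iter_csv_keys is a hypothetical helper name, and it assumes the files of interest end in .csv or .zip. It relies on the boto3 paginator for list_objects_v2.

```python
def iter_csv_keys(client, bucket_name, prefix):
    """Yield every key under prefix that ends in .csv or .zip, across all pages."""
    # The paginator repeats the listing call until every page has been returned
    paginator = client.get_paginator("list_objects_v2")
    for page in paginator.paginate(Bucket=bucket_name, Prefix=prefix):
        # An empty page has no "Contents" entry at all
        for content in page.get("Contents", []):
            key = content["Key"]
            # Skip "folder" placeholder objects, whose keys end with a slash
            if key.endswith("/"):
                continue
            if key.lower().endswith((".csv", ".zip")):
                yield key
```

Inside CsvReader this could be used as: for key in iter_csv_keys(self.client, bucket_name, prefix), followed by the same download_file call as above.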

Finally, we can load the file into a pandas dataframe for analysis or visualization. I have passed some extra parameters to read_csv, which I explain further down. For a plain read of the CSV, you can pass just the file name.

import pandas as pd
df = pd.read_csv(file, on_bad_lines='warn', nrows=3)

What if you need to process zip files in S3 buckets? If the zip file is large (> 2 GB), download it locally like any other file and process it from disk.

If the zip file is reasonably small, you can read it into memory and process the CSV files inside it without first saving the zip itself to disk.

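import io
import os
from zipfile import ZipFile

s3 = self.session.resource("s3")
bucket = s3.Bucket(BUCKET_NAME)
obj = bucket.Object(prefix)  # here prefix is the full key of the zip object

# Read the whole zip object into an in-memory buffer
with io.BytesIO(obj.get()["Body"].read()) as tf:
    # Open the buffer as a zip file and process the members
    with ZipFile(tf, mode='r') as zipf:
        for subfile in zipf.namelist():
            if not subfile.lower().endswith(".csv"):
                continue  # skip directories and non-CSV members
            # Table name: the member's file name without the .csv extension
            name = os.path.splitext(os.path.basename(subfile))[0]
            # extract() writes the member under the current directory and returns its path
            filepath = zipf.extract(subfile)
            print(f"processing {subfile} at {filepath}")
            self.process_csvfile(filepath, name)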
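
The code above still extracts each member to disk before processing it. If only a quick look at a member is needed, it can be read straight from the in-memory zip. Here is a small sketch under assumptions: read_zip_csv_headers is a hypothetical helper, the members are assumed to be UTF-8 encoded, and it uses the standard csv module rather than pandas.

```python
import csv
import io
from zipfile import ZipFile

def read_zip_csv_headers(zip_bytes, encoding="utf-8"):
    """Return {member_name: header_row} for every CSV inside a zip held in memory."""
    headers = {}
    with ZipFile(io.BytesIO(zip_bytes)) as zipf:
        for member in zipf.namelist():
            if not member.lower().endswith(".csv"):
                continue  # ignore directories and non-CSV members
            # zipf.open streams the member's bytes; nothing is written to disk
            with zipf.open(member) as raw:
                text = io.TextIOWrapper(raw, encoding=encoding, newline="")
                # The first row is the header; an empty member yields an empty list
                headers[member] = next(csv.reader(text), [])
    return headers
```

The zip_bytes argument would be the result of obj.get()["Body"].read() from the snippet above.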

In conclusion, reading CSV files from an S3 bucket in Python is a simple process that can be accomplished using the boto3 library. With just a few lines of code, you can retrieve and process data stored in an S3 bucket, making it a convenient and scalable option for data storage and distribution.

Creating Tables and Inserting Data in Snowflake from CSV Files using Python

Snowflake is a cloud-based data warehousing platform that offers seamless integration with various programming languages, including Python. With Snowflake, you can easily create and manage tables using Python, without having to worry about the underlying infrastructure. In this blog post, we will show you how to create tables in Snowflake using Python.

Before we get started, make sure you have the Snowflake Python connector installed. You can install it using the following command:

pip install snowflake-connector-python

Next, you need to establish a connection to your Snowflake account. You do this by providing your account identifier, username, and password. The warehouse can be passed to connect as well, or selected later with a use warehouse statement. Here’s an example using values defined in a YAML file and wired in through a config class.

import snowflake.connector

class SnowflakeProcessor:
    def __init__(self, config):
        self.config = config
        self.conn = snowflake.connector.connect(
            user=self.config.User,
            password=self.config.Password,
            account=self.config.Account
            )

Once you have established a connection, you can create a table by calling the execute method of a cursor obtained from the connection object (self.conn.cursor()). The create table syntax in Snowflake follows standard SQL. Since we are creating the table based on a CSV file, let's use a pandas dataframe to build the column list for the create table statement.
In the example below I have set on_bad_lines to 'warn', which logs a warning for each malformed line and continues. You can instead choose 'error' or 'skip'. In my case I wanted the warnings so that I could log them and analyze the missing data after the imports were complete.
Another thing you might notice is nrows=3. The files are large, and I only use the dataframe for an initial sanity check and to build the table creation string. It does not make sense to make pandas load a 100 MB file for that, so I load just the first few rows.

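import pandas as pd

# Read only the first few rows; the header is all we need for the column list
df = pd.read_csv(file, on_bad_lines='warn', nrows=3)
# Columns ending in _dt become dates with a default; everything else is varchar
cols = ",".join([f"{c} date default '1950-01-01'::date"
                 if c.lower().endswith("_dt")
                 else f"{c} varchar"
                 for c in df.columns.values])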
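
The comprehension above assumes that every CSV header is already a valid SQL identifier. As a hedged sketch, the helper below (build_create_table is a hypothetical name, not part of the original project) double-quotes each column name so that headers containing spaces still work, and uppercases it so that the quoted name matches what Snowflake would store for an unquoted one. It writes the default as a quoted date literal cast to date. Both choices are assumptions of this sketch.

```python
def build_create_table(table_name, columns, default_date="1950-01-01"):
    """Build a create table statement from a list of CSV column names."""
    defs = []
    for col in columns:
        col = str(col)
        # Quote the identifier; embedded double quotes are escaped by doubling them
        ident = '"' + col.upper().replace('"', '""') + '"'
        if col.lower().endswith("_dt"):
            # Date columns get a default, written as a date literal
            defs.append(f"{ident} date default '{default_date}'::date")
        else:
            defs.append(f"{ident} varchar")
    return f"create table {table_name} ({', '.join(defs)})"
```

With the dataframe from above, the call would be build_create_table(name, df.columns).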

Now that the create table SQL is squared away, let's look at how to load the table with data from the CSV file. A reliable way to do that is to put the file into a stage on Snowflake and then copy the data from the staged file into the table you just created. You can use an existing stage or create one for CSV files with a comma as the delimiter.

# Create a stage
create stage <database_name>.<schema_name>.DATA_STAGE file_format = (type = 'csv' field_delimiter = ',' skip_header = 1)

Now that you have the stage, you can add your csv file to it

# Upload csv file to stage
f'PUT file://{csvfile} @<database_name>.<schema_name>.DATA_STAGE auto_compress=true OVERWRITE = TRUE'
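
The PUT command expects a file URI, so a relative local path can be a source of surprises. A minimal sketch of building the statement from an absolute path follows. build_put_sql is a hypothetical helper, and it assumes a path without spaces (a path with spaces would need the whole URI wrapped in single quotes).

```python
from pathlib import Path

def build_put_sql(csvfile, stage):
    """Build a PUT statement that uploads a local file to a Snowflake stage."""
    # absolute() anchors a relative path at the current working directory;
    # as_posix() gives forward slashes on every platform
    local = Path(csvfile).absolute().as_posix()
    return f"PUT file://{local} @{stage} AUTO_COMPRESS=TRUE OVERWRITE=TRUE"
```

The returned string is then passed to the cursor's execute method like any other statement.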

The last step is to copy the data from the CSV file into the table. I hit some roadblocks during the copy step because of the data I was dealing with. Below I explain each of the settings I used to work around them, followed by an example command.

  1. FIELD_OPTIONALLY_ENCLOSED_BY
    The data in some of the CSV files was enclosed in double quotes, for example "Mohammed","Bloomberg". This setting tells Snowflake to strip the enclosing double quotes before insertion.
  2. NULL_IF
    I had some date columns. If the value is empty, Snowflake correctly assigns a null or the default date to it. However, if the file marks nulls in some other way, data insertion stops with errors. NULL_IF tells Snowflake which strings in the CSV files should be treated as null.
  3. ON_ERROR
    By default, if there are errors, Snowflake returns the error and stops processing. If you want data insertion to continue in spite of errors, set this to CONTINUE.
  4. VALIDATION_MODE
    If you continue on error with the setting above, the data will be inserted, but you will not know which errors were encountered or for which rows. To get the list of errors without inserting any data, use VALIDATION_MODE, for example VALIDATION_MODE = RETURN_ERRORS.

# Copy data from csv file into the table
copy into {name} from @<database_name>.<schema_name>.DATA_STAGE/{csvfile}.gz file_format = (type = 'csv' field_delimiter = ',' skip_header = 1 field_optionally_enclosed_by = '"' trim_space = false NULL_IF = ('\\N', 'NULL', 'NUL', ''))
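
The ON_ERROR and VALIDATION_MODE settings described in the list can be appended to the same statement. The sketch below shows one way to assemble it in Python. build_copy_sql is a hypothetical helper, and accepting only one of the two options per call is a simplification of this sketch, not a rule taken from the original project.

```python
def build_copy_sql(table, stage, staged_file, on_error=None, validation_mode=None):
    """Build a copy into statement for a gzip-compressed CSV file on a stage."""
    if on_error and validation_mode:
        raise ValueError("pass on_error or validation_mode, not both")
    # '\\\\N' in Python source becomes '\\N' in the SQL text,
    # which Snowflake reads as the two characters \N
    file_format = (
        "file_format = (type = 'csv' field_delimiter = ',' skip_header = 1 "
        "field_optionally_enclosed_by = '\"' trim_space = false "
        "null_if = ('\\\\N', 'NULL', 'NUL', ''))"
    )
    sql = f"copy into {table} from @{stage}/{staged_file} {file_format}"
    if on_error:
        sql += f" on_error = {on_error}"  # e.g. CONTINUE
    if validation_mode:
        sql += f" validation_mode = {validation_mode}"  # e.g. RETURN_ERRORS
    return sql
```

A dry run with validation_mode="RETURN_ERRORS" followed by a real load with on_error="CONTINUE" gives both the error list and the loaded rows.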
        

One thing to note is that, to create the table and insert data, you need to run a few more SQL statements to set the scene correctly. Pick the correct role for the job; you don't have to be SYSADMIN. Set the correct database and schema. Make sure the warehouse is set and is resumed if it's suspended. Lastly, you can drop the table if you want to start fresh.

use role SYSADMIN
use database <db_name>
use warehouse <Warehouse>
ALTER WAREHOUSE <Warehouse> RESUME IF SUSPENDED
use schema <Schema>
drop table if exists {name}
create table {name}({cols})
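
These statements have to run in order on the same session. A minimal sketch of doing that from Python follows. setup_statements and run_statements are hypothetical helpers, and cursor is assumed to be any object with an execute method, such as the one returned by self.conn.cursor().

```python
def setup_statements(role, database, warehouse, schema, table, cols):
    """Return the session setup and table creation statements in execution order."""
    return [
        f"use role {role}",
        f"use database {database}",
        f"use warehouse {warehouse}",
        f"alter warehouse {warehouse} resume if suspended",
        f"use schema {schema}",
        f"drop table if exists {table}",
        f"create table {table}({cols})",
    ]

def run_statements(cursor, statements):
    """Execute statements one by one; stop at the first failure and name it."""
    for sql in statements:
        try:
            cursor.execute(sql)
        except Exception as exc:
            # Chain the original error so the driver's message is not lost
            raise RuntimeError(f"failed on: {sql}") from exc
```

Stopping at the first failure matters here, since a later statement such as create table would otherwise run against the wrong database or schema.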

And that’s it! You have successfully created tables in Snowflake using Python. With Snowflake’s robust data warehousing platform and seamless integration with various programming languages, you can easily manage your data and build powerful data applications.

In the next article, we will see how it all plays together.