Kafka vs Kinesis vs MSK

One of our clients was performing a study and needed help identifying the factors that would influence how small, medium, or large enterprises choose between the different options available for Kafka.

I did the research and categorized the needs based on what's a must-have for a small business versus a large enterprise, and which of the available Kafka options provide it.

I have also summarized the costs for each of the options for equivalent implementations. As always, reach out to the Data Experts at Aqib Technologies if you have questions, concerns, or comments, or need help with an implementation.

| Features | Small/Medium Business | Large Enterprise | Apache Kafka | Confluent Kafka | AWS Kinesis | AWS MSK |
| --- | --- | --- | --- | --- | --- | --- |
| Serverless | Critical | Not needed | N | Y | Y | Y |
| Elastic Scaling | Critical | Nice to have | N | Y | Y | Y |
| Infinite Storage / Tiered Storage | Nice to have | Nice to have | N | Y | N | Y |
| High Availability | Critical | Critical | Y | Y | Y | Y |
| No ZooKeeper management | Nice to have | Nice to have | Y | Y | Y | N |
| No-touch patching and upgrades | Nice to have | Critical | N | Y | Y | Y |
| Connect to data sources / data sinks | Critical | Critical | Y | Y | Y | Y |
| Kafka Streams | Critical | Critical | Y | Y | Y | Y |
| Java Clients | Critical | Critical | Y | Y | Y | Y |
| Non-Java Clients | Critical | Critical | Y | Y | Y | Y |
| REST Proxy | Critical | Critical | Y | Y | Y | Y |
| Connectors | Critical | Critical | Y | Y | Y | Y |
| MQTT Proxy | Critical | Critical | Y | Y | Y | Y |
| Schema Registry | Nice to have | Nice to have | Y | Y | Y | Y |
| ksqlDB | Not needed | Not needed | N | Y | Y | N |
| Cloud UI / Control Center | Critical | Nice to have | N | Y | Y | Y |
| Metrics API | Critical | Critical | Y | Y | Y | Y |
| Datadog / Prometheus Integration | Nice to have | Nice to have | N | Y | Y | Y |
| Health+ | Critical | Critical | N | Y | Y | Y |
| Admin REST APIs | Nice to have | Nice to have | N | Y | Y | Y |
| Confluent for Kubernetes | Nice to have | Nice to have | N | Y | Y | Y |
| Ansible Playbooks | Nice to have | Nice to have | N | Y | Y | Y |
| Self-Balancing Clusters | Nice to have | Critical | N | Y | N | Y |
| Access Control Lists | Nice to have | Nice to have | N | Y | Y | Y |
| Role-Based Access Control | Nice to have | Critical | N | Y | Y | Y |
| Audit Logs | Nice to have | Critical | N | Y | Y | Y |
| Secret Protection / Encryption | Nice to have | Critical | N | Y | Y | Y |
| Bring Your Own Key | Nice to have | Critical | N | Y | Y | Y |
| Private Networking | Nice to have | Critical | N | Y | Y | Y |
| HIPAA/GDPR/CCPA readiness | Not needed | Critical | N | Y | Y | Y |
| Schema Validation | Nice to have | Critical | N | Y | Y | Y |
| Stream Catalog | Nice to have | Nice to have | N | Y | Y | Y |
| Stream Lineage | Nice to have | Nice to have | N | Y | Y | Y |
| 99.99% Uptime SLA | Nice to have | Critical | N | Y | Y | Y |
| Cluster Linking | Nice to have | Nice to have | N | Y | Y | Y |
| Multi-AZ / Multi-Region Clusters | Nice to have | Critical | N | Y | Y | Y |
| Replication | Critical | Critical | Y | Y | Y | Y |
| 24x7x365 Expert Support | Critical | Nice to have | N | Y | Y | Y |
| Professional Services | Critical | Nice to have | N | Y | Y | Y |
| Education | Critical | Critical | Y | Y | Y | Y |
| Monthly cost in dollars for 1 MB/sec | – | – | 60 | 750 | 900 | 300 |
| Performance (MB/sec) | – | – | 600 | 600 | 200 | 600 |
| Ease of Deployment (scale 1–5) | Critical | Critical | 5 | 1 | 2 | 2 |
| Ease of Maintenance (scale 1–5) | Critical | Critical | 2 | 1 | 1 | 1 |

How to choose between Apache & Confluent Kafka

Before we delve into the topic of choice, let's cover the basics of Kafka, its terminology, and its architecture.

Kafka Architecture

Overall Recommended Layout

Terminology

Zookeeper

ZooKeeper is a centralized service for managing distributed processes. It is a mandatory component in every Apache Kafka cluster. To provide high availability, you will need at least three ZooKeeper nodes (allowing for one node failure) or five nodes (allowing for two node failures). All ZooKeeper nodes are equivalent, so they usually run on identical machines. The number of ZooKeeper nodes must be odd.

Broker / Confluent Server

Apache Kafka uses messaging semantics, and Kafka brokers are Kafka's main storage and messaging components. (In Confluent Platform, a Kafka broker is referred to as a Confluent Server.) The Kafka cluster maintains streams of messages, called topics; the topics are sharded into partitions (ordered, immutable logs of messages), and the partitions are replicated and distributed for high availability. The servers that run the Kafka cluster are called brokers. You usually want at least three Kafka brokers in a cluster, each running on a separate server. This lets you replicate each Kafka partition at least three times and have a cluster that survives the failure of two nodes without data loss.

Confluent REST Proxy

The Confluent REST Proxy is an HTTP server that provides a RESTful interface to a Kafka cluster. You can use it to produce and consume messages, view the state of the cluster, and perform administrative actions without using the native Kafka protocol or clients. The REST Proxy is not a mandatory component of the platform. Deploy the REST Proxy if you wish to produce and consume messages to or from Kafka using a RESTful HTTP protocol. If your applications only use the native clients mentioned above, you can choose not to deploy the REST Proxy.

For additional throughput and high availability, it is recommended that you deploy multiple REST Proxy servers behind a sticky load balancer. Confluent Server contains its own embedded REST Proxy, which can be used for administrative operations but not for producing or consuming messages.
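To make the RESTful interface concrete, here is a minimal sketch of producing a JSON message through the REST Proxy's v2 API using Python's requests library; the host, port, topic name, and payload fields are placeholder assumptions, not values from this setup.

import json
import requests

url = "http://localhost:8082/topics/orders"          # assumed REST Proxy endpoint and topic
headers = {"Content-Type": "application/vnd.kafka.json.v2+json"}
payload = {"records": [{"value": {"order_id": 42, "status": "created"}}]}

resp = requests.post(url, data=json.dumps(payload), headers=headers)
resp.raise_for_status()
print(resp.json())   # partitions/offsets assigned to the produced records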

Confluent Replicator

Confluent Replicator is a component added to Confluent Platform to help manage multi-cluster deployments of Confluent Platform and Apache Kafka. Replicator provides centralized configuration of cross-cluster replication. Unlike Apache Kafka’s MirrorMaker, it replicates topic configuration in addition to topic messages.

Confluent Replicator is integrated with the Kafka Connect framework and should be installed on the Connect nodes in the destination cluster. If there are multiple Connect worker nodes, install Replicator on all of them. When installed on a larger number of nodes, Replicator can scale to replicate at higher throughput and will be highly available through its built-in failover mechanism.

Confluent Self-Balancing Clusters

Confluent Self-Balancing Clusters is a component of Confluent Server that optimizes resource utilization and helps to scale Kafka clusters. Self-Balancing Clusters evaluates information about the number of brokers, partitions, leaders, and partition sizes, decides on a balanced placement of partitions on brokers, and modifies the replicas assigned to each broker to achieve a balanced placement. For example, when a new broker is added to the cluster, Self-Balancing Clusters will move partitions to the new broker to balance the load between all brokers available in the cluster. To avoid impact on production workloads, Self-Balancing Clusters throttles the re-balancing traffic to a fraction of the available network capacity.

You can enable Self-Balancing Clusters via a configuration option in Confluent Server. When enabled, it runs continuously in the background.
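For reference, enabling it amounts to a single property in the Confluent Server configuration; this is a minimal sketch, so check the documentation for your platform version before relying on it.

# server.properties (Confluent Server)
confluent.balancer.enable=true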

Confluent Control Center

Confluent Control Center is Confluent’s web-based tool for managing and monitoring Apache Kafka. It is part of Confluent Platform and provides three key types of functionality for building and monitoring production data pipelines and streaming applications:

Data stream monitoring and alerting

You can use Control Center to monitor your data streams end to end, from producer to consumer. Use Control Center to verify that every message sent is received (and received only once), and to measure end-to-end system performance. Drill down to better understand cluster usage and identify any problems, and configure alerts to notify you when end-to-end performance does not match your SLAs.

Multi-cluster monitoring and management

A single Control Center node can monitor data flows in multiple clusters and can manage data replication between the clusters.

Kafka Connect configuration

You can also use Control Center to manage and monitor Kafka Connect, the open source toolkit for connecting external systems to Kafka. You can easily add new sources to load data from external data systems and add new sinks to write data into external data systems. Additionally, you can use Confluent Control Center to manage, monitor, and configure connectors.
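Control Center manages connectors through the Kafka Connect REST API, which you can also call directly. As a hedged sketch, the example below registers a simple file source connector against that API with Python's requests library; the Connect host, file path, and topic name are illustrative assumptions.

import requests

connector = {
    "name": "local-file-source",
    "config": {
        "connector.class": "org.apache.kafka.connect.file.FileStreamSourceConnector",
        "tasks.max": "1",
        "file": "/tmp/input.txt",      # assumed source file
        "topic": "file-events",        # assumed destination topic
    },
}

# Register the connector with an assumed Connect worker at localhost:8083
resp = requests.post("http://localhost:8083/connectors", json=connector)
resp.raise_for_status()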

Kafka Latency View

Kafka Implementation Options

Summary of Comparison

| Features | Kafka Plain Vanilla | Confluent Kafka |
| --- | --- | --- |
| Apache Kafka version | 2.7 | 2.5 |
| Clients | Java | C, C++, C#, Java |
| File Connector | Available | Available |
| Other Connectors | Not available * | Available |
| REST endpoint | Not available * | Available |
| KSQL | Not available * | Available |
| Schema Registry | Not available * | Available |
| Replication | Not available | Available |
| Auto Load Balance | Not available | Available |
| Dashboard | Not available | Available |


* Manual provisioning can be done using the community license.

For most small and medium businesses, Confluent Kafka seems to be the better option. For large enterprises with a dedicated DevOps team and data clusters, Apache Kafka seems to be the better choice.

Why is Confluent a better option for Small & Medium Businesses

REST endpoint

The Confluent environment has built-in support for a REST endpoint, which is critical for writing to and reading from Kafka on Android or iOS tablets. It is also helpful on mainframes and other platforms.

Replication

For any production installation you need to be DR-1 compliant, and for that you need more than one cluster. Once you have that, you need to be able to replicate all data that is written across the clusters. Confluent takes care of this need automatically.

Control Center

This is another crucial piece for monitoring the health of your Kafka installation: it helps you recognize trends and patterns, be proactive about future needs, and keep an eye on current problems.

Self-Balancing Clusters

This is critical as more applications are onboarded onto Kafka, bringing more topics, more traffic, and varied data packet sizes.

Role-Based Access Control (RBAC)

Centrally manages access to critical resources across the entire platform, with fine-tuned authorization granularity

Structured Audit Logs

Trace user actions in a set of dedicated Kafka topics, to help operators detect abnormal behavior and security threats

Secret Protection

Avoids exposure of secrets by encrypting sensitive information (e.g. passwords) that is stored in configuration files

Encryption of data

Data at rest can be encrypted on both solutions using third-party encryption providers or home-grown encryption packages. If you have your own key management service, these encryption options can integrate with it. If the cloud option is chosen, data can be sent over a secure connection using a REST endpoint with HTTPS.

Data Privacy

Data privacy can be achieved on both solutions with third-party products such as Privitar, which de-identifies datasets to avoid exposing PII. If not Privitar, one can also integrate an internally developed de-identifying API with Kafka as a connector.

Network Latency & Throughput

Reading or writing data to a local on-prem Kafka from an internal server will have lower latency than writing to a Kafka cloud endpoint. Since the use case is writing to and reading from Kafka from internal servers, with devices on the go connecting to the internal network, on-prem latency should not be a bottleneck. Overall throughput will depend on many different factors such as latency, packet size, encryption, data schema, data payload format, de/serialization, and others. However, if there are issues with Kafka such as lag, uneven distribution, or similar, having a Confluent plan will help you get support and resolve the issue.

Project completion, rollout and enablement

The completion date, and hence the cost of implementation, will depend on the options chosen. Based on the reasons given above, a Confluent Kafka on-prem implementation might take the shortest time. The overall implementation cost might also be lower for Confluent Kafka given the ease of rollout, maintenance, upgrades, notifications, load balancing, and more, as described above.

Kafka Implementation Calculation

There are some open questions that need to be addressed before we move forward with installation, deployment, and rollout:

• Number of hosts that will be used for rollout
• Number of cores per host
• Network card type
• Storage volumes per host
• Storage capacity per volume
• Storage volume configuration
• Volume read/write throughput (per volume)
• Memory
• Linux flavor & version

Kafka OnPrem Hardware Recommendation

Ideal Configuration

| Component | VMs | Storage | Min Memory | Min Cores |
| --- | --- | --- | --- | --- |
| ZooKeeper | 5 | Transaction log: 512 GB SSD; Storage: 2 x 1 TB SATA, RAID 10 | 32 GB | 2-4 |
| Broker | 5 | 12 x 1 TB disks | 64 GB | Dual 12-core |
| Connect | 2 | Any | Any | 2 |
| REST Proxy | 2 | Any | 64 MB * Num Producers + 16 MB * Num Consumers + 2 | 16 |
| Control Center | 1 | 300 GB min | 64 GB | 8 |

Basic Configuration

| Component | VMs | Storage | Min Memory | Min Cores |
| --- | --- | --- | --- | --- |
| ZooKeeper | 3 | 1 x 64 GB SSD | 4 GB | 1 |
| Broker | 3 | RAID 10 | 64 GB | 12-core |
| Connect | 2 | Any | Any | 2 |
| Control Center | 1 | 300 GB min | 4 GB | 2 |

Zookeeper

ZooKeeper uses the ZAB protocol (a Paxos-like consensus algorithm) to reach agreement among a majority of nodes. Although Kafka is a very stable system, there is always the possibility of hardware failures or other outages, so it makes sense to plan for failures where some VMs will not be available. For a fault-tolerant system, an odd number of nodes is always better for consensus, since an odd number and the next even number of nodes tolerate the same number of failures. So it's more cost effective to have an odd number of nodes for ZooKeeper as well as for the brokers.

For example, if there are 3 nodes the majority is 2, and with 4 nodes the majority is 3. Both configurations tolerate only one node failure, so the fourth node buys no extra fault tolerance.
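A quick Python check of that arithmetic (majority = n // 2 + 1, tolerated failures = n - majority):

# Quorum sizing sanity check for 3, 4, and 5 nodes
for n in (3, 4, 5):
    majority = n // 2 + 1
    print(f"{n} nodes -> majority {majority}, tolerates {n - majority} failure(s)")
# 3 nodes -> majority 2, tolerates 1 failure(s)
# 4 nodes -> majority 3, tolerates 1 failure(s)
# 5 nodes -> majority 3, tolerates 2 failure(s)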

Broker

Kafka relies heavily on the filesystem for storing and caching messages. All data is immediately written to a persistent log on the filesystem without necessarily flushing to disk. Thus, multiple dedicated drives are needed to maximize throughput. Drives should not be shared with application logs or other OS filesystem activity, to ensure good latency. You can either combine these drives into a single volume (RAID) or format and mount each drive as its own directory. RAID 10 should be used if the additional cost is acceptable; otherwise, the Kafka server can be configured with multiple log directories, each mounted on a separate drive.
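For the multiple-log-directory option, the broker setting is a comma-separated list of directories, one per drive; the paths below are purely illustrative.

# server.properties – each directory mounted on its own drive
log.dirs=/mnt/kafka-data-1,/mnt/kafka-data-2,/mnt/kafka-data-3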

A fast and reliable network is an essential performance component in a distributed system. Low latency ensures that nodes can communicate easily, while high bandwidth helps with partition movement and replica recovery.

Control Center

Confluent Control Center is good for out-of-the-box Kafka cluster monitoring, making it easy to manage the entire Confluent Platform. Control Center is a web-based application that allows you to manage your cluster, to monitor Kafka system health in predefined dashboards and to alert on triggers. Additionally, Control Center reports end-to-end stream monitoring to assure that every message is delivered from producer to consumer, measures how long messages take to be delivered, and determines the source of any issues in your cluster.

3 steps to a healthy deployment of Kafka

Pick an optimization goal

Throughput

To optimize for throughput, the producers, brokers, and consumers need to move as much data as they can within a given amount of time, so the goal is to maximize the rate at which data flows through the pipeline.

Latency

Keeping latency to a minimum does not necessarily mean higher throughput; the two goals can pull tuning in opposite directions.

Durability

Durability is all about reducing the chance for a message to get lost. The most important feature that enables durability is replication, which ensures that messages are copied to multiple brokers. If a broker has a failure, the data is available from at least one other broker.

Availability

To optimize for high availability, Kafka should be tuned to recover as quickly as possible from failure scenarios. Higher partition counts may increase parallelism, but having more partitions can also increase recovery time in the event of a broker failure.

Configure cluster to achieve the goal

Throughput Optimization

Producer:
• batch.size: 100000 – 200000 (default 16384)
• linger.ms: 10 – 100
• compression.type: lz4
• acks: 1
• buffer.memory: default 33554432

Consumer:
• fetch.min.bytes: ~100000
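Translated into client code, a throughput-oriented setup might look like the following sketch using the confluent-kafka Python client; the broker address, group id, and exact values are assumptions to tune for your own workload, not prescriptions.

# Throughput-oriented client settings (values mirror the table above)
from confluent_kafka import Producer, Consumer

producer = Producer({
    "bootstrap.servers": "broker1:9092",   # placeholder broker address
    "batch.size": 200000,                  # larger batches per request (librdkafka >= 1.5)
    "linger.ms": 50,                       # wait briefly so batches can fill
    "compression.type": "lz4",
    "acks": 1,
})

consumer = Consumer({
    "bootstrap.servers": "broker1:9092",
    "group.id": "throughput-readers",      # placeholder consumer group
    "fetch.min.bytes": 100000,             # fetch larger chunks per request
})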

Latency Optimization

Producer:
• linger.ms: 0

Durability Optimization

Producer:
• replication.factor: 3
• acks: all
• enable.idempotence: true
• max.in.flight.requests.per.connection: 1

Consumer:
• enable.auto.commit: false
• isolation.level: read_committed (when using EOS transactions)

Broker:
• default.replication.factor: 3
• auto.create.topics.enable: false
• min.insync.replicas: 2
• unclean.leader.election.enable: false
• broker.rack: rack of the broker
• log.flush.interval.messages and log.flush.interval.ms: low value
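A corresponding client-side sketch with the confluent-kafka Python client is below; the broker address and group id are placeholders, and the broker-level settings from the table above still belong in server.properties or the topic configuration.

# Durability-focused client settings
from confluent_kafka import Producer, Consumer

producer = Producer({
    "bootstrap.servers": "broker1:9092",
    "acks": "all",                                # wait for all in-sync replicas
    "enable.idempotence": True,                   # no duplicates on retries
    "max.in.flight.requests.per.connection": 1,   # preserve ordering across retries
})

consumer = Consumer({
    "bootstrap.servers": "broker1:9092",
    "group.id": "durable-readers",
    "enable.auto.commit": False,                  # commit only after processing succeeds
    "isolation.level": "read_committed",          # when using EOS transactions
})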

Availability Optimization

Consumer:
• session.timeout.ms: as low as possible (default 10000)

Broker:
• unclean.leader.election.enable: true
• num.recovery.threads.per.data.dir: same as the number of directories in log.dirs

Benchmark, Monitor, and Tune

Use Confluent Control Center on-prem, or the performance dashboard in the cloud, to benchmark the cluster and track the metrics that matter for the goal you picked (throughput, latency, durability, and availability).

Kafka Options and Pricing

Based on the comparison given below, Confluent on the cloud seems to be the best option. It is almost free, comes with a handsome uptime guarantee and 24×7 Confluent support, and delivers great throughput and performance for the price. On-prem plain vanilla Kafka will turn out to be orders of magnitude higher in initial cost as well as maintenance. Confluent Kafka on-prem is 70K more than on-prem plain vanilla Kafka.

OnPrem Confluent

  1. Maintenance cost of hardware
  2. Additionally, 70K/year with a 2-year commitment for the Starter Essentials Package, which entails:

● 9 prod nodes

● 9 pre-prod or DR nodes

● Unlimited Dev nodes

● 1 CP Connector Pack (for both Prod & Pre-Prod)

● Support

● 24×7 (P1: 60 Min, P2: 4 Hrs)

● 3 cases / month limit

● Kafka streams support

Plain Vanilla Kafka

Local implementation and maintenance cost of hardware

Confluent On AWS Cloud 

Roughly $200/month, based on 200 KB/sec of writes and reads and a basic Kafka cluster with 30 partitions, which is much more than needed for a low- to medium-volume use case. There is no other hardware or system maintenance cost. This comes with 24×7 support from Confluent and a guarantee of 99.5% uptime, which appears to be more than what is needed to start with. If more partitions are needed than the 30 specified above, the cost is $0.004 per partition.

Another option is to use a standard Confluent Cloud cluster. This costs $1,000/month, but it allows 500 free partitions and a better uptime SLA of 99.95%.


Event Driven Architecture

What is an event-driven architecture, and how does Kafka fit into that paradigm? Let's understand the basics:

Event

An event in an EDA is an update or a state change that occurred, e.g. a shipping ticket order was created, or a truck became available.

Message

Data that gets published due to an event is called a message. The data usually conforms to a data schema and has all the required fields for every actor that cares about the event to act on it.

Three components of an EDA

Producers

Producers act on events that happen, create messages with all the data that any actor down the processing chain would need, and publish that message to a channel or a router. Kafka is an event channel. Producers can produce messages to Kafka or to any other event router such as MSMQ or RabbitMQ. Producers do not directly interact with consumers, and they don't know whether a consumer exists for the message being produced.

Consumers (Sinks)

Consumers read messages provided by the event channel, such as Kafka, and perform the actions they need to. A single message can be consumed by many different consumers to perform different actions on the same message.

Channels (Routers)

Kafka is an event channel. Event channels store messages for a certain period of time and make them available for any consumer to consume and process. Kafka can be configured to store messages for 24 hours or for a much longer period. A message that has already been processed by every consumer is occupying wasteful space, so it's better to keep retention short. For details on Kafka see the other blog https://aqibtech.net/apache-kafka-vs-confluent-kafka/
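As an illustration, retention is a per-topic setting; here is a sketch of setting a 24-hour retention (86400000 ms) with the stock kafka-configs.sh tool (shipped as kafka-configs in Confluent packages). The broker address and topic name are placeholders.

kafka-configs.sh --bootstrap-server localhost:9092 --alter \
  --entity-type topics --entity-name order-events \
  --add-config retention.ms=86400000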

A sample layout

Loose coupling

Components of a system are considered loosely coupled if each component can act independently, without knowledge of the other parts of the system. Each component can be serviced, brought down, replaced, or modified without the need to modify or service other parts of the system. In this scenario the Kafka consumers, producers, and Kafka storage itself act independently of each other.

Sync vs Async

Synchronous communication is when the communication happens in real time; asynchronous communication is when the components in a system interact without the requirement or expectation that any component respond immediately. A web service making a DB call to store data and waiting for a response is synchronous communication, as is a Kafka producer sending a message to Kafka and waiting for Kafka to confirm that the message was successfully written. However, in the example layout above, the Kafka producer, New Dock Processor, acts on CDC (Change Data Capture) from the New DB and produces a message to the Kafka topic ME2Dispatch_New. The Legacy processor consumes this message at its own pace, writes to the Legacy DB, and then writes a resultant message to the other Kafka topic, ME2Dispatch_Legacy. The New Dock Processor then consumes the message from the legacy topic, which is effectively a response to the message that was produced earlier on the “New” topic. The entire cycle can complete in a few milliseconds, depending on how efficient the processing business logic is, and because the approach is asynchronous no component blocks on another, so many more messages can be processed per unit of time.

What does this setup lead to?

Loose Coupling

As described above, each component of the Kafka ecosystem, or of any event-driven architecture, should not and cannot make assumptions about its counterparts, and hence is loosely coupled with the other components. A producer can keep producing data to Kafka even if there are no consumers ready to consume it. The data will stay in Kafka (based on the retention window), and whenever a consumer becomes available, every event that was created will eventually get processed.

Atomicity of events

Each event generated should have no dependency on any other event generated before or after it, and hence each event should be self-contained.

Business Process Unit Abstraction

An event should not include internal business logic details and should not be tied to a specific implementation. The less the abstraction leaks, the more resilient the messages and events will be to changes and updates.

Specific events (non generic)

If the events and messages are very generic, then every occurrence ends up represented by the same event type, distinguished only by some textual data. This leads to textual parsing and poor performance, because each consumer has to deserialize the data just to know whether it is interested in the event or not.

Asynchronous

Events can and will sometimes be generated out of order. If a sequence is desired, the partitioning key should be the identifier that the related events share, so that they land on the same partition in order (see the sketch below).
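A small sketch of keyed production with the confluent-kafka Python client; the topic, key field, and broker address are illustrative assumptions. All events that share the same key go to the same partition and keep their relative order.

import json
from confluent_kafka import Producer

producer = Producer({"bootstrap.servers": "broker1:9092"})   # placeholder broker

event = {"event_id": "e-1001", "order_id": "o-42", "status": "created"}
producer.produce(
    "order-events",                 # assumed topic name
    key=event["order_id"],          # same key -> same partition -> ordered
    value=json.dumps(event),
)
producer.flush()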

Self Contained

Each event should have all the data that any downstream processor (or consumer) could ever need. An event that depends on events before or after it is flawed and will lead to issues.


Hope this was helpful to you. For questions / concerns / comments / help feel free to reach out to the Data Experts at Aqib Technologies!


Streamlining Data Transfers: Moving data from S3 to Snowflake

One of our clients had a ton of gigantic CSV files in Amazon S3, some zipped and some not, and they wanted their Snowflake database to be curated with tables that map to these CSV files.

This blog post covers the basics of the process as well as some of the roadblocks we hit along the way. I chose to do this in Python, although I could have chosen any other language. Language choice can matter, since end-to-end latency and the time taken to complete the transfer might be impacted by it. I say "might be" because I did not validate it in this particular setup. However, as you will see in my other article, I have demonstrated Java to be many times faster than Python in an ETL process similar to this one.


Reading CSV Files from S3 using Python

Amazon S3 is a popular cloud storage service that provides a scalable and secure storage option for large files. It is commonly used to store and distribute files, including data in the CSV (Comma Separated Values) format. In this tutorial, we will show you how to read CSV files from an S3 bucket using Python.

Prerequisites:

  • An AWS account and an S3 bucket
  • AWS CLI and boto3 library installed on your system
  • Python 3 installed

Step 1: Set up the AWS CLI. To use the AWS CLI, you need to configure it with your AWS credentials. To do this, run the following command:

aws configure

You will be prompted to enter your AWS access key ID, secret access key, default region name, and default output format.

Alternatively, if you prefer to work with config or YAML files, create a config class that reads credentials from a YAML file and add the properties you need to the class:

import yaml

class Config:
    def __init__(self, configyamlfile):
        self.settings = dict()
        with open(configyamlfile, "r") as stream:
            try:
                self.settings = yaml.safe_load(stream)
            except yaml.YAMLError as exception:
                print(exception)

    @property
    def User(self):
        return self.settings["user"]
Step 2: Install the boto3 library. To read files from an S3 bucket, we will use the boto3 library, which is the Amazon Web Services (AWS) SDK for Python. To install boto3, run the following command:

pip install boto3

Step 3: Read the CSV file from the S3 bucket. We will create a class CsvReader that reads CSV files from S3. We can add different modes of reading to this class, such as reading an individual CSV file, reading a zip containing multiple CSV files, processing a local CSV file that was downloaded in a previous run, and more. We will start by importing the necessary libraries and initializing a boto3 client.

import boto3

class CsvReader:
    def __init__(self, config):
        self.config = config
        # S3 client built from the credentials in the config class
        self.client = boto3.client('s3',
                                   aws_access_key_id=self.config.ACCESS_KEY,
                                   aws_secret_access_key=self.config.SECRET_ACCESS_KEY,
                                   region_name=self.config.REGION)

Next, we will list the files in a folder in the bucket using a prefix and process them one by one. For each file, create a local path and copy the file locally:

response = self.client.list_objects(Bucket=bucket_name, Prefix=prefix)
for content in response.get('Contents', []):
    s3filepath = content.get('Key')
    name = s3filepath.rsplit('/', 1)[-1]
    self.client.download_file(bucket_name, s3filepath, name)
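Note that list_objects returns at most 1,000 keys per call; for larger prefixes, a boto3 paginator keeps the loop unchanged. This is a sketch under the same client and variable assumptions as above.

paginator = self.client.get_paginator('list_objects_v2')
for page in paginator.paginate(Bucket=bucket_name, Prefix=prefix):
    for content in page.get('Contents', []):
        s3filepath = content.get('Key')
        name = s3filepath.rsplit('/', 1)[-1]
        self.client.download_file(bucket_name, s3filepath, name)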

Finally, we can use a pandas DataFrame to perform data analysis or visualization. I have used some extra parameters for read_csv; I will discuss those in the next article. You can just pass the file name for a generic read of the CSV with the pandas DataFrame.

import pandas as pd

df = pd.read_csv(file, on_bad_lines='warn', nrows=3)

What if you need to process zip files in S3 buckets? If the file is large (> 2 GB), fetch the file locally as for any other file and process the zip.

If the zip file size is reasonable, then you can process the CSV files in the zip without ever downloading them:

import io
from zipfile import ZipFile

s3 = self.session.resource("s3")
bucket = s3.Bucket(BUCKET_NAME)
obj = bucket.Object(prefix)

with io.BytesIO(obj.get()["Body"].read()) as tf:
    # rewind the in-memory file
    tf.seek(0)
    # Read the buffer as a zipfile and process the members
    with ZipFile(tf, mode='r') as zipf:
        for subfile in zipf.namelist():
            name = subfile[:-4]                   # strip the ".csv" extension
            filepath = zipf.extract(subfile)      # extract to the working directory
            print(f"processing {subfile} at {filepath}")
            self.process_csvfile(filepath, name)

In conclusion, reading CSV files from an S3 bucket in Python is a simple process that can be accomplished using the boto3 library. With just a few lines of code, you can retrieve and process data stored in an S3 bucket, making it a convenient and scalable option for data storage and distribution.


Create tables and insert data in Snowflake from CSV files using Python

Snowflake is a cloud-based data warehousing platform that offers seamless integration with various programming languages, including Python. With Snowflake, you can easily create and manage tables using Python, without having to worry about the underlying infrastructure. In this blog post, we will show you how to create tables in Snowflake using Python.

Before we get started, make sure you have the Snowflake Python connector installed. You can install it using the following command:

pip install snowflake-connector-python

Next, you need to establish a connection to your Snowflake account. You can do this by providing your account information, username, password, and the name of your Snowflake warehouse. Here's an example using values defined in a YAML file and wired in using a config class:

import snowflake.connector

class SnowflakeProcessor:
    def __init__(self, config):
        self.config = config
        self.conn = snowflake.connector.connect(
            user=self.config.User,
            password=self.config.Password,
            account=self.config.Account
            )

Once you have established a connection, you can create a table using the execute method of the connection object. The syntax for creating a table in Snowflake is similar to the syntax for creating a table in SQL. Since we are creating the table based on a CSV, let's use a pandas DataFrame to generate the CREATE TABLE SQL.
In the example below I have specified the on_bad_lines setting to warn and continue. You can choose to error or ignore instead. In my case I wanted the warnings so that I could log and analyze missing data after the imports were complete.
Another thing you might notice is nrows=3. I use this setting because the files are large and I only need the DataFrame for an initial sanity check and for building the table-creation string. It does not make sense to make pandas load a 100 MB file for that, so I load just the first few rows.

import pandas as pd

df = pd.read_csv(file, on_bad_lines='warn', nrows=3)
cols = ",".join([f"{c} date default '1950-01-01'"
                     if c.lower().endswith("_dt")
                     else f'{c} varchar'
                 for c in df.columns.values])
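With the column list in hand, issuing the DDL is a single cursor call. This is a sketch; `name` is assumed to be the table name derived from the CSV file name.

cursor = self.conn.cursor()
cursor.execute(f"create table {name} ({cols})")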

Now that the CREATE TABLE SQL is squared away, let's look at how we plan to load the table with data from the CSV. The best way to do that is to put the file into a stage on Snowflake and then copy the data from the CSV file on the stage into the table you just created. You can use an existing stage or create one for CSV files with comma as the delimiter.

# Create a stage
create stage <database_name>.<schema_name>.DATA_STAGE file_format = (type = 'csv' field_delimiter = ',' skip_header = 1)

Now that you have the stage, you can add your CSV file to it:

# Upload csv file to stage
f'PUT file://{csvfile} @<database_name>.<schema_name>.DATA_STAGE auto_compress=true OVERWRITE = TRUE'

The last step is to copy the data from the CSV into the table. Here is an example command. There are some roadblocks I hit during the copy step due to the data I was dealing with; I will explain each of the tweaks I used to work around them.

  1. FIELD_OPTIONALLY_ENCLOSED_BY
    The data in some of the CSV files was enclosed in double quotes, e.g. "Mohammed","Bloomberg". This setting instructs Snowflake to remove the enclosing double quotes before insertion.
  2. NULL_IF
    I had some date columns. If the data is empty, Snowflake correctly assigns a null or default date to it. However, if the null is not recognizable, data insertion stops with errors. NULL_IF tells Snowflake all the possible null representations that the CSV files might contain.
  3. ON_ERROR
    By default, if there are errors, Snowflake returns the error and stops processing. If you want to continue data insertion in spite of errors, you can set this to continue.
  4. VALIDATION_MODE
    In case of errors in the data, if you continue on error with the above setting, the data will be inserted but you will not know which errors were encountered and for which rows. To get the error list without inserting the data, you can use validation_mode (see the validation-only sketch after the copy command below).
# Copy data from the csv file into the table
copy into {name} from @<database_name>.<schema_name>.DATA_STAGE/{csvfile}.gz
    file_format = (type = 'csv' field_delimiter = ',' skip_header = 1
                   field_optionally_enclosed_by = '"' trim_space = false
                   NULL_IF = ('\\N', 'NULL', 'NUL', ''))
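If you only want the error report without loading anything, a validation-only variant of the same command looks like this sketch:

# Dry-run: report problem rows without inserting any data
copy into {name} from @<database_name>.<schema_name>.DATA_STAGE/{csvfile}.gz
    file_format = (type = 'csv' field_delimiter = ',' skip_header = 1)
    validation_mode = 'RETURN_ERRORS'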
        

One thing to note is that to create the table and insert data you need to run a few more SQL statements to set the stage correctly. Pick the correct role for the job; you don't have to be SYSADMIN. Set the correct database and schema. Make sure the warehouse is set and is resumed if it is suspended. Lastly, you can drop the table if you want to start fresh.

use role SYSADMIN
use database <db_name>
use warehouse <Warehouse>
ALTER WAREHOUSE <Warehouse> RESUME IF SUSPENDED
use schema <Schema>
drop table if exists {name}
create table {name}({cols})

And that’s it! You have successfully created tables in Snowflake using Python. With Snowflake’s robust data warehousing platform and seamless integration with various programming languages, you can easily manage your data and build powerful data applications.

In the next article we will see it all playing together