One of our clients was conducting a study and needed help identifying the factors that influence whether small, medium, or large enterprises choose one of the available Kafka options over another.
I did the research and categorized the needs based on what's a must-have for a small business versus a large enterprise, and which of the available Kafka options provide each.
I have summarized the costs of each option for equivalent implementations. As always, reach out to the Data Experts at Aqib Technologies if you have questions, concerns, or comments, or need help with an implementation.
Before we delve into the choice, let's cover the basics of Kafka: its terminology and architecture.
Kafka Architecture
Overall Recommended Layout
Terminology
Zookeeper
ZooKeeper is a centralized service for managing distributed processes. In the Kafka versions compared here, it is a mandatory component of every Apache Kafka cluster. To provide high availability, you need at least three ZooKeeper nodes (allowing for one node failure) or five nodes (allowing for two node failures). All ZooKeeper nodes are equivalent, so they usually run on identical machines. The number of ZooKeeper nodes must be odd.
Broker / Confluent Server
Apache Kafka uses messaging semantics, and Kafka brokers are Kafka's main storage and messaging components. The Kafka cluster maintains streams of messages called topics; topics are sharded into partitions (ordered, immutable logs of messages), and partitions are replicated and distributed for high availability. The servers that run the Kafka cluster are called brokers (in Confluent Platform, a Kafka broker is referred to as Confluent Server). You usually want at least three Kafka brokers in a cluster, each running on a separate server. This lets you replicate each Kafka partition at least three times and have a cluster that survives the failure of two nodes without data loss.
Confluent REST Proxy
The Confluent REST Proxy is an HTTP server that provides a RESTful interface to a Kafka cluster. You can use it to produce and consume messages, view the state of the cluster, and perform administrative actions without using the native Kafka protocol or clients. The REST Proxy is not a mandatory component of the platform: deploy it if you want to produce and consume messages to or from Kafka over HTTP. If your applications only use the native Kafka clients, you can choose not to deploy the REST Proxy.
For additional throughput and high availability, it is recommended that you deploy multiple REST Proxy servers behind a sticky load balancer. Confluent Server contains its own embedded REST Proxy, which can be used for administrative operations but not for producing or consuming messages.
Confluent Replicator
Confluent Replicator is a component added to Confluent Platform to help manage multi-cluster deployments of Confluent Platform and Apache Kafka. Replicator provides centralized configuration of cross-cluster replication. Unlike Apache Kafka’s MirrorMaker, it replicates topic configuration in addition to topic messages.
Confluent Replicator is integrated with the Kafka Connect framework and should be installed on the Connect nodes in the destination cluster. If there are multiple Connect worker nodes, install Replicator on all of them. When installed on a larger number of nodes, Replicator can scale to replicate at higher throughput and will be highly available through its built-in failover mechanism.
Confluent Self-Balancing Clusters
Confluent Self-Balancing Clusters is a component of Confluent Server that optimizes resource utilization and helps to scale Kafka clusters. Self-Balancing Clusters evaluates information about the number of brokers, partitions, leaders, and partition sizes, decides on a balanced placement of partitions on brokers, and modifies the replicas assigned to each broker to achieve a balanced placement. For example, when a new broker is added to the cluster, Self-Balancing Clusters will move partitions to the new broker to balance the load between all brokers available in the cluster. To avoid impact on production workloads, Self-Balancing Clusters throttles the re-balancing traffic to a fraction of the available network capacity.
You can enable Self-Balancing Clusters via a configuration option in Confluent Server. When enabled, it runs continuously in the background.
Confluent Control Center
Confluent Control Center is Confluent’s web-based tool for managing and monitoring Apache Kafka. It is part of Confluent Platform and provides three key types of functionality for building and monitoring production data pipelines and streaming applications:
Data stream monitoring and alerting
You can use the Control Center to monitor your data streams end to end, from producer to consumer. Use Control Center to verify that every message sent is received (and received only once), and to measure end-to-end system performance. Drill down to better understand cluster usage and identify any problems. Configure alerts to notify you when end-to-end performance does not match SLAs, and measure whether messages sent were received.
Multi-cluster monitoring and management
A single Control Center node can monitor data flows in multiple clusters and can manage data replication between the clusters.
Kafka Connect configuration
You can also use Control Center to manage and monitor Kafka Connect, the open source toolkit for connecting external systems to Kafka. You can easily add new sources to load data from external data systems and add new sinks to write data into external data systems. Additionally, you can use Confluent Control Center to manage, monitor, and configure connectors.
Kafka Latency View
Kafka Implementation Options
Summary of Comparison
| Features | Kafka Plain Vanilla | Confluent Kafka |
|---|---|---|
| Apache Kafka version | 2.7 | 2.5 |
| Clients | Java | C, C++, C#, Java |
| File connector | Available | Available |
| Other connectors | Not available * | Available |
| REST endpoint | Not available * | Available |
| KSQL | Not available * | Available |
| Schema Registry | Not available * | Available |
| Cross-cluster replication (Replicator) | Not available (MirrorMaker only) | Available |
| Auto load balancing | Not available | Available |
| Dashboard (Control Center) | Not available | Available |

* Can be provisioned manually under the Confluent Community License.
For most small and medium businesses, Confluent Kafka seems to be the better option. For large enterprises with a dedicated DevOps team and data clusters, Apache Kafka seems to be the better choice.
Why is Confluent a better option for Small & Medium Businesses
REST endpoint
The Confluent environment has built-in support for a REST endpoint, which is critical for writing to and reading from Kafka on Android or iOS phones and tablets. It is also helpful on mainframes and other platforms.
Replication
Any production installation needs to be DR-1 compliant, and for that you need more than one cluster. Once you have multiple clusters, all data written to one cluster must be replicated to the others. Confluent Replicator takes care of this automatically.
Control Center
This is another crucial piece: it lets you monitor the health of your Kafka installation, recognize trends and patterns, plan proactively for future needs, and catch current problems.
Self balancer
This becomes critical as more applications are onboarded onto Kafka, bringing more topics, more traffic, and varied message sizes.
Role-Based Access Control (RBAC)
Centrally manages access to critical resources across the entire platform, with fine-grained authorization.
Structured Audit Logs
Traces user actions in a set of dedicated Kafka topics, to help operators detect abnormal behavior and security threats.
Secret Protection
Avoids exposing secrets by encrypting sensitive information (e.g., passwords) stored in configuration files.
Encryption of data
Data at rest can be encrypted on both solutions using third-party encryption providers or home-grown encryption packages. If you have your own key management service, these encryption options can integrate with it. If you opt for the cloud, data can be sent over a secure connection using a REST endpoint with HTTPS.
Data Privacy
Data privacy can be achieved on both solutions with third-party products such as Privitar, which de-identifies datasets to protect PII. If not Privitar, you can also integrate an internally developed de-identification API with Kafka as a connector.
Network Latency & Throughput
Reading or writing data to a local on-prem Kafka from an internal server has lower latency than writing to a Kafka cloud endpoint. Since the use case is writing to and reading from Kafka from internal servers, with mobile devices connecting through the internal network, on-prem latency should not be a bottleneck. Overall throughput depends on many factors, such as latency, message size, encryption, data schema, payload format, and serialization/deserialization. However, if there are issues with Kafka such as consumer lag, uneven partition distribution, or similar, having a Confluent plan will help you get support and resolve the issue.
Project completion, rollout and enablement
The completion date, and hence the cost of implementation, will depend on the option chosen. For the reasons given above, a Confluent Kafka on-prem implementation might take the shortest time. Overall implementation cost might also be lower for Confluent Kafka, given the ease of rollout, maintenance, upgrades, notifications, load balancing, and the other features listed above.
Kafka Implementation Calculation
There are some open questions that need to be addressed before we move forward with installation, deployment, and rollout.
ZooKeeper
ZooKeeper uses ZAB (ZooKeeper Atomic Broadcast), a Paxos-like consensus protocol, to reach agreement among a majority of nodes. Although Kafka is a very stable system, hardware failures and other outages are always possible, so it makes sense to plan for some VMs being unavailable. For fault tolerance, an odd number of nodes is always better for consensus, because an odd number and the next even number of nodes tolerate the same number of failures. It is therefore more cost-effective to run an odd number of ZooKeeper nodes. Brokers do not vote by majority; their fault tolerance comes from the replication factor and min.insync.replicas, so an odd broker count is not required.
For example, with 3 nodes the majority (quorum) is 2, so the ensemble tolerates 1 failure. With 4 nodes the majority is 3, so it still tolerates only 1 failure. The fourth node adds cost without adding fault tolerance.
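The majority arithmetic above can be checked with a few lines of Python. This is a generic strict-majority calculation, not ZooKeeper code; the function names are illustrative.

```python
def quorum_size(nodes: int) -> int:
    """Smallest strict majority of an ensemble with `nodes` voting members."""
    if nodes < 1:
        raise ValueError("an ensemble needs at least one node")
    return nodes // 2 + 1


def tolerated_failures(nodes: int) -> int:
    """How many nodes can fail while a majority is still reachable."""
    return nodes - quorum_size(nodes)


if __name__ == "__main__":
    for n in range(1, 8):
        print(f"{n} nodes: quorum {quorum_size(n)}, tolerates {tolerated_failures(n)} failure(s)")
```

Running it shows 3 and 4 nodes both tolerating one failure, and 5 and 6 nodes both tolerating two, which is why each extra even node is wasted.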
Broker
Kafka relies heavily on the filesystem for storing and caching messages. All data is immediately written to a persistent log on the filesystem, without necessarily being flushed to disk. Use multiple dedicated drives to maximize throughput, and don't share them with application logs or other OS filesystem activity, to keep latency low. You can either combine the drives into a single volume (RAID) or format and mount each drive as its own directory. Use RAID 10 if the additional cost is acceptable. Otherwise, configure the Kafka server with multiple log directories (log.dirs), each mounted on a separate drive.
A fast and reliable network is essential for performance in a distributed system. Low latency lets nodes communicate easily, while high bandwidth speeds up partition movement and replica recovery.
Control Center
Confluent Control Center is good for out-of-the-box Kafka cluster monitoring, making it easy to manage the entire Confluent Platform. Control Center is a web-based application that allows you to manage your cluster, to monitor Kafka system health in predefined dashboards and to alert on triggers. Additionally, Control Center reports end-to-end stream monitoring to assure that every message is delivered from producer to consumer, measures how long messages take to be delivered, and determines the source of any issues in your cluster.
3 steps to a healthy deployment of Kafka
Pick an optimization goal
Throughput
To optimize for throughput, producers, brokers, and consumers need to move as much data as they can in a given amount of time, so the goal is to maximize the rate at which data moves.
Latency
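Optimizing for latency means minimizing the time it takes a message to get from producer to consumer. Keeping latency to a minimum does not necessarily mean higher throughput: for example, sending each message immediately (linger.ms = 0) reduces batching.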
Durability
Durability is all about reducing the chance for a message to get lost. The most important feature that enables durability is replication, which ensures that messages are copied to multiple brokers. If a broker has a failure, the data is available from at least one other broker.
Availability
To optimize for high availability, Kafka should be tuned to recover as quickly as possible from failure scenarios. Higher partition counts may increase parallelism, but more partitions can also increase recovery time when a broker fails.
Configure cluster to achieve the goal
Throughput Optimization
Producer:
batch.size = 100000 – 200000 (default 16384)
linger.ms = 10 – 100
compression.type = lz4
acks = 1
buffer.memory = default (33554432)
Consumer:
fetch.min.bytes = ~100000
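As a minimal sketch, the throughput settings can be kept as plain Python dicts and rendered into the key=value format of a .properties file. The values chosen inside the quoted ranges are assumptions to benchmark, not recommendations. These are the Java client property names; librdkafka-based clients name some settings differently (buffer.memory, for instance, is Java-only).

```python
# Throughput profile from the list above, using Kafka's lowercase property names.
THROUGHPUT_PRODUCER = {
    "batch.size": 200000,        # default 16384
    "linger.ms": 50,             # a value inside the 10-100 range
    "compression.type": "lz4",
    "acks": "1",
    "buffer.memory": 33554432,   # the default
}
THROUGHPUT_CONSUMER = {
    "fetch.min.bytes": 100000,
}


def to_properties(settings: dict) -> str:
    """Render a settings dict as sorted key=value lines, as in a .properties file."""
    return "\n".join(f"{key}={value}" for key, value in sorted(settings.items()))


if __name__ == "__main__":
    print(to_properties(THROUGHPUT_PRODUCER))
    print(to_properties(THROUGHPUT_CONSUMER))
```

Keeping the profile in one place makes it easy to diff against the latency or durability profile before a benchmark run.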
Latency Optimization
Producer:
linger.ms = 0
Durability Optimization
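Topic (set per topic; brokers apply default.replication.factor otherwise):
replication.factor = 3
Producer:
acks = all
enable.idempotence = true
max.in.flight.requests.per.connection = 1 (only needed to preserve ordering when idempotence is disabled; an idempotent producer preserves ordering with up to 5)
Consumer:
enable.auto.commit = false
isolation.level = read_committed (when using exactly-once (EOS) transactions)
Broker:
default.replication.factor = 3
auto.create.topics.enable = false
min.insync.replicas = 2
unclean.leader.election.enable = false
broker.rack = the rack of the broker
log.flush.interval.messages and log.flush.interval.ms = low values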
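The durability settings only work together: min.insync.replicas is enforced only for producers using acks=all, and setting it equal to the replication factor means a single broker outage blocks writes. A minimal sketch of a consistency check over the profile follows; the function name and messages are illustrative, and broker.rack and the flush settings are left out because they have no single correct value.

```python
DURABILITY_TOPIC = {"replication.factor": 3}
DURABILITY_PRODUCER = {
    "acks": "all",
    "enable.idempotence": "true",
    "max.in.flight.requests.per.connection": 1,
}
DURABILITY_CONSUMER = {
    "enable.auto.commit": "false",
    "isolation.level": "read_committed",
}
DURABILITY_BROKER = {
    "default.replication.factor": 3,
    "auto.create.topics.enable": "false",
    "min.insync.replicas": 2,
    "unclean.leader.election.enable": "false",
}


def check_durability(replication_factor: int, min_insync: int, acks: str) -> list:
    """Return a list of problems with the combination; an empty list means it is consistent."""
    problems = []
    if acks != "all":
        problems.append("acks must be 'all' for min.insync.replicas to be enforced")
    if min_insync < 2:
        problems.append("min.insync.replicas < 2 lets a single replica acknowledge writes")
    if min_insync >= replication_factor:
        problems.append("min.insync.replicas >= replication factor: one broker outage blocks writes")
    return problems


if __name__ == "__main__":
    print(check_durability(DURABILITY_TOPIC["replication.factor"],
                           DURABILITY_BROKER["min.insync.replicas"],
                           DURABILITY_PRODUCER["acks"]))
```

With replication.factor = 3 and min.insync.replicas = 2, the cluster keeps accepting acks=all writes with one broker down and refuses them rather than risk loss with two down.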
Availability Optimization
Consumer:
session.timeout.ms = as low as possible (default 10000)
Broker:
unclean.leader.election.enable = true
num.recovery.threads.per.data.dir = number of directories in log.dirs
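A minimal sketch that derives the availability broker settings from a broker's data directories, plus a check on the consumer session timeout. The directory paths are hypothetical. The 6000 ms floor is the broker default for group.min.session.timeout.ms; a consumer asking for less is rejected when it joins its group, so "as low as possible" means no lower than that setting.

```python
GROUP_MIN_SESSION_TIMEOUT_MS = 6000  # broker default for group.min.session.timeout.ms


def availability_broker_settings(log_dirs: list) -> dict:
    """Broker settings for the availability profile, derived from the broker's log directories."""
    if not log_dirs:
        raise ValueError("at least one log directory is required")
    return {
        "log.dirs": ",".join(log_dirs),
        # Matches the guidance above: as many recovery threads as entries in log.dirs
        "num.recovery.threads.per.data.dir": len(log_dirs),
        # Trades durability for availability: an out-of-sync replica may become leader
        "unclean.leader.election.enable": "true",
    }


def session_timeout_allowed(timeout_ms: int, broker_min: int = GROUP_MIN_SESSION_TIMEOUT_MS) -> bool:
    """The broker refuses group members whose session.timeout.ms is below its minimum."""
    return timeout_ms >= broker_min


if __name__ == "__main__":
    print(availability_broker_settings(["/data/kafka-1", "/data/kafka-2"]))  # hypothetical mounts
```

Note that unclean.leader.election.enable is the opposite of the durability profile, so a cluster has to pick one goal per topic or per cluster.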
Benchmark, Monitor and Tune
Use Confluent Control Center on-prem, or the performance dashboard in Confluent Cloud. A few important metrics to measure:
Kafka Options and Pricing
Based on the comparison below, Confluent Cloud seems to be the best option. It is inexpensive, comes with a strong uptime guarantee and 24×7 Confluent support, and delivers great throughput and performance at low cost. On-prem plain vanilla Kafka will turn out to be much more expensive, in both initial cost and maintenance. Confluent Kafka on-prem costs 70K per year more than on-prem plain vanilla Kafka.
On-Prem Confluent
Maintenance cost of hardware
Additionally, 70K/year with a 2-year commitment for the Starter Essentials Package, which includes:
● 9 prod nodes
● 9 pre-prod or DR nodes
● Unlimited Dev nodes
● 1 CP Connector Pack (for both Prod & Pre-Prod)
● Support
● 24×7 (P1: 60 Min, P2: 4 Hrs)
● 3 cases / month limit
● Kafka streams support
Plain Vanilla Kafka
Local implementation and maintenance cost of hardware
Confluent On AWS Cloud
$200/month for a Basic cluster, based on 200 KB/s of writes and reads and 30 partitions, which is much more than needed for low- to medium-volume use cases. There is no other hardware or system maintenance cost. It comes with 24×7 support from Confluent and a 99.5% uptime guarantee, which appears to be more than is needed to start with. If you need more than the 30 partitions specified above, each additional partition costs $0.004.
Another option is a Standard Confluent Cloud cluster. It costs $1,000/month but includes 500 partitions at no extra charge and a better uptime guarantee of 99.95%.
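The cloud pricing arithmetic can be sketched as base fee plus a charge per partition above the included amount. The text gives no billing period for the $0.004 charge, so this sketch treats it as a flat monthly charge per extra partition, which is an assumption; if your plan bills it per partition-hour, multiply the rate by roughly 730 hours before passing it in. Check Confluent's current price list before relying on any figure.

```python
def monthly_cost(partitions: int, base_fee: float, included_partitions: int,
                 extra_partition_fee: float) -> float:
    """Base fee plus a charge for each partition above the plan's included amount."""
    if partitions < 0:
        raise ValueError("partitions must be non-negative")
    extra = max(0, partitions - included_partitions)
    return base_fee + extra * extra_partition_fee


# Basic cluster figures quoted above; the per-partition billing period is assumed monthly.
BASIC = dict(base_fee=200.0, included_partitions=30, extra_partition_fee=0.004)
STANDARD_BASE_FEE = 1000.0  # Standard cluster, 500 partitions included

if __name__ == "__main__":
    for p in (30, 130, 500):
        print(f"{p} partitions: basic ${monthly_cost(p, **BASIC):.2f}, standard ${STANDARD_BASE_FEE:.2f}")
```

Making the rate a parameter keeps the comparison honest: the break-even point between Basic and Standard depends entirely on how the extra-partition charge is billed.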
What is an event-driven architecture (EDA), and how does Kafka fit into that paradigm? Let's understand the basics:
Event
An event in an EDA is an update or a state change that occurred. For example, a shipping order was created, or a truck became available.
Message
Data that is published because of an event is called a message. The data usually conforms to a schema and has all the fields that every actor interested in the event needs in order to act on it.
Three components of an EDA
Producers
Producers act on events and create messages containing all the data any actor down the processing chain will need, then publish those messages to a channel or router. Kafka is an event channel; producers can publish to Kafka or to any other event router such as MSMQ or RabbitMQ. Producers do not interact directly with consumers, and they don't know whether a consumer exists for the message being produced.
Consumers (Sinks)
Consumers read messages provided by an event channel such as Kafka and perform whatever actions they need to. A single message can be consumed by many different consumers, each doing something different with it.
Channels (Routers)
Kafka is an event channel. Event channels store messages for a certain period of time and make them available for any consumer to consume and process. Kafka can be configured to keep messages for 24 hours or for much longer periods (the default retention is 7 days). Retention is time- or size-based and not tied to consumption, so a message that has already been processed keeps occupying space until it expires; hence it's better to keep retention short. For details on Kafka, see the other blog post https://aqibtech.net/apache-kafka-vs-confluent-kafka/
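To make the channel's role concrete, here is a toy, in-memory model of a single-partition topic: an append-only log, size-based retention, and a separate read position per consumer group. It is not a Kafka client, and real Kafka deletes whole log segments by time or size rather than single messages; the class and method names are illustrative only.

```python
class InMemoryTopic:
    """Toy single-partition topic: an append-only log plus per-group read positions.

    New groups start from the oldest retained message, as with auto.offset.reset=earliest.
    """

    def __init__(self, retention_messages=1000):
        self.retention_messages = retention_messages
        self._log = []          # (offset, message) pairs, oldest first
        self._next_offset = 0
        self._positions = {}    # consumer group -> next offset it will read

    def produce(self, message):
        """Append a message; the producer neither knows nor cares who will read it."""
        offset = self._next_offset
        self._log.append((offset, message))
        self._next_offset += 1
        # Retention drops the oldest messages whether or not anyone has read them.
        if len(self._log) > self.retention_messages:
            self._log = self._log[-self.retention_messages:]
        return offset

    def poll(self, group):
        """Return everything the group has not read yet and advance its position."""
        start = self._positions.get(group, 0)
        batch = [message for offset, message in self._log if offset >= start]
        self._positions[group] = self._next_offset
        return batch


if __name__ == "__main__":
    topic = InMemoryTopic(retention_messages=3)
    topic.produce("order-created")
    topic.produce("truck-available")
    print(topic.poll("billing"))   # billing reads both messages
    print(topic.poll("dispatch"))  # dispatch independently reads the same two
```

Each group keeps its own position, so the same message serves many consumers, and a consumer that shows up late still finds whatever retention has kept.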
Loose coupling
Components of a system are loosely coupled if each component can act independently, without knowledge of the other parts of the system. Each component can be serviced, brought down, replaced, or modified without modifying or servicing other parts of the system. Here, the Kafka consumers, producers, and Kafka storage itself act independently of each other.
Sync vs Async
Synchronous communication happens in real time: the caller waits for a response. Asynchronous communication lets components interact without any component being required or expected to respond immediately. A web service that makes a DB call to store data and waits for the response is communicating synchronously. A Kafka producer that sends a message and waits for Kafka to confirm the write is also synchronous. In the example above, however, the Kafka producer, New Dock Processor, acts on CDC (Change Data Capture) events from New DB and produces messages to the Kafka topic ME2Dispatch_New. The Legacy Processor consumes these messages at its own pace, writes to the Legacy DB, and then writes a resulting message to another Kafka topic, ME2Dispatch_Legacy. New Dock Processor then consumes that message from the legacy topic; it is effectively the response to the message it produced earlier on the "New" topic. The whole cycle can complete in a few milliseconds, depending on how efficient the business logic is. Because no component blocks waiting for another, the async approach lets many more messages be processed per unit of time.
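A toy model of the round trip described above, with two in-memory queues standing in for ME2Dispatch_New and ME2Dispatch_Legacy. The text does not say how New Dock Processor matches replies to its earlier requests; a correlation ID carried in each message is assumed here as one common way to do it. No Kafka client is involved.

```python
import uuid
from collections import deque


class AsyncRoundTrip:
    """Toy model of the New Dock / Legacy exchange over two topics (in memory, not Kafka)."""

    def __init__(self):
        self.new_topic = deque()      # stands in for ME2Dispatch_New
        self.legacy_topic = deque()   # stands in for ME2Dispatch_Legacy
        self.pending = {}             # correlation id -> request still awaiting a reply

    def send(self, payload):
        """New Dock Processor: publish a request and return at once, without waiting."""
        correlation_id = str(uuid.uuid4())
        self.pending[correlation_id] = payload
        self.new_topic.append({"correlation_id": correlation_id, "payload": payload})
        return correlation_id

    def legacy_process_one(self):
        """Legacy Processor: handle one request at its own pace and publish the result."""
        request = self.new_topic.popleft()
        # The write to the Legacy DB is omitted in this sketch.
        self.legacy_topic.append({"correlation_id": request["correlation_id"], "status": "stored"})

    def collect(self):
        """New Dock Processor: match replies to the requests it sent earlier."""
        results = {}
        while self.legacy_topic:
            reply = self.legacy_topic.popleft()
            request = self.pending.pop(reply["correlation_id"])
            results[reply["correlation_id"]] = (request, reply["status"])
        return results
```

The sender can publish many requests before the legacy side has handled any of them, which is where the throughput gain of the async design comes from.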
What does this setup lead to?
Loose Coupling
As described above, each component of the Kafka ecosystem, or of any event-driven architecture, cannot make assumptions about its counterparts and is therefore loosely coupled with them. A producer can keep producing data to Kafka even if no consumers are ready to consume it. The data stays in Kafka for the retention window, and whenever a consumer becomes available, every event that was created within that window will eventually be processed.
Atomicity of events
Each event should have no dependency on any event generated before or after it; each event should be self-contained.
Business Process Unit Abstraction
An event should not include internal business logic details or be tied to a specific implementation. The leakier the abstraction, the harder it becomes to change or update messages and events.
Specific events (non generic)
If events and messages are very generic, every event ends up represented by the same event type, distinguished only by some textual field. That leads to text parsing and poor performance, because every consumer has to deserialize each message just to find out whether it is interested in it.
Asynchronous
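Events can, and sometimes will, be generated out of order. If a sequence is required, use the identifier of the entity whose events must stay in order (for example, an order ID) as the partitioning key: Kafka only guarantees ordering within a partition, and all messages with the same key go to the same partition.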
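A minimal sketch of why the key decides ordering: a keyed partitioner maps hash(key) mod partition count, so every event with the same key lands in the same partition, in arrival order. Kafka's default partitioner hashes with murmur2; CRC32 from the standard library is swapped in here only to keep the example self-contained, and the keys and event names are made up.

```python
import zlib


def partition_for(key: str, num_partitions: int) -> int:
    """Deterministic hash of the key, modulo the partition count (CRC32 stands in for murmur2)."""
    return zlib.crc32(key.encode("utf-8")) % num_partitions


def route(events, num_partitions):
    """Append each (key, event) pair to its partition, preserving arrival order."""
    partitions = {p: [] for p in range(num_partitions)}
    for key, event in events:
        partitions[partition_for(key, num_partitions)].append((key, event))
    return partitions


if __name__ == "__main__":
    events = [("order-42", "created"), ("order-7", "created"),
              ("order-42", "loaded"), ("order-42", "shipped")]
    parts = route(events, 6)
    p = partition_for("order-42", 6)
    print([e for k, e in parts[p] if k == "order-42"])  # order-42's events, in order
```

If each event used its own unique ID as the key instead, events for the same order would scatter across partitions and their relative order would be lost.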
Self Contained
Each event should carry all the data that any downstream processor (or consumer) can ever need. An event that depends on events before or after it is flawed and will lead to issues.
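A sketch of an event that is specific and self-contained. The event type and its fields are hypothetical, based on the "truck became available" example earlier. The type travels in a record header so a consumer can skip events it doesn't care about without deserializing the value; Kafka supports record headers, but the (name, bytes) list shown here is plain Python data, not a client call.

```python
import json
from dataclasses import asdict, dataclass


@dataclass(frozen=True)
class TruckBecameAvailable:
    """A specific, self-contained event: every field a consumer needs travels with it."""
    truck_id: str
    depot: str
    available_at: str   # ISO-8601 timestamp
    capacity_kg: int

    EVENT_TYPE = "TruckBecameAvailable"   # class constant, not a dataclass field


def to_record(event):
    """Build (headers, value): the event type goes in a header, the data in the value."""
    headers = [("event_type", event.EVENT_TYPE.encode("utf-8"))]
    value = json.dumps(asdict(event)).encode("utf-8")
    return headers, value


def wants(headers, wanted_type):
    """Let a consumer filter by event type without touching the value."""
    return dict(headers).get("event_type") == wanted_type.encode("utf-8")
```

A consumer interested only in orders can call wants(headers, "OrderCreated") and drop this record cheaply, which avoids the parse-everything cost of generic events.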
Hope this was helpful to you. For questions / concerns / comments / help feel free to reach out to the Data Experts at Aqib Technologies!
I have created a public repository that shows the code end to end. I wrote this script in one day since I needed it for a project. Feel free to use it and leave me feedback
One of our clients had a large number of very big CSV files in Amazon S3, some zipped and some not, and they wanted their Snowflake database curated with tables that map to these CSV files.
This blog post covers the basics of the process, as well as some of the roadblocks we hit along the way. I chose to do this in Python, although I could have chosen any other language. Language choice is critical at times, since end-to-end latency and the time taken to complete the transfer might be affected by it. I say "might be" because I did not validate it in this particular setup. However, as you will see in my other article, I have demonstrated Java to be many times faster than Python in an ETL process similar to this one.
Amazon S3 is a popular cloud storage service that provides a scalable and secure storage option for large files. It is commonly used to store and distribute files, including data in the CSV (Comma Separated Values) format. In this tutorial, we will show you how to read CSV files from an S3 bucket using Python.
Prerequisites:
An AWS account and an S3 bucket
AWS CLI and boto3 library installed on your system
Python 3 installed
Step 1: Set up the AWS CLI. To use the AWS CLI, you need to configure it with your AWS credentials. To do this, run the following command:
aws configure
You will be prompted to enter your AWS access key ID, secret access key, default region name, and default output format.
Alternatively, if you prefer to work with config or YAML files, create a config class that reads credentials from a YAML file, and add the properties you need to the class:
import yaml


class Config:
    """Reads settings from a YAML file and exposes them as properties."""

    def __init__(self, configyamlfile):
        self.settings = dict()
        with open(configyamlfile, "r") as stream:
            try:
                self.settings = yaml.safe_load(stream)
            except yaml.YAMLError as exception:
                print(exception)

    @property
    def User(self):
        return self.settings["user"]
Step 2: Install the boto3 library. To read files from an S3 bucket, we will use the boto3 library, which is the Amazon Web Services (AWS) SDK for Python. To install boto3, run the following command:
pip install boto3
Step 3: Read the CSV file from the S3 bucket. We will create a class CsvReader that reads CSV files from S3. We can add different modes of reading to this class such as reading an individual csv file, reading a zip containing multiple csv files, processing a local csv file that has been downloaded in the previous run and more. We will start by importing the necessary libraries and initializing a boto3 client and a boto3 session.
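A minimal skeleton of the CsvReader class, assuming credentials come from the profile set up with aws configure. The optional session parameter is an addition for illustration: it lets the class be exercised with a stand-in object, and boto3 is imported only when no session is passed. boto3.session.Session and its client method are real boto3 APIs; the rest of the class is left to the steps that follow.

```python
class CsvReader:
    """Skeleton of the CsvReader described above; reading methods are added in later steps."""

    def __init__(self, session=None, region_name=None):
        if session is None:
            import boto3  # imported lazily so the class can be tested with a stand-in session
            session = boto3.session.Session(region_name=region_name)
        self.session = session              # used later for session.resource("s3")
        self.client = session.client("s3")  # used for listing and downloading objects
```

Keeping both the session and the low-level client on the instance matches the two access styles used below: the client for listing and downloading, the resource for reading a zip object into memory.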
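Next, we will list the files in a folder in the bucket using a prefix and process each of them one by one. For each file, derive a local file name and download the file.
# List every object under the prefix (a paginator handles more than 1000 keys)
paginator = self.client.get_paginator("list_objects_v2")
for page in paginator.paginate(Bucket=bucket_name, Prefix=prefix):
    for content in page.get("Contents", []):
        s3filepath = content["Key"]
        if s3filepath.endswith("/"):
            continue  # skip zero-byte "folder" placeholder keys
        # Download each file into the working directory under its base name
        name = s3filepath.rsplit("/", 1)[-1]
        self.client.download_file(bucket_name, s3filepath, name)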
Finally, we can use a pandas DataFrame to perform data analysis or visualization. I have used some extra parameters for read_csv, which I will discuss in the next article. For generic reading of a CSV into a pandas DataFrame, you can just pass the file name.
What if you need to process zip files in S3 buckets? If the file is large (> 2 GB), download it locally as you would any other file and process the zip there.
If the zip file is of reasonable size, you can read it into memory and process the CSV files in it without downloading the archive first.
s3 = self.session.resource("s3")
bucket = s3.Bucket(BUCKET_NAME)
obj = bucket.Object(prefix)
with io.BytesIO(obj.get()["Body"].read()) as tf:
# rewind the file
tf.seek(0)
# Read the file as a zipfile and process the members
with ZipFile(tf, mode='r') as zipf:
for subfile in zipf.namelist():
name = subfile[:-4]
filepath = zipf.extract(subfile)
print(f"processing {subfile} at {filepath}")
self.process_csvfile(filepath, name)
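Two helpers sketch the decision and the in-memory path described above. should_download uses the real S3 head_object call to read the object size without fetching it, treating the text's 2 GB cut-off as 2 GiB. iter_zip_csv_rows goes one step further than the code above: it streams rows straight out of each CSV member instead of extracting the members to local files. UTF-8 encoding and the function names are assumptions.

```python
import csv
import io
from zipfile import ZipFile

TWO_GB = 2 * 1024 ** 3  # the "> 2 GB" cut-off from the text, taken as 2 GiB


def should_download(client, bucket_name, key, threshold=TWO_GB):
    """True if the object is big enough that it should be downloaded to disk first."""
    size = client.head_object(Bucket=bucket_name, Key=key)["ContentLength"]
    return size > threshold


def iter_zip_csv_rows(zip_bytes):
    """Yield (member_name, row) for each row of each CSV member, without extracting to disk."""
    with ZipFile(io.BytesIO(zip_bytes)) as zipf:
        for member in zipf.namelist():
            if not member.lower().endswith(".csv"):
                continue
            # Assumes UTF-8 CSVs; change the encoding if your files differ.
            with zipf.open(member) as raw, io.TextIOWrapper(raw, encoding="utf-8", newline="") as text:
                for row in csv.reader(text):
                    yield member, row
```

In the reader, the two would combine roughly like this: if should_download(self.client, bucket_name, key) returns False, pass self.client.get_object(Bucket=bucket_name, Key=key)["Body"].read() to iter_zip_csv_rows; otherwise fall back to download_file.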
In conclusion, reading CSV files from an S3 bucket in Python is a simple process that can be accomplished using the boto3 library. With just a few lines of code, you can retrieve and process data stored in an S3 bucket, making it a convenient and scalable option for data storage and distribution.
Snowflake is a cloud-based data warehousing platform that offers seamless integration with various programming languages, including Python. With Snowflake, you can easily create and manage tables using Python, without having to worry about the underlying infrastructure. In this blog post, we will show you how to create tables in Snowflake using Python.
Before we get started, make sure you have the Snowflake Python connector installed. You can install it using the following command:
pip install snowflake-connector-python
Next, you need to establish a connection to your Snowflake account by providing your account identifier, username, password, and the name of your Snowflake warehouse. Here's an example using values defined in a YAML file and wired in through a config class.
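A minimal sketch of the connection step, assuming the YAML file has keys named account, user, password, and warehouse, with optional database, schema, and role. Those key names are assumptions (only "user" appears in the Config class above). snowflake.connector.connect and these keyword arguments are part of snowflake-connector-python; the connector is imported inside connect() so connection_params can be checked without it installed.

```python
def connection_params(settings: dict) -> dict:
    """Map values loaded from the YAML config onto snowflake.connector.connect() keyword arguments."""
    required = ("account", "user", "password", "warehouse")
    missing = [key for key in required if not settings.get(key)]
    if missing:
        raise ValueError(f"missing Snowflake settings: {', '.join(missing)}")
    params = {key: settings[key] for key in required}
    for optional in ("database", "schema", "role"):
        if settings.get(optional):
            params[optional] = settings[optional]
    return params


def connect(settings: dict):
    """Open a connection; requires `pip install snowflake-connector-python`."""
    import snowflake.connector
    return snowflake.connector.connect(**connection_params(settings))
```

With the Config class from the S3 section, the call would look like connect(Config("config.yaml").settings), where config.yaml is a hypothetical file name.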
Once you have established a connection, you can create a table using the execute method of a cursor obtained from the connection. The syntax for creating a table in Snowflake is similar to standard SQL. Since we are creating the table based on a CSV, let's use a pandas DataFrame to generate the CREATE TABLE SQL. In the example below, I set on_bad_lines to 'warn' so that pandas warns and continues; you can choose 'error' or 'skip' instead. In my case I wanted the warnings so that I could log them and analyze missing data after the imports were complete. You may also notice nrows=3. The files are large, and I only use the DataFrame for an initial sanity check and to build the table-creation string; it doesn't make sense to have pandas load a 100 MB file for that, so I load just the first few rows.
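import pandas as pd

# Read only the first few rows: enough for a sanity check and the column names
df = pd.read_csv(file, on_bad_lines='warn', nrows=3)
# Columns ending in _dt become DATE columns with a default date; everything else is VARCHAR
cols = ",".join([f"{c} date default '1950-01-01'::date"
                 if c.lower().endswith("_dt")
                 else f"{c} varchar"
                 for c in df.columns.values])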
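If pandas isn't otherwise needed, the header row alone is enough to build the column list. This sketch swaps in the standard csv module for the pandas read and adds a sanitizing step, which is an assumption: it rewrites each header into a safe unquoted Snowflake identifier (letters, digits, underscores, not starting with a digit). It does not guard against reserved words, so a column literally named select would still need quoting.

```python
import csv
import re


def snowflake_column(name: str) -> str:
    """Turn a CSV header into a safe unquoted Snowflake identifier (assumed naming rule)."""
    ident = re.sub(r"[^0-9A-Za-z_]", "_", name.strip())
    if not ident or ident[0].isdigit():
        ident = "_" + ident
    return ident


def create_table_sql(table: str, lines) -> str:
    """Build CREATE TABLE from the header row; `lines` is an open file or any iterable of lines."""
    header = next(csv.reader(lines))
    cols = ", ".join(
        f"{snowflake_column(c)} date default '1950-01-01'::date"
        if c.lower().endswith("_dt")
        else f"{snowflake_column(c)} varchar"
        for c in header
    )
    return f"create table {table}({cols})"
```

Usage: with open(file, newline="", encoding="utf-8") as f: sql = create_table_sql(name, f). Only the first line of the file is read.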
Now that the CREATE TABLE SQL is squared away, let's look at how to load the table with data from the CSV. The best way is to put the file into a Snowflake stage and then copy the data from the staged CSV file into the table you just created. You can use an existing stage or create one for CSV files with comma as the delimiter.
The last step is to copy the data from the CSV into the table. Here is an example command. I hit some roadblocks during the copy step because of the data I was dealing with; I'll explain each of the tweaks I used to work around them.
FIELD_OPTIONALLY_ENCLOSED_BY: The data in some of the CSV files was enclosed in double quotes, e.g. "Mohammed","Bloomberg". This setting tells Snowflake to strip the enclosing double quotes before insertion.
NULL_IF: I had some date columns. If the data is empty, Snowflake correctly assigns a null (or default) date. However, if the null representation isn't obvious, data insertion stops with errors. NULL_IF tells Snowflake all the possible null values the CSV files might contain.
ON_ERROR: By default, if there are errors, Snowflake returns the error and stops processing. If you want to continue inserting data in spite of errors, set this to CONTINUE.
VALIDATION_MODE: If you continue on error with the setting above, the data is inserted, but you won't know which errors were encountered or for which rows. To get the list of errors without inserting any data, use VALIDATION_MODE.
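A minimal sketch of the staging step as SQL strings to run through a cursor. CREATE STAGE IF NOT EXISTS and PUT with AUTO_COMPRESS and OVERWRITE are real Snowflake syntax; the stage name DATA_STAGE and the local path are illustrative. PUT gzips the file by default, which is why the COPY command that follows reads a .gz file from the stage.

```python
def stage_sql(stage: str, local_path: str) -> list:
    """Statements that create a CSV stage (if needed) and upload one local file to it."""
    return [
        f"create stage if not exists {stage} "
        "file_format = (type = csv field_delimiter = ',' skip_header = 1)",
        # PUT compresses with gzip by default, so orders.csv lands on the stage as orders.csv.gz
        f"put file://{local_path} @{stage} auto_compress = true overwrite = true",
    ]
```

Each statement would be run with cursor.execute(statement). An absolute local path such as /tmp/orders.csv produces the three-slash file:///tmp/orders.csv form PUT expects on Linux and macOS.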
# Copy data from the staged (gzip-compressed) CSV file into the table
copy_sql = (f"copy into {name} from @<database_name>.<schema_name>.DATA_STAGE/{csvfile}.gz "
            "file_format = (type = csv field_delimiter = ',' skip_header = 1 field_optionally_enclosed_by = '\"' "
            "trim_space = false null_if = ('\\\\N', 'NULL', 'NUL', ''))")
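The two options discussed above but not used in the example command can be added as clauses. A sketch of a builder that appends ON_ERROR and VALIDATION_MODE = RETURN_ERRORS on request; both are real COPY INTO options, and with VALIDATION_MODE Snowflake reports errors without loading any rows. The function name and parameters are illustrative.

```python
def copy_sql(table, stage, staged_file, on_error=None, validate_only=False):
    """Build the COPY command above, optionally continuing on errors or only validating."""
    sql = (
        f"copy into {table} from @{stage}/{staged_file} "
        "file_format = (type = csv field_delimiter = ',' skip_header = 1 "
        "field_optionally_enclosed_by = '\"' trim_space = false "
        "null_if = ('\\\\N', 'NULL', 'NUL', ''))"
    )
    if on_error:
        sql += f" on_error = {on_error}"
    if validate_only:
        # Returns the parsing/conversion errors instead of inserting data
        sql += " validation_mode = return_errors"
    return sql
```

A workflow that matches the text: run once with validate_only=True to collect the error list, then load with on_error="continue" once the remaining errors are acceptable.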
Note that to create the table and insert data, you need to run a few more SQL statements to set up the session correctly. Pick the correct role for the job; you don't have to be SYSADMIN. Set the correct database and schema. Make sure the warehouse is set and is resumed if it is suspended. Finally, you can drop the table if you want to start fresh.
use role SYSADMIN
use database <db_name>
use warehouse <Warehouse>
ALTER WAREHOUSE <Warehouse> RESUME IF SUSPENDED
use schema <Schema>
drop table if exists {name}
create table {name}({cols})
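A minimal sketch that runs the statements above in order through a cursor, such as one from connection.cursor() in snowflake-connector-python. The role, database, warehouse, schema, and table values passed in are placeholders. Set recreate=False to keep existing data instead of dropping the table.

```python
def setup_statements(role, database, warehouse, schema, table, cols, recreate=True):
    """The session setup and table creation statements, in the order shown above."""
    statements = [
        f"use role {role}",
        f"use database {database}",
        f"use warehouse {warehouse}",
        f"alter warehouse {warehouse} resume if suspended",
        f"use schema {schema}",
    ]
    if recreate:
        statements.append(f"drop table if exists {table}")
        statements.append(f"create table {table}({cols})")
    else:
        statements.append(f"create table if not exists {table}({cols})")
    return statements


def run_all(cursor, statements):
    """Execute each statement in turn; a failure stops the sequence with the driver's error."""
    for statement in statements:
        cursor.execute(statement)
```

Running the statements one at a time keeps any error tied to the exact statement that caused it.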
And that’s it! You have successfully created tables in Snowflake using Python. With Snowflake’s robust data warehousing platform and seamless integration with various programming languages, you can easily manage your data and build powerful data applications.
In the next article we will see it all playing together