To summarize briefly:
Companies like Uber, PayPal, Spotify, Goldman Sachs, Tinder, Pinterest, and Tumblr use Kafka for stream processing and message passing, and it is regarded as one of the most popular big data tools in the world. Below are the latest Apache Kafka interview questions and answers:
Apache Kafka is a streaming platform that is free and open-source. It is written in Scala and Java. Kafka was first built at LinkedIn as a messaging queue, but it has evolved into much more. It’s a versatile tool for working with data streams that may be applied to a variety of scenarios.
Kafka is a distributed system, which means it can scale up as needed. All you have to do now is add new Kafka nodes (servers) to the cluster.
Kafka aims to provide a platform for handling real-time data feeds and can handle trillions of events on a daily basis. Here are some features of Apache Kafka:
Following are the traditional methods of message transfer:
Following are the major components of Kafka:
Following are the four core APIs that Kafka uses:
Topics in Kafka are divided into partitions. Multiple consumers can read from a topic in parallel by reading from different partitions. Within each partition, messages are stored in the order in which they arrive. The number of partitions has to be specified when a topic is created (see the sketch below); this number can be increased later, but it cannot be decreased.
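For illustration, here is a minimal sketch of creating a topic with a chosen partition count using the Java AdminClient; the broker address, topic name, partition count, and replication factor are placeholder values, not part of the original answer:

```java
import java.util.Collections;
import java.util.Properties;
import org.apache.kafka.clients.admin.AdminClient;
import org.apache.kafka.clients.admin.AdminClientConfig;
import org.apache.kafka.clients.admin.NewTopic;

public class CreateTopicExample {
    public static void main(String[] args) throws Exception {
        Properties props = new Properties();
        // Placeholder bootstrap address; point this at your own cluster.
        props.put(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");

        try (AdminClient admin = AdminClient.create(props)) {
            // "orders" is a hypothetical topic with 6 partitions and replication factor 3.
            NewTopic topic = new NewTopic("orders", 6, (short) 3);
            admin.createTopics(Collections.singletonList(topic)).all().get();
        }
    }
}
```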
Apache ZooKeeper is a naming registry for distributed applications as well as a distributed, open-source configuration and synchronization service. It keeps track of the Kafka cluster nodes’ status, as well as Kafka topics, partitions, and so on.
ZooKeeper is used by Kafka brokers to maintain and coordinate the Kafka cluster. When the topology of the Kafka cluster changes, such as when brokers and topics are added or removed, ZooKeeper notifies all nodes. When a new broker enters the cluster, for example, ZooKeeper notifies the cluster, as well as when a broker fails. ZooKeeper also allows brokers and topic partition pairs to elect leaders, allowing them to select which broker will be the leader for a given partition (and server read and write operations from producers and consumers), as well as which brokers contain clones of the same data. When the cluster of brokers receives a notification from ZooKeeper, they immediately begin to coordinate with one another and elect any new partition leaders that are required. This safeguards against the unexpected absence of a broker.
In Kafka, each partition has one server that acts as the Leader and one or more servers that act as Followers. The Leader handles all read and write requests for the partition, while the Followers passively replicate the Leader. If the Leader fails, one of the Followers takes over leadership. This is how the load is balanced across servers.
Topic replication is critical for constructing Kafka deployments that are both durable and highly available. When one broker fails, topic replicas on other brokers remain available to ensure that data is not lost and that the Kafka deployment is not disrupted. The replication factor specifies the number of copies of a topic that are kept across the Kafka cluster. It is defined at the topic level and takes place at the partition level. A replication factor of two, for example, will keep two copies of each partition of the topic.
Each partition has an elected leader, and other brokers store a copy that can be used if necessary. Logically, the replication factor cannot be more than the cluster’s total number of brokers. An In-Sync Replica (ISR) is a replica that is up to date with the partition’s leader.
A consumer group in Kafka is a collection of consumers that work together to consume data from the same topic or set of topics. A consumer group essentially represents the name of an application. Consumers in Kafka typically belong to a particular consumer group. To consume messages as part of a consumer group with the console consumer, the '--group' option must be used.
By default, the maximum size of a Kafka message is 1 MB (megabyte). The size can be modified in the broker settings. However, Kafka is optimized for handling small messages of around 1 KB.
A replica that has been out of ISR for a long period of time indicates that the follower is unable to fetch data at the same rate as the leader.
First, download the most recent version of Kafka and extract it. To run Kafka, make sure the local environment has Java 8+ installed.
The following commands must be done in order to start the Kafka server and ensure that all services are started in the correct order:
$ bin/zookeeper-server-start.sh config/zookeeper.properties
$ bin/kafka-server-start.sh config/server.properties
Geo-Replication is a Kafka feature that allows messages in one cluster to be copied across many data centers or cloud regions. Geo-replication entails replicating all of the files and storing them throughout the globe if necessary. Geo-replication can be accomplished with Kafka’s MirrorMaker Tool. Geo-replication is a technique for ensuring data backup.
Following are the disadvantages of Kafka :
Following are some of the real-world usages of Apache Kafka:
Following are the use cases of Kafka monitoring :
A Schema Registry is present for both producers and consumers in a Kafka cluster, and it holds Avro schemas. For easy serialization and de-serialization, Avro schemas enable the configuration of compatibility parameters between producers and consumers. The Kafka Schema Registry is used to ensure that the schema used by the consumer and the schema used by the producer are identical. The producers just need to submit the schema ID and not the whole schema when using the Confluent schema registry in Kafka. The consumer looks up the matching schema in the Schema Registry using the schema ID.
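As a hedged illustration, a producer configured against a Schema Registry might look like the sketch below. It assumes Confluent's kafka-avro-serializer library is on the classpath; the broker address, registry URL, topic name, and Avro schema are made-up placeholders:

```java
import java.util.Properties;
import org.apache.avro.Schema;
import org.apache.avro.generic.GenericData;
import org.apache.avro.generic.GenericRecord;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.common.serialization.StringSerializer;
// From Confluent's kafka-avro-serializer artifact (assumed to be on the classpath).
import io.confluent.kafka.serializers.KafkaAvroSerializer;

public class AvroProducerExample {
    public static void main(String[] args) {
        Properties props = new Properties();
        props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");   // placeholder broker
        props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
        props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, KafkaAvroSerializer.class);
        // The serializer registers/looks up the schema here and sends only the schema ID with each record.
        props.put("schema.registry.url", "http://localhost:8081");              // placeholder registry URL

        // A hypothetical Avro schema for the message value.
        Schema schema = new Schema.Parser().parse(
            "{\"type\":\"record\",\"name\":\"User\",\"fields\":[{\"name\":\"name\",\"type\":\"string\"}]}");
        GenericRecord user = new GenericData.Record(schema);
        user.put("name", "Alice");

        try (KafkaProducer<String, GenericRecord> producer = new KafkaProducer<>(props)) {
            producer.send(new ProducerRecord<>("users", "user-42", user));      // "users" is a hypothetical topic
        }
    }
}
```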
Kafka cluster is basically a group of multiple brokers. They are used to maintain load balance. Because Kafka brokers are stateless, they rely on Zookeeper to keep track of their cluster state. A single Kafka broker instance can manage hundreds of thousands of reads and writes per second, and each broker can handle TBs of messages without compromising performance. Zookeeper can be used to choose the Kafka broker leader. Thus having a cluster of Kafka brokers heavily increases the performance.
In Kafka terminology, messages are referred to as records. Each record has a key and a value, with the key being optional. The record's key is used for partitioning. Each topic has one or more partitions. A partition is a straightforward data structure: an append-only sequence of records, ordered chronologically by the time they were appended. Once a record is written to a partition, it is given an offset, a sequential id that reflects the record's position in the partition and uniquely identifies it within it.
Partitioning is done using the record’s key. By default, Kafka producer uses the record’s key to determine which partition the record should be written to. The producer will always choose the same partition for two records with the same key.
This is important because we may need to process records in the same order they were created. For example, when a customer purchases an eBook from your webshop and subsequently cancels the transaction, you want these events to arrive in the order they were created. If a cancellation event arrives before the purchase event, the cancellation will be rejected as invalid (since the purchase has not yet been registered in the system); the system will then record the purchase and ship the product to the customer (and lose you money). You might use a customer id as the key of these Kafka records to solve this problem and guarantee ordering, as in the sketch below. This ensures that all of a customer's purchase events end up in the same partition.
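A minimal producer sketch of this idea follows; the topic name, customer id, event payloads, and broker address are hypothetical placeholders:

```java
import java.util.Properties;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.common.serialization.StringSerializer;

public class KeyedProducerExample {
    public static void main(String[] args) {
        Properties props = new Properties();
        props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");          // placeholder broker
        props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
        props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class);

        try (KafkaProducer<String, String> producer = new KafkaProducer<>(props)) {
            String customerId = "customer-123";   // hypothetical customer id used as the record key
            // Both events share the same key, so the default partitioner hashes them
            // to the same partition and their relative order is preserved.
            producer.send(new ProducerRecord<>("shop-events", customerId, "purchase:ebook-42"));
            producer.send(new ProducerRecord<>("shop-events", customerId, "cancel:ebook-42"));
        }
    }
}
```

With the default partitioner, records that share a key always land on the same partition, so the cancellation can never overtake the purchase.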
Partitions allow a single topic to be partitioned across numerous servers from the perspective of the Kafka broker. This allows you to store more data in a single topic than a single server can. If you have three brokers and need to store 10TB of data in a topic, one option is to construct a topic with only one partition and store all 10TB on one broker. Another alternative is to build a three-partitioned topic and distribute 10 TB of data among all brokers. A partition is a unit of parallelism from the consumer’s perspective.
Multi-tenancy is a software operation mode in which many instances of one or more programs operate in a shared environment independently of one another. The instances are considered to be physically separate yet logically connected. The level of logical isolation in a system that supports multi-tenancy must be comprehensive, but the level of physical integration can vary. Kafka is multi-tenant because it allows for the configuration of many topics for data consumption and production on the same cluster.
The Kafka Replication Tool is used to create a high-level design for the replica maintenance process. The following are some of the replication tools available:
Following are the differences between Kafka and RabbitMQ:
One of Apache Kafka's alternatives is RabbitMQ. So, let's compare both:
Two major measurements are taken into account while tuning for optimal performance: latency, which refers to the amount of time it takes to process one event, and throughput, which refers to the number of events that can be processed in a given length of time. Most systems are tuned for one of the two, either latency or throughput, whereas Kafka can balance both.
The following stages are involved in optimizing Kafka’s performance:
The following table illustrates the differences between Redis and Kafka:
Redis | Kafka |
---|---|
Push-based message delivery is supported by Redis. This means that messages published to Redis will be distributed to consumers automatically. | Pull-based message delivery is supported by Kafka. The messages published to the Kafka broker are not automatically sent to the consumers; instead, consumers must pull the messages when they are ready. |
Message retention is not supported by Redis. Messages are destroyed once they have been delivered to the recipients. | Kafka supports message retention in its log. |
Parallel processing is not supported by Redis. | Multiple consumers in a consumer group can consume partitions of the topic concurrently because of Kafka's partitioning feature. |
Redis cannot manage vast amounts of data because it is an in-memory database. | Kafka can handle massive amounts of data since it uses disk space as its primary storage. |
Because Redis is an in-memory store, it is much faster than Kafka. | Because Kafka stores data on disk, it is slower than Redis. |
Kafka security consists of three components:
The following table illustrates the differences between Kafka and Java Messaging Service:
Java Messaging Service(JMS) | Kafka |
---|---|
The push model is used to deliver the messages. Consumers receive messages on a regular basis. | A pull mechanism is used in the delivery method. When consumers are ready to receive the messages, they pull them. |
When the JMS queue receives confirmation from the consumer that the message has been received, it is permanently deleted. | Messages are retained for a configured length of time even after the consumer has read them. |
JMS is better suited to multi-node clusters in very complicated systems. | Kafka is better suited to handling large volumes of data. |
JMS is a FIFO queue that does not support any other type of ordering. | Kafka guarantees that messages within a partition are delivered in the order in which they were written. |
The MirrorMaker is a standalone utility for copying data from one Apache Kafka cluster to another. The MirrorMaker reads data from original cluster topics and writes it to a destination cluster with the same topic name. The source and destination clusters are separate entities that can have various partition counts and offset values.
Apache Flume is a dependable, distributed, and available software for aggregating, collecting, and transporting massive amounts of log data quickly and efficiently. Its architecture is versatile and simple, based on streaming data flows. It’s written in the Java programming language. It features its own query processing engine, allowing it to alter each fresh batch of data before sending it to its intended sink. It is designed to be adaptable.
The following table illustrates the differences between Kafka and Flume :
Kafka | Flume |
---|---|
Kafka is a distributed data system. | Apache Flume is a system that is available, dependable, and distributed. |
It essentially functions as a pull model. | It essentially functions as a push model. |
It is made for absorbing and analysing real-time streaming data. | It collects, aggregates, and moves massive amounts of log data from a variety of sources to a centralised data repository in an efficient manner. |
It is resilient to node failure and supports automatic recovery. | If the flume-agent fails, you will lose events in the channel. |
Kafka operates as a cluster that manages incoming high-volume data streams in real time. | Flume is a tool for collecting log data from distributed web servers. |
It is a messaging system that is fault-tolerant, efficient, and scalable. | It is made specifically for Hadoop. |
It is simple to scale. | It is not as scalable as Kafka. |
Confluent is an Apache Kafka-based data streaming platform: a full-scale streaming platform capable of not just publish-and-subscribe but also data storage and processing within the stream. Confluent Kafka is a more comprehensive Apache Kafka distribution. It enhances Kafka's integration capabilities by including tools for optimizing and managing Kafka clusters, as well as ways of securing the streams. The Confluent Platform makes Kafka easy to set up and operate. Confluent's software comes in three varieties:
Following are the advantages of Confluent Kafka :
Producers often transmit data to brokers in JSON format. The JSON format stores data in string form, which can result in several duplicate records being stored in the Kafka topic. As a result, the amount of disk space used increases. Therefore, compression or batching of data is performed before messages are delivered to Kafka in order to save disk space. Because message compression is performed on the producer side, no changes to the consumer or broker configuration are required.
It is advantageous because of the following factors:
Message Compression has the following disadvantages :
Following are some of the use cases where Kafka is not suitable :
Log compaction is a way through which Kafka assures that for each topic partition, at least the last known value for each message key within the log of data is kept. This allows for the restoration of state following an application crash or a system failure. During any operational maintenance, it allows refreshing caches after an application restarts. Any consumer processing the log from the beginning will be able to see at least the final state of all records in the order in which they were written, because of the log compaction.
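As an illustrative sketch (not taken from the original answer), a compacted topic can be created by setting cleanup.policy=compact at topic creation time; the topic name, partition count, and broker address below are placeholders:

```java
import java.util.Collections;
import java.util.Map;
import java.util.Properties;
import org.apache.kafka.clients.admin.AdminClient;
import org.apache.kafka.clients.admin.AdminClientConfig;
import org.apache.kafka.clients.admin.NewTopic;
import org.apache.kafka.common.config.TopicConfig;

public class CompactedTopicExample {
    public static void main(String[] args) throws Exception {
        Properties props = new Properties();
        props.put(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");  // placeholder broker

        try (AdminClient admin = AdminClient.create(props)) {
            // "user-profiles" is a hypothetical topic; cleanup.policy=compact keeps
            // at least the latest value for every message key.
            NewTopic topic = new NewTopic("user-profiles", 3, (short) 3)
                .configs(Map.of(TopicConfig.CLEANUP_POLICY_CONFIG, TopicConfig.CLEANUP_POLICY_COMPACT));
            admin.createTopics(Collections.singletonList(topic)).all().get();
        }
    }
}
```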
A Kafka cluster can apply quotas on producers and fetch requests as of Kafka 0.9. Quotas are byte-rate limits that are set for each client-id. A client-id is a logical identifier for a request-making application. A single client-id can therefore link to numerous producers and client instances. The quota will be applied to them all as a single unit. Quotas prevent a single application from monopolizing broker resources and causing network saturation by consuming extremely large amounts of data.
Following are the guarantees that Kafka assures :
It’s as simple as assigning a unique broker id, listeners, and log directory to the server.properties file to add new brokers to an existing Kafka cluster. However, these brokers will not be allocated any data partitions from the cluster’s existing topics, so they won’t be performing much work unless the partitions are moved or new topics are formed. A cluster is referred to as unbalanced if it has any of the following problems :
Leader Skew:
Consider the following scenario: a topic with three partitions and a replication factor of three across three brokers.
The leader receives all reads and writes on a partition. Followers send fetch requests to the leaders in order to receive their most recent messages. Followers exist solely for redundancy and fail-over purposes.
Consider the case of a broker that has failed. The failed broker might have hosted several leader partitions. For each leader partition of the failed broker, one of its followers on the other brokers is promoted to leader. Because fail-over to an out-of-sync replica is not allowed, a follower must be in sync with the leader in order to be promoted.
If another broker goes down, all of the leaders are on the same broker, therefore there is no redundancy.
When both brokers 1 and 3 go live, the partitions gain some redundancy, but the leaders stay focused on broker 2.
As a result, the Kafka brokers have a leader imbalance. When a node is a leader for more partitions than the number of partitions/number of brokers, the cluster is in a leader skewed condition.
Solving the leader skew problem:
Kafka offers the ability to reassign leaders to the desired replicas in order to tackle this problem. This can be accomplished in one of two ways:
Broker Skew:
Let us consider a Kafka cluster with nine brokers. Let the topic name be “sample_topic.” The following is how the brokers are assigned to the topic in our example:
Broker Id | Number of Partitions | Partitions | Is Skewed? |
---|---|---|---|
0 | 3 | (0, 7, 8) | No |
1 | 4 | (0, 1, 8, 9) | No |
2 | 5 | (0, 1, 2 , 9, 10) | No |
3 | 6 | (1, 2, 3, 9, 10, 11) | Yes |
4 | 6 | (2, 3, 4, 10, 11, 12) | Yes |
5 | 6 | (3, 4, 5, 11, 12, 13) | Yes |
6 | 5 | (4, 5, 6, 12, 13) | No |
7 | 4 | (5, 6, 7, 13) | No |
8 | 3 | (6, 7, 8) | No |
The topic "sample_topic" is skewed on brokers 3, 4, and 5. This is because a broker is considered skewed if it holds more partitions of a given topic than the average.
Solving the broker skew problem :
The following steps can be used to solve it:
To add a server to a Kafka cluster, it only needs to be given a unique broker id and Kafka must be started on that server. However, until a new topic is created, a new server will not be given any of the data partitions. As a result, when a new machine is introduced to the cluster, some existing data must be migrated to these new machines. To relocate some partitions to the new broker, we use the partition reassignment tool. Kafka will make the new server a follower of the partition it is migrating to, allowing it to replicate the data on that partition completely. When all of the data has been duplicated, the new server can join the ISR, and one of the current replicas will erase the data it has for that partition.
The Apache cluster will automatically identify any broker shutdown or failure. In this instance, new leaders for partitions previously handled by that device will be chosen. This can happen as a result of a server failure or even if it is shut down for maintenance or configuration changes. When a server is taken down on purpose, Kafka provides a graceful method for terminating the server rather than killing it.
When a server is switched off:
Currently, Kafka does not allow you to reduce the number of partitions for a topic. The partitions can be expanded but not shrunk. The alter command in Apache Kafka allows you to change the behavior of a topic and its associated configurations. To add extra partitions, use the alter command.
To increase the number of partitions to five, use the following command:
./bin/kafka-topics.sh --alter --zookeeper localhost:2181 --topic sample-topic --partitions 5
When the producer cannot allocate memory to a record because the buffer is full, a BufferExhaustedException is thrown. The exception is thrown if the producer is in non-blocking mode and the rate of production exceeds, for long enough, the rate at which data is transferred out of the buffer, so that the allocated buffer is exhausted.
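A small sketch of the producer settings involved, with illustrative values only: buffer.memory bounds the record buffer, and max.block.ms controls how long send() may wait for buffer space (0 makes the producer effectively non-blocking):

```java
import java.util.Properties;
import org.apache.kafka.clients.producer.ProducerConfig;

// A sketch of the producer settings that govern this behaviour; values are illustrative only.
public class BufferConfigExample {
    public static Properties producerProps() {
        Properties props = new Properties();
        props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092"); // placeholder broker
        // Total memory available for buffering unsent records (default 32 MB).
        props.put(ProducerConfig.BUFFER_MEMORY_CONFIG, 33554432);
        // How long send() may block waiting for buffer space before giving up;
        // setting this to 0 makes the producer effectively non-blocking.
        props.put(ProducerConfig.MAX_BLOCK_MS_CONFIG, 0);
        return props;
    }
}
```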
If the consumers are sending huge messages or if there is a spike in the number of messages sent at a rate quicker than the rate of downstream processing, an OutOfMemoryException may arise. As a result, the message queue fills up, consuming memory space.
A topic's retention time can be configured in Kafka. The default retention time for a topic is seven days. The retention time can be set while creating a new topic. When a topic is created, the broker property log.retention.hours is used to set the retention time. When the configuration of a currently running topic needs to be modified, kafka-topics.sh (or kafka-configs.sh, depending on the version) must be used.
The right command depends on the Kafka version in use.
Kafka Streams | Spark Streaming |
---|---|
Kafka is fault-tolerant because of partitions and their replicas. | Using Cache and RDD (Resilient Distributed Dataset), Spark can restore partitions. |
It is only capable of handling real-time streams | It is capable of handling both real-time and batch tasks. |
Messages in the Kafka log are persistent. | To keep the data durable, you’ll need to utilize a dataframe or another data structure. |
There are no interactive modes in Kafka. The data from the producer is simply consumed by the broker, who then waits for the client to read it. | Interactive modes are available. |
The nodes in a ZooKeeper tree are called znodes. Each znode maintains a stat structure that includes version numbers for data changes and ACL changes, along with timestamps. ZooKeeper uses the version numbers and timestamps to validate its cache and coordinate updates. Each time the data in a znode changes, its version number increases.
There are three different types of Znodes:
A stream of messages that belong to a particular category is called a topic in Kafka. Kafka stores data in topics that are split into partitions.
Consumers read data from the brokers. Consumers can subscribe to one or more topics and receive published messages from these topics by pulling data from the brokers. Consumers pull the data at their own pace.
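A minimal consumer poll loop looks roughly like the sketch below; the broker address, group id, and topic name are placeholders:

```java
import java.time.Duration;
import java.util.Collections;
import java.util.Properties;
import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.common.serialization.StringDeserializer;

public class SimpleConsumerLoop {
    public static void main(String[] args) {
        Properties props = new Properties();
        props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");   // placeholder broker
        props.put(ConsumerConfig.GROUP_ID_CONFIG, "demo-group");                // hypothetical consumer group
        props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
        props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);

        try (KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props)) {
            consumer.subscribe(Collections.singletonList("demo-topic"));        // hypothetical topic
            while (true) {
                // The consumer pulls records at its own pace; nothing is pushed to it.
                ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(500));
                for (ConsumerRecord<String, String> record : records) {
                    System.out.printf("partition=%d offset=%d value=%s%n",
                        record.partition(), record.offset(), record.value());
                }
            }
        }
    }
}
```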
Producers are responsible for publishing messages to one or more Kafka topics. Producers send data to the Kafka brokers. Whenever a producer publishes a message to the broker, the broker appends the message to a partition. The producer can also send messages to a partition of their choice.
A Kafka cluster contains one or more servers that are known as brokers. A broker works as a container that holds multiple topics, with various partitions. A broker in a cluster can only be identified by the integer ID associated with it. Connection with any one broker in a cluster implies a connection with the whole cluster. Brokers in Kafka do not contain the complete data, but they know about other brokers, topics, and partitions of the cluster.
Load balancing in Kafka is handled by the producers. The message load is spread out between the various partitions while maintaining the order of the message. By default, the producer selects the next partition to take up message data using a round-robin approach. If a different approach other than round-robin is to be used, users can also specify exact partitions for a message.
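For illustration, the sketch below uses the ProducerRecord constructor that takes an explicit partition number; the topic, partition, key, and value are placeholders:

```java
import org.apache.kafka.clients.producer.ProducerRecord;

public class ExplicitPartitionExample {
    // The four-argument ProducerRecord constructor pins a record to an exact
    // partition instead of relying on the default round-robin/key-hash behaviour.
    public static ProducerRecord<String, String> pinnedRecord() {
        // "demo-topic" and partition 2 are placeholders.
        return new ProducerRecord<>("demo-topic", 2, "some-key", "some-value");
    }
}
```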
Messages sent to Kafka clusters get appended to one of the logs. These messages remain in the logs even after being consumed, for a configurable period of time or until a configurable size is reached. This configurable amount of time for which the message remains in the log is known as the retention period. The message will be available for the amount of time specified by the retention period. Kafka allows users to configure the retention period for messages on a per-topic basis. The default retention period for a message is 7 days.
In Kafka, the partition data is copied to other brokers, which are known as replicas. If there is a point of failure in the partition data in one node, then there are other nodes that will provide a backup and ensure that the data is still available. This is how Kafka allows fault tolerance.
Apache Kafka provides the alter command to change a topic behavior and modify the configurations associated with it. The alter command can be used to add more partitions.
The command to increase the partitions to 4 is as follows:
./bin/kafka-topics.sh --alter --zookeeper localhost:2181 --topic my-topic --partitions 4
Ideally, the number of partitions a topic is divided into should equal the number of consumers in the consumer group, so that each consumer reads from exactly one partition.
The kafka-console-consumer.sh script can be used to view the messages. The following command can be used to view the messages from a topic:
./bin/kafka-console-consumer.sh --bootstrap-server localhost:9092 --topic test --from-beginning
There are two ways to get the list of available brokers in a Kafka cluster:
Using the zookeeper-shell.sh script:
zookeeper-shell.sh :2181 ls /brokers/ids
This will give an output like:
WATCHER::
WatchedEvent state:SyncConnected type:None path:null
[0, 1, 2, 3]
This indicates that there are 4 live brokers: 0, 1, 2 and 3.
Using zkCli.sh: first, log in to the ZooKeeper client:
zkCli.sh -server :2181
Then use the command below to list all the available brokers:
ls /brokers/ids
Both the methods used above make use of the ZooKeeper to find out the list of available brokers.
The Kafka Migration tool is used to efficiently move from one environment to another. It can be used to move existing Kafka data from an older version of Kafka to a newer version.
Once you start the ZooKeeper, you can list all the topics using
${kafka_home}/bin/kafka-topics.sh --list --zookeeper localhost:2181
Kafka enforces the following rules for naming topics:
Currently, in Apache Kafka, meta-information about topics is stored in ZooKeeper. Information regarding the location of the partitions and the configuration details of a topic are stored in ZooKeeper, which runs as a separate cluster alongside Kafka.
By default, the largest size of a message that can be sent in Kafka is 1MB. In order to send larger messages using Kafka, a few properties have to be adjusted. Here are the configuration details that have to be updated
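As a hedged sketch of the commonly adjusted settings (the values below are illustrative), the standard Kafka property names are message.max.bytes and replica.fetch.max.bytes on the broker, max.request.size on the producer, and max.partition.fetch.bytes on the consumer:

```java
import java.util.Properties;

// Illustrative values only: the standard properties commonly adjusted to send
// messages larger than the 1 MB default, on the broker, producer, and consumer.
public class LargeMessageConfigs {
    public static Properties brokerOverrides() {
        Properties broker = new Properties();
        broker.put("message.max.bytes", "10485760");          // broker: max message size accepted (10 MB here)
        broker.put("replica.fetch.max.bytes", "10485760");    // broker: followers must be able to fetch the large messages
        return broker;
    }

    public static Properties producerOverrides() {
        Properties producer = new Properties();
        producer.put("max.request.size", "10485760");         // producer: max size of a single request
        return producer;
    }

    public static Properties consumerOverrides() {
        Properties consumer = new Properties();
        consumer.put("max.partition.fetch.bytes", "10485760"); // consumer: max data returned per partition per fetch
        return consumer;
    }
}
```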
In software terms, the scalability of an application is its ability to maintain its performance when it is exposed to changes in application and processing demands. In Kafka, the messages corresponding to a particular topic are divided into partitions. This allows the topic size to be scaled beyond the size that will fit on a single server. Allowing a topic to be divided into partitions ensures that Kafka can guarantee load balancing over multiple consumer processes. In addition, the concept of consumer groups in Kafka also contributes to making it more scalable. In a consumer group, a particular partition is consumed by only one consumer in the group. This aids in the parallelism of consuming multiple messages of a topic.
To start ZooKeeper:
${kafka_home}/bin/zookeeper-server-start.sh config/zookeeper.properties
To create a topic:
${kafka_home}/bin/kafka-topics.sh --create --zookeeper localhost:2181 --replication-factor [replication factor] --partitions [number_of_partitions] --topic [unique-topic-name]
To Delete/Remove a topic:
Go to ${kafka_home}/config/server.properties and add the line below:
delete.topic.enable=true
Restart the Kafka server with the new configuration:
${kafka_home}/bin/kafka-server-start.sh ~/kafka/config/server.properties
Delete the topic:
${kafka_home}/bin/kafka-topics.sh --delete --zookeeper localhost:2181 --topic topic-name
To add a config:
${kafka_home}/bin/kafka-configs.sh --zookeeper localhost:2181 --entity-type topics --entity-name topic_name --alter --add-config x=y
To remove a config:
${kafka_home}/bin/kafka-configs.sh --zookeeper localhost:2181 --entity-type topics --entity-name topic_name --alter --delete-config x
Where x is the particular configuration key that has to be changed.
Message compression in Kafka does not require any changes in the configuration of the broker or the consumer.
It is beneficial for the following reasons:
Disadvantages:
Producers write messages to Kafka one at a time. The producer collects the messages being sent into a batch and waits until this batch becomes full; only then is the batch sent to Kafka. This batch is known as the producer batch. The default size of a producer batch is 16 KB, but it can be modified. The larger the batch size, the greater the compression and throughput of the producer requests. The sketch below shows the settings involved.
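A sketch of the relevant producer settings with illustrative values; batch.size, linger.ms, and compression.type are the standard property names:

```java
import java.util.Properties;
import org.apache.kafka.clients.producer.ProducerConfig;

// Illustrative producer settings that control batching; values are placeholders.
public class BatchingConfigExample {
    public static Properties producerProps() {
        Properties props = new Properties();
        props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");  // placeholder broker
        props.put(ProducerConfig.BATCH_SIZE_CONFIG, 32768);       // grow the batch from the 16 KB default to 32 KB
        props.put(ProducerConfig.LINGER_MS_CONFIG, 10);           // wait up to 10 ms for a batch to fill before sending
        props.put(ProducerConfig.COMPRESSION_TYPE_CONFIG, "lz4"); // larger batches compress better
        return props;
    }
}
```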
Yes, if the number of partitions is greater than the number of consumers in a consumer group, then a consumer will have to read more than one partition from a topic.
Kafka clusters are assigned unique and immutable identifiers. The identifier for a particular cluster is known as the cluster id. A cluster id can have a maximum number of 22 characters and has to follow the regular expression [a-zA-Z0-9_\-]+. It is generated when a broker with version 0.10.1 or later is successfully started for the first time. The broker attempts to get the cluster id from the znode during startup. If the znode does not exist, the broker generates a new cluster id and creates a znode with this cluster id.
The Kafka broker does not keep track of which messages have been read by the consumers. It simply keeps all of the messages in its log for a fixed period of time, known as the retention time, after which the messages are deleted. It is the responsibility of the consumer to pull the messages from the broker. Hence, Kafka is said to have a "smart-client, dumb-broker" architecture.
BufferExhaustedException is thrown when the producer cannot allocate memory to a record due to the buffer being too full. The exception is thrown if the producer is in non-blocking mode and the rate of production exceeds the rate at which data is sent from the buffer for long enough for the allocated buffer to be exhausted.
OutOfMemoryException can occur if the consumers are sending large messages or if there is a spike in the number of messages wherein the consumer is sending messages at a rate faster than the rate of downstream processing. This causes the message queue to fill up, taking up memory.
The retention time can be configured in Kafka for a topic. The default retention time for a topic is 7 days. The retention time can be configured while a new topic is set up. log.retention.hours is the broker property used to set the retention time when a topic is created. However, when configurations have to be changed for a currently running topic, kafka-topics.sh or kafka-configs.sh will have to be used.
The correct command depends on the version of Kafka that is in use.
Up to 0.8.2, kafka-topics.sh --alter is the command to be used.
From 0.9.0 onward, use kafka-configs.sh --alter.
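On newer clusters (roughly broker 2.3+), the same change can also be made programmatically; the sketch below uses the Java AdminClient's incrementalAlterConfigs to set retention.ms on a hypothetical topic, with a placeholder broker address and value:

```java
import java.util.Collection;
import java.util.Collections;
import java.util.Map;
import java.util.Properties;
import org.apache.kafka.clients.admin.AdminClient;
import org.apache.kafka.clients.admin.AdminClientConfig;
import org.apache.kafka.clients.admin.AlterConfigOp;
import org.apache.kafka.clients.admin.ConfigEntry;
import org.apache.kafka.common.config.ConfigResource;

public class RetentionAlterExample {
    public static void main(String[] args) throws Exception {
        Properties props = new Properties();
        props.put(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092"); // placeholder broker

        try (AdminClient admin = AdminClient.create(props)) {
            ConfigResource topic = new ConfigResource(ConfigResource.Type.TOPIC, "demo-topic"); // hypothetical topic
            // retention.ms is the per-topic retention setting; 259200000 ms = 3 days.
            AlterConfigOp setRetention = new AlterConfigOp(
                new ConfigEntry("retention.ms", "259200000"), AlterConfigOp.OpType.SET);
            Map<ConfigResource, Collection<AlterConfigOp>> update =
                Collections.singletonMap(topic, Collections.singletonList(setRetention));
            admin.incrementalAlterConfigs(update).all().get();
        }
    }
}
```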
No, Kafka does not currently support reducing the number of partitions for a topic. The partitions can be increased, but not decreased.
In Kafka, message transfer among the producer, broker, and consumers is done by making use of standardized binary message format. The process of converting the data into a stream of bytes for the purpose of the transmission is known as Serialization. Deserialization is the process of converting the bytes of arrays into the desired data format. Custom serializers are used at the producer end to let the producer know how to convert the message into byte arrays. To convert the byte arrays back into the message, deserializers are used at the consumer end.
Kafka uses a standardized binary message format that is shared by the producer, broker, and consumer to ensure that the data can pass without any modification.
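As a hedged sketch, a custom serializer/deserializer pair implements Kafka's Serializer and Deserializer interfaces; the Greeting payload type here is hypothetical and the UTF-8 encoding is deliberately simple:

```java
import java.nio.charset.StandardCharsets;
import org.apache.kafka.common.serialization.Deserializer;
import org.apache.kafka.common.serialization.Serializer;

// "Greeting" is a hypothetical payload type used only for illustration.
class Greeting {
    final String text;
    Greeting(String text) { this.text = text; }
}

// Producer side: converts the object into a byte array before transmission.
class GreetingSerializer implements Serializer<Greeting> {
    @Override
    public byte[] serialize(String topic, Greeting data) {
        return data == null ? null : data.text.getBytes(StandardCharsets.UTF_8);
    }
}

// Consumer side: converts the byte array back into the object.
class GreetingDeserializer implements Deserializer<Greeting> {
    @Override
    public Greeting deserialize(String topic, byte[] bytes) {
        return bytes == null ? null : new Greeting(new String(bytes, StandardCharsets.UTF_8));
    }
}
```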
Create znodes:
Znodes are created within a given path.
Syntax: create /path/data
Flags can be used to specify whether the created znode will be persistent, ephemeral, or sequential, e.g.:
create -e /path/data creates an ephemeral znode.
create -s /path/data creates a sequential znode.
Note: all znodes are persistent by default.
Remove znodes:
rmr /path removes the specified znode and all of its children.
Reliability: the failure of a single or a few systems does not cause the whole system to fail.
Scalability: Performance of the application can be increased as per requirements by adding more machines with minor changes in the configuration of the application with no downtime.
Transparency: the complexity of the system is masked from the users, as the application shows itself as a single entity.
Race conditions may arise: two or more machines trying to access the same resource can cause a race condition, as the resource can only be given to a single machine at a time.
Deadlock: in the process of trying to solve race conditions, deadlocks may arise where two or more operations may end up waiting for each other to complete indefinitely.
Inconsistency may arise due to partial failure in some of the systems.
The answer to this question encompasses two main aspects – Partitions in a topic and Consumer Groups.
A Kafka topic is divided into partitions. The message sent by the producer is distributed among the topic’s partitions based on the message key. Here we can assume that the key is such that messages would get equally distributed among the partitions.
A Consumer Group is a way to bunch consumers together so as to increase the throughput of the consumer application. Each consumer in a group latches on to a partition in the topic, i.e. if there are 4 partitions in the topic and 4 consumers in the group, then each consumer reads from a single partition. However, if there are 6 partitions and 4 consumers, then the data is read in parallel from only 4 partitions. Hence it's ideal to maintain a 1-to-1 mapping of partitions to consumers in the group.
Now in order to scale up processing at the consumer end, two things can be done:
Doing this would help read data from the topic in parallel and hence scale up the consumer from 2500 messages/sec to 10000 messages per second.
The working principle of Kafka follows the below order.
The Maven dependencies below are enough to configure the Kafka ecosystem in the application:

groupId = org.apache.spark
artifactId = spark-streaming-kafka-0-10_2.11
version = 2.2.0

groupId = org.apache.zookeeper
artifactId = zookeeper
version = 3.4.5

These dependencies come with transitive (child) dependencies, which are downloaded and added to the application automatically as part of the parent dependency.

Packages that need to be imported in Java/Scala:

import org.apache.kafka.clients.consumer.ConsumerRecord
import org.apache.kafka.common.serialization.StringDeserializer
import org.apache.spark.streaming.kafka010._
import org.apache.spark.streaming.kafka010.LocationStrategies.PreferConsistent
import org.apache.spark.streaming.kafka010.ConsumerStrategies.Subscribe
Kafka follows a pub-sub mechanism wherein producer writes to a topic and one or more consumers read from that topic. However, Reads in Kafka always lag behind Writes as there is always some delay between the moment a message is written and the moment it is consumed. This delta between Latest Offset and Consumer Offset is called Consumer Lag.
There are various open-source tools available to measure consumer lag, e.g. LinkedIn's Burrow; Confluent's Kafka distribution also comes with out-of-the-box tools to measure lag.
Kafka is a durable, distributed and scalable messaging system designed to support high volume transactions. Use cases that require a publish-subscribe mechanism at high throughput are a good fit for Kafka. In case you need a point to point or request/reply type communication then other messaging queues like RabbitMQ can be considered.
Kafka is a good fit for real-time stream processing. It uses a dumb-broker, smart-consumer model, with the broker merely acting as a message store. So a scenario in which the consumer cannot be smart and instead requires the broker to be smart is not a good fit for Kafka. In such a case, RabbitMQ can be used, as it follows a smart-broker model in which the broker is responsible for consistent delivery of messages at a roughly similar pace.
Also in cases where protocols like AMQP, MQTT, and features like message routing are needed, in those cases, RabbitMQ is a better alternative over Kafka.
Let's consider a scenario where we need to read data from a Kafka topic and, only after some custom validation, add the data to a data storage system. To achieve this we would develop a consumer application that subscribes to the topic, so that our application starts receiving messages on which the validation and storage process runs. Now suppose the rate at which messages are published to the topic exceeds the rate at which our consumer application consumes them.
With a single consumer, we may fall behind in keeping up with the incoming messages. The solution is to add more consumers, which scales up consumption of the topic. This is easily achieved by creating a consumer group, a grouping under which consumers with similar behaviour reside and read messages from the same topic by splitting the workload. Consumers in the same group each get their own partitions of the topic, which scales up message consumption and throughput. If we have a single consumer for a topic with 4 partitions, it will read messages from all partitions:
The ideal architecture for the above scenario is as below when we have four consumers reading messages from individual partition :
Having more consumers than partitions results in some consumers sitting idle, which is also not a good architectural design:
There is another scenario as well where we can have more than one consumer groups subscribed to the same topic:
A typical microservices setup has many microservices collaborating, and that becomes a huge problem if not handled appropriately. It isn't practical for each service to have a direct connection with every service it needs to talk to, for two reasons:
Kafka is valuable here because it provides both pub-sub and queuing features. It also ensures that the order of messages is maintained and is not subject to network latency or other factors. Kafka also allows us to broadcast messages to multiple consumers if necessary. Kafka's importance lies in enabling reliable, scalable microservices solutions with minimal configuration.
You cannot do that from a class that behaves as a producer; as in most queue systems, its role is to fire and forget the messages. The broker does the rest of the work, such as appropriate metadata handling with ids, offsets, etc.
As a consumer of the message, you can get the offset from a Kafka broker. If you look in the SimpleConsumer class, you will notice it fetches MultiFetchResponse objects that include offsets as a list. In addition, when you iterate over the Kafka messages, you will have MessageAndOffset objects that include both the offset and the message sent.
Replicating messages is a good practice in Kafka that ensures messages are never lost, even if the main server fails.
This is one of the trickier Kafka interview questions, as it tests a candidate's deeper knowledge of Kafka. Kafka guarantees tolerance of up to N-1 server failures without losing any record committed to the log. In addition, Kafka guarantees that the order of messages sent by a producer to a specific topic partition is preserved. Kafka also guarantees that a consumer instance sees records in the order in which they are stored in the log.
Candidates can also expect this among the latest Kafka interview questions. RabbitMQ is the most notable alternative to Apache Kafka. Kafka's strengths as a distributed, highly available, and durable system for data sharing and replication give it an edge over RabbitMQ, which does not have these features. The performance of Apache Kafka can extend up to 100,000 messages per second, whereas RabbitMQ is limited to around 20,000 messages per second.
To achieve the FIFO behavior with Kafka, follow the steps mentioned below:
Set enable.auto.commit=false
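A minimal consumer sketch with auto-commit disabled is shown below, assuming (as the step above implies) that offsets are committed manually only after each batch is processed; the broker address, group id, and topic are placeholders:

```java
import java.time.Duration;
import java.util.Collections;
import java.util.Properties;
import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.common.serialization.StringDeserializer;

public class ManualCommitConsumer {
    public static void main(String[] args) {
        Properties props = new Properties();
        props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");  // placeholder broker
        props.put(ConsumerConfig.GROUP_ID_CONFIG, "fifo-group");               // hypothetical group
        props.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, "false");          // the step described above
        props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
        props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);

        try (KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props)) {
            consumer.subscribe(Collections.singletonList("fifo-topic"));       // hypothetical topic
            while (true) {
                ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(500));
                for (ConsumerRecord<String, String> record : records) {
                    process(record);            // handle each record strictly in order
                }
                consumer.commitSync();          // commit offsets only after processing succeeds
            }
        }
    }

    static void process(ConsumerRecord<String, String> record) {
        System.out.println(record.value());
    }
}
```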
Apache Kafka is capable of handling various use cases that are pretty common for a Data Lake implementation.
For example: Handling use cases concerning log aggregations, web activities tracking, and monitoring.
If the Kafka producer is continuously sending out the messages where the broker is not able to handle all the requests, then we get to see the following error:
QueueFullException.
To handle these error situations, and to handle the message requests sent by producers, we can run multiple brokers; with multiple brokers, the load is balanced across them.
In a single-node, multi-broker model, you need more than one server.properties file, and each file must define a distinct broker id. Here's an example:
broker.id=1
port=9093
log.dir=/tmp/kafka-logs-1
The KafkaJsonSchemaSerializer serializes JSON messages to strings, and the KafkaJsonSchemaDeserializer converts the strings back to JSON format.
Yes, we can. The rule of thumb is that each consumer is associated with one partition. If there are more consumers than partitions, the excess consumers in the group sit idle, so ensure an optimal number of consumers in a consumer group.
No, you cannot decrease. However, you can increase the partitions.
There are two different frameworks: Connect Source and Connect Sink. Source connectors import data from external databases into Kafka topics, and sink connectors export Kafka topics to external databases.
It is sometimes difficult to know exactly how to keep a Kafka pipeline running smoothly. Here are five common pitfalls and tips for how to avoid them, from client and broker configurations to design and monitoring considerations, that are sure to save you time and effort down the road.
Let’s get started by looking at some of the common configuration mistakes users make on the client side and broker side.
From the client side:
request.timeout.ms is a client-side configuration that defines how long the client (both producer and consumer) will wait to receive a response from the broker. The default for this configuration is set to 30 seconds. If a response is not received before the timeout is reached, then one of two things will happen: either the client will attempt to resend the request (if retries are enabled and have not yet been exhausted; see below for more details on this) or it will simply fail. It is recommended to leave request.timeout.ms at the default value of 30 seconds; a lower value could actually increase the amount of time it takes for server-side requests to be processed.
When executing producer.send(), the hope is that records go through and are successfully stored in a topic. The reality is that, for some reason or another, the producer request might fail. In certain cases, the failure is transient and retriable (i.e., the failure could be recovered from given a sufficient amount of time and the client retry of the request) whilst others will be permanent (i.e., something needs to be fixed before the request can succeed).
For example, during cluster rolling, some of the following retriable exceptions may be encountered by clients:
UNKNOWN_TOPIC_OR_PARTITION
LEADER_NOT_AVAILABLE
NOT_LEADER_FOR_PARTITION
NOT_ENOUGH_REPLICAS
NOT_ENOUGH_REPLICAS_AFTER_APPEND
If retries and retry time are not configured properly, all of these exceptions will be logged as errors, which can potentially disrupt your client and result in lost messages.
From the broker side:
Kafka brokers expose a number of really useful JMX metrics that give you great insight into the overall health of your cluster. Unfortunately, not all cluster admins pay enough attention to them.
The following five are a good place to start:
kafka.server:type=ReplicaManager,name=UnderReplicatedPartitions: Every partition has a number of replicas across brokers in the cluster. Data is first written to a leader replica and then replicated across to the follower replicas. This metric is a count of the partitions which have not yet been fully replicated across the cluster. Any number of under-replicated partitions is a sign of an unhealthy cluster, as it implies that your data is not fully replicated as expected.

kafka.network:type=SocketServer,name=NetworkProcessorAvgIdlePercent: This metric describes the percentage of time that your network processing threads are idle. All Kafka requests are routed through the network threads, so this metric is pretty crucial. 0 means that all resources are unavailable, and 1 means all resources are available. As a rule of thumb, this value should be above 30% so as to not continuously exhaust your cluster resources.

kafka.server:type=KafkaRequestHandlerPool,name=RequestHandlerAvgIdlePercent: The request handler takes requests off of a request queue, processes them, and outputs the responses to response queues. Similar to the above, this metric describes the percentage of time that your broker request handler threads are idle. 0 means that the request handler threads are completely unavailable, and 1 means they are all available. Try to keep this value above 30% as well.

kafka.network:type=RequestChannel,name=RequestQueueSize: This describes the total number of requests in the request queue. A higher count means that the queue is congested, so it's better to have a lower value for this metric. In conjunction with NetworkProcessorAvgIdlePercent and RequestHandlerAvgIdlePercent, you can use this metric to get a good idea of how busy the full Kafka request pipeline is.

kafka.network:type=RequestMetrics,name=TotalTimeMs,request={Produce|FetchConsumer|FetchFollower}: This is a series of metrics that describes the total time it takes for a given type of request (including the time it takes to send). It exists for each request type: Produce, FetchConsumer, and FetchFollower. This metric gives you a good idea of overall latency in your system. A lower time implies a healthier cluster.

NOTE: Broker request latency, or how quickly requests are being handled by the broker, is extremely important to overall cluster health.
Partitions are Kafka's unit of parallelism (barring other factors such as consumer throughput, of course), so in order to increase your overall throughput, it would make sense to use as many partitions as possible, right? Well, not necessarily. A higher partition count may have a number of consequences in the cluster, including but not limited to:
Kafka splits each partition into segments. Each segment represents an actual data file on disk. Understanding how segments work and are configured is important to ensure that the broker behaves optimally.
As messages are written to the topic, the data is simply appended to the most recent open log segment file for the partition in which the message belongs. While a segment file remains open, it cannot be considered a candidate for deletion or log compaction. By default, these log segment files will remain open until they're completely full (1GB) as per the topic-level segment.bytes configuration parameter. Alternatively, you may wish to force the segment to roll after a specific amount of time; this can be set using segment.ms, another topic-level configuration.
Some users will attempt to set segment.ms to a low value to help trigger log compaction or deletion more frequently, reducing the amount of memory their cluster will take up on disk. However, if segment.ms is configured to be too low (the default is seven days), your cluster will generate a lot of small segment files. With too many of these small segments, your Kafka cluster is liable to encounter a “Too many open files” or “Out of memory” exception. Furthermore, a large number of small or empty segment files can have a negative performance impact on consumers of the topic. During a fetch, consumers can only receive data from at most one segment per partition. So if the segments are very small, the consumers are limited in what they can consume at a given time and, as a result, will have to make more trips to the broker.
Unfortunately, to avoid this common pitfall, you need to be diligent about your configurations when creating and managing topics in your cluster. If a user sets segment.ms to a low value, it’s important to remember to set the value back to the default as soon as possible. After doing so, note that these changes are not retroactive, and previous segments will not be affected.
This strategy is more generally known as load shedding. When a callback handler receives a transient error related to a sent message, it removes associated data from upstream data structures and performs no further actions. The application may log the event.
The situation should be externally visible to operators via monitoring.
Pro:
Con:
The application resends timed-out messages to the client library. At some point, dictated by the backlog of unsent messages or a similar signal, the application closes itself off to further inbound traffic until messages start to flow again.
Applications that poll for their inputs can stop polling, resulting in the same effect.
Pros:
Cons:
Consider three messages sent in the order [A, B, C]. When message A times out, is resent, and subsequently accepted, the messages will appear on the topic in the order [B, C, A].
The application sends all messages to alternative local storage. A mechanism such as a Kafka Connect connector then ingests these into Kafka asynchronously.
Pros:
Cons:
This option uses local storage as a dead letter channel. A batch process imports these messages into Kafka once the system recovers to a normal state.
Pros:
Cons:
Note: Due to the complexity of this option, we strongly advise against it.
The system disables data flow to Kafka on failure and reroutes failed messages to local storage. The application plays back these messages from local storage and resends when Kafka recovers. Regular flow is then resumed.
You may typically use the circuit breaker pattern for synchronous interactions, such as a web service or database invocations that will fail quickly. Attempting to implement this pattern over the top of a Kafka client, which is an asynchronous library, needs to account for the following:
Pros:
Cons:
Note: Due to the complexity of this option, we strongly advise against it.
The sending application writes messages twice. Applications consuming from the affected topics must do so from two clusters and discard previously seen messages.
We have seen similar schemes applied in systems requiring ordered ledgers, where applications send messages to two locations. Consumers do not deduplicate using the typical method (idempotent consumer) of keeping track of previously seen business keys that uniquely identify each message payload and discarding duplicates—a read- and write-intensive process. Instead, they use a technique based on sequencing via a monotonically incrementing counter stamped on each message by the sending process.
The consuming system pulls messages from both clusters and keeps track of the highest counter value. It discards messages with a counter that is equal to or lower than the highest counter previously seen. You can avoid modifying message payloads by storing these counters in a message header.
Pros:
Cons:
Improving throughput by increasing the minimum amount of data fetched in a request.
Use the fetch.max.wait.ms and fetch.min.bytes configuration properties to set thresholds that control the number of requests from your consumer.
When the client application polls for data, both these properties govern the amount of data fetched by the consumer from the broker. You can adjust the properties higher so that there are fewer requests, and messages are delivered in bigger batches. You might want to do this if the amount of data being produced is low. Reducing the number of requests from the consumer also lowers the overhead on CPU utilization for the consumer and broker.
Throughput is improved by increasing the amount of data fetched in a batch, but with some cost to latency. Consequently, adjusting these properties lower has the effect of lowering end-to-end latency.
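A small sketch of these two consumer properties with illustrative values; the broker address is a placeholder:

```java
import java.util.Properties;
import org.apache.kafka.clients.consumer.ConsumerConfig;

// Illustrative consumer settings that trade a little latency for throughput; values are placeholders.
public class FetchTuningExample {
    public static Properties consumerProps() {
        Properties props = new Properties();
        props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092"); // placeholder broker
        // Ask the broker to hold the fetch until at least 64 KB of data is available...
        props.put(ConsumerConfig.FETCH_MIN_BYTES_CONFIG, 65536);
        // ...or until 500 ms have elapsed, whichever comes first.
        props.put(ConsumerConfig.FETCH_MAX_WAIT_MS_CONFIG, 500);
        return props;
    }
}
```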
Thank you for reading. Keep learning and continue visiting the page, will add more Q&A 🙂