Operations of Kafka

Basic Operations of Kafka

In this section, we will be providing  discussions about the basic operations of Kafka as follows:

The Addition and Deletion of Topics

We can add and delete Topics both automatically and manually. For the addition, the command will be

> bin/Kafka-Topics.sh --zookeeper zk_host:port/chroot --create --Topic my_Topic_name
--partitions 20 --replication-factor 3 --config x=y

The messages  that are written  are replicated by the servers are called as brokers. And the whole process of replication is done by the replication factors. For boucing without any obstacles the standard replication factor should be either 2 or 3.

The Topic is split into a number of logs and this is controlled  by the  partition count.A partition count should follow a lot of features. Firstly, they should fit singly in one server. So a total of 50 partitions will result in 50 servers. Maximum equalizations among the consumers is obtained by these partitions.The addition of Topics overrides the default settings.

How the Topics can be modified

The Topics can be best modified by adding as well as deleting contexts.  In order to add partitions the command be like

> bin/Kafka-Topics.sh --zookeeper zk_host:port/chroot --alter --Topic my_Topic_name
--partitions 40
Partitions are used to divide the data into smaller sections. Adding configs we should follow as below
> bin/Kafka-Topics.sh --zookeeper zk_host:port/chroot --alter --Topic my_Topic_name --config x=y
To delete configs:
> bin/Kafka-Topics.sh --zookeeper zk_host:port/chroot --alter --Topic my_Topic_name --deleteConfig x
And to delete off the entire Topic,
> bin/Kafka-Topics.sh --zookeeper zk_host:port/chroot --delete --Topic my_Topic_name

Kafka Spark Streaming Tutorial Video:

Video Thumbnail

Distinguished Turnoff

The Kafka cluster will automatically detect any broker shutdown or failure and elect new partition leaders. This will occur whether a server fails or it is brought down intentionally for maintenance. For configuration changes, Kafka supports a more graceful mechanism for stoping a server than just destroying it. When we cut down a server in an elegant way, it has two optimizations it will take advantage of:

If any of  the servers is failed, the cluster will automatically detect it irresponsible of whether the  broker is failed or broken. It can beautifully shut down the servers. If done, then

Need of log recovery is removed and so the process becomes faster. Before shutting down, it will send all the partitions of that server to its replicas.

Mirroring data between clusters

The Kafka tool has got a mirroring process also which reads the materials from the sources and writes to the destination. The mirror replicates what is written in the sources and then writes down the destination.

Cluster

Mirroring increases the throughput and you can use a lot of it in order to keep the process stay robust.The mirror maker transfers data from cluster to cluster. The source cluster and destination clusters are almost different from each other. The mirror maker is same to both. Because the data the source cluster contains before mirroring will be same  to the destination cluster’s data after mirroring. Both names will be same  in both clusters with different addresses because of its data replication and different offsets and partitions.

So, since mirror cluster will copy the data in a consumer with a different address, so it provides less fault-tolerance. In order to obtain more fault-tolerance, normal in-cluster replication should be used.
The mirroring command for mirroring a single Topic named as your-Topic from two inputs are:

> bin/Kafka-run-class.sh Kafka.tools.MirrorMaker

–consumer.config consumer-1.properties –consumer.config consumer-
2.properties
–producer.config producer.properties –whitelist your-Topic

Get 100% Hike!

Master Most in Demand Skills Now!

Finding the position of the Consumer

Finding out the positions of the consumers is very important. A command for finding out the consumer’s  location is as follows:
> bin/Kafka-run-class.sh Kafka.tools.ConsumerOffsetChecker –zkconnect localhost:2181 –group test

Expanding your cluster

You can anytime add new servers to the clusters by adding them up with a new id and then start them up with new servers. But until and unless partitions are assigned to these new servers, they will not work at all and sit idlely. So to add new ones, the existing ones need to change their positions a little bit.So the movement of the data is done automatically behind the covers. What we do is we have to add the new server in such a position that it has to be behind the partition that seeking for migration. So the newly introduced server walks behind the migrating partition and in the process all the data from the migrating partition will be copied to the  server and finally the partition is kicked out .

We run this reassigning of partitions in 3 ways:
–generate:
Here when we are having a group of Topics and a group of brokers, all the Topics need to be assigned to the new servers. So the generate tool creates the reassignment tool to move all the partitions to the new servers prior to migration.
–execute:
In the execute plan, the tool kicks off the reassignment of the Topics as planed by the user for execution.
–verify:
This tool verifies once the reassignment is done. It tells whether the status is done, ongoing , finished  or even if it is failed.

Learn Kafka

Migration of Data Automatically

The Topics from the present servers are assigned to the new servers  by the reassignment tools. Usually the partitions are expanded in order to move the entire Topic instead of moving a part of it. So the list of servers from whom the Topics are to be transferred and a list of the servers to whom the Topics are to be moved are included in a single list.The reassignment tool transfers the Topics from the old to he new servers  and all the replicas are also transferred to new servers keeping the mirroring in even mood.

Firstly the json file is created  to save the Topics. The creation of this file is as follows:

> cat Topics-to-move.json
{"Topics": [{"Topic": "top1"},
{"Topic": "top2"}],
"version":1
}

Where top1 and top2 are Topics.
After generating the json file, the assignment is introduced as follows
> bin/Kafka-reassign-partitions.sh –zookeeper localhost:2181 –Topics-to-move-json-file Topics-to-move.json –broker-list “5,6” –generate
For assignment:

{"version":1,
"partitions":[{"Topic":"top1","partition":2,"replicas":[1,2]},
{"Topic":"top1","partition":0,"replicas":[3,4]},
{"Topic":"top2","partition":2,"replicas":[1,2]},
{"Topic":"top2","partition":0,"replicas":[3,4]},
{"Topic":"top1","partition":1,"replicas":[2,3]},
{"Topic":"top2","partition":1,"replicas":[2,3]}]
}

For reassignment :

{"Version":1,
"Partitions":[{"Topic":"top1","partition":2,"replicas":  [1,2] } ,
{"Topic":"top1","partition":0,"replicas":[1,2]},
{"Topic":"top2","partition":2,"replicas":[1,2]},
{"Topic":"top2","partition":0,"replicas":[1,2]},
{"Topic":"top1","partition":1,"replicas":[1,2]},
{"Topic":"top2","partition":1,"replicas":[1,2]}]
}

Hence all partitions will be moved from Topics top1, top2 to the servers 1 and 2.  The new assignment should be saved.Hence all Topics top1top2 are to be moved to the new servers 5and 6. So now we know  what has to be moved from which to which server. So the new positions of the Topics are saved in the json file and the present positions of the Topics are saved for keeping a back up in case you really want to bring the Topics back to their old server.

The commands are as given below for Kafka versions:
> bin/Kafka-reassign-partitions.sh –zookeeper localhost:2181 –reassignment-json-file expand-cluster-reassignment.json –execute
Replica Assigning for the present one:

{"Version":1,
"Partitions": [{"Topic":"top1","partition":2,"replicas":[1,2]},
{"Topic":"top1","partition":0,"replicas":[3,4]},
{"Topic":"top2","partition":2,"replicas":[1,2]},
{"Topic":"top2","partition":0,"replicas":[3,4]},
{"Topic":"top1","partition":1,"replicas":[2,3]},
{"Topic":"top2","partition":1,"replicas":[2,3]}]
}

For reassigning partitions

{"Version":1,
"Partitions":[{"Topic":"top1","partition":2,"replicas":[1,2]},
{"Topic":"top1","partition":0,"replicas":[1,2]},
{"Topic":"top2","partition":2,"replicas":[1,2]},
{"Topic":"top2","partition":0,"replicas":[1,2]},
{"Topic":"top1","partition":1,"replicas":[1,2]},
{"Topic":"top2","partition":1,"replicas":[1,2]}]
}

Now in order to verify whether the reassigning is done perfectly, follow this command
> bin/Kafka-reassign-partitions.sh –zookeeper localhost:2181 –reassignment-json-file expand-cluster-reassignment.json –verify
Output of Verification:
Reassignment of partition [top1,0] completed successfully
Reassignment of partition [top1,1] completed successfully
Reassignment of partition [top1,2] completed successfully
Reassignment of partition [top2,0] is in progress
Reassignment of partition [top2,1] completed successfully
Reassignment of partition [top2,2] is in progress

Instead of moving the entire set of partitions we can select a certain number of partitions to be assigned to new servers. So here we have to directly go to the execute step

Retiring Servers

Sometimes few servers become de-active and all that we need to do is to remove the Topics from the retiring server to the new servers. New tools are still to be generated to help this process.

Data Centers

Datacenters are used for the maintenance of  operations, which manage the data pipelines. In Kafka we are towards operating with some local clusters so that the data centers communicate directly to the nearest cluster.

Here the data centers work independently. Hence, even if the in between links are broken, those data centers can work independently and are best in operation. The failure of the inner links leads to the failure of the mirroring process. It comes back to its actual position only when the links are redeveloped again.

Some data centers ask for viewing the complete set of data in the Topics , so in that case make sure you do mirroring the clusters of data to show to the new data centers.

These mirrored data are accumulated to data clusters  which can be read when full reading of the complete data is required  by the applications. This deployment pattern is very proper and can warranty latency  to obtain the clusters.

Even in high latency, the Kafka tool provides better through put than other simpler messaging tools because Kafka batches the data in both the sides i.e in the sources as well as in the destinations. The source in the Kafka is the producer and the destination is the consumer.

We should always  prevent using high latency links because it will introduce high latency for the writing acts of the Kafka technology as a result of which Kafka will not be available in all the locations if there is problem in the network.

Our Big Data Courses Duration and Fees

Program Name
Start Date
Fees
Cohort starts on 11th Jan 2025
₹22,743
Cohort starts on 18th Jan 2025
₹22,743
Cohort starts on 11th Jan 2025
₹22,743

About the Author

Technical Research Analyst - Big Data Engineering

Abhijit is a Technical Research Analyst specialising in Big Data and Azure Data Engineering. He has 4+ years of experience in the Big data domain and provides consultancy services to several Fortune 500 companies. His expertise includes breaking down highly technical concepts into easy-to-understand content.