Basic Operations of Kafka
In this section, we will be providing discussions about the basic operations of Kafka as follows:
The Addition and Deletion of Topics
We can add and delete Topics both automatically and manually. For the addition, the command will be
> bin/Kafka-Topics.sh --zookeeper zk_host:port/chroot --create --Topic my_Topic_name
--partitions 20 --replication-factor 3 --config x=y
The messages that are written are replicated by the servers are called as brokers. And the whole process of replication is done by the replication factors. For boucing without any obstacles the standard replication factor should be either 2 or 3.
The Topic is split into a number of logs and this is controlled by the partition count.A partition count should follow a lot of features. Firstly, they should fit singly in one server. So a total of 50 partitions will result in 50 servers. Maximum equalizations among the consumers is obtained by these partitions.The addition of Topics overrides the default settings.
How the Topics can be modified
The Topics can be best modified by adding as well as deleting contexts. In order to add partitions the command be like
> bin/Kafka-Topics.sh --zookeeper zk_host:port/chroot --alter --Topic my_Topic_name
--partitions 40
Partitions are used to divide the data into smaller sections. Adding configs we should follow as below
> bin/Kafka-Topics.sh --zookeeper zk_host:port/chroot --alter --Topic my_Topic_name --config x=y
To delete configs:
> bin/Kafka-Topics.sh --zookeeper zk_host:port/chroot --alter --Topic my_Topic_name --deleteConfig x
And to delete off the entire Topic,
> bin/Kafka-Topics.sh --zookeeper zk_host:port/chroot --delete --Topic my_Topic_name
Kafka Spark Streaming Tutorial Video:
Distinguished Turnoff
The Kafka cluster will automatically detect any broker shutdown or failure and elect new partition leaders. This will occur whether a server fails or it is brought down intentionally for maintenance. For configuration changes, Kafka supports a more graceful mechanism for stoping a server than just destroying it. When we cut down a server in an elegant way, it has two optimizations it will take advantage of:
If any of the servers is failed, the cluster will automatically detect it irresponsible of whether the broker is failed or broken. It can beautifully shut down the servers. If done, then
Need of log recovery is removed and so the process becomes faster. Before shutting down, it will send all the partitions of that server to its replicas.
Mirroring data between clusters
The Kafka tool has got a mirroring process also which reads the materials from the sources and writes to the destination. The mirror replicates what is written in the sources and then writes down the destination.
Mirroring increases the throughput and you can use a lot of it in order to keep the process stay robust.The mirror maker transfers data from cluster to cluster. The source cluster and destination clusters are almost different from each other. The mirror maker is same to both. Because the data the source cluster contains before mirroring will be same to the destination cluster’s data after mirroring. Both names will be same in both clusters with different addresses because of its data replication and different offsets and partitions.
So, since mirror cluster will copy the data in a consumer with a different address, so it provides less fault-tolerance. In order to obtain more fault-tolerance, normal in-cluster replication should be used.
The mirroring command for mirroring a single Topic named as your-Topic from two inputs are:
> bin/Kafka-run-class.sh Kafka.tools.MirrorMaker
–consumer.config consumer-1.properties –consumer.config consumer-
2.properties
–producer.config producer.properties –whitelist your-Topic
Get 100% Hike!
Master Most in Demand Skills Now!
Finding the position of the Consumer
Finding out the positions of the consumers is very important. A command for finding out the consumer’s location is as follows:
> bin/Kafka-run-class.sh Kafka.tools.ConsumerOffsetChecker –zkconnect localhost:2181 –group test
Expanding your cluster
You can anytime add new servers to the clusters by adding them up with a new id and then start them up with new servers. But until and unless partitions are assigned to these new servers, they will not work at all and sit idlely. So to add new ones, the existing ones need to change their positions a little bit.So the movement of the data is done automatically behind the covers. What we do is we have to add the new server in such a position that it has to be behind the partition that seeking for migration. So the newly introduced server walks behind the migrating partition and in the process all the data from the migrating partition will be copied to the server and finally the partition is kicked out .
We run this reassigning of partitions in 3 ways:
–generate:
Here when we are having a group of Topics and a group of brokers, all the Topics need to be assigned to the new servers. So the generate tool creates the reassignment tool to move all the partitions to the new servers prior to migration.
–execute:
In the execute plan, the tool kicks off the reassignment of the Topics as planed by the user for execution.
–verify:
This tool verifies once the reassignment is done. It tells whether the status is done, ongoing , finished or even if it is failed.
Migration of Data Automatically
The Topics from the present servers are assigned to the new servers by the reassignment tools. Usually the partitions are expanded in order to move the entire Topic instead of moving a part of it. So the list of servers from whom the Topics are to be transferred and a list of the servers to whom the Topics are to be moved are included in a single list.The reassignment tool transfers the Topics from the old to he new servers and all the replicas are also transferred to new servers keeping the mirroring in even mood.
Firstly the json file is created to save the Topics. The creation of this file is as follows:
> cat Topics-to-move.json
{"Topics": [{"Topic": "top1"},
{"Topic": "top2"}],
"version":1
}
Where top1 and top2 are Topics.
After generating the json file, the assignment is introduced as follows
> bin/Kafka-reassign-partitions.sh –zookeeper localhost:2181 –Topics-to-move-json-file Topics-to-move.json –broker-list “5,6” –generate
For assignment:
{"version":1,
"partitions":[{"Topic":"top1","partition":2,"replicas":[1,2]},
{"Topic":"top1","partition":0,"replicas":[3,4]},
{"Topic":"top2","partition":2,"replicas":[1,2]},
{"Topic":"top2","partition":0,"replicas":[3,4]},
{"Topic":"top1","partition":1,"replicas":[2,3]},
{"Topic":"top2","partition":1,"replicas":[2,3]}]
}
For reassignment :
{"Version":1,
"Partitions":[{"Topic":"top1","partition":2,"replicas": [1,2] } ,
{"Topic":"top1","partition":0,"replicas":[1,2]},
{"Topic":"top2","partition":2,"replicas":[1,2]},
{"Topic":"top2","partition":0,"replicas":[1,2]},
{"Topic":"top1","partition":1,"replicas":[1,2]},
{"Topic":"top2","partition":1,"replicas":[1,2]}]
}
Hence all partitions will be moved from Topics top1, top2 to the servers 1 and 2. The new assignment should be saved.Hence all Topics top1top2 are to be moved to the new servers 5and 6. So now we know what has to be moved from which to which server. So the new positions of the Topics are saved in the json file and the present positions of the Topics are saved for keeping a back up in case you really want to bring the Topics back to their old server.
The commands are as given below for Kafka versions:
> bin/Kafka-reassign-partitions.sh –zookeeper localhost:2181 –reassignment-json-file expand-cluster-reassignment.json –execute
Replica Assigning for the present one:
{"Version":1,
"Partitions": [{"Topic":"top1","partition":2,"replicas":[1,2]},
{"Topic":"top1","partition":0,"replicas":[3,4]},
{"Topic":"top2","partition":2,"replicas":[1,2]},
{"Topic":"top2","partition":0,"replicas":[3,4]},
{"Topic":"top1","partition":1,"replicas":[2,3]},
{"Topic":"top2","partition":1,"replicas":[2,3]}]
}
For reassigning partitions
{"Version":1,
"Partitions":[{"Topic":"top1","partition":2,"replicas":[1,2]},
{"Topic":"top1","partition":0,"replicas":[1,2]},
{"Topic":"top2","partition":2,"replicas":[1,2]},
{"Topic":"top2","partition":0,"replicas":[1,2]},
{"Topic":"top1","partition":1,"replicas":[1,2]},
{"Topic":"top2","partition":1,"replicas":[1,2]}]
}
Now in order to verify whether the reassigning is done perfectly, follow this command
> bin/Kafka-reassign-partitions.sh –zookeeper localhost:2181 –reassignment-json-file expand-cluster-reassignment.json –verify
Output of Verification:
Reassignment of partition [top1,0] completed successfully
Reassignment of partition [top1,1] completed successfully
Reassignment of partition [top1,2] completed successfully
Reassignment of partition [top2,0] is in progress
Reassignment of partition [top2,1] completed successfully
Reassignment of partition [top2,2] is in progress
Instead of moving the entire set of partitions we can select a certain number of partitions to be assigned to new servers. So here we have to directly go to the execute step
Retiring Servers
Sometimes few servers become de-active and all that we need to do is to remove the Topics from the retiring server to the new servers. New tools are still to be generated to help this process.
Data Centers
Datacenters are used for the maintenance of operations, which manage the data pipelines. In Kafka we are towards operating with some local clusters so that the data centers communicate directly to the nearest cluster.
Here the data centers work independently. Hence, even if the in between links are broken, those data centers can work independently and are best in operation. The failure of the inner links leads to the failure of the mirroring process. It comes back to its actual position only when the links are redeveloped again.
Some data centers ask for viewing the complete set of data in the Topics , so in that case make sure you do mirroring the clusters of data to show to the new data centers.
These mirrored data are accumulated to data clusters which can be read when full reading of the complete data is required by the applications. This deployment pattern is very proper and can warranty latency to obtain the clusters.
Even in high latency, the Kafka tool provides better through put than other simpler messaging tools because Kafka batches the data in both the sides i.e in the sources as well as in the destinations. The source in the Kafka is the producer and the destination is the consumer.
We should always prevent using high latency links because it will introduce high latency for the writing acts of the Kafka technology as a result of which Kafka will not be available in all the locations if there is problem in the network.