Apache Kafka 4.0 was recently released. It brings many changes, including the removal of ZooKeeper and going all in on KRaft. KRaft is now responsible for metadata management, removing any dependency on ZooKeeper.
Let us set up a three-node Kafka 4.0 cluster. I am going to use all three nodes as both brokers and controllers.
- Download and extract Kafka archive
wget https://dlcdn.apache.org/kafka/4.0.0/kafka_2.13-4.0.0.tgz
tar -xzvf kafka_2.13-4.0.0.tgz
- Create a user for running Kafka
adduser kafka
- Create a log directory. By default this is a location in /tmp. However, /tmp won’t survive a reboot and the data would be wiped out. Using durable storage is very important.
mkdir /var/lib/kafka
chown -R kafka /var/lib/kafka
- Move the code to a well-known location.
mv kafka_2.13-4.0.0 /opt/
cd /opt/kafka_2.13-4.0.0
- Edit the config to add the IPs of the servers. The sample config that I use looks something like this:
# vim config/server.properties
process.roles=broker,controller
node.id=1
controller.quorum.voters=1@kafka1.staging.example.com:9093,2@kafka2.staging.example.com:9093,3@kafka3.staging.example.com:9093
listeners=PLAINTEXT://0.0.0.0:9092,CONTROLLER://0.0.0.0:9093
inter.broker.listener.name=PLAINTEXT
advertised.listeners=PLAINTEXT://kafka1.staging.example.com:9092,CONTROLLER://kafka1.staging.example.com:9093
controller.listener.names=CONTROLLER
listener.security.protocol.map=CONTROLLER:PLAINTEXT,PLAINTEXT:PLAINTEXT
num.network.threads=3
num.io.threads=8
socket.send.buffer.bytes=102400
socket.receive.buffer.bytes=102400
socket.request.max.bytes=104857600
log.dirs=/var/lib/kafka/kraft-combined-logs
num.partitions=1
num.recovery.threads.per.data.dir=1
offsets.topic.replication.factor=1
share.coordinator.state.topic.replication.factor=1
share.coordinator.state.topic.min.isr=1
transaction.state.log.replication.factor=1
transaction.state.log.min.isr=1
log.retention.hours=168
log.segment.bytes=1073741824
log.retention.check.interval.ms=300000
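Note that the config above is for node 1. At a minimum, node.id and the hostname in advertised.listeners must differ on each node. As a sketch, reusing the hostnames from above, the same file on kafka2 would instead carry:

```properties
# Differences on node 2 (everything else stays the same)
node.id=2
advertised.listeners=PLAINTEXT://kafka2.staging.example.com:9092,CONTROLLER://kafka2.staging.example.com:9093
```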
- Generate UUIDs for the nodes and the cluster using the kafka-storage.sh utility on one of the nodes:
CLUSTER_ID=$(bin/kafka-storage.sh random-uuid)
CONTROLLER_1_UUID=$(bin/kafka-storage.sh random-uuid)
CONTROLLER_2_UUID=$(bin/kafka-storage.sh random-uuid)
CONTROLLER_3_UUID=$(bin/kafka-storage.sh random-uuid)
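The --initial-controllers argument used in the next step stitches these UUIDs together as id@host:controller-port:directory-uuid entries, pointing at the CONTROLLER listener port (9093 in the config above). A small sketch of how the string is assembled, with hypothetical placeholder UUIDs standing in for real kafka-storage.sh output:

```shell
# Hypothetical placeholder UUIDs; in practice each comes from
# `bin/kafka-storage.sh random-uuid`
CONTROLLER_1_UUID="AAAAAAAAAAAAAAAAAAAAAA"
CONTROLLER_2_UUID="BBBBBBBBBBBBBBBBBBBBBB"
CONTROLLER_3_UUID="CCCCCCCCCCCCCCCCCCCCCC"

# One id@host:controller-port:directory-uuid entry per voter, comma-separated
INITIAL_CONTROLLERS="1@kafka1.staging.example.com:9093:${CONTROLLER_1_UUID},2@kafka2.staging.example.com:9093:${CONTROLLER_2_UUID},3@kafka3.staging.example.com:9093:${CONTROLLER_3_UUID}"
echo "$INITIAL_CONTROLLERS"
```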
- Use the same UUIDs across all the nodes and execute the following on each node to prepare the log directory.
$ bin/kafka-storage.sh format --cluster-id ${CLUSTER_ID} --initial-controllers "1@kafka1.staging.example.com:9093:${CONTROLLER_1_UUID},2@kafka2.staging.example.com:9093:${CONTROLLER_2_UUID},3@kafka3.staging.example.com:9093:${CONTROLLER_3_UUID}" --config config/server.properties
- Check that the cluster is working by starting all the nodes:
bin/kafka-server-start.sh config/server.properties
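Once all three nodes are up, a quick smoke test is to create a fully replicated topic and describe it from any node. The topic name below is just an example:

```shell
# Create a test topic replicated across all three brokers
bin/kafka-topics.sh --create --topic smoke-test --partitions 3 --replication-factor 3 \
  --bootstrap-server kafka1.staging.example.com:9092

# Describe it; leaders and replicas should be spread across node ids 1, 2 and 3
bin/kafka-topics.sh --describe --topic smoke-test \
  --bootstrap-server kafka1.staging.example.com:9092
```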
- Create a systemd unit file
$ cat /etc/systemd/system/kafka.service
[Unit]
Description=kafka-server

[Service]
Type=simple
User=kafka
WorkingDirectory=/opt/kafka_2.13-4.0.0/
ExecStart=/bin/sh -c '/opt/kafka_2.13-4.0.0/bin/kafka-server-start.sh /opt/kafka_2.13-4.0.0/config/server.properties'
ExecStop=/opt/kafka_2.13-4.0.0/bin/kafka-server-stop.sh
Restart=on-abnormal

[Install]
WantedBy=multi-user.target
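With the unit file in place, the usual systemd steps apply on each node:

```shell
# Pick up the new unit file, then enable and start Kafka in one go
systemctl daemon-reload
systemctl enable --now kafka
systemctl status kafka
```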
Notes For The Configuration
Change the node.id property on every node. I use 1, 2 and 3 for my three servers respectively.
For high-volume or production workloads, split the broker and controller roles onto separate nodes. That isolates failure points and lets you scale the brokers independently.
Pay attention to the “log.retention.hours” value. Long retention can overwhelm the disk, but it can really help when your consumers are slow.
Check out “max.message.bytes”. A low value would result in large messages being dropped. Set it sufficiently high.
Check out “offsets.topic.replication.factor” as well. Raising it above 1 helps the cluster tolerate node failures.
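For a three-node cluster headed toward production, the single-copy defaults in the sample config are worth raising. A sketch of the overrides I would consider; the values are suggestions, not settings tested in this post:

```properties
# Replicate internal topics across all three brokers
offsets.topic.replication.factor=3
transaction.state.log.replication.factor=3
transaction.state.log.min.isr=2
# Allow larger messages (default is roughly 1 MB)
max.message.bytes=10485760
```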