Big data messaging with Kafka, Part 1

When the big data movement started, it was mostly focused on batch processing. Distributed data storage and querying tools like MapReduce, Hive, and Pig were all designed to process data in batches rather than continuously. Businesses would run multiple jobs every night to extract data from a database, then analyze, transform, and eventually store the data. More recently, enterprises have discovered the power of analyzing and processing data and events as they happen, not just once every few hours. Most traditional messaging systems don't scale up to handle big data in real time, however. So engineers at LinkedIn built and open-sourced Kafka: a distributed messaging framework that meets the demands of big data by scaling on commodity hardware.

Over the past few years, Kafka has been adopted for a variety of use cases. In the simplest case, it can act as a buffer for storing application logs. Combined with a technology like Spark Streaming, it can be used to track data changes and take action on that data before saving it to a final destination. Because Kafka delivers events as they happen, it is also a powerful tool for detecting fraud, such as checking the validity of a credit card transaction at the moment it occurs rather than waiting for batch processing hours later.

This two-part tutorial introduces Kafka, starting with how to install and run it in your development environment. You'll get an overview of Kafka's architecture, followed by an introduction to developing an out-of-the-box Kafka messaging system. Finally, you'll build a custom producer/consumer application that sends and consumes messages via a Kafka server. In the second half of the tutorial you'll learn how to partition and group messages, and how to control which messages a Kafka consumer will consume.

What is Kafka?

Apache Kafka is a messaging system built to scale for big data. Similar to Apache ActiveMQ or RabbitMQ, Kafka enables applications built on different platforms to communicate via asynchronous message passing. But Kafka differs from these more traditional messaging systems in key ways:

  • It's designed to scale horizontally, by adding more commodity servers.
  • It provides much higher throughput for both producer and consumer processes.
  • It can be used to support both batch and real-time use cases.
  • It doesn't support JMS, Java's message-oriented middleware API.

Kafka's architecture

Before we explore Kafka's architecture, you should know its basic terminology:

  • A producer is a process that can publish a message to a topic.
  • A consumer is a process that can subscribe to one or more topics and consume messages published to those topics.
  • A topic is the name of the category or feed to which messages are published.
  • A broker is a process running on a single machine.
  • A cluster is a group of brokers working together.

Figure 1. Architecture of a Kafka message system

Kafka's architecture is very simple, which can result in better performance and throughput in some systems. Every topic in Kafka is like a simple log file. When a producer publishes a message, the Kafka server appends it to the end of the log file for its given topic. The server also assigns an offset, which is a number used to permanently identify each message. As the number of messages grows, the value of each offset increases; for example if the producer publishes three messages the first one might get an offset of 1, the second an offset of 2, and the third an offset of 3.

When the Kafka consumer first starts, it will send a pull request to the server, asking to retrieve any messages for a particular topic with an offset value higher than 0. The server will check the log file for that topic and return the three new messages. The consumer will process the messages, then send a request for messages with an offset higher than 3, and so on.

In Kafka, the client is responsible for remembering the offset count and retrieving messages. The Kafka server doesn't track or manage message consumption. By default, a Kafka server will keep a message for seven days. A background thread in the server checks for and deletes messages that are seven days old or older. A consumer can access messages as long as they are on the server. It can read a message multiple times, and even read messages in reverse order of receipt. But if the consumer fails to retrieve a message before the seven days are up, it will miss that message.
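The pull model described above can be sketched in a few lines of plain Java. This is only an illustration, not Kafka code: TopicLogSketch, append() and fetchAfter() are hypothetical names, the log lives in memory, and offsets start at 1 simply to match the example in the text.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical in-memory stand-in for a single topic log; this is not the Kafka API.
final class TopicLogSketch {
    private final List<String> messages = new ArrayList<>();

    // Appends a message to the end of the log and returns its offset.
    // Offsets start at 1 here only to match the example in the text.
    long append(String message) {
        messages.add(message);
        return messages.size();
    }

    // Returns every message whose offset is higher than lastOffset.
    List<String> fetchAfter(long lastOffset) {
        int from = (int) Math.min(Math.max(lastOffset, 0), messages.size());
        return new ArrayList<>(messages.subList(from, messages.size()));
    }

    public static void main(String[] args) {
        TopicLogSketch log = new TopicLogSketch();
        log.append("first");
        log.append("second");
        log.append("third");

        // The consumer, not the server, remembers how far it has read.
        long lastOffset = 0;
        List<String> batch = log.fetchAfter(lastOffset);
        lastOffset += batch.size();
        System.out.println(batch + " -> next request asks for offsets above " + lastOffset);

        // Re-reading is just a matter of asking for an older offset again.
        System.out.println(log.fetchAfter(1));
    }
}
```

Because the log itself never records who has read what, any number of consumers can keep their own lastOffset and read the same messages independently.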

Quick setup and demo

We'll build a custom application in this tutorial, but let's start by installing and testing a Kafka instance with an out-of-the-box producer and consumer.

  1. Visit the Kafka download page to install the most recent version (0.9 as of this writing).
  2. Extract the binaries into a software/kafka folder. For the current version it's software/kafka_2.11-
  3. Change your current directory to point to the new folder.
  4. Start the Zookeeper server by executing the command: bin/zookeeper-server-start.sh config/zookeeper.properties.
  5. Start the Kafka server by executing: bin/kafka-server-start.sh config/server.properties.
  6. Create a test topic that you can use for testing: bin/kafka-topics.sh --create --zookeeper localhost:2181 --replication-factor 1 --partitions 1 --topic javaworld.
  7. Start a simple console consumer that can consume messages published to a given topic, such as javaworld: bin/kafka-console-consumer.sh --zookeeper localhost:2181 --topic javaworld --from-beginning.
  8. Start up a simple producer console that can publish messages to the test topic: bin/kafka-console-producer.sh --broker-list localhost:9092 --topic javaworld.
  9. Try typing one or two messages into the producer console. Your messages should show in the consumer console.

A simple producer/consumer application

You've seen how Kafka works out of the box. Next, let's develop a custom producer/consumer application. The producer will retrieve user input from the console and send each new line as a message to a Kafka server. The consumer will retrieve messages for a given topic and print them to the console. The producer and consumer components in this case are your own implementations of kafka-console-producer.sh and kafka-console-consumer.sh.

Let's start by creating a Producer.java class. This client class contains logic to read user input from the console and send that input as a message to the Kafka server.

We configure the producer by creating an object from the java.util.Properties class and setting its properties. The ProducerConfig class defines all the different properties available, but Kafka's default values are sufficient for most uses. For the default config we only need to set three mandatory properties:

  • BOOTSTRAP_SERVERS_CONFIG (bootstrap.servers)
  • KEY_SERIALIZER_CLASS_CONFIG (key.serializer)
  • VALUE_SERIALIZER_CLASS_CONFIG (value.serializer)

BOOTSTRAP_SERVERS_CONFIG (bootstrap.servers) sets a list of host:port pairs used for establishing the initial connections to the Kafka cluster, in the host1:port1,host2:port2,... format. Even if we have more than one broker in our Kafka cluster, we only need to specify the host:port of one broker. The Kafka client will use this value to make a discovery call to that broker, which will return a list of all the brokers in the cluster. It's still a good idea to specify more than one broker in BOOTSTRAP_SERVERS_CONFIG, so that if the first broker is down the client can try the others.

The Kafka server expects messages in byte[] key, byte[] value format. Rather than converting every key and value, Kafka's client-side library permits us to use friendlier types like String and int for sending messages. The library will convert these to the appropriate type. For example, the sample app doesn't have a message-specific key, so we'll use null for the key. For the value we'll use a String, which is the data entered by the user on the console.

To configure the message key, we set KEY_SERIALIZER_CLASS_CONFIG to org.apache.kafka.common.serialization.ByteArraySerializer. This works because null doesn't need to be converted into byte[]. For the message value, we set VALUE_SERIALIZER_CLASS_CONFIG to org.apache.kafka.common.serialization.StringSerializer, because that class knows how to convert a String into a byte[].
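As a rough sketch, the three settings can be collected with nothing but java.util.Properties. To keep the example free of the Kafka library it uses the plain string keys that the ProducerConfig constants stand for; ProducerConfigSketch and producerProperties() are hypothetical names, and the second broker address (localhost:9093) is a placeholder.

```java
import java.util.Properties;

// Hypothetical helper that only assembles configuration; it does not connect to Kafka.
final class ProducerConfigSketch {
    // Builds the three mandatory producer settings using their plain string keys.
    static Properties producerProperties(String bootstrapServers) {
        Properties props = new Properties();
        props.put("bootstrap.servers", bootstrapServers);
        props.put("key.serializer", "org.apache.kafka.common.serialization.ByteArraySerializer");
        props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");
        return props;
    }

    public static void main(String[] args) {
        // Two brokers are listed so the client can fall back if the first is down.
        // The second address is a placeholder.
        Properties props = producerProperties("localhost:9092,localhost:9093");
        props.forEach((key, value) -> System.out.println(key + " = " + value));
    }
}
```

In application code the ProducerConfig constants are usually preferable to string literals, since a mistyped constant fails at compile time.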

The Kafka producer

After filling the Properties object with the necessary configuration properties, we can use it to create an object of KafkaProducer. Whenever we want to send a message to the Kafka server after that, we'll create an object of ProducerRecord and call the KafkaProducer's send() method with that record to send the message. The ProducerRecord takes two parameters: the name of the topic to which the message should be published, and the actual message. Don't forget to call the Producer.close() method when you're done using the producer:

Listing 1. KafkaProducer

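        import java.util.Properties;
        import java.util.Scanner;
        import org.apache.kafka.clients.producer.KafkaProducer;
        import org.apache.kafka.clients.producer.ProducerConfig;
        import org.apache.kafka.clients.producer.ProducerRecord;

        public class Producer {
          private static Scanner in;
          public static void main(String[] argv)throws Exception {
              if (argv.length != 1) {
                  System.err.println("Please specify 1 parameter");
                  System.exit(-1);
              }
              String topicName = argv[0];
              in = new Scanner(System.in);
              System.out.println("Enter message (type exit to quit)");

              //Configure the Producer with the three mandatory properties
              Properties configProperties = new Properties();
              configProperties.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
              configProperties.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.ByteArraySerializer");
              configProperties.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringSerializer");

              org.apache.kafka.clients.producer.Producer<String, String> producer = new KafkaProducer<String, String>(configProperties);
              String line = in.nextLine();
              while(!line.equals("exit")) {
                  ProducerRecord<String, String> rec = new ProducerRecord<String, String>(topicName, line);
                  producer.send(rec);
                  line = in.nextLine();
              }
              in.close();
              producer.close();
          }
        }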

Configuring the message consumer

Next we'll create a simple consumer that subscribes to a topic. Whenever a new message is published to the topic, it will read that message and print it to the console. The consumer code is quite similar to the producer code. We start by creating an object of java.util.Properties, setting its consumer-specific properties, and then using it to create a new object of KafkaConsumer. The ConsumerConfig class defines all the properties that we can set. There are just four mandatory properties:

  • BOOTSTRAP_SERVERS_CONFIG (bootstrap.servers)
  • KEY_DESERIALIZER_CLASS_CONFIG (key.deserializer)
  • VALUE_DESERIALIZER_CLASS_CONFIG (value.deserializer)
  • GROUP_ID_CONFIG (group.id)

Just as we did for the producer class, we'll use BOOTSTRAP_SERVERS_CONFIG to configure the host/port pairs for the consumer class. This config lets us establish the initial connections to the Kafka cluster, in the host1:port1,host2:port2,... format.

As I previously noted, the Kafka server expects messages in byte[] key and byte[] value formats, and has its own implementation for serializing different types into byte[]. Just as we did with the producer, on the consumer side we'll have to use a custom deserializer to convert byte[] back into the appropriate type.

In the case of the example application, we know the producer is using ByteArraySerializer for the key and StringSerializer for the value. On the client side we therefore need to use org.apache.kafka.common.serialization.ByteArrayDeserializer for the key and org.apache.kafka.common.serialization.StringDeserializer for the value. Setting those classes as values for KEY_DESERIALIZER_CLASS_CONFIG and VALUE_DESERIALIZER_CLASS_CONFIG will enable the consumer to deserialize byte[] encoded types sent by the producer.
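To see why the serializer and deserializer have to be paired, here is a minimal round trip in plain Java. It is a sketch of the idea rather than Kafka's own classes: SerdeRoundTripSketch, serialize() and deserialize() are hypothetical names, and UTF-8 is assumed as the character encoding.

```java
import java.nio.charset.StandardCharsets;

// Hypothetical stand-ins for a String serializer/deserializer pair; UTF-8 is an assumption.
final class SerdeRoundTripSketch {
    // What a String serializer has to do: turn text into bytes (null stays null).
    static byte[] serialize(String value) {
        return value == null ? null : value.getBytes(StandardCharsets.UTF_8);
    }

    // What the matching deserializer has to do: turn the bytes back into text.
    static String deserialize(byte[] data) {
        return data == null ? null : new String(data, StandardCharsets.UTF_8);
    }

    public static void main(String[] args) {
        byte[] wire = serialize("hello kafka");
        System.out.println(wire.length + " bytes on the wire");
        System.out.println(deserialize(wire));
        // A null key, as used in the sample app, needs no conversion either way.
        System.out.println(deserialize(serialize(null)));
    }
}
```

If the consumer decoded the bytes with a different scheme than the producer used to encode them, the value would come back garbled, which is why the two sides must agree.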

Finally, we need to set the value of the GROUP_ID_CONFIG. This should be a group name in string format. I'll explain more about this config in a minute. For now, just look at the Kafka consumer with the four mandatory properties set:

Listing 2. KafkaConsumer

  import java.util.Arrays;
  import java.util.Properties;
  import java.util.Scanner;
  import org.apache.kafka.clients.consumer.ConsumerConfig;
  import org.apache.kafka.clients.consumer.ConsumerRecord;
  import org.apache.kafka.clients.consumer.ConsumerRecords;
  import org.apache.kafka.clients.consumer.KafkaConsumer;
  import org.apache.kafka.common.errors.WakeupException;

  public class Consumer {
      private static Scanner in;
      private static boolean stop = false;

      public static void main(String[] argv)throws Exception{
          if (argv.length != 2) {
              System.err.printf("Usage: %s <topicName> <groupId>\n",
                      Consumer.class.getSimpleName());
              System.exit(-1);
          }
          in = new Scanner(System.in);
          String topicName = argv[0];
          String groupId = argv[1];

          //Start polling in a separate thread
          ConsumerThread consumerRunnable = new ConsumerThread(topicName,groupId);
          consumerRunnable.start();
          String line = "";
          while (!line.equals("exit")) {
              line = in.next();
          }
          //Interrupt the poll loop, then wait for the thread to close the consumer
          consumerRunnable.getKafkaConsumer().wakeup();
          System.out.println("Stopping consumer .....");
          consumerRunnable.join();
      }

      private static class ConsumerThread extends Thread{
          private String topicName;
          private String groupId;
          private KafkaConsumer<String,String> kafkaConsumer;

          public ConsumerThread(String topicName, String groupId){
              this.topicName = topicName;
              this.groupId = groupId;
          }
          public void run() {
              Properties configProperties = new Properties();
              configProperties.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
              //Keys are null in this example, so StringDeserializer handles them as well
              configProperties.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringDeserializer");
              configProperties.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringDeserializer");
              configProperties.put(ConsumerConfig.GROUP_ID_CONFIG, groupId);
              configProperties.put(ConsumerConfig.CLIENT_ID_CONFIG, "simple");

              //Figure out where to start processing messages from
              kafkaConsumer = new KafkaConsumer<String, String>(configProperties);
              kafkaConsumer.subscribe(Arrays.asList(topicName));
              //Start processing messages
              try {
                  while (true) {
                      ConsumerRecords<String, String> records = kafkaConsumer.poll(100);
                      for (ConsumerRecord<String, String> record : records)
                          System.out.println(record.value());
                  }
              }catch(WakeupException ex){
                  System.out.println("Exception caught " + ex.getMessage());
              }finally{
                  kafkaConsumer.close();
                  System.out.println("After closing KafkaConsumer");
              }
          }
          public KafkaConsumer<String,String> getKafkaConsumer(){
             return this.kafkaConsumer;
          }
      }
  }

Consumer and ConsumerThread

Writing the consumer code in Listing 2 in two parts ensures that we close the Consumer object before exiting. I'll describe each class in turn. First, ConsumerThread is an inner class that takes a topic name and group name as its arguments. In the run() method it creates a KafkaConsumer object, with appropriate properties. It subscribes to the topic that was passed as an argument in the constructor, by calling the kafkaConsumer.subscribe() method, then polls the Kafka server every 100 milliseconds to check if there are any new messages in the topic. It will iterate through the list of any new messages and print them to the console.

In the Consumer class we create a new object of ConsumerThread and start it in a different thread. The ConsumerThread starts an infinite loop and keeps polling the topic for new messages. Meanwhile in the Consumer class, the main thread waits for a user to enter exit on the console. Once a user enters exit, it calls the KafkaConsumer.wakeup() method, causing the KafkaConsumer to stop polling for new messages and throw a WakeupException. We can then close the KafkaConsumer gracefully, by calling kafkaConsumer's close() method.
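The same two-thread shutdown pattern can be tried without a Kafka server. The sketch below is an analogy only: it swaps KafkaConsumer.wakeup() and WakeupException for Thread.interrupt() and InterruptedException, and uses a BlockingQueue in place of the topic. PollLoopShutdownSketch and Worker are hypothetical names.

```java
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.TimeUnit;

// Analogy only: interrupt()/InterruptedException stand in for wakeup()/WakeupException.
final class PollLoopShutdownSketch {
    // Hypothetical worker playing the role of ConsumerThread; the queue plays the topic.
    static final class Worker extends Thread {
        private final BlockingQueue<String> queue = new LinkedBlockingQueue<>();
        private volatile boolean closed = false;

        BlockingQueue<String> getQueue() { return queue; }
        boolean isClosed() { return closed; }

        @Override
        public void run() {
            try {
                while (true) {
                    // Wait up to 100 ms for a message, much like the consumer's poll.
                    String message = queue.poll(100, TimeUnit.MILLISECONDS);
                    if (message != null) {
                        System.out.println(message);
                    }
                }
            } catch (InterruptedException ex) {
                // The loop was asked to stop from another thread.
                System.out.println("Poll loop interrupted");
            } finally {
                // Cleanup always runs, mirroring the call to kafkaConsumer.close().
                closed = true;
                System.out.println("Worker closed");
            }
        }
    }

    public static void main(String[] args) throws InterruptedException {
        Worker worker = new Worker();
        worker.start();
        worker.getQueue().put("hello");
        Thread.sleep(300);   // give the worker time to print the message
        worker.interrupt();  // plays the role of KafkaConsumer.wakeup()
        worker.join();       // wait until cleanup has finished
    }
}
```

The point of the design is that the blocked polling thread is the one that performs the cleanup, while the main thread only signals it and waits.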

Source code for the example application used in this article, "Big data messaging with Kafka, Part 1." Created for JavaWorld by Sunil Patil.

Run the application

To test this application you can run the code in Listings 1 and 2 from your IDE, or you can follow these steps:

  1. Download the sample code, KafkaAPIClient, by executing the command: git clone https://github.com/sdpatil/KafkaAPIClient.git.
  2. Compile the code and create a fat JAR with the command: mvn clean compile assembly:single.
  3. Start the consumer: java -cp target/KafkaAPIClient-1.0-SNAPSHOT-jar-with-dependencies.jar com.spnotes.kafka.simple.Consumer test group1.
  4. Start the producer: java -cp target/KafkaAPIClient-1.0-SNAPSHOT-jar-with-dependencies.jar com.spnotes.kafka.simple.Producer test.
  5. Enter a message in the producer console and check to see whether that message appears in the consumer. Try a few messages.
  6. Type exit in the consumer and producer consoles to close them.

Figure 2. A Kafka producer/consumer application

Conclusion to Part 1

In the first half of this tutorial you've learned the basics of big data messaging with Kafka, including a conceptual overview of Kafka, setup instructions, and how to configure a producer/consumer messaging system with Kafka.

As you've seen, Kafka's architecture is both simple and efficient, designed for performance and throughput. In Part 2 I'll introduce some more advanced techniques for distributed messaging with Kafka, starting with using partitions to subdivide topics. I'll also demonstrate how to manage message offsets in order to support different use cases.
