2. Ingesting Data with the Apache Kafka Spout

Apache Kafka is a high-throughput, distributed messaging system. Hortonworks provides a Kafka spout to facilitate ingesting data from Kafka 0.8.x brokers. Storm developers should include downstream bolts in their topologies to process data ingested with the Kafka spout. The storm-kafka components include a standard Storm spout, as well as fully transactional Trident spout implementations.

The storm-kafka spout provides the following key features:

  • Supports 'exactly once' tuple processing

  • Supports the Trident API

  • Supports dynamic discovery of Kafka brokers
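
The following example illustrates how to construct and configure the Kafka spout: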

SpoutConfig spoutConfig = new SpoutConfig(ImmutableList.of("kafkahost1","kafkahost2"), //List of Kafka brokers
   8,             // Number of partitions per Kafka host
   "clicks",      // Kafka topic to read from
   "/kafkastorm", // Root path in Zookeeper for the spout to store consumer offsets
   "discovery");  // ID for storing consumer offsets in Zookeeper
KafkaSpout kafkaSpout = new KafkaSpout(spoutConfig);
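
The spout is then wired into a topology like any other spout, with downstream bolts handling the processing. The following is a minimal, self-contained sketch, assuming the storm.kafka and backtype.storm package names from the Storm 0.9.x line; LogBolt is a hypothetical placeholder for real downstream processing:

import backtype.storm.Config;
import backtype.storm.LocalCluster;
import backtype.storm.topology.BasicOutputCollector;
import backtype.storm.topology.OutputFieldsDeclarer;
import backtype.storm.topology.TopologyBuilder;
import backtype.storm.topology.base.BaseBasicBolt;
import backtype.storm.tuple.Tuple;
import com.google.common.collect.ImmutableList;
import storm.kafka.KafkaSpout;
import storm.kafka.SpoutConfig;

public class ClickTopology {

    // Hypothetical downstream bolt; replace with real processing logic.
    public static class LogBolt extends BaseBasicBolt {
        @Override
        public void execute(Tuple tuple, BasicOutputCollector collector) {
            System.out.println(tuple); // log each message ingested from Kafka
        }

        @Override
        public void declareOutputFields(OutputFieldsDeclarer declarer) {
            // terminal bolt; declares no output fields
        }
    }

    public static void main(String[] args) {
        SpoutConfig spoutConfig = new SpoutConfig(
                ImmutableList.of("kafkahost1", "kafkahost2"), // list of Kafka brokers
                8,              // number of partitions per Kafka host
                "clicks",       // Kafka topic to read from
                "/kafkastorm",  // root path in Zookeeper for consumer offsets
                "discovery");   // ID for storing consumer offsets in Zookeeper

        TopologyBuilder builder = new TopologyBuilder();
        builder.setSpout("kafka-spout", new KafkaSpout(spoutConfig), 8); // roughly one executor per partition
        builder.setBolt("log-bolt", new LogBolt(), 4)
               .shuffleGrouping("kafka-spout");

        // Local mode for testing; use StormSubmitter to deploy to a cluster.
        LocalCluster cluster = new LocalCluster();
        cluster.submitTopology("clicks-topology", new Config(), builder.createTopology());
    }
}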

The Kafka spout stores its offsets in the same instance of Zookeeper used by Apache Storm. The Kafka spout uses these offsets to replay tuples in the event of a downstream failure or timeout.

To force the spout to ignore the stored offsets and begin reading from a specific point in time, use the forceStartOffsetTime() method on the spout configuration:

...
spoutConfig.forceStartOffsetTime($TIMESTAMP);
...

Kafka chooses the latest offset written at or before the specified timestamp. A value of -1 forces the Kafka spout to restart from the latest offset. A value of -2 forces the spout to restart from the earliest offset.
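
For example, the following call (reusing the spoutConfig instance from the example above) forces the spout to replay the topic from the earliest available offset:

spoutConfig.forceStartOffsetTime(-2); // -2 = earliest offset; use -1 for the latest offset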

Limitations

The current version of the Kafka spout contains the following limitations:

  • Does not support Kafka 0.7.x brokers.

  • Cannot dynamically discover Kafka partitions, topics, and hosts.

  • Storm developers must include ${STORM_HOME}/lib/* in the CLASSPATH environment variable when running kafka-topology in local mode from the command line. Otherwise, the topology will likely fail with a java.lang.NoClassDefFoundError exception. For example:

    java -cp "/usr/lib/storm/contrib/storm-kafka-example-0.9.1.2.1.1.0-320-jar-with-dependencies.jar:/usr/lib/storm/lib/*" org.apache.storm.kafka.TestKafkaTopology <zookeeper_host>

  • On secure Hadoop clusters, administrators must comment out the following statement in ${STORM_HOME}/bin/kafka-server-start.sh:

    EXTRA_ARGS="-name kafkaServer -loggc"
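
    After the edit, the line should read as follows:

    # EXTRA_ARGS="-name kafkaServer -loggc"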

Kafka Configuration

The storm-kafka connector requires some configuration of the Apache Kafka installation. Kafka administrators must add a zookeeper.connect property, listing the hostnames and port numbers of the HDP Zookeeper nodes, to Kafka's server.properties file:

zookeeper.connect=host1:2181,host2:2181,host3:2181