Apache Kafka is a high-throughput distributed messaging system. Hortonworks provides a
Kafka spout to facilitate ingesting data from Kafka 0.8x brokers. Storm developers
should include downstream bolts in their topologies to process data ingested with the
Kafka spout. The storm-kafka components include a standard Storm spout, as well as fully
transactional Trident spout implementations.
The storm-kafka spout provides the following key features:
Supports 'exactly once' tuple processing
Supports the Trident API
Supports dynamic discovery of Kafka brokers
SpoutConfig spoutConfig = new SpoutConfig(
    ImmutableList.of("kafkahost1", "kafkahost2"), // List of Kafka brokers
    8,                // Number of partitions per Kafka host
    "clicks",         // Kafka topic to read from
    "/kafkastorm",    // Root path in Zookeeper for the spout to store consumer offsets
    "discovery");     // ID for storing consumer offsets in Zookeeper
KafkaSpout kafkaSpout = new KafkaSpout(spoutConfig);
The Kafka spout stores its offsets in the same instance of Zookeeper used by Apache Storm. The Kafka spout uses these offsets to replay tuples in the event of a downstream failure or timeout.
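The replay behavior described above can be pictured with a small, self-contained model of a single partition. This is a hypothetical sketch, not storm-kafka's code: the class OffsetTrackerSketch and its methods are invented for illustration, and both the rewind-on-failure policy and the rule that the lowest unacknowledged offset is the one safe to store in Zookeeper are assumptions about how such a spout can behave.

```java
import java.util.TreeSet;

// Hypothetical model of offset tracking for one Kafka partition.
// It is NOT the storm-kafka implementation; it only illustrates how a stored
// offset lets a spout replay tuples after a downstream failure or timeout.
final class OffsetTrackerSketch {
    private final TreeSet<Long> pending = new TreeSet<>(); // emitted, not yet acked
    private long nextToEmit;                               // next offset to read from Kafka

    OffsetTrackerSketch(long startOffset) {
        this.nextToEmit = startOffset;
    }

    // Reads the next message and remembers its offset until it is acked.
    long emit() {
        long offset = nextToEmit++;
        pending.add(offset);
        return offset;
    }

    // Downstream bolts fully processed the tuple built from this offset.
    void ack(long offset) {
        pending.remove(offset);
    }

    // Downstream failure or timeout: rewind so this offset (and everything
    // emitted after it) is read from Kafka again.
    void fail(long offset) {
        if (offset < nextToEmit) {
            nextToEmit = offset;
            pending.tailSet(offset, true).clear();
        }
    }

    // Offset that is safe to store in Zookeeper: everything below it is acked.
    long committableOffset() {
        return pending.isEmpty() ? nextToEmit : pending.first();
    }
}
```

In this model, rewinding to the failed offset means tuples emitted after it are read again as well, so downstream bolts can see duplicates unless their processing is idempotent.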
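To make the spout start from a particular point in time instead of from its stored offsets, call forceStartOffsetTime() on the SpoutConfig object:

... spoutConfig.forceStartOffsetTime($TIMESTAMP); ...

Kafka chooses the latest offset written around the specified timestamp. A value of -1
forces the Kafka spout to restart from the latest offset. A value of -2 forces the spout
to restart from the earliest offset.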
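The meaning of the forceStartOffsetTime() argument can be summarized as a small resolution function. This is a hedged sketch: StartOffsetSketch and resolveStartOffset are hypothetical names, the timestamp lookup is passed in as a function standing in for the broker's offset lookup, and clamping the result to the retained offset range is an assumption added for illustration. Only the -1 (latest) and -2 (earliest) sentinel values come from the text above.

```java
import java.util.function.LongUnaryOperator;

// Hypothetical helper showing how a start-offset time could be turned into
// an offset for one partition. Not part of the storm-kafka API.
final class StartOffsetSketch {
    static final long LATEST_TIME = -1L;   // restart from the latest offset
    static final long EARLIEST_TIME = -2L; // restart from the earliest offset

    static long resolveStartOffset(long startOffsetTime,
                                   long earliestOffset,
                                   long latestOffset,
                                   LongUnaryOperator offsetNearTimestamp) {
        if (startOffsetTime == LATEST_TIME) {
            return latestOffset;
        }
        if (startOffsetTime == EARLIEST_TIME) {
            return earliestOffset;
        }
        if (startOffsetTime < 0) {
            throw new IllegalArgumentException("unsupported start offset time: " + startOffsetTime);
        }
        // Any other value is treated as a timestamp; the lookup stands in for
        // asking Kafka which offset was written around that time.
        long offset = offsetNearTimestamp.applyAsLong(startOffsetTime);
        // Keep the result inside the range the broker still holds (assumption).
        return Math.max(earliestOffset, Math.min(offset, latestOffset));
    }
}
```

For example, with a partition holding offsets 10 through 500, passing -1 yields 500 and passing -2 yields 10.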
Limitations
The current version of the Kafka spout contains the following limitations:
Does not support Kafka 0.7x brokers.
Cannot dynamically discover Kafka partitions, topics, and hosts.
Storm developers must include ${STORM_HOME}/lib/* in the classpath when running
kafka-topology in local mode from the command line. Otherwise, they will likely receive a
java.lang.NoClassDefFoundError exception. For example:

java -cp "/usr/lib/storm/contrib/storm-kafka-example-0.9.1.2.1.1.0-320-jar-with-dependencies.jar:/usr/lib/storm/lib/*" org.apache.storm.kafka.TestKafkaTopology <zookeeper_host>
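To check which classes a given classpath cannot supply, a class can be loaded by name without running it. This is a hedged diagnostic sketch: ClasspathCheck and missingClasses are hypothetical names, and the Storm class name mentioned below, backtype.storm.topology.TopologyBuilder, is an assumption based on the package layout of Storm 0.9.x.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical diagnostic: reports which classes cannot be loaded from the
// current classpath, the usual cause of java.lang.NoClassDefFoundError.
final class ClasspathCheck {
    static List<String> missingClasses(List<String> classNames) {
        List<String> missing = new ArrayList<>();
        ClassLoader loader = ClasspathCheck.class.getClassLoader();
        for (String name : classNames) {
            try {
                // initialize=false: load the class without running its static initializers
                Class.forName(name, false, loader);
            } catch (ClassNotFoundException | LinkageError e) {
                missing.add(name);
            }
        }
        return missing;
    }
}
```

Run with the same -cp value as the java command, passing a list such as "backtype.storm.topology.TopologyBuilder" returns an empty list when the Storm jars are visible and returns the class name when they are not.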
On secure Hadoop clusters, administrators must comment out the following statement in
${STORM_HOME}/bin/kafka-server-start.sh:

EXTRA_ARGS="-name kafkaServer -loggc"
Kafka Configuration
The storm-kafka connector requires some configuration of the Apache Kafka installation.
Kafka administrators must add a zookeeper.connect property with the hostnames and port
numbers of the HDP Zookeeper nodes to Kafka's server.properties file:
zookeeper.connect=host1:2181,host2:2181,host3:2181
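To catch typos in the zookeeper.connect value before restarting the broker, the string can be checked programmatically. The following is a minimal sketch under stated assumptions: ZkConnectSketch and parse are hypothetical names, and treating an optional trailing /path as a ZooKeeper chroot follows Kafka's documented connection-string format rather than anything in this guide.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical validator for a zookeeper.connect value such as
// "host1:2181,host2:2181,host3:2181". Not part of Kafka or Storm.
final class ZkConnectSketch {
    static final class HostPort {
        final String host;
        final int port;

        HostPort(String host, int port) {
            this.host = host;
            this.port = port;
        }
    }

    static List<HostPort> parse(String connect) {
        // An optional chroot path (e.g. "/kafka") may follow the last host:port pair.
        int slash = connect.indexOf('/');
        String hosts = slash >= 0 ? connect.substring(0, slash) : connect;

        List<HostPort> result = new ArrayList<>();
        for (String entry : hosts.split(",")) {
            String trimmed = entry.trim();
            int colon = trimmed.lastIndexOf(':');
            if (colon <= 0 || colon == trimmed.length() - 1) {
                throw new IllegalArgumentException("expected host:port, got '" + trimmed + "'");
            }
            // Integer.parseInt throws NumberFormatException for a non-numeric port.
            int port = Integer.parseInt(trimmed.substring(colon + 1));
            if (port < 1 || port > 65535) {
                throw new IllegalArgumentException("port out of range in '" + trimmed + "'");
            }
            result.add(new HostPort(trimmed.substring(0, colon), port));
        }
        return result;
    }
}
```

Parsing the example value above yields three entries, host1 through host3, each on port 2181; an entry without a port, such as "host1", is rejected.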