Apache Storm Component Guide

KafkaSpout Integration: Core Storm APIs

The core-storm API represents a Kafka spout with the KafkaSpout class.

To initialize KafkaSpout, define an instance of SpoutConfig, the subclass of the KafkaConfig class that represents the configuration information needed to ingest data from a Kafka cluster. KafkaSpout also requires an implementation of the BrokerHosts interface.

BrokerHosts Interface

The BrokerHosts interface maps Kafka brokers to topic partitions. Constructors for KafkaSpout (and, for the Trident API, TridentKafkaConfig) require an implementation of the BrokerHosts interface.

The storm-kafka component provides two implementations of BrokerHosts, ZkHosts and StaticHosts:

  • Use ZkHosts if you want to track broker-to-partition mapping dynamically. This class uses Kafka's ZooKeeper entries to track the mapping.

    You can instantiate an object as follows:

    public ZkHosts(String brokerZkStr, String brokerZkPath)

    public ZkHosts(String brokerZkStr)

    where:

    • brokerZkStr is the IP:port address for the ZooKeeper host; for example, localhost:2181.

    • brokerZkPath is the root directory under which topic and partition information is stored. The default is /brokers, which is also the default used by Kafka.

    By default, the broker-partition mapping refreshes every 60 seconds. If you want to change the refresh frequency, set host.refreshFreqSecs to your chosen value; a usage sketch follows this list.

  • Use StaticHosts for static broker-to-partition mapping. To construct an instance of this class, you must first construct an instance of GlobalPartitionInformation; for example:

    Broker brokerForPartition0 = new Broker("localhost");       // localhost:9092 (default port)
    Broker brokerForPartition1 = new Broker("localhost", 9092); // localhost:9092, with the port specified explicitly
    Broker brokerForPartition2 = new Broker("localhost:9092");  // localhost:9092, specified as one string
    GlobalPartitionInformation partitionInfo = new GlobalPartitionInformation(topicName);
    partitionInfo.addPartition(0, brokerForPartition0); // map partition 0 to brokerForPartition0
    partitionInfo.addPartition(1, brokerForPartition1); // map partition 1 to brokerForPartition1
    partitionInfo.addPartition(2, brokerForPartition2); // map partition 2 to brokerForPartition2
    StaticHosts hosts = new StaticHosts(partitionInfo);
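
For the ZkHosts case, a minimal usage sketch; the ZooKeeper address and refresh interval here are placeholders:

ZkHosts zkHosts = new ZkHosts("localhost:2181");
// Optional: refresh the broker-to-partition mapping every 30 seconds instead of the default 60
zkHosts.refreshFreqSecs = 30;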

KafkaConfig Class and SpoutConfig Subclass

Next, define an instance of the SpoutConfig subclass of KafkaConfig.

KafkaConfig contains several fields used to configure the behavior of a Kafka spout in a Storm topology; SpoutConfig extends KafkaConfig, adding fields for ZooKeeper connection information and for controlling behavior specific to KafkaSpout.

KafkaConfig implements the following constructors, each of which requires an implementation of the BrokerHosts interface:

public KafkaConfig(BrokerHosts hosts, String topic)
public KafkaConfig(BrokerHosts hosts, String topic, String clientId)
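
For example (the ZooKeeper address, topic name, and client ID below are placeholders):

BrokerHosts hosts = new ZkHosts("localhost:2181");
KafkaConfig kafkaConfig = new KafkaConfig(hosts, "test-topic", "test-client");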

KafkaConfig Parameters

hosts

An implementation of the BrokerHosts interface, which maps Kafka brokers to topic partitions (see "BrokerHosts Interface").

topic

Name of the Kafka topic that KafkaSpout will consume from.

clientId

Optional parameter used as part of the ZooKeeper path, specifying where the spout's current offset is stored.

KafkaConfig Fields

fetchSizeBytes

Number of bytes to attempt to fetch in one request to a Kafka server. The default is 1MB.

socketTimeoutMs

Number of milliseconds to wait before a socket fails an operation with a timeout. The default value is 10 seconds.

bufferSizeBytes

Buffer size (in bytes) for network requests. The default is 1MB.

scheme

The interface that specifies how a ByteBuffer from a Kafka topic is transformed into a Storm tuple.

The default, RawMultiScheme, emits each message payload as a single raw tuple with no additional processing.

The API provides several implementations of the Scheme interface, including:

  • storm.kafka.StringScheme

  • storm.kafka.KeyValueSchemeAsMultiScheme

  • storm.kafka.StringKeyValueScheme

Important

In Apache Storm versions prior to 1.0, MultiScheme methods accepted a byte[] parameter; in Storm 1.0 and later, MultiScheme and related scheme APIs accept a ByteBuffer instead.

As a result, Kafka spouts built with Storm versions earlier than 1.0 do not work with Storm 1.0 and later. When running topologies on Storm 1.0 or later, ensure that your version of storm-kafka is at least 1.0, and rebuild any pre-1.0 shaded topology .jar files that bundle storm-kafka classes against storm-kafka 1.0 before deploying them.
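
To illustrate the post-1.0 signature, here is a minimal sketch of a custom scheme; the class name and output field name are hypothetical, and the package names assume Storm 1.0:

import java.nio.ByteBuffer;
import java.nio.charset.StandardCharsets;
import java.util.List;

import org.apache.storm.spout.Scheme;
import org.apache.storm.tuple.Fields;
import org.apache.storm.tuple.Values;

// Hypothetical scheme: decodes each Kafka message payload as a UTF-8 string
public class PlainStringScheme implements Scheme {

    @Override
    public List<Object> deserialize(ByteBuffer ser) {
        // Storm 1.0 and later pass the payload as a ByteBuffer rather than byte[]
        byte[] bytes = new byte[ser.remaining()];
        ser.get(bytes);
        return new Values(new String(bytes, StandardCharsets.UTF_8));
    }

    @Override
    public Fields getOutputFields() {
        return new Fields("str");
    }
}

Wrap a single-tuple scheme in SchemeAsMultiScheme to assign it to the scheme field, as shown in the example at the end of this section.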

ignoreZkOffsets

To force the spout to ignore any consumer state information stored in ZooKeeper, set ignoreZkOffsets to true. If true, the spout always begins reading from the offset defined by startOffsetTime. For more information, see "How KafkaSpout stores offsets of a Kafka topic and recovers in case of failures."

startOffsetTime

Controls whether streaming for a topic starts from the beginning of the topic or whether only new messages are streamed. The following are valid values; a usage snippet follows the list:

  • kafka.api.OffsetRequest.EarliestTime() starts streaming from the beginning of the topic

  • kafka.api.OffsetRequest.LatestTime() streams only new messages
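
For example, to replay a topic from the beginning, assuming the spoutConfig instance from the example at the end of this section:

spoutConfig.startOffsetTime = kafka.api.OffsetRequest.EarliestTime();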

maxOffsetBehind

Specifies how far behind the current offset a failed tuple can be before the spout gives up on it. If a failed tuple's offset is more than maxOffsetBehind behind the spout's most recently emitted offset, the spout stops retrying the tuple. The default is Long.MAX_VALUE.

useStartOffsetTimeIfOffsetOutOfRange

Controls whether the spout falls back to the offset defined by startOffsetTime when Kafka reports an out-of-range offset. The default value is true.

metricsTimeBucketSizeInSecs

Controls the time interval at which Storm reports spout-related metrics. The default is 60 seconds.
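
KafkaConfig fields are public members assigned directly on the config object; a sketch, with values chosen only for illustration:

spoutConfig.fetchSizeBytes = 2 * 1024 * 1024;  // fetch up to 2 MB per request
spoutConfig.ignoreZkOffsets = true;            // always start from startOffsetTime
spoutConfig.metricsTimeBucketSizeInSecs = 30;  // report spout metrics every 30 seconds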

Instantiate SpoutConfig as follows:

public SpoutConfig(BrokerHosts hosts, String topic, String zkRoot, String nodeId)

SpoutConfig Parameters

hosts

An implementation of the BrokerHosts interface, which maps Kafka brokers to topic partitions (see "BrokerHosts Interface").

topic

Name of the Kafka topic that KafkaSpout will consume from.

zkRoot

Root directory in ZooKeeper under which KafkaSpout stores consumer offsets for this topology.

nodeId

ZooKeeper node under which KafkaSpout stores offsets for each topic partition. The node ID must be unique for each topology; the topology uses this path to recover from failures and from maintenance that requires killing the topology.

zkRoot and nodeId together determine the ZooKeeper path where Storm stores the Kafka offsets; you can find the offsets at zkRoot+"/"+nodeId.

To resume processing from where the last operation left off, use the same zkRoot and nodeId. To start from the beginning of the Kafka topic, set KafkaConfig.ignoreZkOffsets to true.
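
For example, with zkRoot set to /stormconsumers and nodeId set to my-topology (both values hypothetical), the spout stores and recovers its offsets under /stormconsumers/my-topology.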

Example

The following example illustrates the use of the KafkaSpout class and related interfaces:

BrokerHosts hosts = new ZkHosts(zkConnString);
SpoutConfig spoutConfig = new SpoutConfig(hosts, topicName, "/" + zkrootDir, node);
spoutConfig.scheme = new SchemeAsMultiScheme(new StringScheme());
KafkaSpout kafkaSpout = new KafkaSpout(spoutConfig);
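
The spout can then be registered with a topology like any other spout; a sketch, with the component ID and parallelism chosen only for illustration (TopologyBuilder is in org.apache.storm.topology in Storm 1.0):

TopologyBuilder builder = new TopologyBuilder();
builder.setSpout("kafka-spout", kafkaSpout, 1);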