Apache Storm Component Guide
Also available as:
PDF
loading table of contents...

KafkaBolt Integration: Core Storm APIs

To use KafkaBolt, create an instance of org.apache.storm.kafka.bolt.KafkaBolt and attach it as a component to your topology. The following example shows construction of a Kafka bolt using core Storm APIs, followed by details about the code:

TopologyBuilder builder = new TopologyBuilder();

Fields fields = new Fields("key", "message");
FixedBatchSpout spout = new FixedBatchSpout(fields, 4,
            new Values("storm", "1"),
            new Values("trident", "1"),
            new Values("needs", "1"),
            new Values("javadoc", "1")
);
spout.setCycle(true);
builder.setSpout("spout", spout, 5);
//set producer properties.
Properties props = new Properties();
props.put("bootstrap.servers", "localhost:9092");
props.put("acks", "1");
props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");

KafkaBolt bolt = new KafkaBolt()
.withProducerProperties(props)
.withTopicSelector(new DefaultTopicSelector("test"))
.withTupleToKafkaMapper(new FieldNameBasedTupleToKafkaMapper());
builder.setBolt("forwardToKafka", bolt, 8).shuffleGrouping("spout");

Config conf = new Config();

StormSubmitter.submitTopology("kafkaboltTest", conf, builder.createTopology());
  1. Instantiate a KafkaBolt.

    The core-storm API uses the storm.kafka.bolt.KafkaBolt class to instantiate a Kafka Bolt:

    KafkaBolt bolt = new KafkaBolt();
  2. Configure the KafkaBolt with a Tuple-to-Message Mapper.

    The KafkaBolt maps Storm tuples to Kafka messages. By default, KafkaBolt looks for fields named "key" and "message." Storm provides the storm.kafka.trident.mapper.FieldNameBasedTupleToKafkaMapper class to support this default behavior and provide backward compatibility. The class is used by both the core-storm and Trident APIs.

    KafkaBolt bolt = new KafkaBolt()
     .withTridentTupleToKafkaMapper(new FieldNameBasedTupleToKafkaMapper()); 
  3. Configure the Kafka Bolt with a Kafka Topic Selector.

    [Note]Note

    To ignore a message, return NULL from the getTopics() method.

    KafkaBolt bolt = new KafkaBolt().withTupleToKafkaMapper(new FieldNameBasedTupleToKafkaMapper())
     .withTopicSelector(new DefaultTopicSelector()); 

    If you need to write to multiple Kafka topics, you can write your own implementation of the KafkaTopicSelector interface .

  4. Configure the Kafka Bolt with Kafka Producer properties.

    You can specify producer properties in your Storm topology by calling KafkaBolt.withProducerProperties(). See the Apache Producer Configs documentation for more information.