Using Apache Storm to Move Data
Also available as:

KafkaBolt Integration: Core Storm APIs

To use KafkaBolt, create an instance of org.apache.storm.kafka.bolt.KafkaBolt and attach it as a component to your topology.

The following example shows construction of a Kafka bolt using core Storm APIs, followed by details about the code:

TopologyBuilder builder = new TopologyBuilder();

Fields fields = new Fields("key", "message");
FixedBatchSpout spout = new FixedBatchSpout(fields, 4,
            new Values("storm", "1"),
            new Values("trident", "1"),
            new Values("needs", "1"),
            new Values("javadoc", "1")
builder.setSpout("spout", spout, 5);
//set producer properties.
Properties props = new Properties();
props.put("bootstrap.servers", "localhost:9092");
props.put("acks", "1");
props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");

KafkaBolt bolt = new KafkaBolt()
.withTopicSelector(new DefaultTopicSelector("test"))
.withTupleToKafkaMapper(new FieldNameBasedTupleToKafkaMapper());
builder.setBolt("forwardToKafka", bolt, 8).shuffleGrouping("spout");

Config conf = new Config();

StormSubmitter.submitTopology("kafkaboltTest", conf, builder.createTopology());
  1. Instantiate a KafkaBolt.

    The core-storm API uses the storm.kafka.bolt.KafkaBolt class to instantiate a Kafka Bolt:

    KafkaBolt bolt = new KafkaBolt();
  2. Configure the KafkaBolt with a Tuple-to-Message Mapper.

    The KafkaBolt maps Storm tuples to Kafka messages. By default, KafkaBolt looks for fields named "key" and "message." Storm provides the storm.kafka.trident.mapper.FieldNameBasedTupleToKafkaMapper class to support this default behavior and provide backward compatibility. The class is used by both the core-storm and Trident APIs.

    KafkaBolt bolt = new KafkaBolt()
     .withTridentTupleToKafkaMapper(new FieldNameBasedTupleToKafkaMapper()); 
  3. Configure the Kafka Bolt with a Kafka Topic Selector.

    To ignore a message, return NULL from the getTopics() method.

    KafkaBolt bolt = new KafkaBolt().withTupleToKafkaMapper(new FieldNameBasedTupleToKafkaMapper())
     .withTopicSelector(new DefaultTopicSelector()); 

    If you need to write to multiple Kafka topics, you can write your own implementation of the KafkaTopicSelector interface.

  4. Configure the Kafka Bolt with Kafka Producer properties.

    You can specify producer properties in your Storm topology by calling KafkaBolt.withProducerProperties(). See the Apache Producer Configs documentation for more information.