Using Apache Storm to Move Data

KafkaBolt Integration: Trident APIs

To use KafkaBolt functionality with Trident, create an instance of org.apache.storm.kafka.trident.TridentKafkaStateFactory, which builds the org.apache.storm.kafka.trident.TridentKafkaState objects that write to Kafka, and attach it to your topology with partitionPersist.

The following example shows construction of a Kafka bolt using Trident APIs, followed by details about the code:

import java.util.Properties;

import org.apache.storm.Config;
import org.apache.storm.StormSubmitter;
import org.apache.storm.kafka.trident.TridentKafkaStateFactory;
import org.apache.storm.kafka.trident.TridentKafkaUpdater;
import org.apache.storm.kafka.trident.mapper.FieldNameBasedTupleToKafkaMapper;
import org.apache.storm.kafka.trident.selector.DefaultTopicSelector;
import org.apache.storm.trident.Stream;
import org.apache.storm.trident.TridentTopology;
import org.apache.storm.trident.testing.FixedBatchSpout;
import org.apache.storm.tuple.Fields;
import org.apache.storm.tuple.Values;

// Define a test spout that cycles through four fixed tuples.
Fields fields = new Fields("word", "count");
FixedBatchSpout spout = new FixedBatchSpout(fields, 4,
        new Values("storm", "1"),
        new Values("trident", "1"),
        new Values("needs", "1"),
        new Values("javadoc", "1")
);
spout.setCycle(true);

TridentTopology topology = new TridentTopology();
Stream stream = topology.newStream("spout1", spout);

// Set producer properties.
Properties props = new Properties();
props.put("bootstrap.servers", "localhost:9092");
props.put("acks", "1");
props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");

// Build the state factory: producer properties, target topic, and tuple-to-message mapping.
TridentKafkaStateFactory stateFactory = new TridentKafkaStateFactory()
        .withProducerProperties(props)
        .withKafkaTopicSelector(new DefaultTopicSelector("test"))
        .withTridentTupleToKafkaMapper(new FieldNameBasedTupleToKafkaMapper("word", "count"));
stream.partitionPersist(stateFactory, fields, new TridentKafkaUpdater(), new Fields());

Config conf = new Config();
StormSubmitter.submitTopology("kafkaTridentTest", conf, topology.build());
  1. Instantiate a KafkaBolt.

    The Trident API uses the org.apache.storm.kafka.trident.TridentKafkaStateFactory class, an implementation of Trident's StateFactory interface, which constructs the TridentKafkaState instances that write to Kafka.

    TridentTopology topology = new TridentTopology();
    Stream stream = topology.newStream("spout1", spout);
    TridentKafkaStateFactory stateFactory = new TridentKafkaStateFactory();
    stream.partitionPersist(stateFactory, fields, new TridentKafkaUpdater(), new Fields());
  2. Configure the KafkaBolt with a Tuple-to-Message Mapper.

    The KafkaBolt must map Storm tuples to Kafka messages. By default, KafkaBolt looks for fields named "key" and "message." Storm provides the org.apache.storm.kafka.trident.mapper.FieldNameBasedTupleToKafkaMapper class to support this default behavior and provide backward compatibility. The class is used by both the core-storm and Trident APIs.

    TridentKafkaStateFactory stateFactory = new TridentKafkaStateFactory()
     .withTridentTupleToKafkaMapper(new FieldNameBasedTupleToKafkaMapper("word", "count")); 

    In the Trident API you must specify the field names for the Storm tuple key and the Kafka message explicitly: the Trident version of FieldNameBasedTupleToKafkaMapper, used with TridentKafkaState, does not provide a default constructor.

    However, some Kafka bolts may require more than two fields. You can write your own implementation of the TupleToKafkaMapper and TridentTupleToKafkaMapper interfaces to customize the mapping of Storm tuples to Kafka messages. Both interfaces define two methods:

    K getKeyFromTuple(Tuple/TridentTuple tuple);
    V getMessageFromTuple(Tuple/TridentTuple tuple);
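
    As a sketch, a custom Trident mapper might combine two tuple fields into the message value. The class name and the choice of "word" and "count" fields below are illustrative assumptions, not part of the Storm API:

    import org.apache.storm.kafka.trident.mapper.TridentTupleToKafkaMapper;
    import org.apache.storm.trident.tuple.TridentTuple;

    // Hypothetical mapper: uses the "word" field as the Kafka key and
    // joins "word" and "count" into a single message value.
    public class WordCountTupleToKafkaMapper
            implements TridentTupleToKafkaMapper<String, String> {

        @Override
        public String getKeyFromTuple(TridentTuple tuple) {
            return tuple.getStringByField("word");
        }

        @Override
        public String getMessageFromTuple(TridentTuple tuple) {
            return tuple.getStringByField("word") + ":" + tuple.getStringByField("count");
        }
    }

    Pass an instance of such a mapper to TridentKafkaStateFactory.withTridentTupleToKafkaMapper() in place of FieldNameBasedTupleToKafkaMapper.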
  3. Configure the KafkaBolt with a Kafka Topic Selector.
    Note

    To ignore a message, return NULL from the getTopic() method.

    TridentKafkaStateFactory stateFactory = new TridentKafkaStateFactory()
     .withKafkaTopicSelector(new DefaultTopicSelector("test"))
     .withTridentTupleToKafkaMapper(new FieldNameBasedTupleToKafkaMapper("word", "count")); 

    If you need to write to multiple Kafka topics, you can write your own implementation of the KafkaTopicSelector interface; for example:

    public interface KafkaTopicSelector {
        String getTopic(Tuple/TridentTuple tuple);
    }
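
    For instance, a selector could route each tuple to a topic derived from one of its fields. The class name and the per-word topic scheme below are illustrative assumptions:

    import org.apache.storm.kafka.trident.selector.KafkaTopicSelector;
    import org.apache.storm.trident.tuple.TridentTuple;

    // Hypothetical selector: routes each tuple to a per-word topic, and
    // drops tuples with a missing "word" field by returning null.
    public class FieldBasedTopicSelector implements KafkaTopicSelector {

        @Override
        public String getTopic(TridentTuple tuple) {
            String word = tuple.getStringByField("word");
            return (word == null || word.isEmpty()) ? null : "words-" + word;
        }
    }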
  4. Configure the KafkaBolt with Kafka Producer properties.

    You can specify producer properties in your Storm topology by calling TridentKafkaStateFactory.withProducerProperties(). See the Apache Kafka Producer Configs documentation for more information.
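
    Building on the properties shown in the full example above, additional Kafka producer settings can be passed the same way. The tuning values below are illustrative, not recommendations:

    import java.util.Properties;
    import org.apache.storm.kafka.trident.TridentKafkaStateFactory;

    Properties props = new Properties();
    props.put("bootstrap.servers", "localhost:9092");
    props.put("acks", "1");
    props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
    props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");
    // Optional tuning settings; see the Kafka Producer Configs documentation.
    props.put("retries", "3");
    props.put("compression.type", "snappy");

    TridentKafkaStateFactory stateFactory = new TridentKafkaStateFactory()
            .withProducerProperties(props);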