Apache Storm Component Guide
Also available as:
PDF
loading table of contents...

Spouts

All spouts must implement the org.apache.storm.topology.IRichSpout interface from the core-storm API. BaseRichSpout is the most basic implementation, but there are several others, including ClojureSpout, DRPCSpout, and FeederSpout. In addition, Hortonworks provides a Kafka spout to ingest data from a Kafka cluster. The following example, RandomSentenceSpout, is included with the storm-starter connector installed with Storm at /usr/lib/storm/contrib/storm-starter.

package storm.starter.spout;

import org.apache.storm.spout.SpoutOutputCollector;
import org.apache.storm.task.TopologyContext;
import org.apache.storm.topology.OutputFieldsDeclarer;
import org.apache.storm.topology.base.BaseRichSpout;
import org.apache.storm.tuple.Fields;
import org.apache.storm.tuple.Values;
import org.apache.storm.utils.Utils;

import java.util.Map;
import java.util.Random;

public class RandomSentenceSpout extends BaseRichSpout {
 SpoutOutputCollector _collector;
 Random _rand;


 @Override
 public void open(Map conf, TopologyContext context, SpoutOutputCollector collector) {
   _collector = collector;
   _rand = new Random();
 }

 @Override
 public void nextTuple() {
   Utils.sleep(100);
   String[] sentences = new String[]{ "the cow jumped over the moon", "an apple a day keeps the doctor away", "four score and seven years ago", "snow white and the seven dwarfs", "i am at two with nature" };
   String sentence = sentences[_rand.nextInt(sentences.length)];
   _collector.emit(new Values(sentence));
 }

 @Override
 public void ack(Object id) {
 }

 @Override
 public void fail(Object id) {
 }

 @Override
 public void declareOutputFields(OutputFieldsDeclarer declarer) {
   declarer.declare(new Fields("word"));
 }

}