Developing Apache Storm Applications
Also available as:
PDF

Implementing Windowing in Core Storm

If you want to use windowing in a bolt, you can implement the bolt interface IWindowedBolt:

public interface IWindowedBolt extends IComponent {
   void prepare(Map stormConf, TopologyContext context, OutputCollector collector);
   /**
     * Process tuples falling within the window and optionally emit 
     * new tuples based on the tuples in the input window.
     */
   void execute(TupleWindow inputWindow);
   void cleanup(); 
}

Every time the window slides (the sliding interval elapses), Storm invokes the execute method.

You can use the TupleWindow parameter to access current tuples in the window, expired tuples, and tuples added since the window was last computed. You can use this information to optimize the efficiency of windowing computations.

Bolts that need windowing support would typically extend BaseWindowedBolt, which has APIs for specifying type of window, window length, and sliding interval:

public class SlidingWindowBolt extends BaseWindowedBolt {
   private OutputCollector collector;
   @Override
   public void prepare(Map stormConf, TopologyContext context, OutputCollector collector){
       this.collector = collector;
   }
   @Override
   public void execute(TupleWindow inputWindow) {
     for(Tuple tuple: inputWindow.get()) {
       // do the windowing computation
          ...
     }
     collector.emit(new Values(computedValue));
   }
}

You can specify window length and sliding interval as a count of the number of tuples, a duration of time, or both. The following window configuration settings are supported:

/* 
 * Tuple count based sliding window that slides after slidingInterval number of tuples 
 */
withWindow(Count windowLength, Count slidingInterval)

/* 
 * Tuple count based window that slides with every incoming tuple 
 */
withWindow(Count windowLength)

/* 
 * Tuple count based sliding window that slides after slidingInterval time duration 
 */
withWindow(Count windowLength, Duration slidingInterval)

/* 
 * Time duration based sliding window that slides after slidingInterval time duration 
 */
withWindow(Duration windowLength, Duration slidingInterval)

/* 
 * Time duration based window that slides with every incoming tuple 
 */
withWindow(Duration windowLength)

/* 
 * Time duration based sliding window that slides after slidingInterval number of tuples 
 */
withWindow(Duration windowLength, Count slidingInterval)

/* 
 * Count based tumbling window that tumbles after the specified count of tuples 
 */
withTumblingWindow(BaseWindowedBolt.Count count)

/* 
 * Time duration based tumbling window that tumbles after the specified time duration 
 */
withTumblingWindow(BaseWindowedBolt.Duration duration)   

To add windowed bolts to the topology, use the TopologyBuilder (as you would with non-windowed bolts):

TopologyBuilder builder = new TopologyBuilder();
/*
 * A windowed bolt that computes sum over a sliding window with window length of
 * 30 events that slides after every 10 events.
 */
builder.setBolt("sum", new WindowSumBolt().withWindow(Count.of(30), Count.of(10)), 1)
       .shuffleGrouping("spout");    

For a sample topology that shows how to use the APIs to compute a sliding window sum and a tumbling window average, see the SlidingWindowTopology.java file in the storm-starter GitHub directory.

For examples of tumbling and sliding windows, see the Apache document Windowing Support in Core Storm.

The following subsections describe additional aspects of windowing calculations: timestamps, watermarks, guarantees, and state management.