Developing Apache Storm Applications

Aggregator

The Aggregator interface represents the most general form of aggregation operations:

public interface Aggregator<T> extends Operation {
    T init(Object batchId, TridentCollector collector);
    void aggregate(T val, TridentTuple tuple, TridentCollector collector);
    void complete(T val, TridentCollector collector);
}

A key difference between Aggregator and Trident's other aggregation interfaces, such as CombinerAggregator and ReducerAggregator, is that a TridentCollector is passed to every method. An Aggregator can therefore emit any number of tuples at any point during execution: in init(), in aggregate(), or in complete().
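To make the emit-anytime behavior concrete, here is a minimal sketch that does not depend on Storm. `Collector`, `SimpleAggregator`, `FirstN`, and `FirstNDemo` are hypothetical stand-ins that only mirror the shape of the interface above (tuples are plain `List<Object>` values); they are not Trident classes. `FirstN` passes through the first n tuples from `aggregate()` as they arrive and emits a summary row from `complete()`.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

// Hypothetical stand-ins for Trident's types (illustration only, not the Storm API).
// A "tuple" is a List<Object>; the collector just records each emitted value list.
interface Collector {
    void emit(List<Object> values);
}

interface SimpleAggregator<T> {
    T init(Object batchId, Collector collector);
    void aggregate(T state, List<Object> tuple, Collector collector);
    void complete(T state, Collector collector);
}

// Hypothetical aggregator: emits the first n tuples of a batch as soon as
// they arrive (from aggregate()), then a summary count from complete().
class FirstN implements SimpleAggregator<FirstN.State> {
    static class State {
        int seen = 0;
    }

    private final int n;

    FirstN(int n) {
        this.n = n;
    }

    public State init(Object batchId, Collector collector) {
        return new State();
    }

    public void aggregate(State state, List<Object> tuple, Collector collector) {
        state.seen++;
        if (state.seen <= n) {
            collector.emit(tuple); // emitted mid-batch, before complete() runs
        }
    }

    public void complete(State state, Collector collector) {
        collector.emit(Arrays.<Object>asList("total", state.seen));
    }
}

class FirstNDemo {
    // Drives one batch through the aggregator and returns everything emitted.
    static List<List<Object>> run(List<List<Object>> batch, int n) {
        List<List<Object>> emitted = new ArrayList<>();
        Collector collector = emitted::add;
        FirstN agg = new FirstN(n);
        FirstN.State state = agg.init("batch-1", collector);
        for (List<Object> tuple : batch) {
            agg.aggregate(state, tuple, collector);
        }
        agg.complete(state, collector);
        return emitted;
    }

    public static void main(String[] args) {
        List<List<Object>> batch = Arrays.asList(
            Arrays.<Object>asList("a"), Arrays.<Object>asList("b"), Arrays.<Object>asList("c"));
        System.out.println(run(batch, 2));
    }
}
```

Running `FirstNDemo.main` prints `[[a], [b], [total, 3]]`: the first two rows were emitted during `aggregate()`, before `complete()` produced the summary.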

Storm executes Aggregator instances as follows:

  1. Storm calls init() before processing the batch. Its return value, an object of type T, represents the initial state of the aggregation.

    The same T object is passed to every subsequent aggregate() and complete() call.

  2. Storm calls aggregate() once for each tuple in the batch. This method can update the state and optionally emit tuples.
  3. Storm calls complete() after all tuples in the batch have been processed, passing the final state of the aggregation.
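The three steps can be traced with a small driver. This is a simplified sketch, not Storm's executor: `BatchAggregator`, `TracingAggregator`, and `LifecycleDemo` are hypothetical, the collector parameter is omitted, and tuples are plain strings. The driver runs two batches so the trace shows `init()` creating fresh state for each one.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

// Hypothetical, simplified stand-in for Trident's Aggregator (no collector,
// String tuples). Illustration only.
interface BatchAggregator<T> {
    T init(Object batchId);
    void aggregate(T state, String tuple);
    void complete(T state);
}

// Hypothetical aggregator that records every lifecycle call in a log.
class TracingAggregator implements BatchAggregator<List<String>> {
    final List<String> log = new ArrayList<>();

    public List<String> init(Object batchId) {
        log.add("init(" + batchId + ")");
        return new ArrayList<>(); // fresh state for every batch
    }

    public void aggregate(List<String> state, String tuple) {
        state.add(tuple);
        log.add("aggregate(" + tuple + ")");
    }

    public void complete(List<String> state) {
        log.add("complete(" + state + ")");
    }
}

class LifecycleDemo {
    // Mimics the documented order: init() before the batch, aggregate()
    // once per tuple, then complete() with the final state.
    static <T> void runBatch(BatchAggregator<T> agg, Object batchId, List<String> batch) {
        T state = agg.init(batchId);
        for (String tuple : batch) {
            agg.aggregate(state, tuple);
        }
        agg.complete(state);
    }

    public static void main(String[] args) {
        TracingAggregator agg = new TracingAggregator();
        runBatch(agg, 1, Arrays.asList("x", "y"));
        runBatch(agg, 2, Arrays.asList("z"));
        agg.log.forEach(System.out::println);
    }
}
```

The printed trace is `init(1)`, `aggregate(x)`, `aggregate(y)`, `complete([x, y])`, `init(2)`, `aggregate(z)`, `complete([z])`; nothing from the first batch leaks into the second batch's state.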

The word count example uses the built-in Count class that implements the CombinerAggregator interface. The Count class could also be implemented as an Aggregator:

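// Package names assume Storm 1.0 or later (org.apache.storm.*).
import org.apache.storm.trident.operation.BaseAggregator;
import org.apache.storm.trident.operation.TridentCollector;
import org.apache.storm.trident.tuple.TridentTuple;
import org.apache.storm.tuple.Values;

// The nested state class must be qualified (Count.CountState) in the
// extends clause, because the simple name is not in scope there.
public class Count extends BaseAggregator<Count.CountState> {
    // Per-batch aggregation state.
    static class CountState {
        long count = 0;
    }

    // Called before the batch is processed: start counting from zero.
    public CountState init(Object batchId, TridentCollector collector) {
        return new CountState();
    }

    // Called once per tuple: increment the running count.
    public void aggregate(CountState state, TridentTuple tuple, TridentCollector collector) {
        state.count += 1;
    }

    // Called after the last tuple: emit the final count as a single field.
    public void complete(CountState state, TridentCollector collector) {
        collector.emit(new Values(state.count));
    }
}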
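When a stream is grouped, Trident runs an aggregator within each group rather than over the whole batch, so Count produces one count per group. The sketch below imitates that outside Storm with the same caveats as any stand-in: `Collector`, `SimpleAggregator`, `CountSketch`, and `GroupedCountDemo` are hypothetical, the grouping is done by hand, and prefixing each emitted row with its word is this sketch's choice for readable output, not a statement about Trident's output schema.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

// Hypothetical stand-ins for Trident's types (illustration only, not the Storm API).
interface Collector {
    void emit(List<Object> values);
}

interface SimpleAggregator<T> {
    T init(Object batchId, Collector collector);
    void aggregate(T state, String tuple, Collector collector);
    void complete(T state, Collector collector);
}

// Same counting logic as the Count aggregator, written against the stand-ins.
class CountSketch implements SimpleAggregator<CountSketch.CountState> {
    static class CountState {
        long count = 0;
    }

    public CountState init(Object batchId, Collector collector) {
        return new CountState();
    }

    public void aggregate(CountState state, String tuple, Collector collector) {
        state.count += 1;
    }

    public void complete(CountState state, Collector collector) {
        collector.emit(Arrays.<Object>asList(state.count));
    }
}

class GroupedCountDemo {
    // Hypothetical driver: splits the batch by word, then runs the full
    // init/aggregate/complete cycle once per group. Each emitted row is
    // prefixed with its group key (a choice of this sketch).
    static List<List<Object>> countWords(Object batchId, List<String> words) {
        Map<String, List<String>> groups = new LinkedHashMap<>();
        for (String w : words) {
            groups.computeIfAbsent(w, k -> new ArrayList<>()).add(w);
        }
        List<List<Object>> rows = new ArrayList<>();
        CountSketch agg = new CountSketch();
        for (Map.Entry<String, List<String>> group : groups.entrySet()) {
            String key = group.getKey();
            Collector collector = values -> {
                List<Object> row = new ArrayList<>();
                row.add(key);
                row.addAll(values);
                rows.add(row);
            };
            CountSketch.CountState state = agg.init(batchId, collector);
            for (String tuple : group.getValue()) {
                agg.aggregate(state, tuple, collector);
            }
            agg.complete(state, collector);
        }
        return rows;
    }

    public static void main(String[] args) {
        System.out.println(countWords(1, Arrays.asList("the", "cat", "the")));
    }
}
```

`GroupedCountDemo.main` prints `[[the, 2], [cat, 1]]`; the `LinkedHashMap` keeps groups in first-seen order, and each group gets its own `CountState` from `init()`.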