Developing Apache Storm Applications
Also available as:
PDF

Implementing State Management

This subsection describes state management APIs and architecture for core Storm.

Stateful abstractions allow Storm bolts to store and retrieve the state of their computations. The state management framework automatically, periodically snapshots the state of bolts across a topology. There is a default in-memory-based state implementation, as well as a Redis-backed implementation that provides state persistence.

Bolts that require state to be managed and persisted by the framework should implement the IStatefulBolt interface or extend BaseStatefulBolt, and implement the void initState(T state) method. The initState method is invoked by the framework during bolt initialization. It contains the previously saved state of the bolt. Invoke initState after prepare, but before the bolt starts processing any tuples.

Currently the only supported State implementation is KeyValueState, which provides key-value mapping.

The following example describes how to implement a word count bolt that uses the key-value state abstraction for word counts:

public class WordCountBolt 
  extends BaseStatefulBolt<KeyValueState<String, Integer>> {
    private KeyValueState<String,Integer> wordCounts;
    ...
    @Override
    public void initState(KeyValueState<String,Integer> state) {
      wordCounts = state;
    }
    @Override
    public void execute(Tuple tuple) {
      String word = tuple.getString(0);
      Integer count = wordCounts.get(word, 0);
      count++;
      wordCounts.put(word, count);
      collector.emit(tuple, new Values(word, count));
      collector.ack(tuple);
    }
  ...
  }
  1. Extend the BaseStatefulBolt and type parameterize it with KeyValueState, to store the mapping of word to count.
  2. In the init method, initialize the bolt with its previously saved state: the word count last committed by the framework during the previous run.
  3. In the execute method, update the word count.

The framework periodically checkpoints the state of the bolt (default every second). The frequency can be changed by setting the storm config topology.state.checkpoint.interval.ms.

For state persistence, use a state provider that supports persistence by setting the topology.state.provider in the storm config. For example, for Redis based key-value state implementation, you can set topology.state.provider to org.apache.storm.redis.state.RedisKeyValueStateProvider in storm.yaml. The provider implementation .jar should be in the class path, which in this case means placing the storm-redis-*.jar in the extlib directory.

You can override state provider properties by setting topology.state.provider.config. For Redis state this is a JSON configuration with the following properties:

{
   "keyClass": "Optional fully qualified class name of the Key type.",
   "valueClass": "Optional fully qualified class name of the Value type.",
   "keySerializerClass": "Optional Key serializer implementation class.",
   "valueSerializerClass": "Optional Value Serializer implementation class.",
   "jedisPoolConfig": {
     "host": "localhost",
     "port": 6379,
     "timeout": 2000,
     "database": 0,
     "password": "xyz"
   }
}