Developing Apache Storm Applications

Checkpointing

Checkpointing is triggered by an internal checkpoint spout at the interval specified by topology.state.checkpoint.interval.ms. If the topology contains at least one IStatefulBolt, the topology builder adds the checkpoint spout automatically.
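As a minimal sketch of setting the interval, the following uses a plain java.util.Map in place of Storm's topology configuration, so that it runs without Storm on the classpath. The class name CheckpointIntervalConfig, the helper withCheckpointInterval, and the 5000 ms value are illustrative assumptions; only the key name comes from the text above. In a real topology the same key would go into the org.apache.storm.Config object passed at submission.

```java
import java.util.HashMap;
import java.util.Map;

// Sketch only: a plain map stands in for the topology configuration.
class CheckpointIntervalConfig {
    // Configuration key named in the text above.
    static final String CHECKPOINT_INTERVAL_KEY = "topology.state.checkpoint.interval.ms";

    // Hypothetical helper: returns a configuration map carrying the given interval.
    static Map<String, Object> withCheckpointInterval(long intervalMs) {
        if (intervalMs <= 0) {
            throw new IllegalArgumentException("checkpoint interval must be positive");
        }
        Map<String, Object> conf = new HashMap<>();
        conf.put(CHECKPOINT_INTERVAL_KEY, intervalMs);
        return conf;
    }

    public static void main(String[] args) {
        // 5000 ms is an arbitrary example value, not a recommendation.
        Map<String, Object> conf = withCheckpointInterval(5000L);
        System.out.println(conf);
    }
}
```

A shorter interval means less replay after a failure but more frequent state commits, so the value is a trade-off to tune per topology.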

For stateful topologies, the topology builder wraps each IStatefulBolt in a StatefulBoltExecutor, which handles the state commits when it receives checkpoint tuples. Non-stateful bolts are wrapped in a CheckpointTupleForwarder, which simply forwards the checkpoint tuples so that they can flow through the topology's directed acyclic graph (DAG).
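The wrapping described above can be illustrated with a small stand-alone model. This is a sketch under simplifying assumptions, not Storm's implementation: the types Tuple, Bolt, StatefulBolt, Forwarder, and StatefulExecutor are hypothetical stand-ins, downstream components are modeled as a list, and saving state is modeled as appending a snapshot string.

```java
import java.util.ArrayList;
import java.util.List;

// Illustrative model only; none of these types are Storm classes.
class CheckpointWrapperSketch {
    static final String CHECKPOINT_STREAM = "$checkpoint";

    // Minimal stand-in for a tuple: the stream it arrived on plus a payload.
    static class Tuple {
        final String streamId;
        final String value;
        Tuple(String streamId, String value) {
            this.streamId = streamId;
            this.value = value;
        }
    }

    interface Bolt { void execute(Tuple input); }

    // A bolt that owns state worth saving at a checkpoint.
    interface StatefulBolt extends Bolt { String snapshot(); }

    // Wrapper for non-stateful bolts: passes checkpoint tuples downstream untouched.
    static class Forwarder implements Bolt {
        private final Bolt delegate;
        private final List<Tuple> downstream;
        Forwarder(Bolt delegate, List<Tuple> downstream) {
            this.delegate = delegate;
            this.downstream = downstream;
        }
        @Override public void execute(Tuple input) {
            if (CHECKPOINT_STREAM.equals(input.streamId)) {
                downstream.add(input);
            } else {
                delegate.execute(input);
            }
        }
    }

    // Wrapper for stateful bolts: saves state on a checkpoint tuple, then forwards it.
    static class StatefulExecutor implements Bolt {
        private final StatefulBolt delegate;
        private final List<Tuple> downstream;
        final List<String> savedStates = new ArrayList<>();
        StatefulExecutor(StatefulBolt delegate, List<Tuple> downstream) {
            this.delegate = delegate;
            this.downstream = downstream;
        }
        @Override public void execute(Tuple input) {
            if (CHECKPOINT_STREAM.equals(input.streamId)) {
                savedStates.add(delegate.snapshot());
                downstream.add(input);
            } else {
                delegate.execute(input);
            }
        }
    }

    // Mirrors the builder's choice: the wrapper depends on whether the bolt is stateful.
    static Bolt wrap(Bolt bolt, List<Tuple> downstream) {
        if (bolt instanceof StatefulBolt) {
            return new StatefulExecutor((StatefulBolt) bolt, downstream);
        }
        return new Forwarder(bolt, downstream);
    }
}
```

In both wrappers ordinary tuples reach the user's bolt unchanged; only tuples on the checkpoint stream are intercepted, which is why user code does not need to know about checkpointing.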

Checkpoint tuples flow through a separate internal stream called $checkpoint. The topology builder wires the checkpoint stream across the whole topology, with the checkpoint spout at the root.



At each checkpoint interval, the checkpoint spout emits a checkpoint tuple. When a bolt receives a checkpoint tuple, it saves its state and forwards the tuple to the next component. Each bolt waits for the checkpoint to arrive on all of its input streams before it saves its state, so the saved state is consistent across the topology. Once the checkpoint spout receives acks from all bolts, the state commit is complete and the checkpoint spout records the transaction as committed.
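The rule that a bolt waits for the checkpoint on all of its inputs before saving can be sketched as follows. This is a simplified model, not Storm's code: the class CheckpointAlignmentSketch, the string input identifiers, and the running-sum state are assumptions made for illustration, and the sketch ignores acking, buffering, and failure handling.

```java
import java.util.Arrays;
import java.util.HashSet;
import java.util.Set;

// Illustrative model of a bolt with several inputs; not a Storm class.
class CheckpointAlignmentSketch {
    private final Set<String> inputs;                    // all upstream inputs of this bolt
    private final Set<String> arrived = new HashSet<>(); // inputs whose checkpoint has arrived
    private long count;                                  // in-memory state (a running sum)
    private long savedCount = -1;                        // last saved state, -1 if none yet
    private int forwardedCheckpoints;                    // checkpoints passed downstream

    CheckpointAlignmentSketch(Set<String> inputs) {
        if (inputs.isEmpty()) {
            throw new IllegalArgumentException("a bolt needs at least one input");
        }
        this.inputs = new HashSet<>(inputs);
    }

    // Ordinary tuple: update the in-memory state only.
    void onData(long value) {
        count += value;
    }

    // Checkpoint tuple from one input. Returns true if this call completed the
    // checkpoint, meaning the state was saved and the checkpoint forwarded.
    boolean onCheckpoint(String inputId) {
        if (!inputs.contains(inputId)) {
            throw new IllegalArgumentException("unknown input: " + inputId);
        }
        arrived.add(inputId);
        if (!arrived.containsAll(inputs)) {
            return false; // still waiting for the checkpoint on other inputs
        }
        savedCount = count;     // save state once every input has delivered the checkpoint
        forwardedCheckpoints++; // then forward a single checkpoint downstream
        arrived.clear();        // ready for the next checkpoint interval
        return true;
    }

    long savedCount() { return savedCount; }
    int forwardedCheckpoints() { return forwardedCheckpoints; }

    public static void main(String[] args) {
        CheckpointAlignmentSketch bolt =
            new CheckpointAlignmentSketch(new HashSet<>(Arrays.asList("left", "right")));
        bolt.onData(2);
        bolt.onData(3);
        System.out.println(bolt.onCheckpoint("left"));
        System.out.println(bolt.onCheckpoint("right"));
        System.out.println(bolt.savedCount());
    }
}
```

Forwarding exactly one checkpoint per completed round means each downstream bolt sees the checkpoint once per upstream input, which lets the same rule apply at every level of the DAG.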

This checkpoint mechanism builds on Storm's existing acking mechanism to replay the tuples. It uses concepts from the asynchronous snapshot algorithm used by Flink, and from the Chandy-Lamport algorithm for distributed snapshots. Internally, checkpointing uses a three-phase commit protocol with a prepare phase and a commit phase, so that the state across the topology is saved in a consistent and atomic manner.
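The prepare and commit phases named above can be sketched with a toy coordinator. This is an illustrative model only: the classes PrepareCommitSketch and Participant, the integer state, the failPrepare switch, and the rollback step are all assumptions introduced here, and the sketch does not reproduce Storm's internal protocol or its remaining phase.

```java
import java.util.Arrays;
import java.util.List;

// Toy prepare/commit coordinator; not Storm's implementation.
class PrepareCommitSketch {
    static class Participant {
        int live;                 // working state, changed by normal processing
        int committed;            // last committed state
        private Integer prepared; // staged snapshot, null when nothing is staged
        boolean failPrepare;      // hypothetical switch to simulate a failed prepare

        // Prepare phase: stage a snapshot of the working state.
        boolean prepare() {
            if (failPrepare) {
                return false;
            }
            prepared = live;
            return true;
        }

        // Commit phase: promote the staged snapshot to the committed state.
        void commit() {
            if (prepared != null) {
                committed = prepared;
                prepared = null;
            }
        }

        // Assumed recovery step: drop the staged snapshot and restore the last commit.
        void rollback() {
            prepared = null;
            live = committed;
        }
    }

    // Commits only if every participant prepared; otherwise rolls everyone back.
    static boolean checkpoint(List<Participant> participants) {
        for (Participant p : participants) {
            if (!p.prepare()) {
                for (Participant q : participants) {
                    q.rollback();
                }
                return false;
            }
        }
        for (Participant p : participants) {
            p.commit();
        }
        return true;
    }

    public static void main(String[] args) {
        Participant a = new Participant();
        Participant b = new Participant();
        a.live = 3;
        b.live = 7;
        System.out.println(checkpoint(Arrays.asList(a, b)));
        System.out.println(a.committed + " " + b.committed);
    }
}
```

Separating prepare from commit is what makes the save atomic across participants in this model: no participant's committed state changes unless every participant has first staged its snapshot successfully.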