Developing Apache Storm Applications

Understanding Tuple Timestamps and Out-of-Order Tuples

By default, window calculations are based on the processing timestamp: the timestamp tracked for each tuple in a window is the time at which the bolt processes that tuple.

Storm can also track windows by source-generated timestamp. This can be useful for processing events based on the time that an event occurs, such as log entries with timestamps.

The following method specifies the tuple field that holds a source-generated timestamp. Storm reads the value of fieldName from each incoming tuple and uses it in windowing calculations.

When this option is specified, all tuples are expected to contain the timestamp field.

/**
 * Specify the tuple field that represents the timestamp as a long value. If this field
 * is not present in the incoming tuple, an {@link IllegalArgumentException} will be thrown.
 *
 * @param fieldName the name of the field that contains the timestamp
 */
public BaseWindowedBolt withTimestampField(String fieldName)  

Note: If the timestamp field is not present in the tuple, an exception is thrown and the topology terminates. To resolve this issue, remove the erroneous tuple manually from the source (such as Kafka), and then restart the topology.
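
As a configuration sketch, the fragment below shows one way to apply withTimestampField when wiring a windowed bolt into a topology. It assumes the Storm client library (org.apache.storm packages) is on the classpath. The class EventCountBolt, the helper method wire, the component names "eventCount" and "logSpout", the field name "eventTime", and the 10-second tumbling window are all hypothetical choices made for illustration.

```java
import org.apache.storm.topology.TopologyBuilder;
import org.apache.storm.topology.base.BaseWindowedBolt;
import org.apache.storm.topology.base.BaseWindowedBolt.Duration;
import org.apache.storm.windowing.TupleWindow;

// Hypothetical bolt: reports how many tuples fall into each window.
public class EventCountBolt extends BaseWindowedBolt {
    @Override
    public void execute(TupleWindow window) {
        // window.get() returns the tuples that belong to the current window.
        int count = window.get().size();
        System.out.println("Tuples in window: " + count);
    }

    // Hypothetical helper: attaches the bolt to a spout that is
    // registered elsewhere under the assumed name "logSpout".
    public static void wire(TopologyBuilder builder) {
        builder.setBolt("eventCount",
                new EventCountBolt()
                        .withTumblingWindow(Duration.seconds(10))
                        // Every incoming tuple must carry a long value
                        // in the (assumed) "eventTime" field.
                        .withTimestampField("eventTime"),
                1)
               .shuffleGrouping("logSpout");
    }
}
```

Because a tuple without the timestamp field stops the topology, it may be worth confirming that the upstream spout or bolt declares and always populates this field before enabling the option.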

In addition to using the timestamp field to trigger calculations, you can specify a lag parameter that sets the maximum amount of time by which tuple timestamps can be out of order:

/**
 * Specify the maximum time lag of the tuple timestamp in millis. The tuple timestamps
 * cannot be out of order by more than this amount.
 *
 * @param duration the max lag duration
 */
public BaseWindowedBolt withLag(Duration duration)

For example, if the lag is five seconds and tuple t1 arrives with timestamp 06:00:05, no tuple is expected to arrive with a timestamp earlier than 06:00:00. If a tuple with timestamp 05:59:59 arrives after t1 and the window has moved past t1, the tuple is considered late. Late tuples are not processed; they are logged in the worker log files at the INFO level.
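
The lag rule can be illustrated with a small standalone model. This is a simplified sketch, not Storm's implementation: the class LagModel and its methods are hypothetical, and the explicit advanceWatermark call is an assumed stand-in for the point at which the window has moved past t1. The model accepts a tuple unless its timestamp is older than the highest accepted timestamp minus the lag.

```java
import java.time.Duration;
import java.time.LocalTime;

// Simplified, hypothetical model of lag handling; not Storm's implementation.
class LagModel {
    private final long lagMillis;
    private long maxTimestampSeen = Long.MIN_VALUE; // highest event time accepted so far
    private long watermark = Long.MIN_VALUE;        // tuples older than this are late

    LagModel(Duration lag) {
        this.lagMillis = lag.toMillis();
    }

    // Returns true if the tuple is accepted, false if it is treated as late.
    boolean offer(long timestampMillis) {
        if (timestampMillis < watermark) {
            return false;
        }
        maxTimestampSeen = Math.max(maxTimestampSeen, timestampMillis);
        return true;
    }

    // Assumed stand-in for "the window has moved past t1":
    // the cutoff becomes the highest accepted timestamp minus the lag.
    void advanceWatermark() {
        if (maxTimestampSeen != Long.MIN_VALUE) {
            watermark = Math.max(watermark, maxTimestampSeen - lagMillis);
        }
    }

    // Converts an "HH:mm:ss" time of day to milliseconds since midnight.
    static long millisOfDay(String hhmmss) {
        return LocalTime.parse(hhmmss).toNanoOfDay() / 1_000_000L;
    }
}
```

With a lag of five seconds, offering t1 at 06:00:05 and then advancing the watermark places the cutoff at 06:00:00 in this model. A later tuple stamped 05:59:59 is rejected as late, while tuples stamped 06:00:00 or 06:00:02 are still accepted.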