When processing tuples using a timestamp field, Storm computes watermarks based on the timestamp of an incoming tuple. Each watermark is the minimum of the latest tuple timestamps (minus the lag) across all the input streams. At a higher level, this is similar to the watermark concept used by Google's MillWheel for tracking event-based timestamps.
Periodically (by default, every second), Storm emits watermark timestamps, which are used as the “clock tick” for the window calculation when tuple-based timestamps are in use. You can change the interval at which watermarks are emitted by using the following API:
/** * Specify the watermark event generation interval. Watermark events * are used to track the progress of time * * @param interval the interval at which watermark events are generated */ public BaseWindowedBolt withWatermarkInterval(Duration interval)
When a watermark is received, all windows up to that timestamp are evaluated.
For example, consider tuple timestamp-based processing with the following window parameters:
Window length equals 20 seconds, sliding interval equals 10 seconds, watermark emit frequency equals 1 second, max lag equals 5 seconds.
Current timestamp equals 09:00:00.
Tuples e1(6:00:03), e2(6:00:05), e3(6:00:07), e4(6:00:18), e5(6:00:26), e6(6:00:36) arrive between 9:00:00 and 9:00:01.
At time t equals 09:00:01, the following actions occur:
- Storm emits watermark w1 at 6:00:31, because no tuples earlier than 6:00:31 can arrive.
- Three windows are evaluated.
The first window ending timestamp (06:00:10) is computed by taking the earliest event timestamp (06:00:03) and computing the duration based on the sliding interval (10 seconds):
5:59:50 to 06:00:10 with tuples e1, e2, e3
6:00:00 to 06:00:20 with tuples e1, e2, e3, e4
6:00:10 to 06:00:30 with tuples e4, e5
- Tuple e6 is not evaluated, because watermark timestamp 6:00:31 is less than tuple timestamp 6:00:36.
- Tuples e7(8:00:25), e8(8:00:26), e9(8:00:27), e10(8:00:39) arrive between 9:00:01 and 9:00:02.
At time t equals 09:00:02, the following actions occur:
- Storm emits watermark w2 at 08:00:34, because no tuples earlier than 8:00:34 can arrive.
- Three windows are evaluated:
6:00:20 to 06:00:40, with tuples e5 and e6 (from an earlier batch)
6:00:30 to 06:00:50, with tuple e6 (from an earlier batch)
8:00:10 to 08:00:30, with tuples e7, e8, and e9
- Tuple e10 is not evaluated, because the tuple timestamp 8:00:39 is beyond the watermark time 8:00:34.
The window calculation considers the time gaps and computes the windows based on the tuple timestamp.