
Bound data in batches by time

The following uses aggregate functions to summarize events in a stream every fifteen minutes:

CREATE TYPE OrderType(
  storeId      String,
  orderId      String,
  sku          String,
  orderAmount  double,
  dateTime     DateTime
);
CREATE STREAM RetailOrders OF OrderType;
...
CREATE JUMPING WINDOW ProductData_15MIN
OVER RetailOrders
KEEP WITHIN 15 MINUTE ON dateTime;

CREATE CQ GetProductActivity
INSERT INTO ProductTrackingStream
SELECT pd.sku, COUNT(*), SUM(pd.orderAmount), FIRST(pd.dateTime)
FROM ProductData_15MIN pd
GROUP BY pd.sku;

Every 15 minutes, the output stream receives one event per SKU for which there was at least one order. Each SKU's event includes the number of orders (COUNT(*)), the total amount of those orders (SUM(pd.orderAmount)), and the timestamp of that SKU's first order in the batch (FIRST(pd.dateTime)). You could use this data to graph changes over time or to trigger alerts when the count or total amount varies significantly from what you expect.
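The batching and per-SKU aggregation can be modeled in plain Python to see what the output stream receives. This is a hypothetical sketch, not Striim's implementation: `Order`, `summarize`, and `jumping_window` are illustrative names, and the model assumes each batch covers 15 minutes starting from its first order's dateTime, with the window jumping when an order arrives whose dateTime is 15 or more minutes after that.

```python
from dataclasses import dataclass
from datetime import datetime, timedelta

@dataclass
class Order:  # hypothetical Python mirror of OrderType
    store_id: str
    order_id: str
    sku: str
    order_amount: float
    date_time: datetime

WINDOW = timedelta(minutes=15)  # models KEEP WITHIN 15 MINUTE ON dateTime

def summarize(batch):
    """Return (sku, COUNT(*), SUM(orderAmount), FIRST(dateTime)) per SKU, like GROUP BY pd.sku."""
    groups = {}  # dicts keep insertion order, so SKUs appear in first-seen order
    for o in batch:
        sku, count, total, first = groups.get(o.sku, (o.sku, 0, 0.0, o.date_time))
        groups[o.sku] = (sku, count + 1, total + o.order_amount, first)
    return list(groups.values())

def jumping_window(orders):
    """Yield one per-SKU summary each time the modeled 15-minute window jumps.
    Assumption: the batch starts at its first order's dateTime and the window
    jumps when an order arrives 15 or more minutes after that."""
    batch, start = [], None
    for o in orders:
        if start is not None and o.date_time - start >= WINDOW:
            yield summarize(batch)   # output the completed batch
            batch, start = [], None  # the current order starts the next batch
        if start is None:
            start = o.date_time
        batch.append(o)
    # The last batch stays in the window: no later order arrives to make it jump.

t0 = datetime(2024, 1, 1, 9, 0)
orders = [
    Order("s1", "o1", "A", 10.0, t0),
    Order("s1", "o2", "B", 5.0, t0 + timedelta(minutes=3)),
    Order("s2", "o3", "A", 7.5, t0 + timedelta(minutes=10)),
    Order("s1", "o4", "A", 2.0, t0 + timedelta(minutes=16)),  # makes the window jump
]
for summary in jumping_window(orders):
    print(summary)  # one batch: SKU A (2 orders, 17.5) and SKU B (1 order, 5.0)
```

Note that order o4 is not reported: it opens the next batch, which is output only when a later order makes the window jump again.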

Say that orders are not received 24 hours a day, seven days a week, but instead drop to zero after closing hours. Because the window's timing is based on the dateTime values of incoming events, it jumps only when a new order arrives. In that case, the code above could leave the evening's last events in the window overnight, where they would be mistakenly reported as occurring in the morning. To avoid that, use RANGE to add a timeout:

CREATE JUMPING WINDOW ProductData_15MIN
OVER RetailOrders
KEEP RANGE 15 MINUTE ON dateTime WITHIN 16 MINUTE;

Now, so long as orders are being placed, the window will jump every 15 minutes, but if orders stop, the window will jump after 16 minutes instead of holding the batch until the next order arrives.
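To make the effect of the timeout concrete, here is a small Python model (not Striim code) of when a batch is released with and without the 16-minute timeout. It is a sketch under simplifying assumptions: each order arrives at the wall-clock time given in its dateTime, and the timeout is measured from when the batch's first order arrived. `release_times` and its parameters are hypothetical names.

```python
from datetime import datetime, timedelta

RANGE = timedelta(minutes=15)    # models KEEP RANGE 15 MINUTE ON dateTime
TIMEOUT = timedelta(minutes=16)  # models WITHIN 16 MINUTE

def release_times(orders, now, timeout=None):
    """orders: list of (order_id, date_time) in arrival order.
    Assumes each order arrives at the wall-clock time in its date_time.
    Returns (release_time, [order_ids]) for each batch the window outputs
    by wall-clock time `now`."""
    out = []
    batch, start = [], None
    for order_id, dt in orders:
        # Timeout: the open batch has waited `timeout` of wall-clock time.
        if timeout is not None and start is not None and dt - start >= timeout:
            out.append((start + timeout, batch))
            batch, start = [], None
        # Range: this order falls outside the batch's 15 minutes of event time.
        if start is not None and dt - start >= RANGE:
            out.append((dt, batch))
            batch, start = [], None
        if start is None:
            start = dt
        batch.append(order_id)
    # No more orders arrive before `now`; only the timeout can release the batch.
    if timeout is not None and start is not None and now - start >= timeout:
        out.append((start + timeout, batch))
    return out

day = datetime(2024, 1, 1)
orders = [
    ("o1", day.replace(hour=20, minute=50)),
    ("o2", day.replace(hour=20, minute=55)),
    ("o3", day + timedelta(days=1, hours=9)),  # first order the next morning
]
now = day + timedelta(days=1, hours=9, minutes=5)
print(release_times(orders, now))           # [o1, o2] held until 09:00 next day
print(release_times(orders, now, TIMEOUT))  # [o1, o2] released at 21:06
```

Without the timeout, the evening's batch is released only when the morning's first order arrives; with it, the batch is released 16 minutes after it opened.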