Bound data in batches by time
The following example uses aggregate functions to summarize the events in a stream every 15 minutes:
CREATE TYPE OrderType(
  storeId String,
  orderId String,
  sku String,
  orderAmount double,
  dateTime DateTime
);
CREATE STREAM RetailOrders OF OrderType;
...
CREATE JUMPING WINDOW ProductData_15MIN
OVER RetailOrders
KEEP WITHIN 15 MINUTE ON dateTime;

CREATE CQ GetProductActivity
INSERT INTO ProductTrackingStream
SELECT pd.sku,
  COUNT(*),
  SUM(pd.orderAmount),
  FIRST(pd.dateTime)
FROM ProductData_15MIN pd
GROUP BY pd.sku;
Every 15 minutes, the output stream receives one event per SKU that had at least one order in that interval. Each SKU's event includes the number of orders (COUNT(*)), the total amount of those orders (SUM(pd.orderAmount)), and the timestamp of the SKU's first order in the batch (FIRST(pd.dateTime)). You could use this data to graph changes over time or to trigger alerts when the count or total amount varies significantly from what you expect.
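To see what each output event contains, the batching and per-SKU aggregation can be approximated in plain Python. This is a hypothetical sketch, not Striim code: the `Order` class and the `jumping_window_batches` and `summarize` functions are illustrative names, and the sketch assumes that orders arrive in timestamp order and that each batch opens at the timestamp of its first order.

```python
from dataclasses import dataclass
from datetime import datetime, timedelta


@dataclass
class Order:
    # Mirrors the fields of OrderType in the TQL example above.
    store_id: str
    order_id: str
    sku: str
    order_amount: float
    date_time: datetime


def jumping_window_batches(orders, width=timedelta(minutes=15)):
    """Yield non-overlapping batches of time-ordered orders.

    Assumption: a batch opens at its first order's timestamp and is emitted
    when an order arrives whose timestamp is `width` or more past that point.
    A trailing batch is never emitted, because no later order arrives to close it.
    """
    batch = []
    for order in orders:
        if batch and order.date_time - batch[0].date_time >= width:
            yield batch
            batch = []
        batch.append(order)


def summarize(batch):
    """Return one (sku, count, total_amount, first_time) tuple per SKU."""
    rows = {}  # dicts keep insertion order, so SKUs appear in first-seen order
    for order in batch:
        count, total, first = rows.get(order.sku, (0, 0.0, order.date_time))
        rows[order.sku] = (count + 1, total + order.order_amount, first)
    return [(sku, c, t, f) for sku, (c, t, f) in rows.items()]


t0 = datetime(2024, 1, 1, 9, 0)
orders = [
    Order("s1", "o1", "A", 10.0, t0),
    Order("s1", "o2", "B", 5.0, t0 + timedelta(minutes=3)),
    Order("s2", "o3", "A", 2.5, t0 + timedelta(minutes=10)),
    Order("s1", "o4", "B", 7.0, t0 + timedelta(minutes=16)),  # closes the first batch
]
for batch in jumping_window_batches(orders):
    print(summarize(batch))
```

Running it prints a single list for the first batch, with one tuple per SKU. Order `o4` opens a second batch that stays in the window, because no later order arrives to close it.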
Suppose that orders are not received 24 hours a day, seven days a week, but instead drop to zero after closing time. Because this window advances according to the dateTime values of incoming orders, the last batch of the evening could then stay in the window overnight until the first order of the next morning closes it, and its events would be mistakenly reported as occurring in the morning. To avoid that, use RANGE to add a timeout:
CREATE JUMPING WINDOW ProductData_15MIN
OVER RetailOrders
KEEP RANGE 15 MINUTE ON dateTime
WITHIN 16 MINUTE;
Now, as long as orders keep arriving, the window jumps every 15 minutes, but if orders stop, it jumps after 16 minutes instead of holding its contents until the next order arrives.
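A small Python simulation illustrates the difference between the two window definitions. The hypothetical `batches_with_timeout` function below is an illustration under stated assumptions, not Striim's implementation: each order is assumed to arrive in system time exactly at its own dateTime, and the timeout is assumed to be measured from the arrival of the batch's first order.

```python
from datetime import datetime, timedelta


def batches_with_timeout(timestamps, width, timeout=None):
    """Simulate a jumping window over time-ordered event timestamps.

    Returns (emitted, held): `emitted` is a list of (emit_time, batch) pairs and
    `held` is whatever is still waiting in the window when the input ends.
    Assumptions: each event arrives in system time exactly at its own timestamp,
    and `timeout` is measured from the arrival of the batch's first event.
    """
    emitted = []
    batch = []
    for ts in timestamps:
        # Timeout: the open batch expired before this event arrived.
        if batch and timeout is not None and ts >= batch[0] + timeout:
            emitted.append((batch[0] + timeout, batch))
            batch = []
        # Range: this event falls outside the open batch's range.
        if batch and ts - batch[0] >= width:
            emitted.append((ts, batch))
            batch = []
        batch.append(ts)
    # No more events: only a timeout can still close the open batch.
    if batch and timeout is not None:
        emitted.append((batch[0] + timeout, batch))
        batch = []
    return emitted, batch


evening = datetime(2024, 1, 1, 20, 50)
arrivals = [
    evening,
    evening + timedelta(minutes=8),  # 20:58, last order before closing
    datetime(2024, 1, 2, 9, 5),      # first order the next morning
]
fifteen, sixteen = timedelta(minutes=15), timedelta(minutes=16)

# Like KEEP WITHIN 15 MINUTE ON dateTime: the evening batch waits until 09:05.
print(batches_with_timeout(arrivals, fifteen))
# Like KEEP RANGE 15 MINUTE ON dateTime WITHIN 16 MINUTE: emitted at 21:06.
print(batches_with_timeout(arrivals, fifteen, sixteen))
```

With a steady stream of orders, the range check closes each batch at the 15-minute mark, so in this model the 16-minute timeout only comes into play when orders stop.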