Striim 3.9.7 documentation

Aggregating data with CQs

To aggregate data or perform any sort of calculation on it (see Functions), a CQ must select from a window (see Bounding data with windows). The basic pattern for a CQ that aggregates data is:

window / cache / WActionStore > CQ > stream / WActionStore

A CQ that aggregates data may have multiple inputs, including one or more streams, but at least one input must be a window, cache, or WActionStore. See TQL programming rules and best practices for more about this limitation.
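Why aggregation needs a bounded input can be pictured with the conceptual Python model below. It is not Striim code. `count_and_total`, `cq_over_stream`, and `cq_over_window` are hypothetical helpers, and a window is represented simply as a list of event sets it has already emitted. In this model, a CQ reading a stream sees each event on its own, so every "aggregate" covers a single event. A CQ reading a window runs once per emitted set, so COUNT and SUM cover the whole set. Caches and WActionStores are not modeled.

```python
from typing import Callable, Dict, Iterable, List

Event = Dict[str, float]
Select = Callable[[List[Event]], Dict[str, float]]


def count_and_total(events: List[Event]) -> Dict[str, float]:
    """An aggregating SELECT: a count and SUM(amount) over whatever set it is given."""
    return {"count": len(events), "total": sum(e["amount"] for e in events)}


def cq_over_stream(stream: Iterable[Event], select: Select) -> List[Dict[str, float]]:
    # Model: reading a stream, the CQ handles one event at a time,
    # so each output row aggregates exactly one event.
    return [select([e]) for e in stream]


def cq_over_window(window_sets: Iterable[List[Event]], select: Select) -> List[Dict[str, float]]:
    # Model: reading a window, the CQ runs once per emitted set,
    # so each output row aggregates the whole set.
    return [select(s) for s in window_sets]


events = [{"amount": 10.0}, {"amount": 5.0}, {"amount": 2.5}]
per_event = cq_over_stream(events, count_and_total)
per_window = cq_over_window([events], count_and_total)
```

Here `per_event` holds three rows with a count of 1 each, while `per_window` holds a single row with a count of 3 and a total of 17.5.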

Here is a simple example, from PosApp:

CREATE JUMPING WINDOW PosData5Minutes
OVER PosDataStream KEEP WITHIN 5 MINUTE ON dateTime PARTITION BY merchantId;

CREATE CQ GenerateMerchantTxRateOnly
INSERT INTO MerchantTxRateOnlyStream
SELECT p.merchantId,
  FIRST(p.zip),
  FIRST(p.dateTime),
  COUNT(p.merchantId),
  SUM(p.amount) ...
FROM PosData5Minutes p ...
GROUP BY p.merchantId;

Every time the PosData5Minutes window jumps, the GenerateMerchantTxRateOnly CQ generates a summary event for each merchant containing the merchant ID, the first ZIP code and timestamp of the set, the number of transactions, and the total amount of the transactions. For more details, see PosApp.
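The PosApp behavior described above can be approximated in Python. This is a simplified model, not how Striim implements windows. It makes the following assumptions:

- `dateTime` is an epoch-seconds integer.
- Events arrive in `dateTime` order.
- Window boundaries fall on fixed five-minute multiples.
- The window jumps for all merchants at once. How PARTITION BY affects window retention is left aside.

The class and function names (`PosEvent`, `MerchantTxRate`, `jumping_window`, `generate_merchant_tx_rate`) and the sample data are hypothetical. Anything elided with `...` in the original CQ is not modeled.

```python
from dataclasses import dataclass
from typing import Dict, Iterable, Iterator, List, Optional

WINDOW_SECONDS = 5 * 60  # models KEEP WITHIN 5 MINUTE ON dateTime


@dataclass
class PosEvent:  # hypothetical shape of a PosDataStream event
    merchantId: str
    zip: str
    dateTime: int  # assumed: epoch seconds
    amount: float


@dataclass
class MerchantTxRate:  # one MerchantTxRateOnlyStream event
    merchantId: str
    zip: str       # FIRST(p.zip)
    dateTime: int  # FIRST(p.dateTime)
    count: int     # COUNT(p.merchantId)
    amount: float  # SUM(p.amount)


def jumping_window(events: Iterable[PosEvent]) -> Iterator[List[PosEvent]]:
    """Yield the window's contents each time an event crosses a five-minute boundary."""
    batch: List[PosEvent] = []
    current: Optional[int] = None
    for e in events:  # assumes events arrive in dateTime order
        idx = e.dateTime // WINDOW_SECONDS
        if current is not None and idx != current:
            yield batch  # the window jumps: emit the set and start a new one
            batch = []
        current = idx
        batch.append(e)
    # The last window is still open: no later event has made it jump, so it is not emitted.


def generate_merchant_tx_rate(batch: List[PosEvent]) -> List[MerchantTxRate]:
    """GROUP BY merchantId over one window's contents; FIRST means first in arrival order."""
    groups: Dict[str, List[PosEvent]] = {}
    for e in batch:
        groups.setdefault(e.merchantId, []).append(e)  # dicts preserve insertion order
    return [
        MerchantTxRate(m, g[0].zip, g[0].dateTime, len(g), sum(e.amount for e in g))
        for m, g in groups.items()
    ]


events = [
    PosEvent("m1", "94041", 0, 10.0),
    PosEvent("m2", "10001", 60, 5.0),
    PosEvent("m1", "94043", 120, 2.5),
    PosEvent("m1", "94041", 300, 1.0),  # crosses the five-minute boundary
]
merchant_tx_rate_only_stream = [
    r for batch in jumping_window(events) for r in generate_merchant_tx_rate(batch)
]
```

The fourth event crosses the five-minute boundary, so the window jumps and the CQ emits one summary per merchant for the first three events. Merchant m1 gets a count of 2 and a total of 12.5. The fourth event stays in the still-open window.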

Applications may use multiple windows and multiple CQs to perform more complex aggregation tasks. For example, from MultiLogApp:

CREATE JUMPING WINDOW ApiWindow 
OVER ApiEnrichedStream KEEP WITHIN 1 HOUR ON logTime 
PARTITION BY api;

CREATE CQ GetApiUsage 
INSERT INTO ApiUsageStream 
SELECT a.api, a.sobject, COUNT(a.userId), FIRST(a.logTime) 
FROM ApiWindow a ...

CREATE JUMPING WINDOW ApiSummaryWindow 
OVER ApiUsageStream KEEP WITHIN 1 HOUR ON logTime 
PARTITION BY api;

CREATE CQ GetApiSummaryUsage 
INSERT INTO ApiActivity 
SELECT a.api, sum(a.count), first(a.logTime)
FROM ApiSummaryWindow a ...

  • The jumping ApiWindow collects the application log events from ApiEnrichedStream into one-hour sets for each API call (PARTITION BY api), based on the events' log times.

  • Once an hour, when ApiWindow emits its latest set of data, the GetApiUsage CQ emits a summary event for each sobject in each API call. Each event contains the name of the API call, the name of the sobject, the number of events for that sobject, and the log time of the first occurrence of that sobject during that hour (SELECT a.api, a.sobject, COUNT(a.userId), FIRST(a.logTime)).

  • ApiSummaryWindow contains the summary events emitted by GetApiUsage. It jumps in sync with ApiWindow because both windows use KEEP WITHIN 1 HOUR ON logTime, and each summary event's logTime is the FIRST(a.logTime) value from the hour that produced it.

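  • The GetApiSummaryUsage CQ discards the sobject details and generates a summary event for each API call. Each event contains the API name, the sum of the per-sobject counts (sum(a.count)), and the first log time in the set (first(a.logTime)).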
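The two-stage MultiLogApp aggregation can be sketched in Python under the following assumptions:

- The elided `...` clauses group by `api` and `sobject` in GetApiUsage and by `api` in GetApiSummaryUsage.
- `logTime` is epoch seconds.
- Window boundaries fall on whole hours, and each window jumps for all APIs at once.
- The input is flushed at the end so the example terminates. On a live stream, the last window would stay open.

All class and function names and the sample events are hypothetical; this is a model of the described behavior, not Striim's implementation. It also shows why the windows stay in sync: each ApiUsage event carries the FIRST logTime of the hour that produced it, so the second window places it in that same hour.

```python
from dataclasses import dataclass
from typing import Callable, Dict, Iterable, Iterator, List, Optional, Tuple

HOUR = 3600  # models KEEP WITHIN 1 HOUR ON logTime (logTime assumed to be epoch seconds)


@dataclass
class ApiEvent:  # hypothetical shape of an ApiEnrichedStream event
    api: str
    sobject: str
    userId: Optional[str]
    logTime: int


@dataclass
class ApiUsage:  # one ApiUsageStream event emitted by GetApiUsage
    api: str
    sobject: str
    count: int
    logTime: int


@dataclass
class ApiActivity:  # one ApiActivity event emitted by GetApiSummaryUsage
    api: str
    count: int
    logTime: int


def hourly_windows(events: Iterable, time_of: Callable[[object], int]) -> Iterator[list]:
    """Model of a one-hour jumping window: yield the set each time an event crosses an hour."""
    batch: list = []
    current: Optional[int] = None
    for e in events:
        hour = time_of(e) // HOUR
        if current is not None and hour != current:
            yield batch  # the window jumps
            batch = []
        current = hour
        batch.append(e)
    if batch:
        yield batch  # flush at end of input so the example terminates


def get_api_usage(batch: List[ApiEvent]) -> List[ApiUsage]:
    """Assumes GROUP BY api, sobject; COUNT(a.userId) skips events with no userId."""
    groups: Dict[Tuple[str, str], List[ApiEvent]] = {}
    for e in batch:
        groups.setdefault((e.api, e.sobject), []).append(e)
    return [
        ApiUsage(api, sobject, sum(1 for e in g if e.userId is not None), g[0].logTime)
        for (api, sobject), g in groups.items()
    ]


def get_api_summary_usage(batch: List[ApiUsage]) -> List[ApiActivity]:
    """Assumes GROUP BY api: sobject detail is dropped and the counts are summed."""
    groups: Dict[str, List[ApiUsage]] = {}
    for u in batch:
        groups.setdefault(u.api, []).append(u)
    return [ApiActivity(api, sum(u.count for u in g), g[0].logTime) for api, g in groups.items()]


def pipeline(events: Iterable[ApiEvent]) -> List[ApiActivity]:
    # ApiEnrichedStream > ApiWindow > GetApiUsage > ApiUsageStream
    usage_stream = (u for b in hourly_windows(events, lambda e: e.logTime) for u in get_api_usage(b))
    # ApiUsageStream > ApiSummaryWindow > GetApiSummaryUsage > ApiActivity
    return [a for b in hourly_windows(usage_stream, lambda u: u.logTime)
            for a in get_api_summary_usage(b)]


events = [
    ApiEvent("query", "Account", "u1", 10),
    ApiEvent("query", "Contact", "u2", 20),
    ApiEvent("query", "Account", "u3", 30),
    ApiEvent("update", "Account", "u1", 40),
    ApiEvent("query", "Account", "u2", HOUR + 5),  # first event of the next hour
]
activity = pipeline(events)
```

In this run, the query API's two sobjects (three events in the first hour) collapse into a single ApiActivity event with a count of 3 and logTime 10. The update API gets its own event, and the next hour's event is summarized separately.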