Aggregating data with CQs
To aggregate data or perform any sort of calculation on it (see Functions), a CQ must select from a window (see Bounding data with windows). The basic pattern for a CQ that aggregates data is:
window / cache / WActionStore > CQ > stream / WActionStore
A CQ that aggregates data may have multiple inputs, including one or more streams, but at least one input must be a window, cache, or WActionStore. See TQL programming rules and best practices for more about this limitation.
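To illustrate why aggregation needs a bounded input, the following Python sketch (a simulation, not TQL or a Striim API) groups timestamped events into consecutive, non-overlapping intervals, roughly the way a time-based jumping window hands a CQ one finite set at a time. The function name `jumping_windows`, the dict-shaped events, and the alignment of intervals to multiples of the window length are assumptions made for illustration; Striim's actual window boundaries may be determined differently.

```python
from collections import defaultdict


def jumping_windows(events, length, time_key):
    """Assign each event to exactly one fixed-length, non-overlapping interval.

    Simplifying assumption: intervals are aligned to multiples of `length`
    (Striim may choose window boundaries differently).
    Returns {interval_start: [events in arrival order]}, sorted by start.
    """
    buckets = defaultdict(list)
    for event in events:
        # Integer division maps a timestamp to the start of its interval.
        start = (event[time_key] // length) * length
        buckets[start].append(event)
    return dict(sorted(buckets.items()))
```

Each list in the result stands for one set that a jumping window would emit; an aggregating CQ would then run once per set, so functions such as COUNT or SUM always operate on a finite group of events.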
Here is a simple example, from PosApp:
CREATE JUMPING WINDOW PosData5Minutes
OVER PosDataStream
KEEP WITHIN 5 MINUTE ON dateTime
PARTITION BY merchantId;

CREATE CQ GenerateMerchantTxRateOnly
INSERT INTO MerchantTxRateOnlyStream
SELECT p.merchantId,
  FIRST(p.zip),
  FIRST(p.dateTime),
  COUNT(p.merchantId),
  SUM(p.amount) ...
FROM PosData5Minutes p ...
GROUP BY p.merchantId;
Every time the PosData5Minutes window jumps, the GenerateMerchantTxRateOnly CQ generates a summary event for each merchant, including the merchant ID, the first ZIP code and timestamp in the set, the number of transactions, and the total amount of the transactions. For more details, see PosApp.
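To make the semantics of this CQ concrete, here is a minimal Python sketch that computes the same per-merchant summary for the events of a single window emission. It is not Striim code: the function name `merchant_summaries` and the dict keys are hypothetical, modeled on the PosApp field names, and FIRST is assumed to mean the value from the earliest-arriving event in each group.

```python
def merchant_summaries(window_events):
    """Mimic GROUP BY p.merchantId with FIRST, COUNT, and SUM.

    window_events: events from one window emission, in arrival order.
    Each event is a dict with the hypothetical keys merchantId, zip,
    dateTime, and amount, modeled on the PosApp fields.
    Returns one summary dict per merchant, in first-seen order.
    """
    summaries = {}  # dicts preserve insertion order (Python 3.7+)
    for e in window_events:
        s = summaries.get(e["merchantId"])
        if s is None:
            # FIRST(p.zip), FIRST(p.dateTime): keep values from the first event
            summaries[e["merchantId"]] = {
                "merchantId": e["merchantId"],
                "zip": e["zip"],
                "dateTime": e["dateTime"],
                "count": 1,             # COUNT(p.merchantId)
                "amount": e["amount"],  # SUM(p.amount)
            }
        else:
            s["count"] += 1
            s["amount"] += e["amount"]
    return list(summaries.values())
```

Calling this once per window emission yields one summary per merchant, which corresponds to the events the CQ inserts into MerchantTxRateOnlyStream each time the window jumps.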
Applications may use multiple windows and multiple CQs to perform more complex aggregation tasks. For example, from MultiLogApp:
CREATE JUMPING WINDOW ApiWindow
OVER ApiEnrichedStream
KEEP WITHIN 1 HOUR ON logTime
PARTITION BY api;

CREATE CQ GetApiUsage
INSERT INTO ApiUsageStream
SELECT a.api,
  a.sobject,
  COUNT(a.userId),
  FIRST(a.logTime)
FROM ApiWindow a ...

CREATE JUMPING WINDOW ApiSummaryWindow
OVER ApiUsageStream
KEEP WITHIN 1 HOUR ON logTime
PARTITION BY api;

CREATE CQ GetApiSummaryUsage
INSERT INTO ApiActivity
SELECT a.api,
  sum(a.count),
  first(a.logTime)
FROM ApiSummaryWindow a ...
The jumping ApiWindow aggregates the application log events from ApiEnrichedStream into one-hour sets for each API call (PARTITION BY api), based on the events' log times. Once an hour, when ApiWindow emits its latest set of data, the GetApiUsage CQ sends a summary event for each sobject in each API call, including the name of the API call, the name of the sobject, the count of events for that sobject, and the log time of the first occurrence of that sobject during that hour (SELECT a.api, a.sobject, COUNT(a.userId), FIRST(a.logTime)).
The ApiSummaryWindow contains the summary events emitted by GetApiUsage. This window jumps in sync with ApiWindow, since both use KEEP WITHIN 1 HOUR ON logTime.
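The first aggregation stage of MultiLogApp can be approximated in Python as follows. This is an illustrative simulation, not TQL: `api_usage` and the dict-shaped events are hypothetical, and because the GROUP BY clause is elided in the excerpt, the sketch assumes it groups on api and sobject, which matches the description of one summary event per sobject per API call. It also assumes COUNT(a.userId) follows the SQL convention of skipping null values.

```python
def api_usage(window_events):
    """Sketch of GetApiUsage for one ApiWindow emission.

    window_events: dicts with hypothetical keys api, sobject, userId, logTime,
    in arrival order. Assumes the elided GROUP BY is on (api, sobject) and
    that COUNT(userId) skips None, as SQL COUNT(column) skips NULL.
    Returns one summary dict per (api, sobject), in first-seen order.
    """
    groups = {}
    for e in window_events:
        key = (e["api"], e["sobject"])
        if key not in groups:
            # FIRST(a.logTime): log time of the first event for this key
            groups[key] = {"api": e["api"], "sobject": e["sobject"],
                           "count": 0, "logTime": e["logTime"]}
        if e.get("userId") is not None:
            groups[key]["count"] += 1  # COUNT(a.userId)
    return list(groups.values())
```

The returned dicts play the role of the ApiUsageStream events that ApiSummaryWindow collects for the next stage.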
The GetApiSummaryUsage CQ discards the sobject details and generates a summary event for each API call.
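The rollup performed by GetApiSummaryUsage can be approximated in Python. In this hypothetical sketch, `api_summary` receives the ApiUsageStream events collected by one ApiSummaryWindow emission as dicts with api, sobject, count, and logTime keys; the GROUP BY clause is elided in the excerpt, so grouping on api is an assumption. SUM adds the per-sobject counts, and FIRST keeps the log time of the first summary event for each API call.

```python
def api_summary(usage_events):
    """Sketch of GetApiSummaryUsage: one event per API call.

    usage_events: ApiUsageStream-style dicts (hypothetical shape) from one
    ApiSummaryWindow emission, in arrival order. Assumes GROUP BY api.
    """
    groups = {}
    for e in usage_events:
        # setdefault only stores the new dict for the first event per api,
        # so logTime ends up as FIRST(a.logTime)
        g = groups.setdefault(e["api"], {"api": e["api"], "count": 0,
                                         "logTime": e["logTime"]})
        g["count"] += e["count"]  # SUM(a.count); the sobject field is dropped
    return list(groups.values())
```

Because the sobject field never reaches the output dicts, each API call collapses to a single summary, which mirrors how the CQ discards the sobject details.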