Striim 3.9.7 documentation

Join data from a cache using a CQ

Caches provide data from non-real-time sources. In this case, the cache is a subset of the USAddresses.txt file used by the PosApp sample application.

Save the following as …/Striim/Samples/simplezipdata.txt:

zip	city	state	latVal	longVal
16950	Westfield	PA	41.9193	-77.523
41363	Quicksand	KY	37.5331	-83.3652

In simple.tql, replace these lines:

CREATE TARGET SimpleFilteredOutput
USING SysOut(name:simpleFiltered)
INPUT FROM FilteredDataStream;

with the following:

CREATE TYPE ZipCacheType(
  zip String KEY,
  city String,
  state String,
  latVal double,
  longVal double
);

CREATE CACHE ZipCache using FileReader (
  directory: 'Samples',
  wildcard: 'simplezipdata.txt')
PARSE USING DSVParser (
  header: Yes,
  columndelimiter: '\t',
  trimquote:false
) QUERY (keytomap:'zip') OF ZipCacheType;

CREATE TYPE JoinedDataType(
  merchantId String KEY,
  zip String,
  city String,
  state String,
  latVal double,
  longVal double
);
CREATE STREAM JoinedDataStream OF JoinedDataType;

CREATE CQ JoinDataCQ
INSERT INTO JoinedDataStream
SELECT  f.merchantId,
        f.zip,
        z.city,
        z.state,
        z.latVal,
        z.longVal
FROM FilteredDataStream f, ZipCache z
WHERE f.zip = z.zip;

CREATE TARGET simpleJoinedData
USING SysOut(name:simpleJoined)
INPUT FROM JoinedDataStream;

Reload and restart simple.tql and you should see these results in striim-node.log:

simpleJoined: JoinedDataType_1_0{
  merchantId: "D6RJPwyuLXoLqQRQcOcouJ26KGxJSf6hgbu"
  zip: "41363"
  city: "Quicksand"
  state: "KY"
  latVal: 37.5331
  longVal: -83.3652
};
simpleJoined: JoinedDataType_1_0{
  merchantId: "OFp6pKTMg26n1iiFY00M9uSqh9ZfMxMBRf1"
  zip: "16950"
  city: "Westfield"
  state: "PA"
  latVal: 41.9193
  longVal: -77.523
};