Striim 3.9.7 documentation

Detecting device status changes

Code similar to the following can be used to detect when a device goes offline:

-- Record layout for one device entry: a display name and its ID, both strings.
CREATE TYPE MyUser (
  name java.lang.String,
  myid java.lang.String
);

-- Cache of MyUser rows read from user.csv in the current directory,
-- parsed as delimited text and keyed on the myid field.
CREATE CACHE LoadIDsCache
USING FileReader (
  directory: '.',
  wildcard: 'user.csv'
)
PARSE USING Global.DSVParser ()
QUERY (keytomap: 'myid')
OF MyUser;

-- Joins the cache with a 5-second heartbeat and writes the result to MasterStream.
-- NOTE(review): presumably this re-emits the full device list on every heartbeat
-- tick so that downstream windows stay populated - confirm.
CREATE CQ PopulateMasterStream
INSERT INTO MasterStream
SELECT *
FROM admin.LoadIDsCache c,
     heartbeat(interval 5 second) h;

-- Keeps the latest MasterStream row for each myid.
-- The name is spelled "MasterWinow"; the queries that read this window use the same spelling.
CREATE WINDOW MasterWinow
  OVER MasterStream
  KEEP 1 ROWS
  PARTITION BY myid;

-- Reads delimited events from a TCP socket on localhost:1234 and
-- publishes the parsed records to liveEvStream.
CREATE SOURCE LiveEventStream USING TCPReader (
  IPAddress: 'localhost',
  portno: 1234
)
-- headerlineno belongs inside the parser's property list; the parser's
-- parentheses must enclose it for the statement to be balanced.
PARSE USING DSVParser (
  headerlineno: 0
)
OUTPUT TO liveEvStream;

-- Converts the first field of each raw event to a string and emits it as myId.
CREATE OR REPLACE CQ CreateLiveEventsCQ
INSERT INTO liveObjectStream
SELECT TO_STRING(data[0]) AS myId
FROM liveEvStream l;

-- Jumping window over liveObjectStream: per myId, at most one row,
-- bounded to a 10-second interval.
CREATE OR REPLACE JUMPING WINDOW LiveObjWindow
OVER liveObjectStream
KEEP 1 ROWS WITHIN 10 second
PARTITION BY myId;

/* select * from admin.NonExistObjStream;
[
   myID = 1
   cnt = 1
   status = existent
]
[
   myID = null 
   cnt = 4
   status = non-existent
]
*/

-- Left-joins the master device list against the live-event window.
-- Master rows with no live match have a NULL lw.myId; because the grouping
-- key is lw.myId, all such rows fall into one group labelled "non-existent".
SELECT
  lw.myId AS myID,
  COUNT(*) AS cnt,
  CASE
    WHEN lw.myId IS NULL THEN "non-existent"
    ELSE "existent"
  END AS status
FROM MasterWinow mw
LEFT JOIN LiveObjWindow lw
  ON mw.myid = lw.myId
GROUP BY lw.myId;

The cache file, user.csv, contains one device per line in the format device name, device ID:

device_name_a,1
device_name_b,2
...

Code similar to the following can be used to detect both when a device goes offline and when it comes back online:

-- Emits the current state of every device in the master list.
--   myID      - device ID taken from the master list, so it is populated
--               even when the device has no live event
--   cnt       - number of joined rows for the device
--   StateTime - time the state row was produced (DNOW())
--   status    - "online" when a live event matched, otherwise "offline"
--
-- The output is keyed on mw.myid rather than lw.myId: lw.myId is NULL for
-- every offline device, which would merge all offline devices into a single
-- NULL-keyed row and make per-device state tracking impossible.
CREATE OR REPLACE CQ UserStateCQ
INSERT INTO CurrentUserStateStream
SELECT
  mw.myid AS myID,
  COUNT(*) AS cnt,
  DNOW() AS StateTime,
  CASE
    WHEN lw.myId IS NULL THEN "offline"
    ELSE "online"
  END AS status
FROM MasterWinow mw
LEFT JOIN LiveObjWindow lw
  ON mw.myid = lw.myId
-- lw.myId is either NULL or equal to mw.myid within a device, so adding it
-- to the grouping key keeps one group per device while keeping the CASE valid.
GROUP BY mw.myid, lw.myId;

-- Keeps the two most recent state rows per device from CurrentUserStateStream.
-- Uses KEEP ... ROWS (as the other windows in this example do) and partitions
-- on myID, the field name CurrentUserStateStream actually carries.
CREATE SLIDING WINDOW UserState
  OVER CurrentUserStateStream
  KEEP 2 ROWS
  PARTITION BY myID;

-- Compares the first and last state rows held per device in the UserState
-- window and emits an event only when the status changed.
--   myID                - device ID (read from the window alias w; this query
--                         has no lw alias in scope)
--   StateChangeInterval - LAST(StateTime) minus FIRST(StateTime), as a long
--   ChangeLabel         - "up"      online  -> online  (no change)
--                         "online"  offline -> online  (came back)
--                         "down"    online  -> offline (went away)
--                         "offline" offline -> offline (no change)
CREATE OR REPLACE CQ WatchUserStateCQ
INSERT INTO ChangedUserStateStream
SELECT
  w.myID AS myID,
  TO_LONG(LAST(w.StateTime) - FIRST(w.StateTime)) AS StateChangeInterval,
  CASE
    WHEN LAST(w.status) == 'online'  AND FIRST(w.status) == 'online'  THEN "up"
    WHEN LAST(w.status) == 'online'  AND FIRST(w.status) == 'offline' THEN "online"
    WHEN LAST(w.status) == 'offline' AND FIRST(w.status) == 'online'  THEN "down"
    WHEN LAST(w.status) == 'offline' AND FIRST(w.status) == 'offline' THEN "offline"
  END AS ChangeLabel
FROM UserState w
GROUP BY w.myID
-- Only the two transition labels pass; steady states are dropped.
HAVING ChangeLabel = 'down' OR ChangeLabel = 'online';

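The two-row window acts as a two-node state machine: for each device it holds the previous state and the current state. An additional CQ reading ChangedUserStateStream can then generate events that meet business requirements, such as: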

  • alerts on state changes

  • accumulating statistics on device uptime, downtime, frequency of change, and so on