Using the Forwarding Agent

The Forwarding Agent is a stripped-down version of a Striim server that can be used to run sources and CQs locally on a remote host. Windows, caches, and other components are not supported.

To use the Agent, first follow the instructions in Striim Forwarding Agent installation and configuration that are appropriate for your environment. Then, in your application, create a flow for the source that will run on the agent, and deploy it to the agent's deployment group.

Here is a simple example that reads from a file on the remote host and writes to a file on the Striim server:

CREATE APPLICATION agentTest;

CREATE FLOW AgentFlow WITH ENCRYPTION;
CREATE SOURCE CsvDataSource USING FileReader (
  directory:'Samples/PosApp/appData',
  wildcard:'posdata.csv',
  blocksize: 10240,
  positionByEOF:false
)
PARSE USING DSVParser (
  header:Yes,
  trimquote:false
) OUTPUT TO CsvStream;
END FLOW AgentFlow;
  
CREATE FLOW ServerFlow;
CREATE TARGET t USING FileWriter ( filename:'AgentOut' )
FORMAT USING DSVFormatter ()
INPUT FROM CsvStream;
END FLOW ServerFlow;

END APPLICATION agentTest;

DEPLOY APPLICATION agentTest ON default
WITH AgentFlow ON ALL IN agent;

Be sure the Agent is running when you load the application. If there are multiple agents in the deployment group, the source will automatically combine their data. The WITH ENCRYPTION option encrypts the stream connecting the agent to the server (see CREATE APPLICATION ... END APPLICATION).

Note

CQs running on an Agent may not select from Kafka streams or include AS <field name>, GROUP BY, HAVING, ITERATOR, or MATCH_PATTERN.

The following variation on the beginning of the PosApp sample application filters out unneeded columns and partitions the stream:

CREATE FLOW AgentFlow;
CREATE SOURCE CsvDataSource USING FileReader (
  directory:'Samples/PosApp/appData',
  wildcard:'posdata.csv',
  blocksize: 10240,
  positionByEOF:false
)
PARSE USING DSVParser (
  header:Yes,
  trimquote:false
) OUTPUT TO CsvStream;

CREATE CQ CsvToPosData
INSERT INTO PosDataStream PARTITION BY merchantId
SELECT TO_STRING(data[1]) as merchantId,
  TO_DATEF(data[4],'yyyyMMddHHmmss') as dateTime,
  DHOURS(TO_DATEF(data[4],'yyyyMMddHHmmss')) as hourValue,
  TO_DOUBLE(data[7]) as amount,
  TO_STRING(data[9]) as zip
FROM CsvStream;

END FLOW AgentFlow; ...

PARTITION BY merchantId specifies that events with the same merchantId value will be processed on the same server. This is required if ServerFlow is to be deployed to multiple servers; an unpartitioned source is automatically deployed to a single server.
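With the stream partitioned, the server-side flow can be distributed across every server in the deployment group. A minimal sketch of such a deployment statement, assuming the application above and a multi-server deployment group named default (ON ALL IN syntax as used elsewhere in this page for agents):

DEPLOY APPLICATION agentTest ON ALL IN default
WITH AgentFlow ON ALL IN agent;

Here events from PosDataStream are routed among the servers in default by merchantId, while the source itself still runs on the agents.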

See Filtering events using OUTPUT TO and WHERE for additional examples of filter syntax that are compatible with the Forwarding Agent.
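As a hedged sketch of that filter syntax, the source above could discard low-value events before they leave the agent by adding a WHERE clause to its OUTPUT TO (the stream name HighValueStream and the 1000-dollar threshold are illustrative; see the linked topic for the exact supported forms):

CREATE SOURCE FilteredCsvSource USING FileReader (
  directory:'Samples/PosApp/appData',
  wildcard:'posdata.csv',
  positionByEOF:false
)
PARSE USING DSVParser (
  header:Yes
)
OUTPUT TO HighValueStream
WHERE TO_DOUBLE(data[7]) > 1000;

Filtering on the agent in this way reduces the volume of data forwarded to the Striim server.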

To support recovery when a source running on the Forwarding Agent is in one application and the server-side components that consume its output are in one or more other applications, persist the source's output to Kafka (see Persisting a stream to Kafka). For example:

CREATE APPLICATION agentApp;
CREATE FLOW AgentFlow WITH ENCRYPTION;
CREATE STREAM PersistedStream OF Global.WAEvent PERSIST; 
CREATE SOURCE OracleCDCIn USING OracleReader (
  Username:'striim',
  Password:'passwd',
  ConnectionURL:'203.0.113.49:1521:orcl',
  Tables:'myschema.%'
) 
OUTPUT TO PersistedStream;
END FLOW AgentFlow;
END APPLICATION agentApp;
DEPLOY APPLICATION agentApp ON default
WITH AgentFlow ON ALL IN agent;
  
  
CREATE APPLICATION ServerApp;
CREATE TARGET KafkaTarget USING KafkaWriter VERSION '0.8.0' (
  Mode: 'Sync',
  Topic: 'OracleData',
  brokerAddress: '198.51.100.55:9092'
)
FORMAT USING AvroFormatter ( schemaFileName: 'OracleData.avro' )
INPUT FROM PersistedStream;
END APPLICATION ServerApp;
DEPLOY APPLICATION ServerApp ON default;
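Additional server-side applications can subscribe to the same persisted stream. A minimal sketch of one such consumer, assuming WAEvent metadata access via the META function (the application, CQ, stream, and table names here are illustrative):

CREATE APPLICATION ConsumerApp;
CREATE CQ FilterOrders
INSERT INTO OrderStream
SELECT * FROM PersistedStream x
WHERE META(x, 'TableName').toString() = 'MYSCHEMA.ORDERS';
END APPLICATION ConsumerApp;
DEPLOY APPLICATION ConsumerApp ON default;

Because PersistedStream is backed by Kafka, each such application can recover independently of agentApp.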