Sources with WAEvent output

The basic programming pattern for a source using an adapter with the WAEvent output type is:

source > stream > CQ > stream

You do not need to define the streams explicitly. Instead, name them in the source's OUTPUT TO clause and the CQ's INSERT INTO and FROM clauses, and they will be created automatically. For example, from PosApp:

CREATE SOURCE CsvDataSource USING FileReader (
  directory:'Samples/PosApp/appData',
  wildcard:'posdata.csv',
  blocksize: 10240,
  positionByEOF:false
)
PARSE USING DSVParser (
  header:Yes,
  trimquote:false
)
OUTPUT TO CsvStream;

CREATE CQ CsvToPosData
INSERT INTO PosDataStream
SELECT TO_STRING(data[1]) as merchantId,
  TO_DATEF(data[4],'yyyyMMddHHmmss') as dateTime,
  DHOURS(TO_DATEF(data[4],'yyyyMMddHHmmss')) as hourValue,
  TO_DOUBLE(data[7]) as amount,
  TO_STRING(data[9]) as zip
FROM CsvStream;

In addition to the explicitly defined CsvDataSource and CsvToPosData, this will create the "implicit" stream CsvStream (of type WAEvent), the type PosDataStream_Type, and the stream PosDataStream.

You can inspect these using the DESCRIBE command, for example:

W (admin) > describe type Samples.PosDataStream_Type;
Processing - describe type Samples.PosDataStream_Type
TYPE Samples.PosDataStream_Type CREATED 2016-01-22 12:35:34
ATTRIBUTES (
  merchantId java.lang.String
  dateTime org.joda.time.DateTime
  hourValue java.lang.Integer
  amount java.lang.Double
  zip java.lang.String
)
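
For reference, defining these generated objects explicitly would look roughly like the following. This is an illustrative sketch based on the DESCRIBE output above (with the Java classes written as their TQL equivalents); in the application they are created automatically and you do not need to write this:

CREATE TYPE PosDataStream_Type (
  merchantId String,
  dateTime DateTime,
  hourValue Integer,
  amount Double,
  zip String
);
CREATE STREAM PosDataStream OF PosDataStream_Type;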

For more information, see Parsing the data field of WAEvent.
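
Downstream CQs can also work with the WAEvent data array directly, using the same data[] indexes and conversion functions. The following is a hypothetical sketch, not part of PosApp (FilterLargeSales, LargeSalesStream, and the threshold are illustrative assumptions):

CREATE CQ FilterLargeSales
INSERT INTO LargeSalesStream
SELECT TO_STRING(data[1]) as merchantId,
  TO_DOUBLE(data[7]) as amount
FROM CsvStream
WHERE TO_DOUBLE(data[7]) > 1000;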

The code for XML sources is slightly different. For example, from MultiLogApp:

CREATE SOURCE Log4JSource USING FileReader (
  directory:'Samples/MultiLogApp/appData',
  wildcard:'log4jLog.xml',
  positionByEOF:false
) 
PARSE USING XMLParser(
  rootnode:'/log4j:event',
  columnlist:'log4j:event/@timestamp,
    log4j:event/@level,
    log4j:event/log4j:message,
    log4j:event/log4j:throwable,
    log4j:event/log4j:locationInfo/@class,
    log4j:event/log4j:locationInfo/@method,
    log4j:event/log4j:locationInfo/@file,
    log4j:event/log4j:locationInfo/@line'
)
OUTPUT TO RawXMLStream;

Here, fields 0-7 in the WAEvent data array are defined by columnlist. These are then parsed by the CQ ParseLog4J:

CREATE TYPE Log4JEntry (
  logTime DateTime,
  level String,
  message String,
  api String,
  sessionId String,
  userId String,
  sobject String,
  xception String,
  className String,
  method String,
  fileName String,
  lineNum String
);
CREATE STREAM Log4JStream OF Log4JEntry;

CREATE CQ ParseLog4J
INSERT INTO Log4JStream
SELECT TO_DATE(TO_LONG(data[0])),
  data[1],
  data[2], 
  MATCH(data[2], '\\\\[api=([a-zA-Z0-9]*)\\\\]'),
  MATCH(data[2], '\\\\[session=([a-zA-Z0-9\\-]*)\\\\]'),
  MATCH(data[2], '\\\\[user=([a-zA-Z0-9\\-]*)\\\\]'),
  MATCH(data[2], '\\\\[sobject=([a-zA-Z0-9]*)\\\\]'),
  data[3],
  data[4],
  data[5],
  data[6],
  data[7]
FROM RawXMLStream;

See MultiLogApp for a more detailed discussion including explanation of the MATCH function.
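
As a rough illustration (the sample message text here is hypothetical, and the return behavior is as described in MultiLogApp), if data[2] contained a message including the text [api=login], then

MATCH(data[2], '\\\\[api=([a-zA-Z0-9]*)\\\\]')

would return the string login, the contents of the regular expression's capture group.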

If you prefer, instead of defining Log4JStream separately as above, you can define it implicitly within the ParseLog4J CQ, as follows:

CREATE CQ ParseLog4J
INSERT INTO Log4JStream
SELECT TO_DATE(TO_LONG(data[0])) as logTime,
  TO_STRING(data[1]) as level,
  TO_STRING(data[2]) as message,
  TO_STRING(MATCH(data[2], '\\\\[api=([a-zA-Z0-9]*)\\\\]')) as api,
  TO_STRING(MATCH(data[2], '\\\\[session=([a-zA-Z0-9\\-]*)\\\\]')) as sessionId,
  TO_STRING(MATCH(data[2], '\\\\[user=([a-zA-Z0-9\\-]*)\\\\]')) as userId,
  TO_STRING(MATCH(data[2], '\\\\[sobject=([a-zA-Z0-9]*)\\\\]')) as sobject,
  TO_STRING(data[3]) as xception,
  TO_STRING(data[4]) as className,
  TO_STRING(data[5]) as method,
  TO_STRING(data[6]) as fileName,
  TO_STRING(data[7]) as lineNum
FROM RawXMLStream;

With this approach, Log4JStream's automatically generated type would be Log4JStream_Type, so you would have to replace the four other references to Log4JEntry in the application with Log4JStream_Type.
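
For example, a component that declared another stream of the same type (ErrorStream here is hypothetical, not part of MultiLogApp) would change from

CREATE STREAM ErrorStream OF Log4JEntry;

to

CREATE STREAM ErrorStream OF Log4JStream_Type;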