Sources with WAEvent output
The basic programming pattern for a source using an adapter with the WAEvent output type is:
source > stream > CQ > stream
You do not need to define the streams explicitly: instead, define them implicitly as part of the source and CQ, and they will be created automatically. For example, from PosApp:
CREATE SOURCE CsvDataSource USING FileReader (
  directory:'Samples/PosApp/appData',
  wildcard:'posdata.csv',
  blocksize: 10240,
  positionByEOF:false
)
PARSE USING DSVParser (
  header:Yes,
  trimquote:false
)
OUTPUT TO CsvStream;

CREATE CQ CsvToPosData
INSERT INTO PosDataStream
SELECT TO_STRING(data[1]) as merchantId,
  TO_DATEF(data[4],'yyyyMMddHHmmss') as dateTime,
  DHOURS(TO_DATEF(data[4],'yyyyMMddHHmmss')) as hourValue,
  TO_DOUBLE(data[7]) as amount,
  TO_STRING(data[9]) as zip
FROM CsvStream;
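For reference, the implicitly created type and stream are roughly equivalent to defining them by hand with CREATE TYPE and CREATE STREAM. The following is only a sketch of that equivalent (the field types are taken from the generated PosDataStream_Type shown in the DESCRIBE output later in this section); with the implicit approach you never write this code:

```
-- Sketch: explicit equivalent of the automatically created
-- PosDataStream_Type and PosDataStream (not needed in practice)
CREATE TYPE PosDataStream_Type (
  merchantId String,
  dateTime DateTime,
  hourValue Integer,
  amount Double,
  zip String
);
CREATE STREAM PosDataStream OF PosDataStream_Type;
```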
In addition to the explicitly defined CsvDataSource and CsvToPosData, this will automatically create the "implicit" stream CsvStream (of type WAEvent), the type PosDataStream_Type, and the stream PosDataStream.
You can inspect these using the DESCRIBE command, for example:
W (admin) > describe type Samples.PosDataStream_Type;
Processing - describe type Samples.PosDataStream_Type
TYPE Samples.PosDataStream_Type CREATED 2016-01-22 12:35:34
ATTRIBUTES (
  merchantId java.lang.String
  dateTime org.joda.time.DateTime
  hourValue java.lang.Integer
  amount java.lang.Double
  zip java.lang.String
)
For more information, see Parsing the data field of WAEvent.
The code for XML sources is slightly different. For example, from MultiLogApp:
CREATE SOURCE Log4JSource USING FileReader (
  directory:'Samples/MultiLogApp/appData',
  wildcard:'log4jLog.xml',
  positionByEOF:false
)
PARSE USING XMLParser(
  rootnode:'/log4j:event',
  columnlist:'log4j:event/@timestamp,
    log4j:event/@level,
    log4j:event/log4j:message,
    log4j:event/log4j:throwable,
    log4j:event/log4j:locationInfo/@class,
    log4j:event/log4j:locationInfo/@method,
    log4j:event/log4j:locationInfo/@file,
    log4j:event/log4j:locationInfo/@line'
)
OUTPUT TO RawXMLStream;
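Each entry in columnlist maps positionally to an index in the WAEvent data array, so the mapping for this source is:

```
-- columnlist position → WAEvent data[] index
-- data[0] = log4j:event/@timestamp
-- data[1] = log4j:event/@level
-- data[2] = log4j:event/log4j:message
-- data[3] = log4j:event/log4j:throwable
-- data[4] = log4j:event/log4j:locationInfo/@class
-- data[5] = log4j:event/log4j:locationInfo/@method
-- data[6] = log4j:event/log4j:locationInfo/@file
-- data[7] = log4j:event/log4j:locationInfo/@line
```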
Here, fields 0-7 in the WAEvent data array are defined by columnlist. These are then parsed by the CQ ParseLog4J:
CREATE TYPE Log4JEntry (
  logTime DateTime,
  level String,
  message String,
  api String,
  sessionId String,
  userId String,
  sobject String,
  xception String,
  className String,
  method String,
  fileName String,
  lineNum String
);
CREATE STREAM Log4JStream OF Log4JEntry;

CREATE CQ ParseLog4J
INSERT INTO Log4JStream
SELECT TO_DATE(TO_LONG(data[0])),
  data[1],
  data[2],
  MATCH(data[2], '\\\\[api=([a-zA-Z0-9]*)\\\\]'),
  MATCH(data[2], '\\\\[session=([a-zA-Z0-9\\-]*)\\\\]'),
  MATCH(data[2], '\\\\[user=([a-zA-Z0-9\\-]*)\\\\]'),
  MATCH(data[2], '\\\\[sobject=([a-zA-Z0-9]*)\\\\]'),
  data[3],
  data[4],
  data[5],
  data[6],
  data[7]
FROM RawXMLStream;
See MultiLogApp for a more detailed discussion, including an explanation of the MATCH function.
If you prefer, instead of separately defining Log4JStream as above, you can define it within the ParseLog4J CQ, as follows:
CREATE CQ ParseLog4J
INSERT INTO Log4JStream
SELECT TO_DATE(TO_LONG(data[0])) as logTime,
  TO_STRING(data[1]) as level,
  TO_STRING(data[2]) as message,
  TO_STRING(MATCH(data[2], '\\\\[api=([a-zA-Z0-9]*)\\\\]')) as api,
  TO_STRING(MATCH(data[2], '\\\\[session=([a-zA-Z0-9\\-]*)\\\\]')) as sessionId,
  TO_STRING(MATCH(data[2], '\\\\[user=([a-zA-Z0-9\\-]*)\\\\]')) as userId,
  TO_STRING(MATCH(data[2], '\\\\[sobject=([a-zA-Z0-9]*)\\\\]')) as sobject,
  TO_STRING(data[3]) as xception,
  TO_STRING(data[4]) as className,
  TO_STRING(data[5]) as method,
  TO_STRING(data[6]) as fileName,
  TO_STRING(data[7]) as lineNum
FROM RawXMLStream;
With this approach, Log4JStream's automatically generated type would be Log4JStream_Type, so you would have to replace the four other references to Log4JEntry in the application with Log4JStream_Type.
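For example, a downstream component typed on Log4JEntry would need its type reference updated. The component below is hypothetical (it is not one of the four references in MultiLogApp), shown only to illustrate the kind of change required:

```
-- Hypothetical downstream stream typed on the log entry type.
-- Before (with the explicitly defined type):
--   CREATE STREAM ErrorStream OF Log4JEntry;
-- After (with the automatically generated type):
CREATE STREAM ErrorStream OF Log4JStream_Type;
```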