MongoDB Writer
Writes to MongoDB collections.
Using the adapter
This adapter may be used in four ways:

- With an input stream of a user-defined type, MongoDB Writer writes events as documents to a single collection. Target document field names are taken from the input stream's event type. The value of the key field of the input event is used as the document key (the _id field value). If the input stream's type has no key, the target document's key is generated by concatenating the values of all fields, separated by the Key Separator string. Alternatively, you may specify a subset of fields to be concatenated using the syntax `<database name>.<collection name> keycolumns(<field1 name>, <field2 name>, ...)` in the Collections property.

- With an input stream of type JSONNodeEvent that is the output stream of a source using JSONParser, MongoDB Writer writes events as documents to a single collection. Target document field names are taken from the input events' JSON field names. When the JSON event contains an _id field, its value is used as the document key. Otherwise, MongoDB generates an ObjectId for the document key.

- With an input stream of type JSONNodeEvent that is the output stream of a MongoDB Reader source, MongoDB Writer writes each source MongoDB collection to a separate target collection. Inserts, updates, and deletes in the source are handled as inserts, updates, and deletes in the target. MongoDB collections may be replicated in another MongoDB instance by using wildcards in the Collections property. Alternatively, you may manually map source collections to target collections as discussed in the notes for the Collections property. The source document's primary key and field names are used as the target document's key and field names.

- With an input stream of type WAEvent that is the output stream of a SQL CDC reader or DatabaseReader source, MongoDB Writer writes data from each source table to a separate collection. The target collections may be in different databases. To process updates and deletes, compression must be disabled in the source adapter (that is, WAEvents for update and delete operations must contain all values, not just the primary keys and, for updates, the modified values). Each row in a source table is written to a document in the target collection mapped to that table. Target document field names are taken from the source event's metadata map and their values from its data array (see WAEvent contents for change data). Source table data may be replicated to MongoDB collections of the same names by using wildcards in the Collections property. Note that data will be read only from tables that exist when the source starts; tables added later will be ignored until the source is restarted. Alternatively, you may manually map source tables to MongoDB collections as discussed in the notes for the Collections property. When the source is a CDC reader, updates and deletes in source tables are replicated in the corresponding MongoDB target collections. Each source row's primary key value (which may be a composite) is used as the key (the _id field value) for the corresponding MongoDB document. If the table has no primary key, the target document's key is generated by concatenating the values of all fields in the row, separated by the Key Separator string. Alternatively, you may select a subset of fields to be concatenated using the `keycolumns` option as discussed in the notes for the Collections property.
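To illustrate key generation, a hedged sketch of a target using keycolumns (the database, collection, stream, and field names here are illustrative, not from the samples later in this page). With this configuration and the default Key Separator, a row with uid 101 and name Ann would get the document key 101:Ann:

```
CREATE TARGET KeyedTarget USING MongoDBWriter (
  ConnectionURL: 'localhost:27017',
  Collections: 'mydb.orders keycolumns(uid, name)',
  KeySeparator: ':'
)
INPUT FROM OrderStream;
```

If the values being concatenated might themselves contain a colon, change Key Separator to a string that cannot occur in those values.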
MongoDB Writer properties
Property | Type | Default value | Notes |
---|---|---|---|
Auth DB | String | admin | Specify the authentication database for the specified username. If not specified, the default, admin, is used. |
Auth Type | String | SCRAM_SHA_1 | Specify the authentication mechanism used by your MongoDB instance. The default setting uses MongoDB's default authentication mechanism, SCRAM-SHA-1. Other supported choices are KERBEROS_GSSAPI, MONGODB_CR, SCRAM_SHA_256, and X_509. Set to NoAuth if authentication is not enabled. Set to KERBEROS_GSSAPI if you are using Kerberos. |
Batch Policy | String | EventCount:1000, Interval:30 | The batch policy includes eventCount and interval (see Setting output names and rollover / upload policies for syntax). Events are buffered locally on the Striim server and sent as a batch to the target every time either of the specified values is exceeded. When the app is stopped, any remaining data in the buffer is discarded. To disable batching, set to EventCount:1,Interval:0. With the default setting, data will be written every 30 seconds, or sooner if the buffer accumulates 1,000 events. |
Checkpoint Collection | String | | Optionally, specify the fully-qualified name of an existing empty MongoDB collection in the target. The user specified in Username must have the readWrite role on this collection. When no checkpoint collection is specified, Striim guarantees at-least-once processing (A1P) with MongoDB Writer: after recovery, there may be some duplicate events, but none will be missing. When writing to replica sets in MongoDB 4.0 or later or to sharded clusters in MongoDB 4.2 or later, specifying a checkpoint collection enables exactly-once processing (E1P): after recovery, there will be no duplicate or missing events. |
Collections | String | | The fully-qualified name(s) of the MongoDB collection(s) to write to, for example, mydb.mycollection. Separate multiple collections with commas. You may use the % wildcard, for example, mydb.%. Note that data will be written only to collections that exist when the Striim application starts; collections added later will be ignored until the application is restarted. When multiple source collections are mapped to the target, the combination of the shard key and the _id field must be unique. For MongoDB 4.2 or later only: when the source is MongoDB Reader, the source collection is unsharded, and the target collection is sharded, the shard key fields in the source and target must match. When the source is MongoDB Reader and the source and target are both unsharded, FullDocumentUpdateLookup must be True in MongoDB Reader. When the input stream of the target is the output of a DatabaseReader, IncrementalBatchReader, or SQL CDC source, the writer can write to multiple collections. In this case, specify the names of both the source tables and the target collections. |
Connection Retry | String | retryInterval=60, maxRetries=3 | With the default setting, if a connection attempt is unsuccessful, the adapter will try again in 60 seconds (retryInterval=60), up to three times (maxRetries=3). |
Connection URL | String | | The host and port of the MongoDB instance, for example, localhost:27017. Note: when connecting to MongoDB with DNS SRV, specify the DNS SRV record name. When connecting to a sharded MongoDB instance with mongos, specify the host and port of the mongos instance. When connecting to a sharded instance of MongoDB without mongos, specify the hosts and ports of the shards. |
Excluded Collections | String | | Any collections to be excluded from the set specified in the Collections property. Specify as for the Collections property. |
Ignorable Exception Code | String | | By default, if the target returns an error, the application will terminate. Specify DUPLICATE_KEY or KEY_NOT_FOUND to ignore such errors and continue. By default, if MongoDB Writer attempts to update a shard key field without providing the previous value of the field, the Striim application will halt with a No Operation Exception error. To instead ignore such errors and continue without updating the shard key field, specify SHARD_KEY_UPDATE. To specify multiple ignorable exception codes, separate them with commas. Ignored exceptions will be written to the application's exception store (see CREATE EXCEPTIONSTORE). |
Key Separator | String | : | Inserted between values when generating document keys by concatenating column or field values. If the values might contain a colon, change this to something that will not occur in those values. |
Ordered Writes | Boolean | True | If you do not care that documents may be written out of order (typically the case during initial load), set to False to improve performance. |
Parallel Threads | Integer | | |
Password | com.webaction.security.Password | | The password for the specified Username. |
Retriable Error Codes | String | | Specify any error codes for which you want to trigger a connection retry rather than a halt or termination. Each version of MongoDB has its own error code reference, for example, https://github.com/mongodb/mongo/blob/v2.6/docs/errors.md for 2.6 and https://github.com/mongodb/mongo/blob/r5.0.7/src/mongo/base/error_codes.yml for 5.0. |
Security Config | String | | See Using SSL or Kerberos or X.509 authentication with MongoDB. |
SSL Config | String | | To enable SSL for the connection, or if individual Atlas MongoDB instances are specified in the Connection URL, set this property as described in Using SSL or Kerberos or X.509 authentication with MongoDB. |
Upsert Mode | Boolean | False | Set to True to process inserts and updates as upserts. This is required if the input stream of this writer is the output stream of a Cosmos DB Reader or Mongo Cosmos DB Reader source. |
Username | String | | A MongoDB user with the readWrite role on the target collection(s). |
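Drawing several of these properties together, the following is a hedged sketch of a tuned target (the stream, collection, and credential values are placeholders, and the unspaced property-name spellings follow the style of the samples below):

```
CREATE TARGET TunedMongoTarget USING MongoDBWriter (
  ConnectionURL: 'localhost:27017',
  Collections: 'mydb.events',
  UserName: 'striim',
  Password: '********',
  BatchPolicy: 'EventCount:1000, Interval:30',
  ConnectionRetry: 'retryInterval=60, maxRetries=3',
  IgnorableExceptionCode: 'DUPLICATE_KEY'
)
INPUT FROM SomeStream;
```

With these settings, writes are batched every 30 seconds or 1,000 events, failed connections are retried up to three times at 60-second intervals, and duplicate-key errors are written to the exception store instead of terminating the application.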
MongoDB sample applications
This application writes data from a CSV file to MongoDB. It has an input stream of a user-defined type.
```
CREATE SOURCE FileSource USING FileReader (
  directory: '/Users/user/Desktop',
  wildcard: 'data.csv',
  positionbyeof: false
)
PARSE USING DSVParser()
OUTPUT TO FileStream;

CREATE TYPE CqStream_Type (
  uid java.lang.Integer,
  name java.lang.String,
  zip java.lang.Long,
  city java.lang.String);
CREATE STREAM CqStream OF CqStream_Type;

CREATE CQ Cq1
INSERT INTO CqStream
SELECT TO_INT(data[0]) as uid,
  data[1] as name,
  TO_LONG(data[2]) as zip,
  data[3] as city
FROM FileStream;

CREATE TARGET MongoTarget USING MongoDBWriter (
  Collections: 'test.emp keycolumns(uid,name)',
  ConnectionURL: 'localhost:27017',
  AuthDB: 'admin',
  UserName: 'waction',
  keyseparator: ':',
  Password: '********'
)
INPUT FROM CqStream;
```
This application writes data from a JSON file to MongoDB. It has an input stream of type JSONNodeEvent from JSONParser.
```
CREATE SOURCE JsonSource USING FileReader (
  directory: '/Users/user/Desktop',
  wildcard: 'jsondata.txt',
  positionbyeof: false
)
PARSE USING JSONParser()
OUTPUT TO JsonStream;

CREATE TARGET MongoTgt USING MongoDBWriter (
  AuthType: 'SCRAM_SHA_1',
  ConnectionURL: 'localhost:27017',
  AuthDB: 'admin',
  Collections: 'test.emp1',
  UserName: 'waction',
  Password: '********'
)
INPUT FROM JsonStream;
```
This initial load application writes data from one MongoDB collection to another. It has an input stream of type JSONNodeEvent from MongoDB Reader.
```
CREATE SOURCE MongoSource USING MongoDBReader (
  Mode: 'InitialLoad',
  collections: 'qatest.col1',
  connectionUrl: 'localhost:27017'
)
OUTPUT TO Mongostream;

CREATE TARGET MongoTarget USING MongoDBWriter (
  Collections: 'qatest.col1,db2.TEST',
  ConnectionURL: 'localhost:27017'
)
INPUT FROM Mongostream;
```
This streaming integration application writes data from Oracle to MongoDB. It has an input stream of type WAEvent from Oracle Reader (a SQL CDC source).
```
CREATE SOURCE Oracle_Source USING OracleReader (
  Username: 'miner',
  Password: 'miner',
  ConnectionURL: 'jdbc:oracle:thin:@//192.168.1.49:1521/orcl',
  Tables: 'QATEST.%'
)
OUTPUT TO DataStream;

CREATE TARGET MongoDBTarget1 USING MongoDBWriter (
  Username: 'admin',
  Password: 'admin',
  ConnectionURL: 'localhost:27017',
  Collections: 'QATEST.%,MongoRegrDB01.%'
)
INPUT FROM DataStream;
```
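The wildcard mapping in the Oracle example can also be written as explicit source-to-target pairs. The following is a sketch, assuming the semicolon-separated pair syntax described in the notes for the Collections property (the table and collection names are illustrative):

```
CREATE TARGET MappedMongoTarget USING MongoDBWriter (
  Username: 'admin',
  Password: 'admin',
  ConnectionURL: 'localhost:27017',
  Collections: 'QATEST.EMP,mydb.employees;QATEST.DEPT,mydb.departments'
)
INPUT FROM DataStream;
```

Explicit pairs are useful when the target collection names should differ from the source table names, which the wildcard form cannot express.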