MongoDB

Striim supports MongoDB versions 3.6 through 6.3, including MongoDB Atlas on AWS, Azure, and Google Cloud Platform.

With MongoDB 4.2 or later, MongoDB Reader reads from MongoDB change streams rather than the oplog. See MongoDB Manual > Change Streams. When reading from change streams, MongoDB Reader also:

  • can read from multiple shards. (With MongoDB 4.1 or earlier, a separate MongoDB Reader is needed for each shard.)

  • supports transactions, provides transaction metadata, and creates individual events for each operation in a transaction. See MongoDB Manual > Transactions for more information.

MongoDB Reader applications created in releases of Striim prior to 4.2 will continue to read from the oplog after upgrading. To switch to change streams, see "Changes that may require modification of your TQL code, workflow, or environment" in the Release notes.

Striim provides a template for creating applications that read from MongoDB and write to Cosmos DB. See Creating an application using a template for details.

MongoDB setup

MongoDBReader reads data from the Replica Set Oplog, so to use it you must be running a replica set (see Deploy a Replica Set or Convert a Standalone to a Replica Set, or your MongoDB cloud provider's documentation) on each shard to be read. For more information, see Replication and Replica Set Data Synchronization.

For all versions of MongoDB, the user specified in MongoDBReader's Username property must have read access to the config database (see MongoDB Manual > Config Database).

For MongoDB Atlas, for both InitialLoad and Incremental mode, the user specified in MongoDBReader's Username property must have the readAnyDatabase role (see Built-in Roles).

For other versions of MongoDB, in InitialLoad mode, the user specified in MongoDBReader's Username property must have read access to all databases containing the specified collections.

In Incremental mode, for MongoDB 4.2 and later, the user specified in MongoDBReader's Username property must have changeStream and find privileges on all the collections of the cluster. You may want to create a role with these two privileges (see MongoDB Manual > User-Defined Roles).
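
For example, here is a minimal mongo shell sketch of creating such a role and granting it to a user (the role name striimCDC and the user name striim are hypothetical; adjust both to your environment):

use admin;
db.createRole({
  role: "striimCDC",
  privileges: [
    // An empty db and collection in the resource applies these actions to
    // all non-system collections in all databases.
    { resource: { db: "", collection: "" }, actions: [ "changeStream", "find" ] }
  ],
  roles: []
});
db.grantRolesToUser("striim", [ { role: "striimCDC", db: "admin" } ]);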

In Incremental mode, for MongoDB 4.1 and earlier, the user specified in MongoDBReader's Username property must have read access to the local database and the oplog.rs collection. The oplog is a capped collection, which means that the oldest data is automatically removed to keep it within the specified size. To support recovery, the oplog must be large enough to retain all data that may need to be recovered. See Oplog Size and Change Oplog Size for more information.

To support recovery (see Recovering applications), for all versions of MongoDB, the replica set's oplog must be large enough to retain all the events generated while Striim is offline.
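
To check whether the oplog window is large enough, and to enlarge it if not, you can use a mongo shell sketch along these lines (replSetResizeOplog is available in MongoDB 3.6 and later; the 16000 MB size is only an illustration, so choose a size that covers your longest expected Striim downtime):

// Show the current oplog size and the time range it covers.
rs.printReplicationInfo();
// Resize the oplog; the size is specified in megabytes.
db.adminCommand({ replSetResizeOplog: 1, size: 16000 });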

Using SSL or Kerberos or X.509 authentication with MongoDB

If you have an on-premise MongoDB deployment with your own certificate authority, set the Security Config properties as discussed below.

To set up SSL in MongoDB, see MongoDB Manual > Configure mongod and mongos for TLS/SSL.

To set up Kerberos in MongoDB, see MongoDB Manual > Kerberos Authentication.

To set up X.509 in MongoDB, see MongoDB Manual > Use x.509 Certificates to Authenticate Clients.

To secure your authentication parameters, store the entire Security Config string in a vault (see Using vaults). For example, assuming your Kerberos realm is MYREALM.COM, its Key Distribution Center (KDC) is kerberos.myrealm.com, the path to the SSL trust store is /cacerts, the path to the SSL keystore file is /keystore.pkcs12, and the password for both stores is MyPassword, the Striim console commands to store the Security Config string with the key SSLKerberos in a vault named MongoDBVault would be:

CREATE VAULT MongoDBVault;
WRITE INTO MongoDBVault (
  vaultKey: "SSLKerberos",
  vaultValue: "RealmName:MYREALM.COM;
    KDC:kerberos.myrealm.com;
    KeyStore:/keystore.pkcs12;
    TrustStore:/cacerts;
    trustStorePassword:MyPassword;
    KeyStorePassword:MyPassword"
);

Enter READ ALL FROM MongoDBVault; to verify the contents.

In TQL or the Flow Designer, you would then specify the Security Config as [[MongoDBVault.SSLKerberos]].
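
For example, a minimal TQL sketch of a source reading its Security Config from the vault (the connection details and collections are placeholders, and SecurityConfig is assumed to be the TQL spelling of the Flow Designer's Security Config label):

CREATE SOURCE MongoDBIn USING MongoDBReader (
  Username:'striim@MYREALM.COM',
  authType:'GSSAPI',
  ConnectionURL:'<MongoDB connection string>',
  Collections:'mydb.$',
  SecurityConfig:'[[MongoDBVault.SSLKerberos]]'
)
OUTPUT TO MongoDBStream;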

The following are examples for each authentication option.

Kerberos authentication

Without SSL:

CREATE VAULT MongoDBVault;
WRITE INTO MongoDBVault (
  vaultKey: "Kerberos",
  vaultValue : "RealmName:MYREALM.COM;
  KDC:kdc.myrealm.com"
);

With SSL:

CREATE VAULT MongoDBVault;
WRITE INTO MongoDBVault (
  vaultKey: "KerberosSSL",
  vaultValue : "RealmName:MYREALM.COM;
  KDC:kdc.myrealm.com;
  KeyStore:UploadedFiles/keystore.ks;
  KeyStorePassword:MyPassword;
  TrustStore:Platform/UploadedFiles/truststore.ks;
  TrustStorePassword:MyPassword"
);

If required, also specify:

  • JAAS_CONF: the path to and name of the uploaded JAAS configuration file, for example, UploadedFiles/jaas.conf

  • JAAS_ENTRY: the name of an entry in jaas.conf, for example, striim_user

  • KeyStoreType:PKCS12: specify this if the KeyStore is PKCS12 rather than JKS

  • any of the elements listed below for SSL
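
Putting those elements together, a sketch of a Kerberos-with-SSL Security Config that also supplies a JAAS configuration (the file names, the entry name striim_user, and the PKCS12 key store are illustrative assumptions):

CREATE VAULT MongoDBVault;
WRITE INTO MongoDBVault (
  vaultKey: "KerberosSSLJaas",
  vaultValue : "RealmName:MYREALM.COM;
  KDC:kdc.myrealm.com;
  JAAS_CONF:UploadedFiles/jaas.conf;
  JAAS_ENTRY:striim_user;
  KeyStore:UploadedFiles/keystore.pkcs12;
  KeyStoreType:PKCS12;
  KeyStorePassword:MyPassword;
  TrustStore:UploadedFiles/truststore.ks;
  TrustStorePassword:MyPassword"
);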

SSL

If you are using MongoDB on premise with SSL, use Security Config to configure the key store and trust store.

CREATE VAULT MongoDBVault;
WRITE INTO MongoDBVault (
  vaultKey: "SSL",
  vaultValue : "KeyStore:UploadedFiles/keystore.ks;
  TrustStore:UploadedFiles/truststore.ks;
  TrustStorePassword:MyPassword;
  KeyStorePassword:MyPassword"
);

If required, also specify:

  • SecureSocketProtocol: specify if a specific protocol is required

  • TrustStoreType:PKCS12: specify this if the TrustStore is PKCS12 rather than JKS

X.509 authentication
CREATE VAULT MongoDBVault;
WRITE INTO MongoDBVault (
  vaultKey: "X509",
  vaultValue : "KeyStore:UploadedFiles/keystore.ks;
  KeyStorePassword:MyPassword"
);

If required, also specify:

  • KeyStoreType:PKCS12: specify this if the KeyStore is PKCS12 rather than JKS

Connecting to MongoDB Atlas clusters using private endpoints or network peering

Striim provides support for connecting to MongoDB Atlas clusters using private endpoints or network peering.

  • Private endpoints: MongoDB Atlas supports private endpoints on dedicated clusters. For example, you may configure a private endpoint connection to a MongoDB cluster in Atlas from Striim Platform installed in a Google Cloud Platform VM.

  • Network peering: MongoDB Atlas supports network peering connections. Network peering establishes a private connection between your Atlas VPC and your cloud provider's VPC. The connection isolates traffic from public networks for added security.

There are no Striim-specific configuration steps required; the configuration steps you need to perform are in MongoDB Atlas.

MongoDB Reader properties

The MongoDB driver is bundled with Striim, so no installation is necessary.

The adapter properties are:


authDB (String, default: admin)

Specify the authentication database for the specified username. If not specified, uses the admin database.

authType (enum, default: Default)

Specify the authentication mechanism used by your MongoDB instance (see MongoDB Manual > Authentication). The Default setting uses MongoDB's default authentication mechanism, SCRAM. Other supported choices are GSSAPI, MONGODBCR, MONGODBX509, PLAIN, SCRAMSHA1, and SCRAMSHA256. Set to NoAuth if authentication is not enabled.

Set to GSSAPI if you are using Kerberos.

Collections (String)

The fully-qualified name(s) of the MongoDB collection(s) to read from, for example, mydb.mycollection. Separate multiple collections by commas.

You may use the $ wildcard, for example, mydb.$. The wildcard is allowed only at the end of the string: for example, mydb.prefix$ is valid, but mydb.$suffix is not.

Note that data will be read only from collections that exist when the Striim application starts; collections added later will be ignored until the application is restarted.

Connection Retry Policy (String, default: retryInterval=30, maxRetries=3)

With the default setting, if a connection attempt is unsuccessful, the adapter will try again in 30 seconds (retryInterval). If the second attempt is unsuccessful, in 30 seconds it will try a third time (maxRetries). If that is unsuccessful, the adapter will fail and log an exception. Negative values are not supported.

Connection URL (String)

With MongoDB 4.2 or higher, or with earlier versions when Mode is InitialLoad:

  • If you are connecting to MongoDB with DNS SRV, specify mongodb+srv://<host name>/<authentication database>, for example, mongodb+srv://abcdev3.gcp.mongodb.net/mydb. If you do not specify a database, the connection will use admin.

  • If you are connecting to a sharded MongoDB instance with mongos, specify <IP address or host name>:<port> of the mongos instance.

  • If you are connecting to a sharded instance of MongoDB without mongos, specify <IP address or host name>:<port> for all instances of the replica set, separated by commas. For example: 192.168.1.1:27017, 192.168.1.2:27017, 192.168.1.3:27017.

With MongoDB 4.1 or earlier when Mode is Incremental:

  • If you are connecting to an unsharded instance of MongoDB, specify <IP address or host name>:<port> for all instances of the replica set, separated by commas. For example: 192.168.1.1:27017, 192.168.1.2:27017, 192.168.1.3:27017.

  • If you are connecting to a sharded instance of MongoDB, create a separate source for each shard. For each reader, specify all the instances of the shard's replica set, separated by commas. For example: 192.168.1.1:27017, 192.168.1.2:27017, 192.168.1.3:27017.

Exclude Collections (String)

If Collections uses a wildcard, data from any collections specified here will be omitted. Multiple collection names (separated by commas) and wildcards may be used exactly as for Collections.

Full Document Update Lookup (Boolean, default: False)

When Mode is InitialLoad, this setting is ignored and does not appear in the Flow Designer.

With the default setting of False, for UPDATE events the JSONNodeEvent data field will contain only the _id and modified values.

Set to True to include the entire document. Note that the document will be the current version, so depending on other write operations that may have occurred between the update and the lookup, the returned document may differ significantly from the document at the time of the update. Enabling this setting may affect performance, since MongoDB Reader must call the database to fetch the additional data.

Mode (String, default: InitialLoad)

With the default setting, MongoDB Reader will load all existing data using db.collection.find() and stop. In this mode, MongoDBReader is not a CDC reader.

Set to Incremental to read CDC data continuously. In this mode, when reading from MongoDB 4.2 and later, MongoDB Reader reads change streams (see MongoDB Manual > Change Streams). When reading from earlier versions of MongoDB, it reads the oplog (see MongoDB Manual > Replica Set Oplog).

MongoDB Config (String)

Optionally specify a JSON string to define a subset of documents to be selected in InitialLoad mode and for inserts in Incremental mode. See Selecting documents using MongoDB Config.

When Mode is Incremental, insert operations are sent only for the defined subset of documents, but updates and deletes are sent for all documents. If Cosmos DB Writer, Mongo Cosmos DB Writer, or MongoDB Writer receives an update or delete for a document not in the subset, the application will halt. To avoid this, set Ignorable Exception Code to RESOURCE_NOT_FOUND for Cosmos DB Writer or KEY_NOT_FOUND for Mongo Cosmos DB Writer or MongoDB Writer. Note that in this case you will have to check the exception store to see whether any exceptions were ignored for documents in the subset.
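
For instance, a sketch of the writer-side setting for Cosmos DB Writer (the endpoint, key, and collection mapping are placeholders):

CREATE TARGET WriteToCosmos USING CosmosDBWriter (
  ServiceEndpoint:'<Cosmos DB connection string>',
  AccessKey:'<Cosmos DB account read-write key>',
  Collections:'mydb.$,mydb.%',
  IgnorableExceptionCode:'RESOURCE_NOT_FOUND'
)
INPUT FROM MongoDBStream;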

Password (encrypted password)

The password for the specified Username.

Quiesce on IL Completion (Boolean, default: False)

Read Preference (String, default: primaryPreferred)

See Read Preference Modes. Supported values are primary, primaryPreferred, secondary, secondaryPreferred, and nearest.

Security Config (String)

See Using SSL or Kerberos or X.509 authentication with MongoDB.

SSL Enabled (Boolean, default: False)

If MongoDB requires SSL, or individual MongoDB Atlas nodes are specified in the Connection URL, set to True (see Configure mongod and mongos for TLS/SSL). If you have an on-premise MongoDB deployment with your own certificate authority, see Using SSL or Kerberos or X.509 authentication with MongoDB.

Start Timestamp (String)

Leave blank to read only new data. Specify a UTC DateTime value (for example, 2018-07-18T04:56:10) to read all data from that time forward, or to wait to start reading until a time in the future. If the MongoDB and Striim servers are in different time zones, adjust the value to match the Striim time zone. If the oplog no longer contains data back to the specified time, reading will start from the beginning of the oplog.

If milliseconds are specified (for example, 2017-07-18T04:56:10.999), they will be interpreted as the incrementing ordinal for the MongoDB timestamp (see Timestamps).

Username (String)

A MongoDB user with access as described in MongoDB setup.

Selecting documents using MongoDB Config

You can use the MongoDB Config property to define a subset of documents to be selected.

  • The subset is defined by a query in JSON format.

  • Documents may be selected from multiple collections.

  • Specify the wildcard $ at the end of a collection name to match multiple collections using a single QueryClause.

  • All collections referenced in the query must be specified in the Collections property.

  • Multiple queries may be specified.

Operators

The logical operators AND and OR are supported for nested expressions.

The following comparison operators are supported:

  • =: equals

  • !=: does not equal

  • <: is less than

  • <=: is less than or equal to

  • >: is greater than

  • >=: is greater than or equal to

The data types supported for comparison using FieldValue are Boolean, String, and Numeric.

JSON fields

  • The filter criteria for all the collections should be provided as an object inside the QueryClause field.

  • QueryClause can contain multiple JSON objects with the fully qualified name (or pattern) of the collection and its filter criteria as key and value respectively.

  • If a collection matches more than one pattern, the filter criteria provided with the first pattern will be considered.

  • The Filter object contains the filter criteria of the query clause. Simple expressions and nested expressions are supported for Filter.

  • The leaf fields of the MongoDBConfig JSON object are FieldName and FieldValue: the field names in a document and their filter values. These are combined by a comparison operator field called Operator.

  • A simple expression involves directly providing the Operator, FieldName and FieldValue to the Filter field as an object.

  • Multiple nested expressions are created by combining individual Filter JSON objects using logical operators.

  • FieldName is the JSON path of the field. Dot notation can be used to provide the FieldName of a nested field.

MongoDB Config syntax and examples

Basic syntax:

{
  "QueryClause": {
    "<database name>.<collection name>": {
      "Filter": {
        "<field name>": "<field value>"
      }
    }
  }
}

For example, to select documents with the city value Bangalore from the collection MyCollection in the database MyDB:

{
  "QueryClause": {
    "MyDB.MyCollection": {
      "Filter": {
        "city": "Bangalore"
      }
    }
  }
}

An example using the logical operator OR to select documents matching either of two cities:

{
  "QueryClause": {
    "MyDB.MyCollection": {
      "Filter": {
        "OR": [
          {
            "city": "Bangalore"
          },
          {
            "city": "Chennai"
          }
        ]
      }
    }
  }
}

Complex MongoDB Config example

Specifying the JSON query below in MongoDB Config will select the documents that match the following criteria:

  • mongodb.employee: documents that have a field named City with Chennai as the value

  • collections whose names start with mongodb.dep: documents that match both of the following conditions:

    • do not have a field named Name with the value Accounts

    • have a field named State with the value Tamil Nadu

  • mongodb.payroll: documents that match either of the following conditions:

    • do not have a field named Source with the value F

    • match both of the following conditions:

      • have a field named Age with a value greater than 30

      • have a field named City with Bangalore as the value

{
  "QueryClause": {
    "mongodb.employee": {
      "Filter": {
        "City": "Chennai"
      }
    },
    "mongodb.dep$": {
      "Filter": {
        "and": [
          {
            "!=": {
              "Name": "Accounts"
            }
          },
          {
            "State": "Tamil Nadu"
          }
        ]
      }
    },
    "mongodb.payroll": {
      "Filter": {
        "or": [
          {
            "!=": {
              "Source": "F"
            }
          },
          {
            "and": [
              {
                ">": {
                  "Age": 30
                }
              },
              {
                "City": "Bangalore"
              }
            ]
          }
        ]
      }
    }
  }
}
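
To apply such a query, pass it as the value of the MongoDB Config property. A minimal TQL sketch, with the JSON condensed to a single line (the connection details are placeholders, and MongoDBConfig is assumed to be the TQL spelling of the Flow Designer's MongoDB Config label):

CREATE SOURCE FilteredMongoIn USING MongoDBReader (
  Username:'striim',
  Password:'******',
  ConnectionURL:'<MongoDB connection string>',
  Collections:'mongodb.employee,mongodb.dep$,mongodb.payroll',
  MongoDBConfig:'{"QueryClause":{"mongodb.employee":{"Filter":{"City":"Chennai"}}}}'
)
OUTPUT TO FilteredMongoStream;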

MongoDBReader JSONNodeEvent fields

The output type for MongoDBReader is JSONNodeEvent. The fields are:

data: contains the field names and values of a document, for example:

data: {"_id":2441,"company":"Striim","city":"Palo Alto"}

Updates include only the modified values. Deletes include only the document ID.

removedfields: contains the names of any fields deleted by the $unset function. If no fields were deleted, the value of removedfields is null. For example:

removedfields: {"myField":true}

Or if no fields have been removed:

removedfields: null
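
For example, removedfields would be populated by an update along these lines in the mongo shell (the field name myField is illustrative):

db.employee.updateOne({_id:1},{$unset:{"myField":""}});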

metadata: contains the following elements:

  • CollectionName: the collection from which the document was read

  • DatabaseName: the database of the collection

  • DocumentKey: the document ID (same as the _id value in data)

  • FullDocumentReceived: value is True if data includes the entire image of the document, False if it does not

  • Lsid (for MongoDB 4.2 or later only, for operations that are part of a multi-document transaction): the logical session identifier of the transaction session

  • NameSpace: <database name>.<collection name>

  • OperationName: in InitialLoad mode, SELECT; in Incremental mode, INSERT, UPDATE, or DELETE (with MongoDB 4.1 or earlier, operations within a transaction are not included; see Oplog does not record operations within a transaction)

  • Partitioned: value is True if the operation was on a sharded collection

  • PartitionKeys: a JsonNode object containing the shard keys and their values

  • Timestamp: in InitialLoad mode, the current time of the Striim server when the document was read; in Incremental mode, the MongoDB timestamp when the operation was performed

  • TxnNumber (for MongoDB 4.2 or later only, for operations that are part of a multi-document transaction): the transaction number

For example:

metadata: {"CollectionName":"employee","OperationName":"SELECT","DatabaseName":"test",
  "DocumentKey":1.0,"NameSpace":"test.employee","TimeStamp":1537433999609}

MongoDBReader example application and output

The following Striim application will write change data for the specified collection to SysOut. To run this yourself, replace striim and ****** with the user name and password for the MongoDB user account discussed in MongoDB setup, specify the correct connection URL for your instance, and replace mydb with the name of your database.

CREATE APPLICATION MongoDBTest;

CREATE SOURCE MongoDBIn USING MongoDBReader (
  Username:'striim',
  Password:'******',
  ConnectionURL:'192.168.1.10:27017',
  Collections:'mydb.employee'
) 
OUTPUT TO MongoDBStream;

CREATE TARGET MongoDBCOut
USING SysOut(name:MongoDB)
INPUT FROM MongoDBStream;

END APPLICATION MongoDBTest;

With the above application running, the following MongoDB shell commands:

use mydb;
db.employee.insertOne({_id:1,"firstname":"Larry","lastname":"Talbot","age":10,"comment":"new record"});
db.employee.updateOne({_id:1},{$set:{ "age":40, "comment":"partial update"}});
db.employee.deleteOne({_id:1});

would produce output similar to the following:

data: {"_id":1,"firstname":"Larry","lastname":"Talbot","age":10,"comment":"new record"}
metadata: {"CollectionName":"employee","OperationName":"INSERT","DatabaseName":"mydb","DocumentKey":1,
"NameSpace":"mydb.employee","TimeStamp":1537250474, "Partitioned":false,"FullDocumentReceived":true,
"PartitionKeys":{}}
...
data: {"_id":1.0,"age":40,"comment":"partial update"}
metadata: {"CollectionName":"employee","OperationName":"UPDATE","DatabaseName":"mydb","DocumentKey":1,"
NameSpace":"mydb.employee","TimeStamp":1537250474, "Partitioned":false,"FullDocumentReceived":false,
"PartitionKeys":{}}
...
data: {"_id":1}
metadata: {"CollectionName":"employee","OperationName":"DELETE","DatabaseName":"mydb","DocumentKey":1,
"NameSpace":"mydb.employee","TimeStamp":1537250477, "Partitioned":false,"FullDocumentReceived":false,
"PartitionKeys":{}}

Note that output for the "partial" update and delete operations includes only the fields specified in the shell commands. See Replicating MongoDB data to Azure CosmosDB for discussion of the issues this can cause when writing to targets and how to work around those issues.

Replicating MongoDB data to Azure CosmosDB

To replicate one or many MongoDB collections to Cosmos DB, specify multiple collections in the Collections properties of MongoDBReader and CosmosDBWriter. You may use wildcards ($ for MongoDB, % for Cosmos DB) to replicate all collections in a database, as in the example below, or specify multiple collections manually, as described in the notes for Cosmos DB Writer's Collections property.

You must create the target collections in Cosmos DB manually. The partition key names must match one of the fields in the MongoDB documents.

Data will be read only from collections that exist when the source starts. Additional collections added later will be ignored until the source is restarted. When the target collection is in a fixed container (see Partition and scale in Azure Cosmos DB), inserts, updates, and deletes are handled automatically. When the target collection is in an unlimited container, updates require special handling and deletes must be done manually, as discussed below.

If you wish to run the examples, adjust the MongoDB Reader properties and Cosmos DB Writer properties to reflect your own environment.

When the target collection is in a fixed container

Note

Writing to a target collection in a fixed container will not be possible until Microsoft fixes the bug discussed in this Azure forum discussion.

  1. In Cosmos DB, create database mydb containing the collection employee with partition key /name  (note that the collection and partition names are case-sensitive).

  2. In MongoDB, create the collection employee and populate it as follows:

    use mydb;
    db.employee.insertMany([
    {_id:1,"name":"employee1","company":"Striim","city":"Madras"},
    {_id:2,"name":"employee2","company":"Striim","city":"Seattle"},
    {_id:3,"name":"employee3","company":"Striim","city":"California"}
    ]);
  3. In Striim, run the following application to perform the initial load of the existing data:

    CREATE APPLICATION Mongo2CosmosInitialLoad; 
     
    CREATE SOURCE MongoDBIn USING MongoDBReader (
      Username:'striim',
      Password:'******',
      ConnectionURL:'<MongoDB connection string>',
      Collections:'mydb.$'
     )
    OUTPUT TO MongoDBStream;
     
    CREATE TARGET WriteToCosmos USING CosmosDBWriter (
      ServiceEndpoint: '<Cosmos DB connection string>',
      AccessKey: '<Cosmos DB account read-write key>',
      Collections: 'mydb.$,mydb.%'
    )
    INPUT FROM MongoDBStream;
     
    END APPLICATION Mongo2CosmosInitialLoad;

    After the application is finished, the Cosmos DB employee collection should contain the following:

    {
        "_id": 1,
        "name": "employee1",
        "company": "striim",
        "city": "madras",
        "id": "1.0",
        "_rid": "HnpSALVXpu4BAAAAAAAACA==",
        "_self": "dbs/HnpSAA==/colls/HnpSALVXpu4=/docs/HnpSALVXpu4BAAAAAAAACA==/",
        "_etag": "\"0800b33d-0000-0000-0000-5bb5aafa0000\"",
        "_attachments": "attachments/",
        "_ts": 1538632442
    }
    {
        "_id": 2,
        "name": "employee2",
        "company": "striim",
        "city": "seattle",
        "id": "2.0",
        "_rid": "HnpSALVXpu4BAAAAAAAADA==",
        "_self": "dbs/HnpSAA==/colls/HnpSALVXpu4=/docs/HnpSALVXpu4BAAAAAAAADA==/",
        "_etag": "\"2b00f87b-0000-0000-0000-5bb5aafb0000\"",
        "_attachments": "attachments/",
        "_ts": 1538632443
    }
    {
        "_id": 3,
        "name": "employee3",
        "company": "striim",
        "city": "california",
        "id": "3.0",
        "_rid": "HnpSALVXpu4BAAAAAAAAAA==",
        "_self": "dbs/HnpSAA==/colls/HnpSALVXpu4=/docs/HnpSALVXpu4BAAAAAAAAAA==/",
        "_etag": "\"2700ad2a-0000-0000-0000-5bb5aafb0000\"",
        "_attachments": "attachments/",
        "_ts": 1538632443
    }
    
  4. In Striim, run the following application to continuously replicate new data from MongoDB to Cosmos DB:

    CREATE APPLICATION Mongo2CosmosIncrementalFixedContainer; 
     
    CREATE SOURCE MongoDBIn USING MongoDBReader (
      Username:'striim',
      Password:'******',
      ConnectionURL:'<MongoDB connection string>',
      authType: 'NoAuth',
      Mode:'Incremental',
      FullDocumentUpdateLookup:'true', 
      startTimestamp: '<timestamp>',
      Collections:'mydb.$'
     )
    OUTPUT TO MongoDBStream;
    
    CREATE TARGET WriteToCosmos USING CosmosDBWriter (
      ServiceEndpoint:'<Cosmos DB connection string>',
      AccessKey:'<Cosmos DB account read-write key>',
      Collections:'mydb.$,mydb.%'
    )
    INPUT FROM MongoDBStream ;
    
    CREATE CQ SelectDeleteOperations
    INSERT INTO DeleteOpsStream
    SELECT META(MongoDBStream,"DatabaseName"),
      META(MongoDBStream,"CollectionName"),
      META(MongoDBStream,"DocumentKey")
    FROM MongoDBStream
    WHERE META(MongoDBStream,"OperationName").toString() = "DELETE";
    
    CREATE TARGET WriteIgnoredDeleteOps USING FileWriter (
      filename:'DeleteOperations.json'
    )
    FORMAT USING JSONFormatter()
    INPUT FROM DeleteOpsStream;
     
    END APPLICATION Mongo2CosmosIncrementalFixedContainer;
  5. In MongoDB, modify the employee collection as follows to add employee4, update employee1 and employee2, and remove employee3:

    use mydb;
    db.employee.save({_id:4,"name":"employee4","company":"Striim","city":"Palo Alto"});
    db.employee.save({_id:1,"name":"employee1","company":"Striim","city":"Seattle"});
    db.employee.update({_id:2},{$set : {"city":"Palo Alto"}});
    db.employee.remove({_id:3});

    Within 30 seconds, those changes should be replicated to the corresponding Cosmos DB collection with results similar to the following:

    {
        "_id": 1,
        "name": "employee1",
        "company": "striim",
        "city": "Seattle",
        "id": "1.0",
        "_rid": "HnpSALVXpu4BAAAAAAAACA==",
        "_self": "dbs/HnpSAA==/colls/HnpSALVXpu4=/docs/HnpSALVXpu4BAAAAAAAACA==/",
        "_etag": "\"0800b33d-0000-0000-0000-5bb5aafa0000\"",
        "_attachments": "attachments/",
        "_ts": 1538632442
    }
    {
        "_id": 2,
        "name": "employee2",
        "company": "striim",
        "city": "Palo Alto",
        "id": "2.0",
        "_rid": "HnpSALVXpu4BAAAAAAAADA==",
        "_self": "dbs/HnpSAA==/colls/HnpSALVXpu4=/docs/HnpSALVXpu4BAAAAAAAADA==/",
        "_etag": "\"2b00f87b-0000-0000-0000-5bb5aafb0000\"",
        "_attachments": "attachments/",
        "_ts": 1538632443
    }
    {
        "_id": 3,
        "name": "employee3",
        "company": "striim",
        "city": "california",
        "id": "3.0",
        "_rid": "HnpSALVXpu4BAAAAAAAAAA==",
        "_self": "dbs/HnpSAA==/colls/HnpSALVXpu4=/docs/HnpSALVXpu4BAAAAAAAAAA==/",
        "_etag": "\"2700ad2a-0000-0000-0000-5bb5aafb0000\"",
        "_attachments": "attachments/",
        "_ts": 1538632443
    }
    {
        "_id": 4,
        "name": "employee4",
        "company": "striim",
        "city": "Palo Alto",
        "id": "4.0",
        "_rid": "HnpSALVXpu4BAAAAAAAAAE==",
        "_self": "dbs/HnpSAA==/colls/HnpSALVXpu4=/docs/HnpSALVXpu4BAAAAAAAAAE==/",
        "_etag": "\"2700ad2a-0000-0000-0000-5bb5aafb0000\"",
        "_attachments": "attachments/",
        "_ts": 1538632443
    }

When the target collection is in an unlimited container

When a Cosmos DB collection is in an unlimited container, it must have a partition key, which must be specified when you create the collection.

  • When MongoDB save operations create new documents, all fields are included in MongoDBReader's output, so CosmosDBWriter can write to the correct partition.

  • When MongoDB save operations update existing documents, all fields are included in MongoDBReader's output, so CosmosDBWriter can use the partition key and document ID to update the correct target document.

  • MongoDB update operations do not include all fields, so the partition key may be missing from MongoDBReader's output. In those cases, the PartialRecordPolicy open processor retrieves the missing fields from MongoDB and adds them before passing the data to CosmosDBWriter.

  • MongoDB remove operations include only the document ID, so the partition key is missing from MongoDBReader's output. Since CosmosDBWriter would be unable to determine the correct partition, the application writes the database name, collection name, and document key to a DeleteOps collection in CosmosDB.

(Flow diagram: incremental MongoDB to Cosmos DB replication)
CREATE APPLICATION Mongo2CosmosIncrementalUnlimitedContainer; 
 
CREATE SOURCE MongoDBIn USING MongoDBReader (
  Username:'striim',
  Password:'******',
  ConnectionURL:'<MongoDB connection string>',
  authType: 'NoAuth',
  Mode:'Incremental',
  startTimestamp: '<timestamp>',
  Collections:'mydb.$'
 )
OUTPUT TO MongoDBStream;

CREATE STREAM FilteredMongoDBStream OF Global.JsonNodeEvent;

CREATE CQ ExcludeDeleteOperations
INSERT INTO FilteredMongoDBStream
SELECT * FROM MongoDBStream
WHERE META(MongoDBStream,"OperationName").toString() != "DELETE";

CREATE STREAM FullDocstream OF Global.JsonNodeEvent;

CREATE OPEN PROCESSOR CompletePartialDocs USING MongoPartialRecordPolicy ( 
  ConnectionURL:'<MongoDB connection string>', 
  authType:'NoAuth',
  OnMissingDocument: 'Process'
)
INSERT INTO FullDocstream
FROM FilteredMongoDBStream;

CREATE TARGET WriteToCosmos USING CosmosDBWriter (
  ServiceEndpoint:'<Cosmos DB connection string>',
  AccessKey:'<Cosmos DB account read-write key>',
  Collections:'mydb.$,mydb.%',
  IgnorableExceptionCode:'PARTITION_KEY_NOT_FOUND'
)
INPUT FROM FullDocstream;

CREATE CQ SelectDeleteOperations
INSERT INTO DeleteOpsStream
SELECT TO_STRING(META(MongoDBStream,"DatabaseName")) AS DatabaseName,
  TO_STRING(META(MongoDBStream,"CollectionName")) AS CollectionName,
  TO_STRING(META(MongoDBStream,"DocumentKey")) AS DocumentKey
FROM MongoDBStream
WHERE META(MongoDBStream,"OperationName").toString() = "DELETE";

CREATE TARGET WriteDeleteOpsToCosmos USING CosmosDBWriter (
  ServiceEndpoint:'<Cosmos DB connection string>',
  AccessKey:'<Cosmos DB account read-write key>',
  Collections:'mydb.DeleteOps'
)
INPUT FROM DeleteOpsStream;
 
END APPLICATION Mongo2CosmosIncrementalUnlimitedContainer;