Azure Cosmos DB using Cosmos DB API for MongoDB

Azure Cosmos DB is a fully managed, cloud-based NoSQL database from Microsoft. For more information, see Azure Cosmos DB, Common Azure Cosmos DB use cases, and Striim for Azure Cosmos DB.

Striim 4.2.0 supports Cosmos DB API for MongoDB versions 3.6, 4.0, and 4.2. Version 3.2 is also supported, but only in InitialLoad mode.

Mongo Cosmos DB Reader reads documents from one or more Cosmos DB containers using the Mongo Java driver (bundled with Striim). Its output stream type is JSONNodeEvent, so it requires targets that can read JSONNodeEvents. Alternatively, you can convert the output to a user-defined type (see Converting JSONNodeEvent output to a user-defined type for an example).

This reader sends both inserts and updates as inserts. Consequently, to replicate Cosmos DB documents, the writer must support upsert mode. In upsert mode, a new document (one whose _id field does not match that of any existing document) is handled as an insert, and an update to an existing document (one with a matching _id field) is handled as an update. For replication, this limits the choice of writers to Cosmos DB Writer and Mongo Cosmos DB Writer. Append-only targets such as files, blobs, and Kafka are also supported, provided they can handle a JSONNodeEvent input stream.
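The upsert rule can be illustrated with a short Python sketch that models a target collection as a dictionary keyed by _id. This is a hypothetical model of the behavior described above, not the implementation of any Striim writer; the function name apply_upsert is illustrative.

```python
from typing import Any, Dict


def apply_upsert(target: Dict[Any, dict], event_data: dict) -> str:
    """Apply one reader event to an in-memory "collection" using upsert semantics.

    Returns "insert" when no stored document has the same _id, otherwise "update".
    """
    doc_id = event_data["_id"]
    operation = "update" if doc_id in target else "insert"
    # Assumption: the stored document is replaced as a whole, since the reader
    # emits complete documents rather than field-level changes.
    target[doc_id] = dict(event_data)
    return operation


target: Dict[Any, dict] = {}
# Both events reach the writer as inserts; upsert mode tells them apart by _id.
print(apply_upsert(target, {"_id": 1001, "name": "Kim"}))                   # insert
print(apply_upsert(target, {"_id": 1001, "name": "Kim", "dept": "Sales"}))  # update
print(target[1001])  # {'_id': 1001, 'name': 'Kim', 'dept': 'Sales'}
```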

Be sure to provision sufficient Request Units (see Request Units in Azure Cosmos DB) to handle the volume of data you expect to read. Otherwise, the reader may be unable to keep up with the source data.

You may get better performance and reduce Cosmos DB egress charges by using Striim Cloud (see Differences between Striim Platform and Striim Cloud).

Cosmos DB setup for Mongo Cosmos DB Reader

SSL

By default, SSL is enabled in Cosmos DB API for MongoDB. Mongo Cosmos DB Reader uses SSL to connect to Cosmos DB; other encryption methods are not supported.

Server Side Retry

Server Side Retry is enabled by default for Cosmos DB API for MongoDB 3.6 and later. Disabling it may result in rate-limiting errors. For more information, see Prevent rate-limiting errors for Azure Cosmos DB API for MongoDB operations.

Capturing deletes

Azure Cosmos DB API for MongoDB's change stream does not capture delete operations. To work around this limitation, do not delete documents directly. Instead, use the following process.

Enable soft deletes
  1. Using the MongoDB shell, create a _ts index on the collection with expireAfterSeconds set to -1. For example:

    use mydb
    db.mycollection.createIndex({"_ts":1}, {expireAfterSeconds: -1})

    The -1 value means Cosmos DB will not automatically expire documents at the collection level, but you can still set a TTL on individual documents so that Cosmos DB deletes them. For more information, see Expire data with Azure Cosmos DB's API for MongoDB.

  2. Set Mongo Cosmos DB Reader's Cosmos DB Config property to:

    {"Operations": {"SoftDelete": {"FieldName" : "IsDeleted","FieldValue" : "true"}}}

    This designates IsDeleted as the soft-delete field (see the example in Mongo Cosmos DB Reader example output). When a document's IsDeleted value is "true", the OperationName value in the metadata of the output event is DELETE, even though the source operation is actually an update.
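As an illustration of the rule in step 2, the following Python sketch models how an Incremental-mode event would be labeled from the Cosmos DB Config value. It is a hypothetical model, not Striim code: operation_name is an illustrative name, and the exact value comparison it uses is an assumption.

```python
import json

# The Cosmos DB Config value from step 2.
COSMOS_DB_CONFIG = (
    '{"Operations": {"SoftDelete": '
    '{"FieldName" : "IsDeleted","FieldValue" : "true"}}}'
)


def operation_name(document: dict, cosmos_db_config: str) -> str:
    """Model the OperationName an Incremental-mode event carries (illustrative only)."""
    soft_delete = json.loads(cosmos_db_config).get("Operations", {}).get("SoftDelete")
    if soft_delete is not None:
        # Assumption: this sketch compares values exactly, so the string "true"
        # matches; the reader's actual comparison rule is not documented here.
        if document.get(soft_delete["FieldName"]) == soft_delete["FieldValue"]:
            return "DELETE"
    # Inserts, updates, and replaces are all reported as INSERT in Incremental mode.
    return "INSERT"


print(operation_name({"_id": 1001, "IsDeleted": "true", "ttl": 5}, COSMOS_DB_CONFIG))  # DELETE
print(operation_name({"_id": 1001, "name": "Kim"}, COSMOS_DB_CONFIG))                  # INSERT
```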

Perform a soft delete

Instead of deleting a document, set IsDeleted to true and ttl to the number of seconds after which Cosmos DB will delete the source document. For a TTL of five seconds, the syntax is:

use <database name>
db.<container name>.updateOne({_id:<document ID>}, {$set : {"IsDeleted":"true", "ttl": 5}})

The source document will be deleted in five seconds and the output will include "OperationName":"DELETE".

For example, to soft-delete the following document from the mydb.employee collection:

{
  "_id": 1001,
  "name": "Kim",
  "lastname": "Taylor",
  "email": "ktaylor@example.com"
}

You would use the command:

use mydb
db.employee.updateOne({_id:1001}, {$set : {"IsDeleted":"true", "ttl": 5}})

Immediately after entering that command, the document would be:

{
  "_id": 1001,
  "name": "Kim",
  "lastname": "Taylor",
  "email": "ktaylor@example.com",
  "IsDeleted":"true",
  "ttl":5
}

Five seconds after entering the command, the source document would be deleted. The output event would be similar to:

JsonNodeEvent {
   data:{
      "_id":"1001",
      "name": "Kim",
      "lastname": "Taylor",
      "email": "ktaylor@example.com",
      "IsDeleted":"true",
      "ttl":5
   } metadata:{
      "CollectionName":"employee",
      "OperationName":"DELETE",
      "DatabaseName":"mydb",
      "DocumentKey":{"id":"1001"},
      "NameSpace":"mydb.employee",
      "TimeStamp":1646819488,
      "Partitioned":false,
      "FullDocumentReceived":true,
      "PartitionKeys":{}
   } userdata:null
   } removedfields:null
};
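If your application performs soft deletes through a driver rather than the MongoDB shell, it can send the same filter and $set document. The Python sketch below builds that pair for the pymongo driver, whose Collection.update_one(filter, update) method accepts them. soft_delete_update and connection_string are hypothetical names, the database call appears only as a comment so the sketch runs without a Cosmos DB account, and the field_name and field_value defaults are assumed to match the Cosmos DB Config shown in step 2.

```python
from typing import Any, Dict, Tuple


def soft_delete_update(
    doc_id: Any,
    ttl_seconds: int,
    field_name: str = "IsDeleted",
    field_value: str = "true",
) -> Tuple[Dict[str, Any], Dict[str, Any]]:
    """Return the (filter, update) pair equivalent to the shell command above."""
    if ttl_seconds < 1:
        # This sketch requires a positive TTL so that Cosmos DB actually expires the document.
        raise ValueError("ttl_seconds must be a positive number of seconds")
    doc_filter = {"_id": doc_id}
    update = {"$set": {field_name: field_value, "ttl": ttl_seconds}}
    return doc_filter, update


doc_filter, update = soft_delete_update(1001, 5)
print(doc_filter)  # {'_id': 1001}
print(update)      # {'$set': {'IsDeleted': 'true', 'ttl': 5}}

# With pymongo and a live connection (not executed here):
# from pymongo import MongoClient
# MongoClient(connection_string)["mydb"]["employee"].update_one(doc_filter, update)
```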

Mongo Cosmos DB Reader properties

The Mongo Java driver used by this reader is bundled with Striim.

For best performance and lower Azure ingress and egress charges, run Mongo Cosmos DB Reader in a Striim instance running in Azure.

property

type

default value

notes

Collections

String

The fully-qualified name(s) of the collection(s) to read from, for example, mydb.mycollection. Separate multiple collections by commas.

You may use the $ wildcard, for example, mydb.$. The wildcard is allowed only at the end of the string: for example, mydb.prefix$ is valid, but mydb.$suffix is not.

Note that data is read only from collections that exist when the Striim application starts; collections added later are ignored until the application is restarted.

Connection Retry Policy

String

retryInterval=60, maxRetries=3

With the default setting, if a connection attempt is unsuccessful, the adapter will wait the number of seconds set by retryInterval and try again. If the second attempt is unsuccessful, it will wait retryInterval seconds again and try a third time (maxRetries). If that attempt is unsuccessful, the adapter will fail and log an exception. Negative values are not supported.

Connection URL

String

Enter the Host and Port from the Connection String read-only tab of your Azure Cosmos DB API for MongoDB account, separated by a colon, for example, mydb.mongo.cosmos.azure.com:10255.

Cosmos DB Config

String

Optionally, specify a JSON-format string with additional Cosmos DB options. For an example, see "Capturing deletes" in Cosmos DB setup for Mongo Cosmos DB Reader.

Exclude Collections

String

If Collections uses a wildcard, data from any collections specified here will be omitted. Multiple collection names (separated by commas) and wildcards may be used exactly as for Collections.

Fetch Size

Integer

1000

The number of documents the adapter will fetch in a single read operation.

Mode

String

InitialLoad

With the default setting, the reader loads all existing data using mongo-driver-sync and then stops. In this mode, Mongo Cosmos DB Reader is not a CDC reader, and recovery restarts from the beginning.

Set to Incremental to read CDC data continuously using the change streams API (see Change streams in Azure Cosmos DB’s API for MongoDB). In this mode, insert, update, and replace operations are all sent to the target as inserts, since the change stream does not include the operation type. See Mongo Cosmos DB Reader limitations for discussion of recovery in this mode.

MongoDB Config

String

Optionally specify a JSON string to define a subset of documents to be selected in InitialLoad mode and for inserts in Incremental mode. See Selecting documents using MongoDB Config.

When Mode is Incremental, insert operations are sent only for the defined subset of documents, but updates and deletes are sent for all documents. If Cosmos DB Writer, Mongo Cosmos DB Writer, or MongoDB Writer receives an update or delete for a document not in the subset, the application will halt. To avoid this, set Ignorable Exception Code to RESOURCE_NOT_FOUND for Cosmos DB Writer, or to KEY_NOT_FOUND for Mongo Cosmos DB Writer or MongoDB Writer. In that case, check the exception store to see whether any ignored exceptions involved documents in the subset.

Overload Retry Policy

String

retryInterval=30, maxRetries=10

With the default setting, if reading is interrupted because the number of request units (RUs) per second exceeded the provisioned limit, the adapter will try again in 30 seconds (retryInterval). If this attempt is unsuccessful, every 30 seconds it will try again. If the tenth attempt (maxRetries) is unsuccessful, the adapter will fail and log an exception. Negative values are not supported.

Password

encrypted password

The Primary Password or Secondary Password from the Connection String read-only tab of your Azure Cosmos DB API for MongoDB account.

Quiesce on IL Completion

Boolean

False

ThreadPool Size

Integer

10

The number of threads Striim will use for reading collections. If this number is lower than the number of collections being read, threads will read in round-robin fashion. If this number equals the number of collections, each thread will read from one collection. If this number exceeds the number of collections, only as many threads as there are collections will be active.

Username

String

The Username from the Connection String read-only tab of your Azure Cosmos DB API for MongoDB account.
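To check which collections a given Collections and Exclude Collections combination would select, the matching rules described in the table can be modeled as follows. This Python sketch is an assumption-based illustration, not the reader's own matching code: a trailing $ is treated as a prefix match, a $ elsewhere is rejected, and exclusions are applied unconditionally even though the table describes Exclude Collections only for wildcard patterns. The existing argument stands for the collections present when the application starts.

```python
from typing import Iterable, List


def matches(pattern: str, namespace: str) -> bool:
    """Match a fully qualified name such as testDB.collection1 against one pattern.

    A trailing $ matches any suffix; $ anywhere else is rejected.
    """
    if "$" in pattern[:-1]:
        raise ValueError(f"$ is allowed only at the end of a pattern: {pattern!r}")
    if pattern.endswith("$"):
        return namespace.startswith(pattern[:-1])
    return namespace == pattern


def split_list(spec: str) -> List[str]:
    """Split a comma-separated property value into trimmed entries."""
    return [part.strip() for part in spec.split(",") if part.strip()]


def selected_collections(existing: Iterable[str], collections: str, exclude: str = "") -> List[str]:
    """Return the existing collections matched by Collections and not by Exclude Collections."""
    include = split_list(collections)
    excluded = split_list(exclude)
    return [
        ns for ns in existing
        if any(matches(p, ns) for p in include)
        and not any(matches(p, ns) for p in excluded)
    ]


existing = ["testDB.collection1", "testDB.collection2", "testDB.audit", "otherDB.collection1"]
print(selected_collections(existing, "testDB.collection$", "testDB.collection2"))
# ['testDB.collection1']
```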

Mongo Cosmos DB Reader JSONNodeEvent fields

The output type for Mongo Cosmos DB Reader is JSONNodeEvent. The fields are:

data: contains the field names and values of a document, for example:

data:{
  "_id":{"$oid":"620365482b7622580d9e6e43"},
  "state":4,
  "name":"Willard",
  "last_name":"Valek",
  "email":"wvalek3@vk.com",
  "gender":"Male",
  "ip_address":"67.76.188.26",
  "ttl":-1
}

metadata: contains the following elements:

  • CollectionName: the collection from which the document was read

  • DatabaseName: the database of the collection

  • DocumentKey: for an unsharded collection, the _id field and its value; for a sharded collection, also the shard key field and its value

  • FullDocumentReceived: value is True if data includes the entire image of the document, False if it does not

  • NameSpace: <database name>.<collection name>

  • OperationName: in InitialLoad mode, SELECT; in Incremental mode, INSERT or DELETE (for "soft deletes")

  • Partitioned: value is True if the operation was on a sharded collection

  • PartitionKeys: a JsonNode object containing the shard keys and their values

  • TimeStamp: in InitialLoad mode, the current time of the Striim server when the document was read; in Incremental mode, 0 (zero), because the change streams API (see Change streams in Azure Cosmos DB’s API for MongoDB) does not provide a timestamp.

For example:

metadata:{
  "CollectionName":"collection1",
  "OperationName":"SELECT",
  "DatabaseName":"testDB",
  "NameSpace":"testDB.collection1",
  "id":{"$oid":"620365482b7622580d9e6e43"},
  "TimeStamp":1644429967516
}
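When a target cannot consume JSONNodeEvent directly, the output is converted to a user-defined type in TQL (see Converting JSONNodeEvent output to a user-defined type). As a rough illustration of which values such a conversion picks out, the Python sketch below treats data and metadata as plain dictionaries and maps the examples above to a flat record. EmployeeRecord, to_record, and object_id_text are hypothetical names, and this is not the Striim mechanism.

```python
from dataclasses import dataclass
from typing import Any, Dict


@dataclass
class EmployeeRecord:
    """Hypothetical user-defined type built from one JSONNodeEvent."""
    doc_id: str
    name: str
    email: str
    operation: str
    namespace: str


def object_id_text(value: Any) -> str:
    """Flatten Extended JSON such as {"$oid": "..."} to a plain string."""
    if isinstance(value, dict) and "$oid" in value:
        return value["$oid"]
    return str(value)


def to_record(data: Dict[str, Any], metadata: Dict[str, Any]) -> EmployeeRecord:
    """Pick the fields of interest out of the data and metadata parts of an event."""
    return EmployeeRecord(
        doc_id=object_id_text(data["_id"]),
        name=data.get("name", ""),
        email=data.get("email", ""),
        operation=metadata["OperationName"],
        namespace=metadata["NameSpace"],
    )


data = {"_id": {"$oid": "620365482b7622580d9e6e43"}, "name": "Willard", "email": "wvalek3@vk.com"}
metadata = {"CollectionName": "collection1", "OperationName": "SELECT",
            "DatabaseName": "testDB", "NameSpace": "testDB.collection1"}
print(to_record(data, metadata))
```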

Mongo Cosmos DB Reader example application

The following application will read CDC data from Cosmos DB and write it to MongoDB.

CREATE APPLICATION MongoCosmosToMongo;

CREATE SOURCE MongoCosmosSrc USING MongoCosmosDBReader ( 
  CosmosDBConfig: '{\"Operations\": {\"SoftDelete\": {\"FieldName\" : \"IsDeleted\",\"FieldValue\" : \"true\"}}}', 
  Mode: 'Incremental', 
  Username: 'az-cosmos-mongodb', 
  ConnectionURL: 'az-cosmos-mongodb.mongo.cosmos.azure.com:10255', 
  Collections: 'testDB.collection$', 
  Password: '********' 
) 
OUTPUT TO cout;

CREATE TARGET MongoTarget USING MongoDBWriter ( 
  collections: 'src.emp,targdb.emp', 
  ConnectionURL: '******:27018', 
  Password: '******', 
  Username: 'myuser', 
  AuthDB: 'targdb', 
  upsertMode: 'true' 
) 
INPUT FROM cout;

END APPLICATION MongoCosmosToMongo;
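In the example application, the CosmosDBConfig value is written with each inner double quote escaped by a backslash. If you generate TQL from a script, a helper like the Python sketch below can produce that form from a dictionary. It is a hypothetical helper (tql_config_literal is an illustrative name): it emits compact JSON, which differs from the example only in whitespace, and refuses values containing backslashes or single quotes because their escaping rules are not covered here.

```python
import json


def tql_config_literal(config: dict) -> str:
    """Serialize config as compact JSON, escaping each double quote with a backslash."""
    text = json.dumps(config, separators=(",", ":"), ensure_ascii=False)
    if "\\" in text or "'" in text:
        # Assumption: this sketch does not handle backslashes or single quotes
        # inside a single-quoted TQL string, so it refuses such input.
        raise ValueError("config contains characters this sketch does not escape")
    return text.replace('"', '\\"')


config = {"Operations": {"SoftDelete": {"FieldName": "IsDeleted", "FieldValue": "true"}}}
print("CosmosDBConfig: '" + tql_config_literal(config) + "',")
```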

Mongo Cosmos DB Reader example output

Initial load

When Mode is InitialLoad, OperationName is reported as SELECT, even though the operation is actually an insert.

JsonNodeEvent{
   data:{
      "_id":{
         "$oid":"620365482b7622580d9e6e43"
      },
      "state":4,
      "name":"Willard",
      "last_name":"Valek",
      "email":"wvalek3@vk.com",
      "gender":"Male",
      "ip_address":"67.76.188.26",
      "ttl":-1
   } metadata:{
      "CollectionName":"collection1",
      "OperationName":"SELECT",
      "DatabaseName":"testDB",
      "NameSpace":"testDB.collection1",
      "id":{
         "$oid":"620365482b7622580d9e6e43"
      },
      "TimeStamp":1644429967516
   } userdata:null
   } removedfields:null
};

Incremental - insert

JsonNodeEvent {
   data:{
      "id":"updated",
      "type":2,
      "name":"Alex"
   } metadata:{
      "CollectionName":"container1",
      "OperationName":"INSERT",
      "DatabaseName":"testDB",
      "DocumentKey":{
         "id":"updated",
         "type":2
      },
      "NameSpace":"testDB.container1",
      "TimeStamp":1646819481
   } userdata:null
   } removedfields:null
};

Incremental - delete

Note that this includes the IsDeleted field discussed in Cosmos DB setup for Mongo Cosmos DB Reader.

JsonNodeEvent {
   data:{
      "id":"updated",
      "type":2,
      "name":"Alex",
      "IsDeleted":"true"
   } metadata:{
      "CollectionName":"container1",
      "OperationName":"DELETE",
      "DatabaseName":"testDB",
      "DocumentKey":{
         "id":"updated",
         "type":2
      },
      "NameSpace":"testDB.container1",
      "TimeStamp":1646819488
   } userdata:null
   } removedfields:null
};

Mongo Cosmos DB Reader limitations

  • The change stream (see Change streams in Azure Cosmos DB’s API for MongoDB) does not capture timestamps for operations.

  • The change stream does not capture deletes. Use the "soft delete" approach instead: rather than deleting a source document directly, set its IsDeleted field to true and give it a ttl so that Cosmos DB deletes it; the reader reports that change with OperationName DELETE (see Cosmos DB setup for Mongo Cosmos DB Reader and Mongo Cosmos DB Reader example output).

  • The change stream captures field-level updates as document replace operations, so the entire document will be read.

  • If multiple updates are made to a document in a short period of time, the change stream may consolidate them all into a single document update.

  • The order of operations is guaranteed to be preserved only for events with the same shard key in the change stream. The order of operations may not be preserved for events with different shard keys.

  • Multi-region writes are not supported.

  • Document _id fields must be unique across all shards. Otherwise you may encounter errors or data corruption.

  • Recovery (see Recovering applications) from the point at which the application stopped is not possible until the change stream has two resume tokens for each collection. Before that point, when the application restarts, Mongo Cosmos DB Reader will start reading from the latest document, leaving a gap in the target covering the time from when the application stopped until it was restarted. In other words, at-least-once processing (A1P) is not guaranteed until Mongo Cosmos DB Reader has been running for a few hours or days.

    To tell whether recovery would result in data loss, run the command SHOW <namespace>.<application name> CHECKPOINT HISTORY. If the output includes any occurrences of ResumeToken[null], Mongo Cosmos DB Reader will resume reading from the latest document when the application is restarted. To avoid this, you may start from scratch with a new initial load. If you need advice or assistance in this situation, contact Striim support.
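If you save the output of SHOW <namespace>.<application name> CHECKPOINT HISTORY to a file, a small script can flag the ResumeToken[null] case described above. The Python sketch below simply searches each line for that string; the rest of the output format is not described here, so the sample text is a made-up placeholder, and lines_with_null_token is an illustrative name.

```python
from typing import List

NULL_RESUME_TOKEN = "ResumeToken[null]"


def lines_with_null_token(checkpoint_history: str) -> List[str]:
    """Return the lines of saved CHECKPOINT HISTORY output that mention ResumeToken[null]."""
    return [line for line in checkpoint_history.splitlines() if NULL_RESUME_TOKEN in line]


# Placeholder text, not real console output; substitute the saved command output.
sample = "line one\nsomething ResumeToken[null] something\nline three"
risky = lines_with_null_token(sample)
if risky:
    print(f"{len(risky)} checkpoint line(s) with a null resume token: a restart may skip data")
```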