Azure Cosmos DB using Cosmos DB API for MongoDB
Azure Cosmos DB is a web-based NoSQL database from Microsoft. For more information, see Azure Cosmos DB, Common Azure Cosmos DB use cases, and Striim for Azure Cosmos DB.
Striim 4.2.0 supports Cosmos DB API for MongoDB versions 3.6, 4.0, and 4.2. Version 3.2 is also supported, but in initial load mode only.
Mongo Cosmos DB Reader reads documents from one or more Cosmos DB containers using the Mongo Java driver (bundled with Striim). Its output stream type is JSONNodeEvent, so it requires targets that can read JSONNodeEvents. Alternatively, you can convert the output to a user-defined type (see Converting JSONNodeEvent output to a user-defined type for an example).
This reader sends both inserts and updates as inserts. This means that to support replicating Cosmos DB documents, the writer must support upsert mode. In upsert mode, a new document (one whose _id field does not match that of any existing document) is handled as an insert, and an update to an existing document (based on matching _id fields) is handled as an update. For replication, this limits the choice of writers to Cosmos DB Writer and Mongo Cosmos DB Writer. Append-only targets such as files, blobs, and Kafka are also supported so long as they can handle a JSONNodeEvent input stream.
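The upsert behavior described above can be illustrated with a minimal Python sketch. It is not Striim's implementation: the target is modeled as a plain in-memory dictionary keyed by _id, and the function name upsert is hypothetical.

```python
def upsert(target, event_data):
    """Apply one reader event to a target keyed by _id.

    Returns "insert" when the _id is new and "update" when a document
    with the same _id already exists in the target.
    """
    doc_id = event_data["_id"]
    action = "update" if doc_id in target else "insert"
    # The reader delivers the whole document, so the stored copy is replaced.
    target[doc_id] = dict(event_data)
    return action


# Both events arrive from the reader as inserts; the target tells them apart.
target = {}
first = upsert(target, {"_id": 1001, "name": "Kim"})
second = upsert(target, {"_id": 1001, "name": "Kim", "lastname": "Taylor"})
print(first, second, len(target))
```

A writer without this matching step would store the second event as a duplicate document, which is why replication requires an upsert-capable writer.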
Be sure to provision sufficient Request Units (see Request Units in Azure Cosmos DB) to handle the volume of data you expect to read. If you do not, the reader may be unable to keep up with the source data.
You may get better performance and reduce Cosmos DB egress charges by using Striim Cloud (see Differences between Striim Platform and Striim Cloud).
Cosmos DB setup for Mongo Cosmos DB Reader
SSL
By default, SSL is enabled in Cosmos DB API for MongoDB. Mongo Cosmos DB Reader uses SSL to connect to Cosmos DB. Other encryption methods are not supported.
Server Side Retry
Server Side Retry is enabled by default for Cosmos DB API for Mongo DB 3.6 and later. Disabling it may result in rate-limiting errors. For more information, see Prevent rate-limiting errors for Azure Cosmos DB API for MongoDB operations.
Capturing deletes
Azure Cosmos DB API for MongoDB's change stream does not capture delete operations. To work around this limitation, do not delete documents directly. Instead, use the following process.
Enable soft deletes
Using the MongoDB shell, create a _ts index on the collection with expireAfterSeconds set to -1. For example:
mydb.mycollection.createIndex({"_ts":1}, {expireAfterSeconds: -1})
The -1 value means Cosmos DB will not automatically delete any documents, but Mongo Cosmos DB Reader will be able to set the TTL for individual documents in order to delete them. For more information, see Expire data with Azure Cosmos DB's API for MongoDB.
Set Mongo Cosmos DB Reader's Cosmos DB Config property to:
{"Operations": {"SoftDelete": {"FieldName" : "IsDeleted","FieldValue" : "true"}}}
This will enable the IsDeleted field for soft-delete operations, as shown by the example in Mongo Cosmos DB Reader example output. When the IsDeleted field value is true, the OperationName value in the metadata of the output event is DELETE even though the operation is actually an UPDATE.
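If you generate the Cosmos DB Config value programmatically, a sketch such as the following may help avoid hand-escaping mistakes. The helper name soft_delete_config is hypothetical; only the JSON structure comes from the property value shown above.

```python
import json


def soft_delete_config(field_name="IsDeleted", field_value="true"):
    """Build the JSON string for the reader's Cosmos DB Config property."""
    config = {
        "Operations": {
            "SoftDelete": {
                "FieldName": field_name,
                # The value is a string, matching the example property value.
                "FieldValue": field_value,
            }
        }
    }
    return json.dumps(config)


config_string = soft_delete_config()
print(config_string)
```

The field name and value passed here must match what your soft-delete updates write to the source documents.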
Perform a soft delete
Instead of deleting a document, set IsDeleted to true and ttl to the number of seconds after which Cosmos DB will delete the source document. For a TTL of five seconds, the syntax is:
<database name>.<container name>.updateOne({_id:<document ID>}, {$set : {"IsDeleted":"true", "ttl": 5}})
The source document will be deleted in five seconds and the output will include "OperationName":"DELETE".
For example, to soft-delete the following document from the mydb.employee collection:
{ "_id": 1001, "name": "Kim", "lastname": "Taylor", "email": "ktaylor@example.com" }
You would use the command:
mydb.employee.updateOne({_id:1001}, {$set : {"IsDeleted":"true", "ttl": 5}})
Immediately after entering that command, the document would be:
{ "_id": 1001, "name": "Kim", "lastname": "Taylor", "email": "ktaylor@example.com", "IsDeleted":"true", "ttl":5 }
Five seconds after entering the command, the source document would be deleted. The output event would be similar to:
JsonNodeEvent { data:{ "_id":"1001", "name": "Kim", "lastname": "Taylor", "email": "ktaylor@example.com", "IsDeleted":"true", "ttl":5 } metadata:{ "CollectionName":"employee", "OperationName":"DELETE", "DatabaseName":"mydb", "DocumentKey":{"id":"1001"}, "NameSpace":"mydb.employee", "TimeStamp":1646819488, "Partitioned":false, "FullDocumentReceived":true, "PartitionKeys":{} } userdata:null } removedfields:null };
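If soft deletes are issued from application code rather than the shell, the filter and update documents from the command above could be built as in this sketch. The helper name soft_delete_update is hypothetical, and rejecting non-positive TTL values is an assumption made here for safety, not a documented Striim or Cosmos DB rule.

```python
def soft_delete_update(document_id, ttl_seconds, field_name="IsDeleted", field_value="true"):
    """Return the (filter, update) pair for a soft delete.

    Assumption: a soft delete needs a positive integer TTL, since -1 is
    the value used above to mean "never expire".
    """
    if not isinstance(ttl_seconds, int) or ttl_seconds <= 0:
        raise ValueError("ttl_seconds must be a positive integer")
    filter_doc = {"_id": document_id}
    update_doc = {"$set": {field_name: field_value, "ttl": ttl_seconds}}
    return filter_doc, update_doc


# Mirrors the shell example for the mydb.employee collection.
filter_doc, update_doc = soft_delete_update(1001, 5)
print(filter_doc, update_doc)
# With PyMongo, these could be passed to collection.update_one(filter_doc, update_doc).
```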
Mongo Cosmos DB Reader properties
The Mongo Java driver used by this reader is bundled with Striim.
For best performance and lower Azure ingress and egress charges, Mongo Cosmos DB Reader should be run in Striim in Azure.
property | type | default value | notes |
---|---|---|---|
Collections | String | | The fully-qualified name(s) of the collection(s) to read from, for example, mydb.mycollection. Separate multiple collections by commas. You may use the $ wildcard, for example, Note that data will be read only from collections that exist when the Striim application starts; additional collections added later will be ignored until the application is restarted. |
Connection Retry Policy | String | retryInterval=60, maxRetries=3 | With the default setting, if a connection attempt is unsuccessful, the adapter will try again in 60 seconds ( |
Connection URL | String | | Enter the Host and Port from the Connection String read-only tab of your Azure Cosmos DB API for MongoDB account, separated by a colon, for example, |
Cosmos DB Config | String | | Optionally, specify a JSON-format string with additional Cosmos DB options. For an example, see "Capturing deletes" in Cosmos DB setup for Mongo Cosmos DB Reader. |
Exclude Collections | String | | If |
Fetch Size | Integer | 1000 | The number of documents the adapter will fetch in a single read operation. |
Mode | String | InitialLoad | With the default setting, will load all existing data using mongo-driver-sync and stop. In this mode, Mongo Cosmos DB Reader is not a CDC reader. In this mode, recovery will restart from the beginning. Set to |
MongoDB Config | String | | Optionally, specify a JSON string to define a subset of documents to be selected in InitialLoad mode and for inserts in Incremental mode. See Selecting documents using MongoDB Config. When Mode is Incremental, insert operations are sent only for the defined subset of documents, but updates and deletes are sent for all documents. If Cosmos DB Writer, Mongo Cosmos DB Writer, or MongoDB Writer receives an update or delete for a document not in the subset, the application will halt. To avoid this, set Ignorable Exception Code to RESOURCE_NOT_FOUND for Cosmos DB Writer or KEY_NOT_FOUND for Mongo Cosmos DB Writer or MongoDB Writer. Note that in this case you will have to check the exception store to see if there were any ignored exceptions for documents in the subset. |
Overload Retry Policy | String | retryInterval=30, maxRetries=10 | With the default setting, if reading is interrupted because the number of request units (RUs) per second exceeded the provisioned limit, the adapter will try again in 30 seconds ( |
Password | encrypted password | | The Primary Password or Secondary Password from the Connection String read-only tab of your Azure Cosmos DB API for MongoDB account. |
Quiesce on IL Completion | Boolean | False | |
ThreadPool Size | Integer | 10 | The number of threads Striim will use for reading collections. If this number is lower than the number of collections being read, threads will read in round-robin fashion. If this number equals the number of collections, each thread will read from one collection. If this number exceeds the number of collections, only as many threads as there are collections will be active. |
Username | String | | The Username from the Connection String read-only tab of your Azure Cosmos DB API for MongoDB account. |
Mongo Cosmos DB Reader JSONNodeEvent fields
The output type for Mongo Cosmos DB Reader is JSONNodeEvent. The fields are:
data: contains the field names and values of a document, for example:
data:{ "_id":{"$oid":"620365482b7622580d9e6e43"}, "state":4, "name":"Willard", "last_name":"Valek", "email":"wvalek3@vk.com", "gender":"Male", "ip_address":"67.76.188.26", "ttl":-1 }
metadata: contains the following elements:
CollectionName: the collection from which the document was read
DatabaseName: the database of the collection
DocumentKey: for an unsharded collection, the _id field and its value; for a sharded collection, also the shard key field and its value
FullDocumentReceived: value is True if data includes the entire image of the document, False if it does not
NameSpace: <database name>.<collection name>
OperationName: in InitialLoad mode, SELECT; in Incremental mode, INSERT or DELETE (for "soft deletes")
Partitioned: value is True if the operation was on a sharded collection
PartitionKeys: a JsonNode object containing the shard keys and their values
TimeStamp: In InitialLoad mode, the current time of the Striim server when the document was read. In Incremental mode, 0 (zero), because the change streams API (see Change streams in Azure Cosmos DB’s API for MongoDB) does not provide a timestamp.
For example:
metadata:{ "CollectionName":"collection1", "OperationName":"SELECT", "DatabaseName":"testDB", "NameSpace":"testDB.collection1", "id":{"$oid":"620365482b7622580d9e6e43"}, "TimeStamp":1644429967516 }
Mongo Cosmos DB Reader example application
The following application will read CDC data from Cosmos DB and write it to MongoDB.
CREATE APPLICATION MongoCosmosToMongo;
CREATE SOURCE MongoCosmosSrc USING MongoCosmosDBReader (
CosmosDBConfig: '{\"Operations\": {\"SoftDelete\": {\"FieldName\" : \"IsDeleted\",\"FieldValue\" : \"true\"}}}',
Mode: 'Incremental',
Username: 'az-cosmos-mongodb',
ConnectionURL: 'az-cosmos-mongodb.mongo.cosmos.azure.com:10255',
Collections: 'testDB.collection$',
Password: '********'
)
OUTPUT TO cout;
CREATE TARGET MongoTarget USING MongoDBWriter (
collections: 'src.emp,targdb.emp',
ConnectionURL: '******:27018',
Password: '******',
Username: 'myuser',
AuthDB: 'targdb',
upsertMode: 'true'
)
INPUT FROM cout;
END APPLICATION MongoCosmosToMongo;
Mongo Cosmos DB Reader example output
Initial load
When Mode is InitialLoad, the OperationName is reported as SELECT, even though it is actually an insert.
JsonNodeEvent{ data:{ "_id":{ "$oid":"620365482b7622580d9e6e43" }, "state":4, "name":"Willard", "last_name":"Valek", "email":"wvalek3@vk.com", "gender":"Male", "ip_address":"67.76.188.26", "ttl":-1 } metadata:{ "CollectionName":"collection1", "OperationName":"SELECT", "DatabaseName":"testDB", "NameSpace":"testDB.collection1", "id":{ "$oid":"620365482b7622580d9e6e43" }, "TimeStamp":1644429967516 } userdata:null } removedfields:null };
Incremental - insert
JsonNodeEvent { data:{ "id":"updated", "type":2, "name":"Alex" } metadata:{ "CollectionName":"container1", "OperationName":"INSERT", "DatabaseName":"testDB", "DocumentKey":{ "id":"updated", "type":2 }, "NameSpace":"testDB.container1", "TimeStamp":1646819481 } userdata:null } removedfields:null };
Incremental - delete
Note that this includes the IsDeleted field discussed in Cosmos DB setup for Mongo Cosmos DB Reader.
JsonNodeEvent { data:{ "id":"updated", "type":2, "name":"Alex", "IsDeleted":"Yes" } metadata:{ "CollectionName":"container1", "OperationName":"DELETE", "DatabaseName":"testDB", "DocumentKey":{ "id":"updated", "type":2 }, "NameSpace":"testDB.container1", "TimeStamp":1646819488 } userdata:null } removedfields:null };
Mongo Cosmos DB Reader limitations
The change stream (see Change streams in Azure Cosmos DB’s API for MongoDB) does not capture timestamps for operations.
The change stream does not capture deletes. Use the "soft delete" approach to add an IsDeleted field with value True to target documents that have been deleted in the source (see Cosmos DB setup for Mongo Cosmos DB Reader and Mongo Cosmos DB Reader example output).
The change stream captures field-level updates as document replace operations, so the entire document will be read.
If multiple updates are made to a document in a short period of time, the change stream may consolidate them all into a single document update.
The order of operations is guaranteed to be preserved only for events with the same shard key in the change stream. The order of operations may not be preserved for events with different shard keys.
Multi-region writes are not supported.
Document _id fields must be unique across all shards. Otherwise you may encounter errors or data corruption.
Recovery (see Recovering applications) from the point at which the application stopped is not possible until the change stream has two resume tokens for each collection. Prior to that point, after the application restarts, Mongo Cosmos DB Reader will start reading from the latest document, resulting in a gap in the target from the time the application stopped until it was restarted. In other words, at-least-once processing (A1P) is not guaranteed until after Mongo Cosmos DB Reader has been running for a few hours or days.
To tell whether recovery would result in data loss, run the command:
SHOW <namespace>.<application name> CHECKPOINT HISTORY
If the output includes any occurrences of ResumeToken[null], when the application is restarted, Mongo Cosmos DB Reader will resume reading from the latest document. To avoid this, you may start from scratch with a new initial load. If you need advice or assistance in this situation, contact Striim support.
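If you capture the output of the CHECKPOINT HISTORY command as text, the check for ResumeToken[null] could be automated along these lines. This is only a sketch: the function name is hypothetical, and the sample strings are placeholders, not real Striim output, whose exact layout is not shown in this document.

```python
def count_null_resume_tokens(checkpoint_history_output):
    """Count occurrences of ResumeToken[null] in captured command output."""
    return checkpoint_history_output.count("ResumeToken[null]")


# Placeholder text only; substitute the captured output of the SHOW command.
risky = "placeholder text ResumeToken[null] placeholder text"
safe = "placeholder text with no null tokens"

for label, text in (("risky", risky), ("safe", safe)):
    if count_null_resume_tokens(text) > 0:
        print(label, "- restart would resume from the latest document")
    else:
        print(label, "- no null resume tokens found")
```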