Cosmos DB readers programmer's guide
Cosmos DB Reader properties
The Azure Cosmos Java driver used by this reader is bundled with Striim.
property | type | default value | notes |
|---|---|---|---|
Access Key | encrypted password | | The Primary Key or Secondary Key from the Keys read-only tab of your Cosmos DB account. |
Connection Retry Policy | String | retryInterval=60, maxRetries=3 | If a connection attempt is unsuccessful, the adapter will wait retryInterval seconds before trying again, up to maxRetries times. |
Connectivity Mode | enum | Direct | Selects the service port range. For more information, see "Service port ranges" in "Azure Cosmos DB SQL SDK connectivity modes" in the Azure Cosmos DB documentation on Microsoft Learn. |
Containers | String | | The fully qualified name(s) of the container(s) to read from, for example, mydb.mycollection. Separate multiple containers with commas. Container names are case-sensitive. Do not modify this property when recovery is enabled for the application. You may use the % wildcard. Note that data will be read only from containers that exist when the Striim application starts; containers added later will be ignored until the application is restarted. |
Cosmos DB Config | String | | Visible in Flow Designer only when Mode is Incremental. Optionally, specify a JSON-format string with additional Cosmos DB options. For an example, see "Capturing deletes" in Cosmos DB setup for Cosmos DB Reader. |
Exclude Containers | String | If | |
Fetch Size | Integer | 1000 | The number of documents the adapter will fetch in a single read operation. |
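Mode | String | InitialLoad | With the default setting, the reader performs an initial load of the existing documents. Set to Incremental to read new documents from the change feed. |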
Overload Retry Policy | String | maxRetryTimeInSecs=30, maxRetries=9 | This policy determines how the reader handles RequestRateTooLargeException errors received from Cosmos DB. |
Polling Interval | Integer | 10 | Visible in Flow Designer only when Mode is Incremental. The time in milliseconds the adapter will wait before polling the change feed for new documents. With the default value of 10, the adapter waits 10 milliseconds between polls. |
Quiesce on IL Completion | Boolean | False | Visible in Flow Designer only when Mode is Initial Load. |
Service Endpoint | String | | The URI from the Overview page of your Cosmos DB account. |
Start Timestamp | | | Visible in Flow Designer only when Mode is Incremental. Optionally, in incremental mode, specify a start timestamp in one of the following formats: YYYY, YYYY-MM, YYYY-MM-DD, YYYY-MM-DD"T"hhTZD, YYYY-MM-DD"T"hh:mmTZD, YYYY-MM-DD"T"hh:mm:ssTZD, or YYYY-MM-DD"T"hh:mm:ss.sssTZD. |
ThreadPool Size | Integer | 10 | Visible in Flow Designer only when Mode is Incremental. The number of threads Striim will use for reading containers. If this number is lower than the number of containers being read, the threads will read the containers in round-robin fashion. If this number equals the number of containers, each thread will read from one container. If this number exceeds the number of containers, only as many threads as there are containers will be active. |
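The retry policy strings in the table above are comma-separated key=value pairs. The following Python sketch is only an illustration of how such a policy could be interpreted; it is not Striim's implementation. The function names `parse_policy` and `connect_with_retry` are hypothetical, and the sketch assumes that maxRetries counts the retries made after the first failed attempt.

```python
import time


def parse_policy(policy):
    """Parse a string such as 'retryInterval=60, maxRetries=3' into a dict of ints."""
    pairs = (item.split("=", 1) for item in policy.split(",") if item.strip())
    return {key.strip(): int(value) for key, value in pairs}


def connect_with_retry(connect, policy, sleep=time.sleep):
    """Call connect(); on ConnectionError, wait retryInterval seconds and retry.

    Assumption: maxRetries is the number of retries allowed after the first
    failed attempt. When the retries are used up, the last error is re-raised.
    """
    settings = parse_policy(policy)
    retries_left = settings["maxRetries"]
    while True:
        try:
            return connect()
        except ConnectionError:
            if retries_left == 0:
                raise
            retries_left -= 1
            sleep(settings["retryInterval"])
```

Passing `sleep` as a parameter keeps the waiting behavior replaceable, which makes the loop easy to exercise without real delays.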
Mongo Cosmos DB Reader properties
The Mongo Java driver used by this reader is bundled with Striim.
For best performance and lower Azure ingress and egress charges, run Mongo Cosmos DB Reader in Striim in Azure.
property | type | default value | notes |
|---|---|---|---|
Collections | String | | The fully qualified name(s) of the collection(s) to read from, for example, mydb.mycollection. Separate multiple collections with commas. Do not modify this property when recovery is enabled for the application. You may use the $ wildcard. Note that data will be read only from collections that exist when the Striim application starts; collections added later will be ignored until the application is restarted. |
Connection Retry Policy | String | retryInterval=60, maxRetries=3 | If a connection attempt is unsuccessful, the adapter will wait retryInterval seconds before trying again, up to maxRetries times. |
Connection URL | String | | Enter the Host and Port from the Connection String read-only tab of your Azure Cosmos DB API for MongoDB account, separated by a colon. |
Cosmos DB Config | String | | Visible in Flow Designer only when Mode is Incremental. When Mode is Initial Load, this setting is ignored. Optionally, specify a JSON-format string with additional Cosmos DB options. For an example, see "Capturing deletes" in Cosmos DB setup for Mongo Cosmos DB Reader. |
Exclude Collections | String | If | |
Fetch Size | Integer | 1000 | The number of documents the adapter will fetch in a single read operation. |
Mode | String | InitialLoad | With the default setting, the reader will load all existing data using mongo-driver-sync and stop. In this mode, Mongo Cosmos DB Reader is not a CDC reader. Set to Incremental to capture changes using the change streams API. |
MongoDB Config | String | | Optionally, specify a JSON string to define a subset of documents to be selected in InitialLoad mode and for inserts in Incremental mode. See Selecting documents using MongoDB Config. When Mode is Incremental, insert operations are sent only for the defined subset of documents, but updates and deletes are sent for all documents. If Cosmos DB Writer, Mongo Cosmos DB Writer, or MongoDB Writer receives an update or delete for a document not in the subset, the application will halt. To avoid this, set Ignorable Exception Code to RESOURCE_NOT_FOUND for Cosmos DB Writer or KEY_NOT_FOUND for Mongo Cosmos DB Writer or MongoDB Writer. Note that in this case you will have to check the exception store to see if there were any ignored exceptions for documents in the subset. |
Overload Retry Policy | String | retryInterval=30, maxRetries=10 | With the default setting, if reading is interrupted because the number of request units (RUs) per second exceeded the provisioned limit, the adapter will try again in 30 seconds (retryInterval), up to 10 times (maxRetries). |
Password | encrypted password | | The Primary Password or Secondary Password from the Connection String read-only tab of your Azure Cosmos DB API for MongoDB account. |
Quiesce on IL Completion | Boolean | False | Visible in Flow Designer only when Mode is Initial Load. |
ThreadPool Size | Integer | 10 | Visible in Flow Designer only when Mode is Incremental. When Mode is Initial Load, this setting is ignored. The number of threads Striim will use for reading collections. If this number is lower than the number of collections being read, the threads will read the collections in round-robin fashion. If this number equals the number of collections, each thread will read from one collection. If this number exceeds the number of collections, only as many threads as there are collections will be active. |
Username | String | | The Username from the Connection String read-only tab of your Azure Cosmos DB API for MongoDB account. |
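The ThreadPool Size behavior described above can be pictured with a small Python sketch. This is an illustration under assumptions, not Striim's scheduler: the function name `assign_collections` is hypothetical, and the sketch uses a fixed round-robin assignment, whereas the reader may rotate threads across collections dynamically.

```python
def assign_collections(collections, thread_pool_size):
    """Return one list of collection names per active worker thread.

    Assumption: collections are dealt out to threads in fixed round-robin
    order, and no more threads are active than there are collections.
    """
    if thread_pool_size < 1:
        raise ValueError("thread_pool_size must be at least 1")
    active = min(thread_pool_size, len(collections))
    workers = [[] for _ in range(active)]
    for index, name in enumerate(collections):
        workers[index % active].append(name)
    return workers
```

With three collections and a pool size of 2, the first thread handles the first and third collections and the second thread handles the second. With a pool size of 10 and two collections, only two worker lists are produced.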
Cosmos DB Reader JSONNodeEvent fields
The output type for Cosmos DB Reader is JSONNodeEvent. The fields are:
data: contains the field names and values of a document, for example:
data:{
"id":"1d40842b-f28d-4b29-b5bf-7168712c9807eanOlpyItG",
"brand":"Jerry's",
"type":"plums",
"quantity":"50"
}
metadata: contains the following elements:
CollectionName: the collection from which the document was read
DatabaseName: the database of the collection
DocumentKey: the value of the id field of the document
FullDocumentReceived: value is True if data includes the entire image of the document, False if it does not
Namespace: <database name>.<collection name>
OperationName: in InitialLoad mode, SELECT; in Incremental mode, INSERT or DELETE (for "soft deletes")
Partitioned: value is True if the operation was on a sharded collection
PartitionKeys: a JsonNode object containing the shard keys and their values
Timestamp: the Unix epoch time at which the operation was performed in the source
_attachments: the addressable path for the document's attachments
_etag: the document's resource etag
_lsn: the document's logical sequence number
_rid: the document's resource ID
_self: the document's addressable URI
For example, in InitialLoad mode:
metadata: {
"Partitioned":true,
"_rid":"JdBVALN36DwBAAAAAAAAAA==",
"EntityName":"src.emp",
"CollectionName":"emp",
"OperationName":"SELECT",
"DatabaseName":"src",
"NameSpace":"src.emp",
"TimeStamp":1747306064000,
"_attachments":"attachments/",
"DocumentKey":"1",
"FullDocumentReceived":true,
"PartitionKeys":{"id":"1"},
"_self":"dbs/JdBVAA==/colls/JdBVALN36Dw=/docs/JdBVALN36DwBAAAAAAAAAA==/",
"_etag":"\"5a00d804-0000-2200-0000-6825c6500000\""
}
In Incremental mode:
metadata: {
"Partitioned":true,
"_rid":"JdBVALN36DwDAAAAAAAAAA==",
"EntityName":"src.emp",
"_lsn":4,
"CollectionName":"emp",
"OperationName":"INSERT",
"DatabaseName":"src",
"NameSpace":"src.emp",
"TimeStamp":1747381403000,
"_attachments":"attachments/",
"DocumentKey":"3",
"FullDocumentReceived":true,
"PartitionKeys":{"id":"3"},
"_self":"dbs/JdBVAA==/colls/JdBVALN36Dw=/docs/JdBVALN36DwDAAAAAAAAAA==/",
"_etag":"\"05002125-0000-2200-0000-6826ec9b0000\""
}
Mongo Cosmos DB Reader JSONNodeEvent fields
The output type for Mongo Cosmos DB Reader is JSONNodeEvent. The fields are:
data: contains the field names and values of a document, for example:
data:{
"_id":{"$oid":"620365482b7622580d9e6e43"},
"state":4,
"name":"Willard",
"last_name":"Valek",
"email":"wvalek3@vk.com",
"gender":"Male",
"ip_address":"67.76.188.26",
"ttl":-1
}
metadata: contains the following elements:
CollectionName: the collection from which the document was read
DatabaseName: the database of the collection
DocumentKey: for an unsharded collection, the _id field and its value; for a sharded collection, also the shard key field and its value
FullDocumentReceived: value is True if data includes the entire image of the document, False if it does not
Namespace: <database name>.<collection name>
OperationName: in InitialLoad mode, SELECT; in Incremental mode, INSERT or DELETE (for "soft deletes")
Partitioned: value is True if the operation was on a sharded collection
PartitionKeys: a JsonNode object containing the shard keys and their values
Timestamp: In InitialLoad mode, the current time of the Striim server when the document was read. In Incremental mode, 0 (zero), because the change streams API (see Change streams in Azure Cosmos DB’s API for MongoDB) does not provide a timestamp.
For example:
metadata:{
"CollectionName":"collection1",
"OperationName":"SELECT",
"DatabaseName":"testDB",
"NameSpace":"testDB.collection1",
"id":{"$oid":"620365482b7622580d9e6e43"},
"TimeStamp":1644429967516
}
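Because the timestamp is 0 in Incremental mode, a consumer that needs an event time has to choose a fallback. The Python sketch below shows one possible approach and is not part of Striim. The helper name `event_time` is hypothetical, and the sketch assumes that a non-zero TimeStamp is Unix epoch time in milliseconds, as the value in the example above appears to be.

```python
from datetime import datetime, timedelta, timezone

EPOCH = datetime(1970, 1, 1, tzinfo=timezone.utc)


def event_time(metadata, now=None):
    """Return a UTC datetime for an event's metadata.

    Assumption: a non-zero TimeStamp is epoch milliseconds. When TimeStamp
    is 0 or missing, fall back to `now` (or the current time), because no
    source timestamp is available.
    """
    millis = metadata.get("TimeStamp", 0)
    if millis:
        return EPOCH + timedelta(milliseconds=millis)
    return now if now is not None else datetime.now(timezone.utc)
```

Falling back to the processing time is a design choice for this sketch; a consumer could equally leave the event time empty when the source does not supply one.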
Cosmos DB Reader example application
The following application will read CDC data from Cosmos DB and write it to MongoDB.
CREATE APPLICATION CosmosToMongo recovery 5 second interval;
CREATE SOURCE CosmosSrc USING CosmosDBReader (
CosmosDBConfig: '{\"Operations\": {\"SoftDelete\": {\"FieldName\" :
\"IsDeleted\",\"FieldValue\" : \"true\"}}}',
Mode: 'Incremental',
AccessKey: '*******',
Containers: 'src.emp',
ServiceEndpoint: 'https://******.documents.azure.com:443/'
)
OUTPUT TO cout;
CREATE TARGET MongoTarget USING MongoDBWriter (
collections: 'src.emp,targdb.emp',
ConnectionURL: '******:27018',
Password: '******',
Username: 'myuser',
AuthDB: 'targdb',
upsertMode: 'true'
)
INPUT FROM cout;
END APPLICATION CosmosToMongo;
Mongo Cosmos DB Reader example application
The following application will read CDC data from Cosmos DB and write it to MongoDB.
CREATE APPLICATION MongoCosmosToMongo;
CREATE SOURCE MongoCosmosSrc USING MongoCosmosDBReader (
CosmosDBConfig: '{\"Operations\": {\"SoftDelete\": {\"FieldName\" :
\"IsDeleted\",\"FieldValue\" : \"true\"}}}',
Mode: 'Incremental',
Username: 'az-cosmos-mongodb',
ConnectionURL: 'az-cosmos-mongodb.mongo.cosmos.azure.com:10255',
Collections: 'testDB.collection$',
Password: '********'
)
OUTPUT TO cout;
CREATE TARGET MongoTarget USING MongoDBWriter (
collections: 'src.emp,targdb.emp',
ConnectionURL: '******:27018',
Password: '******',
Username: 'myuser',
AuthDB: 'targdb',
upsertMode: 'true'
)
INPUT FROM cout;
END APPLICATION MongoCosmosToMongo;
Cosmos DB Reader example output
Initial load
When Mode is InitialLoad, OperationName is reported as SELECT, even though the operation is effectively an insert.
JsonNodeEvent{
data:{
"id":"1d40842b-f28d-4b29-b5bf-7168712c9807eanOlpyItG",
"brand":"Jerry's",
"type":"plums",
"quantity":"50"
} metadata:{
"CollectionName":"container2",
"OperationName":"SELECT",
"DatabaseName":"testDB",
"DocumentKey":"1d40842b-f28d-4b29-b5bf-7168712c9807eanOlpyItG",
"NameSpace":"testDB.container2",
"TimeStamp":1639999991
} userdata:null
} removedfields:null
};
Incremental - insert
JsonNodeEvent{
data:{
"id":"c6de96ef-d7f0-44a9-a5ab-e3d0298652afNvEDtXTPqO",
"brand":"Kraft Heinz",
"type":"kool-aid",
"quantity":"50"
} metadata:{
"CollectionName":"container2",
"OperationName":"INSERT",
"DatabaseName":"testDB",
"DocumentKey":"c6de96ef-d7f0-44a9-a5ab-e3d0298652afNvEDtXTPqO",
"NameSpace":"testDB.container2",
"TimeStamp":1643876905
} userdata:null
} removedfields:null
};
Incremental - delete
Note that this includes the IsDeleted field discussed in Cosmos DB setup for Cosmos DB Reader.
JsonNodeEvent{
data:{
"id":"c6de96ef-d7f0-44a9-a5ab-e3d0298652afNvEDtXTPqO",
"brand":"Kraft Heinz",
"type":"kool-aid",
"quantity":"50",
"IsDeleted":"true"
} metadata:{
"CollectionName":"container2",
"OperationName":"DELETE",
"DatabaseName":"testDB",
"DocumentKey":"c6de96ef-d7f0-44a9-a5ab-e3d0298652afNvEDtXTPqO",
"NameSpace":"testDB.container2",
"TimeStamp":1643877020
} userdata:null
} removedfields:null
};
Mongo Cosmos DB Reader example output
Initial load
When Mode is InitialLoad, OperationName is reported as SELECT, even though the operation is effectively an insert.
JsonNodeEvent{
data:{
"_id":{
"$oid":"620365482b7622580d9e6e43"
},
"state":4,
"name":"Willard",
"last_name":"Valek",
"email":"wvalek3@vk.com",
"gender":"Male",
"ip_address":"67.76.188.26",
"ttl":-1
} metadata:{
"CollectionName":"collection1",
"OperationName":"SELECT",
"DatabaseName":"testDB",
"NameSpace":"testDB.collection1",
"id":{
"$oid":"620365482b7622580d9e6e43"
},
"TimeStamp":1644429967516
} userdata:null
} removedfields:null
};
Incremental - insert
JsonNodeEvent {
data:{
"id":"updated",
"type":2,
"name":"Alex"
} metadata:{
"CollectionName":"container1",
"OperationName":"INSERT",
"DatabaseName":"testDB",
"DocumentKey":{
"id":"updated",
"type":2
},
"NameSpace":"testDB.container1",
"TimeStamp":1646819481
} userdata:null
} removedfields:null
};
Incremental - delete
Note that this includes the IsDeleted field discussed in Cosmos DB setup for Mongo Cosmos DB Reader.
JsonNodeEvent {
data:{
"id":"updated",
"type":2,
"name":"Alex",
"IsDeleted":"Yes"
} metadata:{
"CollectionName":"container1",
"OperationName":"DELETE",
"DatabaseName":"testDB",
"DocumentKey":{
"id":"updated",
"type":2
},
"NameSpace":"testDB.container1",
"TimeStamp":1646819488
} userdata:null
} removedfields:null
};
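To show how the SELECT, INSERT, and DELETE events above might be consumed, here is a minimal Python sketch that applies them to an in-memory dictionary. It is only an illustration, not a Striim API: the function name `apply_event` is hypothetical, and the sketch assumes each event has already been parsed into a dict with data and metadata keys and that metadata contains DocumentKey.

```python
import json


def apply_event(store, event):
    """Apply one reader event to `store`, a dict keyed by document key.

    Assumption: SELECT (initial load) and INSERT both write the document,
    and DELETE (a "soft delete" reported by the reader) removes it.
    """
    metadata = event["metadata"]
    # DocumentKey may be a string or an object, so serialize it to get a
    # stable, hashable dictionary key.
    key = json.dumps(metadata["DocumentKey"], sort_keys=True)
    operation = metadata["OperationName"]
    if operation in ("SELECT", "INSERT"):
        store[key] = event["data"]
    elif operation == "DELETE":
        store.pop(key, None)
    else:
        raise ValueError("unexpected OperationName: " + operation)
    return store
```

Applying the insert event and then the delete event from the Incremental examples above would leave the store empty, since both share the same DocumentKey.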