Cosmos DB readers programmer's guide

Cosmos DB Reader properties

The Azure Cosmos Java driver used by this reader is bundled with Striim.

property

type

default value

notes

Access Key

encrypted password

The Primary Key or Secondary Key from the Keys read-only tab of your Cosmos DB account.

Connection Retry Policy

String

retryInterval=60, maxRetries=3

With the default setting, if a connection attempt is unsuccessful, the adapter will try again in 60 seconds (retryInterval). If the second attempt is unsuccessful, in 60 seconds it will try a third time (maxRetries). If that is unsuccessful, the adapter will fail and log an exception. Negative values are not supported.
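The retry behavior described above can be sketched in a few lines of Python. This is an illustration only, not Striim's implementation: parse_policy, connect_with_retry, and the injected connect and sleep callables are hypothetical names, and the sketch assumes that maxRetries counts the total number of connection attempts, as the description of the default setting suggests.

```python
import time

def parse_policy(text):
    """Parse a string such as 'retryInterval=60, maxRetries=3' into a dict of ints."""
    policy = {}
    for part in text.split(","):
        key, value = part.split("=")
        number = int(value.strip())
        if number < 0:
            raise ValueError("negative values are not supported")
        policy[key.strip()] = number
    return policy

def connect_with_retry(connect, policy_text, sleep=time.sleep):
    """Call connect() until it succeeds or the attempts are used up."""
    policy = parse_policy(policy_text)
    attempts = max(1, policy["maxRetries"])  # assumption: always try at least once
    for attempt in range(1, attempts + 1):
        try:
            return connect()
        except ConnectionError:
            if attempt == attempts:
                raise  # all attempts used: fail and surface the exception
            sleep(policy["retryInterval"])  # wait before the next attempt
```

Passing sleep as a parameter keeps the sketch testable without real waiting; a caller would normally leave it at time.sleep.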

Connectivity Mode

enum

Direct

Selects service port range. See Learn / Azure / Azure Cosmos DB / NoSQL / Azure Cosmos DB SQL SDK connectivity modes / Service port ranges for more information.

Containers

String

The fully-qualified name(s) of the container(s) to read from, for example, mydb.mycollection. Separate multiple containers by commas. Container names are case-sensitive. Do not modify this property when recovery is enabled for the application.

You may use the % wildcard, for example, mydb.%. The wildcard is allowed only at the end of the string: for example, mydb.prefix% is valid, but mydb.%suffix is not.

Note that data will be read only from containers that exist when the Striim application starts. Containers added later will be ignored until the application is restarted.

Cosmos DB Config

String

Visible in Flow Designer only when Mode is Incremental.

Optionally, specify a JSON-format string with additional Cosmos DB options. For an example, see "Capturing deletes" in Cosmos DB setup for Cosmos DB Reader.

Exclude Containers

String

If Containers uses a wildcard, data from any containers specified here will be omitted. Multiple container names (separated by commas) and wildcards may be used exactly as for Containers.
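How the Containers and Exclude Containers values could combine is sketched below in Python. It is not Striim's code: matches and select_containers are hypothetical helpers, the list of existing containers is supplied by the caller, and accepting either commas or semicolons as separators is an assumption made to keep the sketch tolerant.

```python
import re

def matches(pattern, name):
    """Match a fully-qualified name against a pattern with an optional trailing %."""
    if "%" in pattern[:-1]:
        raise ValueError("the % wildcard is allowed only at the end: " + pattern)
    if pattern.endswith("%"):
        return name.startswith(pattern[:-1])
    return name == pattern  # exact, case-sensitive comparison

def split_names(value):
    """Split a property value into trimmed, non-empty patterns."""
    return [p.strip() for p in re.split(r"[,;]", value) if p.strip()]

def select_containers(existing, containers, exclude=""):
    """Return existing containers selected by Containers minus Exclude Containers."""
    include = split_names(containers)
    omit = split_names(exclude)
    return [
        name for name in existing
        if any(matches(p, name) for p in include)
        and not any(matches(p, name) for p in omit)
    ]
```

For example, with Containers set to mydb.% and Exclude Containers set to mydb.orders_%, a container named mydb.orders is selected while mydb.orders_tmp is omitted.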

Fetch Size

Integer

1000

The number of documents the adapter will fetch in a single read operation.

Mode

String

InitialLoad

With the default InitialLoad setting, the reader will load all existing data using the Microsoft Azure Cosmos SDK for the Azure Cosmos DB SQL API. After the initial load is complete, you must stop the application manually. In this mode, Cosmos DB Reader is not a CDC reader.

Set to Incremental to read CDC data continuously using the change feed API (see Change feed in Azure Cosmos DB). In this mode, insert, update, and replace operations are all sent to the target as inserts, since the change feed does not include the operation type. In this mode, recovery will restart from the timestamp of the last operation for each container.
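Because Incremental mode reports inserts, updates, and replaces alike as inserts, a target generally has to treat each INSERT as an upsert keyed on the document ID. The Python sketch below illustrates that idea with an in-memory dictionary standing in for a target collection. The apply_event function is hypothetical, and the event shape (data plus metadata with OperationName and DocumentKey) follows the JSONNodeEvent fields described later in this guide; DELETE events appear only when soft deletes are configured.

```python
def apply_event(target, event):
    """Apply one reader event to an in-memory stand-in for a target collection."""
    key = event["metadata"]["DocumentKey"]
    operation = event["metadata"]["OperationName"]
    if operation == "DELETE":
        target.pop(key, None)  # soft delete reported by the reader
    else:
        # SELECT (initial load) and INSERT (insert, update, or replace) both
        # overwrite whatever is stored under the same key: an upsert.
        target[key] = event["data"]
    return target
```

Applying two INSERT events with the same DocumentKey leaves one document holding the later values, which is the effect an upsert-capable writer would produce.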

Overload Retry Policy

String

maxRetryTimeInSecs=30, maxRetries=9

This policy determines how the reader handles RequestRateTooLargeException errors received from Cosmos DB. maxRetryTimeInSecs sets the total maximum time allowed for all retries, after which the application will halt. For more information, see RetryOptions.setMaxRetryAttemptsOnThrottledRequests(int maxRetryAttemptsOnThrottledRequests) Method and ThrottlingRetryOptions.setMaxRetryWaitTime(Duration maxRetryWaitTime) Method.
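The two limits in this policy can be pictured as a pair of budgets, one on the number of retries and one on the total time spent retrying. The Python sketch below is a simplified illustration, not the reader's or the Azure SDK's actual logic: Throttled and read_with_overload_retry are hypothetical names, the retry delay is assumed to come with the error, and the sketch tracks cumulative wait time rather than wall-clock time.

```python
import time

class Throttled(Exception):
    """Hypothetical stand-in for a request-rate-too-large error."""
    def __init__(self, retry_after_secs):
        super().__init__("request rate too large")
        self.retry_after_secs = retry_after_secs

def read_with_overload_retry(read, max_retry_time_secs=30, max_retries=9,
                             sleep=time.sleep):
    """Retry read() on throttling within a retry-count and total-wait budget."""
    waited = 0.0
    retries = 0
    while True:
        try:
            return read()
        except Throttled as error:
            retries += 1
            waited += error.retry_after_secs
            if retries > max_retries or waited > max_retry_time_secs:
                raise  # either budget exhausted: give up
            sleep(error.retry_after_secs)
```

With the default values, a read that keeps being throttled with a 20-second delay would be retried once and then abandoned, because a second wait would exceed the 30-second total.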

Polling Interval

Integer

10

Visible in Flow Designer only when Mode is Incremental.

The time in milliseconds the adapter will wait before polling the change feed for new documents. With the default value of 10, when there are new documents, the adapter will fetch the number set by the Fetch Size property, then immediately fetch again, repeating until all new documents have been fetched. Then it will wait for 10 milliseconds before polling again.
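The interaction between Fetch Size and Polling Interval described above can be sketched as a simple loop. This Python example is illustrative only: poll_once, run, and the fetch and handle callables are hypothetical, and fetch(n) is assumed to return up to n new documents or an empty list when none remain.

```python
import time

def poll_once(fetch, fetch_size=1000):
    """Drain the feed: fetch batches of up to fetch_size until none are left."""
    documents = []
    while True:
        batch = fetch(fetch_size)
        if not batch:
            return documents
        documents.extend(batch)

def run(fetch, handle, polling_interval_ms=10, fetch_size=1000, cycles=3,
        sleep=time.sleep):
    """Poll a fixed number of cycles, pausing between them."""
    for _ in range(cycles):
        for document in poll_once(fetch, fetch_size):
            handle(document)
        sleep(polling_interval_ms / 1000.0)  # wait only after the feed is drained
```

The pause happens only once a fetch comes back empty, so a backlog larger than Fetch Size is read in back-to-back batches without any delay between them.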

Quiesce on IL Completion

Boolean

False

Visible in Flow Designer only when Mode is Initial Load.

Service Endpoint

String

The URI from the Overview page of your Cosmos DB account.

Start Timestamp

Visible in Flow Designer only when Mode is Incremental.

Optionally, in incremental mode, specify a _ts field value from which to start reading. Supported formats (see dateTimeParser for more information):

YYYY
YYYY-MM
YYYY-MM-DD
YYYY-MM-DD"T"hhTZD
YYYY-MM-DD"T"hh:mmTZD
YYYY-MM-DD"T"hh:mm:ssTZD
YYYY-MM-DD"T"hh:mm:ss.sssTZD
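Since _ts in Cosmos DB is expressed in seconds since the Unix epoch, it can help to see how strings in the formats above map to such a value. The Python sketch below is a rough equivalent, not the parser Striim uses: to_epoch_seconds is a hypothetical helper, TZD is taken to mean Z or a +hh:mm / -hh:mm offset, and values without a time zone are assumed to be UTC.

```python
from datetime import datetime, timezone

# strptime patterns mirroring the formats listed above, most specific first.
FORMATS = [
    "%Y-%m-%dT%H:%M:%S.%f%z",
    "%Y-%m-%dT%H:%M:%S%z",
    "%Y-%m-%dT%H:%M%z",
    "%Y-%m-%dT%H%z",
    "%Y-%m-%d",
    "%Y-%m",
    "%Y",
]

def to_epoch_seconds(text):
    """Convert a timestamp string to whole seconds since the Unix epoch."""
    for pattern in FORMATS:
        try:
            parsed = datetime.strptime(text, pattern)
        except ValueError:
            continue  # try the next, less specific pattern
        if parsed.tzinfo is None:
            parsed = parsed.replace(tzinfo=timezone.utc)  # assumption: UTC
        return int(parsed.timestamp())
    raise ValueError("unsupported timestamp format: " + text)
```

For instance, to_epoch_seconds("1970-01-02") returns 86400, one day after the epoch.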

ThreadPool Size

Integer

10

Visible in Flow Designer only when Mode is Incremental.

The number of threads Striim will use for reading containers. If this number is lower than the number of containers being read, the threads will read the containers in round-robin fashion. If this number equals the number of containers, each thread will read from one container. If this number exceeds the number of containers, only as many threads as there are containers will be active.
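One way to picture the three cases above is as a round-robin assignment of containers to threads. The Python sketch below is an interpretation, not Striim's scheduler: assign_round_robin is a hypothetical function, and it assumes that when the pool is larger than the number of containers, the surplus threads simply stay idle.

```python
def assign_round_robin(containers, pool_size):
    """Map each active thread index to the containers it would read."""
    if pool_size < 1:
        raise ValueError("pool_size must be at least 1")
    active = min(pool_size, len(containers))  # surplus threads stay idle
    assignment = {thread: [] for thread in range(active)}
    for index, container in enumerate(containers):
        assignment[index % active].append(container)
    return assignment
```

With five containers and a pool of two, thread 0 would take the first, third, and fifth containers and thread 1 the second and fourth; with ten threads and two containers, only two threads receive work.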

Mongo Cosmos DB Reader properties

The Mongo Java driver used by this reader is bundled with Striim.

For best performance and lower Azure ingress and egress charges, Mongo Cosmos DB Reader should be run in Striim in Azure.

property

type

default value

notes

Collections

String

The fully-qualified name(s) of the collection(s) to read from, for example, mydb.mycollection. Separate multiple collections by commas. Do not modify this property when recovery is enabled for the application.

You may use the $ wildcard, for example, mydb.$. The wildcard is allowed only at the end of the string: for example, mydb.prefix$ is valid, but mydb.$suffix is not.

Note that data will be read only from collections that exist when the Striim application starts. Collections added later will be ignored until the application is restarted.

Connection Retry Policy

String

retryInterval=60, maxRetries=3

With the default setting, if a connection attempt is unsuccessful, the adapter will try again in 60 seconds (retryInterval). If the second attempt is unsuccessful, in 60 seconds it will try a third time (maxRetries). If that is unsuccessful, the adapter will fail and log an exception. Negative values are not supported.

Connection URL

String

Enter the Host and Port from the Connection String read-only tab of your Azure Cosmos DB API for MongoDB account, separated by a colon, for example, mydb.mongo.cosmos.azure.com:10255.

Cosmos DB Config

String

Visible in Flow Designer only when Mode is Incremental. When Mode is Initial Load, this setting is ignored.

Optionally, specify a JSON-format string with additional Cosmos DB options. For an example, see "Capturing deletes" in Cosmos DB setup for Mongo Cosmos DB Reader.

Exclude Collections

String

If Collections uses a wildcard, data from any collections specified here will be omitted. Multiple collection names (separated by commas) and wildcards may be used exactly as for Collections.

Fetch Size

Integer

1000

The number of documents the adapter will fetch in a single read operation.

Mode

String

InitialLoad

With the default setting, the reader will load all existing data using mongo-driver-sync and stop. In this mode, Mongo Cosmos DB Reader is not a CDC reader.

Set to Incremental to read CDC data continuously using the change streams API (see Change streams in Azure Cosmos DB’s API for MongoDB). In this mode, insert, update, and replace operations are all sent to the target as inserts, since the change stream does not include the operation type. See Mongo Cosmos DB Reader limitations for discussion of recovery in this mode.

MongoDB Config

String

Optionally specify a JSON string to define a subset of documents to be selected in InitialLoad mode and for inserts in Incremental mode. See Selecting documents using MongoDB Config.

When Mode is Incremental, insert operations are sent only for the defined subset of documents, but updates and deletes are sent for all documents. If Cosmos DB Writer, Mongo Cosmos DB Writer, or MongoDB Writer receives an update or delete for a document not in the subset, the application will halt. To avoid this, set Ignorable Exception Code to RESOURCE_NOT_FOUND for Cosmos DB Writer or KEY_NOT_FOUND for Mongo Cosmos DB Writer or MongoDB Writer. Note that in this case you will have to check the exception store to see whether any exceptions were ignored for documents that are in the subset.

Overload Retry Policy

String

retryInterval=30, maxRetries=10

With the default setting, if reading is interrupted because the number of request units (RUs) per second exceeded the provisioned limit, the adapter will try again in 30 seconds (retryInterval). If this attempt is unsuccessful, every 30 seconds it will try again. If the tenth attempt (maxRetries) is unsuccessful, the adapter will fail and log an exception. Negative values are not supported.

Password

encrypted password

The Primary Password or Secondary Password from the Connection String read-only tab of your Azure Cosmos DB API for MongoDB account.

Quiesce on IL Completion

Boolean

False

Visible in Flow Designer only when Mode is Initial Load.

ThreadPool Size

Integer

10

Visible in Flow Designer only when Mode is Incremental. When Mode is Initial Load, this setting is ignored.

The number of threads Striim will use for reading collections. If this number is lower than the number of collections being read, the threads will read the collections in round-robin fashion. If this number equals the number of collections, each thread will read from one collection. If this number exceeds the number of collections, only as many threads as there are collections will be active.

Username

String

The Username from the Connection String read-only tab of your Azure Cosmos DB API for MongoDB account.

Cosmos DB Reader JSONNodeEvent fields

The output type for Cosmos DB Reader is JSONNodeEvent. The fields are:

data: contains the field names and values of a document, for example:

data:{
  "id":"1d40842b-f28d-4b29-b5bf-7168712c9807eanOlpyItG",
  "brand":"Jerry's",
  "type":"plums",
  "quantity":"50"
}

metadata: contains the following elements:

  • CollectionName: the collection from which the document was read

  • DatabaseName: the database of the collection

  • DocumentKey: the value of the id field of the document

  • FullDocumentReceived: value is True if data includes the entire image of the document, False if it does not

  • Namespace: <database name>.<collection name>

  • OperationName: in InitialLoad mode, SELECT; in Incremental mode, INSERT or DELETE (for "soft deletes")

  • Partitioned: value is True if the operation was on a sharded collection

  • PartitionKeys: a JsonNode object containing the shard keys and their values

  • Timestamp: the Unix epoch time at which the operation was performed in the source

  • _attachments: the addressable path for the document's attachments

  • _etag: the document's resource etag

  • _lsn: the document's logical sequence number

  • _rid: the document's resource ID

  • _self: the document's addressable URI

For example, in InitialLoad mode:

metadata: {
  "Partitioned":true,
  "_rid":"JdBVALN36DwBAAAAAAAAAA==",
  "EntityName":"src.emp",
  "CollectionName":"emp",
  "OperationName":"SELECT",
  "DatabaseName":"src",
  "NameSpace":"src.emp",
  "TimeStamp":1747306064000,
  "_attachments":"attachments/",
  "DocumentKey":"1",
  "FullDocumentReceived":true,
  "PartitionKeys":{"id":"1"},
  "_self":"dbs/JdBVAA==/colls/JdBVALN36Dw=/docs/JdBVALN36DwBAAAAAAAAAA==/",
  "_etag":"\"5a00d804-0000-2200-0000-6825c6500000\""
}

In Incremental mode:

metadata: {
  "Partitioned":true,
  "_rid":"JdBVALN36DwDAAAAAAAAAA==",
  "EntityName":"src.emp",
  "_lsn":4,
  "CollectionName":"emp",
  "OperationName":"INSERT",
  "DatabaseName":"src",
  "NameSpace":"src.emp",
  "TimeStamp":1747381403000,
  "_attachments":"attachments/",
  "DocumentKey":"3",
  "FullDocumentReceived":true,
  "PartitionKeys":{"id":"3"},
  "_self":"dbs/JdBVAA==/colls/JdBVALN36Dw=/docs/JdBVALN36DwDAAAAAAAAAA==/",
  "_etag":"\"05002125-0000-2200-0000-6826ec9b0000\""
}

Mongo Cosmos DB Reader JSONNodeEvent fields

The output type for Mongo Cosmos DB Reader is JSONNodeEvent. The fields are:

data: contains the field names and values of a document, for example:

data:{
  "_id":{"$oid":"620365482b7622580d9e6e43"},
  "state":4,
  "name":"Willard",
  "last_name":"Valek",
  "email":"wvalek3@vk.com",
  "gender":"Male",
  "ip_address":"67.76.188.26",
  "ttl":-1
}

metadata: contains the following elements:

  • CollectionName: the collection from which the document was read

  • DatabaseName: the database of the collection

  • DocumentKey: for an unsharded collection, the _id field and its value; for a sharded collection, also the shard key field and its value

  • FullDocumentReceived: value is True if data includes the entire image of the document, False if it does not

  • Namespace: <database name>.<collection name>

  • OperationName: in InitialLoad mode, SELECT; in Incremental mode, INSERT or DELETE (for "soft deletes")

  • Partitioned: value is True if the operation was on a sharded collection

  • PartitionKeys: a JsonNode object containing the shard keys and their values

  • Timestamp: in InitialLoad mode, the current time of the Striim server when the document was read; in Incremental mode, 0 (zero), because the change streams API (see Change streams in Azure Cosmos DB’s API for MongoDB) does not provide a timestamp

For example:

metadata:{
  "CollectionName":"collection1",
  "OperationName":"SELECT",
  "DatabaseName":"testDB",
  "NameSpace":"testDB.collection1",
  "id":{"$oid":"620365482b7622580d9e6e43"},
  "TimeStamp":1644429967516
}

Cosmos DB Reader example application

The following application will read CDC data from Cosmos DB and write it to MongoDB.

CREATE APPLICATION CosmosToMongo recovery 5 second interval;

CREATE SOURCE CosmosSrc USING CosmosDBReader ( 
  CosmosDBConfig: '{\"Operations\": {\"SoftDelete\": {\"FieldName\" : 
    \"IsDeleted\",\"FieldValue\" : \"true\"}}}',
  Mode: 'Incremental', 
  AccessKey: '*******', 
  Containers: 'src.emp', 
  ServiceEndpoint: 'https://******.documents.azure.com:443/'
)
OUTPUT TO cout;

CREATE TARGET MongoTarget USING MongoDBWriter ( 
  collections: 'src.emp,targdb.emp', 
  ConnectionURL: '******:27018', 
  Password: '******', 
  Username: 'myuser', 
  AuthDB: 'targdb', 
  upsertMode: 'true'
) 
INPUT FROM cout;

END APPLICATION CosmosToMongo;

Mongo Cosmos DB Reader example application

The following application will read CDC data from Cosmos DB and write it to MongoDB.

CREATE APPLICATION MongoCosmosToMongo;

CREATE SOURCE MongoCosmosSrc USING MongoCosmosDBReader ( 
  CosmosDBConfig: '{\"Operations\": {\"SoftDelete\": {\"FieldName\" :
    \"IsDeleted\",\"FieldValue\" : \"true\"}}}', 
  Mode: 'Incremental', 
  Username: 'az-cosmos-mongodb', 
  ConnectionURL: 'az-cosmos-mongodb.mongo.cosmos.azure.com:10255', 
  Collections: 'testDB.collection$', 
  Password: '********' 
) 
OUTPUT TO cout;

CREATE TARGET MongoTarget USING MongoDBWriter ( 
  collections: 'src.emp,targdb.emp', 
  ConnectionURL: '******:27018', 
  Password: '******', 
  Username: 'myuser', 
  AuthDB: 'targdb', 
  upsertMode: 'true' 
) 
INPUT FROM cout;

END APPLICATION MongoCosmosToMongo;

Cosmos DB Reader example output

Initial load

When Mode is InitialLoad, the Operation Name is reported as SELECT, even though it is actually an insert.

JsonNodeEvent{
   data:{
      "id":"1d40842b-f28d-4b29-b5bf-7168712c9807eanOlpyItG",
      "brand":"Jerry's",
      "type":"plums",
      "quantity":"50"
   } metadata:{
      "CollectionName":"container2",
      "OperationName":"SELECT",
      "DatabaseName":"testDB",
      "DocumentKey":"1d40842b-f28d-4b29-b5bf-7168712c9807eanOlpyItG",
      "NameSpace":"testDB.container2",
      "TimeStamp":1639999991
   } userdata:null
   removedfields:null
};

Incremental - insert

JsonNodeEvent{
   data:{
      "id":"c6de96ef-d7f0-44a9-a5ab-e3d0298652afNvEDtXTPqO",
      "brand":"Kraft Heinz",
      "type":"kool-aid",
      "quantity":"50"
   } metadata:{
      "CollectionName":"container2",
      "OperationName":"INSERT",
      "DatabaseName":"testDB",
      "DocumentKey":"c6de96ef-d7f0-44a9-a5ab-e3d0298652afNvEDtXTPqO",
      "NameSpace":"testDB.container2",
      "TimeStamp":1643876905
   } userdata:null
   removedfields:null
};

Incremental - delete

Note that this includes the IsDeleted field discussed in Cosmos DB setup for Cosmos DB Reader.

JsonNodeEvent{
   data:{
      "id":"c6de96ef-d7f0-44a9-a5ab-e3d0298652afNvEDtXTPqO",
      "brand":"Kraft Heinz",
      "type":"kool-aid",
      "quantity":"50",
      "IsDeleted":"true"
   } metadata:{
      "CollectionName":"container2",
      "OperationName":"DELETE",
      "DatabaseName":"testDB",
      "DocumentKey":"c6de96ef-d7f0-44a9-a5ab-e3d0298652afNvEDtXTPqO",
      "NameSpace":"testDB.container2",
      "TimeStamp":1643877020
   } userdata:null
   removedfields:null
};

Mongo Cosmos DB Reader example output

Initial load

When Mode is InitialLoad, the Operation Name is reported as SELECT, even though it is actually an insert.

JsonNodeEvent{
   data:{
      "_id":{
         "$oid":"620365482b7622580d9e6e43"
      },
      "state":4,
      "name":"Willard",
      "last_name":"Valek",
      "email":"wvalek3@vk.com",
      "gender":"Male",
      "ip_address":"67.76.188.26",
      "ttl":-1
   } metadata:{
      "CollectionName":"collection1",
      "OperationName":"SELECT",
      "DatabaseName":"testDB",
      "NameSpace":"testDB.collection1",
      "id":{
         "$oid":"620365482b7622580d9e6e43"
      },
      "TimeStamp":1644429967516
   } userdata:null
   removedfields:null
};

Incremental - insert

JsonNodeEvent {
   data:{
      "id":"updated",
      "type":2,
      "name":"Alex"
   } metadata:{
      "CollectionName":"container1",
      "OperationName":"INSERT",
      "DatabaseName":"testDB",
      "DocumentKey":{
         "id":"updated",
         "type":2
      },
      "NameSpace":"testDB.container1",
      "TimeStamp":1646819481
   } userdata:null
   removedfields:null
};

Incremental - delete

Note that this includes the IsDeleted field discussed in Cosmos DB setup for Mongo Cosmos DB Reader.

JsonNodeEvent {
   data:{
      "id":"updated",
      "type":2,
      "name":"Alex",
      "IsDeleted":"Yes"
   } metadata:{
      "CollectionName":"container1",
      "OperationName":"DELETE",
      "DatabaseName":"testDB",
      "DocumentKey":{
         "id":"updated",
         "type":2
      },
      "NameSpace":"testDB.container1",
      "TimeStamp":1646819488
   } userdata:null
   removedfields:null
};