
Azure Cosmos DB using Core (SQL) API

Azure Cosmos DB is a cloud-based NoSQL database service from Microsoft. For more information, see Azure Cosmos DB, Common Azure Cosmos DB use cases, and Striim for Azure Cosmos DB.

Cosmos DB Reader reads documents from one or more Cosmos DB containers using Cosmos DB's native Core (SQL) API. Its output stream type is JSONNodeEvent, so it requires targets that can read JSONNodeEvents. Alternatively, you can convert the output to a user-defined type (see Converting JSONNodeEvent output to a user-defined type for an example).

This reader sends both inserts and updates as inserts, so to replicate Cosmos DB documents the writer must support upsert mode. In upsert mode, a new document (one whose id field does not match that of any existing document) is handled as an insert, and an update to an existing document (one with a matching id field) is handled as an update. For replication, this limits the choice of writers to Cosmos DB Writer and Mongo Cosmos DB Writer. Append-only targets such as files, blobs, and Kafka are also supported as long as they can handle a JSONNodeEvent input stream.
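The following minimal Python sketch makes the upsert semantics concrete. It models a target as a dict keyed by the id field. The `upsert` function is hypothetical: it only illustrates the insert-versus-update decision described above and is not Striim or Cosmos DB code.

```python
def upsert(target: dict, document: dict) -> str:
    """Write `document` to `target` keyed by its id field.

    Returns "insert" for a new id and "update" for an id that already exists.
    """
    doc_id = document["id"]
    action = "update" if doc_id in target else "insert"
    # Replace the stored document as a whole rather than merging fields.
    target[doc_id] = dict(document)
    return action


target = {}
print(upsert(target, {"id": "a1", "type": "plums", "quantity": "50"}))  # insert
print(upsert(target, {"id": "a1", "type": "plums", "quantity": "40"}))  # update
print(target["a1"]["quantity"])  # 40
```

Both events above would arrive from the reader as inserts. An upsert-mode writer keeps only the latest version of a1, while an append-only target would store both.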

Be sure to provision sufficient Request Units (see Request Units in Azure Cosmos DB) to handle the volume of data you expect to read. If you do not, the reader may be unable to keep up with the source data.

Cosmos DB setup for Cosmos DB Reader

Request Units

Provision sufficient Request Units to handle the volume of data you expect to read. For more information, see Request Units in Azure Cosmos DB.

Capturing deletes

Azure Cosmos DB's change feed does not capture deletes. To work around this limitation:

  1. Set time-to-live (TTL) to -1 on the container(s) to be read, as described in Configure time to live in Azure Cosmos DB. The -1 value enables TTL on the container without expiring documents by default, so Cosmos DB will not automatically delete a document unless that document has its own positive TTL value.

  2. Set Cosmos DB Reader's Cosmos DB Config property to:

    {"Operations": {"SoftDelete": {"FieldName" : "IsDeleted","FieldValue" : "true"}}}

    This enables the IsDeleted field for soft-delete operations, as shown by the example in Cosmos DB Reader example output. To delete a document, perform an UPDATE operation that sets IsDeleted to true and sets the document's TTL to a positive value (the number of seconds after its last modification at which Cosmos DB deletes it). When the IsDeleted field value is true, the OperationName value in the metadata of the output event is DELETE, even though the operation is actually an UPDATE.
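The Python sketch below shows how the soft-delete setting could be interpreted for a single document in Incremental mode. The function `operation_name` is hypothetical. Treating the string "true" and the JSON boolean true as equivalent (a case-insensitive string comparison against FieldValue) is an assumption, not documented Striim behavior.

```python
import json

# Cosmos DB Config value from step 2.
COSMOS_DB_CONFIG = '{"Operations": {"SoftDelete": {"FieldName" : "IsDeleted","FieldValue" : "true"}}}'


def operation_name(document: dict, config_json: str) -> str:
    """Return the OperationName an Incremental-mode event for `document` would carry."""
    soft_delete = json.loads(config_json).get("Operations", {}).get("SoftDelete")
    if soft_delete is not None:
        value = document.get(soft_delete["FieldName"])
        # Assumption: "true" and the JSON boolean true are treated the same way.
        if value is not None and str(value).lower() == str(soft_delete["FieldValue"]).lower():
            return "DELETE"
    # Inserts, updates, and replaces are all reported as INSERT.
    return "INSERT"


doc = {
    "id": "c6de96ef-d7f0-44a9-a5ab-e3d0298652afNvEDtXTPqO",
    "brand": "Kraft Heinz",
    "IsDeleted": "true",
    "ttl": 60,  # item-level TTL in seconds, so Cosmos DB removes the document later
}
print(operation_name(doc, COSMOS_DB_CONFIG))  # DELETE
```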

Cosmos DB Reader properties

The Azure Cosmos Java driver used by this reader is bundled with Striim.

property

type

default value

notes

Access Key

encrypted password

The Primary Key or Secondary Key from the Keys read-only tab of your Cosmos DB account.

Connection Retry Policy

String

retryInterval=60, maxRetries=3

With the default setting, if a connection attempt is unsuccessful, the adapter will wait the number of seconds specified by retryInterval and try again. If the second attempt is unsuccessful, it will wait retryInterval seconds again and try a third time (maxRetries). If that is unsuccessful, the adapter will fail and log an exception. Negative values are not supported.

Containers

String

The fully qualified name(s) of the container(s) to read from, for example, mydb.mycollection. Separate multiple containers with commas. Container names are case-sensitive.

You may use the % wildcard, for example, mydb.%. The wildcard is allowed only at the end of the string: for example, mydb.prefix% is valid, but mydb.%suffix is not.

Note that data will be read only from containers that exist when the Striim application starts; containers added later will be ignored until the application is restarted.

Cosmos DB Config

String

Optionally, specify a JSON-format string with additional Cosmos DB options. For an example, see "Capturing deletes" in Cosmos DB setup for Cosmos DB Reader.

Exclude Containers

String

If Containers uses a wildcard, data from any containers specified here will be omitted. Multiple container names (separated by semicolons) and wildcards may be used exactly as for Containers.

Fetch Size

Integer

1000

The number of documents the adapter will fetch in a single read operation.

Mode

String

InitialLoad

With the default InitialLoad setting, the reader loads all existing data using the Microsoft Azure Cosmos SDK for Azure Cosmos DB SQL API. After the initial load is complete, you must stop the application manually. In this mode, Cosmos DB Reader is not a CDC reader, and recovery will restart from the beginning.

Set to Incremental to read CDC data continuously using the change feed API (see Change feed in Azure Cosmos DB). In this mode, insert, update, and replace operations are all sent to the target as inserts, since the change feed does not include the operation type. In this mode, recovery will restart from the timestamp of the last operation for each container.

Overload Retry Policy

String

maxRetryTimeInSecs=30, maxRetries=9

This policy determines how the reader handles RequestRateTooLargeException errors received from Cosmos DB. maxRetries sets the maximum number of times a throttled request is retried, and maxRetryTimeInSecs sets the total maximum time allowed for all retries, after which the application will halt. For more information, see RetryOptions.setMaxRetryAttemptsOnThrottledRequests(int maxRetryAttemptsOnThrottledRequests) Method and ThrottlingRetryOptions.setMaxRetryWaitTime(Duration maxRetryWaitTime) Method.

Polling Interval

Integer

10

The time in milliseconds the adapter will wait before polling the change feed for new documents. With the default value of 10, when there are new documents, the adapter will fetch the number set by the Fetch Size property, then immediately fetch again, repeating until all new documents have been fetched. Then it will wait for 10 milliseconds before polling again.

Quiesce on IL Completion

Boolean

False

Service Endpoint

String

The URI from the Overview page of your Cosmos DB account.

Start Timestamp

Optionally, in incremental mode, specify a _ts field value from which to start reading. Supported formats (see dateTimeParser for more information):

YYYY
YYYY-MM
YYYY-MM-DD
YYYY-MM-DD"T"hhTZD
YYYY-MM-DD"T"hh:mmTZD
YYYY-MM-DD"T"hh:mm:ssTZD
YYYY-MM-DD"T"hh:mm:ss.sssTZD

ThreadPool Size

Integer

10

The number of threads Striim will use for reading containers. If this number is lower than the number of containers being read, the containers will be read in round-robin fashion. If this number equals the number of containers, each thread will read from one container. If this number exceeds the number of containers, only as many threads as there are containers will be active.
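The Connection Retry Policy value is a comma-separated list of key=value pairs. The Python sketch below parses it and lists when each connection attempt would start, following the description in the table above. Treating maxRetries as the number of the final attempt is an interpretation of that description. Both function names are hypothetical.

```python
def parse_policy(policy: str) -> dict:
    """Parse a string such as "retryInterval=60, maxRetries=3" into integers."""
    values = {}
    for part in policy.split(","):
        key, _, raw = part.strip().partition("=")
        number = int(raw)
        if number < 0:
            raise ValueError(f"negative values are not supported: {part.strip()!r}")
        values[key] = number
    return values


def attempt_offsets(policy: str) -> list:
    """Seconds after the first attempt at which each connection attempt starts,
    ignoring how long each attempt itself takes."""
    settings = parse_policy(policy)
    interval = settings["retryInterval"]
    return [n * interval for n in range(settings["maxRetries"])]


print(attempt_offsets("retryInterval=60, maxRetries=3"))  # [0, 60, 120]
```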
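The following Python sketch illustrates the Containers and Exclude Containers rules: exact or trailing-% prefix matching, case-sensitive, applied only to containers that already exist. It assumes the pattern strings have already been split into lists; the table above describes commas for Containers and semicolons for Exclude Containers. The function names and container names are hypothetical.

```python
def matches(pattern: str, name: str) -> bool:
    """Match a fully qualified container name (db.container) against one pattern."""
    if "%" in pattern[:-1]:
        raise ValueError(f"'%' is only allowed at the end of a pattern: {pattern!r}")
    if pattern.endswith("%"):
        return name.startswith(pattern[:-1])  # case-sensitive prefix match
    return name == pattern  # case-sensitive exact match


def resolve_containers(include: list, exclude: list, existing: list) -> list:
    """Return the existing containers matched by `include` and not by `exclude`."""
    return [
        name
        for name in existing
        if any(matches(p, name) for p in include)
        and not any(matches(p, name) for p in exclude)
    ]


existing = ["mydb.orders", "mydb.orders_archive", "mydb.customers", "otherdb.orders"]
print(resolve_containers(["mydb.%"], ["mydb.orders_%"], existing))
# ['mydb.orders', 'mydb.customers']
```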
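Fetch Size and Polling Interval work together. Each poll drains all new documents in batches of at most Fetch Size, then waits Polling Interval milliseconds before polling again. The Python sketch below models the change feed as an in-memory list. The names `drain` and `poll` are hypothetical; the real reader uses the Cosmos DB change feed API instead.

```python
import time


def drain(feed: list, position: int, fetch_size: int):
    """Fetch every document after `position` in batches of at most fetch_size."""
    if fetch_size < 1:
        raise ValueError("fetch_size must be at least 1")
    batches = []
    while position < len(feed):
        batch = feed[position:position + fetch_size]
        batches.append(batch)
        position += len(batch)
    return batches, position


def poll(feed: list, fetch_size: int = 1000, polling_interval_ms: int = 10, cycles: int = 3) -> list:
    """Poll a fixed number of times and return the size of every batch fetched."""
    position = 0
    sizes = []
    for _ in range(cycles):
        batches, position = drain(feed, position, fetch_size)
        sizes.extend(len(batch) for batch in batches)
        # Wait before polling again, as the Polling Interval property describes.
        time.sleep(polling_interval_ms / 1000)
    return sizes


print(poll(list(range(2500))))  # [1000, 1000, 500]
```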
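Start Timestamp refers to the _ts field, which Cosmos DB stores as Unix epoch seconds. The Python sketch below converts the supported formats to such a value with `datetime.strptime`. It is not Striim's dateTimeParser, and treating values without a time zone designator as UTC is an assumption. For instance, 2022-02-03T08:28:25Z corresponds to 1643876905, the TimeStamp of the Incremental insert example below.

```python
from datetime import datetime, timezone

# strptime patterns for the supported formats, most specific first;
# %z accepts "Z" as well as offsets such as "+01:00".
FORMATS = [
    "%Y-%m-%dT%H:%M:%S.%f%z",
    "%Y-%m-%dT%H:%M:%S%z",
    "%Y-%m-%dT%H:%M%z",
    "%Y-%m-%dT%H%z",
    "%Y-%m-%d",
    "%Y-%m",
    "%Y",
]


def to_ts(value: str) -> int:
    """Convert a Start Timestamp string to epoch seconds comparable with _ts."""
    for fmt in FORMATS:
        try:
            parsed = datetime.strptime(value, fmt)
        except ValueError:
            continue
        if parsed.tzinfo is None:
            # Assumption: date-only values are interpreted as UTC.
            parsed = parsed.replace(tzinfo=timezone.utc)
        return int(parsed.timestamp())
    raise ValueError(f"unsupported Start Timestamp format: {value!r}")


print(to_ts("2022-02-03T08:28:25Z"))  # 1643876905
```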
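One way to picture the ThreadPool Size rules is a static round-robin assignment of containers to threads, as in the Python sketch below. This is an illustration only. The function `assign_containers` and the container names are hypothetical, and Striim's actual scheduling may differ.

```python
def assign_containers(containers: list, pool_size: int) -> dict:
    """Map each active thread index to the containers it reads, round-robin."""
    if pool_size < 1:
        raise ValueError("ThreadPool Size must be at least 1")
    # Only as many threads as there are containers have work to do.
    active = min(pool_size, len(containers))
    plan = {thread: [] for thread in range(active)}
    for index, name in enumerate(containers):
        plan[index % active].append(name)
    return plan


print(assign_containers(["db.a", "db.b", "db.c"], 2))  # {0: ['db.a', 'db.c'], 1: ['db.b']}
print(assign_containers(["db.a", "db.b"], 10))         # {0: ['db.a'], 1: ['db.b']}
```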

Cosmos DB Reader JSONNodeEvent fields

The output type for Cosmos DB Reader is JSONNodeEvent. The fields are:

data: contains the field names and values of a document, for example:

data:{
  "id":"1d40842b-f28d-4b29-b5bf-7168712c9807eanOlpyItG",
  "brand":"Jerry's",
  "type":"plums",
  "quantity":"50"
}

metadata: contains the following elements:

  • CollectionName: the collection from which the document was read

  • DatabaseName: the database of the collection

  • DocumentKey: the value of the id field of the document

  • FullDocumentReceived: value is True if data includes the entire image of the document, False if it does not

  • Namespace: <database name>.<collection name>

  • OperationName: in InitialLoad mode, SELECT; in Incremental mode, INSERT or DELETE (for "soft deletes")

  • Partitioned: value is True if the operation was on a sharded collection

  • PartitionKeys: a JsonNode object containing the shard keys and their values

  • Timestamp: the Unix epoch time, in seconds, at which the operation was performed in the source

For example:

metadata:{
  "CollectionName":"container2",
  "OperationName":"SELECT",
  "DatabaseName":"testDB",
  "DocumentKey":"1d40842b-f28d-4b29-b5bf-7168712c9807eanOlpyItG",
  "NameSpace":"testDB.container2",
  "ResumeToken":"",
  "TimeStamp":1639999991
}

Cosmos DB Reader example application

The following application will read CDC data from Cosmos DB and write it to MongoDB.

CREATE APPLICATION CosmosToMongo recovery 5 second interval;

CREATE SOURCE CosmosSrc USING CosmosDBReader (
  CosmosDBConfig: '{\"Operations\": {\"SoftDelete\": {\"FieldName\" : \"IsDeleted\",\"FieldValue\" : \"true\"}}}',
  Mode: 'Incremental',
  AccessKey: '*******',
  Containers: 'src.emp',
  ServiceEndpoint: 'https://******.documents.azure.com:443/'
)
OUTPUT TO cout;

CREATE TARGET MongoTarget USING MongoDBWriter ( 
  collections: 'src.emp,targdb.emp', 
  ConnectionURL: '******:27018',
  Password: '******', 
  Username: 'myuser', 
  AuthDB: 'targdb', 
  upsertMode: 'true'
) 
INPUT FROM cout;

END APPLICATION CosmosToMongo;

Cosmos DB Reader example output

Initial load

When Mode is InitialLoad, OperationName is reported as SELECT, even though the operation is actually an insert.

JsonNodeEvent{
   data:{
      "id":"1d40842b-f28d-4b29-b5bf-7168712c9807eanOlpyItG",
      "brand":"Jerry's",
      "type":"plums",
      "quantity":"50"
   } metadata:{
      "CollectionName":"container2",
      "OperationName":"SELECT",
      "DatabaseName":"testDB",
      "DocumentKey":"1d40842b-f28d-4b29-b5bf-7168712c9807eanOlpyItG",
      "NameSpace":"testDB.container2",
      "TimeStamp":1639999991
   } userdata:null
   removedfields:null
};

Incremental - insert

JsonNodeEvent{
   data:{
      "id":"c6de96ef-d7f0-44a9-a5ab-e3d0298652afNvEDtXTPqO",
      "brand":"Kraft Heinz",
      "type":"kool-aid",
      "quantity":"50"
   } metadata:{
      "CollectionName":"container2",
      "OperationName":"INSERT",
      "DatabaseName":"testDB",
      "DocumentKey":"c6de96ef-d7f0-44a9-a5ab-e3d0298652afNvEDtXTPqO",
      "NameSpace":"testDB.container2",
      "TimeStamp":1643876905
   } userdata:null
   removedfields:null
};

Incremental - delete

Note that this includes the IsDeleted field discussed in Cosmos DB setup for Cosmos DB Reader.

JsonNodeEvent{
   data:{
      "id":"c6de96ef-d7f0-44a9-a5ab-e3d0298652afNvEDtXTPqO",
      "brand":"Kraft Heinz",
      "type":"kool-aid",
      "quantity":"50",
      "IsDeleted":"true"
   } metadata:{
      "CollectionName":"container2",
      "OperationName":"DELETE",
      "DatabaseName":"testDB",
      "DocumentKey":"c6de96ef-d7f0-44a9-a5ab-e3d0298652afNvEDtXTPqO",
      "NameSpace":"testDB.container2",
      "TimeStamp":1643877020
   } userdata:null
   removedfields:null
};

Cosmos DB Reader limitations

  • The change feed captures field-level updates as document replace operations, so the entire document will be read.

  • The change feed does not capture deletes. Use the "soft delete" approach to add an IsDeleted field with value True to target documents that have been deleted in the source (see Cosmos DB setup for Cosmos DB Reader and Cosmos DB Reader example output).

  • If there are multiple replace operations on a document during the polling interval (see Cosmos DB Reader properties), only the last will be read.

  • When a document's id field is changed, the change feed treats it as an insert rather than a replace, so the previous version of the document with the old id field will not be overwritten, and it will remain in the target.

  • Document id fields must be unique across all partitions. Otherwise you may encounter errors or data corruption.

  • Multi-region writes are not supported.

  • The order of operations is guaranteed to be preserved only for events with the same partition key. The order of operations may not be preserved for events with different partition keys.

  • Cosmos DB's change feed timestamp (_ts) has one-second resolution. Consequently, to avoid missing events in the target, recovery starts one second earlier than the time of the last recovery checkpoint, so there may be some duplicate events.

  • The change feed does not capture delete operations. Consequently, recovery will not capture those operations, and the deleted documents will remain in the target.

  • Cosmos DB's change feed does not capture changes to deleted documents. Consequently, if the Striim application is offline when a document is changed, and the document is deleted before recovery starts, the changes will not be written to the target during recovery.
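The Python sketch below illustrates the one-second recovery rewind described in the limitations above. Events are modeled as dicts carrying an id and a _ts value in seconds, and the checkpoint value is hypothetical. Applying events by id, as an upsert-mode writer does, makes the re-read events harmless.

```python
def events_to_replay(events: list, checkpoint_ts: int) -> list:
    """Events a recovery would re-read: _ts has one-second resolution, so
    reading restarts one second before the last checkpoint."""
    start = checkpoint_ts - 1
    return [event for event in events if event["_ts"] >= start]


def apply_upserts(target: dict, events: list) -> None:
    """Upsert by id, so an event delivered twice leaves the same result."""
    for event in events:
        target[event["id"]] = event


events = [
    {"id": "a", "_ts": 100},
    {"id": "b", "_ts": 101},
    {"id": "c", "_ts": 101},  # same second as "b", not yet written at the checkpoint
    {"id": "d", "_ts": 102},
]
target = {}
apply_upserts(target, events[:2])          # written before the application went offline
replayed = events_to_replay(events, 101)   # hypothetical checkpoint at _ts 101
apply_upserts(target, replayed)
print([event["id"] for event in replayed])  # ['a', 'b', 'c', 'd']
print(sorted(target))                        # ['a', 'b', 'c', 'd']
```

Events a and b are delivered twice. Because the writer upserts by id, the target still ends up with exactly one copy of each document.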