Cosmos DB Writer

Writes to Azure Cosmos DB collections using the Cosmos DB SQL API. (To write to Cosmos DB using the Cassandra API, see Cassandra Cosmos DB Writer.) It may be used in four ways:

  • With an input stream of a user-defined type, CosmosDBWriter writes events as documents to a single collection.

    In this case, the key field of the input stream is used as the document ID. If there is no key field, the document ID is generated by concatenating the values of all fields. Alternatively, you may specify a subset of fields to be concatenated using the syntax <database name>.<collection name> keycolumns(<field1 name>, <field2 name>, ...) in the Collections property. Target document field names are taken from the input stream's event type.

  • With an input stream of type JSONNodeEvent that is the output stream of a source using JSONParser, CosmosDBWriter writes events as documents to a single collection.

    When the JSON event contains an id field, its value is used as the Cosmos DB document ID. When the JSON event does not contain an id field, a unique ID (for example, 5abcD-56efgh0-ijkl43) is generated for each document. Since Cosmos DB is case-sensitive, when the JSON event includes a field named Id, iD, or ID, it is imported as a separate field.

  • With an input stream of type JSONNodeEvent that is the output stream of a MongoDBReader source, CosmosDBWriter writes each MongoDB collection to a separate Cosmos DB collection.

    MongoDB collections may be replicated in Cosmos DB by using wildcards in the Collections property (see Replicating MongoDB data to Azure CosmosDB for details). Alternatively, you may manually map MongoDB collections to Cosmos DB collections as discussed in the notes for the Collections property.

    The MongoDB document ID (included in the JSONNodeEvent metadata map) is used as the Cosmos DB document ID.

    When a source event contains a field named id, in the target it will be renamed ID to avoid conflict with the Cosmos DB target document ID.

  • With an input stream of type WAEvent that is the output stream of a SQL CDC reader or DatabaseReader source, CosmosDBWriter writes data from each source table to a separate collection. The target collections may be in different databases.

    Source table data may be replicated to Cosmos DB collections of the same names by using wildcards in the Collections property. Note that data will be read only from tables that exist when the source starts. Additional tables added later will be ignored until the source is restarted. Alternatively, you may manually map source tables to Cosmos DB collections as discussed in the notes for the Collections property. When the source is a CDC reader, updates and deletes in source tables can be replicated in the corresponding Cosmos DB target collections. See Replicating Oracle data to Azure Cosmos DB.

    Each source row's primary key value (which may be a composite) is used as the document ID for the corresponding Cosmos DB document. If the table has no primary key, the document ID is generated by concatenating the values of all fields in the row. Alternatively, you may select a subset of fields to be concatenated using the keycolumns option as discussed in the notes for the Collections property.

    Each row in a source table is written to a document in the target collection mapped to the table. Target document field names are taken from the source event's metadata map and their values from its data array (see WAEvent contents for change data).

    When a source event contains a field named id, in the target it will be renamed ID to avoid conflict with the Cosmos DB target document ID.
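The document ID and field-renaming rules in the list above can be pictured with a short sketch. This is an illustration only, not Striim's implementation: the function name row_to_document and its parameters are hypothetical, and converting values with str() before concatenation is an assumption.

```python
def row_to_document(field_names, values, key_columns=None, separator=":"):
    """Build a target document from one source row (illustrative sketch).

    field_names / values: parallel lists, standing in for the names in the
    event's metadata and the values in its data array.
    key_columns: primary key or keycolumns fields; when omitted, all fields
    are concatenated, as described for rows without a primary key.
    """
    row = dict(zip(field_names, values))
    key_names = key_columns or field_names
    # Assumption: values are stringified before being joined.
    doc_id = separator.join(str(row[name]) for name in key_names)

    document = {"id": doc_id}
    for name, value in row.items():
        # A source field named "id" is renamed "ID" so it does not collide
        # with the Cosmos DB document ID.
        target = "ID" if name == "id" else name
        document[target] = value
    return document


doc = row_to_document(
    ["id", "company", "branch"],
    [7, "acme", "east"],
    key_columns=["company", "branch"],
)
print(doc)
```

Here the composite key (company, branch) becomes the document ID, and the source column named id is kept as ID.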

Caution

Cosmos DB limits the number of characters allowed in document IDs (see Per-item limits in Microsoft's documentation). When using wildcards or keycolumns, be sure that the generated document IDs will not exceed that limit.
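One way to check generated IDs before deployment is to rebuild them from sample rows and measure them. The sketch below is hypothetical: the 1023-byte limit and the choice to measure UTF-8 bytes are assumptions that should be confirmed against Microsoft's per-item limits, and id_within_limit is not part of any Striim or Azure API.

```python
# Assumption: confirm the actual limit in Microsoft's per-item limits table.
ASSUMED_ID_LIMIT_BYTES = 1023


def id_within_limit(values, separator=":", limit=ASSUMED_ID_LIMIT_BYTES):
    """Return True if the concatenated ID fits within the assumed limit."""
    doc_id = separator.join(str(v) for v in values)
    # Measure encoded bytes rather than characters, since non-ASCII
    # characters occupy more than one byte in UTF-8.
    return len(doc_id.encode("utf-8")) <= limit


print(id_within_limit(["acme", "east", 42]))
print(id_within_limit(["x" * 600, "y" * 600]))
```

Running a check like this over representative source rows shows whether a keycolumns subset is needed to keep IDs short.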

Striim provides templates for creating applications that read from various sources and write to Cosmos DB. See Creating an application using a template for details.

Cosmos DB Writer properties

property

type

default value

notes

Access Key

encrypted password

read-write key from the Azure Cosmos DB account's Keys tab

Batch Policy

String

EventCount:10000, Interval:30

The batch policy includes eventCount and interval (see Setting output names and rollover / upload policies for syntax). Events are buffered locally on the Striim server and sent as a batch to the target every time either of the specified values is exceeded. When the app is stopped, any remaining data in the buffer is discarded. To disable batching, set to EventCount:1,Interval:0.

With the default setting, events are sent every 30 seconds, or sooner if the buffer reaches 10,000 events.

Collections

String

The collection(s) to write to. The collection(s) must exist when the application is started. Partition keys must match one of the fields of the input and matching is case-sensitive. Unpartitioned collections are not supported (see Migrate non-partitioned containers to partitioned containers).

When the input stream is of a user-defined type or of type JSONNodeEvent from JSONParser, specify a single collection with the syntax <database name>.<collection name>.

When the input stream is of type JSONNodeEvent from MongoDBReader, or of type WAEvent from DatabaseReader or a CDC reader, specify one or more source-target pairs using the syntax <database>.<collection / table>,<database>.<collection name>. Note that Cosmos DB collection names are case-sensitive, so they must match the case of source collection / table names. You may use wildcards ($ for MongoDB, % for other sources and Cosmos DB) in place of collection and table names.

If you are not using wildcards, you may override the default document ID by specifying one or more source column names to concatenate as the document ID in the Collections property using the syntax <source database>.<source collection / table>, <target database>.<target collection> keycolumns(<column name>,...);... For example, Collections: sourcedb.sourcetable1, targetdb.targetcollection1 keycolumns(UUID); sourcedb.sourcetable2, targetdb.targetcollection2 keycolumns(company,branch). (Note that @userdata is not supported in keycolumns values.) If an update in the source changes any of the concatenated values, the ID of the corresponding Cosmos DB document will be updated.

Collections Config

String

By default, throughput is set to 5000 RUs/sec (see Request units in Azure Cosmos DB). You may override this by specifying values manually using the syntax <database>.<collection>(throughput=<RUs/sec.>). You may use wildcards and specify multiple values separated by semicolons. A setting for a specific collection overrides a wildcard setting, for example, db1.col%(throughput=4000); db1.col5(throughput=2000).

Connection Pool Size

Integer

1000

maximum number of database connections allowed (should not exceed connection pool size in Cosmos DB)

Connection Retry Policy

String

RetryInterval=60, MaxRetries=3

The connection retry policy includes retryInterval and maxRetries. With the default setting, if a connection attempt is unsuccessful, the adapter will try again in 60 seconds (retryInterval). If the second attempt is unsuccessful, in 60 seconds it will try a third time (maxRetries). If that is unsuccessful, the adapter will fail and log an exception. Negative values are not supported.

Excluded Collections

String

Optionally, specify one or more collections to exclude from the set defined by the Collections property. Wildcards are not supported.

Ignorable Exception Code

String

By default, if Cosmos DB returns an error, CosmosDBWriter terminates the application. Use this property to specify errors to ignore, separated by commas. Supported values are:

  • PARTITION_KEY_NOT_FOUND (partition key is wrong or missing)

  • RESOURCE_ALREADY_EXISTS (the target collection has a document with the same id and partition key)

  • RESOURCE_NOT_FOUND (id is wrong or missing)

Ignored exceptions will be written to the application's exception store (see CREATE EXCEPTIONSTORE).

Key Separator

String

:

Inserted between values when generating document IDs by concatenating column or field values. If the values might contain a colon, change this to something that will not occur in those values.

Parallel Threads

Integer

See Creating multiple writer instances.

Service Endpoint

String

read-write connection string from the Azure Cosmos DB account's Keys tab

Upsert Mode

Boolean

False

Set to True to process inserts and updates as upserts.

Note: Cosmos DB does not support updating id or partition key fields via the upsert API. If one of these fields is updated in a source document and the document is not present in the target, Cosmos DB will throw a "Resource not found" exception.
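The precedence rule described for Collections Config (a setting for a specific collection overrides a wildcard setting) can be sketched as follows. This is an illustration under stated assumptions, not the adapter's parser: the function names are hypothetical, % is treated as matching any run of characters, and when several wildcard entries match, the first one listed is used, which the notes above do not specify.

```python
import re

DEFAULT_THROUGHPUT = 5000  # default stated in the Collections Config notes


def parse_collections_config(value):
    """Parse 'db.coll(throughput=N); ...' into (pattern, throughput) pairs."""
    entries = []
    for part in value.split(";"):
        part = part.strip()
        if not part:
            continue
        match = re.fullmatch(r"(\S+?)\s*\(\s*throughput\s*=\s*(\d+)\s*\)", part)
        if match is None:
            raise ValueError(f"unrecognized entry: {part!r}")
        entries.append((match.group(1), int(match.group(2))))
    return entries


def resolve_throughput(entries, qualified_name):
    """Pick the throughput for '<database>.<collection>'."""
    wildcard_hit = None
    for pattern, throughput in entries:
        if "%" not in pattern:
            if pattern == qualified_name:
                return throughput  # an exact entry always wins
        elif wildcard_hit is None:
            # Assumption: '%' matches any run of characters.
            regex = ".*".join(re.escape(piece) for piece in pattern.split("%"))
            if re.fullmatch(regex, qualified_name):
                wildcard_hit = throughput
    return DEFAULT_THROUGHPUT if wildcard_hit is None else wildcard_hit


config = parse_collections_config(
    "db1.col%(throughput=4000); db1.col5(throughput=2000)"
)
for name in ("db1.col5", "db1.col7", "db2.other"):
    print(name, resolve_throughput(config, name))
```

With the example value from the notes, db1.col5 resolves to 2000, other db1.col collections to 4000, and anything unmatched falls back to the default.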