
Databricks Writer

Databricks Writer writes to Delta Lake tables in Databricks on AWS or Azure. Delta Lake is an open-source tabular storage framework that includes a transaction log to support features such as ACID transactions and optimistic concurrency control typically associated with relational databases.


The required JDBC driver is bundled with Striim.

Limitations

Writing to Databricks requires a staging area. The native Databricks File System (DBFS) has a 2 GB cap on storage, which can cause file corruption. To work around that limitation, we strongly recommend using an external stage instead: Azure Data Lake Storage (ADLS) Gen2 for Azure Databricks or Amazon S3 for Databricks on AWS. To use an external stage, your Databricks instance must use Databricks Runtime 10.4 or later.

If you will use MERGE mode, we strongly recommend partitioning your target tables, as this will significantly improve performance (see Partitions | Databricks on AWS or Learn / Azure / Azure Databricks / Partitions).

Data is written in batch mode. Streaming mode is not supported in this release because it is not supported by Databricks Connect (see Databricks Connect - Limitations).

Creating a Databricks target using a template

Note

In this release, Auto Schema Creation is not supported when you are using Databricks' Unity Catalog.

When you create a Databricks Writer target using a wizard template (see Creating apps using templates), you must specify three properties: Connection URL, Hostname, and Personal Access Token. The Tables property value will be set based on your selections in the wizard.

Databricks does not have schemas. When the source database uses schemas, the tables will be mapped as <source_database>.<source_schema>.<table>,<target_database>.<table>, for example, mydb.myschema.%,mydb.%. Each schema in the source will be mapped to a database in the target. If the databases do not exist in the target, Striim will create them.
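
In TQL terms, the mapping appears in the target's Tables property. The following is an illustrative sketch only: the host, token, staging, and connection values are placeholders, and the wildcard mapping is the one from the example above.

CREATE TARGET DatabricksSchemaMap USING DeltaLakeWriter (
   personalAccessToken: '<personal-access-token>',
   hostname: 'adb-xxxx.xx.azuredatabricks.net',
   connectionUrl: 'jdbc:databricks://adb-xxxx.xx.azuredatabricks.net:443/default;transportMode=http;ssl=1;httpPath=xxx;AuthMech=3;UID=token;',
   stageLocation: '/StriimStage/',
   tables: 'mydb.myschema.%,mydb.%'
)
INPUT FROM ns1.sourceStream;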

Databricks authentication mechanisms

Databricks authentication requires a personal access token. Optionally you may use Azure Active Directory (Azure AD).

Create a personal access token in Databricks on AWS

Create a personal access token in Azure Databricks

  1. Create a personal access token for Striim to use to authenticate with the Databricks cluster as described in Learn / Azure / Azure Databricks / Authentication for Azure Databricks automation / Generate a personal access token.

  2. Grant the user associated with the token read and write access to DBFS (see Important information about DBFS permissions).

  3. If table access control has been enabled, also grant the user MODIFY and READ_METADATA privileges (see Data object privileges - Data governance model).

Authenticating to Databricks with Azure Active Directory (Azure AD)

In summary, configuring Azure AD requires the following steps:

  • Register the Striim app with the Azure AD identity provider (IdP).

  • Note the registered app's Client ID, Client Secret, and Tenant ID.

  • Make a request to the /authorize endpoint using the Postman app or the browser.

  • Authenticate to Azure AD.

  • Grant consent at the consent dialog box to obtain the authorization code.

  • Provide the authorization code and Client Secret to the /token endpoint to obtain the access and refresh tokens.

In detail:

  1. Log in to the Azure Portal.

  2. Register a new app.

  3. Note the Application ID (referred to as Client ID in this procedure), the OAuth v2 authorization endpoint, and the OAuth v2 token endpoint.

  4. Generate a new Client secret.

    Note the Client Secret for future use.

  5. Add the AzureDatabricks API permission.

  6. (When the external stage is ADLS Gen 2) Add the Azure Storage API permission.

The following procedure uses curl and the Web browser to fetch the refresh token.

  1. Open the following URL in a Web browser.

    https://login.microsoftonline.com/<tenant-id>/oauth2/v2.0/authorize?
       client_id=<client-id>&
       response_type=code&
       redirect_uri=http%3A%2F%2Flocalhost%3A7734%2Fstriim-callback&
       response_mode=query&
       scope=2ff814a6-3304-4ab8-85cb-cd0e6f879c1d%2F.default%20offline_access

    Replace <tenant-id> with the tenant ID of the registered app. Replace <client-id> with the client ID of the registered app. Provide valid authentication credentials if Azure Portal requests authentication.

    The web browser redirects to the specified redirect URI. The authorization code is the part of the URI after the code= string.

  2. Note the authorization code for future use.

  3. Execute the following curl command.

    curl -X POST -H 'Content-Type: application/x-www-form-urlencoded' \
       https://login.microsoftonline.com/<tenant-id>/oauth2/v2.0/token \
       -d 'client_id=<client-id>' \
       -d 'client_secret=<client_secret>' \
       -d 'scope=2ff814a6-3304-4ab8-85cb-cd0e6f879c1d%2F.default%20offline_access' \
       -d 'code=<authorization_code>' \
       -d 'redirect_uri=http%3A%2F%2Flocalhost%3A7734%2Fstriim-callback' \
       -d 'grant_type=authorization_code'

    Replace <tenant-id> with the tenant ID of the registered app. Replace <client-id> with the client ID of the registered app. Replace <client_secret> with the client secret of the registered app. Replace <authorization_code> with the previously noted authorization code.

    The call returns an object that contains an access_token key and a refresh_token key.

  4. Note the value of the refresh_token key.

The following procedure uses the Postman app to generate an access token.

  1. Open the Postman app.

  2. In the Authorization tab, set the authorization type to OAuth 2.0.

  3. Configure values for the Client ID, Client secret, authorization URL and access token URL.

  4. Set the value of the Scope field to 2ff814a6-3304-4ab8-85cb-cd0e6f879c1d/.default offline_access.

  5. Set the value of the Callback URL field to the redirect URL determined in earlier procedures.

  6. Click Get New Access Token.

  7. Sign into Microsoft Azure and accept the app privilege requests at the consent dialog box.

    The response contains an access token and a refresh token. Note the value of the refresh token.

When the External Stage type is ADLS Gen 2 and the authentication type is Azure AD, you must grant the service principal account the Storage Blob Data Contributor privilege before generating the access and refresh tokens.

Example 2. TQL Example for Azure AD with ADLS Gen 2 as External Stage type
CREATE OR REPLACE TARGET db USING Global.DeltaLakeWriter ( 
   tenantID: '71bfeed5-1905-43da-a4a4-49d8490731da',
   connectionUrl: 'jdbc:spark://adb-8073469162361072.12.azuredatabricks.net:443/default;
                   transportMode=http;ssl=1;
                   httpPath=sql/protocolv1/o/8073469162361072/0301-101350-kprc8x3a;
                   AuthMech=3;UID=token;PWD=<personal-access-token>',
   stageLocation: '/',
   CDDLAction: 'Process',
   adapterName: 'DeltaLakeWriter',
   authenticationType: 'AzureAD',
   ConnectionRetryPolicy: 'initialRetryDelay=10s, retryDelayMultiplier=2, maxRetryDelay=1m, maxAttempts=5, totalTimeout=10m',
   ClientSecret: 'untNjHnQOzsY90BjrKs2napohIP8WebUUcXybRdKVURH0XeklB5+Xw8NZgZUylqn',
   ClientSecret_encrypted: 'true',
   ClientID: 'dcf190e8-a315-42bb-a0b1-86063ff1c340',
   RefreshToken_encrypted: 'true',
   Mode: 'APPENDONLY',
   externalStageType: 'ADLSGen2',
   Tables: 'public.sample_pk,default.testoauth',
   azureAccountName: 'samplestorage',
   RefreshToken: '<refresh-token-value>',
   azureContainerName: 'striim-deltalakewriter-container',
   uploadPolicy: 'eventcount:10000,interval:60s' )
 INPUT FROM sysout;


Example 3. TQL Example using Personal Access Token and ADLS Gen 2 as External Stage type
CREATE TARGET db USING Global.DeltaLakeWriter (
   connectionUrl: 'jdbc:spark://adb-8073469162361072.12.azuredatabricks.net:443/default;
                   transportMode=http;ssl=1;
                   httpPath=sql/protocolv1/o/8073469162361072/0301-101350-kprc8x3a;
                   AuthMech=3;UID=token;PWD=<personal-access-token>',
   azureAccountAccessKey: '2YoK5czZpmPjxSiSe7uFVXrb9jt9P4xrWp+NNKxWzjU=',
   stageLocation: '/',
   CDDLAction: 'Process',
   ConnectionRetryPolicy: 'initialRetryDelay=10s, retryDelayMultiplier=2, maxRetryDelay=1m, maxAttempts=5, totalTimeout=10m',
   authenticationType: 'PersonalAccessToken',
   Mode: 'APPENDONLY',
   externalStageType: 'ADLSGen2',
   Tables: 'public.sample_pk,default.testoauth',
   azureAccountName: 'samplestorage',
   azureAccountAccessKey_encrypted: 'true',
   personalAccessToken: 'GGR/zQHfh7wQa3vJhP6dcWtejN1UL+E8YEXc13g9+UZdTQmYN1h3E0d0jabboJsd',
   personalAccessToken_encrypted: 'true',
   uploadPolicy: 'eventcount:10000,interval:60s' )
 INPUT FROM sysout;


Databricks Writer properties

When creating a Databricks Writer target in TQL, you must specify values for the Connection URL, Hostname, Personal Access Token, and Tables properties. If not specified, the other properties will use their default values.

property

type

default value

notes

Authentication Type

enum

PersonalAccessToken

With the default setting PersonalAccessToken, Striim's connection to Databricks is authenticated using the token specified in Personal Access Token.

Set to AzureAD to authenticate using Azure Active Directory. In this case, specify Client ID, Client Secret, Refresh Token, and Tenant ID. See Databricks authentication mechanisms for details.

CDDL Action

enum

Process

See Handling schema evolution.

If TRUNCATE commands may be entered in the source and you do not want to delete events in the target, precede the writer with a CQ whose select statement is SELECT * FROM <input stream name> WHERE META(x, OperationName).toString() != 'Truncate'; (replacing <input stream name> with the name of the writer's input stream). Note that there will be no record in the target that the affected events were deleted.
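
For example, a hypothetical sketch of such a CQ (the stream and CQ names are placeholders, and the input is assumed to be a WAEvent stream from a CDC reader):

CREATE STREAM FilteredStream OF Global.WAEvent;

CREATE CQ FilterTruncates
INSERT INTO FilteredStream
SELECT * FROM SourceStream x
WHERE META(x, OperationName).toString() != 'Truncate';

The Databricks Writer target would then use INPUT FROM FilteredStream instead of the original source stream.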

Client ID

string

This property is required when AzureAD authentication is selected as the value of the Authentication Type property.

Client Secret

encrypted password

This property is required when AzureAD authentication is selected as the value of the Authentication Type property.

Connection Retry Policy

String

initialRetryDelay=10s, retryDelayMultiplier=2, maxRetryDelay=1m, maxAttempts=5, totalTimeout=10m

Do not change unless instructed to by Striim support.

Connection URL

String

Provide the JDBC URL from the JDBC/ODBC tab of the Databricks cluster's Advanced options (see Get connection details for a cluster). If the URL starts with jdbc:spark://, change that to jdbc:databricks:// (this is required by the upgraded driver bundled with Striim).

External Stage Type

enum

DBFSROOT

With the default value (not recommended), events are staged to DBFS storage at the path specified in Stage Location. To use an external stage, your Databricks instance should be using Databricks Runtime 11.0 or later. When using serverless compute, you must select PersonalStagingLocation as the stage type.

If running Databricks on AWS and you want to use S3, set to S3 and set the S3 properties as detailed below.

In Striim 4.2.0.1 or later only, if running Databricks on AWS and you want to use a personal staging location, see Using an AWS personal staging location.

In Striim 4.2.0.4 or later only, if running Azure Databricks and you want to use a personal staging location, see Using an Azure personal staging location.

If running Azure Databricks and you want to use Azure Data Lake Storage Gen2, set to ADLSGen2 and set the ADLS properties as detailed below.

Hostname

String

the Server Hostname from the JDBC/ODBC tab of the Databricks cluster's Advanced options (see Get connection details for a cluster)

Ignorable Exception Code

String

Set to TABLE_NOT_FOUND to prevent the application from terminating when Striim tries to write to a table that does not exist in the target. See Handling "table not found" errors for more information.

Ignored exceptions will be written to the application's exception store (see CREATE EXCEPTIONSTORE).

Mode

enum

AppendOnly

With the default value AppendOnly:

  • Updates and deletes from DatabaseReader, IncrementalBatchReader, and SQL CDC sources are handled as inserts in the target.

  • Primary key updates result in two records in the target, one with the previous value and one with the new value. If the Tables setting has a ColumnMap that includes @METADATA(OperationName), the operation name for the first event will be DELETE and for the second INSERT.

Set to Merge to handle updates and deletes as updates and deletes instead. In Merge mode:

  • Since Delta Lake tables do not have primary keys, you may include the keycolumns option in the Tables property to specify a column in the target table that will contain a unique identifier for each row: for example, Tables:'SCOTT.EMP,mydatabase.employee keycolumns(emp_num)'.

  • You may use wildcards for the source table provided key columns are specified for all the target tables. For example, Tables:'DEMO.%,mydatabase.% KeyColumns(...)'.

  • If you do not specify keycolumns, Striim will use the source table's keycolumns as a unique identifier. If the source table has no keycolumns, Striim will concatenate all column values and use that as a unique identifier.
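
The following Merge-mode sketch is for illustration only (the host, token, and connection values are placeholders); it uses the keycolumns example above:

CREATE TARGET DatabricksMergeSketch USING DeltaLakeWriter (
   personalAccessToken: '<personal-access-token>',
   hostname: 'adb-xxxx.xx.azuredatabricks.net',
   connectionUrl: 'jdbc:databricks://adb-xxxx.xx.azuredatabricks.net:443/default;transportMode=http;ssl=1;httpPath=xxx;AuthMech=3;UID=token;',
   stageLocation: '/StriimStage/',
   mode: 'MERGE',
   tables: 'SCOTT.EMP,mydatabase.employee keycolumns(emp_num)'
)
INPUT FROM ns1.sourceStream;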

Optimized Merge

Boolean

false

In Flow Designer, this property will be displayed only when Mode is Merge.

Set to True only when Mode is MERGE and the target's input stream is the output of an HP NonStop reader, MySQL Reader, or Oracle Reader source and the source events will include partial records. For example, with Oracle Reader, when supplemental logging has not been enabled for all columns, partial records are sent for updates. When the source events will always include full records, leave this set to false.

Parallel Threads

Integer

Not supported when Mode is Merge.

See Creating multiple writer instances.

Personal Access Token

encrypted password

Used to authenticate with the Databricks cluster (see Generate a personal access token). The user associated with the token must have read and write access to DBFS (see Important information about DBFS permissions). If table access control has been enabled, the user must also have MODIFY and READ_METADATA (see Data object privileges - Data governance model).

Personal Staging User Name

String

When External Stage Type is PersonalStagingLocation, set as described in Using an AWS personal staging location or Using an Azure personal staging location.

Refresh Token

encrypted password

This property is required when AzureAD authentication is selected as the value of the Authentication Type property.

Stage Location

String

/

When the External Stage Type is DBFSROOT, the path to the staging area in DBFS, for example, /StriimStage/.

Tables

String

The name(s) of the table(s) to write to. The table(s) must exist in the database.

Specify target table names in uppercase as <CATALOG>.<DATABASE>.<TABLE>. Not specifying the catalog (<DATABASE>.<TABLE>) may result in errors if a table in another catalog has the same name.

When the target's input stream is a user-defined event, specify a single table.

The only special character allowed in target table names is underscore (_).

When the input stream of the target is the output of a DatabaseReader, IncrementalBatchReader, or SQL CDC source (that is, when replicating data from one database to another), it can write to multiple tables. In this case, specify the names of both the source and target tables. You may use the % wildcard only for tables, not for schemas or databases. If the reader uses three-part names, you must use them here as well. Note that Oracle CDB/PDB source table names must be specified in two parts when the source is Database Reader or Incremental Batch Reader (schema.%,schema.%) but in three parts when the source is Oracle Reader or OJet (database.schema.%,schema.%). SQL Server source table names must be specified in three parts when the source is Database Reader or Incremental Batch Reader (database.schema.%,schema.%) but in two parts when the source is MS SQL Reader or MS Jet (schema.%,schema.%). Examples:

source.emp,target_database.emp
source_schema.%,target_catalog.target_database.%
source_database.source_schema.%,target_database.%
source_database.source_schema.%,target_catalog.target_database.%

MySQL and Oracle names are case-sensitive; SQL Server names are not. Specify names as <schema name>.<table name> for MySQL and Oracle and as <database name>.<schema name>.<table name> for SQL Server.

See Mapping columns for additional options.

Tenant ID

String

This property is required when AzureAD authentication is selected as the value of the Authentication Type property.

Upload Policy

String

eventcount:100000, interval:60s

The upload policy may include eventcount and/or interval (see Setting output names and rollover / upload policies for syntax). Buffered data is written to the storage account every time any of the specified values is exceeded. With the default value, data will be written every 60 seconds or sooner if the buffer contains 100,000 events. When the app is quiesced, any data remaining in the buffer is written to the storage account; when the app is undeployed, any data remaining in the buffer is discarded.

Azure Data Lake Storage (ADLS) Gen2 properties for Databricks Writer

To use an ADLS Gen2 bucket as your staging area, your Databricks instance should be using Databricks Runtime 11.0 or later.

property

type

default value

notes

Azure Account Access Key

encrypted password

When Authentication Type is set to ServiceAccountKey, specify the account access key from Storage accounts > <account name> > Access keys.

When Authentication Type is set to AzureAD, this property is ignored in TQL and not displayed in the Flow Designer.

Azure Account Name

String

the name of the Azure storage account for the blob container

Azure Container Name

String

striim-deltalakewriter-container

the blob container name from Storage accounts > <account name> > Containers

If it does not exist, it will be created.

Amazon S3 properties for Databricks Writer

To use an Amazon S3 bucket as your staging area, your Databricks instance should be using Databricks Runtime 11.0 or later.

property

type

default value

notes

S3 Access Key

String

an AWS access key ID (created on the AWS Security Credentials page) for a user with read and write permissions on the bucket

If the Striim host has default credentials stored in the .aws directory, you may leave this blank.

S3 Bucket Name

String

striim-deltalake-bucket

Specify the S3 bucket to be used for staging. If it does not exist, it will be created.

S3 Region

String

us-west-1

the AWS region of the bucket

S3 Secret Access Key

encrypted password

the secret access key for the access key

If the Striim host has default credentials stored in the .aws directory, you may leave this blank.
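
There is no S3-staged example elsewhere in this topic, so the following sketch is for illustration only: the camelCase property names s3AccessKey, s3SecretAccessKey, s3BucketName, and s3Region are inferred from the display names above, and the host, token, and table values are placeholders.

CREATE TARGET DatabricksS3Stage USING DeltaLakeWriter (
   personalAccessToken: '<personal-access-token>',
   hostname: 'dbc-xxxx.cloud.databricks.com',
   connectionUrl: 'jdbc:databricks://dbc-xxxx.cloud.databricks.com:443/default;transportMode=http;ssl=1;httpPath=xxx;AuthMech=3;UID=token;',
   externalStageType: 'S3',
   s3AccessKey: '<access-key-id>',
   s3SecretAccessKey: '<secret-access-key>',
   s3BucketName: 'striim-deltalake-bucket',
   s3Region: 'us-west-1',
   Mode: 'APPENDONLY',
   uploadPolicy: 'eventcount:10000,interval:60s',
   Tables: 'public.sample_pk,default.testoauth'
)
INPUT FROM ns1.sourceStream;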

Using an AWS personal staging location

Using a personal staging location requires Striim 4.2.0.1 or later and Unity Catalog (see Documentation > Data Governance Guide > What is Unity Catalog?). You may not use a personal staging location when the Authentication Type is AzureAD.

To use a personal staging location as your staging area:

  1. Create an S3 bucket as described in Documentation > Data Governance Guide > What is Unity Catalog? > Get started using Unity Catalog > Configure a storage bucket and IAM role in AWS.

  2. Configure that S3 bucket as described in Documentation > Data Governance Guide > What is Unity Catalog? > Get started using Unity Catalog > Configure Unity Catalog storage account for CORS > Configure CORS settings for S3.

  3. Set External Stage Type to PersonalStagingLocation.

  4. For Personal Staging User Name, specify a user name or application ID for a user or service principal with a personal access token (see Documentation > Security and compliance guide > Authentication and access control > Manage personal access tokens).
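
For illustration only, a sketch of the resulting writer properties (personalStagingUserName is an assumed camelCase spelling of the Personal Staging User Name property; the host, token, and table values are placeholders):

CREATE TARGET DatabricksPersonalStaging USING DeltaLakeWriter (
   personalAccessToken: '<personal-access-token>',
   hostname: 'dbc-xxxx.cloud.databricks.com',
   connectionUrl: 'jdbc:databricks://dbc-xxxx.cloud.databricks.com:443/default;transportMode=http;ssl=1;httpPath=xxx;AuthMech=3;UID=token;',
   externalStageType: 'PersonalStagingLocation',
   personalStagingUserName: 'striim.user@example.com',
   Tables: 'public.sample_pk,MYCATALOG.MYDB.EMPLOYEE',
   uploadPolicy: 'eventcount:10000,interval:60s'
)
INPUT FROM ns1.sourceStream;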

Using an Azure personal staging location

Using a personal staging location requires Striim 4.2.0.4 or later and Unity Catalog (see Learn / Azure Databricks documentation / Data Governance / What is Unity Catalog?). You may not use a personal staging location when the Authentication Type is AzureAD.

To use a Personal Staging Location as your staging area:

  1. Create a Unity Catalog metastore (see Learn / Azure Databricks documentation / Data Governance / Create a Unity Catalog metastore).

  2. Configure that metastore as described in Learn / Azure Databricks documentation / Data Governance / Create a Unity Catalog metastore / Enable Azure Databricks management for personal staging locations.

  3. Set External Stage Type to PersonalStagingLocation.

  4. For Personal Staging User Name, specify a user name or application ID for a user or service principal with a personal access token (see Learn > Azure Databricks documentation > Administration Guide > Manage personal access tokens).

Sample TQL application using Databricks Writer

Sample TQL in AppendOnly mode:

CREATE TARGET DatabricksAppendOnly USING DeltaLakeWriter ( 
  personalAccessToken: '*************************', 
  hostname:'adb-xxxx.xx.azuredatabricks.net',
  tables: 'mydb.employee,mydatabase.employee', 
  stageLocation: '/StriimStage/', 
  connectionUrl:'jdbc:xxx.xx;transportMode=http;ssl=1;httpPath=xxx;AuthMech=3;UID=token;'
)
INPUT FROM ns1.sourceStream;

Sample TQL in Merge mode with Optimized Merge set to True:

CREATE TARGET DatabricksMerge USING DeltaLakeWriter ( 
  personalAccessToken: '*************************', 
  hostname:'adb-xxxx.xx.azuredatabricks.net',
  tables: 'mydb.employee,mydatabase.employee', 
  stageLocation: '/StriimStage/', 
  connectionUrl:'jdbc:xxx.xx;transportMode=http;ssl=1;httpPath=xxx;AuthMech=3;UID=token;',
  mode: 'MERGE',
  optimizedMerge: 'true'
)
INPUT FROM ns1.sourceStream;

Databricks Writer data type support and mapping

See also Data type support & mapping for schema conversion & evolution.

TQL type                    Delta Lake type

java.lang.Byte              binary
java.lang.Double            double
java.lang.Float             float
java.lang.Integer           int
java.lang.Long              bigint
java.lang.Short             smallint
java.lang.String            string
org.joda.time.DateTime      timestamp

For additional data type mappings, see Data type support & mapping for schema conversion & evolution.