Debezium SQL Server Connector
Since Camel 3.0
Only consumer is supported
The Debezium SQL Server component is a wrapper around Debezium using the Debezium Engine, which enables Change Data Capture from a SQL Server database using Debezium without the need for Kafka or Kafka Connect.
Note on handling failures: Per the Debezium Embedded Engine documentation, the engine actively records source offsets and periodically flushes them to persistent storage, so when the application restarts or crashes, the engine resumes from the last recorded offset. Thus, in normal operation, your downstream routes receive each event exactly once. However, after an application crash (that is, without a graceful shutdown), the application resumes from the last recorded offset, which may result in duplicate events being received immediately after the restart. Therefore, your downstream routes should tolerate such cases and deduplicate events if needed.
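As a rough illustration of the deduplication mentioned above, here is a minimal sketch in plain Java. The class name RecentEventDeduplicator and the idea of a string "fingerprint" are assumptions made for this example and are not part of the component. A fingerprint could, for instance, combine the CamelDebeziumIdentifier and CamelDebeziumKey headers with a log position taken from CamelDebeziumSourceMetadata. The store here is in memory only, so it is lost on restart; surviving a crash would require a persistent store.

```java
import java.util.LinkedHashMap;
import java.util.Map;

/**
 * Remembers the most recently seen event fingerprints and reports repeats.
 * Hypothetical helper for illustration; not part of camel-debezium.
 */
class RecentEventDeduplicator {
    private final Map<String, Boolean> seen;

    RecentEventDeduplicator(final int capacity) {
        // Access-ordered LinkedHashMap that evicts the least recently used
        // fingerprint once more than `capacity` entries are held.
        this.seen = new LinkedHashMap<String, Boolean>(16, 0.75f, true) {
            @Override
            protected boolean removeEldestEntry(Map.Entry<String, Boolean> eldest) {
                return size() > capacity;
            }
        };
    }

    /** Returns true the first time a fingerprint is seen, false for a repeat. */
    synchronized boolean firstTime(String fingerprint) {
        // put() returns the previous value; null means it was not present.
        return seen.put(fingerprint, Boolean.TRUE) == null;
    }
}
```

A route could call firstTime(...) in a filter and drop events for which it returns false. Camel's Idempotent Consumer EIP provides the same kind of check with pluggable repositories, which is likely the better fit in a real route.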
Maven users will need to add the following dependency to their pom.xml for this component.
<dependency>
    <groupId>org.apache.camel</groupId>
    <artifactId>camel-debezium-sqlserver</artifactId>
    <version>x.x.x</version>
    <!-- use the same version as your Camel core version -->
</dependency>
Configuring Options
Camel components are configured on two separate levels:
- component level
- endpoint level
Configuring Component Options
The component level is the highest level, which holds general and common configurations that are inherited by the endpoints. For example, a component may have security settings, credentials for authentication, URLs for network connections, and so forth.
Some components have only a few options, while others have many. Because components typically come with commonly used defaults preconfigured, you often need to configure only a few options on a component, or none at all.
Configuring components can be done with the Component DSL, in a configuration file (application.properties|yaml), or directly with Java code.
Configuring Endpoint Options
You will do most of your configuring on endpoints, as endpoints often have many options that let you configure what you need the endpoint to do. The options are also categorized by whether the endpoint is used as a consumer (from), as a producer (to), or for both.
Configuring endpoints is most often done directly in the endpoint URI as path and query parameters. You can also use the Endpoint DSL and DataFormat DSL as a type-safe way of configuring endpoints and data formats in Java.
A good practice when configuring options is to use Property Placeholders, which let you avoid hardcoding URLs, port numbers, sensitive information, and other settings. In other words, placeholders let you externalize the configuration from your code, which gives more flexibility and reuse.
The following two sections list all the options, first for the component and then for the endpoint.
Component Options
The Debezium SQL Server Connector component supports 75 options, which are listed below.
Endpoint Options
The Debezium SQL Server Connector endpoint is configured using URI syntax:
debezium-sqlserver:name
with the following path and query parameters:
Path Parameters (1 parameters)
| Name | Description | Default | Type |
|---|---|---|---|
| name | Required. Unique name for the connector. Attempting to register again with the same name will fail. | | String |
Query Parameters (75 parameters)
Message Headers
The Debezium SQL Server Connector component supports 7 message headers, which are listed below:
| Name | Description | Default | Type |
|---|---|---|---|
| CamelDebeziumSourceMetadata (consumer) | The metadata about the source event, for example table name, database name, and log position. Refer to the Debezium documentation for more information. | | Map |
| CamelDebeziumIdentifier (consumer) | The identifier of the connector, normally in this format: {server-name}.{database-name}.{table-name}. | | String |
| CamelDebeziumKey (consumer) | The key of the event, normally the table primary key. | | Struct |
| CamelDebeziumOperation (consumer) | If present, the type of event operation. Values for the connector are c for create (or insert), u for update, d for delete, or r for read (in the case of an initial sync or a snapshot event). | | String |
| CamelDebeziumTimestamp (consumer) | If present, the time (using the system clock in the JVM) at which the connector processed the event. | | Long |
| CamelDebeziumBefore (consumer) | If present, contains the state of the row before the event occurred. | | Struct |
| CamelDebeziumDdlSQL (consumer) | If present, the DDL SQL text of the event. | | String |
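The single-letter operation codes carried in the CamelDebeziumOperation header can be awkward to branch on directly. The following is a minimal sketch, assuming you want a typed value in your own code; the ChangeOperation enum and its fromHeader method are hypothetical names introduced for this example and do not exist in the component.

```java
import java.util.Optional;

/** Hypothetical enum mapping the operation codes c, u, d and r to named values. */
enum ChangeOperation {
    CREATE("c"), UPDATE("u"), DELETE("d"), READ("r");

    private final String code;

    ChangeOperation(String code) {
        this.code = code;
    }

    /** Returns the matching operation, or empty when the header is absent or unrecognized. */
    static Optional<ChangeOperation> fromHeader(Object headerValue) {
        if (headerValue == null) {
            return Optional.empty(); // the header is only set "if present"
        }
        for (ChangeOperation op : values()) {
            if (op.code.equals(headerValue.toString())) {
                return Optional.of(op);
            }
        }
        return Optional.empty();
    }
}
```

Inside a processor this could be used as ChangeOperation.fromHeader(exchange.getIn().getHeader("CamelDebeziumOperation")). Returning an Optional rather than throwing keeps events without an operation from failing the route.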
Message body
The message body, if it is not null (it is null in the case of tombstones), contains the state of the row after the event occurred, in Struct format, or in Map format if you use the included Type Converter from Struct to Map (see below for more explanation).
Samples
Consuming events
Here is a very simple route that you can use to listen to Debezium events from the SQL Server connector.
from("debezium-sqlserver:dbz-test-1?offsetStorageFileName=/usr/offset-file-1.dat&databaseHostname=localhost&databaseUser=debezium&databasePassword=dbz&databaseServerName=my-app-connector&databaseHistoryFileFilename=/usr/history-file-1.dat")
    .log("Event received from Debezium : ${body}")
    .log(" with this identifier ${headers.CamelDebeziumIdentifier}")
    .log(" with these source metadata ${headers.CamelDebeziumSourceMetadata}")
    .log(" the event occurred upon this operation '${headers.CamelDebeziumOperation}'")
    .log(" on this database '${headers.CamelDebeziumSourceMetadata[db]}' and this table '${headers.CamelDebeziumSourceMetadata[table]}'")
    .log(" with the key ${headers.CamelDebeziumKey}")
    .log(" the previous value is ${headers.CamelDebeziumBefore}");
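A long endpoint URI like the one in this sample can be hard to read and maintain. One option is to assemble it from a map of options. The following is a minimal sketch in plain Java; the DebeziumUriSketch class and its endpointUri method are hypothetical helpers for this example, and the option names are the ones used in the sample route. Values are joined as-is, with no encoding.

```java
import java.util.LinkedHashMap;
import java.util.Map;
import java.util.stream.Collectors;

/** Hypothetical helper that assembles a debezium-sqlserver endpoint URI. */
class DebeziumUriSketch {

    /** Joins options into "debezium-sqlserver:name?key=value&..." without any encoding. */
    static String endpointUri(String name, Map<String, String> options) {
        String query = options.entrySet().stream()
                .map(e -> e.getKey() + "=" + e.getValue())
                .collect(Collectors.joining("&"));
        return "debezium-sqlserver:" + name + (query.isEmpty() ? "" : "?" + query);
    }

    public static void main(String[] args) {
        // LinkedHashMap keeps the options in insertion order.
        Map<String, String> options = new LinkedHashMap<>();
        options.put("offsetStorageFileName", "/usr/offset-file-1.dat");
        options.put("databaseHostname", "localhost");
        options.put("databaseUser", "debezium");
        options.put("databasePassword", "dbz");
        System.out.println(endpointUri("dbz-test-1", options));
    }
}
```

The resulting string can be passed to from(...) in a route builder. For real deployments, Property Placeholders or the Endpoint DSL are likely a better way to keep credentials out of the code.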
By default, the component emits the events in the body and in the CamelDebeziumBefore header as the Struct data type. The reasoning behind this is to preserve the schema information in case it is needed.
However, the component also contains a Type Converter that converts from the default output type of Struct to Map, in order to leverage Camel’s rich Data Format types, many of which work out of the box with the Map data type.
To use it, you can either pass Map.class when you access the message, e.g. exchange.getIn().getBody(Map.class), or always convert the body to Map in the route builder by adding .convertBodyTo(Map.class) to your Camel Route DSL after the from statement.
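Once row states are available as Map values, ordinary collection code can work on them. The following is a minimal sketch of one such use: finding which columns changed between the previous and current row state. The RowDiffSketch class is a hypothetical helper for this example. It assumes the body was read with getBody(Map.class) and that the CamelDebeziumBefore header can be converted the same way, for example with getHeader("CamelDebeziumBefore", Map.class); the header conversion is an assumption that should be verified for your setup.

```java
import java.util.LinkedHashSet;
import java.util.Map;
import java.util.Objects;
import java.util.Set;

/** Hypothetical helper that compares two row states given as maps. */
class RowDiffSketch {

    /** Returns the names of columns whose values differ between the two row states. */
    static Set<String> changedColumns(Map<String, Object> before, Map<String, Object> after) {
        Set<String> changed = new LinkedHashSet<>();
        if (before == null || after == null) {
            // Only one side exists (e.g. insert, read or delete),
            // so every column of the existing side counts as changed.
            Map<String, Object> only = before != null ? before : after;
            if (only != null) {
                changed.addAll(only.keySet());
            }
            return changed;
        }
        // Compare the union of column names, treating a missing column as null.
        Set<String> columns = new LinkedHashSet<>(before.keySet());
        columns.addAll(after.keySet());
        for (String column : columns) {
            if (!Objects.equals(before.get(column), after.get(column))) {
                changed.add(column);
            }
        }
        return changed;
    }
}
```

Both arguments may be null, which covers tombstones and events that carry no previous row state.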
As mentioned above, the schema can be used when you need to perform advanced data transformations that require it. If you choose not to convert your body to Map, you can obtain the schema information as a Schema type from the Struct like this:
from("debezium-sqlserver:[name]?[options]")
    .process(exchange -> {
        // Struct and Schema come from org.apache.kafka.connect.data
        final Struct bodyValue = exchange.getIn().getBody(Struct.class);
        final Schema schemaValue = bodyValue.schema();
        log.info("Body value is : {}", bodyValue);
        log.info("With Schema : {}", schemaValue);
        log.info("And fields of : {}", schemaValue.fields());
        log.info("Field name has `{}` type", schemaValue.field("name").schema());
    });
Important Note: As mentioned, this component is a thin wrapper around the Debezium Engine. Therefore, before using this component in production, you need to understand how Debezium works and how its configuration affects the expected behavior, especially with regard to handling failures.