Debezium MongoDB Connector
Since Camel 3.0
Only consumer is supported
The Debezium MongoDB component is a wrapper around Debezium that uses the Debezium Engine, which enables Change Data Capture from a MongoDB database without the need for Kafka or Kafka Connect.
The Debezium MongoDB connector uses MongoDB’s oplog to capture changes. The connector works only with MongoDB replica sets or with sharded clusters in which each shard is a separate replica set. Therefore, your MongoDB instance must run either as a replica set or as a sharded cluster.
Note on handling failures: per the Debezium Embedded Engine documentation, the engine actively records source offsets and periodically flushes them to persistent storage. Therefore, when the application restarts after a shutdown or a crash, the engine resumes from the last recorded offset. In normal operation, your downstream routes receive each event exactly once. However, after an application crash (that is, without a graceful shutdown), resuming from the last recorded offset may deliver duplicate events immediately after the restart. Your downstream routes should therefore tolerate such duplicates and deduplicate events if needed.
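The duplicate check described above can be sketched in plain Java. This is a minimal illustration, not part of the component: the class name RecentEventFilter and the idea of a string event id (for example built from the event key and source position) are assumptions made for this example. Because the filter lives in memory, it only demonstrates the check itself; to catch duplicates after a crash, the seen ids would have to be kept in persistent storage. Inside a route, Camel’s Idempotent Consumer EIP is the usual tool for this kind of filtering.

```java
import java.util.LinkedHashMap;
import java.util.Map;

/**
 * Remembers the most recently seen event ids so that replayed events can be skipped.
 * Hypothetical helper: the id format is up to you, e.g. event key plus source position.
 */
class RecentEventFilter {
    private final Map<String, Boolean> seen;

    RecentEventFilter(final int capacity) {
        // Insertion-ordered map that evicts its oldest entry once capacity is exceeded.
        this.seen = new LinkedHashMap<String, Boolean>(16, 0.75f, false) {
            @Override
            protected boolean removeEldestEntry(Map.Entry<String, Boolean> eldest) {
                return size() > capacity;
            }
        };
    }

    /** Returns true the first time an id is offered and false for a repeat. */
    synchronized boolean firstTimeSeen(String eventId) {
        return seen.putIfAbsent(eventId, Boolean.TRUE) == null;
    }
}
```

A downstream step would call firstTimeSeen(id) for each event and process the event only when the call returns true.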
Maven users will need to add the following dependency to their pom.xml for this component.
<dependency>
    <groupId>org.apache.camel</groupId>
    <artifactId>camel-debezium-mongodb</artifactId>
    <version>x.x.x</version>
    <!-- use the same version as your Camel core version -->
</dependency>
Configuring Options
Camel components are configured on two separate levels:
- component level
- endpoint level
Configuring Component Options
At the component level, you set general and shared configurations that are then inherited by the endpoints. It is the highest configuration level.
For example, a component may have security settings, credentials for authentication, URLs for network connections, and so forth.
Some components have only a few options, and others may have many. Because components typically have pre-configured defaults that are commonly used, you often need to configure only a few options on a component, or none at all.
You can configure components using:
- the Component DSL.
- in a configuration file (application.properties, *.yaml files, etc.).
- directly in the Java code.
Configuring Endpoint Options
You usually spend more time setting up endpoints because they have many options. These options help you customize what you want the endpoint to do. The options are also categorized into whether the endpoint is used as a consumer (from), as a producer (to), or both.
Configuring endpoints is most often done directly in the endpoint URI as path and query parameters. You can also use the Endpoint DSL and DataFormat DSL as a type-safe way of configuring endpoints and data formats in Java.
A good practice when configuring options is to use Property Placeholders.
Property placeholders provide a few benefits:
- They help prevent using hardcoded URLs, port numbers, sensitive information, and other settings.
- They allow externalizing the configuration from the code.
- They help the code to become more flexible and reusable.
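As an illustration of these benefits, the sketch below writes the endpoint URI with {{key}} placeholders and substitutes values from a java.util.Properties object. Camel resolves such placeholders itself at startup; the resolve method here is only a plain-Java stand-in to show the effect, not Camel’s implementation. The dbz.* property keys and the PlaceholderDemo class are hypothetical names chosen for this example.

```java
import java.util.Properties;
import java.util.regex.Matcher;
import java.util.regex.Pattern;

class PlaceholderDemo {
    // Endpoint URI with environment-specific values externalized.
    // The dbz.* keys are hypothetical; pick names that suit your configuration.
    static final String URI_TEMPLATE =
        "debezium-mongodb:{{dbz.name}}?mongodbHosts={{dbz.hosts}}&mongodbUser={{dbz.user}}";

    private static final Pattern PLACEHOLDER = Pattern.compile("\\{\\{([^}]+)\\}\\}");

    /** Replaces every {{key}} in the template with the matching property value. */
    static String resolve(String template, Properties props) {
        Matcher m = PLACEHOLDER.matcher(template);
        StringBuffer out = new StringBuffer();
        while (m.find()) {
            String key = m.group(1);
            String value = props.getProperty(key);
            if (value == null) {
                throw new IllegalArgumentException("Missing property: " + key);
            }
            // quoteReplacement keeps '$' and '\' in values from being treated specially.
            m.appendReplacement(out, Matcher.quoteReplacement(value));
        }
        m.appendTail(out);
        return out.toString();
    }
}
```

With this layout, the same route code can point at a different replica set or user by changing only the properties source.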
The following two sections list all the options, first for the component and then for the endpoint.
Component Options
The Debezium MongoDB Connector component supports 74 options, which are listed below.
Endpoint Options
The Debezium MongoDB Connector endpoint is configured using URI syntax:
debezium-mongodb:name
With the following path and query parameters:
Path Parameters (1 parameter)
| Name | Description | Default | Type |
|---|---|---|---|
| name | Required. Unique name for the connector. Attempting to register again with the same name will fail. | | String |
Query Parameters (74 parameters)
Message Headers
The Debezium MongoDB Connector component supports 7 message headers, which are listed below:
| Name | Description | Default | Type |
|---|---|---|---|
| CamelDebeziumSourceMetadata (consumer) | The metadata about the source event, for example table name, database name, log position, etc. Refer to the Debezium documentation for more information. | | Map |
| CamelDebeziumIdentifier (consumer) | The identifier of the connector, normally in the format {server-name}.{database-name}.{table-name}. | | String |
| CamelDebeziumKey (consumer) | The key of the event, normally the table primary key. | | Struct |
| CamelDebeziumOperation (consumer) | If present, the type of event operation. Values for the connector are c for create (or insert), u for update, d for delete, or r for read (in the case of an initial sync or a snapshot event). | | String |
| CamelDebeziumTimestamp (consumer) | If present, the time (using the system clock in the JVM) at which the connector processed the event. | | Long |
| CamelDebeziumBefore (consumer) | If present, contains the state of the row before the event occurred. | | Struct |
| CamelDebeziumDdlSQL (consumer) | If present, the DDL SQL text of the event. | | String |
Note: The Debezium MongoDB connector uses MongoDB’s oplog to populate the CDC events. Update events in the oplog do not carry the before or after state of the changed document, so the Debezium connector cannot provide this information. The CamelDebeziumBefore header is therefore not available in this component.
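The identifier format given for CamelDebeziumIdentifier can be split into its parts with a few lines of Java. This is a sketch under assumptions: the ChangeIdentifier class is hypothetical, the header value is assumed to follow the three-part format exactly, and the server and database names are assumed to contain no dots (any further dots are kept in the last part).

```java
/** Hypothetical value class for the three parts of a CamelDebeziumIdentifier value. */
class ChangeIdentifier {
    final String serverName;
    final String databaseName;
    final String tableName;

    private ChangeIdentifier(String serverName, String databaseName, String tableName) {
        this.serverName = serverName;
        this.databaseName = databaseName;
        this.tableName = tableName;
    }

    /** Parses "{server-name}.{database-name}.{table-name}". */
    static ChangeIdentifier parse(String identifier) {
        // A limit of 3 keeps any additional dots inside the last part.
        String[] parts = identifier.split("\\.", 3);
        if (parts.length != 3) {
            throw new IllegalArgumentException("Unexpected identifier format: " + identifier);
        }
        return new ChangeIdentifier(parts[0], parts[1], parts[2]);
    }
}
```

For example, ChangeIdentifier.parse("dbserver1.inventory.customers") yields the server name dbserver1, the database name inventory, and the last part customers; the database and collection names here are made up for the example.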
Message body
If the message body is not null (it is null for tombstones), it contains the state of the row after the event occurred as a JSON string, which you can unmarshal using the Camel JSON Data Format.
Examples
Consuming events
Here is a basic route that you can use to listen to Debezium events from MongoDB connector:
from("debezium-mongodb:dbz-test-1?offsetStorageFileName=/usr/offset-file-1.dat&mongodbHosts=rs0/localhost:27017&mongodbUser=debezium&mongodbPassword=dbz&mongodbName=dbserver1&databaseHistoryFileFilename=/usr/history-file-1.dat")
    .log("Event received from Debezium : ${body}")
    .log(" with this identifier ${headers.CamelDebeziumIdentifier}")
    .log(" with these source metadata ${headers.CamelDebeziumSourceMetadata}")
    .log(" the event occurred upon this operation '${headers.CamelDebeziumOperation}'")
    .log(" on this database '${headers.CamelDebeziumSourceMetadata[db]}' and this table '${headers.CamelDebeziumSourceMetadata[table]}'")
    .log(" with the key ${headers.CamelDebeziumKey}")
    .choice()
        // Delete events carry a null body, so only create, update and read events are unmarshalled.
        .when(header(DebeziumConstants.HEADER_OPERATION).in("c", "u", "r"))
            .unmarshal().json()
            .log("Event received from Debezium : ${body}")
        .end()
    .end();
By default, the component emits events with the body as a JSON string for u, c, and r operations. You can unmarshal this string using the Camel JSON Data Format, e.g. .unmarshal().json() as in the example above. For the d operation, the body is null.
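The operation codes and the null body for deletes can be handled together in downstream code. The following plain-Java sketch maps the single-letter codes to an enum and tolerates a missing body. The ChangeEvents class, the Operation enum, and the summarize method are hypothetical names introduced for this example; they are not part of the component.

```java
class ChangeEvents {
    /** The operation codes carried in the CamelDebeziumOperation header. */
    enum Operation {
        CREATE("c"), UPDATE("u"), DELETE("d"), READ("r");

        final String code;

        Operation(String code) {
            this.code = code;
        }

        static Operation fromCode(String code) {
            for (Operation op : values()) {
                if (op.code.equals(code)) {
                    return op;
                }
            }
            throw new IllegalArgumentException("Unknown operation code: " + code);
        }
    }

    /** Builds a one-line summary of an event, tolerating the null body of a delete. */
    static String summarize(String operationCode, String jsonBody) {
        Operation op = Operation.fromCode(operationCode);
        if (jsonBody == null) {
            return op + " (no body)";
        }
        return op + " " + jsonBody;
    }
}
```

In a route, a processor or bean could pass the CamelDebeziumOperation header value and the message body to a method like summarize.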
As mentioned, this component is a thin wrapper around the Debezium Engine. Therefore, before using this component in production, you need to understand how Debezium works and how its configuration affects the behavior you expect. This is especially true with regard to handling failures.