Zeebe
Since Camel 3.21
Both producer and consumer are supported
The Zeebe component provides the ability to interact with business processes in Zeebe.
In order to use the Zeebe component, Maven users will need to add the camel-zeebe dependency to their pom.xml, as listed in the Dependencies section.
Prerequisites: You must have access to a local Zeebe instance. More information is available at Camunda Zeebe.
Configuring Options
Camel components are configured on two separate levels:
- component level
- endpoint level
Configuring Component Options
The component level is the highest level; it holds general and common configuration that is inherited by the endpoints. For example, a component may have security settings, credentials for authentication, URLs for network connections, and so forth.
Some components have only a few options, and others may have many. Because components typically have preconfigured defaults that are commonly used, you often need to configure only a few options on a component, or none at all.
Configuring components can be done with the Component DSL, in a configuration file (application.properties|yaml), or directly with Java code.
Configuring Endpoint Options
Most configuration happens on endpoints, because endpoints often have many options that let you configure what the endpoint needs to do. The options are also categorized by whether the endpoint is used as a consumer (from), as a producer (to), or for both.
Configuring endpoints is most often done directly in the endpoint URI as path and query parameters. You can also use the Endpoint DSL and DataFormat DSL as a type safe way of configuring endpoints and data formats in Java.
A good practice when configuring options is to use Property Placeholders, which let you avoid hardcoding URLs, port numbers, sensitive information, and other settings. In other words, placeholders let you externalize the configuration from your code, which gives more flexibility and reuse.
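To make the idea of externalized configuration concrete, here is a minimal plain-Java sketch of `{{key}}` substitution in an endpoint URI. It is a toy stand-in written for illustration only; Camel ships its own placeholder resolver, and the class name `PlaceholderSketch` and the property keys `zeebe.worker.jobKey` and `zeebe.worker.timeout` are hypothetical.

```java
import java.util.Properties;
import java.util.regex.Matcher;
import java.util.regex.Pattern;

// Toy stand-in for placeholder resolution (assumption: illustration only, not Camel's resolver).
final class PlaceholderSketch {

    // Matches {{key}} and captures the key between the braces.
    private static final Pattern PLACEHOLDER = Pattern.compile("\\{\\{([^{}]+)\\}\\}");

    private PlaceholderSketch() {
    }

    /** Replaces every {{key}} in the text with the matching property value. */
    static String resolve(String text, Properties properties) {
        Matcher matcher = PLACEHOLDER.matcher(text);
        StringBuffer resolved = new StringBuffer();
        while (matcher.find()) {
            String key = matcher.group(1).trim();
            String value = properties.getProperty(key);
            if (value == null) {
                throw new IllegalArgumentException("No value for placeholder: " + key);
            }
            // quoteReplacement keeps '$' and '\' in values from being treated as group references.
            matcher.appendReplacement(resolved, Matcher.quoteReplacement(value));
        }
        matcher.appendTail(resolved);
        return resolved.toString();
    }

    public static void main(String[] args) {
        Properties properties = new Properties();
        // Hypothetical keys; in practice these would live in application.properties.
        properties.setProperty("zeebe.worker.jobKey", "job1");
        properties.setProperty("zeebe.worker.timeout", "20");

        String uri = resolve(
                "zeebe://worker?jobKey={{zeebe.worker.jobKey}}&timeout={{zeebe.worker.timeout}}",
                properties);
        System.out.println(uri); // zeebe://worker?jobKey=job1&timeout=20
    }
}
```

The route code then only carries the placeholder keys, while the actual values can change per environment without touching the code.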
The following two sections list all the options, first for the component and then for the endpoint.
Component Options
The Zeebe component supports 8 options, which are listed below.
Name | Description | Default | Type |
---|---|---|---|
oAuthAPI | The authorization server’s URL, from which the access token will be requested. | | String |
bridgeErrorHandler | Allows for bridging the consumer to the Camel routing Error Handler, which means that any exceptions (if possible) that occur while the Camel consumer is trying to pick up incoming messages, or the like, will now be processed as a message and handled by the routing Error Handler. Important: This is only possible if the 3rd party component allows Camel to be alerted if an exception was thrown. Some components handle this internally only, and therefore bridgeErrorHandler is not possible. In other situations we may improve the Camel component to hook into the 3rd party component and make this possible for future releases. By default the consumer will use the org.apache.camel.spi.ExceptionHandler to deal with exceptions, which will be logged at WARN or ERROR level and ignored. | false | boolean |
lazyStartProducer | Whether the producer should be started lazily (on the first message). Starting lazily allows CamelContext and routes to start up in situations where a producer may otherwise fail during startup and cause the route to fail to start. By deferring the startup, the failure can be handled while routing messages via Camel’s routing error handlers. Beware that when the first message is processed, creating and starting the producer may take a little time and prolong the total processing time. | false | boolean |
autowiredEnabled | Whether autowiring is enabled. This is used for automatic autowiring of options (the option must be marked as autowired) by looking up in the registry to find if there is a single instance of matching type, which then gets configured on the component. This can be used for automatically configuring JDBC data sources, JMS connection factories, AWS Clients, etc. | true | boolean |
clientId | Client id to be used when requesting an access token from the OAuth authorization server. | | String |
clientSecret | Client secret to be used when requesting an access token from the OAuth authorization server. | | String |
gatewayHost | The gateway server hostname to connect to the Zeebe cluster. | localhost | String |
gatewayPort | The gateway server port to connect to the Zeebe cluster. | 26500 | int |
Endpoint Options
The Zeebe endpoint is configured using URI syntax:
zeebe:operationName
with the following path and query parameters:
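Path Parameters (1 parameter)
Name | Description | Default | Type |
---|---|---|---|
operationName | Required The operation to use. Enum values: startProcess, cancelProcess, publishMessage, completeJob, failJob, updateJobRetries, throwError, deployResource, worker. | | OperationName |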
Query Parameters (7 parameters)
Name | Description | Default | Type |
---|---|---|---|
formatJSON | Format the result in the body as JSON. | false | boolean |
jobKey | JobKey for the job worker. | | String |
timeout | Timeout for job worker. | 10 | int |
bridgeErrorHandler | Allows for bridging the consumer to the Camel routing Error Handler, which means that any exceptions (if possible) that occur while the Camel consumer is trying to pick up incoming messages, or the like, will now be processed as a message and handled by the routing Error Handler. Important: This is only possible if the 3rd party component allows Camel to be alerted if an exception was thrown. Some components handle this internally only, and therefore bridgeErrorHandler is not possible. In other situations we may improve the Camel component to hook into the 3rd party component and make this possible for future releases. By default the consumer will use the org.apache.camel.spi.ExceptionHandler to deal with exceptions, which will be logged at WARN or ERROR level and ignored. | false | boolean |
exceptionHandler | To let the consumer use a custom ExceptionHandler. Note that if the option bridgeErrorHandler is enabled, this option is not in use. By default the consumer will deal with exceptions, which will be logged at WARN or ERROR level and ignored. | | ExceptionHandler |
exchangePattern | Sets the exchange pattern when the consumer creates an exchange. | | ExchangePattern |
lazyStartProducer | Whether the producer should be started lazily (on the first message). Starting lazily allows CamelContext and routes to start up in situations where a producer may otherwise fail during startup and cause the route to fail to start. By deferring the startup, the failure can be handled while routing messages via Camel’s routing error handlers. Beware that when the first message is processed, creating and starting the producer may take a little time and prolong the total processing time. | false | boolean |
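As a small illustration of how the path parameter and the query parameters combine into one endpoint URI, the following plain-Java sketch assembles the string by hand. The class and method names are hypothetical, the `zeebe://` form follows the examples on this page, and no URL encoding is applied, so it only suits simple values.

```java
import java.util.Collections;
import java.util.LinkedHashMap;
import java.util.Map;
import java.util.StringJoiner;

// Hypothetical helper for illustration; Camel itself only needs the final URI string.
final class ZeebeUriSketch {

    private ZeebeUriSketch() {
    }

    /** Builds "zeebe://operationName?key=value&..." from an operation and its query parameters. */
    static String endpointUri(String operationName, Map<String, Object> queryParameters) {
        String base = "zeebe://" + operationName;
        if (queryParameters.isEmpty()) {
            return base;
        }
        // Assumption: values are simple and need no URL encoding.
        StringJoiner query = new StringJoiner("&", "?", "");
        queryParameters.forEach((name, value) -> query.add(name + "=" + value));
        return base + query;
    }

    public static void main(String[] args) {
        // LinkedHashMap keeps the parameters in insertion order.
        Map<String, Object> workerOptions = new LinkedHashMap<>();
        workerOptions.put("jobKey", "job1");
        workerOptions.put("timeout", 20);

        System.out.println(endpointUri("worker", workerOptions));
        // zeebe://worker?jobKey=job1&timeout=20
        System.out.println(endpointUri("startProcess", Collections.<String, Object>emptyMap()));
        // zeebe://startProcess
    }
}
```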
Producer Endpoints:
Endpoint | Description |
---|---|
startProcess | Creates and starts an instance of the specified process. |
cancelProcess | Cancels a running process instance. |
publishMessage | Publishes a message. |
completeJob | Completes a job for a service task. |
failJob | Fails a job. |
updateJobRetries | Updates the number of retries for a job. |
throwError | Throws an error to indicate that a business error has occurred. |
deployResource | Deploys a process resource. Currently only supports process definitions. |
The endpoints accept either Java request objects, as shown in the examples below, or JSON. In JSON, camel case property names are replaced with all lower case names separated by underscores, e.g. processId becomes process_id.
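The naming rule can be sketched in a few lines of plain Java. The helper below is hypothetical and not part of camel-zeebe; it only demonstrates how a camel case property name maps to its JSON counterpart, and it does not treat runs of capital letters specially.

```java
// Hypothetical helper, not part of camel-zeebe: illustrates the JSON naming rule only.
final class JsonPropertyNames {

    private JsonPropertyNames() {
    }

    /** Converts a camel case property name to lower case words separated by underscores. */
    static String toJsonName(String javaName) {
        StringBuilder out = new StringBuilder();
        for (int i = 0; i < javaName.length(); i++) {
            char c = javaName.charAt(i);
            if (Character.isUpperCase(c)) {
                // An upper case letter starts a new word, except at the very beginning.
                if (i > 0) {
                    out.append('_');
                }
                out.append(Character.toLowerCase(c));
            } else {
                out.append(c);
            }
        }
        return out.toString();
    }

    public static void main(String[] args) {
        System.out.println(toJsonName("processId"));          // process_id
        System.out.println(toJsonName("processInstanceKey")); // process_instance_key
        System.out.println(toJsonName("correlationKey"));     // correlation_key
    }
}
```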
Examples
- startProcess
from("direct:start")
    .process(exchange -> {
        // Build the request describing which process to start
        ProcessRequest request = new ProcessRequest();
        request.setProcessId("processId");
        request.setVariables(new HashMap<String, Object>());
        exchange.getIn().setBody(request);
    })
    .to("zeebe://startProcess")
    .process(exchange -> {
        // Read the response from the message body
        ProcessResponse body = exchange.getIn().getBody(ProcessResponse.class);
        if (body != null) {
            boolean success = body.getSuccess();
            long processInstanceKey = body.getProcessInstanceKey();
        }
    });
JSON Request Example
{
    "process_id" : "Process_0e3ldfm",
    "variables" : { "v1": "a", "v2": 10 }
}
JSON Response Example
{
    "success": true,
    "process_id": "Process_0e3ldfm",
    "process_instance_key": 2251799813688297,
    "process_version": 4,
    "process_key": 2251799813685906
}
- cancelProcess
from("direct:start")
    .process(exchange -> {
        // Identify the running instance by its process instance key
        ProcessRequest request = new ProcessRequest();
        request.setProcessInstanceKey(123);
        exchange.getIn().setBody(request);
    })
    .to("zeebe://cancelProcess")
    .process(exchange -> {
        ProcessResponse body = exchange.getIn().getBody(ProcessResponse.class);
        if (body != null) {
            boolean success = body.getSuccess();
        }
    });
- publishMessage
from("direct:start")
    .process(exchange -> {
        // Describe the message to publish and how it is correlated
        MessageRequest request = new MessageRequest();
        request.setCorrelationKey("messageKey");
        request.setTimeToLive(100);
        request.setVariables(new HashMap<String, Object>());
        request.setName("MessageName");
        exchange.getIn().setBody(request);
    })
    .to("zeebe://publishMessage")
    .process(exchange -> {
        MessageResponse body = exchange.getIn().getBody(MessageResponse.class);
        if (body != null) {
            boolean success = body.getSuccess();
            String messageKey = body.getMessageKey();
        }
    });
JSON Request Example
{
    "correlation_key" : "messageKey",
    "time_to_live" : 100,
    "variables" : { "v1": "a", "v2": 10 },
    "name" : "MessageName"
}
JSON Response Example
{
    "success": true,
    "correlation_key": "messageKey",
    "message_key": 2251799813688336
}
- completeJob
from("direct:start")
    .process(exchange -> {
        // Identify the job to complete and pass any result variables
        JobRequest request = new JobRequest();
        request.setJobKey("jobKey");
        request.setVariables(new HashMap<String, Object>());
        exchange.getIn().setBody(request);
    })
    .to("zeebe://completeJob")
    .process(exchange -> {
        JobResponse body = exchange.getIn().getBody(JobResponse.class);
        if (body != null) {
            boolean success = body.getSuccess();
        }
    });
- failJob
from("direct:start")
    .process(exchange -> {
        // Mark the job as failed, with the remaining retries and a reason
        JobRequest request = new JobRequest();
        request.setJobKey("jobKey");
        request.setRetries(3);
        request.setErrorMessage("Error");
        exchange.getIn().setBody(request);
    })
    .to("zeebe://failJob")
    .process(exchange -> {
        JobResponse body = exchange.getIn().getBody(JobResponse.class);
        if (body != null) {
            boolean success = body.getSuccess();
        }
    });
- updateJobRetries
from("direct:start")
    .process(exchange -> {
        // Set the new number of retries for the job
        JobRequest request = new JobRequest();
        request.setJobKey("jobKey");
        request.setRetries(3);
        exchange.getIn().setBody(request);
    })
    .to("zeebe://updateJobRetries")
    .process(exchange -> {
        JobResponse body = exchange.getIn().getBody(JobResponse.class);
        if (body != null) {
            boolean success = body.getSuccess();
        }
    });
- throwError
from("direct:start")
    .process(exchange -> {
        // Report a business error for the job
        JobRequest request = new JobRequest();
        request.setJobKey("jobKey");
        request.setErrorMessage("Error Message");
        request.setErrorCode("Error Code");
        exchange.getIn().setBody(request);
    })
    .to("zeebe://throwError")
    .process(exchange -> {
        JobResponse body = exchange.getIn().getBody(JobResponse.class);
        if (body != null) {
            boolean success = body.getSuccess();
        }
    });
- deployResource
from("direct:start")
    .process(exchange -> {
        // content is assumed to be a String holding the BPMN process definition
        DeploymentRequest request = new DeploymentRequest();
        request.setName("process.bpmn");
        request.setContent(content.getBytes());
        exchange.getIn().setBody(request);
    })
    .to("zeebe://deployResource")
    .process(exchange -> {
        ProcessDeploymentResponse body = exchange.getIn().getBody(ProcessDeploymentResponse.class);
        if (body != null) {
            boolean success = body.getSuccess();
            String bpmnProcessId = body.getBpmnProcessId();
            int version = body.getVersion();
            long processDefinitionKey = body.getProcessDefinitionKey();
            String resourceName = body.getResourceName();
        }
    });
Consumer Endpoints:
Endpoint | Description |
---|---|
worker | Registers a job worker for a job type and provides messages for available jobs. |
Example
from("zeebe://worker?jobKey=job1&timeout=20")
    .process(exchange -> {
        // Each exchange carries one available job in the body
        JobWorkerMessage body = exchange.getIn().getBody(JobWorkerMessage.class);
        if (body != null) {
            long key = body.getKey();
            String type = body.getType();
            Map<String, String> customHeaders = body.getCustomHeaders();
            long processInstanceKey = body.getProcessInstanceKey();
            String bpmnProcessId = body.getBpmnProcessId();
            int processDefinitionVersion = body.getProcessDefinitionVersion();
            long processDefinitionKey = body.getProcessDefinitionKey();
            String elementId = body.getElementId();
            long elementInstanceKey = body.getElementInstanceKey();
            String worker = body.getWorker();
            int retries = body.getRetries();
            long deadline = body.getDeadline();
            Map<String, Object> variables = body.getVariables();
        }
    });
camel-zeebe creates a route exchange per job type with a job in the body.
Message Headers
The Zeebe component supports 8 message headers, which are listed below:
Name | Description | Default | Type |
---|---|---|---|
CamelZeebeResourceName (producer) | The name of the resource. | | String |
CamelZeebeIsSuccess (producer) | True if the operation was successful. | | boolean |
CamelZeebeErrorMessage (producer) | In case of an error, the error message. | | String |
CamelZeebeErrorCode (producer) | In case of an error, the error code if available. | | String |
CamelZeebeBPMNProcessId (producer) | The process ID of a deployed process. | | String |
CamelZeebeVersion (producer) | The version of a deployed process. | | int |
CamelZeebeProcessDefinitionKey (producer) | The process definition key of a deployed process. | | long |
CamelZeebeJobKey | The key of a job. The worker consumer adds the job key to the headers, and the operations completeJob and failJob accept the job key in the header if no JobRequest is provided in the body. | | long |
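The job key fallback described for completeJob and failJob can be modelled in plain Java as follows. This is a sketch of the documented behaviour, not the component's actual code: the class and method names are hypothetical, the header name `CamelZeebeJobKey` is assumed, and the message headers are represented by an ordinary map.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.OptionalLong;

// Plain-Java model of the described fallback (assumption: not the camel-zeebe implementation).
final class JobKeyResolutionSketch {

    // Assumed header name for the job key.
    static final String JOB_KEY_HEADER = "CamelZeebeJobKey";

    private JobKeyResolutionSketch() {
    }

    /**
     * Returns the job key from the request body when one is given,
     * otherwise falls back to the job key header, if present.
     */
    static OptionalLong resolveJobKey(Long bodyJobKey, Map<String, Object> headers) {
        if (bodyJobKey != null) {
            return OptionalLong.of(bodyJobKey);
        }
        Object headerValue = headers.get(JOB_KEY_HEADER);
        if (headerValue instanceof Number) {
            return OptionalLong.of(((Number) headerValue).longValue());
        }
        return OptionalLong.empty();
    }

    public static void main(String[] args) {
        Map<String, Object> headers = new HashMap<>();
        // The worker consumer is described as adding the job key to the headers.
        headers.put(JOB_KEY_HEADER, 2251799813688336L);

        System.out.println(resolveJobKey(null, headers)); // OptionalLong[2251799813688336]
        System.out.println(resolveJobKey(7L, headers));   // OptionalLong[7]
    }
}
```

In a route, this means a worker consumer can hand its exchange to completeJob or failJob without first building a JobRequest, as long as the job key header is left in place.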
Dependencies
Maven users will need to add the following dependency to their pom.xml.
pom.xml
<dependency>
    <groupId>org.apache.camel</groupId>
    <artifactId>camel-zeebe</artifactId>
    <version>${camel-version}</version>
</dependency>
where ${camel-version} must be replaced by the actual version of Camel.