Azure Storage Queue Service
Since Camel 3.3
Both producer and consumer are supported
The Azure Storage Queue component supports storing and retrieving messages to and from the Azure Storage Queue service using Azure APIs v12. For versions above v12, whether this component adopts them will depend on how many breaking changes they introduce.
Prerequisites
You must have a valid Microsoft Azure Storage account. More information is available at the Azure Documentation Portal.
Maven users will need to add the following dependency to their pom.xml for this component:
<dependency>
<groupId>org.apache.camel</groupId>
<artifactId>camel-azure-storage-queue</artifactId>
<version>x.x.x</version>
<!-- use the same version as your Camel core version -->
</dependency>
URI Format
azure-storage-queue://accountName[/queueName][?options]
For a consumer, both accountName and queueName are required. For a producer, it depends on the requested operation: for a service-level operation, e.g. listQueues, only accountName is required, whereas for a queue-level operation, e.g. createQueue or sendMessage, both accountName and queueName are required.
The queue will be created if it does not already exist. You can append query options to the URI in the following format: ?options=value&option2=value&…
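The URI shape described above can also be assembled programmatically. The following is a minimal sketch in plain Java; the class QueueUriSketch and its build method are hypothetical helpers for illustration and are not part of the Camel API. Option values are appended as-is, so the sketch assumes they need no URL encoding.

```java
import java.util.LinkedHashMap;
import java.util.Map;
import java.util.StringJoiner;

// Hypothetical helper (not part of Camel): assembles an endpoint URI of the form
// azure-storage-queue://accountName[/queueName][?options]
final class QueueUriSketch {

    static String build(String accountName, String queueName, Map<String, String> options) {
        if (accountName == null || accountName.isEmpty()) {
            throw new IllegalArgumentException("accountName is always required");
        }
        StringBuilder uri = new StringBuilder("azure-storage-queue://").append(accountName);
        // queueName may be left out for service-level operations such as listQueues
        if (queueName != null && !queueName.isEmpty()) {
            uri.append('/').append(queueName);
        }
        // options are joined as ?key=value&key2=value2 (values are not URL-encoded here)
        if (options != null && !options.isEmpty()) {
            StringJoiner query = new StringJoiner("&", "?", "");
            options.forEach((key, value) -> query.add(key + "=" + value));
            uri.append(query);
        }
        return uri.toString();
    }

    public static void main(String[] args) {
        Map<String, String> options = new LinkedHashMap<>();
        options.put("operation", "receiveMessages");
        options.put("maxMessages", "5");
        System.out.println(build("storageAccount", "messageQueue", options));
        System.out.println(build("storageAccount", null, Map.of("operation", "listQueues")));
    }
}
```

A LinkedHashMap is used in the example so that the options appear in the URI in insertion order.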
Configuring Options
Camel components are configured on two separate levels:
- component level
- endpoint level
Configuring Component Options
The component level is the highest level, holding general and common configurations that are inherited by the endpoints. For example, a component may have security settings, credentials for authentication, URLs for network connection, and so forth.
Some components only have a few options, and others may have many. Because components typically have preconfigured defaults that are commonly used, you may often only need to configure a few options on a component, or none at all.
Configuring components can be done with the Component DSL, in a configuration file (application.properties|yaml), or directly with Java code.
Configuring Endpoint Options
Most of your configuration happens on endpoints, as endpoints often have many options that let you configure what you need the endpoint to do. The options are also categorized by whether the endpoint is used as a consumer (from), as a producer (to), or for both.
Configuring endpoints is most often done directly in the endpoint URI as path and query parameters. You can also use the Endpoint DSL and DataFormat DSL as a type-safe way of configuring endpoints and data formats in Java.
A good practice when configuring options is to use Property Placeholders, which let you avoid hardcoding URLs, port numbers, sensitive information, and other settings. In other words, placeholders let you externalize the configuration from your code, giving more flexibility and reuse.
The following two sections list all the options, first for the component and then for the endpoint.
Component Options
The Azure Storage Queue Service component supports 18 options, which are listed below.
Name | Description | Default | Type |
---|---|---|---|
configuration | The component configurations. | | QueueConfiguration |
credentialType | Determines the credential strategy to adopt. Enum values: SHARED_ACCOUNT_KEY, SHARED_KEY_CREDENTIAL, AZURE_IDENTITY. | SHARED_ACCOUNT_KEY | CredentialType |
serviceClient | (Autowired) Service client to a storage account to interact with the queue service. This client does not hold any state about a particular storage account but is instead a convenient way of sending off appropriate requests to the resource on the service. This client contains all the operations for interacting with a queue account in Azure Storage. Operations allowed by the client are creating, listing, and deleting queues, retrieving and updating properties of the account, and retrieving statistics of the account. | | QueueServiceClient |
bridgeErrorHandler | Allows for bridging the consumer to the Camel routing Error Handler, which means any exceptions (if possible) that occur while the Camel consumer is trying to pick up incoming messages, or the like, will now be processed as a message and handled by the routing Error Handler. Important: This is only possible if the 3rd party component allows Camel to be alerted if an exception was thrown. Some components handle this internally only, and therefore bridgeErrorHandler is not possible. In other situations we may improve the Camel component to hook into the 3rd party component and make this possible for future releases. By default the consumer will use the org.apache.camel.spi.ExceptionHandler to deal with exceptions, which will be logged at WARN or ERROR level and ignored. | false | boolean |
createQueue | When set to true, the queue will be automatically created when sending messages to the queue. | false | boolean |
lazyStartProducer | Whether the producer should be started lazily (on the first message). Starting lazily allows CamelContext and routes to start up in situations where a producer may otherwise fail during starting and cause the route to fail to start. By deferring this startup to be lazy, the startup failure can be handled during routing of messages via Camel's routing error handlers. Beware that when the first message is processed, creating and starting the producer may take a little time and prolong the total processing time. | false | boolean |
operation | Queue service operation hint to the producer. Enum values: listQueues, createQueue, deleteQueue, clearQueue, sendMessage, deleteMessage, receiveMessages, peekMessages, updateMessage. | | QueueOperationDefinition |
autowiredEnabled | Whether autowiring is enabled. This is used for automatic autowiring of options (the option must be marked as autowired) by looking up in the registry to find if there is a single instance of matching type, which then gets configured on the component. This can be used for automatically configuring JDBC data sources, JMS connection factories, AWS Clients, etc. | true | boolean |
healthCheckConsumerEnabled | Used for enabling or disabling all consumer based health checks from this component. | true | boolean |
healthCheckProducerEnabled | Used for enabling or disabling all producer based health checks from this component. Notice: Camel has by default disabled all producer based health-checks. You can turn on producer checks globally by setting camel.health.producersEnabled=true. | true | boolean |
maxMessages | Maximum number of messages to get. If fewer messages exist in the queue than requested, all the messages will be returned. If left empty, only 1 message will be retrieved. The allowed range is 1 to 32 messages. | 1 | Integer |
messageId | The ID of the message to be deleted or updated. | | String |
popReceipt | Unique identifier that must match for the message to be deleted or updated. | | String |
timeout | An optional timeout applied to the operation. If a response is not returned before the timeout concludes, a RuntimeException will be thrown. | | Duration |
timeToLive | How long the message will stay alive in the queue. If unset, the value defaults to 7 days; if -1 is passed, the message will not expire. The time to live must be -1 or any positive number. The format should be in this form: PnDTnHnMn.nS, e.g. PT20.345S parses as 20.345 seconds and P2D parses as 2 days. However, in case you are using EndpointDsl/ComponentDsl, you can do something like Duration.ofSeconds() since these Java APIs are typesafe. | | Duration |
visibilityTimeout | The timeout period for how long the message is invisible in the queue. The timeout must be between 1 second and 7 days. The format should be in this form: PnDTnHnMn.nS, e.g. PT20.345S parses as 20.345 seconds and P2D parses as 2 days. However, in case you are using EndpointDsl/ComponentDsl, you can do something like Duration.ofSeconds() since these Java APIs are typesafe. | | Duration |
accessKey | Access key for the associated Azure account name to be used for authentication with Azure queue services. | | String |
credentials | StorageSharedKeyCredential can be injected to create the Azure client; it holds the important authentication information. | | StorageSharedKeyCredential |
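The PnDTnHnMn.nS form used by the time-to-live and visibility-timeout options is the ISO-8601 duration format that java.time.Duration.parse accepts. The sketch below uses only the JDK to check how a value is interpreted before it is placed in a URI; the class name DurationFormatSketch is illustrative.

```java
import java.time.Duration;

// Illustrative only: shows how PnDTnHnMn.nS strings are interpreted by the JDK.
final class DurationFormatSketch {

    // Thin wrapper so the parsing step has a name; throws DateTimeParseException on bad input.
    static Duration parse(String text) {
        return Duration.parse(text);
    }

    public static void main(String[] args) {
        // 20.345 seconds expressed in milliseconds
        System.out.println(parse("PT20.345S").toMillis());
        // 2 days expressed in hours
        System.out.println(parse("P2D").toHours());
        // the reverse direction: a typed Duration rendered in the same text format
        System.out.println(Duration.ofSeconds(30));
    }
}
```

Going the other way, calling toString() on a Duration yields a string in this same format, which can be convenient when a typed value has to be written into a URI.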
Endpoint Options
The Azure Storage Queue Service endpoint is configured using URI syntax:
azure-storage-queue:accountName/queueName
with the following path and query parameters:
Path Parameters (2 parameters)
Name | Description | Default | Type |
---|---|---|---|
accountName | Azure account name to be used for authentication with Azure queue services. | | String |
queueName | The queue resource name. | | String |
Query Parameters (32 parameters)
Name | Description | Default | Type |
---|---|---|---|
credentialType | Determines the credential strategy to adopt. Enum values: SHARED_ACCOUNT_KEY, SHARED_KEY_CREDENTIAL, AZURE_IDENTITY. | SHARED_ACCOUNT_KEY | CredentialType |
serviceClient | (Autowired) Service client to a storage account to interact with the queue service. This client does not hold any state about a particular storage account but is instead a convenient way of sending off appropriate requests to the resource on the service. This client contains all the operations for interacting with a queue account in Azure Storage. Operations allowed by the client are creating, listing, and deleting queues, retrieving and updating properties of the account, and retrieving statistics of the account. | | QueueServiceClient |
sendEmptyMessageWhenIdle | If the polling consumer did not poll any messages, you can enable this option to send an empty message (no body) instead. | false | boolean |
bridgeErrorHandler | Allows for bridging the consumer to the Camel routing Error Handler, which means any exceptions (if possible) that occur while the Camel consumer is trying to pick up incoming messages, or the like, will now be processed as a message and handled by the routing Error Handler. Important: This is only possible if the 3rd party component allows Camel to be alerted if an exception was thrown. Some components handle this internally only, and therefore bridgeErrorHandler is not possible. In other situations we may improve the Camel component to hook into the 3rd party component and make this possible for future releases. By default the consumer will use the org.apache.camel.spi.ExceptionHandler to deal with exceptions, which will be logged at WARN or ERROR level and ignored. | false | boolean |
exceptionHandler | To let the consumer use a custom ExceptionHandler. Notice that if the option bridgeErrorHandler is enabled then this option is not in use. By default the consumer will deal with exceptions, which will be logged at WARN or ERROR level and ignored. | | ExceptionHandler |
exchangePattern | Sets the exchange pattern when the consumer creates an exchange. | | ExchangePattern |
pollStrategy | A pluggable org.apache.camel.spi.PollingConsumerPollStrategy allowing you to provide your custom implementation to control error handling that usually occurs during the poll operation, before an Exchange has been created and routed in Camel. | | PollingConsumerPollStrategy |
createQueue | When set to true, the queue will be automatically created when sending messages to the queue. | false | boolean |
operation | Queue service operation hint to the producer. Enum values: listQueues, createQueue, deleteQueue, clearQueue, sendMessage, deleteMessage, receiveMessages, peekMessages, updateMessage. | | QueueOperationDefinition |
lazyStartProducer | Whether the producer should be started lazily (on the first message). Starting lazily allows CamelContext and routes to start up in situations where a producer may otherwise fail during starting and cause the route to fail to start. By deferring this startup to be lazy, the startup failure can be handled during routing of messages via Camel's routing error handlers. Beware that when the first message is processed, creating and starting the producer may take a little time and prolong the total processing time. | false | boolean |
maxMessages | Maximum number of messages to get. If fewer messages exist in the queue than requested, all the messages will be returned. If left empty, only 1 message will be retrieved. The allowed range is 1 to 32 messages. | 1 | Integer |
messageId | The ID of the message to be deleted or updated. | | String |
popReceipt | Unique identifier that must match for the message to be deleted or updated. | | String |
timeout | An optional timeout applied to the operation. If a response is not returned before the timeout concludes, a RuntimeException will be thrown. | | Duration |
timeToLive | How long the message will stay alive in the queue. If unset, the value defaults to 7 days; if -1 is passed, the message will not expire. The time to live must be -1 or any positive number. The format should be in this form: PnDTnHnMn.nS, e.g. PT20.345S parses as 20.345 seconds and P2D parses as 2 days. However, in case you are using EndpointDsl/ComponentDsl, you can do something like Duration.ofSeconds() since these Java APIs are typesafe. | | Duration |
visibilityTimeout | The timeout period for how long the message is invisible in the queue. The timeout must be between 1 second and 7 days. The format should be in this form: PnDTnHnMn.nS, e.g. PT20.345S parses as 20.345 seconds and P2D parses as 2 days. However, in case you are using EndpointDsl/ComponentDsl, you can do something like Duration.ofSeconds() since these Java APIs are typesafe. | | Duration |
backoffErrorThreshold | The number of subsequent error polls (failed due to some error) that should happen before the backoffMultiplier kicks in. | | int |
backoffIdleThreshold | The number of subsequent idle polls that should happen before the backoffMultiplier kicks in. | | int |
backoffMultiplier | To let the scheduled polling consumer back off if there have been a number of subsequent idles/errors in a row. The multiplier is then the number of polls that will be skipped before the next actual attempt happens again. When this option is in use, backoffIdleThreshold and/or backoffErrorThreshold must also be configured. | | int |
delay | Milliseconds before the next poll. | 500 | long |
greedy | If greedy is enabled, the ScheduledPollConsumer will run again immediately if the previous run polled 1 or more messages. | false | boolean |
initialDelay | Milliseconds before the first poll starts. | 1000 | long |
repeatCount | Specifies a maximum limit on the number of fires. If you set it to 1, the scheduler will only fire once. If you set it to 5, it will only fire five times. A value of zero or negative means fire forever. | 0 | long |
runLoggingLevel | The consumer logs a start/complete log line when it polls. This option allows you to configure the logging level for that. Enum values: TRACE, DEBUG, INFO, WARN, ERROR, OFF. | TRACE | LoggingLevel |
scheduledExecutorService | Allows for configuring a custom/shared thread pool to use for the consumer. By default each consumer has its own single-threaded thread pool. | | ScheduledExecutorService |
scheduler | To use a cron scheduler from either the camel-spring or camel-quartz component. Use the value spring or quartz for the built-in scheduler. | none | Object |
schedulerProperties | To configure additional properties when using a custom scheduler or any of the Quartz or Spring based schedulers. | | Map |
startScheduler | Whether the scheduler should be auto started. | true | boolean |
timeUnit | Time unit for the initialDelay and delay options. Enum values: NANOSECONDS, MICROSECONDS, MILLISECONDS, SECONDS, MINUTES, HOURS, DAYS. | MILLISECONDS | TimeUnit |
useFixedDelay | Controls if fixed delay or fixed rate is used. See ScheduledExecutorService in the JDK for details. | true | boolean |
accessKey | Access key for the associated Azure account name to be used for authentication with Azure queue services. | | String |
credentials | StorageSharedKeyCredential can be injected to create the Azure client; it holds the important authentication information. | | StorageSharedKeyCredential |
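Two of the options above have documented bounds: the maximum number of messages must be between 1 and 32, and the visibility timeout must be between 1 second and 7 days. As a rough sketch, these bounds could be checked up front in plain Java before building an endpoint URI. The class and method names are hypothetical, and the sketch follows the bounds stated in the options table only; the service itself may apply different or additional rules.

```java
import java.time.Duration;

// Hypothetical pre-flight checks for the documented option bounds; not part of the component.
final class QueueOptionBoundsSketch {

    static final Duration MIN_VISIBILITY = Duration.ofSeconds(1);
    static final Duration MAX_VISIBILITY = Duration.ofDays(7);

    // maxMessages: allowed range is 1 to 32 messages
    static boolean isValidMaxMessages(int maxMessages) {
        return maxMessages >= 1 && maxMessages <= 32;
    }

    // visibility timeout: between 1 second and 7 days, both ends inclusive in this sketch
    static boolean isValidVisibilityTimeout(Duration timeout) {
        return timeout != null
                && timeout.compareTo(MIN_VISIBILITY) >= 0
                && timeout.compareTo(MAX_VISIBILITY) <= 0;
    }

    public static void main(String[] args) {
        System.out.println(isValidMaxMessages(5));
        System.out.println(isValidMaxMessages(33));
        System.out.println(isValidVisibilityTimeout(Duration.ofMinutes(1)));
        System.out.println(isValidVisibilityTimeout(Duration.ofDays(8)));
    }
}
```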
Required information options:
To use this component, you have multiple options for providing the required Azure authentication information:
- By providing your own QueueServiceClient instance, which can be injected into serviceClient.
- Via Azure Identity, when specifying credentialType=AZURE_IDENTITY and providing the required environment variables. This enables service principal (e.g. app registration) authentication with secret/certificate as well as username/password.
- Via shared storage account key, when specifying credentialType=SHARED_ACCOUNT_KEY and providing accountName and accessKey for your Azure account. This is the simplest way to get started. The accessKey can be generated through your Azure portal. Note that this is the default authentication strategy.
- Via shared storage account key, when specifying credentialType=SHARED_KEY_CREDENTIAL and providing a StorageSharedKeyCredential instance, which can be injected into the credentials option.
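To make the credential strategies listed above more concrete, the following sketch prints what an endpoint URI might look like for each credentialType value. It is plain Java that only builds strings. The property key azure.queue.accessKey and the bean name storageCredential are assumptions made up for this illustration, as are the class and method names.

```java
// Illustrative sketch: one possible endpoint URI per credential strategy.
// The placeholder key and bean name below are hypothetical.
final class CredentialUriSketch {

    enum CredentialType { SHARED_ACCOUNT_KEY, SHARED_KEY_CREDENTIAL, AZURE_IDENTITY }

    static String uriFor(CredentialType type, String accountName, String queueName) {
        String base = "azure-storage-queue://" + accountName + "/" + queueName
                + "?credentialType=" + type;
        switch (type) {
            case SHARED_ACCOUNT_KEY:
                // access key supplied through a property placeholder (hypothetical key name)
                return base + "&accessKey={{azure.queue.accessKey}}";
            case SHARED_KEY_CREDENTIAL:
                // StorageSharedKeyCredential looked up in the registry (hypothetical bean name)
                return base + "&credentials=#storageCredential";
            case AZURE_IDENTITY:
                // nothing more in the URI; the identity is taken from environment variables
                return base;
            default:
                throw new IllegalArgumentException("unknown credential type: " + type);
        }
    }

    public static void main(String[] args) {
        for (CredentialType type : CredentialType.values()) {
            System.out.println(uriFor(type, "cameldev", "queue1"));
        }
    }
}
```

Keeping the access key behind a placeholder rather than in the URI literal follows the Property Placeholders practice recommended earlier on this page.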
Usage
For example, to get message content from the queue messageQueue in the storageAccount storage account, use the following snippet:
from("azure-storage-queue://storageAccount/messageQueue?accessKey=yourAccessKey").
to("file://queuedirectory");
Message Headers
The Azure Storage Queue Service component supports 16 message headers, which are listed below:
Name | Description | Default | Type |
---|---|---|---|
CamelAzureStorageQueueRawHttpHeaders (common) | Returns non-parsed httpHeaders that can be used by the user. | | HttpHeaders |
CamelAzureStorageQueueMetadata (producer) | (createQueue) Metadata to associate with the queue. | | Map |
CamelAzureStorageQueueMessageId (common) | The ID of the message. | | String |
CamelAzureStorageQueueInsertionTime (common) | The time the Message was inserted into the Queue. | | OffsetDateTime |
CamelAzureStorageQueueExpirationTime (common) | The time that the Message will expire and be automatically deleted. | | OffsetDateTime |
CamelAzureStorageQueuePopReceipt (producer) | (deleteMessage, updateMessage) Unique identifier that must match for the message to be deleted or updated. If deletion fails using this pop receipt then the message has been dequeued by another client. | | String |
CamelAzureStorageQueueTimeNextVisible (common) | The time that the message will again become visible in the Queue. | | OffsetDateTime |
CamelAzureStorageQueueDequeueCount (common) | The number of times the message has been dequeued. | | long |
CamelAzureStorageQueueOperation (producer) | (All) Specify the producer operation to execute; see the producer operations section on this page. Enum values: listQueues, createQueue, deleteQueue, clearQueue, sendMessage, deleteMessage, receiveMessages, peekMessages, updateMessage. | | QueueOperationDefinition |
CamelAzureStorageQueueName (producer) | (All) Override the queue name. | | String |
CamelAzureStorageQueueSegmentOptions (producer) | (listQueues) Options for listing queues. | | QueuesSegmentOptions |
CamelAzureStorageQueueTimeout (producer) | (All) An optional timeout value beyond which a RuntimeException will be raised. | | Duration |
CamelAzureStorageQueueMaxMessages (producer) | (receiveMessages, peekMessages) Maximum number of messages to get. If fewer messages exist in the queue than requested, all the messages will be returned. If left empty, only 1 message will be retrieved. The allowed range is 1 to 32 messages. | | Integer |
CamelAzureStorageQueueVisibilityTimeout (producer) | (sendMessage, receiveMessages, updateMessage) The timeout period for how long the message is invisible in the queue. If unset, the value defaults to 0 and the message will be instantly visible. The timeout must be between 0 seconds and 7 days. | | Duration |
CamelAzureStorageQueueTimeToLive (producer) | (sendMessage) How long the message will stay alive in the queue. If unset, the value defaults to 7 days; if -1 is passed, the message will not expire. The time to live must be -1 or any positive number. | | Duration |
CamelAzureStorageQueueCreateQueue (producer) | (sendMessage) When set to true, the queue will be automatically created when sending messages to the queue. | | boolean |
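Headers are plain key/value pairs on the message. As a small sketch, the two headers that identify a message for deleteMessage and updateMessage can be collected in a map using the header names from the table above. The helper class, its method, and the sample values are hypothetical.

```java
import java.util.LinkedHashMap;
import java.util.Map;

// Hypothetical helper: gathers the headers that identify a single queue message.
final class MessageHeadersSketch {

    // Header names as listed in the message headers table
    static final String MESSAGE_ID = "CamelAzureStorageQueueMessageId";
    static final String POP_RECEIPT = "CamelAzureStorageQueuePopReceipt";

    static Map<String, Object> identify(String messageId, String popReceipt) {
        if (messageId == null || popReceipt == null) {
            throw new IllegalArgumentException("both the message ID and the pop receipt are needed");
        }
        Map<String, Object> headers = new LinkedHashMap<>();
        headers.put(MESSAGE_ID, messageId);
        headers.put(POP_RECEIPT, popReceipt);
        return headers;
    }

    public static void main(String[] args) {
        // sample values only; real ones come from a previously received message
        System.out.println(identify("1", "PAAAAHEEERXXX-1"));
    }
}
```

Inside a processor, each entry of such a map could then be copied onto the message with setHeader before the exchange reaches the producer endpoint.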
Advanced Azure Storage Queue configuration
If your Camel application is running behind a firewall or if you need to have more control over the QueueServiceClient instance configuration, you can create your own instance:
StorageSharedKeyCredential credential = new StorageSharedKeyCredential("yourAccountName", "yourAccessKey");
String uri = String.format("https://%s.queue.core.windows.net", "yourAccountName");
QueueServiceClient client = new QueueServiceClientBuilder()
.endpoint(uri)
.credential(credential)
.buildClient();
// This is camel context
context.getRegistry().bind("client", client);
Then refer to this instance in your Camel azure-storage-queue component configuration:
from("azure-storage-queue://cameldev/queue1?serviceClient=#client")
.to("file://outputFolder?fileName=output.txt&fileExist=Append");
Automatic detection of QueueServiceClient client in registry
The component is capable of detecting the presence of a QueueServiceClient bean in the registry. If it is the only instance of that type, it will be used as the client and you won't have to define it as a URI parameter, as in the example above. This can be useful for smarter configuration of the endpoint.
Azure Storage Queue Producer operations
The Camel Azure Storage Queue component provides a wide range of operations on the producer side:
Operations on the service level
For these operations, accountName is required.
Operation | Description |
---|---|
listQueues | Lists the queues in the storage account that pass the filter starting at the specified marker. |
Operations on the queue level
For these operations, accountName and queueName are required.
Operation | Description |
---|---|
createQueue | Creates a new queue. |
deleteQueue | Permanently deletes the queue. |
clearQueue | Deletes all messages in the queue. |
sendMessage | Default producer operation. Sends a message with a given time-to-live and a timeout period where the message is invisible in the queue. The message text is evaluated from the exchange message body. By default, if the queue doesn't exist, it will create an empty queue first. If you want to disable this, set the createQueue option to false. |
deleteMessage | Deletes the specified message in the queue. |
receiveMessages | Retrieves up to the maximum number of messages from the queue and hides them from other operations for the timeout period. However, for reliability reasons it will not dequeue the messages from the queue. |
peekMessages | Peeks messages from the front of the queue, up to the maximum number of messages. |
updateMessage | Updates the specific message in the queue with a new message and resets the visibility timeout. The message text is evaluated from the exchange message body. |
Refer to the examples section on this page to learn how to use these operations in your Camel application.
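The split between service-level and queue-level operations determines which URI parts must be present. A minimal sketch of that rule in plain Java follows. The enum and method names are illustrative and do not mirror the component's internal classes; the operation names are the ones used in the examples on this page.

```java
// Illustrative sketch (not the component's own classes): which URI parts each
// producer operation needs, following the service-level / queue-level split.
final class OperationScopeSketch {

    enum Operation {
        // service level: only accountName is required
        listQueues(false),
        // queue level: accountName and queueName are required
        createQueue(true), deleteQueue(true), clearQueue(true), sendMessage(true),
        deleteMessage(true), receiveMessages(true), peekMessages(true), updateMessage(true);

        final boolean needsQueueName;

        Operation(boolean needsQueueName) {
            this.needsQueueName = needsQueueName;
        }
    }

    static void validate(Operation operation, String accountName, String queueName) {
        if (accountName == null || accountName.isEmpty()) {
            throw new IllegalArgumentException("accountName is required for " + operation);
        }
        if (operation.needsQueueName && (queueName == null || queueName.isEmpty())) {
            throw new IllegalArgumentException("queueName is required for " + operation);
        }
    }

    public static void main(String[] args) {
        validate(Operation.listQueues, "cameldev", null);      // accepted: service level
        validate(Operation.sendMessage, "cameldev", "queue1"); // accepted: queue level
        try {
            validate(Operation.sendMessage, "cameldev", null); // rejected: queue name missing
        } catch (IllegalArgumentException e) {
            System.out.println(e.getMessage());
        }
    }
}
```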
Consumer Examples
To consume a queue into a file component with a maximum of 5 messages per batch:
from("azure-storage-queue://cameldev/queue1?serviceClient=#client&maxMessages=5")
.to("file://outputFolder?fileName=output.txt&fileExist=Append");
Producer Operations Examples
- listQueues:
from("direct:start")
    .process(exchange -> {
        // set the header you want the producer to evaluate, refer to the previous
        // section to learn about the headers that can be set
        // e.g., to only return the queues with the 'awesome' prefix:
        exchange.getIn().setHeader(QueueConstants.QUEUES_SEGMENT_OPTIONS, new QueuesSegmentOptions().setPrefix("awesome"));
    })
    .to("azure-storage-queue://cameldev?serviceClient=#client&operation=listQueues")
    .log("${body}")
    .to("mock:result");
- createQueue:
from("direct:start")
    .process(exchange -> {
        // set the header you want the producer to evaluate, refer to the previous
        // section to learn about the headers that can be set
        // e.g:
        exchange.getIn().setHeader(QueueConstants.QUEUE_NAME, "overrideName");
    })
    .to("azure-storage-queue://cameldev/test?serviceClient=#client&operation=createQueue");
- deleteQueue:
from("direct:start")
    .process(exchange -> {
        // set the header you want the producer to evaluate, refer to the previous
        // section to learn about the headers that can be set
        // e.g:
        exchange.getIn().setHeader(QueueConstants.QUEUE_NAME, "overrideName");
    })
    .to("azure-storage-queue://cameldev/test?serviceClient=#client&operation=deleteQueue");
- clearQueue:
from("direct:start")
    .process(exchange -> {
        // set the header you want the producer to evaluate, refer to the previous
        // section to learn about the headers that can be set
        // e.g:
        exchange.getIn().setHeader(QueueConstants.QUEUE_NAME, "overrideName");
    })
    .to("azure-storage-queue://cameldev/test?serviceClient=#client&operation=clearQueue");
- sendMessage:
from("direct:start")
    .process(exchange -> {
        // set the header you want the producer to evaluate, refer to the previous
        // section to learn about the headers that can be set
        // e.g:
        exchange.getIn().setBody("message to send");
        // we set a visibility timeout of 1 minute
        exchange.getIn().setHeader(QueueConstants.VISIBILITY_TIMEOUT, Duration.ofMinutes(1));
    })
    .to("azure-storage-queue://cameldev/test?serviceClient=#client");
- deleteMessage:
from("direct:start")
    .process(exchange -> {
        // set the header you want the producer to evaluate, refer to the previous
        // section to learn about the headers that can be set
        // e.g:
        // Mandatory header:
        exchange.getIn().setHeader(QueueConstants.MESSAGE_ID, "1");
        // Mandatory header:
        exchange.getIn().setHeader(QueueConstants.POP_RECEIPT, "PAAAAHEEERXXX-1");
    })
    .to("azure-storage-queue://cameldev/test?serviceClient=#client&operation=deleteMessage");
- receiveMessages:
from("direct:start")
    .to("azure-storage-queue://cameldev/test?serviceClient=#client&operation=receiveMessages")
    .process(exchange -> {
        final List<QueueMessageItem> messageItems = exchange.getMessage().getBody(List.class);
        messageItems.forEach(messageItem -> System.out.println(messageItem.getMessageText()));
    })
    .to("mock:result");
- peekMessages:
from("direct:start")
    .to("azure-storage-queue://cameldev/test?serviceClient=#client&operation=peekMessages")
    .process(exchange -> {
        final List<PeekedMessageItem> messageItems = exchange.getMessage().getBody(List.class);
        messageItems.forEach(messageItem -> System.out.println(messageItem.getMessageText()));
    })
    .to("mock:result");
- updateMessage:
from("direct:start")
    .process(exchange -> {
        // set the header you want the producer to evaluate, refer to the previous
        // section to learn about the headers that can be set
        // e.g:
        exchange.getIn().setBody("new message text");
        // Mandatory header:
        exchange.getIn().setHeader(QueueConstants.MESSAGE_ID, "1");
        // Mandatory header:
        exchange.getIn().setHeader(QueueConstants.POP_RECEIPT, "PAAAAHEEERXXX-1");
        // Mandatory header:
        exchange.getIn().setHeader(QueueConstants.VISIBILITY_TIMEOUT, Duration.ofMinutes(1));
    })
    .to("azure-storage-queue://cameldev/test?serviceClient=#client&operation=updateMessage");
Development Notes (Important)
When developing on this component, you will need to obtain your Azure accessKey in order to run the integration tests. In addition to the mocked unit tests, you will need to run the integration tests with every change you make, or even on a client upgrade, as the Azure client can break things even on minor version upgrades. To run the integration tests, run the following Maven command in this component's directory:
mvn verify -DaccountName=myacc -DaccessKey=mykey
Here accountName is your Azure account name and accessKey is the access key generated from the Azure portal.