This project provides a Redis Stream-based SmallRye Reactive Messaging connector that enables the use of the MicroProfile Reactive Messaging API over Redis Stream. The implementation allows seamless integration of asynchronous data streams into WildFly and Quarkus-based applications.
For more details on MicroProfile Reactive Messaging, check:
For information on SmallRye Reactive Messaging, visit:
1. Modules
The project consists of the following modules:
-
quarkus-reactive-messaging-redisstream-extension-sample
: A sample application demonstrating the usage of the Redis Streams extension. -
quarkus-reactive-messaging-redisstream-extension-parent
: A custom Quarkus extension that integrates Redis Streams as a reactive messaging connector, utilizing the core Quarkus Redis extension for managing Redis connections. -
reactive-messaging-redisstream-connector
: Provides the core connector logic to integrate Redis Streams API with Reactive Messaging Specification. It does not directly build on any specific Redis library, but rather provides an interface that conforms to the Redis Streams commands.
2. Connector overview
The connector core module aims to integrate the Redis Streams API with the Reactive Messaging Specification without depending on a specific Redis SDK.
2.1. Consuming Messages
The inbound connector reads messages from a Redis Stream as a consumer group using the XREADGROUP
command.
2.1.1. Consumer Group Setup
-
If the specified consumer group does not exist, it is automatically created.
-
Each connector instance uses a unique consumer ID (UUID) to track message processing.
2.1.2. Incoming Message
Incoming stream entries are converted to org.eclipse.microprofile.reactive.messaging.Message
instances with a String payload and IncomingRedisStreamMetadata
metadata.
-
The payload is extracted from the stream entries'
message
field by default.-
The field can be configured via the
payload-field
configuration property (Optional).
-
-
IncomingRedisStreamMetadata
consists of:-
The stream key from which the message was read.
-
Stream entry ID (as generated on Redis).
-
All additional fields (excluding the payload-field) in a map.
-
2.1.3. Message Processing
-
Stream entries are acknowledged on Redis using (
XACK
) if theorg.eclipse.microprofile.reactive.messaging.Message
has been acked as per the Microprofile Reactive Messaging specification. You can use the@Acknowledgment
annotation to control the acknowledgment behavior. -
Expired messages (based on the
ttl
field) are automatically acknowledged and skipped. -
XREADGROUP
commands are retried with exponential backoff (1s to 30s) on processing failures.
Example Configuration
mp.messaging.incoming.my-channel.connector=reactive-messaging-redis-streams (1)
mp.messaging.incoming.my-channel.stream-key=mystream (2)
mp.messaging.incoming.my-channel.group=mygroup (3)
mp.messaging.incoming.my-channel.payload-field=message (4)
mp.messaging.incoming.my-channel.xread-count=10 (5)
mp.messaging.incoming.my-channel.xread-block-ms=10000 (6)
mp.messaging.incoming.in-reactive.xread-noack=false (7)
1 | Activate the redis stream connector for the incoming channel called my-channel . |
2 | The stream key on redis to read messages from. |
3 | The consumer group name. |
4 | The field name to extract the payload from the stream entry. |
5 | The number of messages to read in a single XREADGROUP call.
The COUNT parameter of the XREADGROUP command. |
6 | The milliseconds to block during XREADGROUP calls.
The BLOCK parameter of the XREADGROUP command. |
7 | Include NOACK option in the XREADGROUP calls. |
2.2. Producing Messages
The outbound connector publishes messages to a Redis Stream using XADD
.
2.2.1. Outgoing Message
Outgoing org.eclipse.microprofile.reactive.messaging.Message
instances with a String payload are converted to stream entries.
-
The payload is sent as the stream entries'
message
field by default.-
The field can be configured via the
payload-field
configuration property (Optional).
-
-
Additional fields can be added via
RedisStreamMetadata
.
@Outgoing("out-channel")
public Message<String> produce() {
return Message.of("Hello World!")
.addMetadata(new RedisStreamMetadata()
.withAdditionalField("additionalKey", "additional value")
);
// The produced stream entry will be like:
// 1) 1) "out-stream"
// 2) 1) 1) "1739262584638-0"
// 2) 1) "message"
// 2) "Hello World!"
// 3) "additionalKey"
// 4) "Test-additional value"
}
2.2.2. Stream Trimming
-
Use
xadd-maxlen
to trim by entry count (MAXLEN
parameter of theXADD
command).-
Exact trimming can be enabled by setting
xadd-exact-maxlen
totrue
(defaults to false since almost exact is more efficient).
-
-
Use
xadd-ttl-ms
to compute an expiration timestamp-
Adds a
ttl
field with a value of (current epoch time + TTL), in order to consumers be able to skip expired messages. -
It also sets a minimum ID for stream trimming (
MINID
parameter of theXADD
command).
-
-
If both are set,
xadd-maxlen
takes precedence.
Example Configuration
mp.messaging.outgoing.out-channel.connector=reactive-messaging-redis-streams (1)
mp.messaging.outgoing.out-channel.stream-key=mystream (2)
mp.messaging.incoming.out-channel.payload-field=message (3)
mp.messaging.outgoing.out-channel.xadd-maxlen=1000 (4)
mp.messaging.outgoing.out-channel.xadd-exact-maxlen=true (5)
#mp.messaging.outgoing.out-channel.xadd-ttl-ms=1000 (6)
1 | Activate the redis stream connector for the incoming channel called my-channel . |
2 | The stream key on redis to read messages from. |
3 | The field name to use as the payload in the stream entry. |
4 | The maximum number of entries to keep in the stream. |
5 | Enable exact trimming by entry count. |
6 | Possible trimming based on milliseconds to set the TTL for the stream entry. |
2.3. Graceful Shutdown
On shutdown:
-
New message consumption stops immediately.
-
In-flight messages are given up to
graceful-timeout-ms
(default: 60000ms) to complete. -
Redis connections are closed after timeout or all messages are processed.
Configure the timeout via:
mp.messaging.connector.reactive-messaging-redis-streams.graceful-timeout-ms=30000
3. Configuration
The connector is identified by reactive-messaging-redis-streams
.
Below are the configuration attributes available for both inbound and outbound channels.
Attribute | Description | Default | Mandatory | Direction |
---|---|---|---|---|
|
The Redis connection key to use.
Can be implementation specific in case of the quarkus extension
it is the key used to define the redis connection according to the quarkus redis client.
For example if you use |
|
No |
INCOMING_AND_OUTGOING |
|
The Redis key holding the stream items. |
- |
Yes |
INCOMING_AND_OUTGOING |
|
The stream entry field name containing the message payload. |
|
No |
INCOMING_AND_OUTGOING |
|
The consumer group of the Redis stream to read from. |
- |
Yes |
INCOMING |
|
The maximum number of entries to receive per |
|
No |
INCOMING |
|
The milliseconds to block during |
|
No |
INCOMING |
|
Include the |
|
No |
INCOMING |
|
The maximum number of entries to keep in the stream (trims old entries). |
- |
No |
OUTGOING |
|
Use exact trimming for |
|
No |
OUTGOING |
|
Milliseconds to keep an entry in the stream (uses |
- |
No |
OUTGOING |
|
Milliseconds to wait for the consumed messages to finish processing before closing the consumer group. |
|
No |
CONNECTOR |
4. Examples
4.1. Consuming Messages
Example usage of the Redis Streams connector for consuming messages from a Redis Stream.
4.1.1. Pre-requisites:
-
Redis server running on localhost:6379 or dev-container from Redis extension.
4.1.2. Set up quarkus
Add the following dependency to your project:
<dependency>
<groupId>hu.icellmobilsoft.quarkus.extension</groupId>
<artifactId>quarkus-redisstream-extension</artifactId>
</dependency>
4.1.3. Configure the connector
mp:
messaging:
incoming:
in-channel: (1)
connector: reactive-messaging-redis-streams (2)
stream-key: mystream (3)
group: mygroup (4)
connection-key: my-redis-connection (5)
payload-field: message #optional defaults to 'message'
xread-count: 10 #optional defaults to '1'
xread-block-ms: 10000 #optional defaults to '5000'
xread-noack: false #optional defaults to 'true'
connector:
reactive-messaging-redis-streams:
graceful-timeout-ms: 10000 #optional defaults to '60000' (6)
quarkus:
redis:
my-redis-connection: (5)
hosts: redis://localhost:6379
1 | The incoming MP channel name. |
2 | Specify the connector to use. |
3 | The Redis stream key to read messages from. |
4 | The consumer group name. |
5 | The Redis connection reference to use. |
6 | Connector specific config to set the graceful timeout. |
4.1.4. Implement the consumer
Blocking implementation
@Incoming("in-channel") (1)
//@Blocking(ordered = false, value = "incoming-pool") (2)
//@Retry(maxRetries = 2) (3)
public void process(String message) {
// Process the message
}
1 | The incoming MP channel name. |
2 | Optional: Use the @Blocking annotation to specify parallel processing with a custom thread pool.
Pool size can be set via the smallrye.messaging.worker.incoming-pool.max-concurrency configuration key. |
3 | Optional: Use the @Retry annotation from MP Fault Tolerance to control method retry behavior. |
Reactive implementation
@Incoming("in-channel") (1)
public Uni<Void> consumeReactive(Message<String> message) {
return Uni.createFrom()
.item(message)
.invoke(this::processMessage) (2)
.replaceWithVoid();
}
private void processMessage(Message<String> message) {
// process the message
message.getMetadata()
.get(IncomingRedisStreamMetadata.class)
.ifPresent(this::processMetadata); (3)
message.ack(); (4)
}
private void processMetadata(IncomingRedisStreamMetadata metadata) { (3)
// process metadata
}
1 | The incoming MP channel name. |
2 | Process the message reactively. |
3 | Example on how to process the metadata. |
4 | Acknowledge the message manually. |
4.2. Producing Messages
Example usage of the Redis Streams connector for producing messages to a Redis Stream.
4.2.1. Pre-requisites:
-
Redis server running on localhost:6379 or dev-container from Redis extension.
4.2.2. Set up quarkus
Add the following dependency to your project:
<dependency>
<groupId>hu.icellmobilsoft.quarkus.extension</groupId>
<artifactId>quarkus-redisstream-extension</artifactId>
</dependency>
4.2.3. Configure the connector
mp:
messaging:
outgoing:
out-channel: (1)
connector: reactive-messaging-redis-streams (2)
stream-key: mystream (3)
connection-key: my-redis-connection (4)
payload-field: message #optional defaults to 'message'
# xadd-maxlen: 10 #optional
# xadd-exact-maxlen: true #optional defaults to 'false'
xadd-ttl-ms: 10000 #optional
quarkus:
redis:
my-redis-connection: (4)
hosts: redis://localhost:6379
1 | The outgoing MP channel name. |
2 | Specify the connector to use. |
3 | The Redis stream key to read messages from. |
4 | The Redis connection reference to use. |
4.2.4. Implement the Producer
Simple producer
@Inject
@Channel("out-channel") (1)
Emitter<String> emitter;
public void produceWithMetadata() {
emitter.send("Hello"); (2)
}
1 | The incoming MP channel name. |
2 | The message payload |
The resulting message will be like:
1) 1) "mystream" 2) 1) 1) "1739262584638-0" 2) 1) "message" 2) "Hello" 3) "ttl" 4) "1739262594638"
With custom metadata
@Inject
@Channel("out-channel") (1)
Emitter<String> emitter;
public void produceWithMetadata() {
emitter.send(
ContextAwareMessage.of("Hello") (2)
.addMetadata(new RedisStreamMetadata()
.withAdditionalField("otherKey", "Other value") (3)
));
}
1 | The incoming MP channel name. |
2 | The message payload |
3 | Additional fields |
The resulting message will be like:
1) 1) "mystream" 2) 1) 1) "1739262584638-0" 2) 1) "message" 2) "Hello" 3) "otherKey" 4) "Other value" 5) "ttl" 6) "1739262594638"
5. Migration Description
Next section describes the changes between releases.
5.1. 1.0.0 → 1.1.0
5.1.1. quarkus-reactive-messaging-redisstream-additional-fields module
-
Introduced the new
quarkus-reactive-messaging-redisstream-additional-fields
module for MDC support (i.e.extSessionId
) in the redis stream additional fields with overridable methods.
Migration
Changes are backwards compatible doesn’t need any migration.
5.1.2. Changes
-
OSS Sonatype → Maven Central repository migration
Migration
Changes are backwards compatible doesn’t need any migration.