Correlation
Correlation provides a structured way to implement request/response patterns over a channel.
It allows a component to send a message and asynchronously wait for a specific response that matches a predicate.
Correlation is optional and must be explicitly enabled.
Enabling Correlation
First, install the correlation package:
Then enable correlation support for a channel configuration:
Correlation is scoped per channel instance. Each channel maintains its own isolated response registry.
Feeding Incoming Messages
Incoming messages must be pushed into the correlation registry. This is typically done inside a handler.
public class MyMessageHandler( IChannelResponseRegistry registry ) : ChannelHandler<Message>
{
public override Task ExecuteAsync( IChannelContext context, Message message, CancellationToken cancellationToken )
{
registry.Push( message );
return Task.CompletedTask;
}
}
Important
Correlation should not be awaited inside middleware. Middleware remains reactive and should only feed incoming messages into the registry.
Waiting for a Response
Correlation is intended to be used from components that initiate a request and expect a response.
Typical usage scenarios include:
- Channel services
- Application-level services
- Client-side request/response flows
From a Channel Service
Inside a ChannelService, the registry can be injected directly:
public class MyService( IChannelResponseRegistry registry ) : ChannelService
{
protected override async Task ExecuteAsync( CancellationToken cancellationToken )
{
var awaiter = registry.Create<Message>(
m => m.Id == expectedId
);
await Channel.WriteAsync( request );
var response = await awaiter.WaitAsync( cancellationToken );
// handle response...
}
}
From Outside the Channel Scope
If you only have an IChannel instance, retrieve the registry from the channel:
var registry = channel.GetChannelResponseRegistry();
var awaiter = registry.Create<Message>(
m => m.Id == expectedId
);
await channel.WriteAsync( request );
var response = await awaiter.WaitAsync( cancellationToken );
The awaiter completes when:
- A pushed message matches the predicate
- The cancellation token is canceled
If canceled, WaitAsync throws OperationCanceledException.
Behavior and Guarantees
One-shot Awaiters
Each awaiter completes only once.
After completion (success or cancellation), it is automatically removed.
Fanout Semantics
If multiple awaiters match the same message, all matching awaiters complete.
Channel Isolation
Correlation is scoped per channel instance:
- No cross-channel interference
- Automatic cleanup when the channel closes
- Safe for concurrent usage
When to Use Correlation
Use correlation when:
- A request expects a specific reply
- Messages contain identifiers or correlation keys
- You need structured request/response behavior over streaming transports
Avoid correlation when:
- You are processing fire-and-forget messages
- Ordering alone is sufficient
- Responses are broadcast rather than targeted
Correlation complements the reactive pipeline model without altering it.