Skip to content

Correlation

Correlation provides a structured way to implement request/response patterns over a channel.

It allows a component to send a message and asynchronously wait for a specific response that matches a predicate.

Correlation is optional and must be explicitly enabled.

Enabling Correlation

First, install the correlation package:

dotnet add package Faactory.Channels.Correlation

Then enable correlation support for a channel configuration:

services.AddChannels( channel =>
{
    channel.AddCorrelation();

    // other configuration...
} );

Correlation is scoped per channel instance. Each channel maintains its own isolated response registry.

Feeding Incoming Messages

Incoming messages must be pushed into the correlation registry. This is typically done inside a handler.

public class MyMessageHandler( IChannelResponseRegistry registry ) : ChannelHandler<Message>
{
    public override Task ExecuteAsync( IChannelContext context, Message message, CancellationToken cancellationToken )
    {
        registry.Push( message );

        return Task.CompletedTask;
    }
}

Important

Correlation should not be awaited inside middleware. Middleware remains reactive and should only feed incoming messages into the registry.

Waiting for a Response

Correlation is intended to be used from components that initiate a request and expect a response.

Typical usage scenarios include:

  • Channel services
  • Application-level services
  • Client-side request/response flows

From a Channel Service

Inside a ChannelService, the registry can be injected directly:

public class MyService( IChannelResponseRegistry registry ) : ChannelService
{
    protected override async Task ExecuteAsync( CancellationToken cancellationToken )
    {
        var awaiter = registry.Create<Message>(
            m => m.Id == expectedId
        );

        await Channel.WriteAsync( request );

        var response = await awaiter.WaitAsync( cancellationToken );

        // handle response...
    }
}

From Outside the Channel Scope

If you only have an IChannel instance, retrieve the registry from the channel:

var registry = channel.GetChannelResponseRegistry();

var awaiter = registry.Create<Message>(
    m => m.Id == expectedId
);

await channel.WriteAsync( request );

var response = await awaiter.WaitAsync( cancellationToken );

The awaiter completes when:

  • A pushed message matches the predicate
  • The cancellation token is canceled

If canceled, WaitAsync throws OperationCanceledException.

Behavior and Guarantees

One-shot Awaiters

Each awaiter completes only once.
After completion (success or cancellation), it is automatically removed.

Fanout Semantics

If multiple awaiters match the same message, all matching awaiters complete.

Channel Isolation

Correlation is scoped per channel instance:

  • No cross-channel interference
  • Automatic cleanup when the channel closes
  • Safe for concurrent usage

When to Use Correlation

Use correlation when:

  • A request expects a specific reply
  • Messages contain identifiers or correlation keys
  • You need structured request/response behavior over streaming transports

Avoid correlation when:

  • You are processing fire-and-forget messages
  • Ordering alone is sufficient
  • Responses are broadcast rather than targeted

Correlation complements the reactive pipeline model without altering it.