Parcel

This extension for Channels implements a set of adapters to encode and decode data using the Parcel protocol.

Getting started

Install the package from NuGet

dotnet add package Faactory.Channels.Parcel

To enable decoding and encoding of Parcel packets, register the adapters with the channel's input and output pipelines.

IChannelBuilder channel = ...;

channel.AddInputAdapter<ParcelDecoderAdapter>();
channel.AddOutputAdapter<ParcelEncoderAdapter>();

Data Types

The decoder adapter decodes binary packets and forwards them as a Parcel.Message[] array, which you can then consume in your own adapters and/or handlers.

Here's an example. Since the base class handles the conversion between T and T[], a handler can receive the data either as an array or as a single record. This example uses a single record.

public class MyMessageHandler : ChannelHandler<Message>
{
    public override Task ExecuteAsync( IChannelContext context, Message message )
    {
        // ...

        // the method is not async, so a completed task must be returned
        return Task.CompletedTask;
    }
}

The encoder adapter will encode Parcel.Message objects into binary packets, which will be written to the channel's underlying socket.

Observables

It is possible to write to the channel and then wait for a specific response. This is useful when we know in advance which message the server will send in reply to a particular message.

To make this work, we first need to register the message observer, which can be referred to later through dependency injection.

IChannelBuilder channel = ...;

channel.AddMessageObserver();

With the message observer registered, we can now create an observable instance for a particular identifier. Here's an example.

IChannel channel = ...;
IMessageObserver observer = ...;

/*
Sending the message below to the channel will result
in a reply with the identifier `my-reply`. Therefore,
we create an observable for that identifier.
*/
var observable = observer.CreateObservable( "my-reply" );

// send the message to the channel
await channel.WriteAsync( new Message
{
    // ...
} );

// tell the observable to wait for the reply
var replyMessage = await observable.WaitAsync();

if ( replyMessage == null )
{
    // the response is null if the timeout triggers
}

Note

An observable instance can be used only once. As soon as its value is set, it is removed from the observer and cannot be reused.

This isn't enough, though. The observer does not see incoming messages on its own, because received messages are handled by our own handlers. We therefore need to push each received message to the observer. Let's alter the handler from the earlier example to do that.

public class MyMessageHandler : ChannelHandler<Message>
{
    private readonly IMessageObserver observer;

    public MyMessageHandler( IMessageObserver messageObserver )
    {
        /*
        our observer is injected
        */
        observer = messageObserver;
    }

    public override Task ExecuteAsync( IChannelContext context, Message message )
    {
        // ...

        // feed the message to the observer
        observer.Push( message );

        // ...

        // the method is not async, so a completed task must be returned
        return Task.CompletedTask;
    }
}
}

That's it. Now the observer will be able to notify the waiting task when the message arrives.
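
To picture how the pieces fit together, here is a minimal sketch of a one-shot observer built on TaskCompletionSource. It is not the library's implementation. The names SketchMessage, SketchObserver and SketchObservable are hypothetical, and so are the Id property and the timeout parameter on WaitAsync; they exist only to mirror the create, push and wait steps described above.

```csharp
using System;
using System.Collections.Concurrent;
using System.Threading.Tasks;

// Hypothetical stand-in for a Parcel message; the real Message type may differ.
public sealed record SketchMessage( string Id );

// Hypothetical one-shot observer (assumption: one pending observable per identifier).
public sealed class SketchObserver
{
    private readonly ConcurrentDictionary<string, TaskCompletionSource<SketchMessage?>> pending = new();

    // Registers interest in an identifier; call this before sending the request.
    public SketchObservable CreateObservable( string id )
    {
        var source = new TaskCompletionSource<SketchMessage?>(
            TaskCreationOptions.RunContinuationsAsynchronously );

        pending[id] = source;

        return new SketchObservable( source.Task, () => pending.TryRemove( id, out _ ) );
    }

    // Completes the matching observable, if any, and removes it so it cannot fire twice.
    public void Push( SketchMessage message )
    {
        if ( pending.TryRemove( message.Id, out var source ) )
        {
            source.TrySetResult( message );
        }
    }
}

public sealed class SketchObservable
{
    private readonly Task<SketchMessage?> task;
    private readonly Action release;

    public SketchObservable( Task<SketchMessage?> task, Action release )
    {
        this.task = task;
        this.release = release;
    }

    // Returns the pushed message, or null if the timeout elapses first.
    public async Task<SketchMessage?> WaitAsync( TimeSpan timeout )
    {
        var completed = await Task.WhenAny( task, Task.Delay( timeout ) );

        if ( completed == task )
        {
            return await task;
        }

        // timed out: drop the registration so a late message is ignored
        release();
        return null;
    }
}
```

In this sketch, the handler's call to Push is what completes the task returned by WaitAsync. If no handler ever pushes a message with the matching identifier, the wait ends only when the timeout elapses, which is why the handler change above is required.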