Parcel
This extension for Channels implements a set of adapters to encode and decode data using the Parcel protocol.
Getting started
Install the package from NuGet
To decode and encode Parcel packets, register the adapters with the channel's input and output pipelines:
IChannelBuilder channel = ...;
channel.AddInputAdapter<ParcelDecoderAdapter>();
channel.AddOutputAdapter<ParcelEncoderAdapter>();
Data Types
The decoder adapter decodes binary packets and forwards a Parcel.Message[] array, which you can then consume in your own adapters and handlers.
Here's an example. Because the base class handles the T <--> T[] type mutation, a handler can receive the data either as an array or as a single record. This example uses a single record.
public class MyMessageHandler : ChannelHandler<Message>
{
    public override Task ExecuteAsync( IChannelContext context, Message message )
    {
        // ...

        // nothing to await in this example
        return Task.CompletedTask;
    }
}
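To make the T <--> T[] idea more concrete, here is a minimal, self-contained sketch of how a base class could pass an array to a handler that expects a single record. It is not the Channels implementation: `SketchRecord`, `SketchHandler<T>`, `DispatchAsync` and `CollectingHandler` are hypothetical names invented for illustration, and only the array-to-single direction is shown.

```csharp
using System.Collections.Generic;
using System.Threading.Tasks;

// Hypothetical stand-in for a decoded record; not the Parcel.Message type.
public sealed record SketchRecord( string Id );

// Hypothetical base class, loosely modeled on the idea behind ChannelHandler<T>.
public abstract class SketchHandler<T>
{
    public async Task DispatchAsync( object data )
    {
        if ( data is T exact )
        {
            // the payload already matches the handler's type
            await ExecuteAsync( exact );
        }
        else if ( data is IEnumerable<T> many )
        {
            // T[] -> T: invoke the handler once per element
            foreach ( var item in many )
            {
                await ExecuteAsync( item );
            }
        }
        // anything else is ignored in this sketch
    }

    protected abstract Task ExecuteAsync( T data );
}

// Records the identifier of every record it receives.
public sealed class CollectingHandler : SketchHandler<SketchRecord>
{
    public List<string> Seen { get; } = new();

    protected override Task ExecuteAsync( SketchRecord data )
    {
        Seen.Add( data.Id );
        return Task.CompletedTask;
    }
}
```

With this sketch, dispatching an array of two records followed by one single record calls the handler three times, once per record.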
The encoder adapter encodes Parcel.Message objects into binary packets, which are written to the channel's underlying socket.
Observables
It is possible to write to the channel and then wait for a specific response. This can be useful if we know how the server will reply to a particular message.
To make this work, we first need to register the message observer, so that it can later be resolved through dependency injection.
With the message observer registered, we can now create an observable instance for a particular identifier. Here's an example.
IChannel channel = ...;
IMessageObserver observer = ...;

/*
    Sending the message below to the channel will result
    in a reply with the identifier `my-reply`. Therefore,
    we create an observable for that identifier.
*/
var observable = observer.CreateObservable( "my-reply" );

// send the message to the channel
await channel.WriteAsync( new Message
{
    // ...
} );

// tell the observable to wait for the reply
var replyMessage = await observable.WaitAsync();

if ( replyMessage == null )
{
    // the response is null if the timeout triggers
}
Note
An observable instance is single-use. Once its value is set, it is removed from the observer and cannot be reused.
This isn't enough on its own, though. Because received messages are processed by our own handlers, we also need to pass each one to the observer. Let's alter the handler from the earlier example to do that.
public class MyMessageHandler : ChannelHandler<Message>
{
    private readonly IMessageObserver observer;

    public MyMessageHandler( IMessageObserver messageObserver )
    {
        /*
            our observer is injected
        */
        observer = messageObserver;
    }

    public override Task ExecuteAsync( IChannelContext context, Message message )
    {
        // ...

        // feed the message to the observer
        observer.Push( message );

        // ...

        return Task.CompletedTask;
    }
}
That's it. Now the observer will be able to notify the waiting task when the message arrives.
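For readers curious about how a push can wake a waiting task, the following is a minimal sketch of one possible mechanism built on `TaskCompletionSource`. It is not the library's implementation: `SketchMessage`, `SketchObservable` and `SketchObserver` are hypothetical types, and the explicit `timeout` parameter on `WaitAsync` is an assumption made so that the example is self-contained.

```csharp
#nullable enable
using System;
using System.Collections.Concurrent;
using System.Threading.Tasks;

// Hypothetical stand-in for Parcel.Message; only the identifier matters here.
public sealed record SketchMessage( string Id );

// Hypothetical single-use observable backed by a TaskCompletionSource.
public sealed class SketchObservable
{
    private readonly TaskCompletionSource<SketchMessage?> source =
        new( TaskCreationOptions.RunContinuationsAsynchronously );

    internal bool TrySet( SketchMessage message ) => source.TrySetResult( message );

    // Completes with the reply, or with null if the timeout elapses first.
    public async Task<SketchMessage?> WaitAsync( TimeSpan timeout )
    {
        var winner = await Task.WhenAny( source.Task, Task.Delay( timeout ) );

        return winner == source.Task ? await source.Task : null;
    }
}

// Hypothetical observer that maps identifiers to pending observables.
public sealed class SketchObserver
{
    private readonly ConcurrentDictionary<string, SketchObservable> pending = new();

    public SketchObservable CreateObservable( string id )
    {
        var observable = new SketchObservable();
        pending[id] = observable;

        return observable;
    }

    // Called by the handler for every received message.
    public void Push( SketchMessage message )
    {
        // removing the entry before completing it makes the observable single-use
        if ( pending.TryRemove( message.Id, out var observable ) )
        {
            observable.TrySet( message );
        }
    }
}
```

In this sketch, pushing a message whose identifier matches a pending observable completes the waiting task with that message, while a message with any other identifier is ignored. For brevity, an observable that times out stays registered; a fuller implementation would probably remove it.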