Reliable Message Delivery with Akka.Delivery
By default Akka.NET uses "at most once" message delivery between actors - this is sufficient for most purposes within Akka.NET, but there are instances where users may require strong delivery guarantees.
This is where Akka.Delivery can be helpful - it provides a robust set of tools for ensuring message delivery over the network, across actor restarts, and even across process restarts.
Akka.Delivery will ultimately support two different modes of reliable delivery:
- Point-to-point delivery - this works similarly to Akka.Persistence's AtLeastOnceDeliveryActor; messages are pushed by the producer to the consumer in real-time.
- Pull-based delivery - this is not yet implemented in Akka.Delivery.
Point to Point Delivery
Point-to-point delivery is a great option for users who desire reliable, ordered delivery of messages over the network or across process restarts. Additionally, point to point delivery mode also supports "message chunking" - the ability to break up large messages into smaller, sequenced chunks that can be delivered over Akka.Remote connections without head-of-line blocking.
Tip
If you're interested in using Akka.Delivery in combination with Akka.Cluster.Sharding, please see "Reliable Delivery over Akka.Cluster.Sharding."
Built-in Actors and Messages
An Akka.Delivery relationship typically consists of 4 actors:
- Producer - this is a user-defined actor that is responsible for the production of messages. It receives ProducerController.RequestNext<T> messages from the ProducerController when capacity is available to deliver additional messages.
- ProducerController - this actor is built into Akka.Delivery and does most of the work. The ProducerController sequences all incoming messages from the Producer, delivers them to the ConsumerController, waits for acknowledgements that messages have been processed, and subsequently requests more messages from the Producer.
- ConsumerController - this actor is also built into Akka.Delivery and typically resides on the opposite side of the network from the ProducerController. The ConsumerController is responsible for buffering unprocessed messages, delivering messages for processing via ConsumerController.Delivery<T> to the Consumer, receiving confirmation that the Consumer has successfully processed the most recent message, and then requesting additional messages from the ProducerController.
- Consumer - this is a user-defined actor that is ultimately responsible for consuming messages of type T and sending ConsumerController.Confirmed messages back to the ConsumerController once it has successfully processed each ConsumerController.Delivery<T>.
Typed Messaging Protocol
Akka.Delivery uses a .NET generic-typed protocol, and the ProducerController and ConsumerController are both strongly typed. This means that end-users need to organize their messages into "protocol groups" in order to be effective, like so:
public interface ICustomerProtocol
{
}
public record PurchaseItem(string ItemName) : ICustomerProtocol;
The common interface that all of the messages in this protocol implement is typically the type you'll want to use for your generic argument T in the Akka.Delivery or Akka.Cluster.Sharding.Delivery method calls and types, as shown below:
IActorRef producer = Sys.ActorOf(Props.Create(() => new ProducerActor()), "producer");
IActorRef producerController = Sys.ActorOf(ProducerController.Create<ICustomerProtocol>(Sys,
    producerId: "producerController",
    durableProducerQueue: Option<Props>.None));
producerController.Tell(new ProducerController.Start<ICustomerProtocol>(producer));
Registration Flow
In order for the ProducerController and the ConsumerController to begin exchanging messages with each other, the respective actors must register with each other:
- The Producer must send a ProducerController.Start<T> message to the ProducerController in order to begin receiving ProducerController.RequestNext<T> messages;
- The Consumer must send a ConsumerController.Start<T> message to the ConsumerController in order to receive ConsumerController.Delivery<T> messages; and
- The ConsumerController needs to register with the ProducerController - this can be accomplished either by sending a reference to the ProducerController to the ConsumerController via a ConsumerController.RegisterToProducerController<T> message or by sending the ProducerController a ProducerController.RegisterConsumer<T> message. Either one will work.
The ProducerController registration flow is pretty simple:
IActorRef producer = Sys.ActorOf(Props.Create(() => new ProducerActor()), "producer");
IActorRef producerController = Sys.ActorOf(ProducerController.Create<ICustomerProtocol>(Sys,
    producerId: "producerController",
    durableProducerQueue: Option<Props>.None));
producerController.Tell(new ProducerController.Start<ICustomerProtocol>(producer));
As is the ConsumerController registration flow:
TestProbe endProbe = CreateTestProbe();
// stop after 3 messages
IActorRef consumer = Sys.ActorOf(
    Props.Create(() => new CustomerActor("customer1", endProbe, 3)), "consumer1");
IActorRef consumerController =
    Sys.ActorOf(ConsumerController.Create<ICustomerProtocol>(Sys, Option<IActorRef>.None),
        "consumerController");
consumerController.Tell(new ConsumerController.Start<ICustomerProtocol>(consumer));
consumerController.Tell(
    new ConsumerController.RegisterToProducerController<ICustomerProtocol>(producerController));
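The other registration route listed earlier, sending a ProducerController.RegisterConsumer<T> message to the ProducerController, could look roughly like the sketch below. The RegistrationSketch class and its method are hypothetical names used only for illustration, and the constructor is assumed to take the ConsumerController's IActorRef.

```csharp
using Akka.Actor;
using Akka.Delivery;

public static class RegistrationSketch
{
    // Hypothetical helper: registers an already-started ConsumerController
    // with a ProducerController, driven from the producer side.
    public static void RegisterFromProducerSide<T>(
        IActorRef producerController,
        IActorRef consumerController)
    {
        // Assumption: RegisterConsumer<T> wraps the ConsumerController's IActorRef.
        producerController.Tell(
            new ProducerController.RegisterConsumer<T>(consumerController));
    }
}
```

This would take the place of the ConsumerController.RegisterToProducerController<T> message; the ConsumerController.Start<T> message from the Consumer is still needed either way.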
Message Production
Once the registration flow is completed, all that's needed is for the Producer to begin delivering messages to the ProducerController after it receives an initial ProducerController.RequestNext<T> message.
- The ProducerController sends a ProducerController.RequestNext<T> message to the Producer when delivery capacity is available;
- The Producer messages the IActorRef stored at ProducerController.RequestNext<T>.SendNextTo with a message of type T;
- The ProducerController will sequence (and optionally chunk) the incoming messages and deliver them to the ConsumerController on the other side of the network;
- The ConsumerController will transmit the received messages in the order in which they were received to the Consumer via a ConsumerController.Delivery<T> message.
The Producer actor is ultimately responsible for managing back-pressure inside the Akka.Delivery system - a message of type T cannot be sent to the ProducerController until the Producer receives a message of type ProducerController.RequestNext<T>:
protected override void OnReceive(object message)
{
    switch (message)
    {
        case ProducerController.RequestNext<ICustomerProtocol> requestNext:
        {
            // deliver next item once timer Tick is received
            Become(Active(requestNext.SendNextTo));
            break;
        }
        case Tick:
        {
            // ignore;
            break;
        }
        default:
            Unhandled(message);
            break;
    }
}
Important
If a Producer tries to send a message of type T to the ProducerController before receiving a ProducerController.RequestNext<T> message, the ProducerController will log an error and crash. The appropriate way to handle this backpressure is typically through a combination of actor behavior switching and stashing.
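A minimal sketch of the behavior-switching-plus-stashing approach follows. It assumes a hypothetical StashingProducer that receives ICustomerProtocol messages from elsewhere in the application and may only forward one per ProducerController.RequestNext<T>; the actor name and its two behaviors are illustrative and not part of Akka.Delivery.

```csharp
using Akka.Actor;
using Akka.Delivery;

// Protocol types repeated here so the sketch is self-contained.
public interface ICustomerProtocol { }
public record PurchaseItem(string ItemName) : ICustomerProtocol;

// Hypothetical producer: stashes outgoing items until demand is signalled.
public sealed class StashingProducer : ReceiveActor, IWithUnboundedStash
{
    // Populated by Akka.NET when the actor is created.
    public IStash Stash { get; set; }

    public StashingProducer()
    {
        WaitingForDemand();
    }

    // No RequestNext received yet: hold items back instead of sending them.
    private void WaitingForDemand()
    {
        Receive<ProducerController.RequestNext<ICustomerProtocol>>(next =>
        {
            Become(() => Ready(next.SendNextTo));
            Stash.Unstash(); // release at most one held item
        });
        Receive<ICustomerProtocol>(_ => Stash.Stash());
    }

    // Demand received: exactly one item may be sent, then wait again.
    private void Ready(IActorRef sendNextTo)
    {
        Receive<ICustomerProtocol>(item =>
        {
            sendNextTo.Tell(item);
            Become(WaitingForDemand);
        });
    }
}
```

The design choice here is that the actor never holds a SendNextTo reference longer than one send, so it cannot accidentally send twice against a single RequestNext.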
The ProducerController maintains a buffer of unacknowledged messages - in the event that the ConsumerController restarts, the ProducerController will re-deliver all unacknowledged messages to it and restart the delivery process.
Note
ProducerController.RequestNext<T> messages aren't sent 1:1 with messages successfully consumed by the Consumer - the Akka.Delivery system allows for some buffering inside the ProducerController and ConsumerController, but the system constrains it to tens of messages in order to conserve memory across a large number of concurrent Akka.Delivery instances. In practice this means that the flow control of Akka.Delivery will always allow the Producer to run slightly ahead of a slower Consumer.
Message Consumption
It's the job of the Consumer, a user-defined actor, to mark the messages it's received as processed so the reliable delivery system can move forward and deliver the next message in the sequence:
- The Consumer receives the ConsumerController.Delivery<T> message from the ConsumerController;
- The Consumer replies to the IActorRef stored inside ConsumerController.Delivery<T>.ConfirmTo with a ConsumerController.Confirmed message - this marks the message as "processed"; and
- The ConsumerController marks the message as delivered, removes it from the buffer, and requests additional messages from the ProducerController.
case ConsumerController.Delivery<ICustomerProtocol> { Message: PurchaseItem purchase } delivery:
    _purchasedItems.Add(purchase.ItemName);
    _log.Info($"'{_customerId}' purchased '{purchase.ItemName}'");
    delivery.ConfirmTo.Tell(ConsumerController.Confirmed.Instance);
    // terminate after receiving enough messages
    if (_purchasedItems.Count == _endMessages)
    {
        _endProbe.Tell(_purchasedItems);
        Context.Stop(Self);
    }
    break;
The ConsumerController also maintains a buffer of unacknowledged messages received from the ProducerController - those messages will be delivered to the Consumer each time a ConsumerController.Confirmed is received by the ConsumerController.
There's no time limit on how long the Consumer has to process each message - any additional messages sent or resent by the ProducerController to the ConsumerController will be buffered until the Consumer acknowledges the current message.
Important
In the event that the Consumer actor terminates, the ConsumerController actor delivering messages to it will also self-terminate. The ProducerController, however, will continue running while it waits for a new ConsumerController instance to register.
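One way to resume delivery after a Consumer has terminated is to start a fresh ConsumerController and repeat the registration flow against the ProducerController that is still running. The sketch below reuses only the calls shown in the registration examples; the ConsumerRecoverySketch class and its Reconnect method are hypothetical names.

```csharp
using Akka.Actor;
using Akka.Delivery;
using Akka.Util;

public static class ConsumerRecoverySketch
{
    // Hypothetical helper: starts a new ConsumerController for a replacement
    // Consumer and registers it with the existing ProducerController.
    public static IActorRef Reconnect<T>(
        ActorSystem system,
        IActorRef newConsumer,
        IActorRef producerController)
    {
        // Left unnamed so it cannot collide with the name of the
        // ConsumerController that is still shutting down.
        IActorRef consumerController = system.ActorOf(
            ConsumerController.Create<T>(system, Option<IActorRef>.None));

        consumerController.Tell(new ConsumerController.Start<T>(newConsumer));
        consumerController.Tell(
            new ConsumerController.RegisterToProducerController<T>(producerController));

        return consumerController;
    }
}
```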
Guarantees, Constraints, and Caveats
It's the goal of Akka.Delivery to ensure that all messages are successfully processed by the Consumer in the order that the Producer originally sent them.
- The ConsumerController.Settings.ResendIntervalMin and ConsumerController.Settings.ResendIntervalMax will determine how often the ConsumerController re-requests messages from the ProducerController.
- The ProducerController buffers all messages (and can optionally persist them) until they are marked as consumed.
- The ProducerController and the ConsumerController can both restart independently from each other - if the ProducerController has a durable (persistent) queue configured there will be no message loss in this event.
- The only scenario in which duplicates will be delivered to the Consumer: a ProducerController with a durable queue restarts before it can process confirmations from the ConsumerController - in that scenario (an edge case of an edge case) it's possible that previously confirmed messages will be re-delivered. However, they will be the first messages immediately received by the consumer upon the ProducerController restarting.
- The Producer can restart independently from the ProducerController.
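The resend interval settings mentioned in the list above can presumably also be set through HOCON. The key names below are an assumption inferred from the ConsumerController.Settings property names, and the values are illustrative; check both against the reference configuration shipped with your Akka.NET version.

```hocon
# Assumed keys mirroring ConsumerController.Settings.ResendIntervalMin / ResendIntervalMax
akka.reliable-delivery.consumer-controller {
  resend-interval-min = 2s   # shortest wait before re-requesting messages
  resend-interval-max = 30s  # upper bound on the wait between re-requests
}
```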
Note
The Producer and ProducerController must reside inside the same ActorSystem (the ProducerController asserts this). Likewise, the Consumer and the ConsumerController must reside inside the same ActorSystem (the ConsumerController asserts this).
Chunking Large Messages
One very useful feature of point-to-point message delivery is message chunking: the ability to break up large messages into an ordered sequence of fixed-size byte arrays. Akka.Delivery guarantees that chunks will be transmitted and received in-order over the wire - and when using the DurableQueue with the ProducerController, chunked messages will be re-chunked and re-transmitted upon restart.
Chunking of messages is disabled by default.
Tip
This feature is very useful for eliminating head-of-line-blocking in Akka.Remote.
To enable message chunking you can set the following HOCON value in your ActorSystem configuration:
akka.reliable-delivery.producer-controller.chunk-large-messages = 100b #100b chunks, off by default
Or by setting the ProducerController.Settings.ChunkLargeMessagesBytes property to a value greater than 0.
IActorRef producer = Sys.ActorOf(Props.Create(() => new ProducerActor()), "producer");
ProducerController.Settings settings = ProducerController.Settings.Create(Sys) with
{
    ChunkLargeMessagesBytes = 10 // chunk messages into 10b segments
};
IActorRef producerController = Sys.ActorOf(ProducerController.Create<ICustomerProtocol>(Sys,
    producerId: "producerController",
    durableProducerQueue: Option<Props>.None,
    settings: settings));
producerController.Tell(new ProducerController.Start<ICustomerProtocol>(producer));
When chunking is enabled, the ProducerController will use the Akka.NET serialization system to convert each message sent by the Producer into a byte[] which will be chunked into [1,N] ProducerController.Settings.ChunkLargeMessagesBytes-sized arrays.
Each of these chunks will be transmitted individually, in-order, and subsequently buffered by the ConsumerController.
Once the last chunk has been received by the ConsumerController, the original message will be deserialized using Akka.NET serialization again. Provided that deserialization was successful, the message will be delivered just like a normal ConsumerController.Delivery<T> to the Consumer.
Once the Consumer has received the ConsumerController.Delivery<T> and sent a ConsumerController.Confirmed to the ConsumerController, all chunks will be marked as received and freed from memory on both ends of the network.
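As a rough worked example of the chunking step, the number of chunks for a serialized message is a ceiling division of its size by the chunk size. The ChunkMath helper below is hypothetical and not part of Akka.Delivery; it assumes every chunk is exactly ChunkLargeMessagesBytes long except possibly the last one.

```csharp
using System;

public static class ChunkMath
{
    // Hypothetical helper: how many chunks a serialized payload would need,
    // assuming only the final chunk may be shorter than chunkSizeBytes.
    public static int ChunkCount(int payloadBytes, int chunkSizeBytes)
    {
        if (payloadBytes <= 0)
            throw new ArgumentOutOfRangeException(nameof(payloadBytes));
        if (chunkSizeBytes <= 0)
            throw new ArgumentOutOfRangeException(nameof(chunkSizeBytes));

        // Ceiling division, widened to long to avoid integer overflow.
        return (int)(((long)payloadBytes + chunkSizeBytes - 1) / chunkSizeBytes);
    }
}
```

With the 10-byte chunk size used in the settings example, a 25-byte serialized message would be split into 3 chunks under this assumption, and a 4-byte message would still travel as a single chunk.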
Tip
Your akka.reliable-delivery.producer-controller.chunk-large-messages size should always be smaller than your akka.remote.dot-netty.tcp.maximum-frame-size.
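A configuration along the following lines would satisfy that constraint. The byte values are purely illustrative, chosen so that each chunk fits well inside a single Akka.Remote frame.

```hocon
# Illustrative values only: keep the chunk size below the remoting frame size
akka.remote.dot-netty.tcp.maximum-frame-size = 128000b
akka.reliable-delivery.producer-controller.chunk-large-messages = 64000b
```

Leaving headroom rather than matching the two values is a cautious choice here, on the assumption that each frame carries some envelope data in addition to the chunk itself.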
Durable Reliable Delivery
By default the ProducerController will run without any persistent storage - however, if you reference the Akka.Persistence library in your Akka.NET application then you can make use of the EventSourcedProducerQueue to ensure that your ProducerController saves any unacknowledged messages to your Akka.Persistence Journal and SnapshotStore.
TestProbe producerProbe = CreateTestProbe();
Props eventSourcedProducerQueue =
    EventSourcedProducerQueue.Create<ICustomerProtocol>(persistentId: "durableQueue-1", Sys);
IActorRef producerController = Sys.ActorOf(ProducerController.Create<ICustomerProtocol>(Sys,
    producerId: "producerController",
    durableProducerQueue: eventSourcedProducerQueue));
producerController.Tell(new ProducerController.Start<ICustomerProtocol>(producerProbe.Ref));
Tip
The EventSourcedProducerQueue can be customized via the EventSourcedProducerQueue.Settings class - for instance, you can customize it to use a separate Akka.Persistence Journal and SnapshotStore.
Each time a message is sent to the ProducerController it will persist a copy of the message to the Akka.Persistence journal.
Confirmation of Outbound Messages Persisted
If the Producer needs to confirm that all of its outbound messages have been successfully persisted, this can be accomplished via the ProducerController.RequestNext<T>.AskNextTo method:
ProducerController.RequestNext<ICustomerProtocol> request1 = (await producerProbe.ExpectMsgAsync<ProducerController.RequestNext<ICustomerProtocol>>());
// confirm that message was stored in durable queue (so we know it will be redelivered if needed)
long seqNo1 = await request1.AskNextTo(new PurchaseItem("Burger"));
The AskNextTo method will return a Task<long> that will be completed once the message has been confirmed as stored inside the EventSourcedProducerQueue - the long in this case is the sequence number that has been assigned to this message via the ProducerController's outbound queue.
In addition to outbound deliveries, confirmation messages from the ConsumerController will also be persisted - and these will cause the EventSourcedProducerQueue to gradually compress its footprint in the Akka.Persistence journal by taking snapshots.
Tip
By default the EventSourcedProducerQueue will take a new snapshot every 1000 events but this can be configured via the EventSourcedProducerQueue.Settings class.
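The same snapshot interval can presumably be set in HOCON as well. The section and key names below are an assumption and should be verified against the reference configuration for your Akka.NET version; the value shown is a hypothetical override of the 1000-event default.

```hocon
# Assumed key path; verify against your Akka.NET reference configuration
akka.reliable-delivery.producer-controller.event-sourced-durable-queue {
  snapshot-every = 500  # hypothetical override of the 1000-event default
}
```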