Reliable Delivery over Akka.Cluster.Sharding
Tip
Please see "Reliable Message Delivery with Akka.Delivery" before reading this documentation. Akka.Cluster.Sharding.Delivery builds upon all of the concepts and tools implemented in the base Akka.Delivery APIs.
If you're using Akka.Cluster.Sharding to distribute state via one or more ShardRegions across your Akka.Cluster, Akka.Cluster.Sharding.Delivery can help you guarantee delivery of messages from the rest of your ActorSystems to each of your entity actors.
Point to Point Delivery
Akka.Cluster.Sharding.Delivery only uses point-to-point delivery mode from Akka.Delivery and message chunking is not supported in this mode.
Typed Messaging Protocol
Akka.Cluster.ShardingDelivery uses a .NET generic-typed protocol and the ShardingProducerController and ShardingConsumerController are also both strongly typed. This means that end-users need to organize their messages into "protocol groups" in order to be effective, like so:
public interface ICustomerProtocol
{
}
public record PurchaseItem(string ItemName) : ICustomerProtocol;
The common interface that all of the messages in this protocol implement is typically the type you'll want to use for your generic argument T in the Akka.Delivery or Akka.Cluster.Sharding.Delivery method calls and types, as shown below:
IActorRef producer = Sys.ActorOf(Props.Create(() => new ProducerActor()), "producer");
IActorRef producerController = Sys.ActorOf(ProducerController.Create<ICustomerProtocol>(Sys,
producerId: "producerController",
durableProducerQueue: Option<Props>.None));
producerController.Tell(new ProducerController.Start<ICustomerProtocol>(producer));
Built-in Actors and Messages

The Akka.Cluster.Sharding.Delivery relationship consists of the following actors:
Producer- this is a user-defined actor that is responsible for the production of messages. It receivesShardingProducerController.RequestNext<T>messages from theShardingProducerControllerwhen capacity is available to deliver additional messages.ShardingProducerController- this actor is built into Akka.Cluster.ShardingDelivery and does most of the work. You typically only need a singleShardingProducerControllerper-ActorSystem/ per-ShardRegion(or you can use Sharded Daemon Processes to host a fixed number of producers per-cluster.) TheShardingProducerControlleris responsible for spawning aProducerControllerper-entity and delivering those messages to theShardRegionIActorRef.ShardingConsumerController- this actor is built into Akka.Cluster.Sharding.Delivery and typically resides on the opposite site of the network from theShardingProducerController. This actor wraps around your normal Akka.Cluster.Sharding entity actors and is created directly by theShardRegioneach time an entity is messaged. TheShardingConsumerControllerwill spawn your entity actor directly and will additionally spawn aConsumerControllerfor each uniqueProducerIddetected in the message stream. Each of theConsumerControllers spawned by theShardingConsumerControllerwill deliver messages via their usualConsumerController.Delivery<T>to theConsumer.Consumer- this is your entity actor hosted via Akka.Cluster.Sharding. TheConsumerprocesses messages of typeTand must sendConsumerController.Confirmationmessages back to theConsumerControlleronce it has successfully processed eachConsumerController.Delivery<T>. TheConsumeractor is spawned by theShardingConsumerController.
Integration with ShardRegions
In order to make use of Akka.Cluster.Sharding.Delivery, we have to change the way we spawn our ShardRegion's entity actors:
// Depending on the role, we will start a shard or a shard proxy
await sharding.StartAsync(
typeName: "customer",
entityPropsFactory: e =>
// ShardingConsumerController guarantees processing of messages,
// even across process restarts / shutdowns or shard rebalancing
ShardingConsumerController.Create<Customer.ICustomerCommand>(
c => Props.Create(() => new Customer(e,c)),
ShardingConsumerController.Settings.Create(system)),
// .WithRole is important because we're dedicating a specific node role for
// the actors to be instantiated in; in this case, we're instantiating only
// in the "backend" roled nodes.
settings: ClusterShardingSettings.Create(system).WithRole(BackEndRole),
messageExtractor: new MessageExtractor(10));
- The
ShardingConsumerControllerneeds to be the actor initially created by theShardRegioneach time an entity is spawned; - The
ShardingConsumerController.Createmethod takes an argument of typeFunc<IActorRef, Props>- this allows you to pass in theIActorRefof theShardingConsumerControlleritself down to your entity actor, thePropsof which should be returned by this function. - The
Consumeractor must send aConsumerController.Start<T>message, typically duringPreStart, to theShardingConsumerControllerin order to trigger message delivery.
protected override void PreStart()
{
// signal that we're ready to consume messages
_consumerController.Tell(new ConsumerController.Start<ICustomerCommand>(Self));
}
Do this for each entity type / ShardRegion you wish to guarantee delivery for via the ShardingProducerController.
Next, we have to create our Producer and ShardingProducerController instances:
private static void ProduceMessages(ActorSystem system, IActorRef shardRegionProxy)
{
var producerId = "ProducerId1" + MurmurHash.StringHash(Cluster.Get(system).SelfAddress.ToString());
var shardingProducerController = system.ActorOf(ShardingProducerController.Create<Customer.ICustomerCommand>(
producerId,
shardRegionProxy,
Option<Props>.None,
ShardingProducerController.Settings.Create(system)), "shardingProducerController-1");
var producer = system.ActorOf(Props.Create(() => new Producer()), "msg-producer");
shardingProducerController.Tell(new ShardingProducerController.Start<Customer.ICustomerCommand>(producer));
}
- We have to launch our
ShardingProducerControllerand ourProduceractors - eachShardingProducerControllermust have a globally uniqueProducerIdvalue (similar to aPersistentId). ShardingProducerControllers can be optionally made persistent via the sameEventSourcedProducerQueuethat can be used by an individualProducerController.- The
ShardingProducerControllermust have a reference to theIActorRefof theShardRegionto which it will be delivering messages. - The
ShardingProducerControllermust receive aShardingProducerController.Start<T>message that contains theProducer'sIActorRefin order to begin message production.
Tip
Unlike Akka.Delivery, there is no need to have the ProducerController and ConsumerController explicitly register with the other - this will be handled automatically by the Akka.Cluster.Sharding messaging system.
Message Production
Once the Producer has been successfully registered with its ProducerController, it will begin to receive ShardingProducerController.RequestNext<T> messages - each time it receives one of these messages the Producer can send a burst of messages to the ShardingProducerController.
private void Active()
{
Receive<Produce>(_ =>
{
var customer = PickRandom(Customers);
var item = PickRandom(Items);
var msg = new Customer.PurchaseItem(item);
SendNext.Tell(new ShardingEnvelope(customer, msg));
Become(Idle); // wait for demand
});
Receive<ShardingProducerController.RequestNext<Customer.ICustomerCommand>>(next =>
{
// no work to do yet, but update SendNext
SendNext = next.SendNextTo;
});
}
Important
It is crucial that the Prodcuer send its messages of type T wrapped inside a ShardingEnvelope - otherwise the ShardingProducerController won't know which messages should be routed to which unique entityId. Additionally - your HashCodeMessageExtractor that you use with your ShardRegion must also be able to handle the ShardingEnvelope to ensure that this message is processed correctly on the receiving side. There is a proposal in-place to automate some of this work in a future release of Akka.NET: #6717.
One important distinction between ShardingProducerController.RequestNext<T> and ProducerController.RequestNext<T> - because the ShardingProducerController has to deliver to multiple Consumers, it retains a much larger outbound delivery buffer. You can check the status of which entities are currently buffered or which ones have active demand by inspecting the ShardingProducerController.RequestNext<T>.BufferedForEntitiesWithoutDemand or ShardingProducerController.RequestNext<T>.EntitiesWithDemand properties respectively.

Once the ShardingProducerController begins receiving messages of type T (wrapped in a ShardingEnvelope) from the Producer, it will spawn ProducerControllers for each unique entity and begin routing those messages to the ShardRegion.
Message Consumption
The ProducerControllers will all send ConsumerController.SequencedMessage<T> over the wire, wrapped inside ShardingEnvelopes - the ShardRegion must be programmed to handle these types:

public sealed class MessageExtractor : HashCodeMessageExtractor
{
public MessageExtractor(int maxNumberOfShards) : base(maxNumberOfShards)
{
}
public override string? EntityId(object message)
=> message switch
{
_ => null
};
public override object EntityMessage(object message)
=> message switch
{
_ => message
};
}
As the ConsumerController.SequencedMessage<T>s are delivered, the ShardRegion will spawn the ShardingConsumerController for each entity, which will in turn spawn the entity actor itself (the Consumer) as well as one ConsumerController per unique ProducerId. These actors are all cheap and maintain a finite amount of buffer space.
- The
Consumerreceives theConsumerController.Delivery<T>message from theShardingConsumerController; - The
Consumerreplies to theIActorRefstored insideConsumerController.Delivery<T>.DeliverTowith aConsumerController.Confirmedmessage - this marks the message as "processed;" - The
ConsumerControllermarks the message as delivered, removes it from the buffer, and requests additional messages from theProducerController; and - This in turn causes the
ShardingProducerControllerto update its aggregate state and send additionalShardingProducerController.RequestNext<T>demand to theProducer.
Guarantees, Constraints, and Caveats
See "Reliable Message Delivery with Akka.Delivery - Guarantees, Constraints, and Caveats" - these are the same in Akka.Cluster.Sharding.Delivery.
With one notable exception: Akka.Cluster.Sharding.Delivery does not support message chunking and there are no plans to add it at this time.
Durable Reliable Delivery over Akka.Cluster.Sharding
By default the ShardingProducerController will run using without any persistent storage - however, if you reference the Akka.Persistence library in your Akka.NET application then you can make use of the EventSourcedProducerQueue to ensure that your ShardingProducerController saves and un-acknowledged messages to your Akka.Persistence Journal and SnapshotStore.
var durableQueueProps = EventSourcedProducerQueue.Create<Job>(ProducerId, Sys);
var shardingProducerController =
Sys.ActorOf(
ShardingProducerController.Create<Job>(ProducerId, sharding, durableQueueProps,
ShardingProducerController.Settings.Create(Sys)), $"shardingProducerController-{_idCount}");
var producerProbe = CreateTestProbe();
shardingProducerController.Tell(new ShardingProducerController.Start<Job>(producerProbe.Ref));
Tip
The EventSourcedProducerQueue can be customized via the EventSourcedProducerQueue.Settings class - for instance, you can customize it to use a separate Akka.Persistence Journal and SnapshotStore.
Each time a message is sent to the ShardingProducerController it will persist a copy of the message to the Akka.Persistence journal.

Tip
All messages for all entities are stored in the same EventSourcedProducerQueue instance.
Confirmation of Outbound Messages Persisted
If the Producer needs to confirm that all of its outbound messages have been successfully persisted, this can be accomplished via the ShardingProducerController.RequestNext<T>.AskNextTo method:
ProducerController.RequestNext<ICustomerProtocol> request1 = (await producerProbe.ExpectMsgAsync<ProducerController.RequestNext<ICustomerProtocol>>(TimeSpan.FromSeconds(10)));
// confirm that message was stored in durable queue (so we know it will be redelivered if needed)
long seqNo1 = await request1.AskNextTo(new PurchaseItem("Burger"));
The AskNextTo method will return a Task<long> that will be completed once the message has been confirmed as stored inside the EventSourcedProducerQueue - the long in this case is the sequence number that has been assigned to this message via the ShardingProducerController's outbound queue.
In addition to outbound deliveries, confirmation messages from the ShardingConsumerController will also be persisted - and these will cause the EventSourcedProducerQueue to gradually compress its footprint in the Akka.Persistence journal by taking snapshots.
Tip
By default the EventSourcedProducerQueue will take a new snapshot every 1000 events but this can be configured via the EventSourcedProducerQueue.Settings class.
Edit this page