Reliable Delivery over Akka.Cluster.Sharding
Tip
Please see "Reliable Message Delivery with Akka.Delivery" before reading this documentation. Akka.Cluster.Sharding.Delivery builds upon all of the concepts and tools implemented in the base Akka.Delivery APIs.
If you're using Akka.Cluster.Sharding to distribute state via one or more ShardRegion
s across your Akka.Cluster, Akka.Cluster.Sharding.Delivery can help you guarantee delivery of messages from the rest of your ActorSystem
s to each of your entity actors.
Point to Point Delivery
Akka.Cluster.Sharding.Delivery only uses point-to-point delivery mode from Akka.Delivery and message chunking is not supported in this mode.
Typed Messaging Protocol
Akka.Cluster.ShardingDelivery uses a .NET generic-typed protocol and the ShardingProducerController
and ShardingConsumerController
are also both strongly typed. This means that end-users need to organize their messages into "protocol groups" in order to be effective, like so:
public interface ICustomerProtocol
{
}
public record PurchaseItem(string ItemName) : ICustomerProtocol;
The common interface that all of the messages in this protocol implement is typically the type you'll want to use for your generic argument T
in the Akka.Delivery or Akka.Cluster.Sharding.Delivery method calls and types, as shown below:
IActorRef producer = Sys.ActorOf(Props.Create(() => new ProducerActor()), "producer");
IActorRef producerController = Sys.ActorOf(ProducerController.Create<ICustomerProtocol>(Sys,
producerId: "producerController",
durableProducerQueue: Option<Props>.None));
producerController.Tell(new ProducerController.Start<ICustomerProtocol>(producer));
Built-in Actors and Messages
The Akka.Cluster.Sharding.Delivery relationship consists of the following actors:
Producer
- this is a user-defined actor that is responsible for the production of messages. It receivesShardingProducerController.RequestNext<T>
messages from theShardingProducerController
when capacity is available to deliver additional messages.ShardingProducerController
- this actor is built into Akka.Cluster.ShardingDelivery and does most of the work. You typically only need a singleShardingProducerController
per-ActorSystem
/ per-ShardRegion
(or you can use Sharded Daemon Processes to host a fixed number of producers per-cluster.) TheShardingProducerController
is responsible for spawning aProducerController
per-entity and delivering those messages to theShardRegion
IActorRef
.ShardingConsumerController
- this actor is built into Akka.Cluster.Sharding.Delivery and typically resides on the opposite site of the network from theShardingProducerController
. This actor wraps around your normal Akka.Cluster.Sharding entity actors and is created directly by theShardRegion
each time an entity is messaged. TheShardingConsumerController
will spawn your entity actor directly and will additionally spawn aConsumerController
for each uniqueProducerId
detected in the message stream. Each of theConsumerController
s spawned by theShardingConsumerController
will deliver messages via their usualConsumerController.Delivery<T>
to theConsumer
.Consumer
- this is your entity actor hosted via Akka.Cluster.Sharding. TheConsumer
processes messages of typeT
and must sendConsumerController.Confirmation
messages back to theConsumerController
once it has successfully processed eachConsumerController.Delivery<T>
. TheConsumer
actor is spawned by theShardingConsumerController
.
Integration with ShardRegions
In order to make use of Akka.Cluster.Sharding.Delivery, we have to change the way we spawn our ShardRegion
's entity actors:
// Depending on the role, we will start a shard or a shard proxy
await sharding.StartAsync(
typeName: "customer",
entityPropsFactory: e =>
// ShardingConsumerController guarantees processing of messages,
// even across process restarts / shutdowns or shard rebalancing
ShardingConsumerController.Create<Customer.ICustomerCommand>(
c => Props.Create(() => new Customer(e,c)),
ShardingConsumerController.Settings.Create(system)),
// .WithRole is important because we're dedicating a specific node role for
// the actors to be instantiated in; in this case, we're instantiating only
// in the "backend" roled nodes.
settings: ClusterShardingSettings.Create(system).WithRole(BackEndRole),
messageExtractor: new MessageExtractor(10));
- The
ShardingConsumerController
needs to be the actor initially created by theShardRegion
each time an entity is spawned; - The
ShardingConsumerController.Create
method takes an argument of typeFunc<IActorRef, Props>
- this allows you to pass in theIActorRef
of theShardingConsumerController
itself down to your entity actor, theProps
of which should be returned by this function. - The
Consumer
actor must send aConsumerController.Start<T>
message, typically duringPreStart
, to theShardingConsumerController
in order to trigger message delivery.
protected override void PreStart()
{
// signal that we're ready to consume messages
_consumerController.Tell(new ConsumerController.Start<ICustomerCommand>(Self));
}
Do this for each entity type / ShardRegion
you wish to guarantee delivery for via the ShardingProducerController
.
Next, we have to create our Producer
and ShardingProducerController
instances:
private static void ProduceMessages(ActorSystem system, IActorRef shardRegionProxy)
{
var producerId = "ProducerId1" + MurmurHash.StringHash(Cluster.Get(system).SelfAddress.ToString());
var shardingProducerController = system.ActorOf(ShardingProducerController.Create<Customer.ICustomerCommand>(
producerId,
shardRegionProxy,
Option<Props>.None,
ShardingProducerController.Settings.Create(system)), "shardingProducerController-1");
var producer = system.ActorOf(Props.Create(() => new Producer()), "msg-producer");
shardingProducerController.Tell(new ShardingProducerController.Start<Customer.ICustomerCommand>(producer));
}
- We have to launch our
ShardingProducerController
and ourProducer
actors - eachShardingProducerController
must have a globally uniqueProducerId
value (similar to aPersistentId
). ShardingProducerController
s can be optionally made persistent via the sameEventSourcedProducerQueue
that can be used by an individualProducerController
.- The
ShardingProducerController
must have a reference to theIActorRef
of theShardRegion
to which it will be delivering messages. - The
ShardingProducerController
must receive aShardingProducerController.Start<T>
message that contains theProducer
'sIActorRef
in order to begin message production.
Tip
Unlike Akka.Delivery, there is no need to have the ProducerController
and ConsumerController
explicitly register with the other - this will be handled automatically by the Akka.Cluster.Sharding messaging system.
Message Production
Once the Producer
has been successfully registered with its ProducerController
, it will begin to receive ShardingProducerController.RequestNext<T>
messages - each time it receives one of these messages the Producer
can send a burst of messages to the ShardingProducerController
.
private void Active()
{
Receive<Produce>(_ =>
{
var customer = PickRandom(Customers);
var item = PickRandom(Items);
var msg = new Customer.PurchaseItem(item);
SendNext.Tell(new ShardingEnvelope(customer, msg));
Become(Idle); // wait for demand
});
Receive<ShardingProducerController.RequestNext<Customer.ICustomerCommand>>(next =>
{
// no work to do yet, but update SendNext
SendNext = next.SendNextTo;
});
}
Important
It is crucial that the Prodcuer
send its messages of type T
wrapped inside a ShardingEnvelope
- otherwise the ShardingProducerController
won't know which messages should be routed to which unique entityId
. Additionally - your HashCodeMessageExtractor
that you use with your ShardRegion
must also be able to handle the ShardingEnvelope
to ensure that this message is processed correctly on the receiving side. There is a proposal in-place to automate some of this work in a future release of Akka.NET: #6717.
One important distinction between ShardingProducerController.RequestNext<T>
and ProducerController.RequestNext<T>
- because the ShardingProducerController
has to deliver to multiple Consumer
s, it retains a much larger outbound delivery buffer. You can check the status of which entities are currently buffered or which ones have active demand by inspecting the ShardingProducerController.RequestNext<T>.BufferedForEntitiesWithoutDemand
or ShardingProducerController.RequestNext<T>.EntitiesWithDemand
properties respectively.
Once the ShardingProducerController
begins receiving messages of type T
(wrapped in a ShardingEnvelope
) from the Producer
, it will spawn ProducerController
s for each unique entity and begin routing those messages to the ShardRegion
.
Message Consumption
The ProducerController
s will all send ConsumerController.SequencedMessage<T>
over the wire, wrapped inside ShardingEnvelope
s - the ShardRegion
must be programmed to handle these types:
public sealed class MessageExtractor : HashCodeMessageExtractor
{
public MessageExtractor(int maxNumberOfShards) : base(maxNumberOfShards)
{
}
public override string? EntityId(object message)
=> message switch
{
_ => null
};
public override object EntityMessage(object message)
=> message switch
{
_ => message
};
}
As the ConsumerController.SequencedMessage<T>
s are delivered, the ShardRegion
will spawn the ShardingConsumerController
for each entity, which will in turn spawn the entity actor itself (the Consumer
) as well as one ConsumerController
per unique ProducerId
. These actors are all cheap and maintain a finite amount of buffer space.
- The
Consumer
receives theConsumerController.Delivery<T>
message from theShardingConsumerController
; - The
Consumer
replies to theIActorRef
stored insideConsumerController.Delivery<T>.DeliverTo
with aConsumerController.Confirmed
message - this marks the message as "processed;" - The
ConsumerController
marks the message as delivered, removes it from the buffer, and requests additional messages from theProducerController
; and - This in turn causes the
ShardingProducerController
to update its aggregate state and send additionalShardingProducerController.RequestNext<T>
demand to theProducer
.
Guarantees, Constraints, and Caveats
See "Reliable Message Delivery with Akka.Delivery - Guarantees, Constraints, and Caveats" - these are the same in Akka.Cluster.Sharding.Delivery.
With one notable exception: Akka.Cluster.Sharding.Delivery does not support message chunking and there are no plans to add it at this time.
Durable Reliable Delivery over Akka.Cluster.Sharding
By default the ShardingProducerController
will run using without any persistent storage - however, if you reference the Akka.Persistence library in your Akka.NET application then you can make use of the EventSourcedProducerQueue
to ensure that your ShardingProducerController
saves and un-acknowledged messages to your Akka.Persistence Journal and SnapshotStore.
var durableQueueProps = EventSourcedProducerQueue.Create<Job>(ProducerId, Sys);
var shardingProducerController =
Sys.ActorOf(
ShardingProducerController.Create<Job>(ProducerId, sharding, durableQueueProps,
ShardingProducerController.Settings.Create(Sys)), $"shardingProducerController-{_idCount}");
var producerProbe = CreateTestProbe();
shardingProducerController.Tell(new ShardingProducerController.Start<Job>(producerProbe.Ref));
Tip
The EventSourcedProducerQueue
can be customized via the EventSourcedProducerQueue.Settings
class - for instance, you can customize it to use a separate Akka.Persistence Journal and SnapshotStore.
Each time a message is sent to the ShardingProducerController
it will persist a copy of the message to the Akka.Persistence journal.
Tip
All messages for all entities are stored in the same EventSourcedProducerQueue
instance.
Confirmation of Outbound Messages Persisted
If the Producer
needs to confirm that all of its outbound messages have been successfully persisted, this can be accomplished via the ShardingProducerController.RequestNext<T>.AskNextTo
method:
ProducerController.RequestNext<ICustomerProtocol> request1 = (await producerProbe.ExpectMsgAsync<ProducerController.RequestNext<ICustomerProtocol>>());
// confirm that message was stored in durable queue (so we know it will be redelivered if needed)
long seqNo1 = await request1.AskNextTo(new PurchaseItem("Burger"));
The AskNextTo
method will return a Task<long>
that will be completed once the message has been confirmed as stored inside the EventSourcedProducerQueue
- the long
in this case is the sequence number that has been assigned to this message via the ShardingProducerController
's outbound queue.
In addition to outbound deliveries, confirmation messages from the ShardingConsumerController
will also be persisted - and these will cause the EventSourcedProducerQueue
to gradually compress its footprint in the Akka.Persistence journal by taking snapshots.
Tip
By default the EventSourcedProducerQueue
will take a new snapshot every 1000 events but this can be configured via the EventSourcedProducerQueue.Settings
class.