Akka.Cluster.Sharding
Cluster sharding is useful when you want to communicate with cluster actors using their logical IDs, but don't want to care about their physical location inside the cluster or manage their creation. Moreover, it's able to re-balance them as nodes join or leave the cluster.
Important
Interested in upgrading an Akka.NET v1.4 Cluster.Sharding application to v1.5? Please read our Akka.Cluster.Sharding v1.5 migration guide.
Cluster sharding can operate in 2 modes, configured via the akka.cluster.sharding.state-store-mode HOCON setting:

persistence
(default) depends on the Akka.Persistence module. In order to use it, you'll need to specify an event journal accessible by all of the participating nodes. Information about particular shard placements is stored in a persistent cluster singleton actor known as the coordinator. In order to guarantee consistent state between different incarnations, the coordinator stores its own state using Akka.Persistence event journals. This setting is deprecated as of v1.5 - please move to using state-store-mode=ddata for all new and existing applications.

ddata
depends on the Akka.DistributedData module. It uses Conflict-free Replicated Data Types (CRDTs) to ensure eventually consistent shard placement and global availability via node-to-node replication and automatic conflict resolution. In this mode event journals don't have to be configured.
Cluster sharding may be active only on nodes in Up status - i.e. those fully recognized and acknowledged by every other node in the cluster.
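For example, the recommended mode can be selected with the following HOCON (a minimal sketch):
akka.cluster.sharding {
  state-store-mode = ddata
}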
Quick Start
Actors managed by cluster sharding are called entities and can be automatically distributed across multiple nodes inside the cluster. One entity instance may live on only one node at a time, and can be communicated with via the ShardRegion without needing to know its exact node location.
Example:
// define envelope used for message routing
public sealed class ShardEnvelope
{
    public readonly int ShardId;
    public readonly int EntityId;
    public readonly object Message;

    public ShardEnvelope(int shardId, int entityId, object message)
    {
        ShardId = shardId;
        EntityId = entityId;
        Message = message;
    }
}
// define how shard id, entity id and the message itself should be resolved
public sealed class MessageExtractor : IMessageExtractor
{
    public string EntityId(object message) => (message as ShardEnvelope)?.EntityId.ToString();
    public string ShardId(object message) => (message as ShardEnvelope)?.ShardId.ToString();
    public object EntityMessage(object message) => (message as ShardEnvelope)?.Message;
}
// register actor type as a sharded entity
var region = await ClusterSharding.Get(system).StartAsync(
    typeName: "my-actor",
    entityPropsFactory: s => Props.Create(() => new MyActor(s)),
    settings: ClusterShardingSettings.Create(system),
    messageExtractor: new MessageExtractor());

// send a message to an entity through the shard region
region.Tell(new ShardEnvelope(shardId: 1, entityId: 1, message: "hello"));
In this example, we first specify a way to resolve our message recipients in the context of sharded entities. For this, a specialized message type called ShardEnvelope and a resolution strategy called MessageExtractor have been defined. That part can be customized and shared among many different shard regions, but it must be uniform across all nodes.

The second part of the example is registering a custom actor type as a sharded entity using the ClusterSharding.Start or ClusterSharding.StartAsync methods. The result is an IActorRef to the shard region, used to communicate between the current actor system and the target entities. A shard region must be started once per entity type on each node that is expected to participate in sharding entities of that type. Keep in mind that it's recommended to wait for the current node to fully join the cluster before initializing shard regions, in order to avoid potential timeouts.
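For instance, a minimal sketch of deferring shard region creation until the current node has reached Up status, using the Cluster.RegisterOnMemberUp callback (and reusing the MyActor and MessageExtractor types from the example above):
var cluster = Cluster.Get(system);
cluster.RegisterOnMemberUp(() =>
{
    // safe to initialize shard regions now - this node has fully joined the cluster
    var region = ClusterSharding.Get(system).Start(
        typeName: "my-actor",
        entityPropsFactory: s => Props.Create(() => new MyActor(s)),
        settings: ClusterShardingSettings.Create(system),
        messageExtractor: new MessageExtractor());
});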
N.B. Sharded entity actors are automatically created by the Akka.Cluster.Sharding guardian actor hierarchy, hence why they live under the /system portion of the actor hierarchy. This is done intentionally - in the event of an ActorSystem termination, the /user side of the actor hierarchy is always terminated before the /system actors are. Therefore, this design gives the sharding system a chance to hand over all of the sharded entity actors running on the terminating node to the other remaining nodes in the cluster.
In some cases, the actor may need to know the entityId
associated with it. This can be achieved using the entityPropsFactory
parameter to ClusterSharding.Start
or ClusterSharding.StartAsync
. The entity ID will be passed to the factory as a parameter, which can then be used in the creation of the actor.
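For example, a minimal sketch of how the MyActor entity from the Quick Start might capture its entity id via its constructor:
public class MyActor : ReceiveActor
{
    private readonly string _entityId;

    public MyActor(string entityId)
    {
        // the entity id is supplied by the entityPropsFactory delegate shown earlier
        _entityId = entityId;
        Receive<string>(msg => Sender.Tell($"{_entityId} received: {msg}"));
    }
}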
If you want to send messages to entities from a specific node, but you don't want that node to participate in sharding itself, you can use a ShardRegionProxy for that.
Example:
var proxy = ClusterSharding.Get(system).StartProxy(
    typeName: "my-actor",
    role: null,
    messageExtractor: new MessageExtractor());
Shards
Entities are located and managed automatically. They can also be recreated on other nodes, as new nodes join the cluster or old ones leave it. This process is called re-balancing, and for performance reasons it never operates on a single entity. Instead, all entities are organized and managed in so-called shards.

As you may have seen in the examples above, the shard resolution algorithm is one of the choices you have to make. Achieving a good uniform distribution is not an easy task - too small a number of shards may result in an uneven distribution of entities across all nodes, while too many of them may increase message routing latency and re-balancing overhead. As a rule of thumb, you may decide to have a number of shards ten times greater than the expected maximum number of cluster nodes.
By default, the re-balancing process always moves shards from the nodes with the highest number of shards to the ones with the lowest. This behavior can be changed by specifying a custom implementation of the IShardAllocationStrategy interface in the ClusterSharding.Start parameters.
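For instance, a sketch of passing the built-in least-shard strategy with explicit re-balance limits to Start (the same overload is used in the custom stop message example later on this page):
var region = ClusterSharding.Get(system).Start(
    typeName: "my-actor",
    entityPropsFactory: s => Props.Create(() => new MyActor(s)),
    settings: ClusterShardingSettings.Create(system),
    messageExtractor: new MessageExtractor(),
    // re-balance at most 10 shards per round, and at most 10% of all shards
    allocationStrategy: ShardAllocationStrategy.LeastShardAllocationStrategy(
        absoluteLimit: 10, relativeLimit: 0.1),
    handOffStopMessage: PoisonPill.Instance);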
Shard Re-Balancing
In the shard re-balance process, the coordinator first notifies all ShardRegion
actors that a handoff for a shard has started.
That means they will start buffering incoming messages for that shard, in the same way as if the shard location is unknown.
During the re-balance process, the coordinator will not answer any requests for the location of shards that are being rebalanced, i.e. local buffering will continue until the handoff is completed.
The ShardRegion
responsible for the rebalanced shard will stop all entities in that shard by sending the specified handoffStopMessage
(default PoisonPill
) to them.
When all entities have been terminated the ShardRegion
owning the entities will acknowledge the handoff as completed to the coordinator.
Thereafter the coordinator will reply to requests for the location of the shard, thereby allocating a new home for the shard, and then buffered messages in the ShardRegion actors are delivered to the new location. This means that the state of the entities is not transferred or migrated - if the state of the entities is of importance, it should be made persistent (durable), e.g. with Akka.Persistence, so that it can be recovered at the new location.
The logic that decides which shards to re-balance is defined in a pluggable shard allocation strategy. The default implementation, LeastShardAllocationStrategy, allocates new shards to the ShardRegion (node) with the least number of previously allocated shards.
Intercepting Actor Shutdown During Shard Re-Balancing Handoff Using Custom Stop Message
The default PoisonPill handoff message is perfectly fine for most shard implementations where the shard entity actor does not need to do any additional processing before being stopped. The default PoisonPill message, however, is handled automatically by the ActorCell, making it hard for the shard entity actor to intercept handoff stop events. For an actor to intercept such events, we will need to provide a custom message type for cluster sharding to use.

To illustrate this, let us assume that we have an actor that needs to perform an asynchronous operation before it can safely shut itself down. We will use a custom handoff message called Stop, declared as follows:
public class Stop
{
    public static readonly Stop Instance = new();
    private Stop() { }
}
We then tell the sharding system that we would like to use a custom handoff stop message by passing an instance of it into the cluster sharding Start()
method:
ClusterSharding.Get(system: Sys).Start(
    typeName: typeName,
    entityProps: Props.Create(() => new SlowStopShardedEntity()),
    settings: Settings.Value,
    messageExtractor: new MessageExtractor(),
    allocationStrategy: ShardAllocationStrategy.LeastShardAllocationStrategy(absoluteLimit: 2, relativeLimit: 1.0),
    handOffStopMessage: SlowStopShardedEntity.Stop.Instance); // This is the custom handoff message instance
We can then intercept this custom message type inside the entity actor message handler and perform our operation before the actor stops:
// inside SlowStopShardedEntity, which implements IWithTimers so that Timers is available;
// ActualStop is a second private marker message used to complete the delayed shutdown
protected override bool Receive(object message)
{
    switch (message)
    {
        case int id:
            Sender.Tell(id);
            return true;
        case Stop _:
            // delay the actual shutdown to simulate asynchronous clean-up work
            Timers.StartSingleTimer(ActualStop.Instance, ActualStop.Instance, TimeSpan.FromMilliseconds(50));
            return true;
        case ActualStop _:
            Context.Stop(Self);
            return true;
    }

    return false;
}
Reliable Delivery of Messages to Sharded Entity Actors
If you are interested in ensuring that all messages are guaranteed to be delivered to your entity actors even across restarts, re-balancing operations, or crashes then please see "Reliable Delivery over Akka.Cluster.Sharding."
Passivation
To reduce memory consumption, you may decide to stop entities after some period of inactivity using Context.SetReceiveTimeout(timeout). In order to make cluster sharding aware of stopping entities, DON'T use Context.Stop(Self) on the entities, as this may result in lost messages. Instead, send a ShardRegion.Passivate message to the current entity's Context.Parent (which is the shard itself in this case). This will inform the shard to stop forwarding messages to the target entity, and to buffer them instead until it's terminated. Once that happens, if there are still some messages buffered, the entity will be reincarnated and the messages flushed to it automatically.
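A minimal sketch of this pattern, using a hypothetical IdleEntity actor:
public class IdleEntity : UntypedActor
{
    protected override void PreStart()
    {
        // passivate after 2 minutes of inactivity
        Context.SetReceiveTimeout(TimeSpan.FromMinutes(2));
    }

    protected override void OnReceive(object message)
    {
        switch (message)
        {
            case ReceiveTimeout _:
                // ask the parent shard to passivate us; the shard replies with the
                // supplied stop message (PoisonPill here) once buffering is in place
                Context.Parent.Tell(new Passivate(PoisonPill.Instance));
                break;
            default:
                // normal message handling goes here
                break;
        }
    }
}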
Automatic Passivation
The entities can be configured to be automatically passivated if they haven't received a message for a while, using the akka.cluster.sharding.passivate-idle-entity-after setting, or by explicitly setting ClusterShardingSettings.PassivateIdleEntityAfter to a suitable time to keep the actor alive. Note that only messages sent through sharding are counted, so direct messages to the ActorRef of the actor, or messages that it sends to itself, are not counted as activity. Passivation can be disabled by setting akka.cluster.sharding.passivate-idle-entity-after = off. It is always disabled if Remembering Entities is enabled.
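For example, to automatically passivate entities after 30 minutes of sharding-routed inactivity:
akka.cluster.sharding.passivate-idle-entity-after = 30m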
Remembering Entities
By default, when a shard is re-balanced to another node, the entities it stored before the migration are NOT started immediately afterwards. Instead they are recreated ad-hoc, as new messages arrive. This behavior can be modified via the akka.cluster.sharding.remember-entities = true configuration. It will instruct shards to keep their state between re-balances - this also comes with an extra cost, due to the necessity of persisting information about started/stopped entities. Additionally, the message extractor logic must be aware of the ShardRegion.StartEntity message:
public sealed class ShardEnvelope
{
    public readonly int EntityId;
    public readonly object Message;

    public ShardEnvelope(int entityId, object message)
    {
        EntityId = entityId;
        Message = message;
    }
}

public sealed class MessageExtractor : HashCodeMessageExtractor
{
    public MessageExtractor() : base(maxNumberOfShards: 100) { }

    public override string EntityId(object message)
    {
        switch (message)
        {
            case ShardEnvelope e: return e.EntityId.ToString();
            case ShardRegion.StartEntity start: return start.EntityId;
            default: return null;
        }
    }

    public override object EntityMessage(object message) => (message as ShardEnvelope)?.Message ?? message;
}
Important
Since Akka.NET v1.5.15, Akka.Cluster.Sharding will automatically handle ShardRegion.StartEntity messages for you, and will raise the analyzer warning AK2001 if you attempt to handle them in your message extractors.
Using ShardRegion.StartEntity implies that you're able to infer a shard id given an entity id alone. For this reason, in the example above we modified the cluster sharding routing logic to make use of HashCodeMessageExtractor - in this variant, the shard id doesn't have to be provided explicitly, as it will be computed from the hash of the entity id itself. Notice the maxNumberOfShards parameter, which is the maximum number of shards allowed for this type of actor - this value must never change during the lifetime of a cluster.
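For completeness, the feature itself is enabled via the following HOCON:
akka.cluster.sharding.remember-entities = true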
Remember Entities Store
As of Akka.NET v1.5, there is now a dedicated setting for storing data about remembered entities:
akka.cluster.sharding {
  state-store-mode = ddata

  # accepts either "eventsourced" or "ddata"
  remember-entities-store = eventsourced
}
You don't need to configure this setting if you don't have remember-entities=on
.
Remember Entities Event Sourced Mode
You can enable the event sourced mode with:
akka.cluster.sharding.remember-entities-store = eventsourced
This mode uses persistence to store the active shards and active entities for each shard.
By default, cluster sharding will use the journal and snapshot store plugins defined in akka.persistence.journal.plugin and akka.persistence.snapshot-store.plugin, respectively; to change this behavior, you can use these configuration settings:
akka.cluster.sharding.journal-plugin-id = <plugin>
akka.cluster.sharding.snapshot-plugin-id = <plugin>
Important
It's considered a good practice to have Akka.Cluster.Sharding store its state in a separate journal and snapshot store - that way, in the event that you need to purge all sharding data, this can be easily isolated in its own table.
You can have Akka.Cluster.Sharding use its own separate journal and snapshot store via the following HOCON, for instance:
akka.persistence {
  # default plugins
  journal {
    plugin = "akka.persistence.journal.mongodb"
    mongodb {
      # qualified type name of the MongoDb persistence journal actor
      class = "Akka.Persistence.MongoDb.Journal.MongoDbJournal, Akka.Persistence.MongoDb"
      # connection string used for database access
      connection-string = ""
      collection = "EventJournal"
      metadata-collection = "Metadata"
    }
    sharding {
      # qualified type name of the MongoDb persistence journal actor
      class = "Akka.Persistence.MongoDb.Journal.MongoDbJournal, Akka.Persistence.MongoDb"
      # connection string used for database access
      connection-string = ""
      # separate collections / tables for Akka.Cluster.Sharding
      collection = "EventJournalSharding"
      metadata-collection = "MetadataSharding"
    }
  }
  snapshot-store {
    plugin = "akka.persistence.snapshot-store.mongodb"
    mongodb {
      class = "Akka.Persistence.MongoDb.Snapshot.MongoDbSnapshotStore, Akka.Persistence.MongoDb"
      # connection string used for database access
      connection-string = ""
      collection = "SnapshotStore"
    }
    sharding {
      class = "Akka.Persistence.MongoDb.Snapshot.MongoDbSnapshotStore, Akka.Persistence.MongoDb"
      # connection string used for database access
      connection-string = ""
      collection = "SnapshotStoreSharding"
    }
  }
}

akka.cluster.sharding.journal-plugin-id = akka.persistence.journal.sharding
akka.cluster.sharding.snapshot-plugin-id = akka.persistence.snapshot-store.sharding
Remember Entities Distributed Data Mode
It's recommended to use remember-entities-store = eventsourced, as it's much faster and more scalable than ddata, but in case you can't use Akka.Persistence for some reason, you can still use DData.

You can enable DData mode with this configuration:
akka.cluster.sharding.remember-entities-store = ddata
To support restarting entities after a full cluster restart (non-rolling), the remember entities store is persisted to disk by distributed data. This can be disabled if not needed:
akka.cluster.sharding.distributed-data.durable.keys = []
Possible reasons for disabling remember entity storage are:
- No requirement for remembering entities after a full cluster shutdown
- Running in an environment without access to disk between restarts e.g. Kubernetes without persistent volumes
For supporting remembered entities in an environment without disk storage but with access to a database, use the eventsourced mode instead.
Terminating Remembered Entities
One complication that akka.cluster.sharding.remember-entities = true introduces is that your sharded entity actors can no longer be terminated through the normal Akka.NET channels, i.e. Context.Stop(Self), PoisonPill.Instance, and the like. This is because, as part of the remember-entities contract, the sharding system is going to insist on keeping all remembered entities alive until explicitly told to stop.

To terminate a remembered entity, the sharded entity actor needs to send a Passivate command to its parent actor in order to signal to the sharding system that we no longer need to remember this particular entity:
protected override bool ReceiveCommand(object message)
{
    switch (message)
    {
        case Increment _:
            Persist(new CounterChanged(1), UpdateState);
            return true;
        case Decrement _:
            Persist(new CounterChanged(-1), UpdateState);
            return true;
        case Get _:
            Sender.Tell(_count);
            return true;
        case ReceiveTimeout _:
            // send Passivate to parent (shard actor) to stop remembering this entity.
            // shard actor will send us back a `Stop.Instance` message
            // as our "shutdown" signal - at which point we can terminate normally.
            Context.Parent.Tell(new Passivate(Stop.Instance));
            return true;
        case Stop _:
            Context.Stop(Self);
            return true;
    }

    return false;
}
It is common to simply use Context.Parent.Tell(new Passivate(PoisonPill.Instance)); to passivate and shut down remembered-entity actors.

To recreate a remembered entity actor after it has been passivated, all you have to do is message the ShardRegion actor with a message containing the entity's EntityId again, just like when you instantiated the actor the first time.
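For instance, reusing the region reference and the ShardEnvelope type from the remember-entities example above, a sketch:
// simply messaging the entity's id through the shard region re-creates it
region.Tell(new ShardEnvelope(entityId: 1, message: "hello again"));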
Retrieving Sharding State
You can inspect current sharding stats by using the following messages:

- On GetShardRegionState, the shard region will reply with a ShardRegionState containing data about the shards living in the current actor system and which entities are alive on each one of them.
- On GetClusterShardingStats, the shard region will reply with a ClusterShardingStats containing information about the shards living in the whole cluster and how many entities are alive in each one of them.
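A sketch of issuing both queries via Ask, assuming region is an IActorRef for a started ShardRegion (response property names follow the Akka.Cluster.Sharding query API):
// data about shards hosted by this node's shard region
var regionState = await region.Ask<ShardRegionState>(
    GetShardRegionState.Instance, TimeSpan.FromSeconds(3));
foreach (var shard in regionState.Shards)
    Console.WriteLine($"Shard {shard.ShardId}: {shard.EntityIds.Count} active entities");

// stats about shards across the whole cluster
var clusterStats = await region.Ask<ClusterShardingStats>(
    new GetClusterShardingStats(TimeSpan.FromSeconds(3)));
foreach (var kvp in clusterStats.Regions)
    Console.WriteLine($"Region {kvp.Key}: {kvp.Value.Stats.Count} shards");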
Querying for the Location of Specific Entities
It's possible to query a ShardRegion
or a ShardRegionProxy
using a GetEntityLocation
query:
// creates an entity with entityId="1"
await _shardRegion.Ask<int>(1, TimeSpan.FromSeconds(3));
// determine where entity with "entityId=1" is located in cluster
var q1 = await _shardRegion.Ask<EntityLocation>(new GetEntityLocation("1", TimeSpan.FromSeconds(1)));
q1.EntityId.Should().Be("1");
// have a valid ShardId
q1.ShardId.Should().NotBeEmpty();
// have valid address for node that will / would host entity
q1.ShardRegion.Should().NotBe(Address.AllSystems); // has real address
// if entity actor is alive, will retrieve a reference to it
q1.EntityRef.HasValue.Should().BeTrue();
A GetEntityLocation
query will always return an EntityLocation
response - even if the query could not be executed.
Important
One major caveat is that in order for the GetEntityLocation query to execute, your IMessageExtractor or ShardExtractor delegate will need to support the ShardRegion.StartEntity message - just like you'd have to in order to support remember-entities=on:
private string ExtractShardId(object message)
{
    switch (message)
    {
        case int i:
            return (i % 10 + 1).ToString();
        // must support ShardRegion.StartEntity in order for
        // GetEntityLocation to work properly
        case ShardRegion.StartEntity se:
            return (int.Parse(se.EntityId) % 10 + 1).ToString();
    }

    throw new NotSupportedException();
}
Integrating Cluster Sharding with Persistent Actors
One of the most common scenarios where cluster sharding is used is in combination with event-sourced persistent actors from the Akka.Persistence module.

Entity actors are instantiated automatically by Akka.Cluster.Sharding - but in order for persistent actors to recover and persist their state correctly, they must be given a globally unique PersistenceId. This can be most easily accomplished using the entityPropsFactory overload on the ClusterSharding.Start call used to create a new ShardRegion:
// register actor type as a sharded entity
var region = ClusterSharding.Get(system).Start(
    typeName: "aggregate",
    entityPropsFactory: s => Props.Create(() => new Aggregate(s)),
    settings: ClusterShardingSettings.Create(system),
    messageExtractor: new MessageExtractor());
Given these values, we can build consistent, unique PersistenceIds on the fly using the entityId (the expectation is that entityIds are globally unique), as in the following example:
public class Aggregate : PersistentActor
{
    public override string PersistenceId { get; }

    // Passed in via entityPropsFactory via the ShardRegion
    public Aggregate(string persistentId)
    {
        PersistenceId = persistentId;
    }

    // rest of class
}
Cleaning Up Akka.Persistence Shard State
In the normal operation of an Akka.NET cluster, the sharding system automatically terminates and re-balances Akka.Cluster.Sharding regions gracefully whenever an ActorSystem
terminates.
However, in the event that an ActorSystem
is aborted as a result of a process / hardware failure it's possible that when using akka.cluster.sharding.state-store-mode=persistence
leftover sharding data can still be present inside the Akka.Persistence journal and snapshot store - which will prevent the Akka.Cluster.Sharding system from recovering and starting up correctly the next time it's launched.
This is a rare, but not impossible occurrence. In the event that this happens you'll need to purge the old Akka.Cluster.Sharding data before restarting the sharding system. You can purge this data automatically by using the Akka.Cluster.Sharding.RepairTool produced by Petabridge.
Tutorial
In this tutorial, we will be making a very simple non-persisted shopping cart implementation using cluster sharding. All the code used in this tutorial can be found in the GitHub repository.
For a distributed data backed persistent example, please see this example project instead.
Setting Up the Roles
In a sharded cluster, it is common for shards to be assigned their own specialized role inside the cluster, to control where their actors are distributed. In this tutorial we will have a single frontend node that will feed three backend nodes with purchasing data. Usually these nodes would be separated into different specialized projects, but in this example we will roll them into a single project and control their roles using an environment variable.
var role = Environment.GetEnvironmentVariable("IS_FRONTEND") == "true"
    ? FrontEndRole : BackEndRole;

var config = ConfigurationFactory.ParseString(@$"
    # We need to tell Akka to provide us cluster enabled actors
    akka.actor.provider = cluster

    # This tells Akka which role this node belongs to
    akka.cluster.roles=[{role}]

    # This tells Akka to wait for at least 4 nodes joining the cluster
    # before signaling that it is up and ready
    akka.cluster.min-nr-of-members = 4")
    .BootstrapFromDocker();

var system = ActorSystem.Create("shopping-cart", config);
Starting Up Cluster Sharding
Cluster sharding can be added by using the Akka.Cluster.Sharding NuGet package; it already contains references to the other required packages. Note that the ClusterSharding.Get() call is very important, as it contains all the initialization code needed by cluster sharding to start. All nodes that participate in or interact with the sharded cluster will need to initialize ClusterSharding.
// Starts and initializes cluster sharding
var sharding = Akka.Cluster.Sharding.ClusterSharding.Get(system);
Starting the Sharding Coordinator Actors
There are two types of sharding coordinator actors:
- Regular coordinator: coordinates messages and instantiates sharded actors in their correct shard.
- Proxy coordinator: only routes messages to the proper sharded actors. This coordinator actor is used on nodes that need to talk to the shard region but do not host any of the sharded actors.

Note that you only need one of these coordinator actors to be able to communicate with the actors inside the shard region; you don't need a proxy if you already created a regular coordinator. We will use the proxy coordinator on the frontend node and the normal coordinator on the backend nodes.
switch (role)
{
    case FrontEndRole:
        // Depending on the role, we will start a shard or a shard proxy
        var shardRegionProxy = await sharding.StartProxyAsync(
            typeName: "customer",
            role: BackEndRole,
            messageExtractor: new MessageExtractor(10));

        // Register a callback for the "cluster is up" event
        var cluster = Cluster.Get(system);
        cluster.RegisterOnMemberUp(() =>
        {
            ProduceMessages(system, shardRegionProxy);
        });
        break;

    case BackEndRole:
        // Depending on the role, we will start a shard or a shard proxy
        await sharding.StartAsync(
            typeName: "customer",
            entityPropsFactory: e =>
                // ShardingConsumerController guarantees processing of messages,
                // even across process restarts / shutdowns or shard rebalancing
                ShardingConsumerController.Create<Customer.ICustomerCommand>(
                    c => Props.Create(() => new Customer(e, c)),
                    ShardingConsumerController.Settings.Create(system)),
            // .WithRole is important because we're dedicating a specific node role for
            // the actors to be instantiated in; in this case, we're instantiating only
            // in the "backend" roled nodes.
            settings: ClusterShardingSettings.Create(system).WithRole(BackEndRole),
            messageExtractor: new MessageExtractor(10));
        break;
}
Sending Messages To the Sharded Actors
Finally, we can start sending messages from the frontend node to the sharded actors in the backend through the proxy coordinator actor.
private static void ProduceMessages(ActorSystem system, IActorRef shardRegionProxy)
{
    var producerId = "ProducerId1" + MurmurHash.StringHash(Cluster.Get(system).SelfAddress.ToString());

    var shardingProducerController = system.ActorOf(
        ShardingProducerController.Create<Customer.ICustomerCommand>(
            producerId,
            shardRegionProxy,
            Option<Props>.None,
            ShardingProducerController.Settings.Create(system)),
        "shardingProducerController-1");

    var producer = system.ActorOf(Props.Create(() => new Producer()), "msg-producer");

    shardingProducerController.Tell(new ShardingProducerController.Start<Customer.ICustomerCommand>(producer));
}
Note that the message needs to contain the entity and shard id information so that cluster sharding can route the message to the correct shard and actor. You can do this by directly embedding the ids inside all of your shard messages, or you can wrap them inside an envelope. Cluster sharding will extract the information it needs by using a message extractor. We will discuss this later in the tutorial.
Sharded Actor
Sharded actors are usually persisted using Akka.Persistence so that they can be restored after being terminated, but a regular ReceiveActor would also work. In this example, we will be using a regular ReceiveActor for brevity. For a distributed data backed cluster sharding example, please see this example in the GitHub repository.
public class Customer : ReceiveActor
{
    /// <summary>
    /// Marker interface used for grouping all Customer-entity messages
    /// </summary>
    public interface ICustomerCommand
    {
    }

    public sealed class PurchaseItem : ICustomerCommand
    {
        public readonly string ItemName;

        public PurchaseItem(string itemName)
        {
            ItemName = itemName;
        }
    }

    private readonly List<string> _purchasedItems = new();
    private readonly IActorRef _consumerController; // used to guarantee reliable delivery of messages

    public Customer(string persistenceId, IActorRef consumerController)
    {
        _consumerController = consumerController;
        Receive<ConsumerController.Delivery<ICustomerCommand>>(purchase =>
        {
            if (purchase.Message is PurchaseItem p)
            {
                _purchasedItems.Add(p.ItemName);
                var name = Uri.UnescapeDataString(Self.Path.Name);
                Console.WriteLine(
                    @$"'{name}' purchased '{p.ItemName}'.
All items: [{string.Join(", ", _purchasedItems)}]
--------------------------");
            }
            else
            {
                // unsupported message type
                Unhandled(purchase.Message);
            }

            purchase.ConfirmTo.Tell(ConsumerController.Confirmed.Instance);
        });
    }

    protected override void PreStart()
    {
        // signal that we're ready to consume messages
        _consumerController.Tell(new ConsumerController.Start<ICustomerCommand>(Self));
    }
}
Message Envelope and Message Extractor
The shard coordinator needs to know which shard and entity it should send the messages to; we do that by embedding the entity and shard id information in the message itself, or inside the envelope we send the message in. The shard coordinator will then use the message extractor to extract the shard and entity id from the envelope.

To be recognized as a message extractor, the class needs to implement the IMessageExtractor interface. In this example, we will use the built-in HashCodeMessageExtractor; this extractor will derive the shard id by applying the murmur hash algorithm to the entity id, so we don't need to create our own. Since this tutorial sends messages through the ShardingProducerController, the extractor matches on the ShardingEnvelope type that it emits (a sketch):
public sealed class MessageExtractor : HashCodeMessageExtractor
{
    public MessageExtractor(int maxNumberOfShards) : base(maxNumberOfShards)
    {
    }

    public override string? EntityId(object message)
        => message switch
        {
            // the reliable delivery producer wraps outgoing messages
            // in a ShardingEnvelope that carries the entity id
            ShardingEnvelope e => e.EntityId,
            _ => null
        };

    public override object EntityMessage(object message)
        => message switch
        {
            ShardingEnvelope e => e.Message,
            _ => message
        };
}
Migrating to Different Sharding State Storage Modes
After you've gone live with Akka.Cluster.Sharding, one day you might decide it'd be better to migrate from state-store-mode=persistence to state-store-mode=ddata, as the latter is more performant and resilient, plus the former (persistence) will eventually be deprecated.
Migrating between storage modes requires a full restart of your Akka.Cluster as it's a significant, far-reaching change. You can see a demonstration of how to perform this upgrade in our "Akka NET v1.5 New Features and Upgrade Guide" video beginning at 12:53.