    Akka.Cluster.Sharding

    Cluster sharding is useful when you want to communicate with actors in the cluster using their logical IDs, without having to care about their physical location inside the cluster or manage their creation. It can also re-balance them as nodes join or leave the cluster.

    Important

    Interested in upgrading an Akka.NET v1.4 Cluster.Sharding application to v1.5? Please read our Akka.Cluster.Sharding v1.5 migration guide.

    Cluster sharding can operate in 2 modes, configured via akka.cluster.sharding.state-store-mode HOCON configuration:

    1. persistence (default) depends on the Akka.Persistence module. To use it, you'll need to specify an event journal accessible by all of the participating nodes. Information about shard placement is stored by a persistent cluster singleton actor known as the coordinator. To guarantee consistent state between different incarnations, the coordinator stores its own state using Akka.Persistence event journals. This setting is being deprecated after 1.5; please move to state-store-mode=ddata for all new and existing applications.
    2. ddata depends on the Akka.DistributedData module. It uses Conflict-free Replicated Data Types (CRDTs) to ensure eventually consistent shard placement and global availability via node-to-node replication and automatic conflict resolution. In this mode, event journals don't have to be configured.
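
    As a minimal sketch, selecting the ddata mode described above only requires the setting named in the text. The comment is an explanatory addition, and the rest of your cluster configuration is assumed to exist elsewhere:

    ```hocon
    akka.cluster.sharding {
      # store shard placement via Akka.DistributedData; no event journal needed
      state-store-mode = ddata
    }
    ```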

    Cluster sharding can be active only on nodes with Up status, i.e. nodes that are fully recognized and acknowledged by every other node in the cluster.

    Quick Start

    Actors managed by cluster sharding are called entities and can be automatically distributed across multiple nodes inside the cluster. An entity instance can live on only one node at a time, and you can communicate with it via the ShardRegion without needing to know its exact node location.

    Example:

    // define the envelope used for message routing
    public sealed class ShardEnvelope
    {
        public readonly int ShardId;
        public readonly int EntityId;
        public readonly object Message;
    
        ...
    }
    
    // define how the shard id, entity id, and the message itself should be resolved
    public sealed class MessageExtractor : IMessageExtractor
    {
        public string EntityId(object message) => (message as ShardEnvelope)?.EntityId.ToString();
    
        public string ShardId(object message) => (message as ShardEnvelope)?.ShardId.ToString();
    
        public object EntityMessage(object message) => (message as ShardEnvelope)?.Message;
    }
    
    // register actor type as a sharded entity
    var region = await ClusterSharding.Get(system).StartAsync(
        typeName: "my-actor",
        entityPropsFactory: s => Props.Create(() => new MyActor(s)),
        settings: ClusterShardingSettings.Create(system),
        messageExtractor: new MessageExtractor());
    
    // send message to entity through shard region
    region.Tell(new ShardEnvelope(shardId: 1, entityId: 1, message: "hello"));
    

    In this example, we first specify how to resolve message recipients in the context of sharded entities. For this, a specialized message type called ShardEnvelope and a resolution strategy called MessageExtractor have been defined. That part can be customized and shared among many different shard regions, but it needs to be uniform across all nodes.

    The second part of the example registers a custom actor type as a sharded entity using the ClusterSharding.Start or ClusterSharding.StartAsync methods. The result is an IActorRef to the shard region, used to communicate between the current actor system and the target entities. A shard region must be started once per entity type on each node that is expected to host entities of that type. Keep in mind that it's recommended to wait for the current node to fully join the cluster before initializing shard regions, in order to avoid potential timeouts.

    N.B. Sharded entity actors are automatically created by the Akka.Cluster.Sharding guardian actor hierarchy, which is why they live under the /system portion of the actor hierarchy. This is intentional: in the event of an ActorSystem termination, the /user side of the actor hierarchy is always terminated before the /system actors are.

    Therefore, this design gives the sharding system a chance to hand over all of the sharded entity actors running on the terminating node over to the other remaining nodes in the cluster.

    In some cases, the actor may need to know the entityId associated with it. This can be achieved using the entityPropsFactory parameter to ClusterSharding.Start or ClusterSharding.StartAsync. The entity ID will be passed to the factory as a parameter, which can then be used in the creation of the actor.

    If you want to send messages to entities from a specific node, but don't want that node to participate in sharding itself, you can use a ShardRegionProxy.

    Example:

    var proxy = ClusterSharding.Get(system).StartProxy(
        typeName: "my-actor",
        role: null,
        messageExtractor: new MessageExtractor());
    

    Shards

    Entities are located and managed automatically. They can also be recreated on other nodes as new nodes join the cluster or old ones leave it. This process is called re-balancing, and for performance reasons it never operates on a single entity. Instead, all entities are organized and managed in so-called shards.

    As you may have seen in the examples above, the shard resolution algorithm is one of the choices you have to make. Good uniform distribution is not an easy task: too few shards may result in an uneven distribution of entities across nodes, while too many may increase message routing latency and re-balancing overhead. As a rule of thumb, you may decide to have a number of shards ten times greater than the expected maximum number of cluster nodes.
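
    To make the rule of thumb concrete, here is a small framework-free sketch that derives a shard count from the expected node count and maps entity ids to shards with a stable hash. ShardMath and its members are hypothetical names for this illustration, and FNV-1a is used only because it is short and deterministic across processes; it is a stand-in, not the hash Akka.NET uses internally.

    ```csharp
    using System;
    using System.Collections.Generic;
    using System.Linq;
    using System.Text;

    public static class ShardMath
    {
        // Rule of thumb from the text: ten shards per expected cluster node.
        public static int SuggestedShardCount(int maxExpectedNodes) => maxExpectedNodes * 10;

        // FNV-1a (32-bit): a stable stand-in hash, an assumption of this sketch.
        public static uint StableHash(string value)
        {
            unchecked
            {
                uint hash = 2166136261;
                foreach (byte b in Encoding.UTF8.GetBytes(value))
                {
                    hash ^= b;
                    hash *= 16777619;
                }
                return hash;
            }
        }

        // The same entity id always maps to the same shard for a fixed shard count.
        public static string ShardIdFor(string entityId, int numberOfShards)
            => (StableHash(entityId) % (uint)numberOfShards).ToString();

        // Counts how many of the given entities land in each shard.
        public static Dictionary<string, int> Distribution(IEnumerable<string> entityIds, int numberOfShards)
            => entityIds
                .GroupBy(id => ShardIdFor(id, numberOfShards))
                .ToDictionary(g => g.Key, g => g.Count());
    }
    ```

    Calling ShardMath.Distribution with a sample of your real entity ids and the suggested shard count gives a rough feel for how evenly they would spread before you commit to a shard count.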

    By default, re-balancing always moves shards from the nodes with the highest number of shards to the ones with the lowest. This can be customized by passing a custom implementation of the IShardAllocationStrategy interface to ClusterSharding.Start.

    Shard Re-Balancing

    In the shard re-balance process, the coordinator first notifies all ShardRegion actors that a handoff for a shard has started. That means they will start buffering incoming messages for that shard, in the same way as if the shard location is unknown. During the re-balance process, the coordinator will not answer any requests for the location of shards that are being rebalanced, i.e. local buffering will continue until the handoff is completed. The ShardRegion responsible for the rebalanced shard will stop all entities in that shard by sending the specified handoffStopMessage (default PoisonPill) to them. When all entities have been terminated, the ShardRegion owning the entities will acknowledge the handoff as completed to the coordinator. Thereafter the coordinator will reply to requests for the location of the shard, thereby allocating a new home for the shard, and then buffered messages in the ShardRegion actors are delivered to the new location; this means that the state of the entities is not transferred or migrated. If the state of the entities is important, it should be made persistent (durable) with Persistence, so that it can be recovered at the new location.

    The logic that decides which shards to re-balance is defined in a pluggable shard allocation strategy. The default implementation, LeastShardAllocationStrategy, allocates new shards to the ShardRegion (node) with the least number of previously allocated shards.
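
    The allocation rule described above can be illustrated with a small framework-free sketch. It does not use Akka.NET types: LeastShardsSketch, AllocateShard, and the tie-break by region name are hypothetical choices made for this illustration only, and the sketch covers only where a new shard goes, not which shards get re-balanced.

    ```csharp
    using System;
    using System.Collections.Generic;
    using System.Linq;

    public static class LeastShardsSketch
    {
        // currentAllocations maps a region (node) name to the shard ids it already hosts.
        // Returns the region that currently hosts the fewest shards.
        public static string AllocateShard(
            IReadOnlyDictionary<string, IReadOnlyList<string>> currentAllocations)
        {
            if (currentAllocations.Count == 0)
                throw new ArgumentException("At least one region is required.", nameof(currentAllocations));

            return currentAllocations
                .OrderBy(kv => kv.Value.Count)
                // deterministic tie-break by name; an assumption of this sketch
                .ThenBy(kv => kv.Key, StringComparer.Ordinal)
                .First()
                .Key;
        }
    }
    ```

    Given three regions hosting two, one, and three shards respectively, AllocateShard returns the region hosting one shard.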

    Intercepting Actor Shutdown During Shard Re-Balancing Handoff Using Custom Stop Message

    The default PoisonPill handoff message is perfectly fine for most shard implementations, where the shard entity actor does not need to do additional processing before being stopped. The default PoisonPill message, however, is handled automatically by the ActorCell, making it hard for the shard entity actor to intercept handoff stop events. For an actor to intercept such events, we need to provide a custom message type for cluster sharding to use.

    To illustrate this, let us assume that we have an actor that needs to perform an asynchronous operation before it can safely shut itself down. We will use a custom handoff message called Stop, declared as follows:

    public class Stop
    {
        public static readonly Stop Instance = new();
    
        private Stop() { }
    }
    

    We then tell the sharding system that we would like to use a custom handoff stop message by passing an instance of it into the cluster sharding Start() method:

    ClusterSharding.Get(system: Sys).Start(
        typeName: typeName,
        entityProps: Props.Create(() => new SlowStopShardedEntity()),
        settings: Settings.Value,
        messageExtractor: new MessageExtractor(),
        allocationStrategy: ShardAllocationStrategy.LeastShardAllocationStrategy(absoluteLimit: 2, relativeLimit: 1.0),
        handOffStopMessage: SlowStopShardedEntity.Stop.Instance); // This is the custom handoff message instance
    

    We can then intercept this custom message type inside the entity actor message handler and perform our operation before the actor stops:

    protected override bool Receive(object message)
    {
        switch (message)
        {
            case int id:
                Sender.Tell(id);
                return true;
            case Stop _:
                Timers.StartSingleTimer(ActualStop.Instance, ActualStop.Instance, TimeSpan.FromMilliseconds(50));
                return true;
            case ActualStop _:
                Context.Stop(Self);
                return true;
        }
        return false;
    }
    

    Reliable Delivery of Messages to Sharded Entity Actors

    If you are interested in ensuring that all messages are guaranteed to be delivered to your entity actors even across restarts, re-balancing operations, or crashes then please see "Reliable Delivery over Akka.Cluster.Sharding."

    Passivation

    To reduce memory consumption, you may decide to stop entities after some period of inactivity using Context.SetReceiveTimeout(timeout). To make cluster sharding aware of stopping entities, DON'T use Context.Stop(Self) on the entities, as this may result in losing messages. Instead, send a ShardRegion.Passivate message to the entity's Context.Parent (which is the shard itself in this case). This informs the shard to stop forwarding messages to the target entity and to buffer them instead until the entity has terminated. Once that happens, if any messages are still buffered, the entity will be reincarnated and the messages flushed to it automatically.

    Automatic Passivation

    The entities can be configured to be automatically passivated if they haven't received a message for a while using the akka.cluster.sharding.passivate-idle-entity-after setting, or by explicitly setting ClusterShardingSettings.PassivateIdleEntityAfter to a suitable time to keep the actor alive. Note that only messages sent through sharding are counted, so direct messages to the ActorRef of the actor or messages that it sends to itself are not counted as activity. Passivation can be disabled by setting akka.cluster.sharding.passivate-idle-entity-after = off. It is always disabled if Remembering Entities is enabled.
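
    As a minimal sketch, the automatic passivation setting named above could be configured as follows. The 120s value is only an illustrative choice, not a recommendation:

    ```hocon
    akka.cluster.sharding {
      # passivate entities that received no messages through sharding for 120 seconds
      passivate-idle-entity-after = 120s

      # alternatively, disable automatic passivation entirely:
      # passivate-idle-entity-after = off
    }
    ```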

    Remembering Entities

    By default, when a shard is re-balanced to another node, the entities it hosted before migration are NOT started immediately afterwards. Instead, they are recreated on demand as new messages arrive. This behavior can be changed with the akka.cluster.sharding.remember-entities = true setting. It instructs shards to keep their state between re-balances, which comes at extra cost due to the need to persist information about started and stopped entities. Additionally, the message extractor logic must be aware of the ShardRegion.StartEntity message:

    public sealed class ShardEnvelope
    {
        public readonly int EntityId;
        public readonly object Message;
    
        ...
    }
    
    public sealed class MessageExtractor : HashCodeMessageExtractor
    {
        public MessageExtractor() : base(maxNumberOfShards: 100) { }
    
        public override string EntityId(object message)
        {
            switch (message)
            {
                case ShardEnvelope e: return e.EntityId.ToString();
                case ShardRegion.StartEntity start: return start.EntityId;
                default: return null; // not a message this extractor understands
            }
        }

        public override object EntityMessage(object message) => (message as ShardEnvelope)?.Message ?? message;
    }
    

    Important

    Since Akka.NET v1.5.15, Akka.Cluster.Sharding automatically handles ShardRegion.StartEntity messages for you and will raise an analyzer warning AK2001 if you attempt to handle them in your message extractors.

    Using ShardRegion.StartEntity implies that you're able to infer a shard id from an entity id alone. For this reason, in the example above we changed the cluster sharding routing logic to use HashCodeMessageExtractor: in this variant, the shard id doesn't have to be provided explicitly, as it is computed from the hash of the entity id itself. Note the maxNumberOfShards parameter, which is the maximum number of shards allowed for this type of actor; this value must never change during the lifetime of a cluster.

    Remember Entities Store

    As of Akka.NET v1.5, there is now a dedicated setting for storing data about remembered entities:

    akka.cluster.sharding{
      state-store-mode = ddata
      remember-entities-store = eventsourced # or ddata
    }
    

    You don't need to configure this setting if you don't have remember-entities=on.

    Remember Entities Event Sourced Mode

    You can enable event sourced mode with:

    akka.cluster.sharding.remember-entities-store = eventsourced
    

    This mode uses persistence to store the active shards and active entities for each shard.

    By default, cluster sharding will use the journal and snapshot store plugins defined in akka.persistence.journal.plugin and akka.persistence.snapshot-store.plugin respectively; to change this behavior, you can use these settings:

    akka.cluster.sharding.journal-plugin-id = <plugin>
    akka.cluster.sharding.snapshot-plugin-id = <plugin>
    
    Important

    It's considered a good practice to have Akka.Cluster.Sharding store its state in a separate journal and snapshot store - that way, in the event that you need to purge all sharding data, this can be easily isolated in its own table.

    You can have Akka.Cluster.Sharding use its own separate journal and snapshot store via the following HOCON, for instance:

    akka.persistence {
        # default plugins
        journal {
            plugin = "akka.persistence.journal.mongodb"
            mongodb {
                # qualified type name of the MongoDb persistence journal actor
                class = "Akka.Persistence.MongoDb.Journal.MongoDbJournal, Akka.Persistence.MongoDb"
    
                # connection string used for database access
                connection-string = ""
                collection = "EventJournal"
                metadata-collection = "Metadata"
            }
    
            sharding {
                # qualified type name of the MongoDb persistence journal actor
                class = "Akka.Persistence.MongoDb.Journal.MongoDbJournal, Akka.Persistence.MongoDb"
    
                # connection string used for database access
                connection-string = ""
    
                # separate collections / tables for Akka.Cluster.Sharding
                collection = "EventJournalSharding"
                metadata-collection = "MetadataSharding"
            }
        }
    
        snapshot-store {
            plugin = "akka.persistence.snapshot-store.mongodb"
            mongodb {
                class = "Akka.Persistence.MongoDb.Snapshot.MongoDbSnapshotStore, Akka.Persistence.MongoDb"
    
                # connection string used for database access
                connection-string = ""
    
                collection = "SnapshotStore"
            }
    
            sharding {
                class = "Akka.Persistence.MongoDb.Snapshot.MongoDbSnapshotStore, Akka.Persistence.MongoDb"
    
                # connection string used for database access
                connection-string = ""
    
                collection = "SnapshotStoreSharding"
            }
        }
    }
    
    akka.cluster.sharding.journal-plugin-id = akka.persistence.journal.sharding
    akka.cluster.sharding.snapshot-plugin-id = akka.persistence.snapshot-store.sharding
    

    Remember Entities Distributed Data Mode

    It's recommended to use remember-entities-store=eventsourced as it's much faster and more scalable than ddata, but if you can't use Akka.Persistence for some reason you can still use DData.

    You can enable DData mode with this setting:

    akka.cluster.sharding.remember-entities-store = ddata
    

    To support restarting entities after a full cluster restart (non-rolling) the remember entities store is persisted to disk by distributed data. This can be disabled if not needed:

    akka.cluster.sharding.distributed-data.durable.keys = []
    

    Possible reasons for disabling remember entity storage are:

    • No requirement for remembering entities after a full cluster shutdown
    • Running in an environment without access to disk between restarts e.g. Kubernetes without persistent volumes

    For supporting remembered entities in an environment without disk storage but with access to a database, use persistence mode instead.

    Terminating Remembered Entities

    One complication that akka.cluster.sharding.remember-entities = true introduces is that your sharded entity actors can no longer be terminated through the normal Akka.NET channels, i.e. Context.Stop(Self), PoisonPill.Instance, and the like. This is because, as part of the remember-entities contract, the sharding system insists on keeping all remembered entities alive until explicitly told to stop.

    To terminate a remembered entity, the sharded entity actor needs to send a Passivate command to its parent actor in order to signal to the sharding system that we no longer need to remember this particular entity.

    protected override bool ReceiveCommand(object message)
    {
        switch (message)
        {
            case Increment _:
                Persist(new CounterChanged(1), UpdateState);
                return true;
            case Decrement _:
                Persist(new CounterChanged(-1), UpdateState);
                return true;
            case Get _:
                Sender.Tell(_count);
                return true;
            case ReceiveTimeout _:
                // send Passivate to parent (shard actor) to stop remembering this entity.
                // shard actor will send us back a `Stop.Instance` message
                // as our "shutdown" signal - at which point we can terminate normally.
                Context.Parent.Tell(new Passivate(Stop.Instance));
                return true;
            case Stop _:
                Context.Stop(Self);
                return true;
        }
        return false;
    }
    

    It is common to simply use Context.Parent.Tell(new Passivate(PoisonPill.Instance)); to passivate and shut down remembered-entity actors.

    To recreate a remembered entity actor after it has been passivated, all you have to do is send the ShardRegion actor a message containing the entity's EntityId again, just as you did to instantiate the actor the first time.

    Retrieving Sharding State

    You can inspect the current sharding stats by using the following messages:

    • On GetShardRegionState, the shard region will reply with ShardRegionState containing data about the shards living in the current actor system and which entities are alive on each of them.
    • On GetClusterShardingStats, the shard region will reply with ClusterShardingStats containing information about the shards living in the whole cluster and how many entities are alive in each of them.

    Querying for the Location of Specific Entities

    It's possible to query a ShardRegion or a ShardRegionProxy using a GetEntityLocation query:

    // creates an entity with entityId="1"
    await _shardRegion.Ask<int>(1, TimeSpan.FromSeconds(3));
    
    // determine where entity with "entityId=1" is located in cluster
    var q1 = await _shardRegion.Ask<EntityLocation>(new GetEntityLocation("1", TimeSpan.FromSeconds(1)));
    
    q1.EntityId.Should().Be("1");
    
    // have a valid ShardId
    q1.ShardId.Should().NotBeEmpty();
    
    // have valid address for node that will / would host entity
    q1.ShardRegion.Should().NotBe(Address.AllSystems); // has real address
    
    // if entity actor is alive, will retrieve a reference to it
    q1.EntityRef.HasValue.Should().BeTrue();
    

    A GetEntityLocation query will always return an EntityLocation response - even if the query could not be executed.

    Important

    One major caveat is that, in order for the GetEntityLocation query to execute, your IMessageExtractor or ShardExtractor delegate will need to support the ShardRegion.StartEntity message, just as it would in order to support remember-entities=on:

    private string ExtractShardId(object message)
    {
        switch (message)
        {
            case int i:
                return (i % 10 + 1).ToString();
            // must support ShardRegion.StartEntity in order for
            // GetEntityLocation to work properly
            case ShardRegion.StartEntity se:
                return (int.Parse(se.EntityId) % 10 + 1).ToString();
        }
    
        throw new NotSupportedException();
    }
    

    Integrating Cluster Sharding with Persistent Actors

    One of the most common scenarios for cluster sharding is combining it with event-sourced persistent actors from the Akka.Persistence module.

    Entity actors are instantiated automatically by Akka.Cluster.Sharding, but in order for persistent actors to recover and persist their state correctly they must be given a globally unique PersistenceId. This is most easily accomplished using the entityPropsFactory overload of the ClusterSharding.Start call used to create a new ShardRegion:

    // register actor type as a sharded entity
    var region = ClusterSharding.Get(system).Start(
        typeName: "aggregate",
        entityPropsFactory: s => Props.Create(() => new Aggregate(s)),
        settings: ClusterShardingSettings.Create(system),
        messageExtractor: new MessageExtractor());
    
    

    Given these values we can build consistent, unique PersistenceIds on the fly using the entityId (the expectation is that entity IDs are globally unique), as in the following example:

    public class Aggregate : PersistentActor
    {
        public override string PersistenceId { get; }
    
        // Passed in via entityPropsFactory via the ShardRegion
        public Aggregate(string persistentId)
        {
            PersistenceId = persistentId;
        }
    
        // rest of class
    }
    

    Cleaning Up Akka.Persistence Shard State

    In the normal operation of an Akka.NET cluster, the sharding system automatically terminates and re-balances Akka.Cluster.Sharding regions gracefully whenever an ActorSystem terminates.

    However, in the event that an ActorSystem is aborted as a result of a process / hardware failure it's possible that when using akka.cluster.sharding.state-store-mode=persistence leftover sharding data can still be present inside the Akka.Persistence journal and snapshot store - which will prevent the Akka.Cluster.Sharding system from recovering and starting up correctly the next time it's launched.

    This is a rare, but not impossible occurrence. In the event that this happens you'll need to purge the old Akka.Cluster.Sharding data before restarting the sharding system. You can purge this data automatically by using the Akka.Cluster.Sharding.RepairTool produced by Petabridge.

    Tutorial

    In this tutorial, we will build a very simple non-persisted shopping cart implementation using cluster sharding. All the code used in this tutorial can be found in the GitHub repository.

    For a distributed data backed persistent example, please see this example project instead.

    Setting Up the Roles

    In a sharded cluster, it is common for the shards to be assigned their own specialized role inside the cluster to distribute their actors in. In this tutorial we will have a single frontend node that will feed three backend nodes with purchasing data. Usually, these nodes will be separated into different specialized projects but in this example, we will roll them into a single project and control their roles using an environment variable.

    var role = Environment.GetEnvironmentVariable("IS_FRONTEND") == "true" 
        ? FrontEndRole : BackEndRole;
    
    var config = ConfigurationFactory.ParseString(@$"
            # We need to tell Akka to provide us cluster enabled actors
            akka.actor.provider = cluster
    
            # This tells Akka which role this node belongs to
            akka.cluster.roles=[{role}]
    
            # This tells Akka to wait for at least 4 nodes joining the cluster 
            # before signaling that it is up and ready
            akka.cluster.min-nr-of-members = 4")
        .BootstrapFromDocker();
    
    var system = ActorSystem.Create("shopping-cart", config);
    

    Starting Up Cluster Sharding

    Cluster sharding can be added by installing the Akka.Cluster.Sharding NuGet package, which already contains references to the other required packages. Note that the ClusterSharding.Get() call is very important, as it contains all the initialization code needed by cluster sharding to start. All nodes that participate in or interact with the sharded cluster need to initialize ClusterSharding.

    // Starts and initialize cluster sharding
    var sharding = Akka.Cluster.Sharding.ClusterSharding.Get(system);
    

    Starting the Sharding Coordinator Actors

    There are two types of sharding coordinator actors:

    • Regular coordinator: coordinates messages and instantiates sharded actors in their correct shard.
    • Proxy coordinator: only coordinates messages to the proper sharded actors. This coordinator actor is used on nodes that need to talk to the shard region but do not host any of the sharded actors.

    Note that you only need one of these coordinator actors to communicate with the actors inside the shard region; you don't need a proxy if you have already created a regular coordinator. We will use the proxy coordinator on the front end and the regular coordinator on the backend nodes.

    switch (role)
    {
        case FrontEndRole:
            // Depending on the role, we will start a shard or a shard proxy
            var shardRegionProxy = await sharding.StartProxyAsync(
                typeName: "customer",
                role: BackEndRole,
                messageExtractor: new MessageExtractor(10));
    
            // Register a callback for the "cluster is up" event
            var cluster = Cluster.Get(system);
            cluster.RegisterOnMemberUp(() =>
            {
                ProduceMessages(system, shardRegionProxy);
            });
            break;
        
        case BackEndRole:
            // <LaunchShardRegion>
            // Depending on the role, we will start a shard or a shard proxy
            await sharding.StartAsync(
                typeName: "customer",
                entityPropsFactory: e => 
                    // ShardingConsumerController guarantees processing of messages,
                    // even across process restarts / shutdowns or shard rebalancing
                    ShardingConsumerController.Create<Customer.ICustomerCommand>( 
                        c => Props.Create(() => new Customer(e,c)), 
                        ShardingConsumerController.Settings.Create(system)),
                // .WithRole is important because we're dedicating a specific node role for
                // the actors to be instantiated in; in this case, we're instantiating only
                // in the "backend" roled nodes.
                settings: ClusterShardingSettings.Create(system).WithRole(BackEndRole), 
                messageExtractor: new MessageExtractor(10));
            // </LaunchShardRegion>
            break;
    }
    

    Sending Messages To the Sharded Actors

    Finally we can start sending messages from the front end node to the sharded actors in the back end through the proxy coordinator actor.

    private static void ProduceMessages(ActorSystem system, IActorRef shardRegionProxy)
    {
        var producerId = "ProducerId1" + MurmurHash.StringHash(Cluster.Get(system).SelfAddress.ToString());
        
        var shardingProducerController = system.ActorOf(ShardingProducerController.Create<Customer.ICustomerCommand>(
            producerId,
            shardRegionProxy,
            Option<Props>.None, 
            ShardingProducerController.Settings.Create(system)), "shardingProducerController-1");
    
        var producer = system.ActorOf(Props.Create(() => new Producer()), "msg-producer");
        shardingProducerController.Tell(new ShardingProducerController.Start<Customer.ICustomerCommand>(producer));
    }
    

    Note that the message needs to contain the entity and shard id information so that cluster sharding knows which shard and actor to send it to. You can do this by directly embedding the ids inside all of your shard messages, or you can wrap them inside an envelope. Cluster sharding will extract the information it needs by using a message extractor. We will discuss this later in the tutorial.

    Sharded Actor

    Sharded actors are usually persisted using Akka.Persistence so that they can be restored after being terminated, but a regular ReceiveActor also works. In this example, we will be using a regular ReceiveActor for brevity. For a distributed data backed cluster sharding example, please see this example in the GitHub repository.

    
        public class Customer : ReceiveActor
        {
            // <MessageProtocol>
            /// <summary>
            /// Marker interface used for grouping all Customer-entity messages
            /// </summary>
            public interface ICustomerCommand
            {
            }
    
            public sealed class PurchaseItem : ICustomerCommand
            {
                public readonly string ItemName;
    
                public PurchaseItem(string itemName)
                {
                    ItemName = itemName;
                }
            }
            // </MessageProtocol>
    
            private readonly List<string> _purchasedItems = new();
            private readonly IActorRef _consumerController; // use to guarantee reliable delivery of messages
    
            public Customer(string persistenceId, IActorRef consumerController)
            {
                _consumerController = consumerController;
                // <Delivery>
                Receive<ConsumerController.Delivery<ICustomerCommand>>(purchase =>
                {
                    if (purchase.Message is PurchaseItem p)
                    {
                        _purchasedItems.Add(p.ItemName);
                        var name = Uri.UnescapeDataString(Self.Path.Name);
                        Console.WriteLine(
                            @$"'{name}' purchased '{p.ItemName}'.
    All items: [{string.Join(", ", _purchasedItems)}]
    --------------------------");
                    }
                    else
                    {
                        // unsupported message type
                        Unhandled(purchase.Message);
                    }
                    
                    purchase.ConfirmTo.Tell(ConsumerController.Confirmed.Instance);
                });
                // </Delivery>
            }
    
            // <ShardingConsumerRegistration>
            protected override void PreStart()
            {
                // signal that we're ready to consume messages
                _consumerController.Tell(new ConsumerController.Start<ICustomerCommand>(Self));
            }
            // </ShardingConsumerRegistration>
        }
    
    

    Message Envelope and Message Extractor

    The shard coordinator needs to know which shard and entity to send each message to. We do that by embedding the entity and shard id information in the message itself, or in the envelope we send the message in. The shard coordinator then uses the message extractor to extract the shard and entity id from the envelope.

    To be recognized as a message extractor, the class needs to implement the IMessageExtractor interface. In this example, we will use the built-in HashCodeMessageExtractor; this extractor derives the shard id by applying the MurmurHash algorithm to the entity id, so we don't need to create our own.

    
    public sealed class MessageExtractor : HashCodeMessageExtractor
    {
        public MessageExtractor(int maxNumberOfShards) : base(maxNumberOfShards)
        {
        }
    
        public override string? EntityId(object message)
            => message switch
            {
                _ => null
            };
    
        public override object EntityMessage(object message)
            => message switch
            {
                _ => message
            };
    }
    

    Migrating to Different Sharding State Storage Modes

    After you've gone live with Akka.Cluster.Sharding, one day you might decide it'd be better to migrate from state-store-mode=persistence to state-store-mode=ddata as the latter is more performant and resilient, plus the former (persistence) will be deprecated eventually.

    Migrating between storage modes requires a full restart of your Akka.Cluster as it's a significant, far-reaching change. You can see a demonstration of how to perform this upgrade in our "Akka NET v1.5 New Features and Upgrade Guide" video beginning at 12:53.
