Persistence FSM
PersistentFSM handles incoming messages in an FSM-like fashion. Its internal state is persisted as a sequence of changes, referred to below as domain events. The relationship between incoming messages, the FSM's states and transitions, and the persistence of domain events is defined by a DSL.
A Simple Example
To demonstrate the features of the PersistentFSM class, consider an actor that represents a web store customer. The contract of our "WebStoreCustomerFSMActor" is that it accepts the following commands:
public interface ICommand { }

public class AddItem : ICommand
{
    public AddItem(Item item)
    {
        Item = item;
    }

    public Item Item { get; set; }
}

public class Buy : ICommand
{
    public static Buy Instance { get; } = new();
    private Buy() { }
}

public class Leave : ICommand
{
    public static Leave Instance { get; } = new();
    private Leave() { }
}

public class GetCurrentCart : ICommand
{
    public static GetCurrentCart Instance { get; } = new();
    private GetCurrentCart() { }
}
AddItem - sent when the customer adds an item to the shopping cart
Buy - sent when the customer finishes the purchase
Leave - sent when the customer leaves the store without purchasing anything
GetCurrentCart - queries the current state of the customer's shopping cart
The customer can be in one of the following states:
public interface IUserState : Akka.Persistence.Fsm.PersistentFSM.IFsmState { }

public class LookingAround : IUserState
{
    public static LookingAround Instance { get; } = new();
    private LookingAround() { }
    public string Identifier => "Looking Around";
}

public class Shopping : IUserState
{
    public static Shopping Instance { get; } = new();
    private Shopping() { }
    public string Identifier => "Shopping";
}

public class Inactive : IUserState
{
    public static Inactive Instance { get; } = new();
    private Inactive() { }
    public string Identifier => "Inactive";
}

public class Paid : IUserState
{
    public static Paid Instance { get; } = new();
    private Paid() { }
    public string Identifier => "Paid";
}
LookingAround - the customer is browsing the site but hasn't added anything to the shopping cart
Shopping - the customer has recently added items to the shopping cart
Inactive - the customer has items in the shopping cart but hasn't added anything recently
Paid - the customer has purchased the items
Note
PersistentFSM states must implement the PersistentFSM.IFsmState interface and its string Identifier property. This is required in order to simplify the serialization of FSM states. String identifiers should be unique!
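One way to see why the identifiers should be unique is to picture an identifier-to-state lookup, which is what any string-based serialization scheme needs in order to find a state again. The sketch below is only an illustration under that assumption: ISketchState, SketchState and StateLookupSketch are hypothetical stand-ins declared here so the code compiles on its own, and they are not part of the Akka.Persistence API.

```csharp
using System;
using System.Collections.Generic;
using System.Linq;

// Stand-in for PersistentFSM.IFsmState, declared here so the sketch is self-contained.
public interface ISketchState
{
    string Identifier { get; }
}

// Hypothetical state type; the record's generated property implements Identifier.
public sealed record SketchState(string Identifier) : ISketchState;

public static class StateLookupSketch
{
    // Builds an identifier -> state lookup.
    // Enumerable.ToDictionary throws ArgumentException when two states share an identifier.
    public static IReadOnlyDictionary<string, ISketchState> Build(IEnumerable<ISketchState> states) =>
        states.ToDictionary(state => state.Identifier);
}
```

Calling StateLookupSketch.Build with two states that both report "Paid" fails immediately, which is the kind of ambiguity unique identifiers avoid.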
The customer's actions are "recorded" as a sequence of "domain events", which are persisted. Those events are replayed when the actor starts in order to restore the customer's latest state:
public interface IDomainEvent { }

public class ItemAdded : IDomainEvent
{
    public ItemAdded(Item item)
    {
        Item = item;
    }

    public Item Item { get; set; }
}

public class OrderExecuted : IDomainEvent
{
    public static OrderExecuted Instance { get; } = new();
    private OrderExecuted() { }
}

public class OrderDiscarded : IDomainEvent
{
    public static OrderDiscarded Instance { get; } = new();
    private OrderDiscarded() { }
}
Customer state data represents the items in a customer's shopping cart:
using System.Collections.Immutable; // provides ImmutableList<T>

public class Item
{
    public Item(string id, string name, double price)
    {
        Id = id;
        Name = name;
        Price = price;
    }

    public string Id { get; }
    public string Name { get; }
    public double Price { get; }
}

// The cart is immutable: every operation returns a new cart instance.
public interface IShoppingCart
{
    IShoppingCart AddItem(Item item);
    IShoppingCart Empty();
}

public class EmptyShoppingCart : IShoppingCart
{
    public IShoppingCart AddItem(Item item)
    {
        return new NonEmptyShoppingCart(ImmutableList.Create(item));
    }

    public IShoppingCart Empty()
    {
        return this;
    }
}

public class NonEmptyShoppingCart : IShoppingCart
{
    public NonEmptyShoppingCart(ImmutableList<Item> items)
    {
        Items = items;
    }

    public IShoppingCart AddItem(Item item)
    {
        return new NonEmptyShoppingCart(Items.Add(item));
    }

    public IShoppingCart Empty()
    {
        return new EmptyShoppingCart();
    }

    public ImmutableList<Item> Items { get; }
}
Side-effects:
public interface IReportEvent { }

public class PurchaseWasMade : IReportEvent
{
    public PurchaseWasMade(IEnumerable<Item> items)
    {
        Items = items;
    }

    public IEnumerable<Item> Items { get; }
}

public class ShoppingCardDiscarded : IReportEvent
{
    public static ShoppingCardDiscarded Instance { get; } = new();
    private ShoppingCardDiscarded() { }
}
Here is how everything is wired together:
StartWith(LookingAround.Instance, new EmptyShoppingCart());

When(LookingAround.Instance, (evt, _) =>
{
    if (evt.FsmEvent is AddItem addItem)
    {
        return GoTo(Shopping.Instance)
            .Applying(new ItemAdded(addItem.Item))
            .ForMax(TimeSpan.FromSeconds(1));
    }
    else if (evt.FsmEvent is GetCurrentCart)
    {
        return Stay().Replying(evt.StateData);
    }

    return Stay();
});

When(Shopping.Instance, (evt, _) =>
{
    if (evt.FsmEvent is AddItem addItem)
    {
        // Stay in Shopping and restart the one-second inactivity timeout.
        return Stay()
            .Applying(new ItemAdded(addItem.Item))
            .ForMax(TimeSpan.FromSeconds(1));
    }
    else if (evt.FsmEvent is Buy)
    {
        return GoTo(Paid.Instance).Applying(OrderExecuted.Instance)
            .AndThen(cart =>
            {
                // Report the purchase only when there is something in the cart.
                if (cart is NonEmptyShoppingCart nonEmptyCart)
                {
                    reportActor.Tell(new PurchaseWasMade(nonEmptyCart.Items));
                    SaveStateSnapshot();
                }
                else if (cart is EmptyShoppingCart)
                {
                    SaveStateSnapshot();
                }
            });
    }
    else if (evt.FsmEvent is Leave)
    {
        return Stop().Applying(OrderDiscarded.Instance)
            .AndThen(_ =>
            {
                reportActor.Tell(ShoppingCardDiscarded.Instance);
                SaveStateSnapshot();
            });
    }
    else if (evt.FsmEvent is GetCurrentCart)
    {
        return Stay().Replying(evt.StateData);
    }
    else if (evt.FsmEvent is FSMBase.StateTimeout)
    {
        // No activity for one second: the customer becomes Inactive.
        return GoTo(Inactive.Instance).ForMax(TimeSpan.FromSeconds(2));
    }

    return Stay();
});

When(Inactive.Instance, (evt, _) =>
{
    if (evt.FsmEvent is AddItem addItem)
    {
        return GoTo(Shopping.Instance)
            .Applying(new ItemAdded(addItem.Item))
            .ForMax(TimeSpan.FromSeconds(1));
    }
    else if (evt.FsmEvent is FSMBase.StateTimeout)
    {
        return Stop()
            .Applying(OrderDiscarded.Instance)
            .AndThen(_ => reportActor.Tell(ShoppingCardDiscarded.Instance));
    }

    return Stay();
});

When(Paid.Instance, (evt, _) =>
{
    if (evt.FsmEvent is Leave)
    {
        return Stop();
    }
    else if (evt.FsmEvent is GetCurrentCart)
    {
        return Stay().Replying(evt.StateData);
    }

    return Stay();
});
Note
State data can only be modified directly on initialization. After that, it is modified only as a result of applying domain events. Override the ApplyEvent method to define how state data is affected by domain events, as in the example below:
protected override IShoppingCart ApplyEvent(IDomainEvent evt, IShoppingCart cartBeforeEvent)
{
    switch (evt)
    {
        case ItemAdded itemAdded: return cartBeforeEvent.AddItem(itemAdded.Item);
        case OrderExecuted _: return cartBeforeEvent;
        case OrderDiscarded _: return cartBeforeEvent.Empty();
        default: return cartBeforeEvent;
    }
}
AndThen can be used to define actions that are executed after the event has been persisted, which is convenient for "side effects" such as sending a message or logging. Note that actions defined in an AndThen block are not executed on recovery:
GoTo(Paid.Instance).Applying(OrderExecuted.Instance).AndThen(cart =>
{
    if (cart is NonEmptyShoppingCart nonEmptyCart)
    {
        reportActor.Tell(new PurchaseWasMade(nonEmptyCart.Items));
    }
});
A snapshot of state data can be persisted by calling the SaveStateSnapshot() method:
Stop().Applying(OrderDiscarded.Instance).AndThen(cart =>
{
    reportActor.Tell(ShoppingCardDiscarded.Instance);
    SaveStateSnapshot();
});
On recovery, state data is initialized from the latest available snapshot; the remaining domain events are then replayed, triggering the ApplyEvent method.
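The recovery order described above can be sketched without Akka at all: start from the snapshot data (or from the initial data when no snapshot exists) and fold the remaining events through a function that plays the role of ApplyEvent. This is only a model of the idea, since PersistentFSM performs these steps internally. All names below (ISketchEvent, ItemAddedSketch, OrderDiscardedSketch, RecoverySketch) are hypothetical stand-ins, and the cart is reduced to a list of item ids.

```csharp
using System;
using System.Collections.Generic;
using System.Collections.Immutable;
using System.Linq;

// Simplified stand-ins for the domain events used in this article.
public interface ISketchEvent { }
public sealed record ItemAddedSketch(string ItemId) : ISketchEvent;
public sealed record OrderDiscardedSketch : ISketchEvent;

public static class RecoverySketch
{
    // Same role as ApplyEvent: a pure function from (event, old data) to new data.
    public static ImmutableList<string> Apply(ISketchEvent evt, ImmutableList<string> cart) =>
        evt switch
        {
            ItemAddedSketch added => cart.Add(added.ItemId),
            OrderDiscardedSketch => ImmutableList<string>.Empty,
            _ => cart
        };

    // Start from the snapshot data (pass the initial data when there is no snapshot),
    // then replay only the events persisted after that snapshot, in order.
    public static ImmutableList<string> Recover(
        ImmutableList<string> snapshotOrInitialData,
        IEnumerable<ISketchEvent> eventsAfterSnapshot) =>
        eventsAfterSnapshot.Aggregate(
            snapshotOrInitialData,
            (cart, evt) => Apply(evt, cart));
}
```

Because the fold only calls the event-applying function, nothing resembling an AndThen action runs during this replay, which mirrors the rule that side effects are skipped on recovery.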
Periodical Snapshot by Snapshot-After
You can enable periodic SaveStateSnapshot() calls in PersistentFSM by turning on the following flag in reference.conf:
akka.persistence.fsm.snapshot-after = 1000
This means SaveStateSnapshot() is called after the sequence number reaches a multiple of 1000.
Note
SaveStateSnapshot() might not be called exactly at sequence numbers that are multiples of the snapshot-after configuration value. This is because PersistentFSM works in a sort of "batch" mode when processing and persisting events, and SaveStateSnapshot() is called only at the end of the "batch". For example, if you set akka.persistence.fsm.snapshot-after = 1000, it is possible that SaveStateSnapshot() is called at lastSequenceNr = 1005, 2003, ... A single batch might persist a state transition, and it can also contain multiple domain events if you pass them to the Applying method in the PersistentFSM DSL.
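The effect of batching on snapshot positions can be illustrated with a toy model. The sketch below is not the library's implementation: SnapshotAfterSketch.SnapshotSequenceNumbers is a hypothetical helper that assumes each persisted event in a batch advances the sequence number by one, and that a snapshot is taken at the end of any batch in which some sequence number was a multiple of snapshot-after.

```csharp
using System.Collections.Generic;

public static class SnapshotAfterSketch
{
    // Returns the sequence numbers at which this toy model would take a snapshot.
    // batchSizes: how many events are persisted together in each batch
    // (for example, two domain events plus one state transition gives 3).
    public static List<long> SnapshotSequenceNumbers(IEnumerable<int> batchSizes, long snapshotAfter)
    {
        var snapshots = new List<long>();
        long sequenceNr = 0;

        foreach (var batchSize in batchSizes)
        {
            var reachedMultiple = false;
            for (var i = 0; i < batchSize; i++)
            {
                sequenceNr++;
                if (sequenceNr % snapshotAfter == 0)
                {
                    reachedMultiple = true;
                }
            }

            // The snapshot is taken only once the whole batch has been handled,
            // so it is recorded at the last sequence number of the batch.
            if (reachedMultiple)
            {
                snapshots.Add(sequenceNr);
            }
        }

        return snapshots;
    }
}
```

With seven batches of three events each and a snapshot-after value of 10, this model records snapshots at sequence numbers 12 and 21 rather than 10 and 20, the same kind of drift as the 1005 and 2003 mentioned in the note.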