At-Least-Once Delivery (Obsolete)
Warning
AtLeastOnceDelivery actors in Akka.Persistence are being deprecated in favor of Akka.Delivery and Akka.Cluster.Sharding.Delivery. Please look at those articles for further details.
To send messages to destinations with at-least-once delivery semantics, base your PersistentActor on the sending side on the AtLeastOnceDelivery class (the example below derives from AtLeastOnceDeliveryReceiveActor). It takes care of re-sending messages that have not been confirmed within a configurable timeout.
The state of the sending actor, including which messages have been sent but not yet confirmed by the recipient, must be persistent so that it can survive a crash of the sending actor or the CLR. The AtLeastOnceDelivery class does not persist anything by itself. It is your responsibility to persist the intent to send a message and the fact that a confirmation has been received.
Members:
The Deliver method sends a message to another actor with at-least-once delivery semantics. A message sent this way must be confirmed by the other endpoint with the ConfirmDelivery method. Otherwise it will be resent again and again until the redelivery limit is reached.
The GetDeliverySnapshot and SetDeliverySnapshot methods are used as part of a delivery snapshotting strategy. They return and reset the state of the current guaranteed delivery actor's unconfirmed messages. To save custom deliverer state inside a snapshot, include the returned delivery snapshot in that snapshot and restore it in the ReceiveRecovery method when a SnapshotOffer arrives.
RedeliveryBurstLimit is a virtual property that determines the maximum number of unconfirmed messages to be sent in each redelivery attempt. It can help prevent message overflow scenarios. It may be overridden, or configured in HOCON under the akka.persistence.at-least-once-delivery.redelivery-burst-limit path (10 000 by default).
UnconfirmedDeliveryAttemptsToWarn is a virtual property that determines how many unconfirmed delivery attempts may be made before the guaranteed delivery actor sends an UnconfirmedWarning message to itself. The count is reset after the actor restarts. It may be overridden, or configured in HOCON under the akka.persistence.at-least-once-delivery.warn-after-number-of-unconfirmed-attempts path (5 by default).
MaxUnconfirmedMessages is a virtual property that determines the maximum number of unconfirmed deliveries to hold in memory. Once this threshold is exceeded, any call to Deliver raises MaxUnconfirmedMessagesExceededException. It may be overridden, or configured in HOCON under the akka.persistence.at-least-once-delivery.max-unconfirmed-messages path (100 000 by default).
The UnconfirmedCount property shows the number of unconfirmed messages.
Relationship Between Deliver and ConfirmDelivery
To send messages to the destination path, use the Deliver method after you have persisted the intent to send the message.
The destination actor must send back a confirmation message. When the sending actor receives this confirmation, persist the fact that the message was delivered successfully and then call the ConfirmDelivery method.
If the persistent actor is not currently recovering, the Deliver method sends the message to the destination actor. During recovery, messages are buffered until they have been confirmed using ConfirmDelivery. Once recovery has completed, any outstanding messages that were not confirmed during the message replay are resent by the persistent actor before it sends any other messages.
Deliver requires a deliveryMessageMapper function to pass the provided deliveryId into the message so that the correlation between Deliver and ConfirmDelivery is possible. The deliveryId must do the round trip. Upon receipt of the message, the destination actor sends the same deliveryId wrapped in a confirmation message back to the sender. The sender then uses it to call the ConfirmDelivery method to complete the delivery routine.
using Akka.Actor;
using Akka.Persistence;

// Message sent to the destination; carries the deliveryId for the round trip.
public class Msg
{
    public Msg(long deliveryId, string message)
    {
        DeliveryId = deliveryId;
        Message = message;
    }

    public long DeliveryId { get; }
    public string Message { get; }
}

// Confirmation sent back by the destination with the same deliveryId.
public class Confirm
{
    public Confirm(long deliveryId)
    {
        DeliveryId = deliveryId;
    }

    public long DeliveryId { get; }
}

public interface IEvent
{
}

// Persisted intent to send a message.
public class MsgSent : IEvent
{
    public MsgSent(string message)
    {
        Message = message;
    }

    public string Message { get; }
}

// Persisted fact that a delivery was confirmed.
public class MsgConfirmed : IEvent
{
    public MsgConfirmed(long deliveryId)
    {
        DeliveryId = deliveryId;
    }

    public long DeliveryId { get; }
}

public class ExampleAtLeastOnceDeliveryReceiveActor : AtLeastOnceDeliveryReceiveActor
{
    private readonly IActorRef _destinationActor = Context.ActorOf<ExampleDestinationAtLeastOnceDeliveryReceiveActor>();

    public ExampleAtLeastOnceDeliveryReceiveActor()
    {
        // Replaying events calls the same handlers and rebuilds the delivery state.
        Recover<MsgSent>(msgSent => Handler(msgSent));
        Recover<MsgConfirmed>(msgConfirmed => Handler(msgConfirmed));

        // Persist the intent first, then deliver in the persist callback.
        Command<string>(str =>
        {
            Persist(new MsgSent(str), Handler);
        });

        // Persist the confirmation first, then confirm in the persist callback.
        Command<Confirm>(confirm =>
        {
            Persist(new MsgConfirmed(confirm.DeliveryId), Handler);
        });
    }

    private void Handler(MsgSent msgSent)
    {
        Deliver(_destinationActor.Path, l => new Msg(l, msgSent.Message));
    }

    private void Handler(MsgConfirmed msgConfirmed)
    {
        ConfirmDelivery(msgConfirmed.DeliveryId);
    }

    public override string PersistenceId { get; } = "persistence-id";
}

public class ExampleDestinationAtLeastOnceDeliveryReceiveActor : ReceiveActor
{
    public ExampleDestinationAtLeastOnceDeliveryReceiveActor()
    {
        Receive<Msg>(msg =>
        {
            // Echo the deliveryId back so the sender can confirm the delivery.
            Sender.Tell(new Confirm(msg.DeliveryId), Self);
        });
    }
}
The deliveryId generated by the persistence module is a strictly monotonically increasing sequence number without gaps. The same sequence is used for all destinations of the actor, i.e. when sending to multiple destinations, each destination will see gaps in the sequence. It is not possible to use a custom deliveryId. However, you can send a custom correlation identifier in the message to the destination. You must then retain a mapping between the internal deliveryId (passed into the deliveryMessageMapper function) and your custom correlation id (passed into the message). You can do this by storing the mapping in a map from correlation id to deliveryId, from which you can retrieve the deliveryId to pass into the ConfirmDelivery method once the receiver of your message has replied with your custom correlation id.
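The correlation-id mapping described above could be sketched as follows. This is a minimal illustration, not part of the library: the Order and OrderAck types, the CorrelatingSender class, and its persistence id are hypothetical names, and for brevity the sketch persists the messages themselves as events. It relies on the deliveryMessageMapper being called from within Deliver, so the dictionary is rebuilt when events are replayed.

```csharp
using System.Collections.Generic;
using Akka.Actor;
using Akka.Persistence;

// Hypothetical message carrying a custom correlation id instead of the deliveryId.
public sealed class Order
{
    public Order(string correlationId, string payload)
    {
        CorrelationId = correlationId;
        Payload = payload;
    }

    public string CorrelationId { get; }
    public string Payload { get; }
}

// Hypothetical reply from the destination, echoing the correlation id.
public sealed class OrderAck
{
    public OrderAck(string correlationId)
    {
        CorrelationId = correlationId;
    }

    public string CorrelationId { get; }
}

public class CorrelatingSender : AtLeastOnceDeliveryReceiveActor
{
    // correlation id -> internal deliveryId of the still unconfirmed delivery
    private readonly Dictionary<string, long> _deliveryIds = new Dictionary<string, long>();
    private readonly ActorPath _destination;

    public CorrelatingSender(ActorPath destination)
    {
        _destination = destination;

        // Replay goes through the same handlers, which rebuilds the mapping.
        Recover<Order>(order => OnOrder(order));
        Recover<OrderAck>(ack => OnAck(ack));

        Command<Order>(order => Persist(order, OnOrder));
        Command<OrderAck>(ack => Persist(ack, OnAck));
    }

    public override string PersistenceId { get; } = "correlating-sender";

    private void OnOrder(Order order)
    {
        Deliver(_destination, deliveryId =>
        {
            // Remember which internal deliveryId belongs to this correlation id.
            _deliveryIds[order.CorrelationId] = deliveryId;
            return order;
        });
    }

    private void OnAck(OrderAck ack)
    {
        // Duplicate acknowledgements find no entry and are ignored.
        if (_deliveryIds.TryGetValue(ack.CorrelationId, out var deliveryId))
        {
            ConfirmDelivery(deliveryId);
            _deliveryIds.Remove(ack.CorrelationId);
        }
    }
}
```

The destination only needs to reply with `new OrderAck(order.CorrelationId)`; it never sees the internal deliveryId.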
The AtLeastOnceDeliveryReceiveActor class has state consisting of unconfirmed messages and a sequence number. It does not store this state itself. You must persist events corresponding to the Deliver and ConfirmDelivery invocations from your PersistentActor so that the state can be restored by calling the same methods during the recovery phase of the PersistentActor. Sometimes these events can be derived from other business-level events, and sometimes you must create separate events. During recovery, calls to Deliver do not send out messages; those messages are sent later if no matching ConfirmDelivery has been performed.
Support for snapshots is provided by GetDeliverySnapshot and SetDeliverySnapshot. The AtLeastOnceDeliverySnapshot contains the full delivery state, including unconfirmed messages. If you need a custom snapshot for other parts of the actor state, you must also include the AtLeastOnceDeliverySnapshot. It is serialized using protobuf with the ordinary Akka serialization mechanism. It is easiest to include the bytes of the AtLeastOnceDeliverySnapshot as a blob in your custom snapshot.
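As a rough sketch of the snapshot handling described above, the actor below stores its own counter together with the delivery state and restores both when a SnapshotOffer arrives. The SenderSnapshot, TakeSnapshot, and Envelope types and the SnapshottingSender class are hypothetical. The sketch keeps the AtLeastOnceDeliverySnapshot as an object inside the custom snapshot and assumes the configured serializer can handle that; embedding its serialized bytes, as suggested above, is the alternative. Confirmation handling is omitted to keep the example short.

```csharp
using Akka.Actor;
using Akka.Persistence;

// Hypothetical custom snapshot: the actor's own state plus the delivery state.
public sealed class SenderSnapshot
{
    public SenderSnapshot(int sentCount, AtLeastOnceDeliverySnapshot delivery)
    {
        SentCount = sentCount;
        Delivery = delivery;
    }

    public int SentCount { get; }
    public AtLeastOnceDeliverySnapshot Delivery { get; }
}

// Hypothetical command that asks the actor to take a snapshot.
public sealed class TakeSnapshot
{
}

// Hypothetical outgoing message carrying the deliveryId.
public sealed class Envelope
{
    public Envelope(long deliveryId, string text)
    {
        DeliveryId = deliveryId;
        Text = text;
    }

    public long DeliveryId { get; }
    public string Text { get; }
}

public class SnapshottingSender : AtLeastOnceDeliveryReceiveActor
{
    private readonly ActorPath _destination;
    private int _sentCount; // example of custom actor state

    public SnapshottingSender(ActorPath destination)
    {
        _destination = destination;

        Recover<SnapshotOffer>(offer =>
        {
            if (offer.Snapshot is SenderSnapshot snapshot)
            {
                // Restore the custom state and the unconfirmed deliveries together.
                _sentCount = snapshot.SentCount;
                SetDeliverySnapshot(snapshot.Delivery);
            }
        });
        Recover<string>(text => OnSent(text));

        Command<string>(text => Persist(text, OnSent));
        Command<TakeSnapshot>(_ =>
            SaveSnapshot(new SenderSnapshot(_sentCount, GetDeliverySnapshot())));
        Command<SaveSnapshotSuccess>(_ => { /* nothing to do in this sketch */ });
    }

    public override string PersistenceId { get; } = "snapshotting-sender";

    private void OnSent(string text)
    {
        _sentCount++;
        Deliver(_destination, deliveryId => new Envelope(deliveryId, text));
    }
}
```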
The interval between redelivery attempts is defined by the RedeliverInterval property. The default value can be configured with the akka.persistence.at-least-once-delivery.redeliver-interval configuration key. The property can be overridden by implementation classes to return non-default values.
The maximum number of messages that will be sent at each redelivery burst is defined by the RedeliveryBurstLimit property (burst frequency is half of the redelivery interval). If there are many unconfirmed messages (e.g. if the destination is unavailable for a long time), this helps to prevent an overwhelming number of messages from being sent at once. The default value can be configured with the akka.persistence.at-least-once-delivery.redelivery-burst-limit configuration key. The property can be overridden by implementation classes to return non-default values.
After a number of delivery attempts, an UnconfirmedWarning message will be sent to self. The re-sending will still continue, but you can choose to call ConfirmDelivery to cancel the re-sending. The number of delivery attempts before emitting the warning is defined by the WarnAfterNumberOfUnconfirmedAttempts property. The default value can be configured with the akka.persistence.at-least-once-delivery.warn-after-number-of-unconfirmed-attempts configuration key. The property can be overridden by implementation classes to return non-default values.
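One way to react to the warning is to give up on the affected deliveries. The sketch below persists that decision and then calls ConfirmDelivery to stop the re-sending. The Abandoned event, the Note message, and the GivingUpSender class are hypothetical, and the sketch assumes that UnconfirmedWarning exposes its entries through an UnconfirmedDeliveries collection whose items carry a DeliveryId. Whether abandoning a message is acceptable depends on your application; handling of regular confirmations is omitted for brevity.

```csharp
using Akka.Actor;
using Akka.Persistence;

// Hypothetical outgoing message carrying the deliveryId.
public sealed class Note
{
    public Note(long deliveryId, string text)
    {
        DeliveryId = deliveryId;
        Text = text;
    }

    public long DeliveryId { get; }
    public string Text { get; }
}

// Hypothetical event recording that the sender gave up on a delivery.
public sealed class Abandoned
{
    public Abandoned(long deliveryId)
    {
        DeliveryId = deliveryId;
    }

    public long DeliveryId { get; }
}

public class GivingUpSender : AtLeastOnceDeliveryReceiveActor
{
    private readonly ActorPath _destination;

    public GivingUpSender(ActorPath destination)
    {
        _destination = destination;

        Recover<string>(text => OnSent(text));
        Recover<Abandoned>(evt => OnAbandoned(evt));

        Command<string>(text => Persist(text, OnSent));

        Command<UnconfirmedWarning>(warning =>
        {
            foreach (var delivery in warning.UnconfirmedDeliveries)
            {
                // Persist the decision first so that it survives a restart.
                Persist(new Abandoned(delivery.DeliveryId), OnAbandoned);
            }
        });
    }

    public override string PersistenceId { get; } = "giving-up-sender";

    private void OnSent(string text)
    {
        Deliver(_destination, deliveryId => new Note(deliveryId, text));
    }

    private void OnAbandoned(Abandoned evt)
    {
        // Removes the message from the unconfirmed set, which cancels redelivery.
        ConfirmDelivery(evt.DeliveryId);
    }
}
```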
The AtLeastOnceDeliveryReceiveActor class holds messages in memory until their successful delivery has been confirmed. The maximum number of unconfirmed messages that the actor is allowed to hold in memory is defined by the MaxUnconfirmedMessages property. If this limit is exceeded, the Deliver method will not accept more messages and will throw MaxUnconfirmedMessagesExceededException. The default value can be configured with the akka.persistence.at-least-once-delivery.max-unconfirmed-messages configuration key. The property can be overridden by implementation classes to return non-default values.
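The configuration keys mentioned in this article can be set together when the actor system is created. The following is only a sketch: the values are illustrative and are not recommendations, and the system name "example" is arbitrary.

```csharp
using Akka.Actor;
using Akka.Configuration;

public static class Program
{
    public static void Main()
    {
        // Illustrative values only; each key is one of those described above.
        var config = ConfigurationFactory.ParseString(@"
            akka.persistence.at-least-once-delivery {
                redeliver-interval = 10s
                redelivery-burst-limit = 1000
                warn-after-number-of-unconfirmed-attempts = 3
                max-unconfirmed-messages = 50000
            }");

        // Actors created in this system pick up the settings above as their defaults.
        using (var system = ActorSystem.Create("example", config))
        {
            // Create your at-least-once delivery actors here.
        }
    }
}
```

Settings configured this way apply to every at-least-once delivery actor in the system.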