Distributed Locks with Akka.Coordination
General Definition
Akka.Coordination provides a generalized "distributed lock" implementation called a Lease that uses a unique resource identifier inside a backing store (such as Azure Blob Storage or Kubernetes Custom Resource Definitions) to allow only one current "holder" of the lease to perform an action at any given time.
Akka.NET uses leases internally inside Split Brain Resolver, Cluster.Sharding, and Cluster Singletons for this purpose - and in this document you can learn how to call and create leases in your own Akka.NET applications if needed.
Officially Supported Lease Implementations
There are currently two officially supported lease implementations:
- Akka.Coordination.KubernetesApi
- Akka.Coordination.Azure
All lease implementations in Akka.NET support automatic expiry or renewal mechanisms. Expiry ensures that leases do not remain active indefinitely, which prevents resource deadlock or starvation scenarios.
Key Characteristics and Components
- Lease Name: A unique identifier for the lease, which specifies the resource to be protected.
- Owner Name: A unique identifier for the entity (usually an actor or node) that is attempting to acquire the lease.
- Lease Timeout: May also be called "Time To Live" or TTL. A duration parameter that specifies how long the lease should last. Leases may be renewed or revoked depending on the implementation.
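The three properties above can be illustrated with a toy in-memory model. This is only a sketch of the general semantics, not how Akka.NET or any backing store implements leases; ToyLeaseStore, TryAcquire, and the lease and owner names used with it are hypothetical.

```csharp
using System;
using System.Collections.Generic;

// Toy in-memory model of lease semantics (hypothetical; NOT the Akka.NET implementation).
public sealed class ToyLeaseStore
{
    // Lease name -> current owner and expiry time.
    private readonly Dictionary<string, (string Owner, DateTimeOffset ExpiresAt)> _leases = new();
    private readonly TimeSpan _ttl;

    public ToyLeaseStore(TimeSpan ttl) => _ttl = ttl;

    // Grants the lease when it is free, expired, or already held by the same owner (renewal).
    public bool TryAcquire(string leaseName, string ownerName, DateTimeOffset now)
    {
        if (_leases.TryGetValue(leaseName, out var current)
            && current.Owner != ownerName
            && current.ExpiresAt > now)
        {
            return false; // another owner holds an unexpired lease
        }

        _leases[leaseName] = (ownerName, now + _ttl);
        return true;
    }

    // Only the current holder may release the lease.
    public bool Release(string leaseName, string ownerName)
    {
        if (_leases.TryGetValue(leaseName, out var current) && current.Owner == ownerName)
            return _leases.Remove(leaseName);
        return false;
    }
}
```

With a 10-second TTL, a second owner is refused while the first owner's lease is unexpired and is granted the lease once it has lapsed. Passing the current time in as a parameter keeps the model deterministic.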
Public API
The Akka.Coordination.Lease API provides the following methods:
- Task<bool> Acquire() and Task<bool> Acquire(Action<Exception> leaseLostCallback): These asynchronous methods attempt to acquire the lease for the resource. They return a Task<bool> indicating whether the acquisition was successful. The second overload takes a callback delegate that is invoked when a granted lease has been revoked for some reason.
- Task<bool> Release(): This asynchronous method releases the lease, relinquishing the access rights to the resource. It returns a Task<bool>, where true indicates a successful release. Releasing is important for ensuring that resources are freed up for other actors or nodes once a task is completed.
- bool CheckLease(): This method checks whether the lease is still valid and returns a bool. It is useful for verifying whether a lease has expired or been revoked, ensuring that processes do not operate under an invalid lease.
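A minimal sketch of how these methods might be combined outside of an actor is shown below. It assumes that configPath points at a lease implementation already configured in HOCON; LeaseUsageSketch, RunExclusiveAsync, the lease name "my-resource", and the owner name "worker-1" are hypothetical names chosen for illustration.

```csharp
using System;
using System.Threading.Tasks;
using Akka.Actor;
using Akka.Coordination;

public static class LeaseUsageSketch
{
    // Runs `work` only while holding the lease; returns false if the lease was not held.
    public static async Task<bool> RunExclusiveAsync(ActorSystem system, string configPath, Func<Task> work)
    {
        // Hypothetical lease and owner names; configPath is assumed to be a valid HOCON path.
        var lease = LeaseProvider.Get(system).GetLease(
            leaseName: "my-resource",
            configPath: configPath,
            ownerName: "worker-1");

        // The callback is invoked if a granted lease is later revoked.
        var acquired = await lease.Acquire(reason =>
            Console.WriteLine($"Lease lost: {reason?.Message}"));
        if (!acquired)
            return false;

        try
        {
            // Re-check validity immediately before doing the protected work.
            if (!lease.CheckLease())
                return false;

            await work();
            return true;
        }
        finally
        {
            // Always give the lease back so other owners can acquire it.
            await lease.Release();
        }
    }
}
```

The try/finally keeps the release on every exit path, including exceptions thrown by the protected work. Long-running work may want to call CheckLease() again at intermediate points.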
Example
The full code for this example can be seen inside the Akka.NET repo
Internal Messages
The actor using Lease will need a few boilerplate internal messages:
private sealed record LeaseAcquireResult(bool Acquired, Exception? Reason);
private sealed record LeaseLost(Exception Reason);
private sealed class LeaseRetryTick
{
    public static readonly LeaseRetryTick Instance = new();
    private LeaseRetryTick() { }
}
Obtaining Reference To Lease Implementation
To obtain a reference to the Lease implementation, you will need 4 things:
- Lease Name: A unique identifier for the lease, which specifies the resource to be protected.
- Owner Name: A unique identifier for the entity (usually an actor or node) that is attempting to acquire the lease.
- Configuration Path: A full HOCON configuration path containing the definition of the lease implementation.
- Retry Interval: How long to wait before retrying after a failed lease acquisition.
A Lease reference is then obtained by calling LeaseProvider.Get(Context.System).GetLease():
public LeaseActor(LeaseUsageSettings leaseSettings, string resourceId, string actorUniqueId)
{
    _resourceId = resourceId;
    _uniqueId = actorUniqueId;
    _lease = LeaseProvider.Get(Context.System).GetLease(
        leaseName: _resourceId,
        configPath: leaseSettings.LeaseImplementation,
        ownerName: _uniqueId);
    _leaseRetryInterval = leaseSettings.LeaseRetryInterval;
    _log = Context.GetLogger();
}
Actor States
The actor leverages actor states to separate the lease acquisition and actual working state of the actor.
- AcquiringLease State: In this state, the actor will only handle the required internal messages related to lease acquisition. Any other messages not related to lease acquisition will be stashed until the lease is acquired/granted. The actor will automatically retry lease acquisition by calling AcquireLease() on a regular basis if it failed to acquire a lease.
- Active State: In this state, the actor is active and is allowed to process all received messages normally. The only lease-related message being processed is the LeaseLost internal message, which signals lease revocation.
In the event of a lease revocation, the actor forcefully shuts itself down to prevent resource contention. This may be modified to suit user needs.
private void AcquiringLease()
{
    Receive<LeaseAcquireResult>(lar =>
    {
        if (lar.Acquired)
        {
            _log.Debug("{0}: Lease acquired", _resourceId);
            Stash.UnstashAll();
            Become(Active);
        }
        else
        {
            _log.Error(lar.Reason, "{0}: Failed to get lease for unique Id [{1}]. Retry in {2}",
                _resourceId, _uniqueId, _leaseRetryInterval);
            Timers.StartSingleTimer(LeaseRetryTimer, LeaseRetryTick.Instance, _leaseRetryInterval);
        }
    });

    Receive<LeaseRetryTick>(_ => AcquireLease());
    Receive<LeaseLost>(HandleLeaseLost);

    // Stash everything else until the lease is granted
    ReceiveAny(msg =>
    {
        _log.Debug("{0}: Got msg of type [{1}] from [{2}] while waiting for lease, stashing",
            _resourceId, msg.GetType().Name, Sender);
        Stash.Stash();
    });
}

private void Active()
{
    Receive<LeaseLost>(HandleLeaseLost);
    // TODO: Insert your actor message handlers here
    ReceiveAny(msg => Sender.Tell(msg, Self));
}

private void HandleLeaseLost(LeaseLost msg)
{
    _log.Error(msg.Reason, "{0}: unique id [{1}] lease lost", _resourceId, _uniqueId);
    Context.Stop(Self);
}

private void AcquireLease()
{
    _log.Info("{0}: Acquiring lease {1}", _resourceId, _lease.Settings);
    // Capture Self so it can be used safely from the async continuation and callback
    var self = Self;
    Acquire().PipeTo(self);
    Become(AcquiringLease);
    return;

    async Task<LeaseAcquireResult> Acquire()
    {
        try
        {
            var result = await _lease.Acquire(reason => { self.Tell(new LeaseLost(reason)); });
            return new LeaseAcquireResult(result, null);
        }
        catch (Exception ex)
        {
            return new LeaseAcquireResult(false, ex);
        }
    }
}
Lease Lifecycle
A lease must be granted before an actor can perform any of its message handling, and the actor needs to stop, forcefully or gracefully, if the lease is revoked. Care must be taken so that a revoked lease causes no resource contention, or at least only minimal impact.
In the example code, the lease is acquired inside the PreStart() method override by calling AcquireLease(), and it is released inside the PostStop() method override.
protected override void PreStart()
{
    base.PreStart();
    // Acquire a lease when actor starts
    AcquireLease();
}

protected override void PostStop()
{
    base.PostStop();
    // Release the lease when actor stops (blocks until the release completes)
    _lease.Release().GetAwaiter().GetResult();
}