Scheduling Future and Recurring Actor Messages
A useful feature of Akka.NET is the ability to schedule messages to be delivered in the future or on a recurring basis. This functionality can be used for all sorts of use cases, such as:
- Creating conditional timeouts;
- Execute recurring tasks; or
- Throttling or delaying work.
Scheduling Actor Messages Using IWithTimers
(Recommended Approach)
As of Akka.NET v1.4.4 we introduced the IWithTimers
interface, which gives Akka.NET actors a way of accessing the ActorSystem
's scheduler without having to remember to manually dispose of scheduled tasks afterwards. Any scheduled or recurring tasks created by the IWithTimers
interface will be automatically cancelled once the actor terminates.
public sealed class Print { }
public sealed class Total { }
public sealed class TimerActor : ReceiveActor, IWithTimers
{
public ITimerScheduler Timers { get; set; }
private int _count = 0;
private readonly ILoggingAdapter _log = Context.GetLogger();
public TimerActor()
{
Receive<int>(i =>
{
_count += i;
});
Receive<Print>(_ => _log.Info("Current count is [{0}]", _count));
Receive<Total>(_ => Sender.Tell(_count));
}
protected override void PreStart()
{
// start two recurring timers
// both timers will be automatically disposed when actor is stopped
Timers.StartPeriodicTimer("print", new Print(), TimeSpan.FromSeconds(0.1), TimeSpan.FromSeconds(5));
Timers.StartPeriodicTimer("add", 1, TimeSpan.FromMilliseconds(0), TimeSpan.FromMilliseconds(20));
}
}
In this approach, all timers are created using a specific key that can also be used to stop a timer again in the future:
// start single timer that fires off 5 seconds in the future
Timers.StartSingleTimer("print", new Print(), TimeSpan.FromSeconds(5));
// start recurring timer
Timers.StartPeriodicTimer("add", 1, TimeSpan.FromMilliseconds(0), TimeSpan.FromMilliseconds(20));
The key that was used to create a timer can also be used to query whether that timer is still running or not:
var isPrintTimerActive = Timers.IsTimerActive("print");
Sender.Tell(isPrintTimerActive);
And that key can be used to stop those timers as well:
// start single timer that fires off 5 seconds in the future
Timers.StartSingleTimer("print", new Print(), TimeSpan.FromSeconds(5));
// start recurring timer
Timers.StartPeriodicTimer("add", 1, TimeSpan.FromMilliseconds(0), TimeSpan.FromMilliseconds(20));
To use the IWithTimer
interface, simply decorate your actor class with it and call the Timer.StartPeriodicTimer
and Timer.StartSingleTimer
methods. All of those timers will automatically be cancelled when the actor terminates.
Testing for Idle Timeouts with ReceiveTimeout
One specific case with actors, and this is particularly useful for areas like Akka.Cluster.Sharding, is the ability to time out "idle" actors after a specified period of inactivity.
This can be accomplished using the ReceiveTimeout
capability.
/// <summary>
/// Used to query if a <see cref="ReceiveTimeout"/> has been observed.
///
/// Can't influence the <see cref="ReceiveTimeout"/> since it implements
/// <see cref="INotInfluenceReceiveTimeout"/>.
/// </summary>
public class CheckTimeout : INotInfluenceReceiveTimeout { }
public class ReceiveTimeoutActor : ReceiveActor
{
private readonly TimeSpan _inactivityTimeout;
public ReceiveTimeoutActor(TimeSpan inactivityTimeout, IActorRef receiver)
{
_inactivityTimeout = inactivityTimeout;
// if we don't
Receive<ReceiveTimeout>(_ =>
{
receiver.Tell("timeout");
});
}
protected override void PreStart()
{
Context.SetReceiveTimeout(_inactivityTimeout);
}
}
ReceiveTimeout
is a sliding window timeout - the timeout gets reset every time an actor receives a message that does not implement theINotInfluenceReceiveTimeout
interface the timer is reset back to its original duration.- If the timeout expires, the actor will be notified by receiving a copy of the
ReceiveTimeout
message - at this stage the actor can do things like shut itself down, flush its state to a database, or whatever else you might need the actor to do once it becomes idle. - The
SetReceiveTimeout(TimeSpan? time = null)
value can be changed at runtime or it can be cancelled altogether by callingContext.SetReceiveTimeout(null)
; and - The
ReceiveTimeout
will automatically be cancelled when the actor terminates.
Scheduling Recurring Tasks with IScheduler
While the IWithTimers
interface is the recommended approach for working with actors, the ActorSystem
itself includes the underlying IScheduler
interface, which exposes timing primitives that can be used inside or outside of individual actors.
public class SchedulerActor : ReceiveActor
{
private int _count = 0;
private ILoggingAdapter _log = Context.GetLogger();
private ICancelable _printTask;
private ICancelable _addTask;
public SchedulerActor()
{
Receive<int>(i =>
{
_count += i;
});
Receive<Print>(_ => _log.Info("Current count is [{0}]", _count));
Receive<Total>(_ => Sender.Tell(_count));
}
protected override void PreStart()
{
// start two recurring timers
// both timers will be automatically disposed when actor is stopped
_printTask = Context.System.Scheduler.ScheduleTellRepeatedlyCancelable(TimeSpan.FromSeconds(0.1),
TimeSpan.FromSeconds(5), Self, new Print(), ActorRefs.NoSender);
_addTask = Context.System.Scheduler.ScheduleTellRepeatedlyCancelable(TimeSpan.FromMilliseconds(0),
TimeSpan.FromMilliseconds(20), Self, 1, ActorRefs.NoSender);
}
protected override void PostStop()
{
// have to cancel all recurring scheduled tasks
_printTask?.Cancel();
_addTask?.Cancel();
}
}
The ActorSystem.Scheduler
can be used for any number of different types of tasks, but those tasks will not be cancelled automatically. You have to call the IScheduler.Schedule_{method}_RepeatedlyCancelable
method, store the ICancelable
returned by that method, and then call ICancelable.Cancel()
once you're finished with it to dispose the method.
To learn more about working with the IScheduler
, please see Akka.NET Bootcamp Unit 2 Lesson 3 - Using the Scheduler to Send Messages Later.