Part 3: Device Groups and Manager
In this chapter, we will integrate our device actors into a component that manages devices. When a new device comes online, there is no actor representing it. We need to be able to ask the device manager component to create a new device actor for us if necessary, in the required group (or return a reference to an already existing one).
Since we keep our tutorial system to the bare minimum, we have no actual component that interfaces with the external world via some networking protocol. For our exercise, we will just create the API necessary to integrate with such a component in the future. In a final system, the steps for connecting a device would look like this:
- The device connects through some protocol to our system.
- The component managing network connections accepts the connection.
- The ID of the device and the ID of the group that it belongs to are acquired.
- The device manager component is asked to create a group and device actor for the given IDs (or return an existing one).
- The device actor (either just created or located) responds with an acknowledgment, at the same time exposing its IActorRef directly (by being the sender of the acknowledgment).
- The networking component now uses the IActorRef of the device directly, avoiding going through the component.
We are only concerned with steps 4 and 5 now. We will model the device manager component as an actor tree with three levels:
- The top level is the supervisor actor representing the component. It is also the entry point to look up or create group and device actors.
- Device group actors are supervisors of the devices belonging to the group. Apart from supervising the device actors they also provide extra services, like querying the temperature readings from all the devices available.
- Device actors manage all the interactions with the actual devices, storing temperature readings for example.
When designing actor systems, one of the main challenges is to decide on the granularity of the actors. For example, it would be perfectly possible to have only a single actor maintaining all the groups and devices in Dictionary instances. It would also be reasonable to keep the groups as separate actors, but keep device state simply inside the group actor.
We chose this three-layered architecture for the following reasons:
- Having groups as individual actors:
- Allows us to isolate failures happening in a group. If a programmer error occurred in a single actor that keeps all state, all of that state would be wiped out once the actor is restarted, affecting groups that are otherwise non-faulty.
- Simplifies the problem of querying all the devices belonging to a group (since it only contains state related to the given group).
- Increases the parallelism of the system by allowing multiple groups to be queried concurrently. Since groups have dedicated actors, all of them can run concurrently.
- Having devices as individual actors:
- Allows us to isolate failures happening in a device actor from the rest of the devices.
- Increases the parallelism of collecting temperature readings as actual network connections from different devices can talk to the individual device actors directly, reducing contention points.
In practice, a system can be organized in multiple ways, all depending on the characteristics of the interactions between actors.
The following guidelines help to arrive at the right granularity:
- Prefer larger granularity to smaller. Introducing more fine-grained actors than needed causes more problems than it solves.
- Prefer finer granularity if it enables higher concurrency in the system.
- Prefer finer granularity if actors need to handle complex conversations with other actors and hence have many states. We will see a very good example of this in the next chapter.
- Prefer finer granularity if there is too much state to keep around in one place compared to dividing into smaller actors.
- Prefer finer granularity if the current actor has multiple unrelated responsibilities that can fail and be restored individually.
The Registration Protocol
As the first step, we need to design the protocol for registering a device and creating the actor that will be responsible for it. This protocol will be provided by the DeviceManager component itself because that is the only actor that is known up front: device groups and device actors are created on demand. The steps of registering a device are the following:
- DeviceManager receives the request to track a device for a given group and device.
- If the manager already has an actor for the device group, it forwards the request to it. Otherwise, it first creates a new one and then forwards the request.
- The DeviceGroup receives the request to register an actor for the given device.
- If the group already has an actor for the device, it forwards the request to it. Otherwise, it first creates a new one and then forwards the request.
- The device actor receives the request and acknowledges it to the original sender. Since the device actor is the sender of the acknowledgment, the receiver, i.e. the device, will be able to learn its IActorRef and send direct messages to its device actor in the future.
Now that the steps are defined, we only need to define the messages that we will use to communicate requests and their acknowledgment:
public sealed class RequestTrackDevice
{
public RequestTrackDevice(string groupId, string deviceId)
{
GroupId = groupId;
DeviceId = deviceId;
}
public string GroupId { get; }
public string DeviceId { get; }
}
public sealed class DeviceRegistered
{
public static DeviceRegistered Instance { get; } = new();
private DeviceRegistered() { }
}
As you can see, we have not included a request ID field in these messages. Since registration usually happens only once, at the component that connects the system to some network protocol, we will usually have no use for the ID. Nevertheless, it is a good exercise to add this ID.
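For readers who want to try that exercise, the following is a minimal sketch of what the two messages might look like with a request ID. The type names RequestTrackDeviceWithId and DeviceRegisteredWithId are hypothetical and are not used anywhere else in this tutorial.

```csharp
// Hypothetical variants of the registration messages that carry a request ID.
// The "WithId" names are illustrative only and do not appear in the tutorial code.
public sealed class RequestTrackDeviceWithId
{
    public RequestTrackDeviceWithId(long requestId, string groupId, string deviceId)
    {
        RequestId = requestId;
        GroupId = groupId;
        DeviceId = deviceId;
    }

    public long RequestId { get; }
    public string GroupId { get; }
    public string DeviceId { get; }
}

public sealed class DeviceRegisteredWithId
{
    public DeviceRegisteredWithId(long requestId)
    {
        RequestId = requestId;
    }

    // Echoes the ID of the request that is being acknowledged.
    public long RequestId { get; }
}
```

One consequence of this design is that the acknowledgment can no longer be a shared singleton instance, because each reply has to carry the ID of the request it answers.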
Add Registration Support to Device Actor
We start implementing the protocol from the bottom first. In practice, both a top-down and bottom-up approach can work, but in our case, we benefit from the bottom-up approach as it allows us to immediately write tests for the new features without mocking out parts.
At the bottom of our hierarchy are the Device actors. Their job in this registration process is rather simple: just reply to the registration request with an acknowledgment to the sender. We will assume that the sender of the registration message is preserved in the upper layers. We will show you in the next section how this can be achieved.
We also add a safeguard against requests that come with a mismatched group or device ID. This is what the resulting code looks like:
public sealed class RecordTemperature
{
public RecordTemperature(long requestId, double value)
{
RequestId = requestId;
Value = value;
}
public long RequestId { get; }
public double Value { get; }
}
public sealed class TemperatureRecorded
{
public TemperatureRecorded(long requestId)
{
RequestId = requestId;
}
public long RequestId { get; }
}
public sealed class ReadTemperature
{
public ReadTemperature(long requestId)
{
RequestId = requestId;
}
public long RequestId { get; }
}
public sealed class RespondTemperature
{
public RespondTemperature(long requestId, double? value)
{
RequestId = requestId;
Value = value;
}
public long RequestId { get; }
public double? Value { get; }
}
public class Device : UntypedActor
{
private double? _lastTemperatureReading = null;
public Device(string groupId, string deviceId)
{
GroupId = groupId;
DeviceId = deviceId;
}
protected override void PreStart() => Log.Info($"Device actor {GroupId}-{DeviceId} started");
protected override void PostStop() => Log.Info($"Device actor {GroupId}-{DeviceId} stopped");
protected ILoggingAdapter Log { get; } = Context.GetLogger();
protected string GroupId { get; }
protected string DeviceId { get; }
protected override void OnReceive(object message)
{
switch (message)
{
case RequestTrackDevice req when req.GroupId.Equals(GroupId) && req.DeviceId.Equals(DeviceId):
Sender.Tell(DeviceRegistered.Instance);
break;
case RequestTrackDevice req:
Log.Warning($"Ignoring TrackDevice request for {req.GroupId}-{req.DeviceId}. This actor is responsible for {GroupId}-{DeviceId}.");
break;
case RecordTemperature rec:
Log.Info($"Recorded temperature reading {rec.Value} with {rec.RequestId}");
_lastTemperatureReading = rec.Value;
Sender.Tell(new TemperatureRecorded(rec.RequestId));
break;
case ReadTemperature read:
Sender.Tell(new RespondTemperature(read.RequestId, _lastTemperatureReading));
break;
}
}
public static Props Props(string groupId, string deviceId) => Akka.Actor.Props.Create(() => new Device(groupId, deviceId));
}
We should not leave features untested, so we immediately write two new test cases, one exercising successful registration, the other testing the case when IDs don't match:
Note
We use the ExpectNoMsg() helper method from TestProbe. This assertion waits until the defined time limit and fails if it receives any messages during this period. If no messages are received during the waiting period, the assertion passes. It is usually a good idea to keep these timeouts low (but not too low) because they add significant test execution time otherwise.
[Fact]
public void Device_actor_must_reply_to_registration_requests()
{
var probe = CreateTestProbe();
var deviceActor = Sys.ActorOf(Device.Props("group", "device"));
deviceActor.Tell(new RequestTrackDevice("group", "device"), probe.Ref);
probe.ExpectMsg<DeviceRegistered>();
probe.LastSender.Should().Be(deviceActor);
}
[Fact]
public void Device_actor_must_ignore_wrong_registration_requests()
{
var probe = CreateTestProbe();
var deviceActor = Sys.ActorOf(Device.Props("group", "device"));
deviceActor.Tell(new RequestTrackDevice("wrongGroup", "device"), probe.Ref);
probe.ExpectNoMsg(TimeSpan.FromMilliseconds(500));
deviceActor.Tell(new RequestTrackDevice("group", "wrongDevice"), probe.Ref);
probe.ExpectNoMsg(TimeSpan.FromMilliseconds(500));
}
Device Group
We are done with the registration support at the device level; now we have to implement it at the group level. A group has more work to do when it comes to registrations. It must either forward the request to an existing child or create one first. To be able to look up child actors by their device IDs we will use a Dictionary<string, IActorRef>.
We also want to keep the original sender of the request so that our device actor can reply directly. This is possible by using Forward instead of the Tell method. The only difference between the two is that Forward keeps the original sender, while Tell, when called from inside an actor without an explicit sender, sets the sender to the current actor. Just like with our device actor, we ensure that we don't respond to wrong group IDs:
public class DeviceGroup : UntypedActor
{
private Dictionary<string, IActorRef> deviceIdToActor = new();
public DeviceGroup(string groupId)
{
GroupId = groupId;
}
protected override void PreStart() => Log.Info($"Device group {GroupId} started");
protected override void PostStop() => Log.Info($"Device group {GroupId} stopped");
protected ILoggingAdapter Log { get; } = Context.GetLogger();
protected string GroupId { get; }
protected override void OnReceive(object message)
{
switch (message)
{
case RequestTrackDevice trackMsg when trackMsg.GroupId.Equals(GroupId):
if (deviceIdToActor.TryGetValue(trackMsg.DeviceId, out var actorRef))
{
actorRef.Forward(trackMsg);
}
else
{
Log.Info($"Creating device actor for {trackMsg.DeviceId}");
var deviceActor = Context.ActorOf(Device.Props(trackMsg.GroupId, trackMsg.DeviceId), $"device-{trackMsg.DeviceId}");
deviceIdToActor.Add(trackMsg.DeviceId, deviceActor);
deviceActor.Forward(trackMsg);
}
break;
case RequestTrackDevice trackMsg:
Log.Warning($"Ignoring TrackDevice request for {trackMsg.GroupId}. This actor is responsible for {GroupId}.");
break;
}
}
public static Props Props(string groupId) => Akka.Actor.Props.Create(() => new DeviceGroup(groupId));
}
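To make the difference between Tell and Forward concrete, here is a small sketch that is separate from the tutorial code. The Relay and Echo actors and the "tell" and "forward" string messages are hypothetical names chosen for illustration only.

```csharp
using Akka.Actor;

// Hypothetical relay actor, used only to contrast Tell and Forward.
public class Relay : UntypedActor
{
    private readonly IActorRef _target;

    public Relay(IActorRef target)
    {
        _target = target;
    }

    protected override void OnReceive(object message)
    {
        switch (message)
        {
            case "tell":
                // The target sees this Relay actor as the sender.
                _target.Tell(message);
                break;
            case "forward":
                // The target sees whoever sent the message to this Relay.
                _target.Forward(message);
                break;
        }
    }
}

// Hypothetical target that replies to whatever it considers the sender.
public class Echo : UntypedActor
{
    protected override void OnReceive(object message) => Sender.Tell($"echo:{message}");
}
```

If a test probe sends "forward" to the Relay, the reply from Echo arrives at the probe. If it sends "tell" instead, the reply goes back to the Relay, which does not handle it, so the probe receives nothing. The device group relies on the first behavior so that the device actor can acknowledge the original requester directly.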
Just as we did with the device, we test this new functionality. We also test that the actors returned for the two different IDs are actually different, and we also attempt to record a temperature reading for each of the devices to see if the actors are responding.
[Fact]
public void DeviceGroup_actor_must_be_able_to_register_a_device_actor()
{
var probe = CreateTestProbe();
var groupActor = Sys.ActorOf(DeviceGroup.Props("group"));
groupActor.Tell(new RequestTrackDevice("group", "device1"), probe.Ref);
probe.ExpectMsg<DeviceRegistered>();
var deviceActor1 = probe.LastSender;
groupActor.Tell(new RequestTrackDevice("group", "device2"), probe.Ref);
probe.ExpectMsg<DeviceRegistered>();
var deviceActor2 = probe.LastSender;
deviceActor1.Should().NotBe(deviceActor2);
// Check that the device actors are working
deviceActor1.Tell(new RecordTemperature(requestId: 0, value: 1.0), probe.Ref);
probe.ExpectMsg<TemperatureRecorded>(s => s.RequestId == 0);
deviceActor2.Tell(new RecordTemperature(requestId: 1, value: 2.0), probe.Ref);
probe.ExpectMsg<TemperatureRecorded>(s => s.RequestId == 1);
}
[Fact]
public void DeviceGroup_actor_must_ignore_requests_for_wrong_groupId()
{
var probe = CreateTestProbe();
var groupActor = Sys.ActorOf(DeviceGroup.Props("group"));
groupActor.Tell(new RequestTrackDevice("wrongGroup", "device1"), probe.Ref);
probe.ExpectNoMsg(TimeSpan.FromMilliseconds(500));
}
A device actor might already exist for the device named in a registration request. In this case, we would like to use the existing actor instead of creating a new one. We have not tested this yet, so we add a test for it:
[Fact]
public void DeviceGroup_actor_must_return_same_actor_for_same_deviceId()
{
var probe = CreateTestProbe();
var groupActor = Sys.ActorOf(DeviceGroup.Props("group"));
groupActor.Tell(new RequestTrackDevice("group", "device1"), probe.Ref);
probe.ExpectMsg<DeviceRegistered>();
var deviceActor1 = probe.LastSender;
groupActor.Tell(new RequestTrackDevice("group", "device1"), probe.Ref);
probe.ExpectMsg<DeviceRegistered>();
var deviceActor2 = probe.LastSender;
deviceActor1.Should().Be(deviceActor2);
}
So far, we have implemented everything for registering device actors in the group. Devices come and go, however, so we will need a way to remove them from the Dictionary<string, IActorRef>. We will assume that when a device is removed, its corresponding device actor is simply stopped. We need some way for the parent to be notified when one of the device actors is stopped. Unfortunately, supervision will not help because it is used for error scenarios, not graceful stopping.
There is a feature in Akka.NET that is exactly what we need here. It is possible for an actor to watch another actor and be notified if the other actor is stopped. This feature is called Death Watch and it is an important tool for any Akka.NET application. Unlike supervision, watching is not limited to parent-child relationships: any actor can watch any other actor given its IActorRef. After a watched actor stops, the watcher receives a Terminated(ref) message, which also contains the reference to the watched actor. The watcher can either handle this message explicitly or, if it does not handle it, fail with a DeathPactException. The latter is useful if the actor can no longer perform its duties after its collaborator actor has been stopped. In our case, the group should still function after one device has been stopped, so we need to handle this message. The steps we need to follow are the following:
- Whenever we create a new device actor, we must also watch it.
- When we are notified that a device actor has been stopped, we also need to remove it from the Dictionary<string, IActorRef> which maps devices to device actors.
Unfortunately, the Terminated message contains only the IActorRef of the child actor, not its device ID, which we need in order to remove the entry from the map of device IDs to device actors. To be able to do this removal, we need to introduce a second dictionary, Dictionary<IActorRef, string>, that allows us to find the device ID corresponding to a given IActorRef. Putting this together, the result is:
public class DeviceGroup : UntypedActor
{
private Dictionary<string, IActorRef> deviceIdToActor = new();
private Dictionary<IActorRef, string> actorToDeviceId = new();
public DeviceGroup(string groupId)
{
GroupId = groupId;
}
protected override void PreStart() => Log.Info($"Device group {GroupId} started");
protected override void PostStop() => Log.Info($"Device group {GroupId} stopped");
protected ILoggingAdapter Log { get; } = Context.GetLogger();
protected string GroupId { get; }
protected override void OnReceive(object message)
{
switch (message)
{
case RequestTrackDevice trackMsg when trackMsg.GroupId.Equals(GroupId):
if (deviceIdToActor.TryGetValue(trackMsg.DeviceId, out var actorRef))
{
actorRef.Forward(trackMsg);
}
else
{
Log.Info($"Creating device actor for {trackMsg.DeviceId}");
var deviceActor = Context.ActorOf(Device.Props(trackMsg.GroupId, trackMsg.DeviceId), $"device-{trackMsg.DeviceId}");
Context.Watch(deviceActor);
actorToDeviceId.Add(deviceActor, trackMsg.DeviceId);
deviceIdToActor.Add(trackMsg.DeviceId, deviceActor);
deviceActor.Forward(trackMsg);
}
break;
case RequestTrackDevice trackMsg:
Log.Warning($"Ignoring TrackDevice request for {trackMsg.GroupId}. This actor is responsible for {GroupId}.");
break;
case Terminated t:
var deviceId = actorToDeviceId[t.ActorRef];
Log.Info($"Device actor for {deviceId} has been terminated");
actorToDeviceId.Remove(t.ActorRef);
deviceIdToActor.Remove(deviceId);
break;
}
}
public static Props Props(string groupId) => Akka.Actor.Props.Create(() => new DeviceGroup(groupId));
}
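The actor above has to update its two dictionaries together on every add and remove. As an optional aside, the bookkeeping could be gathered in one place. The following is a sketch only: TwoWayIndex is a hypothetical helper that is not part of Akka.NET or of the tutorial code, and it assumes a recent .NET runtime where Dictionary.Remove(key, out value) is available.

```csharp
using System.Collections.Generic;
using System.Diagnostics.CodeAnalysis;

// Hypothetical helper (not part of Akka.NET or the tutorial code) that keeps
// an ID-to-reference map and its reverse map in sync.
public sealed class TwoWayIndex<TId, TRef>
    where TId : notnull
    where TRef : notnull
{
    private readonly Dictionary<TId, TRef> _idToRef = new();
    private readonly Dictionary<TRef, TId> _refToId = new();

    // The IDs that are currently registered.
    public IEnumerable<TId> Ids => _idToRef.Keys;

    public bool TryGetRef(TId id, [MaybeNullWhen(false)] out TRef reference) =>
        _idToRef.TryGetValue(id, out reference);

    // Registers the pair in both directions; throws if the ID is already present.
    public void Add(TId id, TRef reference)
    {
        _idToRef.Add(id, reference);
        _refToId.Add(reference, id);
    }

    // Removes the pair identified by the reference, reporting the ID it belonged to.
    // Returns false instead of throwing when the reference is unknown.
    public bool TryRemoveByRef(TRef reference, [MaybeNullWhen(false)] out TId id)
    {
        if (!_refToId.Remove(reference, out id))
        {
            return false;
        }

        _idToRef.Remove(id);
        return true;
    }
}
```

With such a helper, the Terminated case could call TryRemoveByRef(t.ActorRef, out var deviceId) and log only when it returns true. This also avoids the KeyNotFoundException that the dictionary indexer throws for an unknown reference.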
So far we have no means of finding out which devices the device group actor keeps track of and, therefore, we cannot test our new functionality yet. To make it testable, we add a new query capability, RequestDeviceList, that simply lists the currently active device IDs:
public sealed class RequestDeviceList
{
public RequestDeviceList(long requestId)
{
RequestId = requestId;
}
public long RequestId { get; }
}
public sealed class ReplyDeviceList
{
public ReplyDeviceList(long requestId, ISet<string> ids)
{
RequestId = requestId;
Ids = ids;
}
public long RequestId { get; }
public ISet<string> Ids { get; }
}
public class DeviceGroup : UntypedActor
{
private Dictionary<string, IActorRef> deviceIdToActor = new();
private Dictionary<IActorRef, string> actorToDeviceId = new();
public DeviceGroup(string groupId)
{
GroupId = groupId;
}
protected override void PreStart() => Log.Info($"Device group {GroupId} started");
protected override void PostStop() => Log.Info($"Device group {GroupId} stopped");
protected ILoggingAdapter Log { get; } = Context.GetLogger();
protected string GroupId { get; }
protected override void OnReceive(object message)
{
switch (message)
{
case RequestTrackDevice trackMsg when trackMsg.GroupId.Equals(GroupId):
if (deviceIdToActor.TryGetValue(trackMsg.DeviceId, out var actorRef))
{
actorRef.Forward(trackMsg);
}
else
{
Log.Info($"Creating device actor for {trackMsg.DeviceId}");
var deviceActor = Context.ActorOf(Device.Props(trackMsg.GroupId, trackMsg.DeviceId), $"device-{trackMsg.DeviceId}");
Context.Watch(deviceActor);
actorToDeviceId.Add(deviceActor, trackMsg.DeviceId);
deviceIdToActor.Add(trackMsg.DeviceId, deviceActor);
deviceActor.Forward(trackMsg);
}
break;
case RequestTrackDevice trackMsg:
Log.Warning($"Ignoring TrackDevice request for {trackMsg.GroupId}. This actor is responsible for {GroupId}.");
break;
case RequestDeviceList deviceList:
Sender.Tell(new ReplyDeviceList(deviceList.RequestId, new HashSet<string>(deviceIdToActor.Keys)));
break;
case Terminated t:
var deviceId = actorToDeviceId[t.ActorRef];
Log.Info($"Device actor for {deviceId} has been terminated");
actorToDeviceId.Remove(t.ActorRef);
deviceIdToActor.Remove(deviceId);
break;
}
}
public static Props Props(string groupId) => Akka.Actor.Props.Create(() => new DeviceGroup(groupId));
}
We almost have everything to test the removal of devices. What is missing is:
- Stopping a device actor from our test case, from the outside: any actor can be stopped by simply sending a special built-in message, PoisonPill, which instructs the actor to stop.
- Being notified once the device actor is stopped: we can use the Death Watch facility for this purpose, too. Thankfully, the TestProbe has two methods that we can easily use: Watch() to watch a specific actor, and ExpectTerminated to assert that the watched actor has been terminated.
We add two more test cases now. In the first, we just test that we get back the list of proper IDs once we have added a few devices. The second test case makes sure that the device ID is properly removed after the device actor has been stopped:
[Fact]
public void DeviceGroup_actor_must_be_able_to_list_active_devices()
{
var probe = CreateTestProbe();
var groupActor = Sys.ActorOf(DeviceGroup.Props("group"));
groupActor.Tell(new RequestTrackDevice("group", "device1"), probe.Ref);
probe.ExpectMsg<DeviceRegistered>();
groupActor.Tell(new RequestTrackDevice("group", "device2"), probe.Ref);
probe.ExpectMsg<DeviceRegistered>();
groupActor.Tell(new RequestDeviceList(requestId: 0), probe.Ref);
probe.ExpectMsg<ReplyDeviceList>(s => s.RequestId == 0
&& s.Ids.Contains("device1")
&& s.Ids.Contains("device2"));
}
[Fact]
public void DeviceGroup_actor_must_be_able_to_list_active_devices_after_one_shuts_down()
{
var probe = CreateTestProbe();
var groupActor = Sys.ActorOf(DeviceGroup.Props("group"));
groupActor.Tell(new RequestTrackDevice("group", "device1"), probe.Ref);
probe.ExpectMsg<DeviceRegistered>();
var toShutDown = probe.LastSender;
groupActor.Tell(new RequestTrackDevice("group", "device2"), probe.Ref);
probe.ExpectMsg<DeviceRegistered>();
groupActor.Tell(new RequestDeviceList(requestId: 0), probe.Ref);
probe.ExpectMsg<ReplyDeviceList>(s => s.RequestId == 0
&& s.Ids.Contains("device1")
&& s.Ids.Contains("device2"));
probe.Watch(toShutDown);
toShutDown.Tell(PoisonPill.Instance);
probe.ExpectTerminated(toShutDown);
// Use AwaitAssert to retry because the groupActor may see the Terminated
// message later than the probe does; that order is undefined.
probe.AwaitAssert(() =>
{
groupActor.Tell(new RequestDeviceList(requestId: 1), probe.Ref);
probe.ExpectMsg<ReplyDeviceList>(s => s.RequestId == 1 && s.Ids.Contains("device2") && !s.Ids.Contains("device1"));
});
}
Device Manager
The only part that remains now is the entry point for our device manager component. This actor is very similar to the device group actor, with the only difference that it creates device group actors instead of device actors:
public static partial class MainDeviceGroup
{
public sealed class RequestTrackDevice
{
public RequestTrackDevice(string groupId, string deviceId)
{
GroupId = groupId;
DeviceId = deviceId;
}
public string GroupId { get; }
public string DeviceId { get; }
}
public sealed class DeviceRegistered
{
public static DeviceRegistered Instance { get; } = new();
private DeviceRegistered() { }
}
public class DeviceManager : UntypedActor
{
private Dictionary<string, IActorRef> groupIdToActor = new();
private Dictionary<IActorRef, string> actorToGroupId = new();
protected override void PreStart() => Log.Info("DeviceManager started");
protected override void PostStop() => Log.Info("DeviceManager stopped");
protected ILoggingAdapter Log { get; } = Context.GetLogger();
protected override void OnReceive(object message)
{
switch (message)
{
case RequestTrackDevice trackMsg:
if (groupIdToActor.TryGetValue(trackMsg.GroupId, out var actorRef))
{
actorRef.Forward(trackMsg);
}
else
{
Log.Info($"Creating device group actor for {trackMsg.GroupId}");
var groupActor = Context.ActorOf(DeviceGroup.Props(trackMsg.GroupId), $"group-{trackMsg.GroupId}");
Context.Watch(groupActor);
groupActor.Forward(trackMsg);
groupIdToActor.Add(trackMsg.GroupId, groupActor);
actorToGroupId.Add(groupActor, trackMsg.GroupId);
}
break;
case Terminated t:
var groupId = actorToGroupId[t.ActorRef];
Log.Info($"Device group actor for {groupId} has been terminated");
actorToGroupId.Remove(t.ActorRef);
groupIdToActor.Remove(groupId);
break;
}
}
public static Props Props() => Akka.Actor.Props.Create<DeviceManager>();
}
}
We leave tests of the device manager as an exercise as it is very similar to the tests we have written for the group actor.
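As a possible starting point for that exercise, here is a sketch of two test cases modeled on the group tests above. It assumes the same test setup as the earlier tests (xUnit, Akka.TestKit.Xunit2 and FluentAssertions) and that DeviceManager, RequestTrackDevice and DeviceRegistered are in scope. The class name DeviceManagerSpec and the test names are hypothetical.

```csharp
using Akka.Actor;
using Akka.TestKit.Xunit2;
using FluentAssertions;
using Xunit;

// Hypothetical test class; assumes DeviceManager, RequestTrackDevice and
// DeviceRegistered from this chapter are accessible from here.
public class DeviceManagerSpec : TestKit
{
    [Fact]
    public void DeviceManager_actor_must_create_separate_device_actors_per_group()
    {
        var probe = CreateTestProbe();
        var managerActor = Sys.ActorOf(DeviceManager.Props());

        managerActor.Tell(new RequestTrackDevice("group1", "device"), probe.Ref);
        probe.ExpectMsg<DeviceRegistered>();
        var deviceActor1 = probe.LastSender;

        managerActor.Tell(new RequestTrackDevice("group2", "device"), probe.Ref);
        probe.ExpectMsg<DeviceRegistered>();
        var deviceActor2 = probe.LastSender;

        // Same device ID, different groups: two distinct device actors.
        deviceActor1.Should().NotBe(deviceActor2);
    }

    [Fact]
    public void DeviceManager_actor_must_return_same_actor_for_same_group_and_device()
    {
        var probe = CreateTestProbe();
        var managerActor = Sys.ActorOf(DeviceManager.Props());

        managerActor.Tell(new RequestTrackDevice("group", "device"), probe.Ref);
        probe.ExpectMsg<DeviceRegistered>();
        var first = probe.LastSender;

        managerActor.Tell(new RequestTrackDevice("group", "device"), probe.Ref);
        probe.ExpectMsg<DeviceRegistered>();
        var second = probe.LastSender;

        // The second request is forwarded to the existing group and device actors.
        first.Should().Be(second);
    }
}
```

Both tests rely on the acknowledgment being sent by the device actor itself, which only holds if the manager and the group forward the request rather than re-sending it.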
What Is Next?
We now have a hierarchical component for registering and tracking devices and recording measurements. We have seen some conversation patterns like:
- Request-respond (for temperature recordings).
- Delegate-respond (for registration of devices).
- Create-watch-terminate (for creating the group and device actor as children).
In the next chapter, we will introduce group query capabilities, which will establish a new conversation pattern of scatter-gather. In particular, we will implement the functionality that allows users to query the status of all the devices belonging to a group.