Part 4: Querying a Group of Devices
The conversational patterns we have seen so far were simple in the sense that they required no or little state to be kept in the actor that is only relevant to the conversation. Our device actors either simply returned a reading, which required no state change, recorded a temperature, which was required an update of a single field, or in the most complex case, managing groups and devices, we had to add or remove simple entries from a map.
In this chapter, we will see a more complex example. Our goal is to add a new service to the group device actor, one which allows querying the temperature from all running devices. Let us start by investigating how we want our query API to behave.
The very first issue we face is that the set of devices is dynamic, and each device is represented by an actor that can stop at any time. At the beginning of the query, we need to ask all of the device actors for the current temperature that we know about. However, during the lifecycle of the query:
- A device actor may stop and not respond back with a temperature reading.
- A new device actor might start up, but we missed asking it for the current temperature.
There are many approaches that can be taken to address these issues, but the important point is to settle on what is the desired behavior. We will pick the following two guarantees:
- When a query arrives at the group, the group actor takes a snapshot of the existing device actors and will only ask those for the temperature. Actors that are started after the arrival of the query are simply ignored.
- When an actor stops during the query without answering (i.e. before all the actors we asked for the temperature responded) we simply report back that fact to the sender of the query message.
Apart from device actors coming and going dynamically, some actors might take a long time to answer, for example, because they are stuck in an accidental infinite loop, or because they failed due to a bug and dropped our request. Ideally, we would like to give a deadline to our query:
- The query is considered completed if either all actors have responded (or confirmed being stopped), or we reach the deadline.
Given these decisions, and the fact that a device might not have a temperature to record, we can define four states that each device can be in, according to the query:
- It has a temperature available:
Temperature
. - It has responded, but has no temperature available yet:
TemperatureNotAvailable
. - It has stopped before answering:
DeviceNotAvailable
. - It did not respond before the deadline:
DeviceTimedOut
.
Summarizing these in message types we can add the following to DeviceGroup
:
public sealed class RequestAllTemperatures
{
public RequestAllTemperatures(long requestId)
{
RequestId = requestId;
}
public long RequestId { get; }
}
public sealed class RespondAllTemperatures
{
public RespondAllTemperatures(long requestId, Dictionary<string, ITemperatureReading> temperatures)
{
RequestId = requestId;
Temperatures = temperatures;
}
public long RequestId { get; }
public Dictionary<string, ITemperatureReading> Temperatures { get; }
}
public interface ITemperatureReading
{
}
public sealed class Temperature : ITemperatureReading
{
public Temperature(double value)
{
Value = value;
}
public double Value { get; }
}
public sealed class TemperatureNotAvailable : ITemperatureReading
{
public static TemperatureNotAvailable Instance { get; } = new();
private TemperatureNotAvailable() { }
}
public sealed class DeviceNotAvailable : ITemperatureReading
{
public static DeviceNotAvailable Instance { get; } = new();
private DeviceNotAvailable() { }
}
public sealed class DeviceTimedOut : ITemperatureReading
{
public static DeviceTimedOut Instance { get; } = new();
private DeviceTimedOut() { }
}
Implementing the Query
One of the approaches for implementing the query could be to add more code to the group device actor. While this is
possible, in practice this can be very cumbersome and error prone. When we start a query, we need to take a snapshot
of the devices present at the start of the query and start a timer so that we can enforce the deadline. Unfortunately,
during the time we execute a query another query might just arrive. For this other query, of course, we need to keep
track of the exact same information but isolated from the previous query. This complicates the code and also poses
some problems. For example, we would need a data structure that maps the IActorRef
s of the devices to the queries
that use that device, so that they can be notified when such a device terminates, i.e. a Terminated
message is
received.
There is a much simpler approach that is superior in every way, and it is the one we will implement. We will create an actor that represents a single query and which performs the tasks needed to complete the query on behalf of the group actor. So far we have created actors that belonged to classical domain objects, but now, we will create an actor that represents a process or task rather than an entity. This move keeps our group device actor simple and gives us better ways to test the query capability in isolation.
First, we need to design the lifecycle of our query actor. This consists of identifying its initial state, then the first action to be taken by the actor, then, the cleanup if necessary. There are a few things the query should need to be able to work:
- The snapshot of active device actors to query, and their IDs.
- The requestID of the request that started the query (so we can include it in the reply).
- The
IActorRef
of the actor who sent the group actor the query. We will send the reply to this actor directly. - A timeout parameter, how long the query should wait for replies. Keeping this as a parameter will simplify testing.
Since we need to have a timeout for how long we are willing to wait for responses, it is time to introduce a new feature that we have
not used yet: timers. Akka has a built-in scheduler facility for this exact purpose. Using it is simple, the
ScheduleTellOnceCancelable(time, actorRef, message, sender)
method will schedule the message message
into the future by the
specified time
and send it to the actor actorRef
. To implement our query timeout we need to create a message
that represents the query timeout. We create a simple message CollectionTimeout
without any parameters for
this purpose. The return value from ScheduleTellOnceCancelable
is a ICancellable
which can be used to cancel the timer
if the query finishes successfully in time. Getting the scheduler is possible from the ActorSystem
, which, in turn,
is accessible from the actor's context: Context.System.Scheduler
.
At the start of the query, we need to ask each of the device actors for the current temperature. To be able to quickly
detect devices that stopped before they got the ReadTemperature
message we will also watch each of the actors. This
way, we get Terminated
messages for those that stop during the lifetime of the query, so we don't need to wait
until the timeout to mark these as not available.
Putting together all these, the outline of our actor looks like this:
public sealed class CollectionTimeout
{
public static CollectionTimeout Instance { get; } = new();
private CollectionTimeout() { }
}
public class DeviceGroupQuery : UntypedActor
{
private ICancelable queryTimeoutTimer;
public DeviceGroupQuery(Dictionary<IActorRef, string> actorToDeviceId, long requestId, IActorRef requester, TimeSpan timeout)
{
ActorToDeviceId = actorToDeviceId;
RequestId = requestId;
Requester = requester;
Timeout = timeout;
queryTimeoutTimer = Context.System.Scheduler.ScheduleTellOnceCancelable(timeout, Self, CollectionTimeout.Instance, Self);
}
protected override void PreStart()
{
foreach (var deviceActor in ActorToDeviceId.Keys)
{
Context.Watch(deviceActor);
deviceActor.Tell(new ReadTemperature(0));
}
}
protected override void PostStop()
{
queryTimeoutTimer.Cancel();
}
protected ILoggingAdapter Log { get; } = Context.GetLogger();
public Dictionary<IActorRef, string> ActorToDeviceId { get; }
public long RequestId { get; }
public IActorRef Requester { get; }
public TimeSpan Timeout { get; }
protected override void OnReceive(object message)
{
}
public static Props Props(Dictionary<IActorRef, string> actorToDeviceId, long requestId, IActorRef requester, TimeSpan timeout) =>
Akka.Actor.Props.Create(() => new DeviceGroupQuery(actorToDeviceId, requestId, requester, timeout));
}
The query actor, apart from the pending timer, has one stateful aspect about it: the actors that did not answer so far or,
from the other way around, the set of actors that have replied or stopped. One way to track this state is
to create a mutable field in the actor. There is another approach. It is also possible to change how
the actor responds to messages. By default, the OnReceive
block defines the behavior of the actor, but it is possible
to change it, several times, during the life of the actor. This is possible by calling Context.Become(newBehavior)
where newBehavior
is anything with type UntypedReceive
. A UntypedReceive
is just a function (or an object, if you like) that
can be returned from another function. We will leverage this feature to track the state of our actor.
As the first step, instead of defining OnReceive
directly, we delegate to another function to create the UntypedReceive
, which
we will call WaitingForReplies
. This will keep track of two changing values, a Dictionary
of already received replies
and a HashSet
of actors that we still wait on. We have three events that we should act on. We can receive a
RespondTemperature
message from one of the devices. Second, we can receive a Terminated
message for a device actor
that has been stopped in the meantime. Finally, we can reach the deadline and receive a CollectionTimeout
. In the
first two cases, we need to keep track of the replies, which we now simply delegate to a method ReceivedResponse
which
we will discuss later. In the case of timeout, we need to simply take all the actors that have not yet replied yet
(the members of the set stillWaiting
) and put a DeviceTimedOut
as the status in the final reply. Then we
reply to the submitter of the query with the collected results and stop the query actor:
public DeviceGroupQuery(Dictionary<IActorRef, string> actorToDeviceId, long requestId, IActorRef requester, TimeSpan timeout)
{
ActorToDeviceId = actorToDeviceId;
RequestId = requestId;
Requester = requester;
Timeout = timeout;
queryTimeoutTimer = Context.System.Scheduler.ScheduleTellOnceCancelable(timeout, Self, CollectionTimeout.Instance, Self);
Become(WaitingForReplies(new Dictionary<string, ITemperatureReading>(), new HashSet<IActorRef>(ActorToDeviceId.Keys)));
}
protected override void PreStart()
{
foreach (var deviceActor in ActorToDeviceId.Keys)
{
Context.Watch(deviceActor);
deviceActor.Tell(new ReadTemperature(0));
}
}
protected override void PostStop()
{
queryTimeoutTimer.Cancel();
}
protected ILoggingAdapter Log { get; } = Context.GetLogger();
public Dictionary<IActorRef, string> ActorToDeviceId { get; }
public long RequestId { get; }
public IActorRef Requester { get; }
public TimeSpan Timeout { get; }
public UntypedReceive WaitingForReplies(
Dictionary<string, ITemperatureReading> repliesSoFar,
HashSet<IActorRef> stillWaiting)
{
return message =>
{
switch (message)
{
case RespondTemperature response when response.RequestId == 0:
var deviceActor = Sender;
ITemperatureReading reading = null;
if (response.Value.HasValue)
{
reading = new Temperature(response.Value.Value);
}
else
{
reading = TemperatureNotAvailable.Instance;
}
ReceivedResponse(deviceActor, reading, stillWaiting, repliesSoFar);
break;
case Terminated t:
ReceivedResponse(t.ActorRef, DeviceNotAvailable.Instance, stillWaiting, repliesSoFar);
break;
case CollectionTimeout _:
var replies = new Dictionary<string, ITemperatureReading>(repliesSoFar);
foreach (var actor in stillWaiting)
{
var deviceId = ActorToDeviceId[actor];
replies.Add(deviceId, DeviceTimedOut.Instance);
}
Requester.Tell(new RespondAllTemperatures(RequestId, replies));
Context.Stop(Self);
break;
}
};
}
What is not yet clear is how we will "mutate" the answersSoFar
and stillWaiting
data structures. One important
thing to note is that the function WaitingForReplies
does not handle the messages directly. It returns an UntypedReceive
function that will handle the messages. This means that if we call WaitingForReplies
again, with different parameters,
then it returns a brand new UntypedReceive
that will use those new parameters. We have seen how we
can install the initial UntypedReceive
by simply returning it from OnReceive
. In order to install a new one, to record a
new reply, for example, we need some mechanism. This mechanism is the method Context.Become(newReceive)
which will
change the actor's message handling function to the provided newReceive
function. You can imagine that before
starting, your actor automatically calls Context.Become(receive)
, i.e. installing the UntypedReceive
function that
is returned from OnReceive
. This is another important observation: it is not OnReceive
that handles the messages,
it just returns a UntypedReceive
function that will actually handle the messages.
We now have to figure out what to do in ReceivedResponse
. First, we need to record the new result in the map
repliesSoFar
and remove the actor from stillWaiting
. The next step is to check if there are any remaining actors
we are waiting for. If there is none, we send the result of the query to the original requester and stop
the query actor. Otherwise, we need to update the repliesSoFar
and stillWaiting
structures and wait for more
messages.
In the code before, we treated Terminated
as the implicit response DeviceNotAvailable
, so ReceivedResponse
does
not need to do anything special. However, there is one small task we still need to do. It is possible that we receive a proper
response from a device actor, but then it stops during the lifetime of the query. We don't want this second event
to overwrite the already received reply. In other words, we don't want to receive Terminated
after we recorded the
response. This is simple to achieve by calling Context.Unwatch(ref)
. This method also ensures that we don't
receive Terminated
events that are already in the mailbox of the actor. It is also safe to call this multiple times,
only the first call will have any effect, the rest is simply ignored.
With all this knowledge, we can create the ReceivedResponse
method:
public void ReceivedResponse(
IActorRef deviceActor,
ITemperatureReading reading,
HashSet<IActorRef> stillWaiting,
Dictionary<string, ITemperatureReading> repliesSoFar)
{
Context.Unwatch(deviceActor);
var deviceId = ActorToDeviceId[deviceActor];
stillWaiting.Remove(deviceActor);
repliesSoFar.Add(deviceId, reading);
if (stillWaiting.Count == 0)
{
Requester.Tell(new RespondAllTemperatures(RequestId, repliesSoFar));
Context.Stop(Self);
}
else
{
Context.Become(WaitingForReplies(repliesSoFar, stillWaiting));
}
}
It is quite natural to ask at this point, what have we gained by using the Context.Become()
trick instead of
just making the repliesSoFar
and stillWaiting
structures mutable fields of the actor? In this
simple example, not that much. The value of this style of state keeping becomes more evident when you suddenly have
more kinds of states. Since each state
might have temporary data that is relevant itself, keeping these as fields would pollute the global state
of the actor, i.e. it is unclear what fields are used in what state. Using parameterized OnReceive
"factory"
methods we can keep data private that is only relevant to the state. It is still a good exercise to
rewrite the query using mutable fields instead of Context.Become()
. However, it is recommended to get comfortable
with the solution we have used here as it helps structuring more complex actor code in a cleaner and more maintainable way.
Our query actor is now done:
public sealed class CollectionTimeout
{
public static CollectionTimeout Instance { get; } = new();
private CollectionTimeout() { }
}
public class DeviceGroupQuery : UntypedActor
{
private ICancelable queryTimeoutTimer;
public DeviceGroupQuery(Dictionary<IActorRef, string> actorToDeviceId, long requestId, IActorRef requester, TimeSpan timeout)
{
ActorToDeviceId = actorToDeviceId;
RequestId = requestId;
Requester = requester;
Timeout = timeout;
queryTimeoutTimer = Context.System.Scheduler.ScheduleTellOnceCancelable(timeout, Self, CollectionTimeout.Instance, Self);
Become(WaitingForReplies(new Dictionary<string, ITemperatureReading>(), new HashSet<IActorRef>(ActorToDeviceId.Keys)));
}
protected override void PreStart()
{
foreach (var deviceActor in ActorToDeviceId.Keys)
{
Context.Watch(deviceActor);
deviceActor.Tell(new ReadTemperature(0));
}
}
protected override void PostStop()
{
queryTimeoutTimer.Cancel();
}
protected ILoggingAdapter Log { get; } = Context.GetLogger();
public Dictionary<IActorRef, string> ActorToDeviceId { get; }
public long RequestId { get; }
public IActorRef Requester { get; }
public TimeSpan Timeout { get; }
public UntypedReceive WaitingForReplies(
Dictionary<string, ITemperatureReading> repliesSoFar,
HashSet<IActorRef> stillWaiting)
{
return message =>
{
switch (message)
{
case RespondTemperature response when response.RequestId == 0:
var deviceActor = Sender;
ITemperatureReading reading = null;
if (response.Value.HasValue)
{
reading = new Temperature(response.Value.Value);
}
else
{
reading = TemperatureNotAvailable.Instance;
}
ReceivedResponse(deviceActor, reading, stillWaiting, repliesSoFar);
break;
case Terminated t:
ReceivedResponse(t.ActorRef, DeviceNotAvailable.Instance, stillWaiting, repliesSoFar);
break;
case CollectionTimeout _:
var replies = new Dictionary<string, ITemperatureReading>(repliesSoFar);
foreach (var actor in stillWaiting)
{
var deviceId = ActorToDeviceId[actor];
replies.Add(deviceId, DeviceTimedOut.Instance);
}
Requester.Tell(new RespondAllTemperatures(RequestId, replies));
Context.Stop(Self);
break;
}
};
}
public void ReceivedResponse(
IActorRef deviceActor,
ITemperatureReading reading,
HashSet<IActorRef> stillWaiting,
Dictionary<string, ITemperatureReading> repliesSoFar)
{
Context.Unwatch(deviceActor);
var deviceId = ActorToDeviceId[deviceActor];
stillWaiting.Remove(deviceActor);
repliesSoFar.Add(deviceId, reading);
if (stillWaiting.Count == 0)
{
Requester.Tell(new RespondAllTemperatures(RequestId, repliesSoFar));
Context.Stop(Self);
}
else
{
Context.Become(WaitingForReplies(repliesSoFar, stillWaiting));
}
}
protected override void OnReceive(object message)
{
}
public static Props Props(Dictionary<IActorRef, string> actorToDeviceId, long requestId, IActorRef requester, TimeSpan timeout) =>
Akka.Actor.Props.Create(() => new DeviceGroupQuery(actorToDeviceId, requestId, requester, timeout));
}
Testing
Now let's verify the correctness of the query actor implementation. There are various scenarios we need to test individually to make
sure everything works as expected. To be able to do this, we need to simulate the device actors somehow to exercise
various normal or failure scenarios. Thankfully we took the list of collaborators (actually a Dictionary
) as a parameter
to the query actor, so we can easily pass in TestProbe
references. In our first test, we try out the case when
there are two devices and both report a temperature:
[Fact]
public void DeviceGroupQuery_must_return_temperature_value_for_working_devices()
{
var requester = CreateTestProbe();
var device1 = CreateTestProbe();
var device2 = CreateTestProbe();
var queryActor = Sys.ActorOf(DeviceGroupQuery.Props(
actorToDeviceId: new Dictionary<IActorRef, string> { [device1.Ref] = "device1", [device2.Ref] = "device2" },
requestId: 1,
requester: requester.Ref,
timeout: TimeSpan.FromSeconds(3)
));
device1.ExpectMsg<ReadTemperature>(read => read.RequestId == 0);
device2.ExpectMsg<ReadTemperature>(read => read.RequestId == 0);
queryActor.Tell(new RespondTemperature(requestId: 0, value: 1.0), device1.Ref);
queryActor.Tell(new RespondTemperature(requestId: 0, value: 2.0), device2.Ref);
requester.ExpectMsg<RespondAllTemperatures>(msg =>
msg.Temperatures["device1"].AsInstanceOf<Temperature>().Value == 1.0 &&
msg.Temperatures["device2"].AsInstanceOf<Temperature>().Value == 2.0 &&
msg.RequestId == 1);
}
That was the happy case, but we know that sometimes devices cannot provide a temperature measurement. This scenario is just slightly different from the previous:
[Fact]
public void DeviceGroupQuery_must_return_TemperatureNotAvailable_for_devices_with_no_readings()
{
var requester = CreateTestProbe();
var device1 = CreateTestProbe();
var device2 = CreateTestProbe();
var queryActor = Sys.ActorOf(DeviceGroupQuery.Props(
actorToDeviceId: new Dictionary<IActorRef, string> { [device1.Ref] = "device1", [device2.Ref] = "device2" },
requestId: 1,
requester: requester.Ref,
timeout: TimeSpan.FromSeconds(3)
));
device1.ExpectMsg<ReadTemperature>(read => read.RequestId == 0);
device2.ExpectMsg<ReadTemperature>(read => read.RequestId == 0);
queryActor.Tell(new RespondTemperature(requestId: 0, value: null), device1.Ref);
queryActor.Tell(new RespondTemperature(requestId: 0, value: 2.0), device2.Ref);
requester.ExpectMsg<RespondAllTemperatures>(msg =>
msg.Temperatures["device1"] is TemperatureNotAvailable &&
msg.Temperatures["device2"].AsInstanceOf<Temperature>().Value == 2.0 &&
msg.RequestId == 1);
}
We also know, that sometimes device actors stop before answering:
[Fact]
public void DeviceGroupQuery_must_return_return_DeviceNotAvailable_if_device_stops_before_answering()
{
var requester = CreateTestProbe();
var device1 = CreateTestProbe();
var device2 = CreateTestProbe();
var queryActor = Sys.ActorOf(DeviceGroupQuery.Props(
actorToDeviceId: new Dictionary<IActorRef, string> { [device1.Ref] = "device1", [device2.Ref] = "device2" },
requestId: 1,
requester: requester.Ref,
timeout: TimeSpan.FromSeconds(3)
));
device1.ExpectMsg<ReadTemperature>(read => read.RequestId == 0);
device2.ExpectMsg<ReadTemperature>(read => read.RequestId == 0);
queryActor.Tell(new RespondTemperature(requestId: 0, value: 1.0), device1.Ref);
device2.Tell(PoisonPill.Instance);
requester.ExpectMsg<RespondAllTemperatures>(msg =>
msg.Temperatures["device1"].AsInstanceOf<Temperature>().Value == 1.0 &&
msg.Temperatures["device2"] is DeviceNotAvailable &&
msg.RequestId == 1);
}
If you remember, there is another case related to device actors stopping. It is possible that we get a normal reply
from a device actor, but then receive a Terminated
for the same actor later. In this case, we would like to keep
the first reply and not mark the device as DeviceNotAvailable
. We should test this, too:
[Fact]
public void DeviceGroupQuery_must_return_temperature_reading_even_if_device_stops_after_answering()
{
var requester = CreateTestProbe();
var device1 = CreateTestProbe();
var device2 = CreateTestProbe();
var queryActor = Sys.ActorOf(DeviceGroupQuery.Props(
actorToDeviceId: new Dictionary<IActorRef, string> { [device1.Ref] = "device1", [device2.Ref] = "device2" },
requestId: 1,
requester: requester.Ref,
timeout: TimeSpan.FromSeconds(3)
));
device1.ExpectMsg<ReadTemperature>(read => read.RequestId == 0);
device2.ExpectMsg<ReadTemperature>(read => read.RequestId == 0);
queryActor.Tell(new RespondTemperature(requestId: 0, value: 1.0), device1.Ref);
queryActor.Tell(new RespondTemperature(requestId: 0, value: 2.0), device2.Ref);
device2.Tell(PoisonPill.Instance);
requester.ExpectMsg<RespondAllTemperatures>(msg =>
msg.Temperatures["device1"].AsInstanceOf<Temperature>().Value == 1.0 &&
msg.Temperatures["device2"].AsInstanceOf<Temperature>().Value == 2.0 &&
msg.RequestId == 1);
}
The final case is when not all devices respond in time. To keep our test relatively fast, we will construct the
DeviceGroupQuery
actor with a smaller timeout:
[Fact]
public void DeviceGroupQuery_must_return_DeviceTimedOut_if_device_does_not_answer_in_time()
{
var requester = CreateTestProbe();
var device1 = CreateTestProbe();
var device2 = CreateTestProbe();
var queryActor = Sys.ActorOf(DeviceGroupQuery.Props(
actorToDeviceId: new Dictionary<IActorRef, string> { [device1.Ref] = "device1", [device2.Ref] = "device2" },
requestId: 1,
requester: requester.Ref,
timeout: TimeSpan.FromSeconds(1)
));
device1.ExpectMsg<ReadTemperature>(read => read.RequestId == 0);
device2.ExpectMsg<ReadTemperature>(read => read.RequestId == 0);
queryActor.Tell(new RespondTemperature(requestId: 0, value: 1.0), device1.Ref);
requester.ExpectMsg<RespondAllTemperatures>(msg =>
msg.Temperatures["device1"].AsInstanceOf<Temperature>().Value == 1.0 &&
msg.Temperatures["device2"] is DeviceTimedOut &&
msg.RequestId == 1);
}
Our query works as expected now, it is time to include this new functionality in the DeviceGroup
actor now.
Adding the Query Capability to the Group
Including the query feature in the group actor is fairly simple now. We did all the heavy lifting in the query actor itself, the group actor only needs to create it with the right initial parameters and nothing else.
public class DeviceGroup : UntypedActor
{
private Dictionary<string, IActorRef> deviceIdToActor = new();
private Dictionary<IActorRef, string> actorToDeviceId = new();
private long nextCollectionId = 0L;
public DeviceGroup(string groupId)
{
GroupId = groupId;
}
protected override void PreStart() => Log.Info($"Device group {GroupId} started");
protected override void PostStop() => Log.Info($"Device group {GroupId} stopped");
protected ILoggingAdapter Log { get; } = Context.GetLogger();
protected string GroupId { get; }
protected override void OnReceive(object message)
{
switch (message)
{
case RequestAllTemperatures r:
Context.ActorOf(DeviceGroupQuery.Props(actorToDeviceId, r.RequestId, Sender, TimeSpan.FromSeconds(3)));
break;
}
}
public static Props Props(string groupId) => Akka.Actor.Props.Create(() => new DeviceGroup(groupId));
}
It is probably worth reiterating what we said at the beginning of the chapter: By keeping the temporary state that is only relevant to the query itself in a separate actor we keep the group actor implementation very simple. It delegates everything to child actors and therefore does not have to keep state that is not relevant to its core business. Also, multiple queries can now run parallel to each other, in fact, as many as needed. In our case querying an individual device actor is a fast operation, but if this were not the case, for example, because the remote sensors need to be contacted over the network, this design would significantly improve throughput.
We close this chapter by testing that everything works together. This test is just a variant of the previous ones, now exercising the group query feature:
[Fact]
public void DeviceGroup_actor_must_be_able_to_collect_temperatures_from_all_active_devices()
{
var probe = CreateTestProbe();
var groupActor = Sys.ActorOf(DeviceGroup.Props("group"));
groupActor.Tell(new RequestTrackDevice("group", "device1"), probe.Ref);
probe.ExpectMsg<DeviceRegistered>();
var deviceActor1 = probe.LastSender;
groupActor.Tell(new RequestTrackDevice("group", "device2"), probe.Ref);
probe.ExpectMsg<DeviceRegistered>();
var deviceActor2 = probe.LastSender;
groupActor.Tell(new RequestTrackDevice("group", "device3"), probe.Ref);
probe.ExpectMsg<DeviceRegistered>();
var deviceActor3 = probe.LastSender;
// Check that the device actors are working
deviceActor1.Tell(new RecordTemperature(requestId: 0, value: 1.0), probe.Ref);
probe.ExpectMsg<TemperatureRecorded>(s => s.RequestId == 0);
deviceActor2.Tell(new RecordTemperature(requestId: 1, value: 2.0), probe.Ref);
probe.ExpectMsg<TemperatureRecorded>(s => s.RequestId == 1);
// No temperature for device3
groupActor.Tell(new RequestAllTemperatures(0), probe.Ref);
probe.ExpectMsg<RespondAllTemperatures>(msg =>
msg.Temperatures["device1"].AsInstanceOf<Temperature>().Value == 1.0 &&
msg.Temperatures["device2"].AsInstanceOf<Temperature>().Value == 2.0 &&
msg.Temperatures["device3"] is TemperatureNotAvailable &&
msg.RequestId == 0);
}