summaryrefslogtreecommitdiff
path: root/domain
diff options
context:
space:
mode:
authorMike Vink <mike1994vink@gmail.com>2021-07-20 17:41:58 +0200
committerMike Vink <mike1994vink@gmail.com>2021-07-20 17:41:58 +0200
commit5f016158e73a7828c9dec1810e54bbe2550d8c20 (patch)
tree82b380c3976be9e48248ae8f30a5e99b5afc7e97 /domain
parent36730af06964e75735ae5d099ee9c5bf2c6b7fc9 (diff)
feat(): trainer registration to actor sys
Diffstat (limited to 'domain')
-rw-r--r--domain/build.gradle3
-rw-r--r--domain/src/main/java/akkamon/domain/AkkamonImpl.java20
-rw-r--r--domain/src/main/java/akkamon/domain/AkkamonNexus.java84
-rw-r--r--domain/src/main/java/akkamon/domain/SceneTrainerGroup.java79
-rw-r--r--domain/src/main/java/akkamon/domain/Trainer.java37
-rw-r--r--domain/src/main/java/akkamon/domain/iot/Device.java6
-rw-r--r--domain/src/main/java/akkamon/domain/iot/DeviceGroup.java19
-rw-r--r--domain/src/main/java/akkamon/domain/iot/DeviceGroupQuery.java128
-rw-r--r--domain/src/main/java/akkamon/domain/iot/DeviceManager.java85
-rw-r--r--domain/src/test/java/akkamon/domain/AkkamonNexusTest.java28
-rw-r--r--domain/src/test/java/akkamon/domain/iot/DeviceGroupQueryTest.java245
11 files changed, 696 insertions, 38 deletions
diff --git a/domain/build.gradle b/domain/build.gradle
index d2d174a..4b7063a 100644
--- a/domain/build.gradle
+++ b/domain/build.gradle
@@ -26,6 +26,9 @@ dependencies {
implementation 'ch.qos.logback:logback-classic:1.2.3'
implementation 'junit:junit:4.12'
+ // Reference the domain subproject.
+ implementation project(':api')
+
testImplementation "com.typesafe.akka:akka-actor-testkit-typed_${versions.ScalaBinary}"
testImplementation "com.typesafe.akka:akka-stream-testkit_${versions.ScalaBinary}"
diff --git a/domain/src/main/java/akkamon/domain/AkkamonImpl.java b/domain/src/main/java/akkamon/domain/AkkamonImpl.java
index a6a840b..534abf2 100644
--- a/domain/src/main/java/akkamon/domain/AkkamonImpl.java
+++ b/domain/src/main/java/akkamon/domain/AkkamonImpl.java
@@ -17,19 +17,19 @@ public class AkkamonImpl implements Akkamon {
@Override
public void newPlayerConnected(String name, String password) {
- switch (dummyTrainersCollection.size()) {
- case 0:
- dummyTrainersCollection.put("Ash", new Trainer("Ash"));
- break;
- case 1:
- dummyTrainersCollection.put("Misty", new Trainer("Misty"));
- break;
- }
+ // switch (dummyTrainersCollection.size()) {
+ // case 0:
+ // dummyTrainersCollection.put("Ash", new Trainer("Ash"));
+ // break;
+ // case 1:
+ // dummyTrainersCollection.put("Misty", new Trainer("Misty"));
+ // break;
+ // }
}
public void updateTrainerPosition(String name, float x, float y) {
- Trainer trainer = dummyTrainersCollection.get(name);
- trainer.newPosition(x, y);
+ // Trainer trainer = dummyTrainersCollection.get(name);
+ // trainer.newPosition(x, y);
}
public HashMap<String, Trainer> getDummyTrainersCollection() {
diff --git a/domain/src/main/java/akkamon/domain/AkkamonNexus.java b/domain/src/main/java/akkamon/domain/AkkamonNexus.java
new file mode 100644
index 0000000..a64e751
--- /dev/null
+++ b/domain/src/main/java/akkamon/domain/AkkamonNexus.java
@@ -0,0 +1,84 @@
+package akkamon.domain;
+
+import akka.actor.typed.ActorRef;
+import akka.actor.typed.Behavior;
+import akka.actor.typed.javadsl.AbstractBehavior;
+import akka.actor.typed.javadsl.ActorContext;
+import akka.actor.typed.javadsl.Behaviors;
+import akka.actor.typed.javadsl.Receive;
+
+import java.util.HashMap;
+import java.util.Map;
+
+public class AkkamonNexus extends AbstractBehavior<AkkamonNexus.Command> {
+
+
+ public interface Command {}
+
+ public static class RequestTrainerRegistration implements AkkamonNexus.Command, SceneTrainerGroup.Command {
+ public String trainerId;
+ public String sceneId;
+ public ActorRef<TrainerRegistered> replyTo;
+
+ public RequestTrainerRegistration(String trainerId, String sceneId, ActorRef<TrainerRegistered> replyTo) {
+ this.trainerId = trainerId;
+ this.sceneId = sceneId;
+ this.replyTo = replyTo;
+ }
+ }
+
+ public static class TrainerRegistered {
+ private ActorRef<Trainer.Command> trainer;
+
+ public TrainerRegistered(ActorRef<Trainer.Command> trainer) {
+ this.trainer = trainer;
+ }
+ }
+
+ private static class SceneTrainerGroupTerminated implements AkkamonNexus.Command {
+ public SceneTrainerGroupTerminated(String sceneId) {
+ }
+ }
+
+
+ public static Behavior<AkkamonNexus.Command> create() {
+ return Behaviors.setup(AkkamonNexus::new);
+ }
+
+ private Map<String, ActorRef<SceneTrainerGroup.Command>> sceneIdToActor = new HashMap<>();
+
+ public AkkamonNexus(ActorContext<Command> context) {
+ super(context);
+ getContext().getLog().info("AkkamonNexus is spinning");
+ }
+
+ @Override
+ public Receive<Command> createReceive() {
+ return newReceiveBuilder()
+ .onMessage(
+ RequestTrainerRegistration.class,
+ this::onTrainerRegistration
+ )
+ .build();
+ }
+
+ private AkkamonNexus onTrainerRegistration(RequestTrainerRegistration registrationRequest) {
+ String sceneId = registrationRequest.sceneId;
+ String trainerId = registrationRequest.trainerId;
+
+ ActorRef<SceneTrainerGroup.Command> sceneTrainerGroup = sceneIdToActor.get(sceneId);
+ if (sceneTrainerGroup != null) {
+ sceneTrainerGroup.tell(registrationRequest);
+ } else {
+ getContext().getLog().info("Creating sceneTrainerGroup {} for trainer {}", sceneId, trainerId);
+ ActorRef<SceneTrainerGroup.Command> sceneActor =
+ getContext().spawn(SceneTrainerGroup.create(sceneId), "scene-" + sceneId);
+
+ getContext().watchWith(sceneActor, new SceneTrainerGroupTerminated(sceneId));
+ sceneActor.tell(registrationRequest);
+ sceneIdToActor.put(sceneId, sceneActor);
+ }
+ return this;
+ }
+
+}
diff --git a/domain/src/main/java/akkamon/domain/SceneTrainerGroup.java b/domain/src/main/java/akkamon/domain/SceneTrainerGroup.java
new file mode 100644
index 0000000..fb9456a
--- /dev/null
+++ b/domain/src/main/java/akkamon/domain/SceneTrainerGroup.java
@@ -0,0 +1,79 @@
+package akkamon.domain;
+
+import akka.actor.typed.ActorRef;
+import akka.actor.typed.Behavior;
+import akka.actor.typed.javadsl.AbstractBehavior;
+import akka.actor.typed.javadsl.ActorContext;
+import akka.actor.typed.javadsl.Behaviors;
+import akka.actor.typed.javadsl.Receive;
+
+import java.util.HashMap;
+import java.util.Map;
+
+public class SceneTrainerGroup extends AbstractBehavior<SceneTrainerGroup.Command> {
+
+ public interface Command { }
+
+ private class TrainerOffline implements Command {
+ public ActorRef<Trainer.Command> trainer;
+ public String sceneId;
+ public String trainerId;
+
+ public TrainerOffline(ActorRef<Trainer.Command> trainerActor, String sceneId, String trainerId) {
+ this.trainer = trainerActor;
+ this.sceneId = sceneId;
+ this.trainerId = trainerId;
+ }
+ }
+
+ public static Behavior<Command> create(String TrainerGroupId) {
+ return Behaviors.setup(context -> new SceneTrainerGroup(context, TrainerGroupId));
+ }
+
+ private final String sceneId;
+ private final Map<String, ActorRef<Trainer.Command>> trainerIdToActor= new HashMap();
+
+ public SceneTrainerGroup(ActorContext<Command> context, String sceneId) {
+ super(context);
+
+ this.sceneId = sceneId;
+
+ getContext().getLog().info("SceneTrainerGroup Actor {} started", sceneId);
+ }
+
+ @Override
+ public Receive<Command> createReceive() {
+ return newReceiveBuilder()
+ .onMessage(
+ AkkamonNexus.RequestTrainerRegistration.class,
+ this::onTrainerRegistration)
+ .build();
+ }
+
+ private SceneTrainerGroup onTrainerRegistration(AkkamonNexus.RequestTrainerRegistration registrationRequest) {
+ if (this.sceneId.equals(registrationRequest.sceneId)) {
+ ActorRef<Trainer.Command> trainerActor = trainerIdToActor.get(registrationRequest.trainerId);
+ if (trainerActor != null) {
+ registrationRequest.replyTo.tell(new AkkamonNexus.TrainerRegistered(trainerActor));
+ } else {
+ getContext().getLog().info("Creating trainer actor for {}", registrationRequest.trainerId);
+ trainerActor =
+ getContext()
+ .spawn(Trainer.create(sceneId, registrationRequest.trainerId), "trainer-" + registrationRequest.trainerId);
+ getContext()
+ .watchWith(trainerActor, new SceneTrainerGroup.TrainerOffline(trainerActor, sceneId, registrationRequest.trainerId));
+ trainerIdToActor.put(registrationRequest.trainerId, trainerActor);
+ registrationRequest.replyTo.tell(new AkkamonNexus.TrainerRegistered(trainerActor));
+ }
+ } else {
+ getContext()
+ .getLog()
+ .warn(
+ "Ignoring TrainerRegistration request for {}. This actor is responsible for {}.",
+ registrationRequest.sceneId,
+ this.sceneId);
+ }
+ return this;
+ }
+
+}
diff --git a/domain/src/main/java/akkamon/domain/Trainer.java b/domain/src/main/java/akkamon/domain/Trainer.java
index 8f14c60..7f540a1 100644
--- a/domain/src/main/java/akkamon/domain/Trainer.java
+++ b/domain/src/main/java/akkamon/domain/Trainer.java
@@ -1,28 +1,31 @@
package akkamon.domain;
-public class Trainer {
- private String name;
- private float x;
- private float y;
+import akka.actor.typed.Behavior;
+import akka.actor.typed.javadsl.AbstractBehavior;
+import akka.actor.typed.javadsl.ActorContext;
+import akka.actor.typed.javadsl.Behaviors;
+import akka.actor.typed.javadsl.Receive;
- public Trainer(String name) {
- this.name = name;
- }
+public class Trainer extends AbstractBehavior<Trainer.Command> {
- public void newPosition(float x, float y) {
- this.x = x;
- this.y = y;
- }
+ public interface Command { }
- public float getX() {
- return x;
+ public static Behavior<Command> create(String sceneId, String trainerId) {
+ return Behaviors.setup(context -> new Trainer(context, sceneId, trainerId));
}
- public float getY() {
- return y;
+ private String sceneId;
+ private String trainerId;
+
+ public Trainer(ActorContext<Command> context, String sceneId, String trainerId) {
+ super(context);
+ this.sceneId = sceneId;
+ this.trainerId = trainerId;
}
- public String getName() {
- return name;
+ @Override
+ public Receive<Command> createReceive() {
+ return null;
}
+
}
diff --git a/domain/src/main/java/akkamon/domain/iot/Device.java b/domain/src/main/java/akkamon/domain/iot/Device.java
index 27918a6..c4e63f8 100644
--- a/domain/src/main/java/akkamon/domain/iot/Device.java
+++ b/domain/src/main/java/akkamon/domain/iot/Device.java
@@ -46,10 +46,12 @@ public class Device extends AbstractBehavior<Device.Command> {
public static final class RespondTemperature {
final long requestId;
+ final String deviceId;
final Optional<Double> value;
- public RespondTemperature(long requestId, Optional<Double> value) {
+ public RespondTemperature(long requestId, String deviceId, Optional<Double> value) {
this.requestId = requestId;
+ this.deviceId = deviceId;
this.value = value;
}
}
@@ -93,7 +95,7 @@ public class Device extends AbstractBehavior<Device.Command> {
}
private Behavior<Command> onReadTemperature(ReadTemperature r) {
- r.replyTo.tell(new RespondTemperature(r.requestId, lastTemperatureReading));
+ r.replyTo.tell(new RespondTemperature(r.requestId, deviceId, lastTemperatureReading));
return this;
}
diff --git a/domain/src/main/java/akkamon/domain/iot/DeviceGroup.java b/domain/src/main/java/akkamon/domain/iot/DeviceGroup.java
index 2cfc13a..40e0d7f 100644
--- a/domain/src/main/java/akkamon/domain/iot/DeviceGroup.java
+++ b/domain/src/main/java/akkamon/domain/iot/DeviceGroup.java
@@ -8,6 +8,7 @@ import akka.actor.typed.javadsl.ActorContext;
import akka.actor.typed.javadsl.Behaviors;
import akka.actor.typed.javadsl.Receive;
+import java.time.Duration;
import java.util.HashMap;
import java.util.Map;
@@ -40,6 +41,19 @@ public class DeviceGroup extends AbstractBehavior<DeviceGroup.Command> {
context.getLog().info("DeviceGroup {} started", groupId);
}
+ private DeviceGroup onAllTemperatures(DeviceManager.RequestAllTemperatures r) {
+ Map<String, ActorRef<Device.Command>> deviceIdToActorCopy = new HashMap<>(this.deviceIdToActor);
+
+ getContext()
+ .spawnAnonymous(
+ DeviceGroupQuery.create(
+ deviceIdToActorCopy, r.requestId, r.replyTo, Duration.ofSeconds(3)
+ )
+ );
+
+ return this;
+ }
+
private DeviceGroup onTrackDevice(DeviceManager.RequestTrackDevice trackMsg) {
if (this.groupId.equals(trackMsg.groupId)) {
ActorRef<Device.Command> deviceActor = deviceIdToActor.get(trackMsg.deviceId);
@@ -88,6 +102,11 @@ public class DeviceGroup extends AbstractBehavior<DeviceGroup.Command> {
this::onDeviceList)
.onMessage(DeviceTerminated.class, this::onTerminated)
.onSignal(PostStop.class, signal -> onPostStop())
+ .onMessage(
+ DeviceManager.RequestAllTemperatures.class,
+ r -> r.groupId.equals(groupId),
+ this::onAllTemperatures
+ )
.build();
}
diff --git a/domain/src/main/java/akkamon/domain/iot/DeviceGroupQuery.java b/domain/src/main/java/akkamon/domain/iot/DeviceGroupQuery.java
new file mode 100644
index 0000000..b04fd23
--- /dev/null
+++ b/domain/src/main/java/akkamon/domain/iot/DeviceGroupQuery.java
@@ -0,0 +1,128 @@
+package akkamon.domain.iot;
+
+import akka.actor.typed.ActorRef;
+import akka.actor.typed.Behavior;
+import akka.actor.typed.javadsl.*;
+
+import java.time.Duration;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.Map;
+import java.util.Set;
+
+public class DeviceGroupQuery extends AbstractBehavior<DeviceGroupQuery.Command> {
+
+
+ public interface Command {}
+
+ private static enum CollectionTimeout implements Command {
+ INSTANCE
+ }
+
+ static class WrappedRespondTemperature implements Command {
+ final Device.RespondTemperature response;
+
+ WrappedRespondTemperature(Device.RespondTemperature response) {
+ this.response = response;
+ }
+ }
+
+ private static class DeviceTerminated implements Command {
+ final String deviceId;
+
+ private DeviceTerminated(String deviceId) {
+ this.deviceId = deviceId;
+ }
+ }
+
+ public static Behavior<Command> create(
+ Map<String, ActorRef<Device.Command>> deviceIdToActor,
+ long requestId,
+ ActorRef<DeviceManager.RespondAllTemperatures> requester,
+ Duration timeout) {
+ return Behaviors.setup(
+ context ->
+ Behaviors.withTimers(
+ timers ->
+ new DeviceGroupQuery(
+ deviceIdToActor, requestId, requester, timeout, context, timers)));
+ }
+
+ private final long requestId;
+ private final ActorRef<DeviceManager.RespondAllTemperatures> requester;
+ private Map<String, DeviceManager.TemperatureReading> repliesSoFar = new HashMap();
+ private final Set<String> stillWaiting;
+
+ private DeviceGroupQuery(
+ Map<String, ActorRef<Device.Command>> deviceIdToActor,
+ long requestId,
+ ActorRef<DeviceManager.RespondAllTemperatures> requester,
+ Duration timeout,
+ ActorContext<Command> context,
+ TimerScheduler<Command> timers) {
+ super(context);
+ this.requestId = requestId;
+ this.requester = requester;
+
+ timers.startSingleTimer(CollectionTimeout.INSTANCE, timeout);
+
+ ActorRef<Device.RespondTemperature> respondTemperatureAdapter =
+ context.messageAdapter(Device.RespondTemperature.class, WrappedRespondTemperature::new);
+
+ for (Map.Entry<String, ActorRef<Device.Command>> entry : deviceIdToActor.entrySet()) {
+ context.watchWith(entry.getValue(), new DeviceTerminated(entry.getKey()));
+ entry.getValue().tell(new Device.ReadTemperature(0L, respondTemperatureAdapter));
+ }
+ stillWaiting = new HashSet<>(deviceIdToActor.keySet());
+ }
+
+
+ @Override
+ public Receive<Command> createReceive() {
+ return newReceiveBuilder()
+ .onMessage(WrappedRespondTemperature.class, this::onRespondTemperature)
+ .onMessage(DeviceTerminated.class, this::onDeviceTerminated)
+ .onMessage(CollectionTimeout.class, this::onCollectionTimeout)
+ .build();
+ }
+
+ private Behavior<Command> onRespondTemperature(WrappedRespondTemperature r) {
+ DeviceManager.TemperatureReading reading =
+ r.response
+ .value
+ .map(v -> (DeviceManager.TemperatureReading) new DeviceManager.Temperature(v))
+ .orElse(DeviceManager.TemperatureNotAvailable.INSTANCE);
+
+ String deviceId = r.response.deviceId;
+ repliesSoFar.put(deviceId, reading);
+ stillWaiting.remove(deviceId);
+
+ return respondWhenAllCollected();
+ }
+
+ private Behavior<Command> onDeviceTerminated(DeviceTerminated terminated) {
+ if (stillWaiting.contains(terminated.deviceId)) {
+ repliesSoFar.put(terminated.deviceId, DeviceManager.DeviceNotAvailable.INSTANCE);
+ stillWaiting.remove(terminated.deviceId);
+ }
+ return respondWhenAllCollected();
+ }
+
+ private Behavior<Command> onCollectionTimeout(CollectionTimeout timeout) {
+ for (String deviceId : stillWaiting) {
+ repliesSoFar.put(deviceId, DeviceManager.DeviceTimedOut.INSTANCE);
+ }
+ stillWaiting.clear();
+ return respondWhenAllCollected();
+ }
+
+ private Behavior<Command> respondWhenAllCollected() {
+ if (stillWaiting.isEmpty()) {
+ requester.tell(new DeviceManager.RespondAllTemperatures(requestId, repliesSoFar));
+ return Behaviors.stopped();
+ } else {
+ return this;
+ }
+ }
+
+}
diff --git a/domain/src/main/java/akkamon/domain/iot/DeviceManager.java b/domain/src/main/java/akkamon/domain/iot/DeviceManager.java
index e445b7e..c0f8b0d 100644
--- a/domain/src/main/java/akkamon/domain/iot/DeviceManager.java
+++ b/domain/src/main/java/akkamon/domain/iot/DeviceManager.java
@@ -67,6 +67,73 @@ public class DeviceManager extends AbstractBehavior<DeviceManager.Command> {
}
}
+ public static final class RequestAllTemperatures implements DeviceGroupQuery.Command, DeviceGroup.Command, Command {
+
+ final long requestId;
+ final String groupId;
+ final ActorRef<RespondAllTemperatures> replyTo;
+
+ public RequestAllTemperatures(
+ long requestId, String groupId, ActorRef<RespondAllTemperatures> replyTo) {
+ this.requestId = requestId;
+ this.groupId = groupId;
+ this.replyTo = replyTo;
+ }
+ }
+
+ public static final class RespondAllTemperatures {
+ final long requestId;
+ final Map<String, TemperatureReading> temperatures;
+
+ public RespondAllTemperatures(long requestId, Map<String, TemperatureReading> temperatures) {
+ this.requestId = requestId;
+ this.temperatures = temperatures;
+ }
+ }
+
+ public interface TemperatureReading {}
+
+ public static final class Temperature implements TemperatureReading {
+ public final double value;
+
+ public Temperature(double value) {
+ this.value = value;
+ }
+
+ @Override
+ public boolean equals(Object o) {
+ if (this == o) return true;
+ if (o == null || getClass() != o.getClass()) return false;
+
+ Temperature that = (Temperature) o;
+
+ return Double.compare(that.value, value) == 0;
+ }
+
+ @Override
+ public int hashCode() {
+ long temp = Double.doubleToLongBits(value);
+ return (int) (temp ^ (temp >>> 32));
+ }
+
+ @Override
+ public String toString() {
+ return "Temperature{" + "value=" + value + '}';
+ }
+ }
+
+ public enum TemperatureNotAvailable implements TemperatureReading {
+ INSTANCE
+ }
+
+ public enum DeviceNotAvailable implements TemperatureReading {
+ INSTANCE
+ }
+
+ public enum DeviceTimedOut implements TemperatureReading {
+ INSTANCE
+ }
+
public static Behavior<Command> create() {
return Behaviors.setup(DeviceManager::new);
}
@@ -78,6 +145,15 @@ public class DeviceManager extends AbstractBehavior<DeviceManager.Command> {
context.getLog().info("DeviceManager started");
}
+ public Receive<Command> createReceive() {
+ return newReceiveBuilder()
+ .onMessage(RequestTrackDevice.class, this::onTrackDevice)
+ .onMessage(RequestDeviceList.class, this::onRequestDeviceList)
+ .onMessage(DeviceGroupTerminated.class, this::onTerminated)
+ .onSignal(PostStop.class, signal -> onPostStop())
+ .build();
+ }
+
private DeviceManager onTrackDevice(RequestTrackDevice trackMsg) {
String groupId = trackMsg.groupId;
ActorRef<DeviceGroup.Command> ref = groupIdToActor.get(groupId);
@@ -110,15 +186,6 @@ public class DeviceManager extends AbstractBehavior<DeviceManager.Command> {
return this;
}
- public Receive<Command> createReceive() {
- return newReceiveBuilder()
- .onMessage(RequestTrackDevice.class, this::onTrackDevice)
- .onMessage(RequestDeviceList.class, this::onRequestDeviceList)
- .onMessage(DeviceGroupTerminated.class, this::onTerminated)
- .onSignal(PostStop.class, signal -> onPostStop())
- .build();
- }
-
private DeviceManager onPostStop() {
getContext().getLog().info("DeviceManager stopped");
return this;
diff --git a/domain/src/test/java/akkamon/domain/AkkamonNexusTest.java b/domain/src/test/java/akkamon/domain/AkkamonNexusTest.java
new file mode 100644
index 0000000..5150b1c
--- /dev/null
+++ b/domain/src/test/java/akkamon/domain/AkkamonNexusTest.java
@@ -0,0 +1,28 @@
+package akkamon.domain;
+
+import akka.actor.testkit.typed.javadsl.TestKitJunitResource;
+import akka.actor.testkit.typed.javadsl.TestProbe;
+import akka.actor.typed.ActorRef;
+import org.junit.ClassRule;
+import org.junit.Test;
+
+public class AkkamonNexusTest {
+
+ @ClassRule
+ public static final TestKitJunitResource testKit = new TestKitJunitResource();
+
+ @Test
+ public void given_a_registration_request_when_no_scene_or_trainer_exists_in_the_system_then_create_scene_and_trainer_and_reply() {
+ TestProbe<AkkamonNexus.TrainerRegistered> probe =
+ testKit.createTestProbe(AkkamonNexus.TrainerRegistered.class);
+
+ ActorRef<SceneTrainerGroup.Command> sceneTrainerGroupActor =
+ testKit.spawn(SceneTrainerGroup.create("start"));
+
+ sceneTrainerGroupActor.tell(new AkkamonNexus.RequestTrainerRegistration("ash", "start", probe.getRef()));
+
+ probe.expectMessageClass(AkkamonNexus.TrainerRegistered.class);
+
+ }
+
+} \ No newline at end of file
diff --git a/domain/src/test/java/akkamon/domain/iot/DeviceGroupQueryTest.java b/domain/src/test/java/akkamon/domain/iot/DeviceGroupQueryTest.java
new file mode 100644
index 0000000..ebfeda5
--- /dev/null
+++ b/domain/src/test/java/akkamon/domain/iot/DeviceGroupQueryTest.java
@@ -0,0 +1,245 @@
+package akkamon.domain.iot;
+
+import akka.actor.testkit.typed.javadsl.TestKitJunitResource;
+import akka.actor.testkit.typed.javadsl.TestProbe;
+import akka.actor.typed.ActorRef;
+import akkamon.domain.iot.DeviceManager.*;
+import akkamon.domain.iot.DeviceManager.RespondAllTemperatures;
+import akkamon.domain.iot.DeviceManager.Temperature;
+import akkamon.domain.iot.DeviceManager.TemperatureReading;
+import org.junit.ClassRule;
+import org.junit.Test;
+
+import java.time.Duration;
+import java.util.HashMap;
+import java.util.Map;
+import java.util.Optional;
+
+import static org.junit.Assert.*;
+
+public class DeviceGroupQueryTest {
+
+ @ClassRule
+ public static final TestKitJunitResource testKit = new TestKitJunitResource();
+
+ @Test
+ public void testReturnTemperatureValueForWorkingDevices() {
+ TestProbe<RespondAllTemperatures> requester =
+ testKit.createTestProbe(RespondAllTemperatures.class);
+ TestProbe<Device.Command> device1 = testKit.createTestProbe(Device.Command.class);
+ TestProbe<Device.Command> device2 = testKit.createTestProbe(Device.Command.class);
+
+ Map<String, ActorRef<Device.Command>> deviceIdToActor = new HashMap<>();
+ deviceIdToActor.put("device1", device1.getRef());
+ deviceIdToActor.put("device2", device2.getRef());
+
+ ActorRef<DeviceGroupQuery.Command> queryActor =
+ testKit.spawn(
+ DeviceGroupQuery.create(
+ deviceIdToActor, 1L, requester.getRef(), Duration.ofSeconds(3)));
+
+ device1.expectMessageClass(Device.ReadTemperature.class);
+ device2.expectMessageClass(Device.ReadTemperature.class);
+
+ queryActor.tell(
+ new DeviceGroupQuery.WrappedRespondTemperature(
+ new Device.RespondTemperature(0L, "device1", Optional.of(1.0))));
+
+ queryActor.tell(
+ new DeviceGroupQuery.WrappedRespondTemperature(
+ new Device.RespondTemperature(0L, "device2", Optional.of(2.0))));
+
+ RespondAllTemperatures response = requester.receiveMessage();
+ assertEquals(1L, response.requestId);
+
+ Map<String, TemperatureReading> expectedTemperatures = new HashMap<>();
+ expectedTemperatures.put("device1", new Temperature(1.0));
+ expectedTemperatures.put("device2", new Temperature(2.0));
+
+ assertEquals(expectedTemperatures, response.temperatures);
+ }
+
+ @Test
+ public void testReturnTemperatureNotAvailableForDevicesWithNoReadings() {
+ TestProbe<RespondAllTemperatures> requester =
+ testKit.createTestProbe(RespondAllTemperatures.class);
+ TestProbe<Device.Command> device1 = testKit.createTestProbe(Device.Command.class);
+ TestProbe<Device.Command> device2 = testKit.createTestProbe(Device.Command.class);
+
+ Map<String, ActorRef<Device.Command>> deviceIdToActor = new HashMap<>();
+ deviceIdToActor.put("device1", device1.getRef());
+ deviceIdToActor.put("device2", device2.getRef());
+
+ ActorRef<DeviceGroupQuery.Command> queryActor =
+ testKit.spawn(
+ DeviceGroupQuery.create(
+ deviceIdToActor, 1L, requester.getRef(), Duration.ofSeconds(3)));
+
+ assertEquals(0L, device1.expectMessageClass(Device.ReadTemperature.class).requestId);
+ assertEquals(0L, device2.expectMessageClass(Device.ReadTemperature.class).requestId);
+
+ queryActor.tell(
+ new DeviceGroupQuery.WrappedRespondTemperature(
+ new Device.RespondTemperature(0L, "device1", Optional.empty())));
+
+ queryActor.tell(
+ new DeviceGroupQuery.WrappedRespondTemperature(
+ new Device.RespondTemperature(0L, "device2", Optional.of(2.0))));
+
+ RespondAllTemperatures response = requester.receiveMessage();
+ assertEquals(1L, response.requestId);
+
+ Map<String, TemperatureReading> expectedTemperatures = new HashMap<>();
+ expectedTemperatures.put("device1", TemperatureNotAvailable.INSTANCE);
+ expectedTemperatures.put("device2", new Temperature(2.0));
+
+ assertEquals(expectedTemperatures, response.temperatures);
+ }
+
+ @Test
+ public void testReturnDeviceNotAvailableIfDeviceStopsBeforeAnswering() {
+ TestProbe<RespondAllTemperatures> requester =
+ testKit.createTestProbe(RespondAllTemperatures.class);
+ TestProbe<Device.Command> device1 = testKit.createTestProbe(Device.Command.class);
+ TestProbe<Device.Command> device2 = testKit.createTestProbe(Device.Command.class);
+
+ Map<String, ActorRef<Device.Command>> deviceIdToActor = new HashMap<>();
+ deviceIdToActor.put("device1", device1.getRef());
+ deviceIdToActor.put("device2", device2.getRef());
+
+ ActorRef<DeviceGroupQuery.Command> queryActor =
+ testKit.spawn(
+ DeviceGroupQuery.create(
+ deviceIdToActor, 1L, requester.getRef(), Duration.ofSeconds(3)));
+
+ assertEquals(0L, device1.expectMessageClass(Device.ReadTemperature.class).requestId);
+ assertEquals(0L, device2.expectMessageClass(Device.ReadTemperature.class).requestId);
+
+ queryActor.tell(
+ new DeviceGroupQuery.WrappedRespondTemperature(
+ new Device.RespondTemperature(0L, "device1", Optional.of(1.0))));
+
+ device2.stop();
+
+ RespondAllTemperatures response = requester.receiveMessage();
+ assertEquals(1L, response.requestId);
+
+ Map<String, TemperatureReading> expectedTemperatures = new HashMap<>();
+ expectedTemperatures.put("device1", new Temperature(1.0));
+ expectedTemperatures.put("device2", DeviceNotAvailable.INSTANCE);
+
+ assertEquals(expectedTemperatures, response.temperatures);
+ }
+
+ @Test
+ public void testReturnTemperatureReadingEvenIfDeviceStopsAfterAnswering() {
+ TestProbe<RespondAllTemperatures> requester =
+ testKit.createTestProbe(RespondAllTemperatures.class);
+ TestProbe<Device.Command> device1 = testKit.createTestProbe(Device.Command.class);
+ TestProbe<Device.Command> device2 = testKit.createTestProbe(Device.Command.class);
+
+ Map<String, ActorRef<Device.Command>> deviceIdToActor = new HashMap<>();
+ deviceIdToActor.put("device1", device1.getRef());
+ deviceIdToActor.put("device2", device2.getRef());
+
+ ActorRef<DeviceGroupQuery.Command> queryActor =
+ testKit.spawn(
+ DeviceGroupQuery.create(
+ deviceIdToActor, 1L, requester.getRef(), Duration.ofSeconds(3)));
+
+ assertEquals(0L, device1.expectMessageClass(Device.ReadTemperature.class).requestId);
+ assertEquals(0L, device2.expectMessageClass(Device.ReadTemperature.class).requestId);
+
+ queryActor.tell(
+ new DeviceGroupQuery.WrappedRespondTemperature(
+ new Device.RespondTemperature(0L, "device1", Optional.of(1.0))));
+
+ queryActor.tell(
+ new DeviceGroupQuery.WrappedRespondTemperature(
+ new Device.RespondTemperature(0L, "device2", Optional.of(2.0))));
+
+ device2.stop();
+
+ RespondAllTemperatures response = requester.receiveMessage();
+ assertEquals(1L, response.requestId);
+
+ Map<String, TemperatureReading> expectedTemperatures = new HashMap<>();
+ expectedTemperatures.put("device1", new Temperature(1.0));
+ expectedTemperatures.put("device2", new Temperature(2.0));
+
+ assertEquals(expectedTemperatures, response.temperatures);
+ }
+
+ @Test
+ public void testReturnDeviceTimedOutIfDeviceDoesNotAnswerInTime() {
+ TestProbe<RespondAllTemperatures> requester =
+ testKit.createTestProbe(RespondAllTemperatures.class);
+ TestProbe<Device.Command> device1 = testKit.createTestProbe(Device.Command.class);
+ TestProbe<Device.Command> device2 = testKit.createTestProbe(Device.Command.class);
+
+ Map<String, ActorRef<Device.Command>> deviceIdToActor = new HashMap<>();
+ deviceIdToActor.put("device1", device1.getRef());
+ deviceIdToActor.put("device2", device2.getRef());
+
+ ActorRef<DeviceGroupQuery.Command> queryActor =
+ testKit.spawn(
+ DeviceGroupQuery.create(
+ deviceIdToActor, 1L, requester.getRef(), Duration.ofMillis(200)));
+
+ assertEquals(0L, device1.expectMessageClass(Device.ReadTemperature.class).requestId);
+ assertEquals(0L, device2.expectMessageClass(Device.ReadTemperature.class).requestId);
+
+ queryActor.tell(
+ new DeviceGroupQuery.WrappedRespondTemperature(
+ new Device.RespondTemperature(0L, "device1", Optional.of(1.0))));
+
+ // no reply from device2
+
+ RespondAllTemperatures response = requester.receiveMessage();
+ assertEquals(1L, response.requestId);
+
+ Map<String, TemperatureReading> expectedTemperatures = new HashMap<>();
+ expectedTemperatures.put("device1", new Temperature(1.0));
+ expectedTemperatures.put("device2", DeviceTimedOut.INSTANCE);
+
+ assertEquals(expectedTemperatures, response.temperatures);
+ }
+
+ @Test
+ public void testCollectTemperaturesFromAllActiveDevices() {
+ TestProbe<DeviceRegistered> registeredProbe = testKit.createTestProbe(DeviceRegistered.class);
+ ActorRef<DeviceGroup.Command> groupActor = testKit.spawn(DeviceGroup.create("group"));
+
+ groupActor.tell(new RequestTrackDevice("group", "device1", registeredProbe.getRef()));
+ ActorRef<Device.Command> deviceActor1 = registeredProbe.receiveMessage().device;
+
+ groupActor.tell(new RequestTrackDevice("group", "device2", registeredProbe.getRef()));
+ ActorRef<Device.Command> deviceActor2 = registeredProbe.receiveMessage().device;
+
+ groupActor.tell(new RequestTrackDevice("group", "device3", registeredProbe.getRef()));
+ ActorRef<Device.Command> deviceActor3 = registeredProbe.receiveMessage().device;
+
+ // Check that the device actors are working
+ TestProbe<Device.TemperatureRecorded> recordProbe =
+ testKit.createTestProbe(Device.TemperatureRecorded.class);
+ deviceActor1.tell(new Device.RecordTemperature(0L, 1.0, recordProbe.getRef()));
+ assertEquals(0L, recordProbe.receiveMessage().requestId);
+ deviceActor2.tell(new Device.RecordTemperature(1L, 2.0, recordProbe.getRef()));
+ assertEquals(1L, recordProbe.receiveMessage().requestId);
+ // No temperature for device 3
+
+ TestProbe<RespondAllTemperatures> allTempProbe =
+ testKit.createTestProbe(RespondAllTemperatures.class);
+ groupActor.tell(new RequestAllTemperatures(0L, "group", allTempProbe.getRef()));
+ RespondAllTemperatures response = allTempProbe.receiveMessage();
+ assertEquals(0L, response.requestId);
+
+ Map<String, TemperatureReading> expectedTemperatures = new HashMap<>();
+ expectedTemperatures.put("device1", new Temperature(1.0));
+ expectedTemperatures.put("device2", new Temperature(2.0));
+ expectedTemperatures.put("device3", TemperatureNotAvailable.INSTANCE);
+
+ assertEquals(expectedTemperatures, response.temperatures);
+ }
+
+} \ No newline at end of file