diff options
| author | Mike Vink <mike1994vink@gmail.com> | 2021-07-20 17:41:58 +0200 |
|---|---|---|
| committer | Mike Vink <mike1994vink@gmail.com> | 2021-07-20 17:41:58 +0200 |
| commit | 5f016158e73a7828c9dec1810e54bbe2550d8c20 (patch) | |
| tree | 82b380c3976be9e48248ae8f30a5e99b5afc7e97 /domain | |
| parent | 36730af06964e75735ae5d099ee9c5bf2c6b7fc9 (diff) | |
feat(): trainer registration to actor sys
Diffstat (limited to 'domain')
11 files changed, 696 insertions, 38 deletions
diff --git a/domain/build.gradle b/domain/build.gradle index d2d174a..4b7063a 100644 --- a/domain/build.gradle +++ b/domain/build.gradle @@ -26,6 +26,9 @@ dependencies { implementation 'ch.qos.logback:logback-classic:1.2.3' implementation 'junit:junit:4.12' + // Reference the domain subproject. + implementation project(':api') + testImplementation "com.typesafe.akka:akka-actor-testkit-typed_${versions.ScalaBinary}" testImplementation "com.typesafe.akka:akka-stream-testkit_${versions.ScalaBinary}" diff --git a/domain/src/main/java/akkamon/domain/AkkamonImpl.java b/domain/src/main/java/akkamon/domain/AkkamonImpl.java index a6a840b..534abf2 100644 --- a/domain/src/main/java/akkamon/domain/AkkamonImpl.java +++ b/domain/src/main/java/akkamon/domain/AkkamonImpl.java @@ -17,19 +17,19 @@ public class AkkamonImpl implements Akkamon { @Override public void newPlayerConnected(String name, String password) { - switch (dummyTrainersCollection.size()) { - case 0: - dummyTrainersCollection.put("Ash", new Trainer("Ash")); - break; - case 1: - dummyTrainersCollection.put("Misty", new Trainer("Misty")); - break; - } + // switch (dummyTrainersCollection.size()) { + // case 0: + // dummyTrainersCollection.put("Ash", new Trainer("Ash")); + // break; + // case 1: + // dummyTrainersCollection.put("Misty", new Trainer("Misty")); + // break; + // } } public void updateTrainerPosition(String name, float x, float y) { - Trainer trainer = dummyTrainersCollection.get(name); - trainer.newPosition(x, y); + // Trainer trainer = dummyTrainersCollection.get(name); + // trainer.newPosition(x, y); } public HashMap<String, Trainer> getDummyTrainersCollection() { diff --git a/domain/src/main/java/akkamon/domain/AkkamonNexus.java b/domain/src/main/java/akkamon/domain/AkkamonNexus.java new file mode 100644 index 0000000..a64e751 --- /dev/null +++ b/domain/src/main/java/akkamon/domain/AkkamonNexus.java @@ -0,0 +1,84 @@ +package akkamon.domain; + +import akka.actor.typed.ActorRef; +import akka.actor.typed.Behavior; +import akka.actor.typed.javadsl.AbstractBehavior; +import akka.actor.typed.javadsl.ActorContext; +import akka.actor.typed.javadsl.Behaviors; +import akka.actor.typed.javadsl.Receive; + +import java.util.HashMap; +import java.util.Map; + +public class AkkamonNexus extends AbstractBehavior<AkkamonNexus.Command> { + + + public interface Command {} + + public static class RequestTrainerRegistration implements AkkamonNexus.Command, SceneTrainerGroup.Command { + public String trainerId; + public String sceneId; + public ActorRef<TrainerRegistered> replyTo; + + public RequestTrainerRegistration(String trainerId, String sceneId, ActorRef<TrainerRegistered> replyTo) { + this.trainerId = trainerId; + this.sceneId = sceneId; + this.replyTo = replyTo; + } + } + + public static class TrainerRegistered { + private ActorRef<Trainer.Command> trainer; + + public TrainerRegistered(ActorRef<Trainer.Command> trainer) { + this.trainer = trainer; + } + } + + private static class SceneTrainerGroupTerminated implements AkkamonNexus.Command { + public SceneTrainerGroupTerminated(String sceneId) { + } + } + + + public static Behavior<AkkamonNexus.Command> create() { + return Behaviors.setup(AkkamonNexus::new); + } + + private Map<String, ActorRef<SceneTrainerGroup.Command>> sceneIdToActor = new HashMap<>(); + + public AkkamonNexus(ActorContext<Command> context) { + super(context); + getContext().getLog().info("AkkamonNexus is spinning"); + } + + @Override + public Receive<Command> createReceive() { + return newReceiveBuilder() + .onMessage( + RequestTrainerRegistration.class, + this::onTrainerRegistration + ) + .build(); + } + + private AkkamonNexus onTrainerRegistration(RequestTrainerRegistration registrationRequest) { + String sceneId = registrationRequest.sceneId; + String trainerId = registrationRequest.trainerId; + + ActorRef<SceneTrainerGroup.Command> sceneTrainerGroup = sceneIdToActor.get(sceneId); + if (sceneTrainerGroup != null) { + sceneTrainerGroup.tell(registrationRequest); + } else { + getContext().getLog().info("Creating sceneTrainerGroup {} for trainer {}", sceneId, trainerId); + ActorRef<SceneTrainerGroup.Command> sceneActor = + getContext().spawn(SceneTrainerGroup.create(sceneId), "scene-" + sceneId); + + getContext().watchWith(sceneActor, new SceneTrainerGroupTerminated(sceneId)); + sceneActor.tell(registrationRequest); + sceneIdToActor.put(sceneId, sceneActor); + } + return this; + } + +} diff --git a/domain/src/main/java/akkamon/domain/SceneTrainerGroup.java b/domain/src/main/java/akkamon/domain/SceneTrainerGroup.java new file mode 100644 index 0000000..fb9456a --- /dev/null +++ b/domain/src/main/java/akkamon/domain/SceneTrainerGroup.java @@ -0,0 +1,79 @@ +package akkamon.domain; + +import akka.actor.typed.ActorRef; +import akka.actor.typed.Behavior; +import akka.actor.typed.javadsl.AbstractBehavior; +import akka.actor.typed.javadsl.ActorContext; +import akka.actor.typed.javadsl.Behaviors; +import akka.actor.typed.javadsl.Receive; + +import java.util.HashMap; +import java.util.Map; + +public class SceneTrainerGroup extends AbstractBehavior<SceneTrainerGroup.Command> { + + public interface Command { } + + private class TrainerOffline implements Command { + public ActorRef<Trainer.Command> trainer; + public String sceneId; + public String trainerId; + + public TrainerOffline(ActorRef<Trainer.Command> trainerActor, String sceneId, String trainerId) { + this.trainer = trainerActor; + this.sceneId = sceneId; + this.trainerId = trainerId; + } + } + + public static Behavior<Command> create(String TrainerGroupId) { + return Behaviors.setup(context -> new SceneTrainerGroup(context, TrainerGroupId)); + } + + private final String sceneId; + private final Map<String, ActorRef<Trainer.Command>> trainerIdToActor= new HashMap(); + + public SceneTrainerGroup(ActorContext<Command> context, String sceneId) { + super(context); + + this.sceneId = sceneId; + + getContext().getLog().info("SceneTrainerGroup Actor {} started", sceneId); + } + + @Override + public Receive<Command> createReceive() { + return newReceiveBuilder() + .onMessage( + AkkamonNexus.RequestTrainerRegistration.class, + this::onTrainerRegistration) + .build(); + } + + private SceneTrainerGroup onTrainerRegistration(AkkamonNexus.RequestTrainerRegistration registrationRequest) { + if (this.sceneId.equals(registrationRequest.sceneId)) { + ActorRef<Trainer.Command> trainerActor = trainerIdToActor.get(registrationRequest.trainerId); + if (trainerActor != null) { + registrationRequest.replyTo.tell(new AkkamonNexus.TrainerRegistered(trainerActor)); + } else { + getContext().getLog().info("Creating trainer actor for {}", registrationRequest.trainerId); + trainerActor = + getContext() + .spawn(Trainer.create(sceneId, registrationRequest.trainerId), "trainer-" + registrationRequest.trainerId); + getContext() + .watchWith(trainerActor, new SceneTrainerGroup.TrainerOffline(trainerActor, sceneId, registrationRequest.trainerId)); + trainerIdToActor.put(registrationRequest.trainerId, trainerActor); + registrationRequest.replyTo.tell(new AkkamonNexus.TrainerRegistered(trainerActor)); + } + } else { + getContext() + .getLog() + .warn( + "Ignoring TrainerRegistration request for {}. This actor is responsible for {}.", + registrationRequest.sceneId, + this.sceneId); + } + return this; + } + +} diff --git a/domain/src/main/java/akkamon/domain/Trainer.java b/domain/src/main/java/akkamon/domain/Trainer.java index 8f14c60..7f540a1 100644 --- a/domain/src/main/java/akkamon/domain/Trainer.java +++ b/domain/src/main/java/akkamon/domain/Trainer.java @@ -1,28 +1,31 @@ package akkamon.domain; -public class Trainer { - private String name; - private float x; - private float y; +import akka.actor.typed.Behavior; +import akka.actor.typed.javadsl.AbstractBehavior; +import akka.actor.typed.javadsl.ActorContext; +import akka.actor.typed.javadsl.Behaviors; +import akka.actor.typed.javadsl.Receive; - public Trainer(String name) { - this.name = name; - } +public class Trainer extends AbstractBehavior<Trainer.Command> { - public void newPosition(float x, float y) { - this.x = x; - this.y = y; - } + public interface Command { } - public float getX() { - return x; + public static Behavior<Command> create(String sceneId, String trainerId) { + return Behaviors.setup(context -> new Trainer(context, sceneId, trainerId)); } - public float getY() { - return y; + private String sceneId; + private String trainerId; + + public Trainer(ActorContext<Command> context, String sceneId, String trainerId) { + super(context); + this.sceneId = sceneId; + this.trainerId = trainerId; } - public String getName() { - return name; + @Override + public Receive<Command> createReceive() { + return null; } + } diff --git a/domain/src/main/java/akkamon/domain/iot/Device.java b/domain/src/main/java/akkamon/domain/iot/Device.java index 27918a6..c4e63f8 100644 --- a/domain/src/main/java/akkamon/domain/iot/Device.java +++ b/domain/src/main/java/akkamon/domain/iot/Device.java @@ -46,10 +46,12 @@ public class Device extends AbstractBehavior<Device.Command> { public static final class RespondTemperature { final long requestId; + final String deviceId; final Optional<Double> value; - public RespondTemperature(long requestId, Optional<Double> value) { + public RespondTemperature(long requestId, String deviceId, Optional<Double> value) { this.requestId = requestId; + this.deviceId = deviceId; this.value = value; } } @@ -93,7 +95,7 @@ public class Device extends AbstractBehavior<Device.Command> { } private Behavior<Command> onReadTemperature(ReadTemperature r) { - r.replyTo.tell(new RespondTemperature(r.requestId, lastTemperatureReading)); + r.replyTo.tell(new RespondTemperature(r.requestId, deviceId, lastTemperatureReading)); return this; } diff --git a/domain/src/main/java/akkamon/domain/iot/DeviceGroup.java b/domain/src/main/java/akkamon/domain/iot/DeviceGroup.java index 2cfc13a..40e0d7f 100644 --- a/domain/src/main/java/akkamon/domain/iot/DeviceGroup.java +++ b/domain/src/main/java/akkamon/domain/iot/DeviceGroup.java @@ -8,6 +8,7 @@ import akka.actor.typed.javadsl.ActorContext; import akka.actor.typed.javadsl.Behaviors; import akka.actor.typed.javadsl.Receive; +import java.time.Duration; import java.util.HashMap; import java.util.Map; @@ -40,6 +41,19 @@ public class DeviceGroup extends AbstractBehavior<DeviceGroup.Command> { context.getLog().info("DeviceGroup {} started", groupId); } + private DeviceGroup onAllTemperatures(DeviceManager.RequestAllTemperatures r) { + Map<String, ActorRef<Device.Command>> deviceIdToActorCopy = new HashMap<>(this.deviceIdToActor); + + getContext() + .spawnAnonymous( + DeviceGroupQuery.create( + deviceIdToActorCopy, r.requestId, r.replyTo, Duration.ofSeconds(3) + ) + ); + + return this; + } + private DeviceGroup onTrackDevice(DeviceManager.RequestTrackDevice trackMsg) { if (this.groupId.equals(trackMsg.groupId)) { ActorRef<Device.Command> deviceActor = deviceIdToActor.get(trackMsg.deviceId); @@ -88,6 +102,11 @@ public class DeviceGroup extends AbstractBehavior<DeviceGroup.Command> { this::onDeviceList) .onMessage(DeviceTerminated.class, this::onTerminated) .onSignal(PostStop.class, signal -> onPostStop()) + .onMessage( + DeviceManager.RequestAllTemperatures.class, + r -> r.groupId.equals(groupId), + this::onAllTemperatures + ) .build(); } diff --git a/domain/src/main/java/akkamon/domain/iot/DeviceGroupQuery.java b/domain/src/main/java/akkamon/domain/iot/DeviceGroupQuery.java new file mode 100644 index 0000000..b04fd23 --- /dev/null +++ b/domain/src/main/java/akkamon/domain/iot/DeviceGroupQuery.java @@ -0,0 +1,128 @@ +package akkamon.domain.iot; + +import akka.actor.typed.ActorRef; +import akka.actor.typed.Behavior; +import akka.actor.typed.javadsl.*; + +import java.time.Duration; +import java.util.HashMap; +import java.util.HashSet; +import java.util.Map; +import java.util.Set; + +public class DeviceGroupQuery extends AbstractBehavior<DeviceGroupQuery.Command> { + + + public interface Command {} + + private static enum CollectionTimeout implements Command { + INSTANCE + } + + static class WrappedRespondTemperature implements Command { + final Device.RespondTemperature response; + + WrappedRespondTemperature(Device.RespondTemperature response) { + this.response = response; + } + } + + private static class DeviceTerminated implements Command { + final String deviceId; + + private DeviceTerminated(String deviceId) { + this.deviceId = deviceId; + } + } + + public static Behavior<Command> create( + Map<String, ActorRef<Device.Command>> deviceIdToActor, + long requestId, + ActorRef<DeviceManager.RespondAllTemperatures> requester, + Duration timeout) { + return Behaviors.setup( + context -> + Behaviors.withTimers( + timers -> + new DeviceGroupQuery( + deviceIdToActor, requestId, requester, timeout, context, timers))); + } + + private final long requestId; + private final ActorRef<DeviceManager.RespondAllTemperatures> requester; + private Map<String, DeviceManager.TemperatureReading> repliesSoFar = new HashMap(); + private final Set<String> stillWaiting; + + private DeviceGroupQuery( + Map<String, ActorRef<Device.Command>> deviceIdToActor, + long requestId, + ActorRef<DeviceManager.RespondAllTemperatures> requester, + Duration timeout, + ActorContext<Command> context, + TimerScheduler<Command> timers) { + super(context); + this.requestId = requestId; + this.requester = requester; + + timers.startSingleTimer(CollectionTimeout.INSTANCE, timeout); + + ActorRef<Device.RespondTemperature> respondTemperatureAdapter = + context.messageAdapter(Device.RespondTemperature.class, WrappedRespondTemperature::new); + + for (Map.Entry<String, ActorRef<Device.Command>> entry : deviceIdToActor.entrySet()) { + context.watchWith(entry.getValue(), new DeviceTerminated(entry.getKey())); + entry.getValue().tell(new Device.ReadTemperature(0L, respondTemperatureAdapter)); + } + stillWaiting = new HashSet<>(deviceIdToActor.keySet()); + } + + + @Override + public Receive<Command> createReceive() { + return newReceiveBuilder() + .onMessage(WrappedRespondTemperature.class, this::onRespondTemperature) + .onMessage(DeviceTerminated.class, this::onDeviceTerminated) + .onMessage(CollectionTimeout.class, this::onCollectionTimeout) + .build(); + } + + private Behavior<Command> onRespondTemperature(WrappedRespondTemperature r) { + DeviceManager.TemperatureReading reading = + r.response + .value + .map(v -> (DeviceManager.TemperatureReading) new DeviceManager.Temperature(v)) + .orElse(DeviceManager.TemperatureNotAvailable.INSTANCE); + + String deviceId = r.response.deviceId; + repliesSoFar.put(deviceId, reading); + stillWaiting.remove(deviceId); + + return respondWhenAllCollected(); + } + + private Behavior<Command> onDeviceTerminated(DeviceTerminated terminated) { + if (stillWaiting.contains(terminated.deviceId)) { + repliesSoFar.put(terminated.deviceId, DeviceManager.DeviceNotAvailable.INSTANCE); + stillWaiting.remove(terminated.deviceId); + } + return respondWhenAllCollected(); + } + + private Behavior<Command> onCollectionTimeout(CollectionTimeout timeout) { + for (String deviceId : stillWaiting) { + repliesSoFar.put(deviceId, DeviceManager.DeviceTimedOut.INSTANCE); + } + stillWaiting.clear(); + return respondWhenAllCollected(); + } + + private Behavior<Command> respondWhenAllCollected() { + if (stillWaiting.isEmpty()) { + requester.tell(new DeviceManager.RespondAllTemperatures(requestId, repliesSoFar)); + return Behaviors.stopped(); + } else { + return this; + } + } + +} diff --git a/domain/src/main/java/akkamon/domain/iot/DeviceManager.java b/domain/src/main/java/akkamon/domain/iot/DeviceManager.java index e445b7e..c0f8b0d 100644 --- a/domain/src/main/java/akkamon/domain/iot/DeviceManager.java +++ b/domain/src/main/java/akkamon/domain/iot/DeviceManager.java @@ -67,6 +67,73 @@ public class DeviceManager extends AbstractBehavior<DeviceManager.Command> { } } + public static final class RequestAllTemperatures implements DeviceGroupQuery.Command, DeviceGroup.Command, Command { + + final long requestId; + final String groupId; + final ActorRef<RespondAllTemperatures> replyTo; + + public RequestAllTemperatures( + long requestId, String groupId, ActorRef<RespondAllTemperatures> replyTo) { + this.requestId = requestId; + this.groupId = groupId; + this.replyTo = replyTo; + } + } + + public static final class RespondAllTemperatures { + final long requestId; + final Map<String, TemperatureReading> temperatures; + + public RespondAllTemperatures(long requestId, Map<String, TemperatureReading> temperatures) { + this.requestId = requestId; + this.temperatures = temperatures; + } + } + + public interface TemperatureReading {} + + public static final class Temperature implements TemperatureReading { + public final double value; + + public Temperature(double value) { + this.value = value; + } + + @Override + public boolean equals(Object o) { + if (this == o) return true; + if (o == null || getClass() != o.getClass()) return false; + + Temperature that = (Temperature) o; + + return Double.compare(that.value, value) == 0; + } + + @Override + public int hashCode() { + long temp = Double.doubleToLongBits(value); + return (int) (temp ^ (temp >>> 32)); + } + + @Override + public String toString() { + return "Temperature{" + "value=" + value + '}'; + } + } + + public enum TemperatureNotAvailable implements TemperatureReading { + INSTANCE + } + + public enum DeviceNotAvailable implements TemperatureReading { + INSTANCE + } + + public enum DeviceTimedOut implements TemperatureReading { + INSTANCE + } + public static Behavior<Command> create() { return Behaviors.setup(DeviceManager::new); } @@ -78,6 +145,15 @@ public class DeviceManager extends AbstractBehavior<DeviceManager.Command> { context.getLog().info("DeviceManager started"); } + public Receive<Command> createReceive() { + return newReceiveBuilder() + .onMessage(RequestTrackDevice.class, this::onTrackDevice) + .onMessage(RequestDeviceList.class, this::onRequestDeviceList) + .onMessage(DeviceGroupTerminated.class, this::onTerminated) + .onSignal(PostStop.class, signal -> onPostStop()) + .build(); + } + private DeviceManager onTrackDevice(RequestTrackDevice trackMsg) { String groupId = trackMsg.groupId; ActorRef<DeviceGroup.Command> ref = groupIdToActor.get(groupId); @@ -110,15 +186,6 @@ public class DeviceManager extends AbstractBehavior<DeviceManager.Command> { return this; } - public Receive<Command> createReceive() { - return newReceiveBuilder() - .onMessage(RequestTrackDevice.class, this::onTrackDevice) - .onMessage(RequestDeviceList.class, this::onRequestDeviceList) - .onMessage(DeviceGroupTerminated.class, this::onTerminated) - .onSignal(PostStop.class, signal -> onPostStop()) - .build(); - } - private DeviceManager onPostStop() { getContext().getLog().info("DeviceManager stopped"); return this; diff --git a/domain/src/test/java/akkamon/domain/AkkamonNexusTest.java b/domain/src/test/java/akkamon/domain/AkkamonNexusTest.java new file mode 100644 index 0000000..5150b1c --- /dev/null +++ b/domain/src/test/java/akkamon/domain/AkkamonNexusTest.java @@ -0,0 +1,28 @@ +package akkamon.domain; + +import akka.actor.testkit.typed.javadsl.TestKitJunitResource; +import akka.actor.testkit.typed.javadsl.TestProbe; +import akka.actor.typed.ActorRef; +import org.junit.ClassRule; +import org.junit.Test; + +public class AkkamonNexusTest { + + @ClassRule + public static final TestKitJunitResource testKit = new TestKitJunitResource(); + + @Test + public void given_a_registration_request_when_no_scene_or_trainer_exists_in_the_system_then_create_scene_and_trainer_and_reply() { + TestProbe<AkkamonNexus.TrainerRegistered> probe = + testKit.createTestProbe(AkkamonNexus.TrainerRegistered.class); + + ActorRef<SceneTrainerGroup.Command> sceneTrainerGroupActor = + testKit.spawn(SceneTrainerGroup.create("start")); + + sceneTrainerGroupActor.tell(new AkkamonNexus.RequestTrainerRegistration("ash", "start", probe.getRef())); + + probe.expectMessageClass(AkkamonNexus.TrainerRegistered.class); + + } + +}
\ No newline at end of file diff --git a/domain/src/test/java/akkamon/domain/iot/DeviceGroupQueryTest.java b/domain/src/test/java/akkamon/domain/iot/DeviceGroupQueryTest.java new file mode 100644 index 0000000..ebfeda5 --- /dev/null +++ b/domain/src/test/java/akkamon/domain/iot/DeviceGroupQueryTest.java @@ -0,0 +1,245 @@ +package akkamon.domain.iot; + +import akka.actor.testkit.typed.javadsl.TestKitJunitResource; +import akka.actor.testkit.typed.javadsl.TestProbe; +import akka.actor.typed.ActorRef; +import akkamon.domain.iot.DeviceManager.*; +import akkamon.domain.iot.DeviceManager.RespondAllTemperatures; +import akkamon.domain.iot.DeviceManager.Temperature; +import akkamon.domain.iot.DeviceManager.TemperatureReading; +import org.junit.ClassRule; +import org.junit.Test; + +import java.time.Duration; +import java.util.HashMap; +import java.util.Map; +import java.util.Optional; + +import static org.junit.Assert.*; + +public class DeviceGroupQueryTest { + + @ClassRule + public static final TestKitJunitResource testKit = new TestKitJunitResource(); + + @Test + public void testReturnTemperatureValueForWorkingDevices() { + TestProbe<RespondAllTemperatures> requester = + testKit.createTestProbe(RespondAllTemperatures.class); + TestProbe<Device.Command> device1 = testKit.createTestProbe(Device.Command.class); + TestProbe<Device.Command> device2 = testKit.createTestProbe(Device.Command.class); + + Map<String, ActorRef<Device.Command>> deviceIdToActor = new HashMap<>(); + deviceIdToActor.put("device1", device1.getRef()); + deviceIdToActor.put("device2", device2.getRef()); + + ActorRef<DeviceGroupQuery.Command> queryActor = + testKit.spawn( + DeviceGroupQuery.create( + deviceIdToActor, 1L, requester.getRef(), Duration.ofSeconds(3))); + + device1.expectMessageClass(Device.ReadTemperature.class); + device2.expectMessageClass(Device.ReadTemperature.class); + + queryActor.tell( + new DeviceGroupQuery.WrappedRespondTemperature( + new Device.RespondTemperature(0L, "device1", Optional.of(1.0)))); + + queryActor.tell( + new DeviceGroupQuery.WrappedRespondTemperature( + new Device.RespondTemperature(0L, "device2", Optional.of(2.0)))); + + RespondAllTemperatures response = requester.receiveMessage(); + assertEquals(1L, response.requestId); + + Map<String, TemperatureReading> expectedTemperatures = new HashMap<>(); + expectedTemperatures.put("device1", new Temperature(1.0)); + expectedTemperatures.put("device2", new Temperature(2.0)); + + assertEquals(expectedTemperatures, response.temperatures); + } + + @Test + public void testReturnTemperatureNotAvailableForDevicesWithNoReadings() { + TestProbe<RespondAllTemperatures> requester = + testKit.createTestProbe(RespondAllTemperatures.class); + TestProbe<Device.Command> device1 = testKit.createTestProbe(Device.Command.class); + TestProbe<Device.Command> device2 = testKit.createTestProbe(Device.Command.class); + + Map<String, ActorRef<Device.Command>> deviceIdToActor = new HashMap<>(); + deviceIdToActor.put("device1", device1.getRef()); + deviceIdToActor.put("device2", device2.getRef()); + + ActorRef<DeviceGroupQuery.Command> queryActor = + testKit.spawn( + DeviceGroupQuery.create( + deviceIdToActor, 1L, requester.getRef(), Duration.ofSeconds(3))); + + assertEquals(0L, device1.expectMessageClass(Device.ReadTemperature.class).requestId); + assertEquals(0L, device2.expectMessageClass(Device.ReadTemperature.class).requestId); + + queryActor.tell( + new DeviceGroupQuery.WrappedRespondTemperature( + new Device.RespondTemperature(0L, "device1", Optional.empty()))); + + queryActor.tell( + new DeviceGroupQuery.WrappedRespondTemperature( + new Device.RespondTemperature(0L, "device2", Optional.of(2.0)))); + + RespondAllTemperatures response = requester.receiveMessage(); + assertEquals(1L, response.requestId); + + Map<String, TemperatureReading> expectedTemperatures = new HashMap<>(); + expectedTemperatures.put("device1", TemperatureNotAvailable.INSTANCE); + expectedTemperatures.put("device2", new Temperature(2.0)); + + assertEquals(expectedTemperatures, response.temperatures); + } + + @Test + public void testReturnDeviceNotAvailableIfDeviceStopsBeforeAnswering() { + TestProbe<RespondAllTemperatures> requester = + testKit.createTestProbe(RespondAllTemperatures.class); + TestProbe<Device.Command> device1 = testKit.createTestProbe(Device.Command.class); + TestProbe<Device.Command> device2 = testKit.createTestProbe(Device.Command.class); + + Map<String, ActorRef<Device.Command>> deviceIdToActor = new HashMap<>(); + deviceIdToActor.put("device1", device1.getRef()); + deviceIdToActor.put("device2", device2.getRef()); + + ActorRef<DeviceGroupQuery.Command> queryActor = + testKit.spawn( + DeviceGroupQuery.create( + deviceIdToActor, 1L, requester.getRef(), Duration.ofSeconds(3))); + + assertEquals(0L, device1.expectMessageClass(Device.ReadTemperature.class).requestId); + assertEquals(0L, device2.expectMessageClass(Device.ReadTemperature.class).requestId); + + queryActor.tell( + new DeviceGroupQuery.WrappedRespondTemperature( + new Device.RespondTemperature(0L, "device1", Optional.of(1.0)))); + + device2.stop(); + + RespondAllTemperatures response = requester.receiveMessage(); + assertEquals(1L, response.requestId); + + Map<String, TemperatureReading> expectedTemperatures = new HashMap<>(); + expectedTemperatures.put("device1", new Temperature(1.0)); + expectedTemperatures.put("device2", DeviceNotAvailable.INSTANCE); + + assertEquals(expectedTemperatures, response.temperatures); + } + + @Test + public void testReturnTemperatureReadingEvenIfDeviceStopsAfterAnswering() { + TestProbe<RespondAllTemperatures> requester = + testKit.createTestProbe(RespondAllTemperatures.class); + TestProbe<Device.Command> device1 = testKit.createTestProbe(Device.Command.class); + TestProbe<Device.Command> device2 = testKit.createTestProbe(Device.Command.class); + + Map<String, ActorRef<Device.Command>> deviceIdToActor = new HashMap<>(); + deviceIdToActor.put("device1", device1.getRef()); + deviceIdToActor.put("device2", device2.getRef()); + + ActorRef<DeviceGroupQuery.Command> queryActor = + testKit.spawn( + DeviceGroupQuery.create( + deviceIdToActor, 1L, requester.getRef(), Duration.ofSeconds(3))); + + assertEquals(0L, device1.expectMessageClass(Device.ReadTemperature.class).requestId); + assertEquals(0L, device2.expectMessageClass(Device.ReadTemperature.class).requestId); + + queryActor.tell( + new DeviceGroupQuery.WrappedRespondTemperature( + new Device.RespondTemperature(0L, "device1", Optional.of(1.0)))); + + queryActor.tell( + new DeviceGroupQuery.WrappedRespondTemperature( + new Device.RespondTemperature(0L, "device2", Optional.of(2.0)))); + + device2.stop(); + + RespondAllTemperatures response = requester.receiveMessage(); + assertEquals(1L, response.requestId); + + Map<String, TemperatureReading> expectedTemperatures = new HashMap<>(); + expectedTemperatures.put("device1", new Temperature(1.0)); + expectedTemperatures.put("device2", new Temperature(2.0)); + + assertEquals(expectedTemperatures, response.temperatures); + } + + @Test + public void testReturnDeviceTimedOutIfDeviceDoesNotAnswerInTime() { + TestProbe<RespondAllTemperatures> requester = + testKit.createTestProbe(RespondAllTemperatures.class); + TestProbe<Device.Command> device1 = testKit.createTestProbe(Device.Command.class); + TestProbe<Device.Command> device2 = testKit.createTestProbe(Device.Command.class); + + Map<String, ActorRef<Device.Command>> deviceIdToActor = new HashMap<>(); + deviceIdToActor.put("device1", device1.getRef()); + deviceIdToActor.put("device2", device2.getRef()); + + ActorRef<DeviceGroupQuery.Command> queryActor = + testKit.spawn( + DeviceGroupQuery.create( + deviceIdToActor, 1L, requester.getRef(), Duration.ofMillis(200))); + + assertEquals(0L, device1.expectMessageClass(Device.ReadTemperature.class).requestId); + assertEquals(0L, device2.expectMessageClass(Device.ReadTemperature.class).requestId); + + queryActor.tell( + new DeviceGroupQuery.WrappedRespondTemperature( + new Device.RespondTemperature(0L, "device1", Optional.of(1.0)))); + + // no reply from device2 + + RespondAllTemperatures response = requester.receiveMessage(); + assertEquals(1L, response.requestId); + + Map<String, TemperatureReading> expectedTemperatures = new HashMap<>(); + expectedTemperatures.put("device1", new Temperature(1.0)); + expectedTemperatures.put("device2", DeviceTimedOut.INSTANCE); + + assertEquals(expectedTemperatures, response.temperatures); + } + + @Test + public void testCollectTemperaturesFromAllActiveDevices() { + TestProbe<DeviceRegistered> registeredProbe = testKit.createTestProbe(DeviceRegistered.class); + ActorRef<DeviceGroup.Command> groupActor = testKit.spawn(DeviceGroup.create("group")); + + groupActor.tell(new RequestTrackDevice("group", "device1", registeredProbe.getRef())); + ActorRef<Device.Command> deviceActor1 = registeredProbe.receiveMessage().device; + + groupActor.tell(new RequestTrackDevice("group", "device2", registeredProbe.getRef())); + ActorRef<Device.Command> deviceActor2 = registeredProbe.receiveMessage().device; + + groupActor.tell(new RequestTrackDevice("group", "device3", registeredProbe.getRef())); + ActorRef<Device.Command> deviceActor3 = registeredProbe.receiveMessage().device; + + // Check that the device actors are working + TestProbe<Device.TemperatureRecorded> recordProbe = + testKit.createTestProbe(Device.TemperatureRecorded.class); + deviceActor1.tell(new Device.RecordTemperature(0L, 1.0, recordProbe.getRef())); + assertEquals(0L, recordProbe.receiveMessage().requestId); + deviceActor2.tell(new Device.RecordTemperature(1L, 2.0, recordProbe.getRef())); + assertEquals(1L, recordProbe.receiveMessage().requestId); + // No temperature for device 3 + + TestProbe<RespondAllTemperatures> allTempProbe = + testKit.createTestProbe(RespondAllTemperatures.class); + groupActor.tell(new RequestAllTemperatures(0L, "group", allTempProbe.getRef())); + RespondAllTemperatures response = allTempProbe.receiveMessage(); + assertEquals(0L, response.requestId); + + Map<String, TemperatureReading> expectedTemperatures = new HashMap<>(); + expectedTemperatures.put("device1", new Temperature(1.0)); + expectedTemperatures.put("device2", new Temperature(2.0)); + expectedTemperatures.put("device3", TemperatureNotAvailable.INSTANCE); + + assertEquals(expectedTemperatures, response.temperatures); + } + +}
\ No newline at end of file |
