diff options
| author | Mike Vink <mike1994vink@gmail.com> | 2021-07-20 17:41:58 +0200 |
|---|---|---|
| committer | Mike Vink <mike1994vink@gmail.com> | 2021-07-20 17:41:58 +0200 |
| commit | 5f016158e73a7828c9dec1810e54bbe2550d8c20 (patch) | |
| tree | 82b380c3976be9e48248ae8f30a5e99b5afc7e97 | |
| parent | 36730af06964e75735ae5d099ee9c5bf2c6b7fc9 (diff) | |
feat(): trainer registration to actor sys
17 files changed, 724 insertions, 181 deletions
diff --git a/api/build.gradle b/api/build.gradle index 28ec919..188c417 100644 --- a/api/build.gradle +++ b/api/build.gradle @@ -4,6 +4,11 @@ plugins { id 'application' } +def versions = [ + ScalaBinary: "2.13" +] + + repositories { jcenter() mavenCentral() @@ -11,6 +16,9 @@ repositories { dependencies { + implementation platform("com.typesafe.akka:akka-bom_${versions.ScalaBinary}:2.6.15") + implementation "com.typesafe.akka:akka-actor-typed_${versions.ScalaBinary}" + // Use the Jersey framework to make writing and testing servlets easier. implementation 'org.glassfish.jersey.containers:jersey-container-servlet-core:+' implementation 'org.glassfish.jersey.containers:jersey-container-jetty-http:+' @@ -31,9 +39,6 @@ dependencies { implementation 'com.google.code.gson:gson:2.8.7' - // Reference the domain subproject. - implementation project(':domain') - // Use JUnit Jupiter API for testing. testImplementation 'org.junit.jupiter:junit-jupiter-api:5.7.0' // Also use the Mockito mocking framework to mock simple server functionality. diff --git a/api/src/main/java/akkamon/api/AkkamonMessageEngine.java b/api/src/main/java/akkamon/api/AkkamonMessageEngine.java new file mode 100644 index 0000000..2bbe0c8 --- /dev/null +++ b/api/src/main/java/akkamon/api/AkkamonMessageEngine.java @@ -0,0 +1,6 @@ +package akkamon.api; + +public interface AkkamonMessageEngine { + // broadcasts position info to WebSocket Clients + void broadCastGridPosition(); +} diff --git a/api/src/main/java/akkamon/api/AkkamonSession.java b/api/src/main/java/akkamon/api/AkkamonSession.java index 55ca3c3..048071c 100644 --- a/api/src/main/java/akkamon/api/AkkamonSession.java +++ b/api/src/main/java/akkamon/api/AkkamonSession.java @@ -1,14 +1,4 @@ package akkamon.api; -import akkamon.api.models.User; - public interface AkkamonSession { - - void receiveGameState(String gameState); - - void disconnect(int statusCode, String message); - - void setCurrentUser(User user); - - User getUser(); } diff --git a/api/src/main/java/akkamon/api/App.java b/api/src/main/java/akkamon/api/App.java index 989562b..5d9add9 100644 --- a/api/src/main/java/akkamon/api/App.java +++ b/api/src/main/java/akkamon/api/App.java @@ -6,15 +6,19 @@ import org.eclipse.jetty.servlet.ServletContextHandler; import org.eclipse.jetty.servlet.ServletHolder; import org.eclipse.jetty.websocket.server.JettyWebSocketServlet; import org.eclipse.jetty.websocket.server.config.JettyWebSocketServletContainerInitializer; -import org.glassfish.jersey.servlet.ServletContainer; public class App { + public static MessagingEngine messagingEngine; + public static void main(String[] args) { Server server = startServer(8080); ServletContextHandler context = createStatefulContext(server); + messagingEngine = new MessagingEngine(); + + // websocket behaviour // Configure specific websocket behavior diff --git a/api/src/main/java/akkamon/api/EventSocket.java b/api/src/main/java/akkamon/api/EventSocket.java index fdefee3..092d763 100644 --- a/api/src/main/java/akkamon/api/EventSocket.java +++ b/api/src/main/java/akkamon/api/EventSocket.java @@ -1,18 +1,14 @@ package akkamon.api; -import java.io.IOException; -import java.util.concurrent.CountDownLatch; - -import akkamon.api.models.User; -import org.eclipse.jetty.websocket.api.WebSocketAdapter; import org.eclipse.jetty.websocket.api.Session; +import org.eclipse.jetty.websocket.api.WebSocketAdapter; + +import java.util.concurrent.CountDownLatch; public class EventSocket extends WebSocketAdapter implements AkkamonSession { private final CountDownLatch closureLatch = new CountDownLatch(1); - public User user; - @Override public void onWebSocketConnect(Session sess) { @@ -25,7 +21,7 @@ public class EventSocket extends WebSocketAdapter implements AkkamonSession { { super.onWebSocketText(message); System.out.println("Received TEXT message: " + message); - MessagingEngine.getInstance().incoming(this, message); + App.messagingEngine.incoming(this, message); } @@ -35,7 +31,6 @@ public class EventSocket extends WebSocketAdapter implements AkkamonSession { super.onWebSocketClose(statusCode, reason); System.out.println("Socket Closed: [" + statusCode + "] " + reason); closureLatch.countDown(); - MessagingEngine.getInstance().sessionOffline(this); } @Override @@ -50,28 +45,4 @@ public class EventSocket extends WebSocketAdapter implements AkkamonSession { System.out.println("Awaiting closure from remote"); closureLatch.await(); } - - @Override - public void receiveGameState(String gameState) { - try { - getRemote().sendString(gameState); - } catch (IOException e) { - e.printStackTrace(); - } - } - - @Override - public void disconnect(int statusCode, String message) { - getSession().close(statusCode, message); - } - - @Override - public void setCurrentUser(User user) { - this.user = user; - } - - @Override - public User getUser() { - return user; - } } diff --git a/api/src/main/java/akkamon/api/MessagingEngine.java b/api/src/main/java/akkamon/api/MessagingEngine.java index 1627ad4..bbab97e 100644 --- a/api/src/main/java/akkamon/api/MessagingEngine.java +++ b/api/src/main/java/akkamon/api/MessagingEngine.java @@ -1,120 +1,29 @@ package akkamon.api; -import akkamon.api.models.*; -import akkamon.domain.AkkamonImpl; -import akkamon.domain.Trainer; +import akkamon.api.models.GameState; import com.google.gson.Gson; import java.util.HashMap; -import java.util.Map; -import java.util.concurrent.ScheduledThreadPoolExecutor; -import java.util.concurrent.TimeUnit; -public class MessagingEngine { +public class MessagingEngine implements AkkamonMessageEngine { private HashMap<String, AkkamonSession> akkamonSessions = new HashMap<>(); private static MessagingEngine instance; private Gson gson = new Gson(); - public static MessagingEngine getInstance() { - if (instance == null) { - instance = new MessagingEngine(); - return instance; - } - return instance; - } - public MessagingEngine() { - ScheduledThreadPoolExecutor executor = new ScheduledThreadPoolExecutor(2); - executor.scheduleAtFixedRate(new Runnable() { - @Override - public void run() { - emitGameState(); - } - }, 0, 200, TimeUnit.MILLISECONDS); - } - void emitGameState() { - HashMap<String, Trainer> trainers = AkkamonImpl.getInstance().getDummyTrainersCollection(); - - if (akkamonSessions.size() == 1) { - AkkamonSession session = akkamonSessions.get("Ash"); - - GameState gameState = new GameState(); - // dummy - gameState.setCurrentPlayer("Ash", trainers); - - Event event = new Event("serverSidePosUpdate", gameState); - - session.receiveGameState(gson.toJson(event)); - - } else if (akkamonSessions.size() == 2) { - for (String name: akkamonSessions.keySet()) { - AkkamonSession session = akkamonSessions.get(name); - - GameState gameState = new GameState(); - // dummy - gameState.setCurrentPlayer(name, trainers); - gameState.setRemotePlayers(trainers); - - Event event = new Event("serverSidePosUpdate", gameState); - - session.receiveGameState(gson.toJson(event)); - } - } - - // for (Map.Entry<User, AkkamonSession> sess: akkamonSessions.entrySet()) { - - // User user = sess.getKey(); - // AkkamonSession session = sess.getValue(); - - // GameState gameState = new GameState(); - // // dummy - // gameState.setCurrentPlayer(user.name, trainers); - - // session.receiveGameState(gson.toJson(gameState)); - // } + @Override + public void broadCastGridPosition() { } void incoming(AkkamonSession session, String message) { - Event event = gson.fromJson(message, Event.class); - switch (event.type) { - case "login": - login(session, event.user); - break; - case "clientSidePosUpdate": - updatePositions(event.gameState); - break; - } + } private void updatePositions(GameState gameState) { - Player current = gameState.currentPlayer; - if (gameState.currentPlayer != null) { - AkkamonImpl.getInstance().updateTrainerPosition(current.name, current.position.x, current.position.y); - } - } - private void login(AkkamonSession session, User user) { - if (user == null) { - session.disconnect(401, "Give username and password"); - } - System.out.println("Currrent connections: " + akkamonSessions.size()); - if (akkamonSessions.size() == 0) { - akkamonSessions.put("Ash", session); - System.out.println("After adding ash!: " + akkamonSessions.size()); - session.setCurrentUser(new User("Ash", "")); - } else if (akkamonSessions.size() == 1) { - akkamonSessions.put("Misty", session); - session.setCurrentUser(new User("Misty", "")); - } - AkkamonImpl.getInstance().newPlayerConnected(user.name, user.password); - System.out.println("Emitting gameState!"); - emitGameState(); } - public void sessionOffline(AkkamonSession session) { - akkamonSessions.remove(session.getUser().name); - } } diff --git a/domain/build.gradle b/domain/build.gradle index d2d174a..4b7063a 100644 --- a/domain/build.gradle +++ b/domain/build.gradle @@ -26,6 +26,9 @@ dependencies { implementation 'ch.qos.logback:logback-classic:1.2.3' implementation 'junit:junit:4.12' + // Reference the domain subproject. + implementation project(':api') + testImplementation "com.typesafe.akka:akka-actor-testkit-typed_${versions.ScalaBinary}" testImplementation "com.typesafe.akka:akka-stream-testkit_${versions.ScalaBinary}" diff --git a/domain/src/main/java/akkamon/domain/AkkamonImpl.java b/domain/src/main/java/akkamon/domain/AkkamonImpl.java index a6a840b..534abf2 100644 --- a/domain/src/main/java/akkamon/domain/AkkamonImpl.java +++ b/domain/src/main/java/akkamon/domain/AkkamonImpl.java @@ -17,19 +17,19 @@ public class AkkamonImpl implements Akkamon { @Override public void newPlayerConnected(String name, String password) { - switch (dummyTrainersCollection.size()) { - case 0: - dummyTrainersCollection.put("Ash", new Trainer("Ash")); - break; - case 1: - dummyTrainersCollection.put("Misty", new Trainer("Misty")); - break; - } + // switch (dummyTrainersCollection.size()) { + // case 0: + // dummyTrainersCollection.put("Ash", new Trainer("Ash")); + // break; + // case 1: + // dummyTrainersCollection.put("Misty", new Trainer("Misty")); + // break; + // } } public void updateTrainerPosition(String name, float x, float y) { - Trainer trainer = dummyTrainersCollection.get(name); - trainer.newPosition(x, y); + // Trainer trainer = dummyTrainersCollection.get(name); + // trainer.newPosition(x, y); } public HashMap<String, Trainer> getDummyTrainersCollection() { diff --git a/domain/src/main/java/akkamon/domain/AkkamonNexus.java b/domain/src/main/java/akkamon/domain/AkkamonNexus.java new file mode 100644 index 0000000..a64e751 --- /dev/null +++ b/domain/src/main/java/akkamon/domain/AkkamonNexus.java @@ -0,0 +1,84 @@ +package akkamon.domain; + +import akka.actor.typed.ActorRef; +import akka.actor.typed.Behavior; +import akka.actor.typed.javadsl.AbstractBehavior; +import akka.actor.typed.javadsl.ActorContext; +import akka.actor.typed.javadsl.Behaviors; +import akka.actor.typed.javadsl.Receive; + +import java.util.HashMap; +import java.util.Map; + +public class AkkamonNexus extends AbstractBehavior<AkkamonNexus.Command> { + + + public interface Command {} + + public static class RequestTrainerRegistration implements AkkamonNexus.Command, SceneTrainerGroup.Command { + public String trainerId; + public String sceneId; + public ActorRef<TrainerRegistered> replyTo; + + public RequestTrainerRegistration(String trainerId, String sceneId, ActorRef<TrainerRegistered> replyTo) { + this.trainerId = trainerId; + this.sceneId = sceneId; + this.replyTo = replyTo; + } + } + + public static class TrainerRegistered { + private ActorRef<Trainer.Command> trainer; + + public TrainerRegistered(ActorRef<Trainer.Command> trainer) { + this.trainer = trainer; + } + } + + private static class SceneTrainerGroupTerminated implements AkkamonNexus.Command { + public SceneTrainerGroupTerminated(String sceneId) { + } + } + + + public static Behavior<AkkamonNexus.Command> create() { + return Behaviors.setup(AkkamonNexus::new); + } + + private Map<String, ActorRef<SceneTrainerGroup.Command>> sceneIdToActor = new HashMap<>(); + + public AkkamonNexus(ActorContext<Command> context) { + super(context); + getContext().getLog().info("AkkamonNexus is spinning"); + } + + @Override + public Receive<Command> createReceive() { + return newReceiveBuilder() + .onMessage( + RequestTrainerRegistration.class, + this::onTrainerRegistration + ) + .build(); + } + + private AkkamonNexus onTrainerRegistration(RequestTrainerRegistration registrationRequest) { + String sceneId = registrationRequest.sceneId; + String trainerId = registrationRequest.trainerId; + + ActorRef<SceneTrainerGroup.Command> sceneTrainerGroup = sceneIdToActor.get(sceneId); + if (sceneTrainerGroup != null) { + sceneTrainerGroup.tell(registrationRequest); + } else { + getContext().getLog().info("Creating sceneTrainerGroup {} for trainer {}", sceneId, trainerId); + ActorRef<SceneTrainerGroup.Command> sceneActor = + getContext().spawn(SceneTrainerGroup.create(sceneId), "scene-" + sceneId); + + getContext().watchWith(sceneActor, new SceneTrainerGroupTerminated(sceneId)); + sceneActor.tell(registrationRequest); + sceneIdToActor.put(sceneId, sceneActor); + } + return this; + } + +} diff --git a/domain/src/main/java/akkamon/domain/SceneTrainerGroup.java b/domain/src/main/java/akkamon/domain/SceneTrainerGroup.java new file mode 100644 index 0000000..fb9456a --- /dev/null +++ b/domain/src/main/java/akkamon/domain/SceneTrainerGroup.java @@ -0,0 +1,79 @@ +package akkamon.domain; + +import akka.actor.typed.ActorRef; +import akka.actor.typed.Behavior; +import akka.actor.typed.javadsl.AbstractBehavior; +import akka.actor.typed.javadsl.ActorContext; +import akka.actor.typed.javadsl.Behaviors; +import akka.actor.typed.javadsl.Receive; + +import java.util.HashMap; +import java.util.Map; + +public class SceneTrainerGroup extends AbstractBehavior<SceneTrainerGroup.Command> { + + public interface Command { } + + private class TrainerOffline implements Command { + public ActorRef<Trainer.Command> trainer; + public String sceneId; + public String trainerId; + + public TrainerOffline(ActorRef<Trainer.Command> trainerActor, String sceneId, String trainerId) { + this.trainer = trainerActor; + this.sceneId = sceneId; + this.trainerId = trainerId; + } + } + + public static Behavior<Command> create(String TrainerGroupId) { + return Behaviors.setup(context -> new SceneTrainerGroup(context, TrainerGroupId)); + } + + private final String sceneId; + private final Map<String, ActorRef<Trainer.Command>> trainerIdToActor= new HashMap(); + + public SceneTrainerGroup(ActorContext<Command> context, String sceneId) { + super(context); + + this.sceneId = sceneId; + + getContext().getLog().info("SceneTrainerGroup Actor {} started", sceneId); + } + + @Override + public Receive<Command> createReceive() { + return newReceiveBuilder() + .onMessage( + AkkamonNexus.RequestTrainerRegistration.class, + this::onTrainerRegistration) + .build(); + } + + private SceneTrainerGroup onTrainerRegistration(AkkamonNexus.RequestTrainerRegistration registrationRequest) { + if (this.sceneId.equals(registrationRequest.sceneId)) { + ActorRef<Trainer.Command> trainerActor = trainerIdToActor.get(registrationRequest.trainerId); + if (trainerActor != null) { + registrationRequest.replyTo.tell(new AkkamonNexus.TrainerRegistered(trainerActor)); + } else { + getContext().getLog().info("Creating trainer actor for {}", registrationRequest.trainerId); + trainerActor = + getContext() + .spawn(Trainer.create(sceneId, registrationRequest.trainerId), "trainer-" + registrationRequest.trainerId); + getContext() + .watchWith(trainerActor, new SceneTrainerGroup.TrainerOffline(trainerActor, sceneId, registrationRequest.trainerId)); + trainerIdToActor.put(registrationRequest.trainerId, trainerActor); + registrationRequest.replyTo.tell(new AkkamonNexus.TrainerRegistered(trainerActor)); + } + } else { + getContext() + .getLog() + .warn( + "Ignoring TrainerRegistration request for {}. This actor is responsible for {}.", + registrationRequest.sceneId, + this.sceneId); + } + return this; + } + +} diff --git a/domain/src/main/java/akkamon/domain/Trainer.java b/domain/src/main/java/akkamon/domain/Trainer.java index 8f14c60..7f540a1 100644 --- a/domain/src/main/java/akkamon/domain/Trainer.java +++ b/domain/src/main/java/akkamon/domain/Trainer.java @@ -1,28 +1,31 @@ package akkamon.domain; -public class Trainer { - private String name; - private float x; - private float y; +import akka.actor.typed.Behavior; +import akka.actor.typed.javadsl.AbstractBehavior; +import akka.actor.typed.javadsl.ActorContext; +import akka.actor.typed.javadsl.Behaviors; +import akka.actor.typed.javadsl.Receive; - public Trainer(String name) { - this.name = name; - } +public class Trainer extends AbstractBehavior<Trainer.Command> { - public void newPosition(float x, float y) { - this.x = x; - this.y = y; - } + public interface Command { } - public float getX() { - return x; + public static Behavior<Command> create(String sceneId, String trainerId) { + return Behaviors.setup(context -> new Trainer(context, sceneId, trainerId)); } - public float getY() { - return y; + private String sceneId; + private String trainerId; + + public Trainer(ActorContext<Command> context, String sceneId, String trainerId) { + super(context); + this.sceneId = sceneId; + this.trainerId = trainerId; } - public String getName() { - return name; + @Override + public Receive<Command> createReceive() { + return null; } + } diff --git a/domain/src/main/java/akkamon/domain/iot/Device.java b/domain/src/main/java/akkamon/domain/iot/Device.java index 27918a6..c4e63f8 100644 --- a/domain/src/main/java/akkamon/domain/iot/Device.java +++ b/domain/src/main/java/akkamon/domain/iot/Device.java @@ -46,10 +46,12 @@ public class Device extends AbstractBehavior<Device.Command> { public static final class RespondTemperature { final long requestId; + final String deviceId; final Optional<Double> value; - public RespondTemperature(long requestId, Optional<Double> value) { + public RespondTemperature(long requestId, String deviceId, Optional<Double> value) { this.requestId = requestId; + this.deviceId = deviceId; this.value = value; } } @@ -93,7 +95,7 @@ public class Device extends AbstractBehavior<Device.Command> { } private Behavior<Command> onReadTemperature(ReadTemperature r) { - r.replyTo.tell(new RespondTemperature(r.requestId, lastTemperatureReading)); + r.replyTo.tell(new RespondTemperature(r.requestId, deviceId, lastTemperatureReading)); return this; } diff --git a/domain/src/main/java/akkamon/domain/iot/DeviceGroup.java b/domain/src/main/java/akkamon/domain/iot/DeviceGroup.java index 2cfc13a..40e0d7f 100644 --- a/domain/src/main/java/akkamon/domain/iot/DeviceGroup.java +++ b/domain/src/main/java/akkamon/domain/iot/DeviceGroup.java @@ -8,6 +8,7 @@ import akka.actor.typed.javadsl.ActorContext; import akka.actor.typed.javadsl.Behaviors; import akka.actor.typed.javadsl.Receive; +import java.time.Duration; import java.util.HashMap; import java.util.Map; @@ -40,6 +41,19 @@ public class DeviceGroup extends AbstractBehavior<DeviceGroup.Command> { context.getLog().info("DeviceGroup {} started", groupId); } + private DeviceGroup onAllTemperatures(DeviceManager.RequestAllTemperatures r) { + Map<String, ActorRef<Device.Command>> deviceIdToActorCopy = new HashMap<>(this.deviceIdToActor); + + getContext() + .spawnAnonymous( + DeviceGroupQuery.create( + deviceIdToActorCopy, r.requestId, r.replyTo, Duration.ofSeconds(3) + ) + ); + + return this; + } + private DeviceGroup onTrackDevice(DeviceManager.RequestTrackDevice trackMsg) { if (this.groupId.equals(trackMsg.groupId)) { ActorRef<Device.Command> deviceActor = deviceIdToActor.get(trackMsg.deviceId); @@ -88,6 +102,11 @@ public class DeviceGroup extends AbstractBehavior<DeviceGroup.Command> { this::onDeviceList) .onMessage(DeviceTerminated.class, this::onTerminated) .onSignal(PostStop.class, signal -> onPostStop()) + .onMessage( + DeviceManager.RequestAllTemperatures.class, + r -> r.groupId.equals(groupId), + this::onAllTemperatures + ) .build(); } diff --git a/domain/src/main/java/akkamon/domain/iot/DeviceGroupQuery.java b/domain/src/main/java/akkamon/domain/iot/DeviceGroupQuery.java new file mode 100644 index 0000000..b04fd23 --- /dev/null +++ b/domain/src/main/java/akkamon/domain/iot/DeviceGroupQuery.java @@ -0,0 +1,128 @@ +package akkamon.domain.iot; + +import akka.actor.typed.ActorRef; +import akka.actor.typed.Behavior; +import akka.actor.typed.javadsl.*; + +import java.time.Duration; +import java.util.HashMap; +import java.util.HashSet; +import java.util.Map; +import java.util.Set; + +public class DeviceGroupQuery extends AbstractBehavior<DeviceGroupQuery.Command> { + + + public interface Command {} + + private static enum CollectionTimeout implements Command { + INSTANCE + } + + static class WrappedRespondTemperature implements Command { + final Device.RespondTemperature response; + + WrappedRespondTemperature(Device.RespondTemperature response) { + this.response = response; + } + } + + private static class DeviceTerminated implements Command { + final String deviceId; + + private DeviceTerminated(String deviceId) { + this.deviceId = deviceId; + } + } + + public static Behavior<Command> create( + Map<String, ActorRef<Device.Command>> deviceIdToActor, + long requestId, + ActorRef<DeviceManager.RespondAllTemperatures> requester, + Duration timeout) { + return Behaviors.setup( + context -> + Behaviors.withTimers( + timers -> + new DeviceGroupQuery( + deviceIdToActor, requestId, requester, timeout, context, timers))); + } + + private final long requestId; + private final ActorRef<DeviceManager.RespondAllTemperatures> requester; + private Map<String, DeviceManager.TemperatureReading> repliesSoFar = new HashMap(); + private final Set<String> stillWaiting; + + private DeviceGroupQuery( + Map<String, ActorRef<Device.Command>> deviceIdToActor, + long requestId, + ActorRef<DeviceManager.RespondAllTemperatures> requester, + Duration timeout, + ActorContext<Command> context, + TimerScheduler<Command> timers) { + super(context); + this.requestId = requestId; + this.requester = requester; + + timers.startSingleTimer(CollectionTimeout.INSTANCE, timeout); + + ActorRef<Device.RespondTemperature> respondTemperatureAdapter = + context.messageAdapter(Device.RespondTemperature.class, WrappedRespondTemperature::new); + + for (Map.Entry<String, ActorRef<Device.Command>> entry : deviceIdToActor.entrySet()) { + context.watchWith(entry.getValue(), new DeviceTerminated(entry.getKey())); + entry.getValue().tell(new Device.ReadTemperature(0L, respondTemperatureAdapter)); + } + stillWaiting = new HashSet<>(deviceIdToActor.keySet()); + } + + + @Override + public Receive<Command> createReceive() { + return newReceiveBuilder() + .onMessage(WrappedRespondTemperature.class, this::onRespondTemperature) + .onMessage(DeviceTerminated.class, this::onDeviceTerminated) + .onMessage(CollectionTimeout.class, this::onCollectionTimeout) + .build(); + } + + private Behavior<Command> onRespondTemperature(WrappedRespondTemperature r) { + DeviceManager.TemperatureReading reading = + r.response + .value + .map(v -> (DeviceManager.TemperatureReading) new DeviceManager.Temperature(v)) + .orElse(DeviceManager.TemperatureNotAvailable.INSTANCE); + + String deviceId = r.response.deviceId; + repliesSoFar.put(deviceId, reading); + stillWaiting.remove(deviceId); + + return respondWhenAllCollected(); + } + + private Behavior<Command> onDeviceTerminated(DeviceTerminated terminated) { + if (stillWaiting.contains(terminated.deviceId)) { + repliesSoFar.put(terminated.deviceId, DeviceManager.DeviceNotAvailable.INSTANCE); + stillWaiting.remove(terminated.deviceId); + } + return respondWhenAllCollected(); + } + + private Behavior<Command> onCollectionTimeout(CollectionTimeout timeout) { + for (String deviceId : stillWaiting) { + repliesSoFar.put(deviceId, DeviceManager.DeviceTimedOut.INSTANCE); + } + stillWaiting.clear(); + return respondWhenAllCollected(); + } + + private Behavior<Command> respondWhenAllCollected() { + if (stillWaiting.isEmpty()) { + requester.tell(new DeviceManager.RespondAllTemperatures(requestId, repliesSoFar)); + return Behaviors.stopped(); + } else { + return this; + } + } + +} diff --git a/domain/src/main/java/akkamon/domain/iot/DeviceManager.java b/domain/src/main/java/akkamon/domain/iot/DeviceManager.java index e445b7e..c0f8b0d 100644 --- a/domain/src/main/java/akkamon/domain/iot/DeviceManager.java +++ b/domain/src/main/java/akkamon/domain/iot/DeviceManager.java @@ -67,6 +67,73 @@ public class DeviceManager extends AbstractBehavior<DeviceManager.Command> { } } + public static final class RequestAllTemperatures implements DeviceGroupQuery.Command, DeviceGroup.Command, Command { + + final long requestId; + final String groupId; + final ActorRef<RespondAllTemperatures> replyTo; + + public RequestAllTemperatures( + long requestId, String groupId, ActorRef<RespondAllTemperatures> replyTo) { + this.requestId = requestId; + this.groupId = groupId; + this.replyTo = replyTo; + } + } + + public static final class RespondAllTemperatures { + final long requestId; + final Map<String, TemperatureReading> temperatures; + + public RespondAllTemperatures(long requestId, Map<String, TemperatureReading> temperatures) { + this.requestId = requestId; + this.temperatures = temperatures; + } + } + + public interface TemperatureReading {} + + public static final class Temperature implements TemperatureReading { + public final double value; + + public Temperature(double value) { + this.value = value; + } + + @Override + public boolean equals(Object o) { + if (this == o) return true; + if (o == null || getClass() != o.getClass()) return false; + + Temperature that = (Temperature) o; + + return Double.compare(that.value, value) == 0; + } + + @Override + public int hashCode() { + long temp = Double.doubleToLongBits(value); + return (int) (temp ^ (temp >>> 32)); + } + + @Override + public String toString() { + return "Temperature{" + "value=" + value + '}'; + } + } + + public enum TemperatureNotAvailable implements TemperatureReading { + INSTANCE + } + + public enum DeviceNotAvailable implements TemperatureReading { + INSTANCE + } + + public enum DeviceTimedOut implements TemperatureReading { + INSTANCE + } + public static Behavior<Command> create() { return Behaviors.setup(DeviceManager::new); } @@ -78,6 +145,15 @@ public class DeviceManager extends AbstractBehavior<DeviceManager.Command> { context.getLog().info("DeviceManager started"); } + public Receive<Command> createReceive() { + return newReceiveBuilder() + .onMessage(RequestTrackDevice.class, this::onTrackDevice) + .onMessage(RequestDeviceList.class, this::onRequestDeviceList) + .onMessage(DeviceGroupTerminated.class, this::onTerminated) + .onSignal(PostStop.class, signal -> onPostStop()) + .build(); + } + private DeviceManager onTrackDevice(RequestTrackDevice trackMsg) { String groupId = trackMsg.groupId; ActorRef<DeviceGroup.Command> ref = groupIdToActor.get(groupId); @@ -110,15 +186,6 @@ public class DeviceManager extends AbstractBehavior<DeviceManager.Command> { return this; } - public Receive<Command> createReceive() { - return newReceiveBuilder() - .onMessage(RequestTrackDevice.class, this::onTrackDevice) - .onMessage(RequestDeviceList.class, this::onRequestDeviceList) - .onMessage(DeviceGroupTerminated.class, this::onTerminated) - .onSignal(PostStop.class, signal -> onPostStop()) - .build(); - } - private DeviceManager onPostStop() { getContext().getLog().info("DeviceManager stopped"); return this; diff --git a/domain/src/test/java/akkamon/domain/AkkamonNexusTest.java b/domain/src/test/java/akkamon/domain/AkkamonNexusTest.java new file mode 100644 index 0000000..5150b1c --- /dev/null +++ b/domain/src/test/java/akkamon/domain/AkkamonNexusTest.java @@ -0,0 +1,28 @@ +package akkamon.domain; + +import akka.actor.testkit.typed.javadsl.TestKitJunitResource; +import akka.actor.testkit.typed.javadsl.TestProbe; +import akka.actor.typed.ActorRef; +import org.junit.ClassRule; +import org.junit.Test; + +public class AkkamonNexusTest { + + @ClassRule + public static final TestKitJunitResource testKit = new TestKitJunitResource(); + + @Test + public void given_a_registration_request_when_no_scene_or_trainer_exists_in_the_system_then_create_scene_and_trainer_and_reply() { + TestProbe<AkkamonNexus.TrainerRegistered> probe = + testKit.createTestProbe(AkkamonNexus.TrainerRegistered.class); + + ActorRef<SceneTrainerGroup.Command> sceneTrainerGroupActor = + testKit.spawn(SceneTrainerGroup.create("start")); + + sceneTrainerGroupActor.tell(new AkkamonNexus.RequestTrainerRegistration("ash", "start", probe.getRef())); + + probe.expectMessageClass(AkkamonNexus.TrainerRegistered.class); + + } + +}
\ No newline at end of file diff --git a/domain/src/test/java/akkamon/domain/iot/DeviceGroupQueryTest.java b/domain/src/test/java/akkamon/domain/iot/DeviceGroupQueryTest.java new file mode 100644 index 0000000..ebfeda5 --- /dev/null +++ b/domain/src/test/java/akkamon/domain/iot/DeviceGroupQueryTest.java @@ -0,0 +1,245 @@ +package akkamon.domain.iot; + +import akka.actor.testkit.typed.javadsl.TestKitJunitResource; +import akka.actor.testkit.typed.javadsl.TestProbe; +import akka.actor.typed.ActorRef; +import akkamon.domain.iot.DeviceManager.*; +import akkamon.domain.iot.DeviceManager.RespondAllTemperatures; +import akkamon.domain.iot.DeviceManager.Temperature; +import akkamon.domain.iot.DeviceManager.TemperatureReading; +import org.junit.ClassRule; +import org.junit.Test; + +import java.time.Duration; +import java.util.HashMap; +import java.util.Map; +import java.util.Optional; + +import static org.junit.Assert.*; + +public class DeviceGroupQueryTest { + + @ClassRule + public static final TestKitJunitResource testKit = new TestKitJunitResource(); + + @Test + public void testReturnTemperatureValueForWorkingDevices() { + TestProbe<RespondAllTemperatures> requester = + testKit.createTestProbe(RespondAllTemperatures.class); + TestProbe<Device.Command> device1 = testKit.createTestProbe(Device.Command.class); + TestProbe<Device.Command> device2 = testKit.createTestProbe(Device.Command.class); + + Map<String, ActorRef<Device.Command>> deviceIdToActor = new HashMap<>(); + deviceIdToActor.put("device1", device1.getRef()); + deviceIdToActor.put("device2", device2.getRef()); + + ActorRef<DeviceGroupQuery.Command> queryActor = + testKit.spawn( + DeviceGroupQuery.create( + deviceIdToActor, 1L, requester.getRef(), Duration.ofSeconds(3))); + + device1.expectMessageClass(Device.ReadTemperature.class); + device2.expectMessageClass(Device.ReadTemperature.class); + + queryActor.tell( + new DeviceGroupQuery.WrappedRespondTemperature( + new Device.RespondTemperature(0L, "device1", Optional.of(1.0)))); + + queryActor.tell( + new DeviceGroupQuery.WrappedRespondTemperature( + new Device.RespondTemperature(0L, "device2", Optional.of(2.0)))); + + RespondAllTemperatures response = requester.receiveMessage(); + assertEquals(1L, response.requestId); + + Map<String, TemperatureReading> expectedTemperatures = new HashMap<>(); + expectedTemperatures.put("device1", new Temperature(1.0)); + expectedTemperatures.put("device2", new Temperature(2.0)); + + assertEquals(expectedTemperatures, response.temperatures); + } + + @Test + public void testReturnTemperatureNotAvailableForDevicesWithNoReadings() { + TestProbe<RespondAllTemperatures> requester = + testKit.createTestProbe(RespondAllTemperatures.class); + TestProbe<Device.Command> device1 = testKit.createTestProbe(Device.Command.class); + TestProbe<Device.Command> device2 = testKit.createTestProbe(Device.Command.class); + + Map<String, ActorRef<Device.Command>> deviceIdToActor = new HashMap<>(); + deviceIdToActor.put("device1", device1.getRef()); + deviceIdToActor.put("device2", device2.getRef()); + + ActorRef<DeviceGroupQuery.Command> queryActor = + testKit.spawn( + DeviceGroupQuery.create( + deviceIdToActor, 1L, requester.getRef(), Duration.ofSeconds(3))); + + assertEquals(0L, device1.expectMessageClass(Device.ReadTemperature.class).requestId); + assertEquals(0L, device2.expectMessageClass(Device.ReadTemperature.class).requestId); + + queryActor.tell( + new DeviceGroupQuery.WrappedRespondTemperature( + new Device.RespondTemperature(0L, "device1", Optional.empty()))); + + queryActor.tell( + new DeviceGroupQuery.WrappedRespondTemperature( + new Device.RespondTemperature(0L, "device2", Optional.of(2.0)))); + + RespondAllTemperatures response = requester.receiveMessage(); + assertEquals(1L, response.requestId); + + Map<String, TemperatureReading> expectedTemperatures = new HashMap<>(); + expectedTemperatures.put("device1", TemperatureNotAvailable.INSTANCE); + expectedTemperatures.put("device2", new Temperature(2.0)); + + assertEquals(expectedTemperatures, response.temperatures); + } + + @Test + public void testReturnDeviceNotAvailableIfDeviceStopsBeforeAnswering() { + TestProbe<RespondAllTemperatures> requester = + testKit.createTestProbe(RespondAllTemperatures.class); + TestProbe<Device.Command> device1 = testKit.createTestProbe(Device.Command.class); + TestProbe<Device.Command> device2 = testKit.createTestProbe(Device.Command.class); + + Map<String, ActorRef<Device.Command>> deviceIdToActor = new HashMap<>(); + deviceIdToActor.put("device1", device1.getRef()); + deviceIdToActor.put("device2", device2.getRef()); + + ActorRef<DeviceGroupQuery.Command> queryActor = + testKit.spawn( + DeviceGroupQuery.create( + deviceIdToActor, 1L, requester.getRef(), Duration.ofSeconds(3))); + + assertEquals(0L, device1.expectMessageClass(Device.ReadTemperature.class).requestId); + assertEquals(0L, device2.expectMessageClass(Device.ReadTemperature.class).requestId); + + queryActor.tell( + new DeviceGroupQuery.WrappedRespondTemperature( + new Device.RespondTemperature(0L, "device1", Optional.of(1.0)))); + + device2.stop(); + + RespondAllTemperatures response = requester.receiveMessage(); + assertEquals(1L, response.requestId); + + Map<String, TemperatureReading> expectedTemperatures = new HashMap<>(); + expectedTemperatures.put("device1", new Temperature(1.0)); + expectedTemperatures.put("device2", DeviceNotAvailable.INSTANCE); + + assertEquals(expectedTemperatures, response.temperatures); + } + + @Test + public void testReturnTemperatureReadingEvenIfDeviceStopsAfterAnswering() { + TestProbe<RespondAllTemperatures> requester = + testKit.createTestProbe(RespondAllTemperatures.class); + TestProbe<Device.Command> device1 = testKit.createTestProbe(Device.Command.class); + TestProbe<Device.Command> device2 = testKit.createTestProbe(Device.Command.class); + + Map<String, ActorRef<Device.Command>> deviceIdToActor = new HashMap<>(); + deviceIdToActor.put("device1", device1.getRef()); + deviceIdToActor.put("device2", device2.getRef()); + + ActorRef<DeviceGroupQuery.Command> queryActor = + testKit.spawn( + DeviceGroupQuery.create( + deviceIdToActor, 1L, requester.getRef(), Duration.ofSeconds(3))); + + assertEquals(0L, device1.expectMessageClass(Device.ReadTemperature.class).requestId); + assertEquals(0L, device2.expectMessageClass(Device.ReadTemperature.class).requestId); + + queryActor.tell( + new DeviceGroupQuery.WrappedRespondTemperature( + new Device.RespondTemperature(0L, "device1", Optional.of(1.0)))); + + queryActor.tell( + new DeviceGroupQuery.WrappedRespondTemperature( + new Device.RespondTemperature(0L, "device2", Optional.of(2.0)))); + + device2.stop(); + + RespondAllTemperatures response = requester.receiveMessage(); + assertEquals(1L, response.requestId); + + Map<String, TemperatureReading> expectedTemperatures = new HashMap<>(); + expectedTemperatures.put("device1", new Temperature(1.0)); + expectedTemperatures.put("device2", new Temperature(2.0)); + + assertEquals(expectedTemperatures, response.temperatures); + } + + @Test + public void testReturnDeviceTimedOutIfDeviceDoesNotAnswerInTime() { + TestProbe<RespondAllTemperatures> requester = + testKit.createTestProbe(RespondAllTemperatures.class); + TestProbe<Device.Command> device1 = testKit.createTestProbe(Device.Command.class); + TestProbe<Device.Command> device2 = testKit.createTestProbe(Device.Command.class); + + Map<String, ActorRef<Device.Command>> deviceIdToActor = new HashMap<>(); + deviceIdToActor.put("device1", device1.getRef()); + deviceIdToActor.put("device2", device2.getRef()); + + ActorRef<DeviceGroupQuery.Command> queryActor = + testKit.spawn( + DeviceGroupQuery.create( + deviceIdToActor, 1L, requester.getRef(), Duration.ofMillis(200))); + + assertEquals(0L, device1.expectMessageClass(Device.ReadTemperature.class).requestId); + assertEquals(0L, device2.expectMessageClass(Device.ReadTemperature.class).requestId); + + queryActor.tell( + new DeviceGroupQuery.WrappedRespondTemperature( + new Device.RespondTemperature(0L, "device1", Optional.of(1.0)))); + + // no reply from device2 + + RespondAllTemperatures response = requester.receiveMessage(); + assertEquals(1L, response.requestId); + + Map<String, TemperatureReading> expectedTemperatures = new HashMap<>(); + expectedTemperatures.put("device1", new Temperature(1.0)); + expectedTemperatures.put("device2", DeviceTimedOut.INSTANCE); + + assertEquals(expectedTemperatures, response.temperatures); + } + + @Test + public void testCollectTemperaturesFromAllActiveDevices() { + TestProbe<DeviceRegistered> registeredProbe = testKit.createTestProbe(DeviceRegistered.class); + ActorRef<DeviceGroup.Command> groupActor = testKit.spawn(DeviceGroup.create("group")); + + groupActor.tell(new RequestTrackDevice("group", "device1", registeredProbe.getRef())); + ActorRef<Device.Command> deviceActor1 = registeredProbe.receiveMessage().device; + + groupActor.tell(new RequestTrackDevice("group", "device2", registeredProbe.getRef())); + ActorRef<Device.Command> deviceActor2 = registeredProbe.receiveMessage().device; + + groupActor.tell(new RequestTrackDevice("group", "device3", registeredProbe.getRef())); + ActorRef<Device.Command> deviceActor3 = registeredProbe.receiveMessage().device; + + // Check that the device actors are working + TestProbe<Device.TemperatureRecorded> recordProbe = + testKit.createTestProbe(Device.TemperatureRecorded.class); + deviceActor1.tell(new Device.RecordTemperature(0L, 1.0, recordProbe.getRef())); + assertEquals(0L, recordProbe.receiveMessage().requestId); + deviceActor2.tell(new Device.RecordTemperature(1L, 2.0, recordProbe.getRef())); + assertEquals(1L, recordProbe.receiveMessage().requestId); + // No temperature for device 3 + + TestProbe<RespondAllTemperatures> allTempProbe = + testKit.createTestProbe(RespondAllTemperatures.class); + groupActor.tell(new RequestAllTemperatures(0L, "group", allTempProbe.getRef())); + RespondAllTemperatures response = allTempProbe.receiveMessage(); + assertEquals(0L, response.requestId); + + Map<String, TemperatureReading> expectedTemperatures = new HashMap<>(); + expectedTemperatures.put("device1", new Temperature(1.0)); + expectedTemperatures.put("device2", new Temperature(2.0)); + expectedTemperatures.put("device3", TemperatureNotAvailable.INSTANCE); + + assertEquals(expectedTemperatures, response.temperatures); + } + +}
\ No newline at end of file |
