summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorMike Vink <mike1994vink@gmail.com>2021-07-20 17:41:58 +0200
committerMike Vink <mike1994vink@gmail.com>2021-07-20 17:41:58 +0200
commit5f016158e73a7828c9dec1810e54bbe2550d8c20 (patch)
tree82b380c3976be9e48248ae8f30a5e99b5afc7e97
parent36730af06964e75735ae5d099ee9c5bf2c6b7fc9 (diff)
feat(): trainer registration to actor sys
-rw-r--r--api/build.gradle11
-rw-r--r--api/src/main/java/akkamon/api/AkkamonMessageEngine.java6
-rw-r--r--api/src/main/java/akkamon/api/AkkamonSession.java10
-rw-r--r--api/src/main/java/akkamon/api/App.java6
-rw-r--r--api/src/main/java/akkamon/api/EventSocket.java37
-rw-r--r--api/src/main/java/akkamon/api/MessagingEngine.java101
-rw-r--r--domain/build.gradle3
-rw-r--r--domain/src/main/java/akkamon/domain/AkkamonImpl.java20
-rw-r--r--domain/src/main/java/akkamon/domain/AkkamonNexus.java84
-rw-r--r--domain/src/main/java/akkamon/domain/SceneTrainerGroup.java79
-rw-r--r--domain/src/main/java/akkamon/domain/Trainer.java37
-rw-r--r--domain/src/main/java/akkamon/domain/iot/Device.java6
-rw-r--r--domain/src/main/java/akkamon/domain/iot/DeviceGroup.java19
-rw-r--r--domain/src/main/java/akkamon/domain/iot/DeviceGroupQuery.java128
-rw-r--r--domain/src/main/java/akkamon/domain/iot/DeviceManager.java85
-rw-r--r--domain/src/test/java/akkamon/domain/AkkamonNexusTest.java28
-rw-r--r--domain/src/test/java/akkamon/domain/iot/DeviceGroupQueryTest.java245
17 files changed, 724 insertions, 181 deletions
diff --git a/api/build.gradle b/api/build.gradle
index 28ec919..188c417 100644
--- a/api/build.gradle
+++ b/api/build.gradle
@@ -4,6 +4,11 @@ plugins {
id 'application'
}
+def versions = [
+ ScalaBinary: "2.13"
+]
+
+
repositories {
jcenter()
mavenCentral()
@@ -11,6 +16,9 @@ repositories {
dependencies {
+ implementation platform("com.typesafe.akka:akka-bom_${versions.ScalaBinary}:2.6.15")
+ implementation "com.typesafe.akka:akka-actor-typed_${versions.ScalaBinary}"
+
// Use the Jersey framework to make writing and testing servlets easier.
implementation 'org.glassfish.jersey.containers:jersey-container-servlet-core:+'
implementation 'org.glassfish.jersey.containers:jersey-container-jetty-http:+'
@@ -31,9 +39,6 @@ dependencies {
implementation 'com.google.code.gson:gson:2.8.7'
- // Reference the domain subproject.
- implementation project(':domain')
-
// Use JUnit Jupiter API for testing.
testImplementation 'org.junit.jupiter:junit-jupiter-api:5.7.0'
// Also use the Mockito mocking framework to mock simple server functionality.
diff --git a/api/src/main/java/akkamon/api/AkkamonMessageEngine.java b/api/src/main/java/akkamon/api/AkkamonMessageEngine.java
new file mode 100644
index 0000000..2bbe0c8
--- /dev/null
+++ b/api/src/main/java/akkamon/api/AkkamonMessageEngine.java
@@ -0,0 +1,6 @@
+package akkamon.api;
+
+public interface AkkamonMessageEngine {
+ // broadcasts position info to WebSocket Clients
+ void broadCastGridPosition();
+}
diff --git a/api/src/main/java/akkamon/api/AkkamonSession.java b/api/src/main/java/akkamon/api/AkkamonSession.java
index 55ca3c3..048071c 100644
--- a/api/src/main/java/akkamon/api/AkkamonSession.java
+++ b/api/src/main/java/akkamon/api/AkkamonSession.java
@@ -1,14 +1,4 @@
package akkamon.api;
-import akkamon.api.models.User;
-
public interface AkkamonSession {
-
- void receiveGameState(String gameState);
-
- void disconnect(int statusCode, String message);
-
- void setCurrentUser(User user);
-
- User getUser();
}
diff --git a/api/src/main/java/akkamon/api/App.java b/api/src/main/java/akkamon/api/App.java
index 989562b..5d9add9 100644
--- a/api/src/main/java/akkamon/api/App.java
+++ b/api/src/main/java/akkamon/api/App.java
@@ -6,15 +6,19 @@ import org.eclipse.jetty.servlet.ServletContextHandler;
import org.eclipse.jetty.servlet.ServletHolder;
import org.eclipse.jetty.websocket.server.JettyWebSocketServlet;
import org.eclipse.jetty.websocket.server.config.JettyWebSocketServletContainerInitializer;
-import org.glassfish.jersey.servlet.ServletContainer;
public class App {
+ public static MessagingEngine messagingEngine;
+
public static void main(String[] args) {
Server server = startServer(8080);
ServletContextHandler context = createStatefulContext(server);
+ messagingEngine = new MessagingEngine();
+
+
// websocket behaviour
// Configure specific websocket behavior
diff --git a/api/src/main/java/akkamon/api/EventSocket.java b/api/src/main/java/akkamon/api/EventSocket.java
index fdefee3..092d763 100644
--- a/api/src/main/java/akkamon/api/EventSocket.java
+++ b/api/src/main/java/akkamon/api/EventSocket.java
@@ -1,18 +1,14 @@
package akkamon.api;
-import java.io.IOException;
-import java.util.concurrent.CountDownLatch;
-
-import akkamon.api.models.User;
-import org.eclipse.jetty.websocket.api.WebSocketAdapter;
import org.eclipse.jetty.websocket.api.Session;
+import org.eclipse.jetty.websocket.api.WebSocketAdapter;
+
+import java.util.concurrent.CountDownLatch;
public class EventSocket extends WebSocketAdapter implements AkkamonSession {
private final CountDownLatch closureLatch = new CountDownLatch(1);
- public User user;
-
@Override
public void onWebSocketConnect(Session sess)
{
@@ -25,7 +21,7 @@ public class EventSocket extends WebSocketAdapter implements AkkamonSession {
{
super.onWebSocketText(message);
System.out.println("Received TEXT message: " + message);
- MessagingEngine.getInstance().incoming(this, message);
+ App.messagingEngine.incoming(this, message);
}
@@ -35,7 +31,6 @@ public class EventSocket extends WebSocketAdapter implements AkkamonSession {
super.onWebSocketClose(statusCode, reason);
System.out.println("Socket Closed: [" + statusCode + "] " + reason);
closureLatch.countDown();
- MessagingEngine.getInstance().sessionOffline(this);
}
@Override
@@ -50,28 +45,4 @@ public class EventSocket extends WebSocketAdapter implements AkkamonSession {
System.out.println("Awaiting closure from remote");
closureLatch.await();
}
-
- @Override
- public void receiveGameState(String gameState) {
- try {
- getRemote().sendString(gameState);
- } catch (IOException e) {
- e.printStackTrace();
- }
- }
-
- @Override
- public void disconnect(int statusCode, String message) {
- getSession().close(statusCode, message);
- }
-
- @Override
- public void setCurrentUser(User user) {
- this.user = user;
- }
-
- @Override
- public User getUser() {
- return user;
- }
}
diff --git a/api/src/main/java/akkamon/api/MessagingEngine.java b/api/src/main/java/akkamon/api/MessagingEngine.java
index 1627ad4..bbab97e 100644
--- a/api/src/main/java/akkamon/api/MessagingEngine.java
+++ b/api/src/main/java/akkamon/api/MessagingEngine.java
@@ -1,120 +1,29 @@
package akkamon.api;
-import akkamon.api.models.*;
-import akkamon.domain.AkkamonImpl;
-import akkamon.domain.Trainer;
+import akkamon.api.models.GameState;
import com.google.gson.Gson;
import java.util.HashMap;
-import java.util.Map;
-import java.util.concurrent.ScheduledThreadPoolExecutor;
-import java.util.concurrent.TimeUnit;
-public class MessagingEngine {
+public class MessagingEngine implements AkkamonMessageEngine {
private HashMap<String, AkkamonSession> akkamonSessions = new HashMap<>();
private static MessagingEngine instance;
private Gson gson = new Gson();
- public static MessagingEngine getInstance() {
- if (instance == null) {
- instance = new MessagingEngine();
- return instance;
- }
- return instance;
- }
-
public MessagingEngine() {
- ScheduledThreadPoolExecutor executor = new ScheduledThreadPoolExecutor(2);
- executor.scheduleAtFixedRate(new Runnable() {
- @Override
- public void run() {
- emitGameState();
- }
- }, 0, 200, TimeUnit.MILLISECONDS);
-
}
- void emitGameState() {
- HashMap<String, Trainer> trainers = AkkamonImpl.getInstance().getDummyTrainersCollection();
-
- if (akkamonSessions.size() == 1) {
- AkkamonSession session = akkamonSessions.get("Ash");
-
- GameState gameState = new GameState();
- // dummy
- gameState.setCurrentPlayer("Ash", trainers);
-
- Event event = new Event("serverSidePosUpdate", gameState);
-
- session.receiveGameState(gson.toJson(event));
-
- } else if (akkamonSessions.size() == 2) {
- for (String name: akkamonSessions.keySet()) {
- AkkamonSession session = akkamonSessions.get(name);
-
- GameState gameState = new GameState();
- // dummy
- gameState.setCurrentPlayer(name, trainers);
- gameState.setRemotePlayers(trainers);
-
- Event event = new Event("serverSidePosUpdate", gameState);
-
- session.receiveGameState(gson.toJson(event));
- }
- }
-
- // for (Map.Entry<User, AkkamonSession> sess: akkamonSessions.entrySet()) {
-
- // User user = sess.getKey();
- // AkkamonSession session = sess.getValue();
-
- // GameState gameState = new GameState();
- // // dummy
- // gameState.setCurrentPlayer(user.name, trainers);
-
- // session.receiveGameState(gson.toJson(gameState));
- // }
+ @Override
+ public void broadCastGridPosition() {
}
void incoming(AkkamonSession session, String message) {
- Event event = gson.fromJson(message, Event.class);
- switch (event.type) {
- case "login":
- login(session, event.user);
- break;
- case "clientSidePosUpdate":
- updatePositions(event.gameState);
- break;
- }
+
}
private void updatePositions(GameState gameState) {
- Player current = gameState.currentPlayer;
- if (gameState.currentPlayer != null) {
- AkkamonImpl.getInstance().updateTrainerPosition(current.name, current.position.x, current.position.y);
- }
- }
- private void login(AkkamonSession session, User user) {
- if (user == null) {
- session.disconnect(401, "Give username and password");
- }
- System.out.println("Currrent connections: " + akkamonSessions.size());
- if (akkamonSessions.size() == 0) {
- akkamonSessions.put("Ash", session);
- System.out.println("After adding ash!: " + akkamonSessions.size());
- session.setCurrentUser(new User("Ash", ""));
- } else if (akkamonSessions.size() == 1) {
- akkamonSessions.put("Misty", session);
- session.setCurrentUser(new User("Misty", ""));
- }
- AkkamonImpl.getInstance().newPlayerConnected(user.name, user.password);
- System.out.println("Emitting gameState!");
- emitGameState();
}
- public void sessionOffline(AkkamonSession session) {
- akkamonSessions.remove(session.getUser().name);
- }
}
diff --git a/domain/build.gradle b/domain/build.gradle
index d2d174a..4b7063a 100644
--- a/domain/build.gradle
+++ b/domain/build.gradle
@@ -26,6 +26,9 @@ dependencies {
implementation 'ch.qos.logback:logback-classic:1.2.3'
implementation 'junit:junit:4.12'
+ // Reference the domain subproject.
+ implementation project(':api')
+
testImplementation "com.typesafe.akka:akka-actor-testkit-typed_${versions.ScalaBinary}"
testImplementation "com.typesafe.akka:akka-stream-testkit_${versions.ScalaBinary}"
diff --git a/domain/src/main/java/akkamon/domain/AkkamonImpl.java b/domain/src/main/java/akkamon/domain/AkkamonImpl.java
index a6a840b..534abf2 100644
--- a/domain/src/main/java/akkamon/domain/AkkamonImpl.java
+++ b/domain/src/main/java/akkamon/domain/AkkamonImpl.java
@@ -17,19 +17,19 @@ public class AkkamonImpl implements Akkamon {
@Override
public void newPlayerConnected(String name, String password) {
- switch (dummyTrainersCollection.size()) {
- case 0:
- dummyTrainersCollection.put("Ash", new Trainer("Ash"));
- break;
- case 1:
- dummyTrainersCollection.put("Misty", new Trainer("Misty"));
- break;
- }
+ // switch (dummyTrainersCollection.size()) {
+ // case 0:
+ // dummyTrainersCollection.put("Ash", new Trainer("Ash"));
+ // break;
+ // case 1:
+ // dummyTrainersCollection.put("Misty", new Trainer("Misty"));
+ // break;
+ // }
}
public void updateTrainerPosition(String name, float x, float y) {
- Trainer trainer = dummyTrainersCollection.get(name);
- trainer.newPosition(x, y);
+ // Trainer trainer = dummyTrainersCollection.get(name);
+ // trainer.newPosition(x, y);
}
public HashMap<String, Trainer> getDummyTrainersCollection() {
diff --git a/domain/src/main/java/akkamon/domain/AkkamonNexus.java b/domain/src/main/java/akkamon/domain/AkkamonNexus.java
new file mode 100644
index 0000000..a64e751
--- /dev/null
+++ b/domain/src/main/java/akkamon/domain/AkkamonNexus.java
@@ -0,0 +1,84 @@
+package akkamon.domain;
+
+import akka.actor.typed.ActorRef;
+import akka.actor.typed.Behavior;
+import akka.actor.typed.javadsl.AbstractBehavior;
+import akka.actor.typed.javadsl.ActorContext;
+import akka.actor.typed.javadsl.Behaviors;
+import akka.actor.typed.javadsl.Receive;
+
+import java.util.HashMap;
+import java.util.Map;
+
+public class AkkamonNexus extends AbstractBehavior<AkkamonNexus.Command> {
+
+
+ public interface Command {}
+
+ public static class RequestTrainerRegistration implements AkkamonNexus.Command, SceneTrainerGroup.Command {
+ public String trainerId;
+ public String sceneId;
+ public ActorRef<TrainerRegistered> replyTo;
+
+ public RequestTrainerRegistration(String trainerId, String sceneId, ActorRef<TrainerRegistered> replyTo) {
+ this.trainerId = trainerId;
+ this.sceneId = sceneId;
+ this.replyTo = replyTo;
+ }
+ }
+
+ public static class TrainerRegistered {
+ private ActorRef<Trainer.Command> trainer;
+
+ public TrainerRegistered(ActorRef<Trainer.Command> trainer) {
+ this.trainer = trainer;
+ }
+ }
+
+ private static class SceneTrainerGroupTerminated implements AkkamonNexus.Command {
+ public SceneTrainerGroupTerminated(String sceneId) {
+ }
+ }
+
+
+ public static Behavior<AkkamonNexus.Command> create() {
+ return Behaviors.setup(AkkamonNexus::new);
+ }
+
+ private Map<String, ActorRef<SceneTrainerGroup.Command>> sceneIdToActor = new HashMap<>();
+
+ public AkkamonNexus(ActorContext<Command> context) {
+ super(context);
+ getContext().getLog().info("AkkamonNexus is spinning");
+ }
+
+ @Override
+ public Receive<Command> createReceive() {
+ return newReceiveBuilder()
+ .onMessage(
+ RequestTrainerRegistration.class,
+ this::onTrainerRegistration
+ )
+ .build();
+ }
+
+ private AkkamonNexus onTrainerRegistration(RequestTrainerRegistration registrationRequest) {
+ String sceneId = registrationRequest.sceneId;
+ String trainerId = registrationRequest.trainerId;
+
+ ActorRef<SceneTrainerGroup.Command> sceneTrainerGroup = sceneIdToActor.get(sceneId);
+ if (sceneTrainerGroup != null) {
+ sceneTrainerGroup.tell(registrationRequest);
+ } else {
+ getContext().getLog().info("Creating sceneTrainerGroup {} for trainer {}", sceneId, trainerId);
+ ActorRef<SceneTrainerGroup.Command> sceneActor =
+ getContext().spawn(SceneTrainerGroup.create(sceneId), "scene-" + sceneId);
+
+ getContext().watchWith(sceneActor, new SceneTrainerGroupTerminated(sceneId));
+ sceneActor.tell(registrationRequest);
+ sceneIdToActor.put(sceneId, sceneActor);
+ }
+ return this;
+ }
+
+}
diff --git a/domain/src/main/java/akkamon/domain/SceneTrainerGroup.java b/domain/src/main/java/akkamon/domain/SceneTrainerGroup.java
new file mode 100644
index 0000000..fb9456a
--- /dev/null
+++ b/domain/src/main/java/akkamon/domain/SceneTrainerGroup.java
@@ -0,0 +1,79 @@
+package akkamon.domain;
+
+import akka.actor.typed.ActorRef;
+import akka.actor.typed.Behavior;
+import akka.actor.typed.javadsl.AbstractBehavior;
+import akka.actor.typed.javadsl.ActorContext;
+import akka.actor.typed.javadsl.Behaviors;
+import akka.actor.typed.javadsl.Receive;
+
+import java.util.HashMap;
+import java.util.Map;
+
+public class SceneTrainerGroup extends AbstractBehavior<SceneTrainerGroup.Command> {
+
+ public interface Command { }
+
+ private class TrainerOffline implements Command {
+ public ActorRef<Trainer.Command> trainer;
+ public String sceneId;
+ public String trainerId;
+
+ public TrainerOffline(ActorRef<Trainer.Command> trainerActor, String sceneId, String trainerId) {
+ this.trainer = trainerActor;
+ this.sceneId = sceneId;
+ this.trainerId = trainerId;
+ }
+ }
+
+ public static Behavior<Command> create(String TrainerGroupId) {
+ return Behaviors.setup(context -> new SceneTrainerGroup(context, TrainerGroupId));
+ }
+
+ private final String sceneId;
+ private final Map<String, ActorRef<Trainer.Command>> trainerIdToActor= new HashMap();
+
+ public SceneTrainerGroup(ActorContext<Command> context, String sceneId) {
+ super(context);
+
+ this.sceneId = sceneId;
+
+ getContext().getLog().info("SceneTrainerGroup Actor {} started", sceneId);
+ }
+
+ @Override
+ public Receive<Command> createReceive() {
+ return newReceiveBuilder()
+ .onMessage(
+ AkkamonNexus.RequestTrainerRegistration.class,
+ this::onTrainerRegistration)
+ .build();
+ }
+
+ private SceneTrainerGroup onTrainerRegistration(AkkamonNexus.RequestTrainerRegistration registrationRequest) {
+ if (this.sceneId.equals(registrationRequest.sceneId)) {
+ ActorRef<Trainer.Command> trainerActor = trainerIdToActor.get(registrationRequest.trainerId);
+ if (trainerActor != null) {
+ registrationRequest.replyTo.tell(new AkkamonNexus.TrainerRegistered(trainerActor));
+ } else {
+ getContext().getLog().info("Creating trainer actor for {}", registrationRequest.trainerId);
+ trainerActor =
+ getContext()
+ .spawn(Trainer.create(sceneId, registrationRequest.trainerId), "trainer-" + registrationRequest.trainerId);
+ getContext()
+ .watchWith(trainerActor, new SceneTrainerGroup.TrainerOffline(trainerActor, sceneId, registrationRequest.trainerId));
+ trainerIdToActor.put(registrationRequest.trainerId, trainerActor);
+ registrationRequest.replyTo.tell(new AkkamonNexus.TrainerRegistered(trainerActor));
+ }
+ } else {
+ getContext()
+ .getLog()
+ .warn(
+ "Ignoring TrainerRegistration request for {}. This actor is responsible for {}.",
+ registrationRequest.sceneId,
+ this.sceneId);
+ }
+ return this;
+ }
+
+}
diff --git a/domain/src/main/java/akkamon/domain/Trainer.java b/domain/src/main/java/akkamon/domain/Trainer.java
index 8f14c60..7f540a1 100644
--- a/domain/src/main/java/akkamon/domain/Trainer.java
+++ b/domain/src/main/java/akkamon/domain/Trainer.java
@@ -1,28 +1,31 @@
package akkamon.domain;
-public class Trainer {
- private String name;
- private float x;
- private float y;
+import akka.actor.typed.Behavior;
+import akka.actor.typed.javadsl.AbstractBehavior;
+import akka.actor.typed.javadsl.ActorContext;
+import akka.actor.typed.javadsl.Behaviors;
+import akka.actor.typed.javadsl.Receive;
- public Trainer(String name) {
- this.name = name;
- }
+public class Trainer extends AbstractBehavior<Trainer.Command> {
- public void newPosition(float x, float y) {
- this.x = x;
- this.y = y;
- }
+ public interface Command { }
- public float getX() {
- return x;
+ public static Behavior<Command> create(String sceneId, String trainerId) {
+ return Behaviors.setup(context -> new Trainer(context, sceneId, trainerId));
}
- public float getY() {
- return y;
+ private String sceneId;
+ private String trainerId;
+
+ public Trainer(ActorContext<Command> context, String sceneId, String trainerId) {
+ super(context);
+ this.sceneId = sceneId;
+ this.trainerId = trainerId;
}
- public String getName() {
- return name;
+ @Override
+ public Receive<Command> createReceive() {
+ return null;
}
+
}
diff --git a/domain/src/main/java/akkamon/domain/iot/Device.java b/domain/src/main/java/akkamon/domain/iot/Device.java
index 27918a6..c4e63f8 100644
--- a/domain/src/main/java/akkamon/domain/iot/Device.java
+++ b/domain/src/main/java/akkamon/domain/iot/Device.java
@@ -46,10 +46,12 @@ public class Device extends AbstractBehavior<Device.Command> {
public static final class RespondTemperature {
final long requestId;
+ final String deviceId;
final Optional<Double> value;
- public RespondTemperature(long requestId, Optional<Double> value) {
+ public RespondTemperature(long requestId, String deviceId, Optional<Double> value) {
this.requestId = requestId;
+ this.deviceId = deviceId;
this.value = value;
}
}
@@ -93,7 +95,7 @@ public class Device extends AbstractBehavior<Device.Command> {
}
private Behavior<Command> onReadTemperature(ReadTemperature r) {
- r.replyTo.tell(new RespondTemperature(r.requestId, lastTemperatureReading));
+ r.replyTo.tell(new RespondTemperature(r.requestId, deviceId, lastTemperatureReading));
return this;
}
diff --git a/domain/src/main/java/akkamon/domain/iot/DeviceGroup.java b/domain/src/main/java/akkamon/domain/iot/DeviceGroup.java
index 2cfc13a..40e0d7f 100644
--- a/domain/src/main/java/akkamon/domain/iot/DeviceGroup.java
+++ b/domain/src/main/java/akkamon/domain/iot/DeviceGroup.java
@@ -8,6 +8,7 @@ import akka.actor.typed.javadsl.ActorContext;
import akka.actor.typed.javadsl.Behaviors;
import akka.actor.typed.javadsl.Receive;
+import java.time.Duration;
import java.util.HashMap;
import java.util.Map;
@@ -40,6 +41,19 @@ public class DeviceGroup extends AbstractBehavior<DeviceGroup.Command> {
context.getLog().info("DeviceGroup {} started", groupId);
}
+ private DeviceGroup onAllTemperatures(DeviceManager.RequestAllTemperatures r) {
+ Map<String, ActorRef<Device.Command>> deviceIdToActorCopy = new HashMap<>(this.deviceIdToActor);
+
+ getContext()
+ .spawnAnonymous(
+ DeviceGroupQuery.create(
+ deviceIdToActorCopy, r.requestId, r.replyTo, Duration.ofSeconds(3)
+ )
+ );
+
+ return this;
+ }
+
private DeviceGroup onTrackDevice(DeviceManager.RequestTrackDevice trackMsg) {
if (this.groupId.equals(trackMsg.groupId)) {
ActorRef<Device.Command> deviceActor = deviceIdToActor.get(trackMsg.deviceId);
@@ -88,6 +102,11 @@ public class DeviceGroup extends AbstractBehavior<DeviceGroup.Command> {
this::onDeviceList)
.onMessage(DeviceTerminated.class, this::onTerminated)
.onSignal(PostStop.class, signal -> onPostStop())
+ .onMessage(
+ DeviceManager.RequestAllTemperatures.class,
+ r -> r.groupId.equals(groupId),
+ this::onAllTemperatures
+ )
.build();
}
diff --git a/domain/src/main/java/akkamon/domain/iot/DeviceGroupQuery.java b/domain/src/main/java/akkamon/domain/iot/DeviceGroupQuery.java
new file mode 100644
index 0000000..b04fd23
--- /dev/null
+++ b/domain/src/main/java/akkamon/domain/iot/DeviceGroupQuery.java
@@ -0,0 +1,128 @@
+package akkamon.domain.iot;
+
+import akka.actor.typed.ActorRef;
+import akka.actor.typed.Behavior;
+import akka.actor.typed.javadsl.*;
+
+import java.time.Duration;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.Map;
+import java.util.Set;
+
+public class DeviceGroupQuery extends AbstractBehavior<DeviceGroupQuery.Command> {
+
+
+ public interface Command {}
+
+ private static enum CollectionTimeout implements Command {
+ INSTANCE
+ }
+
+ static class WrappedRespondTemperature implements Command {
+ final Device.RespondTemperature response;
+
+ WrappedRespondTemperature(Device.RespondTemperature response) {
+ this.response = response;
+ }
+ }
+
+ private static class DeviceTerminated implements Command {
+ final String deviceId;
+
+ private DeviceTerminated(String deviceId) {
+ this.deviceId = deviceId;
+ }
+ }
+
+ public static Behavior<Command> create(
+ Map<String, ActorRef<Device.Command>> deviceIdToActor,
+ long requestId,
+ ActorRef<DeviceManager.RespondAllTemperatures> requester,
+ Duration timeout) {
+ return Behaviors.setup(
+ context ->
+ Behaviors.withTimers(
+ timers ->
+ new DeviceGroupQuery(
+ deviceIdToActor, requestId, requester, timeout, context, timers)));
+ }
+
+ private final long requestId;
+ private final ActorRef<DeviceManager.RespondAllTemperatures> requester;
+ private Map<String, DeviceManager.TemperatureReading> repliesSoFar = new HashMap();
+ private final Set<String> stillWaiting;
+
+ private DeviceGroupQuery(
+ Map<String, ActorRef<Device.Command>> deviceIdToActor,
+ long requestId,
+ ActorRef<DeviceManager.RespondAllTemperatures> requester,
+ Duration timeout,
+ ActorContext<Command> context,
+ TimerScheduler<Command> timers) {
+ super(context);
+ this.requestId = requestId;
+ this.requester = requester;
+
+ timers.startSingleTimer(CollectionTimeout.INSTANCE, timeout);
+
+ ActorRef<Device.RespondTemperature> respondTemperatureAdapter =
+ context.messageAdapter(Device.RespondTemperature.class, WrappedRespondTemperature::new);
+
+ for (Map.Entry<String, ActorRef<Device.Command>> entry : deviceIdToActor.entrySet()) {
+ context.watchWith(entry.getValue(), new DeviceTerminated(entry.getKey()));
+ entry.getValue().tell(new Device.ReadTemperature(0L, respondTemperatureAdapter));
+ }
+ stillWaiting = new HashSet<>(deviceIdToActor.keySet());
+ }
+
+
+ @Override
+ public Receive<Command> createReceive() {
+ return newReceiveBuilder()
+ .onMessage(WrappedRespondTemperature.class, this::onRespondTemperature)
+ .onMessage(DeviceTerminated.class, this::onDeviceTerminated)
+ .onMessage(CollectionTimeout.class, this::onCollectionTimeout)
+ .build();
+ }
+
+ private Behavior<Command> onRespondTemperature(WrappedRespondTemperature r) {
+ DeviceManager.TemperatureReading reading =
+ r.response
+ .value
+ .map(v -> (DeviceManager.TemperatureReading) new DeviceManager.Temperature(v))
+ .orElse(DeviceManager.TemperatureNotAvailable.INSTANCE);
+
+ String deviceId = r.response.deviceId;
+ repliesSoFar.put(deviceId, reading);
+ stillWaiting.remove(deviceId);
+
+ return respondWhenAllCollected();
+ }
+
+ private Behavior<Command> onDeviceTerminated(DeviceTerminated terminated) {
+ if (stillWaiting.contains(terminated.deviceId)) {
+ repliesSoFar.put(terminated.deviceId, DeviceManager.DeviceNotAvailable.INSTANCE);
+ stillWaiting.remove(terminated.deviceId);
+ }
+ return respondWhenAllCollected();
+ }
+
+ private Behavior<Command> onCollectionTimeout(CollectionTimeout timeout) {
+ for (String deviceId : stillWaiting) {
+ repliesSoFar.put(deviceId, DeviceManager.DeviceTimedOut.INSTANCE);
+ }
+ stillWaiting.clear();
+ return respondWhenAllCollected();
+ }
+
+ private Behavior<Command> respondWhenAllCollected() {
+ if (stillWaiting.isEmpty()) {
+ requester.tell(new DeviceManager.RespondAllTemperatures(requestId, repliesSoFar));
+ return Behaviors.stopped();
+ } else {
+ return this;
+ }
+ }
+
+}
diff --git a/domain/src/main/java/akkamon/domain/iot/DeviceManager.java b/domain/src/main/java/akkamon/domain/iot/DeviceManager.java
index e445b7e..c0f8b0d 100644
--- a/domain/src/main/java/akkamon/domain/iot/DeviceManager.java
+++ b/domain/src/main/java/akkamon/domain/iot/DeviceManager.java
@@ -67,6 +67,73 @@ public class DeviceManager extends AbstractBehavior<DeviceManager.Command> {
}
}
+ public static final class RequestAllTemperatures implements DeviceGroupQuery.Command, DeviceGroup.Command, Command {
+
+ final long requestId;
+ final String groupId;
+ final ActorRef<RespondAllTemperatures> replyTo;
+
+ public RequestAllTemperatures(
+ long requestId, String groupId, ActorRef<RespondAllTemperatures> replyTo) {
+ this.requestId = requestId;
+ this.groupId = groupId;
+ this.replyTo = replyTo;
+ }
+ }
+
+ public static final class RespondAllTemperatures {
+ final long requestId;
+ final Map<String, TemperatureReading> temperatures;
+
+ public RespondAllTemperatures(long requestId, Map<String, TemperatureReading> temperatures) {
+ this.requestId = requestId;
+ this.temperatures = temperatures;
+ }
+ }
+
+ public interface TemperatureReading {}
+
+ public static final class Temperature implements TemperatureReading {
+ public final double value;
+
+ public Temperature(double value) {
+ this.value = value;
+ }
+
+ @Override
+ public boolean equals(Object o) {
+ if (this == o) return true;
+ if (o == null || getClass() != o.getClass()) return false;
+
+ Temperature that = (Temperature) o;
+
+ return Double.compare(that.value, value) == 0;
+ }
+
+ @Override
+ public int hashCode() {
+ long temp = Double.doubleToLongBits(value);
+ return (int) (temp ^ (temp >>> 32));
+ }
+
+ @Override
+ public String toString() {
+ return "Temperature{" + "value=" + value + '}';
+ }
+ }
+
+ public enum TemperatureNotAvailable implements TemperatureReading {
+ INSTANCE
+ }
+
+ public enum DeviceNotAvailable implements TemperatureReading {
+ INSTANCE
+ }
+
+ public enum DeviceTimedOut implements TemperatureReading {
+ INSTANCE
+ }
+
public static Behavior<Command> create() {
return Behaviors.setup(DeviceManager::new);
}
@@ -78,6 +145,15 @@ public class DeviceManager extends AbstractBehavior<DeviceManager.Command> {
context.getLog().info("DeviceManager started");
}
+ public Receive<Command> createReceive() {
+ return newReceiveBuilder()
+ .onMessage(RequestTrackDevice.class, this::onTrackDevice)
+ .onMessage(RequestDeviceList.class, this::onRequestDeviceList)
+ .onMessage(DeviceGroupTerminated.class, this::onTerminated)
+ .onSignal(PostStop.class, signal -> onPostStop())
+ .build();
+ }
+
private DeviceManager onTrackDevice(RequestTrackDevice trackMsg) {
String groupId = trackMsg.groupId;
ActorRef<DeviceGroup.Command> ref = groupIdToActor.get(groupId);
@@ -110,15 +186,6 @@ public class DeviceManager extends AbstractBehavior<DeviceManager.Command> {
return this;
}
- public Receive<Command> createReceive() {
- return newReceiveBuilder()
- .onMessage(RequestTrackDevice.class, this::onTrackDevice)
- .onMessage(RequestDeviceList.class, this::onRequestDeviceList)
- .onMessage(DeviceGroupTerminated.class, this::onTerminated)
- .onSignal(PostStop.class, signal -> onPostStop())
- .build();
- }
-
private DeviceManager onPostStop() {
getContext().getLog().info("DeviceManager stopped");
return this;
diff --git a/domain/src/test/java/akkamon/domain/AkkamonNexusTest.java b/domain/src/test/java/akkamon/domain/AkkamonNexusTest.java
new file mode 100644
index 0000000..5150b1c
--- /dev/null
+++ b/domain/src/test/java/akkamon/domain/AkkamonNexusTest.java
@@ -0,0 +1,28 @@
+package akkamon.domain;
+
+import akka.actor.testkit.typed.javadsl.TestKitJunitResource;
+import akka.actor.testkit.typed.javadsl.TestProbe;
+import akka.actor.typed.ActorRef;
+import org.junit.ClassRule;
+import org.junit.Test;
+
+public class AkkamonNexusTest {
+
+ @ClassRule
+ public static final TestKitJunitResource testKit = new TestKitJunitResource();
+
+ @Test
+ public void given_a_registration_request_when_no_scene_or_trainer_exists_in_the_system_then_create_scene_and_trainer_and_reply() {
+ TestProbe<AkkamonNexus.TrainerRegistered> probe =
+ testKit.createTestProbe(AkkamonNexus.TrainerRegistered.class);
+
+ ActorRef<SceneTrainerGroup.Command> sceneTrainerGroupActor =
+ testKit.spawn(SceneTrainerGroup.create("start"));
+
+ sceneTrainerGroupActor.tell(new AkkamonNexus.RequestTrainerRegistration("ash", "start", probe.getRef()));
+
+ probe.expectMessageClass(AkkamonNexus.TrainerRegistered.class);
+
+ }
+
+} \ No newline at end of file
diff --git a/domain/src/test/java/akkamon/domain/iot/DeviceGroupQueryTest.java b/domain/src/test/java/akkamon/domain/iot/DeviceGroupQueryTest.java
new file mode 100644
index 0000000..ebfeda5
--- /dev/null
+++ b/domain/src/test/java/akkamon/domain/iot/DeviceGroupQueryTest.java
@@ -0,0 +1,245 @@
+package akkamon.domain.iot;
+
+import akka.actor.testkit.typed.javadsl.TestKitJunitResource;
+import akka.actor.testkit.typed.javadsl.TestProbe;
+import akka.actor.typed.ActorRef;
+import akkamon.domain.iot.DeviceManager.*;
+import akkamon.domain.iot.DeviceManager.RespondAllTemperatures;
+import akkamon.domain.iot.DeviceManager.Temperature;
+import akkamon.domain.iot.DeviceManager.TemperatureReading;
+import org.junit.ClassRule;
+import org.junit.Test;
+
+import java.time.Duration;
+import java.util.HashMap;
+import java.util.Map;
+import java.util.Optional;
+
+import static org.junit.Assert.*;
+
+public class DeviceGroupQueryTest {
+
+ @ClassRule
+ public static final TestKitJunitResource testKit = new TestKitJunitResource();
+
+ @Test
+ public void testReturnTemperatureValueForWorkingDevices() {
+ TestProbe<RespondAllTemperatures> requester =
+ testKit.createTestProbe(RespondAllTemperatures.class);
+ TestProbe<Device.Command> device1 = testKit.createTestProbe(Device.Command.class);
+ TestProbe<Device.Command> device2 = testKit.createTestProbe(Device.Command.class);
+
+ Map<String, ActorRef<Device.Command>> deviceIdToActor = new HashMap<>();
+ deviceIdToActor.put("device1", device1.getRef());
+ deviceIdToActor.put("device2", device2.getRef());
+
+ ActorRef<DeviceGroupQuery.Command> queryActor =
+ testKit.spawn(
+ DeviceGroupQuery.create(
+ deviceIdToActor, 1L, requester.getRef(), Duration.ofSeconds(3)));
+
+ device1.expectMessageClass(Device.ReadTemperature.class);
+ device2.expectMessageClass(Device.ReadTemperature.class);
+
+ queryActor.tell(
+ new DeviceGroupQuery.WrappedRespondTemperature(
+ new Device.RespondTemperature(0L, "device1", Optional.of(1.0))));
+
+ queryActor.tell(
+ new DeviceGroupQuery.WrappedRespondTemperature(
+ new Device.RespondTemperature(0L, "device2", Optional.of(2.0))));
+
+ RespondAllTemperatures response = requester.receiveMessage();
+ assertEquals(1L, response.requestId);
+
+ Map<String, TemperatureReading> expectedTemperatures = new HashMap<>();
+ expectedTemperatures.put("device1", new Temperature(1.0));
+ expectedTemperatures.put("device2", new Temperature(2.0));
+
+ assertEquals(expectedTemperatures, response.temperatures);
+ }
+
+ @Test
+ public void testReturnTemperatureNotAvailableForDevicesWithNoReadings() {
+ TestProbe<RespondAllTemperatures> requester =
+ testKit.createTestProbe(RespondAllTemperatures.class);
+ TestProbe<Device.Command> device1 = testKit.createTestProbe(Device.Command.class);
+ TestProbe<Device.Command> device2 = testKit.createTestProbe(Device.Command.class);
+
+ Map<String, ActorRef<Device.Command>> deviceIdToActor = new HashMap<>();
+ deviceIdToActor.put("device1", device1.getRef());
+ deviceIdToActor.put("device2", device2.getRef());
+
+ ActorRef<DeviceGroupQuery.Command> queryActor =
+ testKit.spawn(
+ DeviceGroupQuery.create(
+ deviceIdToActor, 1L, requester.getRef(), Duration.ofSeconds(3)));
+
+ assertEquals(0L, device1.expectMessageClass(Device.ReadTemperature.class).requestId);
+ assertEquals(0L, device2.expectMessageClass(Device.ReadTemperature.class).requestId);
+
+ queryActor.tell(
+ new DeviceGroupQuery.WrappedRespondTemperature(
+ new Device.RespondTemperature(0L, "device1", Optional.empty())));
+
+ queryActor.tell(
+ new DeviceGroupQuery.WrappedRespondTemperature(
+ new Device.RespondTemperature(0L, "device2", Optional.of(2.0))));
+
+ RespondAllTemperatures response = requester.receiveMessage();
+ assertEquals(1L, response.requestId);
+
+ Map<String, TemperatureReading> expectedTemperatures = new HashMap<>();
+ expectedTemperatures.put("device1", TemperatureNotAvailable.INSTANCE);
+ expectedTemperatures.put("device2", new Temperature(2.0));
+
+ assertEquals(expectedTemperatures, response.temperatures);
+ }
+
+ @Test
+ public void testReturnDeviceNotAvailableIfDeviceStopsBeforeAnswering() {
+ TestProbe<RespondAllTemperatures> requester =
+ testKit.createTestProbe(RespondAllTemperatures.class);
+ TestProbe<Device.Command> device1 = testKit.createTestProbe(Device.Command.class);
+ TestProbe<Device.Command> device2 = testKit.createTestProbe(Device.Command.class);
+
+ Map<String, ActorRef<Device.Command>> deviceIdToActor = new HashMap<>();
+ deviceIdToActor.put("device1", device1.getRef());
+ deviceIdToActor.put("device2", device2.getRef());
+
+ ActorRef<DeviceGroupQuery.Command> queryActor =
+ testKit.spawn(
+ DeviceGroupQuery.create(
+ deviceIdToActor, 1L, requester.getRef(), Duration.ofSeconds(3)));
+
+ assertEquals(0L, device1.expectMessageClass(Device.ReadTemperature.class).requestId);
+ assertEquals(0L, device2.expectMessageClass(Device.ReadTemperature.class).requestId);
+
+ queryActor.tell(
+ new DeviceGroupQuery.WrappedRespondTemperature(
+ new Device.RespondTemperature(0L, "device1", Optional.of(1.0))));
+
+ device2.stop();
+
+ RespondAllTemperatures response = requester.receiveMessage();
+ assertEquals(1L, response.requestId);
+
+ Map<String, TemperatureReading> expectedTemperatures = new HashMap<>();
+ expectedTemperatures.put("device1", new Temperature(1.0));
+ expectedTemperatures.put("device2", DeviceNotAvailable.INSTANCE);
+
+ assertEquals(expectedTemperatures, response.temperatures);
+ }
+
+ @Test
+ public void testReturnTemperatureReadingEvenIfDeviceStopsAfterAnswering() {
+ TestProbe<RespondAllTemperatures> requester =
+ testKit.createTestProbe(RespondAllTemperatures.class);
+ TestProbe<Device.Command> device1 = testKit.createTestProbe(Device.Command.class);
+ TestProbe<Device.Command> device2 = testKit.createTestProbe(Device.Command.class);
+
+ Map<String, ActorRef<Device.Command>> deviceIdToActor = new HashMap<>();
+ deviceIdToActor.put("device1", device1.getRef());
+ deviceIdToActor.put("device2", device2.getRef());
+
+ ActorRef<DeviceGroupQuery.Command> queryActor =
+ testKit.spawn(
+ DeviceGroupQuery.create(
+ deviceIdToActor, 1L, requester.getRef(), Duration.ofSeconds(3)));
+
+ assertEquals(0L, device1.expectMessageClass(Device.ReadTemperature.class).requestId);
+ assertEquals(0L, device2.expectMessageClass(Device.ReadTemperature.class).requestId);
+
+ queryActor.tell(
+ new DeviceGroupQuery.WrappedRespondTemperature(
+ new Device.RespondTemperature(0L, "device1", Optional.of(1.0))));
+
+ queryActor.tell(
+ new DeviceGroupQuery.WrappedRespondTemperature(
+ new Device.RespondTemperature(0L, "device2", Optional.of(2.0))));
+
+ device2.stop();
+
+ RespondAllTemperatures response = requester.receiveMessage();
+ assertEquals(1L, response.requestId);
+
+ Map<String, TemperatureReading> expectedTemperatures = new HashMap<>();
+ expectedTemperatures.put("device1", new Temperature(1.0));
+ expectedTemperatures.put("device2", new Temperature(2.0));
+
+ assertEquals(expectedTemperatures, response.temperatures);
+ }
+
+ @Test
+ public void testReturnDeviceTimedOutIfDeviceDoesNotAnswerInTime() {
+ TestProbe<RespondAllTemperatures> requester =
+ testKit.createTestProbe(RespondAllTemperatures.class);
+ TestProbe<Device.Command> device1 = testKit.createTestProbe(Device.Command.class);
+ TestProbe<Device.Command> device2 = testKit.createTestProbe(Device.Command.class);
+
+ Map<String, ActorRef<Device.Command>> deviceIdToActor = new HashMap<>();
+ deviceIdToActor.put("device1", device1.getRef());
+ deviceIdToActor.put("device2", device2.getRef());
+
+ ActorRef<DeviceGroupQuery.Command> queryActor =
+ testKit.spawn(
+ DeviceGroupQuery.create(
+ deviceIdToActor, 1L, requester.getRef(), Duration.ofMillis(200)));
+
+ assertEquals(0L, device1.expectMessageClass(Device.ReadTemperature.class).requestId);
+ assertEquals(0L, device2.expectMessageClass(Device.ReadTemperature.class).requestId);
+
+ queryActor.tell(
+ new DeviceGroupQuery.WrappedRespondTemperature(
+ new Device.RespondTemperature(0L, "device1", Optional.of(1.0))));
+
+ // no reply from device2
+
+ RespondAllTemperatures response = requester.receiveMessage();
+ assertEquals(1L, response.requestId);
+
+ Map<String, TemperatureReading> expectedTemperatures = new HashMap<>();
+ expectedTemperatures.put("device1", new Temperature(1.0));
+ expectedTemperatures.put("device2", DeviceTimedOut.INSTANCE);
+
+ assertEquals(expectedTemperatures, response.temperatures);
+ }
+
+ @Test
+ public void testCollectTemperaturesFromAllActiveDevices() {
+ TestProbe<DeviceRegistered> registeredProbe = testKit.createTestProbe(DeviceRegistered.class);
+ ActorRef<DeviceGroup.Command> groupActor = testKit.spawn(DeviceGroup.create("group"));
+
+ groupActor.tell(new RequestTrackDevice("group", "device1", registeredProbe.getRef()));
+ ActorRef<Device.Command> deviceActor1 = registeredProbe.receiveMessage().device;
+
+ groupActor.tell(new RequestTrackDevice("group", "device2", registeredProbe.getRef()));
+ ActorRef<Device.Command> deviceActor2 = registeredProbe.receiveMessage().device;
+
+ groupActor.tell(new RequestTrackDevice("group", "device3", registeredProbe.getRef()));
+ ActorRef<Device.Command> deviceActor3 = registeredProbe.receiveMessage().device;
+
+ // Check that the device actors are working
+ TestProbe<Device.TemperatureRecorded> recordProbe =
+ testKit.createTestProbe(Device.TemperatureRecorded.class);
+ deviceActor1.tell(new Device.RecordTemperature(0L, 1.0, recordProbe.getRef()));
+ assertEquals(0L, recordProbe.receiveMessage().requestId);
+ deviceActor2.tell(new Device.RecordTemperature(1L, 2.0, recordProbe.getRef()));
+ assertEquals(1L, recordProbe.receiveMessage().requestId);
+ // No temperature for device 3
+
+ TestProbe<RespondAllTemperatures> allTempProbe =
+ testKit.createTestProbe(RespondAllTemperatures.class);
+ groupActor.tell(new RequestAllTemperatures(0L, "group", allTempProbe.getRef()));
+ RespondAllTemperatures response = allTempProbe.receiveMessage();
+ assertEquals(0L, response.requestId);
+
+ Map<String, TemperatureReading> expectedTemperatures = new HashMap<>();
+ expectedTemperatures.put("device1", new Temperature(1.0));
+ expectedTemperatures.put("device2", new Temperature(2.0));
+ expectedTemperatures.put("device3", TemperatureNotAvailable.INSTANCE);
+
+ assertEquals(expectedTemperatures, response.temperatures);
+ }
+
+} \ No newline at end of file