summaryrefslogtreecommitdiff
path: root/domain
diff options
context:
space:
mode:
Diffstat (limited to 'domain')
-rw-r--r--domain/src/main/java/akkamon/domain/AkkamonMessageEngine.java4
-rw-r--r--domain/src/main/java/akkamon/domain/AkkamonNexus.java66
-rw-r--r--domain/src/main/java/akkamon/domain/HeartBeatQuery.java131
-rw-r--r--domain/src/main/java/akkamon/domain/SceneTrainerGroup.java22
-rw-r--r--domain/src/main/java/akkamon/domain/Trainer.java40
5 files changed, 255 insertions, 8 deletions
diff --git a/domain/src/main/java/akkamon/domain/AkkamonMessageEngine.java b/domain/src/main/java/akkamon/domain/AkkamonMessageEngine.java
index fc03915..eb998ba 100644
--- a/domain/src/main/java/akkamon/domain/AkkamonMessageEngine.java
+++ b/domain/src/main/java/akkamon/domain/AkkamonMessageEngine.java
@@ -1,8 +1,10 @@
package akkamon.domain;
+import java.util.Map;
+
public interface AkkamonMessageEngine {
// broadcasts position info to WebSocket Clients
- void broadCastToScene(String sceneId, String message);
+ void broadCastHeartBeatToScene(String sceneId, Map<String, AkkamonNexus.TrainerPositionReading> trainerPositions);
void registerTrainerSessionToScene(String sceneId, AkkamonSession session);
diff --git a/domain/src/main/java/akkamon/domain/AkkamonNexus.java b/domain/src/main/java/akkamon/domain/AkkamonNexus.java
index 0b52ffa..9fd2763 100644
--- a/domain/src/main/java/akkamon/domain/AkkamonNexus.java
+++ b/domain/src/main/java/akkamon/domain/AkkamonNexus.java
@@ -8,6 +8,7 @@ import akka.actor.typed.javadsl.Behaviors;
import akka.actor.typed.javadsl.Receive;
import java.util.HashMap;
+import java.util.Locale;
import java.util.Map;
public class AkkamonNexus extends AbstractBehavior<AkkamonNexus.Command> {
@@ -37,13 +38,16 @@ public class AkkamonNexus extends AbstractBehavior<AkkamonNexus.Command> {
public static class TrainerRegistered implements Command {
private String trainerId;
+ private String sceneId;
private AkkamonSession session;
public TrainerRegistered(
String trainerId,
+ String sceneId,
AkkamonSession session
) {
this.trainerId = trainerId;
+ this.sceneId = sceneId;
this.session = session;
}
}
@@ -108,16 +112,34 @@ public class AkkamonNexus extends AbstractBehavior<AkkamonNexus.Command> {
implements Command, SceneTrainerGroup.Command {
public long requestId;
+ // TODO find a way to make the command Narrower
public ActorRef<AkkamonNexus.Command> replyTo;
- public RequestHeartBeat(long requestId, ActorRef<AkkamonNexus.Command> replyTo) {
+ public RequestHeartBeat(long requestId, ActorRef<Command> replyTo) {
this.requestId = requestId;
this.replyTo = replyTo;
}
}
+ private static class SceneTrainerGroupTerminated implements AkkamonNexus.Command {
+ public SceneTrainerGroupTerminated(String sceneId) {
+ }
+ }
+
public static class RespondHeartBeatQuery implements Command {
+ public final long requestId;
+ public final String sceneId;
+ public final Map<String, TrainerPositionReading> trainerPositions;
+
+ public RespondHeartBeatQuery(
+ long requestId,
+ String sceneId,
+ Map<String, TrainerPositionReading> trainerPositions) {
+ this.requestId = requestId;
+ this.sceneId = sceneId;
+ this.trainerPositions = trainerPositions;
+ }
}
public interface TrainerPositionReading { }
@@ -141,14 +163,21 @@ public class AkkamonNexus extends AbstractBehavior<AkkamonNexus.Command> {
@Override
public String toString() {
- return "TrainerPosition={x: " + value.x + ", " + value.y + "}";
+ return "TrainerPosition={x: " + value.x + ", y: " + value.y + "}";
}
}
- private static class SceneTrainerGroupTerminated implements AkkamonNexus.Command {
- public SceneTrainerGroupTerminated(String sceneId) {
- }
+ public enum TrainerPositionNotAvailable implements TrainerPositionReading {
+ INSTANCE
+ }
+
+ public enum TrainerOffline implements TrainerPositionReading {
+ INSTANCE
+ }
+
+ public enum TrainerTimedOut implements TrainerPositionReading {
+ INSTANCE
}
public static Behavior<AkkamonNexus.Command> create(AkkamonMessageEngine messagingEngine) {
@@ -170,13 +199,35 @@ public class AkkamonNexus extends AbstractBehavior<AkkamonNexus.Command> {
.onMessage(RequestTrainerRegistration.class, this::onTrainerRegistration)
.onMessage(TrainerRegistered.class, this::onTrainerRegistered)
.onMessage(RequestHeartBeat.class, this::onHeartBeat)
+ .onMessage(RespondHeartBeatQuery.class, this::onHeartBeatQueryResponse)
.onMessage(RequestStartMoving.class, this::onStartMoving)
.onMessage(RequestStopMoving.class, this::onStopMoving)
.onMessage(RequestNewTilePos.class, this::onNewTilePos)
.build();
}
+ private AkkamonNexus onHeartBeatQueryResponse(RespondHeartBeatQuery response) {
+ // Turn on for logging
+
+
+ StringBuilder positions = new StringBuilder();
+ positions.append("\n" + response.sceneId.toUpperCase(Locale.ROOT) + "\n");
+ for (Map.Entry<String, TrainerPositionReading> entry : response.trainerPositions.entrySet()) {
+ positions.append(entry.getKey() + ": " +entry.getValue());
+ positions.append("\n");
+ }
+ getContext().getLog().info(String.valueOf(positions));
+
+ messageEngine.broadCastHeartBeatToScene(response.sceneId, response.trainerPositions);
+
+ return this;
+ }
+
private AkkamonNexus onHeartBeat(RequestHeartBeat heartBeatRequest) {
+ // TODO do some checks here?
+ for (ActorRef<SceneTrainerGroup.Command> sceneGroupActor: sceneIdToActor.values()) {
+ sceneGroupActor.tell(heartBeatRequest);
+ }
return this;
}
@@ -218,8 +269,9 @@ public class AkkamonNexus extends AbstractBehavior<AkkamonNexus.Command> {
private AkkamonNexus onTrainerRegistered(TrainerRegistered reply) {
// TODO test when registration fails?
- getContext().getLog().info("Adding {} to Live AkkamonSessions in Messaging Engine", reply.trainerId);
- messageEngine.registerTrainerSessionToScene(reply.trainerId, reply.session);
+ getContext().getLog().info("Adding {} to scene {} Live AkkamonSessions in Messaging Engine", reply.trainerId, reply.sceneId);
+ messageEngine.registerTrainerSessionToScene(reply.sceneId, reply.session);
+ reply.session.setTrainerId(reply.trainerId);
return this;
}
diff --git a/domain/src/main/java/akkamon/domain/HeartBeatQuery.java b/domain/src/main/java/akkamon/domain/HeartBeatQuery.java
new file mode 100644
index 0000000..4be99ec
--- /dev/null
+++ b/domain/src/main/java/akkamon/domain/HeartBeatQuery.java
@@ -0,0 +1,131 @@
+package akkamon.domain;
+
+import akka.actor.typed.ActorRef;
+import akka.actor.typed.Behavior;
+import akka.actor.typed.javadsl.*;
+
+import java.time.Duration;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.Map;
+import java.util.Set;
+
+public class HeartBeatQuery extends AbstractBehavior<HeartBeatQuery.Command> {
+
+ public interface Command {}
+
+ private static enum CollectionTimeout implements Command {
+ INSTANCE
+ }
+
+ static class WrappedRespondTrainerPosition implements Command {
+ final Trainer.RespondTrainerPosition response;
+
+ WrappedRespondTrainerPosition(
+ Trainer.RespondTrainerPosition response
+ ) {
+ this.response = response;
+ }
+ }
+
+ private static class TrainerOffline implements Command {
+ final String trainerId;
+
+ private TrainerOffline(String trainerId) {
+ this.trainerId = trainerId;
+ }
+ }
+
+ public static Behavior<Command> create(
+ Map<String, ActorRef<Trainer.Command>> trainerIdToActor,
+ long requestId,
+ String sceneId,
+ ActorRef<AkkamonNexus.Command> requester,
+ Duration timeout
+ ) {
+ return Behaviors.setup(
+ context ->
+ Behaviors.withTimers(
+ timers -> new HeartBeatQuery(
+ trainerIdToActor,
+ requestId,
+ sceneId,
+ requester,
+ timeout,
+ context,
+ timers
+ )
+ )
+ );
+ }
+
+ private final long requestId;
+ private final String sceneId;
+ private final ActorRef<AkkamonNexus.Command> requester;
+ private Map<String, AkkamonNexus.TrainerPositionReading> repliesSoFar = new HashMap<String, AkkamonNexus.TrainerPositionReading>();
+ private final Set<String> stillWaiting;
+
+ public HeartBeatQuery(
+ Map<String, ActorRef<Trainer.Command>> trainerIdToActor,
+ long requestId,
+ String sceneId,
+ ActorRef<AkkamonNexus.Command> requester,
+ Duration timeout,
+ ActorContext<Command> context,
+ TimerScheduler<Command> timers) {
+ super(context);
+ this.requestId = requestId;
+ this.sceneId = sceneId;
+ this.requester = requester;
+
+ timers.startSingleTimer(CollectionTimeout.INSTANCE, timeout);
+
+ ActorRef<Trainer.RespondTrainerPosition> respondTrainerPositionAdapter =
+ context.messageAdapter(Trainer.RespondTrainerPosition.class, WrappedRespondTrainerPosition::new);
+
+ for (Map.Entry<String, ActorRef<Trainer.Command>> entry : trainerIdToActor.entrySet()) {
+ context.watchWith(entry.getValue(), new TrainerOffline(entry.getKey()));
+ entry.getValue().tell(
+ new Trainer.ReadTrainerPosition(
+ 0L,
+ respondTrainerPositionAdapter
+ )
+ );
+ }
+ stillWaiting = new HashSet<>(trainerIdToActor.keySet());
+ }
+
+ @Override
+ public Receive<Command> createReceive() {
+ return newReceiveBuilder()
+ .onMessage(WrappedRespondTrainerPosition.class, this::onRespondTrainerPosition)
+ .build();
+ }
+
+ private Behavior<Command> onRespondTrainerPosition(WrappedRespondTrainerPosition r) {
+ AkkamonNexus.TrainerPositionReading trainerPositionRead =
+ r.response
+ .value
+ .map(optionalValue -> (AkkamonNexus.TrainerPositionReading) new AkkamonNexus.TrainerPosition(optionalValue))
+ .orElse(AkkamonNexus.TrainerPositionNotAvailable.INSTANCE);
+
+ String trainerId = r.response.trainerId;
+ repliesSoFar.put(trainerId, trainerPositionRead);
+ stillWaiting.remove(trainerId);
+
+ return respondWhenAllCollected();
+ }
+
+ private Behavior<Command> respondWhenAllCollected() {
+ if (stillWaiting.isEmpty()) {
+ requester.tell(new AkkamonNexus.RespondHeartBeatQuery(
+ requestId,
+ sceneId,
+ repliesSoFar));
+ return Behaviors.stopped();
+ } else {
+ return this;
+ }
+ }
+
+}
diff --git a/domain/src/main/java/akkamon/domain/SceneTrainerGroup.java b/domain/src/main/java/akkamon/domain/SceneTrainerGroup.java
index f01edc4..29778ec 100644
--- a/domain/src/main/java/akkamon/domain/SceneTrainerGroup.java
+++ b/domain/src/main/java/akkamon/domain/SceneTrainerGroup.java
@@ -7,6 +7,7 @@ import akka.actor.typed.javadsl.ActorContext;
import akka.actor.typed.javadsl.Behaviors;
import akka.actor.typed.javadsl.Receive;
+import java.time.Duration;
import java.util.HashMap;
import java.util.Map;
@@ -60,9 +61,28 @@ public class SceneTrainerGroup extends AbstractBehavior<SceneTrainerGroup.Comman
AkkamonNexus.RequestNewTilePos.class,
this::onNewTilePos
)
+ .onMessage(
+ AkkamonNexus.RequestHeartBeat.class,
+ this::onHeartBeat
+ )
.build();
}
+ private SceneTrainerGroup onHeartBeat(AkkamonNexus.RequestHeartBeat heartBeatRequest) {
+ Map<String, ActorRef<Trainer.Command>> trainerIdToActorCopy = new HashMap<>(this.trainerIdToActor);
+ getContext()
+ .spawnAnonymous(
+ HeartBeatQuery.create(
+ trainerIdToActorCopy,
+ heartBeatRequest.requestId,
+ sceneId,
+ heartBeatRequest.replyTo,
+ Duration.ofSeconds(3)
+ )
+ );
+ return this;
+ }
+
private SceneTrainerGroup onNewTilePos(AkkamonNexus.RequestNewTilePos newTilePosRequest) {
if (this.sceneId.equals(newTilePosRequest.sceneId)) {
ActorRef<Trainer.Command> trainerActor = trainerIdToActor.get(newTilePosRequest.trainerId);
@@ -142,6 +162,7 @@ public class SceneTrainerGroup extends AbstractBehavior<SceneTrainerGroup.Comman
// TODO add optional already registered?
registrationRequest.replyTo.tell(new AkkamonNexus.TrainerRegistered(
registrationRequest.trainerId,
+ sceneId,
registrationRequest.session
));
} else {
@@ -154,6 +175,7 @@ public class SceneTrainerGroup extends AbstractBehavior<SceneTrainerGroup.Comman
trainerIdToActor.put(registrationRequest.trainerId, trainerActor);
registrationRequest.replyTo.tell(new AkkamonNexus.TrainerRegistered(
registrationRequest.trainerId,
+ sceneId,
registrationRequest.session
));
}
diff --git a/domain/src/main/java/akkamon/domain/Trainer.java b/domain/src/main/java/akkamon/domain/Trainer.java
index b756a97..0a3b397 100644
--- a/domain/src/main/java/akkamon/domain/Trainer.java
+++ b/domain/src/main/java/akkamon/domain/Trainer.java
@@ -1,5 +1,6 @@
package akkamon.domain;
+import akka.actor.typed.ActorRef;
import akka.actor.typed.Behavior;
import akka.actor.typed.javadsl.AbstractBehavior;
import akka.actor.typed.javadsl.ActorContext;
@@ -12,6 +13,32 @@ public class Trainer extends AbstractBehavior<Trainer.Command> {
public interface Command { }
+ public static class ReadTrainerPosition implements Command {
+ final long requestId;
+ final ActorRef<RespondTrainerPosition> replyTo;
+
+ public ReadTrainerPosition(long requestId, ActorRef<RespondTrainerPosition> replyTo) {
+ this.requestId = requestId;
+ this.replyTo = replyTo;
+ }
+ }
+
+ public static final class RespondTrainerPosition {
+ final long requestId;
+ final String trainerId;
+ final Optional<TilePos> value;
+
+ public RespondTrainerPosition(
+ long requestId,
+ String trainerId,
+ Optional<TilePos> value
+ ) {
+ this.requestId = requestId;
+ this.trainerId = trainerId;
+ this.value = value;
+ }
+ }
+
public static Behavior<Command> create(String sceneId, String trainerId) {
return Behaviors.setup(context -> new Trainer(context, sceneId, trainerId));
}
@@ -31,6 +58,10 @@ public class Trainer extends AbstractBehavior<Trainer.Command> {
public Receive<Command> createReceive() {
return newReceiveBuilder()
.onMessage(
+ ReadTrainerPosition.class,
+ this::onReadTrainerPosition
+ )
+ .onMessage(
AkkamonNexus.RequestStartMoving.class,
this::onStartMoving
)
@@ -44,6 +75,15 @@ public class Trainer extends AbstractBehavior<Trainer.Command> {
.build();
}
+ private Trainer onReadTrainerPosition(ReadTrainerPosition readTrainerPositionRequest) {
+ readTrainerPositionRequest.replyTo.tell(new RespondTrainerPosition(
+ readTrainerPositionRequest.requestId,
+ trainerId,
+ lastValidTilePos
+ ));
+ return this;
+ }
+
private Trainer onNewTilePos(AkkamonNexus.RequestNewTilePos newTilePosRequest) {
getContext().getLog().info("Trainer {} has new {}.", trainerId, newTilePosRequest.tilePos);
lastValidTilePos = Optional.of(newTilePosRequest.tilePos);