diff options
Diffstat (limited to 'domain')
5 files changed, 255 insertions, 8 deletions
diff --git a/domain/src/main/java/akkamon/domain/AkkamonMessageEngine.java b/domain/src/main/java/akkamon/domain/AkkamonMessageEngine.java index fc03915..eb998ba 100644 --- a/domain/src/main/java/akkamon/domain/AkkamonMessageEngine.java +++ b/domain/src/main/java/akkamon/domain/AkkamonMessageEngine.java @@ -1,8 +1,10 @@ package akkamon.domain; +import java.util.Map; + public interface AkkamonMessageEngine { // broadcasts position info to WebSocket Clients - void broadCastToScene(String sceneId, String message); + void broadCastHeartBeatToScene(String sceneId, Map<String, AkkamonNexus.TrainerPositionReading> trainerPositions); void registerTrainerSessionToScene(String sceneId, AkkamonSession session); diff --git a/domain/src/main/java/akkamon/domain/AkkamonNexus.java b/domain/src/main/java/akkamon/domain/AkkamonNexus.java index 0b52ffa..9fd2763 100644 --- a/domain/src/main/java/akkamon/domain/AkkamonNexus.java +++ b/domain/src/main/java/akkamon/domain/AkkamonNexus.java @@ -8,6 +8,7 @@ import akka.actor.typed.javadsl.Behaviors; import akka.actor.typed.javadsl.Receive; import java.util.HashMap; +import java.util.Locale; import java.util.Map; public class AkkamonNexus extends AbstractBehavior<AkkamonNexus.Command> { @@ -37,13 +38,16 @@ public class AkkamonNexus extends AbstractBehavior<AkkamonNexus.Command> { public static class TrainerRegistered implements Command { private String trainerId; + private String sceneId; private AkkamonSession session; public TrainerRegistered( String trainerId, + String sceneId, AkkamonSession session ) { this.trainerId = trainerId; + this.sceneId = sceneId; this.session = session; } } @@ -108,16 +112,34 @@ public class AkkamonNexus extends AbstractBehavior<AkkamonNexus.Command> { implements Command, SceneTrainerGroup.Command { public long requestId; + // TODO find a way to make the command Narrower public ActorRef<AkkamonNexus.Command> replyTo; - public RequestHeartBeat(long requestId, ActorRef<AkkamonNexus.Command> replyTo) { + public RequestHeartBeat(long requestId, ActorRef<Command> replyTo) { this.requestId = requestId; this.replyTo = replyTo; } } + private static class SceneTrainerGroupTerminated implements AkkamonNexus.Command { + public SceneTrainerGroupTerminated(String sceneId) { + } + } + public static class RespondHeartBeatQuery implements Command { + public final long requestId; + public final String sceneId; + public final Map<String, TrainerPositionReading> trainerPositions; + + public RespondHeartBeatQuery( + long requestId, + String sceneId, + Map<String, TrainerPositionReading> trainerPositions) { + this.requestId = requestId; + this.sceneId = sceneId; + this.trainerPositions = trainerPositions; + } } public interface TrainerPositionReading { } @@ -141,14 +163,21 @@ public class AkkamonNexus extends AbstractBehavior<AkkamonNexus.Command> { @Override public String toString() { - return "TrainerPosition={x: " + value.x + ", " + value.y + "}"; + return "TrainerPosition={x: " + value.x + ", y: " + value.y + "}"; } } - private static class SceneTrainerGroupTerminated implements AkkamonNexus.Command { - public SceneTrainerGroupTerminated(String sceneId) { - } + public enum TrainerPositionNotAvailable implements TrainerPositionReading { + INSTANCE + } + + public enum TrainerOffline implements TrainerPositionReading { + INSTANCE + } + + public enum TrainerTimedOut implements TrainerPositionReading { + INSTANCE } public static Behavior<AkkamonNexus.Command> create(AkkamonMessageEngine messagingEngine) { @@ -170,13 +199,35 @@ public class AkkamonNexus extends AbstractBehavior<AkkamonNexus.Command> { .onMessage(RequestTrainerRegistration.class, this::onTrainerRegistration) .onMessage(TrainerRegistered.class, this::onTrainerRegistered) .onMessage(RequestHeartBeat.class, this::onHeartBeat) + .onMessage(RespondHeartBeatQuery.class, this::onHeartBeatQueryResponse) .onMessage(RequestStartMoving.class, this::onStartMoving) .onMessage(RequestStopMoving.class, this::onStopMoving) .onMessage(RequestNewTilePos.class, this::onNewTilePos) .build(); } + private AkkamonNexus onHeartBeatQueryResponse(RespondHeartBeatQuery response) { + // Turn on for logging + + + StringBuilder positions = new StringBuilder(); + positions.append("\n" + response.sceneId.toUpperCase(Locale.ROOT) + "\n"); + for (Map.Entry<String, TrainerPositionReading> entry : response.trainerPositions.entrySet()) { + positions.append(entry.getKey() + ": " +entry.getValue()); + positions.append("\n"); + } + getContext().getLog().info(String.valueOf(positions)); + + messageEngine.broadCastHeartBeatToScene(response.sceneId, response.trainerPositions); + + return this; + } + private AkkamonNexus onHeartBeat(RequestHeartBeat heartBeatRequest) { + // TODO do some checks here? + for (ActorRef<SceneTrainerGroup.Command> sceneGroupActor: sceneIdToActor.values()) { + sceneGroupActor.tell(heartBeatRequest); + } return this; } @@ -218,8 +269,9 @@ public class AkkamonNexus extends AbstractBehavior<AkkamonNexus.Command> { private AkkamonNexus onTrainerRegistered(TrainerRegistered reply) { // TODO test when registration fails? - getContext().getLog().info("Adding {} to Live AkkamonSessions in Messaging Engine", reply.trainerId); - messageEngine.registerTrainerSessionToScene(reply.trainerId, reply.session); + getContext().getLog().info("Adding {} to scene {} Live AkkamonSessions in Messaging Engine", reply.trainerId, reply.sceneId); + messageEngine.registerTrainerSessionToScene(reply.sceneId, reply.session); + reply.session.setTrainerId(reply.trainerId); return this; } diff --git a/domain/src/main/java/akkamon/domain/HeartBeatQuery.java b/domain/src/main/java/akkamon/domain/HeartBeatQuery.java new file mode 100644 index 0000000..4be99ec --- /dev/null +++ b/domain/src/main/java/akkamon/domain/HeartBeatQuery.java @@ -0,0 +1,131 @@ +package akkamon.domain; + +import akka.actor.typed.ActorRef; +import akka.actor.typed.Behavior; +import akka.actor.typed.javadsl.*; + +import java.time.Duration; +import java.util.HashMap; +import java.util.HashSet; +import java.util.Map; +import java.util.Set; + +public class HeartBeatQuery extends AbstractBehavior<HeartBeatQuery.Command> { + + public interface Command {} + + private static enum CollectionTimeout implements Command { + INSTANCE + } + + static class WrappedRespondTrainerPosition implements Command { + final Trainer.RespondTrainerPosition response; + + WrappedRespondTrainerPosition( + Trainer.RespondTrainerPosition response + ) { + this.response = response; + } + } + + private static class TrainerOffline implements Command { + final String trainerId; + + private TrainerOffline(String trainerId) { + this.trainerId = trainerId; + } + } + + public static Behavior<Command> create( + Map<String, ActorRef<Trainer.Command>> trainerIdToActor, + long requestId, + String sceneId, + ActorRef<AkkamonNexus.Command> requester, + Duration timeout + ) { + return Behaviors.setup( + context -> + Behaviors.withTimers( + timers -> new HeartBeatQuery( + trainerIdToActor, + requestId, + sceneId, + requester, + timeout, + context, + timers + ) + ) + ); + } + + private final long requestId; + private final String sceneId; + private final ActorRef<AkkamonNexus.Command> requester; + private Map<String, AkkamonNexus.TrainerPositionReading> repliesSoFar = new HashMap<String, AkkamonNexus.TrainerPositionReading>(); + private final Set<String> stillWaiting; + + public HeartBeatQuery( + Map<String, ActorRef<Trainer.Command>> trainerIdToActor, + long requestId, + String sceneId, + ActorRef<AkkamonNexus.Command> requester, + Duration timeout, + ActorContext<Command> context, + TimerScheduler<Command> timers) { + super(context); + this.requestId = requestId; + this.sceneId = sceneId; + this.requester = requester; + + timers.startSingleTimer(CollectionTimeout.INSTANCE, timeout); + + ActorRef<Trainer.RespondTrainerPosition> respondTrainerPositionAdapter = + context.messageAdapter(Trainer.RespondTrainerPosition.class, WrappedRespondTrainerPosition::new); + + for (Map.Entry<String, ActorRef<Trainer.Command>> entry : trainerIdToActor.entrySet()) { + context.watchWith(entry.getValue(), new TrainerOffline(entry.getKey())); + entry.getValue().tell( + new Trainer.ReadTrainerPosition( + 0L, + respondTrainerPositionAdapter + ) + ); + } + stillWaiting = new HashSet<>(trainerIdToActor.keySet()); + } + + @Override + public Receive<Command> createReceive() { + return newReceiveBuilder() + .onMessage(WrappedRespondTrainerPosition.class, this::onRespondTrainerPosition) + .build(); + } + + private Behavior<Command> onRespondTrainerPosition(WrappedRespondTrainerPosition r) { + AkkamonNexus.TrainerPositionReading trainerPositionRead = + r.response + .value + .map(optionalValue -> (AkkamonNexus.TrainerPositionReading) new AkkamonNexus.TrainerPosition(optionalValue)) + .orElse(AkkamonNexus.TrainerPositionNotAvailable.INSTANCE); + + String trainerId = r.response.trainerId; + repliesSoFar.put(trainerId, trainerPositionRead); + stillWaiting.remove(trainerId); + + return respondWhenAllCollected(); + } + + private Behavior<Command> respondWhenAllCollected() { + if (stillWaiting.isEmpty()) { + requester.tell(new AkkamonNexus.RespondHeartBeatQuery( + requestId, + sceneId, + repliesSoFar)); + return Behaviors.stopped(); + } else { + return this; + } + } + +} diff --git a/domain/src/main/java/akkamon/domain/SceneTrainerGroup.java b/domain/src/main/java/akkamon/domain/SceneTrainerGroup.java index f01edc4..29778ec 100644 --- a/domain/src/main/java/akkamon/domain/SceneTrainerGroup.java +++ b/domain/src/main/java/akkamon/domain/SceneTrainerGroup.java @@ -7,6 +7,7 @@ import akka.actor.typed.javadsl.ActorContext; import akka.actor.typed.javadsl.Behaviors; import akka.actor.typed.javadsl.Receive; +import java.time.Duration; import java.util.HashMap; import java.util.Map; @@ -60,9 +61,28 @@ public class SceneTrainerGroup extends AbstractBehavior<SceneTrainerGroup.Comman AkkamonNexus.RequestNewTilePos.class, this::onNewTilePos ) + .onMessage( + AkkamonNexus.RequestHeartBeat.class, + this::onHeartBeat + ) .build(); } + private SceneTrainerGroup onHeartBeat(AkkamonNexus.RequestHeartBeat heartBeatRequest) { + Map<String, ActorRef<Trainer.Command>> trainerIdToActorCopy = new HashMap<>(this.trainerIdToActor); + getContext() + .spawnAnonymous( + HeartBeatQuery.create( + trainerIdToActorCopy, + heartBeatRequest.requestId, + sceneId, + heartBeatRequest.replyTo, + Duration.ofSeconds(3) + ) + ); + return this; + } + private SceneTrainerGroup onNewTilePos(AkkamonNexus.RequestNewTilePos newTilePosRequest) { if (this.sceneId.equals(newTilePosRequest.sceneId)) { ActorRef<Trainer.Command> trainerActor = trainerIdToActor.get(newTilePosRequest.trainerId); @@ -142,6 +162,7 @@ public class SceneTrainerGroup extends AbstractBehavior<SceneTrainerGroup.Comman // TODO add optional already registered? registrationRequest.replyTo.tell(new AkkamonNexus.TrainerRegistered( registrationRequest.trainerId, + sceneId, registrationRequest.session )); } else { @@ -154,6 +175,7 @@ public class SceneTrainerGroup extends AbstractBehavior<SceneTrainerGroup.Comman trainerIdToActor.put(registrationRequest.trainerId, trainerActor); registrationRequest.replyTo.tell(new AkkamonNexus.TrainerRegistered( registrationRequest.trainerId, + sceneId, registrationRequest.session )); } diff --git a/domain/src/main/java/akkamon/domain/Trainer.java b/domain/src/main/java/akkamon/domain/Trainer.java index b756a97..0a3b397 100644 --- a/domain/src/main/java/akkamon/domain/Trainer.java +++ b/domain/src/main/java/akkamon/domain/Trainer.java @@ -1,5 +1,6 @@ package akkamon.domain; +import akka.actor.typed.ActorRef; import akka.actor.typed.Behavior; import akka.actor.typed.javadsl.AbstractBehavior; import akka.actor.typed.javadsl.ActorContext; @@ -12,6 +13,32 @@ public class Trainer extends AbstractBehavior<Trainer.Command> { public interface Command { } + public static class ReadTrainerPosition implements Command { + final long requestId; + final ActorRef<RespondTrainerPosition> replyTo; + + public ReadTrainerPosition(long requestId, ActorRef<RespondTrainerPosition> replyTo) { + this.requestId = requestId; + this.replyTo = replyTo; + } + } + + public static final class RespondTrainerPosition { + final long requestId; + final String trainerId; + final Optional<TilePos> value; + + public RespondTrainerPosition( + long requestId, + String trainerId, + Optional<TilePos> value + ) { + this.requestId = requestId; + this.trainerId = trainerId; + this.value = value; + } + } + public static Behavior<Command> create(String sceneId, String trainerId) { return Behaviors.setup(context -> new Trainer(context, sceneId, trainerId)); } @@ -31,6 +58,10 @@ public class Trainer extends AbstractBehavior<Trainer.Command> { public Receive<Command> createReceive() { return newReceiveBuilder() .onMessage( + ReadTrainerPosition.class, + this::onReadTrainerPosition + ) + .onMessage( AkkamonNexus.RequestStartMoving.class, this::onStartMoving ) @@ -44,6 +75,15 @@ public class Trainer extends AbstractBehavior<Trainer.Command> { .build(); } + private Trainer onReadTrainerPosition(ReadTrainerPosition readTrainerPositionRequest) { + readTrainerPositionRequest.replyTo.tell(new RespondTrainerPosition( + readTrainerPositionRequest.requestId, + trainerId, + lastValidTilePos + )); + return this; + } + private Trainer onNewTilePos(AkkamonNexus.RequestNewTilePos newTilePosRequest) { getContext().getLog().info("Trainer {} has new {}.", trainerId, newTilePosRequest.tilePos); lastValidTilePos = Optional.of(newTilePosRequest.tilePos); |
