From 5d76a30aeba5bfd175b60845c4935c4352b46f4c Mon Sep 17 00:00:00 2001 From: Mike Vink Date: Wed, 21 Jul 2021 16:48:47 +0200 Subject: feat(): heartbeat working on remote engine --- .../java/akkamon/domain/AkkamonMessageEngine.java | 4 +- .../src/main/java/akkamon/domain/AkkamonNexus.java | 66 +++++++++-- .../main/java/akkamon/domain/HeartBeatQuery.java | 131 +++++++++++++++++++++ .../java/akkamon/domain/SceneTrainerGroup.java | 22 ++++ domain/src/main/java/akkamon/domain/Trainer.java | 40 +++++++ 5 files changed, 255 insertions(+), 8 deletions(-) create mode 100644 domain/src/main/java/akkamon/domain/HeartBeatQuery.java (limited to 'domain') diff --git a/domain/src/main/java/akkamon/domain/AkkamonMessageEngine.java b/domain/src/main/java/akkamon/domain/AkkamonMessageEngine.java index fc03915..eb998ba 100644 --- a/domain/src/main/java/akkamon/domain/AkkamonMessageEngine.java +++ b/domain/src/main/java/akkamon/domain/AkkamonMessageEngine.java @@ -1,8 +1,10 @@ package akkamon.domain; +import java.util.Map; + public interface AkkamonMessageEngine { // broadcasts position info to WebSocket Clients - void broadCastToScene(String sceneId, String message); + void broadCastHeartBeatToScene(String sceneId, Map trainerPositions); void registerTrainerSessionToScene(String sceneId, AkkamonSession session); diff --git a/domain/src/main/java/akkamon/domain/AkkamonNexus.java b/domain/src/main/java/akkamon/domain/AkkamonNexus.java index 0b52ffa..9fd2763 100644 --- a/domain/src/main/java/akkamon/domain/AkkamonNexus.java +++ b/domain/src/main/java/akkamon/domain/AkkamonNexus.java @@ -8,6 +8,7 @@ import akka.actor.typed.javadsl.Behaviors; import akka.actor.typed.javadsl.Receive; import java.util.HashMap; +import java.util.Locale; import java.util.Map; public class AkkamonNexus extends AbstractBehavior { @@ -37,13 +38,16 @@ public class AkkamonNexus extends AbstractBehavior { public static class TrainerRegistered implements Command { private String trainerId; + private String sceneId; private AkkamonSession session; public TrainerRegistered( String trainerId, + String sceneId, AkkamonSession session ) { this.trainerId = trainerId; + this.sceneId = sceneId; this.session = session; } } @@ -108,16 +112,34 @@ public class AkkamonNexus extends AbstractBehavior { implements Command, SceneTrainerGroup.Command { public long requestId; + // TODO find a way to make the command Narrower public ActorRef replyTo; - public RequestHeartBeat(long requestId, ActorRef replyTo) { + public RequestHeartBeat(long requestId, ActorRef replyTo) { this.requestId = requestId; this.replyTo = replyTo; } } + private static class SceneTrainerGroupTerminated implements AkkamonNexus.Command { + public SceneTrainerGroupTerminated(String sceneId) { + } + } + public static class RespondHeartBeatQuery implements Command { + public final long requestId; + public final String sceneId; + public final Map trainerPositions; + + public RespondHeartBeatQuery( + long requestId, + String sceneId, + Map trainerPositions) { + this.requestId = requestId; + this.sceneId = sceneId; + this.trainerPositions = trainerPositions; + } } public interface TrainerPositionReading { } @@ -141,14 +163,21 @@ public class AkkamonNexus extends AbstractBehavior { @Override public String toString() { - return "TrainerPosition={x: " + value.x + ", " + value.y + "}"; + return "TrainerPosition={x: " + value.x + ", y: " + value.y + "}"; } } - private static class SceneTrainerGroupTerminated implements AkkamonNexus.Command { - public SceneTrainerGroupTerminated(String sceneId) { - } + public enum TrainerPositionNotAvailable implements TrainerPositionReading { + INSTANCE + } + + public enum TrainerOffline implements TrainerPositionReading { + INSTANCE + } + + public enum TrainerTimedOut implements TrainerPositionReading { + INSTANCE } public static Behavior create(AkkamonMessageEngine messagingEngine) { @@ -170,13 +199,35 @@ public class AkkamonNexus extends AbstractBehavior { .onMessage(RequestTrainerRegistration.class, this::onTrainerRegistration) .onMessage(TrainerRegistered.class, this::onTrainerRegistered) .onMessage(RequestHeartBeat.class, this::onHeartBeat) + .onMessage(RespondHeartBeatQuery.class, this::onHeartBeatQueryResponse) .onMessage(RequestStartMoving.class, this::onStartMoving) .onMessage(RequestStopMoving.class, this::onStopMoving) .onMessage(RequestNewTilePos.class, this::onNewTilePos) .build(); } + private AkkamonNexus onHeartBeatQueryResponse(RespondHeartBeatQuery response) { + // Turn on for logging + + + StringBuilder positions = new StringBuilder(); + positions.append("\n" + response.sceneId.toUpperCase(Locale.ROOT) + "\n"); + for (Map.Entry entry : response.trainerPositions.entrySet()) { + positions.append(entry.getKey() + ": " +entry.getValue()); + positions.append("\n"); + } + getContext().getLog().info(String.valueOf(positions)); + + messageEngine.broadCastHeartBeatToScene(response.sceneId, response.trainerPositions); + + return this; + } + private AkkamonNexus onHeartBeat(RequestHeartBeat heartBeatRequest) { + // TODO do some checks here? + for (ActorRef sceneGroupActor: sceneIdToActor.values()) { + sceneGroupActor.tell(heartBeatRequest); + } return this; } @@ -218,8 +269,9 @@ public class AkkamonNexus extends AbstractBehavior { private AkkamonNexus onTrainerRegistered(TrainerRegistered reply) { // TODO test when registration fails? - getContext().getLog().info("Adding {} to Live AkkamonSessions in Messaging Engine", reply.trainerId); - messageEngine.registerTrainerSessionToScene(reply.trainerId, reply.session); + getContext().getLog().info("Adding {} to scene {} Live AkkamonSessions in Messaging Engine", reply.trainerId, reply.sceneId); + messageEngine.registerTrainerSessionToScene(reply.sceneId, reply.session); + reply.session.setTrainerId(reply.trainerId); return this; } diff --git a/domain/src/main/java/akkamon/domain/HeartBeatQuery.java b/domain/src/main/java/akkamon/domain/HeartBeatQuery.java new file mode 100644 index 0000000..4be99ec --- /dev/null +++ b/domain/src/main/java/akkamon/domain/HeartBeatQuery.java @@ -0,0 +1,131 @@ +package akkamon.domain; + +import akka.actor.typed.ActorRef; +import akka.actor.typed.Behavior; +import akka.actor.typed.javadsl.*; + +import java.time.Duration; +import java.util.HashMap; +import java.util.HashSet; +import java.util.Map; +import java.util.Set; + +public class HeartBeatQuery extends AbstractBehavior { + + public interface Command {} + + private static enum CollectionTimeout implements Command { + INSTANCE + } + + static class WrappedRespondTrainerPosition implements Command { + final Trainer.RespondTrainerPosition response; + + WrappedRespondTrainerPosition( + Trainer.RespondTrainerPosition response + ) { + this.response = response; + } + } + + private static class TrainerOffline implements Command { + final String trainerId; + + private TrainerOffline(String trainerId) { + this.trainerId = trainerId; + } + } + + public static Behavior create( + Map> trainerIdToActor, + long requestId, + String sceneId, + ActorRef requester, + Duration timeout + ) { + return Behaviors.setup( + context -> + Behaviors.withTimers( + timers -> new HeartBeatQuery( + trainerIdToActor, + requestId, + sceneId, + requester, + timeout, + context, + timers + ) + ) + ); + } + + private final long requestId; + private final String sceneId; + private final ActorRef requester; + private Map repliesSoFar = new HashMap(); + private final Set stillWaiting; + + public HeartBeatQuery( + Map> trainerIdToActor, + long requestId, + String sceneId, + ActorRef requester, + Duration timeout, + ActorContext context, + TimerScheduler timers) { + super(context); + this.requestId = requestId; + this.sceneId = sceneId; + this.requester = requester; + + timers.startSingleTimer(CollectionTimeout.INSTANCE, timeout); + + ActorRef respondTrainerPositionAdapter = + context.messageAdapter(Trainer.RespondTrainerPosition.class, WrappedRespondTrainerPosition::new); + + for (Map.Entry> entry : trainerIdToActor.entrySet()) { + context.watchWith(entry.getValue(), new TrainerOffline(entry.getKey())); + entry.getValue().tell( + new Trainer.ReadTrainerPosition( + 0L, + respondTrainerPositionAdapter + ) + ); + } + stillWaiting = new HashSet<>(trainerIdToActor.keySet()); + } + + @Override + public Receive createReceive() { + return newReceiveBuilder() + .onMessage(WrappedRespondTrainerPosition.class, this::onRespondTrainerPosition) + .build(); + } + + private Behavior onRespondTrainerPosition(WrappedRespondTrainerPosition r) { + AkkamonNexus.TrainerPositionReading trainerPositionRead = + r.response + .value + .map(optionalValue -> (AkkamonNexus.TrainerPositionReading) new AkkamonNexus.TrainerPosition(optionalValue)) + .orElse(AkkamonNexus.TrainerPositionNotAvailable.INSTANCE); + + String trainerId = r.response.trainerId; + repliesSoFar.put(trainerId, trainerPositionRead); + stillWaiting.remove(trainerId); + + return respondWhenAllCollected(); + } + + private Behavior respondWhenAllCollected() { + if (stillWaiting.isEmpty()) { + requester.tell(new AkkamonNexus.RespondHeartBeatQuery( + requestId, + sceneId, + repliesSoFar)); + return Behaviors.stopped(); + } else { + return this; + } + } + +} diff --git a/domain/src/main/java/akkamon/domain/SceneTrainerGroup.java b/domain/src/main/java/akkamon/domain/SceneTrainerGroup.java index f01edc4..29778ec 100644 --- a/domain/src/main/java/akkamon/domain/SceneTrainerGroup.java +++ b/domain/src/main/java/akkamon/domain/SceneTrainerGroup.java @@ -7,6 +7,7 @@ import akka.actor.typed.javadsl.ActorContext; import akka.actor.typed.javadsl.Behaviors; import akka.actor.typed.javadsl.Receive; +import java.time.Duration; import java.util.HashMap; import java.util.Map; @@ -60,9 +61,28 @@ public class SceneTrainerGroup extends AbstractBehavior> trainerIdToActorCopy = new HashMap<>(this.trainerIdToActor); + getContext() + .spawnAnonymous( + HeartBeatQuery.create( + trainerIdToActorCopy, + heartBeatRequest.requestId, + sceneId, + heartBeatRequest.replyTo, + Duration.ofSeconds(3) + ) + ); + return this; + } + private SceneTrainerGroup onNewTilePos(AkkamonNexus.RequestNewTilePos newTilePosRequest) { if (this.sceneId.equals(newTilePosRequest.sceneId)) { ActorRef trainerActor = trainerIdToActor.get(newTilePosRequest.trainerId); @@ -142,6 +162,7 @@ public class SceneTrainerGroup extends AbstractBehavior { public interface Command { } + public static class ReadTrainerPosition implements Command { + final long requestId; + final ActorRef replyTo; + + public ReadTrainerPosition(long requestId, ActorRef replyTo) { + this.requestId = requestId; + this.replyTo = replyTo; + } + } + + public static final class RespondTrainerPosition { + final long requestId; + final String trainerId; + final Optional value; + + public RespondTrainerPosition( + long requestId, + String trainerId, + Optional value + ) { + this.requestId = requestId; + this.trainerId = trainerId; + this.value = value; + } + } + public static Behavior create(String sceneId, String trainerId) { return Behaviors.setup(context -> new Trainer(context, sceneId, trainerId)); } @@ -30,6 +57,10 @@ public class Trainer extends AbstractBehavior { @Override public Receive createReceive() { return newReceiveBuilder() + .onMessage( + ReadTrainerPosition.class, + this::onReadTrainerPosition + ) .onMessage( AkkamonNexus.RequestStartMoving.class, this::onStartMoving @@ -44,6 +75,15 @@ public class Trainer extends AbstractBehavior { .build(); } + private Trainer onReadTrainerPosition(ReadTrainerPosition readTrainerPositionRequest) { + readTrainerPositionRequest.replyTo.tell(new RespondTrainerPosition( + readTrainerPositionRequest.requestId, + trainerId, + lastValidTilePos + )); + return this; + } + private Trainer onNewTilePos(AkkamonNexus.RequestNewTilePos newTilePosRequest) { getContext().getLog().info("Trainer {} has new {}.", trainerId, newTilePosRequest.tilePos); lastValidTilePos = Optional.of(newTilePosRequest.tilePos); -- cgit v1.2.3