diff options
Diffstat (limited to 'domain')
5 files changed, 171 insertions, 65 deletions
diff --git a/domain/src/main/java/akkamon/domain/AkkamonMessageEngine.java b/domain/src/main/java/akkamon/domain/AkkamonMessageEngine.java index eb998ba..05efc32 100644 --- a/domain/src/main/java/akkamon/domain/AkkamonMessageEngine.java +++ b/domain/src/main/java/akkamon/domain/AkkamonMessageEngine.java @@ -4,9 +4,11 @@ import java.util.Map; public interface AkkamonMessageEngine { // broadcasts position info to WebSocket Clients - void broadCastHeartBeatToScene(String sceneId, Map<String, AkkamonNexus.TrainerPositionReading> trainerPositions); + void broadCastHeartBeatToScene(String sceneId, Map<String, AkkamonNexus.MovementQueueReading> trainerPositions); void registerTrainerSessionToScene(String sceneId, AkkamonSession session); void removeTrainerSessionFromScene(String sceneId, AkkamonSession session); + + void trainerDisconnected(AkkamonSession session); } diff --git a/domain/src/main/java/akkamon/domain/AkkamonNexus.java b/domain/src/main/java/akkamon/domain/AkkamonNexus.java index 9fd2763..a55e728 100644 --- a/domain/src/main/java/akkamon/domain/AkkamonNexus.java +++ b/domain/src/main/java/akkamon/domain/AkkamonNexus.java @@ -10,6 +10,7 @@ import akka.actor.typed.javadsl.Receive; import java.util.HashMap; import java.util.Locale; import java.util.Map; +import java.util.Queue; public class AkkamonNexus extends AbstractBehavior<AkkamonNexus.Command> { @@ -126,57 +127,71 @@ public class AkkamonNexus extends AbstractBehavior<AkkamonNexus.Command> { } } + public static class RequestTrainerOffline + implements Command, SceneTrainerGroup.Command, Trainer.Command { + public long requestId; + public String trainerId; + public String sceneId; + public AkkamonSession session; + public ActorRef<AkkamonNexus.Command> replyTo; + + public RequestTrainerOffline(long requestId, String trainerId, String sceneId, AkkamonSession session, ActorRef<Command> replyTo) { + this.requestId = requestId; + this.trainerId = trainerId; + this.sceneId = sceneId; + this.session = session; + this.replyTo = replyTo; + } + } + + public static class RespondTrainerOffline + implements Command { + public long requestId; + public String sceneId; + public AkkamonSession session; + + public RespondTrainerOffline(long requestId, String sceneId, AkkamonSession session) { + this.requestId = requestId; + this.sceneId = sceneId; + this.session = session; + } + } + public static class RespondHeartBeatQuery implements Command { public final long requestId; public final String sceneId; - public final Map<String, TrainerPositionReading> trainerPositions; + public final Map<String, MovementQueueReading> trainerMovementQueues; public RespondHeartBeatQuery( long requestId, String sceneId, - Map<String, TrainerPositionReading> trainerPositions) { + Map<String, MovementQueueReading> trainerPositions) { this.requestId = requestId; this.sceneId = sceneId; - this.trainerPositions = trainerPositions; + this.trainerMovementQueues = trainerPositions; } } - public interface TrainerPositionReading { } + public interface MovementQueueReading { } - public static class TrainerPosition implements TrainerPositionReading { - public final TilePos value; + public static class MovementQueue implements MovementQueueReading { + public final Queue<Direction> value; - public TrainerPosition(TilePos value) { + public MovementQueue(Queue<Direction> value) { this.value = value; } - - @Override - public boolean equals(Object o) { - if (this == o) return true; - if (o == null || getClass() != o.getClass()) return false; - - TrainerPosition other = (TrainerPosition) o; - - return this.value.x == other.value.x && this.value.y == other.value.y; - } - - @Override - public String toString() { - return "TrainerPosition={x: " + value.x + ", y: " + value.y + "}"; - } - } - public enum TrainerPositionNotAvailable implements TrainerPositionReading { + public enum MovementQueueEmpty implements MovementQueueReading { INSTANCE } - public enum TrainerOffline implements TrainerPositionReading { + public enum TrainerOffline implements MovementQueueReading { INSTANCE } - public enum TrainerTimedOut implements TrainerPositionReading { + public enum TrainerTimedOut implements MovementQueueReading { INSTANCE } @@ -198,6 +213,8 @@ public class AkkamonNexus extends AbstractBehavior<AkkamonNexus.Command> { return newReceiveBuilder() .onMessage(RequestTrainerRegistration.class, this::onTrainerRegistration) .onMessage(TrainerRegistered.class, this::onTrainerRegistered) + .onMessage(RequestTrainerOffline.class, this::onTrainerOfflineRequest) + .onMessage(RespondTrainerOffline.class, this::onTrainerOffline) .onMessage(RequestHeartBeat.class, this::onHeartBeat) .onMessage(RespondHeartBeatQuery.class, this::onHeartBeatQueryResponse) .onMessage(RequestStartMoving.class, this::onStartMoving) @@ -206,19 +223,37 @@ public class AkkamonNexus extends AbstractBehavior<AkkamonNexus.Command> { .build(); } + private AkkamonNexus onTrainerOffline(RespondTrainerOffline trainerOfflineMsg) { + getContext().getLog().info("Removing {} from akkamon sessions!", trainerOfflineMsg.session.getTrainerId()); + messageEngine.removeTrainerSessionFromScene(trainerOfflineMsg.sceneId, trainerOfflineMsg.session); + return this; + } + + private AkkamonNexus onTrainerOfflineRequest(RequestTrainerOffline trainerOfflineRequest) { + ActorRef<SceneTrainerGroup.Command> sceneTrainerGroup = sceneIdToActor.get( + trainerOfflineRequest.sceneId + ); + if (sceneTrainerGroup != null) { + sceneTrainerGroup.tell(trainerOfflineRequest); + } else { + getContext().getLog().info("Ignoring trainerOffline request in scene {}, it isn't mapped to a sceneTrainerActor."); + } + return this; + } + private AkkamonNexus onHeartBeatQueryResponse(RespondHeartBeatQuery response) { // Turn on for logging StringBuilder positions = new StringBuilder(); positions.append("\n" + response.sceneId.toUpperCase(Locale.ROOT) + "\n"); - for (Map.Entry<String, TrainerPositionReading> entry : response.trainerPositions.entrySet()) { + for (Map.Entry<String, MovementQueueReading> entry : response.trainerMovementQueues.entrySet()) { positions.append(entry.getKey() + ": " +entry.getValue()); positions.append("\n"); } getContext().getLog().info(String.valueOf(positions)); - messageEngine.broadCastHeartBeatToScene(response.sceneId, response.trainerPositions); + messageEngine.broadCastHeartBeatToScene(response.sceneId, response.trainerMovementQueues); return this; } diff --git a/domain/src/main/java/akkamon/domain/HeartBeatQuery.java b/domain/src/main/java/akkamon/domain/HeartBeatQuery.java index 4be99ec..5e88ff5 100644 --- a/domain/src/main/java/akkamon/domain/HeartBeatQuery.java +++ b/domain/src/main/java/akkamon/domain/HeartBeatQuery.java @@ -5,10 +5,7 @@ import akka.actor.typed.Behavior; import akka.actor.typed.javadsl.*; import java.time.Duration; -import java.util.HashMap; -import java.util.HashSet; -import java.util.Map; -import java.util.Set; +import java.util.*; public class HeartBeatQuery extends AbstractBehavior<HeartBeatQuery.Command> { @@ -18,11 +15,11 @@ public class HeartBeatQuery extends AbstractBehavior<HeartBeatQuery.Command> { INSTANCE } - static class WrappedRespondTrainerPosition implements Command { - final Trainer.RespondTrainerPosition response; + static class WrappedRespondMovementQueue implements Command { + final Trainer.RespondMovementQueue response; - WrappedRespondTrainerPosition( - Trainer.RespondTrainerPosition response + WrappedRespondMovementQueue( + Trainer.RespondMovementQueue response ) { this.response = response; } @@ -62,7 +59,7 @@ public class HeartBeatQuery extends AbstractBehavior<HeartBeatQuery.Command> { private final long requestId; private final String sceneId; private final ActorRef<AkkamonNexus.Command> requester; - private Map<String, AkkamonNexus.TrainerPositionReading> repliesSoFar = new HashMap<String, AkkamonNexus.TrainerPositionReading>(); + private Map<String, AkkamonNexus.MovementQueueReading> repliesSoFar = new HashMap<String, AkkamonNexus.MovementQueueReading>(); private final Set<String> stillWaiting; public HeartBeatQuery( @@ -80,13 +77,13 @@ public class HeartBeatQuery extends AbstractBehavior<HeartBeatQuery.Command> { timers.startSingleTimer(CollectionTimeout.INSTANCE, timeout); - ActorRef<Trainer.RespondTrainerPosition> respondTrainerPositionAdapter = - context.messageAdapter(Trainer.RespondTrainerPosition.class, WrappedRespondTrainerPosition::new); + ActorRef<Trainer.RespondMovementQueue> respondTrainerPositionAdapter = + context.messageAdapter(Trainer.RespondMovementQueue.class, WrappedRespondMovementQueue::new); for (Map.Entry<String, ActorRef<Trainer.Command>> entry : trainerIdToActor.entrySet()) { context.watchWith(entry.getValue(), new TrainerOffline(entry.getKey())); entry.getValue().tell( - new Trainer.ReadTrainerPosition( + new Trainer.ReadMovementQueue( 0L, respondTrainerPositionAdapter ) @@ -98,19 +95,22 @@ public class HeartBeatQuery extends AbstractBehavior<HeartBeatQuery.Command> { @Override public Receive<Command> createReceive() { return newReceiveBuilder() - .onMessage(WrappedRespondTrainerPosition.class, this::onRespondTrainerPosition) + .onMessage(WrappedRespondMovementQueue.class, this::onRespondMovementQueue) .build(); } - private Behavior<Command> onRespondTrainerPosition(WrappedRespondTrainerPosition r) { - AkkamonNexus.TrainerPositionReading trainerPositionRead = - r.response - .value - .map(optionalValue -> (AkkamonNexus.TrainerPositionReading) new AkkamonNexus.TrainerPosition(optionalValue)) - .orElse(AkkamonNexus.TrainerPositionNotAvailable.INSTANCE); + private Behavior<Command> onRespondMovementQueue(WrappedRespondMovementQueue r) { + AkkamonNexus.MovementQueueReading movementQueueRead = null; + if (r.response.value.size() != 0) { + movementQueueRead = new AkkamonNexus.MovementQueue(r.response.value); + } else { + Queue<Direction> queue = new LinkedList<>(); + queue.add(Direction.NONE); + movementQueueRead = new AkkamonNexus.MovementQueue(queue); + } String trainerId = r.response.trainerId; - repliesSoFar.put(trainerId, trainerPositionRead); + repliesSoFar.put(trainerId, movementQueueRead); stillWaiting.remove(trainerId); return respondWhenAllCollected(); diff --git a/domain/src/main/java/akkamon/domain/SceneTrainerGroup.java b/domain/src/main/java/akkamon/domain/SceneTrainerGroup.java index 29778ec..e8ace82 100644 --- a/domain/src/main/java/akkamon/domain/SceneTrainerGroup.java +++ b/domain/src/main/java/akkamon/domain/SceneTrainerGroup.java @@ -15,15 +15,18 @@ public class SceneTrainerGroup extends AbstractBehavior<SceneTrainerGroup.Comman public interface Command { } - private class TrainerOffline implements Command { + public static class TrainerOffline + implements Command, AkkamonNexus.Command { public ActorRef<Trainer.Command> trainer; public String sceneId; public String trainerId; + public ActorRef<AkkamonNexus.Command> replyTo; - public TrainerOffline(ActorRef<Trainer.Command> trainerActor, String sceneId, String trainerId) { + public TrainerOffline(ActorRef<Trainer.Command> trainerActor, String sceneId, String trainerId, ActorRef<AkkamonNexus.Command> replyTo) { this.trainer = trainerActor; this.sceneId = sceneId; this.trainerId = trainerId; + this.replyTo = replyTo; } } @@ -50,6 +53,14 @@ public class SceneTrainerGroup extends AbstractBehavior<SceneTrainerGroup.Comman this::onTrainerRegistration ) .onMessage( + AkkamonNexus.RequestTrainerOffline.class, + this::onTrainerOfflineRequest + ) + .onMessage( + TrainerOffline.class, + this::onWatchedTrainerOffline + ) + .onMessage( AkkamonNexus.RequestStartMoving.class, this::onStartMoving ) @@ -68,6 +79,41 @@ public class SceneTrainerGroup extends AbstractBehavior<SceneTrainerGroup.Comman .build(); } + private SceneTrainerGroup onWatchedTrainerOffline(TrainerOffline trainerOfflineMsg) { + trainerOfflineMsg.replyTo.tell(trainerOfflineMsg); + trainerIdToActor.remove(trainerOfflineMsg.trainerId); + return this; + } + + private SceneTrainerGroup onTrainerOfflineRequest(AkkamonNexus.RequestTrainerOffline trainerOfflineRequest) { + if (this.sceneId.equals(trainerOfflineRequest.sceneId)) { + ActorRef<Trainer.Command> trainerActor = trainerIdToActor.get(trainerOfflineRequest.trainerId); + if (trainerActor != null) { + trainerActor.tell(trainerOfflineRequest); + trainerOfflineRequest.replyTo.tell(new AkkamonNexus.RespondTrainerOffline( + trainerOfflineRequest.requestId, + trainerOfflineRequest.sceneId, + trainerOfflineRequest.session + )); + } else { + getContext() + .getLog() + .warn( + "Ignoring trainerOffline for trainerId {}. There is no actor mapped to it.", + trainerOfflineRequest.trainerId + ); + } + } else { + getContext() + .getLog() + .warn( + "Ignoring trainerOffline for {}. This actor is responsible for {}.", + trainerOfflineRequest.sceneId, + this.sceneId); + } + return this; + } + private SceneTrainerGroup onHeartBeat(AkkamonNexus.RequestHeartBeat heartBeatRequest) { Map<String, ActorRef<Trainer.Command>> trainerIdToActorCopy = new HashMap<>(this.trainerIdToActor); getContext() @@ -171,7 +217,7 @@ public class SceneTrainerGroup extends AbstractBehavior<SceneTrainerGroup.Comman getContext() .spawn(Trainer.create(sceneId, registrationRequest.trainerId), "trainer-" + registrationRequest.trainerId); getContext() - .watchWith(trainerActor, new SceneTrainerGroup.TrainerOffline(trainerActor, sceneId, registrationRequest.trainerId)); + .watchWith(trainerActor, new SceneTrainerGroup.TrainerOffline(trainerActor, sceneId, registrationRequest.trainerId, registrationRequest.replyTo)); trainerIdToActor.put(registrationRequest.trainerId, trainerActor); registrationRequest.replyTo.tell(new AkkamonNexus.TrainerRegistered( registrationRequest.trainerId, diff --git a/domain/src/main/java/akkamon/domain/Trainer.java b/domain/src/main/java/akkamon/domain/Trainer.java index 0a3b397..710f844 100644 --- a/domain/src/main/java/akkamon/domain/Trainer.java +++ b/domain/src/main/java/akkamon/domain/Trainer.java @@ -7,31 +7,33 @@ import akka.actor.typed.javadsl.ActorContext; import akka.actor.typed.javadsl.Behaviors; import akka.actor.typed.javadsl.Receive; +import java.util.LinkedList; import java.util.Optional; +import java.util.Queue; public class Trainer extends AbstractBehavior<Trainer.Command> { public interface Command { } - public static class ReadTrainerPosition implements Command { + public static class ReadMovementQueue implements Command { final long requestId; - final ActorRef<RespondTrainerPosition> replyTo; + final ActorRef<RespondMovementQueue> replyTo; - public ReadTrainerPosition(long requestId, ActorRef<RespondTrainerPosition> replyTo) { + public ReadMovementQueue(long requestId, ActorRef<RespondMovementQueue> replyTo) { this.requestId = requestId; this.replyTo = replyTo; } } - public static final class RespondTrainerPosition { + public static final class RespondMovementQueue { final long requestId; final String trainerId; - final Optional<TilePos> value; + final Queue<Direction> value; - public RespondTrainerPosition( + public RespondMovementQueue( long requestId, String trainerId, - Optional<TilePos> value + Queue<Direction> value ) { this.requestId = requestId; this.trainerId = trainerId; @@ -46,6 +48,10 @@ public class Trainer extends AbstractBehavior<Trainer.Command> { private String sceneId; private String trainerId; + private Queue<Direction> movementQueue = new LinkedList<>(); + + private Direction movementDirection = Direction.NONE; + private Optional<TilePos> lastValidTilePos = Optional.empty(); public Trainer(ActorContext<Command> context, String sceneId, String trainerId) { @@ -58,8 +64,12 @@ public class Trainer extends AbstractBehavior<Trainer.Command> { public Receive<Command> createReceive() { return newReceiveBuilder() .onMessage( - ReadTrainerPosition.class, - this::onReadTrainerPosition + ReadMovementQueue.class, + this::onReadMovementQueue + ) + .onMessage( + AkkamonNexus.RequestTrainerOffline.class, + this::onTrainerOffline ) .onMessage( AkkamonNexus.RequestStartMoving.class, @@ -75,29 +85,42 @@ public class Trainer extends AbstractBehavior<Trainer.Command> { .build(); } - private Trainer onReadTrainerPosition(ReadTrainerPosition readTrainerPositionRequest) { - readTrainerPositionRequest.replyTo.tell(new RespondTrainerPosition( + private Behavior<Command> onTrainerOffline(AkkamonNexus.RequestTrainerOffline trainerOfflineRequest) { + getContext().getLog().info("Trainer {} went offline, the actor has stopped! My supervisor should handle closing my connection!"); + return Behaviors.stopped(); + } + + private Trainer onReadMovementQueue(ReadMovementQueue readTrainerPositionRequest) { + readTrainerPositionRequest.replyTo.tell(new RespondMovementQueue( readTrainerPositionRequest.requestId, trainerId, - lastValidTilePos + new LinkedList<>(movementQueue) )); + this.movementQueue.clear(); return this; } private Trainer onNewTilePos(AkkamonNexus.RequestNewTilePos newTilePosRequest) { getContext().getLog().info("Trainer {} has new {}.", trainerId, newTilePosRequest.tilePos); - lastValidTilePos = Optional.of(newTilePosRequest.tilePos); + if (isMoving()) { + this.movementQueue.add(this.movementDirection); + } return this; } private Trainer onStopMoving(AkkamonNexus.RequestStopMoving stopMovingRequest) { getContext().getLog().info("Trainer {} stops to move {}.", trainerId, stopMovingRequest.direction); + this.movementDirection = Direction.NONE; return this; } private Trainer onStartMoving(AkkamonNexus.RequestStartMoving startMovingRequest) { getContext().getLog().info("Trainer {} starts to move {}.", trainerId, startMovingRequest.direction); + this.movementDirection = startMovingRequest.direction; return this; } + private boolean isMoving() { + return this.movementDirection != Direction.NONE; + } } |
