From 5d76a30aeba5bfd175b60845c4935c4352b46f4c Mon Sep 17 00:00:00 2001 From: Mike Vink Date: Wed, 21 Jul 2021 16:48:47 +0200 Subject: feat(): heartbeat working on remote engine --- api/src/main/java/akkamon/api/MessagingEngine.java | 45 +++++-- .../java/akkamon/api/models/HeartBeatEvent.java | 9 +- client/src/RemotePlayerEngine.ts | 14 +++ client/src/RemotePlayerSprite.ts | 1 + client/src/scene.ts | 11 +- .../java/akkamon/domain/AkkamonMessageEngine.java | 4 +- .../src/main/java/akkamon/domain/AkkamonNexus.java | 66 +++++++++-- .../main/java/akkamon/domain/HeartBeatQuery.java | 131 +++++++++++++++++++++ .../java/akkamon/domain/SceneTrainerGroup.java | 22 ++++ domain/src/main/java/akkamon/domain/Trainer.java | 40 +++++++ 10 files changed, 319 insertions(+), 24 deletions(-) create mode 100644 client/src/RemotePlayerEngine.ts create mode 100644 client/src/RemotePlayerSprite.ts create mode 100644 domain/src/main/java/akkamon/domain/HeartBeatQuery.java diff --git a/api/src/main/java/akkamon/api/MessagingEngine.java b/api/src/main/java/akkamon/api/MessagingEngine.java index 17b8f08..dc22752 100644 --- a/api/src/main/java/akkamon/api/MessagingEngine.java +++ b/api/src/main/java/akkamon/api/MessagingEngine.java @@ -3,15 +3,13 @@ package akkamon.api; import akka.actor.typed.ActorRef; import akka.actor.typed.ActorSystem; import akkamon.api.models.Event; +import akkamon.api.models.HeartBeatEvent; import akkamon.domain.AkkamonMessageEngine; import akkamon.domain.AkkamonNexus; import akkamon.domain.AkkamonSession; import com.google.gson.Gson; -import java.util.HashMap; -import java.util.Map; -import java.util.Set; -import java.util.UUID; +import java.util.*; import java.util.concurrent.ScheduledThreadPoolExecutor; import java.util.concurrent.TimeUnit; @@ -43,18 +41,43 @@ public class MessagingEngine implements AkkamonMessageEngine { } @Override - public void broadCastToScene(String sceneId, String message) { - Set sessionsInScene = sceneIdToAkkamonSessions.get(sceneId); - for (AkkamonSession session : sessionsInScene) { - session.send(message); + public void broadCastHeartBeatToScene(String sceneId, + Map trainerPositions) { + Set sceneSessions = sceneIdToAkkamonSessions.get(sceneId); + System.out.println(sceneSessions); + System.out.println(sceneIdToAkkamonSessions.keySet()); + if (sceneSessions != null) { + for (AkkamonSession session : sceneSessions) { + Map withoutSelf = new HashMap<>(trainerPositions); + withoutSelf.remove(session.getTrainerId()); + HeartBeatEvent heartBeat = new HeartBeatEvent( + withoutSelf + ); + String heartBeatMessage = gson.toJson(heartBeat); + System.out.println("Sending to " + session.getTrainerId()); + System.out.println(heartBeatMessage); + session.send( + heartBeatMessage + ); + } } } @Override public void registerTrainerSessionToScene(String sceneId, AkkamonSession session) { - sceneIdToAkkamonSessions.get(sceneId).add(session); - session.setTrainerId(sceneId); - heartBeat(); + System.out.println("Registering session to scene " + sceneId); + Set sessionsInScene = sceneIdToAkkamonSessions.get(sceneId); + if (sessionsInScene != null) { + sessionsInScene.add(session); + } else { + sessionsInScene = new HashSet<>(); + sessionsInScene.add(session); + sceneIdToAkkamonSessions.put(sceneId, + sessionsInScene + ); + System.out.println(sceneIdToAkkamonSessions.keySet()); + } + //heartBeat(); } @Override diff --git a/api/src/main/java/akkamon/api/models/HeartBeatEvent.java b/api/src/main/java/akkamon/api/models/HeartBeatEvent.java index 472c7e4..8db514b 100644 --- a/api/src/main/java/akkamon/api/models/HeartBeatEvent.java +++ b/api/src/main/java/akkamon/api/models/HeartBeatEvent.java @@ -1,7 +1,14 @@ package akkamon.api.models; +import akkamon.domain.AkkamonNexus; + +import java.util.Map; + public class HeartBeatEvent extends Event { - public HeartBeatEvent() { + public Map remoteTrainerPositions; + + public HeartBeatEvent(Map remoteTrainerPositions) { this.type = EventType.HEART_BEAT; + this.remoteTrainerPositions = remoteTrainerPositions; } } diff --git a/client/src/RemotePlayerEngine.ts b/client/src/RemotePlayerEngine.ts new file mode 100644 index 0000000..f19ebe0 --- /dev/null +++ b/client/src/RemotePlayerEngine.ts @@ -0,0 +1,14 @@ +import Phaser from 'phaser'; +import { akkamonClient } from './app'; + +export class RemotePlayerEngine { + + private scene: Phaser.Scene + + constructor(scene: Phaser.Scene) { + this.scene = scene; + } + + update() { + } +} diff --git a/client/src/RemotePlayerSprite.ts b/client/src/RemotePlayerSprite.ts new file mode 100644 index 0000000..8485803 --- /dev/null +++ b/client/src/RemotePlayerSprite.ts @@ -0,0 +1 @@ +import Phaser from 'phaser'; diff --git a/client/src/scene.ts b/client/src/scene.ts index 29bf253..52edbdd 100644 --- a/client/src/scene.ts +++ b/client/src/scene.ts @@ -1,12 +1,14 @@ import Phaser from 'phaser'; import { akkamonClient } from './app'; -import type { GameState } from './GameState'; import { Player } from './player'; import { PlayerSprite } from './sprite'; + import { GridControls } from './GridControls'; import { GridPhysics } from './GridPhysics'; import { Direction } from './Direction'; +import { RemotePlayerEngine } from './RemotePlayerEngine'; + type RemotePlayerStates = { [name: string]: Player @@ -17,10 +19,11 @@ export default class AkkamonStartScene extends Phaser.Scene static readonly TILE_SIZE = 32; - private akkamonState?: GameState private gridPhysics?: GridPhysics private gridControls?: GridControls + private remotePlayerEngine?: RemotePlayerEngine + directionToAnimation: { [key in Direction]: string } = { @@ -81,8 +84,6 @@ export default class AkkamonStartScene extends Phaser.Scene // Create a sprite with physics enabled via the physics system. The image used for the sprite has // a bit of whitespace, so I'm using setSize & setOffset to control the size of the player's body. - this.akkamonState = akkamonClient.getMutableState(); - var tilePos = new Phaser.Math.Vector2( Math.floor(this.spawnPoint.x! / AkkamonStartScene.TILE_SIZE), Math.floor(this.spawnPoint.y! / AkkamonStartScene.TILE_SIZE), @@ -106,6 +107,8 @@ export default class AkkamonStartScene extends Phaser.Scene this.gridPhysics ); + this.remotePlayerEngine = new RemotePlayerEngine(this); + this.createPlayerAnimation(Direction.LEFT, 0, 3); this.createPlayerAnimation(Direction.RIGHT, 0, 3); this.createPlayerAnimation(Direction.UP, 0, 3); diff --git a/domain/src/main/java/akkamon/domain/AkkamonMessageEngine.java b/domain/src/main/java/akkamon/domain/AkkamonMessageEngine.java index fc03915..eb998ba 100644 --- a/domain/src/main/java/akkamon/domain/AkkamonMessageEngine.java +++ b/domain/src/main/java/akkamon/domain/AkkamonMessageEngine.java @@ -1,8 +1,10 @@ package akkamon.domain; +import java.util.Map; + public interface AkkamonMessageEngine { // broadcasts position info to WebSocket Clients - void broadCastToScene(String sceneId, String message); + void broadCastHeartBeatToScene(String sceneId, Map trainerPositions); void registerTrainerSessionToScene(String sceneId, AkkamonSession session); diff --git a/domain/src/main/java/akkamon/domain/AkkamonNexus.java b/domain/src/main/java/akkamon/domain/AkkamonNexus.java index 0b52ffa..9fd2763 100644 --- a/domain/src/main/java/akkamon/domain/AkkamonNexus.java +++ b/domain/src/main/java/akkamon/domain/AkkamonNexus.java @@ -8,6 +8,7 @@ import akka.actor.typed.javadsl.Behaviors; import akka.actor.typed.javadsl.Receive; import java.util.HashMap; +import java.util.Locale; import java.util.Map; public class AkkamonNexus extends AbstractBehavior { @@ -37,13 +38,16 @@ public class AkkamonNexus extends AbstractBehavior { public static class TrainerRegistered implements Command { private String trainerId; + private String sceneId; private AkkamonSession session; public TrainerRegistered( String trainerId, + String sceneId, AkkamonSession session ) { this.trainerId = trainerId; + this.sceneId = sceneId; this.session = session; } } @@ -108,16 +112,34 @@ public class AkkamonNexus extends AbstractBehavior { implements Command, SceneTrainerGroup.Command { public long requestId; + // TODO find a way to make the command Narrower public ActorRef replyTo; - public RequestHeartBeat(long requestId, ActorRef replyTo) { + public RequestHeartBeat(long requestId, ActorRef replyTo) { this.requestId = requestId; this.replyTo = replyTo; } } + private static class SceneTrainerGroupTerminated implements AkkamonNexus.Command { + public SceneTrainerGroupTerminated(String sceneId) { + } + } + public static class RespondHeartBeatQuery implements Command { + public final long requestId; + public final String sceneId; + public final Map trainerPositions; + + public RespondHeartBeatQuery( + long requestId, + String sceneId, + Map trainerPositions) { + this.requestId = requestId; + this.sceneId = sceneId; + this.trainerPositions = trainerPositions; + } } public interface TrainerPositionReading { } @@ -141,14 +163,21 @@ public class AkkamonNexus extends AbstractBehavior { @Override public String toString() { - return "TrainerPosition={x: " + value.x + ", " + value.y + "}"; + return "TrainerPosition={x: " + value.x + ", y: " + value.y + "}"; } } - private static class SceneTrainerGroupTerminated implements AkkamonNexus.Command { - public SceneTrainerGroupTerminated(String sceneId) { - } + public enum TrainerPositionNotAvailable implements TrainerPositionReading { + INSTANCE + } + + public enum TrainerOffline implements TrainerPositionReading { + INSTANCE + } + + public enum TrainerTimedOut implements TrainerPositionReading { + INSTANCE } public static Behavior create(AkkamonMessageEngine messagingEngine) { @@ -170,13 +199,35 @@ public class AkkamonNexus extends AbstractBehavior { .onMessage(RequestTrainerRegistration.class, this::onTrainerRegistration) .onMessage(TrainerRegistered.class, this::onTrainerRegistered) .onMessage(RequestHeartBeat.class, this::onHeartBeat) + .onMessage(RespondHeartBeatQuery.class, this::onHeartBeatQueryResponse) .onMessage(RequestStartMoving.class, this::onStartMoving) .onMessage(RequestStopMoving.class, this::onStopMoving) .onMessage(RequestNewTilePos.class, this::onNewTilePos) .build(); } + private AkkamonNexus onHeartBeatQueryResponse(RespondHeartBeatQuery response) { + // Turn on for logging + + + StringBuilder positions = new StringBuilder(); + positions.append("\n" + response.sceneId.toUpperCase(Locale.ROOT) + "\n"); + for (Map.Entry entry : response.trainerPositions.entrySet()) { + positions.append(entry.getKey() + ": " +entry.getValue()); + positions.append("\n"); + } + getContext().getLog().info(String.valueOf(positions)); + + messageEngine.broadCastHeartBeatToScene(response.sceneId, response.trainerPositions); + + return this; + } + private AkkamonNexus onHeartBeat(RequestHeartBeat heartBeatRequest) { + // TODO do some checks here? + for (ActorRef sceneGroupActor: sceneIdToActor.values()) { + sceneGroupActor.tell(heartBeatRequest); + } return this; } @@ -218,8 +269,9 @@ public class AkkamonNexus extends AbstractBehavior { private AkkamonNexus onTrainerRegistered(TrainerRegistered reply) { // TODO test when registration fails? - getContext().getLog().info("Adding {} to Live AkkamonSessions in Messaging Engine", reply.trainerId); - messageEngine.registerTrainerSessionToScene(reply.trainerId, reply.session); + getContext().getLog().info("Adding {} to scene {} Live AkkamonSessions in Messaging Engine", reply.trainerId, reply.sceneId); + messageEngine.registerTrainerSessionToScene(reply.sceneId, reply.session); + reply.session.setTrainerId(reply.trainerId); return this; } diff --git a/domain/src/main/java/akkamon/domain/HeartBeatQuery.java b/domain/src/main/java/akkamon/domain/HeartBeatQuery.java new file mode 100644 index 0000000..4be99ec --- /dev/null +++ b/domain/src/main/java/akkamon/domain/HeartBeatQuery.java @@ -0,0 +1,131 @@ +package akkamon.domain; + +import akka.actor.typed.ActorRef; +import akka.actor.typed.Behavior; +import akka.actor.typed.javadsl.*; + +import java.time.Duration; +import java.util.HashMap; +import java.util.HashSet; +import java.util.Map; +import java.util.Set; + +public class HeartBeatQuery extends AbstractBehavior { + + public interface Command {} + + private static enum CollectionTimeout implements Command { + INSTANCE + } + + static class WrappedRespondTrainerPosition implements Command { + final Trainer.RespondTrainerPosition response; + + WrappedRespondTrainerPosition( + Trainer.RespondTrainerPosition response + ) { + this.response = response; + } + } + + private static class TrainerOffline implements Command { + final String trainerId; + + private TrainerOffline(String trainerId) { + this.trainerId = trainerId; + } + } + + public static Behavior create( + Map> trainerIdToActor, + long requestId, + String sceneId, + ActorRef requester, + Duration timeout + ) { + return Behaviors.setup( + context -> + Behaviors.withTimers( + timers -> new HeartBeatQuery( + trainerIdToActor, + requestId, + sceneId, + requester, + timeout, + context, + timers + ) + ) + ); + } + + private final long requestId; + private final String sceneId; + private final ActorRef requester; + private Map repliesSoFar = new HashMap(); + private final Set stillWaiting; + + public HeartBeatQuery( + Map> trainerIdToActor, + long requestId, + String sceneId, + ActorRef requester, + Duration timeout, + ActorContext context, + TimerScheduler timers) { + super(context); + this.requestId = requestId; + this.sceneId = sceneId; + this.requester = requester; + + timers.startSingleTimer(CollectionTimeout.INSTANCE, timeout); + + ActorRef respondTrainerPositionAdapter = + context.messageAdapter(Trainer.RespondTrainerPosition.class, WrappedRespondTrainerPosition::new); + + for (Map.Entry> entry : trainerIdToActor.entrySet()) { + context.watchWith(entry.getValue(), new TrainerOffline(entry.getKey())); + entry.getValue().tell( + new Trainer.ReadTrainerPosition( + 0L, + respondTrainerPositionAdapter + ) + ); + } + stillWaiting = new HashSet<>(trainerIdToActor.keySet()); + } + + @Override + public Receive createReceive() { + return newReceiveBuilder() + .onMessage(WrappedRespondTrainerPosition.class, this::onRespondTrainerPosition) + .build(); + } + + private Behavior onRespondTrainerPosition(WrappedRespondTrainerPosition r) { + AkkamonNexus.TrainerPositionReading trainerPositionRead = + r.response + .value + .map(optionalValue -> (AkkamonNexus.TrainerPositionReading) new AkkamonNexus.TrainerPosition(optionalValue)) + .orElse(AkkamonNexus.TrainerPositionNotAvailable.INSTANCE); + + String trainerId = r.response.trainerId; + repliesSoFar.put(trainerId, trainerPositionRead); + stillWaiting.remove(trainerId); + + return respondWhenAllCollected(); + } + + private Behavior respondWhenAllCollected() { + if (stillWaiting.isEmpty()) { + requester.tell(new AkkamonNexus.RespondHeartBeatQuery( + requestId, + sceneId, + repliesSoFar)); + return Behaviors.stopped(); + } else { + return this; + } + } + +} diff --git a/domain/src/main/java/akkamon/domain/SceneTrainerGroup.java b/domain/src/main/java/akkamon/domain/SceneTrainerGroup.java index f01edc4..29778ec 100644 --- a/domain/src/main/java/akkamon/domain/SceneTrainerGroup.java +++ b/domain/src/main/java/akkamon/domain/SceneTrainerGroup.java @@ -7,6 +7,7 @@ import akka.actor.typed.javadsl.ActorContext; import akka.actor.typed.javadsl.Behaviors; import akka.actor.typed.javadsl.Receive; +import java.time.Duration; import java.util.HashMap; import java.util.Map; @@ -60,9 +61,28 @@ public class SceneTrainerGroup extends AbstractBehavior> trainerIdToActorCopy = new HashMap<>(this.trainerIdToActor); + getContext() + .spawnAnonymous( + HeartBeatQuery.create( + trainerIdToActorCopy, + heartBeatRequest.requestId, + sceneId, + heartBeatRequest.replyTo, + Duration.ofSeconds(3) + ) + ); + return this; + } + private SceneTrainerGroup onNewTilePos(AkkamonNexus.RequestNewTilePos newTilePosRequest) { if (this.sceneId.equals(newTilePosRequest.sceneId)) { ActorRef trainerActor = trainerIdToActor.get(newTilePosRequest.trainerId); @@ -142,6 +162,7 @@ public class SceneTrainerGroup extends AbstractBehavior { public interface Command { } + public static class ReadTrainerPosition implements Command { + final long requestId; + final ActorRef replyTo; + + public ReadTrainerPosition(long requestId, ActorRef replyTo) { + this.requestId = requestId; + this.replyTo = replyTo; + } + } + + public static final class RespondTrainerPosition { + final long requestId; + final String trainerId; + final Optional value; + + public RespondTrainerPosition( + long requestId, + String trainerId, + Optional value + ) { + this.requestId = requestId; + this.trainerId = trainerId; + this.value = value; + } + } + public static Behavior create(String sceneId, String trainerId) { return Behaviors.setup(context -> new Trainer(context, sceneId, trainerId)); } @@ -30,6 +57,10 @@ public class Trainer extends AbstractBehavior { @Override public Receive createReceive() { return newReceiveBuilder() + .onMessage( + ReadTrainerPosition.class, + this::onReadTrainerPosition + ) .onMessage( AkkamonNexus.RequestStartMoving.class, this::onStartMoving @@ -44,6 +75,15 @@ public class Trainer extends AbstractBehavior { .build(); } + private Trainer onReadTrainerPosition(ReadTrainerPosition readTrainerPositionRequest) { + readTrainerPositionRequest.replyTo.tell(new RespondTrainerPosition( + readTrainerPositionRequest.requestId, + trainerId, + lastValidTilePos + )); + return this; + } + private Trainer onNewTilePos(AkkamonNexus.RequestNewTilePos newTilePosRequest) { getContext().getLog().info("Trainer {} has new {}.", trainerId, newTilePosRequest.tilePos); lastValidTilePos = Optional.of(newTilePosRequest.tilePos); -- cgit v1.2.3