summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorMike Vink <mike1994vink@gmail.com>2021-07-21 16:48:47 +0200
committerMike Vink <mike1994vink@gmail.com>2021-07-21 16:48:47 +0200
commit5d76a30aeba5bfd175b60845c4935c4352b46f4c (patch)
tree0b3bb79bb71fca8b412a168147d941d3f3f13131
parentda613e94970da31409464c81740162f9bd7c7831 (diff)
feat(): heartbeat working on remote engine
-rw-r--r--api/src/main/java/akkamon/api/MessagingEngine.java45
-rw-r--r--api/src/main/java/akkamon/api/models/HeartBeatEvent.java9
-rw-r--r--client/src/RemotePlayerEngine.ts14
-rw-r--r--client/src/RemotePlayerSprite.ts1
-rw-r--r--client/src/scene.ts11
-rw-r--r--domain/src/main/java/akkamon/domain/AkkamonMessageEngine.java4
-rw-r--r--domain/src/main/java/akkamon/domain/AkkamonNexus.java66
-rw-r--r--domain/src/main/java/akkamon/domain/HeartBeatQuery.java131
-rw-r--r--domain/src/main/java/akkamon/domain/SceneTrainerGroup.java22
-rw-r--r--domain/src/main/java/akkamon/domain/Trainer.java40
10 files changed, 319 insertions, 24 deletions
diff --git a/api/src/main/java/akkamon/api/MessagingEngine.java b/api/src/main/java/akkamon/api/MessagingEngine.java
index 17b8f08..dc22752 100644
--- a/api/src/main/java/akkamon/api/MessagingEngine.java
+++ b/api/src/main/java/akkamon/api/MessagingEngine.java
@@ -3,15 +3,13 @@ package akkamon.api;
import akka.actor.typed.ActorRef;
import akka.actor.typed.ActorSystem;
import akkamon.api.models.Event;
+import akkamon.api.models.HeartBeatEvent;
import akkamon.domain.AkkamonMessageEngine;
import akkamon.domain.AkkamonNexus;
import akkamon.domain.AkkamonSession;
import com.google.gson.Gson;
-import java.util.HashMap;
-import java.util.Map;
-import java.util.Set;
-import java.util.UUID;
+import java.util.*;
import java.util.concurrent.ScheduledThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
@@ -43,18 +41,43 @@ public class MessagingEngine implements AkkamonMessageEngine {
}
@Override
- public void broadCastToScene(String sceneId, String message) {
- Set<AkkamonSession> sessionsInScene = sceneIdToAkkamonSessions.get(sceneId);
- for (AkkamonSession session : sessionsInScene) {
- session.send(message);
+ public void broadCastHeartBeatToScene(String sceneId,
+ Map<String, AkkamonNexus.TrainerPositionReading> trainerPositions) {
+ Set<AkkamonSession> sceneSessions = sceneIdToAkkamonSessions.get(sceneId);
+ System.out.println(sceneSessions);
+ System.out.println(sceneIdToAkkamonSessions.keySet());
+ if (sceneSessions != null) {
+ for (AkkamonSession session : sceneSessions) {
+ Map<String, AkkamonNexus.TrainerPositionReading> withoutSelf = new HashMap<>(trainerPositions);
+ withoutSelf.remove(session.getTrainerId());
+ HeartBeatEvent heartBeat = new HeartBeatEvent(
+ withoutSelf
+ );
+ String heartBeatMessage = gson.toJson(heartBeat);
+ System.out.println("Sending to " + session.getTrainerId());
+ System.out.println(heartBeatMessage);
+ session.send(
+ heartBeatMessage
+ );
+ }
}
}
@Override
public void registerTrainerSessionToScene(String sceneId, AkkamonSession session) {
- sceneIdToAkkamonSessions.get(sceneId).add(session);
- session.setTrainerId(sceneId);
- heartBeat();
+ System.out.println("Registering session to scene " + sceneId);
+ Set<AkkamonSession> sessionsInScene = sceneIdToAkkamonSessions.get(sceneId);
+ if (sessionsInScene != null) {
+ sessionsInScene.add(session);
+ } else {
+ sessionsInScene = new HashSet<>();
+ sessionsInScene.add(session);
+ sceneIdToAkkamonSessions.put(sceneId,
+ sessionsInScene
+ );
+ System.out.println(sceneIdToAkkamonSessions.keySet());
+ }
+ //heartBeat();
}
@Override
diff --git a/api/src/main/java/akkamon/api/models/HeartBeatEvent.java b/api/src/main/java/akkamon/api/models/HeartBeatEvent.java
index 472c7e4..8db514b 100644
--- a/api/src/main/java/akkamon/api/models/HeartBeatEvent.java
+++ b/api/src/main/java/akkamon/api/models/HeartBeatEvent.java
@@ -1,7 +1,14 @@
package akkamon.api.models;
+import akkamon.domain.AkkamonNexus;
+
+import java.util.Map;
+
public class HeartBeatEvent extends Event {
- public HeartBeatEvent() {
+ public Map<String, AkkamonNexus.TrainerPositionReading> remoteTrainerPositions;
+
+ public HeartBeatEvent(Map<String, AkkamonNexus.TrainerPositionReading> remoteTrainerPositions) {
this.type = EventType.HEART_BEAT;
+ this.remoteTrainerPositions = remoteTrainerPositions;
}
}
diff --git a/client/src/RemotePlayerEngine.ts b/client/src/RemotePlayerEngine.ts
new file mode 100644
index 0000000..f19ebe0
--- /dev/null
+++ b/client/src/RemotePlayerEngine.ts
@@ -0,0 +1,14 @@
+import Phaser from 'phaser';
+import { akkamonClient } from './app';
+
+export class RemotePlayerEngine {
+
+ private scene: Phaser.Scene
+
+ constructor(scene: Phaser.Scene) {
+ this.scene = scene;
+ }
+
+ update() {
+ }
+}
diff --git a/client/src/RemotePlayerSprite.ts b/client/src/RemotePlayerSprite.ts
new file mode 100644
index 0000000..8485803
--- /dev/null
+++ b/client/src/RemotePlayerSprite.ts
@@ -0,0 +1 @@
+import Phaser from 'phaser';
diff --git a/client/src/scene.ts b/client/src/scene.ts
index 29bf253..52edbdd 100644
--- a/client/src/scene.ts
+++ b/client/src/scene.ts
@@ -1,12 +1,14 @@
import Phaser from 'phaser';
import { akkamonClient } from './app';
-import type { GameState } from './GameState';
import { Player } from './player';
import { PlayerSprite } from './sprite';
+
import { GridControls } from './GridControls';
import { GridPhysics } from './GridPhysics';
import { Direction } from './Direction';
+import { RemotePlayerEngine } from './RemotePlayerEngine';
+
type RemotePlayerStates = {
[name: string]: Player
@@ -17,10 +19,11 @@ export default class AkkamonStartScene extends Phaser.Scene
static readonly TILE_SIZE = 32;
- private akkamonState?: GameState
private gridPhysics?: GridPhysics
private gridControls?: GridControls
+ private remotePlayerEngine?: RemotePlayerEngine
+
directionToAnimation: {
[key in Direction]: string
} = {
@@ -81,8 +84,6 @@ export default class AkkamonStartScene extends Phaser.Scene
// Create a sprite with physics enabled via the physics system. The image used for the sprite has
// a bit of whitespace, so I'm using setSize & setOffset to control the size of the player's body.
- this.akkamonState = akkamonClient.getMutableState();
-
var tilePos = new Phaser.Math.Vector2(
Math.floor(this.spawnPoint.x! / AkkamonStartScene.TILE_SIZE),
Math.floor(this.spawnPoint.y! / AkkamonStartScene.TILE_SIZE),
@@ -106,6 +107,8 @@ export default class AkkamonStartScene extends Phaser.Scene
this.gridPhysics
);
+ this.remotePlayerEngine = new RemotePlayerEngine(this);
+
this.createPlayerAnimation(Direction.LEFT, 0, 3);
this.createPlayerAnimation(Direction.RIGHT, 0, 3);
this.createPlayerAnimation(Direction.UP, 0, 3);
diff --git a/domain/src/main/java/akkamon/domain/AkkamonMessageEngine.java b/domain/src/main/java/akkamon/domain/AkkamonMessageEngine.java
index fc03915..eb998ba 100644
--- a/domain/src/main/java/akkamon/domain/AkkamonMessageEngine.java
+++ b/domain/src/main/java/akkamon/domain/AkkamonMessageEngine.java
@@ -1,8 +1,10 @@
package akkamon.domain;
+import java.util.Map;
+
public interface AkkamonMessageEngine {
// broadcasts position info to WebSocket Clients
- void broadCastToScene(String sceneId, String message);
+ void broadCastHeartBeatToScene(String sceneId, Map<String, AkkamonNexus.TrainerPositionReading> trainerPositions);
void registerTrainerSessionToScene(String sceneId, AkkamonSession session);
diff --git a/domain/src/main/java/akkamon/domain/AkkamonNexus.java b/domain/src/main/java/akkamon/domain/AkkamonNexus.java
index 0b52ffa..9fd2763 100644
--- a/domain/src/main/java/akkamon/domain/AkkamonNexus.java
+++ b/domain/src/main/java/akkamon/domain/AkkamonNexus.java
@@ -8,6 +8,7 @@ import akka.actor.typed.javadsl.Behaviors;
import akka.actor.typed.javadsl.Receive;
import java.util.HashMap;
+import java.util.Locale;
import java.util.Map;
public class AkkamonNexus extends AbstractBehavior<AkkamonNexus.Command> {
@@ -37,13 +38,16 @@ public class AkkamonNexus extends AbstractBehavior<AkkamonNexus.Command> {
public static class TrainerRegistered implements Command {
private String trainerId;
+ private String sceneId;
private AkkamonSession session;
public TrainerRegistered(
String trainerId,
+ String sceneId,
AkkamonSession session
) {
this.trainerId = trainerId;
+ this.sceneId = sceneId;
this.session = session;
}
}
@@ -108,16 +112,34 @@ public class AkkamonNexus extends AbstractBehavior<AkkamonNexus.Command> {
implements Command, SceneTrainerGroup.Command {
public long requestId;
+ // TODO find a way to make the command Narrower
public ActorRef<AkkamonNexus.Command> replyTo;
- public RequestHeartBeat(long requestId, ActorRef<AkkamonNexus.Command> replyTo) {
+ public RequestHeartBeat(long requestId, ActorRef<Command> replyTo) {
this.requestId = requestId;
this.replyTo = replyTo;
}
}
+ private static class SceneTrainerGroupTerminated implements AkkamonNexus.Command {
+ public SceneTrainerGroupTerminated(String sceneId) {
+ }
+ }
+
public static class RespondHeartBeatQuery implements Command {
+ public final long requestId;
+ public final String sceneId;
+ public final Map<String, TrainerPositionReading> trainerPositions;
+
+ public RespondHeartBeatQuery(
+ long requestId,
+ String sceneId,
+ Map<String, TrainerPositionReading> trainerPositions) {
+ this.requestId = requestId;
+ this.sceneId = sceneId;
+ this.trainerPositions = trainerPositions;
+ }
}
public interface TrainerPositionReading { }
@@ -141,14 +163,21 @@ public class AkkamonNexus extends AbstractBehavior<AkkamonNexus.Command> {
@Override
public String toString() {
- return "TrainerPosition={x: " + value.x + ", " + value.y + "}";
+ return "TrainerPosition={x: " + value.x + ", y: " + value.y + "}";
}
}
- private static class SceneTrainerGroupTerminated implements AkkamonNexus.Command {
- public SceneTrainerGroupTerminated(String sceneId) {
- }
+ public enum TrainerPositionNotAvailable implements TrainerPositionReading {
+ INSTANCE
+ }
+
+ public enum TrainerOffline implements TrainerPositionReading {
+ INSTANCE
+ }
+
+ public enum TrainerTimedOut implements TrainerPositionReading {
+ INSTANCE
}
public static Behavior<AkkamonNexus.Command> create(AkkamonMessageEngine messagingEngine) {
@@ -170,13 +199,35 @@ public class AkkamonNexus extends AbstractBehavior<AkkamonNexus.Command> {
.onMessage(RequestTrainerRegistration.class, this::onTrainerRegistration)
.onMessage(TrainerRegistered.class, this::onTrainerRegistered)
.onMessage(RequestHeartBeat.class, this::onHeartBeat)
+ .onMessage(RespondHeartBeatQuery.class, this::onHeartBeatQueryResponse)
.onMessage(RequestStartMoving.class, this::onStartMoving)
.onMessage(RequestStopMoving.class, this::onStopMoving)
.onMessage(RequestNewTilePos.class, this::onNewTilePos)
.build();
}
+ private AkkamonNexus onHeartBeatQueryResponse(RespondHeartBeatQuery response) {
+ // Turn on for logging
+
+
+ StringBuilder positions = new StringBuilder();
+ positions.append("\n" + response.sceneId.toUpperCase(Locale.ROOT) + "\n");
+ for (Map.Entry<String, TrainerPositionReading> entry : response.trainerPositions.entrySet()) {
+ positions.append(entry.getKey() + ": " +entry.getValue());
+ positions.append("\n");
+ }
+ getContext().getLog().info(String.valueOf(positions));
+
+ messageEngine.broadCastHeartBeatToScene(response.sceneId, response.trainerPositions);
+
+ return this;
+ }
+
private AkkamonNexus onHeartBeat(RequestHeartBeat heartBeatRequest) {
+ // TODO do some checks here?
+ for (ActorRef<SceneTrainerGroup.Command> sceneGroupActor: sceneIdToActor.values()) {
+ sceneGroupActor.tell(heartBeatRequest);
+ }
return this;
}
@@ -218,8 +269,9 @@ public class AkkamonNexus extends AbstractBehavior<AkkamonNexus.Command> {
private AkkamonNexus onTrainerRegistered(TrainerRegistered reply) {
// TODO test when registration fails?
- getContext().getLog().info("Adding {} to Live AkkamonSessions in Messaging Engine", reply.trainerId);
- messageEngine.registerTrainerSessionToScene(reply.trainerId, reply.session);
+ getContext().getLog().info("Adding {} to scene {} Live AkkamonSessions in Messaging Engine", reply.trainerId, reply.sceneId);
+ messageEngine.registerTrainerSessionToScene(reply.sceneId, reply.session);
+ reply.session.setTrainerId(reply.trainerId);
return this;
}
diff --git a/domain/src/main/java/akkamon/domain/HeartBeatQuery.java b/domain/src/main/java/akkamon/domain/HeartBeatQuery.java
new file mode 100644
index 0000000..4be99ec
--- /dev/null
+++ b/domain/src/main/java/akkamon/domain/HeartBeatQuery.java
@@ -0,0 +1,131 @@
+package akkamon.domain;
+
+import akka.actor.typed.ActorRef;
+import akka.actor.typed.Behavior;
+import akka.actor.typed.javadsl.*;
+
+import java.time.Duration;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.Map;
+import java.util.Set;
+
+public class HeartBeatQuery extends AbstractBehavior<HeartBeatQuery.Command> {
+
+ public interface Command {}
+
+ private static enum CollectionTimeout implements Command {
+ INSTANCE
+ }
+
+ static class WrappedRespondTrainerPosition implements Command {
+ final Trainer.RespondTrainerPosition response;
+
+ WrappedRespondTrainerPosition(
+ Trainer.RespondTrainerPosition response
+ ) {
+ this.response = response;
+ }
+ }
+
+ private static class TrainerOffline implements Command {
+ final String trainerId;
+
+ private TrainerOffline(String trainerId) {
+ this.trainerId = trainerId;
+ }
+ }
+
+ public static Behavior<Command> create(
+ Map<String, ActorRef<Trainer.Command>> trainerIdToActor,
+ long requestId,
+ String sceneId,
+ ActorRef<AkkamonNexus.Command> requester,
+ Duration timeout
+ ) {
+ return Behaviors.setup(
+ context ->
+ Behaviors.withTimers(
+ timers -> new HeartBeatQuery(
+ trainerIdToActor,
+ requestId,
+ sceneId,
+ requester,
+ timeout,
+ context,
+ timers
+ )
+ )
+ );
+ }
+
+ private final long requestId;
+ private final String sceneId;
+ private final ActorRef<AkkamonNexus.Command> requester;
+ private Map<String, AkkamonNexus.TrainerPositionReading> repliesSoFar = new HashMap<String, AkkamonNexus.TrainerPositionReading>();
+ private final Set<String> stillWaiting;
+
+ public HeartBeatQuery(
+ Map<String, ActorRef<Trainer.Command>> trainerIdToActor,
+ long requestId,
+ String sceneId,
+ ActorRef<AkkamonNexus.Command> requester,
+ Duration timeout,
+ ActorContext<Command> context,
+ TimerScheduler<Command> timers) {
+ super(context);
+ this.requestId = requestId;
+ this.sceneId = sceneId;
+ this.requester = requester;
+
+ timers.startSingleTimer(CollectionTimeout.INSTANCE, timeout);
+
+ ActorRef<Trainer.RespondTrainerPosition> respondTrainerPositionAdapter =
+ context.messageAdapter(Trainer.RespondTrainerPosition.class, WrappedRespondTrainerPosition::new);
+
+ for (Map.Entry<String, ActorRef<Trainer.Command>> entry : trainerIdToActor.entrySet()) {
+ context.watchWith(entry.getValue(), new TrainerOffline(entry.getKey()));
+ entry.getValue().tell(
+ new Trainer.ReadTrainerPosition(
+ 0L,
+ respondTrainerPositionAdapter
+ )
+ );
+ }
+ stillWaiting = new HashSet<>(trainerIdToActor.keySet());
+ }
+
+ @Override
+ public Receive<Command> createReceive() {
+ return newReceiveBuilder()
+ .onMessage(WrappedRespondTrainerPosition.class, this::onRespondTrainerPosition)
+ .build();
+ }
+
+ private Behavior<Command> onRespondTrainerPosition(WrappedRespondTrainerPosition r) {
+ AkkamonNexus.TrainerPositionReading trainerPositionRead =
+ r.response
+ .value
+ .map(optionalValue -> (AkkamonNexus.TrainerPositionReading) new AkkamonNexus.TrainerPosition(optionalValue))
+ .orElse(AkkamonNexus.TrainerPositionNotAvailable.INSTANCE);
+
+ String trainerId = r.response.trainerId;
+ repliesSoFar.put(trainerId, trainerPositionRead);
+ stillWaiting.remove(trainerId);
+
+ return respondWhenAllCollected();
+ }
+
+ private Behavior<Command> respondWhenAllCollected() {
+ if (stillWaiting.isEmpty()) {
+ requester.tell(new AkkamonNexus.RespondHeartBeatQuery(
+ requestId,
+ sceneId,
+ repliesSoFar));
+ return Behaviors.stopped();
+ } else {
+ return this;
+ }
+ }
+
+}
diff --git a/domain/src/main/java/akkamon/domain/SceneTrainerGroup.java b/domain/src/main/java/akkamon/domain/SceneTrainerGroup.java
index f01edc4..29778ec 100644
--- a/domain/src/main/java/akkamon/domain/SceneTrainerGroup.java
+++ b/domain/src/main/java/akkamon/domain/SceneTrainerGroup.java
@@ -7,6 +7,7 @@ import akka.actor.typed.javadsl.ActorContext;
import akka.actor.typed.javadsl.Behaviors;
import akka.actor.typed.javadsl.Receive;
+import java.time.Duration;
import java.util.HashMap;
import java.util.Map;
@@ -60,9 +61,28 @@ public class SceneTrainerGroup extends AbstractBehavior<SceneTrainerGroup.Comman
AkkamonNexus.RequestNewTilePos.class,
this::onNewTilePos
)
+ .onMessage(
+ AkkamonNexus.RequestHeartBeat.class,
+ this::onHeartBeat
+ )
.build();
}
+ private SceneTrainerGroup onHeartBeat(AkkamonNexus.RequestHeartBeat heartBeatRequest) {
+ Map<String, ActorRef<Trainer.Command>> trainerIdToActorCopy = new HashMap<>(this.trainerIdToActor);
+ getContext()
+ .spawnAnonymous(
+ HeartBeatQuery.create(
+ trainerIdToActorCopy,
+ heartBeatRequest.requestId,
+ sceneId,
+ heartBeatRequest.replyTo,
+ Duration.ofSeconds(3)
+ )
+ );
+ return this;
+ }
+
private SceneTrainerGroup onNewTilePos(AkkamonNexus.RequestNewTilePos newTilePosRequest) {
if (this.sceneId.equals(newTilePosRequest.sceneId)) {
ActorRef<Trainer.Command> trainerActor = trainerIdToActor.get(newTilePosRequest.trainerId);
@@ -142,6 +162,7 @@ public class SceneTrainerGroup extends AbstractBehavior<SceneTrainerGroup.Comman
// TODO add optional already registered?
registrationRequest.replyTo.tell(new AkkamonNexus.TrainerRegistered(
registrationRequest.trainerId,
+ sceneId,
registrationRequest.session
));
} else {
@@ -154,6 +175,7 @@ public class SceneTrainerGroup extends AbstractBehavior<SceneTrainerGroup.Comman
trainerIdToActor.put(registrationRequest.trainerId, trainerActor);
registrationRequest.replyTo.tell(new AkkamonNexus.TrainerRegistered(
registrationRequest.trainerId,
+ sceneId,
registrationRequest.session
));
}
diff --git a/domain/src/main/java/akkamon/domain/Trainer.java b/domain/src/main/java/akkamon/domain/Trainer.java
index b756a97..0a3b397 100644
--- a/domain/src/main/java/akkamon/domain/Trainer.java
+++ b/domain/src/main/java/akkamon/domain/Trainer.java
@@ -1,5 +1,6 @@
package akkamon.domain;
+import akka.actor.typed.ActorRef;
import akka.actor.typed.Behavior;
import akka.actor.typed.javadsl.AbstractBehavior;
import akka.actor.typed.javadsl.ActorContext;
@@ -12,6 +13,32 @@ public class Trainer extends AbstractBehavior<Trainer.Command> {
public interface Command { }
+ public static class ReadTrainerPosition implements Command {
+ final long requestId;
+ final ActorRef<RespondTrainerPosition> replyTo;
+
+ public ReadTrainerPosition(long requestId, ActorRef<RespondTrainerPosition> replyTo) {
+ this.requestId = requestId;
+ this.replyTo = replyTo;
+ }
+ }
+
+ public static final class RespondTrainerPosition {
+ final long requestId;
+ final String trainerId;
+ final Optional<TilePos> value;
+
+ public RespondTrainerPosition(
+ long requestId,
+ String trainerId,
+ Optional<TilePos> value
+ ) {
+ this.requestId = requestId;
+ this.trainerId = trainerId;
+ this.value = value;
+ }
+ }
+
public static Behavior<Command> create(String sceneId, String trainerId) {
return Behaviors.setup(context -> new Trainer(context, sceneId, trainerId));
}
@@ -31,6 +58,10 @@ public class Trainer extends AbstractBehavior<Trainer.Command> {
public Receive<Command> createReceive() {
return newReceiveBuilder()
.onMessage(
+ ReadTrainerPosition.class,
+ this::onReadTrainerPosition
+ )
+ .onMessage(
AkkamonNexus.RequestStartMoving.class,
this::onStartMoving
)
@@ -44,6 +75,15 @@ public class Trainer extends AbstractBehavior<Trainer.Command> {
.build();
}
+ private Trainer onReadTrainerPosition(ReadTrainerPosition readTrainerPositionRequest) {
+ readTrainerPositionRequest.replyTo.tell(new RespondTrainerPosition(
+ readTrainerPositionRequest.requestId,
+ trainerId,
+ lastValidTilePos
+ ));
+ return this;
+ }
+
private Trainer onNewTilePos(AkkamonNexus.RequestNewTilePos newTilePosRequest) {
getContext().getLog().info("Trainer {} has new {}.", trainerId, newTilePosRequest.tilePos);
lastValidTilePos = Optional.of(newTilePosRequest.tilePos);