diff options
| -rw-r--r-- | api/src/main/java/akkamon/api/EventSocket.java | 1 | ||||
| -rw-r--r-- | api/src/main/java/akkamon/api/MessagingEngine.java | 23 | ||||
| -rw-r--r-- | api/src/main/java/akkamon/api/models/EventType.java | 2 | ||||
| -rw-r--r-- | api/src/main/java/akkamon/api/models/HeartBeatEvent.java | 6 | ||||
| -rw-r--r-- | api/src/main/java/akkamon/api/models/SocketClosedEvent.java | 8 | ||||
| -rw-r--r-- | client/src/RemotePlayerEngine.ts | 76 | ||||
| -rw-r--r-- | client/src/RemotePlayerSprite.ts | 137 | ||||
| -rw-r--r-- | client/src/client.ts | 32 | ||||
| -rw-r--r-- | client/src/events.ts | 12 | ||||
| -rw-r--r-- | client/src/scene.ts | 22 | ||||
| -rw-r--r-- | client/src/socket.ts | 8 | ||||
| -rw-r--r-- | client/src/sprite.ts | 8 | ||||
| -rw-r--r-- | client/src/uiScene.ts | 17 | ||||
| -rw-r--r-- | domain/src/main/java/akkamon/domain/AkkamonMessageEngine.java | 4 | ||||
| -rw-r--r-- | domain/src/main/java/akkamon/domain/AkkamonNexus.java | 91 | ||||
| -rw-r--r-- | domain/src/main/java/akkamon/domain/HeartBeatQuery.java | 40 | ||||
| -rw-r--r-- | domain/src/main/java/akkamon/domain/SceneTrainerGroup.java | 52 | ||||
| -rw-r--r-- | domain/src/main/java/akkamon/domain/Trainer.java | 49 |
18 files changed, 477 insertions, 111 deletions
diff --git a/api/src/main/java/akkamon/api/EventSocket.java b/api/src/main/java/akkamon/api/EventSocket.java index f800ab3..77950ed 100644 --- a/api/src/main/java/akkamon/api/EventSocket.java +++ b/api/src/main/java/akkamon/api/EventSocket.java @@ -33,6 +33,7 @@ public class EventSocket extends WebSocketAdapter implements AkkamonSession { super.onWebSocketClose(statusCode, reason); System.out.println("Socket Closed: [" + statusCode + "] " + reason); closureLatch.countDown(); + App.messagingEngine.trainerDisconnected(this); } @Override diff --git a/api/src/main/java/akkamon/api/MessagingEngine.java b/api/src/main/java/akkamon/api/MessagingEngine.java index dc22752..3d3cd96 100644 --- a/api/src/main/java/akkamon/api/MessagingEngine.java +++ b/api/src/main/java/akkamon/api/MessagingEngine.java @@ -42,13 +42,13 @@ public class MessagingEngine implements AkkamonMessageEngine { @Override public void broadCastHeartBeatToScene(String sceneId, - Map<String, AkkamonNexus.TrainerPositionReading> trainerPositions) { + Map<String, AkkamonNexus.MovementQueueReading> trainerPositions) { Set<AkkamonSession> sceneSessions = sceneIdToAkkamonSessions.get(sceneId); System.out.println(sceneSessions); System.out.println(sceneIdToAkkamonSessions.keySet()); if (sceneSessions != null) { for (AkkamonSession session : sceneSessions) { - Map<String, AkkamonNexus.TrainerPositionReading> withoutSelf = new HashMap<>(trainerPositions); + Map<String, AkkamonNexus.MovementQueueReading> withoutSelf = new HashMap<>(trainerPositions); withoutSelf.remove(session.getTrainerId()); HeartBeatEvent heartBeat = new HeartBeatEvent( withoutSelf @@ -82,7 +82,23 @@ public class MessagingEngine implements AkkamonMessageEngine { @Override public void removeTrainerSessionFromScene(String sceneId, AkkamonSession session) { + this.sceneIdToAkkamonSessions.get(sceneId).remove(session); + } + + @Override + public void trainerDisconnected(AkkamonSession session) { + String sceneId = null; + for (Map.Entry<String, Set<AkkamonSession>> entry : this.sceneIdToAkkamonSessions.entrySet()) { + if (entry.getValue().contains(session)) sceneId = entry.getKey(); + } + system.tell(new AkkamonNexus.RequestTrainerOffline( + UUID.randomUUID().getMostSignificantBits() & Long.MAX_VALUE, + session.getTrainerId(), + sceneId, + session, + system + )); } void incoming(AkkamonSession session, String message) { @@ -123,7 +139,7 @@ public class MessagingEngine implements AkkamonMessageEngine { ); break; case TRAINER_REGISTRATION: - String trainerId = String.valueOf(sceneIdToAkkamonSessions.size()); + String trainerId = String.valueOf(sceneIdToAkkamonSessions.get(sceneId) == null ? 0 : sceneIdToAkkamonSessions.get(sceneId).size() + 1); system.tell(new AkkamonNexus.RequestTrainerRegistration( trainerId, sceneId, @@ -141,5 +157,4 @@ public class MessagingEngine implements AkkamonMessageEngine { private void updatePositions() { } - } diff --git a/api/src/main/java/akkamon/api/models/EventType.java b/api/src/main/java/akkamon/api/models/EventType.java index be3b2b3..cb10cee 100644 --- a/api/src/main/java/akkamon/api/models/EventType.java +++ b/api/src/main/java/akkamon/api/models/EventType.java @@ -18,5 +18,7 @@ public enum EventType { @SerializedName("StopMoving") STOP_MOVING, + @SerializedName("SocketClosed") + SOCKET_CLOSED, } diff --git a/api/src/main/java/akkamon/api/models/HeartBeatEvent.java b/api/src/main/java/akkamon/api/models/HeartBeatEvent.java index 8db514b..b795feb 100644 --- a/api/src/main/java/akkamon/api/models/HeartBeatEvent.java +++ b/api/src/main/java/akkamon/api/models/HeartBeatEvent.java @@ -5,10 +5,10 @@ import akkamon.domain.AkkamonNexus; import java.util.Map; public class HeartBeatEvent extends Event { - public Map<String, AkkamonNexus.TrainerPositionReading> remoteTrainerPositions; + public Map<String, AkkamonNexus.MovementQueueReading> remoteMovementQueues; - public HeartBeatEvent(Map<String, AkkamonNexus.TrainerPositionReading> remoteTrainerPositions) { + public HeartBeatEvent(Map<String, AkkamonNexus.MovementQueueReading> remoteMovementQueues) { this.type = EventType.HEART_BEAT; - this.remoteTrainerPositions = remoteTrainerPositions; + this.remoteMovementQueues = remoteMovementQueues; } } diff --git a/api/src/main/java/akkamon/api/models/SocketClosedEvent.java b/api/src/main/java/akkamon/api/models/SocketClosedEvent.java new file mode 100644 index 0000000..1f8a2a0 --- /dev/null +++ b/api/src/main/java/akkamon/api/models/SocketClosedEvent.java @@ -0,0 +1,8 @@ +package akkamon.api.models; + +public class SocketClosedEvent extends Event { + + public SocketClosedEvent() { + this.type = EventType.SOCKET_CLOSED; + } +} diff --git a/client/src/RemotePlayerEngine.ts b/client/src/RemotePlayerEngine.ts index f19ebe0..5d85f24 100644 --- a/client/src/RemotePlayerEngine.ts +++ b/client/src/RemotePlayerEngine.ts @@ -1,14 +1,84 @@ import Phaser from 'phaser'; +import type AkkamonStartScene from './scene'; import { akkamonClient } from './app'; +import type { Direction } from './Direction'; +import { + Queue, + RemotePlayerSprite +} from './RemotePlayerSprite'; +import type { + RemoteMovementQueues +} from './events'; export class RemotePlayerEngine { - private scene: Phaser.Scene + private scene: AkkamonStartScene; - constructor(scene: Phaser.Scene) { + private trainerIdToRemotePlayerSprite: Map<string, RemotePlayerSprite> = new Map(); + + constructor(scene: AkkamonStartScene) { this.scene = scene; } - update() { + push(remoteMovementQueues: RemoteMovementQueues) { + this.updateMembers(remoteMovementQueues); + this.pushMovesToSprites(remoteMovementQueues); + } + + pushMovesToSprites(remoteMovementQueues: RemoteMovementQueues) { + this.trainerIdToRemotePlayerSprite.forEach((remoteSprite: RemotePlayerSprite, key: string) => { + remoteSprite.push(remoteMovementQueues[key].value); + }); + } + + update(delta: number): void { + this.trainerIdToRemotePlayerSprite.forEach((remoteSprite: RemotePlayerSprite, key: string) => { + if (remoteSprite.isMoving()) { + console.log("remote player currently walking"); + remoteSprite.updatePixelPosition(delta); + } else if (remoteSprite.hasMovesLeft()) { + console.log("remote player starts moving"); + remoteSprite.startMoving(); + } + }); + } + + updateMembers(newRemoteMovementQueues: RemoteMovementQueues) { + const traineridToQueueValue = newRemoteMovementQueues; + + Object.keys(newRemoteMovementQueues).forEach((key: string) => { + + var moveQueue = traineridToQueueValue[key].value; + if (moveQueue !== undefined) { + + // console.log("-> key: " + key + " has position " + newTilePos.x + ", " + newTilePos.y); + + if (!this.trainerIdToRemotePlayerSprite.has(key)) { + // console.log("adding remote player sprite for " + key); + this.trainerIdToRemotePlayerSprite.set(key, + new RemotePlayerSprite({ + scene: this.scene, + tilePos: new Phaser.Math.Vector2(this.scene.spawnPointTilePos!), + texture: this.scene.textures.get("atlas"), + frame: "misa-front", + moveQueue: new Queue(moveQueue) + } + )); + } else { + // console.log("key: " + key + " already had a sprite!"); + } + } + + }); + + this.trainerIdToRemotePlayerSprite.forEach((value: RemotePlayerSprite, key: string) => { + if (!(key in newRemoteMovementQueues)) { + // console.log("removing remote player sprite for " + key); + this.trainerIdToRemotePlayerSprite.get(key)!.destroy(); + this.trainerIdToRemotePlayerSprite.delete(key); + } else { + // console.log("Player " + key + " was not removed!"); + } + }); } } diff --git a/client/src/RemotePlayerSprite.ts b/client/src/RemotePlayerSprite.ts index 8485803..6103d09 100644 --- a/client/src/RemotePlayerSprite.ts +++ b/client/src/RemotePlayerSprite.ts @@ -1 +1,138 @@ import Phaser from 'phaser'; +import AkkamonStartScene from './scene'; +import { PlayerSprite } from './sprite'; +import { GridPhysics } from './GridPhysics'; +import { Direction } from './Direction'; + +export class Queue<T> { + private _data = new Array(); + + constructor(data?: Array<T>) { + if (data !== undefined) { + this._data = data; + } + } + + push(element: T): void { + this._data.push(element); + } + + pushArray(arr: T[]): void { + for (var element of arr) { + this._data.push(element); + } + } + + pop(): T | undefined { + return this._data.shift(); + } + + isEmpty(): boolean { + return this._data.length == 0; + } + + peek() { + return this._data[0]; + } +} + +type RemotePlayerSpriteConfig = { + scene: Phaser.Scene, + tilePos: Phaser.Math.Vector2, + texture: Phaser.Textures.Texture | string, + frame?: string, + moveQueue: Queue<Direction> +} + +export class RemotePlayerSprite extends PlayerSprite { + + private lastTilePos?: Phaser.Math.Vector2; + private moveQueue: Queue<Direction> = new Queue(); + + private movementDirection: Direction = Direction.NONE; + + private speedPixelsPerSecond: number = AkkamonStartScene.TILE_SIZE * 4; + + private tileSizePixelsWalked: number = 0; + + constructor(config: RemotePlayerSpriteConfig) { + super(config); + } + + push(moveQueue: Array<Direction>): void { + for (var direction of moveQueue) { + if (direction !== Direction.NONE) { + this.moveQueue.push(direction); + } + } + // console.log(this.moveQueue); + } + + updatePixelPosition(delta: number): void { + const pixelsToWalkThisUpdate = this.getPixelsToWalk(delta); + + if (!this.willCrossTileBorderThisUpdate(pixelsToWalkThisUpdate)) { + this.move(pixelsToWalkThisUpdate); + } else if (this.shouldContinueMoving()) { + this.move(pixelsToWalkThisUpdate); + } else { + this.move(AkkamonStartScene.TILE_SIZE - this.tileSizePixelsWalked); + this.stopMoving(); + } + } + + shouldContinueMoving(): boolean { + if (this.moveQueue.peek() == this.movementDirection) { + console.log("continueing to move."); + this.moveQueue.pop(); + return true; + } + return false; + } + + willCrossTileBorderThisUpdate(pixelsToWalkThisUpdate: number): boolean { + return (this.tileSizePixelsWalked + pixelsToWalkThisUpdate) >= AkkamonStartScene.TILE_SIZE; + } + + move(pixelsToMove: number): void { + this.tileSizePixelsWalked += pixelsToMove; + this.tileSizePixelsWalked %= AkkamonStartScene.TILE_SIZE; + + const directionVec = GridPhysics.movementDirectionVectors[this.movementDirection]!.clone(); + + const moveVec = directionVec.multiply( + new Phaser.Math.Vector2(pixelsToMove) + ); + + const newPosition = this.getPosition().add(moveVec); + this.newPosition(newPosition); + } + + getPixelsToWalk(delta: number): number { + const deltaInSeconds = delta / 1000; + return this.speedPixelsPerSecond * deltaInSeconds; + } + + hasMovesLeft(): boolean { + return !this.moveQueue.isEmpty(); + } + + isMoving(): boolean { + return this.movementDirection !== Direction.NONE; + } + + startMoving(): void { + if (!this.moveQueue.isEmpty()) { + this.movementDirection = this.moveQueue.pop()!; + this.startAnimation(this.movementDirection); + // console.log("remote player now walking in direction: " + this.movementDirection); + } else { + // console.log("moveQueue empty!"); + } + } + + stopMoving(): void { + this.stopAnimation(this.movementDirection); + this.movementDirection = Direction.NONE; + } +} diff --git a/client/src/client.ts b/client/src/client.ts index 77d6676..f166c88 100644 --- a/client/src/client.ts +++ b/client/src/client.ts @@ -1,10 +1,13 @@ import type AkkamonSession from './session'; -import type { GameState } from './GameState'; import { Socket } from './socket'; +import type { GridPhysics } from './GridPhysics'; +import type { RemotePlayerEngine } from './RemotePlayerEngine'; + import { EventType, HeartBeatReplyEvent, - AkkamonEvent + IncomingEvent, + AkkamonEvent, } from './events'; @@ -12,7 +15,8 @@ export class Client { private session: AkkamonSession; - private akkamonState?: GameState; + private gridPhysics?: GridPhysics; + private remotePlayerEngine?: RemotePlayerEngine; constructor( url: string @@ -20,18 +24,21 @@ export class Client this.session = new Socket(url, this); } - getMutableState(): GameState { - return this.akkamonState!; - } - in(eventString: string) { - let event: AkkamonEvent = JSON.parse(eventString); - console.log("-> client is handling incoming event:"); - console.log(event); + let event: IncomingEvent = JSON.parse(eventString); switch (event.type) { case EventType.HEART_BEAT: + if (this.remotePlayerEngine !== undefined) { + this.remotePlayerEngine.push(event.remoteMovementQueues!); + } this.send(new HeartBeatReplyEvent()); break; + case EventType.PLAYERS_NEARBY: + this.ui.setPlayersNearby(); + break; + default: + console.log("ignored incoming event, doesn't match EventType interface."); + break; } } @@ -42,4 +49,9 @@ export class Client this.session.send(JSON.stringify(event)); } } + + setRemotePlayerEngine(engine: RemotePlayerEngine) { + this.remotePlayerEngine = engine; + } + } diff --git a/client/src/events.ts b/client/src/events.ts index cc001f7..1954fd7 100644 --- a/client/src/events.ts +++ b/client/src/events.ts @@ -1,6 +1,4 @@ import Phaser from 'phaser'; -import type { Player } from './player'; -import type { GameState } from './GameState'; import type { Direction } from './Direction'; export enum EventType { @@ -15,6 +13,16 @@ export interface AkkamonEvent { type: EventType } +export type TrainerPosition = { x: number, y: number } + +export type RemoteMovementQueues = { + [trainerId: string]: { value: Array<Direction> } +} + +export interface IncomingEvent extends AkkamonEvent { + remoteMovementQueues?: RemoteMovementQueues +} + export class PlayerRegistrationEvent implements AkkamonEvent { public type: EventType = EventType.PLAYER_REGISTRATION; diff --git a/client/src/scene.ts b/client/src/scene.ts index 52edbdd..b6924d4 100644 --- a/client/src/scene.ts +++ b/client/src/scene.ts @@ -1,6 +1,4 @@ import Phaser from 'phaser'; -import { akkamonClient } from './app'; -import { Player } from './player'; import { PlayerSprite } from './sprite'; import { GridControls } from './GridControls'; @@ -9,10 +7,7 @@ import { Direction } from './Direction'; import { RemotePlayerEngine } from './RemotePlayerEngine'; - -type RemotePlayerStates = { - [name: string]: Player -} +import { akkamonClient } from './app'; export default class AkkamonStartScene extends Phaser.Scene { @@ -24,6 +19,11 @@ export default class AkkamonStartScene extends Phaser.Scene private remotePlayerEngine?: RemotePlayerEngine + public spawnPointTilePos?: { + x: number, + y: number + }; + directionToAnimation: { [key in Direction]: string } = { @@ -34,7 +34,6 @@ export default class AkkamonStartScene extends Phaser.Scene [Direction.NONE]: "misa-front-walk" } - remotePlayerSprites: {[name: string]: PlayerSprite} = {}; spawnPoint: Phaser.Types.Tilemaps.TiledObject | undefined; @@ -88,19 +87,15 @@ export default class AkkamonStartScene extends Phaser.Scene Math.floor(this.spawnPoint.x! / AkkamonStartScene.TILE_SIZE), Math.floor(this.spawnPoint.y! / AkkamonStartScene.TILE_SIZE), ); + this.spawnPointTilePos = tilePos; let player = new PlayerSprite({ scene: this, tilePos: tilePos, texture: this.textures.get("atlas"), frame: "misa-front", - player: new Player({ - trainerId: 'ash', - position: tilePos - })// this.akkamonState.getLocalMutablePlayerState(), }); - this.add.existing(player); this.gridPhysics = new GridPhysics(player, map); this.gridControls = new GridControls( this.input, @@ -108,6 +103,7 @@ export default class AkkamonStartScene extends Phaser.Scene ); this.remotePlayerEngine = new RemotePlayerEngine(this); + akkamonClient.setRemotePlayerEngine(this.remotePlayerEngine); this.createPlayerAnimation(Direction.LEFT, 0, 3); this.createPlayerAnimation(Direction.RIGHT, 0, 3); @@ -120,12 +116,14 @@ export default class AkkamonStartScene extends Phaser.Scene camera.roundPixels = true; camera.setBounds(0, 0, map.widthInPixels, map.heightInPixels); + this.scene.launch('AkkamonUI'); } update(time: number, delta: number) { this.gridControls!.update(); this.gridPhysics!.update(delta); + this.remotePlayerEngine!.update(delta); } private createPlayerAnimation(direction: Direction, start: number, end: number) { diff --git a/client/src/socket.ts b/client/src/socket.ts index 8e2b9ef..4e83f89 100644 --- a/client/src/socket.ts +++ b/client/src/socket.ts @@ -14,17 +14,11 @@ export class Socket extends WebSocket implements AkkamonSession super(url); this.onopen = function echo(this: WebSocket, ev: Event) { - console.log("opening socket"); - console.log("this is the websocket"); - console.log(this); - console.log("logging in the session to the server"); + console.log("Sending PlayerRegistrationEvent."); client.send(new PlayerRegistrationEvent()); } this.onmessage = function incomingMessage(this: WebSocket, ev: MessageEvent) { - //console.log("received message from the server!"); - console.log("-> " + ev.data); - // console.log("calling client.in:"); client.in(ev.data); } } diff --git a/client/src/sprite.ts b/client/src/sprite.ts index 960d374..948d8d9 100644 --- a/client/src/sprite.ts +++ b/client/src/sprite.ts @@ -1,6 +1,5 @@ import Phaser from 'phaser'; import AkkamonStartScene from './scene'; -import type { Player } from './player'; import type { Direction } from './Direction'; type PlayerSpriteConfig = { @@ -8,16 +7,13 @@ type PlayerSpriteConfig = { tilePos: Phaser.Math.Vector2, texture: Phaser.Textures.Texture | string, frame?: string, - player: Player, } interface AkkamonPlayerSprite extends Phaser.GameObjects.Sprite { - player: Player } export class PlayerSprite extends Phaser.GameObjects.Sprite implements AkkamonPlayerSprite { - player: Player; tilePos: Phaser.Math.Vector2; constructor(config: PlayerSpriteConfig) { @@ -30,10 +26,12 @@ export class PlayerSprite extends Phaser.GameObjects.Sprite implements AkkamonPl config.texture, config.frame); - this.player = config.player; this.tilePos = new Phaser.Math.Vector2(config.tilePos.x, config.tilePos.y); this.setOrigin(0.5, 1); + + // add to scene! + config.scene.add.existing(this); } getPosition(): Phaser.Math.Vector2 { diff --git a/client/src/uiScene.ts b/client/src/uiScene.ts new file mode 100644 index 0000000..8b92954 --- /dev/null +++ b/client/src/uiScene.ts @@ -0,0 +1,17 @@ + +export class AkkamonUI extends Phaser.Scene +{ + constructor () + { + super('AkkamonUI'); + } + + preload() { + + } + + create () { + + } + +} diff --git a/domain/src/main/java/akkamon/domain/AkkamonMessageEngine.java b/domain/src/main/java/akkamon/domain/AkkamonMessageEngine.java index eb998ba..05efc32 100644 --- a/domain/src/main/java/akkamon/domain/AkkamonMessageEngine.java +++ b/domain/src/main/java/akkamon/domain/AkkamonMessageEngine.java @@ -4,9 +4,11 @@ import java.util.Map; public interface AkkamonMessageEngine { // broadcasts position info to WebSocket Clients - void broadCastHeartBeatToScene(String sceneId, Map<String, AkkamonNexus.TrainerPositionReading> trainerPositions); + void broadCastHeartBeatToScene(String sceneId, Map<String, AkkamonNexus.MovementQueueReading> trainerPositions); void registerTrainerSessionToScene(String sceneId, AkkamonSession session); void removeTrainerSessionFromScene(String sceneId, AkkamonSession session); + + void trainerDisconnected(AkkamonSession session); } diff --git a/domain/src/main/java/akkamon/domain/AkkamonNexus.java b/domain/src/main/java/akkamon/domain/AkkamonNexus.java index 9fd2763..a55e728 100644 --- a/domain/src/main/java/akkamon/domain/AkkamonNexus.java +++ b/domain/src/main/java/akkamon/domain/AkkamonNexus.java @@ -10,6 +10,7 @@ import akka.actor.typed.javadsl.Receive; import java.util.HashMap; import java.util.Locale; import java.util.Map; +import java.util.Queue; public class AkkamonNexus extends AbstractBehavior<AkkamonNexus.Command> { @@ -126,57 +127,71 @@ public class AkkamonNexus extends AbstractBehavior<AkkamonNexus.Command> { } } + public static class RequestTrainerOffline + implements Command, SceneTrainerGroup.Command, Trainer.Command { + public long requestId; + public String trainerId; + public String sceneId; + public AkkamonSession session; + public ActorRef<AkkamonNexus.Command> replyTo; + + public RequestTrainerOffline(long requestId, String trainerId, String sceneId, AkkamonSession session, ActorRef<Command> replyTo) { + this.requestId = requestId; + this.trainerId = trainerId; + this.sceneId = sceneId; + this.session = session; + this.replyTo = replyTo; + } + } + + public static class RespondTrainerOffline + implements Command { + public long requestId; + public String sceneId; + public AkkamonSession session; + + public RespondTrainerOffline(long requestId, String sceneId, AkkamonSession session) { + this.requestId = requestId; + this.sceneId = sceneId; + this.session = session; + } + } + public static class RespondHeartBeatQuery implements Command { public final long requestId; public final String sceneId; - public final Map<String, TrainerPositionReading> trainerPositions; + public final Map<String, MovementQueueReading> trainerMovementQueues; public RespondHeartBeatQuery( long requestId, String sceneId, - Map<String, TrainerPositionReading> trainerPositions) { + Map<String, MovementQueueReading> trainerPositions) { this.requestId = requestId; this.sceneId = sceneId; - this.trainerPositions = trainerPositions; + this.trainerMovementQueues = trainerPositions; } } - public interface TrainerPositionReading { } + public interface MovementQueueReading { } - public static class TrainerPosition implements TrainerPositionReading { - public final TilePos value; + public static class MovementQueue implements MovementQueueReading { + public final Queue<Direction> value; - public TrainerPosition(TilePos value) { + public MovementQueue(Queue<Direction> value) { this.value = value; } - - @Override - public boolean equals(Object o) { - if (this == o) return true; - if (o == null || getClass() != o.getClass()) return false; - - TrainerPosition other = (TrainerPosition) o; - - return this.value.x == other.value.x && this.value.y == other.value.y; - } - - @Override - public String toString() { - return "TrainerPosition={x: " + value.x + ", y: " + value.y + "}"; - } - } - public enum TrainerPositionNotAvailable implements TrainerPositionReading { + public enum MovementQueueEmpty implements MovementQueueReading { INSTANCE } - public enum TrainerOffline implements TrainerPositionReading { + public enum TrainerOffline implements MovementQueueReading { INSTANCE } - public enum TrainerTimedOut implements TrainerPositionReading { + public enum TrainerTimedOut implements MovementQueueReading { INSTANCE } @@ -198,6 +213,8 @@ public class AkkamonNexus extends AbstractBehavior<AkkamonNexus.Command> { return newReceiveBuilder() .onMessage(RequestTrainerRegistration.class, this::onTrainerRegistration) .onMessage(TrainerRegistered.class, this::onTrainerRegistered) + .onMessage(RequestTrainerOffline.class, this::onTrainerOfflineRequest) + .onMessage(RespondTrainerOffline.class, this::onTrainerOffline) .onMessage(RequestHeartBeat.class, this::onHeartBeat) .onMessage(RespondHeartBeatQuery.class, this::onHeartBeatQueryResponse) .onMessage(RequestStartMoving.class, this::onStartMoving) @@ -206,19 +223,37 @@ public class AkkamonNexus extends AbstractBehavior<AkkamonNexus.Command> { .build(); } + private AkkamonNexus onTrainerOffline(RespondTrainerOffline trainerOfflineMsg) { + getContext().getLog().info("Removing {} from akkamon sessions!", trainerOfflineMsg.session.getTrainerId()); + messageEngine.removeTrainerSessionFromScene(trainerOfflineMsg.sceneId, trainerOfflineMsg.session); + return this; + } + + private AkkamonNexus onTrainerOfflineRequest(RequestTrainerOffline trainerOfflineRequest) { + ActorRef<SceneTrainerGroup.Command> sceneTrainerGroup = sceneIdToActor.get( + trainerOfflineRequest.sceneId + ); + if (sceneTrainerGroup != null) { + sceneTrainerGroup.tell(trainerOfflineRequest); + } else { + getContext().getLog().info("Ignoring trainerOffline request in scene {}, it isn't mapped to a sceneTrainerActor."); + } + return this; + } + private AkkamonNexus onHeartBeatQueryResponse(RespondHeartBeatQuery response) { // Turn on for logging StringBuilder positions = new StringBuilder(); positions.append("\n" + response.sceneId.toUpperCase(Locale.ROOT) + "\n"); - for (Map.Entry<String, TrainerPositionReading> entry : response.trainerPositions.entrySet()) { + for (Map.Entry<String, MovementQueueReading> entry : response.trainerMovementQueues.entrySet()) { positions.append(entry.getKey() + ": " +entry.getValue()); positions.append("\n"); } getContext().getLog().info(String.valueOf(positions)); - messageEngine.broadCastHeartBeatToScene(response.sceneId, response.trainerPositions); + messageEngine.broadCastHeartBeatToScene(response.sceneId, response.trainerMovementQueues); return this; } diff --git a/domain/src/main/java/akkamon/domain/HeartBeatQuery.java b/domain/src/main/java/akkamon/domain/HeartBeatQuery.java index 4be99ec..5e88ff5 100644 --- a/domain/src/main/java/akkamon/domain/HeartBeatQuery.java +++ b/domain/src/main/java/akkamon/domain/HeartBeatQuery.java @@ -5,10 +5,7 @@ import akka.actor.typed.Behavior; import akka.actor.typed.javadsl.*; import java.time.Duration; -import java.util.HashMap; -import java.util.HashSet; -import java.util.Map; -import java.util.Set; +import java.util.*; public class HeartBeatQuery extends AbstractBehavior<HeartBeatQuery.Command> { @@ -18,11 +15,11 @@ public class HeartBeatQuery extends AbstractBehavior<HeartBeatQuery.Command> { INSTANCE } - static class WrappedRespondTrainerPosition implements Command { - final Trainer.RespondTrainerPosition response; + static class WrappedRespondMovementQueue implements Command { + final Trainer.RespondMovementQueue response; - WrappedRespondTrainerPosition( - Trainer.RespondTrainerPosition response + WrappedRespondMovementQueue( + Trainer.RespondMovementQueue response ) { this.response = response; } @@ -62,7 +59,7 @@ public class HeartBeatQuery extends AbstractBehavior<HeartBeatQuery.Command> { private final long requestId; private final String sceneId; private final ActorRef<AkkamonNexus.Command> requester; - private Map<String, AkkamonNexus.TrainerPositionReading> repliesSoFar = new HashMap<String, AkkamonNexus.TrainerPositionReading>(); + private Map<String, AkkamonNexus.MovementQueueReading> repliesSoFar = new HashMap<String, AkkamonNexus.MovementQueueReading>(); private final Set<String> stillWaiting; public HeartBeatQuery( @@ -80,13 +77,13 @@ public class HeartBeatQuery extends AbstractBehavior<HeartBeatQuery.Command> { timers.startSingleTimer(CollectionTimeout.INSTANCE, timeout); - ActorRef<Trainer.RespondTrainerPosition> respondTrainerPositionAdapter = - context.messageAdapter(Trainer.RespondTrainerPosition.class, WrappedRespondTrainerPosition::new); + ActorRef<Trainer.RespondMovementQueue> respondTrainerPositionAdapter = + context.messageAdapter(Trainer.RespondMovementQueue.class, WrappedRespondMovementQueue::new); for (Map.Entry<String, ActorRef<Trainer.Command>> entry : trainerIdToActor.entrySet()) { context.watchWith(entry.getValue(), new TrainerOffline(entry.getKey())); entry.getValue().tell( - new Trainer.ReadTrainerPosition( + new Trainer.ReadMovementQueue( 0L, respondTrainerPositionAdapter ) @@ -98,19 +95,22 @@ public class HeartBeatQuery extends AbstractBehavior<HeartBeatQuery.Command> { @Override public Receive<Command> createReceive() { return newReceiveBuilder() - .onMessage(WrappedRespondTrainerPosition.class, this::onRespondTrainerPosition) + .onMessage(WrappedRespondMovementQueue.class, this::onRespondMovementQueue) .build(); } - private Behavior<Command> onRespondTrainerPosition(WrappedRespondTrainerPosition r) { - AkkamonNexus.TrainerPositionReading trainerPositionRead = - r.response - .value - .map(optionalValue -> (AkkamonNexus.TrainerPositionReading) new AkkamonNexus.TrainerPosition(optionalValue)) - .orElse(AkkamonNexus.TrainerPositionNotAvailable.INSTANCE); + private Behavior<Command> onRespondMovementQueue(WrappedRespondMovementQueue r) { + AkkamonNexus.MovementQueueReading movementQueueRead = null; + if (r.response.value.size() != 0) { + movementQueueRead = new AkkamonNexus.MovementQueue(r.response.value); + } else { + Queue<Direction> queue = new LinkedList<>(); + queue.add(Direction.NONE); + movementQueueRead = new AkkamonNexus.MovementQueue(queue); + } String trainerId = r.response.trainerId; - repliesSoFar.put(trainerId, trainerPositionRead); + repliesSoFar.put(trainerId, movementQueueRead); stillWaiting.remove(trainerId); return respondWhenAllCollected(); diff --git a/domain/src/main/java/akkamon/domain/SceneTrainerGroup.java b/domain/src/main/java/akkamon/domain/SceneTrainerGroup.java index 29778ec..e8ace82 100644 --- a/domain/src/main/java/akkamon/domain/SceneTrainerGroup.java +++ b/domain/src/main/java/akkamon/domain/SceneTrainerGroup.java @@ -15,15 +15,18 @@ public class SceneTrainerGroup extends AbstractBehavior<SceneTrainerGroup.Comman public interface Command { } - private class TrainerOffline implements Command { + public static class TrainerOffline + implements Command, AkkamonNexus.Command { public ActorRef<Trainer.Command> trainer; public String sceneId; public String trainerId; + public ActorRef<AkkamonNexus.Command> replyTo; - public TrainerOffline(ActorRef<Trainer.Command> trainerActor, String sceneId, String trainerId) { + public TrainerOffline(ActorRef<Trainer.Command> trainerActor, String sceneId, String trainerId, ActorRef<AkkamonNexus.Command> replyTo) { this.trainer = trainerActor; this.sceneId = sceneId; this.trainerId = trainerId; + this.replyTo = replyTo; } } @@ -50,6 +53,14 @@ public class SceneTrainerGroup extends AbstractBehavior<SceneTrainerGroup.Comman this::onTrainerRegistration ) .onMessage( + AkkamonNexus.RequestTrainerOffline.class, + this::onTrainerOfflineRequest + ) + .onMessage( + TrainerOffline.class, + this::onWatchedTrainerOffline + ) + .onMessage( AkkamonNexus.RequestStartMoving.class, this::onStartMoving ) @@ -68,6 +79,41 @@ public class SceneTrainerGroup extends AbstractBehavior<SceneTrainerGroup.Comman .build(); } + private SceneTrainerGroup onWatchedTrainerOffline(TrainerOffline trainerOfflineMsg) { + trainerOfflineMsg.replyTo.tell(trainerOfflineMsg); + trainerIdToActor.remove(trainerOfflineMsg.trainerId); + return this; + } + + private SceneTrainerGroup onTrainerOfflineRequest(AkkamonNexus.RequestTrainerOffline trainerOfflineRequest) { + if (this.sceneId.equals(trainerOfflineRequest.sceneId)) { + ActorRef<Trainer.Command> trainerActor = trainerIdToActor.get(trainerOfflineRequest.trainerId); + if (trainerActor != null) { + trainerActor.tell(trainerOfflineRequest); + trainerOfflineRequest.replyTo.tell(new AkkamonNexus.RespondTrainerOffline( + trainerOfflineRequest.requestId, + trainerOfflineRequest.sceneId, + trainerOfflineRequest.session + )); + } else { + getContext() + .getLog() + .warn( + "Ignoring trainerOffline for trainerId {}. There is no actor mapped to it.", + trainerOfflineRequest.trainerId + ); + } + } else { + getContext() + .getLog() + .warn( + "Ignoring trainerOffline for {}. This actor is responsible for {}.", + trainerOfflineRequest.sceneId, + this.sceneId); + } + return this; + } + private SceneTrainerGroup onHeartBeat(AkkamonNexus.RequestHeartBeat heartBeatRequest) { Map<String, ActorRef<Trainer.Command>> trainerIdToActorCopy = new HashMap<>(this.trainerIdToActor); getContext() @@ -171,7 +217,7 @@ public class SceneTrainerGroup extends AbstractBehavior<SceneTrainerGroup.Comman getContext() .spawn(Trainer.create(sceneId, registrationRequest.trainerId), "trainer-" + registrationRequest.trainerId); getContext() - .watchWith(trainerActor, new SceneTrainerGroup.TrainerOffline(trainerActor, sceneId, registrationRequest.trainerId)); + .watchWith(trainerActor, new SceneTrainerGroup.TrainerOffline(trainerActor, sceneId, registrationRequest.trainerId, registrationRequest.replyTo)); trainerIdToActor.put(registrationRequest.trainerId, trainerActor); registrationRequest.replyTo.tell(new AkkamonNexus.TrainerRegistered( registrationRequest.trainerId, diff --git a/domain/src/main/java/akkamon/domain/Trainer.java b/domain/src/main/java/akkamon/domain/Trainer.java index 0a3b397..710f844 100644 --- a/domain/src/main/java/akkamon/domain/Trainer.java +++ b/domain/src/main/java/akkamon/domain/Trainer.java @@ -7,31 +7,33 @@ import akka.actor.typed.javadsl.ActorContext; import akka.actor.typed.javadsl.Behaviors; import akka.actor.typed.javadsl.Receive; +import java.util.LinkedList; import java.util.Optional; +import java.util.Queue; public class Trainer extends AbstractBehavior<Trainer.Command> { public interface Command { } - public static class ReadTrainerPosition implements Command { + public static class ReadMovementQueue implements Command { final long requestId; - final ActorRef<RespondTrainerPosition> replyTo; + final ActorRef<RespondMovementQueue> replyTo; - public ReadTrainerPosition(long requestId, ActorRef<RespondTrainerPosition> replyTo) { + public ReadMovementQueue(long requestId, ActorRef<RespondMovementQueue> replyTo) { this.requestId = requestId; this.replyTo = replyTo; } } - public static final class RespondTrainerPosition { + public static final class RespondMovementQueue { final long requestId; final String trainerId; - final Optional<TilePos> value; + final Queue<Direction> value; - public RespondTrainerPosition( + public RespondMovementQueue( long requestId, String trainerId, - Optional<TilePos> value + Queue<Direction> value ) { this.requestId = requestId; this.trainerId = trainerId; @@ -46,6 +48,10 @@ public class Trainer extends AbstractBehavior<Trainer.Command> { private String sceneId; private String trainerId; + private Queue<Direction> movementQueue = new LinkedList<>(); + + private Direction movementDirection = Direction.NONE; + private Optional<TilePos> lastValidTilePos = Optional.empty(); public Trainer(ActorContext<Command> context, String sceneId, String trainerId) { @@ -58,8 +64,12 @@ public class Trainer extends AbstractBehavior<Trainer.Command> { public Receive<Command> createReceive() { return newReceiveBuilder() .onMessage( - ReadTrainerPosition.class, - this::onReadTrainerPosition + ReadMovementQueue.class, + this::onReadMovementQueue + ) + .onMessage( + AkkamonNexus.RequestTrainerOffline.class, + this::onTrainerOffline ) .onMessage( AkkamonNexus.RequestStartMoving.class, @@ -75,29 +85,42 @@ public class Trainer extends AbstractBehavior<Trainer.Command> { .build(); } - private Trainer onReadTrainerPosition(ReadTrainerPosition readTrainerPositionRequest) { - readTrainerPositionRequest.replyTo.tell(new RespondTrainerPosition( + private Behavior<Command> onTrainerOffline(AkkamonNexus.RequestTrainerOffline trainerOfflineRequest) { + getContext().getLog().info("Trainer {} went offline, the actor has stopped! My supervisor should handle closing my connection!"); + return Behaviors.stopped(); + } + + private Trainer onReadMovementQueue(ReadMovementQueue readTrainerPositionRequest) { + readTrainerPositionRequest.replyTo.tell(new RespondMovementQueue( readTrainerPositionRequest.requestId, trainerId, - lastValidTilePos + new LinkedList<>(movementQueue) )); + this.movementQueue.clear(); return this; } private Trainer onNewTilePos(AkkamonNexus.RequestNewTilePos newTilePosRequest) { getContext().getLog().info("Trainer {} has new {}.", trainerId, newTilePosRequest.tilePos); - lastValidTilePos = Optional.of(newTilePosRequest.tilePos); + if (isMoving()) { + this.movementQueue.add(this.movementDirection); + } return this; } private Trainer onStopMoving(AkkamonNexus.RequestStopMoving stopMovingRequest) { getContext().getLog().info("Trainer {} stops to move {}.", trainerId, stopMovingRequest.direction); + this.movementDirection = Direction.NONE; return this; } private Trainer onStartMoving(AkkamonNexus.RequestStartMoving startMovingRequest) { getContext().getLog().info("Trainer {} starts to move {}.", trainerId, startMovingRequest.direction); + this.movementDirection = startMovingRequest.direction; return this; } + private boolean isMoving() { + return this.movementDirection != Direction.NONE; + } } |
