From 4a354cd7a4edb203dd5e66355dbed0e62994e09b Mon Sep 17 00:00:00 2001 From: Mike Vink Date: Thu, 29 Jul 2021 03:34:50 +0200 Subject: feat(): handshaker part1 --- .../java/akkamon/domain/AkkamonMessageEngine.java | 4 +- .../src/main/java/akkamon/domain/AkkamonNexus.java | 32 +++++++-- .../java/akkamon/domain/InteractionHandshaker.java | 80 ++++++++++++++++++++-- 3 files changed, 105 insertions(+), 11 deletions(-) (limited to 'domain/src/main/java') diff --git a/domain/src/main/java/akkamon/domain/AkkamonMessageEngine.java b/domain/src/main/java/akkamon/domain/AkkamonMessageEngine.java index b3ab9b2..075726d 100644 --- a/domain/src/main/java/akkamon/domain/AkkamonMessageEngine.java +++ b/domain/src/main/java/akkamon/domain/AkkamonMessageEngine.java @@ -1,5 +1,7 @@ package akkamon.domain; +import akka.actor.typed.ActorRef; + import java.util.List; import java.util.Map; @@ -7,7 +9,7 @@ public interface AkkamonMessageEngine { // broadcasts position info to WebSocket Clients void broadCastHeartBeatToScene(String sceneId, Map trainerPositions); - void broadCastInteractionRequestToSessionWithTrainerIds(List trainerIds, String type, String trainerId, String requestName); + void broadCastInteractionRequestToSessionWithTrainerIds(List trainerIds, String type, String trainerId, String requestName, ActorRef handshaker); void registerTrainerSessionToSceneAndTrainerIdMaps(String sceneId, AkkamonSession session); diff --git a/domain/src/main/java/akkamon/domain/AkkamonNexus.java b/domain/src/main/java/akkamon/domain/AkkamonNexus.java index 003630f..9099617 100644 --- a/domain/src/main/java/akkamon/domain/AkkamonNexus.java +++ b/domain/src/main/java/akkamon/domain/AkkamonNexus.java @@ -15,6 +15,22 @@ public class AkkamonNexus extends AbstractBehavior { public interface Command {} + + public static class RespondInteractionHandshaker implements Command { + public String requestName; + public boolean allRepliedInTime; + + public RespondInteractionHandshaker(String requestName, boolean allRepliedInTime) { + this.requestName = requestName; + this.allRepliedInTime = allRepliedInTime; + } + + } + + public static class InteractionReply implements Command { + public String trainerId; + } + public static class RequestInteraction implements Command { @@ -239,24 +255,32 @@ public class AkkamonNexus extends AbstractBehavior { .onMessage(RequestStopMoving.class, this::onStopMoving) .onMessage(RequestNewTilePos.class, this::onNewTilePos) .onMessage(RequestInteraction.class, this::onInteractionRequest) + .onMessage(RespondInteractionHandshaker.class, this::onInteractionHandshakerResponse) .build(); } + private Behavior onInteractionHandshakerResponse(RespondInteractionHandshaker r) { + + return this; + } + private AkkamonNexus onInteractionRequest(RequestInteraction interactionRequest) { List needConfirmation = interactionRequest.forwardTo; getContext().getLog().info("Creating interactionHandshaker of type {} from {} to {} ", interactionRequest.type, interactionRequest.trainerId, interactionRequest.forwardTo); String requestName = "interaction-handshaker-" + interactionRequest.type + "-" + interactionRequest.trainerId + "-" + interactionRequest.requestId; - getContext().spawn(InteractionHandshaker.create( + + ActorRef handshaker = getContext().spawn(InteractionHandshaker.create( interactionRequest.trainerId, + interactionRequest.type, interactionRequest.forwardTo, - interactionRequest.requestId, + requestName, interactionRequest.replyTo, - Duration.ofSeconds(20) + Duration.ofSeconds(60) ), requestName); - messageEngine.broadCastInteractionRequestToSessionWithTrainerIds(needConfirmation, interactionRequest.type, interactionRequest.trainerId, requestName); + messageEngine.broadCastInteractionRequestToSessionWithTrainerIds(needConfirmation, interactionRequest.type, interactionRequest.trainerId, requestName, handshaker); return this; } diff --git a/domain/src/main/java/akkamon/domain/InteractionHandshaker.java b/domain/src/main/java/akkamon/domain/InteractionHandshaker.java index d6c35ea..6f47282 100644 --- a/domain/src/main/java/akkamon/domain/InteractionHandshaker.java +++ b/domain/src/main/java/akkamon/domain/InteractionHandshaker.java @@ -1,6 +1,5 @@ package akkamon.domain; -import akka.actor.Timers; import akka.actor.typed.ActorRef; import akka.actor.typed.Behavior; import akka.actor.typed.javadsl.*; @@ -8,6 +7,7 @@ import akka.actor.typed.javadsl.*; import java.time.Duration; import java.util.HashSet; import java.util.List; +import java.util.Map; import java.util.Set; public class InteractionHandshaker extends AbstractBehavior { @@ -15,11 +15,27 @@ public class InteractionHandshaker extends AbstractBehavior create( String trainerId, + String type, List needingConfirmation, - long requestId, + String requestName, ActorRef replyTo, Duration timeout) { @@ -28,8 +44,9 @@ public class InteractionHandshaker extends AbstractBehavior new InteractionHandshaker( context, trainerId, + type, new HashSet(needingConfirmation), - requestId, + requestName, replyTo, timeout, timers @@ -37,19 +54,70 @@ public class InteractionHandshaker extends AbstractBehavior stillWaiting; + + private ActorRef replyTo; + + private String requestName; + private String type; + + private Set alreadyWaitingForInteraction = new HashSet<>(); + public InteractionHandshaker(ActorContext context, String trainerId, - Set needingConfirmation, - long requestId, + String type, + Set needingToShakeHands, + String requestName, ActorRef replyTo, Duration timeout, TimerScheduler timers) { + super(context); + + timers.startSingleTimer(HandshakeTimeout.INSTANCE, timeout); + + this.replyTo = replyTo; + this.requestName = requestName; + this.type = type; + + ActorRef respondTrainerPositionAdapter = context.messageAdapter(AkkamonNexus.InteractionReply.class, WrappedReply::new); + + + stillWaiting = needingToShakeHands; } @Override public Receive createReceive() { - return null; + return newReceiveBuilder() + .onMessage(WrappedReply.class, this::onReply) + .onMessage(HandshakeTimeout.class, this::onHandshakeTimeOut) + .build(); + } + + private Behavior onHandshakeTimeOut(HandshakeTimeout timeoutInstance) { + getContext().getLog().info("Received {}", timeoutInstance); + replyTo.tell( + new AkkamonNexus.RespondInteractionHandshaker( + requestName, + false + ) + ); + return Behaviors.stopped(); + } + + + private Behavior onReply(WrappedReply w) { + getContext().getLog().info("received reply from {}!", w.reply.trainerId); + return respondIfAllRepliesReceived(); + } + + private Behavior respondIfAllRepliesReceived() { + if (this.stillWaiting.isEmpty()) { + // send response + return Behaviors.stopped(); + } else { + return this; + } } } -- cgit v1.2.3