summaryrefslogtreecommitdiff
path: root/domain/src/main/java
diff options
context:
space:
mode:
authorMike Vink <mike1994vink@gmail.com>2021-07-29 03:34:50 +0200
committerMike Vink <mike1994vink@gmail.com>2021-07-29 03:34:50 +0200
commit4a354cd7a4edb203dd5e66355dbed0e62994e09b (patch)
treed42d20fb1bc6da7b80ad8c0c94d44b70c5f54fc8 /domain/src/main/java
parent9e675c3652eb7a16ce5c2a865c030c76653c921e (diff)
feat(): handshaker part1
Diffstat (limited to 'domain/src/main/java')
-rw-r--r--domain/src/main/java/akkamon/domain/AkkamonMessageEngine.java4
-rw-r--r--domain/src/main/java/akkamon/domain/AkkamonNexus.java32
-rw-r--r--domain/src/main/java/akkamon/domain/InteractionHandshaker.java80
3 files changed, 105 insertions, 11 deletions
diff --git a/domain/src/main/java/akkamon/domain/AkkamonMessageEngine.java b/domain/src/main/java/akkamon/domain/AkkamonMessageEngine.java
index b3ab9b2..075726d 100644
--- a/domain/src/main/java/akkamon/domain/AkkamonMessageEngine.java
+++ b/domain/src/main/java/akkamon/domain/AkkamonMessageEngine.java
@@ -1,5 +1,7 @@
package akkamon.domain;
+import akka.actor.typed.ActorRef;
+
import java.util.List;
import java.util.Map;
@@ -7,7 +9,7 @@ public interface AkkamonMessageEngine {
// broadcasts position info to WebSocket Clients
void broadCastHeartBeatToScene(String sceneId, Map<String, AkkamonNexus.MovementQueueReading> trainerPositions);
- void broadCastInteractionRequestToSessionWithTrainerIds(List<String> trainerIds, String type, String trainerId, String requestName);
+ void broadCastInteractionRequestToSessionWithTrainerIds(List<String> trainerIds, String type, String trainerId, String requestName, ActorRef<InteractionHandshaker.Command> handshaker);
void registerTrainerSessionToSceneAndTrainerIdMaps(String sceneId, AkkamonSession session);
diff --git a/domain/src/main/java/akkamon/domain/AkkamonNexus.java b/domain/src/main/java/akkamon/domain/AkkamonNexus.java
index 003630f..9099617 100644
--- a/domain/src/main/java/akkamon/domain/AkkamonNexus.java
+++ b/domain/src/main/java/akkamon/domain/AkkamonNexus.java
@@ -15,6 +15,22 @@ public class AkkamonNexus extends AbstractBehavior<AkkamonNexus.Command> {
public interface Command {}
+
+ public static class RespondInteractionHandshaker implements Command {
+ public String requestName;
+ public boolean allRepliedInTime;
+
+ public RespondInteractionHandshaker(String requestName, boolean allRepliedInTime) {
+ this.requestName = requestName;
+ this.allRepliedInTime = allRepliedInTime;
+ }
+
+ }
+
+ public static class InteractionReply implements Command {
+ public String trainerId;
+ }
+
public static class RequestInteraction
implements Command {
@@ -239,24 +255,32 @@ public class AkkamonNexus extends AbstractBehavior<AkkamonNexus.Command> {
.onMessage(RequestStopMoving.class, this::onStopMoving)
.onMessage(RequestNewTilePos.class, this::onNewTilePos)
.onMessage(RequestInteraction.class, this::onInteractionRequest)
+ .onMessage(RespondInteractionHandshaker.class, this::onInteractionHandshakerResponse)
.build();
}
+ private Behavior<Command> onInteractionHandshakerResponse(RespondInteractionHandshaker r) {
+
+ return this;
+ }
+
private AkkamonNexus onInteractionRequest(RequestInteraction interactionRequest) {
List<String> needConfirmation = interactionRequest.forwardTo;
getContext().getLog().info("Creating interactionHandshaker of type {} from {} to {} ", interactionRequest.type, interactionRequest.trainerId, interactionRequest.forwardTo);
String requestName = "interaction-handshaker-" + interactionRequest.type + "-" + interactionRequest.trainerId + "-" + interactionRequest.requestId;
- getContext().spawn(InteractionHandshaker.create(
+
+ ActorRef<InteractionHandshaker.Command> handshaker = getContext().spawn(InteractionHandshaker.create(
interactionRequest.trainerId,
+ interactionRequest.type,
interactionRequest.forwardTo,
- interactionRequest.requestId,
+ requestName,
interactionRequest.replyTo,
- Duration.ofSeconds(20)
+ Duration.ofSeconds(60)
), requestName);
- messageEngine.broadCastInteractionRequestToSessionWithTrainerIds(needConfirmation, interactionRequest.type, interactionRequest.trainerId, requestName);
+ messageEngine.broadCastInteractionRequestToSessionWithTrainerIds(needConfirmation, interactionRequest.type, interactionRequest.trainerId, requestName, handshaker);
return this;
}
diff --git a/domain/src/main/java/akkamon/domain/InteractionHandshaker.java b/domain/src/main/java/akkamon/domain/InteractionHandshaker.java
index d6c35ea..6f47282 100644
--- a/domain/src/main/java/akkamon/domain/InteractionHandshaker.java
+++ b/domain/src/main/java/akkamon/domain/InteractionHandshaker.java
@@ -1,6 +1,5 @@
package akkamon.domain;
-import akka.actor.Timers;
import akka.actor.typed.ActorRef;
import akka.actor.typed.Behavior;
import akka.actor.typed.javadsl.*;
@@ -8,6 +7,7 @@ import akka.actor.typed.javadsl.*;
import java.time.Duration;
import java.util.HashSet;
import java.util.List;
+import java.util.Map;
import java.util.Set;
public class InteractionHandshaker extends AbstractBehavior<InteractionHandshaker.Command> {
@@ -15,11 +15,27 @@ public class InteractionHandshaker extends AbstractBehavior<InteractionHandshake
public interface Command {
}
+ public static enum HandshakeTimeout implements Command {
+ INSTANCE
+ }
+
+ static class WrappedReply implements Command {
+
+ final AkkamonNexus.InteractionReply reply;
+
+ WrappedReply(
+ AkkamonNexus.InteractionReply reply
+ ) {
+ this.reply = reply;
+ }
+
+ }
public static Behavior<Command> create(
String trainerId,
+ String type,
List<String> needingConfirmation,
- long requestId,
+ String requestName,
ActorRef<AkkamonNexus.Command> replyTo,
Duration timeout) {
@@ -28,8 +44,9 @@ public class InteractionHandshaker extends AbstractBehavior<InteractionHandshake
timers -> new InteractionHandshaker(
context,
trainerId,
+ type,
new HashSet(needingConfirmation),
- requestId,
+ requestName,
replyTo,
timeout,
timers
@@ -37,19 +54,70 @@ public class InteractionHandshaker extends AbstractBehavior<InteractionHandshake
));
}
+ private Set<String> stillWaiting;
+
+ private ActorRef<AkkamonNexus.Command> replyTo;
+
+ private String requestName;
+ private String type;
+
+ private Set<String> alreadyWaitingForInteraction = new HashSet<>();
+
public InteractionHandshaker(ActorContext<Command> context,
String trainerId,
- Set<String> needingConfirmation,
- long requestId,
+ String type,
+ Set<String> needingToShakeHands,
+ String requestName,
ActorRef<AkkamonNexus.Command> replyTo,
Duration timeout,
TimerScheduler<Command> timers) {
+
super(context);
+
+ timers.startSingleTimer(HandshakeTimeout.INSTANCE, timeout);
+
+ this.replyTo = replyTo;
+ this.requestName = requestName;
+ this.type = type;
+
+ ActorRef<AkkamonNexus.InteractionReply> respondTrainerPositionAdapter = context.messageAdapter(AkkamonNexus.InteractionReply.class, WrappedReply::new);
+
+
+ stillWaiting = needingToShakeHands;
}
@Override
public Receive<Command> createReceive() {
- return null;
+ return newReceiveBuilder()
+ .onMessage(WrappedReply.class, this::onReply)
+ .onMessage(HandshakeTimeout.class, this::onHandshakeTimeOut)
+ .build();
+ }
+
+ private Behavior<Command> onHandshakeTimeOut(HandshakeTimeout timeoutInstance) {
+ getContext().getLog().info("Received {}", timeoutInstance);
+ replyTo.tell(
+ new AkkamonNexus.RespondInteractionHandshaker(
+ requestName,
+ false
+ )
+ );
+ return Behaviors.stopped();
+ }
+
+
+ private Behavior<Command> onReply(WrappedReply w) {
+ getContext().getLog().info("received reply from {}!", w.reply.trainerId);
+ return respondIfAllRepliesReceived();
+ }
+
+ private Behavior<Command> respondIfAllRepliesReceived() {
+ if (this.stillWaiting.isEmpty()) {
+ // send response
+ return Behaviors.stopped();
+ } else {
+ return this;
+ }
}
}