diff options
Diffstat (limited to 'domain')
22 files changed, 1034 insertions, 17 deletions
diff --git a/domain/build.gradle b/domain/build.gradle index 2f385d0..d2d174a 100644 --- a/domain/build.gradle +++ b/domain/build.gradle @@ -1,8 +1,14 @@ plugins { id 'java' id 'java-library' + id 'idea' + id 'application' } +def versions = [ + ScalaBinary: "2.13" +] + version 'unspecified' repositories { @@ -11,10 +17,33 @@ repositories { } dependencies { - testImplementation 'org.junit.jupiter:junit-jupiter-api:5.7.0' - testRuntimeOnly 'org.junit.jupiter:junit-jupiter-engine:5.7.0' + + implementation platform("com.typesafe.akka:akka-bom_${versions.ScalaBinary}:2.6.15") + implementation "com.typesafe.akka:akka-actor-typed_${versions.ScalaBinary}" + + implementation "com.typesafe.akka:akka-stream_${versions.ScalaBinary}" + + implementation 'ch.qos.logback:logback-classic:1.2.3' + implementation 'junit:junit:4.12' + + + testImplementation "com.typesafe.akka:akka-actor-testkit-typed_${versions.ScalaBinary}" + testImplementation "com.typesafe.akka:akka-stream-testkit_${versions.ScalaBinary}" + + testImplementation 'junit:junit:4.12' + testRuntimeOnly 'junit:junit:4.12' + } test { useJUnitPlatform() -}
\ No newline at end of file +} + +application { + // Define the main class for the application. + mainClassName = 'akkamon.domain.iot.IotEntryPoint' +} + +run { + standardInput = System.in +} diff --git a/domain/src/main/java/akkamon/domain/AkkamonImpl.java b/domain/src/main/java/akkamon/domain/AkkamonImpl.java index c7f0b7d..a6a840b 100644 --- a/domain/src/main/java/akkamon/domain/AkkamonImpl.java +++ b/domain/src/main/java/akkamon/domain/AkkamonImpl.java @@ -35,4 +35,7 @@ public class AkkamonImpl implements Akkamon { public HashMap<String, Trainer> getDummyTrainersCollection() { return dummyTrainersCollection; } + + + } diff --git a/domain/src/main/java/akkamon/domain/greeter/AkkaQuickstart.java b/domain/src/main/java/akkamon/domain/greeter/AkkaQuickstart.java new file mode 100644 index 0000000..8b29aae --- /dev/null +++ b/domain/src/main/java/akkamon/domain/greeter/AkkaQuickstart.java @@ -0,0 +1,25 @@ +package akkamon.domain.greeter; + +import akka.actor.typed.ActorSystem; + +import java.io.IOException; + +public class AkkaQuickstart { + public static void main(String[] args) { + //#actor-system + final ActorSystem<GreeterMain.SayHello> greeterMain = ActorSystem.create(GreeterMain.create(), "helloakka"); + //#actor-system + + //#main-send-messages + greeterMain.tell(new GreeterMain.SayHello("Charles")); + //#main-send-messages + + try { + System.out.println(">>> Press ENTER to exit <<<"); + System.in.read(); + } catch (IOException ignored) { + } finally { + greeterMain.terminate(); + } + } +} diff --git a/domain/src/main/java/akkamon/domain/greeter/Greeter.java b/domain/src/main/java/akkamon/domain/greeter/Greeter.java new file mode 100644 index 0000000..ea2a948 --- /dev/null +++ b/domain/src/main/java/akkamon/domain/greeter/Greeter.java @@ -0,0 +1,77 @@ +package akkamon.domain.greeter; + +import akka.actor.typed.ActorRef; +import akka.actor.typed.Behavior; +import akka.actor.typed.javadsl.*; + +import java.util.Objects; + +// #greeter +public class Greeter extends AbstractBehavior<Greeter.Greet> { + + public static final class Greet { + public final String whom; + public final ActorRef<Greeted> replyTo; + + public Greet(String whom, ActorRef<Greeted> replyTo) { + this.whom = whom; + this.replyTo = replyTo; + } + } + + public static final class Greeted { + public final String whom; + public final ActorRef<Greet> from; + + public Greeted(String whom, ActorRef<Greet> from) { + this.whom = whom; + this.from = from; + } + +// #greeter + @Override + public boolean equals(Object o) { + if (this == o) return true; + if (o == null || getClass() != o.getClass()) return false; + Greeted greeted = (Greeted) o; + return Objects.equals(whom, greeted.whom) && + Objects.equals(from, greeted.from); + } + + @Override + public int hashCode() { + return Objects.hash(whom, from); + } + + @Override + public String toString() { + return "Greeted{" + + "whom='" + whom + '\'' + + ", from=" + from + + '}'; + } +// #greeter + } + + public static Behavior<Greet> create() { + return Behaviors.setup(Greeter::new); + } + + private Greeter(ActorContext<Greet> context) { + super(context); + } + + @Override + public Receive<Greet> createReceive() { + return newReceiveBuilder().onMessage(Greet.class, this::onGreet).build(); + } + + private Behavior<Greet> onGreet(Greet command) { + getContext().getLog().info("Hello {}!", command.whom); + //#greeter-send-message + command.replyTo.tell(new Greeted(command.whom, getContext().getSelf())); + //#greeter-send-message + return this; + } +} +// #greeter diff --git a/domain/src/main/java/akkamon/domain/greeter/GreeterBot.java b/domain/src/main/java/akkamon/domain/greeter/GreeterBot.java new file mode 100644 index 0000000..22fd601 --- /dev/null +++ b/domain/src/main/java/akkamon/domain/greeter/GreeterBot.java @@ -0,0 +1,35 @@ +package akkamon.domain.greeter; + +import akka.actor.typed.Behavior; +import akka.actor.typed.javadsl.*; + +public class GreeterBot extends AbstractBehavior<Greeter.Greeted> { + + public static Behavior<Greeter.Greeted> create(int max) { + return Behaviors.setup(context -> new GreeterBot(context, max)); + } + + private final int max; + private int greetingCounter; + + private GreeterBot(ActorContext<Greeter.Greeted> context, int max) { + super(context); + this.max = max; + } + + @Override + public Receive<Greeter.Greeted> createReceive() { + return newReceiveBuilder().onMessage(Greeter.Greeted.class, this::onGreeted).build(); + } + + private Behavior<Greeter.Greeted> onGreeted(Greeter.Greeted message) { + greetingCounter++; + getContext().getLog().info("Greeting {} for {}", greetingCounter, message.whom); + if (greetingCounter == max) { + return Behaviors.stopped(); + } else { + message.from.tell(new Greeter.Greet(message.whom, getContext().getSelf())); + return this; + } + } +} diff --git a/domain/src/main/java/akkamon/domain/greeter/GreeterMain.java b/domain/src/main/java/akkamon/domain/greeter/GreeterMain.java new file mode 100644 index 0000000..9180a0e --- /dev/null +++ b/domain/src/main/java/akkamon/domain/greeter/GreeterMain.java @@ -0,0 +1,44 @@ +package akkamon.domain.greeter; + +import akka.actor.typed.ActorRef; +import akka.actor.typed.Behavior; +import akka.actor.typed.javadsl.*; + +public class GreeterMain extends AbstractBehavior<GreeterMain.SayHello> { + + public static class SayHello { + public final String name; + + public SayHello(String name) { + this.name = name; + } + } + + private final ActorRef<Greeter.Greet> greeter; + + public static Behavior<SayHello> create() { + return Behaviors.setup(GreeterMain::new); + } + + private GreeterMain(ActorContext<SayHello> context) { + super(context); + //#create-actors + greeter = context.spawn(Greeter.create(), "greeter"); + //#create-actors + } + + @Override + public Receive<SayHello> createReceive() { + return newReceiveBuilder().onMessage(SayHello.class, this::onSayHello).build(); + } + + private Behavior<SayHello> onSayHello(SayHello command) { + //#create-actors + ActorRef<Greeter.Greeted> replyTo = + getContext().spawn(GreeterBot.create(3), command.name); + greeter.tell(new Greeter.Greet(command.name, replyTo)); + //#create-actors + return this; + } +} + diff --git a/domain/src/main/java/akkamon/domain/iot/ActorHierarchyExperiments.java b/domain/src/main/java/akkamon/domain/iot/ActorHierarchyExperiments.java new file mode 100644 index 0000000..cba3412 --- /dev/null +++ b/domain/src/main/java/akkamon/domain/iot/ActorHierarchyExperiments.java @@ -0,0 +1,16 @@ +package akkamon.domain.iot; + +import akka.actor.typed.ActorRef; +import akka.actor.typed.ActorSystem; +import akkamon.domain.iot.supervision.MainSupervize; + +public class ActorHierarchyExperiments { + public static void main(String[] args) { + // ActorRef<String> testSystem = ActorSystem.create(Main.create(), "testSystem"); + // testSystem.tell("start"); + // ActorRef<String> testSystemStartStop = ActorSystem.create(MainStartStop.create(), "test-supervize"); + // testSystemStartStop.tell("start"); + ActorRef<String> testSupervize = ActorSystem.create(MainSupervize.create(), "test-supervize"); + testSupervize.tell("start"); + } +} diff --git a/domain/src/main/java/akkamon/domain/iot/Device.java b/domain/src/main/java/akkamon/domain/iot/Device.java new file mode 100644 index 0000000..27918a6 --- /dev/null +++ b/domain/src/main/java/akkamon/domain/iot/Device.java @@ -0,0 +1,104 @@ +package akkamon.domain.iot; + +import akka.actor.typed.ActorRef; +import akka.actor.typed.Behavior; +import akka.actor.typed.PostStop; +import akka.actor.typed.javadsl.AbstractBehavior; +import akka.actor.typed.javadsl.ActorContext; +import akka.actor.typed.javadsl.Behaviors; +import akka.actor.typed.javadsl.Receive; + +import java.util.Optional; + +public class Device extends AbstractBehavior<Device.Command> { + + public interface Command {} + + public static final class RecordTemperature implements Command { + final long requestId; + final double value; + final ActorRef<TemperatureRecorded> replyTo; + + public RecordTemperature(long requestId, double value, ActorRef<TemperatureRecorded> replyTo) { + this.requestId = requestId; + this.value = value; + this.replyTo = replyTo; + } + } + + public static final class TemperatureRecorded { + final long requestId; + + public TemperatureRecorded(long requestId) { + this.requestId = requestId; + } + } + + public static final class ReadTemperature implements Command { + final long requestId; + final ActorRef<RespondTemperature> replyTo; + + public ReadTemperature(long requestId, ActorRef<RespondTemperature> replyTo) { + this.requestId = requestId; + this.replyTo = replyTo; + } + } + + public static final class RespondTemperature { + final long requestId; + final Optional<Double> value; + + public RespondTemperature(long requestId, Optional<Double> value) { + this.requestId = requestId; + this.value = value; + } + } + + static enum Passivate implements Command { + INSTANCE + } + + public static Behavior<Command> create(String groupId, String deviceId) { + return Behaviors.setup(context -> new Device(context, groupId, deviceId)); + } + + private final String groupId; + private final String deviceId; + + private Optional<Double> lastTemperatureReading = Optional.empty(); + + private Device(ActorContext<Command> context, String groupId, String deviceId) { + super(context); + this.groupId = groupId; + this.deviceId = deviceId; + + context.getLog().info("Device actor {}-{} started", groupId, deviceId); + } + + @Override + public Receive<Command> createReceive() { + return newReceiveBuilder() + .onMessage(RecordTemperature.class, this::onRecordTemperature) + .onMessage(ReadTemperature.class, this::onReadTemperature) + .onMessage(Passivate.class, m -> Behaviors.stopped()) + .onSignal(PostStop.class, signal -> onPostStop()) + .build(); + } + + private Behavior<Command> onRecordTemperature(RecordTemperature r) { + getContext().getLog().info("Recorded temperature reading {} with {}", r.value, r.requestId); + lastTemperatureReading = Optional.of(r.value); + r.replyTo.tell(new TemperatureRecorded(r.requestId)); + return this; + } + + private Behavior<Command> onReadTemperature(ReadTemperature r) { + r.replyTo.tell(new RespondTemperature(r.requestId, lastTemperatureReading)); + return this; + } + + private Behavior<Command> onPostStop() { + getContext().getLog().info("Device actor {}-{} stopped", groupId, deviceId); + return Behaviors.stopped(); + } +} diff --git a/domain/src/main/java/akkamon/domain/iot/DeviceGroup.java b/domain/src/main/java/akkamon/domain/iot/DeviceGroup.java new file mode 100644 index 0000000..2cfc13a --- /dev/null +++ b/domain/src/main/java/akkamon/domain/iot/DeviceGroup.java @@ -0,0 +1,98 @@ +package akkamon.domain.iot; + +import akka.actor.typed.ActorRef; +import akka.actor.typed.Behavior; +import akka.actor.typed.PostStop; +import akka.actor.typed.javadsl.AbstractBehavior; +import akka.actor.typed.javadsl.ActorContext; +import akka.actor.typed.javadsl.Behaviors; +import akka.actor.typed.javadsl.Receive; + +import java.util.HashMap; +import java.util.Map; + +public class DeviceGroup extends AbstractBehavior<DeviceGroup.Command> { + + public interface Command {} + + private class DeviceTerminated implements Command { + public final ActorRef<Device.Command> device; + public final String groupId; + public final String deviceId; + + DeviceTerminated(ActorRef<Device.Command> device, String groupId, String deviceId) { + this.device = device; + this.groupId = groupId; + this.deviceId = deviceId; + } + } + + public static Behavior<Command> create(String groupId) { + return Behaviors.setup(context -> new DeviceGroup(context, groupId)); + } + + private final String groupId; + private final Map<String, ActorRef<Device.Command>> deviceIdToActor = new HashMap<>(); + + private DeviceGroup(ActorContext<Command> context, String groupId) { + super(context); + this.groupId = groupId; + context.getLog().info("DeviceGroup {} started", groupId); + } + + private DeviceGroup onTrackDevice(DeviceManager.RequestTrackDevice trackMsg) { + if (this.groupId.equals(trackMsg.groupId)) { + ActorRef<Device.Command> deviceActor = deviceIdToActor.get(trackMsg.deviceId); + if (deviceActor != null) { + trackMsg.replyTo.tell(new DeviceManager.DeviceRegistered(deviceActor)); + } else { + getContext().getLog().info("Creating device actor for {}", trackMsg.deviceId); + deviceActor = + getContext() + .spawn(Device.create(groupId, trackMsg.deviceId), "device-" + trackMsg.deviceId); + getContext() + .watchWith(deviceActor, new DeviceTerminated(deviceActor, groupId, trackMsg.deviceId)); + deviceIdToActor.put(trackMsg.deviceId, deviceActor); + trackMsg.replyTo.tell(new DeviceManager.DeviceRegistered(deviceActor)); + } + } else { + getContext() + .getLog() + .warn( + "Ignoring TrackDevice request for {}. This actor is responsible for {}.", + groupId, + this.groupId); + } + return this; + } + + + private DeviceGroup onDeviceList(DeviceManager.RequestDeviceList r) { + r.replyTo.tell(new DeviceManager.ReplyDeviceList(r.requestId, deviceIdToActor.keySet())); + return this; + } + + private DeviceGroup onTerminated(DeviceTerminated t) { + getContext().getLog().info("Device actor for {} has been terminated", t.deviceId); + deviceIdToActor.remove(t.deviceId); + return this; + } + + @Override + public Receive<Command> createReceive() { + return newReceiveBuilder() + .onMessage(DeviceManager.RequestTrackDevice.class, this::onTrackDevice) + .onMessage( + DeviceManager.RequestDeviceList.class, + r -> r.groupId.equals(groupId), + this::onDeviceList) + .onMessage(DeviceTerminated.class, this::onTerminated) + .onSignal(PostStop.class, signal -> onPostStop()) + .build(); + } + + private DeviceGroup onPostStop() { + getContext().getLog().info("DeviceGroup {} stopped", groupId); + return this; + } +} diff --git a/domain/src/main/java/akkamon/domain/iot/DeviceManager.java b/domain/src/main/java/akkamon/domain/iot/DeviceManager.java new file mode 100644 index 0000000..e445b7e --- /dev/null +++ b/domain/src/main/java/akkamon/domain/iot/DeviceManager.java @@ -0,0 +1,126 @@ +package akkamon.domain.iot; + +import akka.actor.typed.ActorRef; +import akka.actor.typed.Behavior; +import akka.actor.typed.PostStop; +import akka.actor.typed.javadsl.AbstractBehavior; +import akka.actor.typed.javadsl.ActorContext; +import akka.actor.typed.javadsl.Behaviors; +import akka.actor.typed.javadsl.Receive; + +import java.util.Collections; +import java.util.HashMap; +import java.util.Map; +import java.util.Set; + +public class DeviceManager extends AbstractBehavior<DeviceManager.Command> { + + public interface Command {} + + public static final class RequestTrackDevice implements DeviceManager.Command, DeviceGroup.Command { + public final String groupId; + public final String deviceId; + public final ActorRef<DeviceRegistered> replyTo; + + public RequestTrackDevice(String groupId, String deviceId, ActorRef<DeviceRegistered> replyTo) { + this.groupId = groupId; + this.deviceId = deviceId; + this.replyTo = replyTo; + } + } + + public static final class DeviceRegistered { + public final ActorRef<Device.Command> device; + + public DeviceRegistered(ActorRef<Device.Command> device) { + this.device = device; + } + } + + public static final class RequestDeviceList implements DeviceManager.Command, DeviceGroup.Command { + final long requestId; + final String groupId; + final ActorRef<ReplyDeviceList> replyTo; + + public RequestDeviceList(long requestId, String groupId, ActorRef<ReplyDeviceList> replyTo) { + this.requestId = requestId; + this.groupId = groupId; + this.replyTo = replyTo; + } + } + + public static final class ReplyDeviceList { + final long requestId; + final Set<String> ids; + + public ReplyDeviceList(long requestId, Set<String> ids) { + this.requestId = requestId; + this.ids = ids; + } + } + + private static class DeviceGroupTerminated implements DeviceManager.Command { + public final String groupId; + + DeviceGroupTerminated(String groupId) { + this.groupId = groupId; + } + } + + public static Behavior<Command> create() { + return Behaviors.setup(DeviceManager::new); + } + + private final Map<String, ActorRef<DeviceGroup.Command>> groupIdToActor = new HashMap<>(); + + private DeviceManager(ActorContext<Command> context) { + super(context); + context.getLog().info("DeviceManager started"); + } + + private DeviceManager onTrackDevice(RequestTrackDevice trackMsg) { + String groupId = trackMsg.groupId; + ActorRef<DeviceGroup.Command> ref = groupIdToActor.get(groupId); + if (ref != null) { + ref.tell(trackMsg); + } else { + getContext().getLog().info("Creating device group actor for {}", groupId); + ActorRef<DeviceGroup.Command> groupActor = + getContext().spawn(DeviceGroup.create(groupId), "group-" + groupId); + getContext().watchWith(groupActor, new DeviceGroupTerminated(groupId)); + groupActor.tell(trackMsg); + groupIdToActor.put(groupId, groupActor); + } + return this; + } + + private DeviceManager onRequestDeviceList(RequestDeviceList request) { + ActorRef<DeviceGroup.Command> ref = groupIdToActor.get(request.groupId); + if (ref != null) { + ref.tell(request); + } else { + request.replyTo.tell(new ReplyDeviceList(request.requestId, Collections.emptySet())); + } + return this; + } + + private DeviceManager onTerminated(DeviceGroupTerminated t) { + getContext().getLog().info("Device group actor for {} has been terminated", t.groupId); + groupIdToActor.remove(t.groupId); + return this; + } + + public Receive<Command> createReceive() { + return newReceiveBuilder() + .onMessage(RequestTrackDevice.class, this::onTrackDevice) + .onMessage(RequestDeviceList.class, this::onRequestDeviceList) + .onMessage(DeviceGroupTerminated.class, this::onTerminated) + .onSignal(PostStop.class, signal -> onPostStop()) + .build(); + } + + private DeviceManager onPostStop() { + getContext().getLog().info("DeviceManager stopped"); + return this; + } +} diff --git a/domain/src/main/java/akkamon/domain/iot/IotEntryPoint.java b/domain/src/main/java/akkamon/domain/iot/IotEntryPoint.java new file mode 100644 index 0000000..e8faf9d --- /dev/null +++ b/domain/src/main/java/akkamon/domain/iot/IotEntryPoint.java @@ -0,0 +1,9 @@ +package akkamon.domain.iot; + +import akka.actor.typed.ActorSystem; + +public class IotEntryPoint { + public static void main(String[] args) { + ActorSystem.create(IotSupervisor.create(), "iot-system"); + } +} diff --git a/domain/src/main/java/akkamon/domain/iot/IotSupervisor.java b/domain/src/main/java/akkamon/domain/iot/IotSupervisor.java new file mode 100644 index 0000000..dc50929 --- /dev/null +++ b/domain/src/main/java/akkamon/domain/iot/IotSupervisor.java @@ -0,0 +1,31 @@ +package akkamon.domain.iot; + +import akka.actor.typed.Behavior; +import akka.actor.typed.PostStop; +import akka.actor.typed.javadsl.AbstractBehavior; +import akka.actor.typed.javadsl.ActorContext; +import akka.actor.typed.javadsl.Behaviors; +import akka.actor.typed.javadsl.Receive; + +public class IotSupervisor extends AbstractBehavior<Void> { + + public static Behavior<Void> create() { + return Behaviors.setup(IotSupervisor::new); + } + + private IotSupervisor(ActorContext<Void> context) { + super(context); + context.getLog().info("IoT Application started"); + } + + // No need to handle any messages + @Override + public Receive<Void> createReceive() { + return newReceiveBuilder().onSignal(PostStop.class, signal -> onPostStop()).build(); + } + + private IotSupervisor onPostStop() { + getContext().getLog().info("IoT Application stopped"); + return this; + } +} diff --git a/domain/src/main/java/akkamon/domain/iot/refs/Main.java b/domain/src/main/java/akkamon/domain/iot/refs/Main.java new file mode 100644 index 0000000..923d505 --- /dev/null +++ b/domain/src/main/java/akkamon/domain/iot/refs/Main.java @@ -0,0 +1,32 @@ +package akkamon.domain.iot.refs; + +import akka.actor.typed.ActorRef; +import akka.actor.typed.Behavior; +import akka.actor.typed.javadsl.AbstractBehavior; +import akka.actor.typed.javadsl.ActorContext; +import akka.actor.typed.javadsl.Behaviors; +import akka.actor.typed.javadsl.Receive; + +class Main extends AbstractBehavior<String> { + + static Behavior<String> create() { + return Behaviors.setup(Main::new); + } + + private Main(ActorContext<String> context) { + super(context); + } + + @Override + public Receive<String> createReceive() { + return newReceiveBuilder().onMessageEquals("start", this::start).build(); + } + + private Behavior<String> start() { + ActorRef<String> firstRef = getContext().spawn(PrintMyActorRefActor.create(), "first-actor"); + + System.out.println("First: " + firstRef); + firstRef.tell("printit"); + return Behaviors.same(); + } +} diff --git a/domain/src/main/java/akkamon/domain/iot/refs/PrintMyActorRefActor.java b/domain/src/main/java/akkamon/domain/iot/refs/PrintMyActorRefActor.java new file mode 100644 index 0000000..76b5c42 --- /dev/null +++ b/domain/src/main/java/akkamon/domain/iot/refs/PrintMyActorRefActor.java @@ -0,0 +1,30 @@ +package akkamon.domain.iot.refs; + +import akka.actor.typed.ActorRef; +import akka.actor.typed.Behavior; +import akka.actor.typed.javadsl.AbstractBehavior; +import akka.actor.typed.javadsl.ActorContext; +import akka.actor.typed.javadsl.Behaviors; +import akka.actor.typed.javadsl.Receive; + +class PrintMyActorRefActor extends AbstractBehavior<String> { + + static Behavior<String> create() { + return Behaviors.setup(PrintMyActorRefActor::new); + } + + private PrintMyActorRefActor(ActorContext<String> context) { + super(context); + } + + @Override + public Receive<String> createReceive() { + return newReceiveBuilder().onMessageEquals("printit", this::printIt).build(); + } + + private Behavior<String> printIt() { + ActorRef<String> secondRef = getContext().spawn(Behaviors.empty(), "second-actor"); + System.out.println("Second: " + secondRef); + return this; + } +} diff --git a/domain/src/main/java/akkamon/domain/iot/startstop/MainStartStop.java b/domain/src/main/java/akkamon/domain/iot/startstop/MainStartStop.java new file mode 100644 index 0000000..d8f13be --- /dev/null +++ b/domain/src/main/java/akkamon/domain/iot/startstop/MainStartStop.java @@ -0,0 +1,31 @@ +package akkamon.domain.iot.startstop; + +import akka.actor.typed.ActorRef; +import akka.actor.typed.Behavior; +import akka.actor.typed.javadsl.AbstractBehavior; +import akka.actor.typed.javadsl.ActorContext; +import akka.actor.typed.javadsl.Behaviors; +import akka.actor.typed.javadsl.Receive; + +public class MainStartStop extends AbstractBehavior<String> { + + static Behavior<String> create() { + return Behaviors.setup(MainStartStop::new); + } + + private MainStartStop(ActorContext<String> context) { + super(context); + } + + @Override + public Receive<String> createReceive() { + return newReceiveBuilder().onMessageEquals("start", this::start).build(); + } + + private Behavior<String> start() { + ActorRef<String> first = getContext().spawn(StartStopActor1.create(), "first"); + first.tell("stop"); + return Behaviors.same(); + } + +} diff --git a/domain/src/main/java/akkamon/domain/iot/startstop/StartStopActor1.java b/domain/src/main/java/akkamon/domain/iot/startstop/StartStopActor1.java new file mode 100644 index 0000000..37c8237 --- /dev/null +++ b/domain/src/main/java/akkamon/domain/iot/startstop/StartStopActor1.java @@ -0,0 +1,35 @@ +package akkamon.domain.iot.startstop; + +import akka.actor.typed.Behavior; +import akka.actor.typed.PostStop; +import akka.actor.typed.javadsl.AbstractBehavior; +import akka.actor.typed.javadsl.ActorContext; +import akka.actor.typed.javadsl.Behaviors; +import akka.actor.typed.javadsl.Receive; + +class StartStopActor1 extends AbstractBehavior<String> { + + static Behavior<String> create() { + return Behaviors.setup(StartStopActor1::new); + } + + private StartStopActor1(ActorContext<String> context) { + super(context); + System.out.println("first started"); + + context.spawn(StartStopActor2.create(), "second"); + } + + @Override + public Receive<String> createReceive() { + return newReceiveBuilder() + .onMessageEquals("stop", Behaviors::stopped) + .onSignal(PostStop.class, signal -> onPostStop()) + .build(); + } + + private Behavior<String> onPostStop() { + System.out.println("first stopped"); + return this; + } +} diff --git a/domain/src/main/java/akkamon/domain/iot/startstop/StartStopActor2.java b/domain/src/main/java/akkamon/domain/iot/startstop/StartStopActor2.java new file mode 100644 index 0000000..28d01cf --- /dev/null +++ b/domain/src/main/java/akkamon/domain/iot/startstop/StartStopActor2.java @@ -0,0 +1,30 @@ +package akkamon.domain.iot.startstop; + +import akka.actor.typed.Behavior; +import akka.actor.typed.PostStop; +import akka.actor.typed.javadsl.AbstractBehavior; +import akka.actor.typed.javadsl.ActorContext; +import akka.actor.typed.javadsl.Behaviors; +import akka.actor.typed.javadsl.Receive; + +class StartStopActor2 extends AbstractBehavior<String> { + + static Behavior<String> create() { + return Behaviors.setup(StartStopActor2::new); + } + + private StartStopActor2(ActorContext<String> context) { + super(context); + System.out.println("second started"); + } + + @Override + public Receive<String> createReceive() { + return newReceiveBuilder().onSignal(PostStop.class, signal -> onPostStop()).build(); + } + + private Behavior<String> onPostStop() { + System.out.println("second stopped"); + return this; + } +} diff --git a/domain/src/main/java/akkamon/domain/iot/supervision/MainSupervize.java b/domain/src/main/java/akkamon/domain/iot/supervision/MainSupervize.java new file mode 100644 index 0000000..294245f --- /dev/null +++ b/domain/src/main/java/akkamon/domain/iot/supervision/MainSupervize.java @@ -0,0 +1,31 @@ +package akkamon.domain.iot.supervision; + +import akka.actor.typed.ActorRef; +import akka.actor.typed.Behavior; +import akka.actor.typed.javadsl.AbstractBehavior; +import akka.actor.typed.javadsl.ActorContext; +import akka.actor.typed.javadsl.Behaviors; +import akka.actor.typed.javadsl.Receive; + +public class MainSupervize extends AbstractBehavior<String> { + + public static Behavior<String> create() { + return Behaviors.setup(MainSupervize::new); + } + + private MainSupervize(ActorContext<String> context) { + super(context); + } + + @Override + public Receive<String> createReceive() { + return newReceiveBuilder().onMessageEquals("start", this::start).build(); + } + + private Behavior<String> start() { + ActorRef<String> supervisingActor = getContext().spawn(SupervisingActor.create(), "supervising-actor"); + supervisingActor.tell("failChild"); + return Behaviors.same(); + } + +} diff --git a/domain/src/main/java/akkamon/domain/iot/supervision/SupervisedActor.java b/domain/src/main/java/akkamon/domain/iot/supervision/SupervisedActor.java new file mode 100644 index 0000000..a050cfe --- /dev/null +++ b/domain/src/main/java/akkamon/domain/iot/supervision/SupervisedActor.java @@ -0,0 +1,45 @@ +package akkamon.domain.iot.supervision; + +import akka.actor.typed.Behavior; +import akka.actor.typed.PostStop; +import akka.actor.typed.PreRestart; +import akka.actor.typed.javadsl.AbstractBehavior; +import akka.actor.typed.javadsl.ActorContext; +import akka.actor.typed.javadsl.Behaviors; +import akka.actor.typed.javadsl.Receive; + +class SupervisedActor extends AbstractBehavior<String> { + + static Behavior<String> create() { + return Behaviors.setup(SupervisedActor::new); + } + + private SupervisedActor(ActorContext<String> context) { + super(context); + System.out.println("supervised actor started"); + } + + @Override + public Receive<String> createReceive() { + return newReceiveBuilder() + .onMessageEquals("fail", this::fail) + .onSignal(PreRestart.class, signal -> preRestart()) + .onSignal(PostStop.class, signal -> postStop()) + .build(); + } + + private Behavior<String> fail() { + System.out.println("supervised actor fails now"); + throw new RuntimeException("I failed!"); + } + + private Behavior<String> preRestart() { + System.out.println("supervised will be restarted"); + return this; + } + + private Behavior<String> postStop() { + System.out.println("supervised stopped"); + return this; + } +} diff --git a/domain/src/main/java/akkamon/domain/iot/supervision/SupervisingActor.java b/domain/src/main/java/akkamon/domain/iot/supervision/SupervisingActor.java new file mode 100644 index 0000000..f1e098b --- /dev/null +++ b/domain/src/main/java/akkamon/domain/iot/supervision/SupervisingActor.java @@ -0,0 +1,36 @@ +package akkamon.domain.iot.supervision; + +import akka.actor.typed.SupervisorStrategy; +import akka.actor.typed.ActorRef; +import akka.actor.typed.Behavior; +import akka.actor.typed.javadsl.AbstractBehavior; +import akka.actor.typed.javadsl.ActorContext; +import akka.actor.typed.javadsl.Behaviors; +import akka.actor.typed.javadsl.Receive; + +class SupervisingActor extends AbstractBehavior<String> { + + static Behavior<String> create() { + return Behaviors.setup(SupervisingActor::new); + } + + private final ActorRef<String> child; + + private SupervisingActor(ActorContext<String> context) { + super(context); + child = + context.spawn( + Behaviors.supervise(SupervisedActor.create()).onFailure(SupervisorStrategy.restart()), + "supervised-actor"); + } + + @Override + public Receive<String> createReceive() { + return newReceiveBuilder().onMessageEquals("failChild", this::onFailChild).build(); + } + + private Behavior<String> onFailChild() { + child.tell("fail"); + return this; + } +} diff --git a/domain/src/test/java/akkamon/domain/AkkamonImplTest.java b/domain/src/test/java/akkamon/domain/AkkamonImplTest.java index 6570c2a..90c4f44 100644 --- a/domain/src/test/java/akkamon/domain/AkkamonImplTest.java +++ b/domain/src/test/java/akkamon/domain/AkkamonImplTest.java @@ -1,23 +1,11 @@ package akkamon.domain; -import org.junit.jupiter.api.AfterEach; -import org.junit.jupiter.api.BeforeEach; -import org.junit.jupiter.api.Nested; -import org.junit.jupiter.api.Test; +import org.junit.Test; -import static org.junit.jupiter.api.Assertions.*; +import static junit.framework.TestCase.assertNotNull; class AkkamonImplTest { - @BeforeEach - void setUp() { - } - - @AfterEach - void tearDown() { - } - - @Nested class getInstance_behaviour { @Test void given_there_is_no_instance_yet_when_getInstance_is_called_then_give_class_property_instance() { diff --git a/domain/src/test/java/akkamon/domain/iot/DeviceTest.java b/domain/src/test/java/akkamon/domain/iot/DeviceTest.java new file mode 100644 index 0000000..a05499f --- /dev/null +++ b/domain/src/test/java/akkamon/domain/iot/DeviceTest.java @@ -0,0 +1,162 @@ +package akkamon.domain.iot; + +import akka.actor.testkit.typed.javadsl.TestKitJunitResource; +import akka.actor.testkit.typed.javadsl.TestProbe; +import akka.actor.typed.ActorRef; +import org.junit.ClassRule; +import org.junit.Test; + +import java.util.Optional; +import java.util.Set; +import java.util.stream.Collectors; +import java.util.stream.Stream; + +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertNotEquals; + +public class DeviceTest { + + @ClassRule + public static final TestKitJunitResource testKit = new TestKitJunitResource(); + + @Test + public void testReplyWithEmptyReadingIfNoTemperatureIsKnown() { + TestProbe<Device.RespondTemperature> probe = testKit.createTestProbe(Device.RespondTemperature.class); + + ActorRef<Device.Command> deviceActor = testKit.spawn(Device.create("group", "device")); + deviceActor.tell(new Device.ReadTemperature(42L, probe.getRef())); + Device.RespondTemperature response = probe.receiveMessage(); + assertEquals(42L, response.requestId); + assertEquals(Optional.empty(), response.value); + + + } + + @Test + public void testReplyWithLatestTemperatureReading() { + TestProbe<Device.TemperatureRecorded> recordProbe = + testKit.createTestProbe(Device.TemperatureRecorded.class); + + TestProbe<Device.RespondTemperature> readProbe = + testKit.createTestProbe(Device.RespondTemperature.class); + + ActorRef<Device.Command> deviceActor = testKit.spawn(Device.create("group", "device")); + + deviceActor.tell(new Device.RecordTemperature(1L, 24.0, recordProbe.getRef())); + assertEquals(1L, recordProbe.receiveMessage().requestId); + + deviceActor.tell(new Device.ReadTemperature(2L, readProbe.getRef())); + Device.RespondTemperature response1 = readProbe.receiveMessage(); + assertEquals(2L, response1.requestId); + assertEquals(Optional.of(24.0), response1.value); + + deviceActor.tell(new Device.RecordTemperature(3L, 55.0, recordProbe.getRef())); + assertEquals(3L, recordProbe.receiveMessage().requestId); + + deviceActor.tell(new Device.ReadTemperature(4L, readProbe.getRef())); + Device.RespondTemperature response2 = readProbe.receiveMessage(); + assertEquals(4L, response2.requestId); + assertEquals(Optional.of(55.0), response2.value); + } + + @Test + public void testReplyToRegistrationRequests() { + TestProbe<DeviceManager.DeviceRegistered> probe = testKit.createTestProbe(DeviceManager.DeviceRegistered.class); + ActorRef<DeviceGroup.Command> groupActor = testKit.spawn(DeviceGroup.create("group")); + + groupActor.tell(new DeviceManager.RequestTrackDevice("group", "device", probe.getRef())); + DeviceManager.DeviceRegistered registered1 = probe.receiveMessage(); + + // another deviceId + groupActor.tell(new DeviceManager.RequestTrackDevice("group", "device3", probe.getRef())); + DeviceManager.DeviceRegistered registered2 = probe.receiveMessage(); + assertNotEquals(registered1.device, registered2.device); + + // Check that the device actors are working + TestProbe<Device.TemperatureRecorded> recordProbe = + testKit.createTestProbe(Device.TemperatureRecorded.class); + registered1.device.tell(new Device.RecordTemperature(0L, 1.0, recordProbe.getRef())); + assertEquals(0L, recordProbe.receiveMessage().requestId); + registered2.device.tell(new Device.RecordTemperature(1L, 2.0, recordProbe.getRef())); + assertEquals(1L, recordProbe.receiveMessage().requestId); + } + + @Test + public void testIgnoreWrongRegistrationRequests() { + TestProbe<DeviceManager.DeviceRegistered> probe = testKit.createTestProbe(DeviceManager.DeviceRegistered.class); + ActorRef<DeviceGroup.Command> groupActor = testKit.spawn(DeviceGroup.create("group")); + groupActor.tell(new DeviceManager.RequestTrackDevice("wrongGroup", "device1", probe.getRef())); + probe.expectNoMessage(); + } + + + @Test + public void testReturnSameActorForSameDeviceId() { + TestProbe<DeviceManager.DeviceRegistered> probe = testKit.createTestProbe(DeviceManager.DeviceRegistered.class); + ActorRef<DeviceGroup.Command> groupActor = testKit.spawn(DeviceGroup.create("group")); + + groupActor.tell(new DeviceManager.RequestTrackDevice("group", "device", probe.getRef())); + DeviceManager.DeviceRegistered registered1 = probe.receiveMessage(); + + // registering same again should be idempotent + groupActor.tell(new DeviceManager.RequestTrackDevice("group", "device", probe.getRef())); + DeviceManager.DeviceRegistered registered2 = probe.receiveMessage(); + assertEquals(registered1.device, registered2.device); + } + + @Test +public void testListActiveDevices() { + TestProbe<DeviceManager.DeviceRegistered> registeredProbe = testKit.createTestProbe(DeviceManager.DeviceRegistered.class); + ActorRef<DeviceGroup.Command> groupActor = testKit.spawn(DeviceGroup.create("group")); + + groupActor.tell(new DeviceManager.RequestTrackDevice("group", "device1", registeredProbe.getRef())); + registeredProbe.receiveMessage(); + + groupActor.tell(new DeviceManager.RequestTrackDevice("group", "device2", registeredProbe.getRef())); + registeredProbe.receiveMessage(); + + TestProbe<DeviceManager.ReplyDeviceList> deviceListProbe = testKit.createTestProbe(DeviceManager.ReplyDeviceList.class); + + groupActor.tell(new DeviceManager.RequestDeviceList(0L, "group", deviceListProbe.getRef())); + DeviceManager.ReplyDeviceList reply = deviceListProbe.receiveMessage(); + assertEquals(0L, reply.requestId); + Set<String> testIds = Stream.of("device1", "device2").collect(Collectors.toSet()); + assertEquals(testIds, reply.ids); +} + +@Test +public void testListActiveDevicesAfterOneShutsDown() { + TestProbe<DeviceManager.DeviceRegistered> registeredProbe = testKit.createTestProbe(DeviceManager.DeviceRegistered.class); + ActorRef<DeviceGroup.Command> groupActor = testKit.spawn(DeviceGroup.create("group")); + + groupActor.tell(new DeviceManager.RequestTrackDevice("group", "device1", registeredProbe.getRef())); + DeviceManager.DeviceRegistered registered1 = registeredProbe.receiveMessage(); + + groupActor.tell(new DeviceManager.RequestTrackDevice("group", "device2", registeredProbe.getRef())); + DeviceManager.DeviceRegistered registered2 = registeredProbe.receiveMessage(); + + ActorRef<Device.Command> toShutDown = registered1.device; + + TestProbe<DeviceManager.ReplyDeviceList> deviceListProbe = testKit.createTestProbe(DeviceManager.ReplyDeviceList.class); + + groupActor.tell(new DeviceManager.RequestDeviceList(0L, "group", deviceListProbe.getRef())); + DeviceManager.ReplyDeviceList reply = deviceListProbe.receiveMessage(); + assertEquals(0L, reply.requestId); + assertEquals(Stream.of("device1", "device2").collect(Collectors.toSet()), reply.ids); + + toShutDown.tell(Device.Passivate.INSTANCE); + registeredProbe.expectTerminated(toShutDown, registeredProbe.getRemainingOrDefault()); + + // using awaitAssert to retry because it might take longer for the groupActor + // to see the Terminated, that order is undefined + registeredProbe.awaitAssert( + () -> { + groupActor.tell(new DeviceManager.RequestDeviceList(1L, "group", deviceListProbe.getRef())); + DeviceManager.ReplyDeviceList r = deviceListProbe.receiveMessage(); + assertEquals(1L, r.requestId); + assertEquals(Stream.of("device2").collect(Collectors.toSet()), r.ids); + return null; + }); +} + +} |
