summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorMike Vink <mike1994vink@gmail.com>2021-07-19 20:53:00 +0200
committerMike Vink <mike1994vink@gmail.com>2021-07-19 20:53:00 +0200
commit918f85df10d8d59de3d2c90a81984810dec7a072 (patch)
treebdc5b403f092f9dd359b74c6e6749b92ea8fbf4a
parenta159975d09c9160b89a733f2434b406d8f99dbb4 (diff)
prog(akka): Iot Example code
-rw-r--r--domain/build.gradle35
-rw-r--r--domain/src/main/java/akkamon/domain/AkkamonImpl.java3
-rw-r--r--domain/src/main/java/akkamon/domain/greeter/AkkaQuickstart.java25
-rw-r--r--domain/src/main/java/akkamon/domain/greeter/Greeter.java77
-rw-r--r--domain/src/main/java/akkamon/domain/greeter/GreeterBot.java35
-rw-r--r--domain/src/main/java/akkamon/domain/greeter/GreeterMain.java44
-rw-r--r--domain/src/main/java/akkamon/domain/iot/ActorHierarchyExperiments.java16
-rw-r--r--domain/src/main/java/akkamon/domain/iot/Device.java104
-rw-r--r--domain/src/main/java/akkamon/domain/iot/DeviceGroup.java98
-rw-r--r--domain/src/main/java/akkamon/domain/iot/DeviceManager.java126
-rw-r--r--domain/src/main/java/akkamon/domain/iot/IotEntryPoint.java9
-rw-r--r--domain/src/main/java/akkamon/domain/iot/IotSupervisor.java31
-rw-r--r--domain/src/main/java/akkamon/domain/iot/refs/Main.java32
-rw-r--r--domain/src/main/java/akkamon/domain/iot/refs/PrintMyActorRefActor.java30
-rw-r--r--domain/src/main/java/akkamon/domain/iot/startstop/MainStartStop.java31
-rw-r--r--domain/src/main/java/akkamon/domain/iot/startstop/StartStopActor1.java35
-rw-r--r--domain/src/main/java/akkamon/domain/iot/startstop/StartStopActor2.java30
-rw-r--r--domain/src/main/java/akkamon/domain/iot/supervision/MainSupervize.java31
-rw-r--r--domain/src/main/java/akkamon/domain/iot/supervision/SupervisedActor.java45
-rw-r--r--domain/src/main/java/akkamon/domain/iot/supervision/SupervisingActor.java36
-rw-r--r--domain/src/test/java/akkamon/domain/AkkamonImplTest.java16
-rw-r--r--domain/src/test/java/akkamon/domain/iot/DeviceTest.java162
-rw-r--r--settings.gradle3
23 files changed, 1035 insertions, 19 deletions
diff --git a/domain/build.gradle b/domain/build.gradle
index 2f385d0..d2d174a 100644
--- a/domain/build.gradle
+++ b/domain/build.gradle
@@ -1,8 +1,14 @@
plugins {
id 'java'
id 'java-library'
+ id 'idea'
+ id 'application'
}
+def versions = [
+ ScalaBinary: "2.13"
+]
+
version 'unspecified'
repositories {
@@ -11,10 +17,33 @@ repositories {
}
dependencies {
- testImplementation 'org.junit.jupiter:junit-jupiter-api:5.7.0'
- testRuntimeOnly 'org.junit.jupiter:junit-jupiter-engine:5.7.0'
+
+ implementation platform("com.typesafe.akka:akka-bom_${versions.ScalaBinary}:2.6.15")
+ implementation "com.typesafe.akka:akka-actor-typed_${versions.ScalaBinary}"
+
+ implementation "com.typesafe.akka:akka-stream_${versions.ScalaBinary}"
+
+ implementation 'ch.qos.logback:logback-classic:1.2.3'
+ implementation 'junit:junit:4.12'
+
+
+ testImplementation "com.typesafe.akka:akka-actor-testkit-typed_${versions.ScalaBinary}"
+ testImplementation "com.typesafe.akka:akka-stream-testkit_${versions.ScalaBinary}"
+
+ testImplementation 'junit:junit:4.12'
+ testRuntimeOnly 'junit:junit:4.12'
+
}
test {
useJUnitPlatform()
-} \ No newline at end of file
+}
+
+application {
+ // Define the main class for the application.
+ mainClassName = 'akkamon.domain.iot.IotEntryPoint'
+}
+
+run {
+ standardInput = System.in
+}
diff --git a/domain/src/main/java/akkamon/domain/AkkamonImpl.java b/domain/src/main/java/akkamon/domain/AkkamonImpl.java
index c7f0b7d..a6a840b 100644
--- a/domain/src/main/java/akkamon/domain/AkkamonImpl.java
+++ b/domain/src/main/java/akkamon/domain/AkkamonImpl.java
@@ -35,4 +35,7 @@ public class AkkamonImpl implements Akkamon {
public HashMap<String, Trainer> getDummyTrainersCollection() {
return dummyTrainersCollection;
}
+
+
+
}
diff --git a/domain/src/main/java/akkamon/domain/greeter/AkkaQuickstart.java b/domain/src/main/java/akkamon/domain/greeter/AkkaQuickstart.java
new file mode 100644
index 0000000..8b29aae
--- /dev/null
+++ b/domain/src/main/java/akkamon/domain/greeter/AkkaQuickstart.java
@@ -0,0 +1,25 @@
+package akkamon.domain.greeter;
+
+import akka.actor.typed.ActorSystem;
+
+import java.io.IOException;
+
+public class AkkaQuickstart {
+ public static void main(String[] args) {
+ //#actor-system
+ final ActorSystem<GreeterMain.SayHello> greeterMain = ActorSystem.create(GreeterMain.create(), "helloakka");
+ //#actor-system
+
+ //#main-send-messages
+ greeterMain.tell(new GreeterMain.SayHello("Charles"));
+ //#main-send-messages
+
+ try {
+ System.out.println(">>> Press ENTER to exit <<<");
+ System.in.read();
+ } catch (IOException ignored) {
+ } finally {
+ greeterMain.terminate();
+ }
+ }
+}
diff --git a/domain/src/main/java/akkamon/domain/greeter/Greeter.java b/domain/src/main/java/akkamon/domain/greeter/Greeter.java
new file mode 100644
index 0000000..ea2a948
--- /dev/null
+++ b/domain/src/main/java/akkamon/domain/greeter/Greeter.java
@@ -0,0 +1,77 @@
+package akkamon.domain.greeter;
+
+import akka.actor.typed.ActorRef;
+import akka.actor.typed.Behavior;
+import akka.actor.typed.javadsl.*;
+
+import java.util.Objects;
+
+// #greeter
+public class Greeter extends AbstractBehavior<Greeter.Greet> {
+
+ public static final class Greet {
+ public final String whom;
+ public final ActorRef<Greeted> replyTo;
+
+ public Greet(String whom, ActorRef<Greeted> replyTo) {
+ this.whom = whom;
+ this.replyTo = replyTo;
+ }
+ }
+
+ public static final class Greeted {
+ public final String whom;
+ public final ActorRef<Greet> from;
+
+ public Greeted(String whom, ActorRef<Greet> from) {
+ this.whom = whom;
+ this.from = from;
+ }
+
+// #greeter
+ @Override
+ public boolean equals(Object o) {
+ if (this == o) return true;
+ if (o == null || getClass() != o.getClass()) return false;
+ Greeted greeted = (Greeted) o;
+ return Objects.equals(whom, greeted.whom) &&
+ Objects.equals(from, greeted.from);
+ }
+
+ @Override
+ public int hashCode() {
+ return Objects.hash(whom, from);
+ }
+
+ @Override
+ public String toString() {
+ return "Greeted{" +
+ "whom='" + whom + '\'' +
+ ", from=" + from +
+ '}';
+ }
+// #greeter
+ }
+
+ public static Behavior<Greet> create() {
+ return Behaviors.setup(Greeter::new);
+ }
+
+ private Greeter(ActorContext<Greet> context) {
+ super(context);
+ }
+
+ @Override
+ public Receive<Greet> createReceive() {
+ return newReceiveBuilder().onMessage(Greet.class, this::onGreet).build();
+ }
+
+ private Behavior<Greet> onGreet(Greet command) {
+ getContext().getLog().info("Hello {}!", command.whom);
+ //#greeter-send-message
+ command.replyTo.tell(new Greeted(command.whom, getContext().getSelf()));
+ //#greeter-send-message
+ return this;
+ }
+}
+// #greeter
diff --git a/domain/src/main/java/akkamon/domain/greeter/GreeterBot.java b/domain/src/main/java/akkamon/domain/greeter/GreeterBot.java
new file mode 100644
index 0000000..22fd601
--- /dev/null
+++ b/domain/src/main/java/akkamon/domain/greeter/GreeterBot.java
@@ -0,0 +1,35 @@
+package akkamon.domain.greeter;
+
+import akka.actor.typed.Behavior;
+import akka.actor.typed.javadsl.*;
+
+public class GreeterBot extends AbstractBehavior<Greeter.Greeted> {
+
+ public static Behavior<Greeter.Greeted> create(int max) {
+ return Behaviors.setup(context -> new GreeterBot(context, max));
+ }
+
+ private final int max;
+ private int greetingCounter;
+
+ private GreeterBot(ActorContext<Greeter.Greeted> context, int max) {
+ super(context);
+ this.max = max;
+ }
+
+ @Override
+ public Receive<Greeter.Greeted> createReceive() {
+ return newReceiveBuilder().onMessage(Greeter.Greeted.class, this::onGreeted).build();
+ }
+
+ private Behavior<Greeter.Greeted> onGreeted(Greeter.Greeted message) {
+ greetingCounter++;
+ getContext().getLog().info("Greeting {} for {}", greetingCounter, message.whom);
+ if (greetingCounter == max) {
+ return Behaviors.stopped();
+ } else {
+ message.from.tell(new Greeter.Greet(message.whom, getContext().getSelf()));
+ return this;
+ }
+ }
+}
diff --git a/domain/src/main/java/akkamon/domain/greeter/GreeterMain.java b/domain/src/main/java/akkamon/domain/greeter/GreeterMain.java
new file mode 100644
index 0000000..9180a0e
--- /dev/null
+++ b/domain/src/main/java/akkamon/domain/greeter/GreeterMain.java
@@ -0,0 +1,44 @@
+package akkamon.domain.greeter;
+
+import akka.actor.typed.ActorRef;
+import akka.actor.typed.Behavior;
+import akka.actor.typed.javadsl.*;
+
+public class GreeterMain extends AbstractBehavior<GreeterMain.SayHello> {
+
+ public static class SayHello {
+ public final String name;
+
+ public SayHello(String name) {
+ this.name = name;
+ }
+ }
+
+ private final ActorRef<Greeter.Greet> greeter;
+
+ public static Behavior<SayHello> create() {
+ return Behaviors.setup(GreeterMain::new);
+ }
+
+ private GreeterMain(ActorContext<SayHello> context) {
+ super(context);
+ //#create-actors
+ greeter = context.spawn(Greeter.create(), "greeter");
+ //#create-actors
+ }
+
+ @Override
+ public Receive<SayHello> createReceive() {
+ return newReceiveBuilder().onMessage(SayHello.class, this::onSayHello).build();
+ }
+
+ private Behavior<SayHello> onSayHello(SayHello command) {
+ //#create-actors
+ ActorRef<Greeter.Greeted> replyTo =
+ getContext().spawn(GreeterBot.create(3), command.name);
+ greeter.tell(new Greeter.Greet(command.name, replyTo));
+ //#create-actors
+ return this;
+ }
+}
+
diff --git a/domain/src/main/java/akkamon/domain/iot/ActorHierarchyExperiments.java b/domain/src/main/java/akkamon/domain/iot/ActorHierarchyExperiments.java
new file mode 100644
index 0000000..cba3412
--- /dev/null
+++ b/domain/src/main/java/akkamon/domain/iot/ActorHierarchyExperiments.java
@@ -0,0 +1,16 @@
+package akkamon.domain.iot;
+
+import akka.actor.typed.ActorRef;
+import akka.actor.typed.ActorSystem;
+import akkamon.domain.iot.supervision.MainSupervize;
+
+public class ActorHierarchyExperiments {
+ public static void main(String[] args) {
+ // ActorRef<String> testSystem = ActorSystem.create(Main.create(), "testSystem");
+ // testSystem.tell("start");
+ // ActorRef<String> testSystemStartStop = ActorSystem.create(MainStartStop.create(), "test-supervize");
+ // testSystemStartStop.tell("start");
+ ActorRef<String> testSupervize = ActorSystem.create(MainSupervize.create(), "test-supervize");
+ testSupervize.tell("start");
+ }
+}
diff --git a/domain/src/main/java/akkamon/domain/iot/Device.java b/domain/src/main/java/akkamon/domain/iot/Device.java
new file mode 100644
index 0000000..27918a6
--- /dev/null
+++ b/domain/src/main/java/akkamon/domain/iot/Device.java
@@ -0,0 +1,104 @@
+package akkamon.domain.iot;
+
+import akka.actor.typed.ActorRef;
+import akka.actor.typed.Behavior;
+import akka.actor.typed.PostStop;
+import akka.actor.typed.javadsl.AbstractBehavior;
+import akka.actor.typed.javadsl.ActorContext;
+import akka.actor.typed.javadsl.Behaviors;
+import akka.actor.typed.javadsl.Receive;
+
+import java.util.Optional;
+
+public class Device extends AbstractBehavior<Device.Command> {
+
+ public interface Command {}
+
+ public static final class RecordTemperature implements Command {
+ final long requestId;
+ final double value;
+ final ActorRef<TemperatureRecorded> replyTo;
+
+ public RecordTemperature(long requestId, double value, ActorRef<TemperatureRecorded> replyTo) {
+ this.requestId = requestId;
+ this.value = value;
+ this.replyTo = replyTo;
+ }
+ }
+
+ public static final class TemperatureRecorded {
+ final long requestId;
+
+ public TemperatureRecorded(long requestId) {
+ this.requestId = requestId;
+ }
+ }
+
+ public static final class ReadTemperature implements Command {
+ final long requestId;
+ final ActorRef<RespondTemperature> replyTo;
+
+ public ReadTemperature(long requestId, ActorRef<RespondTemperature> replyTo) {
+ this.requestId = requestId;
+ this.replyTo = replyTo;
+ }
+ }
+
+ public static final class RespondTemperature {
+ final long requestId;
+ final Optional<Double> value;
+
+ public RespondTemperature(long requestId, Optional<Double> value) {
+ this.requestId = requestId;
+ this.value = value;
+ }
+ }
+
+ static enum Passivate implements Command {
+ INSTANCE
+ }
+
+ public static Behavior<Command> create(String groupId, String deviceId) {
+ return Behaviors.setup(context -> new Device(context, groupId, deviceId));
+ }
+
+ private final String groupId;
+ private final String deviceId;
+
+ private Optional<Double> lastTemperatureReading = Optional.empty();
+
+ private Device(ActorContext<Command> context, String groupId, String deviceId) {
+ super(context);
+ this.groupId = groupId;
+ this.deviceId = deviceId;
+
+ context.getLog().info("Device actor {}-{} started", groupId, deviceId);
+ }
+
+ @Override
+ public Receive<Command> createReceive() {
+ return newReceiveBuilder()
+ .onMessage(RecordTemperature.class, this::onRecordTemperature)
+ .onMessage(ReadTemperature.class, this::onReadTemperature)
+ .onMessage(Passivate.class, m -> Behaviors.stopped())
+ .onSignal(PostStop.class, signal -> onPostStop())
+ .build();
+ }
+
+ private Behavior<Command> onRecordTemperature(RecordTemperature r) {
+ getContext().getLog().info("Recorded temperature reading {} with {}", r.value, r.requestId);
+ lastTemperatureReading = Optional.of(r.value);
+ r.replyTo.tell(new TemperatureRecorded(r.requestId));
+ return this;
+ }
+
+ private Behavior<Command> onReadTemperature(ReadTemperature r) {
+ r.replyTo.tell(new RespondTemperature(r.requestId, lastTemperatureReading));
+ return this;
+ }
+
+ private Behavior<Command> onPostStop() {
+ getContext().getLog().info("Device actor {}-{} stopped", groupId, deviceId);
+ return Behaviors.stopped();
+ }
+}
diff --git a/domain/src/main/java/akkamon/domain/iot/DeviceGroup.java b/domain/src/main/java/akkamon/domain/iot/DeviceGroup.java
new file mode 100644
index 0000000..2cfc13a
--- /dev/null
+++ b/domain/src/main/java/akkamon/domain/iot/DeviceGroup.java
@@ -0,0 +1,98 @@
+package akkamon.domain.iot;
+
+import akka.actor.typed.ActorRef;
+import akka.actor.typed.Behavior;
+import akka.actor.typed.PostStop;
+import akka.actor.typed.javadsl.AbstractBehavior;
+import akka.actor.typed.javadsl.ActorContext;
+import akka.actor.typed.javadsl.Behaviors;
+import akka.actor.typed.javadsl.Receive;
+
+import java.util.HashMap;
+import java.util.Map;
+
+public class DeviceGroup extends AbstractBehavior<DeviceGroup.Command> {
+
+ public interface Command {}
+
+ private class DeviceTerminated implements Command {
+ public final ActorRef<Device.Command> device;
+ public final String groupId;
+ public final String deviceId;
+
+ DeviceTerminated(ActorRef<Device.Command> device, String groupId, String deviceId) {
+ this.device = device;
+ this.groupId = groupId;
+ this.deviceId = deviceId;
+ }
+ }
+
+ public static Behavior<Command> create(String groupId) {
+ return Behaviors.setup(context -> new DeviceGroup(context, groupId));
+ }
+
+ private final String groupId;
+ private final Map<String, ActorRef<Device.Command>> deviceIdToActor = new HashMap<>();
+
+ private DeviceGroup(ActorContext<Command> context, String groupId) {
+ super(context);
+ this.groupId = groupId;
+ context.getLog().info("DeviceGroup {} started", groupId);
+ }
+
+ private DeviceGroup onTrackDevice(DeviceManager.RequestTrackDevice trackMsg) {
+ if (this.groupId.equals(trackMsg.groupId)) {
+ ActorRef<Device.Command> deviceActor = deviceIdToActor.get(trackMsg.deviceId);
+ if (deviceActor != null) {
+ trackMsg.replyTo.tell(new DeviceManager.DeviceRegistered(deviceActor));
+ } else {
+ getContext().getLog().info("Creating device actor for {}", trackMsg.deviceId);
+ deviceActor =
+ getContext()
+ .spawn(Device.create(groupId, trackMsg.deviceId), "device-" + trackMsg.deviceId);
+ getContext()
+ .watchWith(deviceActor, new DeviceTerminated(deviceActor, groupId, trackMsg.deviceId));
+ deviceIdToActor.put(trackMsg.deviceId, deviceActor);
+ trackMsg.replyTo.tell(new DeviceManager.DeviceRegistered(deviceActor));
+ }
+ } else {
+ getContext()
+ .getLog()
+ .warn(
+ "Ignoring TrackDevice request for {}. This actor is responsible for {}.",
+ groupId,
+ this.groupId);
+ }
+ return this;
+ }
+
+
+ private DeviceGroup onDeviceList(DeviceManager.RequestDeviceList r) {
+ r.replyTo.tell(new DeviceManager.ReplyDeviceList(r.requestId, deviceIdToActor.keySet()));
+ return this;
+ }
+
+ private DeviceGroup onTerminated(DeviceTerminated t) {
+ getContext().getLog().info("Device actor for {} has been terminated", t.deviceId);
+ deviceIdToActor.remove(t.deviceId);
+ return this;
+ }
+
+ @Override
+ public Receive<Command> createReceive() {
+ return newReceiveBuilder()
+ .onMessage(DeviceManager.RequestTrackDevice.class, this::onTrackDevice)
+ .onMessage(
+ DeviceManager.RequestDeviceList.class,
+ r -> r.groupId.equals(groupId),
+ this::onDeviceList)
+ .onMessage(DeviceTerminated.class, this::onTerminated)
+ .onSignal(PostStop.class, signal -> onPostStop())
+ .build();
+ }
+
+ private DeviceGroup onPostStop() {
+ getContext().getLog().info("DeviceGroup {} stopped", groupId);
+ return this;
+ }
+}
diff --git a/domain/src/main/java/akkamon/domain/iot/DeviceManager.java b/domain/src/main/java/akkamon/domain/iot/DeviceManager.java
new file mode 100644
index 0000000..e445b7e
--- /dev/null
+++ b/domain/src/main/java/akkamon/domain/iot/DeviceManager.java
@@ -0,0 +1,126 @@
+package akkamon.domain.iot;
+
+import akka.actor.typed.ActorRef;
+import akka.actor.typed.Behavior;
+import akka.actor.typed.PostStop;
+import akka.actor.typed.javadsl.AbstractBehavior;
+import akka.actor.typed.javadsl.ActorContext;
+import akka.actor.typed.javadsl.Behaviors;
+import akka.actor.typed.javadsl.Receive;
+
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.Map;
+import java.util.Set;
+
+public class DeviceManager extends AbstractBehavior<DeviceManager.Command> {
+
+ public interface Command {}
+
+ public static final class RequestTrackDevice implements DeviceManager.Command, DeviceGroup.Command {
+ public final String groupId;
+ public final String deviceId;
+ public final ActorRef<DeviceRegistered> replyTo;
+
+ public RequestTrackDevice(String groupId, String deviceId, ActorRef<DeviceRegistered> replyTo) {
+ this.groupId = groupId;
+ this.deviceId = deviceId;
+ this.replyTo = replyTo;
+ }
+ }
+
+ public static final class DeviceRegistered {
+ public final ActorRef<Device.Command> device;
+
+ public DeviceRegistered(ActorRef<Device.Command> device) {
+ this.device = device;
+ }
+ }
+
+ public static final class RequestDeviceList implements DeviceManager.Command, DeviceGroup.Command {
+ final long requestId;
+ final String groupId;
+ final ActorRef<ReplyDeviceList> replyTo;
+
+ public RequestDeviceList(long requestId, String groupId, ActorRef<ReplyDeviceList> replyTo) {
+ this.requestId = requestId;
+ this.groupId = groupId;
+ this.replyTo = replyTo;
+ }
+ }
+
+ public static final class ReplyDeviceList {
+ final long requestId;
+ final Set<String> ids;
+
+ public ReplyDeviceList(long requestId, Set<String> ids) {
+ this.requestId = requestId;
+ this.ids = ids;
+ }
+ }
+
+ private static class DeviceGroupTerminated implements DeviceManager.Command {
+ public final String groupId;
+
+ DeviceGroupTerminated(String groupId) {
+ this.groupId = groupId;
+ }
+ }
+
+ public static Behavior<Command> create() {
+ return Behaviors.setup(DeviceManager::new);
+ }
+
+ private final Map<String, ActorRef<DeviceGroup.Command>> groupIdToActor = new HashMap<>();
+
+ private DeviceManager(ActorContext<Command> context) {
+ super(context);
+ context.getLog().info("DeviceManager started");
+ }
+
+ private DeviceManager onTrackDevice(RequestTrackDevice trackMsg) {
+ String groupId = trackMsg.groupId;
+ ActorRef<DeviceGroup.Command> ref = groupIdToActor.get(groupId);
+ if (ref != null) {
+ ref.tell(trackMsg);
+ } else {
+ getContext().getLog().info("Creating device group actor for {}", groupId);
+ ActorRef<DeviceGroup.Command> groupActor =
+ getContext().spawn(DeviceGroup.create(groupId), "group-" + groupId);
+ getContext().watchWith(groupActor, new DeviceGroupTerminated(groupId));
+ groupActor.tell(trackMsg);
+ groupIdToActor.put(groupId, groupActor);
+ }
+ return this;
+ }
+
+ private DeviceManager onRequestDeviceList(RequestDeviceList request) {
+ ActorRef<DeviceGroup.Command> ref = groupIdToActor.get(request.groupId);
+ if (ref != null) {
+ ref.tell(request);
+ } else {
+ request.replyTo.tell(new ReplyDeviceList(request.requestId, Collections.emptySet()));
+ }
+ return this;
+ }
+
+ private DeviceManager onTerminated(DeviceGroupTerminated t) {
+ getContext().getLog().info("Device group actor for {} has been terminated", t.groupId);
+ groupIdToActor.remove(t.groupId);
+ return this;
+ }
+
+ public Receive<Command> createReceive() {
+ return newReceiveBuilder()
+ .onMessage(RequestTrackDevice.class, this::onTrackDevice)
+ .onMessage(RequestDeviceList.class, this::onRequestDeviceList)
+ .onMessage(DeviceGroupTerminated.class, this::onTerminated)
+ .onSignal(PostStop.class, signal -> onPostStop())
+ .build();
+ }
+
+ private DeviceManager onPostStop() {
+ getContext().getLog().info("DeviceManager stopped");
+ return this;
+ }
+}
diff --git a/domain/src/main/java/akkamon/domain/iot/IotEntryPoint.java b/domain/src/main/java/akkamon/domain/iot/IotEntryPoint.java
new file mode 100644
index 0000000..e8faf9d
--- /dev/null
+++ b/domain/src/main/java/akkamon/domain/iot/IotEntryPoint.java
@@ -0,0 +1,9 @@
+package akkamon.domain.iot;
+
+import akka.actor.typed.ActorSystem;
+
+public class IotEntryPoint {
+ public static void main(String[] args) {
+ ActorSystem.create(IotSupervisor.create(), "iot-system");
+ }
+}
diff --git a/domain/src/main/java/akkamon/domain/iot/IotSupervisor.java b/domain/src/main/java/akkamon/domain/iot/IotSupervisor.java
new file mode 100644
index 0000000..dc50929
--- /dev/null
+++ b/domain/src/main/java/akkamon/domain/iot/IotSupervisor.java
@@ -0,0 +1,31 @@
+package akkamon.domain.iot;
+
+import akka.actor.typed.Behavior;
+import akka.actor.typed.PostStop;
+import akka.actor.typed.javadsl.AbstractBehavior;
+import akka.actor.typed.javadsl.ActorContext;
+import akka.actor.typed.javadsl.Behaviors;
+import akka.actor.typed.javadsl.Receive;
+
+public class IotSupervisor extends AbstractBehavior<Void> {
+
+ public static Behavior<Void> create() {
+ return Behaviors.setup(IotSupervisor::new);
+ }
+
+ private IotSupervisor(ActorContext<Void> context) {
+ super(context);
+ context.getLog().info("IoT Application started");
+ }
+
+ // No need to handle any messages
+ @Override
+ public Receive<Void> createReceive() {
+ return newReceiveBuilder().onSignal(PostStop.class, signal -> onPostStop()).build();
+ }
+
+ private IotSupervisor onPostStop() {
+ getContext().getLog().info("IoT Application stopped");
+ return this;
+ }
+}
diff --git a/domain/src/main/java/akkamon/domain/iot/refs/Main.java b/domain/src/main/java/akkamon/domain/iot/refs/Main.java
new file mode 100644
index 0000000..923d505
--- /dev/null
+++ b/domain/src/main/java/akkamon/domain/iot/refs/Main.java
@@ -0,0 +1,32 @@
+package akkamon.domain.iot.refs;
+
+import akka.actor.typed.ActorRef;
+import akka.actor.typed.Behavior;
+import akka.actor.typed.javadsl.AbstractBehavior;
+import akka.actor.typed.javadsl.ActorContext;
+import akka.actor.typed.javadsl.Behaviors;
+import akka.actor.typed.javadsl.Receive;
+
+class Main extends AbstractBehavior<String> {
+
+ static Behavior<String> create() {
+ return Behaviors.setup(Main::new);
+ }
+
+ private Main(ActorContext<String> context) {
+ super(context);
+ }
+
+ @Override
+ public Receive<String> createReceive() {
+ return newReceiveBuilder().onMessageEquals("start", this::start).build();
+ }
+
+ private Behavior<String> start() {
+ ActorRef<String> firstRef = getContext().spawn(PrintMyActorRefActor.create(), "first-actor");
+
+ System.out.println("First: " + firstRef);
+ firstRef.tell("printit");
+ return Behaviors.same();
+ }
+}
diff --git a/domain/src/main/java/akkamon/domain/iot/refs/PrintMyActorRefActor.java b/domain/src/main/java/akkamon/domain/iot/refs/PrintMyActorRefActor.java
new file mode 100644
index 0000000..76b5c42
--- /dev/null
+++ b/domain/src/main/java/akkamon/domain/iot/refs/PrintMyActorRefActor.java
@@ -0,0 +1,30 @@
+package akkamon.domain.iot.refs;
+
+import akka.actor.typed.ActorRef;
+import akka.actor.typed.Behavior;
+import akka.actor.typed.javadsl.AbstractBehavior;
+import akka.actor.typed.javadsl.ActorContext;
+import akka.actor.typed.javadsl.Behaviors;
+import akka.actor.typed.javadsl.Receive;
+
+class PrintMyActorRefActor extends AbstractBehavior<String> {
+
+ static Behavior<String> create() {
+ return Behaviors.setup(PrintMyActorRefActor::new);
+ }
+
+ private PrintMyActorRefActor(ActorContext<String> context) {
+ super(context);
+ }
+
+ @Override
+ public Receive<String> createReceive() {
+ return newReceiveBuilder().onMessageEquals("printit", this::printIt).build();
+ }
+
+ private Behavior<String> printIt() {
+ ActorRef<String> secondRef = getContext().spawn(Behaviors.empty(), "second-actor");
+ System.out.println("Second: " + secondRef);
+ return this;
+ }
+}
diff --git a/domain/src/main/java/akkamon/domain/iot/startstop/MainStartStop.java b/domain/src/main/java/akkamon/domain/iot/startstop/MainStartStop.java
new file mode 100644
index 0000000..d8f13be
--- /dev/null
+++ b/domain/src/main/java/akkamon/domain/iot/startstop/MainStartStop.java
@@ -0,0 +1,31 @@
+package akkamon.domain.iot.startstop;
+
+import akka.actor.typed.ActorRef;
+import akka.actor.typed.Behavior;
+import akka.actor.typed.javadsl.AbstractBehavior;
+import akka.actor.typed.javadsl.ActorContext;
+import akka.actor.typed.javadsl.Behaviors;
+import akka.actor.typed.javadsl.Receive;
+
+public class MainStartStop extends AbstractBehavior<String> {
+
+ static Behavior<String> create() {
+ return Behaviors.setup(MainStartStop::new);
+ }
+
+ private MainStartStop(ActorContext<String> context) {
+ super(context);
+ }
+
+ @Override
+ public Receive<String> createReceive() {
+ return newReceiveBuilder().onMessageEquals("start", this::start).build();
+ }
+
+ private Behavior<String> start() {
+ ActorRef<String> first = getContext().spawn(StartStopActor1.create(), "first");
+ first.tell("stop");
+ return Behaviors.same();
+ }
+
+}
diff --git a/domain/src/main/java/akkamon/domain/iot/startstop/StartStopActor1.java b/domain/src/main/java/akkamon/domain/iot/startstop/StartStopActor1.java
new file mode 100644
index 0000000..37c8237
--- /dev/null
+++ b/domain/src/main/java/akkamon/domain/iot/startstop/StartStopActor1.java
@@ -0,0 +1,35 @@
+package akkamon.domain.iot.startstop;
+
+import akka.actor.typed.Behavior;
+import akka.actor.typed.PostStop;
+import akka.actor.typed.javadsl.AbstractBehavior;
+import akka.actor.typed.javadsl.ActorContext;
+import akka.actor.typed.javadsl.Behaviors;
+import akka.actor.typed.javadsl.Receive;
+
+class StartStopActor1 extends AbstractBehavior<String> {
+
+ static Behavior<String> create() {
+ return Behaviors.setup(StartStopActor1::new);
+ }
+
+ private StartStopActor1(ActorContext<String> context) {
+ super(context);
+ System.out.println("first started");
+
+ context.spawn(StartStopActor2.create(), "second");
+ }
+
+ @Override
+ public Receive<String> createReceive() {
+ return newReceiveBuilder()
+ .onMessageEquals("stop", Behaviors::stopped)
+ .onSignal(PostStop.class, signal -> onPostStop())
+ .build();
+ }
+
+ private Behavior<String> onPostStop() {
+ System.out.println("first stopped");
+ return this;
+ }
+}
diff --git a/domain/src/main/java/akkamon/domain/iot/startstop/StartStopActor2.java b/domain/src/main/java/akkamon/domain/iot/startstop/StartStopActor2.java
new file mode 100644
index 0000000..28d01cf
--- /dev/null
+++ b/domain/src/main/java/akkamon/domain/iot/startstop/StartStopActor2.java
@@ -0,0 +1,30 @@
+package akkamon.domain.iot.startstop;
+
+import akka.actor.typed.Behavior;
+import akka.actor.typed.PostStop;
+import akka.actor.typed.javadsl.AbstractBehavior;
+import akka.actor.typed.javadsl.ActorContext;
+import akka.actor.typed.javadsl.Behaviors;
+import akka.actor.typed.javadsl.Receive;
+
+class StartStopActor2 extends AbstractBehavior<String> {
+
+ static Behavior<String> create() {
+ return Behaviors.setup(StartStopActor2::new);
+ }
+
+ private StartStopActor2(ActorContext<String> context) {
+ super(context);
+ System.out.println("second started");
+ }
+
+ @Override
+ public Receive<String> createReceive() {
+ return newReceiveBuilder().onSignal(PostStop.class, signal -> onPostStop()).build();
+ }
+
+ private Behavior<String> onPostStop() {
+ System.out.println("second stopped");
+ return this;
+ }
+}
diff --git a/domain/src/main/java/akkamon/domain/iot/supervision/MainSupervize.java b/domain/src/main/java/akkamon/domain/iot/supervision/MainSupervize.java
new file mode 100644
index 0000000..294245f
--- /dev/null
+++ b/domain/src/main/java/akkamon/domain/iot/supervision/MainSupervize.java
@@ -0,0 +1,31 @@
+package akkamon.domain.iot.supervision;
+
+import akka.actor.typed.ActorRef;
+import akka.actor.typed.Behavior;
+import akka.actor.typed.javadsl.AbstractBehavior;
+import akka.actor.typed.javadsl.ActorContext;
+import akka.actor.typed.javadsl.Behaviors;
+import akka.actor.typed.javadsl.Receive;
+
+public class MainSupervize extends AbstractBehavior<String> {
+
+ public static Behavior<String> create() {
+ return Behaviors.setup(MainSupervize::new);
+ }
+
+ private MainSupervize(ActorContext<String> context) {
+ super(context);
+ }
+
+ @Override
+ public Receive<String> createReceive() {
+ return newReceiveBuilder().onMessageEquals("start", this::start).build();
+ }
+
+ private Behavior<String> start() {
+ ActorRef<String> supervisingActor = getContext().spawn(SupervisingActor.create(), "supervising-actor");
+ supervisingActor.tell("failChild");
+ return Behaviors.same();
+ }
+
+}
diff --git a/domain/src/main/java/akkamon/domain/iot/supervision/SupervisedActor.java b/domain/src/main/java/akkamon/domain/iot/supervision/SupervisedActor.java
new file mode 100644
index 0000000..a050cfe
--- /dev/null
+++ b/domain/src/main/java/akkamon/domain/iot/supervision/SupervisedActor.java
@@ -0,0 +1,45 @@
+package akkamon.domain.iot.supervision;
+
+import akka.actor.typed.Behavior;
+import akka.actor.typed.PostStop;
+import akka.actor.typed.PreRestart;
+import akka.actor.typed.javadsl.AbstractBehavior;
+import akka.actor.typed.javadsl.ActorContext;
+import akka.actor.typed.javadsl.Behaviors;
+import akka.actor.typed.javadsl.Receive;
+
+class SupervisedActor extends AbstractBehavior<String> {
+
+ static Behavior<String> create() {
+ return Behaviors.setup(SupervisedActor::new);
+ }
+
+ private SupervisedActor(ActorContext<String> context) {
+ super(context);
+ System.out.println("supervised actor started");
+ }
+
+ @Override
+ public Receive<String> createReceive() {
+ return newReceiveBuilder()
+ .onMessageEquals("fail", this::fail)
+ .onSignal(PreRestart.class, signal -> preRestart())
+ .onSignal(PostStop.class, signal -> postStop())
+ .build();
+ }
+
+ private Behavior<String> fail() {
+ System.out.println("supervised actor fails now");
+ throw new RuntimeException("I failed!");
+ }
+
+ private Behavior<String> preRestart() {
+ System.out.println("supervised will be restarted");
+ return this;
+ }
+
+ private Behavior<String> postStop() {
+ System.out.println("supervised stopped");
+ return this;
+ }
+}
diff --git a/domain/src/main/java/akkamon/domain/iot/supervision/SupervisingActor.java b/domain/src/main/java/akkamon/domain/iot/supervision/SupervisingActor.java
new file mode 100644
index 0000000..f1e098b
--- /dev/null
+++ b/domain/src/main/java/akkamon/domain/iot/supervision/SupervisingActor.java
@@ -0,0 +1,36 @@
+package akkamon.domain.iot.supervision;
+
+import akka.actor.typed.SupervisorStrategy;
+import akka.actor.typed.ActorRef;
+import akka.actor.typed.Behavior;
+import akka.actor.typed.javadsl.AbstractBehavior;
+import akka.actor.typed.javadsl.ActorContext;
+import akka.actor.typed.javadsl.Behaviors;
+import akka.actor.typed.javadsl.Receive;
+
+class SupervisingActor extends AbstractBehavior<String> {
+
+ static Behavior<String> create() {
+ return Behaviors.setup(SupervisingActor::new);
+ }
+
+ private final ActorRef<String> child;
+
+ private SupervisingActor(ActorContext<String> context) {
+ super(context);
+ child =
+ context.spawn(
+ Behaviors.supervise(SupervisedActor.create()).onFailure(SupervisorStrategy.restart()),
+ "supervised-actor");
+ }
+
+ @Override
+ public Receive<String> createReceive() {
+ return newReceiveBuilder().onMessageEquals("failChild", this::onFailChild).build();
+ }
+
+ private Behavior<String> onFailChild() {
+ child.tell("fail");
+ return this;
+ }
+}
diff --git a/domain/src/test/java/akkamon/domain/AkkamonImplTest.java b/domain/src/test/java/akkamon/domain/AkkamonImplTest.java
index 6570c2a..90c4f44 100644
--- a/domain/src/test/java/akkamon/domain/AkkamonImplTest.java
+++ b/domain/src/test/java/akkamon/domain/AkkamonImplTest.java
@@ -1,23 +1,11 @@
package akkamon.domain;
-import org.junit.jupiter.api.AfterEach;
-import org.junit.jupiter.api.BeforeEach;
-import org.junit.jupiter.api.Nested;
-import org.junit.jupiter.api.Test;
+import org.junit.Test;
-import static org.junit.jupiter.api.Assertions.*;
+import static junit.framework.TestCase.assertNotNull;
class AkkamonImplTest {
- @BeforeEach
- void setUp() {
- }
-
- @AfterEach
- void tearDown() {
- }
-
- @Nested
class getInstance_behaviour {
@Test
void given_there_is_no_instance_yet_when_getInstance_is_called_then_give_class_property_instance() {
diff --git a/domain/src/test/java/akkamon/domain/iot/DeviceTest.java b/domain/src/test/java/akkamon/domain/iot/DeviceTest.java
new file mode 100644
index 0000000..a05499f
--- /dev/null
+++ b/domain/src/test/java/akkamon/domain/iot/DeviceTest.java
@@ -0,0 +1,162 @@
+package akkamon.domain.iot;
+
+import akka.actor.testkit.typed.javadsl.TestKitJunitResource;
+import akka.actor.testkit.typed.javadsl.TestProbe;
+import akka.actor.typed.ActorRef;
+import org.junit.ClassRule;
+import org.junit.Test;
+
+import java.util.Optional;
+import java.util.Set;
+import java.util.stream.Collectors;
+import java.util.stream.Stream;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertNotEquals;
+
+public class DeviceTest {
+
+ @ClassRule
+ public static final TestKitJunitResource testKit = new TestKitJunitResource();
+
+ @Test
+ public void testReplyWithEmptyReadingIfNoTemperatureIsKnown() {
+ TestProbe<Device.RespondTemperature> probe = testKit.createTestProbe(Device.RespondTemperature.class);
+
+ ActorRef<Device.Command> deviceActor = testKit.spawn(Device.create("group", "device"));
+ deviceActor.tell(new Device.ReadTemperature(42L, probe.getRef()));
+ Device.RespondTemperature response = probe.receiveMessage();
+ assertEquals(42L, response.requestId);
+ assertEquals(Optional.empty(), response.value);
+
+
+ }
+
+ @Test
+ public void testReplyWithLatestTemperatureReading() {
+ TestProbe<Device.TemperatureRecorded> recordProbe =
+ testKit.createTestProbe(Device.TemperatureRecorded.class);
+
+ TestProbe<Device.RespondTemperature> readProbe =
+ testKit.createTestProbe(Device.RespondTemperature.class);
+
+ ActorRef<Device.Command> deviceActor = testKit.spawn(Device.create("group", "device"));
+
+ deviceActor.tell(new Device.RecordTemperature(1L, 24.0, recordProbe.getRef()));
+ assertEquals(1L, recordProbe.receiveMessage().requestId);
+
+ deviceActor.tell(new Device.ReadTemperature(2L, readProbe.getRef()));
+ Device.RespondTemperature response1 = readProbe.receiveMessage();
+ assertEquals(2L, response1.requestId);
+ assertEquals(Optional.of(24.0), response1.value);
+
+ deviceActor.tell(new Device.RecordTemperature(3L, 55.0, recordProbe.getRef()));
+ assertEquals(3L, recordProbe.receiveMessage().requestId);
+
+ deviceActor.tell(new Device.ReadTemperature(4L, readProbe.getRef()));
+ Device.RespondTemperature response2 = readProbe.receiveMessage();
+ assertEquals(4L, response2.requestId);
+ assertEquals(Optional.of(55.0), response2.value);
+ }
+
+ @Test
+ public void testReplyToRegistrationRequests() {
+ TestProbe<DeviceManager.DeviceRegistered> probe = testKit.createTestProbe(DeviceManager.DeviceRegistered.class);
+ ActorRef<DeviceGroup.Command> groupActor = testKit.spawn(DeviceGroup.create("group"));
+
+ groupActor.tell(new DeviceManager.RequestTrackDevice("group", "device", probe.getRef()));
+ DeviceManager.DeviceRegistered registered1 = probe.receiveMessage();
+
+ // another deviceId
+ groupActor.tell(new DeviceManager.RequestTrackDevice("group", "device3", probe.getRef()));
+ DeviceManager.DeviceRegistered registered2 = probe.receiveMessage();
+ assertNotEquals(registered1.device, registered2.device);
+
+ // Check that the device actors are working
+ TestProbe<Device.TemperatureRecorded> recordProbe =
+ testKit.createTestProbe(Device.TemperatureRecorded.class);
+ registered1.device.tell(new Device.RecordTemperature(0L, 1.0, recordProbe.getRef()));
+ assertEquals(0L, recordProbe.receiveMessage().requestId);
+ registered2.device.tell(new Device.RecordTemperature(1L, 2.0, recordProbe.getRef()));
+ assertEquals(1L, recordProbe.receiveMessage().requestId);
+ }
+
+ @Test
+ public void testIgnoreWrongRegistrationRequests() {
+ TestProbe<DeviceManager.DeviceRegistered> probe = testKit.createTestProbe(DeviceManager.DeviceRegistered.class);
+ ActorRef<DeviceGroup.Command> groupActor = testKit.spawn(DeviceGroup.create("group"));
+ groupActor.tell(new DeviceManager.RequestTrackDevice("wrongGroup", "device1", probe.getRef()));
+ probe.expectNoMessage();
+ }
+
+
+ @Test
+ public void testReturnSameActorForSameDeviceId() {
+ TestProbe<DeviceManager.DeviceRegistered> probe = testKit.createTestProbe(DeviceManager.DeviceRegistered.class);
+ ActorRef<DeviceGroup.Command> groupActor = testKit.spawn(DeviceGroup.create("group"));
+
+ groupActor.tell(new DeviceManager.RequestTrackDevice("group", "device", probe.getRef()));
+ DeviceManager.DeviceRegistered registered1 = probe.receiveMessage();
+
+ // registering same again should be idempotent
+ groupActor.tell(new DeviceManager.RequestTrackDevice("group", "device", probe.getRef()));
+ DeviceManager.DeviceRegistered registered2 = probe.receiveMessage();
+ assertEquals(registered1.device, registered2.device);
+ }
+
+ @Test
+public void testListActiveDevices() {
+ TestProbe<DeviceManager.DeviceRegistered> registeredProbe = testKit.createTestProbe(DeviceManager.DeviceRegistered.class);
+ ActorRef<DeviceGroup.Command> groupActor = testKit.spawn(DeviceGroup.create("group"));
+
+ groupActor.tell(new DeviceManager.RequestTrackDevice("group", "device1", registeredProbe.getRef()));
+ registeredProbe.receiveMessage();
+
+ groupActor.tell(new DeviceManager.RequestTrackDevice("group", "device2", registeredProbe.getRef()));
+ registeredProbe.receiveMessage();
+
+ TestProbe<DeviceManager.ReplyDeviceList> deviceListProbe = testKit.createTestProbe(DeviceManager.ReplyDeviceList.class);
+
+ groupActor.tell(new DeviceManager.RequestDeviceList(0L, "group", deviceListProbe.getRef()));
+ DeviceManager.ReplyDeviceList reply = deviceListProbe.receiveMessage();
+ assertEquals(0L, reply.requestId);
+ Set<String> testIds = Stream.of("device1", "device2").collect(Collectors.toSet());
+ assertEquals(testIds, reply.ids);
+}
+
+@Test
+public void testListActiveDevicesAfterOneShutsDown() {
+ TestProbe<DeviceManager.DeviceRegistered> registeredProbe = testKit.createTestProbe(DeviceManager.DeviceRegistered.class);
+ ActorRef<DeviceGroup.Command> groupActor = testKit.spawn(DeviceGroup.create("group"));
+
+ groupActor.tell(new DeviceManager.RequestTrackDevice("group", "device1", registeredProbe.getRef()));
+ DeviceManager.DeviceRegistered registered1 = registeredProbe.receiveMessage();
+
+ groupActor.tell(new DeviceManager.RequestTrackDevice("group", "device2", registeredProbe.getRef()));
+ DeviceManager.DeviceRegistered registered2 = registeredProbe.receiveMessage();
+
+ ActorRef<Device.Command> toShutDown = registered1.device;
+
+ TestProbe<DeviceManager.ReplyDeviceList> deviceListProbe = testKit.createTestProbe(DeviceManager.ReplyDeviceList.class);
+
+ groupActor.tell(new DeviceManager.RequestDeviceList(0L, "group", deviceListProbe.getRef()));
+ DeviceManager.ReplyDeviceList reply = deviceListProbe.receiveMessage();
+ assertEquals(0L, reply.requestId);
+ assertEquals(Stream.of("device1", "device2").collect(Collectors.toSet()), reply.ids);
+
+ toShutDown.tell(Device.Passivate.INSTANCE);
+ registeredProbe.expectTerminated(toShutDown, registeredProbe.getRemainingOrDefault());
+
+ // using awaitAssert to retry because it might take longer for the groupActor
+ // to see the Terminated, that order is undefined
+ registeredProbe.awaitAssert(
+ () -> {
+ groupActor.tell(new DeviceManager.RequestDeviceList(1L, "group", deviceListProbe.getRef()));
+ DeviceManager.ReplyDeviceList r = deviceListProbe.receiveMessage();
+ assertEquals(1L, r.requestId);
+ assertEquals(Stream.of("device2").collect(Collectors.toSet()), r.ids);
+ return null;
+ });
+}
+
+}
diff --git a/settings.gradle b/settings.gradle
index 951c56b..7eb47f8 100644
--- a/settings.gradle
+++ b/settings.gradle
@@ -8,6 +8,5 @@
*/
rootProject.name = 'mvink_akkamons'
-include('model', 'api')
-include 'domain'
+include('domain', 'api')