Ich versuche, ein Diagramm innerhalb eines Schauspielers zu materialisieren. Dies scheint zu funktionieren, wenn eine der folgenden Bedingungen erfüllt sind:Materialisieren eines Graphen innerhalb eines Schauspielers
- Der Graph enthält keine Broadcast (erstellt mit
alsoTo
) oder - Das gleiche
ActorMaterializer
für jeden Materialisierung verwendet wird, oder - Der Graph außerhalb eines
Actor
materialisiert ich es reduziert den folgenden Testfälle nach unten:
import java.util.concurrent.{CountDownLatch, TimeUnit}
import akka.NotUsed
import akka.actor.{Actor, ActorSystem}
import akka.stream.ActorMaterializer
import akka.stream.scaladsl.{RunnableGraph, Sink, Source}
import akka.testkit.{TestActorRef, TestKit}
import org.scalatest.{FlatSpecLike, Matchers}
class ActorFlowTest extends TestKit(ActorSystem("ActorFlowTest")) with Matchers with FlatSpecLike {
def createGraph(withBroadcast: Boolean) = {
if (withBroadcast) Source.empty.alsoTo(Sink.ignore).to(Sink.ignore)
else Source.empty.to(Sink.ignore)
}
case object Bomb
class FlowActor(
graph: RunnableGraph[NotUsed],
latch: CountDownLatch,
materializer: (ActorSystem) => ActorMaterializer
) extends Actor {
override def preStart(): Unit = {
graph.run()(materializer(context.system))
latch.countDown()
}
override def receive: Receive = {
case Bomb => throw new RuntimeException
}
}
"Without an actor" should "be able to materialize twice" in {
val graph = Source.empty.alsoTo(Sink.ignore).to(Sink.ignore)
val materializer1 = ActorMaterializer()(system)
val materializer2 = ActorMaterializer()(system)
graph.run()(materializer1)
graph.run()(materializer2) // Pass
}
"With a the same materializer" should "be able to materialize twice" in {
val graph = createGraph(withBroadcast = true)
val latch = new CountDownLatch(2)
val materializer = ActorMaterializer()(system)
val actorRef = TestActorRef(new FlowActor(graph, latch, _ => materializer))
verify(actorRef, latch) should be(true) // Pass
}
"With a new materializer but no broadcast" should "be able to materialize twice" in {
val graph = createGraph(withBroadcast = false)
val latch = new CountDownLatch(2)
def materializer(system: ActorSystem) = ActorMaterializer()(system)
val actorRef = TestActorRef(new FlowActor(graph, latch, materializer))
verify(actorRef, latch) should be(true) // Pass
}
"With a new materializer and a broadcast" should "be able to materialize twice" in {
val graph = createGraph(withBroadcast = true)
val latch = new CountDownLatch(2)
def materializer(system: ActorSystem) = ActorMaterializer()(system)
val actorRef = TestActorRef(new FlowActor(graph, latch, materializer))
verify(actorRef, latch) should be(true) // Fail
}
def verify(actorRef: TestActorRef[_], latch: CountDownLatch): Boolean = {
actorRef.start()
actorRef ! Bomb
latch.await(25, TimeUnit.SECONDS)
}
}
Es scheint, dass die letzten Fälle mit dem folgenden Fehler immer in der Log-Timeout werden:
[ERROR] [07/05/2016 16:06:30.625] [ActorFlowTest-akka.actor.default-dispatcher-6] [akka://ActorFlowTest/user/$$c] Futures timed out after [20000 milliseconds]
akka.actor.PostRestartException: akka://ActorFlowTest/user/$$c: exception post restart (class java.lang.RuntimeException)
at akka.actor.dungeon.FaultHandling$$anonfun$6.apply(FaultHandling.scala:250)
at akka.actor.dungeon.FaultHandling$$anonfun$6.apply(FaultHandling.scala:248)
at akka.actor.dungeon.FaultHandling$$anonfun$handleNonFatalOrInterruptedException$1.applyOrElse(FaultHandling.scala:303)
at akka.actor.dungeon.FaultHandling$$anonfun$handleNonFatalOrInterruptedException$1.applyOrElse(FaultHandling.scala:298)
at scala.runtime.AbstractPartialFunction.apply(AbstractPartialFunction.scala:36)
at akka.actor.dungeon.FaultHandling$class.finishRecreate(FaultHandling.scala:248)
at akka.actor.dungeon.FaultHandling$class.faultRecreate(FaultHandling.scala:76)
at akka.actor.ActorCell.faultRecreate(ActorCell.scala:374)
at akka.actor.ActorCell.invokeAll$1(ActorCell.scala:464)
at akka.actor.ActorCell.systemInvoke(ActorCell.scala:483)
at akka.dispatch.Mailbox.processAllSystemMessages(Mailbox.scala:282)
at akka.testkit.CallingThreadDispatcher.process$1(CallingThreadDispatcher.scala:243)
at akka.testkit.CallingThreadDispatcher.runQueue(CallingThreadDispatcher.scala:283)
at akka.testkit.CallingThreadDispatcher.systemDispatch(CallingThreadDispatcher.scala:191)
at akka.actor.dungeon.Dispatch$class.restart(Dispatch.scala:119)
at akka.actor.ActorCell.restart(ActorCell.scala:374)
at akka.actor.LocalActorRef.restart(ActorRef.scala:406)
at akka.actor.SupervisorStrategy.restartChild(FaultHandling.scala:365)
at akka.actor.OneForOneStrategy.processFailure(FaultHandling.scala:518)
at akka.actor.SupervisorStrategy.handleFailure(FaultHandling.scala:303)
at akka.actor.dungeon.FaultHandling$class.handleFailure(FaultHandling.scala:263)
at akka.actor.ActorCell.handleFailure(ActorCell.scala:374)
at akka.actor.ActorCell.invokeAll$1(ActorCell.scala:459)
at akka.actor.ActorCell.systemInvoke(ActorCell.scala:483)
at akka.dispatch.Mailbox.processAllSystemMessages(Mailbox.scala:282)
at akka.dispatch.Mailbox.run(Mailbox.scala:223)
at akka.dispatch.Mailbox.exec(Mailbox.scala:234)
at scala.concurrent.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260)
at scala.concurrent.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1339)
at scala.concurrent.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979)
at scala.concurrent.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107)
Caused by: java.util.concurrent.TimeoutException: Futures timed out after [20000 milliseconds]
at scala.concurrent.impl.Promise$DefaultPromise.ready(Promise.scala:219)
at scala.concurrent.impl.Promise$DefaultPromise.result(Promise.scala:223)
at scala.concurrent.Await$$anonfun$result$1.apply(package.scala:190)
at akka.dispatch.MonitorableThreadFactory$AkkaForkJoinWorkerThread$$anon$3.block(ThreadPoolBuilder.scala:167)
at scala.concurrent.forkjoin.ForkJoinPool.managedBlock(ForkJoinPool.java:3640)
at akka.dispatch.MonitorableThreadFactory$AkkaForkJoinWorkerThread.blockOn(ThreadPoolBuilder.scala:165)
at scala.concurrent.Await$.result(package.scala:190)
at akka.stream.impl.ActorMaterializerImpl.actorOf(ActorMaterializerImpl.scala:207)
at akka.stream.impl.ActorMaterializerImpl$$anon$2.matGraph(ActorMaterializerImpl.scala:166)
at akka.stream.impl.ActorMaterializerImpl$$anon$2.materializeAtomic(ActorMaterializerImpl.scala:150)
at akka.stream.impl.MaterializerSession$$anonfun$materializeModule$1.apply(StreamLayout.scala:919)
at akka.stream.impl.MaterializerSession$$anonfun$materializeModule$1.apply(StreamLayout.scala:915)
at scala.collection.immutable.Set$Set1.foreach(Set.scala:94)
at akka.stream.impl.MaterializerSession.materializeModule(StreamLayout.scala:915)
at akka.stream.impl.MaterializerSession$$anonfun$materializeModule$1.apply(StreamLayout.scala:922)
at akka.stream.impl.MaterializerSession$$anonfun$materializeModule$1.apply(StreamLayout.scala:915)
at scala.collection.immutable.Set$Set4.foreach(Set.scala:200)
at akka.stream.impl.MaterializerSession.materializeModule(StreamLayout.scala:915)
at akka.stream.impl.MaterializerSession.materialize(StreamLayout.scala:882)
at akka.stream.impl.ActorMaterializerImpl.materialize(ActorMaterializerImpl.scala:182)
at akka.stream.impl.ActorMaterializerImpl.materialize(ActorMaterializerImpl.scala:80)
at akka.stream.scaladsl.RunnableGraph.run(Flow.scala:351)
at ActorFlowTest$FlowActor.preStart(ActorFlowTest.scala:40)
at akka.actor.Actor$class.postRestart(Actor.scala:566)
at ActorFlowTest$FlowActor.postRestart(ActorFlowTest.scala:33)
at akka.actor.Actor$class.aroundPostRestart(Actor.scala:504)
at ActorFlowTest$FlowActor.aroundPostRestart(ActorFlowTest.scala:33)
at akka.actor.dungeon.FaultHandling$class.finishRecreate(FaultHandling.scala:239)
... 25 more
ich ausdrücklich die ActorMaterializers
beendet versucht habe, aber dass das Problem nicht reproduzieren.
Eine Abhilfe ist, einen Verschluss um die ActorMaterializer
im Props
zu schaffen, aber wenn dies auch von einer anderen Actor
kam ich mache mir Sorgen, ich schließlich ähnliche Probleme bekommen.
Irgendeine Idee, warum das ist? Offensichtlich hat es etwas mit der ActorMaterializer
zu tun, aber interessant, wie das Entfernen der Sendung es auch löst (sogar mit einem viel komplizierteren Graphen).
Können Sie versuchen, postRestart zu überschreiben und zu validieren, was dort passiert (prüfen Sie eingehende Ausnahme und sehen, ob der Code es erreicht)? Vergessen Sie auch nicht, preStart aufzurufen, wenn Sie postRestart überschreiben. – thwiegan
Die 'RuntimeException', die in den' receive' geworfen wird, ist der 'Grund' in' postRestart'. Die 'TimeoutException' wird ausgelöst, wenn' postRestart' 'preStart' aufruft und' graph.run' aufgerufen wird. – Steiny