Supposons que j'ai plusieurs futurs et que j'attende que l' un ou l' autre échoue ou que tous réussissent.
Par exemple: Soit il y a 3 à terme: f1
, f2
, f3
.
Si
f1
réussit etf2
échoue, je n'attends pasf3
(et je renvoie l' échec au client).Si
f2
échoue pendantf1
etf3
est toujours en cours d'exécution, je ne les attend pas (et je renvoie l' échec )Si
f1
réussit, puisf2
réussit, je continue d'attendref3
.
Comment le mettriez-vous en œuvre?
scala
concurrency
future
Michael
la source
la source
Réponses:
Vous pouvez utiliser un for-compréhension comme suit à la place:
val fut1 = Future{...} val fut2 = Future{...} val fut3 = Future{...} val aggFut = for{ f1Result <- fut1 f2Result <- fut2 f3Result <- fut3 } yield (f1Result, f2Result, f3Result)
Dans cet exemple, les futurs 1, 2 et 3 sont lancés en parallèle. Ensuite, dans la pour la compréhension, nous attendons que les résultats 1 puis 2 puis 3 soient disponibles. Si 1 ou 2 échoue, nous n'attendrons plus 3. Si les 3 réussissent, alors le
aggFut
val contiendra un tuple avec 3 slots, correspondant aux résultats des 3 futures.Maintenant, si vous avez besoin du comportement où vous voulez arrêter d'attendre si, disons, fut2 échoue en premier, les choses deviennent un peu plus délicates. Dans l'exemple ci-dessus, vous devrez attendre que fut1 se termine avant de réaliser l'échec de fut2. Pour résoudre ce problème, vous pouvez essayer quelque chose comme ceci:
val fut1 = Future{Thread.sleep(3000);1} val fut2 = Promise.failed(new RuntimeException("boo")).future val fut3 = Future{Thread.sleep(1000);3} def processFutures(futures:Map[Int,Future[Int]], values:List[Any], prom:Promise[List[Any]]):Future[List[Any]] = { val fut = if (futures.size == 1) futures.head._2 else Future.firstCompletedOf(futures.values) fut onComplete{ case Success(value) if (futures.size == 1)=> prom.success(value :: values) case Success(value) => processFutures(futures - value, value :: values, prom) case Failure(ex) => prom.failure(ex) } prom.future } val aggFut = processFutures(Map(1 -> fut1, 2 -> fut2, 3 -> fut3), List(), Promise[List[Any]]()) aggFut onComplete{ case value => println(value) }
Maintenant, cela fonctionne correctement, mais le problème vient du fait de savoir lequel
Future
supprimer deMap
celui qui a été terminé avec succès. Tant que vous avez un moyen de corréler correctement un résultat avec le futur qui a engendré ce résultat, alors quelque chose comme ça fonctionne. Il continue de supprimer récursivement les contrats à terme terminés de la carte, puis d'appelerFuture.firstCompletedOf
les autresFutures
jusqu'à ce qu'il n'en reste plus, collectant les résultats en cours de route. Ce n'est pas joli, mais si vous avez vraiment besoin du comportement dont vous parlez, alors ceci ou quelque chose de similaire pourrait fonctionner.la source
fut2
échoue avantfut1
? Attendrons-nous encorefut1
dans ce cas? Si nous le voulons, ce n'est pas exactement ce que je veux.onFailure
gestionnaire pourfut2
échouer rapide, etonSuccess
suraggFut
la poignée du succès. Un succès suraggFut
impliquefut2
s'est terminé avec succès, donc vous n'avez appelé qu'un seul des gestionnaires.Vous pouvez utiliser une promesse et lui envoyer soit le premier échec, soit le succès agrégé final terminé:
def sequenceOrBailOut[A, M[_] <: TraversableOnce[_]](in: M[Future[A]] with TraversableOnce[Future[A]])(implicit cbf: CanBuildFrom[M[Future[A]], A, M[A]], executor: ExecutionContext): Future[M[A]] = { val p = Promise[M[A]]() // the first Future to fail completes the promise in.foreach(_.onFailure{case i => p.tryFailure(i)}) // if the whole sequence succeeds (i.e. no failures) // then the promise is completed with the aggregated success Future.sequence(in).foreach(p trySuccess _) p.future }
Ensuite, vous pouvez
Await
sur ce résultatFuture
si vous voulez bloquer, ou simplementmap
en quelque chose d'autre.La différence avec pour la compréhension est qu'ici vous obtenez l'erreur du premier à échouer, alors qu'avec pour la compréhension, vous obtenez la première erreur dans l'ordre de parcours de la collection d'entrée (même si une autre a échoué en premier). Par exemple:
val f1 = Future { Thread.sleep(1000) ; 5 / 0 } val f2 = Future { 5 } val f3 = Future { None.get } Future.sequence(List(f1,f2,f3)).onFailure{case i => println(i)} // this waits one second, then prints "java.lang.ArithmeticException: / by zero" // the first to fail in traversal order
Et:
val f1 = Future { Thread.sleep(1000) ; 5 / 0 } val f2 = Future { 5 } val f3 = Future { None.get } sequenceOrBailOut(List(f1,f2,f3)).onFailure{case i => println(i)} // this immediately prints "java.util.NoSuchElementException: None.get" // the 'actual' first to fail (usually...) // and it returns early (it does not wait 1 sec)
la source
Voici une solution sans utiliser d'acteurs.
import scala.util._ import scala.concurrent._ import java.util.concurrent.atomic.AtomicInteger // Nondeterministic. // If any failure, return it immediately, else return the final success. def allSucceed[T](fs: Future[T]*): Future[T] = { val remaining = new AtomicInteger(fs.length) val p = promise[T] fs foreach { _ onComplete { case s @ Success(_) => { if (remaining.decrementAndGet() == 0) { // Arbitrarily return the final success p tryComplete s } } case f @ Failure(_) => { p tryComplete f } } } p.future }
la source
Vous pouvez le faire uniquement avec les futurs. Voici une implémentation. Notez qu'il ne mettra pas fin à l'exécution prématurément! Dans ce cas, vous devez faire quelque chose de plus sophistiqué (et probablement implémenter l'interruption vous-même). Mais si vous ne voulez tout simplement pas continuer à attendre quelque chose qui ne fonctionnera pas, la clé est de continuer à attendre que la première chose se termine, et de vous arrêter quand il ne reste plus rien ou que vous rencontrez une exception:
import scala.annotation.tailrec import scala.util.{Try, Success, Failure} import scala.concurrent._ import scala.concurrent.duration.Duration import ExecutionContext.Implicits.global @tailrec def awaitSuccess[A](fs: Seq[Future[A]], done: Seq[A] = Seq()): Either[Throwable, Seq[A]] = { val first = Future.firstCompletedOf(fs) Await.ready(first, Duration.Inf).value match { case None => awaitSuccess(fs, done) // Shouldn't happen! case Some(Failure(e)) => Left(e) case Some(Success(_)) => val (complete, running) = fs.partition(_.isCompleted) val answers = complete.flatMap(_.value) answers.find(_.isFailure) match { case Some(Failure(e)) => Left(e) case _ => if (running.length > 0) awaitSuccess(running, answers.map(_.get) ++: done) else Right( answers.map(_.get) ++: done ) } } }
Voici un exemple de celui-ci en action lorsque tout fonctionne correctement:
scala> awaitSuccess(Seq(Future{ println("Hi!") }, Future{ Thread.sleep(1000); println("Fancy meeting you here!") }, Future{ Thread.sleep(2000); println("Bye!") } )) Hi! Fancy meeting you here! Bye! res1: Either[Throwable,Seq[Unit]] = Right(List((), (), ()))
Mais quand quelque chose ne va pas:
scala> awaitSuccess(Seq(Future{ println("Hi!") }, Future{ Thread.sleep(1000); throw new Exception("boo"); () }, Future{ Thread.sleep(2000); println("Bye!") } )) Hi! res2: Either[Throwable,Seq[Unit]] = Left(java.lang.Exception: boo) scala> Bye!
la source
Pour cela, j'utiliserais un acteur Akka. Contrairement à la pour-compréhension, elle échoue dès que l'un des futurs échoue, donc c'est un peu plus efficace dans ce sens.
class ResultCombiner(futs: Future[_]*) extends Actor { var origSender: ActorRef = null var futsRemaining: Set[Future[_]] = futs.toSet override def receive = { case () => origSender = sender for(f <- futs) f.onComplete(result => self ! if(result.isSuccess) f else false) case false => origSender ! SomethingFailed case f: Future[_] => futsRemaining -= f if(futsRemaining.isEmpty) origSender ! EverythingSucceeded } } sealed trait Result case object SomethingFailed extends Result case object EverythingSucceeded extends Result
Ensuite, créez l'acteur, envoyez-lui un message (pour qu'il sache à qui envoyer sa réponse) et attendez une réponse.
val actor = actorSystem.actorOf(Props(new ResultCombiner(f1, f2, f3))) try { val f4: Future[Result] = actor ? () implicit val timeout = new Timeout(30 seconds) // or whatever Await.result(f4, timeout.duration).asInstanceOf[Result] match { case SomethingFailed => println("Oh noes!") case EverythingSucceeded => println("It all worked!") } } finally { // Avoid memory leaks: destroy the actor actor ! PoisonPill }
la source
Cette question a été répondue mais je publie ma solution de classe de valeur (des classes de valeur ont été ajoutées en 2.10) car il n'y en a pas ici. N'hésitez pas à critiquer.
implicit class Sugar_PimpMyFuture[T](val self: Future[T]) extends AnyVal { def concurrently = ConcurrentFuture(self) } case class ConcurrentFuture[A](future: Future[A]) extends AnyVal { def map[B](f: Future[A] => Future[B]) : ConcurrentFuture[B] = ConcurrentFuture(f(future)) def flatMap[B](f: Future[A] => ConcurrentFuture[B]) : ConcurrentFuture[B] = concurrentFutureFlatMap(this, f) // work around no nested class in value class } def concurrentFutureFlatMap[A,B](outer: ConcurrentFuture[A], f: Future[A] => ConcurrentFuture[B]) : ConcurrentFuture[B] = { val p = Promise[B]() val inner = f(outer.future) inner.future onFailure { case t => p.tryFailure(t) } outer.future onFailure { case t => p.tryFailure(t) } inner.future onSuccess { case b => p.trySuccess(b) } ConcurrentFuture(p.future) }
ConcurrentFuture est un wrapper Future sans surcoût qui change la Future map / flatMap par défaut de do-this-then-that en combine-all-and-fail-if-any-fail. Usage:
def func1 : Future[Int] = Future { println("f1!");throw new RuntimeException; 1 } def func2 : Future[String] = Future { Thread.sleep(2000);println("f2!");"f2" } def func3 : Future[Double] = Future { Thread.sleep(2000);println("f3!");42.0 } val f : Future[(Int,String,Double)] = { for { f1 <- func1.concurrently f2 <- func2.concurrently f3 <- func3.concurrently } yield for { v1 <- f1 v2 <- f2 v3 <- f3 } yield (v1,v2,v3) }.future f.onFailure { case t => println("future failed $t") }
Dans l'exemple ci-dessus, f1, f2 et f3 s'exécuteront simultanément et en cas d'échec dans n'importe quel ordre, l'avenir du tuple échouera immédiatement.
la source
Vous voudrez peut-être consulter l'API Future de Twitter. Notamment la méthode Future.collect. Il fait exactement ce que vous voulez: https://twitter.github.io/scala_school/finagle.html
Le code source Future.scala est disponible ici: https://github.com/twitter/util/blob/master/util-core/src/main/scala/com/twitter/util/Future.scala
la source
Vous pouvez utiliser ceci:
val l = List(1, 6, 8) val f = l.map{ i => future { println("future " +i) Thread.sleep(i* 1000) if (i == 12) throw new Exception("6 is not legal.") i } } val f1 = Future.sequence(f) f1 onSuccess{ case l => { logInfo("onSuccess") l.foreach(i => { logInfo("h : " + i) }) } } f1 onFailure{ case l => { logInfo("onFailure") }
la source