Différence entre CompletableFuture, Future et Observable de RxJava

206

J'aimerais connaître la différence entre CompletableFuture, Futureet Observable RxJava.

Ce que je sais, c'est que tous sont asynchrones mais

Future.get() bloque le fil

CompletableFuture donne les méthodes de rappel

RxJava Observable--- similaire à CompletableFutured'autres avantages (pas sûr)

Par exemple: si le client a besoin de faire plusieurs appels de service et lorsque nous utilisons Futures(Java) Future.get()sera exécuté séquentiellement ... aimerait savoir comment c'est mieux dans RxJava ..

Et la documentation http://reactivex.io/intro.html dit

Il est difficile d'utiliser Futures pour composer de manière optimale des flux d'exécution asynchrones conditionnels (voire impossible, car les latences de chaque requête varient à l'exécution). Cela peut être fait, bien sûr, mais cela devient rapidement compliqué (et donc sujet aux erreurs) ou bloque prématurément sur Future.get (), ce qui élimine l'avantage de l'exécution asynchrone.

Vraiment intéressé de savoir comment RxJavarésout ce problème. J'ai eu du mal à comprendre à partir de la documentation.

shiv455
la source
Avez-vous lu la documentation de chacun? Je ne connais pas du tout RxJava, mais la documentation semble extrêmement complète en un coup d'œil. Cela ne semble pas particulièrement comparable aux deux futurs.
FThompson
J'ai traversé mais je ne suis pas en mesure de comprendre à quel point c'est différent de l'avenir de Java ... corrigez-moi si je me trompe
shiv455
En quoi les observables sont-ils similaires aux futurs?
FThompson
2
aimerait savoir en quoi il est différent comme est-ce différent dans la gestion des threads ?? EX: Future.get () bloque le thread .... comment il sera géré dans Observable ???
shiv455
2
au moins, c'est un peu déroutant pour moi ... une différence de niveau élevé serait vraiment utile !!
shiv455

Réponses:

297

Futures

Les contrats à terme ont été introduits dans Java 5 (2004). Ce sont essentiellement des espaces réservés pour le résultat d'une opération qui n'est pas encore terminée. Une fois l'opération terminée, le Futurecontiendra ce résultat. Par exemple, une opération peut être une instance exécutable ou appelable soumise à un ExecutorService . L'émetteur de l'opération peut utiliser l' Futureobjet pour vérifier si l'opération est terminée () ou attendre qu'elle se termine en utilisant la méthode bloquante get () .

Exemple:

/**
* A task that sleeps for a second, then returns 1
**/
public static class MyCallable implements Callable<Integer> {

    @Override
    public Integer call() throws Exception {
        Thread.sleep(1000);
        return 1;
    }

}

public static void main(String[] args) throws Exception{
    ExecutorService exec = Executors.newSingleThreadExecutor();
    Future<Integer> f = exec.submit(new MyCallable());

    System.out.println(f.isDone()); //False

    System.out.println(f.get()); //Waits until the task is done, then prints 1
}

CompletableFutures

CompletableFutures a été introduit dans Java 8 (2014). Ils sont en fait une évolution des Futures classiques, inspirés des Listenable Futures de Google , qui fait partie de la bibliothèque Guava . Ce sont des Futures qui vous permettent également d'enchaîner des tâches dans une chaîne. Vous pouvez les utiliser pour dire à un thread de travail de "faire une tâche X, et quand vous avez terminé, faites cette autre chose en utilisant le résultat de X". En utilisant CompletableFutures, vous pouvez faire quelque chose avec le résultat de l'opération sans réellement bloquer un thread pour attendre le résultat. Voici un exemple simple:

/**
* A supplier that sleeps for a second, and then returns one
**/
public static class MySupplier implements Supplier<Integer> {

    @Override
    public Integer get() {
        try {
            Thread.sleep(1000);
        } catch (InterruptedException e) {
            //Do nothing
        }
        return 1;
    }
}

/**
* A (pure) function that adds one to a given Integer
**/
public static class PlusOne implements Function<Integer, Integer> {

    @Override
    public Integer apply(Integer x) {
        return x + 1;
    }
}

public static void main(String[] args) throws Exception {
    ExecutorService exec = Executors.newSingleThreadExecutor();
    CompletableFuture<Integer> f = CompletableFuture.supplyAsync(new MySupplier(), exec);
    System.out.println(f.isDone()); // False
    CompletableFuture<Integer> f2 = f.thenApply(new PlusOne());
    System.out.println(f2.get()); // Waits until the "calculation" is done, then prints 2
}

RxJava

RxJava est une bibliothèque complète pour la programmation réactive créée chez Netflix. En un coup d'œil, il semblera similaire aux flux de Java 8 . C'est le cas, sauf que c'est beaucoup plus puissant.

De la même manière que Futures, RxJava peut être utilisé pour enchaîner un ensemble d'actions synchrones ou asynchrones pour créer un pipeline de traitement. Contrairement aux Futures, qui sont à usage unique, RxJava fonctionne sur des flux de zéro ou plusieurs éléments. Y compris des flux sans fin avec un nombre infini d'éléments. Il est également beaucoup plus flexible et puissant grâce à un ensemble incroyablement riche d'opérateurs .

Contrairement aux flux de Java 8, RxJava dispose également d'un mécanisme de contre- pression , qui lui permet de gérer les cas dans lesquels différentes parties de votre pipeline de traitement fonctionnent dans différents threads, à des taux différents .

L'inconvénient de RxJava est que malgré la documentation solide, c'est une bibliothèque difficile à apprendre en raison du changement de paradigme impliqué. Le code Rx peut également être un cauchemar à déboguer, en particulier si plusieurs threads sont impliqués, et pire encore, si une contre-pression est nécessaire.

Si vous voulez y entrer, il y a toute une page de divers tutoriels sur le site officiel, ainsi que la documentation officielle et Javadoc . Vous pouvez également regarder certaines des vidéos telles que celle-ci qui donne une brève introduction à Rx et parle également des différences entre Rx et Futures.

Bonus: flux réactifs Java 9

Les flux réactifs de Java 9, également appelés API Flow, sont un ensemble d'interfaces implémentées par diverses bibliothèques de flux réactifs telles que RxJava 2 , Akka Streams et Vertx . Ils permettent à ces bibliothèques réactives de s'interconnecter, tout en préservant la contre-pression importante.

Malt
la source
Ce serait bien de donner un exemple de code de la façon dont Rx fait cela
Zinan Xing
Donc, en utilisant Reactive Streams, nous pouvons mélanger RxJava, Akka et Vertx dans une seule application?
IgorGanapolsky
1
@IgorGanapolsky Oui.
Malt du
Dans CompletableFutures, nous utilisons des méthodes de rappel, ces méthodes de rappel seront également bloquées si la sortie d'une méthode est entrée d'un autre rappel. En tant que futur bloc avec l'appel Future.get (). Pourquoi on dit que Future.get () bloque l'appel alors que CompletableFutures ne bloque pas. Veuillez expliquer
Deepak
2
@Federico Bien sûr. Chacun Futureest un espace réservé pour un résultat unique qui peut ou peut ne pas être encore terminé. Si vous effectuez à nouveau la même opération, vous obtiendrez une nouvelle Futureinstance. RxJava traite des flux de résultats qui peuvent arriver à tout moment. Une série d'opérations peut donc renvoyer une seule observable RxJava qui pompera un tas de résultats. C'est un peu comme la différence entre une seule enveloppe postale et un tube pneumatique qui continue de pomper le courrier.
Malt du
22

Je travaille avec Rx Java depuis 0.9, maintenant à 1.3.2 et bientôt migrant vers 2.x Je l'utilise dans un projet privé sur lequel je travaille déjà depuis 8 ans.

Je ne programmerais plus du tout sans cette bibliothèque. Au début, j'étais sceptique mais c'est un autre état d'esprit qu'il faut créer. Quiete difficile au début. Je regardais parfois les billes pendant des heures .. lol

C'est juste une question de pratique et de vraiment connaître le flux (alias contrat d'observables et d'observateurs), une fois que vous y êtes, vous détesterez le faire autrement.

Pour moi, il n'y a pas vraiment d'inconvénient à cette bibliothèque.

Cas d'utilisation: j'ai une vue moniteur qui contient 9 jauges (cpu, mem, réseau, etc ...). Lors du démarrage de la vue, la vue s'abonne à une classe de moniteur système qui renvoie une observable (intervalle) contenant toutes les données des 9 mètres. Il poussera chaque seconde un nouveau résultat vers la vue (donc pas d'interrogation !!!). Cet observable utilise une flatmap pour récupérer simultanément (async!) Des données de 9 sources différentes et zippe le résultat dans un nouveau modèle que votre vue obtiendra sur le onNext ().

Comment diable allez-vous faire ça avec les futurs, les complétables etc ... Bonne chance! :)

Rx Java résout de nombreux problèmes de programmation pour moi et rend d'une manière beaucoup plus facile ...

Avantages:

  • Statelss !!! (chose importante à mentionner, la plus importante peut-être)
  • Gestion des threads prête à l'emploi
  • Créez des séquences qui ont leur propre cycle de vie
  • Tout est observable, donc l'enchaînement est facile
  • Moins de code à écrire
  • Pot unique sur classpath (très léger)
  • Très simultané
  • Plus d'enfer de rappel
  • Abonné (contrat serré entre le consommateur et le producteur)
  • Stratégies de contre-pression (disjoncteur, etc.)
  • Splendide gestion des erreurs et récupération
  • Très belle documentation (billes <3)
  • Contrôle complet
  • Beaucoup plus ...

Inconvénients: - Difficile à tester

Kristoffer
la source
16
~ " Je ne programmerais plus du tout sans cette bibliothèque. " RxJava est donc la solution ultime pour tous les projets logiciels?
IgorGanapolsky
Est-ce utile même si je n'ai pas de flux d'événements asynchrones?
Mukesh Verma
3

Le principal avantage de CompletableFuture par rapport à Future normal est que CompletableFuture tire parti de l'API de flux extrêmement puissante et vous donne des gestionnaires de rappel pour enchaîner vos tâches, ce qui est absolument absent si vous utilisez Future normal. En plus de fournir une architecture asynchrone, CompletableFuture est la voie à suivre pour gérer les tâches de calcul lourdes de réduction de carte, sans trop se soucier des performances des applications.

asmitB
la source
1

Les trois interfaces servent à transférer les valeurs du producteur au consommateur. Les consommateurs peuvent être de 2 types:

  • synchrone: le consommateur effectue un appel de blocage qui retourne lorsque la valeur est prête
  • asynchrone: lorsque la valeur est prête, une méthode de rappel du consommateur est appelée

En outre, les interfaces de communication diffèrent d'autres manières:

  • capable de transférer une seule valeur de plusieurs valeurs
  • si plusieurs valeurs, la contre-pression peut être prise en charge ou non

Par conséquent:

  • Future transfère une valeur unique à l'aide d'une interface synchrone

  • CompletableFuture transfère une valeur unique à l'aide d'interfaces synchrones et asynchrones

  • Rx transfère plusieurs valeurs à l'aide d'une interface asynchrone avec contre-pression

De plus, toutes ces fonctionnalités de communication prennent en charge le transfert des exceptions. Ce n'est pas toujours le cas. Par exemple, BlockingQueuenon.

Alexei Kaigorodov
la source