Combinez une liste d'observables et attendez que tout soit terminé

91

TL; DR Comment convertir Task.whenAll(List<Task>)en RxJava?

Mon code existant utilise Bolts pour construire une liste de tâches asynchrones et attend que toutes ces tâches se terminent avant d'effectuer d'autres étapes. Essentiellement, il crée un List<Task>et retourne un seul Taskqui est marqué comme terminé lorsque toutes les tâches de la liste sont terminées, comme dans l' exemple sur le site Bolts .

Je cherche à remplacer Boltspar RxJavaet je suppose que cette méthode consistant à créer une liste de tâches asynchrones (taille non connue à l'avance) et à les regrouper toutes en une seule Observableest possible, mais je ne sais pas comment.

Je l' ai essayé de regarder merge, zip, concatetc ... mais ne peut pas se rendre au travail sur la List<Observable>que je construirai comme tous semblent avoir pour objet de travailler sur seulement deux Observablesà la fois si je comprends bien les documents correctement.

J'essaie d'apprendre RxJavaet je suis encore très nouveau dans ce domaine, alors pardonnez-moi si c'est une question évidente ou expliquée quelque part dans la documentation; J'ai essayé de chercher. Toute aide serait très appréciée.

Craig Russell
la source

Réponses:

73

On dirait que vous recherchez l' opérateur Zip .

Il existe plusieurs façons de l'utiliser, alors regardons un exemple. Disons que nous avons quelques observables simples de différents types:

Observable<Integer> obs1 = Observable.just(1);
Observable<String> obs2 = Observable.just("Blah");
Observable<Boolean> obs3 = Observable.just(true);

La façon la plus simple de tous les attendre est quelque chose comme ceci:

Observable.zip(obs1, obs2, obs3, (Integer i, String s, Boolean b) -> i + " " + s + " " + b)
.subscribe(str -> System.out.println(str));

Notez que dans la fonction zip, les paramètres ont des types concrets qui correspondent aux types des observables compressés.

Zipper une liste d'observables est également possible, soit directement:

List<Observable<?>> obsList = Arrays.asList(obs1, obs2, obs3);

Observable.zip(obsList, (i) -> i[0] + " " + i[1] + " " + i[2])
.subscribe(str -> System.out.println(str));

... ou en enveloppant la liste dans un Observable<Observable<?>>:

Observable<Observable<?>> obsObs = Observable.from(obsList);

Observable.zip(obsObs, (i) -> i[0] + " " + i[1] + " " + i[2])
.subscribe(str -> System.out.println(str));

Cependant, dans ces deux cas, la fonction zip ne peut accepter qu'un seul Object[] paramètre car les types des observables dans la liste ne sont pas connus à l'avance ainsi que leur nombre. Cela signifie que la fonction zip devrait vérifier le nombre de paramètres et les convertir en conséquence.

Quoi qu'il en soit, tous les exemples ci-dessus finiront par s'imprimer 1 Blah true

MODIFIER: lorsque vous utilisez Zip, assurez-vous que les éléments Observableszippés émettent le même nombre d'éléments. Dans les exemples ci-dessus, les trois observables ont émis un seul élément. Si nous devions les changer en quelque chose comme ceci:

Observable<Integer> obs1 = Observable.from(new Integer[]{1,2,3}); //Emits three items
Observable<String> obs2 = Observable.from(new String[]{"Blah","Hello"}); //Emits two items
Observable<Boolean> obs3 = Observable.from(new Boolean[]{true,true}); //Emits two items

Ensuite, 1, Blah, Trueet 2, Hello, Trueseraient les seuls éléments passés dans la (les) fonction (s) zip. L'élément 3ne serait jamais compressé puisque les autres observables sont terminés.

Malt
la source
9
Cela ne fonctionnera pas si l'un des appels échoue. Dans ce cas, tous les appels seront perdus.
StarWind0
1
@ StarWind0, vous pouvez ignorer l'erreur en utilisant onErrorResumeNext, exemple:Observable.zip(ob1, ob2........).onErrorResumeNext(Observable.<String>empty())
vuhung3990
Et si j'ai 100 observables?
Krzysztof Kubicki
80

Vous pouvez utiliser flatMapsi vous avez une composition de tâches dynamique. Quelque chose comme ça:

public Observable<Boolean> whenAll(List<Observable<Boolean>> tasks) {
    return Observable.from(tasks)
            //execute in parallel
            .flatMap(task -> task.observeOn(Schedulers.computation()))
            //wait, until all task are executed
            //be aware, all your observable should emit onComplemete event
            //otherwise you will wait forever
            .toList()
            //could implement more intelligent logic. eg. check that everything is successful
            .map(results -> true);
}

Un autre bon exemple d'exécution parallèle

Remarque: je ne connais pas vraiment vos besoins en matière de gestion des erreurs. Par exemple, que faire si une seule tâche échoue. Je pense que vous devriez vérifier ce scénario.

MyDogTom
la source
17
Cela devrait être la réponse acceptée étant donné que la question indique "lorsque toutes les tâches de la liste sont terminées". zipinforme de l'achèvement dès que l'une des tâches est terminée et n'est donc pas applicable.
user3707125
1
@MyDogTom: Pouvez-vous mettre à jour la réponse avec la version de la syntaxe Java7 (pas la version lambda)?
sanedroid
3
@PoojaGaikwad Avec lambda, c'est plus lisible. Remplacez simplement le premier lambda par new Func1<Observable<Boolean>, Observable<Boolean>>()...et le deuxième parnew Func1<List<Boolean>, Boolean>()
MyDogTom
@soshial RxJava 2 est la pire chose qui soit jamais arrivée avec RxJava, ouais
egorikem
15

Parmi les suggestions proposées, zip () combine en fait des résultats observables les uns avec les autres, ce qui peut être ou non ce que l'on souhaite, mais qui n'a pas été posé dans la question. Dans la question, tout ce qui était recherché était l'exécution de chacune des opérations, une par une ou en parallèle (ce qui n'était pas spécifié, mais l'exemple de Bolts lié concernait une exécution parallèle). De plus, zip () se terminera immédiatement lorsque l'une des observables sera terminée, donc c'est en violation des exigences.

Pour l'exécution parallèle d'Observables, flatMap () présenté dans l'autre réponse convient, mais merge () serait plus simple. Notez que la fusion se terminera en cas d'erreur de l'un des observables, si vous reportez plutôt la sortie jusqu'à ce que toutes les observables soient terminées, vous devriez regarder mergeDelayError () .

Pour un par un, je pense que la méthode statique Observable.concat () devrait être utilisée. Son javadoc déclare comme ceci:

concat (java.lang.Iterable> séquences) Aplatit un Iterable d'Observables en un Observable, l'un après l'autre, sans les entrelacer

ce qui ressemble à ce que vous recherchez si vous ne voulez pas d'exécution parallèle.

De plus, si vous n'êtes intéressé que par l'achèvement de votre tâche, et non par les valeurs renvoyées, vous devriez probablement regarder Completable au lieu d' Observable .

TLDR: pour une exécution une par une des tâches et un événement à la fin quand ils sont terminés, je pense que Completable.concat () est le mieux adapté. Pour une exécution parallèle, Completable.merge () ou Completable.mergeDelayError () ressemble à la solution. Le premier s'arrêtera immédiatement sur toute erreur sur n'importe quel complétable, le second les exécutera tous même si l'un d'eux a une erreur, et seulement alors signale l'erreur.

eis
la source
2

Vous avez probablement regardé l' zipopérateur qui fonctionne avec 2 Observables.

Il existe également la méthode statique Observable.zip. Il a un formulaire qui devrait vous être utile:

zip(java.lang.Iterable<? extends Observable<?>> ws, FuncN<? extends R> zipFunction)

Vous pouvez consulter le javadoc pour en savoir plus.

LordRaydenMK
la source
2

Avec Kotlin

Observable.zip(obs1, obs2, BiFunction { t1 : Boolean, t2:Boolean ->

})

Il est important de définir le type des arguments de la fonction ou vous aurez des erreurs de compilation

Le dernier type d'argument change avec le nombre d'argument: BiFunction pour 2 Function3 pour 3 Function4 pour 4 ...

Kevin ABRIOUX
la source
1

J'écris du code de calcul dans Kotlin avec JavaRx Observables et RxKotlin. Je veux observer une liste d'observables à compléter et en attendant me donner une mise à jour avec les progrès et les derniers résultats. À la fin, il renvoie le meilleur résultat de calcul. Une exigence supplémentaire était d'exécuter Observables en parallèle pour utiliser tous mes cœurs de processeur. J'ai fini avec cette solution:

@Volatile var results: MutableList<CalculationResult> = mutableListOf()

fun doALotOfCalculations(listOfCalculations: List<Calculation>): Observable<Pair<String, CalculationResult>> {

    return Observable.create { subscriber ->
        Observable.concatEager(listOfCalculations.map { calculation: Calculation ->
            doCalculation(calculation).subscribeOn(Schedulers.computation()) // function doCalculation returns an Observable with only one result
        }).subscribeBy(
            onNext = {
                results.add(it)
                subscriber.onNext(Pair("A calculation is ready", it))

            },
            onComplete = {
                subscriber.onNext(Pair("Finished: ${results.size}", findBestCalculation(results)) 
                subscriber.onComplete()
            },
            onError = {
                subscriber.onError(it)
            }
        )
    }
}
Sjors
la source
pas familier avec RxKotlin ou @Volatile, mais comment cela fonctionnerait-il si cela est appelé par plusieurs threads en même temps? Qu'arriverait-il aux résultats?
eis
0

J'ai eu un problème similaire, j'avais besoin de récupérer les éléments de recherche de l'appel de repos tout en intégrant également les suggestions enregistrées à partir d'un RecentSearchProvider.AUTHORITY et de les combiner en une liste unifiée. J'essayais d'utiliser la solution @MyDogTom, malheureusement il n'y a pas d'Observable.from dans RxJava. Après quelques recherches, j'ai trouvé une solution qui a fonctionné pour moi.

 fun getSearchedResultsSuggestions(context : Context, query : String) : Single<ArrayList<ArrayList<SearchItem>>>
{
    val fetchedItems = ArrayList<Observable<ArrayList<SearchItem>>>(0)
    fetchedItems.add(fetchSearchSuggestions(context,query).toObservable())
    fetchedItems.add(getSearchResults(query).toObservable())

    return Observable.fromArray(fetchedItems)
        .flatMapIterable { data->data }
        .flatMap {task -> task.observeOn(Schedulers.io())}
        .toList()
        .map { ArrayList(it) }
}

J'ai créé un observable à partir du tableau d'observables qui contient des listes de suggestions et de résultats d'Internet en fonction de la requête. Après cela, vous passez simplement en revue ces tâches avec flatMapIterable et les exécutez à l'aide de flatmap, placez les résultats dans un tableau, qui peut être récupéré plus tard dans une vue de recyclage.

Anton Makov
la source
0

Si vous utilisez Project Reactor, vous pouvez utiliser Mono.when.

Mono.when(publisher1, publisher2)
.map(i-> {
    System.out.println("everything is done!");
    return i;
}).block()
Chevreuil
la source