TL; DR
Comment convertir Task.whenAll(List<Task>)
en RxJava
?
Mon code existant utilise Bolts pour construire une liste de tâches asynchrones et attend que toutes ces tâches se terminent avant d'effectuer d'autres étapes. Essentiellement, il crée un List<Task>
et retourne un seul Task
qui est marqué comme terminé lorsque toutes les tâches de la liste sont terminées, comme dans l' exemple sur le site Bolts .
Je cherche à remplacer Bolts
par RxJava
et je suppose que cette méthode consistant à créer une liste de tâches asynchrones (taille non connue à l'avance) et à les regrouper toutes en une seule Observable
est possible, mais je ne sais pas comment.
Je l' ai essayé de regarder merge
, zip
, concat
etc ... mais ne peut pas se rendre au travail sur la List<Observable>
que je construirai comme tous semblent avoir pour objet de travailler sur seulement deux Observables
à la fois si je comprends bien les documents correctement.
J'essaie d'apprendre RxJava
et je suis encore très nouveau dans ce domaine, alors pardonnez-moi si c'est une question évidente ou expliquée quelque part dans la documentation; J'ai essayé de chercher. Toute aide serait très appréciée.
la source
onErrorResumeNext
, exemple:Observable.zip(ob1, ob2........).onErrorResumeNext(Observable.<String>empty())
Vous pouvez utiliser
flatMap
si vous avez une composition de tâches dynamique. Quelque chose comme ça:public Observable<Boolean> whenAll(List<Observable<Boolean>> tasks) { return Observable.from(tasks) //execute in parallel .flatMap(task -> task.observeOn(Schedulers.computation())) //wait, until all task are executed //be aware, all your observable should emit onComplemete event //otherwise you will wait forever .toList() //could implement more intelligent logic. eg. check that everything is successful .map(results -> true); }
Un autre bon exemple d'exécution parallèle
Remarque: je ne connais pas vraiment vos besoins en matière de gestion des erreurs. Par exemple, que faire si une seule tâche échoue. Je pense que vous devriez vérifier ce scénario.
la source
zip
informe de l'achèvement dès que l'une des tâches est terminée et n'est donc pas applicable.new Func1<Observable<Boolean>, Observable<Boolean>>()...
et le deuxième parnew Func1<List<Boolean>, Boolean>()
Parmi les suggestions proposées, zip () combine en fait des résultats observables les uns avec les autres, ce qui peut être ou non ce que l'on souhaite, mais qui n'a pas été posé dans la question. Dans la question, tout ce qui était recherché était l'exécution de chacune des opérations, une par une ou en parallèle (ce qui n'était pas spécifié, mais l'exemple de Bolts lié concernait une exécution parallèle). De plus, zip () se terminera immédiatement lorsque l'une des observables sera terminée, donc c'est en violation des exigences.
Pour l'exécution parallèle d'Observables, flatMap () présenté dans l'autre réponse convient, mais merge () serait plus simple. Notez que la fusion se terminera en cas d'erreur de l'un des observables, si vous reportez plutôt la sortie jusqu'à ce que toutes les observables soient terminées, vous devriez regarder mergeDelayError () .
Pour un par un, je pense que la méthode statique Observable.concat () devrait être utilisée. Son javadoc déclare comme ceci:
ce qui ressemble à ce que vous recherchez si vous ne voulez pas d'exécution parallèle.
De plus, si vous n'êtes intéressé que par l'achèvement de votre tâche, et non par les valeurs renvoyées, vous devriez probablement regarder Completable au lieu d' Observable .
TLDR: pour une exécution une par une des tâches et un événement à la fin quand ils sont terminés, je pense que Completable.concat () est le mieux adapté. Pour une exécution parallèle, Completable.merge () ou Completable.mergeDelayError () ressemble à la solution. Le premier s'arrêtera immédiatement sur toute erreur sur n'importe quel complétable, le second les exécutera tous même si l'un d'eux a une erreur, et seulement alors signale l'erreur.
la source
Vous avez probablement regardé l'
zip
opérateur qui fonctionne avec 2 Observables.Il existe également la méthode statique
Observable.zip
. Il a un formulaire qui devrait vous être utile:Vous pouvez consulter le javadoc pour en savoir plus.
la source
Avec Kotlin
Il est important de définir le type des arguments de la fonction ou vous aurez des erreurs de compilation
Le dernier type d'argument change avec le nombre d'argument: BiFunction pour 2 Function3 pour 3 Function4 pour 4 ...
la source
J'écris du code de calcul dans Kotlin avec JavaRx Observables et RxKotlin. Je veux observer une liste d'observables à compléter et en attendant me donner une mise à jour avec les progrès et les derniers résultats. À la fin, il renvoie le meilleur résultat de calcul. Une exigence supplémentaire était d'exécuter Observables en parallèle pour utiliser tous mes cœurs de processeur. J'ai fini avec cette solution:
@Volatile var results: MutableList<CalculationResult> = mutableListOf() fun doALotOfCalculations(listOfCalculations: List<Calculation>): Observable<Pair<String, CalculationResult>> { return Observable.create { subscriber -> Observable.concatEager(listOfCalculations.map { calculation: Calculation -> doCalculation(calculation).subscribeOn(Schedulers.computation()) // function doCalculation returns an Observable with only one result }).subscribeBy( onNext = { results.add(it) subscriber.onNext(Pair("A calculation is ready", it)) }, onComplete = { subscriber.onNext(Pair("Finished: ${results.size}", findBestCalculation(results)) subscriber.onComplete() }, onError = { subscriber.onError(it) } ) } }
la source
@Volatile
, mais comment cela fonctionnerait-il si cela est appelé par plusieurs threads en même temps? Qu'arriverait-il aux résultats?J'ai eu un problème similaire, j'avais besoin de récupérer les éléments de recherche de l'appel de repos tout en intégrant également les suggestions enregistrées à partir d'un RecentSearchProvider.AUTHORITY et de les combiner en une liste unifiée. J'essayais d'utiliser la solution @MyDogTom, malheureusement il n'y a pas d'Observable.from dans RxJava. Après quelques recherches, j'ai trouvé une solution qui a fonctionné pour moi.
fun getSearchedResultsSuggestions(context : Context, query : String) : Single<ArrayList<ArrayList<SearchItem>>> { val fetchedItems = ArrayList<Observable<ArrayList<SearchItem>>>(0) fetchedItems.add(fetchSearchSuggestions(context,query).toObservable()) fetchedItems.add(getSearchResults(query).toObservable()) return Observable.fromArray(fetchedItems) .flatMapIterable { data->data } .flatMap {task -> task.observeOn(Schedulers.io())} .toList() .map { ArrayList(it) } }
J'ai créé un observable à partir du tableau d'observables qui contient des listes de suggestions et de résultats d'Internet en fonction de la requête. Après cela, vous passez simplement en revue ces tâches avec flatMapIterable et les exécutez à l'aide de flatmap, placez les résultats dans un tableau, qui peut être récupéré plus tard dans une vue de recyclage.
la source
Si vous utilisez Project Reactor, vous pouvez utiliser
Mono.when
.Mono.when(publisher1, publisher2) .map(i-> { System.out.println("everything is done!"); return i; }).block()
la source