Pouvez-vous diviser un flux en deux flux?

146

J'ai un ensemble de données représenté par un flux Java 8:

Stream<T> stream = ...;

Je peux voir comment le filtrer pour obtenir un sous-ensemble aléatoire - par exemple

Random r = new Random();
PrimitiveIterator.OfInt coin = r.ints(0, 2).iterator();   
Stream<T> heads = stream.filter((x) -> (coin.nextInt() == 0));

Je peux également voir comment je pourrais réduire ce flux pour obtenir, par exemple, deux listes représentant deux moitiés aléatoires de l'ensemble de données, puis les transformer en flux. Mais existe-t-il un moyen direct de générer deux flux à partir du flux initial? Quelque chose comme

(heads, tails) = stream.[some kind of split based on filter]

Merci pour tout aperçu.

user1148758
la source
La réponse de Mark est beaucoup plus utile que la réponse de Louis, mais je dois dire que celle de Louis est plus liée à la question initiale. La question est plutôt centrée sur la possibilité de convertir Streamen plusieurs Streams sans conversion intermédiaire , même si je pense que les personnes qui ont atteint cette question cherchent en fait le moyen d'y parvenir indépendamment de cette contrainte, qui est la réponse de Mark. Cela peut être dû au fait que la question du titre n'est pas la même que celle de la description .
devildelta le

Réponses:

9

Pas exactement. Tu ne peux pas en avoir deuxStream sur un; cela n'a pas de sens - comment feriez-vous une itération sur l'un sans avoir besoin de générer l'autre en même temps? Un flux ne peut être exploité qu'une seule fois.

Cependant, si vous voulez les vider dans une liste ou quelque chose, vous pouvez faire

stream.forEach((x) -> ((x == 0) ? heads : tails).add(x));
Louis Wasserman
la source
65
Pourquoi cela n'a-t-il pas de sens? Puisqu'un flux est un pipeline, il n'y a aucune raison pour qu'il ne puisse pas créer deux producteurs du flux d'origine, je pouvais voir que cela était géré par un collecteur qui fournit deux flux.
Brett Ryan
36
Pas de thread safe. Mauvais conseil d'essayer d'ajouter directement à une collection, c'est pourquoi nous avons le stream.collect(...)for avec thread-safe prédéfini Collectors, qui fonctionne bien même sur des collections non thread-safe (sans conflit de verrouillage synchronisé). Meilleure réponse par @MarkJeronimus.
YoYo
1
@JoD Il est thread-safe si les têtes et les queues sont thread-safe. De plus, en supposant l'utilisation de flux non parallèles, seul l'ordre n'est pas garanti, ils sont donc thread-safe. C'est au programmeur de résoudre les problèmes de concurrence, donc cette réponse est parfaitement adaptée si les collections sont thread-safe.
Nicolas
1
@Nixon cela ne convient pas en présence d'une meilleure solution, que nous avons ici. Avoir un tel code peut conduire à de mauvais précédents, poussant les autres à l'utiliser d'une mauvaise manière. Même si aucun flux parallèle n'est utilisé, ce n'est qu'à un pas. Les bonnes pratiques de codage nous obligent à ne pas maintenir l'état pendant les opérations de flux. La prochaine chose que nous faisons est de coder dans un cadre comme Apache Spark, et les mêmes pratiques conduiraient vraiment à des résultats inattendus. C'était une solution créative, je donne ça, une solution que j'aurais peut-être écrite moi-même il n'y a pas si longtemps.
YoYo
1
@JoD Ce n'est pas une meilleure solution, c'est en fait plus inefficace. Cette ligne de pensée aboutit finalement à la conclusion que toutes les collections devraient être thread-safe par défaut pour éviter les conséquences involontaires, ce qui est tout simplement faux.
Nicolas
301

Un collecteur peut être utilisé pour cela.

  • Pour deux catégories, utilisez l' Collectors.partitioningBy()usine.

Cela créera un Mapde Booleanà Listet placera les éléments dans l'une ou l'autre liste basée sur un Predicate.

Remarque: puisque le flux doit être consommé dans son intégralité, cela ne peut pas fonctionner sur des flux infinis. Et comme le flux est de toute façon consommé, cette méthode les place simplement dans des listes au lieu de créer un nouveau flux avec mémoire. Vous pouvez toujours diffuser ces listes si vous avez besoin de flux en sortie.

De plus, pas besoin de l'itérateur, même pas dans l'exemple de tête uniquement que vous avez fourni.

  • Le fractionnement binaire ressemble à ceci:
Random r = new Random();

Map<Boolean, List<String>> groups = stream
    .collect(Collectors.partitioningBy(x -> r.nextBoolean()));

System.out.println(groups.get(false).size());
System.out.println(groups.get(true).size());
  • Pour plus de catégories, utilisez une Collectors.groupingBy()usine.
Map<Object, List<String>> groups = stream
    .collect(Collectors.groupingBy(x -> r.nextInt(3)));
System.out.println(groups.get(0).size());
System.out.println(groups.get(1).size());
System.out.println(groups.get(2).size());

Dans le cas où les flux ne le sont pas Stream, mais l'un des flux primitifs comme IntStream, alors cette .collect(Collectors)méthode n'est pas disponible. Vous devrez le faire de manière manuelle sans usine de collecte. Sa mise en œuvre ressemble à ceci:

[Exemple 2.0 depuis le 16/04/2020]

    IntStream    intStream = IntStream.iterate(0, i -> i + 1).limit(100000).parallel();
    IntPredicate predicate = ignored -> r.nextBoolean();

    Map<Boolean, List<Integer>> groups = intStream.collect(
            () -> Map.of(false, new ArrayList<>(100000),
                         true , new ArrayList<>(100000)),
            (map, value) -> map.get(predicate.test(value)).add(value),
            (map1, map2) -> {
                map1.get(false).addAll(map2.get(false));
                map1.get(true ).addAll(map2.get(true ));
            });

Dans cet exemple, j'initialise les ArrayLists avec la taille totale de la collection initiale (si cela est connu du tout). Cela empêche les événements de redimensionnement même dans le pire des cas, mais peut potentiellement engloutir 2 * N * T d'espace (N = nombre initial d'éléments, T = nombre de threads). Pour faire un compromis entre l'espace et la vitesse, vous pouvez le laisser de côté ou utiliser votre meilleure estimation éclairée, comme le plus grand nombre d'éléments attendus dans une partition (généralement un peu plus de N / 2 pour une répartition équilibrée).

J'espère que je n'offense personne en utilisant une méthode Java 9. Pour la version Java 8, consultez l'historique des modifications.

Mark Jeronimus
la source
2
Belle. Cependant, la dernière solution pour IntStream ne sera pas thread-safe en cas de flux parallélisé. La solution est bien plus simple que vous ne le pensez ... stream.boxed().collect(...);! Il fera comme annoncé: convertir la primitive IntStreamen Stream<Integer>version boîte .
YoYo
32
Cela devrait être la réponse acceptée car elle résout directement la question du PO.
ejel
27
Je souhaite que Stack Overflow permette à la communauté de remplacer la réponse sélectionnée si une meilleure est trouvée.
GuiSim
Je ne suis pas sûr que cela réponde à la question. La question demande de diviser un flux en flux et non en listes.
AlikElzin-kilaka
1
La fonction d'accumulateur est inutilement verbeuse. Au lieu de (map, x) -> { boolean partition = p.test(x); List<Integer> list = map.get(partition); list.add(x); }vous pouvez simplement utiliser (map, x) -> map.get(p.test(x)).add(x). De plus, je ne vois aucune raison pour laquelle l' collectopération ne devrait pas être thread-safe. Cela fonctionne exactement comme il est censé fonctionner et très étroitement à la façon dont Collectors.partitioningBy(p)cela fonctionnerait. Mais j'utiliserais un IntPredicateau lieu de Predicate<Integer>lorsque je ne l'utilise pas boxed(), pour éviter de boxer deux fois.
Holger le
21

Je suis tombé sur cette question à moi-même et je pense qu'un flux fourchu a des cas d'utilisation qui pourraient s'avérer valides. J'ai écrit le code ci-dessous en tant que consommateur afin qu'il ne fasse rien mais que vous puissiez l'appliquer à des fonctions et à tout ce que vous pourriez rencontrer.

class PredicateSplitterConsumer<T> implements Consumer<T>
{
  private Predicate<T> predicate;
  private Consumer<T>  positiveConsumer;
  private Consumer<T>  negativeConsumer;

  public PredicateSplitterConsumer(Predicate<T> predicate, Consumer<T> positive, Consumer<T> negative)
  {
    this.predicate = predicate;
    this.positiveConsumer = positive;
    this.negativeConsumer = negative;
  }

  @Override
  public void accept(T t)
  {
    if (predicate.test(t))
    {
      positiveConsumer.accept(t);
    }
    else
    {
      negativeConsumer.accept(t);
    }
  }
}

Maintenant, votre implémentation de code pourrait être quelque chose comme ceci:

personsArray.forEach(
        new PredicateSplitterConsumer<>(
            person -> person.getDateOfBirth().isPresent(),
            person -> System.out.println(person.getName()),
            person -> System.out.println(person.getName() + " does not have Date of birth")));
Ludger
la source
20

Malheureusement, ce que vous demandez est directement mal vu dans le JavaDoc de Stream :

Un flux ne doit être exploité (en invoquant une opération de flux intermédiaire ou terminal) qu'une seule fois. Cela exclut, par exemple, les flux "fourchus", où la même source alimente deux ou plusieurs pipelines, ou plusieurs traversées du même flux.

Vous pouvez contourner ce peekproblème en utilisant ou d'autres méthodes si vous désirez vraiment ce type de comportement. Dans ce cas, ce que vous devriez faire est au lieu d'essayer de sauvegarder deux flux de la même source de flux d'origine avec un filtre de forking, vous dupliqueriez votre flux et filtreriez chacun des doublons de manière appropriée.

Cependant, vous souhaiterez peut-être reconsidérer si a Streamest la structure appropriée pour votre cas d'utilisation.

Trevor Freeman
la source
6
Le libellé javadoc n'exclut pas le partitionnement en plusieurs flux tant qu'un seul élément de flux ne va que dans l' un d'entre eux
Thorbjørn Ravn Andersen
2
@ ThorbjørnRavnAndersen Je ne suis pas sûr que la duplication d'un élément de flux soit le principal obstacle à un flux fourchu. Le principal problème est que l'opération de fork est essentiellement une opération de terminal, donc lorsque vous décidez de fork, vous créez essentiellement une collection d'une sorte. Par exemple, je peux écrire une méthode List<Stream> forkStream(Stream s)mais mes flux résultants seront au moins partiellement sauvegardés par des collections et non directement par le flux sous-jacent, au lieu de dire filterqui n'est pas une opération de flux terminal.
Trevor Freeman
7
C'est l'une des raisons pour lesquelles je pense que les flux Java sont un peu à moitié comparés à github.com/ReactiveX/RxJava/wiki parce que le point de flux est d'appliquer des opérations sur un ensemble potentiellement infini d'éléments et les opérations du monde réel nécessitent souvent une division , duplication et fusion de flux.
Usman Ismail
8

Cela va à l'encontre du mécanisme général de Stream. Supposons que vous puissiez diviser le Stream S0 en Sa et Sb comme vous le souhaitez. Toute opération de terminal, par exemple count(), sur Sa "consommera" nécessairement tous les éléments de S0. Par conséquent, Sb a perdu sa source de données.

Auparavant, Stream avait une tee()méthode, je pense, qui dupliquait un flux à deux. Il est maintenant supprimé.

Stream a cependant une méthode peek (), vous pourrez peut-être l'utiliser pour répondre à vos besoins.

ZhongYu
la source
1
peekest exactement ce qui était tee.
Louis Wasserman
5

pas exactement, mais vous pourrez peut-être accomplir ce dont vous avez besoin en invoquant Collectors.groupingBy(). vous créez une nouvelle collection et pouvez ensuite instancier des flux sur cette nouvelle collection.

aepurniet
la source
2

C'était la moins mauvaise réponse que je puisse trouver.

import org.apache.commons.lang3.tuple.ImmutablePair;
import org.apache.commons.lang3.tuple.Pair;

public class Test {

    public static <T, L, R> Pair<L, R> splitStream(Stream<T> inputStream, Predicate<T> predicate,
            Function<Stream<T>, L> trueStreamProcessor, Function<Stream<T>, R> falseStreamProcessor) {

        Map<Boolean, List<T>> partitioned = inputStream.collect(Collectors.partitioningBy(predicate));
        L trueResult = trueStreamProcessor.apply(partitioned.get(Boolean.TRUE).stream());
        R falseResult = falseStreamProcessor.apply(partitioned.get(Boolean.FALSE).stream());

        return new ImmutablePair<L, R>(trueResult, falseResult);
    }

    public static void main(String[] args) {

        Stream<Integer> stream = Stream.iterate(0, n -> n + 1).limit(10);

        Pair<List<Integer>, String> results = splitStream(stream,
                n -> n > 5,
                s -> s.filter(n -> n % 2 == 0).collect(Collectors.toList()),
                s -> s.map(n -> n.toString()).collect(Collectors.joining("|")));

        System.out.println(results);
    }

}

Cela prend un flux d'entiers et les divise en 5. Pour ceux supérieurs à 5, il ne filtre que les nombres pairs et les met dans une liste. Pour le reste, il les rejoint avec |.

les sorties:

 ([6, 8],0|1|2|3|4|5)

Ce n'est pas idéal car il rassemble tout dans des collections intermédiaires brisant le flux (et a trop d'arguments!)

Ian Jones
la source
1

Je suis tombé sur cette question en cherchant un moyen de filtrer certains éléments d'un flux et de les enregistrer comme des erreurs. Je n'avais donc pas vraiment besoin de diviser le flux autant que d'attacher une action de fin prématurée à un prédicat avec une syntaxe discrète. Voici ce que j'ai proposé:

public class MyProcess {
    /* Return a Predicate that performs a bail-out action on non-matching items. */
    private static <T> Predicate<T> withAltAction(Predicate<T> pred, Consumer<T> altAction) {
    return x -> {
        if (pred.test(x)) {
            return true;
        }
        altAction.accept(x);
        return false;
    };

    /* Example usage in non-trivial pipeline */
    public void processItems(Stream<Item> stream) {
        stream.filter(Objects::nonNull)
              .peek(this::logItem)
              .map(Item::getSubItems)
              .filter(withAltAction(SubItem::isValid,
                                    i -> logError(i, "Invalid")))
              .peek(this::logSubItem)
              .filter(withAltAction(i -> i.size() > 10,
                                    i -> logError(i, "Too large")))
              .map(SubItem::toDisplayItem)
              .forEach(this::display);
    }
}
Sebastian Hans
la source
0

Version plus courte qui utilise Lombok

import java.util.function.Consumer;
import java.util.function.Predicate;

import lombok.RequiredArgsConstructor;

/**
 * Forks a Stream using a Predicate into postive and negative outcomes.
 */
@RequiredArgsConstructor
@FieldDefaults(makeFinal = true, level = AccessLevel.PROTECTED)
public class StreamForkerUtil<T> implements Consumer<T> {
    Predicate<T> predicate;
    Consumer<T> positiveConsumer;
    Consumer<T> negativeConsumer;

    @Override
    public void accept(T t) {
        (predicate.test(t) ? positiveConsumer : negativeConsumer).accept(t);
    }
}
OneCricketeer
la source
-3

Que diriez-vous:

Supplier<Stream<Integer>> randomIntsStreamSupplier =
    () -> (new Random()).ints(0, 2).boxed();

Stream<Integer> tails =
    randomIntsStreamSupplier.get().filter(x->x.equals(0));
Stream<Integer> heads =
    randomIntsStreamSupplier.get().filter(x->x.equals(1));
Matthieu
la source
1
Puisque le fournisseur est appelé deux fois, vous obtiendrez deux collectes aléatoires différentes. Je pense que c'est l'esprit du PO de diviser les cotes des évens dans la même séquence générée
usr-local-ΕΨΗΕΛΩΝ