Zipper des flux à l'aide de JDK8 avec lambda (java.util.stream.Streams.zip)

149

Dans JDK 8 avec lambda b93, il y avait une classe java.util.stream.Streams.zip dans b93 qui pouvait être utilisée pour compresser les flux (ceci est illustré dans le tutoriel Exploring Java8 Lambdas. Part 1 par Dhananjay Nene ). Cette fonction:

Crée un Stream combiné paresseux et séquentiel dont les éléments sont le résultat de la combinaison des éléments de deux flux.

Cependant, en b98, cela a disparu. En fait, la Streamsclasse n'est même pas accessible dans java.util.stream en b98 .

Cette fonctionnalité a-t-elle été déplacée, et si oui, comment puis-je compresser les flux de manière concise en utilisant b98?

L'application que j'ai en tête se trouve dans cette implémentation java de Shen , où j'ai remplacé la fonctionnalité zip dans le

  • static <T> boolean every(Collection<T> c1, Collection<T> c2, BiPredicate<T, T> pred)
  • static <T> T find(Collection<T> c1, Collection<T> c2, BiPredicate<T, T> pred)

fonctions avec un code assez verbeux (qui n'utilise pas les fonctionnalités de b98).

Artella
la source
3
Ah viens de découvrir qu'il semble avoir été complètement supprimé: mail.openjdk.java.net/pipermail/lambda-libs-spec-observers
...
"Exploring Java8 Lambdas. Part 1" - nouveau lien pour cet article: blog.dhananjaynene.com/2013/02/exploring-java8-lambdas-part-1
Aleksei Egorov

Réponses:

77

J'en avais besoin aussi, alors j'ai juste pris le code source de b93 et ​​l'ai mis dans une classe "util". J'ai dû le modifier légèrement pour fonctionner avec l'API actuelle.

Pour référence, voici le code de travail (prenez-le à vos risques et périls ...):

public static<A, B, C> Stream<C> zip(Stream<? extends A> a,
                                     Stream<? extends B> b,
                                     BiFunction<? super A, ? super B, ? extends C> zipper) {
    Objects.requireNonNull(zipper);
    Spliterator<? extends A> aSpliterator = Objects.requireNonNull(a).spliterator();
    Spliterator<? extends B> bSpliterator = Objects.requireNonNull(b).spliterator();

    // Zipping looses DISTINCT and SORTED characteristics
    int characteristics = aSpliterator.characteristics() & bSpliterator.characteristics() &
            ~(Spliterator.DISTINCT | Spliterator.SORTED);

    long zipSize = ((characteristics & Spliterator.SIZED) != 0)
            ? Math.min(aSpliterator.getExactSizeIfKnown(), bSpliterator.getExactSizeIfKnown())
            : -1;

    Iterator<A> aIterator = Spliterators.iterator(aSpliterator);
    Iterator<B> bIterator = Spliterators.iterator(bSpliterator);
    Iterator<C> cIterator = new Iterator<C>() {
        @Override
        public boolean hasNext() {
            return aIterator.hasNext() && bIterator.hasNext();
        }

        @Override
        public C next() {
            return zipper.apply(aIterator.next(), bIterator.next());
        }
    };

    Spliterator<C> split = Spliterators.spliterator(cIterator, zipSize, characteristics);
    return (a.isParallel() || b.isParallel())
           ? StreamSupport.stream(split, true)
           : StreamSupport.stream(split, false);
}
siki
la source
1
Le flux résultant ne devrait-il pas être SIZEDsi l'un ou l'autre des flux est SIZED, pas les deux?
Didier L
5
Je ne pense pas. Les deux flux doivent être SIZEDpour que cette implémentation fonctionne. Cela dépend en fait de la façon dont vous définissez la compression. Devriez-vous être capable de compresser deux flux de taille différente, par exemple? À quoi ressemblerait alors le flux résultant? Je pense que c'est pourquoi cette fonction a été omise de l'API. Il existe de nombreuses façons de faire cela et c'est à l'utilisateur de décider quel comportement doit être le "correct". Souhaitez-vous supprimer les éléments du flux le plus long ou compléter la liste la plus courte? Si oui, avec quelle (s) valeur (s)?
siki
Sauf si je manque quelque chose, il n'y a pas besoin de casting (par exemple à Spliterator<A>).
jub0bs
Existe-t-il un site Web sur lequel le code source de Java 8 b93 est hébergé? J'ai du mal à le trouver.
Starwarswii
42

zip est l'une des fonctions fournies par la bibliothèque protonpack .

Stream<String> streamA = Stream.of("A", "B", "C");
Stream<String> streamB  = Stream.of("Apple", "Banana", "Carrot", "Doughnut");

List<String> zipped = StreamUtils.zip(streamA,
                                      streamB,
                                      (a, b) -> a + " is for " + b)
                                 .collect(Collectors.toList());

assertThat(zipped,
           contains("A is for Apple", "B is for Banana", "C is for Carrot"));
Dominic Fox
la source
1
également trouvé dans StreamEx: amaembo.github.io/streamex/javadoc/one/util/streamex/…
tokland
34

Si vous avez Guava dans votre projet, vous pouvez utiliser la méthode Streams.zip (a été ajoutée dans Guava 21):

Renvoie un flux dans lequel chaque élément est le résultat du passage de l'élément correspondant de chacun des streamA et streamB à function. Le flux résultant ne sera aussi long que le plus court des deux flux d'entrée; si un flux est plus long, ses éléments supplémentaires seront ignorés. Le flux résultant n'est pas divisible efficacement. Cela peut nuire aux performances parallèles.

 public class Streams {
     ...

     public static <A, B, R> Stream<R> zip(Stream<A> streamA,
             Stream<B> streamB, BiFunction<? super A, ? super B, R> function) {
         ...
     }
 }
ZhekaKozlov
la source
26

Zipper deux flux en utilisant JDK8 avec lambda ( gist ).

public static <A, B, C> Stream<C> zip(Stream<A> streamA, Stream<B> streamB, BiFunction<A, B, C> zipper) {
    final Iterator<A> iteratorA = streamA.iterator();
    final Iterator<B> iteratorB = streamB.iterator();
    final Iterator<C> iteratorC = new Iterator<C>() {
        @Override
        public boolean hasNext() {
            return iteratorA.hasNext() && iteratorB.hasNext();
        }

        @Override
        public C next() {
            return zipper.apply(iteratorA.next(), iteratorB.next());
        }
    };
    final boolean parallel = streamA.isParallel() || streamB.isParallel();
    return iteratorToFiniteStream(iteratorC, parallel);
}

public static <T> Stream<T> iteratorToFiniteStream(Iterator<T> iterator, boolean parallel) {
    final Iterable<T> iterable = () -> iterator;
    return StreamSupport.stream(iterable.spliterator(), parallel);
}
Karol Król
la source
2
Belle solution et (relativement) compacte! Nécessite que vous mettiez import java.util.function.*;et import java.util.stream.*;en haut de votre fichier.
sffc
Notez qu'il s'agit d'une opération de terminal sur le flux. Cela signifie que pour les flux infinis, cette méthode tombe en panne
smac89
2
Tellement emballages inutiles: Ici () -> iteratoret là encore: iterable.spliterator(). Pourquoi ne pas implémenter directement un Spliteratorplutôt qu'un Iterator? Vérifiez @Doradus réponse stackoverflow.com/a/46230233/1140754
Miguel Gamboa
20

Étant donné que je ne peux concevoir aucune utilisation de la compression sur des collections autres que celles indexées (Lists) et que je suis un grand fan de simplicité, ce serait ma solution:

<A,B,C>  Stream<C> zipped(List<A> lista, List<B> listb, BiFunction<A,B,C> zipper){
     int shortestLength = Math.min(lista.size(),listb.size());
     return IntStream.range(0,shortestLength).mapToObj( i -> {
          return zipper.apply(lista.get(i), listb.get(i));
     });        
}
Rafael
la source
1
Je pense que ça mapToObjectdevrait être mapToObj.
seanf
si la liste ne l'est pas RandomAccess(par exemple sur les listes chaînées), ce sera très lent
avmohan le
Absolument. Mais la plupart des développeurs Java savent bien que LinkedList a des performances médiocres pour les opérations d'accès aux index.
Rafael le
11

Les méthodes de la classe que vous avez mentionnée ont été déplacées vers l' Streaminterface elle-même au profit des méthodes par défaut. Mais il semble que la zipméthode ait été supprimée. Peut-être parce que le comportement par défaut des flux de différentes tailles n'est pas clair. Mais la mise en œuvre du comportement souhaité est simple:

static <T> boolean every(
  Collection<T> c1, Collection<T> c2, BiPredicate<T, T> pred) {
    Iterator<T> it=c2.iterator();
    return c1.stream().allMatch(x->!it.hasNext()||pred.test(x, it.next()));
}
static <T> T find(Collection<T> c1, Collection<T> c2, BiPredicate<T, T> pred) {
    Iterator<T> it=c2.iterator();
    return c1.stream().filter(x->it.hasNext()&&pred.test(x, it.next()))
      .findFirst().orElse(null);
}
Holger
la source
N'est-ce pas que predicatevous avez passé au filtre avec état ? Cela viole le contrat de méthode et ne fonctionnera surtout pas lors du traitement du flux en parallèle.
Andreas
2
@Andreas: aucune solution ici ne prend en charge le traitement parallèle. Étant donné que mes méthodes ne renvoient pas de flux, elles s'assurent que les flux ne s'exécutent pas en parallèle. De même, le code de la réponse acceptée retourne un flux qui peut être transformé en parallèle mais qui ne fera rien en parallèle. Cela dit, les prédicats avec état sont déconseillés mais ne violent pas le contrat. Ils peuvent même être utilisés dans un contexte parallèle si vous vous assurez que la mise à jour de l'état est thread-safe. Dans certaines situations, ils sont inévitables, par exemple, transformer un flux en distinct est un prédicat complet en soi .
Holger
2
@Andreas: vous pouvez deviner pourquoi ces opérations ont été supprimées de l'API Java…
Holger
8

Je suggère humblement cette mise en œuvre. Le flux résultant est tronqué au plus court des deux flux d'entrée.

public static <L, R, T> Stream<T> zip(Stream<L> leftStream, Stream<R> rightStream, BiFunction<L, R, T> combiner) {
    Spliterator<L> lefts = leftStream.spliterator();
    Spliterator<R> rights = rightStream.spliterator();
    return StreamSupport.stream(new AbstractSpliterator<T>(Long.min(lefts.estimateSize(), rights.estimateSize()), lefts.characteristics() & rights.characteristics()) {
        @Override
        public boolean tryAdvance(Consumer<? super T> action) {
            return lefts.tryAdvance(left->rights.tryAdvance(right->action.accept(combiner.apply(left, right))));
        }
    }, leftStream.isParallel() || rightStream.isParallel());
}
Doradus
la source
J'aime ta proposition. Mais je ne suis pas totalement d'accord avec le dernier .., leftStream.isParallel() || rightStream.isParallel(). Je pense que cela n'a aucun effet car AbstractSpliteratoroffre un parallélisme limité par défaut. Je pense donc que le résultat final sera le même que celui de passer false.
Miguel Gamboa le
@MiguelGamboa - merci pour votre commentaire. Je ne sais pas ce que vous entendez par "parallélisme limité par défaut" - avez-vous un lien vers certains documents?
Doradus
6

La bibliothèque Lazy-Seq fournit la fonctionnalité zip.

https://github.com/nurkiewicz/LazySeq

Cette bibliothèque est fortement inspirée scala.collection.immutable.Streamet vise à fournir une implémentation de séquence paresseuse immuable, sûre pour les threads et facile à utiliser, éventuellement infinie.

Nick Siderakis
la source
5

En utilisant la dernière bibliothèque Guava (pour la Streamsclasse), vous devriez pouvoir faire

final Map<String, String> result = 
    Streams.zip(
        collection1.stream(), 
        collection2.stream(), 
        AbstractMap.SimpleEntry::new)
    .collect(Collectors.toMap(e -> e.getKey(), e  -> e.getValue()));
Dan Borza
la source
2

Cela fonctionnerait-il pour vous? C'est une fonction courte, qui évalue paresseusement les flux qu'elle zippe, vous pouvez donc lui fournir des flux infinis (elle n'a pas besoin de prendre la taille des flux compressés).

Si les flux sont finis, il s'arrête dès que l'un des flux manque d'éléments.

import java.util.Objects;
import java.util.function.BiFunction;
import java.util.stream.Stream;

class StreamUtils {
    static <ARG1, ARG2, RESULT> Stream<RESULT> zip(
            Stream<ARG1> s1,
            Stream<ARG2> s2,
            BiFunction<ARG1, ARG2, RESULT> combiner) {
        final var i2 = s2.iterator();
        return s1.map(x1 -> i2.hasNext() ? combiner.apply(x1, i2.next()) : null)
                .takeWhile(Objects::nonNull);
    }
}

Voici un code de test unitaire (beaucoup plus long que le code lui-même!)

import org.junit.jupiter.api.Test;
import org.junit.jupiter.params.ParameterizedTest;
import org.junit.jupiter.params.provider.Arguments;
import org.junit.jupiter.params.provider.MethodSource;

import java.util.List;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.function.BiFunction;
import java.util.stream.Collectors;
import java.util.stream.Stream;

import static org.junit.jupiter.api.Assertions.assertEquals;

class StreamUtilsTest {
    @ParameterizedTest
    @MethodSource("shouldZipTestCases")
    <ARG1, ARG2, RESULT>
    void shouldZip(
            String testName,
            Stream<ARG1> s1,
            Stream<ARG2> s2,
            BiFunction<ARG1, ARG2, RESULT> combiner,
            Stream<RESULT> expected) {
        var actual = StreamUtils.zip(s1, s2, combiner);

        assertEquals(
                expected.collect(Collectors.toList()),
                actual.collect(Collectors.toList()),
                testName);
    }

    private static Stream<Arguments> shouldZipTestCases() {
        return Stream.of(
                Arguments.of(
                        "Two empty streams",
                        Stream.empty(),
                        Stream.empty(),
                        (BiFunction<Object, Object, Object>) StreamUtilsTest::combine,
                        Stream.empty()),
                Arguments.of(
                        "One singleton and one empty stream",
                        Stream.of(1),
                        Stream.empty(),
                        (BiFunction<Object, Object, Object>) StreamUtilsTest::combine,
                        Stream.empty()),
                Arguments.of(
                        "One empty and one singleton stream",
                        Stream.empty(),
                        Stream.of(1),
                        (BiFunction<Object, Object, Object>) StreamUtilsTest::combine,
                        Stream.empty()),
                Arguments.of(
                        "Two singleton streams",
                        Stream.of("blah"),
                        Stream.of(1),
                        (BiFunction<Object, Object, Object>) StreamUtilsTest::combine,
                        Stream.of(pair("blah", 1))),
                Arguments.of(
                        "One singleton, one multiple stream",
                        Stream.of("blob"),
                        Stream.of(2, 3),
                        (BiFunction<Object, Object, Object>) StreamUtilsTest::combine,
                        Stream.of(pair("blob", 2))),
                Arguments.of(
                        "One multiple, one singleton stream",
                        Stream.of("foo", "bar"),
                        Stream.of(4),
                        (BiFunction<Object, Object, Object>) StreamUtilsTest::combine,
                        Stream.of(pair("foo", 4))),
                Arguments.of(
                        "Two multiple streams",
                        Stream.of("nine", "eleven"),
                        Stream.of(10, 12),
                        (BiFunction<Object, Object, Object>) StreamUtilsTest::combine,
                        Stream.of(pair("nine", 10), pair("eleven", 12)))
        );
    }

    private static List<Object> pair(Object o1, Object o2) {
        return List.of(o1, o2);
    }

    static private <T1, T2> List<Object> combine(T1 o1, T2 o2) {
        return List.of(o1, o2);
    }

    @Test
    void shouldLazilyEvaluateInZip() {
        final var a = new AtomicInteger();
        final var b = new AtomicInteger();
        final var zipped = StreamUtils.zip(
                Stream.generate(a::incrementAndGet),
                Stream.generate(b::decrementAndGet),
                (xa, xb) -> xb + 3 * xa);

        assertEquals(0, a.get(), "Should not have evaluated a at start");
        assertEquals(0, b.get(), "Should not have evaluated b at start");

        final var takeTwo = zipped.limit(2);

        assertEquals(0, a.get(), "Should not have evaluated a at take");
        assertEquals(0, b.get(), "Should not have evaluated b at take");

        final var list = takeTwo.collect(Collectors.toList());

        assertEquals(2, a.get(), "Should have evaluated a after collect");
        assertEquals(-2, b.get(), "Should have evaluated b after collect");
        assertEquals(List.of(2, 4), list);
    }
}
dominic
la source
J'ai dû abandonner le takeWhileà la fin, c'était que cela ne semble pas être dans java8 mais ce n'est pas un problème car l'appelé peut filtrer toutes les valeurs nulles qui se produisent lorsque les flux compressés ne sont pas de la même taille. Je pense que cette réponse devrait être la réponse numéro 1 car elle est cohérente et compréhensible. excellent travail merci encore.
simbo1905
1
public class Tuple<S,T> {
    private final S object1;
    private final T object2;

    public Tuple(S object1, T object2) {
        this.object1 = object1;
        this.object2 = object2;
    }

    public S getObject1() {
        return object1;
    }

    public T getObject2() {
        return object2;
    }
}


public class StreamUtils {

    private StreamUtils() {
    }

    public static <T> Stream<Tuple<Integer,T>> zipWithIndex(Stream<T> stream) {
        Stream<Integer> integerStream = IntStream.range(0, Integer.MAX_VALUE).boxed();
        Iterator<Integer> integerIterator = integerStream.iterator();
        return stream.map(x -> new Tuple<>(integerIterator.next(), x));
    }
}
robby_pelssers
la source
1

Le cyclope- react d' AOL , auquel je contribue, fournit également des fonctionnalités de compression, à la fois via une implémentation de Stream étendue , qui implémente également l'interface de flux réactifs ReactiveSeq, et via StreamUtils qui offre une grande partie des mêmes fonctionnalités via des méthodes statiques aux flux Java standard.

 List<Tuple2<Integer,Integer>> list =  ReactiveSeq.of(1,2,3,4,5,6)
                                                  .zip(Stream.of(100,200,300,400));


  List<Tuple2<Integer,Integer>> list = StreamUtils.zip(Stream.of(1,2,3,4,5,6),
                                                  Stream.of(100,200,300,400));

Il offre également une fermeture éclair plus généralisée basée sur l'application. Par exemple

   ReactiveSeq.of("a","b","c")
              .ap3(this::concat)
              .ap(of("1","2","3"))
              .ap(of(".","?","!"))
              .toList();

   //List("a1.","b2?","c3!");

   private String concat(String a, String b, String c){
    return a+b+c;
   }

Et même la possibilité d'associer chaque élément d'un flux avec chaque élément d'un autre

   ReactiveSeq.of("a","b","c")
              .forEach2(str->Stream.of(str+"!","2"), a->b->a+"_"+b);

   //ReactiveSeq("a_a!","a_2","b_b!","b_2","c_c!","c2")
John McClean
la source
0

Si quelqu'un en a encore besoin, il y a une StreamEx.zipWithfonction dans la bibliothèque streamex :

StreamEx<String> givenNames = StreamEx.of("Leo", "Fyodor")
StreamEx<String> familyNames = StreamEx.of("Tolstoy", "Dostoevsky")
StreamEx<String> fullNames = givenNames.zipWith(familyNames, (gn, fn) -> gn + " " + fn);

fullNames.forEach(System.out::println);  // prints: "Leo Tolstoy\nFyodor Dostoevsky\n"
const.grigoryev
la source
-1

C'est bien. J'ai dû compresser deux flux dans une carte avec un flux étant la clé et l'autre la valeur

Stream<String> streamA = Stream.of("A", "B", "C");
Stream<String> streamB  = Stream.of("Apple", "Banana", "Carrot", "Doughnut");    
final Stream<Map.Entry<String, String>> s = StreamUtils.zip(streamA,
                    streamB,
                    (a, b) -> {
                        final Map.Entry<String, String> entry = new AbstractMap.SimpleEntry<String, String>(a, b);
                        return entry;
                    });

System.out.println(s.collect(Collectors.toMap(e -> e.getKey(), e -> e.getValue())));

Sortie: {A = pomme, B = banane, C = carotte}

Gnana
la source