Can You Split A Stream Into Two Streams?


Answer :

A collector can be used for this.

  • For two categories, use Collectors.partitioningBy() factory.

This will create a Map from Boolean to List, and put items in one or the other list based on a Predicate.

Note: Since the stream needs to be consumed whole, this can't work on infinite streams. And because the stream is consumed anyway, this method simply puts them in Lists instead of making a new stream-with-memory. You can always stream those lists if you require streams as output.

Also, no need for the iterator, not even in the heads-only example you provided.

  • Binary splitting looks like this:

    Random r = new Random();

    Map<Boolean, List<String>> groups = stream
        .collect(Collectors.partitioningBy(x -> r.nextBoolean()));

    System.out.println(groups.get(false).size());
    System.out.println(groups.get(true).size());

  • For more categories, use a Collectors.groupingBy() factory.
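
    // The classifier returns an Integer, so the map key type is Integer.
    Map<Integer, List<String>> groups = stream
        .collect(Collectors.groupingBy(x -> r.nextInt(3)));

    // Note: groupingBy only creates keys that actually occur,
    // so get() returns null for a group that received no elements.
    System.out.println(groups.get(0).size());
    System.out.println(groups.get(1).size());
    System.out.println(groups.get(2).size());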
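
For a reproducible illustration of both collectors, here is a minimal sketch that swaps the random predicate for a deterministic one based on string length. The class name, method names, and sample words are made up for this example; only `Collectors.partitioningBy()` and `Collectors.groupingBy()` come from the answer above.

```java
import java.util.List;
import java.util.Map;
import java.util.stream.Collectors;
import java.util.stream.Stream;

class PartitionExample {

    // Two categories: words of length <= 3 go to true, all others to false.
    static Map<Boolean, List<String>> partitionByLength(Stream<String> words) {
        return words.collect(Collectors.partitioningBy(w -> w.length() <= 3));
    }

    // More categories: one list per distinct word length.
    static Map<Integer, List<String>> groupByLength(Stream<String> words) {
        return words.collect(Collectors.groupingBy(String::length));
    }

    public static void main(String[] args) {
        Map<Boolean, List<String>> parts =
                partitionByLength(Stream.of("a", "bb", "ccc", "dddd", "eeeee"));

        // partitioningBy always creates both keys, even if one list is empty.
        System.out.println(parts.get(true));   // [a, bb, ccc]
        System.out.println(parts.get(false));  // [dddd, eeeee]

        // Each list can be streamed again if a Stream is needed as output.
        long startingWithB = parts.get(true).stream()
                .filter(w -> w.startsWith("b"))
                .count();
        System.out.println(startingWithB);     // 1

        Map<Integer, List<String>> byLength =
                groupByLength(Stream.of("a", "bb", "cc", "dddd"));

        // groupingBy omits empty groups, so guard against missing keys.
        System.out.println(byLength.getOrDefault(3, List.of()).size()); // 0
    }
}
```

Unlike `partitioningBy()`, the map returned by `groupingBy()` has no entry for a category that received no elements, which is why the last lookup uses `getOrDefault()`.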

If the stream is not a Stream but one of the primitive streams, such as IntStream, then the .collect(Collector) overload is not available. You'll have to do it the manual way, without a collector factory. Its implementation looks like this:

[Example 2.0 since 2020-04-16]

    IntStream    intStream = IntStream.iterate(0, i -> i + 1).limit(100000).parallel();
    IntPredicate predicate = ignored -> r.nextBoolean();

    Map<Boolean, List<Integer>> groups = intStream.collect(
            () -> Map.of(false, new ArrayList<>(100000),
                         true , new ArrayList<>(100000)),
            (map, value) -> map.get(predicate.test(value)).add(value),
            (map1, map2) -> {
                map1.get(false).addAll(map2.get(false));
                map1.get(true ).addAll(map2.get(true ));
            });

In this example I initialize the ArrayLists with the full size of the initial collection (if this is known at all). This prevents resize events even in the worst-case scenario, but can potentially gobble up 2*N*T space (N = initial number of elements, T = number of threads), because every thread allocates its own pair of lists. To save space at the cost of some speed, you can leave out the initial capacity or use your best educated guess, like the expected highest number of elements in one partition (typically just over N/2 for a balanced split).
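
If boxing overhead is acceptable, another route (not the manual approach shown above) is to convert the primitive stream with `boxed()` and then use the ordinary collector. The following is a minimal sketch under that assumption; the class and method names are hypothetical.

```java
import java.util.List;
import java.util.Map;
import java.util.function.IntPredicate;
import java.util.stream.Collectors;
import java.util.stream.IntStream;

class BoxedPartitionExample {

    // Boxes each int to an Integer so that collect(Collector) becomes available.
    static Map<Boolean, List<Integer>> partition(IntStream ints, IntPredicate predicate) {
        return ints.boxed()
                   .collect(Collectors.partitioningBy(i -> predicate.test(i)));
    }

    public static void main(String[] args) {
        Map<Boolean, List<Integer>> groups =
                partition(IntStream.range(0, 10), i -> i % 2 == 0);

        System.out.println(groups.get(true));   // [0, 2, 4, 6, 8]
        System.out.println(groups.get(false));  // [1, 3, 5, 7, 9]
    }
}
```

This is shorter than the three-argument `collect`, but it gives up control over the initial list capacity and boxes every element up front.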

I hope I don't offend anyone by using a Java 9 method (Map.of). For the Java 8 version, look at the edit history.


I stumbled across this question myself, and I feel that a forked stream has some valid use cases. I wrote the code below as a consumer, so it does not return anything, but you could apply the same idea to functions and anything else you might come across.

    class PredicateSplitterConsumer<T> implements Consumer<T>
    {
      private Predicate<T> predicate;
      private Consumer<T>  positiveConsumer;
      private Consumer<T>  negativeConsumer;

      public PredicateSplitterConsumer(Predicate<T> predicate, Consumer<T> positive, Consumer<T> negative)
      {
        this.predicate = predicate;
        this.positiveConsumer = positive;
        this.negativeConsumer = negative;
      }

      @Override
      public void accept(T t)
      {
        if (predicate.test(t))
        {
          positiveConsumer.accept(t);
        }
        else
        {
          negativeConsumer.accept(t);
        }
      }
    }

Now your code implementation could be something like this:

    personsArray.forEach(
            new PredicateSplitterConsumer<>(
                person -> person.getDateOfBirth().isPresent(),
                person -> System.out.println(person.getName()),
                person -> System.out.println(person.getName() + " does not have Date of birth")));
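
The same routing idea can also be expressed without a dedicated class. The sketch below uses a hypothetical static helper, `split`, that returns a lambda with the same behavior as the consumer above, and routes integers into two lists instead of printing them. All names and sample values are illustrative.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.Consumer;
import java.util.function.Predicate;

class SplitterDemo {

    // Hypothetical helper: forwards each element to one of two consumers,
    // depending on whether the predicate matches.
    static <T> Consumer<T> split(Predicate<T> predicate,
                                 Consumer<T> positive,
                                 Consumer<T> negative) {
        return t -> {
            if (predicate.test(t)) {
                positive.accept(t);
            } else {
                negative.accept(t);
            }
        };
    }

    public static void main(String[] args) {
        List<Integer> evens = new ArrayList<>();
        List<Integer> odds = new ArrayList<>();

        // A single traversal fills both lists.
        Consumer<Integer> router = split(n -> n % 2 == 0, evens::add, odds::add);
        List.of(1, 2, 3, 4, 5).forEach(router);

        System.out.println(evens);  // [2, 4]
        System.out.println(odds);   // [1, 3, 5]
    }
}
```

Because the target lists are plain ArrayLists, this sketch is only safe for sequential traversal; a parallel stream would need thread-safe consumers.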

Unfortunately, what you ask for is directly frowned upon in the JavaDoc of Stream:

A stream should be operated on (invoking an intermediate or terminal stream operation) only once. This rules out, for example, "forked" streams, where the same source feeds two or more pipelines, or multiple traversals of the same stream.

You can work around this using peek or other methods should you truly desire that type of behaviour. In this case, instead of trying to back two streams from the same original Stream source with a forking filter, you should duplicate your stream and filter each of the duplicates appropriately.

However, you may wish to reconsider if a Stream is the appropriate structure for your use case.
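
One way to read "duplicate your stream" is to keep a Supplier that creates a fresh stream from the same source each time it is called. This is a minimal sketch, assuming the source is a collection that can be traversed more than once; the class name, method names, and sample data are hypothetical.

```java
import java.util.List;
import java.util.function.Supplier;
import java.util.stream.Collectors;
import java.util.stream.Stream;

class DuplicateStreamExample {

    // Consumes one fresh stream and keeps the names of at most 4 characters.
    static List<String> shortNames(Supplier<Stream<String>> streams) {
        return streams.get()
                .filter(s -> s.length() <= 4)
                .collect(Collectors.toList());
    }

    // Consumes a second, independent stream and keeps the remaining names.
    static List<String> longNames(Supplier<Stream<String>> streams) {
        return streams.get()
                .filter(s -> s.length() > 4)
                .collect(Collectors.toList());
    }

    public static void main(String[] args) {
        List<String> source = List.of("apple", "kiwi", "banana", "fig");

        // Each call to get() opens a new stream, so no stream is used twice.
        Supplier<Stream<String>> streams = source::stream;

        System.out.println(shortNames(streams));  // [kiwi, fig]
        System.out.println(longNames(streams));   // [apple, banana]
    }
}
```

The source is traversed once per pipeline, so this only suits sources that are cheap and safe to read repeatedly; for a one-shot source, collecting with `partitioningBy()` first is the safer option.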

