Streams in Java 8

Last year, Oracle released Java 8, the most significant upgrade to the language since Java 5. One of the headline features in Java 8 is the introduction of the new Streams API.

This blog post is an attempt to wrap my head around this new feature by way of an example.

Consider the Even Fibonacci numbers problem from Project Euler:

Each new term in the Fibonacci sequence is generated by adding the previous two terms. By starting with 1 and 2, the first 10 terms will be:
1, 2, 3, 5, 8, 13, 21, 34, 55, 89, …
By considering the terms in the Fibonacci sequence whose values do not exceed four million, find the sum of the even-valued terms.

A traditional Java solution to this problem would be to create an iterator that generates a sequence of Fibonacci numbers and then iterate through them, adding up the even numbers till we hit an element exceeding four million. Here’s an implementation:

import java.util.Iterator;

public class FibonacciSequence implements Iterator<Integer> {

    private Integer previous;
    private Integer current;

    public FibonacciSequence() {
        this.previous = -1;
        this.current = 1;
    }

    @Override
    public Integer next() {
        Integer next = previous + current;
        previous = current;
        current = next;
        return next;
    }

    @Override
    public boolean hasNext() {
        return true;
    }

    public static void main(String[] args) {
        final int LIMIT = 4_000_000;
        final FibonacciSequence sequence = new FibonacciSequence();

        Integer sum = 0;

        while (sequence.hasNext()) {
            Integer value = sequence.next();
            if (value > LIMIT) {
                break;
            }
            if (value % 2 == 0) {
                sum += value;
            }
        }

        System.out.println("Sum using Iterator = " + sum);
    }
}

This can be made a bit nicer using the enhanced for-each loop syntax from Java 5, which requires that our FibonacciSequence class implement the Iterable interface:

public class FibonacciSequence implements Iterator<Integer>, Iterable<Integer>  {

    // Class implementation as in the previous code snippet

    @Override
    public Iterator<Integer> iterator() {
        return this;
    }

    public static void main(String[] args) {
        final int LIMIT = 4_000_000;
        final FibonacciSequence sequence = new FibonacciSequence();

        Integer sum = 0;

        for (Integer value : sequence) {
            if (value > LIMIT) {
                break;
            }
            if (value % 2 == 0) {
                sum += value;
            }
        }

        System.out.println("Sum using Iterable = " + sum);
    }
}

Which brings us to the Java 8 Streams API. Ideally, a stream style API should allow us to solve this problem using something like this:

int sum = infiniteFibonacciNumbersStream
        .takeWhile(i -> i <= 4_000_000)
        .filter(i -> i % 2 == 0)
        .sum();

We start with an infinite stream of fibonacci numbers, consume elements from it till the predicate (i <= 4_000_000) becomes false, filter the elements of the resulting finite stream for even numbers (i % 2 == 0) and finally, compute the sum of the filtered stream.

Unfortunately, the Java 8 stream API has no equivalent of takeWhile. As far as I can tell, the API offers no way of limiting our stream to a finite one. To work around this limitation, we will push the limit into the FibonacciSequence class, making it a finite sequence:

public class FibonacciSequence implements Iterator<Integer>, Iterable<Integer> {

    private final Integer max;
    private Integer previous;
    private Integer current;

    public FibonacciSequence(int max) {
        this.previous = -1;
        this.current = 1;
        this.max = max;
    }

    @Override
    public Integer next() {
        Integer next = previous + current;
        previous = current;
        current = next;
        return next;
    }

    @Override
    public boolean hasNext() {
        return current <= max;
    }

    @Override
    public Iterator<Integer> iterator() {
        return this;
    }

    public static void main(String[] args) {
        final int LIMIT = 4_000_000;
        final FibonacciSequence sequence = new FibonacciSequence(LIMIT);
        Integer sum = 0;

        for (Integer value : sequence) {
            if (value % 2 == 0) {
                sum += value;
            }
        }

        System.out.println("Sum using finite Iterable = " + sum);
    }    
}

Now we need a way to convert the FibonacciSequence into a stream. The default interface implementation (another Java 8 feature) of the new Collection.stream() method gives us a hint:

/**
* Returns a sequential {@code Stream} with this collection as its source.
*
* @since 1.8
*/
default Stream<E> stream() {
    return StreamSupport.stream(spliterator(), false);
}

The spliterator is the stream equivalent of the iterator. It allows the iterator to be potentially split into multiple iterators so that it can be consumed in parallel via parallel streams. The second parameter, false, provided to StreamSupport.stream() indicates that this stream is not designed for parallel consumption and the returned stream is a sequential stream.

The Java 8 Iterable interface, which our FibonacciSequence implements, now provides a default implementation of spliterator():

/**
* Creates a {@link Spliterator} over the elements described by this
* {@code Iterable}.
*
* @since 1.8
*/
default Spliterator<T> spliterator() {
    return Spliterators.spliteratorUnknownSize(iterator(), 0);
}

So putting these pieces together, we can obtain a finite, sequential Stream of fibonacci numbers using:

FibonacciSequence fibonacciSequence = new FibonacciSequence(LIMIT);
Stream<Integer> fibStream = StreamSupport.stream(fibonacciSequence.spliterator(), false);

With the stream in hand, the only steps that remain are to filter the stream for even numbers and then compute the sum.

int sum = fibStream
        .filter(i -> i % 2 == 0)
        .mapToInt(i -> i)
        .sum();

The mapToInt() operation is needed because the stream API’s sum() operation can only work with IntStreams, a specialization of Stream for primitive integers. The mapToInt() operation converts the source stream into an IntStream using the supplied function.

Putting it all together:

final int LIMIT = 4_000_000;
FibonacciSequence fibonacciSequence = new FibonacciSequence(4_000_000);
Stream<Integer> fibStream = StreamSupport.stream(fibonacciSequence.spliterator(), false);
int sum = fibStream
        .filter(i -> i % 2 == 0)
        .mapToInt(i -> i)
        .sum();
System.out.println("Sum by stream computation is: " + sum);

So that’s the stream equivalent of our original iterative solution.

Now we also know that Fibonacci sequences can be generated using recursion. Can we solve the fibonacci problem using a recursive stream generation solution? Turns out, it can be done! Here’s how:

public Stream<Integer> recursiveFibStream(int a, int b, int max) {
    Integer current = a + b, prev = b;
    if (current <= max) {
        return Stream.concat(Stream.of(current), recursiveFibStream(prev, current, max));
    } else {
        return Stream.empty();
    }
}

public int usingRecursiveStream() {
    int sum = recursiveFibStream(0, 1, 4_000_000)
            .filter(i -> i % 2 == 0)
            .mapToInt(i -> i)
            .sum();

    return sum;
}

Nifty? Sure. But it runs like a snail!

I setup a JMH benchmark of these four different implementation strategies and here are the results:

Benchmark                      Mode      Cnt  Score   Error  Units
Runner.usingIterable         sample  2192211  0.393 ± 0.009  us/op
Runner.usingIterator         sample  2204423  0.383 ± 0.005  us/op
Runner.usingRecursiveStream  sample  3810503  6.579 ± 0.054  us/op
Runner.usingStream           sample  2530036  0.529 ± 0.020  us/op

The Stream API does have some overhead over the conventional iterator/iterable based implementations but the clear outlier is the recursive stream implementation which takes 6.6 microseconds to compute the sum which the others can compute in less than half a microsecond!

I’ve shared the benchmarking code on git: https://github.com/antrix/java8-streams-example. Do let me know if you find a mistake either in my implementation or the benchmark setup.