1. Introduction

In the landscape of software development, efficiently processing large datasets has become paramount, especially with the advent of multicore processors. The Java Stream interface provided a leap forward by enabling sequential and parallel operations on collections. However, fully exploiting modern processors' capabilities while retaining the Stream API’s simplicity posed a challenge.

Responding to this, I created an open-source library aimed at experimenting with a new method of parallelizing stream operations. This library diverges from traditional batching methods by processing each stream element in its own virtual thread, offering a more refined level of parallelism.

In this article, I describe the library and its design. This is more detail than you need simply to use the library.

The library is available on GitHub at https://github.com/verhas/vtstream and also as a dependency in Maven Central.

<dependency>
    <groupId>com.github.verhas</groupId>
    <artifactId>vtstream</artifactId>
    <version>1.0.1</version>
</dependency>

Check the current version number on the Maven Central site or on GitHub. This article is based on version 1.0.1 of the library.

2. Parallel Computing

Parallel computing is not new; it has been around for decades. The first computers executed tasks in batches, hence serially, but soon the idea of time-sharing came into the picture.

The first time-sharing computer system was installed in 1961 at the Massachusetts Institute of Technology (MIT). This system, known as the Compatible Time-Sharing System (CTSS), allowed multiple users to log into a mainframe computer simultaneously, working in what appeared to be a private session. CTSS was a groundbreaking development in computer science, laying the foundation for modern operating systems and computing environments that support multitasking and multi-user operations.

This was not a parallel computing system, per se. CTSS was designed to run on a single mainframe computer, the IBM 7094, at MIT. It had one CPU, so the code was executed serially.

Today we have multicore processors and multiple processors in a single computer. I edit this article on a computer that has 10 processor cores.

To execute tasks concurrently, there are two approaches, plus a combination of them:

  • define the algorithm in a concurrent way, for example, reactive programming, or

  • define the algorithm the good old sequential way and let some program decide on the concurrency, or

  • mix the two.

When we program a reactive algorithm or define a stream pipeline, as with Java 8 streams, we help the application execute the tasks concurrently. We define small parts and their interdependence so that the environment can decide which parts can be executed concurrently. The actual execution is done by the framework, and when we are using

  • virtual threads, or

  • threads (perhaps processes)

the difference is in the scheduler: who decides which processor executes which task at the next moment. In the case of threads or processes, the scheduler is the operating system. In the case of virtual threads, it is the Java runtime, which mounts virtual threads on a small number of operating system threads. The difference between thread and process execution is that threads belonging to the same process share the same memory space, whereas processes have their own. Virtual threads go one step further: they also share the operating system threads that carry them, and each one keeps its own stack on the heap while it is not running.

Moving from processes to threads to virtual threads, each unit of execution shares more with its peers and therefore carries less overhead of its own. This makes virtual threads significantly cheaper than traditional threads. While a machine might support thousands of threads and processes, it can accommodate millions of virtual threads.
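To give a feel for how cheap virtual threads are, here is a minimal sketch that starts one virtual thread per task through the standard JDK executor. It assumes Java 21 or later. The class name ManyVirtualThreads and the method runTasks are made up for this illustration and are not part of the library.

```java
import java.util.concurrent.Executors;
import java.util.concurrent.atomic.AtomicInteger;

// Hypothetical demo class, not part of the vtstream library.
final class ManyVirtualThreads {

    /** Starts one virtual thread per task and returns how many tasks completed. */
    static int runTasks(int taskCount) {
        AtomicInteger completed = new AtomicInteger();
        // Leaving the try block calls close(), which waits for all submitted tasks.
        try (var executor = Executors.newVirtualThreadPerTaskExecutor()) {
            for (int i = 0; i < taskCount; i++) {
                executor.submit(() -> {
                    try {
                        Thread.sleep(10); // simulate blocking work
                    } catch (InterruptedException e) {
                        Thread.currentThread().interrupt();
                    }
                    completed.incrementAndGet();
                });
            }
        }
        return completed.get();
    }

    public static void main(String[] args) {
        // 100,000 platform threads would be a heavy load; virtual threads cope easily.
        System.out.println(runTasks(100_000));
    }
}
```

Because every task blocks in sleep, the virtual threads are parked most of the time and need only a handful of operating system threads to carry them.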

In defining a task with streams, you are essentially outlining a series of operations to be performed on multiple elements. The decision to execute these operations concurrently rests with the framework, which may or may not choose to do so. However, Stream in Java serves as a high-level interface, offering us the flexibility to implement a version that facilitates concurrent execution of tasks.
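As a small illustration of this declarative style, the following sketch uses only the standard JDK stream API. The same pipeline definition runs sequentially or in parallel, and the decision is left to the framework. The class and method names are invented for the example.

```java
import java.util.List;
import java.util.stream.Collectors;
import java.util.stream.IntStream;

// Hypothetical demo class using only the standard JDK stream API.
final class DeclarativePipeline {

    /** Describes what to compute; the stream framework decides how to run it. */
    static List<Integer> squaresOfEvens(boolean parallel) {
        IntStream numbers = IntStream.rangeClosed(1, 10);
        if (parallel) {
            numbers = numbers.parallel();
        }
        return numbers
                .filter(n -> n % 2 == 0) // keep the even numbers
                .map(n -> n * n)         // square them
                .boxed()
                .collect(Collectors.toList());
    }

    public static void main(String[] args) {
        System.out.println(squaresOfEvens(false)); // [4, 16, 36, 64, 100]
        System.out.println(squaresOfEvens(true));  // [4, 16, 36, 64, 100]
    }
}
```

The result is the same in both modes because collect preserves the encounter order of an ordered stream, even when the elements are processed in parallel.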

3. Implementing Streams in Threads

The library contains two primary classes located in the main directory, namely:

  • ThreadedStream

  • Command

ThreadedStream is the class responsible for implementing the Stream interface.

public class ThreadedStream<T> implements Stream<T> {

The Command class encompasses nested classes that implement functionality for stream operations.

    public static class Filter<T> extends Command<T, T> {
    public static class AnyMatch<T> extends Command<T, T> {
    public static class FindFirst<T> extends Command<T, T> {
    public static class FindAny<T> extends Command<T, T> {
    public static class NoOp<T> extends Command<T, T> {
    public static class Distinct<T> extends Command<T, T> {
    public static class Skip<T> extends Command<T, T> {
    public static class Peek<T> extends Command<T, T> {
    public static class Map<T, R> extends Command<T, R> {

All the mentioned operators are intermediary. The terminal operators are implemented within the ThreadedStream class, which converts the threaded stream into a regular stream before invoking the terminal operator on this stream. An example of this approach is the implementation of the collect method.

    @Override
    public <R> R collect(Supplier<R> supplier, BiConsumer<R, ? super T> accumulator, BiConsumer<R, R> combiner) {
        return toStream().collect(supplier, accumulator, combiner);
    }

The source of the elements is also a stream, which means that the threading functionality is layered atop the existing stream implementation. This setup allows for the utilization of streams both as data sources and as destinations for processed data. Threading occurs in the interim, facilitating the parallel execution of intermediary commands.

Therefore, the core of the implementation, and its most intriguing aspect, lies in the construction of the structure and its subsequent execution.

We will first examine the structure of the stream data and then explore how the class executes operations utilizing virtual threads.

3.1. Stream Data Structure

The ThreadedStream class maintains its data through the following member variables:

    private final Command<Object, T> command;
    private final ThreadedStream<?> downstream;
    private final Stream<?> source;
    private long limit = -1;
    private boolean chained = false;

  • command represents the Command object to be executed on the data. It might be a no-operation (NoOp) command or null if there is no specific command to execute.

  • downstream points to the preceding ThreadedStream in the processing chain. A ThreadedStream retrieves data either from its immediate downstream, if there is one, or directly from the source if it is the first in the chain.

  • source is the initial data stream. It remains defined even when a downstream is specified, in which scenario the source for both streams remains identical.

  • limit specifies the maximum number of elements this stream is configured to process. Implementing a limit requires a workaround, as stream element processing starts immediately rather than being "pulled" by the terminal operation. Consequently, infinite streams cannot feed into a ThreadedStream.

  • chained is a boolean flag indicating whether the stream is part of a processing chain. When true, it signifies that a subsequent stream already depends on this one’s output, so the stream cannot be used again, which prevents forks in the processing chain. This mechanism mirrors the approach found in the JDK’s standard stream implementations.
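The behavior that the chained flag mirrors can be observed with a standard JDK stream. The sketch below does not use the library; the class name StreamReuse is made up. It shows that a stream which already has a subsequent operation linked to it refuses a second one.

```java
import java.util.stream.Stream;

// Hypothetical demo class showing the standard JDK behavior on forking a stream.
final class StreamReuse {

    /** Returns true if linking a second operation to the same stream is rejected. */
    static boolean secondUseFails() {
        Stream<Integer> numbers = Stream.of(1, 2, 3);
        Stream<Integer> doubled = numbers.map(x -> x * 2); // first link: allowed
        try {
            numbers.filter(x -> x > 1); // second link on the same stream: a fork
            return false;
        } catch (IllegalStateException e) {
            // The JDK reports that the stream has already been operated upon.
            return true;
        } finally {
            doubled.close();
        }
    }

    public static void main(String[] args) {
        System.out.println(secondUseFails()); // true
    }
}
```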

3.2. Stream Build

The stream data structure is constructed dynamically as intermediary operations are chained together. The process starts with the creation of a starting element by invoking the static method threaded on the ThreadedStream class. A line from the unit tests illustrates this:

        final var k = ThreadedStream.threaded(Stream.of(1, 2, 3));

This line demonstrates the creation of a ThreadedStream instance named k, initialized with a source stream consisting of the elements 1, 2, and 3. The threaded method serves as the entry point for transforming a regular stream into a ThreadedStream, setting the stage for further operations that can leverage virtual threads for concurrent execution.

When an intermediary operation is appended, it results in the creation of a new ThreadedStream instance. This new instance designates the preceding ThreadedStream as its downstream. Moreover, the source stream for this newly formed ThreadedStream remains identical to the source stream of its predecessor. This design ensures a seamless flow of data through the chain of operations, facilitating efficient processing in a concurrent environment.

For example, when we call

        final var t = k.map(x -> x * 2);

the map method is called, which is

    public <R> ThreadedStream<R> map(Function<? super T, ? extends R> mapper) {
        return new ThreadedStream<>(new Command.Map<>(mapper), this);
    }

It generates a new ThreadedStream object wherein the preceding ThreadedStream acts as the downstream. Additionally, the command field is populated with a new instance of the Command class, configured with the specified mapper function.

This process effectively constructs a linked list composed of ThreadedStream objects. This linked structure comes into play during the execution phase, triggered by invoking one of the terminal operations on the stream. This method ensures that each ThreadedStream in the sequence can process data in a manner that supports concurrent execution, leveraging the capabilities of virtual threads for efficient data processing.
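The chaining described above can be sketched with a simplified, hypothetical Stage class. It is not the library's ThreadedStream: the names Stage, first, and calculate are assumptions made for this illustration, and only map is modeled. Each call to map creates a new node that points back to its predecessor, and calculate walks the list from the first stage to the last.

```java
import java.util.function.Function;

// Hypothetical, simplified stand-in for the chain of ThreadedStream objects.
final class Stage<T> {
    private final Function<Object, T> command; // null for the first stage
    private final Stage<?> downstream;         // preceding stage, null for the first

    private Stage(Function<Object, T> command, Stage<?> downstream) {
        this.command = command;
        this.downstream = downstream;
    }

    /** Creates the first stage of a chain; it has no command and no predecessor. */
    static <T> Stage<T> first() {
        return new Stage<>(null, null);
    }

    /** Appends a mapping stage; the current stage becomes its downstream. */
    @SuppressWarnings("unchecked")
    <R> Stage<R> map(Function<? super T, ? extends R> mapper) {
        return new Stage<>(x -> mapper.apply((T) x), this);
    }

    /** Applies every command from the first stage up to this one. */
    @SuppressWarnings("unchecked")
    T calculate(Object sourceItem) {
        Object value = downstream == null ? sourceItem : downstream.calculate(sourceItem);
        return command == null ? (T) value : command.apply(value);
    }

    public static void main(String[] args) {
        Stage<Integer> source = Stage.first();
        Stage<String> last = source.map(x -> x * 2).map(x -> "v" + x);
        System.out.println(last.calculate(21)); // v42
    }
}
```

Because the nodes are never modified after construction in this sketch, calculate can be called from many threads at once, one call per source element.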

It’s crucial to understand that the ThreadedStream class refrains from performing any operations on the data until a terminal operation is called. Once execution commences, it proceeds concurrently. To allow these operations to run independently, ThreadedStream instances are designed to be effectively immutable: they are instantiated during the setup phase and undergo a single mutation when they are linked together. During execution, they serve as a read-only data structure that guides the flow of operations. Because nothing modifies them at that point, they can be read safely from many threads at once.

3.3. Stream Execution

The commencement of stream execution is triggered by invoking a terminal operation. These terminal operations are executed by first transforming the threaded stream back into a conventional stream, upon which the terminal operation is then performed.

The collect method, shown earlier, is a good example of this process. By converting the ThreadedStream into a standard Stream, it reuses the terminal operations already available in the JDK instead of reimplementing them.

    @Override
    public <R> R collect(Supplier<R> supplier, BiConsumer<R, ? super T> accumulator, BiConsumer<R, R> combiner) {
        return toStream().collect(supplier, accumulator, combiner);
    }

The toStream() method represents the core functionality of the library, marking the commencement of stream execution by initiating a new virtual thread for each element in the source stream. This method differentiates between ordered and unordered execution through two distinct implementations:

  • toUnorderedStream()

  • toOrderedStream()

The choice between these methods is determined by the isParallel() status of the source stream. Note that processing an ordered stream concurrently can still be advantageous: the individual results may become ready out of order, but the elements are processed at the same time, which speeds up the operation. The results must then be delivered in the original order. Unordered execution can be more efficient still, because elements are passed to the resulting stream as soon as they become available, without waiting for the preceding elements.

The implementation of toStream() is designed to avoid collecting elements unnecessarily. In unordered streams, an element is forwarded to the resulting stream as soon as it is ready. In ordered streams, it is forwarded as soon as it is ready and the previous element has been forwarded.

In subsequent sections, we delve into the specifics of these two execution methodologies.

3.4. Unordered Stream Execution

Unordered execution forwards results as soon as they are ready. It stores the results in a synchronized list, so the threads can add results while the target stream removes them, which keeps the list from growing excessively.

The iteration over the source stream initiates the creation of a new virtual thread for each element. When a limit is imposed, it’s applied directly on the source stream, diverging from traditional stream implementations where limit acts as a genuine intermediary operation.

The implementation of the unordered stream execution is as follows:

    private Stream<T> toUnorderedStream() {
        final var result = Collections.synchronizedList(new LinkedList<Command.Result<T>>());
        final AtomicInteger n = new AtomicInteger(0);
        final Stream<?> limitedSource = limit >= 0 ? source.limit(limit) : source;
        limitedSource.forEach(
                t -> {
                    Thread.startVirtualThread(() -> result.add(calculate(t)));
                    n.incrementAndGet();
                });
        return IntStream.range(0, n.get())
                .mapToObj(i -> {
                    while (result.isEmpty()) {
                        Thread.yield();
                    }
                    return result.removeFirst();
                })
                .filter(f -> !f.isDeleted())
                .peek(r -> {
                    if (r.exception() != null) {
                        throw new ThreadExecutionException(r.exception());
                    }
                })
                .map(Command.Result::result);
    }

The counter n tallies the number of threads started. The resulting stream is built from this counter by mapping the numbers 0 to n-1 to the elements of the list as they become ready. If the list is empty at any point, the consumer waits for the next element. The wait is a polling loop that calls Thread.yield() on each iteration. The yield call is only a hint to the scheduler that the current thread is willing to give up the processor, so the loop does not block; it lets other threads run while it keeps checking the list.
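As an alternative to polling with yield, the consumer could block on a queue until a result arrives. The following is a minimal sketch of that alternative technique and not the library's code. The names UnorderedSketch, mapUnordered, and Outcome are hypothetical, the source is a List rather than a Stream to keep the example short, and Java 21 or later is assumed.

```java
import java.util.List;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.function.Function;
import java.util.stream.IntStream;
import java.util.stream.Stream;

// Hypothetical sketch: unordered execution with a blocking queue instead of polling.
final class UnorderedSketch {

    // Hypothetical holder for either a value or the exception the work threw.
    private record Outcome<R>(R value, RuntimeException error) {}

    static <T, R> Stream<R> mapUnordered(List<T> source, Function<T, R> work) {
        BlockingQueue<Outcome<R>> results = new LinkedBlockingQueue<>();
        for (T item : source) {
            // One virtual thread per element; each deposits exactly one outcome.
            Thread.startVirtualThread(() -> {
                Outcome<R> outcome;
                try {
                    outcome = new Outcome<>(work.apply(item), null);
                } catch (RuntimeException e) {
                    outcome = new Outcome<>(null, e);
                }
                results.add(outcome);
            });
        }
        // Lazily take as many outcomes as threads were started, in completion order.
        return IntStream.range(0, source.size()).mapToObj(i -> {
            try {
                Outcome<R> outcome = results.take(); // blocks until a result is ready
                if (outcome.error() != null) {
                    throw outcome.error();
                }
                return outcome.value();
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
                throw new IllegalStateException("interrupted while waiting for a result", e);
            }
        });
    }

    public static void main(String[] args) {
        List<Integer> doubled = mapUnordered(List.of(1, 2, 3), (Integer x) -> x * 2)
                .sorted() // completion order is arbitrary, so sort before printing
                .toList();
        System.out.println(doubled); // [2, 4, 6]
    }
}
```

With take(), the consuming thread is parked while the queue is empty, so it consumes no processor time during the wait. The sketch only captures RuntimeException; a worker that fails with an Error would leave the consumer waiting.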

3.5. Ordered Stream Execution

Ordered stream execution introduces a more nuanced approach compared to its unordered counterpart. It incorporates a local class named Task, designed specifically to await the readiness of a particular thread. Similar to the unordered execution, a concurrent list is utilized, but with a key distinction: the elements of this list are the tasks themselves, rather than the results.

This list is populated by the code responsible for thread creation, rather than by the threads themselves. Because the list is fully populated before the results are read, no separate counter is needed to track the started threads. The resulting stream then waits for each thread in list order and relays its output to the target stream. This keeps the elements in the order of the original stream even though they are processed concurrently.

    private Stream<T> toOrderedStream() {
        class Task {
            Thread workerThread;
            volatile Command.Result<T> result;

            /**
             * Wait for the thread calculating the result of the task to be finished. This method is blocking.
             * @param task the task to wait for
             */
            static void waitForResult(Task task) {
                try {
                    task.workerThread.join();
                } catch (InterruptedException e) {
                    task.result = deleted();
                }
            }
        }
        final var tasks = Collections.synchronizedList(new LinkedList<Task>());

        final Stream<?> limitedSource = limit >= 0 ? source.limit(limit) : source;
        limitedSource.forEach(
                sourceItem -> {
                    Task task = new Task();
                    tasks.add(task);
                    task.workerThread = Thread.startVirtualThread(() -> task.result = calculate(sourceItem));
                }
        );

        return tasks.stream()
                .peek(Task::waitForResult)
                .map(f -> f.result)
                .peek(r -> {
                            if (r.exception() != null) {
                                throw new ThreadExecutionException(r.exception());
                            }
                        }
                )
                .filter(r -> !r.isDeleted()).map(Command.Result::result);
    }
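The same idea, waiting for the workers in submission order, can also be expressed with the JDK's executor and Future types. The sketch below is an illustration under assumptions, not the library's implementation: OrderedSketch and mapOrdered are made-up names, the result is collected into a List rather than streamed lazily, and Java 21 or later is assumed.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.function.Function;

// Hypothetical sketch: ordered results from concurrently processed elements.
final class OrderedSketch {

    static <T, R> List<R> mapOrdered(List<T> source, Function<T, R> work) {
        try (var executor = Executors.newVirtualThreadPerTaskExecutor()) {
            // The submitting code fills the list, so it reflects the source order.
            List<Future<R>> futures = new ArrayList<>();
            for (T item : source) {
                Callable<R> task = () -> work.apply(item);
                futures.add(executor.submit(task));
            }
            // Wait for each task in list order, regardless of completion order.
            List<R> results = new ArrayList<>();
            for (Future<R> future : futures) {
                results.add(future.get());
            }
            return results;
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException("interrupted while waiting for a result", e);
        } catch (ExecutionException e) {
            throw new IllegalStateException("a task failed", e.getCause());
        }
    }

    public static void main(String[] args) {
        List<Integer> result = mapOrdered(List.of(3, 1, 2), (Integer x) -> x * 10);
        System.out.println(result); // [30, 10, 20]
    }
}
```

Here the list of futures plays the role of the task list: it is filled by the submitting code, and the results are read by waiting on each entry in turn.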

4. Summary and Takeaway

We have explored an implementation that runs stream operations concurrently in virtual threads. The library is open source, so you can use it as is or take its design and implementation as a reference for your own version. The detailed description here is meant to explain both the concepts behind the library and the practical aspects of its construction.

However, it’s important to acknowledge that the library has not undergone extensive testing. It received a review from Istvan Kovacs, a figure with considerable expertise in concurrent programming. Despite this, his review does not serve as an absolute assurance of the library’s reliability and absence of bugs. Consequently, should you decide to integrate this library into your projects, it’s advised to proceed with caution and conduct thorough testing to ensure it meets your requirements and standards. The library is provided "as is," with the understanding that users adopt it at their own risk, underpinning the importance of due diligence in its deployment.

