Project Loom and Project Reactor - Better Together

Project Loom is now available as a preview feature in Java 19. It introduces virtual threads to the Java platform: a lightweight implementation of threads provided by the JDK rather than the OS. About a year ago I came across a YouTube video in which Brian Goetz says “I think Project Loom is going to kill Reactive Programming”. However, if we look broadly at the purpose of Project Loom (virtual threads) and of reactive programming, they appear to solve different sets of problems. Project Loom makes it possible to spawn millions of virtual threads performing I/O-bound tasks within the same JVM, but virtual threads are still threads, and when it comes to building a complex, asynchronous, backpressure-ready data pipeline, developers still have to code it by hand. Reactive programming, on the other hand, is concerned with asynchronous data streams and the propagation of change. Libraries such as Project Reactor, RxJava and Vert.x support reactive programming by providing a rich set of operators for defining complex asynchronous data processing pipelines.

In this post we will see why Project Loom (virtual threads) alone might not be sufficient for building efficient data processing pipelines, and how Project Reactor can take advantage of virtual threads.

Demo Scenario
For this post, I have prepared a PostgreSQL database table, persons, with 10,000 dummy records. I consider a typical application flow: query records from the persons table, enrich each retrieved record using an external service, and print the enriched person details to the terminal. To solve this problem, I have created a set of examples with different concurrency approaches to see how they affect overall application throughput. A few constraints apply across all examples in this post -

  • Example code must not load all records into memory at once. It should process them by retrieving them incrementally. This ensures that the example code still works if the table has millions of records, without the JVM running out of memory.
  • To keep the code examples simple and focused on a particular concurrency approach, I use a mock service object that returns dummy enriched data instead of an actual external service.
  • To make the impact of the different concurrency approaches visible, I use amplified latencies both for database network traffic and for the mock external service that enriches person data.
  • All traffic to the database is delayed by 250 millis. To make this possible on a local machine, I have developed a Netty application, delay-simulator, which acts as a proxy between the database and the example application code.
  • The mock service returns its result with a predictable delay of 100 - 200 millis based on the person id.
  • There must never be more than 50 concurrent requests for enriching person data. This ensures that the target service does not get overloaded by too many concurrent requests.
  • I use the standard PostgreSQL JDBC driver in all code examples so that the database communication code paths use a consistent mechanism for retrieving records. This makes it easier to compare the code complexity of using virtual threads directly with that of using the abstractions defined by Project Reactor. In a real project, it might be better to use R2DBC or Reactive Hibernate if they suit the project requirements.
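
The mock service described in the list above can be pictured with a small sketch. The class name MockPersonDataService, the delayMillisFor formula and the returned string are illustrative assumptions, not the code from loom-and-reactor. The only property taken from the post is a deterministic delay between 100 and 200 millis derived from the person id.

```java
// Hypothetical stand-in for the post's mock enrichment service.
final class MockPersonDataService {

    // Assumed formula: maps any id to a fixed delay in the range 100..200 millis.
    static long delayMillisFor(long personId) {
        return 100 + Math.floorMod(personId, 101L);
    }

    // Blocks the calling thread for the id-specific delay, then returns dummy data.
    static String enrich(long personId) {
        try {
            Thread.sleep(delayMillisFor(personId));
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException("interrupted while enriching " + personId, e);
        }
        return "person-" + personId + "-enriched";
    }
}
```

Because the delay depends only on the id, repeated runs of the same approach see the same service latencies, which keeps timings comparable.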

The source code for all approaches referred to in this post, and for delay-simulator, is available in loom-and-reactor. I have also prepared a set of gists for quick reference while reading through the post. Click on the link provided in each approach title to open the gist that illustrates the code for that approach (Note - I haven’t shown project-specific classes in the import statements in these gists).

Approach 1 - Single threaded processing - code link

In this approach we do all processing in a single thread (the main thread in our case). This is the only example in this post that does not use Java virtual threads. In this imperative-style example we retrieve records from the database table in batches of 100 and process them one after another by iterating over the JDBC result set. For each record we enrich the person data by calling PersonDataService, which takes about 100–200 millis depending on the person id. Once 100 records have been iterated, the result set internally makes a network call to the database to fetch the next batch of 100 records. Because of the amplified latencies mentioned earlier, this sequential style of processing is slow: it takes about 25.5 minutes (1533.44 seconds) to process all records in the table.
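
A rough back-of-the-envelope check makes this number plausible. The sketch below is only an estimate under stated assumptions: an average enrichment delay of 150 millis (the midpoint of 100-200), one 250 millis delay per batch fetch, and no other overhead. The class and method names are made up for illustration. For comparison it also computes the floor we would reach if the 50-request limit from the demo scenario were always fully used.

```java
final class ThroughputEstimate {

    // Sequential processing: every enrichment call and every batch fetch happens one after another.
    static double sequentialSeconds(int records, int batchSize, double avgEnrichMillis, double dbDelayMillis) {
        int fetches = (records + batchSize - 1) / batchSize; // number of batches, rounded up
        return (records * avgEnrichMillis + fetches * dbDelayMillis) / 1000.0;
    }

    // Lower bound when `concurrency` enrichment calls are always in flight
    // and database fetches fully overlap with processing.
    static double saturatedSeconds(int records, double avgEnrichMillis, int concurrency) {
        return records * avgEnrichMillis / concurrency / 1000.0;
    }
}
```

Under these assumptions, sequentialSeconds(10_000, 100, 150, 250) gives 1525 seconds, close to the measured 1533.44 seconds, and saturatedSeconds(10_000, 150, 50) gives 30 seconds.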

Approach 2 - Basic multi-threaded processing - code link

In this approach we retrieve records from the database table in batches of 100 and process each batch concurrently by submitting the individual time-consuming enrichment tasks to a virtual-thread-per-task executor. This creates 100 virtual threads, one per submitted task, but since we want no more than 50 concurrent requests to the enrichment service, we limit overall concurrency with a semaphore that permits at most 50 simultaneous tasks at any point in time. Once all 100 tasks are finished, we retrieve the next batch of 100 records from the database table and repeat the same process. This small enhancement makes processing dramatically faster: it takes about 64.9 seconds to process all records in the table.
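
The core of this approach can be sketched as follows. This is a minimal illustration, not the linked gist: it assumes JDK 21 or later (or JDK 19/20 with preview features enabled), the class BoundedBatchEnrichment and its enrich method are hypothetical stand-ins, and the latency is shortened to 20 millis. The method returns the peak number of simultaneous enrichment calls so the effect of the semaphore can be observed.

```java
import java.util.List;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Semaphore;
import java.util.concurrent.atomic.AtomicInteger;

final class BoundedBatchEnrichment {

    // Hypothetical stand-in for the enrichment call; sleeps briefly to simulate latency.
    static String enrich(int personId) throws InterruptedException {
        Thread.sleep(20);
        return "person-" + personId + "-enriched";
    }

    // Processes one batch with one virtual thread per record and returns the
    // highest number of enrichment calls that were in flight at the same time.
    static int processBatch(List<Integer> batch, int maxConcurrent) {
        Semaphore permits = new Semaphore(maxConcurrent);
        AtomicInteger inFlight = new AtomicInteger();
        AtomicInteger peak = new AtomicInteger();

        try (ExecutorService executor = Executors.newVirtualThreadPerTaskExecutor()) {
            for (int personId : batch) {
                executor.submit(() -> {
                    permits.acquire(); // waits while maxConcurrent calls are already running
                    try {
                        peak.accumulateAndGet(inFlight.incrementAndGet(), Math::max);
                        return enrich(personId);
                    } finally {
                        inFlight.decrementAndGet();
                        permits.release();
                    }
                });
            }
        } // close() blocks until every submitted task has finished

        return peak.get();
    }
}
```

Calling processBatch with 100 ids and a limit of 50 starts 100 virtual threads, but the returned peak never exceeds 50.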

Approach 3 - Prefetch records together with multi-threaded processing - code link

This approach adds a small enhancement on top of the previous one: it fetches the next batch of records while the records in the current batch are being processed concurrently in their dedicated virtual threads. Once all tasks are submitted to the virtual-thread-per-task executor, we immediately start retrieving the next batch. When the next batch is available, we wait until the current set of concurrent tasks is complete, and then the next loop iteration immediately submits the next 100 enrichment tasks to the executor. So we spend comparatively little or no time waiting for the next batch of records (depending on how quickly the submitted tasks complete), and overall this approach takes about 38.3 seconds to process all records in the table.
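
The loop structure described above might look roughly like this. It is a sketch under assumptions, not the linked gist: fetchBatch is a hypothetical replacement for the JDBC fetch (with a shortened 25 millis delay), enrich is a dummy call, and JDK 21 or later is assumed for virtual threads.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Semaphore;
import java.util.concurrent.atomic.AtomicInteger;

final class PrefetchingBatchLoop {

    static void pause(long millis) {
        try {
            Thread.sleep(millis);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException(e);
        }
    }

    // Hypothetical slow database round trip returning the ids from + 1 .. from + size (capped at total).
    static List<Integer> fetchBatch(int from, int size, int total) {
        pause(25);
        List<Integer> ids = new ArrayList<>();
        for (int id = from + 1; id <= Math.min(from + size, total); id++) {
            ids.add(id);
        }
        return ids;
    }

    // Dummy enrichment call with a shortened delay.
    static void enrich(int personId) {
        pause(10);
    }

    // Returns the number of records processed.
    static int processAll(int total, int batchSize, int maxConcurrent) {
        Semaphore permits = new Semaphore(maxConcurrent);
        AtomicInteger processed = new AtomicInteger();

        try (ExecutorService executor = Executors.newVirtualThreadPerTaskExecutor()) {
            List<Integer> current = fetchBatch(0, batchSize, total);
            int fetched = current.size();
            while (!current.isEmpty()) {
                List<CompletableFuture<Void>> running = new ArrayList<>();
                for (int personId : current) {
                    running.add(CompletableFuture.runAsync(() -> {
                        permits.acquireUninterruptibly();
                        try {
                            enrich(personId);
                            processed.incrementAndGet();
                        } finally {
                            permits.release();
                        }
                    }, executor));
                }
                // Fetch the next batch while the current one is still being enriched.
                List<Integer> next = fetchBatch(fetched, batchSize, total);
                fetched += next.size();
                // Only now wait for the current batch before submitting the next one.
                running.forEach(CompletableFuture::join);
                current = next;
            }
        }
        return processed.get();
    }
}
```

The only change from the plain batch loop is the order of operations: the fetch of the next batch is moved in front of the wait for the current batch.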

Approach 4 - Poll next record from processing threads and keep next 100 records ready for processing - code link

Prefetching the next batch while the current batch is being processed makes better use of resources, as we saw in the previous approach. There is still room for improvement. Remember that we create 100 virtual threads, one per enrichment task, but at any given point in time we want at most 50 concurrent requests to the enrichment service. We use a semaphore with 50 permits to enforce this limit. So out of the 100 tasks submitted together, only 50 make progress while the others wait for a permit. As soon as one task completes, it releases its permit and one of the waiting tasks can proceed. We do not submit any new tasks until all 100 currently submitted tasks are complete. Once 51 of the submitted tasks have completed, fewer than 50 enrichment requests are in flight, and that spare capacity stays unused until the slowest remaining task finishes and we can submit the next batch.

Since the enrichment task is time-consuming and the number of allowed concurrent requests is limited, it would be better to continuously maintain 50 concurrent requests to the enrichment service and use the available capacity as fully as possible. In practice there are many ways to implement this kind of flow, but I have prepared one basic implementation with two types of tasks: one is responsible for fetching records from the database table, and the other keeps processing records as soon as they are available from the first. Here are high-level implementation notes on the PersonDataRetriever task and the ProcessPersonData task -

  1. One virtual thread runs the PersonDataRetriever task, which is responsible for prefetching and maintaining a list of 100 records in memory.
  2. Fifty virtual threads run the ProcessPersonData task. Each of them polls the next available record from the PersonDataRetriever task. Once a record is available, it sends a request to the enrichment service and finally prints the output to the terminal.
  3. When a ProcessPersonData task polls and the next record is already in the prefetched list of 100 records, the PersonDataRetriever task hands it over immediately. However, if the prefetched list is empty (this can happen when the data processors are faster than the data retriever), the polling request is added to a list of waiting tasks and is signaled with a record as soon as the next one is available.

In simple terms, there are two kinds of task: one retrieves records from the database table and the other processes them. There is a formal contract between them: if a record is immediately available it is handed to the processing task, otherwise the processing task is signaled later, when a record becomes available. This approach takes about 29.9 seconds to process all records in the table.

As I mentioned earlier, this is not the only way to implement continuous prefetching and processing; it is just a basic implementation. We could add exception handling, make the tasks generic, and tighten the contract and its implementation to make this approach more reusable, maintainable and efficient, but for the purpose of this post I have kept the implementation simple.
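
As one such alternative, the same retriever/processor split can be sketched with a bounded queue from the JDK. Note that this swaps in a different technique from the one used in the post: an ArrayBlockingQueue with sentinel values replaces the hand-written polling contract between PersonDataRetriever and ProcessPersonData. The class name ContinuousPipeline, the END sentinel and the shortened 2 millis processing delay are assumptions for illustration, and JDK 21 or later is assumed for virtual threads.

```java
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.atomic.AtomicInteger;

final class ContinuousPipeline {

    // Sentinel telling a worker that no more records will arrive.
    private static final Integer END = Integer.MIN_VALUE;

    // Runs one retriever and `workers` processors; returns the number of records processed.
    static int run(int totalRecords, int prefetch, int workers) {
        // Bounded buffer: the retriever blocks once `prefetch` records are waiting.
        BlockingQueue<Integer> buffer = new ArrayBlockingQueue<>(prefetch);
        AtomicInteger processed = new AtomicInteger();

        try (ExecutorService executor = Executors.newVirtualThreadPerTaskExecutor()) {
            // Retriever: stands in for reading records from the database.
            executor.submit(() -> {
                for (int id = 1; id <= totalRecords; id++) {
                    buffer.put(id);
                }
                for (int i = 0; i < workers; i++) {
                    buffer.put(END); // one sentinel per worker
                }
                return null;
            });

            // Processors: each takes the next record as soon as it has finished the previous one.
            for (int w = 0; w < workers; w++) {
                executor.submit(() -> {
                    for (Integer id = buffer.take(); !id.equals(END); id = buffer.take()) {
                        Thread.sleep(2); // simulated enrichment call
                        processed.incrementAndGet();
                    }
                    return null;
                });
            }
        } // close() waits for the retriever and all workers

        return processed.get();
    }
}
```

With 50 workers, the number of concurrent enrichment calls is capped by the worker count itself, so no semaphore is needed, and a worker never idles while a record is sitting in the buffer.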

Approach 5 - Use reactor to define data processing pipeline and publish records based on demand - code link

One might think that is a lot of code for the simple problem we have: keep prefetching records and continuously process them with 50 virtual threads. But platform threads, virtual threads and the built-in Java concurrency framework do not provide abstractions for composing such asynchronous data streams. Developers have to define the asynchronous data processing pipeline themselves, as we saw in Approach 4. So we may look for a readily available library that helps compose such a pipeline. This is where Project Reactor becomes helpful. In this approach I have prepared a data processing pipeline that is broadly similar to the previous approach. We want to prefetch records from the database table and feed them to a pipeline built from Project Reactor operators, taking into account the demand from downstream operators.

In this example, PersonDataRetrieverForReactor is responsible for fetching database table records and feeding them to the data processing pipeline according to the demand from downstream operators. The data retriever's code is much simpler than what we defined in Approach 4: in response to the current demand we just emit one record at a time. Keeping track of the prefetched list of records is now the responsibility of the reactive pipeline operators, which we define declaratively when we compose the pipeline. This approach takes about 30.3 seconds to process all records in the table, which is slightly more than Approach 4, but the code is much simpler and more maintainable, and with less complexity there is less room for bugs.
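
To make "emitting according to demand" concrete without pulling in a library, here is a minimal sketch of the idea. Project Reactor implements the Reactive Streams contract, and the JDK mirrors those interfaces in java.util.concurrent.Flow, so the sketch uses the JDK interfaces. It is not how PersonDataRetrieverForReactor is written, DemandDrivenSource and collectOneByOne are hypothetical names, and the publisher is deliberately single-threaded and skips parts of the specification (for example, signalling an error on a non-positive request).

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.Flow;

// Emits the ids 1..total, but never more than the subscriber has requested.
// Single-threaded sketch: not safe for concurrent request() calls.
final class DemandDrivenSource implements Flow.Publisher<Integer> {
    private final int total;

    DemandDrivenSource(int total) {
        this.total = total;
    }

    @Override
    public void subscribe(Flow.Subscriber<? super Integer> subscriber) {
        subscriber.onSubscribe(new Flow.Subscription() {
            private int next = 1;
            private long demand;
            private boolean emitting;
            private boolean done;

            @Override
            public void request(long n) {
                if (done || n <= 0) {
                    return;
                }
                demand += n;
                if (emitting) {
                    return; // re-entrant call from onNext: the running loop picks up the new demand
                }
                emitting = true;
                while (demand > 0 && next <= total && !done) {
                    demand--;
                    subscriber.onNext(next++); // one record per unit of demand
                }
                emitting = false;
                if (next > total && !done) {
                    done = true;
                    subscriber.onComplete();
                }
            }

            @Override
            public void cancel() {
                done = true;
            }
        });
    }

    // Subscribes with a consumer that asks for one record at a time and collects what it receives.
    static List<Integer> collectOneByOne(int total) {
        List<Integer> received = new ArrayList<>();
        new DemandDrivenSource(total).subscribe(new Flow.Subscriber<Integer>() {
            private Flow.Subscription subscription;

            @Override
            public void onSubscribe(Flow.Subscription s) {
                subscription = s;
                s.request(1);
            }

            @Override
            public void onNext(Integer id) {
                received.add(id);
                subscription.request(1);
            }

            @Override
            public void onError(Throwable t) {
                throw new IllegalStateException(t);
            }

            @Override
            public void onComplete() {
                // nothing to do in this sketch
            }
        });
        return received;
    }
}
```

The source never runs ahead of its consumer: each onNext call is paid for by one unit of requested demand, which is the backpressure mechanism a reactive pipeline's operators rely on.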

If we use a reactive database client such as R2DBC, we do not even have to write a class like PersonDataRetrieverForReactor to retrieve data according to demand. Since such a database client is itself reactive, it internally considers the demand from downstream operators and keeps fetching more data as and when required. Here is a link to a code example using such a reactive database client - code link. That code example takes about 30.2 seconds to process all records in the table.

Sometimes we may need to execute a blocking operation from within a reactive pipeline. Project Reactor provides a specialized thread pool for executing such blocking operations, called the bounded elastic thread pool. By default this pool is backed by platform threads (mapped 1:1 to OS threads), and the number of threads in the pool is limited. Now that Project Loom is available and can easily handle millions of virtual threads executing blocking I/O tasks, Project Reactor can take advantage of it by publishing all such blocking operations on a scheduler backed by a virtual thread pool.

Summary:
Developing complex and efficient pipelines for processing streams of data is quite feasible with virtual threads alone, but cumbersome to write. In this post I focused on a basic example with a streaming source of data, but if the pipeline is long and has a number of complex intermediate asynchronous tasks, we may need to rely on libraries that provide reactive programming facilities, such as Project Reactor, until Java itself has built-in APIs with such capabilities. On the other hand, Project Reactor can now take advantage of virtual threads by publishing all blocking operations in a data processing pipeline to a scheduler backed by a virtual thread pool.

Reference : Project Loom — A Friend or Foe of Reactive? — Oleh Dokuka, VMware & Andrii Rodionov

Parth Mistry

Enterprise Application and BigData engineering enthusiast. Interested in highly efficient low-cost app design and development with Java/Scala and Rust 🦀