
Is it possible to specify a custom thread pool for a Java 8 parallel stream? I cannot find it anywhere.

Imagine that I have a server application and I would like to use parallel streams. But the application is large and multi-threaded, so I want to compartmentalize it. I do not want a slow-running task in one module of the application to block tasks from another module.

If I cannot use different thread pools for different modules, it means I cannot safely use parallel streams in most real-world situations.

Try the following example. There are some CPU-intensive tasks executed in separate threads. The tasks leverage parallel streams. The first task is broken, so each step takes 1 second (simulated by a thread sleep). The issue is that the other threads get stuck and wait for the broken task to finish. This is a contrived example, but imagine a servlet app and someone submitting a long-running task to the shared fork-join pool.

 

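import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;

import static java.lang.Math.sqrt;
import static java.util.stream.LongStream.range;
import static java.util.stream.LongStream.rangeClosed;

public class ParallelTest {

    public static void main(String[] args) throws InterruptedException {
        ExecutorService es = Executors.newCachedThreadPool();
        es.execute(() -> runTask(1000)); // broken task: sleeps 1 second per element
        es.execute(() -> runTask(0));
        es.execute(() -> runTask(0));
        es.execute(() -> runTask(0));
        es.execute(() -> runTask(0));
        es.execute(() -> runTask(0));
        es.shutdown();
        es.awaitTermination(60, TimeUnit.SECONDS);
    }

    // Every call shares the common fork-join pool, so the slow task starves the fast ones.
    private static void runTask(int delay) {
        range(1, 1_000_000).parallel().filter(ParallelTest::isPrime).peek(i -> sleep(delay)).max()
                .ifPresent(max -> System.out.println(Thread.currentThread() + " " + max));
    }

    public static boolean isPrime(long n) {
        return n > 1 && rangeClosed(2, (long) sqrt(n)).noneMatch(divisor -> n % divisor == 0);
    }

    // Stand-in for the Utils.sleep helper, which the question does not show;
    // assumed to wrap Thread.sleep.
    private static void sleep(int millis) {
        try {
            Thread.sleep(millis);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
    }
}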

1 Answer


There is a trick for executing a parallel operation in a specific fork-join pool. If you execute it as a task in a fork-join pool, it stays there and does not use the common one.

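ForkJoinPool forkJoinPool = new ForkJoinPool(2);
try {
    List<Integer> primes = forkJoinPool.submit(() ->
        // parallel task here, for example
        IntStream.range(1, 1_000_000).parallel()
                .filter(PrimesPrint::isPrime)
                .boxed() // IntStream has no collect(Collector), so box to Stream<Integer> first
                .collect(Collectors.toList())
    ).get(); // throws InterruptedException and ExecutionException
} finally {
    forkJoinPool.shutdown(); // release the pool's worker threads
}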

The trick is based on ForkJoinTask.fork, whose documentation specifies: "Arranges to asynchronously execute this task in the pool the current task is running in, if applicable, or using the ForkJoinPool.commonPool() if not inForkJoinPool()".
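One way to check this behaviour is to record which threads actually process the stream elements. The following is a minimal sketch, not part of the original answer: the class name CustomPoolDemo and the methods workerNames and namesInPool are hypothetical, and the behaviour it demonstrates follows from how the stream implementation uses fork-join tasks rather than from a documented guarantee of the Stream API.

```java
import java.util.Set;
import java.util.concurrent.Callable;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ForkJoinPool;
import java.util.stream.IntStream;

// Hypothetical demo class; all names here are illustrative assumptions.
public class CustomPoolDemo {

    // Runs a parallel stream and records the name of every thread that handled an element.
    static Set<String> workerNames() {
        Set<String> names = ConcurrentHashMap.newKeySet();
        IntStream.range(0, 10_000).parallel()
                .forEach(i -> names.add(Thread.currentThread().getName()));
        return names;
    }

    // Submits the parallel stream as a task to a dedicated pool and returns the thread names seen.
    static Set<String> namesInPool(int parallelism) throws Exception {
        ForkJoinPool pool = new ForkJoinPool(parallelism);
        try {
            // Explicit Callable avoids any overload ambiguity in submit(...).
            Callable<Set<String>> task = CustomPoolDemo::workerNames;
            return pool.submit(task).get();
        } finally {
            pool.shutdown(); // always release the worker threads
        }
    }

    public static void main(String[] args) throws Exception {
        System.out.println("custom pool: " + namesInPool(2));
        System.out.println("no pool:     " + workerNames());
    }
}
```

With the default thread factory, the first line should list only workers of the dedicated pool (names of the form ForkJoinPool-N-worker-M), while the second call, made directly from the main thread, falls back to the common pool and the calling thread.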
