For the following program, I am trying to figure out why using two different streams parallelizes the tasks, while using the same stream and calling join/get on the CompletableFuture makes the whole run take much longer (about as long as if the tasks were processed sequentially).
import java.util.Arrays;
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import java.util.stream.Collectors;

public class HelloConcurrency {

    // Simulates work by sleeping for the given number of seconds
    private static Integer sleepTask(int number) {
        System.out.println(String.format("Task with sleep time %d", number));
        try {
            TimeUnit.SECONDS.sleep(number);
        } catch (InterruptedException e) {
            e.printStackTrace();
            return -1;
        }
        return number;
    }

    public static void main(String[] args) {
        List<Integer> sleepTimes = Arrays.asList(1, 2, 3, 4, 5, 6);

        System.out.println("WITH SEPARATE STREAMS FOR FUTURE AND JOIN");
        ExecutorService executorService = Executors.newFixedThreadPool(6);
        long start = System.currentTimeMillis();
        // First stream: submit every task and collect the futures
        List<CompletableFuture<Integer>> futures = sleepTimes.stream()
                .map(sleepTime -> CompletableFuture.supplyAsync(() -> sleepTask(sleepTime), executorService)
                        .exceptionally(ex -> {
                            ex.printStackTrace();
                            return -1;
                        }))
                .collect(Collectors.toList());
        executorService.shutdown();
        // Second stream: wait for each result
        List<Integer> result = futures.stream()
                .map(CompletableFuture::join)
                .collect(Collectors.toList());
        long finish = System.currentTimeMillis();
        long timeElapsed = (finish - start) / 1000;
        System.out.println(String.format("done in %d seconds.", timeElapsed));
        System.out.println(result);

        System.out.println("WITH SAME STREAM FOR FUTURE AND JOIN");
        ExecutorService executorService2 = Executors.newFixedThreadPool(6);
        start = System.currentTimeMillis();
        // Single stream: submit a task and join it in the same pipeline
        List<Integer> results = sleepTimes.stream()
                .map(sleepTime -> CompletableFuture.supplyAsync(() -> sleepTask(sleepTime), executorService2)
                        .exceptionally(ex -> {
                            ex.printStackTrace();
                            return -1;
                        }))
                .map(CompletableFuture::join)
                .collect(Collectors.toList());
        executorService2.shutdown();
        finish = System.currentTimeMillis();
        timeElapsed = (finish - start) / 1000;
        System.out.println(String.format("done in %d seconds.", timeElapsed));
        System.out.println(results);
    }
}
Output
WITH SEPARATE STREAMS FOR FUTURE AND JOIN
Task with sleep time 6
Task with sleep time 5
Task with sleep time 1
Task with sleep time 3
Task with sleep time 2
Task with sleep time 4
done in 6 seconds.
[1, 2, 3, 4, 5, 6]
WITH SAME STREAM FOR FUTURE AND JOIN
Task with sleep time 1
Task with sleep time 2
Task with sleep time 3
Task with sleep time 4
Task with sleep time 5
Task with sleep time 6
done in 21 seconds.
[1, 2, 3, 4, 5, 6]
Answer
The two approaches are quite different; let me try to explain them clearly.
1st approach: In the first approach, you start asynchronous requests for all 6 tasks first, and only then call join on each of them to get the results.
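2nd approach: In the second approach, you call join immediately after starting the asynchronous request for each task. For example, after task 1 is started asynchronously, calling join makes the main thread wait until that task completes, and only then is task 2 started asynchronously. This happens because a sequential stream passes each element through the whole pipeline (supplyAsync, then join) before it pulls the next element from the source.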
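To make the difference in call order visible, here is a minimal sketch that records, on the main thread only, when each future is submitted and when it is joined. The class name `SubmitJoinOrder`, the method names, and the trivial tasks (each one just returns its id) are hypothetical and chosen for illustration; the two pipelines mirror the two approaches in the question.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.stream.Collectors;

public class SubmitJoinOrder {

    // 1st approach: collect every future first, then join them in a second stream.
    static List<String> separateStreams(List<Integer> ids) {
        List<String> log = new ArrayList<>(); // written only by the main thread
        ExecutorService pool = Executors.newFixedThreadPool(ids.size());
        try {
            List<CompletableFuture<Integer>> futures = ids.stream()
                    .map(id -> {
                        log.add("submit " + id);
                        return CompletableFuture.supplyAsync(() -> id, pool);
                    })
                    .collect(Collectors.toList()); // every task has been submitted here
            futures.stream()
                    .map(CompletableFuture::join)
                    .forEachOrdered(id -> log.add("join " + id));
        } finally {
            pool.shutdown();
        }
        return log;
    }

    // 2nd approach: map to a future and join it inside the same pipeline.
    static List<String> sameStream(List<Integer> ids) {
        List<String> log = new ArrayList<>(); // written only by the main thread
        ExecutorService pool = Executors.newFixedThreadPool(ids.size());
        try {
            ids.stream()
                    .map(id -> {
                        log.add("submit " + id);
                        return CompletableFuture.supplyAsync(() -> id, pool);
                    })
                    .map(CompletableFuture::join) // blocks before the next element is pulled
                    .forEachOrdered(id -> log.add("join " + id));
        } finally {
            pool.shutdown();
        }
        return log;
    }

    public static void main(String[] args) {
        List<Integer> ids = Arrays.asList(1, 2, 3);
        // prints "separate: [submit 1, submit 2, submit 3, join 1, join 2, join 3]"
        System.out.println("separate: " + separateStreams(ids));
        // prints "same:     [submit 1, join 1, submit 2, join 2, submit 3, join 3]"
        System.out.println("same:     " + sameStream(ids));
    }
}
```

In the second log, task 2 is never submitted before task 1 has been joined, which is exactly why the sleep times add up in the question's second measurement.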
Note: You can also see this in the output. In the 1st approach, the "Task with sleep time" lines appear in a random order because all six tasks run concurrently. In the 2nd approach, the tasks run sequentially, one after another.
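The measured times follow from the same reasoning. Writing $t_i$ for the sleep time of task $i$ (here $t_i = 1, \dots, 6$ seconds) and assuming the pool of six threads lets every task start immediately, the first approach only waits for the longest task, while the second waits for every task in turn. This ignores small overheads such as thread start-up and printing:

```latex
T_{\text{separate}} \approx \max_{1 \le i \le 6} t_i = 6\ \text{s},
\qquad
T_{\text{same}} \approx \sum_{i=1}^{6} t_i = 1 + 2 + 3 + 4 + 5 + 6 = 21\ \text{s}
```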
I assume you already know how the stream map operation is performed; if not, you can get more information here or here:
To perform a computation, stream operations are composed into a stream pipeline. A stream pipeline consists of a source (which might be an array, a collection, a generator function, an I/O channel, etc), zero or more intermediate operations (which transform a stream into another stream, such as filter(Predicate)), and a terminal operation (which produces a result or side-effect, such as count() or forEach(Consumer)). Streams are lazy; computation on the source data is only performed when the terminal operation is initiated, and source elements are consumed only as needed.
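The "consumed only as needed" part is what matters here: in a sequential stream, each element travels through every stage before the next element is pulled. The following minimal sketch traces this with two plain `map` stages and no futures; the class name `StreamLaziness` and its methods are hypothetical. In the question's second approach, the first stage corresponds to `supplyAsync` and the second to `join`.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

public class StreamLaziness {

    // Records when each stage of a sequential pipeline sees each element.
    static List<String> trace(List<String> source) {
        List<String> log = new ArrayList<>();
        source.stream()
                .map(s -> { log.add("stage1 " + s); return s; })
                .map(s -> { log.add("stage2 " + s); return s; })
                .forEachOrdered(s -> log.add("terminal " + s));
        return log;
    }

    // Builds a pipeline but never runs a terminal operation.
    static List<String> traceWithoutTerminal(List<String> source) {
        List<String> log = new ArrayList<>();
        source.stream().map(s -> { log.add("stage1 " + s); return s; }); // no terminal op
        return log; // stays empty: intermediate operations are lazy
    }

    public static void main(String[] args) {
        List<String> source = Arrays.asList("a", "b");
        // prints [stage1 a, stage2 a, terminal a, stage1 b, stage2 b, terminal b]
        System.out.println(trace(source));
        // prints []
        System.out.println(traceWithoutTerminal(source));
    }
}
```

Element "b" does not reach stage 1 until "a" has passed the terminal operation, so a blocking second stage (such as `join`) holds back the submission of every later element.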