Lập trình đa luồng với Callable và Future trong Java
Trong bài viết Lập trình đa luồng trong Java các bạn đã biết được 2 cách để tạo một Thread trong Java: tạo 1 đối tượng của lớp được extend từ class Thread hoặc implements từ interface Runnable. Trong bài viết này tôi giới thiệu với các bạn một cách khác để tạo Thread, đó là Callable trong Java với khả năng trả về kết quả Future<T> sau khi xử lý và có thể throw Exception nếu trong quá trình xử lý tác vụ có lỗi xảy ra.
Nội dung [Ẩn]
- 1 Callable
- 2 Future
- 3 Ví dụ sử dụng Callable và Future
- 3.1 Sử dụng phương thức submit(Callable) của ExecutorService với kiểu trả về là 1 Future<T>
- 3.2 Sử dụng phương thức get() của Future<T> với Timeout
- 3.3 Sử dụng phương thức invokeAny(Collection<?> extends Callable<T> tasks)
- 3.4 Sử dụng phương thức invokeAll(Collection<?> extends Callable<T> tasks)
- 3.5 Hủy bỏ một Future
Callable
Callable là một interface sử dụng Java Generic để định nghĩa đối tượng sẽ trả về sau khi xử lý xong tác vụ.
Callable interface cũng giống như Runnable, ngoại trừ khác ở chỗ thay vì trả về void từ run() method của Runnable thì call() method của Callable trả về đối tượng kiểu Future<T> (bất kỳ) hoặc throw Exception.
Runnable:
public abstract void run() => kiểu trả về là void
Callable:
<T> Future<T> submit(Callable<T> task) => kiểu trả về là Future<T>
<T> Future<T> submit(Runnable<T> task) => kiểu trả về là Future<T>
Future
Dùng để lấy kết quả khi thực thi một Thread, có thể lấy Future khi submit một task vào ThreadPool. Task ở đây là một object implement Runnable hay Callable.
Một số method của Future:
- isDone() : Kiểm tra task hoàn thành hay không?
- cancel() : Hủy task
- isCancelled(): Kiểm tra task bị hủy trước khi hoàn thành hay không?
- get() : Lấy kết quả trả về của task.
Ví dụ sử dụng Callable và Future
Sử dụng phương thức submit(Callable) của ExecutorService với kiểu trả về là 1 Future<T>
Ví dụ về tính tổng bình phương của 10 số, thay vì xử lý việc tính tổng này trong Thread chính của chương trình, tôi sẽ tạo mới Thread sử dụng Callable để xử lý và nhận kết quả về thông qua Future.
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 | package com.gpcoder.threadpool.callable; import java.util.Random; import java.util.concurrent.Callable; public class CallableWorker implements Callable<Integer> { private int num; private Random ran; public CallableWorker( int num) { this .num = num; this .ran = new Random(); } public Integer call() throws Exception { Thread.sleep(ran.nextInt( 10 ) * 1000 ); int result = num * num; System.out.println( "Complete " + num); return result; } } |
Để thực thi tác vụ của Callable, chúng ta phải submit nó vào một ThreadPool sử dụng phương thức submit() của Executor Framework. Để nhận kết quả trả về chúng ta sử dụng phương thức get() của lớp Future. Ta có chương trình như bên dưới:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 | package com.gpcoder.threadpool.callable; import java.util.ArrayList; import java.util.List; import java.util.concurrent.Callable; import java.util.concurrent.ExecutionException; import java.util.concurrent.ExecutorService; import java.util.concurrent.Executors; import java.util.concurrent.Future; public class CallableExample { public static void main(String[] args) { // create a list to hold the Future object associated with Callable List<Future<Integer>> list = new ArrayList<>(); // Get ExecutorService from Executors utility class, thread pool size is 5 ExecutorService executor = Executors.newFixedThreadPool( 5 ); Callable<Integer> callable; Future<Integer> future; for ( int i = 1 ; i <= 10 ; i++) { callable = new CallableWorker(i); // submit Callable tasks to be executed by thread pool future = executor.submit(callable); // add Future to the list, we can get return value using Future list.add(future); } // shut down the executor service now executor.shutdown(); // Wait until all threads are finish while (!executor.isTerminated()) { // Running ... } int sum = 0 ; for (Future<Integer> f : list) { try { sum += f.get(); } catch (InterruptedException | ExecutionException e) { e.printStackTrace(); } } System.out.println( "Finished all threads: " ); System.out.println( "Sum all = " + sum); } } |
Thực thi chương trình trên, ta có kết quả như sau:
1 2 3 4 5 6 7 8 9 10 11 12 | Complete 5 Complete 4 Complete 7 Complete 6 Complete 3 Complete 2 Complete 9 Complete 1 Complete 8 Complete 10 Finished all threads: Sum all = 385 |
Lưu ý: Tương tự như submit(Callable), phương thức submit(Runnable) cũng đưa vào 1 Runnable và nó trả về một đối tượng Future. Đối tượng Future có thể được sử dụng để kiểm tra nếu Runnable đã hoàn tất việc thực thi.
Sử dụng phương thức get() của Future<T> với Timeout
Phương thức get() là synchronous, do đó nó sẽ blocking chương trình của chúng ta mỗi khi đợi kết quả của Callable. Để hạn chế blocking chương trình quá lâu, chúng ta có thể sử dụng phương thức get() này với một thời gian Timeout như sau:
1 | future.get( 7 , TimeUnit.SECONDS); |
Lưu ý: khi sử dụng phương thức get() với Timeout có thể nhận một TimeoutException nếu thời gian thực thi của task vượt quá khoảng thời gian Timeout.
Ví dụ:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 | package com.gpcoder.threadpool.callable; import java.util.ArrayList; import java.util.List; import java.util.concurrent.Callable; import java.util.concurrent.CancellationException; import java.util.concurrent.ExecutionException; import java.util.concurrent.ExecutorService; import java.util.concurrent.Executors; import java.util.concurrent.Future; import java.util.concurrent.TimeUnit; import java.util.concurrent.TimeoutException; public class CallableExample2 { public static final int GET_TIME_OUT = 5 ; public static final int NUM_OF_TASK = 10 ; public static void main(String[] args) throws TimeoutException, InterruptedException { // create a list to hold the Future object associated with Callable List<Future<Integer>> list = new ArrayList<>(); // Get ExecutorService from Executors utility class, thread pool size is 5 ExecutorService executor = Executors.newFixedThreadPool( 5 ); Callable<Integer> callable; Future<Integer> future; for ( int i = 1 ; i <= NUM_OF_TASK; i++) { callable = new CallableWorker(i); // submit Callable tasks to be executed by thread pool future = executor.submit(callable); // add Future to the list, we can get return value using Future list.add(future); } int sum = 0 ; for (Future<Integer> f : list) { try { // print the return value of Future, notice the output delay in console // because Future.get() waits for task to get completed int result = f.get(GET_TIME_OUT, TimeUnit.SECONDS); // Throw TimeoutException if the task execute over 7s sum += result; System.out.println( "Result: " + result); System.out.println( "Is completed? : " + f.isDone()); System.out.println( "Is canceled? : " + f.isCancelled()); } catch (TimeoutException | CancellationException | InterruptedException | ExecutionException e) { e.printStackTrace(); } System.out.println( "---" ); } // shut down the executor service now executor.shutdownNow(); // Blocks until all tasks have completed execution after a shutdown request, or // the timeout occurs, or the current thread is interrupted, whichever happens // first. while (!executor.awaitTermination(GET_TIME_OUT * NUM_OF_TASK * 1000 , TimeUnit.SECONDS)) { // Running ... } System.out.println( "Finished all threads: " ); System.out.println( "Sum all = " + sum); } } |
Thực thi chương trình trên vài lần bạn sẽ nhận được exception và kết quả như sau:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 | Complete 1 Result: 1 Is completed? : true Is canceled? : false --- Complete 6 java.util.concurrent.TimeoutException --- at java.util.concurrent.FutureTask.get(FutureTask.java: 205 ) at com.gpcoder.threadpool.callable.CallableExample2.main(CallableExample2.java: 42 ) Complete 2 Complete 3 Result: 9 Is completed? : true Is canceled? : false --- Complete 5 Complete 10 Complete 8 Complete 4 Result: 16 Is completed? : true Is canceled? : false --- Result: 25 Is completed? : true Is canceled? : false --- Result: 36 Is completed? : true Is canceled? : false --- Complete 7 Result: 49 Is completed? : true Is canceled? : false --- Result: 64 Is completed? : true Is canceled? : false --- java.util.concurrent.TimeoutException --- Result: 100 Is completed? : true Is canceled? : false --- at java.util.concurrent.FutureTask.get(FutureTask.java: 205 ) at com.gpcoder.threadpool.callable.CallableExample2.main(CallableExample2.java: 42 ) Finished all threads: Sum all = 300 |
Sử dụng phương thức invokeAny(Collection<?> extends Callable<T> tasks)
Phương thức invokeAny() trả về một Future, nhưng trả về kết quả của một trong những đối tượng Callable. Nó không đảm bảo về kết quả bạn sẽ nhận được từ callable nào, chỉ cần một trong số chúng hoàn thành. Tức là ko cần tất cả các Thread hòan thành, chỉ cần 1 task hoàn thành phương thức get() sẽ nhận được kết quả.
Nếu 1 trong số task hoàn thành hoặc ném ra 1 ngoại lệ, phần còn lại của Callable sẽ được hủy bỏ (cancelled).
Ví dụ:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 | package com.gpcoder.threadpool.callable; import java.util.ArrayList; import java.util.List; import java.util.concurrent.Callable; import java.util.concurrent.ExecutionException; import java.util.concurrent.ExecutorService; import java.util.concurrent.Executors; public class InvokeAnyExample { public static void main(String[] args) throws InterruptedException, ExecutionException { // Get ExecutorService from Executors utility class, thread pool size is 5 ExecutorService executor = Executors.newFixedThreadPool( 5 ); List<Callable<Integer>> callables = new ArrayList<>(); for ( int i = 1 ; i <= 10 ; i++) { callables.add( new CallableWorker(i)); } Integer result = executor.invokeAny(callables); System.out.println( "Result = " + result); executor.shutdown(); System.out.println( "Finished all threads " ); } } |
Thực thi chương trình trên sẽ in ra các kết quả được trả về từ 1 trong 10 Callable từ danh sách callables. Mỗi lần chạy sẽ nhận được một kết quả khác nhau.
Kết quả lần 1:
1 2 3 | Complete 4 Result = 16 Finished all threads |
Kết quả lần 2:
1 2 3 4 | Complete 3 Complete 6 Result = 9 Finished all threads |
Sử dụng phương thức invokeAll(Collection<?> extends Callable<T> tasks)
Phương thức invokeAll() gọi tất cả đối tượng Callable trong tập hợp. Phương thức này trả về 1 danh sách các đối tượng Future<T> mà được trả về từ việc thực thi các Callables.
Ví dụ:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 | package com.gpcoder.threadpool.callable; import java.util.ArrayList; import java.util.List; import java.util.concurrent.Callable; import java.util.concurrent.ExecutionException; import java.util.concurrent.ExecutorService; import java.util.concurrent.Executors; import java.util.concurrent.Future; public class InvokeAllExample { public static void main(String[] args) throws InterruptedException, ExecutionException { // Get ExecutorService from Executors utility class, thread pool size is 5 ExecutorService executor = Executors.newFixedThreadPool( 5 ); List<Callable<Integer>> callables = new ArrayList<>(); for ( int i = 1 ; i <= 10 ; i++) { callables.add( new CallableWorker(i)); } List<Future<Integer>> futures = executor.invokeAll(callables); int sum = 0 ; for (Future<Integer> future : futures) { sum += future.get(); } System.out.println( "Sum all = " + sum); executor.shutdown(); System.out.println( "Finished all threads " ); } } |
Thực thi chương trình trên, bạn sẽ thấy tất cả các Callable đều được thực thi và kết quả được lưu vào List<Future>. Ta có kết quả như sau:
1 2 3 4 5 6 7 8 9 10 11 12 | Complete 1 Complete 5 Complete 7 Complete 4 Complete 8 Complete 2 Complete 3 Complete 9 Complete 6 Complete 10 Sum all = 385 Finished all threads |
Hủy bỏ một Future
Bạn có thể hủy một Future bằng cách sử dụng phương thức Future.cancel(). Khi phương thức này được gọi, nó cố gắng để hủy bỏ việc thực hiện các task và trả về true nếu nó bị hủy bỏ thành công, nếu không, nó trả về false.
Phương thức cancel() chấp nhận đối số boolean – mayInterruptIfRunning. Nếu gán giá trị true cho đối số này, Thread hiện đang thi hành task sẽ bị gián đoạn (interrupted), nếu không các task đang xử lý sẽ được phép hoàn thành.
Bạn có thể sử dụng phương thức isCancelled() để kiểm tra xem một task có bị hủy hay không. Ngoài ra, sau khi huỷ bỏ task thì phương thức isDone() sẽ luôn luôn có kết quả là true.
Ví dụ:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 | package com.gpcoder.threadpool.callable; import java.util.concurrent.ExecutionException; import java.util.concurrent.ExecutorService; import java.util.concurrent.Executors; import java.util.concurrent.Future; import java.util.concurrent.TimeUnit; public class FutureCancelExample { public static void main(String[] args) throws InterruptedException, ExecutionException { ExecutorService executorService = Executors.newSingleThreadExecutor(); long startTime = System.currentTimeMillis(); Future<Integer> future = executorService.submit( new CallableWorker( 1 )); while (!future.isDone()) { System.out.println( "Task is still working ..." ); Thread.sleep( 200 ); long workingTime = (System.currentTimeMillis() - startTime); if (TimeUnit.SECONDS.toSeconds(workingTime) > 1000 ) { future.cancel( true ); } } executorService.shutdown(); if (!future.isCancelled()) { System.out.println( "Task completed! Retrieving the result" ); System.out.println( "Result = " + future.get()); } else { System.out.println( "Task was cancelled" ); } System.out.println( "It will throw exception form here" ); System.out.println( "Result = " + future.get()); } } |
Thực thi chương trình trên, ta có kết quả như sau:
1 2 3 4 5 6 7 8 9 10 11 | Task is still working ... Task is still working ... Task is still working ... Task is still working ... Task is still working ... Task was cancelled It will throw exception form here Exception in thread "main" java.util.concurrent.CancellationException at java.util.concurrent.FutureTask.report(FutureTask.java: 121 ) at java.util.concurrent.FutureTask.get(FutureTask.java: 192 ) at com.gpcoder.threadpool.callable.FutureCancelExample.main(FutureCancelExample.java: 37 ) |
Như bạn thấy chương trình trên nó sẽ ném một ngoại lệ (exception), bởi vì phương thức future.get() sẽ ném CancellationException nếu task được hủy bỏ. Chúng ta có thể xử lý sự kiện này bằng cách kiểm tra Future có bị hủy bỏ hay không trước khi lấy kết quả thông qua phương thức future.isCancelled().
Trên đây là toàn bộ những kiến thức cơ bản về việc sử dụng Callable và Future với ExecutorService Framework. Hy vọng bài viết này giúp ích cho các bạn, hẹn gặp lại ở các bài viết tiếp theo.
No comments:
Post a Comment