java.util.concurrent Package đơn giản và dễ hiểu (Phần 1)

0 0 0

Người đăng: Bách Nguyễn Ngọc

Theo Viblo Asia

Trong thế giới lập trình Java, việc xử lý đồng thời (concurrency) và song song (parallelism) là những khía cạnh quan trọng để tối ưu hóa hiệu suất và tăng cường khả năng xử lý của ứng dụng. Để hỗ trợ các bạn giải quyết các thách thức liên quan đến việc quản lý nhiều luồng (threads) và tài nguyên một cách hiệu quả, Java cung cấp một gói tiện ích mạnh mẽ mang tên java.util.concurrent. Gói này bao gồm nhiều công cụ và cấu trúc dữ liệu giúp đơn giản hóa việc triển khai các tác vụ đồng thời, từ việc quản lý các tác vụ đơn giản đến các cơ chế đồng bộ phức tạp. Trong bài viết này, chúng ta sẽ khám phá chi tiết java.util.concurrent package, tìm hiểu cách nó hoạt động và cách áp dụng nó vào các tình huống thực tế một cách dễ hiểu và hiệu quả.

Gói java.util.concurrent trong Java là một phần của Thư viện Lớp chuẩn Java (Java Standard Library), được giới thiệu trong Java 5 để hỗ trợ lập trình đồng thời (concurrent programming) một cách hiệu quả và an toàn hơn. Gói này cung cấp một loạt các class và interface để quản lý các tác vụ đồng thời, điều phối luồng (thread) và thực hiện các thao tác đồng bộ hoá. Trước khi pakage này ra đời, người ta cần tự tạo các lớp tiện ích theo nhu cầu của riêng mình.

Bài viết này nhằm mục đích tổng hợp các kiến thức liên quan đến việc xử lý đồng thời và song song trong Java, cũng như các khái niệm liên quan đến gói java.util.concurrent. Các bạn không nhất thiết phải cần phải ngay lập tức nắm vững được hết các khái niệm của nó (tất nhiên là được thì càng tốt) mà mình muốn các bạn hãy coi bài viết như một cẩm nang để có thể research lại một cách nhanh chóng ngay khi gặp một tình huống thực tế cần phải xử lý. Ở đây, mình sẽ cố gắng đưa ra các khái niệm ngắn gọn, đi kèm với các ví dụ đơn giản, để chúng mình có thể dễ dàng tiếp cận và nhanh chóng hiểu về nó. Nào giờ hãy cùng mình đi vào nội dung bài viết nhé. Chúc các bạn học tập tốt...

Các Thành Phần Chính của java.util.concurrent

  • Executor: Giao diện đơn giản để thực thi các tác vụ không đồng bộ.
  • ExecutorService: Mở rộng Executor với các phương pháp để quản lý vòng đời của một executor và để trả về giá trị từ các tác vụ không đồng bộ.
  • Callable: Tương tự như Runnable, nhưng có thể trả về kết quả và ném ra ngoại lệ.
  • Future: Đại diện cho kết quả của một tính toán không đồng bộ.
  • ScheduledExecutorService: Mở rộng ExecutorService để hỗ trợ lập lịch thực thi các tác vụ.
  • ThreadPoolExecutor: Triển khai một pool của các luồng có thể tái sử dụng để thực thi các tác vụ.
  • ScheduledThreadPoolExecutor: Triển khai một pool của các luồng có thể lập lịch để thực thi các tác vụ sau một khoảng thời gian trì hoãn hoặc định kỳ.
  • CountDownLatch: Cho phép một hoặc nhiều luồng chờ đến khi một tập hợp các thao tác trong các luồng khác hoàn thành.
  • CyclicBarrier: Cho phép một nhóm các luồng tất cả cùng chờ nhau đến một điểm chung.
  • Semaphore: Kiểm soát số lượng luồng truy cập đồng thời vào một tài nguyên.
  • Lock: Cung cấp các khóa có thể được sử dụng để kiểm soát truy cập đồng bộ đến một tài nguyên chung.
  • ReentrantLock: Một triển khai của Lock cho phép tái nhập vào cùng một luồng.
  • ConcurrentHashMap: Một phiên bản đồng bộ hóa của HashMap giúp tăng hiệu suất trong các môi trường đa luồng.
  • CopyOnWriteArrayList: Một phiên bản của ArrayList giúp an toàn trong môi trường đa luồng bằng cách sao chép toàn bộ mảng bất cứ khi nào có thay đổi.
  • Exchanger: Cho phép hai luồng trao đổi các đối tượng của mình tại một điểm gặp gỡ đồng bộ hóa.
  • Phaser: Một dạng mở rộng của CyclicBarrier và CountDownLatch hỗ trợ nhiều pha (phases).

Được rồi, bạn đã có thể nắm sơ lược về các thành phần của pakage này, giờ mình sẽ cùng các bạn đi qua từng thành phần đó để xem chúng thực sử là gì và được sử dụng như thế nào nhé.

1. Executor

  • Executor là một giao diện trong gói java.util.concurrent của Java, đại diện cho một cơ chế để thực thi các tác vụ (tasks) không đồng bộ. Thay vì tạo và quản lý các luồng (threads) trực tiếp, Executor cung cấp một cách trừu tượng để gửi các tác vụ và để hệ thống xử lý việc quản lý luồng và thực thi chúng.
  • Là một interface chỉ chứa mỗi phương thức execute(Runnable). Chính ThreadPoolExecutor phải implement interface này và hiện thực phương thức này, giúp bạn đưa một Runnable vào Thread Pool một cách dễ dàng.

Ví dụ về Executor:

import java.util.concurrent.Executor;
import java.util.concurrent.Executors; public class ExecutorExample { public static void main(String[] args) { // Tạo một Executor sử dụng một Thread Pool với 2 luồng. Executor executor = Executors.newFixedThreadPool(2); // Tạo các tác vụ (task) đơn giản. Runnable task1 = () -> { System.out.println("Task 1 đang chạy trên thread: " + Thread.currentThread().getName()); }; Runnable task2 = () -> { System.out.println("Task 2 đang chạy trên thread: " + Thread.currentThread().getName()); }; Runnable task3 = () -> { System.out.println("Task 3 đang chạy trên thread: " + Thread.currentThread().getName()); }; // Gửi các tác vụ đến Executor để thực hiện. executor.execute(task1); executor.execute(task2); executor.execute(task3); }
}

Giải thích ví dụ:

1. Tạo Executor

Executor executor = Executors.newFixedThreadPool(2);

Ở đây, chúng ta tạo một Executor sử dụng một nhóm luồng cố định (ThreadPool) với 2 luồng.

2. Tạo các tác vụ:

Runnable task1 = () -> { System.out.println("Task 1 đang chạy trên thread: " + Thread.currentThread().getName());
}; Runnable task2 = () -> { System.out.println("Task 2 đang chạy trên thread: " + Thread.currentThread().getName());
}; Runnable task3 = () -> { System.out.println("Task 3 đang chạy trên thread: " + Thread.currentThread().getName());
};

Ba tác vụ đơn giản được tạo ra, mỗi tác vụ chỉ đơn giản in ra một đoạn message cộng với tên của luồng đang thực hiện nó.

3. Gửi các tác vụ đến Executor

executor.execute(task1);
executor.execute(task2);
executor.execute(task3);

Các tác vụ được gửi đến Executor để thực hiện. Executor sẽ quản lý và phân phối các tác vụ này cho các luồng trong nhóm luồng của nó.

4. Kết quả

Khi chạy chương trình, bạn sẽ thấy rằng các tác vụ được thực hiện bởi các luồng khác nhau trong thread pool. Vì thread pool có kích thước cố định là 2, nên hai tác vụ đầu tiên sẽ được thực hiện đồng thời, và tác vụ thứ ba sẽ chờ cho đến khi một trong hai luồng hoàn thành công việc của nó.

Ví dụ này minh họa cách Executor có thể được sử dụng để đơn giản hóa việc quản lý luồng và thực hiện các tác vụ đồng thời trong Java.

2. ExecutorService

ExecutorService là một giao diện mở rộng của Executor trong gói java.util.concurrent của Java, cung cấp các phương thức bổ sung để quản lý chu kỳ sống của các executor và hỗ trợ các tác vụ không đồng bộ. Nó không chỉ cho phép gửi các tác vụ mà còn cung cấp các cơ chế để kiểm soát và theo dõi tiến trình thực hiện các tác vụ này.

Các đặc điểm chính của ExecutorService:

  • Quản lý chu kỳ sống: ExecutorService cho phép khởi động, dừng và quản lý việc tắt các executor.
  • Thực thi không đồng bộ: Ngoài việc gửi các tác vụ Runnable, ExecutorService còn hỗ trợ gửi các tác vụ Callable và trả về các đối tượng Future để theo dõi tiến trình và kết quả của chúng.
  • Hỗ trợ nhóm luồng (thread pool): ExecutorService thường được triển khai bằng cách sử dụng nhóm luồng, giúp quản lý và tái sử dụng luồng một cách hiệu quả.

Các phương thức chính của ExecutorService:

  • submit(Runnable task): Gửi một tác vụ Runnable để thực thi và trả về một đối tượng Future đại diện cho tác vụ đó.
  • submit(Callable<T> task): Gửi một tác vụ Callable để thực thi và trả về một đối tượng Future đại diện cho kết quả của tác vụ đó.
  • invokeAll(Collection<? extends Callable<T>> tasks): Thực thi một tập hợp các tác vụ Callable và chờ tất cả hoàn thành.
  • invokeAny(Collection<? extends Callable<T>> tasks): Thực thi một tập hợp các tác vụ Callable và trả về kết quả của tác vụ đầu tiên hoàn thành.
  • shutdown(): Khởi động quá trình tắt ExecutorService, ngừng nhận các tác vụ mới nhưng tiếp tục thực hiện các tác vụ đã gửi.
  • shutdownNow(): Cố gắng ngừng tất cả các tác vụ đang thực hiện và ngừng nhận các tác vụ mới.
  • isShutdown(): Kiểm tra xem ExecutorService đã được tắt hay chưa.
  • isTerminated(): Kiểm tra xem tất cả các tác vụ đã hoàn thành sau khi tắt ExecutorService hay chưa.

Ví dụ về ExecutorService:

Dưới đây là một ví dụ minh họa cách sử dụng ExecutorService để gửi và quản lý các tác vụ trong Java.

import java.util.concurrent.Callable;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit; public class ExecutorServiceExample { public static void main(String[] args) { // Tạo một ExecutorService sử dụng một thread pool cố định gồm 3 luồng. ExecutorService executorService = Executors.newFixedThreadPool(3); // Gửi các tác vụ Runnable đến ExecutorService để thực thi. for (int i = 1; i <= 3; i++) { executorService.submit(new RunnableTask("Task " + i)); } // Gửi một tác vụ Callable đến ExecutorService để thực thi. Future<Integer> futureResult = executorService.submit(new CallableTask(10)); try { // Lấy kết quả của tác vụ Callable. Integer result = futureResult.get(); System.out.println("Kết quả của CallableTask: " + result); } catch (Exception e) { e.printStackTrace(); } // Đóng ExecutorService. executorService.shutdown(); try { // Đợi cho đến khi tất cả các tác vụ hoàn thành hoặc hết thời gian chờ. if (!executorService.awaitTermination(5, TimeUnit.SECONDS)) { executorService.shutdownNow(); // Buộc ngừng nếu không hoàn thành trong thời gian chờ. } } catch (InterruptedException e) { executorService.shutdownNow(); } }
} class RunnableTask implements Runnable { private String name; public RunnableTask(String name) { this.name = name; } @Override public void run() { System.out.println(name + " đang thực hiện công việc."); try { Thread.sleep(2000); // Giả lập thời gian thực hiện công việc } catch (InterruptedException e) { e.printStackTrace(); } System.out.println(name + " đã hoàn thành công việc."); }
} class CallableTask implements Callable<Integer> { private int number; public CallableTask(int number) { this.number = number; } @Override public Integer call() { System.out.println("CallableTask đang thực hiện công việc."); try { Thread.sleep(2000); // Giả lập thời gian thực hiện công việc } catch (InterruptedException e) { e.printStackTrace(); } System.out.println("CallableTask đã hoàn thành công việc."); return number * 2; // Trả về kết quả của tác vụ }
}

Giải thích ví dụ:

1. Tạo ExecutorService

ExecutorService executorService = Executors.newFixedThreadPool(3);

Ở đây, chúng ta tạo một ExecutorService sử dụng một thread pool cố định gồm 3 luồng

2. Gửi các tác vụ Runnable đến ExecutorService để thực thi

for (int i = 1; i <= 3; i++) { executorService.submit(new RunnableTask("Task " + i));
}

Chúng ta gửi ba tác vụ RunnableTask đến ExecutorService để thực thi. Các tác vụ này sẽ được thực hiện đồng thời trong thread pool

3. Gửi một tác vụ Callable đến ExecutorService để thực thi

Future<Integer> futureResult = executorService.submit(new CallableTask(10));

Chúng ta gửi một tác vụ CallableTask đến ExecutorService để thực thi và lấy đối tượng Future đại diện cho kết quả của tác vụ đó.

4. Lấy kết quả của tác vụ Callable:

Integer result = futureResult.get();
System.out.println("Kết quả của CallableTask: " + result);

Chúng ta sử dụng phương thức get() của Future để lấy kết quả của tác vụ Callable. Nếu tác vụ chưa hoàn thành, phương thức này sẽ chờ cho đến khi tác vụ hoàn thành và trả về kết quả.

5. Đóng ExecutorService

executorService.shutdown();
try { if (!executorService.awaitTermination(5, TimeUnit.SECONDS)) { executorService.shutdownNow(); }
} catch (InterruptedException e) { executorService.shutdownNow();
}

Chúng ta gọi shutdown() để ngừng nhận các tác vụ mới và chờ cho đến khi tất cả các tác vụ hoàn thành. Nếu các tác vụ không hoàn thành trong thời gian chờ (5 giây), chúng ta gọi shutdownNow() để buộc ngừng.

6. Kết quả

Khi chạy chương trình, bạn sẽ thấy rằng các tác vụ Runnable và Callable được thực thi đồng thời bởi ExecutorService. Kết quả của tác vụ Callable sẽ được in ra màn hình sau khi nó hoàn thành. ExecutorService sau đó sẽ được đóng và chương trình kết thúc. Ví dụ này minh họa cách ExecutorService có thể được sử dụng để quản lý và thực thi các tác vụ không đồng bộ trong Java, cung cấp một công cụ mạnh mẽ để làm việc với các ứng dụng đa luồng.

3. Callable và Future

Callable

Callable là một giao diện trong gói java.util.concurrent của Java, tương tự như Runnable nhưng có thể trả về kết quả và ném ngoại lệ có kiểm tra. Callable thường được sử dụng để thực hiện các tác vụ tính toán và trả về kết quả sau khi hoàn thành.

Đặc điểm của Callable:

  • Trả về kết quả: Callable trả về một kết quả của loại cụ thể khi tác vụ hoàn thành.
  • Ném ngoại lệ có kiểm tra: Callable có thể ném các ngoại lệ có kiểm tra, cho phép xử lý các điều kiện lỗi phức tạp hơn.

Cú pháp của Callable

public interface Callable<V> { V call() throws Exception;
}

Future

Future là một giao diện trong gói java.util.concurrent của Java, đại diện cho kết quả của một tính toán không đồng bộ. Nó cung cấp các phương thức để kiểm tra xem tác vụ đã hoàn thành chưa, đợi đến khi nó hoàn thành, và lấy kết quả của tác vụ.

Đặc điểm của Future

  • Kiểm tra trạng thái: Future cho phép kiểm tra xem tác vụ đã hoàn thành chưa thông qua phương thức isDone().
  • Chờ kết quả: Future cung cấp phương thức get() để chờ và lấy kết quả của tác vụ.
  • Hủy tác vụ: Future cho phép hủy tác vụ thông qua phương thức cancel().

Cú pháp của Future

public interface Future<V> { boolean cancel(boolean mayInterruptIfRunning); boolean isCancelled(); boolean isDone(); V get() throws InterruptedException, ExecutionException; V get(long timeout, TimeUnit unit) throws InterruptedException, ExecutionException, TimeoutException;
}

Ví dụ về Callable và Future:

<< Ví dụ trong phần 2. ExecutorService>>

3. ScheduledExecutorService

ScheduledExecutorService là một giao diện trong gói java.util.concurrent của Java, được thiết kế để lập lịch các tác vụ (tasks) để chạy sau một khoảng thời gian trì hoãn hoặc theo chu kỳ lặp lại. Nó là một phần của các nhóm luồng (thread pools) và cung cấp một cách hiệu quả để quản lý việc thực hiện các tác vụ định kỳ hoặc bị trì hoãn trong các ứng dụng đa luồng.

Các phương thức chính của ScheduledExecutorService:

ScheduledExecutorService mở rộng ExecutorService và bổ sung các phương thức lập lịch, bao gồm:

  • schedule(Runnable command, long delay, TimeUnit unit): Lập lịch thực hiện một tác vụ Runnable sau một khoảng thời gian trì hoãn.
  • schedule(Callable<V> callable, long delay, TimeUnit unit): Lập lịch thực hiện một tác vụ Callable sau một khoảng thời gian trì hoãn.
  • scheduleAtFixedRate(Runnable command, long initialDelay, long period, TimeUnit unit): Lập lịch thực hiện một tác vụ định kỳ với khoảng thời gian cố định giữa các lần bắt đầu.
  • scheduleWithFixedDelay(Runnable command, long initialDelay, long delay, TimeUnit unit): Lập lịch thực hiện một tác vụ định kỳ với khoảng thời gian cố định giữa các lần kết thúc và bắt đầu.

Ví dụ về ScheduledExecutorService

import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit; public class ScheduledExecutorServiceExample { public static void main(String[] args) { // Tạo một ScheduledExecutorService với một nhóm luồng gồm 1 luồng. ScheduledExecutorService scheduledExecutorService = Executors.newScheduledThreadPool(1); // Lập lịch thực hiện một tác vụ Runnable sau 5 giây. scheduledExecutorService.schedule(new RunnableTask(), 5, TimeUnit.SECONDS); // Lập lịch thực hiện một tác vụ Runnable định kỳ mỗi 3 giây. scheduledExecutorService.scheduleAtFixedRate(new RunnableTask(), 1, 3, TimeUnit.SECONDS); // Lập lịch thực hiện một tác vụ Runnable định kỳ với khoảng cách cố định 3 giây giữa các lần kết thúc và bắt đầu. scheduledExecutorService.scheduleWithFixedDelay(new RunnableTask(), 1, 3, TimeUnit.SECONDS); // Đóng ScheduledExecutorService sau 20 giây để kết thúc chương trình. scheduledExecutorService.schedule(new Runnable() { @Override public void run() { scheduledExecutorService.shutdown(); System.out.println("ScheduledExecutorService đã đóng."); } }, 20, TimeUnit.SECONDS); }
} class RunnableTask implements Runnable { @Override public void run() { System.out.println("Thực hiện công việc tại: " + System.currentTimeMillis() / 1000); }
}

Giải thích ví dụ

1. Tạo ScheduledExecutorService:

ScheduledExecutorService scheduledExecutorService = Executors.newScheduledThreadPool(1);

Ở đây, chúng ta tạo một ScheduledExecutorService với một thread pool gồm 1 luồng.

2. Lập lịch thực hiện một tác vụ sau 5 giây:

scheduledExecutorService.schedule(new RunnableTask(), 5, TimeUnit.SECONDS);

3. Lập lịch thực hiện một tác vụ định kỳ mỗi 3 giây:

scheduledExecutorService.scheduleAtFixedRate(new RunnableTask(), 1, 3, TimeUnit.SECONDS);

4. Lập lịch thực hiện một tác vụ định kỳ với khoảng cách cố định 3 giây giữa các lần kết thúc và bắt đầu:

scheduledExecutorService.scheduleWithFixedDelay(new RunnableTask(), 1, 3, TimeUnit.SECONDS);

Chúng ta lập lịch thực hiện một tác vụ RunnableTask định kỳ với khoảng cách cố định 3 giây giữa các lần kết thúc và bắt đầu, bắt đầu sau 1 giây.

  1. Đóng ScheduledExecutorService sau 20 giây:
scheduledExecutorService.schedule(new Runnable() { @Override public void run() { scheduledExecutorService.shutdown(); System.out.println("ScheduledExecutorService đã đóng."); }
}, 20, TimeUnit.SECONDS);

Chúng ta lập lịch đóng ScheduledExecutorService sau 20 giây để kết thúc chương trình.

6. Kết quả

Khi chạy chương trình, bạn sẽ thấy rằng các tác vụ được thực hiện theo lịch đã định, với các thông báo in ra màn hình cho biết thời gian thực hiện của mỗi tác vụ. Sau 20 giây, ScheduledExecutorService sẽ đóng và chương trình sẽ kết thúc.

Ví dụ này minh họa cách ScheduledExecutorService có thể được sử dụng để lập lịch thực hiện các tác vụ trong Java, cung cấp một cách linh hoạt và hiệu quả để quản lý các tác vụ định kỳ hoặc bị trì hoãn trong các ứng dụng đa luồng.

4. ThreadPoolExecutor

ThreadPoolExecutor là một lớp trong gói java.util.concurrent của Java, cung cấp một cách triển khai mạnh mẽ và linh hoạt cho ExecutorService bằng cách sử dụng một nhóm các luồng (thread pool) để quản lý và thực thi các tác vụ. ThreadPoolExecutor giúp quản lý việc tạo, hủy, và tái sử dụng các luồng một cách hiệu quả, giảm chi phí và tối ưu hóa hiệu suất trong các ứng dụng đa luồng.

ThreadPoolExecutor chính là đại diện của Thread Pool. Như trong các ví dụ ở trên, mình sử dụng Executors gọi ra các phương thức tiện ích, ví dụ như Executors.newFixedThreadPool(3), bản chất của việc này chính là giúp chúng ta đơn giản hóa trong việc khai báo một ThreadPoolExecutor.

Bạn có thể chỉ cần biết đến Executors cũng đã đủ để có thể xây dựng các Thread Pool tiện dụng được sử dụng ở hầu hết các nhu cầu của dự án. Tuy nhiên việc hiểu và nắm được ThreadPoolExecutor sẽ giúp các bạn hiểu được bản chất của ThreadPool cũng như có thể tùy biến một cách linh hoạt cụ thể hơn khi khai báo ThreadPool.

Đặc điểm của ThreadPoolExecutor

  • Quản lý luồng: ThreadPoolExecutor quản lý việc tạo và tái sử dụng các luồng, giúp giảm chi phí khởi tạo luồng mới.

  • Linh hoạt: ThreadPoolExecutor cho phép cấu hình số lượng luồng tối thiểu và tối đa, cũng như các chính sách xử lý khi có quá nhiều tác vụ.

  • Thực thi không đồng bộ: ThreadPoolExecutor hỗ trợ thực thi các tác vụ không đồng bộ và cung cấp các cơ chế để quản lý và theo dõi trạng thái của các tác vụ này.

Cú pháp:

ThreadPoolExecutor threadPoolExecutor = new ThreadPoolExecutor( int corePoolSize, int maximumPoolSize, long keepAliveTime, TimeUnit unit, BlockingQueue<Runnable> workQueue, ThreadFactory threadFactory, RejectedExecutionHandler handler);

Các tham số của ThreadPoolExecutor Khi tạo một đối tượng ThreadPoolExecutor, bạn có thể cấu hình các tham số sau:

  • corePoolSize: Số lượng luồng cơ bản trong nhóm luồng.
  • maximumPoolSize: Số lượng luồng tối đa trong nhóm luồng.
  • keepAliveTime: Thời gian mà các luồng vượt quá số lượng cơ bản sẽ chờ để được tái sử dụng trước khi bị hủy.
  • unit: Đơn vị thời gian cho keepAliveTime.
  • workQueue: Hàng đợi các tác vụ đang chờ được thực thi.
  • threadFactory: Cách tạo các luồng mới.
  • handler: Chính sách xử lý khi có quá nhiều tác vụ để thực thi.

Ví dụ về ThreadPoolExecutor

import java.util.concurrent.Executors;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit; public class ThreadPoolExecutorExample { public static void main(String[] args) { // Tạo một ThreadPoolExecutor với các tham số cấu hình. ThreadPoolExecutor executor = new ThreadPoolExecutor( 2, // corePoolSize 4, // maximumPoolSize 60, // keepAliveTime TimeUnit.SECONDS, // unit new LinkedBlockingQueue<Runnable>() // workQueue ); // Gửi các tác vụ đến ThreadPoolExecutor để thực thi. for (int i = 1; i <= 5; i++) { executor.submit(new Task("Task " + i)); } // Đóng ThreadPoolExecutor. executor.shutdown(); try { // Đợi cho đến khi tất cả các tác vụ hoàn thành hoặc hết thời gian chờ. if (!executor.awaitTermination(5, TimeUnit.SECONDS)) { executor.shutdownNow(); // Buộc ngừng nếu không hoàn thành trong thời gian chờ. } } catch (InterruptedException e) { executor.shutdownNow(); } }
} class Task implements Runnable { private String name; public Task(String name) { this.name = name; } @Override public void run() { System.out.println(name + " đang thực hiện công việc."); try { Thread.sleep(2000); // Giả lập thời gian thực hiện công việc } catch (InterruptedException e) { e.printStackTrace(); } System.out.println(name + " đã hoàn thành công việc."); }
}

Giải thích ví dụ:

1. Tạo ThreadPoolExecutor:

ThreadPoolExecutor executor = new ThreadPoolExecutor( 2, // corePoolSize 4, // maximumPoolSize 60, // keepAliveTime TimeUnit.SECONDS, // unit new LinkedBlockingQueue<Runnable>() // workQueue
);

Ở đây, chúng ta tạo một ThreadPoolExecutor với 2 luồng cơ bản (corePoolSize), 4 luồng tối đa (maximumPoolSize), thời gian giữ sống cho các luồng dư thừa là 60 giây (keepAliveTime), và một hàng đợi LinkedBlockingQueue để chứa các tác vụ đang chờ.

2. Gửi các tác vụ đến ThreadPoolExecutor để thực thi:

for (int i = 1; i <= 5; i++) { executor.submit(new Task("Task " + i));
}

Chúng ta gửi 5 tác vụ Task đến ThreadPoolExecutor để thực thi. Các tác vụ này sẽ được thực thi đồng thời bởi các luồng trong nhóm luồng.

3. Đóng ThreadPoolExecutor:

executor.shutdown();
try { if (!executor.awaitTermination(5, TimeUnit.SECONDS)) { executor.shutdownNow(); }
} catch (InterruptedException e) { executor.shutdownNow();
}

Chúng ta gọi shutdown() để ngừng nhận các tác vụ mới và chờ cho đến khi tất cả các tác vụ hoàn thành. Nếu các tác vụ không hoàn thành trong thời gian chờ (5 giây), chúng ta gọi shutdownNow() để buộc ngừng.

4. Kết quả

Khi chạy chương trình, bạn sẽ thấy rằng các tác vụ Task được thực thi đồng thời bởi các luồng trong ThreadPoolExecutor. ThreadPoolExecutor quản lý việc tạo và tái sử dụng các luồng một cách hiệu quả, đảm bảo rằng các tác vụ được thực hiện nhanh chóng và hiệu quả.

Lợi ích của việc sử dụng ThreadPoolExecutor

  • Quản lý luồng hiệu quả: ThreadPoolExecutor giúp quản lý việc tạo và tái sử dụng các luồng, giảm chi phí khởi tạo luồng mới và tối ưu hóa hiệu suất của ứng dụng.
  • Linh hoạt: ThreadPoolExecutor cho phép cấu hình số lượng luồng, hàng đợi và các chính sách xử lý, cung cấp một công cụ mạnh mẽ và linh hoạt để quản lý các tác vụ không đồng bộ.
  • Thực thi không đồng bộ: ThreadPoolExecutor hỗ trợ thực thi các tác vụ không đồng bộ và cung cấp các cơ chế để quản lý và theo dõi trạng thái của các tác vụ này, giúp đơn giản hóa việc quản lý các tác vụ đồng thời trong các ứng dụng đa luồng.

5. ScheduledThreadPoolExecutor

ScheduledThreadPoolExecutor là một lớp trong gói java.util.concurrent của Java, mở rộng từ ThreadPoolExecutor và bổ sung khả năng lập lịch thực thi các tác vụ. Nó cho phép bạn lập lịch thực thi các tác vụ sau một khoảng thời gian trì hoãn hoặc lặp lại định kỳ.

Đặc điểm của ScheduledThreadPoolExecutor

  • Lập lịch thực thi: ScheduledThreadPoolExecutor cho phép lập lịch thực thi các tác vụ với một khoảng thời gian trì hoãn cụ thể hoặc lặp lại định kỳ.
  • Quản lý luồng hiệu quả: Tận dụng lợi ích của ThreadPoolExecutor, giúp quản lý việc tạo và tái sử dụng các luồng, giảm chi phí khởi tạo luồng mới và tối ưu hóa hiệu suất của ứng dụng.
  • Linh hoạt và mạnh mẽ: Cung cấp các phương thức linh hoạt để lập lịch và quản lý các tác vụ.

Các phương thức chính của ScheduledThreadPoolExecutor

  • schedule(Runnable command, long delay, TimeUnit unit): Lập lịch thực thi một tác vụ Runnable sau một khoảng thời gian trì hoãn.

  • schedule(Callable<V> callable, long delay, TimeUnit unit): Lập lịch thực thi một tác vụ Callable sau một khoảng thời gian trì hoãn.

  • scheduleAtFixedRate(Runnable command, long initialDelay, long period, TimeUnit unit): Lập lịch thực thi định kỳ một tác vụ Runnable với một khoảng thời gian ban đầu trì hoãn và một chu kỳ cố định.

  • scheduleWithFixedDelay(Runnable command, long initialDelay, long delay, TimeUnit unit): Lập lịch thực thi định kỳ một tác vụ Runnable với một khoảng thời gian ban đầu trì hoãn và một khoảng thời gian trì hoãn giữa các lần thực thi liên tiếp.

Ví dụ về ScheduledThreadPoolExecutor

import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledThreadPoolExecutor;
import java.util.concurrent.TimeUnit; public class ScheduledThreadPoolExecutorExample { public static void main(String[] args) { // Tạo một ScheduledThreadPoolExecutor với một nhóm luồng cố định gồm 2 luồng. ScheduledThreadPoolExecutor executor = (ScheduledThreadPoolExecutor) Executors.newScheduledThreadPool(2); // Lập lịch thực thi một tác vụ Runnable sau 3 giây. executor.schedule(new Task("One-time Task"), 3, TimeUnit.SECONDS); // Lập lịch thực thi một tác vụ Runnable định kỳ mỗi 2 giây sau 5 giây trì hoãn ban đầu. executor.scheduleAtFixedRate(new Task("Fixed-rate Task"), 5, 2, TimeUnit.SECONDS); // Lập lịch thực thi một tác vụ Runnable định kỳ với trì hoãn 4 giây giữa các lần thực thi, sau 5 giây trì hoãn ban đầu. executor.scheduleWithFixedDelay(new Task("Fixed-delay Task"), 5, 4, TimeUnit.SECONDS); // Đóng ScheduledThreadPoolExecutor sau 20 giây để kết thúc chương trình. executor.schedule(() -> { executor.shutdown(); try { if (!executor.awaitTermination(5, TimeUnit.SECONDS)) { executor.shutdownNow(); } } catch (InterruptedException e) { executor.shutdownNow(); } }, 20, TimeUnit.SECONDS); }
} class Task implements Runnable { private String name; public Task(String name) { this.name = name; } @Override public void run() { System.out.println(name + " đang thực hiện công việc tại " + System.currentTimeMillis()); try { Thread.sleep(1000); // Giả lập thời gian thực hiện công việc } catch (InterruptedException e) { e.printStackTrace(); } System.out.println(name + " đã hoàn thành công việc tại " + System.currentTimeMillis()); }
}

Giải thích ví dụ

1. Tạo ScheduledThreadPoolExecutor:

ScheduledThreadPoolExecutor executor = (ScheduledThreadPoolExecutor) Executors.newScheduledThreadPool(2);

Chúng ta tạo một ScheduledThreadPoolExecutor với một nhóm luồng cố định gồm 2 luồng.

2. Lập lịch thực thi một tác vụ Runnable sau 3 giây:

executor.schedule(new Task("One-time Task"), 3, TimeUnit.SECONDS);

Chúng ta lập lịch thực thi một tác vụ Runnable sau 3 giây.

3. Lập lịch thực thi định kỳ một tác vụ Runnable mỗi 2 giây sau 5 giây trì hoãn ban đầu:

executor.scheduleAtFixedRate(new Task("Fixed-rate Task"), 5, 2, TimeUnit.SECONDS);

Chúng ta lập lịch thực thi định kỳ một tác vụ Runnable mỗi 2 giây sau 5 giây trì hoãn ban đầu.

4. Lập lịch thực thi định kỳ với trì hoãn 4 giây giữa các lần thực thi, sau 5 giây trì hoãn ban đầu:

executor.scheduleWithFixedDelay(new Task("Fixed-delay Task"), 5, 4, TimeUnit.SECONDS);

Chúng ta lập lịch thực thi định kỳ với trì hoãn 4 giây giữa các lần thực thi, sau 5 giây trì hoãn ban đầu.

5. Đóng ScheduledThreadPoolExecutor sau 20 giây để kết thúc chương trình:

executor.schedule(() -> { executor.shutdown(); try { if (!executor.awaitTermination(5, TimeUnit.SECONDS)) { executor.shutdownNow(); } } catch (InterruptedException e) { executor.shutdownNow(); }
}, 20, TimeUnit.SECONDS);

Chúng ta lập lịch đóng ScheduledThreadPoolExecutor sau 20 giây để kết thúc chương trình, đảm bảo rằng tất cả các tác vụ đã hoàn thành trước khi đóng.

6. Kết quả

Khi chạy chương trình, bạn sẽ thấy rằng các tác vụ Task được lập lịch và thực thi theo các cấu hình đã định. Các tác vụ này sẽ được thực hiện sau khoảng thời gian trì hoãn ban đầu và lặp lại theo chu kỳ cố định hoặc với khoảng thời gian trì hoãn giữa các lần thực thi.

6. CountDownLatch

CountDownLatch là một lớp trong gói java.util.concurrent của Java, được sử dụng để đồng bộ hóa một hoặc nhiều luồng bằng cách cho phép một hoặc nhiều luồng chờ cho đến khi một bộ đếm đếm ngược về 0. Đây là một công cụ đơn giản nhưng mạnh mẽ để điều phối các hoạt động đồng bộ trong môi trường đa luồng.

Cách hoạt động của CountDownLatch

CountDownLatch được khởi tạo với một số lượng đếm ngược ban đầu. Các luồng có thể gọi phương thức await() để chờ cho đến khi bộ đếm đếm ngược về 0. Các luồng khác có thể gọi phương thức countDown() để giảm bộ đếm. Khi bộ đếm đếm ngược về 0, tất cả các luồng đang chờ sẽ được tiếp tục thực thi.

Nó thường được sử dụng trong các tình huống như:

  • Chờ một nhóm tác vụ hoàn thành trước khi thực hiện tác vụ tiếp theo.
  • Đồng bộ hóa một số luồng để chúng bắt đầu cùng một thời điểm.

Đặc điểm của CountDownLatch

  • Cơ chế chờ với bộ đếm: Một hoặc nhiều luồng chờ bộ đếm của CountDownLatch giảm xuống 0 trước khi tiếp tục.

  • Không tái sử dụng được: Sau khi bộ đếm giảm về 0, CountDownLatch không thể được đặt lại.

  • Dễ dàng sử dụng trong các bài toán đồng bộ: Hữu ích trong các trường hợp như chờ nhiều luồng hoàn thành hoặc đồng bộ hóa một nhóm luồng.

Các phương thức chính của CountDownLatch

  • CountDownLatch(int count): Khởi tạo với số lượng đếm ban đầu.
  • void countDown(): Giảm giá trị của bộ đếm đi 1.
  • void await(): Chờ đến khi bộ đếm giảm về 0.
  • boolean await(long timeout, TimeUnit unit): Chờ đến khi bộ đếm giảm về 0 hoặc hết thời gian chờ.
  • long getCount(): Trả về giá trị hiện tại của bộ đếm.

Ví dụ về CountDownLatch

import java.util.concurrent.CountDownLatch; public class CountDownLatchExample { public static void main(String[] args) { // Tạo một CountDownLatch với giá trị đếm ngược ban đầu là 3. CountDownLatch latch = new CountDownLatch(3); // Tạo và khởi động 3 luồng công việc. for (int i = 0; i < 3; i++) { new Thread(new Worker(latch)).start(); } try { // Chờ cho đến khi bộ đếm của CountDownLatch đếm ngược về 0. latch.await(); } catch (InterruptedException e) { e.printStackTrace(); } System.out.println("Tất cả các công việc đã hoàn thành. Tiếp tục thực hiện các tác vụ tiếp theo."); }
} class Worker implements Runnable { private CountDownLatch latch; public Worker(CountDownLatch latch) { this.latch = latch; } @Override public void run() { System.out.println(Thread.currentThread().getName() + " đang thực hiện công việc."); try { // Giả lập thời gian thực hiện công việc bằng cách ngủ trong 1 giây. Thread.sleep(1000); } catch (InterruptedException e) { e.printStackTrace(); } System.out.println(Thread.currentThread().getName() + " đã hoàn thành công việc."); latch.countDown(); // Giảm bộ đếm của CountDownLatch. }
}

Giải thích ví dụ

1. Tạo CountDownLatch:

CountDownLatch latch = new CountDownLatch(3);

Ở đây, chúng ta tạo một CountDownLatch với giá trị đếm ngược ban đầu là 3. Điều này có nghĩa là bộ đếm sẽ bắt đầu từ 3 và cần được giảm về 0 để các luồng chờ có thể tiếp tục.

2. Tạo và khởi động các luồng công việc:

for (int i = 0; i < 3; i++) { new Thread(new Worker(latch)).start();
}

Chúng ta tạo và khởi động 3 luồng công việc. Mỗi luồng sẽ thực hiện công việc trong lớp Worker.

3. Lớp Worker:

class Worker implements Runnable { private CountDownLatch latch; public Worker(CountDownLatch latch) { this.latch = latch; } @Override public void run() { System.out.println(Thread.currentThread().getName() + " đang thực hiện công việc."); try { // Giả lập thời gian thực hiện công việc bằng cách ngủ trong 1 giây. Thread.sleep(1000); } catch (InterruptedException e) { e.printStackTrace(); } System.out.println(Thread.currentThread().getName() + " đã hoàn thành công việc."); latch.countDown(); // Giảm bộ đếm của CountDownLatch. }
}

Trong lớp Worker, mỗi luồng sẽ thực hiện công việc và sau đó gọi latch.countDown() để giảm bộ đếm của CountDownLatch. Mỗi lần countDown() được gọi, bộ đếm sẽ giảm 1.

4. Chờ các luồng hoàn thành công việc trong luồng chính:

try { latch.await();
} catch (InterruptedException e) { e.printStackTrace();
}
System.out.println("Tất cả các công việc đã hoàn thành. Tiếp tục thực hiện các tác vụ tiếp theo.");

Luồng chính sẽ gọi latch.await() để chờ cho đến khi bộ đếm của CountDownLatch đếm ngược về 0. Khi tất cả các luồng công việc hoàn thành và gọi countDown(), bộ đếm sẽ về 0 và luồng chính sẽ tiếp tục thực thi.

5. Kết quả

Khi chạy chương trình, bạn sẽ thấy rằng các luồng công việc sẽ thực hiện công việc của mình và sau đó giảm bộ đếm của CountDownLatch. Luồng chính sẽ chờ cho đến khi tất cả các công việc hoàn thành trước khi tiếp tục thực hiện các tác vụ tiếp theo.

Ví dụ này minh họa cách CountDownLatch có thể được sử dụng để đồng bộ hóa các luồng trong Java, đảm bảo rằng các tác vụ không tiếp tục cho đến khi tất cả các luồng cần thiết đã hoàn thành công việc của mình.

Còn tiếp ...

Bình luận

Bài viết tương tự

- vừa được xem lúc

Tìm hiểu về Thread Pool trong java

1. Mở đầu.

0 0 49

java.util.concurrent Package đơn giản và dễ hiểu (Phần 2)

Tiếp nối phần 1 chúng ta hãy cùng đến với những lớp còn lại của gói java.util.concurrent. 6.

0 0 0

- vừa được xem lúc

Tìm hiểu về Thread Pool trong java

1. Mở đầu.

0 0 49

java.util.concurrent Package đơn giản và dễ hiểu (Phần 2)

Tiếp nối phần 1 chúng ta hãy cùng đến với những lớp còn lại của gói java.util.concurrent. 6.

0 0 0

- vừa được xem lúc

Tìm hiểu IDE dành cho Java lập trình viên nên dùng

IDE (Integrated Development Environment) là một phần mềm giúp cho việc phát triển phần mềm trở nên dễ dàng hơn. Với IDE, bạn có thể viết mã nguồn, sửa đổi mã nguồn, xây dựng, thử nghiệm và gỡ lỗi chươ

0 0 28

- vừa được xem lúc

Hướng dẫn sử dụng NetBeans trong lập trình Java

Chào mừng bạn đến với bài viết hướng dẫn sử dụng NetBeans trong lập trình Java! Có nhiều hướng dẫn sử dụng NetBeans trong lập trình Java. Bạn có thể cài đặt NetBeans theo 2 cách: download JDK tích hợp

0 0 29