- vừa được xem lúc

Temporal workflow trong Quarkus

0 0 36

Người đăng: Chu Văn Hạnh

Theo Viblo Asia

Giới Thiệu

Tiếp tục với series Java Practice, hôm nay chúng ta tiếp tục tìm hiểu về Temporal một thư viện workflow còn khá mới mẻ và triển khai nó trên Quarkus.
Bắt đầu thôi.

Temporal là gì?

Đầu tiên chắc chắn bạn đã hiểu về workflow!

Workflow (luồng công việc) là một sơ đồ miêu tả thứ tự thực hiện từng công việc, từng sự kiện. Sơ đồ này giúp cho nhà quản trị thấy được chính xác công việc được thực hiện như thế nào hay có thể dùng nó để thiết kế một trình tự công việc khoa học và mang lại hiệu quả cao.


Theo Temporal.io đề cập định nghĩa như sau:

A Temporal Application is a set of Temporal Workflow Executions. Each Temporal Workflow Execution has exclusive access to its local state, executes concurrently to all other Workflow Executions, and communicates with other Workflow Executions and the environment via message passing

Một cách dễ hiểu thì Temporal là một luồng chứa các công việc cần thực hiện và yêu cầu bạn cần thực hiện tuần tự công việc của mình, Trong Temporal một luồng công việc có thể kéo dài nhiều ngày để xử lý và các khoảng GAP Time giữa các công việc trong luồng đôi khi kéo dài hàng giờ đồng hồ nhưng không tiêu tốn tài nguyên máy chủ.

image.png

Ví dụ: Một luồng đơn hàng khi khách hàng đặt có thể kéo dài tới 10 ngày và trải qua nhiều công đoạn công việc khác nhau từ xác nhận đơn hàng -> đóng gói -> bàn giao vận chuyển -> giao hàng thành công.
Đây là một quy trình sẽ lập đi lập lại và nó yêu cầu thực hiện tuần tự từng bước (bỏ qua các ngoại lệ về tồn kho và giao hàng thất bại).
Trên thực tế một đơn hàng có rất nhiều trạng thái và trường hợp có thể xảy ra, và điều này là không hề dễ dàng để quản lý thì Temporal ra đời như một vị cứu tinh, giúp giải quyết các vấn đề về luồng công việc đơn giản hơn.
Temporal đã được đưa vô Hoàng Phúc giúp việc xử lý luồng đơn hàng bớt khó khăn hơn rất nhiều.

image.png

Temporal có 4 thành phần quan trọng để giúp nó vận hành

  1. Macthing Service: Service này dùng để lưu trữ các Task Queues
  2. Worker Service: Service này sẽ triển khai các quy trình làm việc(workflow)
  3. History Service: Nó ghi nhớ lại quá trình làm việc, lưu trữ các mutable state, queues
  4. Frontend Service: hỗ trợ về rate limiting, routing và authorizing

Trong bài viết này chúng ta sẽ không đi sâu về Temporal Cluster mà chỉ tập trung vào phần Worker Service thôi nhé 😄

Worker Service Worker Service sẽ có 2 thành phần chính là workflowactivity

  • workflow sẽ là các bước tiến trình của công việc, trong ví dụ trên thì mỗi trạng thái là một truy trình công việc.
  • activity là các quy trình con của workflow.

Cài đặt Temporal

Trong bài viết này mình sẽ triển khai Temporal trên Docker.
Các bạn tạo file docker-compose.yml với nội dung bên dưới

version: "3.5"
services: elasticsearch: container_name: temporal-elasticsearch environment: - cluster.routing.allocation.disk.threshold_enabled=true - cluster.routing.allocation.disk.watermark.low=512mb - cluster.routing.allocation.disk.watermark.high=256mb - cluster.routing.allocation.disk.watermark.flood_stage=128mb - discovery.type=single-node - ES_JAVA_OPTS=-Xms100m -Xmx100m image: elasticsearch:7.16.2 networks: - temporal-network ports: - 9200:9200 postgresql: container_name: temporal-postgresql environment: POSTGRES_PASSWORD: temporal POSTGRES_USER: temporal image: postgres:13 networks: - temporal-network ports: - 5432:5432 temporal: container_name: temporal depends_on: - postgresql - elasticsearch environment: - DB=postgresql - DB_PORT=5432 - POSTGRES_USER=temporal - POSTGRES_PWD=temporal - POSTGRES_SEEDS=postgresql - DYNAMIC_CONFIG_FILE_PATH=config/dynamicconfig/development_es.yaml - ENABLE_ES=true - ES_SEEDS=elasticsearch - ES_VERSION=v7 image: temporalio/auto-setup:1.15.0 networks: - temporal-network ports: - 7233:7233 volumes: - ./dynamicconfig:/etc/temporal/config/dynamicconfig temporal-admin-tools: container_name: temporal-admin-tools depends_on: - temporal environment: - TEMPORAL_CLI_ADDRESS=temporal:7233 image: temporalio/admin-tools:1.15.0 networks: - temporal-network stdin_open: true tty: true temporal-web: container_name: temporal-web depends_on: - temporal environment: - TEMPORAL_GRPC_ENDPOINT=temporal:7233 - TEMPORAL_PERMIT_WRITE_API=true image: temporalio/web:1.13.0 networks: - temporal-network ports: - 8088:8088
networks: temporal-network: driver: bridge name: temporal-network

Đợi docker pull image về và setup xong, các bạn truy cập trình duyệt http://localhost:8088 và thấy giao diện như hình là ok rồi nhé.

Screen Shot 2022-03-22 at 23.52.51.png

Temporal với Quarkus

Trong bài viết này mình sẽ triển khai ví dụ trên Quarkus nhé, đối với các bạn sử dụng Spring Boot cũng sẽ triển khai tương tự như cấu trúc này.
Bạn có thể xem qua code mình đã dùng trong bài viết này tại: https://github.com/sackaboy/temporal-order-example

Thêm Dependency vô file POM:

<dependency> <groupId>io.temporal</groupId> <artifactId>temporal-sdk</artifactId> <version>1.5.0</version>
</dependency>

Tạo file WorkflowObservable.java để khởi tạo kết nối tới Temporal

@ApplicationScoped
public class WorkflowObservable { private static final Logger LOG = Logger.getLogger(WorkflowObservable.class); private WorkflowClient client; private WorkerFactory factory; void onStart(@Observes StartupEvent event){ LOG.info("****************** On start: Run workflow ****************"); WorkflowServiceStubs service = WorkflowServiceStubs.newInstance(); client = WorkflowClient.newInstance(service); factory = WorkerFactory.newInstance(client); Worker worker = factory.newWorker(OrderProduct.QUEUE_NAME); worker.registerWorkflowImplementationTypes(OrderProductImpl.class); worker.registerActivitiesImplementations(new ActivityOrderNewProductImpl()); LOG.info("Start WorkflowService "); factory.start(); } void onStop(@Observes ShutdownEvent event){ factory.shutdown(); LOG.info("****************** On stop: stop workflow ****************"); } public WorkflowClient getClient(){ return client; }
}

Đoạn code trên minh đăng ký với Temporal Workflow là class OrderProduct, Workflow Implement là OrderProductImpl và Activity Implement l class ActivityOrderNewProductImpl, Trong Workflow nó sẽ tiến hành tìm WorkflowMethod và các QueryMethod, SignalMethod nếu có.
Activity được coi là các quy trình con của Workflow nên Temporal sẽ import tất cả phương thức được khai báo trong Activity.

Trong file OrderProduct.java

@WorkflowInterface
public interface OrderProduct { public static final String QUEUE_NAME = "Customer_Order"; @WorkflowMethod Invoice orderNewProduct(Invoice invoice); @QueryMethod String getStatus(); @SignalMethod void signalOrderAccepted(); @SignalMethod void signalOrderPickedUp(); @SignalMethod void signalDelivered();
}

Tiếp tới file ActivityOrderNewProductImpl sẽ implements class OrderProduct để triển khai các quy trình con(Activity).

public class OrderProductImpl implements OrderProduct { private final RetryOptions retryoptions = RetryOptions.newBuilder() .setInitialInterval(Duration.ofSeconds(1)) .setMaximumInterval(Duration.ofSeconds(5)) .setBackoffCoefficient(2) .setMaximumAttempts(10) .build(); private final ActivityOptions defaultActivityOptions = ActivityOptions.newBuilder() // Timeout options specify when to automatically timeout Activities if the process is taking too long. .setStartToCloseTimeout(Duration.ofSeconds(60)) // Optionally provide customized RetryOptions. // Temporal retries failures by default, this is simply an example. .setRetryOptions(retryoptions) .build(); private final ActivityOrderNewProduct activityOrderNewProduct = Workflow.newActivityStub(ActivityOrderNewProduct.class,defaultActivityOptions); public boolean isOrderConfirmed = false; public boolean isOrderPickedUp = false; public boolean isOrderDelivered = false; private String status = ""; private static final Logger LOG = Logger.getLogger(OrderProductImpl.class); @Override public Invoice orderNewProduct(Invoice invoice) { try{ activityOrderNewProduct.placeOrder(invoice.getCustomer()); status = "waiting confirm"; if(activityOrderNewProduct.checkInventoryQuantity( invoice.getInvoiceDetailList().get(0).getProduct(), invoice.getInvoiceDetailList().get(0).getQuantity()) ){ LOG.info("***** Waiting for Hoang Phuc International to confirm your order"); activityOrderNewProduct.assignEmpConfirm(); Workflow.await(() -> isOrderConfirmed); LOG.info("isOrderConfirmed: "+isOrderConfirmed); invoice.setStatus("confirmed"); status = invoice.getStatus(); LOG.info("***** Please wait till we assign a delivery executive"); Workflow.await(() -> isOrderPickedUp); invoice.setStatus( "picked up"); status = invoice.getStatus(); LOG.info("isOrderPickedUp: "+isOrderPickedUp); Workflow.await(() -> isOrderDelivered); invoice.setStatus( "delivered"); status = invoice.getStatus(); }else{ LOG.info("***** Placed Order Not Successfully !"); invoice.setStatus( "cancelled"); status = invoice.getStatus(); activityOrderNewProduct.notifyCustomer(invoice.getCustomer()); } return invoice; }catch (Exception e){ invoice.setStatus( "cancelled cause error"); status = invoice.getStatus(); } return invoice; } @Override public String getStatus() { return status; } @Override public void signalOrderAccepted() { activityOrderNewProduct.setOrderAccepted(); this.isOrderConfirmed = false; } @Override public void signalOrderPickedUp() { activityOrderNewProduct.setOrderPickedUp(); this.isOrderPickedUp = true; } @Override public void signalDelivered() { activityOrderNewProduct.setOrderDelivered(); this.isOrderDelivered = true; }
}

Tiếp theo chúng ta sẽ khai báo với Temporal các Activity, Trước tiên mình sẽ khai báo 1 lớp Interface để khai báo Activity

@ActivityInterface
public interface ActivityOrderNewProduct { void placeOrder(Customer customer); Boolean checkInventoryQuantity(Product product, int quantity); void assignEmpConfirm() throws IOException; void notifyEmployee() throws IOException; void notifyCustomer(Customer customer); void notifyDeliver(Employee employee); void setOrderAccepted(); void setOrderPickedUp(); void setOrderDelivered();
}

và tiếp tục là class ActivityOrderNewProductImpl để implements lại class ActivityOrderNewProduct.

public class ActivityOrderNewProductImpl implements ActivityOrderNewProduct { public ActivityOrderNewProductImpl(){ } private static final Logger LOG = Logger.getLogger(ActivityOrderNewProductImpl.class); @Override public void placeOrder(Customer customer) { LOG.info("***** Customer order new"); } @Override public Boolean checkInventoryQuantity(Product product, int quantity) { if(product.getQuantity() >= quantity){ LOG.info("***** Inventory enough to provide"); return true; } LOG.info("***** Inventory not enough to provide"); return false; } @Override public void assignEmpConfirm() throws IOException { //Get Employee LOG.info("***** Order just assigned to: "); this.notifyEmployee(); } @Override public void notifyEmployee() throws IOException { LOG.info("***** Notify ZNS to employee: " ); } @Override public void notifyCustomer(Customer customer) { LOG.info("***** Notify to customer : " + customer.getName()); } @Override public void notifyDeliver(Employee employee) { LOG.info("***** Notify to deliver : "+employee.getName()); } @Override public void setOrderAccepted() { LOG.info("***** Employee has confirmed your order"); } @Override public void setOrderPickedUp() { LOG.info("***** Order has been picked up"); } @Override public void setOrderDelivered() { LOG.info("***** Order Delivered"); }
}

Trong file WorkflowResource.java bạn mình tạo ra 1 hàm để thực thi đẩy 1 workflow lên Temporal.

@POST @Path("/placeOrder") @Produces(MediaType.APPLICATION_JSON) @Consumes(MediaType.APPLICATION_JSON) public Invoice placeNewOrder(PageLoadInvoice pageLoadInvoice) { int productOrderQuantity = 2; Invoice invoice = invoiceBusiness.createNew(pageLoadInvoice.cusID); InvoiceDetail invoiceDetail = invoiceDetailBusiness.createNew(invoice.getId(),pageLoadInvoice.prodID, productOrderQuantity); invoice.addInvoiceDetail(invoiceDetail); OrderProduct workflow = workflowObservable.getClient().newWorkflowStub( OrderProduct.class, WorkflowOptions.newBuilder() .setWorkflowId("Customer_Order_" + Math.abs(new Random().nextInt())) .setTaskQueue(OrderProduct.QUEUE_NAME).build()); WorkflowClient.start(workflow::orderNewProduct,invoice); return invoice; }

Trong hàm này các bạn có thể thấy Workflow được chỉ định sẽ start workflow này bằng hàm oderNewProduct nằm trong interface OrderProduct. Lúc này oderProductImpl implements oderNewProduct sẽ được gọi ra và xử lý.
Bạn có thể thấy hàm orderNewProduct trong oderProductImpl có nhiệm vụ gọi đến các Activity và khi một Activity được thực hiện, Temporal sẽ ghi nhớ quá trình này là tiếp tục thực hiện các Activity khác sau 1 thời gian nghỉ.

Trong oderProductImpl bạn sẽ thấy có rất nhiều đoạn Workflow.await(Supplier<Boolean> unblockCondition), khi Temporal chạy đến hàm này nó sẽ đợi cho tới khi Condition = true. Trong Temporal cơ chế nghỉ này sẽ không tiêu tốn tài nguyên máy chủ.

Trong ví dụ trên của mình Workflow.await sẽ cần phải nhận được các SignalMethod từ bên ngoài gọi vô để tiếp tục thực hiện.

Sau đó các bạn thực hiện gọi hàm này

curl -X 'POST' \ 'http://localhost:8080/workflow/placeOrder' \ -H 'accept: application/json' \ -H 'Content-Type: application/json' \ -d '{ "prodID": 1, "cusID": 1
}'

Lúc này 1 Workflow mới đã được khởi tạo với trạng thái Running.

Screen Shot 2022-03-23 at 19.12.48.png

Khi một đơn hàng mới tạo ra mình sẽ gắn cho nó trạng thái bắt đầu là waiting confirm

Screen Shot 2022-03-23 at 19.23.38.png

Tiếp đó mình sẽ thực hiện gọi tiếp hàm sau:

curl -X 'GET' \ 'http://localhost:8080/workflow/confirmed/271924149' \ -H 'accept: application/json'

Hàm này sẽ thực hiện các nhiệm vụ của nó là đưa trạng thái đơn hàng thành confirmed.
Tương tự với hàm pickedup, delivered cũng sẽ đưa trạng thái đơn hàng về đúng nhu cầu và kết thúc 1 workflow khi nhận được trạng thái delivered.

Screen Shot 2022-03-23 at 19.36.23.png

Ngoài việc triển khai các Workflow bằng code, bạn hoàn toàn có thể khai báo các workflow của mình trong file Json. Điều này là vô cùng tiện lợi khi nghiệm của của các ứng dụng phức tạp và có sự thay đổi nhiều. Nếu nhận được sự ủng hộ của mọi người về Temporal thì mình sẽ viết 1 bài khác hướng dẫn sau 😄

Kết thúc

Temporal là một kiến trúc rất hay, tuy nhiên trong 1 bài viết ngắn ngủi cũng tương đối khó để mình trình bài hết về Temporal cũng như giúp bạn hiểu rõ được nó.
Hẹn các bạn trong bài viết khác trong series Java practice.
Bạn có thể xem qua code mình đã dùng trong bài viết này tại: https://github.com/sackaboy/temporal-order-example

Bình luận

Bài viết tương tự

- vừa được xem lúc

Tổng hợp các bài hướng dẫn về Design Pattern - 23 mẫu cơ bản của GoF

Link bài viết gốc: https://gpcoder.com/4164-gioi-thieu-design-patterns/. Design Patterns là gì. Design Patterns không phải là ngôn ngữ cụ thể nào cả.

0 0 302

- vừa được xem lúc

Học Spring Boot bắt đầu từ đâu?

1. Giới thiệu Spring Boot. 1.1.

0 0 277

- vừa được xem lúc

Cần chuẩn bị gì để bắt đầu học Java

Cần chuẩn bị những gì để bắt đầu lập trình Java. 1.1. Cài JDK hay JRE.

0 0 50

- vừa được xem lúc

Sử dụng ModelMapper trong Spring Boot

Bài hôm nay sẽ là cách sử dụng thư viện ModelMapper để mapping qua lại giữa các object trong Spring nhé. Trang chủ của ModelMapper đây http://modelmapper.org/, đọc rất dễ hiểu dành cho các bạn muốn tìm hiểu sâu hơn. 1.

0 0 194

- vừa được xem lúc

[Java] 1 vài tip nhỏ khi sử dụng String hoặc Collection part 1

. Hello các bạn, hôm nay mình sẽ chia sẻ về mẹo check String null hay full space một cách tiện lợi. Mình sẽ sử dụng thư viện Lớp StringUtils download file jar để import vào thư viện tại (link).

0 0 71

- vừa được xem lúc

Deep Learning với Java - Tại sao không?

Muốn tìm hiểu về Machine Learning / Deep Learning nhưng với background là Java thì sẽ như thế nào và bắt đầu từ đâu? Để tìm được câu trả lời, hãy đọc bài viết này - có thể kỹ năng Java vốn có sẽ giúp bạn có những chuyến phiêu lưu thú vị. DJL là tên viết tắt của Deep Java Library - một thư viện mã ng

0 0 139