- vừa được xem lúc

Sử dụng Apache Kafka với Quarkus

0 0 41

Người đăng: Duy Nguyễn

Theo Viblo Asia

Kafka là gì

Kafka là gì? – Đó là hệ thống message pub/sub phân tán (distributed messaging system). Bên pulbic dữ liệu được gọi là producer, bên subscribe nhận dữ liệu theo topic được gọi là consumer. Kafka có khả năng truyền một lượng lớn message theo thời gian thực, trong trường hợp bên nhận chưa nhận message vẫn được lưu trữ sao lưu trên một hàng đợi và cả trên ổ đĩa bảo đảm an toàn. Đồng thời nó cũng được replicate trong cluster giúp phòng tránh mất dữ liệu.
Screen Shot 2022-03-21 at 23.05.07.png

Ở bài viết này mình sẽ trình bày cách sử dụng Apache Kafka trong Quarkus

Yêu cầu

  • JDK 1.8+ installed with JAVA_HOME configured appropriately
  • Apache Maven 3.6.2+
  • Docker Compose to start a development cluster

Docker

Đây là image mà mình sử dụng để start kafka cho bài viết này.

version: '3'
services: zookeeper: image: confluentinc/cp-zookeeper:6.0.0 hostname: zookeeper container_name: zookeeper ports: - "2181:2181" environment: ZOOKEEPER_CLIENT_PORT: 2181 ZOOKEEPER_TICK_TIME: 2000 networks: - kafka kafka: image: confluentinc/cp-enterprise-kafka:6.0.0 hostname: kafka restart: "always" container_name: kafka depends_on: - zookeeper ports: - "29092:29092" environment: KAFKA_BROKER_ID: 1 KAFKA_ZOOKEEPER_CONNECT: 'zookeeper:2181' KAFKA_LISTENER_SECURITY_PROTOCOL_MAP: | PLAINTEXT:PLAINTEXT,PLAINTEXT_HOST:PLAINTEXT KAFKA_ADVERTISED_LISTENERS: | PLAINTEXT://kafka:9092,PLAINTEXT_HOST://localhost:29092 KAFKA_OFFSETS_TOPIC_REPLICATION_FACTOR: 1 KAFKA_TRANSACTION_STATE_LOG_REPLICATION_FACTOR: 1 networks: - kafka kafka-ui: image: provectuslabs/kafka-ui:latest container_name: kafka-ui restart: "no" ports: - "9080:8080" environment: KAFKA_CLUSTERS_0_NAME: "kafka" KAFKA_CLUSTERS_0_BOOTSTRAPSERVERS: "kafka:9092" networks: - kafka networks: kafka: external: name: kafka 

Tạo Project

Tạo project ở trang chủ của Quarkus tại đây, nhập group id và artifact id.
Bạn tìm và thêm 2 extension là:

  • SmallRye Reactive Messaging - Kafka Connector
  • RESTEasy Jackson

Screen Shot 2022-03-21 at 23.16.04.png

Click Generate your application để tải xuống , sau đó unzip và mở bằng IDE yêu thích của bạn.

Tại file pom.xml bạn sẽ thấy 2 extension quarkus-smallrye-reactive-messaging-kafkaquarkus-resteasy-jackson đã được thêm vào.

Hmm ngoài lề một chút, dạo này giá xăng hiện tại tăng chóng mặt nên mình sẽ viết ứng dụng gửi message kèm nâng giá xăng lên nhé 😄

Tạo Class Gas pojo như sau:

package practice.kafka; public class Gas { public String name; public double price; public Gas() { } public Gas(String name, double price) { this.name = name; this.price = price; }
}

Tạo thêm một Class GasProcessor để nâng giá xăng như sau:

package practice.kafka; import javax.enterprise.context.ApplicationScoped; @ApplicationScoped
public class GasProcessor { private static final double CONVERSION_RATE = 1.5; public Gas process(Gas gas) { gas.price = gas.price * CONVERSION_RATE; return gas; }
}

Chúng ta nhận vào giá gốc sau đó thay đổi giá cả và gửi chúng lại cho Kafka.
Để làm được chúng ta sẽ cần cài đặt JSON serialization.

Với extension quarkus-resteasy-jackson cung cấp cho chúng ta ObjectMapperSerializer sử dụng cho việc serialize tất cả các pojo thông qua Jackson, nhưng đối với deserializer thì Generic, nên cần phải phân lớp.

Để deserializer cần tạo Class với tên là GasDeserializer kế thừa từ ObjectMapperDeserializer.

package practice.kafka; import io.quarkus.kafka.client.serialization.ObjectMapperDeserializer; public class GasDeserializer extends ObjectMapperDeserializer<Gas> { public GasDeserializer(){ super(Gas.class); }
} 

Cấu hình để sử dụng Jackson serializer, deserializer và kết nối máy chủ Kafka trong file application.properties

kafka.bootstrap.servers=http://localhost:29092 # Configure the Kafka source (we read from it)
mp.messaging.incoming.gas-in.connector=smallrye-kafka
mp.messaging.incoming.gas-in.topic=gas
mp.messaging.incoming.gas-in.value.deserializer=practice.kafka.GasDeserializer # Configure the Kafka sink (we write to it)
mp.messaging.outgoing.gas-out.connector=smallrye-kafka
mp.messaging.outgoing.gas-out.topic=gas
mp.messaging.outgoing.gas-out.value.serializer=io.quarkus.kafka.client.serialization.ObjectMapperSerializer

Mình sẽ tạo thêm một Class API ví như Producer để tạo ra messages gửi lên cho Kafka.

package practice.kafka; import org.eclipse.microprofile.reactive.messaging.Channel;
import org.eclipse.microprofile.reactive.messaging.Emitter; import javax.enterprise.context.ApplicationScoped;
import javax.inject.Inject;
import javax.ws.rs.Consumes;
import javax.ws.rs.POST;
import javax.ws.rs.Path;
import javax.ws.rs.Produces;
import javax.ws.rs.core.MediaType;
import javax.ws.rs.core.Response;
import java.util.concurrent.CompletionStage; @Path("gas")
@ApplicationScoped
public class GasResource { @Inject GasProcessor gasProcessor; @Inject @Channel("gas-out") Emitter<Gas> gasEmitter; @POST @Produces(MediaType.APPLICATION_JSON) @Consumes(MediaType.APPLICATION_JSON) public Response increasePrice(Gas gas){ gas = gasProcessor.process(gas); CompletionStage<Void> ack = gasEmitter.send(gas); return Response.ok().entity(ack).build(); }
}

Ở đây mình sẽ sử dụng annotation @Channel để khai báo đầu ra của message.
Sử dụng Emitter để gửi message lên máy chủ Kafka.

Sau khi đã có producer gửi message, thì tiếp theo mình cũng sẽ tạo thêm một Class Consume để nhận message từ Kafka.

package practice.kafka; import org.eclipse.microprofile.reactive.messaging.Incoming; public class GasConsume { @Incoming("gas-in") public void outputGas(Gas gas){ System.out.printf("Gas price has been increased to %s",gas.price); }
}

Với annotation @Incoming khai báo đầu nhận message từ Kafka.

À để sử dụng API dễ dàng hơn thì ta sử dụng swagger, bạn copy bỏ vào file pom.xml nhé:

<dependency> <groupId>io.quarkus</groupId> <artifactId>quarkus-smallrye-openapi</artifactId>
</dependency>

Chạy câu lệnh để cài đặt extension trong file pom.xml

mvn clean install

Để khởi động ứng dụng

mvn quarkus:dev

Gọi thử API gas

curl -X 'POST' \ 'http://localhost:8080/gas' \ -H 'accept: */*' \ -H 'Content-Type: application/json' \ -d '{ "name": "Xăng 95", "price": 18000
}'

Và giá xăng đã tăng gấp rưỡi từ 18000 thành 27000 😦

Screen Shot 2022-03-22 at 01.10.25.png

Kết Luận

Vậy với bài tóm tắt trên, các bạn có thể sử dụng Apache Kafka với Quarkus một cách cơ bản và nhanh chóng.

Nếu có thể bài sau mình sẽ trình bày cách sử dụng Avro Schema Registry với Kafka và Quarkus.

Cảm ơn mọi người, chúc mọi người nhiều sức khoẻ ❤️

References

https://quarkus.io/guides/kafka

Bình luận

Bài viết tương tự

- vừa được xem lúc

RESTful API Design: Best Practices

Hey hey hey hey, cuối năm cũng khá bận bịu công việc này kia nên cũng không có nhiều thời gian viết bài phục vụ anh em được. Nay mình xin chia sẻ một vài những tiêu chí mà mình hay sử dụng khi viết REST API.

0 0 39

- vừa được xem lúc

18. Responsive là gì?

Truy cập http://fullstack.edu.

0 0 44

- vừa được xem lúc

19. Media queries?

Truy cập http://fullstack.edu.

0 0 44

- vừa được xem lúc

20. Tablet responsive

Truy cập http://fullstack.edu.

0 0 34

- vừa được xem lúc

21. Mobile menu responsive

Truy cập http://fullstack.edu.

0 0 33

- vừa được xem lúc

22. Mobile menu fix bug

Truy cập http://fullstack.edu.

0 0 26