Kafka là gì
Kafka là gì? – Đó là hệ thống message pub/sub phân tán (distributed messaging system). Bên pulbic dữ liệu được gọi là producer, bên subscribe nhận dữ liệu theo topic được gọi là consumer. Kafka có khả năng truyền một lượng lớn message theo thời gian thực, trong trường hợp bên nhận chưa nhận message vẫn được lưu trữ sao lưu trên một hàng đợi và cả trên ổ đĩa bảo đảm an toàn. Đồng thời nó cũng được replicate trong cluster giúp phòng tránh mất dữ liệu.
Ở bài viết này mình sẽ trình bày cách sử dụng Apache Kafka trong Quarkus
Yêu cầu
- JDK 1.8+ installed with JAVA_HOME configured appropriately
- Apache Maven 3.6.2+
- Docker Compose to start a development cluster
Docker
Đây là image mà mình sử dụng để start kafka cho bài viết này.
version: '3'
services: zookeeper: image: confluentinc/cp-zookeeper:6.0.0 hostname: zookeeper container_name: zookeeper ports: - "2181:2181" environment: ZOOKEEPER_CLIENT_PORT: 2181 ZOOKEEPER_TICK_TIME: 2000 networks: - kafka kafka: image: confluentinc/cp-enterprise-kafka:6.0.0 hostname: kafka restart: "always" container_name: kafka depends_on: - zookeeper ports: - "29092:29092" environment: KAFKA_BROKER_ID: 1 KAFKA_ZOOKEEPER_CONNECT: 'zookeeper:2181' KAFKA_LISTENER_SECURITY_PROTOCOL_MAP: | PLAINTEXT:PLAINTEXT,PLAINTEXT_HOST:PLAINTEXT KAFKA_ADVERTISED_LISTENERS: | PLAINTEXT://kafka:9092,PLAINTEXT_HOST://localhost:29092 KAFKA_OFFSETS_TOPIC_REPLICATION_FACTOR: 1 KAFKA_TRANSACTION_STATE_LOG_REPLICATION_FACTOR: 1 networks: - kafka kafka-ui: image: provectuslabs/kafka-ui:latest container_name: kafka-ui restart: "no" ports: - "9080:8080" environment: KAFKA_CLUSTERS_0_NAME: "kafka" KAFKA_CLUSTERS_0_BOOTSTRAPSERVERS: "kafka:9092" networks: - kafka networks: kafka: external: name: kafka
Tạo Project
Tạo project ở trang chủ của Quarkus tại đây, nhập group id và artifact id.
Bạn tìm và thêm 2 extension là:
- SmallRye Reactive Messaging - Kafka Connector
- RESTEasy Jackson
Click Generate your application
để tải xuống , sau đó unzip và mở bằng IDE yêu thích của bạn.
Tại file pom.xml
bạn sẽ thấy 2 extension quarkus-smallrye-reactive-messaging-kafka
và quarkus-resteasy-jackson
đã được thêm vào.
Hmm ngoài lề một chút, dạo này giá xăng hiện tại tăng chóng mặt nên mình sẽ viết ứng dụng gửi message kèm nâng giá xăng lên nhé
Tạo Class Gas pojo như sau:
package practice.kafka; public class Gas { public String name; public double price; public Gas() { } public Gas(String name, double price) { this.name = name; this.price = price; }
}
Tạo thêm một Class GasProcessor để nâng giá xăng như sau:
package practice.kafka; import javax.enterprise.context.ApplicationScoped; @ApplicationScoped
public class GasProcessor { private static final double CONVERSION_RATE = 1.5; public Gas process(Gas gas) { gas.price = gas.price * CONVERSION_RATE; return gas; }
}
Chúng ta nhận vào giá gốc sau đó thay đổi giá cả và gửi chúng lại cho Kafka.
Để làm được chúng ta sẽ cần cài đặt JSON serialization.
Với extension quarkus-resteasy-jackson
cung cấp cho chúng ta ObjectMapperSerializer
sử dụng cho việc serialize tất cả các pojo thông qua Jackson,
nhưng đối với deserializer
thì Generic, nên cần phải phân lớp.
Để deserializer cần tạo Class với tên là GasDeserializer kế thừa từ ObjectMapperDeserializer.
package practice.kafka; import io.quarkus.kafka.client.serialization.ObjectMapperDeserializer; public class GasDeserializer extends ObjectMapperDeserializer<Gas> { public GasDeserializer(){ super(Gas.class); }
}
Cấu hình để sử dụng Jackson serializer, deserializer và kết nối máy chủ Kafka trong file application.properties
kafka.bootstrap.servers=http://localhost:29092 # Configure the Kafka source (we read from it)
mp.messaging.incoming.gas-in.connector=smallrye-kafka
mp.messaging.incoming.gas-in.topic=gas
mp.messaging.incoming.gas-in.value.deserializer=practice.kafka.GasDeserializer # Configure the Kafka sink (we write to it)
mp.messaging.outgoing.gas-out.connector=smallrye-kafka
mp.messaging.outgoing.gas-out.topic=gas
mp.messaging.outgoing.gas-out.value.serializer=io.quarkus.kafka.client.serialization.ObjectMapperSerializer
Mình sẽ tạo thêm một Class API ví như Producer để tạo ra messages gửi lên cho Kafka.
package practice.kafka; import org.eclipse.microprofile.reactive.messaging.Channel;
import org.eclipse.microprofile.reactive.messaging.Emitter; import javax.enterprise.context.ApplicationScoped;
import javax.inject.Inject;
import javax.ws.rs.Consumes;
import javax.ws.rs.POST;
import javax.ws.rs.Path;
import javax.ws.rs.Produces;
import javax.ws.rs.core.MediaType;
import javax.ws.rs.core.Response;
import java.util.concurrent.CompletionStage; @Path("gas")
@ApplicationScoped
public class GasResource { @Inject GasProcessor gasProcessor; @Inject @Channel("gas-out") Emitter<Gas> gasEmitter; @POST @Produces(MediaType.APPLICATION_JSON) @Consumes(MediaType.APPLICATION_JSON) public Response increasePrice(Gas gas){ gas = gasProcessor.process(gas); CompletionStage<Void> ack = gasEmitter.send(gas); return Response.ok().entity(ack).build(); }
}
Ở đây mình sẽ sử dụng annotation @Channel
để khai báo đầu ra của message.
Sử dụng Emitter
để gửi message lên máy chủ Kafka.
Sau khi đã có producer gửi message, thì tiếp theo mình cũng sẽ tạo thêm một Class Consume để nhận message từ Kafka.
package practice.kafka; import org.eclipse.microprofile.reactive.messaging.Incoming; public class GasConsume { @Incoming("gas-in") public void outputGas(Gas gas){ System.out.printf("Gas price has been increased to %s",gas.price); }
}
Với annotation @Incoming
khai báo đầu nhận message từ Kafka.
À để sử dụng API dễ dàng hơn thì ta sử dụng swagger, bạn copy bỏ vào file pom.xml
nhé:
<dependency> <groupId>io.quarkus</groupId> <artifactId>quarkus-smallrye-openapi</artifactId>
</dependency>
Chạy câu lệnh để cài đặt extension trong file pom.xml
mvn clean install
Để khởi động ứng dụng
mvn quarkus:dev
Gọi thử API gas
curl -X 'POST' \ 'http://localhost:8080/gas' \ -H 'accept: */*' \ -H 'Content-Type: application/json' \ -d '{ "name": "Xăng 95", "price": 18000
}'
Và giá xăng đã tăng gấp rưỡi từ 18000 thành 27000
Kết Luận
Vậy với bài tóm tắt trên, các bạn có thể sử dụng Apache Kafka với Quarkus một cách cơ bản và nhanh chóng.
Nếu có thể bài sau mình sẽ trình bày cách sử dụng Avro Schema Registry với Kafka và Quarkus.
Cảm ơn mọi người, chúc mọi người nhiều sức khoẻ