- vừa được xem lúc

[Kafka] - Kafka Stream With Spring Boot

0 0 87

Người đăng: TheLight

Theo Viblo Asia

Hướng dẫn này là một ví dụ nhỏ hướng dẫn mọi người về cách xử lý dữ liệu thời gian thực bằng cách sử dụng Kafka Stream với Spring Boot.

Trước đây, chúng ta thường thu thập dữ liệu, lưu trữ trong cơ sở dữ liệu và xử lý dữ liệu hàng đêm. Nó được gọi là xử lý batch processing! Trong kỷ nguyên Microservices này, chúng ta nhận được luồng dữ liệu liên tục, đôi khi việc trì hoãn việc xử lý dữ liệu này có thể gây ảnh hưởng nghiêm trọng đến hoạt động kinh doanh của chúng ta. Ví dụ: Hãy xem xét một ứng dụng như Netflix / YouTube. Dựa trên bộ phim hoặc video chúng ta xem, các ứng dụng này hiển thị các đề xuất về những video hay bộ phim cùng chủ đề chúng ta đang xem ngay lập tức. Nó cung cấp trải nghiệm người dùng tốt hơn nhiều và giúp cho việc kinh doanh từ đó tốt hơn.

Stream processing là quá trình xử lý dữ liệu liên tục theo thời gian thực. Hãy xem cách chúng ta có thể xử lý luồng thời gian thực đơn giản bằng cách sử dụng Kafka Stream với Spring Boot.

Để demo xử lý luồng dữ liệu theo thời gian thực này, Chúng ta hãy xem xét một ứng dụng đơn giản có chứa 3 microservices.

  • Producer: Microservice này tạo ra một số dữ liệu. Trong thế giới thực, Producer có thể là trình duyệt / ứng dụng ngân hàng. Một số hành động của người dùng sẽ gửi lịch sử lướt phim / giao dịch thẻ tín dụng,... đến Consumer cần thu thập dữ liệu. Trong ví dụ này chúng ta sẽ tạo các số tuần tự từ 1 đến N trong mỗi giây và gửi dữ liệu đi.
  • Processor: Microservice này sử dụng dữ liệu, thực hiện một số xử lý trên dữ liệu và gửi tiếp dữ liệu đến một topic khác. Trong thế giới thực, đây có thể là dịch vụ đề xuất phim cho Netflix. Trong ví dụ này, chúng ta sẽ bỏ qua tất cả các số lẻ và tìm bình phương của các số chẵn.
  • Consumer: Microservice này sử dụng dữ liệu đã xử lý từ Processor. Trong thế giới thực, đây có thể là trình duyệt của chúng ta để nhận các đề xuất mới nhất dựa trên quá trình duyệt phim. Trong ví dụ này, chúng ta sẽ sử dụng dữ liệu và in nó trên console.

Producer, Processor và Consumer là 3 ứng dụng khác nhau được kết nối qua 2 chủ đề (topic) Kafka khác nhau như hình bên dưới:

Set up Kafka

Theo hướng dẫn ở bài viết Install Kafka cluter on local. Chúng ta tạo ra 2 topic:

  • numbers (Topic 1 / Source topic)
  • squaredNumbers (Topic 2 / Sink topic)

Java Functional Interface

Spring Cloud Functions đơn giản hóa việc phát triển ứng dụng này bằng cách sử dụng các Functional Interface bên dưới.

Application Type Java Functional Interface
Kafka Producer Supplier
Kafka Processor Function
Kafka Consumer Consumer

Kafka Stream Proceder

Việc sử dụng Kafka Stream với Spring Boot dễ dàng hơn việc manual cấu hình. Spring Boot thực hiện hầu hết tất cả các công việc cấu hình một cách tự động. Chúng ta tạo một Bean đơn giản để tạo ra một số tuần tự mỗi giây.

  • Nếu loại Bean này là Supplier, Spring Boot sẽ coi nó như một Producer.
  • Chúng ta sử dụng Flux vì nó sẽ là một luồng dữ liệu liên tục (data stream)
@Configuration
public class KafkaProducer { /* * produce a number from 1, every second * Supplier<T> makes this as kafka producer of T * */ @Bean public Supplier<Flux<Long>> numberProducer(){ return () -> Flux.range(1, 1000) .map(i -> (long) i) .delayElements(Duration.ofSeconds(1)); };
}

Bây giờ câu hỏi quan trọng là dữ liệu sẽ được ghi vào đâu? Ở phần đầu chúng ta đã tạo một topic cho điều này - numbers. Chúng ta cấu hình điều đó thông qua application.yaml như hình dưới đây.

  • spring.cloud.stream.functions.definition: nơi chúng ta cung cấp danh sách các tên Bean (được phân tách bởi dấy ?.
  • spring.cloud.stream.bindings.numberProductioner-out-0.destination: cấu hình nơi dữ liệu đến! out chỉ ra rằng nơi mà Spring Boot ghi dữ liệu vào chủ đề Kafka. Ngược lại, để đọc dữ liệu, chỉ cần sử dụng in.
  • spring.cloud.stream.bindings.numberProductioner-out-0.producer.use-native-encoding : Serialization/deserialization.
  • spring.cloud.stream.kafka.bindings.numberProductioner-out- 0.producer.configuration.value: Serialization/deserialization.
  • Sau đó, chúng ta cấu hình địa chỉ kafka brocker (hay kafka server).
spring.cloud.stream: function: definition: numberProducer bindings: numberProducer-out-0: destination: numbers producer: use-native-encoding: true kafka: bindings: numberProducer-out-0: producer: configuration: value: serializer: org.apache.kafka.common.serialization.LongSerializer binder: brokers: - localhost:9091 - localhost:9092

Kafka Stream Processor

Processor vừa là Producer vừa là Consumer. Nó sử dụng dữ liệu từ 1 topic và tạo ra dữ liệu mới và gửi cho topic khác. Trong trường hợp của chúng ta:

  • Chúng ta sẽ nhận dữ liệu từ topic numbers
  • Loại bỏ các số lẻ
  • Bình phương các số chẵn
  • Cuối cùng là gửi chúng đến một topic khác.

Chúng ta tạo Processor bằng cách sử dụng Functional Interface tương ứng trong Java là Function<T, R>.

  • Chúng ta sử dụng dữ liệu đầu vào là KStream<String, Long> .
  • Chúng ta thực hiện một số xử lý.
  • Sau đó, chúng ta trả về kiểu dữ liệu KStream<String, Long>.

Lưu ý rằng kiểu trả về có thể là bất cứ kiểu dữ liệu gì. Không nhất thiết phải giống với loại dữ liệu đầu vào.

@Configuration
public class KafkaProcessor { /* * process the numbers received via kafka topic * Function<T, R> makes this as kafka stream processor * T is input type * R is output type * * */ @Bean public Function<KStream<String, Long>, KStream<String, Long>> evenNumberSquareProcessor(){ return kStream -> kStream .filter((k, v) -> v % 2 == 0) .peek((k, v) -> System.out.println("Squaring Even : " + v)) .mapValues(v -> v * v); }; }

Trong file application.yaml chúng ta sử dụng từ khóa in để thu tập dữ liệu đầu vào, và sử dụng từ khóa out để ghi dữ liệu đầu ta.

spring.cloud.stream: function: definition: evenNumberSquareProcessor bindings: evenNumberSquareProcessor-in-0: destination: numbers evenNumberSquareProcessor-out-0: destination: squaredNumbers kafka: binder: brokers: - localhost:9091 - localhost:9092

Kafka Stream Consumer

Trong ví dụ này, những bước chúng ta cần làm để thu thập (consume) dữ liệu là:

  • Tạo một Bean loại Consumer để sử dụng dữ liệu từ một topic.
  • KStream<String, Long>: Key có kiểu dữ liệu String, và Value có kiểu giá trị là Long. Là kiểu dữ liệu được gửi đến topic (dữ liệu đầu vào).
@Configuration
public class KafkaConsumer { /* * consume the numbers received via kafka topic * Consumer<T> makes this as kafka consumer of T * */ @Bean public Consumer<KStream<String, Long>> squaredNumberConsumer(){ return stream -> stream.foreach((key, value) -> System.out.println("Square Number Consumed : " + value)); }; }

Trong file application.yaml.

  • Chúng ta cập nhật tên Bean của spring cloud function bean name
  • Chúng ta giả định rằng chủ đề squaredNumbers đã được tạo và chúng ta sử dụng dữ liệu từ topic.
  • Để sử dụng dữ liệu chúng ta sử dụng từ khóa in.
spring.cloud.stream: function: definition: squaredNumberConsumer bindings: squaredNumberConsumer-in-0: destination: squaredNumbers kafka: binder: brokers: - localhost:9091 - localhost:9092

Kafka Stream Processing

Kết quả:

Processer

Squaring Even : 2
Squaring Even : 4
Squaring Even : 6
Squaring Even : 8
Squaring Even : 10
Squaring Even : 12
Squaring Even : 14

Consumer

Square Number Consumed : 4
Square Number Consumed : 16
Square Number Consumed : 36
Square Number Consumed : 64
Square Number Consumed : 100
Square Number Consumed : 144
Square Number Consumed : 196

Tổng kết

Trên đây, chúng ta vừa khám phá ví dụ khả năng xử lý dữ liệu thời gian thực bằng cách sử dụng Kafka Stream với Spring Boot. Hy vọng mọi người sẽ hiểu được ý tưởng tổng thể của bài viết để có thể áp dụng nó vào dự án của mọi người một cách thoải mái nhất.

Nguồn: https://thenewstack.wordpress.com/2021/11/24/kafka-kafka-stream-with-spring-boot/

Follow me: thenewstack.wordpress.com

Bình luận

Bài viết tương tự

- vừa được xem lúc

Học Spring Boot bắt đầu từ đâu?

1. Giới thiệu Spring Boot. 1.1.

0 0 278

- vừa được xem lúc

Sử dụng ModelMapper trong Spring Boot

Bài hôm nay sẽ là cách sử dụng thư viện ModelMapper để mapping qua lại giữa các object trong Spring nhé. Trang chủ của ModelMapper đây http://modelmapper.org/, đọc rất dễ hiểu dành cho các bạn muốn tìm hiểu sâu hơn. 1.

0 0 194

- vừa được xem lúc

Spring Security Registration – Kích hoạt một tài khoản thông qua email

1. Tổng quan.

0 0 119

- vừa được xem lúc

Entity, domain model và DTO - sao nhiều quá vậy?

Bài viết hôm nay khá hay và cũng là chủ đề quan trọng trong Spring Boot. Cụ thể chúng ta cùng tìm hiểu xem data sẽ biến đổi như thế nào khi đi qua các layer khác nhau. Và những khái niệm Entity, Domain model và DTO là gì nhé. 1.

0 0 83

- vừa được xem lúc

Cấu trúc dự án Spring Boot thế nào cho chuẩn?

Hello mình đã trở lại với series Spring Boot cơ bản, và hiện tại mình đang nhận thêm một kèo khá ngon nên có thể sẽ ra mắt series mới về Java core . Tuy vậy, mình sẽ cố gắng giữ tiến độ 2 bài/tuần của series Spring Boot nhé.

1 0 193

- vừa được xem lúc

Vòng đời, các loại bean và cơ chế Component scan

1. Vòng đời của bean. 1.1.

0 0 117