- vừa được xem lúc

[Redis] - Redis Stream With Spring Boot

0 0 117

Người đăng: TheLight

Theo Viblo Asia

Trong bài viết này, chúng ta sẽ cùng tìm hiểu về Redis Stream với Spring Boot để demo cách thực hiện xử lý luồng thời gian thực.

Redis ban đầu được biết đến như một Remote Dictionary Server được sử dụng để lưu thông tin vào bộ nhớ đệm (caching). Cùng với tính năng Master/ReadReplication & Pub/Sub, hiện tại Redis cũng đã hỗ trợ thêm tính năng Stream.

Một số bài viết cùng chủ đề mọi người có thể sẽ quan tâm:


Trước đây, chúng ta thường thu thập dữ liệu, lưu trữ trong cơ sở dữ liệu và xử lý dữ liệu hàng đêm. Nó được gọi là xử lý batch processing! Trong kỷ nguyên Microservices này, chúng ta nhận được luồng dữ liệu liên tục, đôi khi việc trì hoãn việc xử lý dữ liệu này có thể gây ảnh hưởng nghiêm trọng đến hoạt động kinh doanh của chúng ta. Ví dụ: Hãy xem xét một ứng dụng như Netflix / YouTube. Dựa trên bộ phim hoặc video chúng ta xem, các ứng dụng này hiển thị các đề xuất về những video hay bộ phim cùng chủ đề chúng ta đang xem ngay lập tức. Nó cung cấp trải nghiệm người dùng tốt hơn nhiều và giúp cho việc kinh doanh từ đó tốt hơn.

Stream processing là quá trình xử lý dữ liệu liên tục theo thời gian thực. Hãy xem cách chúng ta có thể xử lý luồng dữ liệu theo thời gian thực bằng cách sử dụng Redis Stream với Spring Boot.

Chúng ta cũng đã thảo luận về việc xử lý luồng dữ liệu thời gian thực bằng Apache Kafka. Apache Kafka đã hoạt động được 10 năm trong khi Redis còn khá mới trong lĩnh vực này. Một số tính năng của Redis Streams dường như được lấy cảm hứng từ Apache Kafka. Vấn đề với Kafka là nó rất khó cấu hình. Việc bảo trì cơ sở hạ tầng là rất khó khăn. Nhưng với Redis thì dễ dàng hơn.

Stream vs Pub/Sub

Sẽ có ai đó đặt câu hỏi rằng có cần hỗ trợ Stream nữa không khi chúng ta đã có tính năng Pub/Sub? Câu trả lời là Pub/Sub không giống như Stream, stream không phải là một giải pháp thay thế cho Pub/Sub.

Redis Stream với Spring Boot

  • Publisher sẽ đẩy một số sự kiện liên quan đến các giao dịch mua bán vào Redis. Hãy gọi chúng là purchase-events stream.
  • Một group-consumer quan tâm sẽ lắng nghe đến những sự kiện đó. Điều này có thể là để tính toán doanh thu hoặc xử lý thanh toán hoặc gửi email. Khi bạn cần thực hiện tất cả những điều này thì chúng ta cần một group-consumer riêng cho mỗi hành động.
  • Consumer sẽ sử dụng các sự kiện và họ có thể làm bất cứ điều gì với nó. Trong trường hợp của chúng ta, chúng ta chỉ tìm giá mà người dùng đã trả và tính toán doanh thu theo danh mục loại sản phẩm. Để mọi thứ đơn giản, chúng ta sẽ ghi thông tin này dưới dạng SortedSet trong Redis.

Sample Application

Product Category

public enum Category { APPLIANCES, BOOKS, COSMETICS, ELECTRONICS, OUTDOOR;
}

Product – DTO

@Data
@ToString
@AllArgsConstructor
@NoArgsConstructor
public class Product { private String name; private double price; private Category category; }

Redis Stream – Producer

Producer apllication sẽ tiếp tục đẩy sự kiện PurchaseEvents được định cấu hình định kỳ thông qua publish.rate.

@Service
public class PurchaseEventProducer { private AtomicInteger atomicInteger = new AtomicInteger(0); @Value("${stream.key}") private String streamKey; @Autowired private ProductRepository repository; @Autowired private ReactiveRedisTemplate<String, String> redisTemplate; @Scheduled(fixedRateString= "${publish.rate}") public void publishEvent(){ Product product = this.repository.getRandomProduct(); ObjectRecord<String, Product> record = StreamRecords.newRecord() .ofObject(product) .withStreamKey(streamKey); this.redisTemplate .opsForStream() .add(record) .subscribe(System.out::println); atomicInteger.incrementAndGet(); } @Scheduled(fixedRate = 10000) public void showPublishedEventsSoFar(){ System.out.println( "Total Events :: " + atomicInteger.get() ); } }
  • Phương thức publishEvent() đẩy một số giao dịch mua sản phẩm ngẫu nhiên theo định kỳ.
  • Phương thức showPublishedEventsSoFar() chỉ đơn giản là hiển thị số lượng đơn đặt hàng được đặt từ trước đến nay, nhằm mục đích ghi nhật ký.

Để giữ mọi thứ đơn giản, chúng ta không sử dụng DB mà tạo ra một danh sách sản phẩm từ code.

@Repository
public class ProductRepository { private static final List<Product> PRODUCTS = List.of( // appliances new Product("oven", 500.00, Category.APPLIANCES), new Product("dishwasher", 125.00, Category.APPLIANCES), new Product("heater", 65.00, Category.APPLIANCES), new Product("vacuum cleaner", 48.00, Category.APPLIANCES), new Product("refrigerator", 1200.00, Category.APPLIANCES), // books new Product("how to win friends and influence", 13.00, Category.BOOKS), new Product("ds and algorithms", 70.00, Category.BOOKS), new Product("effective java", 41.00, Category.BOOKS), new Product("clean architecture", 32.00, Category.BOOKS), new Product("microservices", 16.00, Category.BOOKS), // cosmetics new Product("brush", 9.50, Category.COSMETICS), new Product("face wash", 13.00, Category.COSMETICS), new Product("makeup mirror", 17.50, Category.COSMETICS), // electronics new Product("sony 4k tv", 999.25, Category.ELECTRONICS), new Product("headphone", 133.25, Category.ELECTRONICS), new Product("macbook", 2517.25, Category.ELECTRONICS), new Product("speaker", 65.25, Category.ELECTRONICS), // outdoor new Product("plants", 9.75, Category.OUTDOOR), new Product("power tools", 73.50, Category.OUTDOOR), new Product("pools", 111.75, Category.OUTDOOR) ); public Product getRandomProduct(){ int random = ThreadLocalRandom.current().nextInt(0, 20); return PRODUCTS.get(random); } }

File application.peroperties

stream.key=purchase-events
publish.rate=1000

Redis Stream – Consumer

Có Producer rồi. Chúng ta hãy tạo một Consumer. Để sử dụng Redis Streams, chúng ta cần triển khai interface StreamListener.

@Service
public class PurchaseEventConsumer implements StreamListener<String, ObjectRecord<String, Product>> { private AtomicInteger atomicInteger = new AtomicInteger(0); @Autowired private ReactiveRedisTemplate<String, String> redisTemplate; @Override @SneakyThrows public void onMessage(ObjectRecord<String, Product> record) { System.out.println( InetAddress.getLocalHost().getHostName() + " - consumed :" + record.getValue() ); this.redisTemplate .opsForZSet() .incrementScore("revenue", record.getValue().getCategory().toString(), record.getValue().getPrice()) .subscribe(); atomicInteger.incrementAndGet(); } @Scheduled(fixedRate = 10000) public void showPublishedEventsSoFar(){ System.out.println( "Total Consumed :: " + atomicInteger.get() ); } }
  • Đầu tiên, chúng ta hiển thị dữ liệu nhận được từ Producer.
  • Sau đó, chúng ta lấy được giá phải trả cho sản phẩm và thêm nó vào doanh thu (revenue) của danh mục loại sản phẩm được sắp xếp.
  • Cuối cùng là hiển thị số lượng sự kiện nhận được từ Producer theo định kỳ.

Cấu hình Redis Stream

Sau khi Consumer được tạo, chúng ta cần đăng ký Consumer ở trên vào StreamMessageListenerContainer.

@Configuration
public class RedisStreamConfig { @Value("${stream.key:purchase-events}") private String streamKey; // Autowired PurchaseEventConsumer (Consumer bên trên) @Autowired private StreamListener<String, ObjectRecord<String, Product>> streamListener; @Bean public Subscription subscription(RedisConnectionFactory redisConnectionFactory) throws UnknownHostException { var options = StreamMessageListenerContainer .StreamMessageListenerContainerOptions .builder() .pollTimeout(Duration.ofSeconds(1)) .targetType(Product.class) .build(); var listenerContainer = StreamMessageListenerContainer .create(redisConnectionFactory, options); var subscription = listenerContainer.receiveAutoAck( Consumer.from(streamKey, InetAddress.getLocalHost().getHostName()), StreamOffset.create(streamKey, ReadOffset.lastConsumed()), streamListener); listenerContainer.start(); return subscription; } }

Cơ sở hạ tầng hóa ứng dụng (Dockerizing Infrastructure)

Dockerfile
Chúng ta sẽ tạo nhiều Consumer để xử lý các sự kiện mua hàng. Vì vậy, chúng ta sẽ tạo ra dockerfile.

# Use JRE11 slim
FROM openjdk:11.0-jre-slim # Add the app jar
ADD target/*.jar redis-stream.jar ENTRYPOINT java -jar redis-stream.jar

docker-compose file

version: '3'
services: redis: image: redis ports: - 6379:6379 redis-commander: image: rediscommander/redis-commander:latest depends_on: - redis environment: - REDIS_HOSTS=redis:redis ports: - 8081:8081 producer: build: ./redis-stream-producer image: docker/redis-stream-producer depends_on: - redis environment: - SPRING_REDIS_HOST=redis - PUBLISH_RATE=1000 consumer: build: ./redis-stream-consumer image: docker/redis-stream-consumer depends_on: - redis environment: - SPRING_REDIS_HOST=redis

Redis Stream - Setup

Đầu tiên là chạy Redis và Redis-commander lên trước

docker-compose up redis redis-commander

Truy cập vào trình duyệt chúng ta sẽ thấy dịch vụ Redis đã được chạy

Chúng ta có thể tạo một stream như hình dưới đây. Đây là tất cả các lệnh redis liên quan đến stream: Redis Stream Command

XADD purchase-events * dummy-key dummy-value

Chúng ta tạo ra một consumer-group bằng lệnh sau:

XGROUP CREATE purchase-events purchase-events

Chạy Producer application

docker-compose up producer

Sau khi start producer nó sẽ bắt đầu gửi sự kiện định kỳ:

producer_1 | 1585682873612-0
producer_1 | 1585682873812-0
producer_1 | 1585682874013-0
producer_1 | 1585682874215-0
producer_1 | 1585682874413-0
producer_1 | 1585682874613-0
producer_1 | 1585682874812-0
producer_1 | 1585682875012-0
producer_1 | Total Events :: 51

Chạy Consumer application

docker-compose up --scale consumer=3

Chúng ta có thể thấy Consumer sử dụng tất cả các sự kiện mua hàng. Tải được phân phối cho tất cả Consumer trong consumger-group. Ở đây người consumer_2 cho thấy rằng nó xử lý nhiều sự kiện hơn vì nó bắt đầu trước những consumer khác.

producer_1 | 1585682887612-0
consumer_2 | 7b6c828647b0 - consumed :Product(name=how to win friends and influence, price=13.0, category=BOOKS)
producer_1 | 1585682887813-0
consumer_3 | 83699cab10bd - consumed :Product(name=ds and algorithms, price=70.0, category=BOOKS)
producer_1 | 1585682888012-0
consumer_1 | cdb3357593e6 - consumed :Product(name=headphone, price=133.25, category=ELECTRONICS)
producer_1 | 1585682888212-0
consumer_2 | 7b6c828647b0 - consumed :Product(name=oven, price=500.0, category=APPLIANCES)
consumer_1 | Total Consumed :: 18
consumer_2 | Total Consumed :: 84
producer_1 | 1585682888412-0
consumer_3 | 83699cab10bd - consumed :Product(name=makeup mirror, price=17.5, category=COSMETICS)
producer_1 | 1585682888612-0
consumer_1 | cdb3357593e6 - consumed :Product(name=ds and algorithms, price=70.0, category=BOOKS)
consumer_3 | Total Consumed :: 16

Truy cập redis-commander và tìm sorted set – revenue. Chúng tôi có thể thấy doanh thu các sản phẩm theo danh mục diễn ra trong thời gian thực:

Tổng kết

Vậy là chúng ta vừa triển khai thành công xử lý luồng dữ liệu theo thời gian thực bằng cách sử dụng Redis Stream với Spring Boot. Hi vọng bài viết hữu ích với mọi người.

Nguồn: https://thenewstack.wordpress.com/2021/11/25/redis-redis-stream-with-spring-boot/

Follow me: thenewstack.wordpress.com

Bình luận

Bài viết tương tự

- vừa được xem lúc

Caching đại pháp 2: Cache thế nào cho hợp lý?

Caching rất dễ. Mình không nói đùa đâu, caching rất là dễ. Ai cũng có thể làm được chỉ sau 10 phút đọc tutorial. Nó cũng giống như việc đứa trẻ lên 3 đã có thể cầm bút để vẽ vậy.

0 0 114

- vừa được xem lúc

Caching đại pháp 1: Nấc thang lên level của developer

Bí quyết thành công trong việc đáp ứng hệ thống triệu user của những công ty lớn (và cả công ty nhỏ). Tại sao caching lại là kỹ thuật tối quan trọng để phù phép ứng dụng rùa bò của chúng ta thành siêu phẩm vạn người mê.

0 0 67

- vừa được xem lúc

Cache dữ liệu Nodejs với Redis

Một tí gọi là lý thuyết để anh em tham khảo. Cache là gì. Lợi ích của việc cache data. .

0 0 96

- vừa được xem lúc

Nguyên tắc hoạt động của redis server

Sự ra đời của Redis. . Câu chuyện bắt đầu khi tác giả của Redis, Salvatore Sanfilippo. (nickname: antirez), cố gắng làm những công việc gần như là không.

0 0 75

- vừa được xem lúc

Viết ứng dụng chat realtime với Laravel, VueJS, Redis và Socket.IO, Laravel Echo

Xin chào tất cả các bạn, đây là một trong những bài post đầu tiên của mình. Sau bao năm toàn đi đọc các blog tích luỹ được chút kiến thức của các cao nhân trên mạng.

0 0 904

- vừa được xem lúc

Tìm hiểu tổng quan về Redis

1. Lời mở đầu.

0 0 351