- vừa được xem lúc

[Kafka] - Spring Boot Kafka Producer - Consumer Example

0 0 47

Người đăng: TheLight

Theo Viblo Asia

Tiếp tục chuỗi bài viết về Kafka. Bài viết này mình sẽ hướng dẫn cấu hình một ứng dụng Spring Boot config sử dụng với Kafka đơn giản Producer và Consumer. Bài viết sẽ hướng dẫn cách cấu hình gửi nhận tin nhắn dạng String object và Java object do chúng ta định nghĩa ra. Nội dung bài viết tập trung vào việc hướng dẫn gửi nhận các loại đối tượng khác nhau là chính, nên các phần cấu hình như đọc thông tin cấu hình từ file .yaml, tạo các @Bean phức tạp hơn mình sẽ loại bỏ.

Chúng ta có một common hay constant class như sau:

public class ApplicationConstant { public static final String KAFKA_LOCAL_SERVER_CONFIG = "localhost:9092"; public static final String GROUP_ID_STRING = "group-id-string"; public static final String GROUP_ID_JSON = "group-id-json"; public static final String TOPIC_NAME = "topic-test";
}

1. Gửi/nhận String message

Config : class cấu hình Consumer nhận tin nhắn (String message)

public class SpringKafkaConfig { @Bean public ConsumerFactory<String, String> consumingEventStringMessage() { Map<String, Object> configMap = new HashMap<>(); configMap.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, ApplicationConstant.KAFKA_LOCAL_SERVER_CONFIG); configMap.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class); configMap.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class); configMap.put(ConsumerConfig.GROUP_ID_CONFIG, ApplicationConstant.GROUP_ID_STRING); return new DefaultKafkaConsumerFactory<>(configMap); } @Bean public ConcurrentKafkaListenerContainerFactory<String, String> listenerEventSendStringMessage() { ConcurrentKafkaListenerContainerFactory<String, String> factory = new ConcurrentKafkaListenerContainerFactory<String, String>(); factory.setConsumerFactory(consumingEventStringMessage()); return factory; }
}

KafkaProducer : class gửi tin nhắn

@RestController
@RequestMapping("/produce")
public class KafkaProducer { @Autowired private KafkaTemplate<String, Object> kafkaTemplate; @GetMapping("/{message}") public String sendMessage(@PathVariable String message) { try { kafkaTemplate.send(ApplicationConstant.TOPIC_NAME, message); } catch (Exception e) { e.printStackTrace(); } return "Message sent succuessfully"; }
}

KafkaConsumer : class nhận tin nhắn

@Component
public class KafkaConsumer { private static final Logger logger = LoggerFactory.getLogger(KafkaConsumer.class); @KafkaListener(groupId = ApplicationConstant.GROUP_ID_STRING, topics = ApplicationConstant.TOPIC_NAME, containerFactory = "listenerEventSendStringMessage") public void receivedMessage(String message) { logger.info("Message Received using Kafka listener " + message); }
}

Gửi/nhận JSON message

Model

public class Student { private Long id; private String name; private String rollNumber; //getter & setter
}

Config : class cấu hình Consumer nhận tin nhắn (Student object message)

@Configuration
@EnableKafka
public class SpringKafkaConfig { @Bean public ProducerFactory<String, Object> producerFactory() { Map<String, Object> configMap = new HashMap<>(); configMap.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, ApplicationConstant.KAFKA_LOCAL_SERVER_CONFIG); configMap.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class); configMap.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, JsonSerializer.class); return new DefaultKafkaProducerFactory<String, Object>(configMap); } @Bean public KafkaTemplate<String, Object> kafkaTemplate() { return new KafkaTemplate<>(producerFactory()); } @Bean public ConsumerFactory<String, Student> consumingEventSendStudentMessage() { Map<String, Object> configMap = new HashMap<>(); configMap.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, ApplicationConstant.KAFKA_LOCAL_SERVER_CONFIG); configMap.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class); configMap.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, JsonDeserializer.class); configMap.put(ConsumerConfig.GROUP_ID_CONFIG, ApplicationConstant.GROUP_ID_JSON); configMap.put(JsonDeserializer.TRUSTED_PACKAGES, "tiendv.example.model.dto");
//line-50 return new DefaultKafkaConsumerFactory<>(configMap); } @Bean public ConcurrentKafkaListenerContainerFactory<String, Student> listenerEventSendStudentMessage() { ConcurrentKafkaListenerContainerFactory<String, Student> factory = new ConcurrentKafkaListenerContainerFactory<String, Student>(); factory.setConsumerFactory(consumingEventSendStudentMessage()); return factory; }
}

KafkaProducer : class gửi tin nhắn

@RestController
@RequestMapping("/produce")
public class KafkaProducer { @Autowired private KafkaTemplate<String, Object> kafkaTemplate; @PostMapping("/message") public String sendMessage(@RequestBody Student message) { try { kafkaTemplate.send(ApplicationConstant.TOPIC_NAME, message); } catch (Exception e) { e.printStackTrace(); } return "json message sent succuessfully";
}

KafkaConsumer : class nhận tin nhắn

@Component
public class KafkaConsumer { private static final Logger logger = LoggerFactory.getLogger(KafkaConsumer.class); @KafkaListener(groupId = ApplicationConstant.GROUP_ID_JSON, topics = ApplicationConstant.TOPIC_NAME, containerFactory = "listenerEventSendStudentMessage") public void receivedMessage(Student message) throws JsonProcessingException { ObjectMapper mapper = new ObjectMapper(); String jsonString = mapper.writeValueAsString(message); logger.info("Json message received using Kafka listener " + jsonString); }
}

Định nghĩa một Rest end-point để nhận message từ Kafka topic

@RestController
@RequestMapping("/consume")
public class KafkaConsumer { @Autowired private ConcurrentKafkaListenerContainerFactory<String, Student> factory; @GetMapping("/message") public List<Student> receiveMessage() { List<Student> students = new ArrayList<>(); ConsumerFactory<String, Student> consumerFactory = factory.getConsumerFactory(); Consumer<String, Student> consumer = consumerFactory.createConsumer(); try { consumer.subscribe(Arrays.asList(ApplicationConstant.TOPIC_NAME)); ConsumerRecords<String, Student> consumerRecords = consumer.poll(10000); Iterable<ConsumerRecord<String, Student>> records = consumerRecords.records(ApplicationConstant.TOPIC_NAME); Iterator<ConsumerRecord<String, Student>> iterator = records.iterator(); while (iterator.hasNext()) { students.add(iterator.next().value()); } } catch (Exception e) { e.printStackTrace(); } return students; }
}

Tổng kết

Trên đây là hướng dẫn để mọi người biết thêm cách cấu hình gửi nhận tin nhắn với 2 loại đối tượng là String và Java object. Hy vọng mọi người sẽ hiểu được ý tưởng tổng thể để có thể áp dụng nó vào dự án của mọi người một cách thoải mái nhất.

Mọi người có thể tìm hiểu các bài viết liên quan tại đây:

Nguồn: https://thenewstack.wordpress.com/2021/11/24/kafka-spring-boot-kafka-producer-consumer-example/

Follow me: thenewstack.wordpress.com

Bình luận

Bài viết tương tự

- vừa được xem lúc

Học Spring Boot bắt đầu từ đâu?

1. Giới thiệu Spring Boot. 1.1.

0 0 277

- vừa được xem lúc

Sử dụng ModelMapper trong Spring Boot

Bài hôm nay sẽ là cách sử dụng thư viện ModelMapper để mapping qua lại giữa các object trong Spring nhé. Trang chủ của ModelMapper đây http://modelmapper.org/, đọc rất dễ hiểu dành cho các bạn muốn tìm hiểu sâu hơn. 1.

0 0 194

- vừa được xem lúc

Spring Security Registration – Kích hoạt một tài khoản thông qua email

1. Tổng quan.

0 0 119

- vừa được xem lúc

Entity, domain model và DTO - sao nhiều quá vậy?

Bài viết hôm nay khá hay và cũng là chủ đề quan trọng trong Spring Boot. Cụ thể chúng ta cùng tìm hiểu xem data sẽ biến đổi như thế nào khi đi qua các layer khác nhau. Và những khái niệm Entity, Domain model và DTO là gì nhé. 1.

0 0 83

- vừa được xem lúc

Cấu trúc dự án Spring Boot thế nào cho chuẩn?

Hello mình đã trở lại với series Spring Boot cơ bản, và hiện tại mình đang nhận thêm một kèo khá ngon nên có thể sẽ ra mắt series mới về Java core . Tuy vậy, mình sẽ cố gắng giữ tiến độ 2 bài/tuần của series Spring Boot nhé.

1 0 193

- vừa được xem lúc

Vòng đời, các loại bean và cơ chế Component scan

1. Vòng đời của bean. 1.1.

0 0 117