Tiếp tục chuỗi bài viết về Kafka. Bài viết này mình sẽ hướng dẫn cấu hình một ứng dụng Spring Boot config sử dụng với Kafka đơn giản Producer và Consumer. Bài viết sẽ hướng dẫn cách cấu hình gửi nhận tin nhắn dạng String object và Java object do chúng ta định nghĩa ra. Nội dung bài viết tập trung vào việc hướng dẫn gửi nhận các loại đối tượng khác nhau là chính, nên các phần cấu hình như đọc thông tin cấu hình từ file .yaml
, tạo các @Bean
phức tạp hơn mình sẽ loại bỏ.
Chúng ta có một common hay constant class như sau:
public class ApplicationConstant { public static final String KAFKA_LOCAL_SERVER_CONFIG = "localhost:9092"; public static final String GROUP_ID_STRING = "group-id-string"; public static final String GROUP_ID_JSON = "group-id-json"; public static final String TOPIC_NAME = "topic-test";
}
1. Gửi/nhận String message
Config : class cấu hình Consumer nhận tin nhắn (String message)
public class SpringKafkaConfig { @Bean public ConsumerFactory<String, String> consumingEventStringMessage() { Map<String, Object> configMap = new HashMap<>(); configMap.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, ApplicationConstant.KAFKA_LOCAL_SERVER_CONFIG); configMap.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class); configMap.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class); configMap.put(ConsumerConfig.GROUP_ID_CONFIG, ApplicationConstant.GROUP_ID_STRING); return new DefaultKafkaConsumerFactory<>(configMap); } @Bean public ConcurrentKafkaListenerContainerFactory<String, String> listenerEventSendStringMessage() { ConcurrentKafkaListenerContainerFactory<String, String> factory = new ConcurrentKafkaListenerContainerFactory<String, String>(); factory.setConsumerFactory(consumingEventStringMessage()); return factory; }
}
KafkaProducer : class gửi tin nhắn
@RestController
@RequestMapping("/produce")
public class KafkaProducer { @Autowired private KafkaTemplate<String, Object> kafkaTemplate; @GetMapping("/{message}") public String sendMessage(@PathVariable String message) { try { kafkaTemplate.send(ApplicationConstant.TOPIC_NAME, message); } catch (Exception e) { e.printStackTrace(); } return "Message sent succuessfully"; }
}
KafkaConsumer : class nhận tin nhắn
@Component
public class KafkaConsumer { private static final Logger logger = LoggerFactory.getLogger(KafkaConsumer.class); @KafkaListener(groupId = ApplicationConstant.GROUP_ID_STRING, topics = ApplicationConstant.TOPIC_NAME, containerFactory = "listenerEventSendStringMessage") public void receivedMessage(String message) { logger.info("Message Received using Kafka listener " + message); }
}
Gửi/nhận JSON message
Model
public class Student { private Long id; private String name; private String rollNumber; //getter & setter
}
Config : class cấu hình Consumer nhận tin nhắn (Student object message)
@Configuration
@EnableKafka
public class SpringKafkaConfig { @Bean public ProducerFactory<String, Object> producerFactory() { Map<String, Object> configMap = new HashMap<>(); configMap.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, ApplicationConstant.KAFKA_LOCAL_SERVER_CONFIG); configMap.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class); configMap.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, JsonSerializer.class); return new DefaultKafkaProducerFactory<String, Object>(configMap); } @Bean public KafkaTemplate<String, Object> kafkaTemplate() { return new KafkaTemplate<>(producerFactory()); } @Bean public ConsumerFactory<String, Student> consumingEventSendStudentMessage() { Map<String, Object> configMap = new HashMap<>(); configMap.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, ApplicationConstant.KAFKA_LOCAL_SERVER_CONFIG); configMap.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class); configMap.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, JsonDeserializer.class); configMap.put(ConsumerConfig.GROUP_ID_CONFIG, ApplicationConstant.GROUP_ID_JSON); configMap.put(JsonDeserializer.TRUSTED_PACKAGES, "tiendv.example.model.dto");
//line-50 return new DefaultKafkaConsumerFactory<>(configMap); } @Bean public ConcurrentKafkaListenerContainerFactory<String, Student> listenerEventSendStudentMessage() { ConcurrentKafkaListenerContainerFactory<String, Student> factory = new ConcurrentKafkaListenerContainerFactory<String, Student>(); factory.setConsumerFactory(consumingEventSendStudentMessage()); return factory; }
}
KafkaProducer : class gửi tin nhắn
@RestController
@RequestMapping("/produce")
public class KafkaProducer { @Autowired private KafkaTemplate<String, Object> kafkaTemplate; @PostMapping("/message") public String sendMessage(@RequestBody Student message) { try { kafkaTemplate.send(ApplicationConstant.TOPIC_NAME, message); } catch (Exception e) { e.printStackTrace(); } return "json message sent succuessfully";
}
KafkaConsumer : class nhận tin nhắn
@Component
public class KafkaConsumer { private static final Logger logger = LoggerFactory.getLogger(KafkaConsumer.class); @KafkaListener(groupId = ApplicationConstant.GROUP_ID_JSON, topics = ApplicationConstant.TOPIC_NAME, containerFactory = "listenerEventSendStudentMessage") public void receivedMessage(Student message) throws JsonProcessingException { ObjectMapper mapper = new ObjectMapper(); String jsonString = mapper.writeValueAsString(message); logger.info("Json message received using Kafka listener " + jsonString); }
}
Định nghĩa một Rest end-point để nhận message từ Kafka topic
@RestController
@RequestMapping("/consume")
public class KafkaConsumer { @Autowired private ConcurrentKafkaListenerContainerFactory<String, Student> factory; @GetMapping("/message") public List<Student> receiveMessage() { List<Student> students = new ArrayList<>(); ConsumerFactory<String, Student> consumerFactory = factory.getConsumerFactory(); Consumer<String, Student> consumer = consumerFactory.createConsumer(); try { consumer.subscribe(Arrays.asList(ApplicationConstant.TOPIC_NAME)); ConsumerRecords<String, Student> consumerRecords = consumer.poll(10000); Iterable<ConsumerRecord<String, Student>> records = consumerRecords.records(ApplicationConstant.TOPIC_NAME); Iterator<ConsumerRecord<String, Student>> iterator = records.iterator(); while (iterator.hasNext()) { students.add(iterator.next().value()); } } catch (Exception e) { e.printStackTrace(); } return students; }
}
Tổng kết
Trên đây là hướng dẫn để mọi người biết thêm cách cấu hình gửi nhận tin nhắn với 2 loại đối tượng là String và Java object. Hy vọng mọi người sẽ hiểu được ý tưởng tổng thể để có thể áp dụng nó vào dự án của mọi người một cách thoải mái nhất.
Mọi người có thể tìm hiểu các bài viết liên quan tại đây:
Nguồn: https://thenewstack.wordpress.com/2021/11/24/kafka-spring-boot-kafka-producer-consumer-example/
Follow me: thenewstack.wordpress.com