Mở đầu
- Tiếp tục series, hôm nay là một buổi chia sẽ của tôi về cách implement lại pubsub pattern bằng golang channel. Let's go, guys!
Pubsub
-
Trước hết ta sẽ có một định nghĩa đầy đủ từ wiki :
In software architecture, publish–subscribe is a messaging pattern where senders of messages, called publishers, do not program the messages to be sent directly to specific receivers, called subscribers, but instead categorize published messages into classes without knowledge of which subscribers, if any, there may be. -
Pubsub là một message pattern mà ở trong đó publisher chỉ việc gửi message và không cần quan tâm đến có subscriber nào nhận hay không và các message sẽ được phân loại và gủi đi mà không cần quan tâm xem có subscribers nào hay không. Publishers và subscribers không biết sự tồn tại của nhau. Ở một số hệ thông pubsub, sẽ có thêm 1 thành phần là broker, nó đảm nhiệm phân loại và gửi message.
-
Pubsub hay message queue nói chung được sử dụng khá phổ biến trong micro-service architectures. Nó cung cấp một phương thức giúp các service giao tiếp với nhau một cách bất đồng bộ. Ngoài ra, chúng ta sẽ có một vài use cases for messaging queues in real-world scenarios như là: Sending emails, Data post-processing, Batch updates for databases ...
Use case 3: Build pubsub service with buffered channel
- Dựa vào đặc tính channel là một queue, tôi sẽ có 1 simple demo về pubsub như sau:
messageQueue: chứa danh sách các message sẽ xủ lí.type Message struct { topic string content interface{} } type MessageChannel chan Message func main() { maxMessage := 10000 topic := "update-user" messageQueue := make(chan Message, maxMessage) mapTopicMessage := make(map[string][]MessageChannel) // map[topic][]MessageChannel }
MessageChannel: kênh giao tiếp giữa các publisher và subcriber. MessageChannel lúc này sẽ đóng vai trò như một trái tim của hệ thống.
mapTopicMessage: chứa một bản map giữa topic và danh sách các message channel. Một topic sẽ được subcribe bởi nhiều subcriber nên ta sẽ quan hệ 1:N. Nó đóng vai trò như việc quản lý topic và các message channel.
- Nguyên lý vận hành:
messageQueue khi nhận được một message mới, service sẽ lọc ra các danh sách MessageChannel tương ứng với topic của message đó và gửi message mới đến. Mỗi subcriber sẽ communicate with MessageChannel để lấy message.... func main() { maxMessage := 10000 topic := "update-user" messageQueue := make(chan Message, maxMessage) mapTopicMessage := make(map[string][]MessageChannel) go run(messageQueue, mapTopicMessage) } func run(messageQueue chan Message, mapTopicMessage map[string][]MessageChannel) { for { message := <-messageQueue listMessageChannel, ok := mapTopicMessage[message.topic] if ok { for _, messageChannel := range listMessageChannel { messageChannel <- message } } } }
- Publish message
func main() { maxMessage := 10000 topic := "update-user" messageQueue := make(chan Message, maxMessage) mapTopicMessage := make(map[string][]MessageChannel) go run(messageQueue, mapTopicMessage) // publish publish(messageQueue, topic, "user-name is update to Hung") time.Sleep(time.Second * 10) } func publish(messageQueue chan Message, topic string, content string) { message := Message{ topic: topic, content: content, } messageQueue <- message fmt.Printf("%v: publish new message with topic: '%v' - content: '%v' \n", time.Now().Format("15:04:05"), message.topic, message.content) } result: 08:46:28: publish new message with topic: 'update-user' - content: 'user-name is update to Hung'
- Register Subscription
khi có một "register subcription" request, service sẽ trả về một MessageChannel. Subcriber sẽ giao tiếp với MessageChannel đó để nhận message.func main() { ... // subcribe sub1 := registerSubscription(mapTopicMessage, topic) ... } func registerSubscription(mapTopicMessage map[string][]MessageChannel, topic string) MessageChannel { newMessageChannel := make(MessageChannel) value, ok := mapTopicMessage[topic] if ok { value = append(value, newMessageChannel) mapTopicMessage[topic] = value } else { mapTopicMessage[topic] = []MessageChannel{newMessageChannel} } return newMessageChannel }
- Subcribe
func subcribe(messageChannel MessageChannel) { go func() { for { message := <-messageChannel fmt.Printf("%v: receive new message with topic: '%v' - content: '%v' \n", time.Now().Format("15:04:05"), message.topic, message.content) } }() }
- Running and see what happen!
func main() { maxMessage := 10000 topic := "update-user" messageQueue := make(chan Message, maxMessage) mapTopicMessage := make(map[string][]MessageChannel) // map[topic][]MessageChannel go run(messageQueue, mapTopicMessage) // register subcriptions sub1 := registerSubscription(mapTopicMessage, topic) // publish publish(messageQueue, topic, "user-name is update to Hung") subcribe(sub1) time.Sleep(time.Second * 10) } result: 09:16:02: publish new message with topic: 'update-user' - content: 'user-name is update to Hung' 09:16:02: receive new message with topic: 'update-user' - content: 'user-name is update to Hung'
- Add more subcriber
func main() { ... // register subcriptions sub1 := registerSubscription(mapTopicMessage, topic) sub2 := registerSubscription(mapTopicMessage, topic) sub3 := registerSubscription(mapTopicMessage, topic) // publish publish(messageQueue, topic, "user-name is update to Hung") subcribe(sub1) subcribe(sub2) subcribe(sub3) time.Sleep(time.Second * 10) } result: 09:20:15: publish new message with topic: 'update-user' - content: 'user-name is update to Hung' 09:20:15: receive new message with topic: 'update-user' - content: 'user-name is update to Hung' 09:20:15: receive new message with topic: 'update-user' - content: 'user-name is update to Hung' 09:20:15: receive new message with topic: 'update-user' - content: 'user-name is update to Hung'
Xây dựng một pubsub hoàn chỉnh
- to be continued ...
Tạm kết
- to be continued ...