- vừa được xem lúc

[Golang] Channel trong golang và use case - part IIII (pubsub pattern)

0 0 28

Người đăng: Phượng Sồ

Theo Viblo Asia

Mở đầu

  • Tiếp tục series, hôm nay là một buổi chia sẽ của tôi về cách implement lại pubsub pattern bằng golang channel. Let's go, guys!

Pubsub

  • Trước hết ta sẽ có một định nghĩa đầy đủ từ wiki :
    In software architecture, publish–subscribe is a messaging pattern where senders of messages, called publishers, do not program the messages to be sent directly to specific receivers, called subscribers, but instead categorize published messages into classes without knowledge of which subscribers, if any, there may be.

  • Pubsub là một message pattern mà ở trong đó publisher chỉ việc gửi message và không cần quan tâm đến có subscriber nào nhận hay không và các message sẽ được phân loại và gủi đi mà không cần quan tâm xem có subscribers nào hay không. Publishers và subscribers không biết sự tồn tại của nhau. Ở một số hệ thông pubsub, sẽ có thêm 1 thành phần là broker, nó đảm nhiệm phân loại và gửi message.

  • Pubsub hay message queue nói chung được sử dụng khá phổ biến trong micro-service architectures. Nó cung cấp một phương thức giúp các service giao tiếp với nhau một cách bất đồng bộ. Ngoài ra, chúng ta sẽ có một vài use cases for messaging queues in real-world scenarios như là: Sending emails, Data post-processing, Batch updates for databases ...

Use case 3: Build pubsub service with buffered channel

  • Dựa vào đặc tính channel là một queue, tôi sẽ có 1 simple demo về pubsub như sau:
    type Message struct { topic string content interface{}
    } type MessageChannel chan Message func main() { maxMessage := 10000 topic := "update-user" messageQueue := make(chan Message, maxMessage) mapTopicMessage := make(map[string][]MessageChannel) // map[topic][]MessageChannel
    }
    
    messageQueue: chứa danh sách các message sẽ xủ lí.

    MessageChannel: kênh giao tiếp giữa các publisher và subcriber. MessageChannel lúc này sẽ đóng vai trò như một trái tim của hệ thống.

    mapTopicMessage: chứa một bản map giữa topic và danh sách các message channel. Một topic sẽ được subcribe bởi nhiều subcriber nên ta sẽ quan hệ 1:N. Nó đóng vai trò như việc quản lý topic và các message channel.

  • Nguyên lý vận hành:
    messageQueue khi nhận được một message mới, service sẽ lọc ra các danh sách MessageChannel tương ứng với topic của message đó và gửi message mới đến. Mỗi subcriber sẽ communicate with MessageChannel để lấy message.
    ...
    func main() { maxMessage := 10000 topic := "update-user" messageQueue := make(chan Message, maxMessage) mapTopicMessage := make(map[string][]MessageChannel) go run(messageQueue, mapTopicMessage) } func run(messageQueue chan Message, mapTopicMessage map[string][]MessageChannel) { for { message := <-messageQueue listMessageChannel, ok := mapTopicMessage[message.topic] if ok { for _, messageChannel := range listMessageChannel { messageChannel <- message } } }
    }
    
  • Publish message
    func main() { maxMessage := 10000 topic := "update-user" messageQueue := make(chan Message, maxMessage) mapTopicMessage := make(map[string][]MessageChannel) go run(messageQueue, mapTopicMessage) // publish publish(messageQueue, topic, "user-name is update to Hung") time.Sleep(time.Second * 10)
    } func publish(messageQueue chan Message, topic string, content string) { message := Message{ topic: topic, content: content, } messageQueue <- message fmt.Printf("%v: publish new message with topic: '%v' - content: '%v' \n", time.Now().Format("15:04:05"), message.topic, message.content)
    } result: 08:46:28: publish new message with topic: 'update-user' - content: 'user-name is update to Hung'
    
  • Register Subscription
    func main() { ... // subcribe sub1 := registerSubscription(mapTopicMessage, topic) ...
    } func registerSubscription(mapTopicMessage map[string][]MessageChannel, topic string) MessageChannel { newMessageChannel := make(MessageChannel) value, ok := mapTopicMessage[topic] if ok { value = append(value, newMessageChannel) mapTopicMessage[topic] = value } else { mapTopicMessage[topic] = []MessageChannel{newMessageChannel} } return newMessageChannel
    }
    
    khi có một "register subcription" request, service sẽ trả về một MessageChannel. Subcriber sẽ giao tiếp với MessageChannel đó để nhận message.
  • Subcribe
    func subcribe(messageChannel MessageChannel) { go func() { for { message := <-messageChannel fmt.Printf("%v: receive new message with topic: '%v' - content: '%v' \n", time.Now().Format("15:04:05"), message.topic, message.content) } }() }
    
  • Running and see what happen!
    func main() { maxMessage := 10000 topic := "update-user" messageQueue := make(chan Message, maxMessage) mapTopicMessage := make(map[string][]MessageChannel) // map[topic][]MessageChannel go run(messageQueue, mapTopicMessage) // register subcriptions sub1 := registerSubscription(mapTopicMessage, topic) // publish publish(messageQueue, topic, "user-name is update to Hung") subcribe(sub1) time.Sleep(time.Second * 10)
    }
    result: 09:16:02: publish new message with topic: 'update-user' - content: 'user-name is update to Hung' 09:16:02: receive new message with topic: 'update-user' - content: 'user-name is update to Hung'
    
  • Add more subcriber
    func main() { ... // register subcriptions sub1 := registerSubscription(mapTopicMessage, topic) sub2 := registerSubscription(mapTopicMessage, topic) sub3 := registerSubscription(mapTopicMessage, topic) // publish publish(messageQueue, topic, "user-name is update to Hung") subcribe(sub1) subcribe(sub2) subcribe(sub3) time.Sleep(time.Second * 10)
    }
    result: 09:20:15: publish new message with topic: 'update-user' - content: 'user-name is update to Hung' 09:20:15: receive new message with topic: 'update-user' - content: 'user-name is update to Hung' 09:20:15: receive new message with topic: 'update-user' - content: 'user-name is update to Hung' 09:20:15: receive new message with topic: 'update-user' - content: 'user-name is update to Hung'
    

Xây dựng một pubsub hoàn chỉnh

  • to be continued ...

Tạm kết

  • to be continued ...

Bình luận

Bài viết tương tự

- vừa được xem lúc

gRPC - Nó là gì và có nên sử dụng hay không?

Nhân một ngày rảnh rỗi, mình ngồi đọc lại RPC cũng như gRPC viết lại để nhớ lâu hơn. Vấn đề là gì và tại sao cần nó .

0 0 131

- vừa được xem lúc

Embedded Template in Go

Getting Start. Part of developing a web application usually revolves around working with HTML as user interface.

0 0 56

- vừa được xem lúc

Tạo Resful API đơn giản với Echo framework và MySQL

1. Giới thiệu.

0 0 60

- vừa được xem lúc

Sử dụng goquery trong golang để crawler thông tin các website Việt Nam bị deface trên mirror-h.org

. Trong bài viết này, mình sẽ cùng mọi người khám phá một package thu thập dữ liệu có tên là goquery của golang. Mục tiêu chính của chương trình crawler này sẽ là lấy thông tin các website Việt Nam bị deface (là tấn công, phá hoại website, làm thay đổi giao diện hiển thị của một trang web, khi người

0 0 237

- vừa được xem lúc

Tạo ứng dụng craw dữ liệu bing với Golang, Mysql driver

Chào mọi người . Lâu lâu ta lại gặp nhau 1 lần, để tiếp tục series chia sẻ kiến thức về tech, hôm nay mình sẽ tìm hiểu và chia sẻ về 1 ngôn ngữ đang khá hot trong cộng đồng IT đó là Golang.

0 0 75

- vừa được xem lúc

Golang: Rest api and routing using MUX

Routing with MUX. Let's create a simple CRUD api for a blog site. # All . GET articles/ .

0 0 54