- vừa được xem lúc

Apache Kafka - Producer - Gửi message đến Kafka bằng kafka-python

0 0 65

Người đăng: Bình Trịnh

Theo Viblo Asia


Understand how to produce message and send to the Kafka topic


Producer has many types and sources: message from Credit Card transactions, message from Facebook, Email or any systems

When the producer send the message to kafka, kafka sau khi nhận message và randomly phân bố message đó về từng partition. Vậy nên Producer chỉ cần quan tâm việc:

  • Boostrap Server
  • Topic
  • Value_serializer : cách, định dạng mà message được gửi đến
  • client_id : là id mà client được cấp và producer có id này mới send được message to kafka topic
  • acks : có 3 dạng (0, 1, 'all'), khi gửi message đến kafka, procedure yêu cầu kafka xác nhận cho mình để tiến hành process tiếp tục các message khác. defaults to acks=1
  1. acks = 0: Producer sẽ không chờ việc Kafka xác nhận đã hoàn thành việc nhận dữ liệu. Mà mỗi lần có message, Producer sẽ tự động add message vào menmory. Do vậy, trong một số trường hợp dữ liệu sẽ bị mất và Kafka không guarantee cho việc này.
  2. acks = 1: Producer sẽ chỉ chờ cho việc message được write xuống leader xong mà không quan tâm việc message được replicate sang những follower khác. Ngay sau đó, Producer sẽ tiếp tục gửi một message khác đến. Với cơ chế này thì trong 1 vài trường hợp message sẽ bị lost ở consumer, do có lỗi tại leader
  3. acks = all. Producer sẽ chờ cho toàn bộ quá trình leader và follower được write xuống thì mới process một message khác. Do vậy Kafka sẽ đảm bảo việc message sẽ được ko lost dữ liệu. Nhưng sẽ xảy ra trường hợp dữ liệu bị ngẽn tại Producer

Code example


  • Python 3.6 , 3.7, 3.8
  • pip install kafka-python - required
  • pip install Faker - optional : this Lib to ramdomly create dummy data

Code example

  1. Please visit the previous document to know how to set up kafka, kafka CLI, Kafka UI.
  2. Produce the dummy data
from time import time
from kafka import KafkaProducer
from faker import Faker
import json, time faker = Faker() def get_register(): return { 'name': faker.name(), 'year' : faker.year() }
  1. Send data to Kafka

As above we know that 3 points must have to send the message to kafka is:

  • boostrap server or broker: the ip/host of broker
  • topic name
  • value_serializer : message and message type

Code for sending message:

from time import time
from kafka import KafkaProducer
from faker import Faker
import json, time faker = Faker() def get_register(): return { 'name': faker.name(), 'year' : faker.year() } def json_serializer(data): return json.dumps(data).encode('utf-8') producer = KafkaProducer( bootstrap_servers = ['localhost:9092'], # server name value_serializer = json_serializer # function callable ) while 1==1: user = get_register() print(user) producer.send( 'second_topic',user ) time.sleep(3)

Focus on only one partition

Set up việc gửi message chỉ đến 1 given partition in list partition of kafka

from time import time
from kafka import KafkaProducer
from faker import Faker
import json, time faker = Faker() def get_register(): return { 'name': faker.name(), 'add' : faker.year() } def get_partitioner(key_bytes, all_partitions, available_partitions): return 0 def json_serializer(data): return json.dumps(data).encode('utf-8') producer = KafkaProducer( bootstrap_servers = ['localhost:9092'], # server name value_serializer = json_serializer, # function callable # partitioner = get_partitioner, # function return 0 >>> only partition_0 can received messages ) while 1==1: user = get_register() print(user) producer.send( 'second_topic',user ) time.sleep(3)

Bình luận

Bài viết tương tự

- vừa được xem lúc

Thao tác với File trong Python

Python cung cấp các chức năng cơ bản và phương thức cần thiết để thao tác các file. Bài viết này tôi xin giới thiệu những thao tác cơ bản nhất với file trong Python.

0 0 63

- vừa được xem lúc

Tập tành crawl dữ liệu với Scrapy Framework

Lời mở đầu. Chào mọi người, mấy hôm nay mình có tìm hiểu được 1 chút về Scrapy nên muốn viết vài dòng để xem mình đã học được những gì và làm 1 demo nho nhỏ.

0 0 166

- vừa được xem lúc

Sử dụng Misoca API (oauth2) với Python

Với bài viết này giúp chúng ta có thể nắm được. ・Tìm hiểu cách xử lý API misoca bằng Python.

0 0 49

- vừa được xem lúc

[Series Pandas DataFrame] Phân tích dữ liệu cùng Pandas (Phần 3)

Tiếp tục phần 2 của series Pandas DataFrame nào. Let's go!!. Ở phần trước, các bạn đã biết được cách lấy dữ liệu một row hoặc column trong Pandas DataFame rồi phải không nào. 6 Hoc.

0 0 63

- vừa được xem lúc

Lập trình socket bằng Python

Socket là gì. Một chức năng khác của socket là giúp các tầng TCP hoặc TCP Layer định danh ứng dụng mà dữ liệu sẽ được gửi tới thông qua sự ràng buộc với một cổng port (thể hiện là một con số cụ thể), từ đó tiến hành kết nối giữa client và server.

0 0 79

- vừa được xem lúc

[Series Pandas DataFrame] Phân tích dữ liệu cùng Pandas (Phần 2)

Nào, chúng ta cùng đến với phần 2 của series Pandas DataFrame. Truy xuất Labels và Data. Bạn đã biết cách khởi tạo 1 DataFrame của mình, và giờ bạn có thể truy xuất thông tin từ đó. Với Pandas, bạn có thể thực hiện các thao tác sau:.

0 0 95