- vừa được xem lúc

Xử lý log từ Kafka

0 0 7

Người đăng: Đồng Dương

Theo Viblo Asia

Dưới đây là một ví dụ cấu hình cho tệp logstash.conf của bạn để xử lý log từ Kafka, sử dụng grok để phân tích cú pháp và thêm một số cấu hình mutate để loại bỏ các trường không cần thiết. Bạn cũng có thể thêm bước gsub để lọc các ký tự không hợp lệ nếu cần:

input { kafka { bootstrap_servers => "10.100.30.32:9092" topics => ["ESMART-CATEGORY-LOGS", "ESMART-GATEWAY-LOGS"] group_id => "log_consumer_group" auto_offset_reset => "earliest" }
} filter { # Sử dụng gsub để loại bỏ các ký tự không hợp lệ (nếu cần) mutate { gsub => ["message", "\u001b|\n", ""] } # Phân tích cú pháp log với grok grok { match => { "message" => """ {"instant":{"epochSecond":%{NUMBER:epoch_sec}," + "\"nanoOfSecond\":%{NUMBER:nano_sec}," + "\"thread\":\"%{DATA:thread}\"," + "\"level\":\"%{WORD:level}\"," + "\"loggerName\":\"%{DATA:logger_name}\"," + "\"message\":\"%{GREEDYDATA:log_message}\"," + "\"endOfBatch\":%{GREEDYDATA:end_of_batch}," + "\"loggerFqcn\":\"%{DATA:logger_fqcn}\"," + "\"contextMap\":{\"traceId\":\"%{DATA:trace_id}\"," + "\"spanId\":\"%{DATA:span_id}\"," + "\"className\":\"%{DATA:class_name}\"," + "\"clientIp\":\"%{IP:client_ip}\"," + "\"clientMessageId\":\"%{DATA:client_message_id}\"," + "\"clientTime\":\"%{TIMESTAMP_ISO8601:client_time}\"," + "\"duration\":%{NUMBER:duration}," + "\"methodName\":\"%{DATA:method_name}\"," + "\"path\":\"%{DATA:path}\"," + "\"stackTrace\":\"%{GREEDYDATA:stack_trace}\"}," + "\"threadId\":%{NUMBER:thread_id}," + "\"threadPriority\":%{NUMBER:thread_priority}," + "\"logType\":\"%{WORD:log_type}\"," + "\"application\":\"%{DATA:application}\"," + "\"localIp\":\"%{IP:local_ip}\"} """ } } # Thêm các trường cần thiết và loại bỏ các trường không cần thiết mutate { add_field => { "ts" => "%{epoch_sec}" "ip" => "%{client_ip}" "msgId" => "%{client_message_id}" "duration" => "%{duration}" "method" => "%{method_name}" "path" => "%{path}" "app" => "%{application}" "localIp" => "%{local_ip}" "logType" => "%{log_type}" } remove_field => ["instant", "thread", "level", "logger_name", "message", "endOfBatch", "logger_fqcn", "contextMap", "threadId", "threadPriority"] } # Chuyển đổi timestamp date { match => [ "client_time", "ISO8601" ] timezone => "+07:00" target => "@timestamp" } # Thêm trường ngày tháng ruby { code => "event.set('indexDay', event.get('@timestamp').time.localtime('+07:00').strftime('%Y%m%d'))" }
} output { elasticsearch { hosts => ["http://10.152.183.57:9200"] template => "/usr/share/logstash/templates/logstash_template.json" template_name => "logstash" template_overwrite => true index => "logstash-%{indexDay}" document_type => "_doc" codec => json } stdout { codec => rubydebug }
} 

Bình luận

Bài viết tương tự

- vừa được xem lúc

Kafka là gì?

Apache Kafka® là một nền tảng stream dữ liệu phân tán. . stream data: dòng dữ liệu, hãy tưởng tượng dữ liệu là nước trong 1 con suối. .

0 0 43

- vừa được xem lúc

001: Message-driven programming với Message broker và Apache Kafka

Bài viết nằm trong series Apache Kafka từ zero đến one. . . Asynchronous programming.

0 0 165

- vừa được xem lúc

002: Apache Kafka topic, partition, offset và broker

Bài viết nằm trong series Apache Kafka từ zero đến one. Nói qua về lịch sử, Kafka được phát triển bởi LinkedIn (các anh em dev chắc chẳng xa lạ gì) và viết bằng ngôn ngữ JVM, cụ thể là Java và Scala.

0 0 153

- vừa được xem lúc

003: Gửi và nhận message trong Apache Kafka

Bài viết nằm trong series Apache Kafka từ zero đến one. . . Nếu muốn các message được lưu trên cùng một partition để đảm bảo thứ tự thì làm cách nào.

0 0 224

- vừa được xem lúc

004: Apache Kafka consumer offset, Broker discovery và Zookeeper

Bài viết nằm trong series Apache Kafka từ zero đến one. 1) Consumer offset.

0 0 130

- vừa được xem lúc

Apache Kafka - Producer - Gửi message đến Kafka bằng kafka-python

Overview. Understand how to produce message and send to the Kafka topic. Architecture. .

0 0 65