Dưới đây là một ví dụ cấu hình cho tệp logstash.conf của bạn để xử lý log từ Kafka, sử dụng grok để phân tích cú pháp và thêm một số cấu hình mutate để loại bỏ các trường không cần thiết. Bạn cũng có thể thêm bước gsub để lọc các ký tự không hợp lệ nếu cần:
input { kafka { bootstrap_servers => "10.100.30.32:9092" topics => ["ESMART-CATEGORY-LOGS", "ESMART-GATEWAY-LOGS"] group_id => "log_consumer_group" auto_offset_reset => "earliest" }
} filter { # Sử dụng gsub để loại bỏ các ký tự không hợp lệ (nếu cần) mutate { gsub => ["message", "\u001b|\n", ""] } # Phân tích cú pháp log với grok grok { match => { "message" => """ {"instant":{"epochSecond":%{NUMBER:epoch_sec}," + "\"nanoOfSecond\":%{NUMBER:nano_sec}," + "\"thread\":\"%{DATA:thread}\"," + "\"level\":\"%{WORD:level}\"," + "\"loggerName\":\"%{DATA:logger_name}\"," + "\"message\":\"%{GREEDYDATA:log_message}\"," + "\"endOfBatch\":%{GREEDYDATA:end_of_batch}," + "\"loggerFqcn\":\"%{DATA:logger_fqcn}\"," + "\"contextMap\":{\"traceId\":\"%{DATA:trace_id}\"," + "\"spanId\":\"%{DATA:span_id}\"," + "\"className\":\"%{DATA:class_name}\"," + "\"clientIp\":\"%{IP:client_ip}\"," + "\"clientMessageId\":\"%{DATA:client_message_id}\"," + "\"clientTime\":\"%{TIMESTAMP_ISO8601:client_time}\"," + "\"duration\":%{NUMBER:duration}," + "\"methodName\":\"%{DATA:method_name}\"," + "\"path\":\"%{DATA:path}\"," + "\"stackTrace\":\"%{GREEDYDATA:stack_trace}\"}," + "\"threadId\":%{NUMBER:thread_id}," + "\"threadPriority\":%{NUMBER:thread_priority}," + "\"logType\":\"%{WORD:log_type}\"," + "\"application\":\"%{DATA:application}\"," + "\"localIp\":\"%{IP:local_ip}\"} """ } } # Thêm các trường cần thiết và loại bỏ các trường không cần thiết mutate { add_field => { "ts" => "%{epoch_sec}" "ip" => "%{client_ip}" "msgId" => "%{client_message_id}" "duration" => "%{duration}" "method" => "%{method_name}" "path" => "%{path}" "app" => "%{application}" "localIp" => "%{local_ip}" "logType" => "%{log_type}" } remove_field => ["instant", "thread", "level", "logger_name", "message", "endOfBatch", "logger_fqcn", "contextMap", "threadId", "threadPriority"] } # Chuyển đổi timestamp date { match => [ "client_time", "ISO8601" ] timezone => "+07:00" target => "@timestamp" } # Thêm trường ngày tháng ruby { code => "event.set('indexDay', event.get('@timestamp').time.localtime('+07:00').strftime('%Y%m%d'))" }
} output { elasticsearch { hosts => ["http://10.152.183.57:9200"] template => "/usr/share/logstash/templates/logstash_template.json" template_name => "logstash" template_overwrite => true index => "logstash-%{indexDay}" document_type => "_doc" codec => json } stdout { codec => rubydebug }
}