Dưới đây là cấu hình Logstash cho bộ lọc dựa trên dữ liệu log request và response mà bạn đã cung cấp. Cấu hình này sẽ sử dụng plugin grok để phân tích cú pháp các log và plugin mutate để xử lý các trường cần thiết.
Cấu Hình Logstash Cho request và response
input { kafka { bootstrap_servers => "10.100.30.32:9092" topics => ["ESMART-CATEGORY-LOGS", "ESMART-GATEWAY-LOGS"] group_id => "log_consumer_group" auto_offset_reset => "earliest" }
} filter { # Sử dụng mutate để lọc đi ký tự không mong muốn mutate { gsub => ["message", "\u001b|\n", ""] } # Phân tích log response if "RESPONSE" in [message] { grok { match => { "message" => """ {"instant":{"epochSecond":%{NUMBER:epoch_sec}," + "\"nanoOfSecond\":%{NUMBER:nano_sec}," + "\"thread\":\"%{DATA:thread}\"," + "\"level\":\"%{WORD:level}\"," + "\"loggerName\":\"%{DATA:logger_name}\"," + "\"message\":\"%{GREEDYDATA:log_message}\"," + "\"endOfBatch\":%{GREEDYDATA:end_of_batch}," + "\"loggerFqcn\":\"%{DATA:logger_fqcn}\"," + "\"contextMap\":{\"traceId\":\"%{DATA:trace_id}\"," + "\"spanId\":\"%{DATA:span_id}\"," + "\"className\":\"%{DATA:class_name}\"," + "\"clientIp\":\"%{IP:client_ip}\"," + "\"clientMessageId\":\"%{DATA:client_message_id}\"," + "\"clientTime\":\"%{TIMESTAMP_ISO8601:client_time}\"," + "\"duration\":%{NUMBER:duration}," + "\"methodName\":\"%{DATA:method_name}\"," + "\"path\":\"%{DATA:path}\"," + "\"stackTrace\":\"%{GREEDYDATA:stack_trace}\"}," + "\"threadId\":%{NUMBER:thread_id}," + "\"threadPriority\":%{NUMBER:thread_priority}," + "\"logType\":\"%{WORD:log_type}\"," + "\"application\":\"%{DATA:application}\"," + "\"localIp\":\"%{IP:local_ip}\"} """ } } } # Phân tích log request if "REQUEST" in [message] { grok { match => { "message" => """ {"instant":{"epochSecond":%{NUMBER:epoch_sec}," + "\"nanoOfSecond\":%{NUMBER:nano_sec}," + "\"thread\":\"%{DATA:thread}\"," + "\"level\":\"%{WORD:level}\"," + "\"loggerName\":\"%{DATA:logger_name}\"," + "\"message\":\"%{GREEDYDATA:log_message}\"," + "\"endOfBatch\":%{GREEDYDATA:end_of_batch}," + "\"loggerFqcn\":\"%{DATA:logger_fqcn}\"," + "\"contextMap\":{\"traceId\":\"%{DATA:trace_id}\"," + "\"spanId\":\"%{DATA:span_id}\"," + "\"className\":\"%{DATA:class_name}\"," + "\"clientIp\":\"%{IP:client_ip}\"," + "\"clientMessageId\":\"%{DATA:client_message_id}\"," + "\"clientTime\":\"%{TIMESTAMP_ISO8601:client_time}\"," + "\"duration\":%{NUMBER:duration}," + "\"methodName\":\"%{DATA:method_name}\"," + "\"path\":\"%{DATA:path}\"," + "\"stackTrace\":\"%{GREEDYDATA:stack_trace}\"}," + "\"threadId\":%{NUMBER:thread_id}," + "\"threadPriority\":%{NUMBER:thread_priority}," + "\"logType\":\"%{WORD:log_type}\"," + "\"application\":\"%{DATA:application}\"," + "\"localIp\":\"%{IP:local_ip}\"} """ } } } # Tạo các trường mới mutate { add_field => { "ts" => "%{epoch_sec}" "ip" => "%{client_ip}" "msgId" => "%{client_message_id}" "duration" => "%{duration}" "method" => "%{method_name}" "path" => "%{path}" "app" => "%{application}" "localIp" => "%{local_ip}" "logType" => "%{log_type}" "type" => "%{+log_type}" # Đánh dấu loại log (REQUEST/RESPONSE) } remove_field => [ "instant", "thread", "level", "logger_name", "message", "endOfBatch", "logger_fqcn", "contextMap", "threadId", "threadPriority" ] } # Chuyển đổi timestamp date { match => [ "client_time", "ISO8601" ] timezone => "+07:00" target => "@timestamp" } ruby { code => "event.set('indexDay', event.get('@timestamp').time.localtime('+07:00').strftime('%Y%m%d'))" }
} output { elasticsearch { hosts => ["http://10.152.183.57:9200"] template => "/usr/share/logstash/templates/logstash_template.json" template_name => "logstash" template_overwrite => true index => "logstash-%{indexDay}" document_type => "_doc" } stdout { codec => rubydebug }
}