Dựa vào log bạn đã cung cấp, dưới đây là cấu hình Logstash để lấy ra các trường cần thiết và viết tắt chúng để dễ dàng thực hiện truy vấn KQL (Kibana Query Language).
Cấu hình Logstash
filter { # Sử dụng grok để trích xuất các trường từ log grok { match => { "message" => """ {"instant":{"epochSecond":%{NUMBER:epoch_sec}," + "\"nanoOfSecond\":%{NUMBER:nano_sec}," + "\"thread\":\"%{DATA:thread}\"," + "\"level\":\"%{WORD:level}\"," + "\"loggerName\":\"%{DATA:logger_name}\"," + "\"message\":\"%{GREEDYDATA:log_message}\"," + "\"endOfBatch\":%{GREEDYDATA:end_of_batch}," + "\"loggerFqcn\":\"%{DATA:logger_fqcn}\"," + "\"contextMap\":{\"traceId\":\"%{DATA:trace_id}\"," + "\"spanId\":\"%{DATA:span_id}\"," + "\"className\":\"%{DATA:class_name}\"," + "\"clientIp\":\"%{IP:client_ip}\"," + "\"clientMessageId\":\"%{DATA:client_message_id}\"," + "\"clientTime\":\"%{TIMESTAMP_ISO8601:client_time}\"," + "\"duration\":%{NUMBER:duration}," + "\"methodName\":\"%{DATA:method_name}\"," + "\"path\":\"%{DATA:path}\"," + "\"stackTrace\":\"%{GREEDYDATA:stack_trace}\"}," + "\"threadId\":%{NUMBER:thread_id}," + "\"threadPriority\":%{NUMBER:thread_priority}," + "\"logType\":\"%{WORD:log_type}\"," + "\"application\":\"%{DATA:application}\"," + "\"localIp\":\"%{IP:local_ip}\"} """ } } # Tạo các trường viết tắt mutate { add_field => { "ts" => "%{epoch_sec}" "ip" => "%{client_ip}" "msgId" => "%{client_message_id}" "duration" => "%{duration}" "method" => "%{method_name}" "path" => "%{path}" "app" => "%{application}" "localIp" => "%{local_ip}" "logType" => "%{log_type}" } remove_field => ["instant", "thread", "level", "logger_name", "message", "endOfBatch", "logger_fqcn", "contextMap", "threadId", "threadPriority"] }
}
Giải thích Cấu hình: Input: Đọc dữ liệu từ một file JSON. Bạn cần thay đổi đường dẫn "/path/to/your/logfile.log" thành đường dẫn thực tế của file log.
Filter:
Sử dụng json để giải mã trường message từ log. Dùng mutate để tạo ra các trường viết tắt: ts: Thời gian từ epochSecond. ip: Địa chỉ IP của client. msgId: ID của message. duration: Thời gian xử lý. method: Phương thức API. path: Đường dẫn API. app: Tên ứng dụng. localIp: Địa chỉ IP cục bộ. logType: Loại log (REQUEST/RESPONSE). Xóa các trường không cần thiết để tiết kiệm dung lượng lưu trữ và làm cho dữ liệu dễ quản lý hơn. Output:
Ghi dữ liệu vào Elasticsearch với tên index có định dạng theo ngày. Xuất dữ liệu ra stdout để kiểm tra. Bạn có thể điều chỉnh cấu hình này dựa trên nhu cầu cụ thể của bạn. Hãy đảm bảo rằng bạn đã cấu hình đúng đường dẫn và thông tin kết nối đến Elasticsearch.
Nếu bạn muốn sử dụng grok để phân tích log trước khi lấy ra các trường cần thiết, bạn có thể điều chỉnh cấu hình Logstash như sau. Cấu hình dưới đây sẽ sử dụng grok để trích xuất thông tin từ log và sau đó sử dụng mutate để tạo các trường viết tắt.
Cấu hình Logstash với Grok
filter { # Sử dụng grok để trích xuất các trường từ log grok { match => { "message" => """ {"instant":{"epochSecond":%{NUMBER:epoch_sec}," + "\"nanoOfSecond\":%{NUMBER:nano_sec}," + "\"thread\":\"%{DATA:thread}\"," + "\"level\":\"%{WORD:level}\"," + "\"loggerName\":\"%{DATA:logger_name}\"," + "\"message\":\"%{GREEDYDATA:log_message}\"," + "\"endOfBatch\":%{GREEDYDATA:end_of_batch}," + "\"loggerFqcn\":\"%{DATA:logger_fqcn}\"," + "\"contextMap\":{\"traceId\":\"%{DATA:trace_id}\"," + "\"spanId\":\"%{DATA:span_id}\"," + "\"className\":\"%{DATA:class_name}\"," + "\"clientIp\":\"%{IP:client_ip}\"," + "\"clientMessageId\":\"%{DATA:client_message_id}\"," + "\"clientTime\":\"%{TIMESTAMP_ISO8601:client_time}\"," + "\"duration\":%{NUMBER:duration}," + "\"methodName\":\"%{DATA:method_name}\"," + "\"path\":\"%{DATA:path}\"," + "\"stackTrace\":\"%{GREEDYDATA:stack_trace}\"}," + "\"threadId\":%{NUMBER:thread_id}," + "\"threadPriority\":%{NUMBER:thread_priority}," + "\"logType\":\"%{WORD:log_type}\"," + "\"application\":\"%{DATA:application}\"," + "\"localIp\":\"%{IP:local_ip}\"} """ } } # Tạo các trường viết tắt mutate { add_field => { "ts" => "%{epoch_sec}" "ip" => "%{client_ip}" "msgId" => "%{client_message_id}" "duration" => "%{duration}" "method" => "%{method_name}" "path" => "%{path}" "app" => "%{application}" "localIp" => "%{local_ip}" "logType" => "%{log_type}" } remove_field => ["instant", "thread", "level", "logger_name", "message", "endOfBatch", "logger_fqcn", "contextMap", "threadId", "threadPriority"] }
}
Giải thích Cấu hình: Input: Tương tự như trước, đọc dữ liệu từ một file JSON.
Filter:
Grok: Sử dụng biểu thức grok để trích xuất các trường từ log. Mỗi trường được gán tên để dễ dàng truy cập sau này. Mutate: Sau khi trích xuất các trường bằng grok, sử dụng mutate để tạo các trường viết tắt tương tự như trong cấu hình trước. Output: Ghi dữ liệu vào Elasticsearch và xuất ra stdout.
Lưu ý: Bạn cần thay đổi đường dẫn tới file log trong phần input. Kiểm tra và điều chỉnh định dạng grok nếu cần thiết, đảm bảo rằng nó phù hợp với cấu trúc thực tế của log mà bạn đang sử dụng. Nếu bạn có thêm yêu cầu cụ thể nào khác về việc sử dụng grok, hãy cho tôi biết!
Template JSON cho Log
{ "template": { "mappings": { "properties": { "ts": { "type": "date", "format": "epoch_second" }, "ip": { "type": "ip" }, "msgId": { "type": "keyword" }, "duration": { "type": "integer" }, "method": { "type": "keyword" }, "path": { "type": "keyword" }, "app": { "type": "keyword" }, "localIp": { "type": "ip" }, "logType": { "type": "keyword" }, "loggerName": { "type": "keyword" }, "level": { "type": "keyword" }, "clientTime": { "type": "date", "format": "strict_date_optional_time||epoch_millis" }, "stackTrace": { "type": "text" } } } }
}
Giải thích Template JSON: mappings: Đây là phần định nghĩa các trường và kiểu dữ liệu của chúng mà bạn sẽ gửi lên Elasticsearch. ts: Thời gian log dưới dạng epoch_second, cho phép Elasticsearch hiểu và phân tích thời gian đúng cách. ip: Địa chỉ IP của client, định dạng ip giúp Elasticsearch xử lý tốt hơn. msgId: ID của message, kiểu keyword để dễ dàng truy vấn. duration: Thời gian xử lý, kiểu integer để thực hiện các phép toán. method: Phương thức HTTP, kiểu keyword. path: Đường dẫn API, kiểu keyword. app: Tên ứng dụng, kiểu keyword. localIp: Địa chỉ IP cục bộ, kiểu ip. logType: Loại log (REQUEST/RESPONSE), kiểu keyword. loggerName: Tên logger, kiểu keyword. level: Mức độ log (INFO, ERROR,...), kiểu keyword. clientTime: Thời gian của client, hỗ trợ nhiều định dạng ngày tháng. stackTrace: Stack trace nếu có lỗi, kiểu text để lưu trữ thông tin lớn.