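Filebeat log masking: viable approaches and how to choose
Option 1: Mask at the collection side with Filebeat processors
Scenario A: Rewrite message directly (JavaScript script; suited to arbitrary text replacement)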
filebeat.inputs:
  - type: log
    enabled: true
    paths:
      - /var/log/myapp/*.log
processors:
  - script:
      lang: javascript
      id: mask_sensitive
      source: |
        function process(event) {
          var msg = event.Get("message");
          if (msg) {
            // Mobile number (11 digits): keep the first 3 and last 4 digits, mask the middle 4
            msg = msg.replace(/\b(1[3-9]\d)\d{4}(\d{4})\b/g, '$1****$2');
            // Email: keep the first and last character of the local part plus the domain
            msg = msg.replace(/([^@\s]+)@([^@\s]+)/g, function(m, u, d) {
              if (u.length <= 2) return m; // local part too short to mask
              return u.charAt(0) + '***' + u.slice(-1) + '@' + d;
            });
            event.Put("message", msg);
          }
        }
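Before the script processor is deployed, the two substitutions can be tried outside Filebeat. The following is a minimal sketch meant to run under Node.js. The helper names maskPhone and maskEmail and the sample line are hypothetical, and the phone pattern assumes 11-digit mainland China mobile numbers.

```javascript
// Hypothetical helpers that mirror the masking rules of the script processor.
// Written in ES5 style so the function bodies can be pasted into a Filebeat script.

// Keep the first 3 and last 4 digits of an 11-digit mobile number.
function maskPhone(text) {
  return text.replace(/\b(1[3-9]\d)\d{4}(\d{4})\b/g, '$1****$2');
}

// Keep the first and last character of the local part and the whole domain.
function maskEmail(text) {
  return text.replace(/([^@\s]+)@([^@\s]+)/g, function (m, user, domain) {
    if (user.length <= 2) return m; // too short to mask
    return user.charAt(0) + '***' + user.slice(-1) + '@' + domain;
  });
}

// Made-up sample line, not taken from a real log.
var sample = 'user alice@example.com called from 13812345678';
console.log(maskEmail(maskPhone(sample)));
// prints "user a***e@example.com called from 138****5678"
```

Note that the local-part pattern runs back to the previous whitespace, so in a line such as mail=alice@example.com the key name mail= would be treated as part of the address. For key=value logs, a narrower character class or the structured approach is likely the safer choice.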
Scenario B: Parse into fields first, then mask (dissect + replace + drop_fields)
filebeat.inputs:
  - type: log
    enabled: true
    paths:
      - /var/log/myapp/keyvalue.log
processors:
  # Split the key=value line into fields under the "log" prefix
  - dissect:
      tokenizer: '%{logmonth} %{logday} %{logtime} %{ip} date=%{logdate},time=%{logtime},device_id=%{device_id},log_id=%{log_id},type=%{type},pri=%{pri},session_id="%{session_id}",client_name="%{client_name}",client_ip="%{client_ip}",client_cc="%{client_cc}",dst_ip="%{dst_ip}",from="%{from}",hfrom="%{hfrom}",to="%{to}",polid="%{polid}",domain="%{domain}",mailer="%{mailer}",resolved="%{resolved}",src_type="%{src_type}",direction="%{direction}",virus="%{virus}",disposition="%{disposition}",classifier="%{classifier}",message_length="%{message_length}",subject="%{subject}",message_id="%{message_id}",recv_time="%{recv_time}",notif_delay="%{notif_delay}",scan_time="%{scan_time}",xfer_time="%{xfer_time}",srcfolder="%{srcfolder}",read_status="%{read_status}"'
      field: message
      target_prefix: log
  - replace:
      fields:
        # Client IP: mask the third octet
        - field: log.client_ip
          pattern: '(\d{1,3})\.(\d{1,3})\.(\d{1,3})\.(\d{1,3})'
          replacement: '$1.$2.***.$4'
        # Sender: keep the first character of the local part and the domain
        - field: log.from
          pattern: '([^@\s])[^@\s]*@([^@\s]+)'
          replacement: '$1***@$2'
  # Drop the raw line and the fields that are not needed downstream
  - drop_fields:
      fields: ["message", "log.file", "log.device_id", "log.log_id", "log.pri", "log.session_id", "log.client_name", "log.polid", "log.src_type", "log.direction", "log.message_length", "log.message_id", "log.scan_time", "log.xfer_time", "log.srcfolder", "log.mailer", "log.offset", "log.logmonth", "log.logday", "log.read_status", "log.recv_time"]
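The effect of the two replace rules can be previewed on sample values. This is a minimal sketch in JavaScript, assuming the intent is to hide the third octet of the client IP and to reduce the sender's local part to its first character. The function names and sample values are hypothetical, and Filebeat itself evaluates these patterns with Go's regexp engine rather than JavaScript's, so treat the output as an approximation of what the processor would produce.

```javascript
// Hypothetical preview of the two replace rules on already-extracted field values.

// Mask the third octet of an IPv4 address.
function maskClientIp(ip) {
  return ip.replace(/(\d{1,3})\.(\d{1,3})\.(\d{1,3})\.(\d{1,3})/g, '$1.$2.***.$4');
}

// Keep only the first character of the local part; keep the domain unchanged.
function maskSender(addr) {
  return addr.replace(/([^@\s])[^@\s]*@([^@\s]+)/g, '$1***@$2');
}

console.log(maskClientIp('192.168.10.25'));   // prints "192.168.***.25"
console.log(maskSender('alice@example.com')); // prints "a***@example.com"
```

Masking an extracted field is less error-prone than masking the raw line, because each pattern only ever sees the value it was written for.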
Option 2: Mask in Logstash or Elasticsearch
Logstash Ruby filter (suited to complex, cross-field, or conditional logic)
input {
  beats { port => 5044 }
}
filter {
  ruby {
    path => "/usr/local/logstash/config/mask.rb"
    script_params => { "fields" => ["message", "client_ip", "from"] }
  }
}
output {
  elasticsearch {
    hosts => ["http://localhost:9200"]
    index => "app-log-%{+YYYY.MM.dd}"
  }
}
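mask.rb (example that masks mobile numbers and email addresses):
# Called once at startup with the script_params from the pipeline config
def register(params)
  @fields = params["fields"]
end

# Called per event; must return an array of events
def filter(event)
  @fields.each do |f|
    v = event.get(f)
    next unless v.is_a?(String)
    # Mobile number (11 digits): keep the first 3 and last 4 digits, mask the middle 4
    v = v.gsub(/\b(1[3-9]\d)\d{4}(\d{4})\b/, '\1****\2')
    # Email: keep the first and last character of the local part plus the domain
    v = v.gsub(/([^@\s]+)@([^@\s]+)/) do |m|
      user, domain = $1, $2
      user.length <= 2 ? m : "#{user[0]}***#{user[-1]}@#{domain}"
    end
    event.set(f, v)
  end
  [event]
end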
Elasticsearch Ingest Pipeline (suited to centralized governance on the ES side)
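On the Elasticsearch side, one way to express the same masking is the built-in gsub ingest processor. The sketch below builds a pipeline definition as a JavaScript object and prints the JSON body that would be sent with PUT _ingest/pipeline/<name>. The pipeline name mask-sensitive, the description text, and the choice of the message field are assumptions made for illustration, and the patterns assume 11-digit mobile numbers and whitespace-delimited email addresses.

```javascript
// Hypothetical pipeline name; pick whatever fits your naming scheme.
var pipelineName = 'mask-sensitive';

// Pipeline body using two gsub processors (patterns are Java regular expressions).
var pipeline = {
  description: 'Mask mobile numbers and email local parts in message',
  processors: [
    {
      gsub: {
        field: 'message',
        pattern: '\\b(1[3-9]\\d)\\d{4}(\\d{4})\\b', // keep first 3 and last 4 digits
        replacement: '$1****$2'
      }
    },
    {
      gsub: {
        field: 'message',
        pattern: '([^@\\s])[^@\\s]*@([^@\\s]+)', // keep first char of local part
        replacement: '$1***@$2'
      }
    }
  ]
};

// Print the request line and the JSON body, ready to paste into a console.
console.log('PUT _ingest/pipeline/' + pipelineName);
console.log(JSON.stringify(pipeline, null, 2));
```

Once such a pipeline exists, it could be attached through the pipeline option of the Filebeat or Logstash Elasticsearch output, or set as the index's index.default_pipeline, so that every writer to the index goes through the same masking rules.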
Practical recommendations and caveats