温馨提示×

Filebeat如何进行日志脱敏

小樊
48
2025-11-21 04:02:04
栏目: 编程语言

Filebeat日志脱敏的可行路径与选择

  • Filebeat 处理器(processors)脱敏:在采集端用内置或脚本处理器对事件进行就地改写,适合对已知字段或整条 message 做快速替换、删除、正则化处理。优点是延迟最低、减少敏感数据外泄面;局限是对复杂多行、跨字段、结构化解析后的脱敏能力有限。常见处理器包括:script(JavaScript)dissectreplacedrop_fieldsdrop_event 等。
  • 下游脱敏:将原始日志发送到 LogstashElasticsearch Ingest Pipeline 做更复杂的脱敏与治理(如 Ruby 脚本、条件判断、pipeline 多阶段处理)。优点是灵活可编排、规则集中;代价是多一跳网络与计算开销
  • 架构选择:优先在采集端脱敏“显而易见”的敏感信息;对强合规、跨字段、需重算的复杂规则,放在下游统一治理。

方案一 使用 Filebeat 处理器在采集端脱敏

  • 场景A 直接改写 message(JavaScript 脚本,适合任意文本替换)

    • 思路:用 script 处理器匹配敏感片段并用掩码替换,例如手机号中间四位、身份证中间段、邮箱保留域名等。
    • 示例(脱敏手机号与邮箱,保留前后位,中间打码):
      filebeat.inputs:
      - type: log
        enabled: true
        paths:
          - /var/log/myapp/*.log
        processors:
          - script:
              lang: javascript
              id: mask_sensitive
              source: |
                function process(event) {
                  var msg = event.Get("message");
                  if (msg) {
                    // 手机号:保留前3后4,中间4位掩码
                    msg = msg.replace(/(\b1[3-9]\d{2})(\d{4})(\d{4}\b)/g, '$1****$3');
                    // 邮箱:保留@前1位与域名,中间掩码
                    msg = msg.replace(/([^@\s]+)@([^@\s]+)/g, function(m, u, d) {
                      if (u.length <= 2) return m; // 太短不掩码
                      return u[0] + '***' + u.slice(-1) + '@' + d;
                    });
                    event.Put("message", msg);
                  }
                }
      
    • 说明:脚本语言为 JavaScript;可按需扩展身份证、银行卡、密钥等正则。该方式对性能影响小,适合高吞吐场景。
  • 场景B 结构化后再脱敏(dissect + replace + drop_fields)

    • 思路:先用 dissectdecode_json_fields 将日志拆成结构化字段,再用 replace 对指定字段做掩码,最后 drop_fields 删除不必要字段。
    • 示例(键值对日志,脱敏 client_ip 与 from 字段):
      filebeat.inputs:
      - type: log
        enabled: true
        paths:
          - /var/log/myapp/keyvalue.log
        processors:
          - dissect:
              tokenizer: '%{logmonth} %{logday} %{logtime} %{ip} date=%{logdate},time=%{logtime},device_id=%{device_id},log_id=%{log_id},type=%{type},pri=%{pri},session_id="%{session_id}",client_name="%{client_name}",client_ip="%{client_ip}",client_cc="%{client_cc}",dst_ip="%{dst_ip}",from="%{from}",hfrom="%{hfrom}",to="%{to}",polid="%{polid}",domain="%{domain}",mailer="%{mailer}",resolved="%{resolved}",src_type="%{src_type}",direction="%{direction}",virus="%{virus}",disposition="%{disposition},classifier="%{classifier},message_length="%{message_length}",subject="%{subject}",message_id="%{message_id}",recv_time="%{recv_time}",notif_delay="%{notif_delay}",scan_time="%{scan_time}",xfer_time="%{xfer_time}",srcfolder="%{srcfolder}",read_status="%{read_status}"'
              field: message
              target_prefix: log
          - replace:
              fields:
                - field: log.client_ip
                  pattern: '(\d{1,3})\.(\d{1,3})\.(\d{1,3})\.(\d{1,3})'
                  replacement: '$1.$2.***.$4'
                - field: log.from
                  pattern: '([^@\s]+)@([^@\s]+)'
                  replacement: '$1***@$2'
          - drop_fields:
              fields: ["message", "log.file", "log.device_id", "log.log_id", "log.pri", "log.session_id", "log.client_name", "log.polid", "log.src_type", "log.direction", "log.message_length", "log.message_id", "log.scan_time", "log.xfer_time", "log.srcfolder", "log.mailer", "log.offset", "log.logmonth", "log.logday", "log.read_status", "log.recv_time"]
      
    • 说明:dissect 适合固定分隔的日志;JSON 日志可用 decode_json_fields 先解析再替换。注意 replace 处理器不支持将 replacement 设为空字符串,如需“删除字段值”请改用脚本处理器或下游处理。

方案二 在 Logstash 或 Elasticsearch 中脱敏

  • Logstash Ruby 过滤器(适合复杂、跨字段、条件逻辑)

    • 思路:Filebeat 将日志发到 Logstash(如端口 5044),用 ruby 过滤器按正则对指定字段脱敏,再写入 ES。
    • 最小示例:
      input { beats { port => 5044 } }
      filter {
        ruby {
          path => "/usr/local/logstash/config/mask.rb"
          script_params => { "fields" => ["message", "client_ip", "from"] }
        }
      }
      output {
        elasticsearch { hosts => ["http://localhost:9200"] index => "app-log-%{+YYYY.MM.dd}" }
      }
      
      mask.rb(示例掩码手机号与邮箱):
      def register(params)
        @fields = params["fields"]
      end
      def filter(event)
        @fields.each do |f|
          v = event.get(f)
          next unless v.is_a?(String)
          v = v.gsub(/(\b1[3-9]\d{2})(\d{4})(\d{4}\b)/, '\1****\3')
          v = v.gsub(/([^@\s]+)@([^@\s]+)/) { |m| m[0] + '***' + m[-1] + '@' + $2 }
          event.set(f, v)
        end
        [event]
      end
      
    • 说明:Ruby 过滤器非常灵活,可做条件分支、分段掩码、正则捕获组替换等;适合集中治理与复杂规则。
  • Elasticsearch Ingest Pipeline(适合在 ES 侧统一治理)

    • 思路:定义包含 Gsub ProcessorDate/Convert ProcessorsPipeline 条件 的 ingest pipeline,将 Filebeat 输出指向该 pipeline,由 ES 在写入时完成脱敏与字段处理。
    • 适用:已有存量索引或希望把治理逻辑集中在 ES 的场景;与 Beats 搭配简单,但会占用 ES 写入计算资源。

实践建议与注意事项

  • 先解析再脱敏:对 JSON 或 KV 日志,优先用 decode_json_fields / dissect 转成结构化字段,再对目标字段做 replace/script,命中率与可维护性更高。
  • 正则与掩码策略:优先使用捕获组保留可辨识前缀/后缀(如手机号前3后4),避免全遮蔽导致排障困难;对邮箱、身份证、银行卡等按业务最小暴露原则设计掩码。
  • 性能与风险:采集端脚本尽量轻量(避免大循环与回溯);对超大字段(如 Base64)可在采集端或下游截断/压缩,减少网络与存储压力(示例实践见多篇案例)。
  • 合规与验证:脱敏属于数据最小化合规要求,务必在测试环境验证规则,确保不影响检索、告警、审计;对关键字段建立脱敏回归测试
  • 字段删除与最小化:不需要的字段用 drop_fields 删除;注意 replace 不能将值置空,需要清空请使用脚本处理器或下游处理。

0