温馨提示×

ubuntu上kafka安全设置怎么做

小樊
48
2025-11-15 16:26:56
栏目: 智能运维

Ubuntu上Kafka安全设置实操指南

一 安全目标与总体架构

  • 启用传输加密:优先使用 SASL_SSLSSL 保护数据链路,禁用明文 PLAINTEXT
  • 启用身份认证:推荐 SASL/SCRAM(如 SCRAM-SHA-256/512),便于动态管理用户;不建议在集群内长期使用 SASL/PLAIN
  • 启用授权控制:开启 ACL,按用户/客户端主机对 Topic/Group/Cluster 进行精细化授权。
  • 网络与系统加固:仅开放必要端口,限制来源网段,开启防火墙与系统日志审计。

二 生成证书与密钥库

  • 准备证书目录并生成 Broker 密钥库(JKS):
KAFKA_HOME=/opt/kafka
mkdir -p $KAFKA_HOME/config/certs

keytool -genkey \
  -alias kafka \
  -keyalg RSA \
  -keystore $KAFKA_HOME/config/certs/kafka.server.keystore.jks \
  -validity 3650 \
  -keysize 2048
  • 导出并分发证书,构建信任库(每个 Broker 和客户端都需要导入其他 Broker 的证书):
# 导出
keytool -export \
  -alias kafka \
  -file $KAFKA_HOME/config/certs/kafka.server.crt \
  -keystore $KAFKA_HOME/config/certs/kafka.server.keystore.jks

# 导入到信任库(客户端与集群内各 Broker 均执行)
keytool -import \
  -alias kafka \
  -file $KAFKA_HOME/config/certs/kafka.server.crt \
  -keystore $KAFKA_HOME/config/certs/kafka.server.truststore.jks
  • 提示:生产环境建议使用由内部 CA 签发的证书,并在所有节点统一信任该 CA

三 配置 Broker 安全参数

  • 编辑 $KAFKA_HOME/config/server.properties,建议仅保留安全监听器并开启双向认证:
# 监听与对外地址(按实际主机名/IP填写)
listeners=SASL_SSL://0.0.0.0:9093
advertised.listeners=SASL_SSL://<your.kafka.host>:9093

# 内部通信安全
security.inter.broker.protocol=SASL_SSL
sasl.mechanism.inter.broker.protocol=SCRAM-SHA-512
sasl.enabled.mechanisms=SCRAM-SHA-512

# SSL 参数
ssl.keystore.location=/opt/kafka/config/certs/kafka.server.keystore.jks
ssl.keystore.password=YourKeystorePass
ssl.key.password=YourKeyPass
ssl.truststore.location=/opt/kafka/config/certs/kafka.server.truststore.jks
ssl.truststore.password=YourTruststorePass
ssl.client.auth=required
ssl.enabled.protocols=TLSv1.2,TLSv1.3
# 可选:限制加密套件
# ssl.cipher.suites=TLS_AES_128_GCM_SHA256,TLS_AES_256_GCM_SHA384

# 授权器(ACL)
authorizer.class.name=kafka.security.authorizer.AclAuthorizer
# 超级用户(谨慎授予,运维/初始化用)
super.users=User:admin
  • 说明:
    • 如暂未启用 SASL,可先用 SSL 监听器(将 listeners 改为 SSL://),但生产建议“加密+认证”同时使用。
    • 若使用 KRaft 模式,请将上述安全配置同样应用于 controller.listenersinter.broker.listener.name

四 配置 JAAS 与用户凭证

  • 创建 JAAS 配置文件 $KAFKA_HOME/config/kafka_server_jaas.conf
KafkaServer {
  org.apache.kafka.common.security.scram.ScramLoginModule required
  username="admin"
  password="AdminPass!";
};

# 如果仍使用 Zookeeper,需为 ZK 连接配置身份(示例)
Client {
  org.apache.zookeeper.server.auth.DigestLoginModule required
  username="zkadmin"
  password="ZkPass!";
};
  • 在 Broker 启动脚本中注入 JAAS(以 kafka-server-start.sh 为例,在 exec java 前加入):
export KAFKA_OPTS="-Djava.security.auth.login.config=$KAFKA_HOME/config/kafka_server_jaas.conf"
  • 创建 SCRAM 用户(无需重启 Broker):
# 创建 admin(如上面 JAAS 已配置,可跳过或用于变更密码)
$KAFKA_HOME/bin/kafka-configs.sh --alter \
  --add-config 'SCRAM-SHA-512=[iterations=8192,password=AdminPass!]' \
  --entity-type users --entity-name admin

# 创建业务用户
$KAFKA_HOME/bin/kafka-configs.sh --alter \
  --add-config 'SCRAM-SHA-512=[iterations=8192,password=WriterPass!' \
  --entity-type users --entity-name writer

$KAFKA_HOME/bin/kafka-configs.sh --alter \
  --add-config 'SCRAM-SHA-512=[iterations=8192,password=ReaderPass!' \
  --entity-type users --entity-name reader
  • 提示:SCRAM 凭证可通过命令行随时增删改,适合生产运维。

五 客户端与 ACL 验证及网络加固

  • 客户端配置示例(producer.properties/consumer.properties 或命令行 --command-config):
security.protocol=SASL_SSL
sasl.mechanism=SCRAM-SHA-512
sasl.jaas.config=org.apache.kafka.common.security.scram.ScramLoginModule required \
  username="writer" \
  password="WriterPass!";

ssl.truststore.location=/opt/kafka/config/certs/kafka.server.truststore.jks
ssl.truststore.password=YourTruststorePass
# 双向认证时再配置客户端密钥库
# ssl.keystore.location=/path/to/client.keystore.jks
# ssl.keystore.password=YourClientKeystorePass
# ssl.key.password=YourClientKeyPass
  • 命令行快速测试:
# 列出 Topic(使用 SCRAM 凭证)
$KAFKA_HOME/bin/kafka-topics.sh --list \
  --bootstrap-server <your.kafka.host>:9093 \
  --command-config <(echo -e "security.protocol=SASL_SSL\nsasl.mechanism=SCRAM-SHA-512\nsasl.jaas.config=org.apache.kafka.common.security.scram.ScramLoginModule required username=\"writer\" password=\"WriterPass!\";")

# 生产消息
$KAFKA_HOME/bin/kafka-console-producer.sh --topic test-topic \
  --bootstrap-server <your.kafka.host>:9093 \
  --producer.config producer.properties

# 消费消息
$KAFKA_HOME/bin/kafka-console-consumer.sh --topic test-topic \
  --from-beginning \
  --bootstrap-server <your.kafka.host>:9093 \
  --consumer.config consumer.properties
  • 配置 ACL(示例:授予 writer 对 test-topic 的生产权限,reader 的消费权限):
# 写权限
$KAFKA_HOME/bin/kafka-acls.sh --add \
  --topic test-topic \
  --principal User:writer \
  --operation Write --operation Create \
  --host <client.ip.or.cidr> \
  --bootstrap-server <your.kafka.host>:9093

# 读权限
$KAFKA_HOME/bin/kafka-acls.sh --add \
  --topic test-topic \
  --principal User:reader \
  --operation Read --operation Describe \
  --group '*' \
  --host <client.ip.or.cidr> \
  --bootstrap-server <your.kafka.host>:9093

# 查看 ACL
$KAFKA_HOME/bin/kafka-acls.sh --list \
  --topic test-topic \
  --bootstrap-server <your.kafka.host>:9093
  • 网络与防火墙(UFW 示例,仅开放 9093):
sudo ufw allow 9093/tcp
sudo ufw enable
# 云环境请同时配置安全组仅放通可信网段
  • 验证与排错要点:
    • 使用 TLSv1.2+,禁用 PLAINTEXT;必要时开启 ssl.client.auth=required 做双向认证。
    • 客户端报 “SASL authentication failed” 时,核对 sasl.jaas.config 的用户名/密码与 SCRAM 配置是否一致。
    • ACL 无权限时,检查 principal(User:xxx)、host 段、以及是否对 Topic/Group/Cluster 授予了对应 operation

0