
Configuring SASL_SSL Authentication and Transport Encryption for Kafka

丧血槌 2025-9-30 11:46:56
This article is shared from the Tianyi Cloud Developer Community post of the same title; author: 王****帅
I. SSL Certificate Configuration

1. Generate the certificate

    Run the command below. The prompts appear in this order: password, re-enter password, first and last name, organizational unit, organization, city, state/province, two-letter country code, key password, re-enter key password. The warning printed at the end can be ignored. The one thing to watch is the "first and last name" prompt: it must be the host's domain name, e.g. "localhost". Do not enter an arbitrary string here; I tried other values and the client-side certificate validation kept failing later on.
keytool -keystore server.keystore.jks -alias localhost -validity 3650 -genkey
Enter keystore password:
Re-enter new password:
What is your first and last name?
[Unknown]:  localhost
What is the name of your organizational unit?
[Unknown]:  CH-kafka
What is the name of your organization?
[Unknown]:  kafkadev
What is the name of your City or Locality?
[Unknown]:  shanghai
What is the name of your State or Province?
[Unknown]:  shanghai
What is the two-letter country code for this unit?
[Unknown]:  CH
Is CN=localhost, OU=CH-kafka, O=kafkadev, L=shanghai, ST=shanghai, C=CH correct?
[no]:  yes
Enter key password for <localhost>
        (RETURN if same as keystore password):
Re-enter new password:
Warning:
The JKS keystore uses a proprietary format. It is recommended to migrate to PKCS12 which is an industry standard format using "keytool -importkeystore -srckeystore server.keystore.jks -destkeystore server.keystore.jks -deststoretype pkcs12".
After completing the steps above, you can verify the contents of the generated keystore with the command keytool -list -v -keystore server.keystore.jks.
2. Generate a CA

     After step 1, every machine in the cluster has a public/private key pair and a certificate that identifies it. However, the certificate is unsigned, which means an attacker could create a similar certificate and impersonate any machine.
     To prevent forged certificates, each machine's certificate is signed. A Certificate Authority (CA) is responsible for signing certificates. The CA works like a government issuing passports: the government stamps (signs) every passport so that it is hard to forge, and other governments verify the stamp to confirm the passport is genuine. Similarly, the CA signs certificates, and the cryptography guarantees that a signed certificate is hard to forge. As long as the CA is a genuine and trusted authority, clients have a strong guarantee that they are connecting to the real machine. The CA generated below is simply a public/private key pair plus a certificate that is used to sign other certificates. Run the command below; the prompts are, in order: password, re-enter password, two-letter country code, state/province, city, organization name, organizational unit, common name (the domain name again), email address. This is roughly the reverse of the prompt order used when generating the certificate above. The values must match those entered in step 1; the email address may be left blank.

openssl req -new -x509 -keyout ca-key -out ca-cert -days 3650
Generating a 2048 bit RSA private key
.........................................................................+++
..................+++
writing new private key to 'ca-key'
Enter PEM pass phrase:
Verifying - Enter PEM pass phrase:
-----
You are about to be asked to enter information that will be incorporated
into your certificate request.
What you are about to enter is what is called a Distinguished Name or a DN.
There are quite a few fields but you can leave some blank
For some fields there will be a default value,
If you enter '.', the field will be left blank.
-----
Country Name (2 letter code) [XX]:CH
State or Province Name (full name) []:shanghai
Locality Name (eg, city) [Default City]:shanghai
Organization Name (eg, company) [Default Company Ltd]:kafkadev
Organizational Unit Name (eg, section) []:CH-kafka
Common Name (eg, your name or your server's hostname) []:localhost
Email Address []:
Add the generated CA to the clients' truststore so that clients can trust this CA:
keytool -keystore server.truststore.jks -alias CARoot -import -file ca-cert
keytool -keystore client.truststore.jks -alias CARoot -import -file ca-cert
3. Sign the certificates

Use the CA generated in step 2 to sign all of the certificates generated in step 1. First, export the certificate (as a signing request) from the keystore:
keytool -keystore server.keystore.jks -alias localhost -certreq -file cert-file
Then sign it with the CA; {validity} and {ca-password} are placeholders for the validity period and the CA key password:
openssl x509 -req -CA ca-cert -CAkey ca-key -in cert-file -out cert-signed -days {validity} -CAcreateserial -passin pass:{ca-password}
Finally, import both the CA certificate and the signed certificate into the keystore:
keytool -keystore server.keystore.jks -alias CARoot -import -file ca-cert
keytool -keystore server.keystore.jks -alias localhost -import -file cert-signed
The parameters above are:
keystore: the location of the keystore
ca-cert: the CA's certificate
ca-key: the CA's private key
ca-password: the CA's passphrase
cert-file: the exported, unsigned certificate of the server
cert-signed: the signed certificate of the server
The complete script for all of the steps above is shown below. Replace the password with your own; to avoid confusion, it is best to use the same password in every step.
#!/bin/bash
#Step 1
keytool -keystore server.keystore.jks -alias localhost -validity 3650 -keyalg RSA -genkey
#Step 2
openssl req -new -x509 -keyout ca-key -out ca-cert -days 3650
keytool -keystore server.truststore.jks -alias CARoot -import -file ca-cert
keytool -keystore client.truststore.jks -alias CARoot -import -file ca-cert
#Step 3
keytool -keystore server.keystore.jks -alias localhost -certreq -file cert-file
openssl x509 -req -CA ca-cert -CAkey ca-key -in cert-file -out cert-signed -days 3650 -CAcreateserial -passin pass:123456
keytool -keystore server.keystore.jks -alias CARoot -import -file ca-cert
keytool -keystore server.keystore.jks -alias localhost -import -file cert-signed
#Step 4
keytool -keystore client.keystore.jks -alias localhost -validity 3650 -keyalg RSA -genkey
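Note that Step 4 of the script only generates an unsigned client keystore; the client-ssl.properties file used later relies on the truststore alone, so this keystore is not strictly needed for the setup in this article. If you later require client certificates as well (for example, by setting ssl.client.auth=required on the broker), the client keystore would have to be signed by the same CA. A minimal sketch, mirroring the server-side steps; the file names client-cert-file and client-cert-signed are placeholders I chose, not part of the original:
#sketch: sign client.keystore.jks with the same CA (only needed for mutual TLS)
keytool -keystore client.keystore.jks -alias localhost -certreq -file client-cert-file
openssl x509 -req -CA ca-cert -CAkey ca-key -in client-cert-file -out client-cert-signed -days 3650 -CAcreateserial -passin pass:123456
keytool -keystore client.keystore.jks -alias CARoot -import -file ca-cert
keytool -keystore client.keystore.jks -alias localhost -import -file client-cert-signed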
II. Configure ZooKeeper security authentication

1. Create the jaas.conf security configuration file in ZooKeeper's conf directory

This file defines two users, admin and kafka; the value after the equals sign is each user's password.
These are the users allowed to connect to the ZooKeeper server. The JAAS login context must be named Server (the name cannot be changed; renaming it causes an error).
Server {
  org.apache.zookeeper.server.auth.DigestLoginModule required
  user_admin="!234Qwer"
  user_kafka="clearwater001";
};
2. Add the authentication settings to ZooKeeper's zoo.cfg configuration file, as follows
tickTime=2000
initLimit=10
syncLimit=5
dataDir=/usr/local/zookeeper/apache-zookeeper-3.5.9-bin/data
dataLogDir=/usr/local/zookeeper/apache-zookeeper-3.5.9-bin/logs
clientPort=2181
#SASL authentication
authProvider.1=org.apache.zookeeper.server.auth.SASLAuthenticationProvider
requireClientAuthScheme=sasl
jaasLoginRenew=3600000
3. In the zkEnv.sh environment script, add a JVM parameter so that the location of the JAAS configuration file is passed to each JVM
  LIBPATH=("${ZOOKEEPER_PREFIX}"/share/zookeeper/*.jar)
else
  #release tarball format
  for i in "$ZOOBINDIR"/../zookeeper-*.jar
  do
    CLASSPATH="$i:$CLASSPATH"
  done
  LIBPATH=("${ZOOBINDIR}"/../lib/*.jar)
fi
for i in "${LIBPATH[@]}"
do
    CLASSPATH="$i:$CLASSPATH"
done
#make it work for developers
for d in "$ZOOBINDIR"/../build/lib/*.jar
do
   CLASSPATH="$d:$CLASSPATH"
done
for d in "$ZOOBINDIR"/../zookeeper-server/target/lib/*.jar
do
   CLASSPATH="$d:$CLASSPATH"
done
#make it work for developers
CLASSPATH="$ZOOBINDIR/../build/classes:$CLASSPATH"
#make it work for developers
CLASSPATH="$ZOOBINDIR/../zookeeper-server/target/classes:$CLASSPATH"
case "`uname`" in
    CYGWIN*|MINGW*) cygwin=true ;;
    *) cygwin=false ;;
esac
if $cygwin
then
    CLASSPATH=`cygpath -wp "$CLASSPATH"`
fi
#echo "CLASSPATH=$CLASSPATH"
# default heap for zookeeper server
ZK_SERVER_HEAP="${ZK_SERVER_HEAP:-1000}"
export SERVER_JVMFLAGS="-Xmx${ZK_SERVER_HEAP}m $SERVER_JVMFLAGS"
#JVM parameter: pass the JAAS file location, keeping the heap flag set above
export SERVER_JVMFLAGS="-Djava.security.auth.login.config=/usr/local/zookeeper/apache-zookeeper-3.5.9-bin/conf/jaas.conf $SERVER_JVMFLAGS"
# default heap for zookeeper client
ZK_CLIENT_HEAP="${ZK_CLIENT_HEAP:-256}"
export CLIENT_JVMFLAGS="-Xmx${ZK_CLIENT_HEAP}m $CLIENT_JVMFLAGS"
III. Configure Kafka security authentication


1. Create the jaas.conf authentication file in Kafka's config directory


The username and password properties define the user that the Kafka brokers use to communicate with each other.
The user_ entries define the users that may connect to the brokers; these accounts are used by producers and consumers.
Both are configured under the default KafkaServer JAAS login context.
The user that the brokers use to connect to ZooKeeper is configured under the Client login context; pick one of the users defined in the ZooKeeper jaas.conf above.

KafkaServer {
        org.apache.kafka.common.security.plain.PlainLoginModule required
        username="admin"
        password="clearwater"
        user_admin="clearwater"
        user_kafka="!234Qwer";
};
Client {
        org.apache.kafka.common.security.plain.PlainLoginModule required
        username="kafka"
        password="clearwater001";
};
2. Create the kafka_client_jaas.conf authentication file in Kafka's config directory
KafkaClient {
        org.apache.kafka.common.security.plain.PlainLoginModule required
        username="kafka"
        password="!234Qwer";
};
3. In kafka-server-start.sh under Kafka's bin directory, set the environment variable that points to the jaas.conf file
if [ $# -lt 1 ];
then
        echo "USAGE: $0 [-daemon] server.properties [--override property=value]*"
        exit 1
fi
base_dir=$(dirname $0)
if [ "x$KAFKA_LOG4J_OPTS" = "x" ]; then
    export KAFKA_LOG4J_OPTS="-Dlog4j.configuration=file:$base_dir/../config/log4j.properties"
fi
#environment variable: point the broker JVM at the JAAS file
if [ "x$KAFKA_HEAP_OPTS" = "x" ]; then
    export KAFKA_HEAP_OPTS="-Xmx1G -Xms1G  -Djava.security.auth.login.config=/usr/local/kafka_2.12-2.8.0/config/jaas.conf"
fi
EXTRA_ARGS=${EXTRA_ARGS-'-name kafkaServer -loggc'}
COMMAND=$1
case $COMMAND in
  -daemon)
    EXTRA_ARGS="-daemon "$EXTRA_ARGS
    shift
    ;;
  *)
    ;;
esac
exec $base_dir/kafka-run-class.sh $EXTRA_ARGS kafka.Kafka "$@"
4. In kafka-console-producer.sh under Kafka's bin directory, set the environment variable that points to the kafka_client_jaas.conf file
if [ "x$KAFKA_HEAP_OPTS" = "x" ]; then
    export KAFKA_HEAP_OPTS="-Xmx512M -Djava.security.auth.login.config=/usr/local/kafka_2.12-2.8.0/config/kafka_client_jaas.conf"
fi
exec $(dirname $0)/kafka-run-class.sh kafka.tools.ConsoleProducer "$@"
5. In kafka-console-consumer.sh under Kafka's bin directory, set the environment variable that points to the kafka_client_jaas.conf file
if [ "x$KAFKA_HEAP_OPTS" = "x" ]; then
    export KAFKA_HEAP_OPTS="-Xmx512M -Djava.security.auth.login.config=/usr/local/kafka_2.12-2.8.0/config/kafka_client_jaas.conf"
fi
exec $(dirname $0)/kafka-run-class.sh kafka.tools.ConsoleConsumer "$@"
6. Create the client-ssl.properties file in Kafka's bin directory (it is passed on the command line when running the producer and consumer)
security.protocol=SASL_SSL
ssl.endpoint.identification.algorithm=
sasl.mechanism=PLAIN
group.id=test
ssl.truststore.location=/usr/local/kafka_2.12-2.8.0/ssl/client.truststore.jks
ssl.truststore.password=clearwater001!
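The SASL credentials themselves do not live in client-ssl.properties; they come from the kafka_client_jaas.conf file passed via -Djava.security.auth.login.config in the modified startup scripts above. As an alternative, Kafka clients also accept the credentials inline through the sasl.jaas.config property. A minimal sketch, assuming the same kafka user as above:
#optional alternative to the external JAAS file: inline SASL credentials
sasl.jaas.config=org.apache.kafka.common.security.plain.PlainLoginModule required username="kafka" password="!234Qwer";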
7. Add the following to Kafka's server.properties configuration file
#sasl_ssl
listeners=SASL_SSL://172.17.0.53:9093
advertised.listeners=SASL_SSL://172.17.0.53:9093
security.inter.broker.protocol=SASL_SSL
sasl.enabled.mechanisms=PLAIN
sasl.mechanism.inter.broker.protocol=PLAIN
authorizer.class.name=kafka.security.auth.SimpleAclAuthorizer
allow.everyone.if.no.acl.found=true
ssl.keystore.location=/usr/local/kafka_2.12-2.8.0/ssl/server.keystore.jks
ssl.keystore.password=clearwater001!
ssl.key.password=clearwater001!
ssl.truststore.location=/usr/local/kafka_2.12-2.8.0/ssl/server.truststore.jks
ssl.truststore.password=clearwater001!
ssl.endpoint.identification.algorithm=
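A note on the authorizer: on Kafka 2.8, kafka.security.auth.SimpleAclAuthorizer still works but is deprecated. If you prefer the non-deprecated class, the equivalent setting (to the best of my knowledge, a drop-in replacement for this setup) is:
#newer, non-deprecated ACL authorizer
authorizer.class.name=kafka.security.authorizer.AclAuthorizer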
8. Restart ZooKeeper and Kafka, create a topic (see the commands in section IV), and grant producer and consumer permissions
#grant producer permission
./kafka-acls.sh --authorizer-properties zookeeper.connect=localhost:2181 --add --allow-principal User:"kafka" --producer --topic "test"
#grant consumer permission
./kafka-acls.sh --authorizer-properties zookeeper.connect=localhost:2181 --add --allow-principal User:"kafka" --consumer --topic "test" --group '*'
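To confirm that the ACLs were created, kafka-acls.sh can also list them. A quick check against the same ZooKeeper-backed authorizer:
#list the ACLs for the topic
./kafka-acls.sh --authorizer-properties zookeeper.connect=localhost:2181 --list --topic "test"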
IV. Startup commands

1. Start ZooKeeper
/usr/local/zookeeper/apache-zookeeper-3.5.9-bin/bin/zkServer.sh start
2. Start the Kafka server
./kafka-server-start.sh -daemon /usr/local/kafka_2.12-2.8.0/config/server.properties
3. Create a topic
./kafka-topics.sh --create --zookeeper localhost:2181 --replication-factor 1 --partitions 1 --topic test
#list topics
./kafka-topics.sh --list --zookeeper localhost:2181
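Note that the --zookeeper form above talks to ZooKeeper directly and bypasses the broker's SASL_SSL listener. On Kafka 2.8 the topic tool can also go through the secured listener itself; a hedged sketch, reusing client-ssl.properties and kafka_client_jaas.conf from section III (the paths are assumptions based on the directories used above):
#sketch: create/list topics through the SASL_SSL listener instead of ZooKeeper
export KAFKA_OPTS="-Djava.security.auth.login.config=/usr/local/kafka_2.12-2.8.0/config/kafka_client_jaas.conf"
./kafka-topics.sh --create --bootstrap-server 172.17.0.53:9093 --command-config /usr/local/kafka_2.12-2.8.0/bin/client-ssl.properties --replication-factor 1 --partitions 1 --topic test
./kafka-topics.sh --list --bootstrap-server 172.17.0.53:9093 --command-config /usr/local/kafka_2.12-2.8.0/bin/client-ssl.properties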
4. Produce and consume messages before encryption (the newer-style commands are generally used)
###produce messages###
#old versions
./kafka-console-producer.sh --broker-list localhost:9092 --topic test
#new versions
./kafka-console-producer.sh --bootstrap-server localhost:9092 --topic test
###consume messages###
#old versions
./kafka-console-consumer.sh --zookeeper localhost:2181 --topic test --from-beginning
#new versions
./kafka-console-consumer.sh --bootstrap-server localhost:9092 --topic test --from-beginning
5. Produce and consume messages after encryption
#produce messages
./kafka-console-producer.sh --bootstrap-server 172.17.0.53:9093 --topic test --producer.config client-ssl.properties
#consume messages
./kafka-console-consumer.sh --bootstrap-server 172.17.0.53:9093 --topic test --from-beginning --consumer.config client-ssl.properties
