How to Secure Confluent Kafka with SSL and SASL/SCRAM
Overview
When I decided to include Apache Kafka in our technology stack, I never imagined the demand would be this huge. At first, I had only two requirements: something faster than ActiveMQ for our MQTT needs, and a foundation for a future event-driven approach. It started well in late 2017 and has remained one of our most important technologies ever since.
As a consequence, the requirements for using and storing messages inside Kafka have grown bigger and more complex. One of these requirements is security. Even though we put Kafka inside our private network and behind a firewall, that alone is not sufficient. People started to ask how secure Kafka is: are the connections encrypted, and what about authentication and authorization for each Kafka topic?
Therefore, this article starts by securing the Apache Kafka system itself. A follow-up article will cover client connections to the Kafka brokers based on a role-based access mechanism.
This article is a summary of the security tutorial from the Confluent documentation. There are many security methods described there, and I chose the SASL/SCRAM method for my use case.
Since my VM installation uses Confluent 5.1.2, this article is based on that version.
SSL
First of all, I'll secure the connections using the SSL protocol. Each machine in the cluster has a public-private key pair and a certificate as its identity. Because the machines are located inside a private network, I decided to create a self-signed certificate for each machine. Therefore, I have to register the self-signed CA certificate in each machine's trust store (JKS).
SASL/SCRAM and JAAS
Salted Challenge Response Authentication Mechanism (SCRAM) is a family of modern, password-based challenge-response mechanisms providing authentication of a user to a server. It can be used for password-based login to services¹. Apache Kafka itself supports SCRAM-SHA-256 and SCRAM-SHA-512.
JAAS (Java Authentication and Authorization Service) is the Java implementation of the Pluggable Authentication Module (PAM) framework. We are going to use JAAS for inter-broker authentication and zookeeper-broker authentication.
Use Case
The use case for this article is to upgrade the existing Kafka cluster installed in a past article by adding a security layer on top of it: SASL/SCRAM with the SCRAM-SHA-256 mechanism.
As a pre-existing setup, we already have three servers, whose respective IPs are 172.20.20.10 (node-1), 172.20.20.20 (node-2) and 172.20.20.30 (node-3). Each node runs both a zookeeper and a broker.
Certificate
Because of a host-naming issue in my Kafka environment (Vagrant), I disabled hostname verification for this tutorial. However, you can always enable it and put the FQDN into the CN or SAN field while generating the certificates.
One more thing: the certificate validity is set to 10 years because I am too lazy to generate a new one every year (not a good habit).
For each node, we will generate one key store and two trust stores. The key store holds the broker's private key and signed certificate, and the trust stores hold the CA certificate for the broker and for clients (spring-kafka, for instance). Just as a reminder, we need the trust stores because the certificates are self-signed (not issued by a trusted authority from the JVM's point of view).
# do this in node-1, node-2 and node-3
mkdir -p /var/private/ssl
cd /var/private/ssl

# generate keys
keytool -keystore kafka.server.keystore.jks -alias localhost -validity 3650 -genkey

# Create Certificate Authority.
# Pick one of the nodes, or a separate server, to store ca-key and ca-cert.
openssl req -new -x509 -keyout ca-key -out ca-cert -days 3650

# add certificate to client and broker trust store.
# each broker and client in each node registers the same ca-cert
keytool -keystore kafka.client.truststore.jks -alias CARoot -import -file ca-cert
keytool -keystore kafka.server.truststore.jks -alias CARoot -importcert -file ca-cert

# Create CSR (signing request) for each node.
# {n} represents the node number. For instance, node-1 will be cert-file1
keytool -keystore kafka.server.keystore.jks -alias localhost -certreq -file cert-file{n}

# Sign with CA for each cert-file.
# Use the same ca-key and ca-cert for each signing.
openssl x509 -req -CA ca-cert -CAkey ca-key -in cert-file{n} -out cert-signed{n} -days 3650 -CAcreateserial -passin pass:{password}

# Import into broker keystore
keytool -keystore kafka.server.keystore.jks -alias CARoot -import -file ca-cert
keytool -keystore kafka.server.keystore.jks -alias localhost -import -file cert-signed{n}
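Before moving on, it is worth checking the stores on every node. This is a minimal sketch; the store and file names follow the commands above, and keytool will prompt for the store passwords.

# the broker keystore should contain both the CARoot entry and the signed localhost certificate
keytool -list -v -keystore /var/private/ssl/kafka.server.keystore.jks

# both trust stores should contain the CARoot entry
keytool -list -keystore /var/private/ssl/kafka.server.truststore.jks
keytool -list -keystore /var/private/ssl/kafka.client.truststore.jks

# the signed certificate should chain back to the CA
openssl verify -CAfile ca-cert cert-signed{n}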
Zookeeper
For zookeeper, authentication uses DIGEST-MD5 combined with JAAS as the implementation.
# set the authentication provider in /etc/kafka/zookeeper.properties by adding the following lines.
authProvider.1=org.apache.zookeeper.server.auth.SASLAuthenticationProvider
authProvider.2=org.apache.zookeeper.server.auth.SASLAuthenticationProvider
authProvider.3=org.apache.zookeeper.server.auth.SASLAuthenticationProvider
Configure JAAS for zookeeper under /etc/kafka/zookeeper_jaas.conf.
Server {
    org.apache.zookeeper.server.auth.DigestLoginModule required
    user_super="admin-secret"
    user_kafka="kafka-secret";
};
Set a runtime configuration file to store the KAFKA_OPTS environment variable, because we are using systemd to start zookeeper. Then update the systemd unit for zookeeper.
vim /etc/kafka/zk-runtime.cfg
KAFKA_OPTS="-Djava.security.auth.login.config=/etc/kafka/zookeeper_jaas.conf"

vim /etc/systemd/system/multi-user.target.wants/confluent-zookeeper.service
# right under the [Service] tag, add this line
[Service]
EnvironmentFile=/etc/kafka/zk-runtime.cfg
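As a quick sanity check (a sketch; the unit and file names are the ones used above), you can confirm that systemd picked up the environment file and, after the restart described later, that the zookeeper JVM received the JAAS option:

# the printed unit should include the EnvironmentFile line added above
systemctl cat confluent-zookeeper

# after the restart, the JAAS system property should appear in the zookeeper process
ps -ef | grep zookeeper_jaas.conf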
Broker
I divided the broker configuration into four steps: SSL configuration, JAAS configuration, SASL configuration and listener configuration.
SSL
First step: configure the keystore and truststore for each broker, as well as the inter-broker security protocol, under /etc/kafka/server.properties.
# put this line if your certificate does not contain FQDN
ssl.endpoint.identification.algorithm=

# ssl configuration
ssl.truststore.location=/var/private/ssl/kafka.server.truststore.jks
ssl.truststore.password={password}
ssl.keystore.location=/var/private/ssl/kafka.server.keystore.jks
ssl.keystore.password={password}
ssl.key.password={password}

# set security inter broker for SASL through SSL
security.inter.broker.protocol=SASL_SSL
ssl.client.auth=required

# add authorizer for future needs (next article)
authorizer.class.name=kafka.security.auth.SimpleAclAuthorizer
Note: please be careful if your certificate does not contain the FQDN. In that case you have to keep the empty ssl.endpoint.identification.algorithm property shown above, which disables hostname verification.
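Once a broker has been restarted with these settings (see the restart section below), a quick way to check the SSL listener is an openssl handshake against port 9093. This is a sketch using the node-1 IP and the ca-cert from this article; because ssl.client.auth=required, a full client connection also needs its own certificate, but the broker certificate should still be printed during the handshake.

# the broker certificate, signed by our own CA, should be printed
# and the chain should verify against ca-cert
openssl s_client -connect 172.20.20.10:9093 -CAfile /var/private/ssl/ca-cert </dev/null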
JAAS
Now we configure authentication for the brokers. This configuration covers communication between zookeeper and the brokers, as well as between the brokers themselves. Put the configuration in a new file, /etc/kafka/kafka_server_jaas.conf.
KafkaServer {
    org.apache.kafka.common.security.scram.ScramLoginModule required
    username="admin"
    password="admin-secret";
};

Client {
    org.apache.zookeeper.server.auth.DigestLoginModule required
    username="kafka"
    password="kafka-secret";
};
Note: in the Client section, the username and password match the user_kafka entry in the zookeeper JAAS config. The admin user will be created later, in the SASL part.
Set a runtime configuration file to store the KAFKA_OPTS environment variable, because we are using systemd to start the broker. Then update the systemd configuration for the broker.
vim /etc/kafka/kafka-runtime.cfg
KAFKA_OPTS="-Djava.security.auth.login.config=/etc/kafka/kafka_server_jaas.conf"

# update systemd
vim /etc/systemd/system/multi-user.target.wants/confluent-kafka.service
# right under the [Service] tag, add this line
[Service]
EnvironmentFile=/etc/kafka/kafka-runtime.cfg
SASL Authentication
Before we configure SASL/SCRAM authentication on the brokers, we will create the admin user first. We can execute this command on any broker.
/usr/bin/kafka-configs --zookeeper \
  '172.20.20.10:2181,172.20.20.20:2181,172.20.20.30:2181' --alter \
  --add-config 'SCRAM-SHA-256=[password=admin-secret],SCRAM-SHA-512=[password=admin-secret]' \
  --entity-type users --entity-name admin
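To confirm the credentials were stored in zookeeper, the same tool can describe the user. A small sketch using the same zookeeper connect string:

# should list SCRAM-SHA-256 and SCRAM-SHA-512 credentials for the admin user
/usr/bin/kafka-configs --zookeeper \
  '172.20.20.10:2181,172.20.20.20:2181,172.20.20.30:2181' --describe \
  --entity-type users --entity-name admin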
Configure /etc/kafka/server.properties.
# List of enabled mechanisms, can be more than one
sasl.enabled.mechanisms=SCRAM-SHA-256,PLAIN

# Specify one of the SASL mechanisms
sasl.mechanism.inter.broker.protocol=SCRAM-SHA-256
Listener
Because we are going to do a rolling update and minimize the impact on existing topics, we will open three listeners with different protocols and ports on each broker.
# /etc/kafka/server.properties
listeners=PLAINTEXT://:9092,SSL://:9093,SASL_SSL://:9094
advertised.listeners=PLAINTEXT://172.20.20.10:9092,SSL://172.20.20.10:9093,SASL_SSL://172.20.20.10:9094
Note: change the IP address accordingly for every broker.
Restart Zookeeper and Kafka
After all the configurations are finished, it is time to restart the zookeepers and brokers. First we need to reload systemd because we changed the unit configuration.
Now restart zookeeper node by node, followed by the brokers. You should see three open ports on each broker, ready to receive client connections: port 9092 for PLAINTEXT connections, 9093 for SSL connections and 9094 for SASL_SSL connections (of course, you need to provide authentication to access port 9094). In a later article I will show how to connect to Kafka with SCRAM authentication using Spring Boot.
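Here is a rough sketch of that sequence on one node, using the Confluent unit names from the systemd configuration above:

# reload systemd because the unit files changed
sudo systemctl daemon-reload

# restart zookeeper first (node by node), then the broker
sudo systemctl restart confluent-zookeeper
sudo systemctl restart confluent-kafka

# the broker should now listen on all three ports
ss -ltn | grep -E ':(9092|9093|9094)'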
And here are the final properties files: first server.properties (taken from node-2), then zookeeper.properties.
############################# Server Basics #############################
# The id of the broker. This must be set to a unique integer for each broker.
broker.id=2
############################# Socket Server Settings #############################
# The address the socket server listens on. It will get the value returned from
# java.net.InetAddress.getCanonicalHostName() if not configured.
# FORMAT:
#   listeners = listener_name://host_name:port
# EXAMPLE:
#   listeners = PLAINTEXT://your.host.name:9092
listeners=PLAINTEXT://:9092,SSL://:9093,SASL_SSL://:9094
# Hostname and port the broker will advertise to producers and consumers. If not set,
# it uses the value for "listeners" if configured. Otherwise, it will use the value
# returned from java.net.InetAddress.getCanonicalHostName().
#advertised.listeners=PLAINTEXT://your.host.name:9092
advertised.host.name=172.20.20.20
advertised.listeners=PLAINTEXT://172.20.20.20:9092,SSL://172.20.20.20:9093,SASL_SSL://172.20.20.20:9094
# Maps listener names to security protocols, the default is for them to be the same. See the config documentation for more details
#listener.security.protocol.map=PLAINTEXT:PLAINTEXT,SSL:SSL,SASL_PLAINTEXT:SASL_PLAINTEXT,SASL_SSL:SASL_SSL
# The number of threads that the server uses for receiving requests from the network and sending responses to the network
num.network.threads=3
# The number of threads that the server uses for processing requests, which may include disk I/O
num.io.threads=8
# The send buffer (SO_SNDBUF) used by the socket server
socket.send.buffer.bytes=102400
# The receive buffer (SO_RCVBUF) used by the socket server
socket.receive.buffer.bytes=102400
# The maximum size of a request that the socket server will accept (protection against OOM)
socket.request.max.bytes=104857600
############################# Log Basics #############################
# A comma separated list of directories under which to store log files
log.dirs=/var/lib/kafka
# The default number of log partitions per topic. More partitions allow greater
# parallelism for consumption, but this will also result in more files across
# the brokers.
num.partitions=1
# The number of threads per data directory to be used for log recovery at startup and flushing at shutdown.
# This value is recommended to be increased for installations with data dirs located in RAID array.
num.recovery.threads.per.data.dir=1
############################# Internal Topic Settings #############################
# The replication factor for the group metadata internal topics "__consumer_offsets" and "__transaction_state"
# For anything other than development testing, a value greater than 1 is recommended to ensure availability, such as 3.
offsets.topic.replication.factor=1
transaction.state.log.replication.factor=1
transaction.state.log.min.isr=1
# The minimum age of a log file to be eligible for deletion due to age
log.retention.hours=168
# The maximum size of a log segment file. When this size is reached a new log segment will be created.
log.segment.bytes=1073741824
# The interval at which log segments are checked to see if they can be deleted according
# to the retention policies
log.retention.check.interval.ms=300000
zookeeper.connect=172.20.20.10:2181,172.20.20.20:2181,172.20.20.30:2181
# Timeout in ms for connecting to zookeeper
zookeeper.connection.timeout.ms=60000
group.initial.rebalance.delay.ms=0
ssl.endpoint.identification.algorithm=
ssl.truststore.location=/var/private/ssl/kafka.server.truststore.jks
ssl.truststore.password=password
ssl.keystore.location=/var/private/ssl/kafka.server.keystore.jks
ssl.keystore.password=password
ssl.key.password=password
security.inter.broker.protocol=SASL_SSL
# List of enabled mechanisms, can be more than one
sasl.enabled.mechanisms=SCRAM-SHA-256,PLAIN
# Specify one of the SASL mechanisms
sasl.mechanism.inter.broker.protocol=SCRAM-SHA-256
ssl.client.auth=required
# for ACL (RBAC)
authorizer.class.name=kafka.security.auth.SimpleAclAuthorizer
allow.everyone.if.no.acl.found=true
super.users=User:admin
And the final zookeeper.properties:
# The basic time unit in milliseconds used by ZooKeeper.
# It is used to do heartbeats and the minimum session timeout will be twice the tickTime.
tickTime=2000
# the directory where the snapshot is stored.
dataDir=/var/lib/zookeeper
# the port at which the clients will connect
clientPort=2181
# disable the per-ip limit on the number of connections since this is a non-production config
# maxClientCnxns=0
initLimit=5
syncLimit=2
server.1=172.20.20.10:2888:3888
server.2=172.20.20.20:2888:3888
server.3=172.20.20.30:2888:3888
autopurge.snapRetainCount=3
autopurge.purgeInterval=24
authProvider.1=org.apache.zookeeper.server.auth.SASLAuthenticationProvider
authProvider.2=org.apache.zookeeper.server.auth.SASLAuthenticationProvider
authProvider.3=org.apache.zookeeper.server.auth.SASLAuthenticationProvider
Conclusion
This is only the first part, covering how to secure Zookeeper and Kafka. The next article will cover how to connect to Kafka using the SASL/SCRAM mechanism. If any configuration is missing, please provide feedback. For now, have fun playing with the server configuration.
References
- https://docs.confluent.io/5.1.2/tutorials/security_tutorial.html
- https://docs.confluent.io/5.1.2/kafka/authentication_sasl/authentication_sasl_scram.html
- https://en.wikipedia.org/wiki/Simple_Authentication_and_Security_Layer
- https://en.wikipedia.org/wiki/Java_Authentication_and_Authorization_Service
Comments

Q: In the zookeeper config, why do you repeat the authProvider three times?
authProvider.X=org.apache.zookeeper.server.auth.SASLAuthenticationProvider

A: You are right. I was trying something else at that time. It should be only one.