How to Publish and Subscribe to Kafka with Spring and SASL/SCRAM
After securing the Kafka brokers and Zookeeper with SASL/SCRAM, it is time for the client (Java + Spring) to connect to the secured cluster.
Overview
As promised in the previous post, in this article I will show, from the client perspective, how to connect to Zookeeper and the Kafka brokers with the SASL/SCRAM protocol.
We have configured authentication between Zookeeper and the brokers, as well as between the brokers themselves. However, that is still not enough from a security perspective: we are missing authorization. To comply with security requirements, let me also demonstrate simple role-based authorization for a producer and a consumer.
For this tutorial I will use, as usual, Spring Boot combined with Spring Kafka.
Use Case
First, we are going to create a topic named my-topic. Then we will create a producer to put messages into the topic, as well as a consumer to read messages from it. Therefore, we have to define actors for the use case. We will need three personas:
- Administrator.
The administrator is a super user. His main responsibilities are user management, role-based access management, and topic management. The administrator is represented by the user admin.
Note: the user admin was already created in the previous post.
- Producer.
The producer is the one who publishes messages to the topic. Alice will be the producer in this case.
- Consumer.
The consumer is the one who subscribes to messages from the topic. Bob will be the consumer in this case, and he belongs to a consumer group called my-consumer-group.
Broker Configuration
First, we need to update several broker properties inside /etc/kafka/server.properties.
# for ACL (RBAC)
authorizer.class.name=kafka.security.auth.SimpleAclAuthorizer

# set this to true to allow access to topics with no associated ACLs as the default behavior
allow.everyone.if.no.acl.found=true

# configure super user
super.users=User:admin
Since we want to minimize the impact on existing topics, we set allow.everyone.if.no.acl.found to true. This setting allows existing topics with no ACL configuration to continue running as usual. After we have gradually set ACLs on all topics, we can safely set the value back to false.
Authorization and ACLs
We need two additional users, alice and bob, as a producer and a consumer respectively.
# alice
/usr/bin/kafka-configs --zookeeper \
'172.20.20.10:2181,172.20.20.20:2181,172.20.20.30:2181' \
--alter --add-config 'SCRAM-SHA-256=[iterations=8192,password=alice-secret],SCRAM-SHA-512=[password=alice-secret]' \
--entity-type users --entity-name alice

# bob
/usr/bin/kafka-configs --zookeeper \
'172.20.20.10:2181,172.20.20.20:2181,172.20.20.30:2181' \
--alter --add-config 'SCRAM-SHA-256=[iterations=8192,password=bob-secret],SCRAM-SHA-512=[password=bob-secret]' \
--entity-type users --entity-name bob
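If you prefer to manage SCRAM credentials from code rather than the CLI, newer Kafka clients (2.7 and later) expose the same operation through the admin API. This is only a hedged sketch under that version assumption; the helper method name and the adminProps map are mine, and on older clusters you should stick with the kafka-configs command above.

import java.util.Arrays;
import java.util.Map;
import org.apache.kafka.clients.admin.AdminClient;
import org.apache.kafka.clients.admin.ScramCredentialInfo;
import org.apache.kafka.clients.admin.ScramMechanism;
import org.apache.kafka.clients.admin.UserScramCredentialUpsertion;

// Sketch only: requires kafka-clients 2.7+. adminProps is assumed to hold the
// bootstrap servers plus the SASL/SSL settings of the admin super user.
void createScramUsers(final Map<String, Object> adminProps) throws Exception {
    try (AdminClient admin = AdminClient.create(adminProps)) {
        final ScramCredentialInfo scram256 =
                new ScramCredentialInfo(ScramMechanism.SCRAM_SHA_256, 8192);
        admin.alterUserScramCredentials(Arrays.asList(
                new UserScramCredentialUpsertion("alice", scram256, "alice-secret"),
                new UserScramCredentialUpsertion("bob", scram256, "bob-secret")))
            .all().get(); // block until the brokers apply the change
    }
}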
Before configuring authorization and ACLs, we create a topic named my-topic.
# create topic
/usr/bin/kafka-topics --create --zookeeper \
'172.20.20.10:2181,172.20.20.20:2181,172.20.20.30:2181' \
--replication-factor 3 --partitions 2 --topic my-topic

# list topics to verify that my-topic has been created
/usr/bin/kafka-topics --zookeeper \
'172.20.20.10:2181,172.20.20.20:2181,172.20.20.30:2181' --list
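Alternatively, since we already have Spring Kafka on the client side, the topic can be declared from the application itself: KafkaAdmin (discussed below) creates any NewTopic bean on startup. A minimal sketch, mirroring the CLI settings above:

import org.apache.kafka.clients.admin.NewTopic;
import org.springframework.context.annotation.Bean;

@Bean
public NewTopic myTopic() {
    // 2 partitions, replication factor 3 -- same as the kafka-topics command above
    return new NewTopic("my-topic", 2, (short) 3);
}

Note that once allow.everyone.if.no.acl.found is set to false, creating topics this way requires a principal with the appropriate rights, such as the admin super user.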
Next, we will set ACLs for my-topic. Alice will get the producer role, while Bob will get the consumer role.
# producer role
/usr/bin/kafka-acls --authorizer-properties \
zookeeper.connect='172.20.20.10:2181,172.20.20.20:2181,172.20.20.30:2181' \
--add --allow-principal User:alice \
--producer --topic my-topic

# consumer role
/usr/bin/kafka-acls --authorizer-properties \
zookeeper.connect='172.20.20.10:2181,172.20.20.20:2181,172.20.20.30:2181' \
--add --allow-principal User:bob \
--consumer --topic my-topic --group my-consumer-group
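For reference, --producer and --consumer are convenience flags that expand to individual ACL entries (roughly WRITE and DESCRIBE on the topic for a producer; READ and DESCRIBE on the topic plus READ on the group for a consumer). The same bindings can also be created programmatically. A hedged sketch of the producer's WRITE grant, assuming an AdminClient authenticated as the admin user (the method name and adminProps map are mine):

import java.util.Collections;
import java.util.Map;
import org.apache.kafka.clients.admin.AdminClient;
import org.apache.kafka.common.acl.AccessControlEntry;
import org.apache.kafka.common.acl.AclBinding;
import org.apache.kafka.common.acl.AclOperation;
import org.apache.kafka.common.acl.AclPermissionType;
import org.apache.kafka.common.resource.PatternType;
import org.apache.kafka.common.resource.ResourcePattern;
import org.apache.kafka.common.resource.ResourceType;

// Sketch: allow alice to WRITE to my-topic from any host.
void grantProducerWrite(final Map<String, Object> adminProps) throws Exception {
    try (AdminClient admin = AdminClient.create(adminProps)) {
        final ResourcePattern topic =
                new ResourcePattern(ResourceType.TOPIC, "my-topic", PatternType.LITERAL);
        final AccessControlEntry allowWrite = new AccessControlEntry(
                "User:alice", "*", AclOperation.WRITE, AclPermissionType.ALLOW);
        admin.createAcls(Collections.singleton(new AclBinding(topic, allowWrite))).all().get();
    }
}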
JAAS
During bootstrap, Spring registers an org.springframework.kafka.core.KafkaAdmin bean in the application context, which in turn delegates to an AdminClient. The AdminClient then tries to authenticate and connect to the Kafka cluster. Because we previously set allow.everyone.if.no.acl.found to true, we could safely skip authentication here. But as I mentioned earlier, our end goal is to set that property to false, so we need to set up security for KafkaAdmin as well, using JAAS. We will create a JAAS configuration file on our client host, under /var/private/jaas/jaas-spring-client.conf.
KafkaClient {
    org.apache.kafka.common.security.scram.ScramLoginModule required
    username="alice"
    password="alice-secret";
};
Note: a sample JAAS configuration is also available under the src/main/resources folder. The username depends on which configuration you are working on; in this example I put the producer configuration, hence the username is alice.
Next, we point the JVM to this JAAS file via a system property during startup.
public static void main(final String[] args) {
    System.setProperty("java.security.auth.login.config",
            "/var/private/jaas/jaas-spring-client.conf");
    SpringApplication.run(KafkaSecurityApplication.class, args);
}
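Alternatively, instead of a JVM-wide JAAS file, the credentials can be supplied directly through the sasl.jaas.config client property. A minimal sketch of a KafkaAdmin bean configured this way (bootstrapAddress is assumed to be injected from the application properties; the truststore settings match the producer and consumer sections below):

import java.util.HashMap;
import java.util.Map;
import org.apache.kafka.clients.admin.AdminClientConfig;
import org.springframework.context.annotation.Bean;
import org.springframework.kafka.core.KafkaAdmin;

@Bean
public KafkaAdmin kafkaAdmin() {
    final Map<String, Object> configs = new HashMap<>();
    configs.put(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapAddress);
    configs.put("security.protocol", "SASL_SSL");
    configs.put("sasl.mechanism", "SCRAM-SHA-256");
    configs.put("sasl.jaas.config",
            "org.apache.kafka.common.security.scram.ScramLoginModule required "
                    + "username=\"alice\" password=\"alice-secret\";");
    configs.put("ssl.truststore.location", "/var/private/ssl/kafka.client.truststore.jks");
    configs.put("ssl.truststore.password", "password");
    configs.put("ssl.endpoint.identification.algorithm", "");
    return new KafkaAdmin(configs);
}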
Producer
In the producer configuration, we need to add extra properties for SSL and the SASL mechanism. As a reminder, I set ssl.endpoint.identification.algorithm to an empty string because my certificate does not contain an FQDN. One more thing: I exported kafka.client.truststore.jks from the previous post and put it on the client machine under /var/private/ssl/.
/* snippet */
private static final String TRUSTSTORE_JKS = "/var/private/ssl/kafka.client.truststore.jks";
private static final String SASL_PROTOCOL = "SASL_SSL";
private static final String SCRAM_SHA_256 = "SCRAM-SHA-256";

private final String jaasTemplate =
        "org.apache.kafka.common.security.scram.ScramLoginModule required username=\"%s\" password=\"%s\";";
private final String prodJaasCfg = String.format(jaasTemplate, "alice", "alice-secret");

@Bean
public ProducerFactory<String, Object> producerFactory() {
    final Map<String, Object> configProps = new HashMap<>();
    configProps.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapAddress);
    configProps.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
    configProps.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, JsonSerializer.class);
    configProps.put(ProducerConfig.ACKS_CONFIG, "all");
    configProps.put(ProducerConfig.CLIENT_ID_CONFIG, "cid1");
    configProps.put("sasl.mechanism", SCRAM_SHA_256);
    configProps.put("sasl.jaas.config", prodJaasCfg);
    configProps.put("security.protocol", SASL_PROTOCOL);
    configProps.put("ssl.truststore.location", TRUSTSTORE_JKS);
    configProps.put("ssl.truststore.password", "password");
    configProps.put("ssl.endpoint.identification.algorithm", "");
    return new DefaultKafkaProducerFactory<>(configProps);
}
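For completeness, the producer factory is wrapped in a KafkaTemplate bean; this is the standard Spring Kafka wiring, not shown in the snippet above:

import org.springframework.context.annotation.Bean;
import org.springframework.kafka.core.KafkaTemplate;

@Bean
public KafkaTemplate<String, Object> kafkaTemplate() {
    return new KafkaTemplate<>(producerFactory());
}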
Consumer
The consumer configuration is not much different from the producer configuration.
/* snippet */
private static final String TRUSTSTORE_JKS = "/var/private/ssl/kafka.client.truststore.jks";
private static final String SASL_PROTOCOL = "SASL_SSL";
private static final String SCRAM_SHA_256 = "SCRAM-SHA-256";
private static final String TRUSTED_PACKAGE = "com.rurocker.demo.kafkasecurity.dto";

private final String jaasTemplate =
        "org.apache.kafka.common.security.scram.ScramLoginModule required username=\"%s\" password=\"%s\";";
private final String consJaasCfg = String.format(jaasTemplate, "bob", "bob-secret");

@Bean
public ConsumerFactory<String, String> consumerFactory() {
    final Map<String, Object> props = new HashMap<>();
    props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapAddress);
    props.put(ConsumerConfig.GROUP_ID_CONFIG, "my-consumer-group");
    props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
    props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, JsonDeserializer.class);
    props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
    props.put(JsonDeserializer.TRUSTED_PACKAGES, TRUSTED_PACKAGE);
    props.put("sasl.mechanism", SCRAM_SHA_256);
    props.put("sasl.jaas.config", consJaasCfg);
    props.put("security.protocol", SASL_PROTOCOL);
    props.put("ssl.truststore.location", TRUSTSTORE_JKS);
    props.put("ssl.truststore.password", "password");
    props.put("ssl.endpoint.identification.algorithm", "");
    return new DefaultKafkaConsumerFactory<>(props);
}
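To let @KafkaListener pick up this configuration, the consumer factory is wired into a listener container factory, again the standard Spring Kafka wiring:

import org.springframework.context.annotation.Bean;
import org.springframework.kafka.config.ConcurrentKafkaListenerContainerFactory;

@Bean
public ConcurrentKafkaListenerContainerFactory<String, String> kafkaListenerContainerFactory() {
    final ConcurrentKafkaListenerContainerFactory<String, String> factory =
            new ConcurrentKafkaListenerContainerFactory<>();
    factory.setConsumerFactory(consumerFactory());
    return factory;
}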
Now you can produce messages using KafkaTemplate and consume them with @KafkaListener as usual. The producer example is in the KafkaProducer class and the consumer is in the TestConsumer class.
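For illustration only (the real classes live in the repository linked below), here is a stripped-down sketch of what the producing and consuming code could look like; the class name is mine:

import org.springframework.kafka.annotation.KafkaListener;
import org.springframework.kafka.core.KafkaTemplate;
import org.springframework.stereotype.Component;

@Component
public class PubSubSketch {

    private final KafkaTemplate<String, Object> kafkaTemplate;

    public PubSubSketch(final KafkaTemplate<String, Object> kafkaTemplate) {
        this.kafkaTemplate = kafkaTemplate;
    }

    // publishes as alice, per the producer factory above
    public void send(final Object payload) {
        kafkaTemplate.send("my-topic", payload);
    }

    // consumes as bob within my-consumer-group, per the consumer factory above
    @KafkaListener(topics = "my-topic", groupId = "my-consumer-group")
    public void listen(final String message) {
        System.out.println("Received: " + message);
    }
}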
Conclusion
That's it for my second article about configuring authentication and authorization in Kafka, covering the Zookeeper and broker perspectives as well as the client perspective. You can play around by changing the usernames, passwords, and roles to verify that the RBAC is set up as expected. The next article will talk about message encryption.
Meanwhile, the GitHub repository for this sample is at https://github.com/ru-rocker/spring-kafka-sasl-scram.