How to Encrypt and Decrypt Kafka Messages using Custom (De)Serializers
Sensitive data always needs to be handled with extra care. Thus, in some cases, we need to encrypt messages before delivering them to a Kafka topic.
Overview
Today’s post is still related to two previous articles, which talk about Kafka and security. Even though we already guarded our Kafka cluster with authorization as in the first post, and cautiously secured our clients with authentication and role-based access as in the second post, in my own view that is still not enough. Our messages occasionally contain sensitive information, such as ID numbers, credit card numbers, and so on, and we have to be careful about storing that kind of information in memory or on disk as plain text. The last thing you want is your customers’ sensitive data spread all over the internet: it would destroy your company’s reputation and can result in financial loss. Therefore, in this post I will address message encryption and decryption in Kafka.
Approach
There are several ways to approach this. First, the choice of encryption algorithm: there are two types of keys, symmetric and asymmetric. AES, 3DES, and Blowfish are examples of symmetric encryption; RSA and DSA are examples of asymmetric encryption. We will use symmetric keys for Kafka message encryption. The reason is that one message may be consumed by multiple consumer groups, while asymmetric encryption uses a public key to encrypt and a private key to decrypt. That raises a question: which consumer group’s public key should be used for encryption? Consequently, we share the same key for encryption and decryption, which eliminates the confusion and the need for consensus between consumer groups. For this, I pick AES as the encryption algorithm due to its worldwide acceptance.
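As a minimal sketch of what “sharing one symmetric key” means in practice (the class name `AesKeyDemo` is my own, not part of the sample project), the JDK’s `KeyGenerator` can produce an AES-256 key that the producer and every consumer group would then use for both encryption and decryption:

```java
import javax.crypto.KeyGenerator;
import javax.crypto.SecretKey;

// Hypothetical demo: generate one AES-256 key to be shared by the producer
// and all consumer groups. In the article itself, the key is instead derived
// from a secret/salt pair via PBKDF2, but the symmetric-key idea is the same.
public class AesKeyDemo {
    public static void main(String[] args) throws Exception {
        KeyGenerator kg = KeyGenerator.getInstance("AES");
        kg.init(256); // AES-256
        SecretKey key = kg.generateKey();
        // 256 bits of key material = 32 bytes
        System.out.println(key.getEncoded().length);
    }
}
```

In a real deployment, this key (or the secret/salt pair used to derive it) would be distributed to producer and consumers through a secret store rather than generated ad hoc.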
Next, how we encrypt the message. Of course, there are some options here as well. We can encrypt the sensitive values, set them into the POJO via setter methods, then send the message to Kafka. Alternatively, we can customize Kafka’s serializer/deserializer mechanism: we send a POJO to Kafka, which is converted to JSON, encrypted, and then stored in the topic. I will choose the latter approach in this article.
Why do we need to convert to JSON first and then encrypt? Why don’t we serialize the object using ByteArrayOutputStream, since encryption only needs bytes after all? The answer is that our consumers should not be limited to Java clients. We want every service that subscribes to our topic, regardless of its programming language, to still be able to process the message. Because JSON is very common across languages, we should find no issue with this approach.
Sample Code
We need three additional Java files and two configuration changes to implement this approach.
AES.java
```java
// taken from https://howtodoinjava.com/security/aes-256-encryption-decryption/ with slight modification
import javax.crypto.Cipher;
import javax.crypto.SecretKey;
import javax.crypto.SecretKeyFactory;
import javax.crypto.spec.IvParameterSpec;
import javax.crypto.spec.PBEKeySpec;
import javax.crypto.spec.SecretKeySpec;
import java.security.spec.KeySpec;

public class AES {

    public static byte[] encrypt(byte[] values, String secret, String salt) {
        try {
            // NOTE: a fixed all-zero IV keeps this example simple, but is not recommended for production
            byte[] iv = { 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0 };
            IvParameterSpec ivspec = new IvParameterSpec(iv);

            // derive the AES key from the secret/salt pair via PBKDF2
            SecretKeyFactory factory = SecretKeyFactory.getInstance("PBKDF2WithHmacSHA256");
            KeySpec spec = new PBEKeySpec(secret.toCharArray(), salt.getBytes(), 65536, 256);
            SecretKey tmp = factory.generateSecret(spec);
            SecretKeySpec secretKey = new SecretKeySpec(tmp.getEncoded(), "AES");

            Cipher cipher = Cipher.getInstance("AES/CBC/PKCS5Padding");
            cipher.init(Cipher.ENCRYPT_MODE, secretKey, ivspec);
            return cipher.doFinal(values);
        } catch (Exception e) {
            System.out.println("Error while encrypting: " + e.toString());
        }
        return null;
    }

    public static byte[] decrypt(byte[] val, String secret, String salt) {
        try {
            byte[] iv = { 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0 };
            IvParameterSpec ivspec = new IvParameterSpec(iv);

            SecretKeyFactory factory = SecretKeyFactory.getInstance("PBKDF2WithHmacSHA256");
            KeySpec spec = new PBEKeySpec(secret.toCharArray(), salt.getBytes(), 65536, 256);
            SecretKey tmp = factory.generateSecret(spec);
            SecretKeySpec secretKey = new SecretKeySpec(tmp.getEncoded(), "AES");

            Cipher cipher = Cipher.getInstance("AES/CBC/PKCS5Padding");
            cipher.init(Cipher.DECRYPT_MODE, secretKey, ivspec);
            return cipher.doFinal(val);
        } catch (Exception e) {
            System.out.println("Error while decrypting: " + e.toString());
        }
        return null;
    }
}
```
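To sanity-check the class above, here is a self-contained round-trip demo. The class name `AesRoundTripDemo` and the sample JSON payload are my own; the PBKDF2 + AES/CBC logic is inlined (duplicating `AES.java`) so the snippet compiles on its own:

```java
import javax.crypto.Cipher;
import javax.crypto.SecretKeyFactory;
import javax.crypto.spec.IvParameterSpec;
import javax.crypto.spec.PBEKeySpec;
import javax.crypto.spec.SecretKeySpec;
import java.nio.charset.StandardCharsets;
import java.security.spec.KeySpec;
import java.util.Arrays;

// Hypothetical self-contained check: same key derivation and cipher as AES.java.
public class AesRoundTripDemo {

    static byte[] crypt(int mode, byte[] data, String secret, String salt) throws Exception {
        byte[] iv = new byte[16]; // same fixed all-zero IV as the article's AES class
        SecretKeyFactory factory = SecretKeyFactory.getInstance("PBKDF2WithHmacSHA256");
        KeySpec spec = new PBEKeySpec(secret.toCharArray(),
                salt.getBytes(StandardCharsets.UTF_8), 65536, 256);
        SecretKeySpec key = new SecretKeySpec(factory.generateSecret(spec).getEncoded(), "AES");
        Cipher cipher = Cipher.getInstance("AES/CBC/PKCS5Padding");
        cipher.init(mode, key, new IvParameterSpec(iv));
        return cipher.doFinal(data);
    }

    public static void main(String[] args) throws Exception {
        byte[] json = "{\"idNumber\":\"1234567890\"}".getBytes(StandardCharsets.UTF_8);
        byte[] encrypted = crypt(Cipher.ENCRYPT_MODE, json, "SDGhEh1UEkRBLJXS", "h4R6gewBUPMnS3FZ");
        byte[] decrypted = crypt(Cipher.DECRYPT_MODE, encrypted, "SDGhEh1UEkRBLJXS", "h4R6gewBUPMnS3FZ");
        // ciphertext differs from plaintext, and the round trip restores the original bytes
        System.out.println(!Arrays.equals(json, encrypted) && Arrays.equals(json, decrypted));
    }
}
```

Running this prints `true`: the same secret/salt pair recovers the exact JSON bytes that were encrypted.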
AESSerializer.java
```java
import java.util.Map;

import org.apache.kafka.common.errors.SerializationException;
import org.springframework.kafka.support.serializer.JsonSerializer;

public class AESSerializer<T> extends JsonSerializer<T> {

    public static final String AES_SECRET_KEY = "aes.serializer.secret.keys";
    public static final String AES_SALT_KEY = "aes.serializer.salt.keys";

    private String secret = null;
    private String salt = null;

    @Override
    public void configure(Map<String, ?> configs, boolean isKey) {
        super.configure(configs, isKey);
        secret = (String) configs.get(AES_SECRET_KEY);
        if (secret == null) {
            throw new SerializationException(AES_SECRET_KEY + " cannot be null.");
        }
        salt = (String) configs.get(AES_SALT_KEY);
        if (salt == null) {
            throw new SerializationException(AES_SALT_KEY + " cannot be null.");
        }
    }

    @Override
    public byte[] serialize(String topic, T data) {
        // serialize the POJO to JSON bytes first, then encrypt
        byte[] json = super.serialize(topic, data);
        return AES.encrypt(json, secret, salt);
    }
}
```
AESDeserializer.java
```java
import java.util.Map;

import org.apache.kafka.common.errors.SerializationException;
import org.apache.kafka.common.header.Headers;
import org.springframework.kafka.support.serializer.JsonDeserializer;

public class AESDeserializer<T> extends JsonDeserializer<T> {

    public static final String AES_SECRET_KEY = "aes.deserializer.secret.keys";
    public static final String AES_SALT_KEY = "aes.deserializer.salt.keys";

    private String secret = null;
    private String salt = null;

    @Override
    public void configure(Map<String, ?> configs, boolean isKey) {
        super.configure(configs, isKey);
        secret = (String) configs.get(AES_SECRET_KEY);
        if (secret == null) {
            throw new SerializationException(AES_SECRET_KEY + " cannot be null.");
        }
        salt = (String) configs.get(AES_SALT_KEY);
        if (salt == null) {
            throw new SerializationException(AES_SALT_KEY + " cannot be null.");
        }
    }

    @Override
    public T deserialize(String topic, Headers headers, byte[] data) {
        // decrypt back to JSON bytes first, then deserialize into the POJO
        return super.deserialize(topic, headers, AES.decrypt(data, secret, salt));
    }

    @Override
    public T deserialize(String topic, byte[] data) {
        return super.deserialize(topic, AES.decrypt(data, secret, salt));
    }
}
```
KafkaConfiguration.java
```java
// snippet
@Bean
public ProducerFactory<String, Object> producerFactory() {
    final Map<String, Object> configProps = new HashMap<>();
    configProps.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapAddress);
    configProps.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
    configProps.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, AESSerializer.class);
    configProps.put(AESSerializer.AES_SECRET_KEY, "SDGhEh1UEkRBLJXS");
    configProps.put(AESSerializer.AES_SALT_KEY, "h4R6gewBUPMnS3FZ");
    configProps.put(ProducerConfig.ACKS_CONFIG, "all");
    configProps.put(ProducerConfig.CLIENT_ID_CONFIG, "cid1");
    configProps.put("sasl.mechanism", SCRAM_SHA_256);
    configProps.put("sasl.jaas.config", prodJaasCfg);
    configProps.put("security.protocol", SASL_PROTOCOL);
    configProps.put("ssl.truststore.location", TRUSTSTORE_JKS);
    configProps.put("ssl.truststore.password", "password");
    configProps.put("ssl.endpoint.identification.algorithm", "");
    return new DefaultKafkaProducerFactory<>(configProps);
}

@Bean
public ConsumerFactory<String, String> consumerFactory() {
    final Map<String, Object> props = new HashMap<>();
    props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapAddress);
    props.put(ConsumerConfig.GROUP_ID_CONFIG, "my-consumer-group");
    props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
    props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, AESDeserializer.class);
    props.put(AESDeserializer.AES_SECRET_KEY, "SDGhEh1UEkRBLJXS");
    props.put(AESDeserializer.AES_SALT_KEY, "h4R6gewBUPMnS3FZ");
    props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
    props.put(JsonDeserializer.TRUSTED_PACKAGES, TRUSTED_PACKAGE);
    props.put("sasl.mechanism", SCRAM_SHA_256);
    props.put("sasl.jaas.config", consJaasCfg);
    props.put("security.protocol", SASL_PROTOCOL);
    props.put("ssl.truststore.location", TRUSTSTORE_JKS);
    props.put("ssl.truststore.password", "password");
    props.put("ssl.endpoint.identification.algorithm", "");
    return new DefaultKafkaConsumerFactory<>(props);
}
```
Conclusion
In this example, I hard-coded the password and salt values. Of course, that is not good practice. You can move those two values into a properties file as Jasypt-encrypted values, or better yet, put them into HashiCorp Vault. Samples for both are already written in my previous posts.
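As a sketch of the properties-file idea (the property names `kafka.aes.secret` and `kafka.aes.salt` are my own, not from the sample project), the hard-coded strings in KafkaConfiguration.java could be replaced with injected values, with Jasypt decrypting the `ENC(...)` entries at startup:

```java
// application.properties (values encrypted with Jasypt):
//   kafka.aes.secret=ENC(...)
//   kafka.aes.salt=ENC(...)

@Value("${kafka.aes.secret}")
private String aesSecret;

@Value("${kafka.aes.salt}")
private String aesSalt;

// then, inside producerFactory():
// configProps.put(AESSerializer.AES_SECRET_KEY, aesSecret);
// configProps.put(AESSerializer.AES_SALT_KEY, aesSalt);
```

This keeps the secrets out of source control while leaving the serializer configuration unchanged.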
As a reminder, every approach has its own benefits and drawbacks: tightening up security drains performance, so you should know the best option for your solution. As usual, the code is in my GitHub repository, in the encrypt branch (https://github.com/ru-rocker/spring-kafka-sasl-scram/tree/encrypt).
Pretty cool post, I really appreciate it. I wanted to implement something like this. My idea was to encrypt Avro messages instead of schemaless text messages. I just did a gist file, in Scala.
https://gist.github.com/alonsoir/acfff23335eae53b5a7cd8f80171e951
Your solution is much cleaner; I prefer it over mine.
Thanks a lot.
Glad this helps 🙂