How to Unit Test Kafka Streams Application – PART 1 (DSL)
This is part 1 of 2 articles on unit testing Kafka Streams applications. The first part covers testing DSL transformations, both stateless and stateful, including joins and windowing.
Overview
Unit testing. Test automation. How many times do you hear about them every day? They are so beneficial and have become so fundamental to every application we build that we must include them in our definition of done. Of course, the same rules apply when we build our Kafka Streams applications. We need to equip them with robust testing.
Unit Test Kafka Streams
Kafka Streams comes with a test-utils package, which provides TopologyTestDriver to assemble and simulate the topology, TestInputTopic to pipe input data in, and TestOutputTopic to read the outputs produced by traversing the topology. In my experience, those three are sufficient to automate tests throughout the streams. But who knows about future complexity? Or maybe my streams are just not complex enough yet to run into the limits of the test-utils package?
Well, as always, it is good to get the basic knowledge and build a strong foundation. Once we grasp the essential techniques for testing streams, we can widen our skills more easily later. So, let’s start.
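In a nutshell, the pattern looks roughly like this; a minimal sketch with placeholder topic names and data, assuming the usual test-utils and serializer imports. The fully configured version follows in the sections below.

// Minimal sketch (placeholder topics/data): build the topology, wrap it in a
// TopologyTestDriver, pipe records in, and read the results out.
final Topology topology = builder.build();
final TopologyTestDriver driver = new TopologyTestDriver(topology, props);

TestInputTopic<String, String> input = driver.createInputTopic(
        "input-topic", new StringSerializer(), new StringSerializer());
TestOutputTopic<String, Long> output = driver.createOutputTopic(
        "output-topic", new StringDeserializer(), new LongDeserializer());

input.pipeInput("some-key", "some value");              // push a record through the topology
KeyValue<String, Long> record = output.readKeyValue();  // read whatever the topology emitted
// ...assert on record, then clean up
driver.close();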
I used the existing repository from the previous article as a starting point to implement the testing part.
Adding Test Libraries
For the unit tests, I will use JUnit 5, AssertJ (I love this lib), and of course, test-utils from Kafka.
<dependency>
    <groupId>org.apache.kafka</groupId>
    <artifactId>kafka-streams-test-utils</artifactId>
    <version>2.5.0</version>
    <scope>test</scope>
</dependency>
<dependency>
    <groupId>org.assertj</groupId>
    <artifactId>assertj-core</artifactId>
    <version>3.18.1</version>
    <scope>test</scope>
</dependency>
<dependency>
    <groupId>org.junit.jupiter</groupId>
    <artifactId>junit-jupiter-api</artifactId>
    <version>5.6.2</version>
    <scope>test</scope>
</dependency>
<dependency>
    <groupId>org.junit.jupiter</groupId>
    <artifactId>junit-jupiter-params</artifactId>
    <version>5.6.2</version>
    <scope>test</scope>
</dependency>
<dependency>
    <groupId>org.junit.jupiter</groupId>
    <artifactId>junit-jupiter-engine</artifactId>
    <version>5.6.2</version>
    <scope>test</scope>
</dependency>
<dependency>
    <groupId>org.junit.platform</groupId>
    <artifactId>junit-platform-launcher</artifactId>
    <version>1.6.2</version>
    <scope>test</scope>
</dependency>
Word Count Streams
I will start with the most famous Kafka Streams example: the word count app. It is the streaming equivalent of a Hello World application.
Source
public void createTopology(StreamsBuilder builder) {
    final KStream<String, String> textLines = builder
            .stream("streams-plaintext-input",
                    Consumed.with(Serdes.String(), Serdes.String()));
    textLines
            .flatMapValues(value -> Arrays.asList(value.toLowerCase().split("\\W+")))
            .groupBy((key, value) -> value)
            .count(Materialized.as("WordCount"))
            .toStream()
            .to("streams-wordcount-output",
                    Produced.with(Serdes.String(), Serdes.Long()));
}
Test
For the testing part, I will start by declaring the test class structure (shown once here; for the rest, only the test methods are shown). First, we set up an init() method that is called before each test invocation. This init() method mainly builds the topology and configures the input and output topics with exactly the same topic names as the source class. Of course, we also have to set up the serializer and deserializer for each topic.
// WordCountTopologyTest.java
private TopologyTestDriver testDriver;
private TestInputTopic<String, String> plainTextInput;
private TestOutputTopic<String, Long> wordCountOutput;

private final Serde<String> stringSerde = new Serdes.StringSerde();
private final Serde<Long> longSerde = new Serdes.LongSerde();

@BeforeEach
public void init() {
    final Properties props = new Properties();
    props.put(StreamsConfig.APPLICATION_ID_CONFIG, "app-id");
    props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "dummy:1234");
    props.put(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG, stringSerde.getClass().getName());
    props.put(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG, stringSerde.getClass().getName());

    WordCountTopology wordCountTopology = new WordCountTopology();
    final StreamsBuilder builder = new StreamsBuilder();
    wordCountTopology.createTopology(builder);
    final Topology topology = builder.build();

    testDriver = new TopologyTestDriver(topology, props);
    plainTextInput = testDriver.createInputTopic("streams-plaintext-input", stringSerde.serializer(),
            stringSerde.serializer());
    wordCountOutput = testDriver.createOutputTopic("streams-wordcount-output", stringSerde.deserializer(),
            longSerde.deserializer());
}

@AfterEach
public void tearDown() throws IOException {
    testDriver.close();
    FileUtils.deleteDirectory(new File("/tmp/kafka-streams/app-id"));
}
And for the test, we will pipe three lines of text, then count the occurrences and put the result into a Map. Storing the output in a Map makes sense because we only need to know the latest count for each word.
// WordCountTopologyTest.java
@Test
@DisplayName("Test word count streams")
public void testWordCountStream() {
    String text1 = "Welcome to kafka streams";
    String text2 = "Kafka streams is great";
    String text3 = "Welcome back";

    // expected output
    Map<String, Long> expected = Map.of(
            "welcome", 2L,
            "to", 1L,
            "kafka", 2L,
            "streams", 2L,
            "is", 1L,
            "great", 1L,
            "back", 1L
    );

    plainTextInput.pipeInput(null, text1);
    plainTextInput.pipeInput(null, text2);
    plainTextInput.pipeInput(null, text3);

    assertThat(wordCountOutput.isEmpty()).isFalse();

    // result
    Map<String, Long> result = new HashMap<>();
    while (!wordCountOutput.isEmpty()) {
        final KeyValue<String, Long> kv = wordCountOutput.readKeyValue();
        result.put(kv.key, kv.value);
    }

    assertThat(result).containsExactlyInAnyOrderEntriesOf(expected);
    assertThat(wordCountOutput.isEmpty()).isTrue();
}
Alright, just a piece of cake, isn’t it? Let’s move to another use case.
Word Count Streams with Windowing
Now let us spice up our word-count streams a little. We will still group the stream, but this time the counting happens within a window of 5 minutes. For this case, I chose tumbling time windows as the windowing mechanism, so the counter resets every 5 minutes.
There are many types of windowing in Kafka Streams. As usual, for more information, please visit https://kafka.apache.org/11/documentation/streams/developer-guide/dsl-api.html#windowing
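For orientation, here is a rough sketch of how a few of those window types are declared with the 2.5 API; the durations are arbitrary examples, not taken from this article.

// Tumbling window: size equals advance, so windows never overlap (what we use below).
TimeWindows tumbling = TimeWindows.of(Duration.ofMinutes(5));

// Hopping window: the advance is smaller than the size, so windows overlap.
TimeWindows hopping = TimeWindows.of(Duration.ofMinutes(5)).advanceBy(Duration.ofMinutes(1));

// Session window: windows are defined by periods of activity separated by a gap of inactivity.
SessionWindows session = SessionWindows.with(Duration.ofMinutes(5));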
Source
// WordCountTimeWindowsTopology.java
public void createTopology(StreamsBuilder builder) {
    final KStream<String, String> textLines = builder
            .stream("streams-plaintext-input",
                    Consumed.with(Serdes.String(), Serdes.String()));
    textLines
            .flatMapValues(value -> Arrays.asList(value.toLowerCase().split("\\W+")))
            .groupBy((key, value) -> value)
            .windowedBy(TimeWindows.of(Duration.ofMinutes(5)).advanceBy(Duration.ofMinutes(5)))
            .count(Materialized.as("WordCount"))
            .toStream()
            .map((key, value) -> new KeyValue<>(key.key(), value))
            .to("streams-wordcount-output",
                    Produced.with(Serdes.String(), Serdes.Long()));
}
Note: the windowedBy(TimeWindows.of(Duration.ofMinutes(5)).advanceBy(Duration.ofMinutes(5))) call above is where the windows are configured.
Test
For this part, the test is mostly similar to the previous one, with a small modification. We will pipe the third text 5 minutes after the first two. But it would not make any sense for our test to actually wait 5 minutes. Luckily, TestInputTopic provides a mechanism to control the record timestamp by passing an Instant while piping the input data.
// WordCountTimeWindowsTopologyTest.java
@Test
@DisplayName("Test word count streams")
public void testWordCountStream() {
    String text1 = "Welcome to kafka streams";
    String text2 = "Kafka streams is great";
    String text3 = "Welcome back";

    // expected output
    Map<String, Long> expected = Map.of(
            // please take note: "welcome" is now only 1,
            // because the second "welcome" arrives after the 5-minute window.
            "welcome", 1L,
            "to", 1L,
            "kafka", 2L,
            "streams", 2L,
            "is", 1L,
            "great", 1L,
            "back", 1L
    );

    final Instant now = Instant.now();
    // insert two lines with the same timestamp
    plainTextInput.pipeInput(null, text1, now);
    plainTextInput.pipeInput(null, text2, now);
    // simulate 5 minutes after the first two
    plainTextInput.pipeInput(null, text3, now.plus(5, ChronoUnit.MINUTES));

    assertThat(wordCountOutput.isEmpty()).isFalse();

    // result
    Map<String, Long> result = new HashMap<>();
    while (!wordCountOutput.isEmpty()) {
        final KeyValue<String, Long> kv = wordCountOutput.readKeyValue();
        result.put(kv.key, kv.value);
    }

    assertThat(result).containsExactlyInAnyOrderEntriesOf(expected);
    assertThat(wordCountOutput.isEmpty()).isTrue();
}
Note: I expect welcome to have a count of one instead of two because the window resets every 5 minutes. See the last pipeInput call, where now.plus(5, ChronoUnit.MINUTES) pushes the record into the next window.
Another piece of cake, isn’t it? Thanks to TestInputTopic for enabling us to manipulate the timestamp.
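As a side note, TestInputTopic can also advance the record time for you, so you do not have to pass an Instant on every call. A small sketch of that alternative, where the start timestamp and the 5-minute auto-advance are just example values:

// Option 1: create the input topic with a start timestamp and an auto-advance,
// so each subsequent record is stamped 5 minutes after the previous one.
plainTextInput = testDriver.createInputTopic(
        "streams-plaintext-input",
        stringSerde.serializer(),
        stringSerde.serializer(),
        Instant.now(),            // timestamp of the first record
        Duration.ofMinutes(5));   // added to the timestamp of each following record

// Option 2: advance the topic's tracked time explicitly between pipeInput calls.
plainTextInput.pipeInput(null, "Welcome to kafka streams");
plainTextInput.advanceTime(Duration.ofMinutes(5));
plainTextInput.pipeInput(null, "Welcome back");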
Join Streams
When you are working with streams, at some point there is a big possibility that you will need to join them. Therefore, I added this sub-topic on testing stream joins. For the use case, I will reuse the existing streams from my previous article: the Employee streams.
Source
I will not show the full source snippet here; if you want to see it, please visit that article. I did make a slight modification while joining with the Employment History stream: instead of an inner join, I used a left join, because I realized an employee could be a fresh graduate with no employment history records at all.
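To give a feel for that change, here is a rough sketch of the left join. The table names, the aggregate type, and the getEmployerNames() accessor are placeholders of mine, not the exact code from the repository, so treat it as an illustration only.

// employeeWithDeptTable:  KTable<Integer, EmployeeResultDto> (employee already joined with dept)
// employmentHistoryTable: KTable<Integer, EmploymentHistoryAggregationDto> (histories aggregated per employee)
// With leftJoin, the right-hand value may be null, e.g. for a fresh graduate.
KTable<Integer, EmployeeResultDto> employeeResultTable = employeeWithDeptTable
        .leftJoin(employmentHistoryTable, (employee, history) -> {
            if (history != null) {
                employee.setEmploymentHistory(history.getEmployerNames());
            }
            return employee;
        });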
Test
The testing strategy for joined streams is not much different from the previous ones. The same methods apply: we need to provide the inputs and read from the output. The only difference is the number of input topics. Since we use the Employee topology, which has three input topics (EMPLOYEE, DEPT, and EMPLOYMENT-HISTORY), we need to provide three TestInputTopic variables, one for each of them, as sketched below.
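A rough sketch of how those topics might be declared and created in init(). The topic names follow the ones mentioned above; the key types and the Serde variables (employeeSerde, departmentSerde, and so on) are assumptions standing in for the JSON Serdes from the previous article's repository.

// Hypothetical declarations; adjust the key types and Serdes to your topology.
private TestInputTopic<String, EmployeeDto> employeeInput;
private TestInputTopic<String, DepartmentDto> deptInput;
private TestInputTopic<String, EmploymentHistoryDto> employmentHistoryInput;
private TestOutputTopic<Integer, EmployeeResultDto> employeeOutput;

// inside init(), after the TopologyTestDriver has been created:
employeeInput = testDriver.createInputTopic("EMPLOYEE",
        stringSerde.serializer(), employeeSerde.serializer());
deptInput = testDriver.createInputTopic("DEPT",
        stringSerde.serializer(), departmentSerde.serializer());
employmentHistoryInput = testDriver.createInputTopic("EMPLOYMENT-HISTORY",
        stringSerde.serializer(), employmentHistorySerde.serializer());
employeeOutput = testDriver.createOutputTopic("EMP-RESULT",
        Serdes.Integer().deserializer(), employeeResultSerde.deserializer());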
I created two test cases. One tests the employee topology without Employment History; we can still expect an output here because of the left-join semantics: if there is no data on the right side, the stream still proceeds. The other tests the employee topology with Employment History.
// EmployeeTopologyTest.java
@Test
@DisplayName("Test Employee Topology between department and employee, exclude employment history")
public void testEmployeeAggregationTopologyWithoutEmploymentHistory() {
    // Finance Department
    DepartmentDto financeDept = DepartmentDto.builder()
            .deptId(1)
            .deptName("Finance")
            .build();

    // Employee: Alice
    EmployeeDto alice = EmployeeDto.builder()
            .empId(1000)
            .empName("Alice")
            .deptId(1)
            .build();

    // expected output
    EmployeeResultDto expected = new EmployeeResultDto();
    expected.setDeptId(1);
    expected.setDeptName("Finance");
    expected.setEmpId(1000);
    expected.setEmpName("Alice");

    // 1.1 insert finance dept into the DEPT topic.
    // Remember: I put the key as null because we do key repartitioning in deptTable.
    // But this depends on your use case.
    deptInput.pipeInput(null, financeDept);
    // 1.2 the output topic (EMP-RESULT) is still empty because of the inner-join behaviour between employee and dept
    assertThat(employeeOutput.isEmpty()).isTrue();

    // 2.1 insert the employee into the EMPLOYEE topic.
    // Remember: I put the key as null because we do key repartitioning in empTable.
    // But this depends on your use case.
    employeeInput.pipeInput(null, alice);
    // 2.2 the output topic (EMP-RESULT) is no longer empty because both streams now have data with the associated key (dept_id)
    assertThat(employeeOutput.isEmpty()).isFalse();
    assertThat(employeeOutput.readKeyValue()).isEqualTo(new KeyValue<>(1000, expected));
    // 2.3 make sure no record is left in the output topic
    assertThat(employeeOutput.isEmpty()).isTrue();
}

@Test
@DisplayName("Test Employee Topology between department and employee, include employment history")
public void testEmployeeAggregationTopologyWithEmploymentHistory() {
    // Finance Department
    DepartmentDto financeDept = DepartmentDto.builder()
            .deptId(1)
            .deptName("Finance")
            .build();

    // Employee: Alice
    EmployeeDto alice = EmployeeDto.builder()
            .empId(1000)
            .empName("Alice")
            .deptId(1)
            .build();

    // History: Company A
    EmploymentHistoryDto historyCompanyA = EmploymentHistoryDto.builder()
            .empHistId(1)
            .empId(1000)
            .employerName("Company A")
            .build();

    // History: Company B
    EmploymentHistoryDto historyCompanyB = EmploymentHistoryDto.builder()
            .empHistId(1)
            .empId(1000)
            .employerName("Company B")
            .build();

    // expected output
    EmployeeResultDto expected = new EmployeeResultDto();
    expected.setDeptId(1);
    expected.setDeptName("Finance");
    expected.setEmpId(1000);
    expected.setEmpName("Alice");
    expected.setEmploymentHistory(Set.of("Company A", "Company B"));

    // 1. insert finance dept into the DEPT topic.
    // Remember: I put the key as null because we do key repartitioning in deptTable.
    // But this depends on your use case.
    deptInput.pipeInput(null, financeDept);

    // 2. insert the employee into the EMPLOYEE topic.
    // Remember: I put the key as null because we do key repartitioning in empTable.
    // But this depends on your use case.
    employeeInput.pipeInput(null, alice);

    // 3. insert the employment history records into the EMPLOYMENT-HISTORY topic.
    // Remember: I put the key as null because we do key repartitioning here as well.
    // But this depends on your use case.
    employmentHistoryInput.pipeInput(null, historyCompanyA);
    employmentHistoryInput.pipeInput(null, historyCompanyB);

    // make sure the output topic is not empty
    assertThat(employeeOutput.isEmpty()).isFalse();

    // loop until the last record, because we cannot predict the intermediate left-join results.
    // What we know is that the last record should be what we expect.
    KeyValue<Integer, EmployeeResultDto> kv = null;
    while (!employeeOutput.isEmpty()) {
        kv = employeeOutput.readKeyValue();
    }
    // make sure kv is not null
    assertThat(kv).isNotNull();
    assertThat(kv).isEqualTo(new KeyValue<>(1000, expected));
    // make sure no record is left in the output topic
    assertThat(employeeOutput.isEmpty()).isTrue();
}
To Be Continued…
That wraps up the DSL part, up to the join use case. There are still many more DSL operations in Kafka Streams, but in my experience the tricks are always similar to the previous three cases. There is also another flavor of Kafka Streams: the Processor API.
I will discuss how to test the Processor API in part 2.