The Spring Kafka project comes with a spring-kafka-test JAR that contains a number of useful utilities to assist you with your application unit testing.
These include an embedded Kafka broker, some static methods to set up consumers/producers, and utility methods to fetch results. Let's demonstrate how these test utilities can be used with a code sample.
We will start from a previous Spring Kafka example in which we created a consumer and producer using Spring Kafka, Spring Boot, and Maven.
A dedicated unit test case for the producer shows how to check that messages are being sent. A second unit test case verifies that messages are received.
2. General Project Overview
Tools used:
Spring Kafka 2.2
Spring Boot 2.1
Maven 3.5
The project has the following directory structure:
3. Maven Setup
Make sure the spring-kafka-test dependency is included in the Maven POM file together with the Spring Kafka and Spring Boot dependencies.
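For reference, the test dependency could look something like the snippet below; this assumes the version is managed by the Spring Boot parent POM, as in the example project.

```xml
<dependency>
  <groupId>org.springframework.kafka</groupId>
  <artifactId>spring-kafka-test</artifactId>
  <scope>test</scope>
</dependency>
```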
The message consumer and producer classes from the Hello World example are unchanged so we won’t go into detail explaining them. You can check out the Spring Kafka Maven project for more details.
4. Unit Testing with an Embedded Kafka
spring-kafka-test includes an embedded Kafka broker that can be created via a JUnit @ClassRule annotation. The rule starts a ZooKeeper and a Kafka server instance on a random port before any of the test cases run, and stops the instances once the test cases are finished.
The EmbeddedKafkaRule constructor takes as parameters: the number of Kafka servers to start, whether a controlled shutdown is needed, and the topics that need to be created on the server.
The below snippet shows how the embedded Kafka is defined in each test class.
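A minimal sketch for the sender test class is shown below; the SENDER_TOPIC constant is an assumption matching the 'sender.t' topic used further down.

```java
// topic that the Sender test will produce to
private static final String SENDER_TOPIC = "sender.t";

// starts one embedded Kafka broker with controlled shutdown and creates the topic up front
@ClassRule
public static EmbeddedKafkaRule embeddedKafka =
    new EmbeddedKafkaRule(1, true, SENDER_TOPIC);
```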
Always pass the topics as a parameter to the embedded Kafka server. This ensures that the topics are created up front (rather than relying on auto-creation) and are already present when the MessageListener connects.
As the embedded broker is started on a random port, we can't use the fixed value in the src/main/resources/application.yml properties file. Luckily the @ClassRule sets a spring.embedded.kafka.brokers system property to the address of the embedded broker(s). We will assign the value of this property to the kafka.bootstrap-servers property that is used by the SenderConfig and ReceiverConfig classes. Since this is only needed during unit testing, we create a dedicated application.yml properties file under src/test/resources.
In other words, the application.yml located in the src/test/resources directory contains the following:
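A sketch of that test properties file, based on the property names mentioned above (the exact keys depend on how SenderConfig and ReceiverConfig read them):

```yaml
kafka:
  # resolved from the system property set by the embedded Kafka @ClassRule
  bootstrap-servers: ${spring.embedded.kafka.brokers}
```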
To have the correct broker address set on the Sender and Receiver beans during each test case, we need to use the @DirtiesContext annotation on all test classes. The reason for this is that each test class starts its own embedded Kafka broker, each on a new random port. By rebuilding the application context, the beans are always set with the current broker address.
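The class-level annotations could look something like the sketch below (JUnit 4 with SpringRunner is assumed, consistent with the @ClassRule and @Before usage in this example):

```java
@RunWith(SpringRunner.class)
@SpringBootTest
@DirtiesContext
public class SpringKafkaSenderTest {
  // @ClassRule, @Before setup and test cases go here
}
```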
5. Testing the Producer
In the SpringKafkaSenderTest test case, we will test the Sender by sending a message to a 'sender.t' topic. We verify that the send works by setting up a test-listener on the topic. All of the setup is done in a method annotated with @Before, which runs before the test case.
To create the needed consumer properties, the static consumerProps() method provided by KafkaTestUtils is used. We then create a DefaultKafkaConsumerFactory and a ContainerProperties object, which contains runtime properties (in this case the topic name) for the listener container. Both are then passed to the KafkaMessageListenerContainer constructor.
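A sketch of this setup is shown below; the consumer group name is an arbitrary choice for illustration, and embeddedKafka refers to the @ClassRule defined earlier.

```java
// consumer properties for the test-listener, pointing at the embedded broker
Map<String, Object> consumerProperties =
    KafkaTestUtils.consumerProps("sender_group", "false", embeddedKafka.getEmbeddedKafka());

// consumer factory and runtime properties for the 'sender.t' topic
DefaultKafkaConsumerFactory<String, String> consumerFactory =
    new DefaultKafkaConsumerFactory<>(consumerProperties);
ContainerProperties containerProperties = new ContainerProperties(SENDER_TOPIC);

// listener container that will consume the messages produced by the Sender
container = new KafkaMessageListenerContainer<>(consumerFactory, containerProperties);
```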
Received messages need to be stored somewhere. In this example, a thread-safe BlockingQueue is used. We create a new MessageListener and in the onMessage() method we add the received message to the BlockingQueue.
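A minimal sketch of that listener, assuming records is a BlockingQueue of ConsumerRecord instances:

```java
// thread-safe queue in which the test-listener stores received records
records = new LinkedBlockingQueue<>();

// add each received record to the queue from within the listener callback
container.setupMessageListener((MessageListener<String, String>) record -> records.add(record));
```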
The listener is started by starting the container.
To avoid sending a message before the container has acquired the required number of assigned partitions, we use the waitForAssignment() method of the ContainerTestUtils helper class.
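For example:

```java
// start the listener container and wait until it has been assigned all partitions of the topic
container.start();
ContainerTestUtils.waitForAssignment(container,
    embeddedKafka.getEmbeddedKafka().getPartitionsPerTopic());
```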
The actual unit test itself consists of sending a greeting and asserting that the received value is the same as the one that was sent.
The Spring Kafka Test JAR ships with some Hamcrest Matchers that allow checking whether the key, value, or partition of a received message matches an expected value. In the below unit test we use a Matcher to check the value of the received message.
The JAR also includes some AssertJ conditions that allow asserting if a received message contains a specific key, value or partition. We illustrate the usage of such a condition by asserting that the key of the received message is null.
For both the Hamcrest matchers and AssertJ conditions, make sure the static imports have been specified.
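A sketch of what that test could look like is shown below. The Sender's send() method and the greeting text are assumptions carried over from the Hello World example; the two assertThat calls come from Hamcrest's MatcherAssert and AssertJ's Assertions respectively.

```java
// static imports for the spring-kafka-test Hamcrest matcher and AssertJ condition
import static org.springframework.kafka.test.hamcrest.KafkaMatchers.hasValue;
import static org.springframework.kafka.test.assertj.KafkaConditions.key;

@Test
public void testSend() throws InterruptedException {
  // send a greeting using the Sender bean from the Hello World example (assumed API)
  String greeting = "Hello Spring Kafka Sender!";
  sender.send(greeting);

  // poll the test-listener's queue for the record that was produced
  ConsumerRecord<String, String> received = records.poll(10, TimeUnit.SECONDS);

  // Hamcrest: the value of the received record matches the greeting
  assertThat(received, hasValue(greeting));
  // AssertJ: no key was set on the received record
  assertThat(received).has(key(null));
}
```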
Note the @DirtiesContext annotation that ensures the correct Kafka broker address is set as explained above.
6. Testing the Consumer
The second test class, SpringKafkaReceiverTest, focuses on the Receiver, which listens to a 'receiver.t' topic as defined in the application.yml properties file. To verify that it works correctly, we use a test template to send a message to this topic. All of the setup is done in a method annotated with @Before, which runs before the test case.
The producer properties are created using the static senderProps() method provided by KafkaTestUtils. These properties are then used to create a DefaultKafkaProducerFactory, which is in turn used to create a KafkaTemplate. Finally, we set the default topic that the template uses to 'receiver.t'.
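A sketch of this setup, assuming a RECEIVER_TOPIC constant holding 'receiver.t' and an embeddedKafka @ClassRule as before:

```java
// producer properties pointing at the embedded broker(s)
Map<String, Object> senderProperties =
    KafkaTestUtils.senderProps(embeddedKafka.getEmbeddedKafka().getBrokersAsString());

// producer factory and test KafkaTemplate, defaulting to the 'receiver.t' topic
ProducerFactory<String, String> producerFactory =
    new DefaultKafkaProducerFactory<>(senderProperties);
template = new KafkaTemplate<>(producerFactory);
template.setDefaultTopic(RECEIVER_TOPIC);
```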
We need to ensure that the Receiver is initialized before sending the test message. For this we use the waitForAssignment() method of ContainerTestUtils. A reference to the message listener container is acquired by auto-wiring the KafkaListenerEndpointRegistry, which manages the lifecycle of the listener containers that are not created manually.
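For example, assuming the registry has been auto-wired into the test class:

```java
// wait until every listener container managed by the registry has been assigned its partitions
for (MessageListenerContainer messageListenerContainer :
    kafkaListenerEndpointRegistry.getListenerContainers()) {
  ContainerTestUtils.waitForAssignment(messageListenerContainer,
      embeddedKafka.getEmbeddedKafka().getPartitionsPerTopic());
}
```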
Note that if you do not create the topics using the EmbeddedKafkaRule constructor, you need to pass 1 as the number of partitions per topic to waitForAssignment() instead of getting the partition count from the embedded Kafka server. The reason is that implicitly created topics appear to default to a single partition.
In the test, we send a greeting and check that the message was received by asserting that the latch of the Receiver was counted down to zero.
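A sketch of that test; the Receiver is assumed to expose a CountDownLatch via getLatch(), as in the Hello World example.

```java
@Test
public void testReceive() throws Exception {
  // send a greeting to the default 'receiver.t' topic using the test template
  template.sendDefault("Hello Spring Kafka Receiver!");

  // the Receiver counts down its latch when a message arrives (assumed API)
  receiver.getLatch().await(10000, TimeUnit.MILLISECONDS);
  assertThat(receiver.getLatch().getCount()).isEqualTo(0);
}
```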
7. Running the Unit Test Cases
Let's run the above test cases by opening a command prompt and executing the following Maven command:
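For example, from the root directory of the project:

```
mvn test
```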
Maven will download the needed dependencies, compile the code, and run the unit test cases. The result should be a successful build during which the following logs are generated:
If you would like to run the above code sample you can get the full source code here.
This concludes our example in which we unit test sending and receiving from Spring Kafka by starting an embedded Kafka broker.
Feel free to drop a line in case of any questions or if you found this post helpful.