In this post, you’re going to learn how to create a Spring Kafka Hello World example that uses Spring Boot and Maven.
(Step-by-step)
So if you’re a Spring Kafka beginner, you’ll love this guide.
Let’s get started.
If you want to learn more about Spring Kafka – head on over to the Spring Kafka tutorials page.
1. What is Spring Kafka?
The Spring for Apache Kafka (spring-kafka) project applies core Spring concepts to the development of Kafka-based messaging solutions.
It provides a ‘template’ as a high-level abstraction for sending messages. It also contains support for message-driven POJOs with @KafkaListener annotations and a listener container.
Spring Kafka is a top-level Spring project, developed and maintained by Pivotal Software.
To show how Spring Kafka works let’s create a simple Hello World example. We will build a sender to produce the message and a receiver to consume the message.
2. General Project Overview
Tools used:
- Spring Kafka 2.2
- Spring Boot 2.1
- Maven 3.5
Our project follows the standard Maven directory structure, with the application code under src/main/java and the test code under src/test/java.
3. Maven Setup
We build and run our example using Maven. If not already done, download and install Apache Maven.
Let’s use Spring Initializr to generate our Maven project. Make sure to select Kafka as a dependency.
Click Generate Project to generate and download the Spring Boot project template. At the root of the project, you’ll find a pom.xml file which is the XML representation of the Maven project.
To avoid having to manage the version compatibility of the different Spring dependencies, we inherit the defaults from the spring-boot-starter-parent parent POM.
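As a sketch of what this looks like in pom.xml (the exact 2.1.x patch version is an assumption based on the Spring Boot 2.1 release line used in this example):

```xml
<parent>
  <groupId>org.springframework.boot</groupId>
  <artifactId>spring-boot-starter-parent</artifactId>
  <version>2.1.0.RELEASE</version>
  <relativePath/> <!-- lookup parent from repository -->
</parent>
```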
The generated project contains Spring Boot Starters that manage the different Spring dependencies.
The spring-boot-starter dependency is the core starter; it includes auto-configuration, logging, and YAML support. The spring-boot-starter-test starter includes the dependencies for testing Spring Boot applications with libraries such as JUnit, Hamcrest, and Mockito.
A dependency on spring-kafka is added. We also include spring-kafka-test to have access to an embedded Kafka broker when running our unit test.
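Put together, the dependencies section of the pom.xml would look roughly as follows (versions are inherited from the parent POM and the Spring Boot dependency management):

```xml
<dependencies>
  <dependency>
    <groupId>org.springframework.boot</groupId>
    <artifactId>spring-boot-starter</artifactId>
  </dependency>
  <dependency>
    <groupId>org.springframework.kafka</groupId>
    <artifactId>spring-kafka</artifactId>
  </dependency>
  <dependency>
    <groupId>org.springframework.boot</groupId>
    <artifactId>spring-boot-starter-test</artifactId>
    <scope>test</scope>
  </dependency>
  <dependency>
    <groupId>org.springframework.kafka</groupId>
    <artifactId>spring-kafka-test</artifactId>
    <scope>test</scope>
  </dependency>
</dependencies>
```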
Note that the version of Spring Kafka is linked to the version of the Apache Kafka client that is used. You need to align the version of Spring Kafka to the version of the Kafka broker you connect to. For more information consult the complete Kafka client compatibility list.
In the plugins section, you’ll find the Spring Boot Maven Plugin: spring-boot-maven-plugin. It allows us to build a single, runnable “uber-jar”, which is a convenient way to execute and transport code. The plugin also lets you start the example via a Maven command.
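A minimal plugin declaration in the build section of the pom.xml looks like this:

```xml
<build>
  <plugins>
    <plugin>
      <groupId>org.springframework.boot</groupId>
      <artifactId>spring-boot-maven-plugin</artifactId>
    </plugin>
  </plugins>
</build>
```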
4. Spring Boot Setup
We use Spring Boot so that we have a Spring Kafka application that you can “just run”. Start by creating a SpringKafkaApplication class. It contains the main() method that uses Spring Boot’s SpringApplication.run() to launch the application.
Note that @SpringBootApplication is a convenience annotation that adds @Configuration, @EnableAutoConfiguration, and @ComponentScan.
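The class described above can be sketched as follows (the package declaration is omitted; the class name matches the one mentioned in the text):

```java
import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;

@SpringBootApplication
public class SpringKafkaApplication {

  public static void main(String[] args) {
    // bootstraps the Spring application context and starts the embedded runtime
    SpringApplication.run(SpringKafkaApplication.class, args);
  }
}
```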
For more information on Spring Boot, check the Spring Boot getting started guide.
The below sections will detail how to create a sender and receiver together with their respective configurations. It is also possible to have Spring Boot auto-configure Spring Kafka using default values, so that the amount of code that needs to be written is reduced to a bare minimum.
This example will send/receive a simple String. If you would like to send more complex objects you could, for example, use an Avro Kafka serializer or the Kafka JsonSerializer that ships with Spring Kafka.
We also create an application.yml YAML properties file under src/main/resources. Properties from this file will be injected by Spring Boot into our configuration beans using the @Value annotation.
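A minimal application.yml could look like the sketch below. The property keys (kafka.bootstrap-servers and kafka.topic.helloworld) are assumed names for this example; the ‘helloworld.t’ topic is the one used later in the test:

```yaml
kafka:
  bootstrap-servers: localhost:9092
  topic:
    helloworld: helloworld.t
```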
5. Create a Spring Kafka Message Producer
For sending messages we will be using the KafkaTemplate, which wraps a Producer and provides convenience methods to send data to Kafka topics. The template provides asynchronous send methods which return a ListenableFuture.
In the Sender class, the KafkaTemplate is auto-wired; its creation is handled further below in a separate SenderConfig class.
For this example, we will use the send() method that takes as input a String payload that needs to be sent.
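A sketch of the Sender class, assuming the payload is a plain String and SLF4J is used for logging:

```java
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.kafka.core.KafkaTemplate;

public class Sender {

  private static final Logger LOGGER = LoggerFactory.getLogger(Sender.class);

  // created in the separate SenderConfig class and auto-wired here
  @Autowired
  private KafkaTemplate<String, String> kafkaTemplate;

  public void send(String topic, String payload) {
    LOGGER.info("sending payload='{}' to topic='{}'", payload, topic);
    // asynchronous send; returns a ListenableFuture we ignore in this example
    kafkaTemplate.send(topic, payload);
  }
}
```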
Note that the Kafka broker default settings cause it to auto-create a topic when a request for an unknown topic is received.
The creation of the KafkaTemplate and Sender is handled in the SenderConfig class. The class is annotated with @Configuration, which indicates that the class can be used by the Spring IoC container as a source of bean definitions.
In order to be able to use the Spring Kafka template, we need to configure a ProducerFactory and provide it in the template’s constructor.
The producer factory needs to be set with a number of mandatory properties, amongst which the ‘BOOTSTRAP_SERVERS_CONFIG’ property that specifies a list of host:port pairs used for establishing the initial connections to the Kafka cluster. Note that this value is configurable, as it is fetched from the application.yml configuration file.
A message in Kafka is a key-value pair with a small amount of associated metadata. As Kafka stores and transports byte arrays, we need to specify how the key and value will be serialized. In this example we send a String as payload, so we specify the StringSerializer class, which takes care of the needed transformation.
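Putting the above together, the SenderConfig class could look as follows (the injected property name kafka.bootstrap-servers is an assumption for this example):

```java
import java.util.HashMap;
import java.util.Map;

import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.common.serialization.StringSerializer;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.kafka.core.DefaultKafkaProducerFactory;
import org.springframework.kafka.core.KafkaTemplate;
import org.springframework.kafka.core.ProducerFactory;

@Configuration
public class SenderConfig {

  // injected from application.yml; the property key is an assumed name
  @Value("${kafka.bootstrap-servers}")
  private String bootstrapServers;

  @Bean
  public Map<String, Object> producerConfigs() {
    Map<String, Object> props = new HashMap<>();
    // list of host:port pairs used for the initial connection to the Kafka cluster
    props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers);
    // keys and values are plain Strings in this example
    props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
    props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
    return props;
  }

  @Bean
  public ProducerFactory<String, String> producerFactory() {
    return new DefaultKafkaProducerFactory<>(producerConfigs());
  }

  @Bean
  public KafkaTemplate<String, String> kafkaTemplate() {
    // the producer factory is provided via the template's constructor
    return new KafkaTemplate<>(producerFactory());
  }

  @Bean
  public Sender sender() {
    return new Sender();
  }
}
```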
For a complete list of the other configuration parameters, you can consult the Kafka ProducerConfig API.
6. Create a Spring Kafka Message Consumer
Like with any messaging-based application, you need to create a receiver that will handle the published messages. The Receiver is nothing more than a simple POJO that defines a method for receiving messages. In the below example we named the method receive(), but you can name it anything you like.
The @KafkaListener annotation creates a ConcurrentMessageListenerContainer message listener container behind the scenes for each annotated method. To do so, a factory bean with name kafkaListenerContainerFactory is expected, which we will configure in the next section.
Using the topics element, we specify the topics for this listener.
For more information on the other available elements of the @KafkaListener annotation, you can consult the API documentation.
For testing convenience, we added a CountDownLatch. This allows the POJO to signal that a message is received. This is something you are not likely to implement in a production application.
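A sketch of the Receiver POJO; the kafka.topic.helloworld property key is an assumed name that resolves to the topic configured in application.yml:

```java
import java.util.concurrent.CountDownLatch;

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.kafka.annotation.KafkaListener;

public class Receiver {

  private static final Logger LOGGER = LoggerFactory.getLogger(Receiver.class);

  // testing convenience only: lets a test wait until a message has arrived
  private CountDownLatch latch = new CountDownLatch(1);

  public CountDownLatch getLatch() {
    return latch;
  }

  @KafkaListener(topics = "${kafka.topic.helloworld}")
  public void receive(String payload) {
    LOGGER.info("received payload='{}'", payload);
    latch.countDown();
  }
}
```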
The creation and configuration of the different Spring beans needed for the Receiver POJO are grouped in the ReceiverConfig class. Similar to the SenderConfig, it is annotated with @Configuration.
Note the @EnableKafka annotation, which enables the detection of the @KafkaListener annotation that was used on the previous Receiver class.
The kafkaListenerContainerFactory() is used by the @KafkaListener annotation from the Receiver in order to configure a MessageListenerContainer. To create it, a ConsumerFactory and an accompanying configuration Map are needed.
In this example, a number of mandatory properties are set, amongst which the initial connection and deserializer parameters. We also specify a ‘GROUP_ID_CONFIG’, which identifies the consumer group this consumer belongs to. Messages are load-balanced over consumer instances that have the same group id. On top of that, we also set ‘AUTO_OFFSET_RESET_CONFIG’ to "earliest". This ensures that our consumer reads from the beginning of the topic, even if some messages were already sent before it was able to start up.
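The ReceiverConfig class described above could be sketched as follows (the group id "helloworld" and the kafka.bootstrap-servers property key are assumed values for this example):

```java
import java.util.HashMap;
import java.util.Map;

import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.common.serialization.StringDeserializer;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.kafka.annotation.EnableKafka;
import org.springframework.kafka.config.ConcurrentKafkaListenerContainerFactory;
import org.springframework.kafka.core.ConsumerFactory;
import org.springframework.kafka.core.DefaultKafkaConsumerFactory;

@Configuration
@EnableKafka // enables detection of @KafkaListener on the Receiver
public class ReceiverConfig {

  @Value("${kafka.bootstrap-servers}")
  private String bootstrapServers;

  @Bean
  public Map<String, Object> consumerConfigs() {
    Map<String, Object> props = new HashMap<>();
    props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers);
    props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
    props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
    // consumers with the same group id share the messages of a topic
    props.put(ConsumerConfig.GROUP_ID_CONFIG, "helloworld");
    // read from the beginning of the topic if no committed offset exists
    props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
    return props;
  }

  @Bean
  public ConsumerFactory<String, String> consumerFactory() {
    return new DefaultKafkaConsumerFactory<>(consumerConfigs());
  }

  // bean name matches what @KafkaListener expects by default
  @Bean
  public ConcurrentKafkaListenerContainerFactory<String, String> kafkaListenerContainerFactory() {
    ConcurrentKafkaListenerContainerFactory<String, String> factory =
        new ConcurrentKafkaListenerContainerFactory<>();
    factory.setConsumerFactory(consumerFactory());
    return factory;
  }

  @Bean
  public Receiver receiver() {
    return new Receiver();
  }
}
```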
For a complete list of the other configuration parameters, you can consult the Kafka ConsumerConfig API.
7. Testing the Spring Kafka Template & Listener
A basic SpringKafkaApplicationTest is provided to verify that we are able to send and receive a message to and from Apache Kafka. It contains a testReceiver() unit test case that uses the Sender bean to send a message to the ‘helloworld.t’ topic on the Kafka bus.
We then check if the CountDownLatch from the Receiver was lowered from 1 to 0, as this indicates a message was processed by the receive() method.
An embedded Kafka broker is started by using the @EmbeddedKafka annotation.
As the embedded server is started on a random port, we provide a dedicated src/test/resources/application.yml properties file for testing, which uses the spring.embedded.kafka.brokers system property to set the correct address of the broker(s).
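The test case could be sketched as follows, assuming JUnit 4 (as used by Spring Boot 2.1's test starter) and AssertJ for the assertion; the message payload is an arbitrary example value:

```java
import static org.assertj.core.api.Assertions.assertThat;

import java.util.concurrent.TimeUnit;

import org.junit.Test;
import org.junit.runner.RunWith;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.test.context.SpringBootTest;
import org.springframework.kafka.test.context.EmbeddedKafka;
import org.springframework.test.annotation.DirtiesContext;
import org.springframework.test.context.junit4.SpringRunner;

@RunWith(SpringRunner.class)
@SpringBootTest
@DirtiesContext
@EmbeddedKafka(topics = "helloworld.t")
public class SpringKafkaApplicationTest {

  @Autowired
  private Sender sender;

  @Autowired
  private Receiver receiver;

  @Test
  public void testReceiver() throws Exception {
    sender.send("helloworld.t", "Hello Spring Kafka!");

    // wait up to 10 seconds for the Receiver to count the latch down
    receiver.getLatch().await(10000, TimeUnit.MILLISECONDS);
    assertThat(receiver.getLatch().getCount()).isEqualTo(0);
  }
}
```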
The test case below can also be executed after you install Kafka and ZooKeeper on your local system. Just comment out @EmbeddedKafka and change the ‘bootstrap-servers’ property of the application properties file located in src/test/resources to the address of the local broker.
To run the above test case, open a command prompt in the project root folder and execute the following Maven command:
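Running the tests from the project root is a single command:

```shell
mvn test
```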
The result is a successful build during which a Hello World message is sent and received using Kafka.
If you would like to run the above code sample you can get the full source code here.
In this getting started tutorial you learned how to create a Spring Kafka template and Spring Kafka listener to send/receive messages.
If you found this sample useful or have a question you would like to ask, drop a line below!
Original article by ItWorker. If reprinting, please credit the source: https://blog.ytso.com/271515.html