Spring Boot auto-configuration attempts to automatically configure your Spring application based on the JAR dependencies that have been added. In other words, if the spring-kafka-1.2.2.RELEASE.jar is on the classpath and you have not manually configured any Consumer or Producer beans, then Spring Boot will auto-configure them using default values.
In order to demonstrate this behavior, we will start from a previous Spring Kafka tutorial in which we send/receive messages to/from an Apache Kafka topic using Spring Kafka. The original code is reduced to a bare minimum in order to demonstrate Spring Boot's auto-configuration.
If you want to learn more about Spring Kafka – head on over to the Spring Kafka tutorials page.
General Project Setup
Tools used:
- Spring Kafka 1.2
- Spring Boot 1.5
- Maven 3.5
The project is built using Maven. The Maven POM file contains the needed dependencies for Spring Boot and Spring Kafka as shown below.
<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
  xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
  <modelVersion>4.0.0</modelVersion>

  <groupId>com.codenotfound</groupId>
  <artifactId>spring-kafka-boot</artifactId>
  <version>0.0.1-SNAPSHOT</version>

  <name>spring-kafka-boot</name>
  <description>Spring Kafka - Spring Boot Example</description>
  <url>https://www.codenotfound.com/spring-kafka-boot-example.html</url>

  <parent>
    <groupId>org.springframework.boot</groupId>
    <artifactId>spring-boot-starter-parent</artifactId>
    <version>1.5.4.RELEASE</version>
  </parent>

  <properties>
    <java.version>1.8</java.version>
    <spring-kafka.version>1.2.2.RELEASE</spring-kafka.version>
  </properties>

  <dependencies>
    <!-- spring-boot -->
    <dependency>
      <groupId>org.springframework.boot</groupId>
      <artifactId>spring-boot-starter</artifactId>
    </dependency>
    <dependency>
      <groupId>org.springframework.boot</groupId>
      <artifactId>spring-boot-starter-test</artifactId>
      <scope>test</scope>
    </dependency>
    <!-- spring-kafka -->
    <dependency>
      <groupId>org.springframework.kafka</groupId>
      <artifactId>spring-kafka</artifactId>
      <version>${spring-kafka.version}</version>
    </dependency>
    <dependency>
      <groupId>org.springframework.kafka</groupId>
      <artifactId>spring-kafka-test</artifactId>
      <version>${spring-kafka.version}</version>
      <scope>test</scope>
    </dependency>
  </dependencies>

  <build>
    <plugins>
      <!-- spring-boot-maven-plugin -->
      <plugin>
        <groupId>org.springframework.boot</groupId>
        <artifactId>spring-boot-maven-plugin</artifactId>
      </plugin>
    </plugins>
  </build>
</project>
The SpringKafkaApplication remains unchanged. What is important to note is that, in order for the auto-configuration to work, we need to opt in by adding the @EnableAutoConfiguration or @SpringBootApplication (which is the same as adding @Configuration, @EnableAutoConfiguration, and @ComponentScan) annotation to one of our @Configuration classes.
You should only ever add one @EnableAutoConfiguration annotation. It is recommended to add it to your primary @Configuration class.
package com.codenotfound.kafka;

import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;

@SpringBootApplication
public class SpringKafkaApplication {

  public static void main(String[] args) {
    SpringApplication.run(SpringKafkaApplication.class, args);
  }
}
Autoconfigure the Spring Kafka Message Producer
The setup and creation of the KafkaTemplate and Producer beans is automatically done by Spring Boot. The only things left to do are autowiring the KafkaTemplate and using it in the send() method.
By annotating the Sender class with @Component, Spring will instantiate this class as a bean that we will use in our test case. In order for this to work, we also need the @EnableAutoConfiguration annotation, which was indirectly specified on SpringKafkaApplication by using the @SpringBootApplication annotation.
package com.codenotfound.kafka.producer;

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.kafka.core.KafkaTemplate;
import org.springframework.stereotype.Component;

@Component
public class Sender {

  private static final Logger LOGGER = LoggerFactory.getLogger(Sender.class);

  @Autowired
  private KafkaTemplate<String, String> kafkaTemplate;

  public void send(String topic, String payload) {
    LOGGER.info("sending payload='{}' to topic='{}'", payload, topic);
    kafkaTemplate.send(topic, payload);
  }
}
Autoconfigure the Spring Kafka Message Consumer
Similar to the Sender, the setup and creation of the ConcurrentKafkaListenerContainerFactory and KafkaMessageListenerContainer beans is automatically done by Spring Boot.
The @KafkaListener annotation creates a message listener container for the annotated receive() method. The topic name is specified using the ${kafka.topic.boot} placeholder, for which the value is automatically fetched from the application.yml properties file.
package com.codenotfound.kafka.consumer;

import java.util.concurrent.CountDownLatch;

import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.kafka.annotation.KafkaListener;
import org.springframework.stereotype.Component;

@Component
public class Receiver {

  private static final Logger LOGGER = LoggerFactory.getLogger(Receiver.class);

  private CountDownLatch latch = new CountDownLatch(1);

  public CountDownLatch getLatch() {
    return latch;
  }

  @KafkaListener(topics = "${kafka.topic.boot}")
  public void receive(ConsumerRecord<?, ?> consumerRecord) {
    LOGGER.info("received payload='{}'", consumerRecord.toString());
    latch.countDown();
  }
}
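The CountDownLatch is the handshake between the test thread and the listener container thread: the test blocks on await() while the listener calls countDown(). The pattern can be sketched in plain Java (a minimal sketch; the Thread below is a stand-in for the Kafka listener container thread):

```java
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;

public class LatchDemo {

  public static void main(String[] args) throws InterruptedException {
    // One count: the test waits for exactly one received message.
    CountDownLatch latch = new CountDownLatch(1);

    // Stand-in for the Kafka listener container thread.
    Thread listener = new Thread(() -> {
      // ... message processing would happen here ...
      latch.countDown(); // signal the waiting test thread
    });
    listener.start();

    // Block for at most 10 seconds, just like the test case does.
    boolean received = latch.await(10000, TimeUnit.MILLISECONDS);
    System.out.println("received=" + received + ", count=" + latch.getCount());
    // prints "received=true, count=0"
  }
}
```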
For the Receiver, Spring Boot takes care of most of the configuration. There are, however, two properties that need to be explicitly set in the application.yml properties file:
- The spring.kafka.consumer.auto-offset-reset property needs to be set to 'earliest', which ensures that the new consumer group receives the message we sent even if the container started after the send had already completed.
- The spring.kafka.consumer.group-id property needs to be specified, as we are using group management to assign topic partitions to consumers. In this example we assign it the value 'boot'.
spring:
  kafka:
    consumer:
      auto-offset-reset: earliest
      group-id: boot

kafka:
  topic:
    boot: boot.t
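The same configuration can also be expressed in the .properties format instead of YAML (an equivalent sketch; note that kafka.topic.boot is this example's own custom property, not a Spring Boot one):

```properties
spring.kafka.consumer.auto-offset-reset=earliest
spring.kafka.consumer.group-id=boot

kafka.topic.boot=boot.t
```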
Scroll down to # APACHE KAFKA in the Spring Boot common application properties reference in order to get a complete overview of all the Spring Kafka properties that can be set for auto-configuration using the Spring Boot application properties file.
Testing the Sender and Receiver
In order to verify that our code works, a basic SpringKafkaApplicationTest test case is used. It contains a testReceive() unit test that uses the Sender to send a message to the 'boot.t' topic on the Kafka broker. We then use the CountDownLatch from the Receiver to verify that the message was successfully received.
The test case runs using the embedded Kafka broker, which is started via a JUnit @ClassRule.
Note that we have added a dedicated application.yml properties file under src/test/resources in order to override the default broker address with the address of the embedded broker using the spring.kafka.bootstrap-servers property.
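Such a test application.yml could look as follows (a sketch, assuming the spring.embedded.kafka.brokers system property, which KafkaEmbedded populates with the address of the embedded broker at startup):

```yaml
spring:
  kafka:
    bootstrap-servers: ${spring.embedded.kafka.brokers}

kafka:
  topic:
    boot: boot.t
```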
package com.codenotfound.kafka;

import static org.assertj.core.api.Assertions.assertThat;

import java.util.concurrent.TimeUnit;

import org.junit.ClassRule;
import org.junit.Test;
import org.junit.runner.RunWith;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.test.context.SpringBootTest;
import org.springframework.kafka.test.rule.KafkaEmbedded;
import org.springframework.test.context.junit4.SpringRunner;

import com.codenotfound.kafka.consumer.Receiver;
import com.codenotfound.kafka.producer.Sender;

@RunWith(SpringRunner.class)
@SpringBootTest
public class SpringKafkaApplicationTest {

  private static final String BOOT_TOPIC = "boot.t";

  @Autowired
  private Sender sender;

  @Autowired
  private Receiver receiver;

  @ClassRule
  public static KafkaEmbedded embeddedKafka = new KafkaEmbedded(1, true, BOOT_TOPIC);

  @Test
  public void testReceive() throws Exception {
    sender.send(BOOT_TOPIC, "Hello Boot!");

    receiver.getLatch().await(10000, TimeUnit.MILLISECONDS);
    assertThat(receiver.getLatch().getCount()).isEqualTo(0);
  }
}
Fire up the above test case by opening a command prompt and executing the following Maven command:
mvn test
Maven will then download the dependencies, compile the code, and run the unit test case, during which the following logs should be generated:
  .   ____          _            __ _ _
 /\\ / ___'_ __ _ _(_)_ __  __ _ \ \ \ \
( ( )\___ | '_ | '_| | '_ \/ _` | \ \ \ \
 \\/  ___)| |_)| | | | | || (_| |  ) ) ) )
  '  |____| .__|_| |_|_| |_\__, | / / / /
 =========|_|==============|___/=/_/_/_/
 :: Spring Boot ::        (v1.5.4.RELEASE)
20:58:18.284 [main] INFO c.c.kafka.SpringKafkaApplicationTest - Starting SpringKafkaApplicationTest on cnf-pc with PID 6032 (started by CodeNotFound in c:/codenotfound/code/spring-kafka/spring-kafka-avro)
20:58:18.284 [main] DEBUG c.c.kafka.SpringKafkaApplicationTest - Running with Spring Boot v1.5.4.RELEASE, Spring v4.3.9.RELEASE
20:58:18.285 [main] INFO c.c.kafka.SpringKafkaApplicationTest - No active profile set, falling back to default profiles: default
20:58:19.011 [main] INFO c.c.kafka.SpringKafkaApplicationTest - Started SpringKafkaApplicationTest in 1.07 seconds (JVM running for 5.484)
20:58:20.348 [main] INFO  c.codenotfound.kafka.producer.Sender - sending payload='Hello Boot!' to topic='boot.t'
20:58:20.390 [org.springframework.kafka.KafkaListenerEndpointContainer#0-0-C-1] INFO  c.c.kafka.consumer.Receiver - received payload='ConsumerRecord(topic = boot.t, partition = 0, offset = 0, ... value = Hello Boot!)'
20:58:21.599 [main] ERROR o.a.zookeeper.server.ZooKeeperServer - ZKShutdownHandler is not registered, so ZooKeeper server won't take any action on ERROR or SHUTDOWN server state changes
Tests run: 1, Failures: 0, Errors: 0, Skipped: 0, Time elapsed: 8.248 sec - in com.codenotfound.kafka.SpringKafkaApplicationTest
Results :
Tests run: 3, Failures: 0, Errors: 0, Skipped: 0
[INFO] ------------------------------------------------------------------------
[INFO] BUILD SUCCESS
[INFO] ------------------------------------------------------------------------
[INFO] Total time: 11.217 s
[INFO] Finished at: 2017-08-02T20:58:22+02:00
[INFO] Final Memory: 19M/211M
[INFO] ------------------------------------------------------------------------
If you would like to run the above code sample you can get the full source code here.
Using Spring Boot's auto-configuration, we were able to set up a Sender and Receiver using only a couple of lines of code. Hopefully this example will kick-start your Spring Kafka development.
Feel free to leave a comment in case something was not clear or just to let me know if everything worked.