Spring Boot 整合 Kafka
项目目录结构
pom.xml
<dependencies>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-web</artifactId>
</dependency>
<!-- Spring Kafka Starter -->
<dependency>
<groupId>org.springframework.kafka</groupId>
<artifactId>spring-kafka</artifactId>
</dependency>
<dependency>
<groupId>org.projectlombok</groupId>
<artifactId>lombok</artifactId>
<optional>true</optional>
</dependency>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-test</artifactId>
<scope>test</scope>
</dependency>
</dependencies>
kafka-provider
-
application.yml
server: port: 10001 spring: kafka: bootstrap-servers: localhost:9092 producer: key-serializer: org.apache.kafka.common.serialization.StringSerializer value-serializer: org.apache.kafka.common.serialization.StringSerializer consumer: key-deserializer: org.apache.kafka.common.serialization.StringDeserializer value-deserializer: org.apache.kafka.common.serialization.StringDeserializer group-id: test-consumer-group
-
service
@Service public class ProviderServiceImpl { private static final String TOPIC_NAME = "dragon-topic"; @Resource private KafkaTemplate<String, String> kafkaTemplate; public String sendMsg(String info) { kafkaTemplate.send(TOPIC_NAME,info); return "发送成功"; } }
@RestController @RequestMapping("/provider") @RequiredArgsConstructor public class ProviderController { private final ProviderServiceImpl providerService; @GetMapping public String providerApi() { return providerService.sendMsg("莫等闲,白了少年头,空悲切。"); } }
启动类
@SpringBootApplication
public class ConsumerApp {
public static void main(String[] args) {
SpringApplication.run(ConsumerApp.class, args);
}
}
kafka-consumer
-
kafka-listener
@Component public class KafkaListener { private static final Logger log = LoggerFactory.getLogger(KafkaListener.class); private static final String TOPIC_NAME = "dragon-topic"; @org.springframework.kafka.annotation.KafkaListener(topics = TOPIC_NAME) public void receive(String msg) { log.info("接收到消息:{}",msg); } }
-
application.yml
server: port: 10002 spring: kafka: bootstrap-servers: localhost:9092 producer: key-serializer: org.apache.kafka.common.serialization.StringSerializer value-serializer: org.apache.kafka.common.serialization.StringSerializer consumer: key-deserializer: org.apache.kafka.common.serialization.StringDeserializer value-deserializer: org.apache.kafka.common.serialization.StringDeserializer group-id: test-consumer-group
启动类
@SpringBootApplication
public class ConsumerApp {
public static void main(String[] args) {
SpringApplication.run(ConsumerApp.class, args);
}
}