Spring Boot Kafka Producer and Consumer Example (IntelliJ IDEA)
Apache Kafka is a distributed event-streaming platform built for scalability, reliability, and fault tolerance. It’s widely used for real-time data pipelines, event-driven microservices, and messaging frameworks. By combining Kafka with Spring Boot, you can simplify the task of setting up producers and consumers in Java applications, enabling efficient communication between services.
This tutorial provides a hands-on guide to implementing a basic Kafka Producer and Consumer with Spring Boot. Whether you’re building an event-driven architecture or experimenting with real-time messaging, this example will set you on the right track for implementing Kafka solutions in your application.
Table of Contents
- Introduction to Kafka Messaging
- Define Kafka Producer Bean
- Define Kafka Listener (Consumer)
- Create a REST Controller to Produce Messages
- Kafka Topic Configuration
- JSON Message Production and Consumption
- Using @KafkaListener with Error Handling
- Logging and Monitoring Messages
- Unit Testing Kafka Producer/Consumer
- Wrap-Up and Best Practices
Introduction to Kafka Messaging
Kafka acts as a messaging broker where producers send messages to topics and consumers retrieve them. Kafka’s ability to handle high-throughput data with fault tolerance makes it ideal for distributed systems.
Key Concepts:
- Producers: Send messages to Kafka topics.
- Consumers: Consume messages from topics.
- Topics: Channels where messages are published and consumed.
- Partitions: Divide topics for scalability, allowing multiple consumers to process data concurrently.
Kafka is a vital component for building event-driven systems or systems requiring real-time communication. The Spring for Apache Kafka (`spring-kafka`) module makes connecting with Kafka from Spring Boot intuitive and straightforward.
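To follow along, the `spring-kafka` dependency must be on the classpath. For a Maven project it looks like this (the version is typically managed by the Spring Boot parent POM, so it can be omitted):

```xml
<dependency>
    <groupId>org.springframework.kafka</groupId>
    <artifactId>spring-kafka</artifactId>
</dependency>
```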
Define Kafka Producer Bean
Spring for Apache Kafka provides the `KafkaTemplate` class for producing messages. This template simplifies message production by abstracting the low-level producer configuration.
Configuration Example:
Create a configuration class for the Kafka producer:
```java
@Configuration
public class KafkaProducerConfig {

    @Bean
    public ProducerFactory<String, String> producerFactory() {
        Map<String, Object> configProps = new HashMap<>();
        configProps.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
        configProps.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
        configProps.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
        return new DefaultKafkaProducerFactory<>(configProps);
    }

    @Bean
    public KafkaTemplate<String, String> kafkaTemplate() {
        return new KafkaTemplate<>(producerFactory());
    }
}
```
By injecting `KafkaTemplate` into your service or controller, you can now publish messages to specific Kafka topics.
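As a sketch of what that injection looks like in practice (the class and topic names here are illustrative, not part of the tutorial code, and the callback style assumes Spring Kafka 3.x, where `send()` returns a `CompletableFuture`):

```java
@Service
public class MessageProducerService {

    private final KafkaTemplate<String, String> kafkaTemplate;

    public MessageProducerService(KafkaTemplate<String, String> kafkaTemplate) {
        this.kafkaTemplate = kafkaTemplate;
    }

    public void sendMessage(String topic, String message) {
        // In Spring Kafka 3.x, send() returns CompletableFuture<SendResult<K, V>>
        kafkaTemplate.send(topic, message).whenComplete((result, ex) -> {
            if (ex != null) {
                System.err.println("Failed to send message: " + ex.getMessage());
            } else {
                System.out.println("Sent to partition " + result.getRecordMetadata().partition());
            }
        });
    }
}
```

In Spring Kafka 2.x, `send()` returns a `ListenableFuture` instead, and the callback is registered with `addCallback(...)`.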
Define Kafka Listener (Consumer)
A Kafka consumer listens for messages on a given topic. Spring Kafka provides the `@KafkaListener` annotation to simplify this process.
Define Consumer Configuration:
```java
@Configuration
public class KafkaConsumerConfig {

    @Bean
    public ConsumerFactory<String, String> consumerFactory() {
        Map<String, Object> configProps = new HashMap<>();
        configProps.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
        configProps.put(ConsumerConfig.GROUP_ID_CONFIG, "group_id_example");
        configProps.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
        configProps.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
        return new DefaultKafkaConsumerFactory<>(configProps);
    }

    @Bean
    public ConcurrentKafkaListenerContainerFactory<String, String> kafkaListenerContainerFactory() {
        ConcurrentKafkaListenerContainerFactory<String, String> factory =
                new ConcurrentKafkaListenerContainerFactory<>();
        factory.setConsumerFactory(consumerFactory());
        return factory;
    }
}
```
Example Kafka Listener:
```java
@Component
public class KafkaConsumer {

    @KafkaListener(topics = "example-topic", groupId = "group_id_example")
    public void consume(String message) {
        System.out.println("Consumed message: " + message);
    }
}
```
The consumer is now ready to process any messages sent to the `example-topic` topic.
Create a REST Controller to Produce Messages
To test Kafka message flow, create a REST endpoint that triggers a producer to send a message.
REST Controller:
```java
@RestController
@RequestMapping("/api/kafka")
public class KafkaController {

    private final KafkaTemplate<String, String> kafkaTemplate;

    public KafkaController(KafkaTemplate<String, String> kafkaTemplate) {
        this.kafkaTemplate = kafkaTemplate;
    }

    @PostMapping("/publish")
    public String publishMessage(@RequestParam String message) {
        kafkaTemplate.send("example-topic", message);
        return "Message published successfully!";
    }
}
```
Access the endpoint via:
```
POST http://localhost:8080/api/kafka/publish?message=HelloWorld
```
The `@PostMapping` handler triggers the producer to send the message.
Kafka Topic Configuration
Spring Boot allows programmatic topic creation through `KafkaAdmin` and `NewTopic` beans.
Example:
```java
@Bean
public NewTopic createTopic() {
    return TopicBuilder.name("example-topic")
            .partitions(3)
            .replicas(1)
            .build();
}
```
This ensures that the topic `example-topic` is automatically created when the application starts, avoiding manual CLI commands.
JSON Message Production and Consumption
To handle JSON, configure Kafka to use `JsonSerializer` and `JsonDeserializer`.
Example Producer Configuration:
```java
configProps.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, JsonSerializer.class);
```
Example Consumer Configuration:
```java
configProps.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, JsonDeserializer.class);
```
Ensure your consumer class includes a POJO structure for deserialization:
```java
public class Message {
    private String content;
    // Getters & Setters
}
```
JSON Listener:
```java
@KafkaListener(topics = "json-topic")
public void consumeJson(Message message) {
    System.out.println("Consumed JSON message: " + message);
}
```
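One gotcha worth knowing: by default, `JsonDeserializer` refuses to deserialize classes outside a small set of trusted packages. A sketch of the extra consumer-side properties this requires (`com.example.model` is a placeholder for the package containing your POJO):

```java
// Tell JsonDeserializer which packages are safe to deserialize into
configProps.put(JsonDeserializer.TRUSTED_PACKAGES, "com.example.model");
// Fall back to this type when the record carries no type headers
configProps.put(JsonDeserializer.VALUE_DEFAULT_TYPE, Message.class.getName());
```

Without the trusted-packages setting, the listener typically fails with a deserialization exception rather than receiving the `Message` object.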
Using @KafkaListener with Error Handling
To handle consumer errors, use an error handler such as `SeekToCurrentErrorHandler` (deprecated since Spring Kafka 2.8 in favor of `DefaultErrorHandler`).
Example Configuration:
```java
factory.setErrorHandler(new SeekToCurrentErrorHandler(
        (record, exception) -> System.err.println("Error processing message: " + record),
        new FixedBackOff(0L, 3L))); // retry up to 3 times before giving up
```

Note that the second constructor argument is a `BackOff`, not a plain retry count. This enables retries for failed records and prevents a single bad message from crashing the consumer.
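In Spring Kafka 2.8 and later, the recommended replacement is `DefaultErrorHandler`, registered via `factory.setCommonErrorHandler(...)`. A sketch combining it with a dead-letter recoverer (the bean wiring and retry values here are illustrative):

```java
@Bean
public DefaultErrorHandler errorHandler(KafkaTemplate<String, String> template) {
    // After retries are exhausted, failed records are published to <topic>.DLT
    DeadLetterPublishingRecoverer recoverer = new DeadLetterPublishingRecoverer(template);
    // Retry up to 3 times, waiting 1 second between attempts
    return new DefaultErrorHandler(recoverer, new FixedBackOff(1000L, 3L));
}
```

This pairs naturally with the dead-letter-topic best practice mentioned at the end of this tutorial.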
Logging and Monitoring Messages
It’s essential to log and monitor Kafka messages for operational visibility.
Logging Integration:
Use the `spring.kafka.listener` properties to manage listener logging and acknowledgment behavior:
```properties
spring.kafka.listener.log-container-config=true
spring.kafka.listener.ack-mode=record
```
Monitoring Tools:
- Kafka Manager: GUI for monitoring topics and partitions.
- Prometheus/Kafka Exporter: Detailed metrics for Kafka clusters.
- Logs with ELK Stack: Centralized logging using ElasticSearch, Logstash, and Kibana.
Unit Testing Kafka Producer/Consumer
Unit tests let you validate your application’s Kafka logic without requiring a running broker.
Testing Producers:
```java
@SpringBootTest
class KafkaProducerTest {

    // Mocking the template means no broker is needed for this test
    @MockBean
    private KafkaTemplate<String, String> kafkaTemplate;

    @Test
    void testSendMessage() {
        kafkaTemplate.send("test-topic", "test-message");
        verify(kafkaTemplate).send("test-topic", "test-message");
    }
}
```
Testing Consumers:
Mock Kafka consumers and test their logic independently using `MockConsumer`.
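A minimal sketch of how `MockConsumer` (from the `kafka-clients` library) can stand in for a real consumer; the topic, key, and value here are illustrative:

```java
MockConsumer<String, String> mockConsumer = new MockConsumer<>(OffsetResetStrategy.EARLIEST);
TopicPartition partition = new TopicPartition("test-topic", 0);
mockConsumer.assign(Collections.singletonList(partition));
mockConsumer.updateBeginningOffsets(Collections.singletonMap(partition, 0L));

// Queue a record as if it had been published to the topic
mockConsumer.addRecord(new ConsumerRecord<>("test-topic", 0, 0L, "key", "test-message"));

ConsumerRecords<String, String> records = mockConsumer.poll(Duration.ofMillis(100));
records.forEach(record -> System.out.println("Consumed: " + record.value()));
```

You can then pass the mock into whatever component wraps your consumer logic and assert on its behavior, all without a broker.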
Wrap-Up and Best Practices
Kafka integrates seamlessly with Spring Boot for building robust messaging systems. Key practices to follow:
- Use JSON serialization for structured messages.
- Handle consumer errors with retries and dead-letter topics.
- Monitor application health with logging and external tools.
- Write unit tests for both producers and consumers.
Start leveraging Kafka in your Spring Boot applications to enable scalable, real-time event-driven architectures!
FAQs
Q1. What is Spring Boot Kafka Producer?
A component for sending messages to Kafka topics using `KafkaTemplate`.
Q2. How to handle JSON in Kafka?
Use `JsonSerializer` for producers and `JsonDeserializer` for consumers.
Q3. Can Kafka auto-create topics?
Yes. Auto-create topics using `NewTopic` beans or CLI commands.
For more information, refer to the Spring Kafka Documentation.