Kafka spring boot example, Spring Boot Kafka producer consumer example, Kafka spring boot tutorial, Apache Kafka Spring Boot microservices example, Kafka Spring Boot interview questions, Kafka spring boot example github, Spring Kafka documentation, spring-kafka maven

Spring Boot Kafka Producer and Consumer Example Intellij Idea

Kafka spring boot example, Spring Boot Kafka producer consumer example, Kafka spring boot tutorial, Apache Kafka Spring Boot microservices example, Kafka Spring Boot interview questions, Kafka spring boot example github, Spring Kafka documentation, spring-kafka maven

Apache Kafka is a distributed event-streaming platform built for scalability, reliability, and fault tolerance. It’s widely used for real-time data pipelines, event-driven microservices, and messaging frameworks. By combining Kafka with Spring Boot, you can simplify the task of setting up producers and consumers in Java applications, enabling efficient communication between services.

This tutorial provides a hands-on guide to implementing a basic Kafka Producer and Consumer with Spring Boot. Whether you’re building an event-driven architecture or experimenting with real-time messaging, this example will set you on the right track for implementing Kafka solutions in your application.

Table of Contents

  1. Introduction to Kafka Messaging
  2. Define Kafka Producer Bean
  3. Define Kafka Listener (Consumer)
  4. Create a REST Controller to Produce Messages
  5. Kafka Topic Configuration
  6. JSON Message Production and Consumption
  7. Using @KafkaListener with Error Handling
  8. Logging and Monitoring Messages
  9. Unit Testing Kafka Producer/Consumer
  10. Wrap-Up and Best Practices

Introduction to Kafka Messaging

Kafka acts as a messaging broker where producers send messages to topics and consumers retrieve them. Kafka’s ability to handle high-throughput data with fault tolerance makes it ideal for distributed systems.

Key Concepts:

  • Producers: Send messages to Kafka topics.
  • Consumers: Consume messages from topics.
  • Topics: Channels where messages are published and consumed.
  • Partitions: Divide topics for scalability, allowing multiple consumers to process data concurrently.

Kafka is a vital component for building event-driven systems or systems requiring real-time communication. Spring Boot’s built-in spring-kafka module makes connecting with Kafka intuitive and straightforward.


Define Kafka Producer Bean

Spring Boot provides the KafkaTemplate class for configuring Kafka producers. This template simplifies message production by abstracting producer configuration.

Configuration Example:

Create a configuration class for the Kafka producer:

@Configuration
public class KafkaProducerConfig {

    @Bean
    public ProducerFactory<String, String> producerFactory() {
        Map<String, Object> configProps = new HashMap<>();
        configProps.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
        configProps.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
        configProps.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
        return new DefaultKafkaProducerFactory<>(configProps);
    }

    @Bean
    public KafkaTemplate<String, String> kafkaTemplate() {
        return new KafkaTemplate<>(producerFactory());
    }
}

By injecting KafkaTemplate into your service or controller, you can now publish messages to specific Kafka topics.


Define Kafka Listener (Consumer)

A Kafka consumer listens for messages on a given topic. Spring Boot provides the @KafkaListener annotation to simplify this process.

Define Consumer Configuration:

@Configuration
public class KafkaConsumerConfig {

    @Bean
    public ConsumerFactory<String, String> consumerFactory() {
        Map<String, Object> configProps = new HashMap<>();
        configProps.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
        configProps.put(ConsumerConfig.GROUP_ID_CONFIG, "group_id_example");
        configProps.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
        configProps.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
        return new DefaultKafkaConsumerFactory<>(configProps);
    }

    @Bean
    public ConcurrentKafkaListenerContainerFactory<String, String> kafkaListenerContainerFactory() {
        ConcurrentKafkaListenerContainerFactory<String, String> factory = new ConcurrentKafkaListenerContainerFactory<>();
        factory.setConsumerFactory(consumerFactory());
        return factory;
    }
}

Example Kafka Listener:

@Component
public class KafkaConsumer {

    @KafkaListener(topics = "example-topic", groupId = "group_id_example")
    public void consume(String message) {
        System.out.println("Consumed message: " + message);
    }
}

The consumer section is ready to consume messages sent to the topic example-topic.


Create a REST Controller to Produce Messages

To test Kafka message flow, create a REST endpoint that triggers a producer to send a message.

REST Controller:

@RestController
@RequestMapping("/api/kafka")
public class KafkaController {

    private final KafkaTemplate<String, String> kafkaTemplate;

    public KafkaController(KafkaTemplate<String, String> kafkaTemplate) {
        this.kafkaTemplate = kafkaTemplate;
    }

    @PostMapping("/publish")
    public String publishMessage(@RequestParam String message) {
        kafkaTemplate.send("example-topic", message);
        return "Message published successfully!";
    }
}

Access the endpoint via:

POST http://localhost:8080/api/kafka/publish?message=HelloWorld

The @PostMapping triggers the producer to send the message.


Kafka Topic Configuration

Spring Boot allows programmatic topic creation through KafkaAdmin.

Example:

@Bean
public NewTopic createTopic() {
    return TopicBuilder.name("example-topic")
                       .partitions(3)
                       .replicas(1)
                       .build();
}

This ensures that the topic example-topic is automatically created when the application starts, avoiding manual CLI commands.


JSON Message Production and Consumption

To handle JSON, configure Kafka to use JsonSerializer and JsonDeserializer.

Example Producer Configuration:

configProps.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, JsonSerializer.class);

Example Consumer Configuration:

configProps.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, JsonDeserializer.class);

Ensure your consumer class includes a POJO structure for deserialization:

public class Message {
    private String content;
    // Getters & Setters
}

JSON Listener:

@KafkaListener(topics = "json-topic")
public void consumeJson(Message message) {
    System.out.println("Consumed JSON message: " + message);
}

Using @KafkaListener with Error Handling

To handle consumer errors, use SeekToCurrentErrorHandler.

Example Configuration:

factory.setErrorHandler(new SeekToCurrentErrorHandler((record, exception) -> {
    System.err.println("Error processing message: " + record);
}, 3)); // Retry 3 times before failing

This enables error retries and prevents consumer crashes.


Logging and Monitoring Messages

It’s essential to log and monitor Kafka messages for operational visibility.

Logging Integration:

Use the spring.kafka.listener properties to manage logging levels:

spring.kafka.listener.log-container-config=true
spring.kafka.listener.ack-mode=record

Monitoring Tools:

  • Kafka Manager: GUI for monitoring topics and partitions.
  • Prometheus/Kafka Exporter: Detailed metrics for Kafka clusters.
  • Logs with ELK Stack: Centralized logging using ElasticSearch, Logstash, and Kibana.

Unit Testing Kafka Producer/Consumer

Unit tests allow validating your application’s Kafka logic.

Testing Producers:

@MockBean
private KafkaTemplate<String, String> kafkaTemplate;

@Test
void testSendMessage() {
    kafkaTemplate.send("test-topic", "test-message");
    verify(kafkaTemplate).send("test-topic", "test-message");
}

Testing Consumers:

Mock Kafka consumers and test their logic independently using MockConsumer.


Wrap-Up and Best Practices

Kafka integrates seamlessly with Spring Boot for building robust messaging systems. Key practices to follow:

  1. Use JSON serialization for structured messages.
  2. Handle consumer errors with retries and dead-letter topics.
  3. Monitor application health with logging and external tools.
  4. Write unit tests for both producers and consumers.

Start leveraging Kafka in your Spring Boot applications to enable scalable, real-time event-driven architectures!


FAQs

Q1. What is Spring Boot Kafka Producer?

A component for sending messages to Kafka topics using KafkaTemplate.

Q2. How to handle JSON in Kafka?

Use JsonSerializer for producers and JsonDeserializer for consumers.

Q3. Can Kafka auto-create topics?

Auto-create topics using NewTopic beans or CLI commands.

For more information, refer to the Spring Kafka Documentation.

Similar Posts