Centralized Logging - Part 7: Real-time Log Streaming with Kafka
Introduction
In this tutorial, we will learn how to stream logs in real time using Apache Kafka. We will use the rewrite_tag filter to route ERROR logs to the Kafka output plugin, which ships them from Fluent-bit to a Kafka topic. This adds real-time log streaming capability to our centralized logging system.
Centralized Logging series
This tutorial is the 7th part of the Centralized Logging series.
Prerequisites
Before you start this tutorial, you need to have the following prerequisites:
- A Kubernetes cluster
- A Kafka cluster
- Java 17 or later
- Gradle 8.0 or later
Fluent-bit Configuration
We are going to add filters and an output to the Fluent-bit configuration so that ERROR logs are routed to the Kafka output plugin:
- Add a rewrite_tag filter to re-tag ERROR logs after parsing.
- Add a record_modifier filter to add a UUID field to the records of ERROR logs.
- Add a Kafka output plugin to send the ERROR logs to Kafka.
[FILTER]
Name rewrite_tag
Match nsa2.*
Rule $log .[\d]{3}Z\sERROR\s error_row true
[FILTER]
Name record_modifier
Match error_row
Uuid_key message_key
# If needed to debug the logs, you can use stdout filter
[FILTER]
Name stdout
Match error_row
The rewrite_tag filter assigns the new tag error_row to logs matching the rule while keeping the original tag (the trailing true). The record_modifier filter adds a UUID field named message_key to records carrying the error_row tag; this field is used as the message key when sending log messages to Kafka. The stdout filter is only there for debugging.
[OUTPUT]
Name kafka
Match error_row
Brokers ${KAFKA_BROKERS}
Topics nsa2-error-logs
Message_key_field message_key
rdkafka.security.protocol SASL_PLAINTEXT
rdkafka.sasl.mechanism SCRAM-SHA-256
rdkafka.sasl.username ${KAFKA_USERNAME}
rdkafka.sasl.password ${KAFKA_PASSWORD}
The Kafka output plugin sends the logs to the Kafka cluster. Match error_row ensures that only logs with the error_row tag are sent to Kafka. Note that Message_key_field must match the Uuid_key set in the record_modifier filter.
Here is an example of the logs written to the console after applying the stdout filter to the Fluent-bit configuration. Note that the message_key field has been added to the record.
[0] error_row: [[1719518938.116059862, {}], {"logTime"=>"2024-06-27T20:08:58.115Z", "level"=>"ERROR", "appName"=>"nsa2-logging-example", "loggerClass"=>"LoggingExampleService", "message"=>" Writing log - level: ERROR, message: This is a sample of ERROR level messages", "log"=>"2024-06-27T20:08:58.115Z ERROR 1 --- [nsa2-logging-example] [or-http-epoll-4] c.a.n.e.l.service.LoggingExampleService : Writing log - level: ERROR, message: This is a sample of ERROR level messages", "message_key"=>"b52fba70-8e01-4f71-8619-55f6e4045cb2"}]
At the Kafka cluster, we can see the logs written to the nsa2-error-logs topic.

Here is an example of logs written to the nsa2-error-logs topic.
{
"timestamp": 1719518938.11606,
"logTime": "2024-06-27T20:08:58.115Z",
"level": "ERROR",
"appName": "nsa2-logging-example",
"loggerClass": "LoggingExampleService",
"message": " Writing log - level: ERROR, message: This is a sample of ERROR level messages",
"log": "2024-06-27T20:08:58.115Z ERROR 1 --- [nsa2-logging-example] [or-http-epoll-4] c.a.n.e.l.service.LoggingExampleService : Writing log - level: ERROR, message: This is a sample of ERROR level messages",
"message_key": "b52fba70-8e01-4f71-8619-55f6e4045cb2"
}
Spring Boot - Reactor Kafka Consumer for Real-time Log Streaming
Now that the logs are streaming to Kafka, we can consume them in a Spring Boot application. We are going to use Reactor Kafka to consume the logs from the Kafka cluster, and the consumer will save them to a PostgreSQL database using R2DBC.
In this section, we will go through the following source files:
- build.gradle
- application.yml
- ReactiveKafkaConsumerConfig.java
- LogConsumerService.java
- LogPayload.java
- ErrorLogNotification.java
- LogPayloadMapper.java
- ErrorLogNotificationRepository.java
- schema.sql
To implement and run this application, I chose Java 21, Spring Boot 3.3.1 and Gradle 8.0.
We need to create a Spring Boot Application with the following dependencies:
dependencies {
implementation 'org.springframework.boot:spring-boot-starter-actuator'
implementation 'org.springframework.boot:spring-boot-starter-webflux'
implementation 'org.springframework.kafka:spring-kafka'
implementation 'org.springframework.boot:spring-boot-starter-data-r2dbc'
implementation 'io.projectreactor.kafka:reactor-kafka'
implementation 'org.mapstruct:mapstruct:1.5.5.Final'
runtimeOnly 'org.postgresql:r2dbc-postgresql'
compileOnly 'org.projectlombok:lombok'
annotationProcessor 'org.projectlombok:lombok'
annotationProcessor 'org.mapstruct:mapstruct-processor:1.5.5.Final'
developmentOnly 'org.springframework.boot:spring-boot-devtools'
testImplementation 'org.springframework.boot:spring-boot-starter-test'
testImplementation 'io.projectreactor:reactor-test'
testImplementation 'org.springframework.kafka:spring-kafka-test'
testRuntimeOnly 'org.junit.platform:junit-platform-launcher'
testCompileOnly 'org.projectlombok:lombok'
testAnnotationProcessor 'org.projectlombok:lombok'
}
reactor-kafka is a reactive API for Kafka; we use it to consume logs from the Kafka cluster. We use R2DBC to save the logs to the PostgreSQL database, and MapStruct to map between DTOs and entities. In this case, LogPayload is mapped to ErrorLogNotification.
Please note that there are two annotationProcessor entries, one for Lombok and one for MapStruct. These processors run during the compilation phase and generate the boilerplate code for Lombok and the mapper implementations for MapStruct.
server:
  port: 38081

spring:
  main:
    banner-mode: off
  application:
    name: nsa2-logging-kafka-reactive-consumer-example
  r2dbc:
    url: ${R2DBC_URL:r2dbc:postgresql://localhost:5432/nsa2}
    username: ${DB_USERNAME:nsa2}
    password: ${DB_PASSWORD:nsa2}
  kafka:
    bootstrap-servers: ${KAFKA_BOOTSTRAP_SERVERS:localhost:9092}
    consumer:
      group-id: ${KAFKA_GROUP_ID:nsa2-logging-kafka-reactive-consumer-example}
      key-deserializer: org.apache.kafka.common.serialization.UUIDDeserializer
      auto-offset-reset: earliest
      value-deserializer: org.springframework.kafka.support.serializer.JsonDeserializer
    security:
      protocol: SASL_PLAINTEXT
    properties:
      sasl.jaas.config: org.apache.kafka.common.security.scram.ScramLoginModule required username="${KAFKA_USERNAME}" password="${KAFKA_PASSWORD}";
      sasl.mechanism: SCRAM-SHA-256
      spring.json:
        trusted.packages: '*' # Allow all packages to be deserialized
        type.headers: false
        value:
          default:
            type: com.alexamy.nsa2.example.logging.kafka.reactive.consumer.model.LogPayload

app:
  kafka:
    topic: ${KAFKA_TOPIC:nsa2-error-logs}
In application.yml, we set the properties for r2dbc and kafka.consumer. For security, the protocol is SASL_PLAINTEXT, sasl.mechanism is SCRAM-SHA-256, and sasl.jaas.config points to the ScramLoginModule with the Kafka username and password. spring.json.trusted.packages is set to '*' to allow all packages to be deserialized; adjust this to the security policy of your Kafka cluster. Because we use a UUID as the message key, the key-deserializer is org.apache.kafka.common.serialization.UUIDDeserializer, and the value-deserializer is org.springframework.kafka.support.serializer.JsonDeserializer. The property app.kafka.topic is set to nsa2-error-logs, the topic that Fluent-bit writes the ERROR logs to.
package com.alexamy.nsa2.example.logging.kafka.reactive.config;
import com.alexamy.nsa2.example.logging.kafka.reactive.consumer.model.LogPayload;
import lombok.RequiredArgsConstructor;
import lombok.extern.slf4j.Slf4j;
import org.apache.kafka.clients.CommonClientConfigs;
import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.common.config.SaslConfigs;
import org.apache.kafka.common.security.scram.ScramLoginModule;
import org.apache.kafka.common.serialization.StringDeserializer;
import org.apache.kafka.common.serialization.UUIDDeserializer;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.boot.autoconfigure.kafka.KafkaProperties;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.kafka.annotation.EnableKafka;
import org.springframework.kafka.core.reactive.ReactiveKafkaConsumerTemplate;
import org.springframework.kafka.support.serializer.JsonDeserializer;
import reactor.kafka.receiver.ReceiverOptions;
import java.util.*;
@Slf4j
@RequiredArgsConstructor
@EnableKafka
@Configuration
public class ReactiveKafkaConsumerConfig {
public static final String BEAN_NAME_KAFKA_CONSUMER_TEMPLATE = "logConsumerTemplate";
@Value("${app.kafka.topic}")
private String kafkaTopic;
@Value("${spring.kafka.bootstrap-servers}")
private String bootstrapServers;
@Value("${spring.kafka.consumer.group-id}")
private String groupId;
@Value("${spring.kafka.consumer.auto-offset-reset}")
private String autoOffsetReset;
private final KafkaProperties kafkaProperties;
// https://utronics.hashnode.dev/spring-webflux-reactive-kafka-cassandra-complete-reactive-spring-apps
@Bean
public ReceiverOptions<UUID, LogPayload> kafkaReceiverOptions() {
// Build the receiver options from the consumer properties defined in application.yml
ReceiverOptions<UUID, LogPayload> basicReceiverOptions =
ReceiverOptions.create(kafkaProperties.buildConsumerProperties());
return basicReceiverOptions.subscription(Collections.singleton(kafkaTopic))
.addAssignListener(partitions -> log.info("=====> Assigned partitions: {}", partitions))
.addRevokeListener(partitions -> log.info("=====> Revoked partitions: {}", partitions));
}
// Alternative: build the consumer properties programmatically instead of relying on application.yml.
// Not registered as a bean; kept for reference. The SASL credentials are read from environment
// variables rather than being hard-coded.
public ReceiverOptions<UUID, LogPayload> kafkaReceiverOptionsProgrammatic() {
final String jaasConfig = String.format(
"%s required username=\"%s\" password=\"%s\";",
ScramLoginModule.class.getName(),
System.getenv("KAFKA_USERNAME"), System.getenv("KAFKA_PASSWORD"));
Map<String, Object> consumerProps = new HashMap<>();
consumerProps.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers);
consumerProps.put(ConsumerConfig.GROUP_ID_CONFIG, groupId);
consumerProps.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, UUIDDeserializer.class.getName());
consumerProps.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, JsonDeserializer.class.getName());
consumerProps.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, autoOffsetReset);
consumerProps.put(JsonDeserializer.USE_TYPE_INFO_HEADERS, false);
consumerProps.put(JsonDeserializer.TRUSTED_PACKAGES, "*");
consumerProps.put(JsonDeserializer.VALUE_DEFAULT_TYPE, LogPayload.class.getName());
consumerProps.put(CommonClientConfigs.SECURITY_PROTOCOL_CONFIG, "SASL_PLAINTEXT");
consumerProps.put(SaslConfigs.SASL_MECHANISM, "SCRAM-SHA-256");
consumerProps.put(SaslConfigs.SASL_JAAS_CONFIG, jaasConfig);
return ReceiverOptions.<UUID, LogPayload>create(consumerProps)
.withKeyDeserializer(new UUIDDeserializer())
.withValueDeserializer(new JsonDeserializer<>(LogPayload.class))
.subscription(Set.of(kafkaTopic));
}
@Bean(BEAN_NAME_KAFKA_CONSUMER_TEMPLATE)
public ReactiveKafkaConsumerTemplate<UUID, LogPayload> logConsumerTemplate() {
return new ReactiveKafkaConsumerTemplate<>(kafkaReceiverOptions());
}
}
All required configuration for the Kafka consumer comes from the application.yml file. In the ReactiveKafkaConsumerConfig class, ReceiverOptions.create builds the ReceiverOptions from those properties and subscribes to the nsa2-error-logs topic. The ReactiveKafkaConsumerTemplate<UUID, LogPayload> bean wraps a KafkaReceiver that consumes the logs reactively and deserializes each record value to a LogPayload with the JsonDeserializer. The LogConsumerService class, shown next, saves the consumed logs to the PostgreSQL database.
Refer to https://docs.spring.io/spring-boot/appendix/application-properties/index.html for more information on the properties.
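For comparison, the same topic can be consumed with the plain reactor-kafka KafkaReceiver, without the Spring ReactiveKafkaConsumerTemplate wrapper. The sketch below is illustrative only and not part of the project; the class name PlainReactorKafkaLogConsumer and the injected receiverOptions are assumptions based on the kafkaReceiverOptions bean shown above.

import com.alexamy.nsa2.example.logging.kafka.reactive.consumer.model.LogPayload;
import java.util.UUID;
import lombok.RequiredArgsConstructor;
import lombok.extern.slf4j.Slf4j;
import reactor.core.Disposable;
import reactor.kafka.receiver.KafkaReceiver;
import reactor.kafka.receiver.ReceiverOptions;

// Illustrative sketch only (not part of the project): consuming the same topic with the
// plain reactor-kafka KafkaReceiver instead of the Spring ReactiveKafkaConsumerTemplate.
@Slf4j
@RequiredArgsConstructor
public class PlainReactorKafkaLogConsumer {

    // e.g. the kafkaReceiverOptions bean defined in ReactiveKafkaConsumerConfig
    private final ReceiverOptions<UUID, LogPayload> receiverOptions;

    public Disposable consume() {
        return KafkaReceiver.create(receiverOptions)
                .receive()
                .doOnNext(record -> log.info("Received offset {}: {}", record.offset(), record.value()))
                // acknowledge the offset once the record has been handled
                .doOnNext(record -> record.receiverOffset().acknowledge())
                .subscribe();
    }
}

ReactiveKafkaConsumerTemplate is essentially a thin wrapper around this KafkaReceiver API, so both approaches behave the same at runtime.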
package com.alexamy.nsa2.example.logging.kafka.reactive.consumer.service;
import com.alexamy.nsa2.example.logging.kafka.reactive.config.ReactiveKafkaConsumerConfig;
import com.alexamy.nsa2.example.logging.kafka.reactive.consumer.model.LogPayload;
import com.alexamy.nsa2.example.logging.kafka.reactive.mapper.LogPayloadMapper;
import com.alexamy.nsa2.example.logging.kafka.reactive.r2dbc.model.ErrorLogNotification;
import com.alexamy.nsa2.example.logging.kafka.reactive.r2dbc.repository.ErrorLogNotificationRepository;
import jakarta.annotation.PostConstruct;
import lombok.RequiredArgsConstructor;
import lombok.extern.slf4j.Slf4j;
import org.springframework.beans.factory.annotation.Qualifier;
import org.springframework.boot.context.event.ApplicationStartedEvent;
import org.springframework.context.event.EventListener;
import org.springframework.kafka.core.reactive.ReactiveKafkaConsumerTemplate;
import org.springframework.stereotype.Component;
import reactor.core.publisher.Flux;
import java.util.UUID;
@Slf4j
@RequiredArgsConstructor
@Component
public class LogConsumerService {
@Qualifier(ReactiveKafkaConsumerConfig.BEAN_NAME_KAFKA_CONSUMER_TEMPLATE)
private final ReactiveKafkaConsumerTemplate<UUID, LogPayload> logConsumerTemplate;
private final ErrorLogNotificationRepository notificationRepository;
private final LogPayloadMapper logPayloadMapper;
// @EventListener(ApplicationStartedEvent.class)
@PostConstruct
public void init() {
startConsuming().subscribe();
}
public Flux<ErrorLogNotification> startConsuming() {
log.info("=====> Starting to consume logs");
return logConsumerTemplate.receiveAutoAck()
.map(record -> record.value())
.doOnNext(record -> log.info("Received log: {}", record))
.map(logPayloadMapper::mapToErrorLogNotification)
.flatMap(notificationRepository::save)
.doOnNext(saved -> log.info("Saved error log notification: {}", saved))
.doOnError(e -> log.error("Error occurred while consuming log", e));
}
}
LogConsumerService.java is a service class that consumes logs from the Kafka cluster. The startConsuming method receives the records, maps each LogPayload to an ErrorLogNotification, and saves it to the PostgreSQL database through the ErrorLogNotificationRepository.
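Note that this reactive pipeline terminates if an error propagates, for example when the database is briefly unavailable. A common way to make it more resilient is to resubscribe with a backoff. The variant below is only a sketch, not part of the original source, and the retry settings are illustrative; it additionally needs imports for ConsumerRecord, Duration, and reactor.util.retry.Retry.

// Illustrative variant of startConsuming() (not in the original source): resubscribe with
// exponential backoff after an error so that consumption survives transient failures.
// Requires: org.apache.kafka.clients.consumer.ConsumerRecord, java.time.Duration,
// reactor.util.retry.Retry.
public Flux<ErrorLogNotification> startConsumingWithRetry() {
    return logConsumerTemplate.receiveAutoAck()
            .map(ConsumerRecord::value)
            .map(logPayloadMapper::mapToErrorLogNotification)
            .flatMap(notificationRepository::save)
            .doOnError(e -> log.error("Error occurred while consuming log", e))
            // up to 3 resubscriptions, starting with a 5 second delay (example values)
            .retryWhen(Retry.backoff(3, Duration.ofSeconds(5)));
}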
Let's look into the LogPayload and ErrorLogNotification classes.
LogPayload.java
package com.alexamy.nsa2.example.logging.kafka.reactive.consumer.model;
import com.fasterxml.jackson.annotation.JsonFormat;
import lombok.*;
import java.time.LocalDateTime;
@Getter
@Setter
@NoArgsConstructor
@AllArgsConstructor
@ToString
public class LogPayload {
String timestamp;
@JsonFormat(pattern="yyyy-MM-dd'T'HH:mm:ss.SSS'Z'")
LocalDateTime logTime;
String level;
String appName;
String loggerClass;
String message;
String log;
}
LogPayload represents the log payload that is written to the Kafka topic. The logTime field is annotated with @JsonFormat to specify the date format.
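As a quick sanity check, the JSON shown earlier can be bound to LogPayload with a plain Jackson ObjectMapper. The snippet below is a hypothetical test, not part of the project; it assumes jackson-datatype-jsr310 is on the classpath (Spring Boot provides it) and disables failing on unknown fields so that the extra message_key property does not break binding, which mirrors how the running consumer behaves.

import com.alexamy.nsa2.example.logging.kafka.reactive.consumer.model.LogPayload;
import com.fasterxml.jackson.databind.DeserializationFeature;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.fasterxml.jackson.databind.json.JsonMapper;
import com.fasterxml.jackson.datatype.jsr310.JavaTimeModule;

// Hypothetical check (not in the original project): bind the Fluent-bit JSON to LogPayload.
public class LogPayloadBindingCheck {

    public static void main(String[] args) throws Exception {
        ObjectMapper mapper = JsonMapper.builder()
                .addModule(new JavaTimeModule())                        // needed for LocalDateTime
                .disable(DeserializationFeature.FAIL_ON_UNKNOWN_PROPERTIES)
                .build();

        String json = """
                {"logTime": "2024-06-27T20:08:58.115Z", "level": "ERROR",
                 "appName": "nsa2-logging-example", "loggerClass": "LoggingExampleService",
                 "message": "This is a sample of ERROR level messages",
                 "message_key": "b52fba70-8e01-4f71-8619-55f6e4045cb2"}
                """;

        // logTime is parsed into a LocalDateTime via the @JsonFormat pattern on LogPayload
        LogPayload payload = mapper.readValue(json, LogPayload.class);
        System.out.println(payload);
    }
}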
package com.alexamy.nsa2.example.logging.kafka.reactive.r2dbc.model;
import lombok.AllArgsConstructor;
import lombok.Getter;
import lombok.NoArgsConstructor;
import lombok.Setter;
import org.springframework.data.annotation.Id;
import org.springframework.data.relational.core.mapping.Column;
import org.springframework.data.relational.core.mapping.Table;
import java.time.LocalDateTime;
@Getter
@Setter
@NoArgsConstructor
@AllArgsConstructor
@Table(schema = "logging", name = "error_log_notification")
public class ErrorLogNotification {
@Id
private Long id;
private LocalDateTime timestamp;
@Column("log_time")
private LocalDateTime logTime;
@Column("log_level")
private String logLevel;
@Column("application_name")
private String applicationName;
@Column("logger_class")
private String loggerClass;
private String message;
@Column("stack_trace")
private String stackTrace;
}
ErrorLogNotification represents the log payload that is saved to the PostgreSQL database.
package com.alexamy.nsa2.example.logging.kafka.reactive.mapper;
import com.alexamy.nsa2.example.logging.kafka.reactive.consumer.model.LogPayload;
import com.alexamy.nsa2.example.logging.kafka.reactive.r2dbc.model.ErrorLogNotification;
import org.mapstruct.Mapper;
import org.mapstruct.Mapping;
@Mapper(componentModel = "spring")
public interface LogPayloadMapper {
@Mapping(target = "logLevel", source = "level")
@Mapping(target = "applicationName", source = "appName")
@Mapping(target = "stackTrace", source = "log")
@Mapping(target = "id", ignore = true)
@Mapping(target = "timestamp", expression = "java(java.time.LocalDateTime.now())")
ErrorLogNotification mapToErrorLogNotification(LogPayload payload);
}
LogPayloadMapper is a mapper class that maps LogPayload to ErrorLogNotification.
The MapStruct annotation @Mapper is used to generate the mapper implementation. The implementation is generated during the compilation phase by the annotation processor, which gives fast, reflection-free mapping between DTOs and entities. The generated LogPayloadMapperImpl looks like this:
@Component
public class LogPayloadMapperImpl implements LogPayloadMapper {
public LogPayloadMapperImpl() {
}
public ErrorLogNotification mapToErrorLogNotification(LogPayload payload) {
if (payload == null) {
return null;
} else {
ErrorLogNotification errorLogNotification = new ErrorLogNotification();
errorLogNotification.setLogLevel(payload.getLevel());
errorLogNotification.setApplicationName(payload.getAppName());
errorLogNotification.setStackTrace(payload.getLog());
errorLogNotification.setLogTime(payload.getLogTime());
errorLogNotification.setLoggerClass(payload.getLoggerClass());
errorLogNotification.setMessage(payload.getMessage());
errorLogNotification.setTimestamp(LocalDateTime.now());
return errorLogNotification;
}
}
}
package com.alexamy.nsa2.example.logging.kafka.reactive.r2dbc.repository;
import com.alexamy.nsa2.example.logging.kafka.reactive.r2dbc.model.ErrorLogNotification;
import org.springframework.data.r2dbc.repository.R2dbcRepository;
public interface ErrorLogNotificationRepository
extends R2dbcRepository<ErrorLogNotification, Long> {
}
ErrorLogNotificationRepository is a repository interface that saves logs to the PostgreSQL database. For this example, we do not have to implement any methods; Spring Data R2DBC provides the implementation. The saved data will later be used by a notification service to notify users, but we are not going to implement that service in this example.
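Although the empty interface is all this example needs, a notification service could later read recent entries through derived query methods. The extension below is hypothetical and not part of the original source; Spring Data R2DBC derives the query from the method name findByLogLevelAndLogTimeAfter.

import com.alexamy.nsa2.example.logging.kafka.reactive.r2dbc.model.ErrorLogNotification;
import java.time.LocalDateTime;
import org.springframework.data.r2dbc.repository.R2dbcRepository;
import reactor.core.publisher.Flux;

// Hypothetical extension (not in the original source): a derived query method that a
// notification service could use to pick up recent entries for a given log level.
public interface ErrorLogNotificationRepository
        extends R2dbcRepository<ErrorLogNotification, Long> {

    Flux<ErrorLogNotification> findByLogLevelAndLogTimeAfter(String logLevel, LocalDateTime since);
}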
CREATE TABLE logging.error_log_notification
(
id SERIAL PRIMARY KEY,
timestamp TIMESTAMP NOT NULL,
log_level VARCHAR(10) NOT NULL,
log_time TIMESTAMP NOT NULL,
application_name VARCHAR(255) NOT NULL,
logger_class VARCHAR(255) NOT NULL,
message TEXT,
stack_trace TEXT
);
To test this application, we need a PostgreSQL database with the logging schema and the error_log_notification table. The schema.sql file above creates the table; the logging schema must already exist.
All the source code listed here is available on GitHub.
Running the Application
To run the application, set the required environment variables and start it with Gradle:
$ export DB_USERNAME={db-username}
$ export DB_PASSWORD={db-password}
$ export R2DBC_URL=r2dbc:postgresql://localhost:5432/nsa2
$ export KAFKA_BOOTSTRAP_SERVERS={kafka-bootstrap-servers}
$ export KAFKA_USERNAME={kafka-username}
$ export KAFKA_PASSWORD={kafka-password}
$ export KAFKA_TOPIC=nsa2-error-logs
$ ./gradlew bootRun
You can use your own values for the environment variables.
Testing the Application
We can test this application in the same way we tested the previous ones: generate ERROR logs with the nsa2-logging-example application, then check the Kafka topic and the PostgreSQL database.
nsa2-logging-example application
$ echo "ERROR" | tr " " '\n' \
| xargs -I {} curl -X POST -H "Content-Type: application/json" \
-d "This is a sample of {} level messages" \
http://localhost:18080/v1.0.0/log/{}
Kafka topic nsa2-error-logs
To check the logs in the Kafka topic nsa2-error-logs, we can use Conduktor to browse the messages written to the topic.
For more information on how to use Conduktor, refer to Conduktor Quickstart.
PostgreSQL database
To check the logs in the PostgreSQL database, we can use the following query:
select * from logging.error_log_notification;

Conclusion
In this tutorial, we learned how to stream logs in real time using Apache Kafka. We used the rewrite_tag filter to route ERROR logs to the Kafka output plugin, which streams them from Fluent-bit to a Kafka topic. We also implemented a Spring Boot application that consumes the logs from the Kafka cluster and saves them to a PostgreSQL database using R2DBC. This adds real-time log streaming capability to our centralized logging system.