Service Foundry
Young Gyu Kim <credemol@gmail.com>

Centralized Logging - Part 7 : Real-time Log Streaming with Kafka

Introduction

In this tutorial, we will learn how to stream logs in real time using Apache Kafka. We are going to use the rewrite_tag filter to route ERROR logs to the Fluent-bit Kafka output plugin, which ships them to a Kafka topic. This way we can add real-time log streaming capability to our centralized logging system.

Prerequisites

Before you start this tutorial, you need to have the following prerequisites:

  • A Kubernetes cluster

  • A Kafka cluster

  • Java 17 or later

  • Gradle 8.0 or later

Fluent-bit Configuration

We are going to add filters and an output to the Fluent-bit configuration so that ERROR logs are routed to Kafka. We will use the rewrite_tag filter to tag the ERROR logs and the Kafka output plugin to ship them:

  • Add a rewrite_tag filter that tags the ERROR logs after they are parsed.

  • Add a record_modifier filter that adds a UUID field to each ERROR log record.

  • Add a Kafka output that sends the ERROR logs to Kafka.

fluent-bit.yaml - filters section after multiline parser
    [FILTER]
        Name                rewrite_tag
        Match               nsa2.*
        Rule                $log .[\d]{3}Z\sERROR\s error_row true

    [FILTER]
        Name         record_modifier
        Match        error_row
        Uuid_key     message_key

    # If you need to debug the logs, you can use the stdout filter
    [FILTER]
        Name                stdout
        Match               error_row

The rewrite_tag filter re-tags the log records that match the specified rule. The new tag is error_row, and because the keep flag is set to true, the original record is also kept under its old tag. The record_modifier filter adds a UUID field named message_key to the records tagged error_row; this field is used as the message key when sending log messages to Kafka. The stdout filter is used for debugging purposes.

fluent-bit.yaml - outputs section
    [OUTPUT]
        Name kafka
        Match error_row
        Brokers ${KAFKA_BROKERS}
        Topics nsa2-error-logs
        Message_key_field message_key
        rdkafka.security.protocol SASL_PLAINTEXT
        rdkafka.sasl.mechanism  SCRAM-SHA-256
        rdkafka.sasl.username   ${KAFKA_USERNAME}
        rdkafka.sasl.password   ${KAFKA_PASSWORD}

The Kafka output plugin sends the logs to the Kafka cluster. Match error_row ensures that only the records tagged error_row are sent to Kafka. Note that Message_key_field must match the Uuid_key configured in the record_modifier filter so that the generated UUID becomes the Kafka message key.

Here is an example of a log record written to the console after applying the stdout filter to the Fluent-bit configuration. Note that the message_key field has been added to the record.

[0] error_row: [[1719518938.116059862, {}], {"logTime"=>"2024-06-27T20:08:58.115Z",
"level"=>"ERROR", "appName"=>"nsa2-logging-example",
"loggerClass"=>"LoggingExampleService",
"message"=>" Writing log - level: ERROR, message: This is a sample of ERROR level messages",
"log"=>"2024-06-27T20:08:58.115Z ERROR 1 --- [nsa2-logging-example]
[or-http-epoll-4]
c.a.n.e.l.service.LoggingExampleService  : Writing log -
level: ERROR, message: This is a sample of ERROR level messages",
"message_key"=>"b52fba70-8e01-4f71-8619-55f6e4045cb2"}]

At the Kafka cluster, we can see the logs written to the nsa2-error-logs topic.

conduktor topics 2

Here is an example of logs written to the nsa2-error-logs topic.

{
	"timestamp": 1719518938.11606,
	"logTime": "2024-06-27T20:08:58.115Z",
	"level": "ERROR",
	"appName": "nsa2-logging-example",
	"loggerClass": "LoggingExampleService",
	"message": " Writing log - level: ERROR, message: This is a sample of ERROR level messages",
	"log": "2024-06-27T20:08:58.115Z ERROR 1 --- [nsa2-logging-example] [or-http-epoll-4] c.a.n.e.l.service.LoggingExampleService  : Writing log - level: ERROR, message: This is a sample of ERROR level messages",
	"message_key": "b52fba70-8e01-4f71-8619-55f6e4045cb2"
}

Spring Boot - Reactor Kafka Consumer for Real-time Log Streaming

Now that the logs are streaming to Kafka, we can consume them in a Spring Boot application. We are going to use Reactor Kafka to consume the logs from the Kafka cluster, and the consumer will save them to a PostgreSQL database using R2DBC.

In this section, we will go through the source files below:

  • build.gradle

  • application.yml

  • ReactiveKafkaConsumerConfig.java

  • LogConsumerService.java

  • LogPayload.java

  • ErrorLogNotification.java

  • LogPayloadMapper.java

  • ErrorLogNotificationRepository.java

  • schema.sql

To implement and run this application, I chose Java 21, Spring Boot 3.3.1 and Gradle 8.0.

We need to create a Spring Boot Application with the following dependencies:

build.gradle - dependencies
dependencies {
    implementation 'org.springframework.boot:spring-boot-starter-actuator'
    implementation 'org.springframework.boot:spring-boot-starter-webflux'
    implementation 'org.springframework.kafka:spring-kafka'
    implementation 'org.springframework.boot:spring-boot-starter-data-r2dbc'
    implementation 'io.projectreactor.kafka:reactor-kafka'
    implementation 'org.mapstruct:mapstruct:1.5.5.Final'
    runtimeOnly 'org.postgresql:r2dbc-postgresql'
    compileOnly 'org.projectlombok:lombok'

    annotationProcessor 'org.projectlombok:lombok'
    annotationProcessor 'org.mapstruct:mapstruct-processor:1.5.5.Final'

    developmentOnly 'org.springframework.boot:spring-boot-devtools'
    testImplementation 'org.springframework.boot:spring-boot-starter-test'
    testImplementation 'io.projectreactor:reactor-test'
    testImplementation 'org.springframework.kafka:spring-kafka-test'
    testRuntimeOnly 'org.junit.platform:junit-platform-launcher'
    testCompileOnly 'org.projectlombok:lombok'
    testAnnotationProcessor 'org.projectlombok:lombok'
}

reactor-kafka is a reactive API for Kafka; we use it to consume logs from the Kafka cluster. R2DBC is used to save the logs to the PostgreSQL database, and MapStruct maps between DTOs and entities. In this case, LogPayload is mapped to ErrorLogNotification.

Please note that there are two annotationProcessor entries, one for Lombok and one for MapStruct. These processors run during the compilation phase and generate the code required by Lombok and MapStruct.

src/main/resources/application.yml
server:
  port: 38081
spring:
  main:
    banner-mode: off

  application:
    name: nsa2-logging-kafka-reactive-consumer-example

  r2dbc:
    url: ${R2DBC_URL:r2dbc:postgresql://localhost:5432/nsa2}
    username: ${DB_USERNAME:nsa2}
    password: ${DB_PASSWORD:nsa2}

  kafka:
    bootstrap-servers: ${KAFKA_BOOTSTRAP_SERVERS:localhost:9092}

    consumer:
      group-id: ${KAFKA_GROUP_ID:nsa2-logging-kafka-reactive-consumer-example}
      key-deserializer: org.apache.kafka.common.serialization.UUIDDeserializer
      auto-offset-reset: earliest
      value-deserializer: org.springframework.kafka.support.serializer.JsonDeserializer
      security:
        protocol: SASL_PLAINTEXT
      properties:
        sasl.jaas.config: org.apache.kafka.common.security.scram.ScramLoginModule required username="${KAFKA_USERNAME}" password="${KAFKA_PASSWORD}";
        sasl.mechanism: SCRAM-SHA-256
        spring.json:
          trusted.packages: '*' # Allow all packages to be deserialized
          type.headers: false
          value:
            default:
              type: com.alexamy.nsa2.example.logging.kafka.reactive.consumer.model.LogPayload
app:
  kafka:
    topic: ${KAFKA_TOPIC:nsa2-error-logs}

In the application.yml, we set properties for r2dbc and kafka.consumer. As for the security protocol, we use SASL_PLAINTEXT with the SCRAM-SHA-256 SASL mechanism, and sasl.jaas.config points to the ScramLoginModule with the Kafka username and password. The spring.json.trusted.packages property is set to '*' to allow all packages to be deserialized; adjust this to the security policy of your Kafka cluster. Because we are using a UUID as the message key, the key-deserializer is set to org.apache.kafka.common.serialization.UUIDDeserializer, and the value-deserializer is set to org.springframework.kafka.support.serializer.JsonDeserializer.

The property app.kafka.topic is set to nsa2-error-logs, the topic that Fluent-bit writes the ERROR logs to.

ReactiveKafkaConsumerConfig.java
package com.alexamy.nsa2.example.logging.kafka.reactive.config;

import com.alexamy.nsa2.example.logging.kafka.reactive.consumer.model.LogPayload;
import lombok.RequiredArgsConstructor;
import lombok.extern.slf4j.Slf4j;
import org.apache.kafka.clients.CommonClientConfigs;
import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.common.config.SaslConfigs;
import org.apache.kafka.common.security.scram.ScramLoginModule;
import org.apache.kafka.common.serialization.StringDeserializer;
import org.apache.kafka.common.serialization.UUIDDeserializer;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.boot.autoconfigure.kafka.KafkaProperties;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.kafka.annotation.EnableKafka;
import org.springframework.kafka.core.reactive.ReactiveKafkaConsumerTemplate;
import org.springframework.kafka.support.serializer.JsonDeserializer;
import reactor.kafka.receiver.ReceiverOptions;

import java.util.*;

@Slf4j
@RequiredArgsConstructor
@EnableKafka
@Configuration
public class ReactiveKafkaConsumerConfig {
    public static final String BEAN_NAME_KAFKA_CONSUMER_TEMPLATE = "logConsumerTemplate";
    @Value("${app.kafka.topic}")
    private String kafkaTopic;

    @Value("${spring.kafka.bootstrap-servers}")
    private String bootstrapServers;

    @Value("${spring.kafka.consumer.group-id}")
    private String groupId;

    @Value("${spring.kafka.consumer.auto-offset-reset}")
    private String autoOffsetReset;


    private final KafkaProperties kafkaProperties;

    // https://utronics.hashnode.dev/spring-webflux-reactive-kafka-cassandra-complete-reactive-spring-apps
    @Bean
    public ReceiverOptions<UUID, LogPayload> kafkaReceiverOptions() {
//        log.info("=====> Creating Kafka receiver options. bootstrapServers: {}, groupId: {}, kafkaTopic: {}",
//                bootstrapServers, groupId, kafkaTopic);
//        log.info("=====> Kafka properties: {}", kafkaProperties);

//        Map<String, Object> consumerProperties = kafkaProperties.buildConsumerProperties();
//        log.info("=====> Kafka consumer properties: {}", consumerProperties);

        ReceiverOptions<UUID, LogPayload> basicReceiverOptions =
                ReceiverOptions.create(kafkaProperties.buildConsumerProperties());


        return basicReceiverOptions.subscription(Collections.singleton(kafkaTopic))
                .addAssignListener(partitions -> log.info("=====> Assigned partitions: {}", partitions))
                .addRevokeListener(partitions -> log.info("=====> Revoked partitions: {}", partitions));

    }

//    @Bean
    public ReceiverOptions<UUID, LogPayload> kafkaReceiverOptions_____() {
        // Build the JAAS config programmatically; take the credentials from configuration, not hard-coded values.
        final String jaasConfig = String.format(
                "%s required username=\"%s\" " +
                "password=\"%s\";", ScramLoginModule.class.getName(),
                "<kafka-username>", "<kafka-password>"
        );

        Map<String, Object> consumerProps = new HashMap<>();

        consumerProps.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers);
        consumerProps.put(ConsumerConfig.GROUP_ID_CONFIG, groupId);
        consumerProps.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, UUIDDeserializer.class.getName());
        consumerProps.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, JsonDeserializer.class.getName());
        consumerProps.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, autoOffsetReset);
        consumerProps.put(JsonDeserializer.USE_TYPE_INFO_HEADERS, false);
        consumerProps.put(JsonDeserializer.TRUSTED_PACKAGES, "*");
        consumerProps.put(JsonDeserializer.VALUE_DEFAULT_TYPE, LogPayload.class.getName());

        consumerProps.put(CommonClientConfigs.SECURITY_PROTOCOL_CONFIG, "SASL_PLAINTEXT");
        consumerProps.put(SaslConfigs.SASL_MECHANISM, "SCRAM-SHA-256");
        consumerProps.put(SaslConfigs.SASL_JAAS_CONFIG, jaasConfig);


        return ReceiverOptions.<UUID, LogPayload>create(consumerProps)
                .withKeyDeserializer(new UUIDDeserializer())
                .withValueDeserializer(new JsonDeserializer<>(LogPayload.class))
                .subscription(Set.of(kafkaTopic));
    }

    @Bean(BEAN_NAME_KAFKA_CONSUMER_TEMPLATE)
    public ReactiveKafkaConsumerTemplate<UUID, LogPayload> logConsumerTemplate() {
        return new ReactiveKafkaConsumerTemplate<>(kafkaReceiverOptions());
    }
}

All required configuration for the Kafka consumer comes from the application.yml file; ReactiveKafkaConsumerConfig only turns it into reactor-kafka objects. KafkaProperties.buildConsumerProperties collects the spring.kafka.consumer settings, and ReceiverOptions.create wraps them. The options subscribe to the nsa2-error-logs topic (app.kafka.topic) and register listeners that log partition assignments and revocations. The ReceiverOptions are then used to build a ReactiveKafkaConsumerTemplate<UUID, LogPayload>, through which the consumer service receives records; values are deserialized into LogPayload by the configured JsonDeserializer. The commented-out kafkaReceiverOptions_____ method shows how the same options, including the SASL/SCRAM settings, could be built programmatically instead of via application.yml.
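
Under the hood, ReactiveKafkaConsumerTemplate is a thin wrapper around reactor-kafka's KafkaReceiver. For illustration only (this snippet is not part of the example project), the same topic could be consumed with the raw reactor-kafka API, reusing the ReceiverOptions bean defined above:

// Illustration only: consuming the topic with reactor-kafka directly.
// Requires: import reactor.kafka.receiver.KafkaReceiver;
KafkaReceiver<UUID, LogPayload> receiver = KafkaReceiver.create(kafkaReceiverOptions());

receiver.receive()                                    // Flux<ReceiverRecord<UUID, LogPayload>>
        .doOnNext(record -> {
            log.info("Received: {}", record.value());
            record.receiverOffset().acknowledge();    // manual acknowledgement
        })
        .subscribe();

The ReactiveKafkaConsumerTemplate used in this tutorial exposes the same stream through methods such as receiveAutoAck, which is what LogConsumerService relies on in the next listing.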

LogConsumerService.java
package com.alexamy.nsa2.example.logging.kafka.reactive.consumer.service;

import com.alexamy.nsa2.example.logging.kafka.reactive.config.ReactiveKafkaConsumerConfig;
import com.alexamy.nsa2.example.logging.kafka.reactive.consumer.model.LogPayload;
import com.alexamy.nsa2.example.logging.kafka.reactive.mapper.LogPayloadMapper;
import com.alexamy.nsa2.example.logging.kafka.reactive.r2dbc.model.ErrorLogNotification;
import com.alexamy.nsa2.example.logging.kafka.reactive.r2dbc.repository.ErrorLogNotificationRepository;
import jakarta.annotation.PostConstruct;
import lombok.RequiredArgsConstructor;
import lombok.extern.slf4j.Slf4j;
import org.springframework.beans.factory.annotation.Qualifier;
import org.springframework.boot.context.event.ApplicationStartedEvent;
import org.springframework.context.event.EventListener;
import org.springframework.kafka.core.reactive.ReactiveKafkaConsumerTemplate;
import org.springframework.stereotype.Component;
import reactor.core.publisher.Flux;

import java.util.UUID;

@Slf4j
@RequiredArgsConstructor
@Component
public class LogConsumerService {
    @Qualifier(ReactiveKafkaConsumerConfig.BEAN_NAME_KAFKA_CONSUMER_TEMPLATE)
    private final ReactiveKafkaConsumerTemplate<UUID, LogPayload> logConsumerTemplate;

    private final ErrorLogNotificationRepository notificationRepository;

    private final LogPayloadMapper logPayloadMapper;

//    @EventListener(ApplicationStartedEvent.class)
    @PostConstruct
    public void init() {
        startConsuming().subscribe();
    }

    public Flux<ErrorLogNotification> startConsuming() {
        log.info("=====> Starting to consume logs");

        return logConsumerTemplate.receiveAutoAck()
                .map(record -> record.value())
                .doOnNext(record -> log.info("Received log: {}", record))
                .map(logPayloadMapper::mapToErrorLogNotification)
                .flatMap(notificationRepository::save)
                .doOnNext(saved -> log.info("Saved error log notification: {}", saved))
                .doOnError(e -> log.error("Error occurred while consuming log", e));

    }


}

LogConsumerService is a service class that consumes logs from the Kafka cluster. Consumption starts in the @PostConstruct init method, which subscribes to the Flux returned by startConsuming. Each record value is mapped to an ErrorLogNotification by LogPayloadMapper and saved to the PostgreSQL database through ErrorLogNotificationRepository.
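
One thing to keep in mind is that doOnError only logs the failure; if the reactive pipeline terminates with an error, consumption stops. Below is a minimal sketch of a more resilient init method that resubscribes with a backoff after failures. This is my own variation, not part of the example project:

// Hypothetical variation of LogConsumerService.init().
// Requires: import java.time.Duration; import reactor.util.retry.Retry;
@PostConstruct
public void init() {
    startConsuming()
            // Resubscribe with exponential backoff instead of stopping after an error.
            .retryWhen(Retry.backoff(Long.MAX_VALUE, Duration.ofSeconds(5)))
            .subscribe();
}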

Let’s look into the LogPayload and ErrorLogNotification classes.

LogPayload.java

package com.alexamy.nsa2.example.logging.kafka.reactive.consumer.model;

//public record LogPayload(String timestamp, String logTime, String level,
//                         String appName, String loggerClass, String message, String log, String message_key) {
//}

import com.fasterxml.jackson.annotation.JsonFormat;
import lombok.*;

import java.time.LocalDateTime;

@Getter
@Setter
@NoArgsConstructor
@AllArgsConstructor
@ToString
public class LogPayload {
    String timestamp;
    @JsonFormat(pattern="yyyy-MM-dd'T'HH:mm:ss.SSS'Z'")
    LocalDateTime logTime;
    String level;
    String appName;
    String loggerClass;
    String message;
    String log;
}

LogPayload represents the log payload that is written to the Kafka topic. The logTime field is annotated with @JsonFormat to specify the date format.
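
To sanity-check that the sample record shown earlier deserializes into LogPayload, a small standalone snippet like the one below can be used. This is my own sketch, not part of the example project; it registers JavaTimeModule for the LocalDateTime field and ignores the extra message_key property, which LogPayload does not map.

// Assumes this file sits in the same package as LogPayload.
import com.fasterxml.jackson.databind.DeserializationFeature;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.fasterxml.jackson.datatype.jsr310.JavaTimeModule;

public class LogPayloadDeserializationCheck {
    public static void main(String[] args) throws Exception {
        ObjectMapper mapper = new ObjectMapper()
                .registerModule(new JavaTimeModule())   // required to deserialize LocalDateTime
                .configure(DeserializationFeature.FAIL_ON_UNKNOWN_PROPERTIES, false); // message_key is not mapped

        String json = """
                {"timestamp": 1719518938.11606,
                 "logTime": "2024-06-27T20:08:58.115Z",
                 "level": "ERROR",
                 "appName": "nsa2-logging-example",
                 "loggerClass": "LoggingExampleService",
                 "message": "This is a sample of ERROR level messages",
                 "log": "sample log line",
                 "message_key": "b52fba70-8e01-4f71-8619-55f6e4045cb2"}
                """;

        LogPayload payload = mapper.readValue(json, LogPayload.class);
        System.out.println(payload);   // logTime is parsed as 2024-06-27T20:08:58.115
    }
}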

ErrorLogNotification.java
package com.alexamy.nsa2.example.logging.kafka.reactive.r2dbc.model;

import lombok.AllArgsConstructor;
import lombok.Getter;
import lombok.NoArgsConstructor;
import lombok.Setter;
import org.springframework.data.annotation.Id;
import org.springframework.data.relational.core.mapping.Column;
import org.springframework.data.relational.core.mapping.Table;

import java.time.LocalDateTime;

@Getter
@Setter
@NoArgsConstructor
@AllArgsConstructor
@Table(schema = "logging", name = "error_log_notification")
public class ErrorLogNotification {
    @Id
    private Long id;
    private LocalDateTime timestamp;
    @Column("log_time")
    private LocalDateTime logTime;
    @Column("log_level")
    private String logLevel;
    @Column("application_name")
    private String applicationName;
    @Column("logger_class")
    private String loggerClass;
    private String message;
    @Column("stack_trace")
    private String stackTrace;
}

ErrorLogNotification represents the log payload that is saved to the PostgreSQL database.

LogPayloadMapper.java
package com.alexamy.nsa2.example.logging.kafka.reactive.mapper;

import com.alexamy.nsa2.example.logging.kafka.reactive.consumer.model.LogPayload;
import com.alexamy.nsa2.example.logging.kafka.reactive.r2dbc.model.ErrorLogNotification;
import org.mapstruct.Mapper;
import org.mapstruct.Mapping;

@Mapper(componentModel = "spring")
public interface LogPayloadMapper {

    @Mapping(target = "logLevel", source = "level")
    @Mapping(target = "applicationName", source = "appName")
    @Mapping(target = "stackTrace", source = "log")
    @Mapping(target = "id", ignore = true)
    @Mapping(target = "timestamp", expression = "java(java.time.LocalDateTime.now())")
    ErrorLogNotification mapToErrorLogNotification(LogPayload payload);

}

LogPayloadMapper is a MapStruct mapper interface that maps LogPayload to ErrorLogNotification.

The MapStruct @Mapper annotation tells the annotation processor to generate the mapper implementation at compile time, so the mapping between DTOs and entities is done with plain generated code rather than reflection, which keeps it fast.

LogPayloadMapperImpl.java
@Component
public class LogPayloadMapperImpl implements LogPayloadMapper {
    public LogPayloadMapperImpl() {
    }

    public ErrorLogNotification mapToErrorLogNotification(LogPayload payload) {
        if (payload == null) {
            return null;
        } else {
            ErrorLogNotification errorLogNotification = new ErrorLogNotification();
            errorLogNotification.setLogLevel(payload.getLevel());
            errorLogNotification.setApplicationName(payload.getAppName());
            errorLogNotification.setStackTrace(payload.getLog());
            errorLogNotification.setLogTime(payload.getLogTime());
            errorLogNotification.setLoggerClass(payload.getLoggerClass());
            errorLogNotification.setMessage(payload.getMessage());
            errorLogNotification.setTimestamp(LocalDateTime.now());
            return errorLogNotification;
        }
    }
}

ErrorLogNotificationRepository.java
package com.alexamy.nsa2.example.logging.kafka.reactive.r2dbc.repository;

import com.alexamy.nsa2.example.logging.kafka.reactive.r2dbc.model.ErrorLogNotification;
import org.springframework.data.r2dbc.repository.R2dbcRepository;

public interface ErrorLogNotificationRepository
    extends R2dbcRepository<ErrorLogNotification, Long>  {
}

ErrorLogNotificationRepository is a Spring Data R2DBC repository interface that saves the notifications to the PostgreSQL database. For this example, we do not have to implement any methods: Spring Data R2DBC provides the implementation, and the inherited save method is all we need. The stored rows could later be used by a notification service to send notifications to users, but we are not going to implement that service in this example.
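
If the notification service later needs to look up specific rows, Spring Data R2DBC can also derive reactive query methods from method names. The following extension is hypothetical and not part of the example project:

// Hypothetical extension: derived reactive query methods.
// Requires: import reactor.core.publisher.Flux; import java.time.LocalDateTime;
public interface ErrorLogNotificationRepository
    extends R2dbcRepository<ErrorLogNotification, Long> {

    // SELECT ... WHERE log_level = :logLevel, derived from the method name
    Flux<ErrorLogNotification> findByLogLevel(String logLevel);

    // SELECT ... WHERE log_time > :after
    Flux<ErrorLogNotification> findByLogTimeAfter(LocalDateTime after);
}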

src/main/resources/sql/schema.sql
CREATE TABLE logging.error_log_notification
(
    id               SERIAL PRIMARY KEY,
    timestamp        TIMESTAMP    NOT NULL,
    log_level        VARCHAR(10) NOT NULL,
    log_time         TIMESTAMP    NOT NULL,
    application_name VARCHAR(255) NOT NULL,
    logger_class     VARCHAR(255) NOT NULL,
    message          TEXT,
    stack_trace      TEXT
);

To test this application, we need a PostgreSQL database with this table. The schema.sql script creates the error_log_notification table; note that the logging schema must already exist (for example, CREATE SCHEMA logging) before running it.
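
If you prefer to let the application create the table at startup instead of running the script by hand, a sketch using Spring's R2DBC initializer could look like the following. This configuration is not part of the example project; it simply executes sql/schema.sql against the configured ConnectionFactory when the application starts, so the script should be idempotent (for example, CREATE TABLE IF NOT EXISTS) if you run it this way.

// Hypothetical configuration class, not part of the example project.
import io.r2dbc.spi.ConnectionFactory;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.core.io.ClassPathResource;
import org.springframework.r2dbc.connection.init.ConnectionFactoryInitializer;
import org.springframework.r2dbc.connection.init.ResourceDatabasePopulator;

@Configuration
public class SchemaInitConfig {

    // Runs src/main/resources/sql/schema.sql once at startup.
    @Bean
    public ConnectionFactoryInitializer schemaInitializer(ConnectionFactory connectionFactory) {
        ConnectionFactoryInitializer initializer = new ConnectionFactoryInitializer();
        initializer.setConnectionFactory(connectionFactory);
        initializer.setDatabasePopulator(
                new ResourceDatabasePopulator(new ClassPathResource("sql/schema.sql")));
        return initializer;
    }
}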

All the source code listed here is available on GitHub.

Running the Application

To run the application, set the required environment variables and start it with Gradle:

$ export DB_USERNAME={db-username}
$ export DB_PASSWORD={db-password}
$ export R2DBC_URL=r2dbc:postgresql://localhost:5432/nsa2
$ export KAFKA_BOOTSTRAP_SERVERS={kafka-bootstrap-servers}
$ export KAFKA_USERNAME={kafka-username}
$ export KAFKA_PASSWORD={kafka-password}
$ export KAFKA_TOPIC=nsa2-error-logs
$ ./gradlew bootRun

You can use your own values for the environment variables.

Testing the Application

We can test this application in the same way we tested the previous ones: generate some ERROR logs, confirm that they reach the Kafka topic, and then check that they have been saved to the PostgreSQL database.

nsa2-logging-example application

how to simulate the error logs
$ echo "ERROR" | tr " " '\n' \
  | xargs -I {} curl -X POST -H "Content-Type: application/json" \
  -d "This is a sample of {} level messages" \
  http://localhost:18080/v1.0.0/log/{}

Kafka topic nsa2-error-logs

To check the logs in the Kafka topic nsa2-error-logs, we can use Conduktor.

For more information on how to use Conduktor, refer to Conduktor Quickstart.

PostgreSQL database

To check the logs in the PostgreSQL database, we can use the following query:

select * from logging.error_log_notification;
part 7 result query
Figure 1. Result

Conclusion

In this tutorial, we learned how to stream logs in real time using Apache Kafka. We used the rewrite_tag filter to route ERROR logs to the Fluent-bit Kafka output plugin, which ships them to a Kafka topic. We also implemented a Spring Boot application that consumes the logs from the Kafka cluster with Reactor Kafka and saves them to a PostgreSQL database using R2DBC. This way we added real-time log streaming capability to our centralized logging system.