Service Foundry
Young Gyu Kim <credemol@gmail.com>

Log Analysis with Apache Spark: Reading Log Messages from Elasticsearch, Archiving them as Parquet, and Saving Errors to PostgreSQL

Introduction

In this tutorial, we will create a Scala project to analyze log messages stored in Elasticsearch using Apache Spark. The Spark application will read log messages from Elasticsearch, archive them as Parquet files to HDFS, and save all ERROR messages to a relational database for further analysis. During development, we will use the local filesystem in place of HDFS and PostgreSQL as the relational database. We will then deploy the Spark application to Kubernetes.

Centralized Logging series

This tutorial is the 6th part of the Centralized Logging series. The series covers the following topics:

  1. Part 1 - Logging in Spring Boot Application

  2. Part 2 - Deploying Spring Boot Application to Kubernetes

  3. Part 3 - Installing Elasticsearch and Kibana to Kubernetes

  4. Part 4 - Centralized Logging with Fluent-bit and Elasticsearch (Kubernetes)

  5. Part 5 - Centralized Logging with Fluent-bit and Elasticsearch (On-premise)

  6. Part 6 - Log Analysis with Apache Spark

Prerequisites

  • Java 11

  • Scala 2.12

  • IntelliJ IDEA

  • sbt 1.10

  • Apache Spark 3.5.1

  • Elasticsearch up and running

  • Kibana up and running (Optional)

  • PostgreSQL up and running

Create Scala Project in IntelliJ IDEA

Before we start, make sure you have installed Java 11, Scala 2.12, IntelliJ IDEA, and sbt. Do not forget to install the Scala plugin in IntelliJ IDEA.

To create a new Scala project in IntelliJ IDEA, follow the steps below:

  1. Open IntelliJ IDEA and click on New Project from the main menu.

  2. Select Scala from the left panel and sbt from the right panel.

  3. Fill in the project name, location, and other details as shown below:

  4. Click on Finish to create the project.

Properties of the Scala project

  Property         Value
  ---------------  ------------------------------
  Name             nsa-log-analytics
  Location         ~/Dev/workspace
  Language         Scala
  Build system     sbt
  JDK              11
  sbt              1.10.0
  Scala            2.12.18
  Package prefix   com.alexamy.nsa2.analytics.log

Note that Scala version 2.12.18 is used in this tutorial because Apache Spark 3.5.1 is compatible with Scala 2.12.

The project structure will look like this:

tree -I 'target|project|\.idea|\.bsp' .
.
├── build.sbt
└── src
    ├── main
    │   └── scala
    └── test
        └── scala

build.sbt

The code snippet below was generated by IntelliJ IDEA; it does not yet include the dependencies required for Apache Spark.

build.sbt
ThisBuild / version := "0.1.0-SNAPSHOT"

ThisBuild / scalaVersion := "2.12.18"

lazy val root = (project in file("."))
  .settings(
    name := "new-project",
    idePackagePrefix := Some("com.alexamy.nsa2.analytics.log")
  )

Let’s start by adding the required dependencies to the build.sbt file.

import scala.collection.Seq

libraryDependencies ++= Seq(
//  #  Apache Spark
  "org.apache.spark" %% "spark-core" % "3.5.1", // % "provided",
  "org.apache.spark" %% "spark-sql" % "3.5.1", // % "provided",

//  # Hadoop client
  "org.apache.hadoop" % "hadoop-client" % "3.3.4",
  "org.apache.hadoop" % "hadoop-client-api" % "3.3.4",
  "org.apache.hadoop" % "hadoop-common" % "3.3.4",

//  # Log4j
  "org.apache.logging.log4j" % "log4j-api-scala_2.12" % "13.0.0",
  "org.apache.logging.log4j" % "log4j-core" % "2.19.0" % Runtime,

//  # Elasticsearch and PostgreSQL
  "org.postgresql" % "postgresql" % "42.7.0",
  "org.elasticsearch" %% "elasticsearch-spark-30" % "8.14.0",

//  # Azure Storage for Hadoop. Required for Azure Blob Storage
  "org.apache.hadoop" % "hadoop-azure" % "3.2.0",
  "com.microsoft.azure" % "azure-storage" % "8.6.3"
)
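
The Spark artifacts above carry a commented-out % "provided" hint. If you later package the application for a cluster where Spark is already installed (for example when submitting with spark-submit), you would mark those two dependencies as provided so they are not bundled into the artifact. A minimal sketch of that variant, intended only for cluster packaging and not for local runs:

// build.sbt variant for cluster packaging: Spark is supplied by the cluster at runtime
  "org.apache.spark" %% "spark-core" % "3.5.1" % "provided",
  "org.apache.spark" %% "spark-sql" % "3.5.1" % "provided",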

spark.conf

In the project root directory, create a new file named spark.conf and add the following configurations.

spark.conf
# Apache Spark and Hadoop configurations
spark.sql.warehouse.dir = /tmp/spark/warehouse/
spark.hadoop.fs.defaultFS = file:///tmp/spark/warehouse/

# Elasticsearch configurations
# https://www.elastic.co/guide/en/elasticsearch/hadoop/current/configuration.html
es.net.ssl = true
es.net.ssl.cert.allow.self.signed = true
es.nodes = elasticsearch-master
es.port = 9200
es.net.http.auth.user = elastic
es.net.http.auth.pass = changeit
es.net.ssl.truststore.location = elastic-certificates.p12
es.net.ssl.truststore.pass = changeit
es.read.metadata = true

# Application configurations
app.es_index = nsa2-2024.06.17
app.jdbc.url = jdbc:postgresql://127.0.0.1:5432/nsa2
app.jdbc.username = {dbusername}
app.jdbc.password = {dbpassword}
app.jdbc.table = logging.log_history
app.parquetBaseLocation=./data/parquet/

# Azure Storage configurations
spark.hadoop.fs.azure.account.key.aksdepstorage.dfs.core.windows.net={azure-storage-account-key}
spark.hadoop.fs.azure.skipUserGroupMetadataDuringInitialization=true

All these configurations are required to run the Spark application on my local machine. We will use another configuration file for the Spark application running on Kubernetes.

The first part of the configuration is for Apache Spark and Hadoop. The spark.sql.warehouse.dir is the default location where Spark stores managed table data. The spark.hadoop.fs.defaultFS is the default filesystem URI.

The second part is for Elasticsearch configurations. The es.net.ssl is set to true to enable SSL, and es.net.ssl.cert.allow.self.signed is set to true to allow self-signed certificates. The es.nodes and es.port are the Elasticsearch hostname and port, and es.net.http.auth.user and es.net.http.auth.pass are the Elasticsearch credentials. The es.net.ssl.truststore.location is the location of the truststore file; if the file is placed in the src/main/resources directory of the project, the filename alone is enough. The es.net.ssl.truststore.pass is the password of the truststore file, and es.read.metadata is set to true so that document metadata is read from Elasticsearch.

The third part is for application configurations. The app.es_index is the Elasticsearch index name. The app.jdbc.url is the JDBC URL of the PostgreSQL database, and app.jdbc.username and app.jdbc.password are its credentials. The app.jdbc.table is the table where the error logs will be saved, and app.parquetBaseLocation is the base location where the Parquet files will be saved.

The last part is for Azure Storage configurations. The spark.hadoop.fs.azure.account.key.aksdepstorage.dfs.core.windows.net is the Azure Storage account key, and spark.hadoop.fs.azure.skipUserGroupMetadataDuringInitialization is set to true to skip user group metadata during initialization. These settings are not required until we deploy the Spark application to Azure Kubernetes Service (AKS).

SparkAppUtil.scala

I added a new Scala object named SparkAppUtil in the com.alexamy.nsa2.analytics.log.util package. This object contains two methods: sparkAppConf and sparkSession. The sparkAppConf method reads the configurations from the spark.conf file and sets them on a SparkConf object, and the sparkSession method creates a new SparkSession object with the given configurations.

package com.alexamy.nsa2.analytics.log
package util

import org.apache.spark.SparkConf
import org.apache.spark.sql.SparkSession
import org.apache.spark.internal.Logging

import java.nio.file.{Files, Paths}
import java.util.Properties
import scala.io.Source

object SparkAppUtil extends Logging {

  def sparkAppConf(): SparkConf = {
    val sparkAppConf = new SparkConf

    var configFile = System.getenv("SPARK_APP_CONF")

    if(configFile == null) {
      configFile = System.getProperty("SPARK_APP_CONF", "spark.conf")
    }

    logInfo(s"CONFIG FILE: $configFile")

    if(configFile != null && Files.exists(Paths.get(configFile))) {
      val props = new Properties
      props.load(Source.fromFile(configFile).bufferedReader())

      logInfo("======> props: " + props)
      props.forEach((k, v) => sparkAppConf.set(k.toString, v.toString))
    } else {
      logError(s"File Not Found: $configFile")
    }

    sparkAppConf
  }

  def sparkSession(appName: String, conf: SparkConf): SparkSession = {
    SparkSession
      .builder
      .appName(appName)
      .master("local[*]")
      .config(conf)
      .getOrCreate()
  }
}
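
Because sparkAppConf checks the SPARK_APP_CONF environment variable first and then the system property of the same name before falling back to spark.conf, the application can be pointed at a different configuration file without any code change (unless the environment variable is already set, which takes precedence). A minimal sketch, assuming the conf/aks/spark.conf file created later in this tutorial and a hypothetical application name:

// hypothetical: select another configuration file before SparkAppUtil reads it
System.setProperty("SPARK_APP_CONF", "conf/aks/spark.conf")

val conf = SparkAppUtil.sparkAppConf()
val spark = SparkAppUtil.sparkSession("ConfigCheck", conf)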

ElasticsearchDocumentCountApp.scala

We are going to write a simple Spark application to count the number of documents in the Elasticsearch index. The application reads the index name from the spark.conf file, creates a SparkSession using the SparkAppUtil object, and counts the documents in that index. This small example shows how the configuration and session plumbing from the previous section fit together.

package com.alexamy.nsa2.analytics.log
package app

import com.alexamy.nsa2.analytics.log.util.SparkAppUtil
import org.apache.spark.internal.Logging
import org.elasticsearch.spark.sparkContextFunctions

object ElasticsearchDocumentCountApp extends App with Logging {

  // start main
  val sparkConf = SparkAppUtil.sparkAppConf

  val spark = SparkAppUtil.sparkSession("ElasticsearchDocumentCount", sparkConf)

  val indexName = sparkConf.get("app.es_index")

  val count = spark.sparkContext.esRDD(indexName).count()

  logInfo(
    s"""
      |
      | ##### Elasticsearch Document Count #####
      | Index Name: ${indexName}
      | Document Count: ${count}
      | #######################################
      |""".stripMargin)
//  logInfo(s"Document count for ${indexName}: ${count}")

  spark.stop()
  // end main
}

The source code is simple and straightforward. It reads the configurations from the spark.conf file, creates a SparkSession object using the SparkAppUtil object, reads the Elasticsearch index name from the configurations, counts the number of documents in the index, and then logs the index name and the document count.
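
The same count can also be obtained through the DataFrame API, which is how the daily batch application later in this tutorial reads from Elasticsearch. A minimal sketch, reusing the spark and indexName values defined above:

// equivalent document count using the elasticsearch-spark DataFrame reader
val countDF = spark.read.format("org.elasticsearch.spark.sql").load(indexName)
logInfo(s"Document count (DataFrame API) for ${indexName}: ${countDF.count()}")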

Run ElasticsearchDocumentCountApp

In IntelliJ IDEA

To run the ElasticsearchDocumentCountApp application in IntelliJ IDEA, press Ctrl + Shift + R or right-click on the ElasticsearchDocumentCountApp object and select Run ElasticsearchDocumentCountApp.

We can see the log message in the console as shown below:

 ##### Elasticsearch Document Count #####
 Index Name: nsa2-2024.06.17
 Document Count: 16
 #######################################

In Terminal

To run the ElasticsearchDocumentCountApp application in a terminal window, follow the steps below:

$ sbt clean package
$ sbt run

Nsa2LogAnalyticsDailyBatchApp

We are going to implement a Spark application to read log messages from Elasticsearch, archive the messages as Parquet files to HDFS, and save all ERROR messages to a Relational Database for further analysis. This application will be run daily to process the log messages of the previous day.

The application will have the following steps:

  1. Read log messages from Elasticsearch for the previous day. The log messages are stored in an Elasticsearch index named nsa2-YYYY.MM.DD (a sketch for deriving this name at runtime follows this list).

  2. If the data already exists in the PostgreSQL database, delete the data for the index.

  3. Parse the log messages using Named Capturing Groups of the regular expression.

  4. Archive all the log messages as Parquet files to HDFS in a nested directory structure like nsa2/YYYY/MM/DD in Overwrite mode.

  5. Save all ERROR messages to a Relational Database. They will be saved to the logging.log_history table in the PostgreSQL database.
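
In this tutorial the index to process is fixed through app.es_index in spark.conf. On a real daily schedule you would typically derive the previous day's index name at runtime instead; a minimal sketch of such a helper, which is hypothetical and not part of the project code:

// hypothetical helper: build yesterday's index name, e.g. nsa2-2024.06.17
import java.time.LocalDate
import java.time.format.DateTimeFormatter

def previousDayIndex(prefix: String = "nsa2"): String = {
  val yesterday = LocalDate.now().minusDays(1)
  s"$prefix-${yesterday.format(DateTimeFormatter.ofPattern("yyyy.MM.dd"))}"
}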

Document formats saved in Elasticsearch

An example of a document saved in Elasticsearch which contains a single-line log message without a stack trace.
{
  "timestamp": "2024-06-20T15:46:35.475Z",
  "log": "2024-06-20T15:46:35.475Z ERROR 82128 ---[nsa2-logging-example] [reactor-http-nio-8] c.a.n.e.l.service.LoggingExampleService  : Writing log - level: ERROR, message: This is a sample of ERROR level messages\n",
  "hostname": "Youngs-MacBook-Workbench.local"
}
An example of a document saved in Elasticsearch which contains an error log message with a stack trace.
{
  "timestamp": "2024-06-20T15:48:15.010Z",
  "log": "2024-06-20T15:48:15.010Z ERROR 82128 --- [nsa2-logging-example] [reactor-http-nio-9] c.a.n.e.l.c.LoggingExampleController     : =====> onErrorResume: No enum constant org.slf4j.event.Level.INVALID\n\n java.lang.IllegalArgumentException: No enum constant org.slf4j.event.Level.INVALID\n\t at java.base/java.lang.Enum.valueOf(Enum.java:273) ~[na:na]\n\t at org.slf4j.event.Level.valueOf(Level.java:16) ~[slf4j-api-2.0.13.jar:2.0.13]\n\t at java.base/java.lang.Thread.run(Thread.java:840) ~[na:na]\n\n",
  "hostname": "Youngs-MacBook-Workbench.local"
}

From those samples, we can see that each log message contains the timestamp, log level, application name, thread name, logger class, and message. To extract these fields from the log message, we use the Named Capturing Groups of the following regular expression pattern.

regular expression pattern to parse the log message
^(?<timestamp>[0-9-]+T[:0-9\.]+\d{3}Z)\s+(?<level>[A-Z]+)\s+\d+\s\-{3}\s+\[(?<appName>[\w\-\d]+)\]+\s+\[\s*(?<thread>[\w\-\d]+)\]+\s+[\w\d\.]*\.(?<loggerClass>[\w\.\d]+)\s+:(?<message>.*)
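
To sanity-check the pattern outside Spark, the named groups can be exercised directly with the JDK regex API. A minimal sketch in plain Scala against the error log sample above (not part of the Spark job itself):

// verify the named capturing groups against a sample log line
import java.util.regex.Pattern

val logPattern = Pattern.compile(
  """^(?<timestamp>[0-9-]+T[:0-9\.]+\d{3}Z)\s+(?<level>[A-Z]+)\s+\d+\s\-{3}\s+\[(?<appName>[\w\-\d]+)\]+\s+\[\s*(?<thread>[\w\-\d]+)\]+\s+[\w\d\.]*\.(?<loggerClass>[\w\.\d]+)\s+:(?<message>.*)""")

val line = "2024-06-20T15:48:15.010Z ERROR 82128 --- [nsa2-logging-example] [reactor-http-nio-9] c.a.n.e.l.c.LoggingExampleController     : =====> onErrorResume: No enum constant org.slf4j.event.Level.INVALID"

val matcher = logPattern.matcher(line)
if (matcher.find()) {
  println(matcher.group("timestamp"))   // 2024-06-20T15:48:15.010Z
  println(matcher.group("level"))       // ERROR
  println(matcher.group("appName"))     // nsa2-logging-example
  println(matcher.group("thread"))      // reactor-http-nio-9
  println(matcher.group("loggerClass")) // LoggingExampleController
  println(matcher.group("message").trim)
}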

The Elasticsearch index name and document id can be read from the metadata of the document. The _metadata._index is the index name and the _metadata._id is the document id.

All those fields will be saved in the PostgreSQL database for further analysis. We will save the log messages to the logging.log_history table in the PostgreSQL database. Our scenario is to save all ERROR messages to the database for the operational team to analyze the error logs.

ddl.sql - logging.log_history table
create table logging.log_history
(
    id           uuid not null primary key,
    log_time     timestamp,
    log_level    varchar(10),
    app_name     varchar(50),
    thread       varchar(50),
    logger_class varchar(50),
    message      text,
    raw_data     text,
    es_id        varchar(50),
    es_index     varchar(50),
    hostname     varchar(50)
);

Because it is not easy to use a sequence-based primary key in Spark, we use a UUID as the primary key. The remaining columns map directly to the parsed fields: log_time is the timestamp of the log message, log_level is its level, app_name is the application name, thread is the thread name, logger_class is the logger class, message is the parsed message, raw_data is the original log line, es_id is the Elasticsearch document id, es_index is the Elasticsearch index name, and hostname is the host that produced the log.

Nsa2LogAnalyticsDailyBatchApp.scala

I added a new object named Nsa2LogAnalyticsDailyBatchApp in the com.alexamy.nsa2.analytics.log.app package. This object contains the main method to process the log messages of the previous day.

Nsa2LogAnalyticsDailyBatchApp.scala
package com.alexamy.nsa2.analytics.log
package app

import util.SparkAppUtil

import org.apache.spark.internal.Logging
import org.apache.spark.sql.SaveMode
import org.apache.spark.sql.functions.{col, regexp_extract, uuid}
import org.apache.spark.sql.types._

import java.sql.{Connection, DriverManager, PreparedStatement, SQLException}

object Nsa2LogAnalyticsDailyBatchApp extends App with Logging {

  // start main
  val sparkConf = SparkAppUtil.sparkAppConf


  val spark = SparkAppUtil.sparkSession("Nsa2LogAnalyticsDailyBatchApp", sparkConf)

  private val indexName = sparkConf.get("app.es_index")
  private val jdbcDriver = "org.postgresql.Driver"
  private val jdbcUrl = sparkConf.get("app.jdbc.url")
  private val jdbcUsername = sparkConf.get("app.jdbc.username")
  private val jdbcPassword = sparkConf.get("app.jdbc.password")
  private val jdbcTable = sparkConf.get("app.jdbc.table")
  private val parquetBaseLocation = sparkConf.get("app.parquetBaseLocation")

  // delete records from the table that have the same indexName
  private def deleteIndexRecordsFromDatabaseIfExists(indexName: String): Unit = {
    logInfo(s"trying to delete records from ${jdbcTable} where es_index = ${indexName}")

    try {
      Class.forName(jdbcDriver)
    } catch {
      case e: ClassNotFoundException =>
        logError(s"JDBC driver not found: ${jdbcDriver}")
        System.exit(1)
    }

    val sql = s"""
                 |DELETE FROM ${jdbcTable}
                 |WHERE es_index = ?
                 |""".stripMargin

    var connection: Connection = null
    var statement: PreparedStatement = null
    try {
      connection = DriverManager.getConnection(jdbcUrl, jdbcUsername, jdbcPassword)
      statement = connection.prepareStatement(sql)

      statement.setString(1, indexName)
      val deletedRow = statement.executeUpdate()
      logInfo(s"${deletedRow} rows deleted from ${jdbcTable} where es_index = ${indexName}")

    } catch {
      case e: SQLException =>
        logError(s"Error deleting rows from ${jdbcTable} where es_index = ${indexName}: ${e.getMessage}")
        System.exit(1)
    } finally {
      if (statement != null) {
        statement.close()
      }
      if (connection != null) {
        connection.close()
      }
    }
  }


  logInfo(s"indexName: ${indexName}")

  if(indexName == null || indexName.isEmpty()) {
    logError("indexName is required. Please set app.es_index in spark.conf.")
    System.exit(1)
  }

  deleteIndexRecordsFromDatabaseIfExists(indexName)


  private val rawDF = spark.read.format("org.elasticsearch.spark.sql").load(indexName)

  val pattern = """^(?<timestamp>[0-9-]+T[:0-9\.]+\d{3}Z)\s+(?<level>[A-Z]+)\s+\d+\s\-{3}\s+\[(?<appName>[\w\-\d]+)\]+\s+\[\s*(?<thread>[\w\-\d]+)\]+\s+[\w\d\.]*\.(?<loggerClass>[\w\.\d]+)\s+:(?<message>.*)"""

  //  nsa2-2024.06.17 -> nsa2/2024/06/17
  private val parquetPathForRawData = s"${parquetBaseLocation}raw/${indexName.replace('-', '/').replace('.', '/')}"
  private val parquetPath = s"${parquetBaseLocation}processed/${indexName.replace('-', '/').replace('.', '/')}"

  logInfo(s"parquetPathForRawData: ${parquetPathForRawData}")
  logInfo(s"parquetPath: ${parquetPath}")

  rawDF.write
    .mode(SaveMode.Overwrite)
    .parquet(parquetPathForRawData)

  rawDF.printSchema()

  val processedDF = rawDF
    .withColumn("id", uuid())
    .withColumn("log_time", regexp_extract(rawDF("log"), pattern, 1).cast(TimestampType))
    .withColumn("log_level", regexp_extract(rawDF("log"), pattern, 2))
    .withColumn("app_name", regexp_extract(rawDF("log"), pattern, 3))
    .withColumn("thread", regexp_extract(rawDF("log"), pattern, 4))
    .withColumn("logger_class", regexp_extract(rawDF("log"), pattern, 5))
    .withColumn("message", regexp_extract(rawDF("log"), pattern, 6))
    .withColumn("es_id", col("_metadata._id" ))
    .withColumn("es_index", col("_metadata._index" ))
    .withColumnRenamed("log", "raw_data")
    .drop("@timestamp", "timestamp", "_metadata")

  processedDF
    .write
    .mode(SaveMode.Overwrite)
    .parquet(parquetPath)

  processedDF.show(20)

  processedDF
    .filter(col("log_level").equalTo("ERROR"))
    .write
    .format("jdbc")
    .option("url", jdbcUrl)
    .option("driver",  jdbcDriver)
    .option("user", jdbcUsername)
    .option("password", jdbcPassword)
    .option("truncate", "false")
    .option("dbtable", jdbcTable)
    .option("stringtype", "unspecified" )
    .mode(SaveMode.Append)
    .save()

  spark.stop()
  // end main
}

At the beginning of the main method, we read the configurations from the spark.conf file and create a SparkSession object using the SparkAppUtil object. We read the Elasticsearch index name, JDBC URL, JDBC username, JDBC password, JDBC table name, and Parquet base location from the configurations.

The deleteIndexRecordsFromDatabaseIfExists method deletes the rows whose es_index matches the index being processed. It connects to the PostgreSQL database through the JDBC driver and issues a DELETE statement, so the daily batch can be re-run for the same index without creating duplicate rows.

The variable rawDF is a DataFrame that reads the log messages from the Elasticsearch index. We use the org.elasticsearch.spark.sql format to read the data from Elasticsearch.

And we have a regular expression pattern to parse the log message. We use the regexp_extract function to parse the log message and create a new DataFrame named processedDF. The processedDF DataFrame contains the parsed log messages and metadata.

Both the rawDF and processedDF DataFrames are written as Parquet files to HDFS: rawDF is saved to the parquetPathForRawData location and processedDF to the parquetPath location. They are archived in a nested directory structure such as nsa2/YYYY/MM/DD in Overwrite mode for possible future use.
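
When the archived data is needed later, Spark can read the Parquet directories straight back into a DataFrame. A minimal sketch, reusing the spark session, col function, and parquetPath value from the application above:

// re-load the processed archive for ad-hoc analysis, e.g. count WARN messages
val archivedDF = spark.read.parquet(parquetPath)
val warnCount = archivedDF.filter(col("log_level") === "WARN").count()
logInfo(s"WARN messages in archive: ${warnCount}")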

And for operational purposes, we save all ERROR messages to the PostgreSQL database. We filter the processedDF DataFrame where the log_level is equal to ERROR and save the data to the logging.log_history table in the PostgreSQL database. We might need to implement an administrative interface to view the error logs in the future.

.option("stringtype", "unspecified" )

Please note that stringtype is set to unspecified so that the generated UUID values can be written into the uuid column. Spark treats the id column as a plain string, and without this option the PostgreSQL JDBC driver binds it as varchar, which fails against a uuid column; with stringtype set to unspecified, PostgreSQL casts the string to the column's type. The same effect can be achieved by appending ?stringtype=unspecified to the JDBC URL.

Here is a sample parquet file saved in HDFS. We can see the directory structure and the Parquet file. The image below is taken from the Parquet Viewer.

Parquet Viewer

Here is a sample of the log_history table in the PostgreSQL database. We can see the log messages saved in the table. The image below is taken from the IntelliJ IDEA Database tool.

Query log_history table

Deploy Spark Application to Kubernetes

Deploying the Spark application to Kubernetes is not an easy task. We need to create a Docker image for the Spark application and deploy it to Kubernetes. We also need Azure Blob Storage to take the place of HDFS and a PostgreSQL database as the relational database.

In this section, we will only look at the deployment step of the Spark application itself; the rest of the surrounding setup is out of scope for this tutorial.

Package Spark Application

Package the Spark application using sbt.
$ sbt clean package
$ ls -l target/scala-2.12/nsa2-log-analytics_2.12-0.1.0-SNAPSHOT.jar

The packaged jar file is located in the target/scala-2.12 directory and we are going to use this jar file to deploy the Spark application to Kubernetes.

Configuration for Kubernetes

We need a configuration file for the Spark application that will be running on Kubernetes. The Elasticsearch hostname and port, the JDBC URL, the JDBC username, the JDBC password, the Parquet base location, and the Azure Storage account key are required for the Spark application.

conf/aks/spark.conf
spark.sql.warehouse.dir = abfs://{your-container}@{your-storage-account}.dfs.core.windows.net/nsa2-log-analytics/warehouse/

# org.apache.hadoop.fs.azurebfs.constants.ConfigurationKeys.AZURE_SKIP_USER_GROUP_METADATA_DURING_INITIALIZATION
# https://hadoop.apache.org/docs/stable/hadoop-azure/abfs.html
# make sure to add spark.hadoop. at the beginning
spark.hadoop.fs.azure.skipUserGroupMetadataDuringInitialization=true


# https://www.elastic.co/guide/en/elasticsearch/hadoop/current/configuration.html
es.net.ssl = true
es.net.ssl.cert.allow.self.signed = true

es.nodes = elasticsearch-master
es.port = 9200
es.net.http.auth.user = elastic
es.net.http.auth.pass = changeit
es.net.ssl.truststore.location = elastic-certificates.p12
es.net.ssl.truststore.pass = changeit

es.read.metadata = true

app.es_index = nsa2-2024.06.20
app.jdbc.url = jdbc:postgresql://postgresql:5432/nsa2
app.jdbc.username = {your-username}
app.jdbc.password = {your-password}
app.jdbc.table = logging.log_history
app.parquetBaseLocation= abfs://{your-container}@{your-storage-account}.dfs.core.windows.net/nsa2-log-analytics/data/parquet/

spark.hadoop.fs.azure.account.key.aksdepstorage.dfs.core.windows.net={your-storage-account-key}

Deploy Spark Application to Kubernetes using spark-submit

To deploy the Spark application to Kubernetes, we need to use the spark-submit command. The spark-submit command is used to submit a Spark application to the cluster. We need to set the configurations for the Spark application and the Kubernetes cluster.

$ export SPARK_JAR=target/scala-2.12/nsa-log-analytics_2.12-0.1.0-SNAPSHOT.jar
$ export K8S_CONTROL_PLANE_URL={your-control-plane-url}
$ export K8S_NAMESPACE={your-namespace}
$ export K8S_AGENT_POOL={your-agent-pool}
$ export AZ_STORAGE_ACCOUNT={your-storage-account}
$ export AZ_STORAGE_ACCOUNT_KEY={your-storage-account-key}
$ export BLOB_CONTAINER={your-container}
$ export SPARK_SA={your-service-account-name-for-spark}
$ export SPARK_DOCKER_IMAGE={your-spark-docker-image}
$ export SPARK_MASTER_URL={your-spark-master-url}

$ spark-submit \
  --master k8s://${K8S_CONTROL_PLANE_URL} \
  --deploy-mode cluster \
  --files conf/aks/spark.conf \
  --name nsa2-log-analytics-app \
  --conf spark.executor.instances=3 \
  --conf spark.log.level=DEBUG \
  --conf spark.kubernetes.container.image.pullPolicy=Always \
  --conf spark.kubernetes.authenticate.driver.serviceAccountName=${SPARK_SA} \
  --conf spark.kubernetes.authenticate.executor.serviceAccountName=${SPARK_SA} \
  --conf spark.kubernetes.container.image=${SPARK_DOCKER_IMAGE} \
  --conf "spark.kubernetes.node.selector.agentpool=${K8S_AGENT_POOL}" \
  --conf "spark.kubernetes.namespace=${K8S_NAMESPACE}" \
  --conf "spark.hadoop.fs.azure.account.key.aksdepstorage.dfs.core.windows.net=${AZ_STORAGE_ACCOUNT_KEY}" \
  --packages org.apache.hadoop:hadoop-azure:3.2.0,com.microsoft.azure:azure-storage:8.6.3,org.postgresql:postgresql:42.7.0,org.elasticsearch:elasticsearch-spark-30:8.14.0,com.squareup.okhttp3:okhttp:4.12.0 \
  --conf spark.driver.extraJavaOptions="-Divy.cache.dir=/tmp -Divy.home=/tmp" \
  --conf spark.kubernetes.file.upload.path="abfs://${BLOB_CONTAINER}@${AZ_STORAGE_ACCOUNT}.dfs.core.windows.net/nsa2-log-analytics/upload" \
  --conf spark.kubernetes.driverEnv.SPARK_MASTER_URL=spark://${SPARK_MASTER_URL}:7077 \
  --conf spark.kubernetes.driverEnv.HADOOP_OPTIONAL_TOOLS=hadoop-azure \
  --conf spark.executorEnv.HADOOP_OPTIONAL_TOOLS=hadoop-azure \
  --conf spark.kubernetes.driver.request.cores="0.1" \
  --conf spark.kubernetes.executor.request.cores="0.1" \
  --conf spark.kubernetes.driver.limit.cores="0.2" \
  --conf spark.kubernetes.executor.limit.cores="0.2" \
  --conf spark.kubernetes.driver.master="${K8S_CONTROL_PLANE_URL}" \
  --class "com.alexamy.nsa2.analytics.log.app.Nsa2LogAnalyticsDailyBatchApp" \
  ${SPARK_JAR}

Test scripts

Delete Elasticsearch index
$ curl -u "elastic:changeit" -X DELETE "https://10.0.0.2:9200/nsa2-2024.06.20" --insecure
Write Log messages
$ echo "TRACE DEBUG INFO WARN ERROR" | \
  tr " " '\n' | \
  xargs -I {} curl -X POST -H "Content-Type: application/json" \
  -d "This is a sample of {} level messages" \
  http://localhost:18080/v1.0.0/log/{}
Write Error logs
$ for i in {1..10}; \
  do  curl -X POST -H "Content-Type: application/json" \
  -d "This is n invalid log message - $i" \
  http://localhost:18080/v1.0.0/log/INVALID; done

Conclusion

In this tutorial, we created a Scala project to analyze log messages stored in Elasticsearch using Apache Spark. We created a Spark application to read log messages from Elasticsearch, archive the messages as Parquet files to HDFS, and save all ERROR messages to a Relational Database for further analysis. We tested the Spark application in IntelliJ IDEA and deployed it to Kubernetes.