Log Analysis with Apache Spark
Introduction
In this tutorial, we will create a Scala project to analyze log messages stored in Elasticsearch using Apache Spark. The Spark application will read log messages from Elasticsearch, archive them as Parquet files to HDFS, and save all ERROR messages to a Relational Database for further analysis. During development, we will use the local filesystem in place of HDFS and PostgreSQL as the Relational Database. We will then deploy the Spark application to Kubernetes.
Centralized Logging series
This tutorial is the 6th part of the Centralized Logging series. The series covers the following topics:
- Part 1 - Logging in Spring Boot Application
- Part 2 - Deploying Spring Boot Application to Kubernetes
- Part 3 - Installing Elasticsearch and Kibana to Kubernetes
- Part 4 - Centralized Logging with Fluent-bit and Elasticsearch (Kubernetes)
- Part 5 - Centralized Logging with Fluent-bit and Elasticsearch (On-premise)
- Part 6 - Log Analysis with Apache Spark
Prerequisites
- Java 11
- Scala 2.12
- IntelliJ IDEA
- sbt 1.10
- Apache Spark 3.5.1
- Elasticsearch up and running
- Kibana up and running (optional)
- PostgreSQL up and running
Create Scala Project in IntelliJ IDEA
Before we start, make sure you have installed Java 11, Scala 2.12, IntelliJ IDEA, and sbt. And do not forget to install the Scala plugin in IntelliJ IDEA.
To create a new Scala project in IntelliJ IDEA, follow the steps below:
- Open IntelliJ IDEA and click on New → Project from the main menu.
- Select Scala from the left panel and sbt from the right panel.
- Fill in the project name, location, and other details as shown below.
- Click on Finish to create the project.
Properties of the Scala project
Property | Value |
---|---|
Name | nsa-log-analytics |
Location | ~/Dev/workspace |
Language | Scala |
Build system | sbt |
JDK | 11 |
sbt | 1.10.0 |
Scala | 2.12.18 |
Package prefix | com.alexamy.nsa2.analytics.log |
Note that Scala version 2.12.18 is used in this tutorial because Apache Spark 3.5.1 is compatible with Scala 2.12.
The project structure will look like this:
tree -I 'target|project|\.idea|\.bsp' .
.
├── build.sbt
└── src
    ├── main
    │   └── scala
    └── test
        └── scala
build.sbt
The code snippet below is generated by IntelliJ IDEA and does not yet include the dependencies required for Apache Spark.
ThisBuild / version := "0.1.0-SNAPSHOT"
ThisBuild / scalaVersion := "2.12.18"
lazy val root = (project in file("."))
.settings(
name := "new-project",
idePackagePrefix := Some("com.alexamy.nsa2.analytics.log")
)
Let’s start by adding the required dependencies to the build.sbt
file.
import scala.collection.Seq
libraryDependencies ++= Seq(
// # Apache Spark
"org.apache.spark" %% "spark-core" % "3.5.1", // % "provided",
"org.apache.spark" %% "spark-sql" % "3.5.1", // % "provided",
// # Hadoop client
"org.apache.hadoop" % "hadoop-client" % "3.3.4",
"org.apache.hadoop" % "hadoop-client-api" % "3.3.4",
"org.apache.hadoop" % "hadoop-common" % "3.3.4",
// # Log4j
"org.apache.logging.log4j" % "log4j-api-scala_2.12" % "13.0.0",
"org.apache.logging.log4j" % "log4j-core" % "2.19.0" % Runtime,
// # Elasticsearch and PostgreSQL
"org.postgresql" % "postgresql" % "42.7.0",
"org.elasticsearch" %% "elasticsearch-spark-30" % "8.14.0",
// # Azure Storage for Hadoop. Required for Azure Blob Storage
"org.apache.hadoop" % "hadoop-azure" % "3.2.0",
"com.microsoft.azure" % "azure-storage" % "8.6.3"
)
spark.conf
In the project root directory, create a new file named spark.conf
and add the following configurations.
# Apache Spark and Hadoop configurations
spark.sql.warehouse.dir = /tmp/spark/warehouse/
spark.hadoop.fs.defaultFS = file:///tmp/spark/warehouse/
# Elasticsearch configurations
# https://www.elastic.co/guide/en/elasticsearch/hadoop/current/configuration.html
es.net.ssl = true
es.net.ssl.cert.allow.self.signed = true
es.nodes = elasticsearch-master
es.port = 9200
es.net.http.auth.user = elastic
es.net.http.auth.pass = changeit
es.net.ssl.truststore.location = elastic-certificates.p12
es.net.ssl.truststore.pass = changeit
es.read.metadata = true
# Application configurations
app.es_index = nsa2-2024.06.17
app.jdbc.url = jdbc:postgresql://127.0.0.1:5432/nsa2
app.jdbc.username = {dbusername}
app.jdbc.password = {dbpassword}
app.jdbc.table = logging.log_history
app.parquetBaseLocation=./data/parquet/
# Azure Storage configurations
spark.hadoop.fs.azure.account.key.aksdepstorage.dfs.core.windows.net={azure-storage-account-key}
spark.hadoop.fs.azure.skipUserGroupMetadataDuringInitialization=true
All these configurations are required to run the Spark application on my local machine. We will use another configuration file for the Spark application running on Kubernetes.
The first part of the configuration is for Apache Spark and Hadoop. The spark.sql.warehouse.dir
is the default location where Spark stores data for managed tables. The spark.hadoop.fs.defaultFS
is the default filesystem URI.
The second part is for Elasticsearch configurations. The es.net.ssl
is set to true
to enable SSL. The es.net.ssl.cert.allow.self.signed
is set to true
to allow self-signed certificates. The es.nodes
is the Elasticsearch hostname. The es.port
is the Elasticsearch port. The es.net.http.auth.user
and es.net.http.auth.pass
are the Elasticsearch username and password. The es.net.ssl.truststore.location
is the location of the truststore file. The es.net.ssl.truststore.pass
is the password of the truststore file. The es.read.metadata
is set to true
to read metadata from Elasticsearch.
The third part is for application configurations. The app.es_index
is the Elasticsearch index name. The app.jdbc.url
is the JDBC URL of the PostgreSQL database. The app.jdbc.username
and app.jdbc.password
are the username and password of the PostgreSQL database. The app.jdbc.table
is the table name where the error logs will be saved. The app.parquetBaseLocation
is the base location where the Parquet files will be saved. As for the truststore, if elastic-certificates.p12 is placed in the src/main/resources directory of the project, es.net.ssl.truststore.location can be set to just the filename, as shown above.
The last part is for Azure Storage configurations. The spark.hadoop.fs.azure.account.key.aksdepstorage.dfs.core.windows.net
is the Azure Storage account key. The spark.hadoop.fs.azure.skipUserGroupMetadataDuringInitialization
is set to true
to skip user group metadata during initialization. This is not required until we deploy the Spark application to Azure Kubernetes Service (AKS).
SparkAppUtil.scala
I added a new Scala object named SparkAppUtil
in the com.alexamy.nsa2.analytics.log.util
package. This object contains two methods: sparkAppConf
and sparkSession
.
The sparkAppConf method reads the configurations from the spark.conf
file and sets them on the SparkConf object. The sparkSession method creates a new SparkSession object with the given configurations.
package com.alexamy.nsa2.analytics.log
package util
import org.apache.spark.SparkConf
import org.apache.spark.sql.SparkSession
import org.apache.spark.internal.Logging
import java.nio.file.{Files, Paths}
import java.util.Properties
import scala.io.Source
object SparkAppUtil extends Logging {
def sparkAppConf(): SparkConf = {
val sparkAppConf = new SparkConf
var configFile = System.getenv("SPARK_APP_CONF")
if(configFile == null) {
configFile = System.getProperty("SPARK_APP_CONF", "spark.conf")
}
logInfo(s"CONFIG FILE: $configFile")
if(configFile != null && Files.exists(Paths.get(configFile))) {
val props = new Properties
props.load(Source.fromFile(configFile).bufferedReader())
logInfo("======> props: " + props)
props.forEach((k, v) => sparkAppConf.set(k.toString, v.toString))
} else {
logError(s"File Not Found: $configFile")
}
sparkAppConf
}
def sparkSession(appName: String, conf: SparkConf): SparkSession = {
SparkSession
.builder
.appName(appName)
.master("local[*]")
.config(conf)
.getOrCreate()
}
}
ElasticsearchDocumentCountApp.scala
We are going to write a simple Spark application to count the number of documents in the Elasticsearch index. The application reads the Elasticsearch index name from the spark.conf
file and counts the number of documents in the index.
From this simple application, we can see how to read configurations from the spark.conf
file and create a SparkSession object using the SparkAppUtil
object, and then how to read the Elasticsearch index name from the configuration and count the number of documents in that index.
package com.alexamy.nsa2.analytics.log
package app
import com.alexamy.nsa2.analytics.log.util.SparkAppUtil
import org.apache.spark.internal.Logging
import org.elasticsearch.spark.sparkContextFunctions
object ElasticsearchDocumentCountApp extends App with Logging {
// start main
val sparkConf = SparkAppUtil.sparkAppConf
val spark = SparkAppUtil.sparkSession("ElasticsearchDocumentCount", sparkConf)
val indexName = sparkConf.get("app.es_index")
val count = spark.sparkContext.esRDD(indexName).count()
logInfo(
s"""
|
| ##### Elasticsearch Document Count #####
| Index Name: ${indexName}
| Document Count: ${count}
| #######################################
|""".stripMargin)
// logInfo(s"Document count for ${indexName}: ${count}")
spark.stop()
// end main
}
The source code is simple and straightforward. It reads the configurations from the spark.conf file, creates a SparkSession object using the SparkAppUtil object, reads the Elasticsearch index name from the configuration, counts the number of documents in the index, and then logs the index name and the document count.
Run ElasticsearchDocumentCountApp
In IntelliJ IDEA
To run the ElasticsearchDocumentCountApp
application in IntelliJ IDEA, press Ctrl + Shift + R
or right-click on the ElasticsearchDocumentCountApp
object and select Run ElasticsearchDocumentCountApp
.
We can see the log message in the console as shown below:
##### Elasticsearch Document Count #####
Index Name: nsa2-2024.06.17
Document Count: 16
#######################################
In Terminal
To run the ElasticsearchDocumentCountApp
application in a terminal window, follow the steps below:
$ sbt clean package
$ sbt run
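Note that once the project contains more than one object extending App, sbt run will prompt for which main class to launch. To run this application non-interactively, the fully qualified main class can be passed to runMain (using the package prefix from this tutorial):
$ sbt "runMain com.alexamy.nsa2.analytics.log.app.ElasticsearchDocumentCountApp"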
Nsa2LogAnalyticsDailyBatchApp
We are going to implement a Spark application to read log messages from Elasticsearch, archive the messages as Parquet files to HDFS, and save all ERROR messages to a Relational Database for further analysis. This application will be run daily to process the log messages of the previous day.
The application will have the following steps:
- Read log messages from Elasticsearch for the previous day. The log messages are stored in an Elasticsearch index named nsa2-YYYY.MM.DD.
- If data for that index already exists in the PostgreSQL database, delete it.
- Parse the log messages using Named Capturing Groups of the regular expression.
- Archive all the log messages as Parquet files to HDFS in a nested directory structure like nsa2/YYYY/MM/DD in Overwrite mode.
- Save all ERROR messages to a Relational Database. They will be saved to the logging.log_history table in the PostgreSQL database.
Document formats saved in Elasticsearch
{
"timestamp": "2024-06-20T15:46:35.475Z",
"log": "2024-06-20T15:46:35.475Z ERROR 82128 ---[nsa2-logging-example] [reactor-http-nio-8] c.a.n.e.l.service.LoggingExampleService : Writing log - level: ERROR, message: This is a sample of ERROR level messages\n",
"hostname": "Youngs-MacBook-Workbench.local"
}
{
"timestamp": "2024-06-20T15:48:15.010Z",
"log": "2024-06-20T15:48:15.010Z ERROR 82128 --- [nsa2-logging-example] [reactor-http-nio-9] c.a.n.e.l.c.LoggingExampleController : =====> onErrorResume: No enum constant org.slf4j.event.Level.INVALID\n\n java.lang.IllegalArgumentException: No enum constant org.slf4j.event.Level.INVALID\n\t at java.base/java.lang.Enum.valueOf(Enum.java:273) ~[na:na]\n\t at org.slf4j.event.Level.valueOf(Level.java:16) ~[slf4j-api-2.0.13.jar:2.0.13]\n\t at java.base/java.lang.Thread.run(Thread.java:840) ~[na:na]\n\n",
"hostname": "Youngs-MacBook-Workbench.local"
}
From those samples of log messages, we can see that each log message contains the timestamp, log level, application name, thread name, logger class, and message. To get these fields from the log message, we need to use the following regular expression pattern. We will use the Named Capturing Groups of the regular expression to parse the log message.
^(?<timestamp>[0-9-]+T[:0-9\.]+\d{3}Z)\s+(?<level>[A-Z]+)\s+\d+\s\-{3}\s+\[(?<appName>[\w\-\d]+)\]+\s+\[\s*(?<thread>[\w\-\d]+)\]+\s+[\w\d\.]*\.(?<loggerClass>[\w\.\d]+)\s+:(?<message>.*)
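To sanity-check the pattern before wiring it into Spark, a small standalone snippet like the one below can be run against the second sample document. This is only an illustration; the object name LogPatternCheck is hypothetical, and java.util.regex exposes the named groups by name, whereas Spark's regexp_extract refers to them by index.
import java.util.regex.Pattern

object LogPatternCheck extends App {
  // The same pattern as above, kept on a single line
  private val pattern = Pattern.compile(
    """^(?<timestamp>[0-9-]+T[:0-9\.]+\d{3}Z)\s+(?<level>[A-Z]+)\s+\d+\s\-{3}\s+\[(?<appName>[\w\-\d]+)\]+\s+\[\s*(?<thread>[\w\-\d]+)\]+\s+[\w\d\.]*\.(?<loggerClass>[\w\.\d]+)\s+:(?<message>.*)""")

  // Second sample log line from the documents shown above
  private val sample =
    "2024-06-20T15:48:15.010Z ERROR 82128 --- [nsa2-logging-example] [reactor-http-nio-9] c.a.n.e.l.c.LoggingExampleController : =====> onErrorResume: No enum constant org.slf4j.event.Level.INVALID"

  private val matcher = pattern.matcher(sample)
  if (matcher.find()) {
    println(s"level=${matcher.group("level")}, appName=${matcher.group("appName")}, loggerClass=${matcher.group("loggerClass")}")
    println(s"message=${matcher.group("message").trim}")
  } else {
    println("Pattern did not match the sample log line")
  }
}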
The Elasticsearch index name and document id can be read from the metadata of the document. The _metadata._index
is the index name and the _metadata._id
is the document id.
All those fields will be saved in the PostgreSQL database for further analysis. We will save the log messages to the logging.log_history
table in the PostgreSQL database. Our scenario is to save all ERROR messages to the database for the operational team to analyze the error logs.
create table logging.log_history
(
id uuid not null primary key,
log_time timestamp,
log_level varchar(10),
app_name varchar(50),
thread varchar(50),
logger_class varchar(50),
message text,
raw_data text,
es_id varchar(50),
es_index varchar(50),
hostname varchar(50)
);
Because it is not easy to use a sequence-based primary key from Spark, we use a UUID as the primary key. The log_time
is the timestamp of the log message. The log_level
is the log level of the log message. The app_name
is the application name. The thread
is the thread name. The logger_class
is the logger class. The message
is the log message. The raw_data
is the raw log message. The es_id
is the document id of the log message. The es_index
is the index name of the log message. The hostname
is the hostname of the log message.
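Since the daily batch below deletes rows by es_index before re-loading, and the operational team will typically query by log_time and log_level, adding indexes on those columns is an optional improvement that is not part of the original schema above; a possible sketch:
create index idx_log_history_es_index on logging.log_history (es_index);
create index idx_log_history_log_time on logging.log_history (log_time);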
Nsa2LogAnalyticsDailyBatchApp.scala
I added a new object named Nsa2LogAnalyticsDailyBatchApp
in the com.alexamy.nsa2.analytics.log.app
package. This object contains the main method to process the log messages of the previous day.
package com.alexamy.nsa2.analytics.log
package app
import util.SparkAppUtil
import org.apache.spark.internal.Logging
import org.apache.spark.sql.SaveMode
import org.apache.spark.sql.functions.{col, regexp_extract, uuid}
import org.apache.spark.sql.types._
import java.sql.{Connection, DriverManager, PreparedStatement, SQLException}
object Nsa2LogAnalyticsDailyBatchApp extends App with Logging {
// start main
val sparkConf = SparkAppUtil.sparkAppConf
val spark = SparkAppUtil.sparkSession("Nsa2LogAnalyticsDailyBatchApp", sparkConf)
private val indexName = sparkConf.get("app.es_index")
private val jdbcDriver = "org.postgresql.Driver"
private val jdbcUrl = sparkConf.get("app.jdbc.url")
private val jdbcUsername = sparkConf.get("app.jdbc.username")
private val jdbcPassword = sparkConf.get("app.jdbc.password")
private val jdbcTable = sparkConf.get("app.jdbc.table")
private val parquetBaseLocation = sparkConf.get("app.parquetBaseLocation")
// delete records from the table that have the same indexName
private def deleteIndexRecordsFromDatabaseIfExists(indexName: String): Unit = {
logInfo(s"trying to delete records from ${jdbcTable} where es_index = ${indexName}")
try {
Class.forName(jdbcDriver)
} catch {
case e: ClassNotFoundException =>
logError(s"JDBC driver not found: ${jdbcDriver}")
System.exit(1)
}
val sql = s"""
|DELETE FROM ${jdbcTable}
|WHERE es_index = ?
|""".stripMargin
var connection: Connection = null
var statement: PreparedStatement = null
try {
connection = DriverManager.getConnection(jdbcUrl, jdbcUsername, jdbcPassword)
statement = connection.prepareStatement(sql)
statement.setString(1, indexName)
val deletedRow = statement.executeUpdate()
logInfo(s"${deletedRow} rows deleted from ${jdbcTable} where es_index = ${indexName}")
} catch {
case e: SQLException =>
logError(s"Error deleting rows from ${jdbcTable} where es_index = ${indexName}: ${e.getMessage}")
System.exit(1)
} finally {
if (statement != null) {
statement.close()
}
if (connection != null) {
connection.close()
}
}
}
logInfo(s"indexName: ${indexName}")
if(indexName == null || indexName.isEmpty()) {
logError("indexName is required. Please set app.es_index in spark.conf.")
System.exit(1)
}
deleteIndexRecordsFromDatabaseIfExists(indexName)
private val rawDF = spark.read.format("org.elasticsearch.spark.sql").load(indexName)
val pattern = """^(?<timestamp>[0-9-]+T[:0-9\.]+\d{3}Z)\s+(?<level>[A-Z]+)\s+\d+\s\-{3}
\s+\[(?<appName>[\w\-\d]+)\]+\s+\[\s*(?<thread>[\w\-\d]+)\]+
\s+[\w\d\.]*\.(?<loggerClass>[\w\.\d]+)\s+:(?<message>.*)"""
// nsa2-2024.06.17 -> nsa2/2024/06/17
private val parquetPathForRawData = s"${parquetBaseLocation}raw/${indexName.replace('-', '/').replace('.', '/')}"
private val parquetPath = s"${parquetBaseLocation}processed/${indexName.replace('-', '/').replace('.', '/')}"
logInfo(s"parquetPathForRawData: ${parquetPathForRawData}")
logInfo(s"parquetPath: ${parquetPath}")
rawDF.write
.mode(SaveMode.Overwrite)
.parquet(parquetPathForRawData)
rawDF.printSchema()
val processedDF = rawDF
.withColumn("id", uuid())
.withColumn("log_time", regexp_extract(rawDF("log"), pattern, 1).cast(TimestampType))
.withColumn("log_level", regexp_extract(rawDF("log"), pattern, 2))
.withColumn("app_name", regexp_extract(rawDF("log"), pattern, 3))
.withColumn("thread", regexp_extract(rawDF("log"), pattern, 4))
.withColumn("logger_class", regexp_extract(rawDF("log"), pattern, 5))
.withColumn("message", regexp_extract(rawDF("log"), pattern, 6))
.withColumn("es_id", col("_metadata._id" ))
.withColumn("es_index", col("_metadata._index" ))
.withColumnRenamed("log", "raw_data")
.drop("@timestamp", "timestamp", "_metadata")
processedDF
.write
.mode(SaveMode.Overwrite)
.parquet(parquetPath)
processedDF.show(20)
processedDF
.filter(col("log_level").equalTo("ERROR"))
.write
.format("jdbc")
.option("url", jdbcUrl)
.option("driver", jdbcDriver)
.option("user", jdbcUsername)
.option("password", jdbcPassword)
.option("truncate", "false")
.option("dbtable", jdbcTable)
.option("stringtype", "unspecified" )
.mode(SaveMode.Append)
.save()
spark.stop()
// end main
}
At the beginning of the main method, we read the configurations from the spark.conf
file and create a SparkSession object using the SparkAppUtil
object. We read the Elasticsearch index name, JDBC URL, JDBC username, JDBC password, JDBC table name, and Parquet base location from the configurations.
We have a method named deleteIndexRecordsFromDatabaseIfExists
to delete records from the table that have the same indexName. We use the JDBC driver to connect to the PostgreSQL database and delete the records from the table where the es_index is equal to the indexName.
The variable rawDF is a DataFrame that reads the log messages from the Elasticsearch index. We use the org.elasticsearch.spark.sql
format to read the data from Elasticsearch.
And we have a regular expression pattern to parse the log message. We use the regexp_extract
function to parse the log message and create a new DataFrame named processedDF. The processedDF DataFrame contains the parsed log messages and metadata.
Both the rawDF and processedDF DataFrames are written as Parquet files to HDFS: rawDF to the parquetPathForRawData location and processedDF to the parquetPath location. They are archived in a nested directory structure like nsa2/YYYY/MM/DD in Overwrite mode for possible future use.
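If the archived data is needed later, it can be loaded back from either location with spark.read.parquet. A hypothetical follow-up snippet, assuming the same SparkSession, imports, and parquetBaseLocation as in the app above:
// Reload the processed archive for a given day and inspect the WARN messages
val archivedDF = spark.read.parquet(s"${parquetBaseLocation}processed/nsa2/2024/06/17")
archivedDF.filter(col("log_level") === "WARN").show(20)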
And for operational purposes, we save all ERROR messages to the PostgreSQL database. We filter the processedDF DataFrame where the log_level is equal to ERROR and save the data to the logging.log_history
table in the PostgreSQL database. We might need to implement an administrative interface to view the error logs in the future.
.option("stringtype", "unspecified" )
Please note that stringtype is set to unspecified so that the UUID values can be written into the uuid column in PostgreSQL. Spark writes the generated id as a string, and without this option the PostgreSQL JDBC driver binds it as varchar, which makes the insert into the uuid column fail with a type mismatch.
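The PostgreSQL JDBC driver also accepts stringtype as a connection parameter, so an equivalent approach, not used in this tutorial, would be to append it to the JDBC URL in spark.conf:
app.jdbc.url = jdbc:postgresql://127.0.0.1:5432/nsa2?stringtype=unspecified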
Here is a sample parquet file saved in HDFS. We can see the directory structure and the Parquet file. The image below is taken from the Parquet Viewer.

Here is a sample of the log_history table in the PostgreSQL database. We can see the log messages saved in the table. The image below is taken from the IntelliJ IDEA Database tool.

Deploy Spark Application to Kubernetes
Deploying the Spark application to Kubernetes is not an easy task. We need to create a Docker image for the Spark application and deploy it to Kubernetes. We also need to create Blob Storage for HDFS and a PostgreSQL database for the Relational Database.
In this section, we will look only at how to submit the Spark application to Kubernetes; we are not going to cover the whole process of preparing the surrounding infrastructure.
Package Spark Application
$ sbt clean package
$ ls -l target/scala-2.12/nsa-log-analytics_2.12-0.1.0-SNAPSHOT.jar
The packaged jar file is located in the target/scala-2.12 directory and we are going to use this jar file to deploy the Spark application to Kubernetes.
Configuration for Kubernetes
We need a configuration file for the Spark application that will be running on Kubernetes. The Elasticsearch hostname and port, the JDBC URL, the JDBC username, the JDBC password, the Parquet base location, and the Azure Storage account key are required for the Spark application.
spark.sql.warehouse.dir = abfs://{your-container}@{your-storage-account}.dfs.core.windows.net/nsa2-log-analytics/warehouse/
# org.apache.hadoop.fs.azurebfs.constants.ConfigurationKeys.AZURE_SKIP_USER_GROUP_METADATA_DURING_INITIALIZATION
# https://hadoop.apache.org/docs/stable/hadoop-azure/abfs.html
# make sure to add spark.hadoop. at the beginning
spark.hadoop.fs.azure.skipUserGroupMetadataDuringInitialization=true
# https://www.elastic.co/guide/en/elasticsearch/hadoop/current/configuration.html
es.net.ssl = true
es.net.ssl.cert.allow.self.signed = true
es.nodes = elasticsearch-master
es.port = 9200
es.net.http.auth.user = elastic
es.net.http.auth.pass = changeit
es.net.ssl.truststore.location = elastic-certificates.p12
es.net.ssl.truststore.pass = changeit
es.read.metadata = true
app.es_index = nsa2-2024.06.20
app.jdbc.url = jdbc:postgresql://postgresql:5432/nsa2
app.jdbc.username = {your-username}
app.jdbc.password = {your-password}
app.jdbc.table = logging.log_history
app.parquetBaseLocation= abfs://{your-container}@{your-storage-account}.dfs.core.windows.net/nsa2-log-analytics/data/parquet/
spark.hadoop.fs.azure.account.key.aksdepstorage.dfs.core.windows.net={your-storage-account-key}
Deploy Spark Application to Kubernetes using spark-submit
To deploy the Spark application to Kubernetes, we need to use the spark-submit
command. The spark-submit
command is used to submit a Spark application to the cluster. We need to set the configurations for the Spark application and the Kubernetes cluster.
$ export SPARK_JAR=target/scala-2.12/nsa-log-analytics_2.12-0.1.0-SNAPSHOT.jar
$ export K8S_CONTROL_PLANE_URL={your-control-plane-url}
$ export K8S_NAMESPACE={your-namespace}
$ export K8S_AGENT_POOL={your-agent-pool}
$ export AZ_STORAGE_ACCOUNT={your-storage-account}
$ export AZ_STORAGE_ACCOUNT_KEY={your-storage-account-key}
$ export BLOB_CONTAINER={your-container}
$ export SPARK_SA={your-service-account-name-for-spark}
$ export SPARK_DOCKER_IMAGE={your-spark-docker-image}
$ export SPARK_MASTER_URL={your-spark-master-url}
$ spark-submit \
--master k8s://${K8S_CONTROL_PLANE_URL} \
--deploy-mode cluster \
--files conf/aks/spark.conf \
--name nsa2-log-analytics-app \
--conf spark.executor.instances=3 \
--conf spark.log.level=DEBUG \
--conf spark.kubernetes.container.image.pullPolicy=Always \
--conf spark.kubernetes.authenticate.driver.serviceAccountName=${SPARK_SA} \
--conf spark.kubernetes.authenticate.executor.serviceAccountName=${SPARK_SA} \
--conf spark.kubernetes.container.image=${SPARK_DOCKER_IMAGE} \
--conf "spark.kubernetes.node.selector.agentpool=${K8S_AGENT_POOL}" \
--conf "spark.kubernetes.namespace=${K8S_NAMESPACE}" \
--conf "spark.hadoop.fs.azure.account.key.aksdepstorage.dfs.core.windows.net=${AZ_STORAGE_ACCOUNT_KEY}" \
--packages org.apache.hadoop:hadoop-azure:3.2.0,com.microsoft.azure:azure-storage:8.6.3,org.postgresql:postgresql:42.7.0,org.elasticsearch:elasticsearch-spark-30_2.12:8.14.0,com.squareup.okhttp3:okhttp:4.12.0 \
--conf spark.driver.extraJavaOptions="-Divy.cache.dir=/tmp -Divy.home=/tmp" \
--conf spark.kubernetes.file.upload.path="abfs://${BLOB_CONTAINER}@${AZ_STORAGE_ACCOUNT}.dfs.core.windows.net/nsa2-log-analytics/upload" \
--conf spark.kubernetes.driverEnv.SPARK_MASTER_URL=spark://${SPARK_MASTER_URL}:7077 \
--conf spark.kubernetes.driverEnv.HADOOP_OPTIONAL_TOOLS=hadoop-azure \
--conf spark.executorEnv.HADOOP_OPTIONAL_TOOLS=hadoop-azure \
--conf spark.kubernetes.driver.request.cores="0.1" \
--conf spark.kubernetes.executor.request.cores="0.1" \
--conf spark.kubernetes.driver.limit.cores="0.2" \
--conf spark.kubernetes.executor.limit.cores="0.2" \
--conf spark.kubernetes.driver.master="${K8S_CONTROL_PLANE_URL}" \
--class "com.alexamy.nsa2.analytics.log.app.Nsa2LogAnalyticsDailyBatchApp" \
${SPARK_JAR}
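Once the application is submitted, the driver pod can be monitored with kubectl. Spark on Kubernetes labels the driver pod with spark-role=driver, so something like the following should work ({driver-pod-name} is whatever name the first command reports):
$ kubectl get pods -n ${K8S_NAMESPACE} -l spark-role=driver
$ kubectl logs -f -n ${K8S_NAMESPACE} {driver-pod-name}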
Test scripts
$ curl -u "elastic:changeit" -X DELETE "https://10.0.0.2:9200/nsa2-2024.06.20" --insecure
$ echo "TRACE DEBUG INFO WARN ERROR" | \
tr " " '\n' | \
xargs -I {} curl -X POST -H "Content-Type: application/json" \
-d "This is a sample of {} level messages" \
http://localhost:18080/v1.0.0/log/{}
$ for i in {1..10}; \
do curl -X POST -H "Content-Type: application/json" \
-d "This is n invalid log message - $i" \
http://localhost:18080/v1.0.0/log/INVALID; done
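After generating the sample messages, the document count in the index can be checked before running the batch application, using the same host and credentials as the delete call above:
$ curl -u "elastic:changeit" --insecure "https://10.0.0.2:9200/nsa2-2024.06.20/_count?pretty"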
Conclusion
In this tutorial, we created a Scala project to analyze log messages stored in Elasticsearch using Apache Spark. We created a Spark application to read log messages from Elasticsearch, archive the messages as Parquet files to HDFS, and save all ERROR messages to a Relational Database for further analysis. We tested the Spark application in IntelliJ IDEA and deployed it to Kubernetes.