What is InfluxDB?
InfluxDB is a database for time series data, such as metrics, analytics, and IoT monitoring. It is fast, available, and scalable. For more details, visit https://www.influxdata.com/
Initial setup
Before we get started, there are a few things we need to do:
- Set up Docker (Install Docker)
- Install Gradle
Architecture
This is a Javalin microservice application with two endpoints:
POST /upload
: Receives events and inserts them into the database. Events older than 60 seconds are discarded.
GET /statistics
: Returns a summary of the events from the last 60 seconds (count, sum, min and max)
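What counts as "old" is decided by the service code further down: an event is kept only when its timestamp is at most 60 seconds behind the current time. The rule can be sketched as a small pure function. The name isFresh and its parameters are hypothetical and only serve as illustration; they are not part of the project.

```kotlin
// Hypothetical helper (not in the original project): returns true when an event
// is recent enough to be stored. The default of 60 000 ms matches the
// timeFrameInMillis value used by the service, and the comparison is inclusive,
// like the check in StatisticsService.create.
fun isFresh(eventTimestamp: Long, now: Long, windowMillis: Long = 60_000L): Boolean =
    eventTimestamp + windowMillis >= now
```

Passing the current time in as an argument, instead of reading the clock inside the function, makes the rule easy to unit test.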
Implementation
The application is made of two simple Kotlin files:
realtimestatistics.Main
: Creates the endpoints and application settings
data class Statistic(val count: Int = 0, val timestamp: Long = Date().time)

data class Total(val count: Double, val sum: Double, val min: Double, val max: Double)

// InfluxDB host, read from the "influx.host" environment variable (defaults to "influxdb")
val influxHost = System.getenv().getOrDefault("influx.host", "influxdb")!!

// The connection is created lazily, on first use
val influxDB: InfluxDB by lazy { InfluxDBFactory.connect("http://$influxHost:8086", "root", "root") }

fun main(args: Array<String>) {
    val app = Javalin.create().start(7000)
    val statisticService = StatisticsService(influxDB)
    val controller = Controller(statisticService)
    app.routes {
        get("/statistics", { ctx ->
            controller.get(ctx)
        })
        post("/upload", { ctx ->
            controller.post(ctx)
        })
    }
}

class Controller(private val statisticService: StatisticsService) {

    // Maps the service result to an HTTP status: 201 when stored, 204 when discarded
    private val asStatusCode = fun StatisticResult.(): Int {
        return if (this == StatisticResult.OK) {
            201
        } else {
            204
        }
    }

    fun post(ctx: Context) {
        val statistic = ctx.bodyAsClass(Statistic::class.java)
        val result = statisticService.create(statistic)
        ctx.status(result.asStatusCode())
    }

    fun get(ctx: Context) {
        ctx.json(statisticService.aggregated())
    }
}
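
The status mapping in Controller is written as a function literal with a receiver. As a rough alternative, the same mapping could be expressed as a plain extension function with an exhaustive when. This is only a sketch: the StatisticResult enum below is assumed from the two values used in the code (OK and OLD), and toStatusCode is a made-up name.

```kotlin
// Assumed shape of the result type; the article only shows the values OK and OLD.
enum class StatisticResult { OK, OLD }

// Same mapping as asStatusCode in Controller: 201 Created when the event was
// stored, 204 No Content when it was discarded as too old.
fun StatisticResult.toStatusCode(): Int = when (this) {
    StatisticResult.OK -> 201
    StatisticResult.OLD -> 204
}
```

With when, the compiler flags the mapping if a new result value is added later, whereas the if/else version would silently return 204 for it.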
realtimestatistics.StatisticsService
: Contains the business logic to create and retrieve the metrics, plus the database initialization
// Events older than this window (60 seconds) are discarded
private val timeFrameInMillis = 60000

// Aggregates the s_count field over the last 60 seconds
private val aggregateQuery = """
    SELECT count(s_count) as count,
           sum(s_count) as sum,
           min(s_count) as min,
           max(s_count) as max
    FROM uploads
    where time > now() - 60s
"""

init {
    influxDB.createDatabase(dbName)
}

fun create(statistic: Statistic): StatisticResult {
    val now = Date().time
    // Store the event only if it falls inside the time window
    if ((statistic.timestamp + timeFrameInMillis) >= now) {
        influxDB.write(dbName, "", Point.measurement("uploads")
                .time(statistic.timestamp, TimeUnit.MILLISECONDS)
                .addField("s_count", statistic.count)
                .addField("s_timestamp", statistic.timestamp)
                .build())
        return StatisticResult.OK
    }
    return StatisticResult.OLD
}

fun aggregated(): Total {
    val query = Query(
            aggregateQuery,
            dbName
    )
    val results = influxDB.query(query)
            .results
    // No series means no events in the window: return an all-zero summary
    if (results.first().series == null) {
        return Total(0.0, 0.0, 0.0, 0.0)
    }
    // Column 0 is the time; columns 1 to 4 are count, sum, min and max
    return results.first().series.first().values
            .map { mutableList ->
                Total(mutableList[1].toString().toDouble(),
                        mutableList[2].toString().toDouble(),
                        mutableList[3].toString().toDouble(),
                        mutableList[4].toString().toDouble()
                )
            }[0]
}
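
To make the aggregate query easier to reason about, or to unit test the summary logic without a running database, the same computation can be approximated in memory. This is a minimal sketch and not how the service works: Sample and aggregate are hypothetical names, and the list stands in for the uploads measurement.

```kotlin
// Same field layout as the Total class in Main.
data class Total(val count: Double, val sum: Double, val min: Double, val max: Double)

// Hypothetical in-memory stand-in for one stored point: a count and its timestamp in ms.
data class Sample(val count: Int, val timestamp: Long)

// Summarizes the samples strictly newer than (now - windowMillis), mirroring
// "where time > now() - 60s". Returns an all-zero Total when nothing matches,
// like the null-series branch in aggregated().
fun aggregate(samples: List<Sample>, now: Long, windowMillis: Long = 60_000L): Total {
    val recent = samples
        .filter { it.timestamp > now - windowMillis }
        .map { it.count.toDouble() }
    if (recent.isEmpty()) return Total(0.0, 0.0, 0.0, 0.0)
    return Total(
        count = recent.size.toDouble(),
        sum = recent.sum(),
        min = recent.reduce { a, b -> minOf(a, b) },
        max = recent.reduce { a, b -> maxOf(a, b) }
    )
}
```

Note that the query uses a strict comparison (time > now() - 60s), while the check in create is inclusive (>=), so an event exactly 60 seconds old would be accepted on upload but not counted in the summary.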
For the full source code, check https://github.com/ricardobaumann/real-time-statistics
Running locally
To run it locally, I am using Docker Compose. From the project's root folder, run
docker-compose up
and check out the endpoints mentioned above at http://localhost:7000/
Usage
With the service running, try POSTing to /upload with
{
"count" : 40
}
And then GET the summary from /statistics.
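
If you prefer to exercise the endpoints from code rather than from a REST client, a small Kotlin client along these lines should do. It is only a sketch using the JDK's HttpURLConnection: the function names are made up for illustration, and the base URL assumes the service is running locally on port 7000 as described above.

```kotlin
import java.net.HttpURLConnection
import java.net.URL

// Builds the JSON body for /upload. Only "count" appears in the example above;
// "timestamp" is optional because Statistic defaults it to the current time.
fun uploadBody(count: Int, timestamp: Long? = null): String =
    if (timestamp == null) """{"count": $count}"""
    else """{"count": $count, "timestamp": $timestamp}"""

// Sends the body to POST /upload and returns the HTTP status code
// (201 if the event was stored, 204 if it was discarded, per the Controller code).
fun postUpload(baseUrl: String, body: String): Int {
    val conn = URL("$baseUrl/upload").openConnection() as HttpURLConnection
    try {
        conn.requestMethod = "POST"
        conn.doOutput = true
        conn.setRequestProperty("Content-Type", "application/json")
        conn.outputStream.use { it.write(body.toByteArray()) }
        return conn.responseCode
    } finally {
        conn.disconnect()
    }
}

// Fetches GET /statistics and returns the raw JSON text.
fun getStatistics(baseUrl: String): String =
    URL("$baseUrl/statistics").readText()
```

For example, postUpload("http://localhost:7000", uploadBody(40)) followed by getStatistics("http://localhost:7000") reproduces the two steps above.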
Voila, boys and girls. Please let me know your insights about it. Thanks.