Kotlin Coroutines in Quarkus Ecosystem

Praveen Ray
3 min readJan 1, 2022

--

As a Developer Advocate, I am always on lookout for better development experience and helping my Team find better tools so they can move faster. We have been looking at Quarkus and believe this provides far better development tooling than Spring. Couple this with Kotlin and one can go much faster prototyping/developing and testing code.

One area that took us some time to get used to is Quarkus’s emphasis (for good reasons) on using Reactive APIs. Since Quarkus is built on
Vertx, it makes perfect sense to go Reactive. Not to mention the usual benefits of not keeping your threads blocked.

Quarkus prefers to use Reactive pipelines and uses SmallRye Mutiny as it’s preferred vehicle of Reactivity.

On the other hand, Kotlin’s preferred way is to use Co-Routines.

This causes friction at times and sometimes it’s not obvious how to marry the two approaches.

Here are few scenarios we encountered and how we bridged the two wonderful worlds.

Reading a File:

import javax.enterprise.context.ApplicationScoped
import io.vertx.mutiny.core.Vertx
import io.vertx.mutiny.core.file.AsyncFileimport
import io.smallrye.mutiny.coroutines.awaitSuspending
@ApplicationScoped
class FileReader(
private val vertx: Vertx
) {
suspend fun readFile(filepath: String): String {
val (error, fileContents) = vertx.fileSystem()
.readFile("/tmp/myfile.txt")
.onItem().transform { b ->
Pair<String?,String?>(
null,
b.getString(0, b.length())
)
}.onFailure().recoverWithItem { e ->
Pair<String?,String?>(e.message, null)
}.awaitSuspending()
if (!error.isNullOrBlank()) {
throw RuntimeException(error)
}
return fileContents
}
}

Getting Web Response:

import javax.enterprise.context.ApplicationScoped
import io.vertx.mutiny.core.Vertx
import io.vertx.mutiny.core.file.AsyncFileimport
import io.smallrye.mutiny.coroutines.awaitSuspending
@ApplicationScoped
class WebReader(
private val vertx: Vertx
) {
private val webClient: WebClient = WebClient.create(vertx)

suspend fun getContents(fullUrl: String): String {
val body = webClient.postAbs(fullUrl)
.send()
.onItem().transform { it.bodyAsString()}
.onFailure.recoverWithItem { e -> e.message }
.awaitSuspending()
return body
}
}

The `awaitSuspending` comes from following in build.gradle: io.smallrye.reactive:mutiny-kotlin and is the key to connecting SmallRye with Kotlin Coroutines.

Performing Postgres Query:

Make sure to add following dependencies to your build.gradle:

io.quarkus:quarkus-reactive-pg-client
io.smallrye.reactive:smallrye-mutiny-vertx-web-client
io.smallrye.reactive:mutiny-kotlin

Code:

import javax.enterprise.context.ApplicationScoped
import io.smallrye.mutiny.Multi
import io.smallrye.mutiny.coroutines.asFlow
import io.vertx.mutiny.pgclient.PgPool
import io.vertx.mutiny.sqlclient.Tuple
import kotlinx.coroutines.flow.Flow
@ApplicationScoped
class DatabaseReactive(
private val pgPool: PgPool
) {
suspend fun performQuery(
selectStatment: String,
params: Tuple
): List<Map<String, Any>> {
return pgPool
.preparedQuery(selectStatment)
.execute(params)
.onItem().transformToMulti { Multi.createFrom().iterable(it) }
.onItem().transform { rs ->
mapOf(
"id" to rs.getLong("id"),
"columnA" to rs.getString("columnA"),
// .. perform other gets as per selectStatment
)
}.asFlow().toCollection(mutableListOf())
}
}

Reading/Writing from/to Kafka topic:

import kotlinx.coroutines.flow.Flow
import kotlinx.coroutines.flow.flow
import org.apache.kafka.clients.consumer.*
import org.apache.kafka.clients.producer.*
import org.apache.kafka.common.*
import java.time.Duration
import java.time.Instant
import java.util.Properties
import io.smallrye.mutiny.Uni
import io.smallrye.mutiny.coroutines.awaitSuspending
import javax.enterprise.context.ApplicationScoped
@ApplicationScoped
class Kafka {
suspend fun readFromTopic(
topic: String,
startOffset: Long,
partitionNumber: Int,
broker: String,
consumerGroupName: String,
readDurationInMillis: Long,
): Flow<ConsumerRecord<String, ByteArray>> {
val endTime = Instant.now().plusMillis(readDurationInMillis)
return flow {
createKafkaConsumer(broker, consumerGroupName)
.use { kafkaConsumer ->
seekToOffset(kafkaConsumer, topic,
partitionNumber, startOffset)
while (Instant.now() < endTime) {
val records = kafkaConsumer.poll(5000)
records.forEach { emit(it) }
}
}
}
}

suspend fun writeToTopic(
topic: String,
key: String? = null,
value: ByteArray,
headers: Map<String, String> = emptyMap(),
): String? {
return createKafkaProducer(
Configuration.kafkaBootstrap
).use { producer ->
val record = ProducerRecord(topic, key, value)
headers.entries.forEach { entry ->
record.headers().add(
entry.key, entry.value.toByteArray()
)
}
val uni: Uni<String?> = Uni.createFrom()
.emitter<String?> { em ->
producer.send(record) { rs, e: Exception? ->
if (e != null) {
em.fail(e)
} else {
em.complete(null)
}
}
}.onFailure().recoverWithUni { _ ->
Uni.createFrom().item(
"error publishing key $key to topic $topic"
)
}
uni.awaitSuspending()
}
}
private fun seekToOffset(
kafkaConsumer: KafkaConsumer<String, ByteArray>,
topic: String,
partition: Int?,
startOffset: Long?
) {
if (partition != null && startOffset != null) {
val topicPartition = TopicPartition(topic, partition)
kafkaConsumer.assign(setOf(topicPartition))
kafkaConsumer.seek(topicPartition, startOffset)
}
}
}

--

--

Praveen Ray
Praveen Ray

Written by Praveen Ray

System Design, Leading High Performance Teams

No responses yet