A Simple Quarkus App with Kafka and SmallRye Mutiny

Praveen Ray
3 min read · Jun 26, 2021


Kafka’s Java client library is Future based, whereas Quarkus uses the SmallRye Mutiny reactive library. It’s not always obvious how to combine the two to build a reactive pipeline. Here’s an example application that shows one way to do it.

Although Quarkus has an extension that makes working with Kafka fairly easy, like all abstractions it hides many of the lower level Kafka details. Sometimes it’s useful to drop down to the lower level APIs to understand the implementation details.

The sample application is a command line application that subscribes to an input reactive stream backed by a Kafka topic. Each message on the topic is expected to contain a file path. When a message arrives, the application reads that file using Vert.x’s asynchronous file operations and creates a new reactive stream of the file’s lines. Each line is then converted to JSON and sent to an output Kafka topic.
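The input side is not the focus of this post, but for orientation, here is a minimal sketch of how a Multi of file paths could be built from a plain KafkaConsumer by polling on a background thread and pushing each record into an emitter. The function name and polling loop below are illustrative assumptions, not the project’s actual wiring:

// Illustrative only: turn an already subscribed KafkaConsumer into a Multi of file paths.
fun filePathStream(consumer: KafkaConsumer<String, String>): Multi<String> =
    Multi.createFrom().emitter<String> { em ->
        val stopped = AtomicBoolean(false)
        em.onTermination { stopped.set(true) }              // stop polling when the stream terminates
        thread(isDaemon = true, name = "input-topic-poller") {
            while (!stopped.get()) {
                // each record value is expected to be a file path
                consumer.poll(Duration.ofMillis(500)).forEach { rec -> em.emit(rec.value()) }
            }
        }
    }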

The interesting bits are:

  • How to use the CSV parser library inside Quarkus while the file read is done via non-blocking file I/O APIs
  • How to marry Kafka’s Future-based API to SmallRye Mutiny

CSV parsing with the Apache Commons CSV parser is quite easy if done imperatively. However, reading a file in a non-blocking fashion has to be done via callbacks, and state must be maintained to accumulate bytes until a full line has been read. Once a line is available, we can feed it to the parser. Here’s the code in Kotlin:

fun readAccountsFile(path: Path): Multi<Map<String, Any>> {
    val uni = vertx.fileSystem().open(path.toAbsolutePath().toString(), OpenOptions().setRead(true))
    val intermediateBuffer = AtomicReference(Buffer.buffer())
    val header = AtomicReference<List<String>>(null)
    return uni.onItem().transformToMulti { file ->
        file.setReadBufferSize(10 * 1024)
        file.toMulti()
    }.onItem().transformToMultiAndConcatenate { buffer ->
        // split the accumulated bytes into complete lines and keep the
        // trailing partial line around for the next chunk from the file
        val (listOfBuffers, lastBuffer) = splitIntoLines(intermediateBuffer.get().appendBuffer(buffer), emptyList())
        intermediateBuffer.set(lastBuffer)
        Multi.createFrom().items(*listOfBuffers.toTypedArray())
    }.map {
        val str = it.toString()
        logger.info("LINE: $str")
        val csvParser = CSVFormat.DEFAULT.parse(StringReader(str))
        val records = csvParser.records
        if (records.isNotEmpty()) {
            val record = records.first()
            val fields = record.toList()
            // the first line becomes the header; every later line is zipped
            // with it into a field-name to value map
            if (!header.compareAndSet(null, fields)) {
                val headerFields = header.get()
                fields.foldIndexed(emptyMap<String, Any>()) { index, map, field ->
                    map.plus(headerFields[index] to field)
                }
            } else emptyMap()
        } else emptyMap()
    }.filter { it.isNotEmpty() }
}

The important bit is the call to ‘transformToMultiAndConcatenate’. It accepts a partial byte buffer from Vert.x and splits it into lines; each complete line is fed to the next stage of the reactive pipeline. Any bytes that have not yet been terminated by an end-of-line are kept in the temporary ‘intermediateBuffer’ and prepended to the next chunk delivered by the file reader.
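The splitIntoLines helper is not shown in the post. A minimal sketch of what it could look like, assuming it returns the complete lines plus whatever trails the last newline (carriage-return handling omitted for brevity), is:

// Buffer here is the same io.vertx.mutiny.core.buffer.Buffer the pipeline above uses.
tailrec fun splitIntoLines(buffer: Buffer, lines: List<Buffer>): Pair<List<Buffer>, Buffer> {
    val newlineAt = buffer.bytes.indexOfFirst { it == '\n'.code.toByte() }
    return if (newlineAt < 0) {
        lines to buffer                                        // no complete line yet; carry the bytes over
    } else {
        val line = buffer.getBuffer(0, newlineAt)              // the line, without its newline
        val rest = buffer.getBuffer(newlineAt + 1, buffer.length())
        splitIntoLines(rest, lines + line)
    }
}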

Using this Multi is fairly easy; just remember that a Mutiny pipeline is lazy, so nothing is read until it is subscribed to:

readAccountsFile(filePath)
    .subscribe().with { map ->
        println("One line from file: $map")
    }

Let’s read the file and write each row to a Kafka topic using Kafka’s own Java producer API (which is callback and Future based):

val topicName = "output.topic"
val jackson = ObjectMapper()
val props = createProducerProps(kafkaBroker, false)
// create the producer once and reuse it for every row
val producer = KafkaProducer<String, String>(props)

val published = readAccountsFile(filePath).onItem().transformToUniAndConcatenate { row ->
    val record = ProducerRecord<String, String>(topicName, UUID.randomUUID().toString(), jackson.writeValueAsString(row))
    // bridge Kafka's callback-based send() into a Uni that completes
    // only when the producer callback fires
    Uni.createFrom().emitter<String> { em ->
        producer.send(record) { rs, e: Exception? ->
            if (e != null) {
                em.fail(e)
            } else {
                em.complete(
                    "Published at partition ${rs.partition()}:${rs.offset()}"
                )
            }
        }
    }
}

The important bit here is the emitter: it defers completing the Uni until the producer’s send callback has actually fired, so the pipeline only moves on once the broker has acknowledged (or rejected) each record.
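Two more small pieces are needed to run this end to end. First, the createProducerProps helper is not shown here; a rough sketch of the kind of settings it might hold (illustrative defaults, and the meaning of the boolean flag is a guess) is:

// Illustrative producer settings; the real helper in the full source may differ.
fun createProducerProps(kafkaBroker: String, idempotent: Boolean): Properties {
    val props = Properties()
    props[ProducerConfig.BOOTSTRAP_SERVERS_CONFIG] = kafkaBroker
    props[ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG] = StringSerializer::class.java.name
    props[ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG] = StringSerializer::class.java.name
    props[ProducerConfig.ENABLE_IDEMPOTENCE_CONFIG] = idempotent.toString()
    return props
}

Second, as with the earlier Multi, nothing is published until the pipeline is subscribed to. In a command line application it is reasonable to simply block until every record has been acknowledged; one way to do that (blocking is a choice here, not a requirement) is:

// Collect every confirmation message, block until the stream completes, then clean up.
val confirmations = published
    .collect().asList()
    .await().indefinitely()
confirmations.forEach(::println)
producer.close()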

The full code is here.
