fs2-kafka introduction: deserialization with careful error handling
FS2 Kafka is a great library that simplifies reasoning about Kafka by representing event flow as fs2 streams. Once I started working with it, I found that there were few public “slow start” snippets, so I’m taking the chance to contribute and make the initial tech investigation easier for the next person. The goal of this post is to provide a template with thorough error handling. The high-level flow is the following:
It is as simple as it seems: the application consumes messages from the input topic, applies some business logic, and sends results to one of the output topics. Key addressed points:
- The input is not a single-schema topic; the app should support multiple types of messages.
- The app should meaningfully report low-level errors and provide means to raise domain-related errors.
- Each message ends with a reported result (expected output or error). There is no intention to present that flow as a pattern in itself; it can make sense in some circumstances, but the key point here is how to tackle the range of related tasks.
The first part will not touch on the distribution of results over multiple topics. The topic for today is only the step-by-step deserialization of a single schema with careful error management.
Naive example
Whether we want it or not, we have to handle low-level deserialization. The library provides a high-level abstraction to move along the happy path; however, that makes it necessary to think about error handling at the very top of the pipeline.
Let me quickly show the problem.
ConsumerSettings is where you fix the deserialization of raw values:
import cats.effect.IO
import fs2.kafka._

def processRecord(record: ConsumerRecord[String, String]): IO[Unit] =
  IO.println(record)

type Input = String

val consumerSettings =
  ConsumerSettings[IO, String, Input]
    .withAutoOffsetReset(AutoOffsetReset.Earliest)
    .withBootstrapServers("localhost:29092")
    .withGroupId("group")

val stream: fs2.Stream[IO, Unit] =
  KafkaConsumer
    .stream(consumerSettings)
    .subscribeTo("topic")
    .records
    .evalMap { committable =>
      processRecord(committable.record)
    }
That stream will decode incoming messages as String values, and you don’t need to think much about exceptional cases.
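To actually run the stream, we compile it into an effect inside the application entry point; a minimal sketch, assuming cats-effect 3 and its IOApp.Simple:

// Hypothetical entry point; later in the post the stream is run
// with an explicit ExitCode instead.
object Main extends cats.effect.IOApp.Simple {
  val run: IO[Unit] = stream.compile.drain
}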
The constructor of ConsumerSettings expects an implicit RecordDeserializer that will convert raw bytes into meaningful values:
implicit valueDeserializer: RecordDeserializer[F, V]
You can find the snippet here.
What happens if we don’t care about failures?
Let’s see what happens when the input is unexpected.
We can change the expected type: type Input = UUID. There is an implicit built-in Deserializer for UUID; hence, the Input type is the only thing you need to tune to consume UUID instead of String.
val consumerSettings =
  ConsumerSettings[IO, String, Input]
    .withAutoOffsetReset(AutoOffsetReset.Earliest)
    .withBootstrapServers("localhost:29092")
    .withGroupId("group")

val producerSettings =
  ProducerSettings[IO, String, String]
    .withBootstrapServers("localhost:29092")
    .withProperty("topic.creation.enable", "true")

val produce = KafkaProducer
  .resource(producerSettings)
  .use(_.produceOne(ProducerRecord("topic1", "key", "value")).flatten)

val stream: fs2.Stream[IO, Unit] =
  KafkaConsumer
    .stream(consumerSettings)
    .subscribeTo("topic1")
    .records
    .evalMap { committable =>
      processRecord(committable.record)
    }
You can find the snippet here.
After running produce *> stream.compile.drain.as(ExitCode.Success), we observe that the stream shuts down due to a predictable deserialization error:
java.lang.IllegalArgumentException: Invalid UUID string: value
at java.base/java.util.UUID.fromString1(UUID.java:280)
At that point, we face a choice:
- Handle such occurrences at the application level. That means we expect the fs2.Stream to fail and subsequently restart it. The downside is the reallocation of resources (the connection to Kafka, to be precise). It sounds sensible when you fully control the input, i.e. you are responsible for building the messages and invalid content is exceptional (and most likely never going to happen).
- Make the transformation from the raw value more forgiving, with explicit handling of unexpected values. That means we start to suspect any input of being invalid. In that scenario, the applied conversion doesn’t throw an error; instead, the deserialization step produces an output that cannot be assumed to be a valid input value.
In this post, I want to focus on the second option.
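For contrast, the first option could look roughly like this; a sketch with a hypothetical restarting helper (not part of the library) that logs the failure and recreates the whole stream, reallocating the consumer every time:

// Hypothetical helper for option one: on any error, log and restart from scratch.
def restarting[A](stream: fs2.Stream[IO, A]): fs2.Stream[IO, A] =
  stream.handleErrorWith { error =>
    fs2.Stream.eval(IO.println(s"Stream failed with $error, restarting")) >>
      restarting(stream)
  }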
Deserialization steps
Let us consider a message with headers and a JSON body, where the JSON represents some expected entity.
We have three phases:
- Decoding headers. We tackle a simple key-value structure and look for known keys.
- Decoding body to JSON. A valid JSON structure is required at that step.
- Conversion from JSON to an entity. JSON should represent a specified entity.
And the common intention is to carry errors with the help of error-handling structures (like Either).
Let’s repair our example, the app that shuts down due to an invalid UUID in the message.
Built-in instances of Deserializer
We used a UUID deserializer built into the library:
implicit def uuid[F[_]](implicit F: Sync[F]): Deserializer[F, UUID] =
  Deserializer.string[F].map(UUID.fromString).suspend
Turning into Either
What is cool is that we can “turn on” catching exceptions by saying that we expect an Either.
There is an implicit function that prepares an instance of Deserializer[F, Either[Throwable, A]] for any given Deserializer[F, A]:
/**
 * The attempt [[Deserializer]] try to deserialize to type `A`,
 * When it fails returns `Left` containing the exception, otherwise returns `Right` with the value `A`
 */
implicit def attempt[F[_], A](
  implicit deserializer: Deserializer[F, A]
): Deserializer[F, Either[Throwable, A]] =
  deserializer.attempt
With all of that, the only change we have to make is the type specified in the apply call of ConsumerSettings:
ConsumerSettings[IO, String, Either[Throwable, Input]]
Since we explicitly define the signature of processRecord, we need to change the type there:
def processRecord(record: ConsumerRecord[String, Either[Throwable, Input]]): IO[Unit] =
  IO.println(record)
After that, an invalid record won’t cause an app crash, and we’ll observe log lines:
ConsumerRecord(
  topic = topic1,
  value = Left(java.lang.IllegalArgumentException: Invalid UUID string: value)
)
You can find the snippet here.
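From here, processRecord can branch on the result instead of printing it blindly; a sketch with placeholder handling:

// Branch on the deserialization outcome (the handler bodies are placeholders).
def processRecord(record: ConsumerRecord[String, Either[Throwable, Input]]): IO[Unit] =
  record.value match {
    case Right(input) => IO.println(s"Got valid input: $input")
    case Left(error)  => IO.println(s"Skipping invalid record: ${error.getMessage}")
  }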
Converting raw body into JSON
We tackled UUID, time for JSON!
I added the circe-core and circe-parser libraries to the dependencies.
With the updated input type (type Input = Json), we ought to bring an instance of Deserializer:
implicit val deserializer: Deserializer[IO, Either[Throwable, Json]] =
  Deserializer
    .string[IO]
    .map(io.circe.parser.parse)
    .flatMap(_.fold(GenericDeserializer.fail[IO, Json], GenericDeserializer.const[IO, Json]))
    .attempt
- parse returns either an exception or a Json.
- fold allows us to “merge” that Either into a GenericDeserializer.
- GenericDeserializer delegates raising errors to IO, and we need to call attempt to get an Either back.
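To double-check the behavior, we can run the deserializer by hand; a sketch, assuming a cats-effect 3 REPL session (Headers.empty is enough, since only the body matters here):

import cats.effect.unsafe.implicits.global

// Valid JSON ends up on the right, a parsing failure on the left.
deserializer.deserialize("topic", Headers.empty, """{"value": 1}""".getBytes).unsafeRunSync()
// Right(...)
deserializer.deserialize("topic", Headers.empty, "{".getBytes).unsafeRunSync()
// Left(io.circe.ParsingFailure: exhausted input)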
Transformation to an entity
Time to transform a valid Json into an entity.
case class Example(value: Int)
type Input = Example
A Decoder will be required on the circe side:
object Example {
  implicit val decoder: Decoder[Example] =
    Decoder.instance(_.get[Int]("value").map(Example.apply))
}
The Deserializer needs an additional transformation from Json to Example:
.flatMap(
  _.as[Example]
    .fold(
      GenericDeserializer.fail[IO, Example],
      GenericDeserializer.const[IO, Example]
    )
)
You can find the snippet here.
Generic Deserializer
Actually, we can write a generic transformation for any class that has a Decoder.
implicit def deserializer[A: Decoder]: Deserializer[IO, Either[Throwable, A]] =
  Deserializer
    .string[IO]
    .map(io.circe.parser.parse)
    .flatMap(_.fold(GenericDeserializer.fail[IO, Json], GenericDeserializer.const[IO, Json]))
    .flatMap(_.as[A].fold(GenericDeserializer.fail[IO, A], GenericDeserializer.const[IO, A]))
    .attempt
Cool, now we can produce the record """ {"value" : 42 } """ and receive the consumer record Right(Example(42)).
You can find the snippet here.
We have no error model yet; that means we have no dedicated types to differentiate errors that occurred at different deserialization stages. It seems sensible to take care of that a little bit later, since we have another stage to consider. So, let’s postpone the final refinement; it is time to switch to Kafka headers.
Headers and error management
fs2.kafka.GenericDeserializer has a “verbose” constructor that takes into account all the available metadata:
- topic
- headers
- body
def instance[F[_], A](
  f: (String, Headers, Array[Byte]) => F[A]
)(implicit F: Sync[F]): Deserializer[F, A] =
And there is a function that uses the generic instance but focuses only on headers:
def headers[F[_], A](f: Headers => Deserializer[F, A])(implicit F: Sync[F]): Deserializer[F, A] =
  Deserializer.instance { (topic, headers, bytes) =>
    f(headers).deserialize(topic, headers, bytes)
  }
It is also time to bring in an error model rather than defer it to a later iteration. We have already touched all the deserialization steps, so let us be careful about raising errors. Our error cases will form an ADT:
sealed trait DeserializationError extends NoStackTrace
The headers parameter, of type Headers, is a simple key-value structure. There is an apply method that returns an Option[Header].
Deserializer.headers(headers =>
  headers("correlation_id")
    .map(header =>
      header...
    )
)
Before we dive into the header itself, note that we should think about the situation when the header is not present. We need to raise an error in case we get None: headers("correlation_id").map(...).getOrElse(???).
So, time to introduce the first error case:
case class NoHeaderFound(name: String, headers: Headers) extends DeserializationError {
  override def toString: String =
    s"Header-not-found, headers[${headers.toChain.toList.map(_.key).mkString(",")}]"
}
Once we don’t observe “correlation_id”, we fail the deserialization with the brand-new NoHeaderFound.
headers("correlation_id")
.getOrElse(GenericDeserializer.fail[IO, UUID](NoHeaderFound("correlation_id", headers)))
Deserialization of Header
We can tackle the Header instance the same way as the body. The library provides the convenient function attemptAs:
final def attemptAs[A](
  implicit deserializer: HeaderDeserializer.Attempt[A]
): Either[Throwable, A] =
  deserializer.deserialize(value)
HeaderDeserializer is decoupled from the generic deserializer but provides similar functionality. The UUID-related instance is in place; hence, we can call header.attemptAs[UUID] without any additional manipulations.
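HeaderDeserializer.Attempt is nothing fancy, by the way; as far as the library source goes, it is a plain alias:

type Attempt[A] = HeaderDeserializer[Either[Throwable, A]]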
header
  .attemptAs[UUID]
  .fold(
    error => ???,
    GenericDeserializer.const[IO, UUID]
  )
And finally, we need a structure to report the case when UUID-deserialization fails.
case class HeaderDeserializationError(name: String, header: Header, e: Throwable)
  extends DeserializationError {
  override def toString: String =
    s"Header-deserialization-error, header:[${header.key()}->${header.as[String]}], cause: [${e.getMessage}]"
}
The whole snippet looks like this:
headers(CorrelationId)
  .map(header =>
    header
      .attemptAs[UUID]
      .fold(
        error =>
          GenericDeserializer
            .fail[IO, UUID](HeaderDeserializationError(CorrelationId, header, error)),
        GenericDeserializer.const[IO, UUID]
      )
  )
  .getOrElse(GenericDeserializer.fail[IO, UUID](NoHeaderFound(CorrelationId, headers)))
Rest of the error cases
But we have only deserialized the headers; the whole deserializer is not ready yet. Since we have two sources for building an entity, we can’t rely on the body decoder alone.
To have a precise model, we can have an entity for the body itself and a “rich” entity that comprises properties from all the sources.
We can imagine that only the correlation id is being passed through the headers.
We can fix the property of having a correlation id as a trait that any target entity should implement.
trait Traceable {
  def correlationId: UUID
}
So, we’ll have Body: Decoder for the body and Rich <: Traceable for the target entity. And Rich might be the result of a computation over Body and UUID:
def deserializer[Body: Decoder, Rich <: Traceable](
  f: (Body, UUID) => Rich
): Deserializer[IO, Either[DeserializationError, Rich]]
Now we need to tune the generic deserializer we had so that it reasons in terms of DeserializationError.
First, we ought to tackle strings that are not JSONs:
case class InvalidJson(rawBody: String, cause: ParsingFailure) extends DeserializationError {
  override def toString: String =
    s"Invalid-json, [$rawBody] is not a json, cause: ${cause.getMessage}"
}
The full folding looks like this:
rawBody =>
  io.circe.parser
    .parse(rawBody)
    .fold(
      error => GenericDeserializer.fail[IO, Json](InvalidJson(rawBody, error)),
      GenericDeserializer.const[IO, Json]
    )
The invalid-entity case is a union of the raw JSON and the error:
case class InvalidEntity(jsonBody: Json, cause: DecodingFailure) extends DeserializationError {
  override def toString: String =
    s"Invalid-entity, [${jsonBody.noSpaces}], cause: ${cause.getMessage}]"
}
json =>
  json
    .as[Body]
    .fold(
      decodingFailure =>
        GenericDeserializer.fail[IO, Body](InvalidEntity(json, decodingFailure)),
      GenericDeserializer.const[IO, Body]
    )
Finalized deserializer
The whole deserializer can now be assembled:
def deserializer[Body: Decoder, Rich <: Traceable](
  f: (Body, UUID) => Rich
): Deserializer[IO, Either[DeserializationError, Rich]] =
  Deserializer
    .headers(headers =>
      headers(CorrelationId)
        .map(header =>
          header
            .attemptAs[UUID]
            .fold(
              error =>
                GenericDeserializer
                  .fail[IO, UUID](
                    HeaderDeserializationError(CorrelationId, header, error)
                  ),
              GenericDeserializer.const[IO, UUID]
            )
        )
        .getOrElse(
          GenericDeserializer.fail[IO, UUID](NoHeaderFound(CorrelationId, headers))
        )
        .flatMap(correlationId =>
          Deserializer
            .string[IO]
            .flatMap(rawBody =>
              io.circe.parser
                .parse(rawBody)
                .fold(
                  error => GenericDeserializer.fail[IO, Json](InvalidJson(rawBody, error)),
                  GenericDeserializer.const[IO, Json]
                )
            )
            .flatMap(json =>
              json
                .as[Body]
                .fold(
                  decodingFailure =>
                    GenericDeserializer
                      .fail[IO, Body](InvalidEntity(json, decodingFailure)),
                  GenericDeserializer.const[IO, Body]
                )
            )
            .map(f(_, correlationId))
        )
    )
    .attempt
    .map(_.left.map {
      case expected: DeserializationError => expected
      case unexpected                     => UnexpectedError(unexpected)
    })
You can find the snippet here.
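One note on the last map: UnexpectedError is the catch-all case for throwables outside our model. It was not defined above; a sketch of its possible shape (the field name is mine, the exact definition lives in the linked snippet):

// Catch-all for failures that the explicit error cases don't cover.
case class UnexpectedError(cause: Throwable) extends DeserializationError {
  override def toString: String =
    s"Unexpected-error, cause: [${cause.getMessage}]"
}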
It would look less awkward if we didn’t care about dedicated errors. My stance here is that such separation is vital for a study case; for a real program, everybody needs to assess the “paranoia level” and set up the appropriate verbosity.
Updated domain model
As a reminder, we need an implementation of Traceable with a UUID property.
case class Example(value: Int) {
  def traceable(correlationId: UUID) = TraceableExample(value, correlationId)
}

case class TraceableExample(value: Int, correlationId: UUID) extends Traceable
And the derivation of the Deserializer is as simple as:
implicit val traceableExampleDeserializer
  : Deserializer[IO, Either[DeserializationError, TraceableExample]] =
  deserializer[Example, TraceableExample](_ traceable _)
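With that implicit in scope, the consumer settings from the naive example need only an updated value type; a sketch:

// Same settings as before; the value type now carries the error channel.
val consumerSettings =
  ConsumerSettings[IO, String, Either[DeserializationError, TraceableExample]]
    .withAutoOffsetReset(AutoOffsetReset.Earliest)
    .withBootstrapServers("localhost:29092")
    .withGroupId("group")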
Test runs
Time for test runs!
Put a message without the correlation_id header
We observe a Left(Header-not-found, headers[]) entry in the logs.
Put a message with an invalid correlation_id
The ProducerRecord now comprises a “correlation_id” header.
ProducerRecord("topic1", "key", """{"value" : 42 }""")
.withHeaders(Headers(Header[String]("correlation_id", "hi")))
Left(Header-deserialization-error, header:[correlation_id->hi], cause: [Invalid UUID string: hi])
Valid headers with non-JSON in the body
ProducerRecord("topic1", "key", """{""")
.withHeaders(
Headers(Header[String]("correlation_id", "123e4567-e89b-12d3-a456-426614174000"))
)
Left(Invalid-json, [{] is not a json, cause: exhausted input)
Valid JSON, but not the entity
ProducerRecord("topic1", "key", """{}""")
.withHeaders(
Headers(Header[String]("correlation_id", "123e4567-e89b-12d3-a456-426614174000"))
)
Left(Invalid-entity, [{}], cause: Missing required field: DownField(value)])
All valid
ProducerRecord("topic1", "key", """ { "value" : 42 } """)
.withHeaders(
Headers(Header[String]("correlation_id", "123e4567-e89b-12d3-a456-426614174000"))
)
Right(TraceableExample(42,123e4567-e89b-12d3-a456-426614174000))
Summary
At this point, I want to wrap up the note.
We covered:
- The notion of deserializers in the fs2-kafka library.
- The idea of tackling deserialization as a multi-step process.
The next steps are:
- Processing valid entries and reporting errors to a dedicated error topic.
- Considering options to consume multiple schemas from one topic.