fs2-kafka introduction: deserialization with careful error handling

20 minute read

FS2 Kafka is a great library that simplifies reasoning around Kafka by representing event flow as fs2 streams. Once I started working with it, I found that there were few public “slow start” snippets. So I take a chance to contribute and make the initial tech investigation easier for the next person. The goal of the post is to provide a template with thorough error handling. High-level flow is the following:

High-level flow

It is simple as it seems. The application should consume messages from the input topic, apply some business logic, and send results to one of the output topics. Key addressed points:

  • Input is not a single-schema topic; the app should support multiple types of messages.
  • The app should meaningfully report low-level errors and provide means to raise domain-related errors.
  • Each message ends with a reported result (expected output or error). There is no intention to present that flow as a pattern by itself. It can make sense in some circumstances, but the key point here is how to tackle the range of related tasks.

The first part will not touch on the distribution of results (over multiple topics). The topic for today is only the step-by-step deserialization of a single schema with careful error management.

Naive example

Whether we want it, we ought to handle low-level deserialization. The library provides a high-level abstraction to move with a happy path. However, it makes it necessary to think about error handling at the top of the pipeline. Let me quickly show the problem. ConsumerSettings is where you fix the deserialization of raw values:

def processRecord(record: ConsumerRecord[String, String]): IO[Unit] =
  IO.println(record)
  
type Input = String  
  
val consumerSettings =  
  ConsumerSettings[IO, String, Input]  
    .withAutoOffsetReset(AutoOffsetReset.Earliest)  
    .withBootstrapServers("localhost:29092")  
    .withGroupId("group")   
  
val stream: fs2.Stream[IO, Unit] =  
  KafkaConsumer  
    .stream(consumerSettings)  
    .subscribeTo("topic")  
    .records  
    .evalMap { committable =>  
      processRecord(committable.record)  
    }

That stream will decode incoming messages as String, and you shouldn’t think much about exceptional values. The constructor of ConsumerSettings expects an implicit RecordDeserializer that will convert raw bytes into meaningful values.

implicit valueDeserializer: RecordDeserializer[F, V]

You can find the snippet here.

What happens if we don’t care about failures?

Let’s see what happens when input is unexpected.

We can change the expected type: type Input = UUID. There is an implicit built-in Deserializer for UUID. Hence, Input type is the only thing you need to tune to consume UUID instead of String.

val consumerSettings =  
  ConsumerSettings[IO, String, Input]  
    .withAutoOffsetReset(AutoOffsetReset.Earliest)  
    .withBootstrapServers("localhost:29092")  
    .withGroupId("group")  
  
val producerSettings =  
  ProducerSettings[IO, String, String]  
    .withBootstrapServers("localhost:29092")  
    .withProperty("topic.creation.enable", "true")  
  
val produce = KafkaProducer  
  .resource(producerSettings)  
  .use(_.produceOne(ProducerRecord("topic1", "key", "value")).flatten)  
  
val stream: fs2.Stream[IO, Unit] =  
  KafkaConsumer  
    .stream(consumerSettings)  
    .subscribeTo("topic1")  
    .records  
    .evalMap { committable =>  
      processRecord(committable.record)  
    }

You can find the snippet here. After running produce *> stream.compile.drain.as(ExitCode.Success), we observe that the stream shuts down due to a predictable deserialization error:

java.lang.IllegalArgumentException: Invalid UUID string: value
	at java.base/java.util.UUID.fromString1(UUID.java:280)

At that point, we face the choice:

  • Handle such occurrences on the application level. That means that we expect a failing of fs2.Stream with subsequent re-start of the stream. The downside here is the reallocation of resources (connection to Kafka, to be precise). It sounds sensible when you fully control input; that means you’re responsible for building messages, and invalid content is exceptional (and most likely never going to happen).
  • Making the transformation from raw value more ambiguous with explicit handling of unexpected values. That means we start to suspect any input of being invalid. In that scenario, applied conversion doesn’t throw an error. Instead, the deserialization step has output, which should not be considered pure valid input values.

In that post, I want to focus on the second option.

Deserialization steps

Let us have the case with headers and JSON body. And JSON represents some expected entity.

Steps

We have three phases:

  1. Decoding headers. We tackle a simple key-value structure and look for known keys.
  2. Decoding body to JSON. A valid JSON structure is required at that step.
  3. Conversion from JSON to an entity. JSON should represent a specified entity.

And the common intention is to carry errors with the help of error-handling structures (like Either).

Let’s repair our example with the app that shuts down due to an invalid UUID in the message.

Built-in instances of Deserializer

We used a UUID deserializer encoded in the library:

implicit def uuid[F[_]](implicit F: Sync[F]): Deserializer[F, UUID] =  
  Deserializer.string[F].map(UUID.fromString).suspend

Turning into Either

What is cool is that we can “turn on” catching exceptions by saying that we expect Either.

There is an implicit function that prepares an instance of Deserializer[F, Either[Throwable, A]] for any given Deserializer[F, A]

/**  
  * The attempt [[Deserializer]] try to deserialize to type `A`,  
  * When it fails returns `Left` containing the exception, otherwise returns `Right` with the value `A`  
  */  
implicit def attempt[F[_], A](  
  implicit deserializer: Deserializer[F, A]  
): Deserializer[F, Either[Throwable, A]] =  
  deserializer.attempt

With all of that, the only change we have to make is to change the specified type for apply call of ConsumerSettings:

ConsumerSettings[IO, String, Either[Throwable, Input]]

Since we explicitly define the signature of processRecord, we need to change the type there:

def processRecord(record: ConsumerRecord[String, Either[Throwable, Input]]): IO[Unit] =  
  IO.println(record)

After that, an invalid record won’t cause an app crash, and we’ll observe log lines:

ConsumerRecord(
  topic = topic1, 
  value = Left(java.lang.IllegalArgumentException: Invalid UUID string: value)
)

You can find the snippet here.

Converting raw body into JSON

We tackled UUID, time for JSON! I added circe-core and circe-parser libraries to dependencies.

With the updated input type type Input = Json we ought to bring an instance of Deserializer.

implicit val deserializer = Deserializer.string[IO]  
  .map(io.circe.parser.parse)  
  .flatMap(_.fold(GenericDeserializer.fail[IO, Json], GenericDeserializer.const[IO, Json]))  
  .attempt
  1. parse returns Either exception or Json
  2. fold allows to “merge” that Either into GenericDeserializer
  3. GenericDeserializer delegates raising errors to IO, and we need to call attempt to have Either again

Transformation to an entity

Time to transform valid Json into an entity.

case class Example(value: Int)  
  
type Input = Example

Decoder will be required on circe side:

implicit object Example {  
  implicit val decoder: Decoder[Example] = Decoder.instance(_.get[Int]("value").map(Example.apply))  
}

Deserializer needs an additional transformation from Json to Example:

.flatMap(  
  _.as[Example]  
    .fold(  
      GenericDeserializer.fail[IO, Example],  
      GenericDeserializer.const[IO, Example]  
    )  
)

You can find the snippet here.

Generic Deserializer

Actually, we can write a generic transformation for any class that has Decoder.

implicit def deserializer[A: Decoder] = Deserializer.string[IO]  
  .map(io.circe.parser.parse)  
  .flatMap(_.fold(GenericDeserializer.fail[IO, Json], GenericDeserializer.const[IO, Json]))  
  .flatMap(_.as[A].fold(GenericDeserializer.fail[IO, A], GenericDeserializer.const[IO, A]))  
  .attempt

Cool, now we can produce record """ {"value" : 42 } """ and receive consumer record Right(Example(42)).

You can find the snippet here.

We have no error model yet. That means we have no dedicated types to differentiate errors that occurred at different deserialization stages. It seems sensible to care about it a little bit later. We have another stage to consider. So, we postponed the final refinement; it is time to switch to Kafka headers.

Headers and error management

fs2.kafka.GenericDeserializer has “verbose” constructor to take into account all available metadata:

  • topic
  • headers
  • body
def instance[F[_], A](  
  f: (String, Headers, Array[Byte]) => F[A]  
)(implicit F: Sync[F]): Deserializer[F, A] =

And there is a function that uses generic instance but focuses only on headers:

def headers[F[_], A](f: Headers => Deserializer[F, A])(implicit F: Sync[F]): Deserializer[F, A] =  
  Deserializer.instance { (topic, headers, bytes) =>  
    f(headers).deserialize(topic, headers, bytes)  
  }

It is also time to bring error model to not move it to explicit iteration. We already touched all deserialization steps, let us be careful with raising errors. Our error cases will form ADT:

sealed trait DeserializationError extends NoStackTrace

headers of Headers type is simple key-value structure. There is an apply that returns Option[Header] result.

Deserializer.headers(headers =>  
  headers("correlation_id")  
    .map(header =>  
      header...   
    )
)

Before we dive into the header itself, I want to notice that we should think about situation when the header is not presented. We need to raise an error in case we have None: headers("correlation_id").map(...).getOrElse(???).

So, time to introduce the first error case:

case class NoHeaderFound(name: String, headers: Headers) extends DeserializationError {  
  override def toString: String =  
    s"Header-not-found, headers[${headers.toChain.toList.map(_.key).mkString(",")}]"  
}

Once we don’t observe “correlation_id”, we fail deserialization with brand-new NoHeaderFound.

headers("correlation_id")
  .getOrElse(GenericDeserializer.fail[IO, UUID](NoHeaderFound("correlation_id", headers)))

Deserialization of Header

We can tackle the Header instance in the same way as the body. Library provides convenient function attemptAs:

final def attemptAs[A](  
  implicit deserializer: HeaderDeserializer.Attempt[A]  
): Either[Throwable, A] =  
  deserializer.deserialize(value)

HeaderDeserializer is decoupled from a generic deserializer but provides similar functionality. The UUID-related instance is in place; hence we can call header.attemptAs[UUID] without any additional manipulations.

header
  .attemptAs[UUID]  
  .fold(  
    error => ???,  
    GenericDeserializer.const[IO, UUID]  
  )  
)  

And finally, we need a structure to report the case when UUID-deserialization fails.

case class HeaderDeserializationError(name: String, header: Header, e: Throwable)  
    extends DeserializationError {  
  override def toString: String =  
    s"Header-deserialization-error, header:[${header.key()}->${header.as[String]}], cause: [${e.getMessage}]"  
}

The whole snippet looks like this.

headers(CorrelationId)  
  .map(header =>  
    header  
      .attemptAs[UUID]  
      .fold(  
        error =>  
          GenericDeserializer  
            .fail[IO, UUID](HeaderDeserializationError(CorrelationId, header, error)),  
        GenericDeserializer.const[IO, UUID]  
      )  
  )  
  .getOrElse(GenericDeserializer.fail[IO, UUID](NoHeaderFound(CorrelationId, headers)))

Rest of error cases

But we deserialized only headers; the whole serializer is not ready yet. If we have two sources for building an entity, we can’t rely only on the body decoder.

To have a precise model, we can have an entity for the body itself and a “rich” entity that comprises properties from all sources.

We can imagine that only correlation id is being passed through the headers.

We can fix the property of having correlation id as trait that any target entity should implement.

trait Traceable {  
  def correlationId: UUID  
}  

So, we’ll have Body: Decoder for body and Rich <: Traceable for a target entity. And Rich might be a result of computation over A and UUID

def deserializer[Body: Decoder, Rich <: Traceable](  
    f: (Body, UUID) => Rich  
): Deserializer[IO, Either[DeserializationError, Rich]] 

Now we need to tune the generic deserializer we had to reason in terms of DeserializationError.

First, we ought to tackle strings that are not JSONs:

case class InvalidJson(rawBody: String, cause: ParsingFailure) extends DeserializationError {  
  override def toString: String =  
    s"Invalid-json, [$rawBody] is not a json, cause: ${cause.getMessage}"  
}

Full folding looks like this:

rawBody =>  
    io.circe.parser  
      .parse(rawBody)  
      .fold(  
        error => 
          GenericDeserializer.fail[IO, Json](InvalidJson(rawBody, error)),  
        GenericDeserializer.const[IO, Json]  
      )  

Invalid entity case is union of raw JSON and error:

case class InvalidEntity(jsonBody: Json, cause: DecodingFailure) extends DeserializationError {  
  override def toString: String =  
    s"Invalid-entity, [${jsonBody.noSpaces}], cause: ${cause.getMessage()}]"  
}
json =>  
  json  
    .as[Body]  
    .fold(  
      decodingFailure =>  
        GenericDeserializer.fail[IO, Body](InvalidEntity(json, decodingFailure)),  
      GenericDeserializer.const[IO, Body]  
    )  

Finalized deserializer

The whole deserializer could be assembled now:

def deserializer[Body: Decoder, Rich <: Traceable](  
    f: (Body, UUID) => Rich  
): Deserializer[IO, Either[DeserializationError, Rich]] =  
  Deserializer  
    .headers(headers =>  
      headers(CorrelationId)  
        .map(header =>  
          header  
            .attemptAs[UUID]  
            .fold(  
              error =>  
                GenericDeserializer  
                  .fail[IO, UUID](  
                    HeaderDeserializationError(CorrelationId, header, error)  
                  ),  
              GenericDeserializer.const[IO, UUID]  
            )  
        )  
        .getOrElse(  
          GenericDeserializer.fail[IO, UUID](NoHeaderFound(CorrelationId, headers))  
        )  
        .flatMap(correlationId =>  
          Deserializer  
            .string[IO]  
            .flatMap(rawBody =>  
              io.circe.parser  
                .parse(rawBody)  
                .fold(  
                  error => GenericDeserializer.fail[IO, Json](InvalidJson(rawBody, error)),  
                  GenericDeserializer.const[IO, Json]  
                )  
            )  
            .flatMap(json =>  
              json  
                .as[Body]  
                .fold(  
                  decodingFailure =>  
                    GenericDeserializer  
                      .fail[IO, Body](InvalidEntity(json, decodingFailure)),  
                  GenericDeserializer.const[IO, Body]  
                )  
            )  
            .map(f(_, correlationId))  
        )  
    )  
    .attempt  
    .map(_.left.map {  
      case expected: DeserializationError => expected  
      case unexpected                     => UnexpectedError(unexpected)  
    })

You can find the snippet here.

It wouldn’t look awkward if we don’t care about dedicated errors. My stance here is that such separation is vital for study case. And for real program everybody needs to asses “paranoia level” and set up appropriate level of verbosity.

Updated domain model

Reminder, we need to have implementation of Traceable with UUID property.

case class Example(value: Int) {
  def traceable(correlationId: UUID) = TraceableExample(value, correlationId)
}

case class TraceableExample(value: Int, correlationId: UUID) extends Traceable

And derivation of Deserializer is as simple as:

implicit val traceableExampleDeserializer
    : Deserializer[IO, Either[DeserializationError, TraceableExample]] =
  deserializer[Example, TraceableExample](_ traceable _)

Test runs

Time for test runs!

Put message without correlation_id header

We observe Left(Header-not-found, headers[]) entry in the logs.

Put message with invalid correlation_id

ProducerRecord now comprises “correlation_id”.

ProducerRecord("topic1", "key", """{"value" : 42 }""")  
  .withHeaders(Headers(Header[String]("correlation_id", "hi")))
Left(Header-deserialization-error, header:[correlation_id->hi], cause: [Invalid UUID string: hi])

Valid headers with non-JSON in body

ProducerRecord("topic1", "key", """{""")  
  .withHeaders(  
    Headers(Header[String]("correlation_id", "123e4567-e89b-12d3-a456-426614174000"))  
  )
Left(Invalid-json, [{] is not a json, cause: exhausted input)

Valid JSON but not the entity

ProducerRecord("topic1", "key", """{}""")  
  .withHeaders(  
    Headers(Header[String]("correlation_id", "123e4567-e89b-12d3-a456-426614174000"))  
  )
Left(Invalid-entity, [{}], cause: Missing required field: DownField(value)])

All valid

ProducerRecord("topic1", "key", """ { "value" : 42 } """)  
  .withHeaders(  
    Headers(Header[String]("correlation_id", "123e4567-e89b-12d3-a456-426614174000"))  
  )
Right(TraceableExample(42,123e4567-e89b-12d3-a456-426614174000))

Summary

At that point, I want to wrap up the note.

We covered:

  • The notion of deserializers in fs2-kafka library.
  • The idea of tackling deserialization as a multi-step process.

The next steps are:

  • Processing valid entries and reporting errors to dedicated error-topic.
  • Considering options to consume multiple schemas from one topic.