fs2-kafka introduction: lib for ADT-events, idempotent writes and batches of offsets

22 minute read

FS2 Kafka is a great library that simplifies reasoning around Kafka by representing event flow as fs2 streams. Once I started working with it, I found that there were few public “slow start” snippets. So I take a chance to contribute and make the initial tech investigation easier for the next person. Previous part was about careful error handling.

Today I want to proceed with toy example of event sourcing. Agenda is the following:

  1. ADT encoding. Which questions should be addressed when you consider different approaches.
  2. How to tackle state mutations.
  3. Committing offsets.

Study case is simplified, but I hope that it can help to reason about a couple of important questions.

JSON

Events

Unbreakable rule

Having a single schema per topic would be great from a technical perspective. No diversity, just the same set of properties every time. It could be stated as best practice: avoid multiple types of messages in any abstract message queue.

But there is another rule which is even more unbreakable. Any events that need to stay in a fixed order must go in the same topic. This is a cite from Should You Put Several Event Types in the Same Kafka Topic? by Martin Kleppmann. Highly recommend it if you need to get more feeling about what can justify having different kinds of messages in a topic.

Putting Several Event Types in the Same Topic – Revisited unfolds the topic and shows how to use schema registries to formalize “one of” kind of schemas.

Schema registries are not the topic for today. Instead, our focus is on the notion of event flow.

Study case

Let us imagine that we have Kafka as a backend for event sourcing. Hence, our study-case messages will describe diverse subsequent updates of the state.

The simplest scenario is bank operations. We can have two of them to have a proper context:

  • withdrawal;
  • replenishment.

The reasoning about the importance of order should be free of explanations. To be precise, we talk about debit accounts; withdrawal is possible only when a sufficient amount is available on the account. Now, we can pay attention to the events themselves.

Any account operation should have a reference to an account.

Hence, we can fix that obligation on the root trait.

sealed trait AccountOperation {  
  def account: UUID  
}

And we have two operations that form our study-case ADT.

final case class WithdrawalOperation(
  account: UUID, 
  value: Long
) extends AccountOperation  
  
final case class ReplenishmentOperation(
  account: UUID, 
  value: Long
) extends AccountOperation

ADT serialization

circe-extras

Surprisingly, there are no well-established conventions regarding ADT serialization. For instance, Circe proposes usage of class names as discriminator values. So, you are supposed to have a configuration like the following:

implicit val config = Configuration.default.withDiscriminator("operation_type")

The derived codec will form such JSONs.

{ 
  "operation_type" : "WithdrawalOperation" , 
  "account": "123e4567-e89b-12d3-a456-426614174000",
  "value": 200 
}

My stance here is that class names cause multiple problems:

  1. When communicating with other teams, you must negotiate patterns and things like event types. “We can’t consume some_event:v1 because we can’t name class in that way” sounds unprofessional.
  2. Even if we work in terms of perfectly shaped Domain Driven Design methodology, there could be a demand for refactoring. And “don’t dare to rename classes” is a severe limitation.
  3. It is necessary to explicitly care about the “immutability” of class names. For instance, maintain an isolated project with tests that do (de)serialization round-trips. The thesis here is that control over non-changing class names requires additional attention.
  4. Versioning becomes difficult. Let’s assume that we need to enrich WithdrawalOperation with additional property. The class handles the same semantics, the name should be the same, but you can’t rename the original WithdrawalOperation to WithdrawalOperationV1 or LegacyWithdrawalOperation. The best solution here is to append V1 suffix to every class name.

That’s why I prefer to keep discriminator values as explicitly written constants.

circe-tagged-adt-codec

abdolence/circe-tagged-adt-codec brings ability to specify event type as constant in annotations.

sealed trait TestEvent

@JsonAdt("my-event-1") 
case class MyEvent1(anyYourField : String) 

Resulting JSON looks in the following way:

{
  "type" : "my-event-1",
  "anyYourField" : "my-data"
}

self-crafted circe-discriminator-codec

I wanted to extend provided functionality with two specific features:

  1. Encoding JSONs with snake_case.
  2. Easy configuration of property name treated as a discriminator. I had two concerns regarding hard-coded "type":
    • Just like the value of the discriminator, the property name could be the subject of discussion with third parties. Or we can demand bringing domain-specific naming.
    • Any universal name can collide with the property of an entity.

So, here is the fork: antonkw/circe-discriminator-codec.

Feel free to play around with it, released artifact is "io.github.antonkw" %% "circe-tagged-adt-codec" % "0.0.7-SNAPSHOT". It is located in the snapshots of in s01.oss.sonatype.org.

Now, we put JsonAdt annotation with the target discriminator:

@JsonAdt("withdrawal:v1")  
final case class WithdrawalOperation(
  account: UUID, 
  value: Long
) extends AccountOperation  
  
@JsonAdt("replenishment:v1")  
final case class ReplenishmentOperation(
  account: UUID, 
  value: Long
) extends AccountOperation

Finally, we add a decoder.

object AccountOperation extends AutoDerivation {  
  implicit val accountOperationDecoder: Decoder[AccountOperation] =  
    JsonTaggedAdtCodec.createDecoder("operation_type")   
}  

Let’s give us a try and run the app we developed previously with AccountOperation as input type.

fs2-kafka-study-case/tree/main/src/it/docker has Docker configuration for Kafka.

docker-compose up will run instance for you.

fs2-kafka-study-case/../DeserializationDemoApp7.scala is the app for the first iteration of the post.

KafkaProducer snippet

I just put raw JSONs as string to show what is being supplied.

    val producerSettings =
      ProducerSettings[IO, String, String]
        .withBootstrapServers("localhost:29092")
        .withProperty("topic.creation.enable", "true")

    val produce = KafkaProducer
      .resource(producerSettings)
      .use(
        _.produce(
          ProducerRecords(
            List(
              ProducerRecord(
                "topic1",
                "key",
                """
                  |{
                  |  "operation_type" : "replenishment:v1" ,
                  |  "account": "123e4567-e89b-12d3-a456-426614174000",
                  |  "value": 200
                  |}
                  |""".stripMargin
              ),
              ProducerRecord(
                "topic1",
                "key",
                """
                  |{
                  | "operation_type" : "withdrawal:v1" ,
                  | "account": "123e4567-e89b-12d3-a456-426614174000",
                  | "value": 100
                  |}
                  |""".stripMargin
              )
            )
          )
        ).flatten
      )
KafkaConsumer snippets

I use deserializer I wrote in the previous part.

    def deserializer[Body: Decoder]: Deserializer[IO, Either[DeserializationError, Body]] =
      Deserializer
        .string[IO]
        .flatMap(rawBody =>
          io.circe.parser
            .parse(rawBody)
            .fold(
              error => GenericDeserializer.fail[IO, Json](InvalidJson(rawBody, error)),
              GenericDeserializer.const[IO, Json]
            )
        )
        .flatMap(json =>
          json
            .as[Body]
            .fold(
              decodingFailure =>
                GenericDeserializer
                  .fail[IO, Body](InvalidEntity(json, decodingFailure)),
              GenericDeserializer.const[IO, Body]
            )
        )
        .attempt
        .map(_.left.map {
          case expected: DeserializationError => expected
          case unexpected                     => UnexpectedError(unexpected)
        })

Now, we derive ConsumerSettings

type Input = AccountOperation

implicit val accountOperationDeserializer: 
  Deserializer[IO, Either[DeserializationError, Input]] =
    deserializer[Input]
    
val consumerSettings =
  ConsumerSettings[IO, String, Either[DeserializationError, Input]]
    .withAutoOffsetReset(AutoOffsetReset.Earliest)
    .withBootstrapServers("localhost:29092")
    .withGroupId("group")

Finally, KafkaConsumer is in working state.

val stream: fs2.Stream[IO, Unit] =  
  KafkaConsumer  
    .stream(consumerSettings)  
    .subscribeTo("topic1")  
    .records  
    .evalMap { committable =>  
      processRecord(committable.record)  
    }

We feed two operations:

{
    "operation_type": "replenishment:v1",
    "account": "123e4567-e89b-12d3-a456-426614174000",
    "value": 200
},
{
    "operation_type": "withdrawal:v1",
    "account": "123e4567-e89b-12d3-a456-426614174000",
    "value": 100
}

Successfully decoded lines appear in the console.

ConsumerRecord(   
  Right(
    ReplenishmentOperation(
      123e4567-e89b-12d3-a456-426614174000,
      200)
    )
  )
)

ConsumerRecord(
  Right(
    WithdrawalOperation(
      123e4567-e89b-12d3-a456-426614174000,
      100
    )
  )
)

Events recap

  1. Our decoding mechanism allows parsing ADTs according to specified discriminator values.
  2. Kafka provides guarantees and preserves chronological order of operations. When we tune partitioning, we must remember the importance of ordering for accounts.

Handling state

We have the flow of incoming operations. Time to build a state!

Model

AccountUpdated confirms the applied operation. amount is the state of the account after an update.

case class AccountUpdated(accountId: UUID, amount: Long)

Also, we must care about predicaments when withdrawal cannot be made.

sealed trait BankError extends NoStackTrace  
  
case class AccountNotFound(accountId: UUID) extends BankError  
  
case class InsufficientAmount(accountId: UUID, actual: Long) extends BankError  

Now we have everything to describe BankState:

trait BankState {  
  def withdraw(  
      account: UUID,  
      withdrawal: Long  
  ): IO[Either[BankError, AccountUpdated]]  
    
  def replenish(account: UUID, replenishment: Long): IO[AccountUpdated]  
}

Ref-based implementation

Ref is a good choice to handle an in-memory state in a potentially concurrent environment.Now we have everything to describe BankState:

object BankState {  
  def make: IO[BankState] = IO.ref[Map[UUID, Long]](Map.empty).map { ref =>  
    new BankState {  
      def withdraw(  
          account: UUID,  
          withdrawal: Long  
      ): IO[Either[BankError, AccountUpdated]] =  
        ref.modify(state =>  
          state.get(account) match {  
            case Some(amount) =>  
              if (amount >= withdrawal)  
                state.updated(account, amount - withdrawal) -> AccountUpdated(  
                  account,  
                  amount - withdrawal  
                ).asRight  
              else  
                state -> InsufficientAmount(account, amount).asLeft  
            case None => state -> AccountNotFound(account).asLeft  
          }  
        )  
  
      def replenish(account: UUID, replenishment: Long): IO[AccountUpdated] =  
        ref.modify { state =>  
          val updatedAmount: Long = state.getOrElse(account, 0L) + replenishment  
          state.updated(account, updatedAmount) -> AccountUpdated(account, updatedAmount)  
        }  
    }  
  }  
}

Time to update processRecord:

def processRecord(bankState: BankState)(  
    record: ConsumerRecord[String, Either[Throwable, AccountOperation]]  
): IO[Unit] =  
  record.value match {  
    case Left(value) => IO.println(value)  
    case Right(operation) =>  
      operation match {  
        case WithdrawalOperation(account, value) =>  
          bankState.withdraw(account, value).flatMap(IO.println)  
        case ReplenishmentOperation(account, value) =>  
          bankState.replenish(account, value).flatMap(IO.println)  
      }  
  }

Building program

val stream is replaced by def program:

def program(state: BankState): fs2.Stream[IO, Unit] =  
  KafkaConsumer  
    .stream(consumerSettings)  
    .subscribeTo("topic1")  
    .records  
    .evalMap { committable =>  
      processRecord(state)(committable.record)  
    }

And before running program we need to make the state.

fs2.Stream.eval(BankState.make).flatMap(program)

We run test case and observe that event are applied to the state:

AccountUpdated(123e4567-e89b-12d3-a456-426614174000,200)
Right(AccountUpdated(123e4567-e89b-12d3-a456-426614174000,100))

at-least-once consumption

The next step is to do something meaningful with the result.

We need to commit offsets. Otherwise, the app will reprocess all events from the beginning after every deployment.

But we need to do another re-iteration before do any offset commitments.

Once you consume an event, you need to understand what happens when a piece of logic (that processes an individual event) doesn’t reach the target.

You need to decide what semantics of event processing works for you:

  • at-least-once;
  • at-most once
  • exactly-once.

The first thought is always “yeah, exactly-once sound like what we need.” In reality, it is a big deal.

idempotent writes

For bank operations in our toy example, I suggest picking up at-least-once approach with making an update idempotent. For instance, an operation accompanied by sequenceId and account state comprises latestSequenceId.

Let’s do it!

Updated trait:

sealed trait AccountOperationV2 {  
  def account: UUID  
  def sequenceId: Long  
}

Operations have sequenceId property now:

@JsonAdt("withdrawal:v2")  
final case class WithdrawalOperationV2(
  account: UUID, 
  value: Long, 
  sequenceId: Long
) extends AccountOperationV2  
  
@JsonAdt("replenishment:v2")  
final case class ReplenishmentOperationV2(
  account: UUID, 
  value: Long, 
  sequenceId: Long
) extends AccountOperationV2

Please pay attention that we’re free not to change class names. Only operation types in annotations are required to be updated.

Single number can’t describe necessary state of an account, here is AccountState:

case class AccountState(
  amount: Long, 
  latestSequenceId: Long
)

Standalone AccountUpdated as the resulting type also looks insufficient now. Re-processing multiple times isn’t a reason to notify interested parties about numerous updates. That’s not an error too. It is not a business error because our retries are separate from the business flow. And technically, there is nothing wrong with ignoring operations that are retried because some node failed and didn’t commit offset for already processed records. So, we tackle expected result.

Hence, AccountUpdated is not only one possible result of processing.

sealed trait AccountOperationResult {  
  def accountId: UUID  
  def sequenceId: Long  
}  

case class AccountUpdated(
  accountId: UUID, 
  amount: Long, 
  sequenceId: Long
) extends AccountOperationResult
      
case class OperationIgnored(
  accountId: UUID, 
  sequenceId: Long, 
  actualSequence: Long
) extends AccountOperationResult

BankState requires upgrade as well.

trait BankState {  
  def withdraw(  
      withdrawal: WithdrawalOperation  
  ): IO[Either[BankError, AccountOperationResult]]  
  
  def replenish(replenishment: ReplenishmentOperation): IO[AccountOperationResult]  
}

Time to refactor functions. We need to check whether latestSequenceId of the account state precedes sequenceId of the operation.

def withdraw(  
    withdrawal: WithdrawalOperation  
): IO[Either[BankError, AccountOperationResult]] =  
  ref.modify(state =>  
    state.get(withdrawal.account) match {  
      case Some(accountState) =>  
        if (accountState.latestSequenceId >= withdrawal.sequenceId)  
          (  
            state,  
            OperationIgnored(  
              withdrawal.account,  
              withdrawal.sequenceId,  
              accountState.latestSequenceId  
            ).asRight  
          )  
        else if (accountState.amount >= withdrawal.value)  
          (  
            state.updated(  
              withdrawal.account,  
              AccountState(  
                amount = accountState.amount - withdrawal.value,  
                latestSequenceId = withdrawal.sequenceId  
              )  
            ),  
            AccountUpdated(  
              withdrawal.account,  
              accountState.amount - withdrawal.value,  
              withdrawal.sequenceId  
            ).asRight  
          )  
        else  
          state -> InsufficientAmount(withdrawal.account, withdrawal.value).asLeft  
      case None => state -> AccountNotFound(withdrawal.account).asLeft  
    }  
  )
def replenish(replenishment: ReplenishmentOperation): IO[AccountOperationResult] =  
  ref.modify { state =>  
    state.get(replenishment.account) match {  
      case Some(accountState) =>  
        if (accountState.latestSequenceId >= replenishment.sequenceId)  
          (  
            state,  
            OperationIgnored(  
              replenishment.account,  
              replenishment.sequenceId,  
              accountState.latestSequenceId  
            )  
          )  
        else {  
          (  
            state.updated(  
              replenishment.account,  
              AccountState(  
                amount = accountState.amount + replenishment.value,  
                latestSequenceId = replenishment.sequenceId  
              )  
            ),  
            AccountUpdated(  
              replenishment.account,  
              accountState.amount + replenishment.value,  
              replenishment.sequenceId  
            )  
          )  
        }  
      case None =>  
        (  
          state.updated(  
            replenishment.account,  
            AccountState(  
              amount = replenishment.value,  
              latestSequenceId = replenishment.sequenceId  
            )  
          ),  
          AccountUpdated(  
            replenishment.account,  
            replenishment.value,  
            replenishment.sequenceId  
          )  
        )  
    }  
  }

processRecord does matching:

def processRecord(bankState: BankState)(  
    record: ConsumerRecord[String, Either[Throwable, AccountOperationV2]]  
): IO[Unit] =  
  record.value match {  
    case Left(value) => IO.println(value)  
    case Right(operation) =>  
      operation match {  
        case op: WithdrawalOperationV2    => bankState.withdraw(op).flatMap(IO.println)  
        case op: ReplenishmentOperationV2 => bankState.replenish(op).flatMap(IO.println)  
      }  
  }

Now, we can simulate re-processing:

 ProducerRecord(
   "topic1",
   "key",
   """
     |{
     |  "operation_type" : "replenishment:v2" ,
     |  "account": "123e4567-e89b-12d3-a456-426614174000",
     |  "sequence_id" : 1,
     |  "value": 200
     |}
     |""".stripMargin
 ),
 ProducerRecord(
   "topic1",
   "key",
   """
     |{
     | "operation_type" : "replenishment:v2" ,
     | "account": "123e4567-e89b-12d3-a456-426614174000",
     | "sequence_id" : 1,
     | "value": 200
     |}
     |""".stripMargin
 )
AccountUpdated(123e4567-e89b-12d3-a456-426614174000,200,1)
Right(OperationIgnored(123e4567-e89b-12d3-a456-426614174000,1,1))

offsets and batching

Finally, we reach stage when we can commit our progress.

There is a convenient API called CommittableOffsetBatch. It allows to commit a bunch of offsets together. So, we can group records, groupWithin allows to specify both amount and time limitations. And we commit that aggregated batch.

.groupWithin(2, 10.seconds)  
.evalTapChunk(chunk => 
  CommittableOffsetBatch.fromFoldable(chunk.map(_.offset)).commit
)

Summary

fs2-kafka-study-case/../DeserializationDemoApp8.scala is the app we have at that stage.

Key points we touched:

  1. Aspects of encoding events.
  2. Nuances of dealing with a state. In-memory state simplified things, but the mechanism of dealing with full-featured environment are the same. We need to think twice about semi-processed events.
  3. fs2-kafka API that helps to deal with stream of events.

And I owe the final part with sending different kind of events to separate topics.

Thank you for reading!

2023/03/03 Update

Paweł Jurczenko noticed that def deserializer could be simplified.

Specifically, there is io.circe.parser.parse with decoding resulting json json.as[Body].

It could be replaced by io.circe.parser.decode call.

It completely makes sense in practice, exhaustive pattern matching allows to discriminate type of failures.

I want to denote that I made those two steps (parsing string, decoding Json as an entity) separated to have very transparent error reasoning.