fs2-kafka: hard choices, Pipes and Weaver tests

38 minute read

The agenda is having big re-cap on top of previous parts (part 1,part 2).

Specifically, I want to:

  • make quick introduction to fs2.Pipe notion;
  • discuss possibilities and limitations of turning error-flow into explicit pipeline;
  • highlight some non-obvious aspects of offset commitment (especially when we distribute offsets over multiple streams);
  • show Weaver suite.

In the Previous part, we ended up with sequential processing of incoming events:

def program(state: BankState): fs2.Stream[IO, Unit] =  
  KafkaConsumer  
    .stream(consumerSettings)  
    .subscribeTo("topic1")  
    .records  
    .evalTap { committable =>  
      processRecord(state)(committable.record)  
    }

The awkward part here is processRecord itself:

def processRecord(bankState: BankState)(  
    record: ConsumerRecord[String, Either[Throwable, AccountOperationV2]]  
): IO[Unit]

That function is a placeholder. It was introduced to show that errors are traveling alongside valid account operations.

Separating different concerns at a higher level is essential from an architectural perspective. As engineers, we must ensure that we isolate the business logic, which, in our case, involves updating the bank account state. Therefore, we should create a testable and modular business logic that can be easily maintained and updated.

Separate flow for errors

Error processing flows benefit from being explicitly defined as separate pipelines. However, it can be challenging to isolate error handling from business logic completely. For example, there may be scenarios where the implementation includes re-iterations, fallbacks, or retries in response to specific error conditions. In such cases, the line between error handling and business logic can become blurred, making it more challenging to maintain a clear separation between them. For instance, when an error occurs, the system may attempt to resolve the issue by trying a different approach, retrying the same operation after a brief delay, or using a fallback value. Unfortunately, such actions are closely related to the core business logic, making separating the error-handling framework from the primary functionality difficult.

Let’s return to our toy example and the separated pipeline for errors.

pipeline

The notion of long-living continuous streams requires careful reasoning. And before moving to the following steps, it is necessary to familiarize yourself with Pipe abstraction.

Pipe

Pipe is a function!

In the fs2 library, a Pipe represents a transformation or processing step we apply to a data stream. It is a structure that takes a stream of inputs and produces a stream of outputs. In other words, a Pipe[F[_], A, B] is a function that takes a Stream[F, A] and returns a Stream[F, B].


/** 
 A stream transformation represented as a function from stream to stream.
 Pipes are typically applied with the `through` operation on `Stream`.  
*/ 
type Pipe[F[_], -I, +O] = Stream[F, I] => Stream[F, O]

Although Pipe is simply an alias for a function, it plays a crucial role in breaking down streaming applications into smaller, more manageable components.

For instance, we need to convert Fahrenheit to Celsius continuously. We can isolate it in explicit WeatherService and appropriate Pipe.

trait WeatherService[F[_]] {  
  def fahrenheitToCelsius: Pipe[F, Double, Double]  
}  
  
object WeatherService {  
  def make[F[_]]: WeatherService[F] = new WeatherService[F] {  
    override val fahrenheitToCelsius: Pipe[F, Double, Double] =  
      _.map(fahrenheit => (fahrenheit - 32) * 5 / 9)  
  }  
}

Now, nothing prevents us from testing that pipe.

class FahrenheitToCelsiusPipeSpec extends AnyFunSuite with Matchers {  
  
  // The fahrenheitToCelsiusPipe to be tested  
  test(  
    "The fahrenheitToCelsius should convert the input temperatures from Fahrenheit to Celsius"  
  ) {  
    val weatherService = WeatherService.make[IO]  
  
    // Define your test input and expected output  
    val testInput: Stream[IO, Double] = Stream.emits[IO, Double](List(32.0, 212.0))  
    val expectedOutput: List[Double] = List(0.0, 100.0)  
  
    // Apply the pipe to the input stream and compile the stream into a list  
    val actualOutput: IO[List[Double]] =  
      testInput.through(weatherService.fahrenheitToCelsius).compile.toList  
  
    // Compare the expected and actual output  
    actualOutput.unsafeRunSync() shouldEqual expectedOutput  
  }  
}
testOnly io.github.antonkw.FahrenheitToCelsiusPipeSpec

compiling 1 Scala source to fs2-kafka-study-case/target/scala-2.13/classes ...
done compiling
FahrenheitToCelsiusPipeSpec:
- The fahrenheitToCelsius should convert the input temperatures from Fahrenheit to Celsius
Run completed in 209 milliseconds.
Tests: succeeded 1

Separating Stream of Either

Let’s return to the stream of Either that we want to decouple.

The goal is to manage the processing of a single stream with multiple pipes, each responsible for a different data type. Rather than splitting the stream into separate streams, the focus should be on using pipes to segregate various records safely. This approach ensures that each pipe addresses its designated data type, providing neat stream processing.

pipeline

Now we can think about eliminating “unnecessary” sides of Either values. The vital step is to ensure you’re not reasoning about a particular number of records. Rather than perceiving a stream as a fixed number of entries, consider it a “flow” of entries. Unfortunately, there is a trap that can catch some people who are used to a more imperative style with record-by-record processing. Pipe is just a function, and you can withdraw elements of the input stream.

Time for practice! There is a simplest possible example. The snippet defines an inputStream containing a list with two Either elements: an Int value wrapped as a Left, and a String value wrapped as a Right.

object PipeDemo extends Simple {  
  val inputStream: fs2.Stream[IO, Either[Int, String]] =  
    fs2.Stream.emits(List(1.asLeft, "one".asRight))  
  override def run: IO[Unit] = inputStream.foreach(IO.println).compile.drain  
}

Now, let’s fix placeholder with signature.

override def run: IO[Unit] = {  
  val collectRights: Pipe[IO, Either[Int, String], String] = ???  
  
  val rightsOnly: fs2.Stream[IO, String] =  
    inputStream.through(collectRights)  
  
  rightsOnly.foreach(IO.println).compile.drain  
}

The straightforward approach is using flatMap to emit values packed in Right:

val collectRights: Pipe[IO, Either[Int, String], String] = _.flatMap {  
  case Right(stringValue) => fs2.Stream.emit(stringValue)  
  case Left(_)            => fs2.Stream.empty  
}

It is easy to imagine how you can write a bunch of helpers to extract any abstract sides of Either.

The problem there is efficiency. Under the hood, documentation explicitly asks to use single-element emit wisely.The straightforward approach is using flatMap to emit values packed in Right:

/** Lifts the given output value `O` into a pull that performs no  
  * effects, emits that single output in a singleton chunk, and always  * terminates successfully with a unit result.  *  
  * _Note_: using singleton chunks is not efficient. If possible,  * use the chunk-based `output` method instead.  
  */
def output1[F[x] >: Pure[x], O](o: O): Pull[F, O, Unit] = Output(Chunk.singleton(o))

If you want to familiarize yourself with the notion of Chunk and the internals of fs2 in common, I recommend watching Michael Pilquist - fs2.Chunk talk.

Long story short, doing manual emit calls is not the best way to gather elements.

The thing starts to look weird. The only obvious thing we can do without building new streams from elements is unsafe transformation:

val collectRights: Pipe[IO, Either[Int, String], String] =  
  _.filter(_.isRight).map(_.toOption.get)
  

Luckily, fs2 provides more sophisticated way to collect some of elements:

def collect[O2](pf: PartialFunction[O, O2]): Stream[F, O2]

So, we need to feed the partial function that is defined at Right:

val collectRights: Pipe[IO, Either[Int, String], String] =  
  _.collect { case Right(value) => value }

Inside, it uses buffers to create a smaller amount of new chunks.

/** More efficient version of `filter(pf.isDefinedAt).map(pf)`. */  
def collect[O2](pf: PartialFunction[O, O2]): Chunk[O2] = {  
  val b = makeArrayBuilder[Any]  
  b.sizeHint(size)  
  foreach(o => if (pf.isDefinedAt(o)) b += pf(o))  
  Chunk.array(b.result()).asInstanceOf[Chunk[O2]]  
}

So, we safely process only valid entries:

override def run: IO[Unit] = {  
  val collectRights: Pipe[IO, Either[Int, String], String] =  
    _.collect { case Right(value) => value }  
  
  val rightsOnly: fs2.Stream[IO, String] =  
    inputStream.through(collectRights)  
  
  rightsOnly.foreach(IO.println).compile.drain  
}

How to reach errors in the same way? The answer is broadcastThrough. The broadcastThrough function is a combinator that allows to apply multiple pipes to the same input stream, effectively broadcasting the input data through each pipe in parallel.

So, we can write something like this:

override def run: IO[Unit] = {  
  val collectRights: Pipe[IO, Either[Int, String], String] =  
    _.collect { case Right(value) => value }  
  
  val collectLefts: Pipe[IO, Either[Int, String], Int] =  
    _.collect { case Left(value) => value }  
  
  inputStream.broadcastThrough(collectLefts, collectRights).foreach(IO.println).compile.drain  
}

It initially works, but there is a weird type of inference If you try to transform output, you’ll find a stream of Any. It is expected if you think about it. But let us be more concrete.

All pipes should produce the same output:

def broadcastThrough[F2[x] >: F[x]: Concurrent, O2](  
    pipes: Pipe[F2, O, O2]*  
): Stream[F2, O2]

So, it makes sense to specify the output type explicitly:

inputStream.broadcastThrough[IO, String](collectLefts, collectRights).foreach(IO.println).compile.drain 

Now, to compile the snippet, we need to ensure that collectLefts returns strings:

val collectLefts: Pipe[IO, Either[Int, String], String] = _.collect { case Left(value) => value.toString }

With all that, it is time to return to our banking example.

The idea here is to have two output topics. One will be used to keep notifications about successful operations. Another one will be used to store failures. Now, to compile the snippet, we need to ensure that collectLefts returns strings:

val collectValidInput: Pipe[  
  IO,  
  CommittableConsumerRecord[IO, String, Either[DeserializationError, Input]],  
  CommittableConsumerRecord[IO, String, Input]  
] = _.fproduct(_.record.value).collect { case (committable, Right(validValue)) =>  
  committable.as(validValue)  
}

ConsumerRecord (record property under CommittableConsumerRecord) has no unapply, so we need to extract value before doing matching.

Using the partial function without additional steps is still possible, but unsafe transformations appear again.

new PartialFunction[  
  CommittableConsumerRecord[IO, String, Either[DeserializationError, Input]],  
  CommittableConsumerRecord[IO, String, Input]  
] {  
  override def isDefinedAt(  
      x: CommittableConsumerRecord[IO, String, Either[DeserializationError, Input]]  
  ): Boolean = x.record.value.isRight  
  
  override def apply(  
      v1: CommittableConsumerRecord[IO, String, Either[DeserializationError, Input]]  
  ): CommittableConsumerRecord[IO, String, Input] = v1.as(v1.record.value.toOption.get)  
}

And the purpose of using collect is to get rid of things like toOption.get calls.

So, collectValidInput is in place, collectFailures is the same:

val collectFailures: Pipe[  
  IO,  
  CommittableConsumerRecord[IO, String, Either[DeserializationError, Input]],  
  CommittableConsumerRecord[IO, String, DeserializationError]  
] = _.fproduct(_.record.value).collect { case (committable, Left(failure)) =>  
  committable.as(failure)  
}

Updating the state

Time to bring the last pieces of the pipeline.

We can finally implement Pipe, which updates the state. A lot of tricky nuances wait for us there. And the purpose of using collect is to get rid of things like toOption.get calls.

Common low-level type for results

We must put results into some resulting structure (definitely, Unit is not our choice).

def updateBankState(state: BankState): Pipe[  
  IO,  
  CommittableConsumerRecord[IO, String, Input],  
  CommittableConsumerRecord[IO, String, Unit]  
] = _.map { case CommittableConsumerRecord(record, offset) => {  
  val operationResult = record.value match {  
    case withdrawal: WithdrawalOperationV2 =>  
      val withdrawalResult: IO[Either[BankError, AccountOperationResult]] = state.withdraw(withdrawal)  
      withdrawalResult  
    case replenishment: ReplenishmentOperationV2 =>  
      val replenishmentResult: IO[AccountOperationResult] = state.replenish(replenishment)  
      replenishmentResult  
  }  
  operationResult  
}

withdrawalResult and replenishmentResult are not of the same type. Not a big deal since AccountOperationResult could be wrapped with Right.

val operationResult: IO[Either[BankError, AccountOperationResult]] = record.value match {  
  case withdrawal: WithdrawalOperationV2 => state.withdraw(withdrawal)  
  case replenishment: ReplenishmentOperationV2 => state.replenish(replenishment).map(_.asRight[BankError])  
}

And I want to open Pandora’s box. We ought to handle unpredictable errors. withdraw potentially can hide the fallback mechanism with retries and backoffs. There is a chance for failure. attempt gives us Either[Throwable, Either[BankError, AccountOperationResult]].

def updateBankState(state: BankState): Pipe[  
  IO,  
  CommittableConsumerRecord[IO, String, Input],  
  CommittableConsumerRecord[IO, String, Either[Throwable, Either[BankError, AccountOperationResult]]]  
] = _.evalMap { case committable@CommittableConsumerRecord(record, _) => {  
  val resultIO: IO[Either[Throwable, Either[BankError, AccountOperationResult]]] = record.value match {  
    case withdrawal: WithdrawalOperationV2 => state.withdraw(withdrawal).attempt  
    case replenishment: ReplenishmentOperationV2 => state.replenish(replenishment).map(_.asRight[BankError]).attempt  
  }  
  
  resultIO.map(committable.as)  
}

as here is evidence of CommittableConsumerRecord being a functor. It implements as, which replaces the content without changing the container. It is very convenient, we can do transformations without touching offsets and commit offset back once processing is done. That is part of at-least-once implementation. We already know that we can deconstruct those either’s to separate streams. Well done!

Kafka topics

There is still a choice about distribution across topics. And we’ll need to manage those topics.

The choice

We already have the topic for deserialization errors. I suggest to leave it as an explicit case. The reasoning here is simple. Errors there couldn’t have any business-value fallback scenarios. Non-deserializable messages are not supposed to be recovered and can’t be reported to users or businesses. They also can’t be entwined with any operation since they do not reach the stage when they become identifiable.

We still to manage three distinct outputs:

  • success
  • bank error
  • random exception

The choice between reporting options is challenging.

pipeline

All topics separated

The obvious option is having three distinct Kafka topics: one for errors, one for BankError, and one for reporting succeeded operations.

pipeline

Pros:

  • Clear separation of concerns: each topic corresponds to a specific outcome
  • Easier to manage and monitor each type of outcome independently Cons:
  • Consumers should consume messages from different topics. BankError is technically part of the happy path scenario. However, to follow the status of a transaction, a client would need to subscribe to multiple topics and handle the “lack of money” type of error as something that comes from a separate event flow.
  • Higher complexity in setting up and maintaining topics; the potential increase of resource usage.

All topics together (single topic)

Opposite to the previous, we can consider any outcome as a status of operation and put everything to a single topic.

pipeline

Pros:

  • Managing, monitoring, and scaling the topic becomes easier than having separate topics for each result type.
  • Consumers only need to subscribe to a single topic to receive all types of results, simplifying the consumption process and reducing the overhead of managing multiple topic subscriptions.
  • Cohesive view of operations: since all results are in a single topic, it’s easier to correlate different types of results and monitor the overall status of operations.

Cons:

  • Increased message complexity: messages become more complex. Consumers potentially need to follow schema updates even if they are “not interested”. E.x., error metadata is being updated to bring more debugging info. It triggers updates (at least build/deploy) to downstream apps.
  • Filtering overhead. Most likely, separate consumers will be responsible for handling the happy path and listening for purely technical errors. As a result, they will need to filter messages, which may introduce some overhead.
  • Consolidating all results into a single Kafka topic might make it tempting to process errors alongside the main business logic, leading to concerns becoming less separated and more tightly coupled.

Our toy example doesn’t imply much downstream processing, and we have deserialization errors to demonstrate emitting to multiple topics. So, let’s report just simplified status to the single topic.

def updateBankState(state: BankState): Pipe[  
  IO,  
  CommittableConsumerRecord[IO, String, Input],  
  CommittableConsumerRecord[IO, String, String]  
] = _.evalMap { case committable@CommittableConsumerRecord(record, _) =>  
  val resultIO: IO[Either[Throwable, Either[BankError, AccountOperationResult]]] = record.value match {  
    case withdrawal: WithdrawalOperationV2 =>   
      state.withdraw(withdrawal).attempt  
    case replenishment: ReplenishmentOperationV2 =>   
      state.replenish(replenishment).map(_.asRight[BankError]).attempt  
  }  
  
  val messageIO: IO[String] = resultIO.map {  
    case Left(exception) => s"exception ${exception.getMessage}"  
    case Right(operationResult) => operationResult match {  
      case Left(bankError: BankError) => bankError match {  
        case AccountNotFound(accountId) => s"not found $accountId"  
        case InsufficientAmount(accountId, actual) => s"$accountId: lack of money $actual"  
      }  
      case Right(value) => value match {  
        case AccountUpdated(accountId, amount, sequenceId) =>  
          s"$accountId updated, amount $amount, sequence $sequenceId"  
        case OperationIgnored(accountId, sequenceId, actualSequence) =>  
          s"$accountId was not updated, sequence $sequenceId, actual sequence $actualSequence"  
      }  
    }  
  }  
  
  messageIO.map(committable.as)  
}

Pipes (again)

Reminder, pipes are functions!

Quick reminder, Pipe is just a function, so we can compose pipes similarly.

def stateFlow(state: BankState): Pipe[  
  IO,  
  CommittableConsumerRecord[IO, String, Either[DeserializationError, Input]],  
  CommittableConsumerRecord[IO, String, String]  
] = collectValidValues andThen updateBankState(state)

The next step is committing offsets back. Again, we targeted to maintain at-least-one semantics, and that requires attention.

Race between broadcasted pipes

First of all, I introduced broadcastThrough API previously.

Using broadcastThrough brings benefits:

  • Clear separation of concerns in the code makes it easier to reason about the overall processing logic and manage individual parts of the stream processing.
  • It enables concurrent processing of the stream through multiple pipes. Our case is quite artificial since deserialization error shouldn’t occur often and require parallelization.

The cite from scaladocs:

A pipe starts processing a chunk after pulling it from its buffer. The topic enforces some temporal constraints: No chunk is pushed to the buffer of any pipe until after the previous chunk has been published to all pipes. No chunk is pushed to a pipe until the pipe pulls the previous chunk. A chunk may be pushed to some pipes, and pulled by them, before other pipes have pulled the previous chunk. Thus, in processing source values, a fast pipe may be up to two chunks ahead of a slower one. This keeps a balance of progress, and prevents any pipe from getting too far ahead.

In our case, the obvious options are:

  • Commit offsets for any processed offset, no matter which pipe did the processing. It doesn’t work because of processing of the deserialization error (which could be just logging) starts to report that input was processed while bank operations (with earlier offsets) are still in progress or even not started. So there is a chance that we will drop bank operations on the floor.
  • Commit offset only after processing of bank operations. We de-prioritize deserialization errors and let them be committed “occasionally” alongside processed operations. In the code, I will proceed with the second option to still have a full-featured case with a combination of pipe and subsequent parallel processing.

In practice, I would advocate committing offset only after all responsible parties have completed related computations.

  1. In the case of Either-like branching and choices between one of the actions, it is sensible to continue reasoning in terms of a single flow, call evalMap for every entry, and switch to the necessary branch with pattern matching. parEvalMap can help to bring parallelization without ruining ordering.
  2. Separate pipes could be the right choice for heavy and separated IO operations (performed for each entry). But an additional state will be required. The option here is committing offsets to an intermediary which “listens” to all the pipes. That intermediary ensures that no pipe falls behind and commits only when all pipes report that they have done a job.

With all consideration, we can switch back to the bank operations pipeline.

val produceStatus: Pipe[IO, CommittableConsumerRecord[IO, String, String], Unit] =  
  statusStream =>  
    KafkaProducer.stream(producerSettings)  
      .flatMap(producer => statusStream.evalMap {  
        case CommittableConsumerRecord(record, offset) =>  
          producer  
            .produceOne(ProducerRecord("operations", record.key, record.value))  
            .flatten  
            .as(offset)  
      })  
      .groupWithin(100, 10.seconds)  
      .evalTapChunk(chunk => CommittableOffsetBatch.fromFoldable(chunk).commit)  
      .void

Here we define the pipe:

  1. A Kafka producer stream is created using KafkaProducer.stream(producerSettings). This stream will be used to produce records for the Kafka topic. We can re-use producerSettings since we have dummy strings everywhere.
  2. For each CommittableConsumerRecord, the pipe extracts the record and offset, creates a ProducerRecord with the same key and value as the input record, and specifies the destination Kafka topic as “operations”.
  3. Pipe produces the ProducerRecord to the Kafka topic with produceOne; there is a space for batching, but we can assume it is better to deliver status without delays. The flatten method is called to unwrap the IO[IO[A]] returned by produceOne.
  4. .as(offset) puts offset as value for more convenience at the next step.
  5. The produced stream of CommittableOffsets is batched using groupWithin, which collects offsets into chunks based on a maximum size (100) or a maximum duration (10 seconds) and performs commitment.
def operationsFlow(state: BankState): Pipe[  
  IO,  
  CommittableConsumerRecord[IO, String, Either[DeserializationError, Input]],  
  Unit
] = collectValidValues andThen updateBankState(state) andThen produceStatus

And for deserialization errors, I suggest simplifying things and just printing them:

val printFailures: Pipe[  
  IO,   
  CommittableConsumerRecord[IO, String, DeserializationError],  
  Unit
] = _.evalMap(r => IO.println(r.toString)).void

Whole failureFlow:

val failureFlow: Pipe[  
  IO,  
  CommittableConsumerRecord[IO, String, Either[DeserializationError, Input]],  
  Unit
] = collectFailures andThen printFailures

Now, the whole program is just running two pipes.

def program(state: BankState): fs2.Stream[IO, Unit] =  
  KafkaConsumer  
    .stream(consumerSettings)  
    .subscribeTo("topic1")  
    .records  
    .broadcastThrough(stateFlow(state), failureFlow)

Weaver

The final touch is the CI test against real Kafka.

Our toy example requires refactoring to be more or less an independent item (instead of functions inside main).

I structured BankOperationProgram in the following way:

class BankOperationProgram(  
    consumerSettings: ConsumerSettings[  
      IO,  
      String,  
      Either[DeserializationError, AccountOperationV2]  
    ],  
    producerSettings: ProducerSettings[IO, String, String],  
    inputTopic: String,  
    outputTopic: String,
    state: BankState  
) {  
  def processRecord(bankState: BankState)(  
      record: ConsumerRecord[String, Either[Throwable, AccountOperationV2]]  
  ): IO[Unit] =  ??? 
 
  
  val collectValidValues: Pipe[  
    IO,  
    CommittableConsumerRecord[IO, String, Either[DeserializationError, AccountOperationV2]],  
    CommittableConsumerRecord[IO, String, AccountOperationV2]  
  ] = _.fproduct(_.record.value).collect { case (committable, Right(validValue)) =>  
    committable.as(validValue)  
  }  
  
  val collectFailures: Pipe[  
    IO,  
    CommittableConsumerRecord[IO, String, Either[DeserializationError, AccountOperationV2]],  
    CommittableConsumerRecord[IO, String, DeserializationError]  
  ] = _.fproduct(_.record.value).collect { case (committable, Left(failure)) =>  
    committable.as(failure)  
  }  
  
  def updateBankState(state: BankState): Pipe[  
    IO,  
    CommittableConsumerRecord[IO, String, AccountOperationV2],  
    CommittableConsumerRecord[IO, String, String]  
  ] =  ???
  
  val produceStatus: Pipe[IO, CommittableConsumerRecord[IO, String, String], Unit] = ??? 
  
  val printFailures: Pipe[  
    IO,  
    CommittableConsumerRecord[IO, String, DeserializationError],  
    Unit  
  ] = ??? 
  
  val failureFlow: Pipe[  
    IO,  
    CommittableConsumerRecord[IO, String, Either[DeserializationError, AccountOperationV2]],  
    Unit  
  ] = ???
  
  def process: fs2.Stream[IO, Unit] =  
    KafkaConsumer  
      .stream(consumerSettings)  
      .subscribeTo(inputTopic)  
      .records  
      .broadcastThrough(stateFlow(state), failureFlow)  
}

Whole implementation could be found here: BankOperationProgram.scala

All pipes could be tested separately, but now our focus is quick validation that the whole process works as expected.

Weaver is the appropriate tool for such suites.

The suite itself could be shaped in the following way:

object KafkaSuite extends weaver.IOSuite {  
  case class TestResources(  
      consumerSettings: ConsumerSettings[  
        IO,  
        String,  
        Either[DeserializationError, AccountOperationV2]  
      ],  
      producerSettings: ProducerSettings[IO, String, String],  
      inputTopic: String,  
	  outputTopic: String,  
      consumer: KafkaConsumer[IO, String, String]
  )  
  
  type Res = TestResources  
  
  override def sharedResource: Resource[IO, Res] = {  
    val consumerSettings = ConsumerSettings[IO, String, String]  
      .withAutoOffsetReset(AutoOffsetReset.Earliest)  
      .withBootstrapServers("localhost:29092")  
      .withGroupId("group")  
  
    val producerSettings =  
      ProducerSettings[IO, String, String]  
        .withBootstrapServers("localhost:29092")  
        .withProperty("topic.creation.enable", "true")  
  
    KafkaConsumer  
      .resource(consumerSettings)  
      .map(consumer =>  
        TestResources(  
          consumerSettings =  
            ConsumerSettings[IO, String, Either[DeserializationError, AccountOperationV2]]  
              .withAutoOffsetReset(AutoOffsetReset.Earliest)  
              .withBootstrapServers("localhost:29092")  
              .withGroupId("group"),  
          producerSettings = producerSettings,  
          consumer = consumer,  
          topic = "input"  
        )  
      )  
  }  
  
  test("any test") { resources =>  
    ...
  }  
}

It is worth mentioning that defining type Res and overriding sharedResource is the standard flow for suites that extend weaver.IOSuite. TestResources is the fixture that handles all the resources that will be required to run the program.

Let’s produce some test messages:

def produceTestMessages(
                         producerSettings: ProducerSettings[IO, String, String],
                         topic: String
                       ): IO[ProducerResult[String, String]] = KafkaProducer
  .resource(producerSettings)
  .use(
    _.produce(
      ProducerRecords(
        List(
          ProducerRecord(
            topic,
            "key",
            """
              |{
              |  "operation_type" : "replenishment:v2" ,
              |  "account": "123e4567-e89b-12d3-a456-426614174000",
              |  "sequence_id" : 1,
              |  "value": 200
              |}
              |""".stripMargin
          ),
          ProducerRecord(
            topic,
            "key",
            """
              |{
              |  "operation_type" : "withdrawal:v2" ,
              |  "account": "123e4567-e89b-12d3-a456-426614174000",
              |  "sequence_id" : 1,
              |  "value": 100
              |}
              |""".stripMargin
          ),
          ProducerRecord(
            topic,
            "key",
            """
              |{
              |  "operation_type" : "withdrawal:v2" ,
              |  "account": "123e4567-e89b-12d3-a456-426614174000",
              |  "sequence_id" : 2,
              |  "value": 100
              |}
              |""".stripMargin
          )
        )
      )
    ).flatten
  )

The test itself is a combination of what we implemented previously. It produces test operations and runs the program in the separate Fiber:

test("program produces expected messages") { resources =>  
  val produce: IO[ProducerResult[String, String]] =  
    produceTestMessages(resources.producerSettings, resources.inputTopic)  
  
  val process: fs2.Stream[IO, Unit] = for {  
    state <- fs2.Stream.eval(BankState.make)  
    program = new BankOperationProgram(  
      resources.consumerSettings,  
      resources.producerSettings,  
      resources.inputTopic,  
      resources.outputTopic,  
      state  
    )  
    _ <- program.process  
  } yield ()  
  
  for {  
    started <- (produce *> process.compile.drain).start  
    _       <- resources.consumer.subscribeTo(resources.outputTopic)  
    received <- resources.consumer.records  
      .take(3)  
      .compile  
      .toList  
      .timeoutTo(5.seconds, started.cancel *> IO.raiseError(new RuntimeException("timeout")))  
    _ <- started.cancel  
  } yield expect(  
    received.map(_.record.value) == List(  
      "123e4567-e89b-12d3-a456-426614174000 updated, amount 200, sequence 1",  
      "123e4567-e89b-12d3-a456-426614174000 was not updated, sequence 1, actual sequence 1",  
      "123e4567-e89b-12d3-a456-426614174000 updated, amount 100, sequence 2"  
    )  
  )  
}

You can find full test here: KafkaSuite.scala

Instead of summary

  1. The cool thing about that test is that it hides the asynchronous nature and lets us reason about running the underlying process as a simple function that consumes input and produces output. .take(3).compile returns first three entries immediatly after fs2-kafka pull them from Kafka.
  2. Such “waiting for results” tests can be stuck in waiting. So, better to set timeouts.

Finally, running a program in Fiber to stop it with cancel is not the best option. One of the conventions is to transmit a shutdown signal via fs2.concurrent.SignallingRef. For instance, org.http4s.server.ServerBuilder has the following implementation:

/** Runs the server as a Stream that emits only when the terminated signal becomes true.  
  * Useful for servers with associated lifetime behaviors.  */
final def serveWhile(  
    terminateWhenTrue: Signal[F, Boolean],  
    exitWith: Ref[F, ExitCode],  
): Stream[F, ExitCode] =  
  Stream.resource(resource) *> (terminateWhenTrue.discrete  
    .takeWhile(_ === false)  
    .drain ++ Stream.eval(exitWith.get))

That’s it for today. Thank you for reading! Feel free to reach out via Twitter or any different network.