fs2-kafka: hard choices, Pipes and Weaver tests
The agenda is to build a big recap on top of the previous parts (part 1, part 2).
Specifically, I want to:
- make a quick introduction to the fs2.Pipe notion;
- discuss possibilities and limitations of turning the error flow into an explicit pipeline;
- highlight some non-obvious aspects of offset committing (especially when we distribute offsets over multiple streams);
- show a Weaver suite.
In the previous part, we ended up with sequential processing of incoming events:
def program(state: BankState): fs2.Stream[IO, Unit] =
  KafkaConsumer
    .stream(consumerSettings)
    .subscribeTo("topic1")
    .records
    .evalTap { committable =>
      processRecord(state)(committable.record)
    }
The awkward part here is processRecord itself:
def processRecord(bankState: BankState)(
    record: ConsumerRecord[String, Either[Throwable, AccountOperationV2]]
): IO[Unit]
That function is a placeholder. It was introduced to show that errors are traveling alongside valid account operations.
Separating different concerns at a higher level is essential from an architectural perspective. As engineers, we must ensure that we isolate the business logic, which, in our case, involves updating the bank account state. Therefore, we should create a testable and modular business logic that can be easily maintained and updated.
Separate flow for errors
Error processing flows benefit from being explicitly defined as separate pipelines. However, it can be challenging to isolate error handling from business logic completely. There may be scenarios where the implementation reacts to specific error conditions with retries, fallbacks, or re-iterations: when an error occurs, the system may try a different approach, retry the same operation after a brief delay, or use a fallback value. Such actions are closely related to the core business logic, which blurs the line between error handling and the primary functionality and makes a clean separation harder to maintain.
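To make the point concrete, here is a hypothetical sketch (the names and the retry policy are made up, not part of the article's code) of how a retry with a fallback entangles error handling with the business flow:
import scala.concurrent.duration._
import cats.effect.IO

// Is the retry-then-fallback policy "error handling" or "business logic"?
// It decides the business outcome, so it cannot be fully separated.
def processWithFallback[A](primary: IO[A], fallback: IO[A]): IO[A] =
  primary.handleErrorWith { _ =>
    IO.sleep(100.millis) *> primary.handleErrorWith(_ => fallback)
  }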
Let’s return to our toy example and the separated pipeline for errors.
The notion of long-living continuous streams requires careful reasoning. Before moving to the following steps, it is necessary to familiarize yourself with the Pipe abstraction.
Pipe
Pipe is a function!
In the fs2 library, a Pipe represents a transformation or processing step we apply to a data stream. It is a structure that takes a stream of inputs and produces a stream of outputs. In other words, a Pipe[F[_], A, B] is a function that takes a Stream[F, A] and returns a Stream[F, B].
/** A stream transformation represented as a function from stream to stream.
  *
  * Pipes are typically applied with the `through` operation on `Stream`.
  */
type Pipe[F[_], -I, +O] = Stream[F, I] => Stream[F, O]
Although Pipe is simply an alias for a function, it plays a crucial role in breaking down streaming applications into smaller, more manageable components.
For instance, suppose we need to convert Fahrenheit to Celsius continuously. We can isolate the conversion in an explicit WeatherService with an appropriate Pipe.
trait WeatherService[F[_]] {
  def fahrenheitToCelsius: Pipe[F, Double, Double]
}

object WeatherService {
  def make[F[_]]: WeatherService[F] = new WeatherService[F] {
    override val fahrenheitToCelsius: Pipe[F, Double, Double] =
      _.map(fahrenheit => (fahrenheit - 32) * 5 / 9)
  }
}
Now, nothing prevents us from testing that pipe.
class FahrenheitToCelsiusPipeSpec extends AnyFunSuite with Matchers {
  // The fahrenheitToCelsius pipe to be tested
  test(
    "The fahrenheitToCelsius should convert the input temperatures from Fahrenheit to Celsius"
  ) {
    val weatherService = WeatherService.make[IO]

    // Define the test input and expected output
    val testInput: Stream[IO, Double] = Stream.emits[IO, Double](List(32.0, 212.0))
    val expectedOutput: List[Double]  = List(0.0, 100.0)

    // Apply the pipe to the input stream and compile the stream into a list
    val actualOutput: IO[List[Double]] =
      testInput.through(weatherService.fahrenheitToCelsius).compile.toList

    // Compare the expected and actual output
    actualOutput.unsafeRunSync() shouldEqual expectedOutput
  }
}
testOnly io.github.antonkw.FahrenheitToCelsiusPipeSpec
compiling 1 Scala source to fs2-kafka-study-case/target/scala-2.13/classes ...
done compiling
FahrenheitToCelsiusPipeSpec:
- The fahrenheitToCelsius should convert the input temperatures from Fahrenheit to Celsius
Run completed in 209 milliseconds.
Tests: succeeded 1
Separating Stream of Either
Let’s return to the stream of Either that we want to decouple.
The goal is to manage the processing of a single stream with multiple pipes, each responsible for a different data type. Rather than splitting the stream into separate streams, the focus should be on using pipes to segregate various records safely. This approach ensures that each pipe addresses its designated data type, providing neat stream processing.
Now we can think about eliminating “unnecessary” sides of Either values.
The vital step is to ensure you’re not reasoning about a particular number of records. Rather than perceiving a stream as a fixed number of entries, consider it a “flow” of entries. Unfortunately, there is a trap that can catch people who are used to a more imperative style with record-by-record processing. Pipe is just a function, and you can withdraw elements of the input stream.
Time for practice!
Here is the simplest possible example. The snippet defines an inputStream containing a list with two Either elements: an Int value wrapped as a Left, and a String value wrapped as a Right.
object PipeDemo extends Simple {
  val inputStream: fs2.Stream[IO, Either[Int, String]] =
    fs2.Stream.emits(List(1.asLeft, "one".asRight))

  override def run: IO[Unit] = inputStream.foreach(IO.println).compile.drain
}
Now, let’s introduce a placeholder with the desired signature.
override def run: IO[Unit] = {
  val collectRights: Pipe[IO, Either[Int, String], String] = ???

  val rightsOnly: fs2.Stream[IO, String] =
    inputStream.through(collectRights)

  rightsOnly.foreach(IO.println).compile.drain
}
The straightforward approach is using flatMap to emit values packed in Right:
val collectRights: Pipe[IO, Either[Int, String], String] = _.flatMap {
  case Right(stringValue) => fs2.Stream.emit(stringValue)
  case Left(_)            => fs2.Stream.empty
}
It is easy to imagine how you can write a bunch of helpers to extract either side of an abstract Either.
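For illustration, here is a hedged sketch of such helpers (hypothetical, not part of fs2 or the article's repo), built on flatMap and single-element emits:
import fs2.Pipe

// Extract all Right values from a stream of Eithers
def rights[F[_], L, R]: Pipe[F, Either[L, R], R] = _.flatMap {
  case Right(r) => fs2.Stream.emit(r)
  case Left(_)  => fs2.Stream.empty
}

// Extract all Left values from a stream of Eithers
def lefts[F[_], L, R]: Pipe[F, Either[L, R], L] = _.flatMap {
  case Left(l)  => fs2.Stream.emit(l)
  case Right(_) => fs2.Stream.empty
}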
The problem there is efficiency. Under the hood, the documentation explicitly asks us to use the single-element emit wisely:
/** Lifts the given output value `O` into a pull that performs no
  * effects, emits that single output in a singleton chunk, and always
  * terminates successfully with a unit result.
  *
  * _Note_: using singleton chunks is not efficient. If possible,
  * use the chunk-based `output` method instead.
  */
def output1[F[x] >: Pure[x], O](o: O): Pull[F, O, Unit] = Output(Chunk.singleton(o))
If you want to familiarize yourself with the notion of Chunk and the internals of fs2 in general, I recommend watching the Michael Pilquist - fs2.Chunk talk.
Long story short, doing manual emit calls is not the best way to gather elements.
Things start to look awkward. The only obvious option that doesn’t build new streams from single elements is an unsafe transformation:
val collectRights: Pipe[IO, Either[Int, String], String] =
  _.filter(_.isRight).map(_.toOption.get)
Luckily, fs2 provides a more sophisticated way to collect some of the elements:
def collect[O2](pf: PartialFunction[O, O2]): Stream[F, O2]
So, we need to feed it a partial function that is defined at Right:
val collectRights: Pipe[IO, Either[Int, String], String] =
  _.collect { case Right(value) => value }
Inside, it uses an array builder to create fewer new chunks.
/** More efficient version of `filter(pf.isDefinedAt).map(pf)`. */
def collect[O2](pf: PartialFunction[O, O2]): Chunk[O2] = {
  val b = makeArrayBuilder[Any]
  b.sizeHint(size)
  foreach(o => if (pf.isDefinedAt(o)) b += pf(o))
  Chunk.array(b.result()).asInstanceOf[Chunk[O2]]
}
So, we safely process only valid entries:
override def run: IO[Unit] = {
  val collectRights: Pipe[IO, Either[Int, String], String] =
    _.collect { case Right(value) => value }

  val rightsOnly: fs2.Stream[IO, String] =
    inputStream.through(collectRights)

  rightsOnly.foreach(IO.println).compile.drain
}
How do we reach the errors in the same way?
The answer is broadcastThrough.
The broadcastThrough function is a combinator that allows us to apply multiple pipes to the same input stream, effectively broadcasting the input data through each pipe in parallel.
So, we can write something like this:
override def run: IO[Unit] = {
  val collectRights: Pipe[IO, Either[Int, String], String] =
    _.collect { case Right(value) => value }

  val collectLefts: Pipe[IO, Either[Int, String], Int] =
    _.collect { case Left(value) => value }

  inputStream.broadcastThrough(collectLefts, collectRights).foreach(IO.println).compile.drain
}
It initially works, but the type inference is a bit weird: if you try to transform the output, you’ll find a stream of Any.
It is expected if you think about it.
But let us be more concrete.
All pipes should produce the same output type:
def broadcastThrough[F2[x] >: F[x]: Concurrent, O2](
    pipes: Pipe[F2, O, O2]*
): Stream[F2, O2]
So, it makes sense to specify the output type explicitly:
inputStream.broadcastThrough[IO, String](collectLefts, collectRights).foreach(IO.println).compile.drain
Now, to compile the snippet, we need to ensure that collectLefts returns strings:
val collectLefts: Pipe[IO, Either[Int, String], String] =
  _.collect { case Left(value) => value.toString }
With all that, it is time to return to our banking example.
The idea here is to have two output topics. One will be used to keep notifications about successful operations. Another one will be used to store failures.
First, we need a pipe that collects valid inputs from committable records:
val collectValidValues: Pipe[
  IO,
  CommittableConsumerRecord[IO, String, Either[DeserializationError, Input]],
  CommittableConsumerRecord[IO, String, Input]
] = _.fproduct(_.record.value).collect { case (committable, Right(validValue)) =>
  committable.as(validValue)
}
ConsumerRecord (the record property inside CommittableConsumerRecord) has no unapply, so we need to extract the value before pattern matching.
Using the partial function without additional steps is still possible, but unsafe transformations appear again.
new PartialFunction[
  CommittableConsumerRecord[IO, String, Either[DeserializationError, Input]],
  CommittableConsumerRecord[IO, String, Input]
] {
  override def isDefinedAt(
      x: CommittableConsumerRecord[IO, String, Either[DeserializationError, Input]]
  ): Boolean = x.record.value.isRight

  override def apply(
      v1: CommittableConsumerRecord[IO, String, Either[DeserializationError, Input]]
  ): CommittableConsumerRecord[IO, String, Input] = v1.as(v1.record.value.toOption.get)
}
And the purpose of using collect is to get rid of things like toOption.get calls.
So, collectValidValues is in place; collectFailures follows the same pattern:
val collectFailures: Pipe[
  IO,
  CommittableConsumerRecord[IO, String, Either[DeserializationError, Input]],
  CommittableConsumerRecord[IO, String, DeserializationError]
] = _.fproduct(_.record.value).collect { case (committable, Left(failure)) =>
  committable.as(failure)
}
Updating the state
Time to bring in the last pieces of the pipeline.
We can finally implement the Pipe that updates the state.
A lot of tricky nuances await us there.
Common low-level type for results
We must put results into some resulting structure (definitely, Unit is not our choice).
def updateBankState(state: BankState): Pipe[
  IO,
  CommittableConsumerRecord[IO, String, Input],
  CommittableConsumerRecord[IO, String, Unit]
] = _.map { case CommittableConsumerRecord(record, offset) =>
  val operationResult = record.value match {
    case withdrawal: WithdrawalOperationV2 =>
      val withdrawalResult: IO[Either[BankError, AccountOperationResult]] = state.withdraw(withdrawal)
      withdrawalResult
    case replenishment: ReplenishmentOperationV2 =>
      val replenishmentResult: IO[AccountOperationResult] = state.replenish(replenishment)
      replenishmentResult
  }
  operationResult
}
withdrawalResult and replenishmentResult are not of the same type. Not a big deal, since AccountOperationResult could be wrapped with Right.
val operationResult: IO[Either[BankError, AccountOperationResult]] = record.value match {
  case withdrawal: WithdrawalOperationV2       => state.withdraw(withdrawal)
  case replenishment: ReplenishmentOperationV2 => state.replenish(replenishment).map(_.asRight[BankError])
}
And I want to open Pandora’s box: we ought to handle unpredictable errors. withdraw can potentially hide a fallback mechanism with retries and backoffs, so there is still a chance of failure. attempt gives us Either[Throwable, Either[BankError, AccountOperationResult]].
def updateBankState(state: BankState): Pipe[
  IO,
  CommittableConsumerRecord[IO, String, Input],
  CommittableConsumerRecord[IO, String, Either[Throwable, Either[BankError, AccountOperationResult]]]
] = _.evalMap { case committable @ CommittableConsumerRecord(record, _) =>
  val resultIO: IO[Either[Throwable, Either[BankError, AccountOperationResult]]] = record.value match {
    case withdrawal: WithdrawalOperationV2       => state.withdraw(withdrawal).attempt
    case replenishment: ReplenishmentOperationV2 => state.replenish(replenishment).map(_.asRight[BankError]).attempt
  }
  resultIO.map(committable.as)
}
as here is evidence of CommittableConsumerRecord being a functor. It implements as, which replaces the content without changing the container. This is very convenient: we can do transformations without touching offsets and commit the offset back once processing is done. That is part of the at-least-once implementation.
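As a tiny illustrative sketch (the handle and work names are made up, not part of the article's code), this is the at-least-once shape we are aiming for: transform the payload with as, keep the offset attached, and commit only after the work is done.
import cats.effect.IO
import cats.syntax.functor._
import fs2.kafka.CommittableConsumerRecord

// Hypothetical helper: process the record's value, keep the offset attached,
// and commit the offset only after the processing effect succeeded.
// `Input` is the article's payload type.
def handle(
    committable: CommittableConsumerRecord[IO, String, Input],
    work: Input => IO[String]
): IO[Unit] =
  work(committable.record.value)
    .map(committable.as)      // replace the payload, the offset stays the same
    .flatMap(_.offset.commit) // commit back once processing is done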
We already know that we can deconstruct those Eithers into separate streams. Well done!
Kafka topics
There is still a choice about distribution across topics. And we’ll need to manage those topics.
The choice
We already have the topic for deserialization errors. I suggest leaving it as an explicit case. The reasoning here is simple: those errors can’t have any business-value fallback scenarios. Non-deserializable messages are not supposed to be recovered and can’t be reported to users or the business. They also can’t be tied to any operation, since they never reach the stage where they become identifiable.
We still need to manage three distinct outputs:
- success
- bank error
- random exception
The choice between reporting options is challenging.
All topics separated
The obvious option is having three distinct Kafka topics: one for exceptions, one for BankError, and one for reporting successful operations.
Pros:
- Clear separation of concerns: each topic corresponds to a specific outcome.
- Easier to manage and monitor each type of outcome independently.
Cons:
- Consumers have to consume messages from different topics. BankError is technically part of the happy-path scenario; however, to follow the status of a transaction, a client would need to subscribe to multiple topics and handle the “lack of money” type of error as something that comes from a separate event flow.
- Higher complexity in setting up and maintaining topics; a potential increase in resource usage.
All topics together (single topic)
In contrast to the previous option, we can consider any outcome as an operation status and put everything into a single topic.
Pros:
- Managing, monitoring, and scaling the topic becomes easier than having separate topics for each result type.
- Consumers only need to subscribe to a single topic to receive all types of results, simplifying the consumption process and reducing the overhead of managing multiple topic subscriptions.
- Cohesive view of operations: since all results are in a single topic, it’s easier to correlate different types of results and monitor the overall status of operations.
Cons:
- Increased message complexity: consumers potentially need to follow schema updates even if they are “not interested”. E.g., error metadata gets extended with more debugging info, which triggers updates (at least a build/deploy) of downstream apps.
- Filtering overhead. Most likely, separate consumers will be responsible for handling the happy path and listening for purely technical errors. As a result, they will need to filter messages, which may introduce some overhead.
- Consolidating all results into a single Kafka topic might make it tempting to process errors alongside the main business logic, leading to concerns becoming less separated and more tightly coupled.
Our toy example doesn’t imply much downstream processing, and we already have deserialization errors to demonstrate emitting to multiple topics. So, let’s report just a simplified status to a single topic.
def updateBankState(state: BankState): Pipe[
  IO,
  CommittableConsumerRecord[IO, String, Input],
  CommittableConsumerRecord[IO, String, String]
] = _.evalMap { case committable @ CommittableConsumerRecord(record, _) =>
  val resultIO: IO[Either[Throwable, Either[BankError, AccountOperationResult]]] = record.value match {
    case withdrawal: WithdrawalOperationV2 =>
      state.withdraw(withdrawal).attempt
    case replenishment: ReplenishmentOperationV2 =>
      state.replenish(replenishment).map(_.asRight[BankError]).attempt
  }

  val messageIO: IO[String] = resultIO.map {
    case Left(exception) => s"exception ${exception.getMessage}"
    case Right(operationResult) =>
      operationResult match {
        case Left(bankError: BankError) =>
          bankError match {
            case AccountNotFound(accountId)            => s"not found $accountId"
            case InsufficientAmount(accountId, actual) => s"$accountId: lack of money $actual"
          }
        case Right(value) =>
          value match {
            case AccountUpdated(accountId, amount, sequenceId) =>
              s"$accountId updated, amount $amount, sequence $sequenceId"
            case OperationIgnored(accountId, sequenceId, actualSequence) =>
              s"$accountId was not updated, sequence $sequenceId, actual sequence $actualSequence"
          }
      }
  }

  messageIO.map(committable.as)
}
Pipes (again)
Reminder, pipes are functions!
Quick reminder: Pipe is just a function, so we can compose pipes with plain function composition.
def stateFlow(state: BankState): Pipe[
  IO,
  CommittableConsumerRecord[IO, String, Either[DeserializationError, Input]],
  CommittableConsumerRecord[IO, String, String]
] = collectValidValues andThen updateBankState(state)
The next step is committing offsets back. Again, we aim to maintain at-least-once semantics, and that requires attention.
Race between broadcasted pipes
First of all, I introduced the broadcastThrough API previously.
Using broadcastThrough brings benefits:
- Clear separation of concerns in the code makes it easier to reason about the overall processing logic and manage individual parts of the stream processing.
- It enables concurrent processing of the stream through multiple pipes. Our case is quite artificial, since deserialization errors shouldn’t occur often enough to require parallelization.
A quote from the scaladocs:
A pipe starts processing a chunk after pulling it from its buffer. The topic enforces some temporal constraints: No chunk is pushed to the buffer of any pipe until after the previous chunk has been published to all pipes. No chunk is pushed to a pipe until the pipe pulls the previous chunk. A chunk may be pushed to some pipes, and pulled by them, before other pipes have pulled the previous chunk. Thus, in processing source values, a fast pipe may be up to two chunks ahead of a slower one. This keeps a balance of progress, and prevents any pipe from getting too far ahead.
In our case, the obvious options are:
- Commit offsets for any processed record, no matter which pipe did the processing. It doesn’t work, because processing of a deserialization error (which could be just logging) starts to report that the input was processed while bank operations (with earlier offsets) are still in progress or not even started. So there is a chance that we will drop bank operations on the floor.
- Commit offsets only after processing of bank operations. We de-prioritize deserialization errors and let them be committed “occasionally” alongside processed operations. In the code, I will proceed with this second option, to still have a full-featured case with a combination of a pipe and subsequent parallel processing.
In practice, I would advocate committing offset only after all responsible parties have completed related computations.
- In the case of Either-like branching and choices between one of the actions, it is sensible to continue reasoning in terms of a single flow, call evalMap for every entry, and switch to the necessary branch with pattern matching. parEvalMap can help to bring parallelization without ruining ordering (see the sketch after this list).
- Separate pipes could be the right choice for heavy and separate IO operations (performed for each entry). But additional state will be required. The option here is committing offsets via an intermediary which “listens” to all the pipes. That intermediary ensures that no pipe falls behind and commits only when all pipes report that they have done the job.
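Here is a rough sketch of that single-flow alternative (the handleOperation helper and the concurrency level are assumptions, not part of the article's code): branch with pattern matching inside parEvalMap, keep the offset, and commit in a subsequent step so parEvalMap's ordered emission keeps commits ordered.
// Hypothetical helper that applies a bank operation to the state
def handleOperation(state: BankState, operation: Input): IO[Unit] = ???

def singleFlow(state: BankState): Pipe[
  IO,
  CommittableConsumerRecord[IO, String, Either[DeserializationError, Input]],
  Unit
] =
  _.parEvalMap(maxConcurrent = 4) { committable =>
    val handled: IO[Unit] = committable.record.value match {
      case Left(error)      => IO.println(s"deserialization failure: $error")
      case Right(operation) => handleOperation(state, operation)
    }
    handled.as(committable.offset) // keep the offset for the commit step
  }.evalMap(_.commit)              // parEvalMap emits in order, so offsets are committed in order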
With all these considerations, we can switch back to the bank operations pipeline.
val produceStatus: Pipe[IO, CommittableConsumerRecord[IO, String, String], Unit] =
  statusStream =>
    KafkaProducer
      .stream(producerSettings)
      .flatMap { producer =>
        statusStream.evalMap { case CommittableConsumerRecord(record, offset) =>
          producer
            .produceOne(ProducerRecord("operations", record.key, record.value))
            .flatten
            .as(offset)
        }
      }
      .groupWithin(100, 10.seconds)
      .evalTapChunk(chunk => CommittableOffsetBatch.fromFoldable(chunk).commit)
      .void
Here we define the pipe:
- A Kafka producer stream is created using KafkaProducer.stream(producerSettings). This stream will be used to produce records to the Kafka topic. We can re-use producerSettings since we have dummy strings everywhere.
- For each CommittableConsumerRecord, the pipe extracts the record and offset, creates a ProducerRecord with the same key and value as the input record, and specifies the destination Kafka topic as “operations”.
- The pipe produces the ProducerRecord to the Kafka topic with produceOne; there is space for batching, but we can assume it is better to deliver statuses without delays. The flatten method is called to unwrap the IO[IO[A]] returned by produceOne.
- .as(offset) puts the offset as the value for more convenience at the next step.
- The produced stream of CommittableOffsets is batched using groupWithin, which collects offsets into chunks based on a maximum size (100) or a maximum duration (10 seconds), and performs the commit.
def operationsFlow(state: BankState): Pipe[
  IO,
  CommittableConsumerRecord[IO, String, Either[DeserializationError, Input]],
  Unit
] = collectValidValues andThen updateBankState(state) andThen produceStatus
And for deserialization errors, I suggest simplifying things and just printing them:
val printFailures: Pipe[
  IO,
  CommittableConsumerRecord[IO, String, DeserializationError],
  Unit
] = _.evalMap(r => IO.println(r.toString)).void
The whole failureFlow:
val failureFlow: Pipe[
  IO,
  CommittableConsumerRecord[IO, String, Either[DeserializationError, Input]],
  Unit
] = collectFailures andThen printFailures
Now, the whole program is just running two pipes.
def program(state: BankState): fs2.Stream[IO, Unit] =
  KafkaConsumer
    .stream(consumerSettings)
    .subscribeTo("topic1")
    .records
    .broadcastThrough(operationsFlow(state), failureFlow)
Weaver
The final touch is the CI test against real Kafka.
Our toy example requires refactoring to become a more or less independent unit (instead of functions inside main).
I structured BankOperationProgram in the following way:
class BankOperationProgram(
    consumerSettings: ConsumerSettings[
      IO,
      String,
      Either[DeserializationError, AccountOperationV2]
    ],
    producerSettings: ProducerSettings[IO, String, String],
    inputTopic: String,
    outputTopic: String,
    state: BankState
) {
  val collectValidValues: Pipe[
    IO,
    CommittableConsumerRecord[IO, String, Either[DeserializationError, AccountOperationV2]],
    CommittableConsumerRecord[IO, String, AccountOperationV2]
  ] = _.fproduct(_.record.value).collect { case (committable, Right(validValue)) =>
    committable.as(validValue)
  }

  val collectFailures: Pipe[
    IO,
    CommittableConsumerRecord[IO, String, Either[DeserializationError, AccountOperationV2]],
    CommittableConsumerRecord[IO, String, DeserializationError]
  ] = _.fproduct(_.record.value).collect { case (committable, Left(failure)) =>
    committable.as(failure)
  }

  def updateBankState(state: BankState): Pipe[
    IO,
    CommittableConsumerRecord[IO, String, AccountOperationV2],
    CommittableConsumerRecord[IO, String, String]
  ] = ???

  val produceStatus: Pipe[IO, CommittableConsumerRecord[IO, String, String], Unit] = ???

  def operationsFlow(state: BankState): Pipe[
    IO,
    CommittableConsumerRecord[IO, String, Either[DeserializationError, AccountOperationV2]],
    Unit
  ] = collectValidValues andThen updateBankState(state) andThen produceStatus

  val printFailures: Pipe[
    IO,
    CommittableConsumerRecord[IO, String, DeserializationError],
    Unit
  ] = ???

  val failureFlow: Pipe[
    IO,
    CommittableConsumerRecord[IO, String, Either[DeserializationError, AccountOperationV2]],
    Unit
  ] = ???

  def process: fs2.Stream[IO, Unit] =
    KafkaConsumer
      .stream(consumerSettings)
      .subscribeTo(inputTopic)
      .records
      .broadcastThrough(operationsFlow(state), failureFlow)
}
The whole implementation can be found here: BankOperationProgram.scala
All pipes could be tested separately, but now our focus is quick validation that the whole process works as expected.
Weaver is the appropriate tool for such suites.
The suite itself could be shaped in the following way:
object KafkaSuite extends weaver.IOSuite {
  case class TestResources(
      consumerSettings: ConsumerSettings[
        IO,
        String,
        Either[DeserializationError, AccountOperationV2]
      ],
      producerSettings: ProducerSettings[IO, String, String],
      inputTopic: String,
      outputTopic: String,
      consumer: KafkaConsumer[IO, String, String]
  )

  type Res = TestResources

  override def sharedResource: Resource[IO, Res] = {
    val consumerSettings = ConsumerSettings[IO, String, String]
      .withAutoOffsetReset(AutoOffsetReset.Earliest)
      .withBootstrapServers("localhost:29092")
      .withGroupId("group")

    val producerSettings =
      ProducerSettings[IO, String, String]
        .withBootstrapServers("localhost:29092")
        .withProperty("topic.creation.enable", "true")

    KafkaConsumer
      .resource(consumerSettings)
      .map(consumer =>
        TestResources(
          consumerSettings =
            ConsumerSettings[IO, String, Either[DeserializationError, AccountOperationV2]]
              .withAutoOffsetReset(AutoOffsetReset.Earliest)
              .withBootstrapServers("localhost:29092")
              .withGroupId("group"),
          producerSettings = producerSettings,
          consumer = consumer,
          inputTopic = "input",
          outputTopic = "operations" // the topic the program produces statuses to
        )
      )
  }

  test("any test") { resources =>
    ...
  }
}
It is worth mentioning that defining type Res and overriding sharedResource is the standard flow for suites that extend weaver.IOSuite. TestResources is the fixture that handles all the resources that will be required to run the program.
Let’s produce some test messages:
def produceTestMessages(
    producerSettings: ProducerSettings[IO, String, String],
    topic: String
): IO[ProducerResult[String, String]] =
  KafkaProducer
    .resource(producerSettings)
    .use(
      _.produce(
        ProducerRecords(
          List(
            ProducerRecord(
              topic,
              "key",
              """
                |{
                |  "operation_type" : "replenishment:v2",
                |  "account": "123e4567-e89b-12d3-a456-426614174000",
                |  "sequence_id" : 1,
                |  "value": 200
                |}
                |""".stripMargin
            ),
            ProducerRecord(
              topic,
              "key",
              """
                |{
                |  "operation_type" : "withdrawal:v2",
                |  "account": "123e4567-e89b-12d3-a456-426614174000",
                |  "sequence_id" : 1,
                |  "value": 100
                |}
                |""".stripMargin
            ),
            ProducerRecord(
              topic,
              "key",
              """
                |{
                |  "operation_type" : "withdrawal:v2",
                |  "account": "123e4567-e89b-12d3-a456-426614174000",
                |  "sequence_id" : 2,
                |  "value": 100
                |}
                |""".stripMargin
            )
          )
        )
      ).flatten
    )
The test itself is a combination of what we implemented previously. It produces test operations and runs the program in a separate fiber:
test("program produces expected messages") { resources =>
val produce: IO[ProducerResult[String, String]] =
produceTestMessages(resources.producerSettings, resources.inputTopic)
val process: fs2.Stream[IO, Unit] = for {
state <- fs2.Stream.eval(BankState.make)
program = new BankOperationProgram(
resources.consumerSettings,
resources.producerSettings,
resources.inputTopic,
resources.outputTopic,
state
)
_ <- program.process
} yield ()
for {
started <- (produce *> process.compile.drain).start
_ <- resources.consumer.subscribeTo(resources.outputTopic)
received <- resources.consumer.records
.take(3)
.compile
.toList
.timeoutTo(5.seconds, started.cancel *> IO.raiseError(new RuntimeException("timeout")))
_ <- started.cancel
} yield expect(
received.map(_.record.value) == List(
"123e4567-e89b-12d3-a456-426614174000 updated, amount 200, sequence 1",
"123e4567-e89b-12d3-a456-426614174000 was not updated, sequence 1, actual sequence 1",
"123e4567-e89b-12d3-a456-426614174000 updated, amount 100, sequence 2"
)
)
}
You can find the full test here: KafkaSuite.scala
Instead of a summary
- The cool thing about that test is that it hides the asynchronous nature and lets us reason about the underlying process as a simple function that consumes input and produces output. .take(3).compile returns the first three entries immediately after fs2-kafka pulls them from Kafka.
- Such “waiting for results” tests can get stuck waiting, so it is better to set timeouts.
Finally, running a program in a fiber and stopping it with cancel is not the best option. One of the conventions is to transmit a shutdown signal via fs2.concurrent.SignallingRef.
For instance, org.http4s.server.ServerBuilder has the following implementation:
/** Runs the server as a Stream that emits only when the terminated signal becomes true.
  * Useful for servers with associated lifetime behaviors.
  */
final def serveWhile(
    terminateWhenTrue: Signal[F, Boolean],
    exitWith: Ref[F, ExitCode],
): Stream[F, ExitCode] =
  Stream.resource(resource) *> (terminateWhenTrue.discrete
    .takeWhile(_ === false)
    .drain ++ Stream.eval(exitWith.get))
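Applied to our test, a minimal sketch of the same idea (the runUntilStopped name is made up, and it assumes the program stream can be interrupted) could look like this:
import cats.effect.IO
import fs2.concurrent.SignallingRef

// A sketch: instead of cancelling the fiber, flip a SignallingRef and let the
// stream terminate gracefully via interruptWhen.
def runUntilStopped(program: fs2.Stream[IO, Unit])(assertions: IO[Unit]): IO[Unit] =
  for {
    stop  <- SignallingRef[IO, Boolean](false)              // false = keep running
    fiber <- program.interruptWhen(stop).compile.drain.start
    _     <- assertions                                     // produce input, read output, expect(...)
    _     <- stop.set(true)                                 // ask the stream to stop
    _     <- fiber.join.void                                // wait for graceful termination
  } yield ()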
That’s it for today. Thank you for reading! Feel free to reach out via Twitter or any other network.