Acceptance Tests with embedded Kafka – Best Practices

Using Cucumber to test a Spring application that communicates asynchronously over Kafka can be difficult and cumbersome. In this blog post we have therefore written down our best practices for writing acceptance tests. They should help you create robust acceptance tests using embedded Kafka. Some knowledge of Spring, Kafka and Cucumber is helpful for following along.

Introduction

In one of our microservices we use Kafka for asynchronous communication. Recently we had problems in the build pipeline because some acceptance tests failed, although local execution worked just fine.

Digging deeper into the issue, we found the following problems:

  • Inconsistent configuration of the Kafka consumers which read from the Kafka topics
  • Race conditions when creating the initial system state by sending a new message and consuming the expected result

We therefore decided to create some guidelines for writing acceptance tests for Kafka in Spring. Following them should minimize the risk of race conditions and inconsistent configuration, resulting in more stable acceptance tests.

We focused on these three aspects:

  • Structure of Cucumber features
  • Configuration and creation of Kafka consumers
  • Handling asynchronicity in tests

Structure of Cucumber features

We unified our definition of the Given-When-Then steps following the best practice BDD approach.

Scenario: An existing user should be renamed
  Given A user with id="user-3" and name "Hans Oldman"
  When The name of the user with id="user-3" changes to "Peter Youngman"
  Then The name of user with id="user-3" should have changed to "Peter Youngman"

Given-Step
  • Create the initial state in the system that is necessary to perform the action in the “When” step.
  • Multiple “Given” steps can be chained with “And”.

In this example, we create a new user in the application.

When-Step
  • Keep it simple when you write your action step. A single action should be triggered. This action should modify the system state to the expected state.
  • If another action is required, it is usually better to move it into the initial state of the system in the “Given” step. (Multiple actions lead to more sources of error.)

In the example above, we want to change the name of a user. The creation of a user in the “Given” step is covered by a separate test.

Then-Step
  • Validate the expected system state
  • Use data from the output topic(s) or persisted data in a database
  • Use one “Then” step to validate a single topic or table (this keeps the steps reusable)
  • Concatenate multiple “Then” steps with “And” for complex validations

Here the changed system state is validated using the data on the output topic(s) or the persisted data in a database. We recommend validating a single topic or database table per “Then” step, as this makes it easier to reuse the steps when creating more complex tests. Of course it is possible, and often necessary, to validate multiple topics or tables. For this purpose the “Then” steps can be concatenated with “And”.

In our example, we will load the user from the database to validate the changed name.

Kafka consumers

Configuration

The auto.offset.reset property of a Kafka consumer controls where a consumer without a committed offset starts reading. With earliest, it consumes all messages that have been written to the topic. With latest, it consumes only new messages that arrive after the consumer has been created. Mixing these settings across consumers leads to confusion. With earliest it is also unpredictable which messages will actually be consumed, because that depends on the execution order of the tests.
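
To make the difference tangible, here is a small in-memory sketch. It is a toy model of a single-partition topic, not the Kafka client API: the class OffsetResetModel and its methods are made up for illustration, and it assumes that the start position is fixed at the moment the consumer is set up.

```java
import java.util.ArrayList;
import java.util.List;

/** Toy model of a single-partition topic; NOT the Kafka client API. */
final class OffsetResetModel {

    enum Reset { EARLIEST, LATEST }

    private final List<String> log = new ArrayList<>();

    void publish(String message) {
        log.add(message);
    }

    /** Start offset for a new consumer group that has no committed offset yet. */
    int startOffset(Reset reset) {
        return reset == Reset.EARLIEST ? 0 : log.size();
    }

    /** Everything a consumer that started at startOffset can read right now. */
    List<String> readFrom(int startOffset) {
        return new ArrayList<>(log.subList(startOffset, log.size()));
    }

    public static void main(String[] args) {
        OffsetResetModel topic = new OffsetResetModel();
        topic.publish("left over from scenario A");

        // Scenario B sets up its consumers before it triggers its action.
        int earliest = topic.startOffset(Reset.EARLIEST);
        int latest = topic.startOffset(Reset.LATEST);
        topic.publish("written by scenario B");

        System.out.println("earliest: " + topic.readFrom(earliest));
        System.out.println("latest:   " + topic.readFrom(latest));
    }
}
```

In this model the earliest consumer also sees the message left over from scenario A, while the latest consumer only sees the message written by scenario B. What the earliest consumer reads therefore depends on which scenarios ran before it.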

To solve this problem, we decided to consume only the messages that are sent during the execution of a specific scenario. We therefore configured our consumer with auto.offset.reset set to latest (see code below). Furthermore, we defined a strict pattern for opening and closing Kafka consumers: they are created before each test and closed after it, so that a message on a topic is not consumed by accident in the wrong scenario.

public static <K, V> Consumer<K, V> setUpConsumer(
    final EmbeddedKafkaBroker embeddedKafkaBroker,
    final Deserializer<K> keySerde,
    final Deserializer<V> valueSerde) {
    // A random group id ensures that no committed offsets from another scenario are reused.
    Map<String, Object> consumerProps =
        KafkaTestUtils.consumerProps(UUID.randomUUID().toString(), "true", embeddedKafkaBroker);
    // Only read messages that are written after this consumer has been set up.
    consumerProps.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "latest");
    DefaultKafkaConsumerFactory<K, V> cf = new DefaultKafkaConsumerFactory<>(consumerProps, keySerde, valueSerde);
    return cf.createConsumer();
}

Consumer Setup

As described in the previous section, the consumer for a specific topic should be created and closed right before and after the test scenario. To achieve this we can use the @Before and @After hooks of Cucumber.

We therefore added the following method, which creates a consumer, and annotated it with @Before including a unique tag.

@Before(value = "@UserConsumer")
public void createUserConsumer() {
    userConsumer = KafkaTestConsumerUtil.setUpConsumer(embeddedKafkaBroker,
        new StringDeserializer(), new JsonDeserializer<>(User.class, objectMapper));
    embeddedKafkaBroker.consumeFromAnEmbeddedTopic(userConsumer, USER_TOPIC);
}

A second method closes the corresponding consumer after the test execution, regardless of whether the test succeeded. It uses the same unique tag as the @Before hook, which links the two hooks.

@After(value = "@UserConsumer")
public void closeUserConsumer() {
    this.userConsumer.close();
    this.userConsumer = null;
}

All of this code is wrapped inside a class, designed to provide different consumers, which can be used in the test steps. This class can be autowired across the acceptance tests to access the consumers.

public class UserConsumer {

    public static final String USER_TOPIC = "user-demo-topic";

    @Autowired
    private EmbeddedKafkaBroker embeddedKafkaBroker;
    @Autowired
    private ObjectMapper objectMapper;

    private Consumer<String, User> userConsumer;

    @Before(value = "@UserConsumer")
    public void createUserConsumer() {
        userConsumer = KafkaTestConsumerUtil.setUpConsumer(embeddedKafkaBroker,
            new StringDeserializer(), new JsonDeserializer<>(User.class, objectMapper));
        embeddedKafkaBroker.consumeFromAnEmbeddedTopic(userConsumer, USER_TOPIC);
    }

    public Consumer<String, User> getUserConsumer() {
        if (userConsumer == null) {
            throw new IllegalStateException("No consumer for User created. Use the value from @Before in the feature description for creation");
        }
        return userConsumer;
    }


    @After(value = "@UserConsumer")
    public void closeUserConsumer() {
        this.userConsumer.close();
        this.userConsumer = null;
    }

}

We can use this new tag to create and close a consumer around a test. Therefore we add the tag @UserConsumer to a scenario definition:

@UserConsumer
Scenario: Creating a new user should publish a message to the topic
  When A user with id="user-1" and name "Max Mustermann" is created
  Then The user with id="user-1" and name "Max Mustermann" should be written on the topic

This test publishes a message on a Kafka topic. Afterwards we want to validate the output of this topic.

Handling asynchronicity in tests

Problem

With the changes above, an acceptance test now receives only the messages created by its own scenario. However, there may still be race conditions because Kafka messages are processed asynchronously. The cause is the way messages are consumed from a topic:

public static <K, V> ConsumerRecord<K, V> consume(K id, Consumer<K, V> consumer, String topic, int timeout) {
    Iterable<ConsumerRecord<K, V>> records = KafkaTestUtils.getRecords(consumer, timeout).records(topic);
    
    return StreamSupport.stream(records.spliterator(), false)
        .filter(aggregateConsumerRecord -> aggregateConsumerRecord.key().equals(id))
        .reduce((first, second) -> second)
        .orElseThrow(AssertionError::new);
}

In the code example above we use KafkaTestUtils.getRecords(consumer, timeout) to read messages from Kafka. The result is filtered for a specific message id. The problem is that KafkaTestUtils.getRecords(consumer, timeout) is called only once. The specified timeout only applies if no messages are received from the topic. As soon as any message arrives, the call returns, whether or not the expected message is among the records.

Let us assume the tests are running on a fast local machine and the message we want to consume has already been processed and written to the destination topic. With KafkaTestUtils.getRecords(consumer, timeout), everything is fine, and we are able to read this message.

However, assume the same test is running on a build server that is much slower than your local machine. After the command is sent in the “When” step, you immediately start reading the records from the destination topic. Now the KafkaTestUtils.getRecords(consumer, timeout) method can time out (if no message is on the topic or no message arrives in the interval) or return messages from previous tests (when the consumer is configured with auto.offset.reset set to earliest). This causes the test to fail, although the only problem is a race condition between processing and reading the Kafka message.
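
The effect of reading only once can be reproduced without Kafka. The following sketch is a toy model under simplifying assumptions: BatchedTopic and consumeOnce are hypothetical names, records are represented only by their keys, and each read simply returns the next batch that has arrived.

```java
import java.util.ArrayDeque;
import java.util.Deque;
import java.util.List;
import java.util.Optional;

/** Toy model of the race; none of these types belong to Kafka or Spring. */
final class SingleReadRace {

    /** Each read() hands out the next batch that has "arrived", or an empty batch. */
    static final class BatchedTopic {
        private final Deque<List<String>> batches = new ArrayDeque<>();

        BatchedTopic(List<List<String>> arrivals) {
            batches.addAll(arrivals);
        }

        List<String> read() {
            List<String> next = batches.poll();
            return next == null ? List.of() : next;
        }
    }

    /** Mirrors the single-call consume(): read one batch, then filter for the key. */
    static Optional<String> consumeOnce(BatchedTopic topic, String key) {
        return topic.read().stream().filter(key::equals).findFirst();
    }

    public static void main(String[] args) {
        // On a slow machine an unrelated record arrives first, the expected one later.
        BatchedTopic topic = new BatchedTopic(List.of(List.of("user-2"), List.of("user-3")));

        System.out.println("first read:  " + consumeOnce(topic, "user-3").isPresent());
        System.out.println("second read: " + consumeOnce(topic, "user-3").isPresent());
    }
}
```

The first read returns a batch that does not contain the expected key, so a test that reads only once would fail, even though the record shows up on the very next read.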

Awaitility

To avoid such behavior, it is necessary to wait a certain amount of time before consuming from the topic. A straightforward way is to simply use Thread.sleep(). However, this is considered bad practice, as you always wait for the full duration (for example 5 seconds), even if the message arrived earlier. The more advanced approach is to implement a polling mechanism. This makes it possible to poll the topics at regular intervals and resume right after the message arrives. Fortunately, there is already a widely used library that tackles this issue.
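
To illustrate the idea behind such a polling mechanism, here is a minimal hand-rolled sketch using only the JDK. SimplePoller and pollUntil are hypothetical names chosen for this example. It is not how Awaitility is implemented, and in real tests a library is the better choice.

```java
import java.time.Duration;
import java.util.function.BooleanSupplier;

/** Hand-rolled polling helper, shown only to illustrate the idea. */
final class SimplePoller {

    private SimplePoller() {
    }

    /**
     * Re-evaluates the condition every pollInterval until it returns true.
     * Returns the number of evaluations, or throws IllegalStateException
     * if the condition is still false once atMost has elapsed.
     */
    static int pollUntil(BooleanSupplier condition, Duration atMost, Duration pollInterval) {
        long deadline = System.nanoTime() + atMost.toNanos();
        int attempts = 0;
        while (true) {
            attempts++;
            if (condition.getAsBoolean()) {
                return attempts; // resume immediately instead of sitting out a fixed sleep
            }
            if (System.nanoTime() - deadline >= 0) {
                throw new IllegalStateException("Condition not met within " + atMost);
            }
            try {
                Thread.sleep(pollInterval.toMillis());
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
                throw new IllegalStateException("Interrupted while polling", e);
            }
        }
    }

    public static void main(String[] args) {
        // Simulated slow processing: the "message" only shows up on the third check.
        int[] checks = {0};
        int attempts = pollUntil(() -> ++checks[0] >= 3, Duration.ofSeconds(5), Duration.ofMillis(50));
        System.out.println("Condition met after " + attempts + " attempts");
    }
}
```

Compared to a fixed Thread.sleep(), the helper returns as soon as the condition holds and only uses the full timeout in the failure case.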

We decided to use Awaitility to implement a polling mechanism and configured the await() method to poll for 5 seconds, in an interval of 500ms:

public class AwaitilityHelper {

    public static void wait(Callable<Boolean> condition) {
        await()
            .atMost(Duration.ofSeconds(5))
            .with()
            .pollDelay(Duration.ZERO)
            .pollInterval(Duration.ofMillis(500))
            .until(condition);
    }
}

Subsequently we refactored our consume() from above to use Awaitility:

public static <K, V> ConsumerRecord<K, V> consume(Consumer<K, V> consumer, String topic, final Predicate<? super ConsumerRecord<K, V>> condition) {
    final AtomicReference<ConsumerRecord<K, V>> poll = new AtomicReference<>();
    AwaitilityHelper.wait(() -> {
        Iterable<ConsumerRecord<K, V>> records = KafkaTestUtils.getRecords(consumer).records(topic);
        final ConsumerRecord<K, V> record = StreamSupport.stream(records.spliterator(), false)
            .filter(condition)
            .findFirst()
            .orElse(null);
        poll.set(record);
        return record != null;
    });
    return poll.get();
}

As you can see, there is a new method parameter that specifies the condition a record must fulfill. Awaitility now polls every 500ms for up to 5 seconds using KafkaTestUtils.getRecords(consumer) and stops as soon as a matching record has been found.

Of course, race conditions can still occur if the processing of a message takes more than 5 seconds. In this case, you have to reconfigure Awaitility to match your needs.

Summary

To briefly sum up the explained concepts:

  1. A uniform and consistent approach for Cucumber features is essential. It is beneficial to use only a single “When” step. If another action is needed, it is better to expand the “Given” step.
  2. Consume only messages from the current scenario. Make sure this is supported and implemented by the design of your acceptance tests.
  3. Use a library like Awaitility for implementing a polling mechanism to tackle the issue with timeouts and race conditions.

Since we refactored our acceptance tests with all these aspects in mind, they have been running more stably and even faster. In addition, we have reduced the complexity of the code and increased its readability and maintainability.

All of the code snippets from above can be found in a working sample application in this repository on GitHub.
