In an earlier post we looked at Kafka and got hands-on with creating topics, and with producing and consuming messages. However, the basic tools are limited and only support simple manual testing. Ideally we want to write automated tests, as these are repeatable and allow us to use more advanced datatypes.
Produce and Consume
Assuming we have Kafka topics associated with some sort of application that consumes messages, produces them, or both, we need to be able to read and write those messages easily in our tests.
Producing a message is relatively simple – the message broker accepts it and places it on one of the partitions. We need only pass the details of the broker, topic and some information on the datatype, along with the message itself.
Consuming a message is slightly more complex. It can appear on any of the partitions, and the topic may hold other messages that our consumer hasn’t read yet, so we may read messages that we didn’t create or that aren’t relevant. We therefore need to be able to move the offset forward before we expect a message to appear, and then consume exactly one message (which we can repeat if we’re expecting multiple messages).
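To make the offset problem concrete, here is a toy in-memory model of a partitioned topic with a single consumer group. It is only a sketch: ToyTopic and its methods are hypothetical names invented for illustration, the key hashing is simplified, and none of it uses the real Kafka client.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Optional;

/** Toy in-memory stand-in for a partitioned topic; this is NOT the Kafka API. */
class ToyTopic {
    // One append-only list of messages per partition.
    private final List<List<String>> partitions = new ArrayList<>();
    // Next offset our single pretend consumer group will read, per partition.
    private final int[] offsets;

    ToyTopic(int partitionCount) {
        for (int i = 0; i < partitionCount; i++) {
            partitions.add(new ArrayList<>());
        }
        offsets = new int[partitionCount];
    }

    /** Appends a message; the partition is derived from the key (simplified hashing). */
    void write(String key, String value) {
        int partition = Math.floorMod(key.hashCode(), partitions.size());
        partitions.get(partition).add(value);
    }

    /** Moves every offset to the end so that older messages are never returned. */
    void skipForward() {
        for (int i = 0; i < offsets.length; i++) {
            offsets[i] = partitions.get(i).size();
        }
    }

    /** Returns at most one unread message, checking each partition in turn. */
    Optional<String> readNext() {
        for (int i = 0; i < offsets.length; i++) {
            if (offsets[i] < partitions.get(i).size()) {
                return Optional.of(partitions.get(i).get(offsets[i]++));
            }
        }
        return Optional.empty();
    }
}
```

In this model, if two stale messages are written, skipForward() is called, and a fresh message is then written, the next readNext() returns only the fresh message, whichever partition it landed on. Without the skip, the stale messages would be returned first.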
Building the tools
With maximum flexibility and reusability in mind, let’s encapsulate the methods needed for a simple test in a separate test library, and then build a test using that library.
First, a simple class providing a constructor to instantiate an object that allows us to write messages:
KProducer(String bootstrapServer, String keySerializer, String valueSerializer)
And a method to actually send a message (returning the timestamp, as that might be useful):
syncWrite(String topic, Object key, Object value)
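The constructor arguments map naturally onto standard Kafka client configuration keys. As a minimal sketch, assuming the library builds a Properties object internally (ProducerConfigSketch and producerProps are hypothetical names, not taken from TestLib):

```java
import java.util.Properties;

/** Hypothetical helper; the real KProducer constructor may be organised differently. */
class ProducerConfigSketch {
    static Properties producerProps(String bootstrapServer,
                                    String keySerializer,
                                    String valueSerializer) {
        Properties props = new Properties();
        // Standard Kafka producer configuration keys.
        props.put("bootstrap.servers", bootstrapServer);
        // Fully qualified serializer class names, for example
        // org.apache.kafka.common.serialization.StringSerializer.
        props.put("key.serializer", keySerializer);
        props.put("value.serializer", valueSerializer);
        return props;
    }
}
```

Properties like these would typically be handed to the KafkaProducer constructor from the kafka-clients library. A synchronous write then amounts to calling send() with a ProducerRecord and blocking on the returned Future with get(); the resulting RecordMetadata exposes timestamp(), which is presumably the value syncWrite returns.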
Another class allows us to subscribe to the relevant partitions of our chosen topic using a consumer group, with methods to either read the next message (taking into account the current offset of the consumer group) or fast forward all offsets to the end:
KConsumer(String bootstrapServer, String keySerializer, String valueSerializer, String consumerGroup, String topic, int partitions)
ConsumerRecords readNext(Long timeout)
void skipForward()
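On the consumer side the equivalent configuration keys refer to deserializers rather than serializers, and a consumer group ID is required. The sketch below assumes the same Properties-based approach as a plain Kafka client. ConsumerConfigSketch and consumerProps are hypothetical names, and limiting each poll to one record is an assumption about how reading exactly one message might be achieved, not a description of what TestLib does.

```java
import java.util.Properties;

/** Hypothetical helper; the real KConsumer constructor may be organised differently. */
class ConsumerConfigSketch {
    static Properties consumerProps(String bootstrapServer,
                                    String keyDeserializer,
                                    String valueDeserializer,
                                    String consumerGroup) {
        Properties props = new Properties();
        // Standard Kafka consumer configuration keys.
        props.put("bootstrap.servers", bootstrapServer);
        props.put("key.deserializer", keyDeserializer);
        props.put("value.deserializer", valueDeserializer);
        props.put("group.id", consumerGroup);
        // Assumption: cap each poll at a single record so one read yields at most one message.
        props.put("max.poll.records", "1");
        return props;
    }
}
```

With a KafkaConsumer built from properties like these, a skip forward could be implemented with seekToEnd() on the assigned partitions, and a single read with poll() and a timeout.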
Clone the repository from https://github.com/ObjectiveTester/AllThingsTesting.com-examples.git and open the TestLib directory in an IDE to build it, or go ahead and install it directly into your local Maven repo with:
mvn clean install
The test itself
Unfortunately, I don’t have a prebuilt Kafka-based application to test, but producing and consuming a message on a common Kafka topic is a simple demonstration of the concepts.
Follow the steps detailed in Exploring Kafka to start up a simple Kafka instance and, as mentioned in that article, create a multi-partitioned Kafka topic.
Next, open the sample test from the SimpleKafkaTest directory of the examples repo and run it either from an IDE or from the command line.
The test handles all the setup, and then creates some randomised data and writes it with:
producer.syncWrite("multi-partition", testkey, "test data " + testkey);
And then asserts the data is correct after reading:
assertEquals(testkey, record.key());
assertEquals("test data " + testkey, record.value());
(assuming we actually receive a record at all!)
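Randomised data is what makes these assertions meaningful: if the key is unique to each run, a leftover message from an earlier run cannot satisfy them by accident. A minimal sketch of generating such data, assuming a String key (the key type used in the example test isn't shown here, and TestDataSketch is a hypothetical name):

```java
import java.util.UUID;

/** Hypothetical helper for building per-run test data. */
class TestDataSketch {
    /** Generates a key that is effectively unique for each test run. */
    static String randomKey() {
        return UUID.randomUUID().toString();
    }

    /** Builds the value the same way as the example test: a fixed prefix plus the key. */
    static String valueFor(String key) {
        return "test data " + key;
    }
}
```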
This is a simple example of how to use a library to separate out the underlying logic required to communicate with a complex service. The example test demonstrates two different serializer/deserializers; a real-world test will most probably use a custom datatype, so further work would be required.
Another benefit of separating the logic from the tests is the possibility of building additional tools, perhaps to facilitate data creation for other types of testing.