Request-Reply pattern using Apache Kafka, or How not to lose your data
This article contains only one line of code :)
There are many patterns that help with software design. The Gang of Four gave us patterns for organizing code better. Another catalog, Enterprise Integration Patterns (EIP), is intended to help with communication between applications.
There are two kinds of communication between applications:
1. Synchronous — HTTP, Sockets
2. Asynchronous — Message Queues, Databases, Files, E-Mail, Cloud storage
One of the EIPs is Request-Reply.
It is very simple. When one service needs some data, it sends a Request to the service that is responsible for that data. The responsible service then prepares a Response and returns it to the Requestor.
It is no secret that when two applications communicate over sockets or HTTP, the response is sent back to the Requestor right after the request is received. No mess, no miscommunication.
But sometimes there is no choice about which kind of communication to use, and asynchronicity brings its own complexity.
There is nothing to worry about when an application does not need the response right away (the event-driven case).
But sometimes it needs to gather data from other services for further processing (e.g. within a transaction). I call this behavior “Sync through Async”. Such cases are interesting for the ways they can deviate from the expected behavior.
The case I want to tell you about is one of them.
The company I work for provides e-wallets and e-payments. A few weeks ago I implemented a service for risk scoring of incoming payments. What is that? It is a service that assesses every payment and delivers a verdict: should the payment be processed or not.
In our case, the payment risk score is a value calculated from three incoming parameters:
- Risk score of the user (wallet)
- Risk score of the country the payment is going to (according to political verdicts)
- Risk score of the payment type
If the resulting value is greater than the critical threshold, the payment should be declined.
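The decision rule can be illustrated with a minimal sketch. The article does not say how the three scores are combined, so the plain sum, the threshold value, and all names below are assumptions made only for illustration.

```java
public class PaymentRiskSketch {
    // Hypothetical threshold; the real critical value is not given in the article.
    static final int CRITICAL_THRESHOLD = 100;

    // Assumption: the three scores are simply added together.
    static int paymentRiskScore(int userScore, int countryScore, int paymentTypeScore) {
        return userScore + countryScore + paymentTypeScore;
    }

    // A payment is declined only when its score is strictly greater than the threshold.
    static boolean shouldDecline(int userScore, int countryScore, int paymentTypeScore) {
        return paymentRiskScore(userScore, countryScore, paymentTypeScore) > CRITICAL_THRESHOLD;
    }

    public static void main(String[] args) {
        System.out.println("decline low-risk payment:  " + shouldDecline(10, 20, 30));
        System.out.println("decline high-risk payment: " + shouldDecline(50, 40, 30));
    }
}
```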
The service responsible for user risk scoring already existed when I started. So my goal was to implement a service that calculates the risk of the payment itself, gathering the user score from that existing service. We used Apache Kafka as the only means of communication between services.
Here is the basic scheme of the microservice, which was called RMS (risk management system).
- When the main payment processing service meets a payment with the status “antifraud is to be checked for”, it sends the Payment ID into the Kafka topic named “PAYMENT-READY-FOR-ANTIFRAUD”
- The Antifraud service picks up this payment ID from Kafka, enriches it with the full payment data read from the database, then sends the payment model into the Kafka topic named “PAYMENT-DETAILS”
- The “PAYMENT-DETAILS” Kafka topic is read by RMS, which then:
a) receives the payment
b) asks the Know-Your-Customer (KYC) service for the user risk score by sending a Request into the Kafka topic named “WALLET-KYC-REQUEST”
c) waits for the Reply to that Request on the Kafka topic named “WALLET-KYC-RESPONSE”. This is the Request-Reply pattern: each request waits for its reply before the calculation proceeds.
d) gathers the country score and the payment type score from its local database
e) combines all three risk score values to calculate the payment risk score
f) sends the payment risk score into the Kafka topic named “PAY-DECISION”
- The Antifraud service listens to payment decisions from that Kafka topic and processes them in order to approve or decline the payment
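Step c) is the heart of the scheme, so here is a minimal in-memory sketch of what "waiting for a reply" could look like. It is not the Apache Camel Aggregator that RMS actually used: it is a plain `CompletableFuture` stand-in, the class and method names are hypothetical, publishing to Kafka is left out, and `orTimeout` assumes Java 9 or newer.

```java
import java.util.Map;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.TimeUnit;

public class RequestReplyCorrelator {
    // Requests that were sent and still wait for a reply, keyed by correlation ID.
    private final Map<String, CompletableFuture<Integer>> pending = new ConcurrentHashMap<>();

    // Call just before publishing a request. The returned future completes
    // with the user score, or exceptionally if no reply arrives in time.
    public CompletableFuture<Integer> register(String correlationId, long timeoutMillis) {
        CompletableFuture<Integer> reply = new CompletableFuture<>();
        pending.put(correlationId, reply);
        reply.orTimeout(timeoutMillis, TimeUnit.MILLISECONDS);
        // Forget the request once it is answered or timed out.
        reply.whenComplete((score, error) -> pending.remove(correlationId, reply));
        return reply;
    }

    // Call for every reply read from the response topic.
    // Returns false when this process holds no matching request.
    public boolean onReply(String correlationId, int userScore) {
        CompletableFuture<Integer> reply = pending.remove(correlationId);
        return reply != null && reply.complete(userScore);
    }

    public static void main(String[] args) {
        RequestReplyCorrelator correlator = new RequestReplyCorrelator();
        CompletableFuture<Integer> userScore = correlator.register("payment-42", 5_000);
        correlator.onReply("payment-42", 17); // simulates the KYC reply arriving
        System.out.println("user score: " + userScore.join());
    }
}
```

Note that the pending map lives in the memory of one process. A reply can only be paired if it is delivered to the same process that registered the request.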
The solution described above was tested successfully on two of our independent QA stands. But it started to fail right after it was deployed to the production infrastructure (we use Kubernetes).
The application logs showed that the RMS service sent the user score Request into the Kafka topic “WALLET-KYC-REQUEST” and the KYC service sent the Reply into the Kafka topic “WALLET-KYC-RESPONSE”, but the RMS service did not bring the two together to calculate the decision. All Requests failed with a timeout while waiting for their Responses. All Responses did not “see” their corresponding Requests and failed with a timeout as well.
At first we decided that the Request was not being aggregated with its Response because they had different correlation keys (please see the Aggregate EIP implemented in Apache Camel for details). But that turned out not to be true; we checked it by adding more log messages.
The cause was revealed as soon as I noticed that the RMS service was running as two instances in one Kubernetes cluster. That explained everything we had seen in the logs.
One RMS instance sent the Request and another instance received the corresponding Response. Obviously, they could not be aggregated.
This problem is common in microservice architectures. Developers must keep in mind that their applications might be running as more than one instance in parallel. When data has to be processed inside one instance of an application from beginning to end (in one transaction or without any), it is really important to be sure that the instance does not consume data that is not intended for it. More precisely, the application should consume only the data intended for itself.
Once we understood how it should have been implemented, we started to redesign the RMS service so that each Response is consumed only by the instance that holds the corresponding Request.
To explain how that should work, I need to say a few words about how Kafka manages messages.
All Kafka topics are divided into partitions.
Kafka clients are of two types: producers and consumers. A producer puts data into a Kafka topic, and a consumer reads data from a Kafka topic by polling it.
When a Kafka producer sends a message into a topic, the message is put into exactly one partition of that topic. The magic is in how the producer chooses the partition. Every message may have a Key (which may also be null). Every producer has a hash function that helps to select the concrete partition for a message: it takes the message Key as its argument, and the result, taken modulo the number of partitions, is the number of the partition where the message will be put.
partitionNumber = hash(messageKey) % partitionsCount
The partitions allow writing messages in parallel. If a message is published with a key, then, by default, the producer will hash the given key to determine the destination partition. This provides a guarantee that all messages with the same key will be sent to the same partition. In addition, a consumer will have the guarantee of getting messages delivered in order for that partition.
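To make the key-to-partition mapping concrete, here is a minimal sketch. Kafka's default partitioner hashes the serialized key bytes with murmur2; the sketch substitutes `String.hashCode()` to stay dependency-free, so the partition numbers it produces will not match those of a real cluster. The class and method names are hypothetical.

```java
public class PartitionSketch {
    // Stand-in for the producer's hash function
    // (assumption: String.hashCode() instead of Kafka's murmur2).
    static int partitionFor(String messageKey, int partitionsCount) {
        int positiveHash = messageKey.hashCode() & 0x7fffffff; // clear the sign bit
        return positiveHash % partitionsCount;
    }

    public static void main(String[] args) {
        int partitionsCount = 3;
        // The same key always maps to the same partition.
        for (String key : new String[] {"wallet-1", "wallet-2", "wallet-1"}) {
            System.out.println(key + " -> partition " + partitionFor(key, partitionsCount));
        }
    }
}
```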
On the other side, a Consumer does not read the data alone. It has to work in a Consumer Group (remember group-id in the connection properties?). Kafka guarantees that every message is delivered to every consumer group, and to only one consumer within each consumer group.
How consumers work in their groups is a complex scheme: there is one leader, and the members interact through a protocol with steps such as connecting, leader selection, partition distribution and others. All we need to know for now is that partition distribution uses a Strategy that can be selected on startup.
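The effect of partition distribution inside one group can be sketched as follows. This is a simplified round-robin stand-in written for illustration, not the code of any assignor that ships with Kafka, and the instance names are made up.

```java
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

public class GroupAssignmentSketch {
    // Spreads partitions 0..partitionsCount-1 over the consumers of ONE group,
    // so that every partition has exactly one owner.
    static Map<String, List<Integer>> assign(List<String> consumers, int partitionsCount) {
        Map<String, List<Integer>> assignment = new LinkedHashMap<>();
        for (String consumer : consumers) {
            assignment.put(consumer, new ArrayList<>());
        }
        for (int partition = 0; partition < partitionsCount; partition++) {
            String owner = consumers.get(partition % consumers.size());
            assignment.get(owner).add(partition);
        }
        return assignment;
    }

    public static void main(String[] args) {
        System.out.println(assign(List.of("rms-instance-1", "rms-instance-2"), 4));
    }
}
```

Whatever strategy is chosen, a message written to a given partition is seen by only one member of the group, namely the one that owns that partition.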
The RMS service had one producer writing into WALLET-KYC-REQUEST and one consumer reading from WALLET-KYC-RESPONSE. The KYC service was not scaled at that moment: it listened to all the partitions and wrote into one of the partitions of the topic WALLET-KYC-RESPONSE. All RMS consumers (in all instances of RMS) worked in one consumer group. Please take a look at the scheme below.
Our problem became how to make the consumer listen to the very partition that the producer will put the message into.
The obvious solution would be to adjust all producers to put messages straight into the partitions listened to by the consumers that need that very data. It became a mess! Besides, such a scenario is very expensive to maintain. Developers would have to keep in mind the whole scheme of interactions between four microservices and retune them all whenever even one of them needs scaling. If they did not, data might be lost.
The better solution
Our solution took about two hours to implement, and maintaining it will be inexpensive.
We use Kafka policies to administer all Kafka connections, both Kafka users and consumer groups.
So we added a rule to the Kafka policies that allows consumer groups whose group-id matches a mask. This lets us set the consumer group in the RMS service at runtime, right after it starts.
We literally wrote:
String groupId = "rms-" + UUID.randomUUID().toString();
This spares us any headache when scaling the RMS service. Every RMS instance receives all Responses, no matter whether that instance has just sent a Request and is awaiting the Response or not.
Therefore, while Responses are gathered by every running instance of the RMS service, not all of them can be paired with a matching Request. That is normal for us, and in fact a good scenario! We have the Aggregator to pair each Request with its Reply, and if either of them stays unpaired it fails with a timeout and writes a log message. Much more importantly, we get a guarantee that every Request with master data will be paired with its Response and the process will not be interrupted.
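For context, this is roughly how such a group id could end up in the consumer configuration. The property keys are standard Kafka consumer settings; the class name, method name, and bootstrap address are placeholders, and the `auto.offset.reset` value is an assumption rather than something our setup is known to use.

```java
import java.util.Properties;
import java.util.UUID;

public class RmsConsumerConfigSketch {
    // Builds consumer properties with a group id that is unique per started instance.
    static Properties replyConsumerProperties(String bootstrapServers) {
        Properties props = new Properties();
        props.setProperty("bootstrap.servers", bootstrapServers);
        // A fresh group for every instance, so each instance receives every reply.
        props.setProperty("group.id", "rms-" + UUID.randomUUID());
        // Assumption: a brand-new group has no committed offsets,
        // so it starts reading from the newest messages.
        props.setProperty("auto.offset.reset", "latest");
        return props;
    }

    public static void main(String[] args) {
        // "localhost:9092" is a placeholder address.
        System.out.println(replyConsumerProperties("localhost:9092").getProperty("group.id"));
    }
}
```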
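The resulting behavior can be simulated without Kafka at all. In this minimal sketch every instance sees every reply, and only the instance that holds the matching request pairs it. The names are hypothetical, and unmatched replies are dropped immediately here instead of timing out in an Aggregator as they do in RMS.

```java
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;

public class BroadcastReplySketch {
    // One RMS instance: remembers its own pending requests only.
    static class Instance {
        private final Set<String> pendingRequests = new HashSet<>();
        final Map<String, Integer> pairedScores = new HashMap<>();

        void sendRequest(String correlationId) {
            pendingRequests.add(correlationId);
        }

        // Returns true only if this instance was waiting for that reply.
        boolean onReply(String correlationId, int userScore) {
            if (!pendingRequests.remove(correlationId)) {
                return false; // the reply belongs to some other instance
            }
            pairedScores.put(correlationId, userScore);
            return true;
        }
    }

    // Each instance has its own consumer group, so a reply reaches all of them.
    static int broadcast(List<Instance> instances, String correlationId, int userScore) {
        int paired = 0;
        for (Instance instance : instances) {
            if (instance.onReply(correlationId, userScore)) {
                paired++;
            }
        }
        return paired;
    }

    public static void main(String[] args) {
        Instance first = new Instance();
        Instance second = new Instance();
        first.sendRequest("payment-42");
        int paired = broadcast(List.of(first, second), "payment-42", 17);
        System.out.println("instances that paired the reply: " + paired);
    }
}
```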
- The design of asynchronous interaction should take into account not only aspects of the interaction itself, but also the stages of architecture expansion.
- Synchronous interaction would be cheaper to support than the solution we found.
- When implementing Request-Reply on Kafka, it is important that the retention periods of messages in the topic in the Kafka broker are set to values greater than the time of a possible restart of applications — this will reduce the risk of data loss during restarts.