Blogi 29.2.2016

Enterprise Integration Patterns Demystified – Idempotent Consumer Is Your Friend

Gofore

Kiva kun löysit tämän artikkelin! Se sisältää varmasti hyvää tietoa, mutta pidäthän mielessä, että se on kirjoitettu 8 vuotta sitten.

What is an Idempotent Consumer (aka Idempotent Receiver) and why should you consider it as a friend? First, it’s an Enterprise Integration Pattern (EIP) as the title suggests. Second, it takes care of handling duplicate messages that may travel between different systems. This blog post reveals a couple of real-life problems that can be solved using the Idempotent Consumer pattern and provides some technological insight on the implementation side.

Why do we need to handle duplicate messages?

If (when) communication relies on unreliable protocols e.g. over the Internet using HTTP message delivery can only be guaranteed by sending the message again until the sender receives the message acknowledgement. Thus, sending duplicate messages is inherent in the communication protocol itself. Suppose a B2B case where application A sends an order request to application B but doesn’t get an acknowledgement. It doesn’t know whether B received the message or not, so it decides to send the same message again. If there’s no duplicate message handling implemented, application B gets two orders instead of one.
Another point of failure lies in the area of distributed transactions in case all the parties involved are not able to participate in the distributed two-phase commit. If a message is sent to two applications but only one of them processes it successfully the state is inconsistent. In case both receiving applications do handle duplicate messages the message can be sent again. The application that already processed the message successfully can just ignore the resent message while the other one gets a second chance to get things right.

Step into the Camel route

Apache Camel is an open source Java framework that enables you to integrate distinct applications. Apart from providing a wide variety of transports and APIs it also gives you concrete implementations of many of the EIPs. The glue between transports and EIPs is the Domain Specific Language (DSL) that supports type-safe smart completion of routing rules in Java code. The list of supported transports is huge, and in addition to many old-school transports like FTP, JMS, Rest Services, Web Services, etc. it includes many of the Amazon Web Services (AWS) components e.g. Elastic Compute Cloud (EC2), DynamoDB (DDB), Simple Email Service (SES), Simple Queue Service (SQS), and Simple Storage Service (S3) to name a few.
One of the Camel provided EIP implementations is the Idempotent Consumer. In order to detect duplicate messages it needs to retrieve the unique message id from the given message exchange. There are various ways to accomplish this e.g. by using an XPath Expression to fetch it from the message body. The unique message id can then be searched from an Idempotent Repository – if it’s found the message is already consumed, otherwise the message is processed and the id is added to the repository. One thing to note about the unique message id is that it should not be tied to any domain concept in order to keep the business logic separate from the integration infrastructure.
There are a couple of options that let you control how the duplicate message handling works. First, you can enable eager processing which means that Camel will add the id to the repository before the message has been processed in order to detect duplicates even for those messages that are currently in progress. By disabling eager processing Camel will only detect duplicates for those messages that have been successfully processed. Second, you can choose whether to skip duplicates or not. By enabling this option the duplicates are not processed any further. Otherwise message processing is continued and you are given the option to implement some custom logic for the duplicates. The following code snippet depicts the latter case i.e. messages are routed to different routes (duplicateMessages vs. newMessages) based on whether they are already processed or not.

Idempotent Consumer in a Camel route
 from(inputQueue).
    idempotentConsumer(header("messageId")).messageIdRepository(idempotentRepository).skipDuplicate(false).
    filter(property(Exchange.DUPLICATE_MESSAGE).isEqualTo(true)).
        to(duplicateMessages).
        stop().
    end().
    to(newMessages);

Consider that you’d like your application to work in a functional style i.e. always to return the same response for identical requests. This can be achieved by saving the response as part of the newMessages route and fetching it in the duplicateMessages route. In the order example context this would mean that no duplicate orders are created but instead an existing ”order received” response is returned. Because duplicates are detected already in the integration layer the order processing application don’t need to be accessed at all. This is illustrated in the following picture.

MongoDB as an Idempotent Repository

Regarding Idempotent Repository there are some alternatives to choose from. Camel provides the following implementations:

MemoryIdempotentRepository FileIdempotentRepository HazelcastIdempotentRepository JdbcMessageIdRepository JpaMessageIdRepository InfinispanIdempotentRepository

Besides these you are free to create your own Idempotent Repository implementation e.g. a NoSQL version that uses MongoDB as a repository. All you need to do is implement the IdempotentRepository interface and take it into use in a Camel route. A skeleton version is depicted in the following code snippet.

MongoDbIdempotentRepository skeleton implementation
public class MongoDbIdempotentRepository implements IdempotentRepository<String> {
    @Override
    public boolean add(String key) {
        // add key to the repository according to the java.util.Set contract
    }
    @Override
    public boolean confirm(String key) {
        // confirm the key to the repository, after the exchange has been processed successfully
    }
    @Override
    public boolean contains(String key) {
        // check if the repository contains the key according to the java.util.Set contract
    }
    @Override
    public boolean remove(String key) {
        // remove the key from the repository (invoked if the exchange failed)
    }
    @Override
    public void start() {
        // start the service e.g. open up a MongoDB connection (a CamelContext lifecycle event)
    }
    @Override
    public void stop() {
        // stop the service e.g. close the MongoDB connection (a CamelContext lifecycle event)
    }

After the exchange has been processed the key is either confirmed to the repository or removed from it. This means that if the exchange failed it is possible to resend the message without it being detected as a duplicate i.e. only the successfully processed exchanges count. The start and stop methods originate from a Service interface which is extended by the IdempotentRepository interface and tie the implementation to the CamelContext lifecycle. In the MongoDbIdempotentRepository example they are used to open up and close the MongoDB connection respectively.
The idempotent repository only needs to keep track of the unique message id and the confirmed flag. The latter relates to the eager option discussed above i.e. in case it’s enabled the message can be considered as duplicate even though the exchange is not yet confirmed to be successfully processed. Otherwise only confirmed entries are taken into account when checking whether the repository contains the key or not. Looking at the MongoDB an idempotent repository entry might look like the following.

A MongoDB idempotent repository entry
{
    "_id" : ObjectId("579c43812adc2ab4e6e51df4"),
    "messageId" : "83502935223",
    "confirmed" : true
}

Depending on the use case it may not be desirable to keep the entries in the repository forever. Luckily MongoDB provides a couple of features that can be taken into use for expiring documents from a collection automatically. You may choose to use a time to live (TTL) collection feature by defining how long an entry is allowed to live in a collection. After the specified period of time mongod automatically removes expired entries from a collection. Another choice is to create the idempotent repository as a capped collection which is a fixed size collection. Once the collection fills its allocated space it starts overwriting the oldest documents in FIFO style in order to make room for new documents. Either way, you don’t have to worry about manually deleting old idempotent repository entries.

Takaisin ylös