Basics of Batch Processing in MuleSoft CloudHub

Introduction

Batch processing provides a construct for asynchronously processing large datasets. The batch job splits incoming source data into individual records and sends them through a series of batch steps. Each batch job contains three phases, which are explained below.

Phase 1 - Load and Dispatch

This implicit phase in the batch processor splits the incoming collection into individual records and places them in a persistent queue. You don't need to explicitly configure anything to do this. Data in persistent queues survives Mule runtime restarts; this is covered in more detail later.

Phase 2 - Process

This phase consists of actual logic in the form of batch steps that every record will go through. In this phase, Mule asynchronously processes records that were queued in the previous implicit phase. You can have any number of batch steps in this phase, and add entry conditions so every record will get evaluated before entering that step. Every batch step also contains a construct for aggregations, where you can aggregate a configured number of records and work on those as a collection.

Phase 3 - Complete

This is the optional phase where you can generate reports on the batch job execution. The payload here is the batch job report (BatchJobResult object) with information such as total, successful, and failed records. People usually write some logic here to notify concerned parties about the successes or failures to facilitate any further action.

Every collection supplied to a batch processor goes through these phases, and an instance of this execution is known as a Batch Job instance.
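The three phases map onto the structure of a batch job definition. The following is a minimal Mule 4 sketch, not a complete application: the flow, job, and step names are hypothetical, the enclosing mule element and its namespace declarations are omitted, and it assumes the payload reaching the batch job is already a collection.

```xml
<flow name="exampleBatchFlow">
    <!-- Phase 1 (Load and Dispatch) is implicit: the collection in the
         payload is split into records and queued when batch:job runs. -->
    <batch:job jobName="exampleBatchJob">
        <!-- Phase 2 (Process): every record passes through the batch steps. -->
        <batch:process-records>
            <batch:step name="exampleStep">
                <logger level="INFO" message="#[payload]"/>
            </batch:step>
        </batch:process-records>
        <!-- Phase 3 (Complete): optional; the payload is the BatchJobResult. -->
        <batch:on-complete>
            <logger level="INFO" message="#['Failed records: $(payload.failedRecords)']"/>
        </batch:on-complete>
    </batch:job>
</flow>
```

Each invocation of exampleBatchFlow creates one batch job instance.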

Why Use Batch Processors?

Parallel processing in Mule can be achieved in multiple ways, such as using scopes like Parallel For Each or Scatter-Gather. Batch processors not only achieve parallel processing, but also let you fine-tune it with many built-in batch features, such as:

  1. Separate and fine-tune certain phases of processing, by configuring batch steps with filters and entry conditions.
  2. Control the scheduling of batch job instances if there is more than one instance waiting to run for a given batch job definition.
  3. Stop processing when a configured number of failed records is met.
  4. Wait and collect a configured number of records and act on that collection. For example, if you want to process 10000 records in a parallel fashion, but want to collect and send those transformed payloads to SFDC 2000 at a time, you can use batch aggregate to achieve this.
  5. Generate reports on success and failed records.

No other single component in Mule offers this combination of features; only the batch processor does.
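Several of the features listed above correspond to attributes on the batch job, step, and aggregator elements. The fragment below is a sketch under assumptions: the job, step, and referenced flow names (including sendToSalesforce and transformAccount) are hypothetical, the accept expression is only an illustration, and namespace declarations are omitted.

```xml
<!-- schedulingStrategy controls how queued job instances are scheduled;
     maxFailedRecords stops the instance once 100 records have failed. -->
<batch:job jobName="accountSyncBatchJob"
           schedulingStrategy="ORDERED_SEQUENTIAL"
           maxFailedRecords="100">
    <batch:process-records>
        <!-- Entry condition: only records with an email enter this step. -->
        <batch:step name="syncStep" acceptExpression="#[payload.email != null]">
            <flow-ref name="transformAccount"/>
            <!-- Collect transformed records and act on them 2000 at a time. -->
            <batch:aggregator size="2000">
                <flow-ref name="sendToSalesforce"/>
            </batch:aggregator>
        </batch:step>
    </batch:process-records>
</batch:job>
```

Inside the aggregator, the payload is the collection of aggregated records rather than a single record.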

Batch Processor Persistent Queues Behavior in CloudHub

When records are loaded into the persistent queues, the Mule runtime writes the data to the CloudHub worker's secondary storage. This data survives a restart of the Mule runtime (a JVM/Tanuki wrapper restart) inside the worker, so in these cases the batch processor can continue where it left off. A runtime restart inside the worker is not the same as an application restart: an application restart creates a brand new worker with a fresh Mule runtime, and all data on the previous worker is lost, so there is no continuity across a CloudHub application restart.

What About the “Enabling Persistent Queues” Setting for the CH App?

When you enable this setting while deploying an app with batch processor logic to CloudHub, the batch processor no longer writes records to in-worker secondary storage and instead uses AWS SQS queues underneath. Batch processing continuity across application (worker) restarts will NOT be achieved regardless of the persistent queue setting. The main purpose of this setting is to use cloud-based queuing/storage.

Remember that the batch processor runs only on the CloudHub primary worker; you cannot modify this behavior. Many people expect that enabling the persistent queue setting shares the processing of records between multiple nodes in the cluster, but this is not correct. The batch processor writes all of its metadata to the worker's secondary storage; once the worker is lost, that metadata is lost too, and Mule won’t be able to continue processing the leftover records in those persistent (SQS) queues.

Some disadvantages of using the persistent queue setting when you have a batch processor in your app are:

  1. Increased record processing time due to the SQS overhead. Every message needs to be pushed to an SQS queue and retrieved from it, between every batch step.
  2. Duplicate processing of records. A record may be processed multiple times when your processing step takes more than 70 seconds, because the acknowledgment timeout for the SQS queue defaults to 70 seconds. If the batch step has not completed and Mule has not acknowledged the message within those 70 seconds, the message is put back on the queue and processed again by the same batch step. However, you can adjust this timeout by providing this global property to the Mule app: persistent.queue.min.timeout


Tips Around the Ideal Configuration of the Batch Processor

    1. You may set “Max failed records” to -1. This means that the batch processor does not stop processing no matter how many records fail. Make sure you handle failed records properly, for example with a batch step whose accept policy is “ONLY_FAILURES” to take care of the failed records within that step. Remember that the error handler in the flow where the batch job resides will be ignored.
    2. Map your flow correlation ID variable to the field “Job instance ID” in the batch processor settings. By default, the batch processor generates a random UUID for every batch job instance, and it logs information such as record counts and success and failure reports under that batch job instance ID. With this mapping in place, it is easier to find these native logs in your log analyzers by using a known correlation ID that you usually maintain across application layers.
    3. The Batch processor clones the whole Mule event on the load and dispatch phase, so try to send just your collection into the batch processor without unnecessary variables. This saves some memory in case of processing millions of records. Use the “Remove Variable” component to remove unnecessary variables before invoking batch processors.
    4. There is no need to configure the “Max Concurrency” setting, as Mule automatically decides the number of threads to use based on the available hardware resources. You may set “Max Concurrency” to 1 when developing batch processing logic in Anypoint Studio. This is for local debugging ONLY. It forces the batch processor to process records one at a time, so you can debug your code easily without worrying about multiple threads.
    5. Keep in mind that batch processor execution is asynchronous with respect to the calling flow. When a flow invokes the batch processor, a copy of the Mule event is created. The batch processor runs on that copy, and the calling flow continues with any processors that follow the batch processor.
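Several of these tips can be combined in one batch job definition. The fragment below is a hedged sketch, not a reference configuration: all flow, job, step, and variable names are hypothetical, it uses Mule's built-in correlationId binding as the job instance ID (a custom correlation variable would work the same way), and namespace declarations are omitted.

```xml
<flow name="tunedBatchFlow">
    <!-- Tip 3: drop variables the batch steps do not need
         (rawResponse is a hypothetical variable name). -->
    <remove-variable variableName="rawResponse"/>

    <!-- Tip 1: maxFailedRecords="-1" never stops on failures.
         Tip 2: jobInstanceId ties the batch logs to a known correlation ID.
         Tip 4: add maxConcurrency="1" for local debugging only. -->
    <batch:job jobName="tunedBatchJob"
               maxFailedRecords="-1"
               jobInstanceId="#[correlationId]">
        <batch:process-records>
            <batch:step name="processStep">
                <flow-ref name="processRecord"/>
            </batch:step>
            <!-- Tip 1: handle failed records in a dedicated step. -->
            <batch:step name="handleFailuresStep" acceptPolicy="ONLY_FAILURES">
                <logger level="WARN" message="#[payload]"/>
            </batch:step>
        </batch:process-records>
    </batch:job>

    <!-- Tip 5: the flow continues here while the batch job runs asynchronously. -->
    <logger level="INFO" message="Batch job instance dispatched"/>
</flow>
```

Because the job instance ID must identify a single instance, this only works when each invocation carries a distinct correlation ID.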

Batch Processor Asynchronous Behavior FAQ

Let's say you have two batch job definitions in your project, batch jobs A and B. What if you want to run these batch jobs sequentially?

You can chain batch job runs one after another by calling the batch job B flow from the on-complete phase of batch job A. This ensures that batch job B will run only after batch job A.
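A minimal sketch of this chaining, assuming hypothetical flow and job names and omitting namespace declarations. Note that in the on-complete phase the payload is the batch job report, so the flow for batch job B is assumed to load its own input collection first (here through a hypothetical loadRecordsForB flow).

```xml
<flow name="batchJobAFlow">
    <batch:job jobName="batchJobA">
        <batch:process-records>
            <batch:step name="stepA">
                <flow-ref name="processRecordA"/>
            </batch:step>
        </batch:process-records>
        <!-- Runs once every record of batch job A has been processed. -->
        <batch:on-complete>
            <flow-ref name="batchJobBFlow"/>
        </batch:on-complete>
    </batch:job>
</flow>

<flow name="batchJobBFlow">
    <!-- Replace the BatchJobResult payload with the collection for job B. -->
    <flow-ref name="loadRecordsForB"/>
    <batch:job jobName="batchJobB">
        <batch:process-records>
            <batch:step name="stepB">
                <flow-ref name="processRecordB"/>
            </batch:step>
        </batch:process-records>
    </batch:job>
</flow>
```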

As the batch job execution is asynchronous from the main thread, what if you have a use case where you want to skip the execution if one is already running (to mitigate that async behavior)?

You can implement a 'lock' (an entry) in the Mule object store and check for its existence before invoking your batch job. If there is no lock, create one and proceed to invoke the batch job. Otherwise, skip that scheduler run by routing to the end of the flow without invoking the batch job.

You need to remove that entry from the object store inside your batch job code when your run is complete so that the next execution of the batch job runs. Make sure you remove this lock in your batch job exception handling logic as well in case of any exception in the flow.
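The lock pattern can be sketched with the Object Store connector as follows. This is an illustration under assumptions, not a hardened implementation: the store, key, flow, and variable names are hypothetical, the scheduler frequency is arbitrary, namespace declarations are omitted, and the check and the store are two separate operations, so the check is not atomic.

```xml
<os:object-store name="batchLockStore" persistent="true"/>

<flow name="scheduledEntryFlow">
    <scheduler>
        <scheduling-strategy>
            <fixed-frequency frequency="5" timeUnit="MINUTES"/>
        </scheduling-strategy>
    </scheduler>
    <!-- Check whether a previous run still holds the lock. -->
    <os:contains key="batchJobLock" objectStore="batchLockStore" target="lockExists"/>
    <choice>
        <when expression="#[vars.lockExists]">
            <logger level="INFO" message="Batch job already running; skipping this run"/>
        </when>
        <otherwise>
            <!-- Take the lock, then invoke the flow that holds the batch job. -->
            <os:store key="batchJobLock" objectStore="batchLockStore">
                <os:value>#['locked']</os:value>
            </os:store>
            <flow-ref name="lockedBatchFlow"/>
        </otherwise>
    </choice>
</flow>

<flow name="lockedBatchFlow">
    <flow-ref name="loadRecords"/>
    <batch:job jobName="lockedBatchJob">
        <batch:process-records>
            <batch:step name="processStep">
                <flow-ref name="processRecord"/>
            </batch:step>
        </batch:process-records>
        <!-- Release the lock when the run is complete. -->
        <batch:on-complete>
            <os:remove key="batchJobLock" objectStore="batchLockStore"/>
        </batch:on-complete>
    </batch:job>
    <!-- Release the lock if the flow fails before the batch job starts. -->
    <error-handler>
        <on-error-propagate>
            <os:remove key="batchJobLock" objectStore="batchLockStore"/>
        </on-error-propagate>
    </error-handler>
</flow>
```

The error handler lives in lockedBatchFlow because that flow only runs after the lock has been stored, so the key is expected to exist when it is removed.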
