I recently had a customer who needed to consolidate data from several legacy databases (Sybase, DB2, and Oracle 10g) and then copy the transformed result set to a Microsoft SQL Server 2008 database. This was clearly an ETL use case so, being a good consultant, I suggested that they use a tool like Oracle Data Integrator. However, because they had a limited budget for new licenses and already had a license to Oracle SOA Suite 11g that they wanted to test out, they asked me to do this using BPEL.
At first, I thought that this would be fairly simple. I figured all I had to do was to just slap in a bunch of database adapters into my composite along with a few transforms into my BPEL project and then I would be off to the races.
I soon discovered that even though this solution worked perfectly for low volumes of data, as soon as I started processing anything over approximately 20,000 records at a time I ran into memory problems. It got even better when I realized that this was really going to be an issue when I would have to process more than 300,000 records in one batch every night in Production.
Houston, we have a problem.
Of course, I was on a tight timeline and just to make things more interesting this was a high-profile project because it was the first test of SOA 11g at this customer. Success meant more projects. Failure meant a nice, cold bench. Of course, this was no time to panic.
That’s when inspiration struck and I discovered how to make BPEL gulp. I realized that I had to transfer this data in bite size chunks. The key to all of this was when I realized I needed to stage the consolidated data in my Oracle database. Then I could use Oracle’s pagination feature to grab chunks of data at a time:
SELECT EFF_DT, CUSTOMER_NO, DAILY_REVENUE FROM
( SELECT ROWNUM R, EFF_DT, CUSTOMER_NO, DAILY_REVENUE
WHERE R >= 2000 and R < 3000;
If you’d like to see a simple project that demonstrates specifically how this works, download HowToMakeBPELGulp.zip. For this demo, I’m running Oracle’s Pre-built Virtual Machine for SOA Suite and BPM Suite 11g.
First, let’s set up your database. Because we need a fairly large amount of data in this example, you will need to download two zip files: HowToMakeBPELGulp_DataModel_1.zip and HowToMakeBPELGulp_DataModel_2.zip. Once they have been extracted, be sure that you have the following 4 files in the same directory:
Log into your local database as the SYSTEM user and run 1-MasterDataLoader.sql. This will set up two users: CUSTOMER_STAGING and CUSTOMER_REPOSITORY. The passwords for both users are the same as their respective names. This script will also load 300,000 rows of sample data into the CUSTOMER_STAGING.CUSTOMER_SOURCE_DATA table. This is the staging table we’ll use to hold all of the data we’ve consolidated from our various data sources. The script is going to take a little while to run so please be patient.
Next, you’ll need to set up a WebLogic datasource and data connection pool for both users. If you’re a little fuzzy on how this works, check out a blog article I recently wrote on this topic: Oracle SOA Database Adapter 101 - WebLogic Configuration Steps.
Here is the setup information for users:
|Data Source JNDI||jdbc/CustomerStaging|
|Connection Pool JNDI||eis/DB/CustomerStaging|
|Data Source JNDI||jdbc/CustomerRepository|
|Connection Pool JNDI||eis/DB/CustomerRepository|
Here are the steps to create this demo:
SELECT COUNT(*) FROM CUSTOMER_SOURCE_DATA
SELECT EFF_DT, CUSTOMER_NO, DAILY_REVENUE, COMPANY_NAME, STREET1, STREET2, CITY, STATE, ZIP, PHONE FROM (SELECT ROWNUM R, EFF_DT, CUSTOMER_NO, DAILY_REVENUE, COMPANY_NAME, STREET1, STREET2, CITY, STATE, ZIP, PHONE FROM CUSTOMER_SOURCE_DATA) WHERE R >= ? and R < ?
In the 6th line of the generated XML, replace "arg1" with "beginningPosition." In the 7th line, replace "arg2" with "endingPosition."
- Create a new SOA project and then open up your composite
- Create a database adapter external reference called “CustomerSourceData_GetCount” which will run the following query against the CUSTOMER_STAGING schema:
- Create an asynchronous BPEL process called SyncCustomerUpdates and wire the CustomerSourceData_GetCount database adapter to this new process. You need to get the total record count from the CUSTOMER_SOURCE_DATA table in preparation for the while loop that we will create in the next few steps. Note that this pattern will NOT work with a synchronous BPEL process that is invoked from a web service.
- In your BPEL process, create a variable called InstanceData based off of the following XSD in order to track your position in the resultset in the while loop we're about to create. The natural question here would be why I didn’t choose to create variables based on simple datatypes rather than one variable based upon a complex type. The reason, as we’ll see later, is so that I can use this variable within transforms in the while loop.
- Instantiate your new instanceData variable to the following values:
if (totalInstances < 1000) then
endingPosition = totalInstances + 1
endingPosition = 1000
- TotalInstances = response count from CustomerSourceData_GetCount
- BeginningPosition = 1 -- (It’s important to note that SOA 11g array indexes begin at 1 as opposed to java indexes that start at 0)
- EndingPosition – This will require a bit of logic to handle different situations. Since we’re going to iterate in blocks of 1000, we will need to have logic that says:
- continueProcessing = true
- Now we’re ready to create our while loop. Let’s give it the name: "WhileSyncDataRecordsRemainForProcessing". Be sure to insert a skip condition so that if instanceData.totalInstances = 0 then the while block is skipped. The while condition should evaluate the following condition: instanceData.continueProcessing = 'true'.
- Go back out to your composite and create another Database Adapter called “RetrieveBlockOfData_From_CustomerSourceData” against the CustomerStaging database. This will be a Pure SQL Query:
- In your BPEL process, insert a transform within the while loop you just created. This will be used to populate request data for the RetrieveBlockOfData_From_CustomerSourceData query. Use the InstanceData object as your input.
- Insert an invoke activity to call the RetrieveBlockOfData_From_CustomerSourceData query.
- In your composite, create a new Database Adapter called, “CustomerRepository_Insert” against the CUSTOMER_REPOSITORY schema. Set the Operation Type to “Insert Only.” Import the CUSTOMER_DAILY_REVENUE table from the CUSTOMER_REPOSITORY schema. For this simple example, you can select CUSTOMER_NO and EFF_DT as the primary keys.
- Once you’ve wired this to your BPEL process, go into your while loop and create a new transform to map the output from the RetrieveBlockOfData_From_CustomerSourceData query to the input for the CustomerRepository_Insert query. This obviously isn't the world's most exciting transform, but the intent isn't to focus on how the data is altered but rather to provide a pattern for managing the data transfer when there are a lot of rows.
- Insert an invoke activity to call the CustomerRepository_Insert query
- Next, we need to insert a new transform within the while loop to increment your counters within the InstanceData variable – that is unless you like unleashing never-ending loops on your poor, unsuspecting WebLogic server. The InstanceData variable is both your input and your output for this transform. Also, remember to cast your numeric variables as number values in when statements. If you don't, they will be evaluated as strings which will be tricky to debug because sometimes they will work correctly and other times they won't depending upon which values you're comparing. For example:
<xsl:when test="number(/client:InstanceData/client:beginningPosition) = 1.0">
- totalInstances – Will simply be mapped to itself
- beginningPosition – Use the following logic:
if (beginningPosition = 1) then
beginningPosition = beginningPosition + 999
beginningPosition = beginningPosition + 1000
- endingPosition – Use the following logic:
if ((endingPosition + 1000) > totalInstances) then
endingPosition = totalInstances + 1 else endingPosition = endingPosition + 1000
- continueProcessing – use the following logic:
if (totalInstances > endingPosition) then
continueProcessing = true
else continueProcessing = false
If you were to deploy the process in its current state, more than likely you’d get a FabricInvokationException at the end of the process and everything would be rolled back. You would see messages in the logs saying things like: “bpel engine can not proceed further without an active transaction” and “Transaction timed out after 303 seconds.”
So, how can this be fixed? Do I send Houston another panicky radio message? Maybe I could use my phone a friend option.
After a little thought, I realized that this was happening because I needed to commit each transaction at the end of each iteration of the while loop. Otherwise, the transaction ended up being too large to commit when the while loop finished iterating thru 300,000 rows of data. What a shock! In order to trigger the commit, I needed to force the process to dehydrate at the end of each while loop. In other words … gulp.
In this pattern, each while loop will retrieve 1000 records from the CUSTOMER_STAGING database, transform the data, and then write those 1000 records to the CUSTOMER_REPOSITORY database. However, as we’ve just seen, that’s not enough. At the end of the while loop, it’s critical that you insert a Dehydration point from the Oracle Extensions.
This will force BPEL to commit the transaction, thus avoiding the possibility of a transaction timing out due to its size. If you don't want to use the dehydration point, you can also insert a Wait activity. You will need to set the interval to at least 3 seconds, which is the minimum interval that will force the BPEL engine to trigger a dehydration. Regardless, using this pattern you can process extremely large resultsets using BPEL.
I love it when a plan comes together
Whew. We made it. Now if you ever feel an overwhelming urge to use BPEL as an ETL tool you have one more pattern in your tool box to help you out. In review, we found in the course of this discussion that the important pieces of this pattern were:
- Use an Oracle database to stage your data (or any database that easily supports pagination)
- Only process small chunks of data at a time using Oracle's pagination feature
- Be sure to force a commit after processing each chunk of data
Be sure to check out my upcoming blog posts that will discuss more advanced topics involving Oracle SOA 11g, BPEL, BPMN and project management.