March 8 2017

Subscribing to messages on an Oracle AQ queue is not out-of-the-box functionality in Mule, but it can be done by following the steps in this article.

JMS Transport

The first step to get a Mule flow to communicate with AQ is to add the following spring bean configuration:

   <spring:bean id="connFactory" class="oracle.jms.AQjmsFactory" factory-method="getQueueConnectionFactory">
      <!-- JDBC URL -->
      <spring:constructor-arg index="0">
         <spring:value>oracle db url</spring:value>
      </spring:constructor-arg>
      <!-- properties -->
      <spring:constructor-arg index="1" type="java.util.Properties" value="" />
   </spring:bean>

As you probably noticed, the configuration references the class oracle.jms.AQjmsFactory. This class, which is found in aqapi.jar, allows Mule to use the JMS transport to communicate with AQ. Another dependency to add to the Mule project is jmscommon.jar (both jars can be found within an Oracle install, such as SOA).

With the spring bean configuration set, reference it within the JMS connector attribute, ‘connectionFactory-ref’:

<jms:connector name="OracleAQConnector" connectionFactory-ref="connFactory">
   <reconnect-forever frequency="2000" />
</jms:connector>

Now you can define your flow to subscribe to the queue using the AQ configuration:

<jms:inbound-endpoint queue="AQ_Name" connector-ref="OracleAQConnector" doc:name="JMS" />
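
As a minimal sketch of how this endpoint sits inside a flow, the fragment below wraps it in a flow and logs each payload that arrives. The flow name aqSubscriberFlow and the logger step are illustrative assumptions, not part of the original configuration; the queue and connector names are the ones used above.

<flow name="aqSubscriberFlow">
   <!-- Subscribes to the AQ queue through the connector defined earlier -->
   <jms:inbound-endpoint queue="AQ_Name" connector-ref="OracleAQConnector" doc:name="JMS" />
   <!-- Illustrative only: log whatever payload arrives from the queue -->
   <logger message="#[payload]" level="INFO" doc:name="Logger" />
</flow>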


Advanced Data Types

Once the JMS configuration is done, running the application may produce an unexpected error: Payload factory must be specified for destinations with ADT payloads.

If this is the case, you will need to add some Java code to your Mule application to enable the consumption of advanced data types (ADT). An ADT can be used to define the payload that is placed onto AQ within the Oracle database. This type is much like an object, but in Oracle-speak.

ORADataFactory Implementation

The first Java class to be written implements the ORADataFactory interface, which is used to describe the Oracle data types. Within this class, define the getters and setters for the ADT. This blog post provides an excellent example of how this is done.

Custom Message Receiver

With the ORADataFactory implemented, it can now be referenced by the next piece of the ADT puzzle. Create a Java class that extends the default JMS message receiver (org.mule.transport.jms.JmsMessageReceiver or org.mule.transport.jms.MultiConsumerJmsMessageReceiver) and specify this new message receiver class in the JMS connector configuration:

<service-overrides messageReceiver="com.avio.jms.connectorOverride.CustomMessageReceiver" />
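
For context, here is a sketch of where this element might sit. In Mule 3, service-overrides is a child of the connector element. The fragment assumes the connFactory bean and the OracleAQConnector name from earlier in this article. Placing the override after the reconnection strategy follows the usual schema order, which is worth checking against your Mule version.

<jms:connector name="OracleAQConnector" connectionFactory-ref="connFactory">
   <reconnect-forever frequency="2000" />
   <!-- Replace the default JMS message receiver with the custom class -->
   <service-overrides messageReceiver="com.avio.jms.connectorOverride.CustomMessageReceiver" />
</jms:connector>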

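Within this class, pass the ORADataFactory implementation to the createConsumer method:

   // Third argument is the ADT payload factory; getORADataFactory() is an assumed accessor name
   this.consumer = ((AQjmsSession)session).createConsumer(dest, selector,
                    AQCustomPayloadObject.getORADataFactory(),
                    null, this.connector.isNoLocal());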

Custom Transformer

The last step is to create a custom transformer which will transform the ADT into a POJO, allowing the message to be consumed by Mule. The code snippet below is an example of setting values in the POJO from the ADT fields within a custom transformer class:

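    AQjmsAdtMessage aqPayload = (AQjmsAdtMessage)message.getPayload();
    myPOJO qType = new myPOJO();
    try {
       AQCustomPayloadObject custPayload = (AQCustomPayloadObject)aqPayload.getAdtPayload();
       // copy each ADT field from custPayload onto qType here
    } catch (JMSException e) {
       throw new TransformerException(this, e);
    }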

Declare this new transformer as a global element so that it can be referenced by name from the JMS inbound endpoint:

<custom-transformer name="QtypeToPOJO" class="com.avio.CustomTransformer" doc:name="Java"/>
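
The endpoint side of that wiring could look like the sketch below. It assumes the transformer name QtypeToPOJO declared above and uses a nested transformer reference; the transformer-refs attribute on the endpoint is an alternative way to express the same thing.

<jms:inbound-endpoint queue="AQ_Name" connector-ref="OracleAQConnector" doc:name="JMS">
   <!-- Apply the custom ADT-to-POJO transformer to each incoming message -->
   <transformer ref="QtypeToPOJO" />
</jms:inbound-endpoint>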


About the Author


Jennie has over 25 years of information technology experience, the majority of time spent in application integration. She has broad experience in integration architecture design and implementation utilizing several integration toolsets. As an architect, she has experience developing SOA/API Governance Framework and Reference Architectures.

Join the Conversation

Alex Gorbachev
April 11, 2017

Hi Jennie,

Thanks for this guide.

I created a RAW type queue to avoid the complexity of the Object type to start with (i.e. avoid the ORADataFactory implementation) because I did hit the issue with the ADT payload as you stated. However, I am now getting another issue and I'm at a loss on how to troubleshoot it.

2017-04-11 18:23:43,759 [main] INFO  org.mule.transport.jms.JmsConnector - Registering listener: pythian_datahubFlow on endpointUri: jms://Q_AQ_EVENT2
2017-04-11 18:23:43,808 [main] INFO  org.mule.transport.service.DefaultTransportServiceDescriptor - Loading default inbound transformer: org.mule.transport.jms.transformers.JMSMessageToObject
2017-04-11 18:23:43,811 [main] INFO  org.mule.transport.service.DefaultTransportServiceDescriptor - Loading default response transformer: org.mule.transport.jms.transformers.ObjectToJMSMessage
2017-04-11 18:23:43,823 [main] INFO  org.mule.lifecycle.AbstractLifecycleManager - Initialising: 'null'. Object is: MultiConsumerJmsMessageReceiver
2017-04-11 18:23:43,828 [main] INFO  org.mule.transport.jms.MultiConsumerJmsMessageReceiver - Connecting clusterizable message receiver
2017-04-11 18:23:47,460 [main] ERROR org.mule.retry.notifiers.ConnectNotifier - Failed to connect/reconnect: jms://Q_AQ_EVENT2. Root Exception was: null. Type: class java.lang.NullPointerException
2017-04-11 18:23:47,461 [main] INFO  org.mule.retry.policies.SimpleRetryPolicy - Waiting for 2000ms before reconnecting. Failed attempt 1 of unlimited

Any hints?


