【问题标题】:How to implement an Oracle AQ queue in Spring Boot?如何在 Spring Boot 中实现 Oracle AQ 队列?
【发布时间】:2021-04-09 18:58:19
【问题描述】:

我已经了解了如何使用 AQ(Streams?)包创建 Oracle 数据库。我还在 Oracle 中创建了一些队列(手动)。 (使用 PL/SQL 和 SQL)。

但是,我很难从 Spring 建立正确的连接。

以下作品(使用oracle.AQjava包):

private final String aqUrl = "jdbc:oracle:thin:@localhost:1521:orcl";
private final String aqUser = "queue_mut";
private final String aqPassword = "******";
private final String aqSchema = "queue_mut";
private final String aqTable = "aq_table1";
private final String aqQueue = "aq_queue1";


@Test
public void testManualAQ() throws ClassNotFoundException, SQLException, AQException {

    Class.forName("oracle.jdbc.driver.OracleDriver");
    Connection connection = DriverManager.getConnection(aqUrl, aqUser, aqPassword);
    connection.setAutoCommit(false);

    Class.forName("oracle.AQ.AQOracleDriver");
    AQSession aqSession = AQDriverManager.createAQSession(connection);
    AQQueueTable q_table = aqSession.createQueueTable(aqSchema, aqTable, new AQQueueTableProperty("RAW"));
    aqSession.createQueue(q_table, aqQueue, new AQQueueProperty());

}

(基于https://docs.oracle.com/cd/B10501_01/appdev.920/a96587/apexampl.htm

这表明我可以连接到 Oracle 并使用 AQ 功能。

现在,我正在尝试创建 Java 配置的 bean 以使用 JmsTemplate

@Resource
private JmsTemplate jmsTemplate;

@Test
public void testJmsTemplate() {
    String xmlval = "<?xml version=\"1.0\" encoding=\"UTF-8\"?>\n" +
            "<product id=\"10\">\n" +
            " <description>Foo</description>\n" +
            " <price>2.05</price>\n" +
            "</product>";

    jmsTemplate.convertAndSend(aqSchema + ".jms_ws_incoming_queue", xmlval);
}

(是的,队列已经存在;-))

使用以下配置类:

import oracle.jms.AQjmsFactory;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.jdbc.datasource.DataSourceTransactionManager;
import org.springframework.jms.core.JmsTemplate;

import javax.jms.ConnectionFactory;
import javax.jms.JMSException;
import javax.sql.DataSource;

@Configuration
public class OracleAQConfiguration {

    @Bean
    public DataSourceTransactionManager transactionManager(DataSource dataSource) {
        DataSourceTransactionManager manager = new DataSourceTransactionManager();
        manager.setDataSource(dataSource);
        return manager;
    }

    @Bean
    public ConnectionFactory connectionFactory(DataSource dataSource) throws JMSException {
        return AQjmsFactory.getQueueConnectionFactory(dataSource);
    }

    @Bean
    public JmsTemplate jmsTemplate(ConnectionFactory connectionFactory) {
        JmsTemplate jmsTemplate = new JmsTemplate();
        jmsTemplate.setSessionTransacted(true);
        jmsTemplate.setConnectionFactory(connectionFactory);
        return jmsTemplate;
    }
}

还有 yml 属性:

spring:
  datasource:
    url: jdbc:oracle:thin:@localhost:1521:orcl
    username: queue_mut
    password: ******
    driverClassName: oracle.jdbc.driver.OracleDriver

但是这样我得到了我无法理解的错误:

2017-04-19 12:11:17,151  INFO my.project.QueueTest: Started QueueTest in 5.305 seconds (JVM running for 6.588)

org.springframework.jms.UncategorizedJmsException: Uncategorized exception occurred during JMS processing; nested exception is oracle.jms.AQjmsException: Error creating the db_connection; nested exception is java.lang.ClassCastException: com.sun.proxy.$Proxy102 cannot be cast to oracle.jdbc.internal.OracleConnection

    at org.springframework.jms.support.JmsUtils.convertJmsAccessException(JmsUtils.java:316)
    at org.springframework.jms.support.JmsAccessor.convertJmsAccessException(JmsAccessor.java:169)
    at org.springframework.jms.core.JmsTemplate.execute(JmsTemplate.java:487)
    at org.springframework.jms.core.JmsTemplate.send(JmsTemplate.java:570)
    at org.springframework.jms.core.JmsTemplate.convertAndSend(JmsTemplate.java:658)
    at my.project.QueueTest.testJmsTemplate(QueueTest.java:51)
    at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
    at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
    at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
    at java.lang.reflect.Method.invoke(Method.java:498)
    at org.junit.runners.model.FrameworkMethod$1.runReflectiveCall(FrameworkMethod.java:50)
    at org.junit.internal.runners.model.ReflectiveCallable.run(ReflectiveCallable.java:12)
    at org.junit.runners.model.FrameworkMethod.invokeExplosively(FrameworkMethod.java:47)
    at org.junit.internal.runners.statements.InvokeMethod.evaluate(InvokeMethod.java:17)
    at org.springframework.test.context.junit4.statements.RunBeforeTestMethodCallbacks.evaluate(RunBeforeTestMethodCallbacks.java:75)
    at org.springframework.test.context.junit4.statements.RunAfterTestMethodCallbacks.evaluate(RunAfterTestMethodCallbacks.java:86)
    at org.springframework.test.context.junit4.statements.SpringRepeat.evaluate(SpringRepeat.java:84)
    at org.junit.runners.ParentRunner.runLeaf(ParentRunner.java:325)
    at org.springframework.test.context.junit4.SpringJUnit4ClassRunner.runChild(SpringJUnit4ClassRunner.java:252)
    at org.springframework.test.context.junit4.SpringJUnit4ClassRunner.runChild(SpringJUnit4ClassRunner.java:94)
    at org.junit.runners.ParentRunner$3.run(ParentRunner.java:290)
    at org.junit.runners.ParentRunner$1.schedule(ParentRunner.java:71)
    at org.junit.runners.ParentRunner.runChildren(ParentRunner.java:288)
    at org.junit.runners.ParentRunner.access$000(ParentRunner.java:58)
    at org.junit.runners.ParentRunner$2.evaluate(ParentRunner.java:268)
    at org.springframework.test.context.junit4.statements.RunBeforeTestClassCallbacks.evaluate(RunBeforeTestClassCallbacks.java:61)
    at org.springframework.test.context.junit4.statements.RunAfterTestClassCallbacks.evaluate(RunAfterTestClassCallbacks.java:70)
    at org.junit.runners.ParentRunner.run(ParentRunner.java:363)
    at org.springframework.test.context.junit4.SpringJUnit4ClassRunner.run(SpringJUnit4ClassRunner.java:191)
    at org.junit.runner.JUnitCore.run(JUnitCore.java:137)
    at com.intellij.junit4.JUnit4IdeaTestRunner.startRunnerWithArgs(JUnit4IdeaTestRunner.java:68)
    at com.intellij.rt.execution.junit.IdeaTestRunner$Repeater.startRunnerWithArgs(IdeaTestRunner.java:51)
    at com.intellij.rt.execution.junit.JUnitStarter.prepareStreamsAndStart(JUnitStarter.java:237)
    at com.intellij.rt.execution.junit.JUnitStarter.main(JUnitStarter.java:70)
    at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
    at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
    at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
    at java.lang.reflect.Method.invoke(Method.java:498)
    at com.intellij.rt.execution.application.AppMain.main(AppMain.java:147)
Caused by: oracle.jms.AQjmsException: Error creating the db_connection
    at oracle.jms.AQjmsDBConnMgr.getConnection(AQjmsDBConnMgr.java:625)
    at oracle.jms.AQjmsDBConnMgr.<init>(AQjmsDBConnMgr.java:399)
    at oracle.jms.AQjmsConnection.<init>(AQjmsConnection.java:249)
    at oracle.jms.AQjmsConnectionFactory.createConnection(AQjmsConnectionFactory.java:513)
    at org.springframework.jms.support.JmsAccessor.createConnection(JmsAccessor.java:180)
    at org.springframework.jms.core.JmsTemplate.execute(JmsTemplate.java:474)
    ... 36 more
Caused by: java.lang.ClassCastException: com.sun.proxy.$Proxy102 cannot be cast to oracle.jdbc.internal.OracleConnection
    at oracle.jms.AQjmsGeneralDBConnection.getProviderKey(AQjmsGeneralDBConnection.java:98)
    at oracle.jms.AQjmsGeneralDBConnection.<init>(AQjmsGeneralDBConnection.java:67)
    at oracle.jms.AQjmsDBConnMgr.getConnection(AQjmsDBConnMgr.java:566)
    ... 41 more

我相信发生 Cast 异常是因为它是 ProxyConnection[PooledConnection[oracle.jdbc.driver.T4CConnection@40016ce1]]。但我不知道如何解决这个问题。

【问题讨论】:

    标签: java spring jms oracle-aq advanced-queuing


    【解决方案1】:

    更改jdbc 库,在我的情况下这已修复(如果不这样做,请尝试使用其他版本):

    <dependency>
        <groupId>com.oracle</groupId>
        <artifactId>ojdbc7</artifactId>
        <version>12.1.0.2.0</version>
    </dependency>
    

    【讨论】:

      【解决方案2】:

      我们在尝试从 Spring Boot 访问 Oracle AQ 时遇到了同样的异常。研究表明,由于数据库连接池库不允许访问oracle AQ库所需的底层连接而引发此异常。(dbcp和tomcat连接池库均抛出异常,不相同但相似)

      当我们从依赖项中删除数据库连接池库时,此异常消失了,这会导致整个应用程序没有数据库连接池的不良状态。

      我们注意到如果我们使用下面的方法不会抛出异常 AQjmsFactory.getQueueConnectionFactory(url, info);

      也许解决方案也缺少连接池,但这仅限于从 AQ 读取的组件。应用程序中的其他组件将受益于连接池

      这是 Bean 定义的 java 配置:

      @Bean
      public QueueConnectionFactory connectionFactory() throws Exception {
          OracleServiceInfo serviceInfo = (OracleServiceInfo) this.cloud().getServiceInfo(NAME_PRIMARY_DS);
          Properties info = new Properties();
          String url = serviceInfo.getJdbcUrl();
          info.put("driver-name", "oracle.jdbc.OracleDriver");
          info.put("user", serviceInfo.getUserName());
          info.put("password", serviceInfo.getPassword());
          return oracle.jms.AQjmsFactory.getQueueConnectionFactory(url, info);
      }
      
      @Bean
      public JmsTemplate jmsTemplate() throws Exception {
          JmsTemplate jmsTemplate = new JmsTemplate();
          jmsTemplate.setConnectionFactory(connectionFactory());
          return jmsTemplate;
      }
      

      我还不确定这是否是一个好的解决方案。但这绝对是摆脱问题中讨论的异常的一种方法。

      【讨论】:

      • this.cloud 的那部分是什么?
      • 这个示例来自一个 Spring Boot 项目,该项目从 Pivotal Cloud Foundry 中的服务绑定获取连接详细信息
      【解决方案3】:

      您好,我也花了很长时间才使连接正常工作,但最终成功了,方法如下:

      首先确保您的 Oracle AQ Queue 表的负载未设置为 RAW,但最好设置为 Text:SYS.AQ$_JMS_TEXT_MESSAGE

      接下来使用类似于下面的 OracleAQConfiguration:

      import oracle.jdbc.pool.OracleDataSource;
      import oracle.jms.AQjmsFactory;
      import org.springframework.beans.factory.annotation.Value;
      import org.springframework.context.annotation.Bean;
      import org.springframework.context.annotation.Configuration;
      
      import javax.jms.JMSException;
      import javax.jms.QueueConnectionFactory;
      import javax.sql.DataSource;
      import java.sql.SQLException;
      
      @Configuration
      public class OracleAQConfiguration {
      
          // Values are retrieved from custom added props in Spring application.properties
      
          @Value("${myapplication.datasource.user}")
          private String user;
      
          @Value("${myapplication.datasource.password}")
          private String password;
      
          @Value("${myapplication.datasource.connectionstring}")
          private String connectionstring;
      
          @Bean
          /**
           * Spring bean with the configuration details of where the Oracle database is containing the QUEUES
           */
          public DataSource dataSource() throws SQLException {
              OracleDataSource ds = new OracleDataSource();
              ds.setUser(user);
              ds.setPassword(password);
              ds.setURL(connectionstring);
              ds.setImplicitCachingEnabled(true);
              ds.setFastConnectionFailoverEnabled(true);
              return ds;
          }
      
          @Bean
          /**
           * The KEY component effectively connecting to the Oracle AQ system using the datasource input
           */
          public QueueConnectionFactory connectionFactory(DataSource dataSource) throws JMSException {
              return AQjmsFactory.getQueueConnectionFactory(dataSource);
          }
      
      }
      

      接下来使用类似于下面的 JMSConfiguration。 在这里,我读取和写入同一个队列,这在真实的应用程序集成场景中是不可能的。但可以测试

      import org.springframework.beans.factory.annotation.Autowired;
      import org.springframework.context.annotation.Bean;
      import org.springframework.context.annotation.Configuration;
      import org.springframework.jdbc.datasource.DataSourceTransactionManager;
      import org.springframework.jms.core.JmsTemplate;
      import org.springframework.jms.listener.DefaultMessageListenerContainer;
      
      import javax.jms.ConnectionFactory;
      import javax.sql.DataSource;
      
      @Configuration
      public class JMSConfiguration {
          private static final String QUEUENAME_WRITE = "MYQUEUE";
          private static final String QUEUENAME_READ = "MYQUEUE";
      
          @Autowired
          private JMSReceiver jmsReceiver;
      
          @Bean
          /**
           * Spring bean to WRITE/SEND/ENQUEUE messages on a queue with a certain name
           */
          public JmsTemplate jmsTemplate(ConnectionFactory conFactory) {
              JmsTemplate jmsTemplate = new JmsTemplate();
              jmsTemplate.setDefaultDestinationName(QUEUENAME_WRITE);
              jmsTemplate.setSessionTransacted(true);
              jmsTemplate.setConnectionFactory(conFactory);
      
              return jmsTemplate;
          }
      
          /**
           * Spring bean to READ/RECEIVE/DEQUEUE messages of a queue with a certain name
           * All of this happens under a code managed transaction
           * to commit the change on Oracle (remove of the message from the queue table)
           * Reference the application custom code handling the message here
           */
          @Bean
          public DefaultMessageListenerContainer messageListenerContainer(ConnectionFactory conFactory, DataSource dataSource) {
              DefaultMessageListenerContainer dmlc = new DefaultMessageListenerContainer();
              dmlc.setDestinationName(QUEUENAME_READ);
              dmlc.setSessionTransacted(true);
              dmlc.setConnectionFactory(conFactory);
      
              DataSourceTransactionManager manager = new DataSourceTransactionManager();
              manager.setDataSource(dataSource);
              dmlc.setTransactionManager(manager);
      
              // Add here our self-written JMS Receiver
              dmlc.setMessageListener(jmsReceiver);
              return dmlc;
          }
      
      }
      

      最后处理传入的 JMS 消息使用类似的东西:

      import org.slf4j.Logger;
      import org.slf4j.LoggerFactory;
      import org.springframework.jms.listener.SessionAwareMessageListener;
      import org.springframework.stereotype.Component;
      
      import javax.jms.JMSException;
      import javax.jms.Message;
      import javax.jms.Session;
      import javax.jms.TextMessage;
      
      @Component
      public class JMSReceiver implements SessionAwareMessageListener {
          private static final Logger logger = LoggerFactory.getLogger(JMSReceiver.class);
      
          @Override
          public void onMessage(Message message, Session session) throws JMSException {
              // We know/assume the Queue Payload type was set to 'TextMessage'
              TextMessage txtMessage = (TextMessage) message;
              logger.info("JMS Text Message received: " + txtMessage.getText());
      
              // ... further implementation
          }
      
      }
      

      【讨论】:

        【解决方案4】:

        问题是 AQ 代码需要一个 OracleConnection,但是当池化连接时,连接被包装,因此它失败了

        【讨论】:

          猜你喜欢
          • 1970-01-01
          • 2011-05-14
          • 2017-04-17
          • 2017-03-25
          • 2017-03-25
          • 2011-11-25
          • 2014-05-22
          • 1970-01-01
          • 1970-01-01
          相关资源
          最近更新 更多