【问题标题】:Spring Integration Flow with Jdbc Message source which has dynamic query具有动态查询的 Jdbc 消息源的 Spring 集成流
【发布时间】:2019-07-29 08:48:18
【问题描述】:

我正在尝试使用以 kafka 作为代理的 spring 云数据流从 oracle DB 进行更改数据捕获。我为此使用轮询机制。我定期使用基本选择查询轮询数据库以捕获任何更新的数据。为了更好的防故障系统,我将上次轮询时间保存在 oracle DB 中,并使用它来获取上次轮询后更新的数据。

public MessageSource<Object> jdbcMessageSource() {
    JdbcPollingChannelAdapter jdbcPollingChannelAdapter =
            new JdbcPollingChannelAdapter(this.dataSource, this.properties.getQuery());
    jdbcPollingChannelAdapter.setUpdateSql(this.properties.getUpdate());
    return jdbcPollingChannelAdapter;
}

@Bean
public IntegrationFlow pollingFlow() {
    IntegrationFlowBuilder flowBuilder = IntegrationFlows.from(jdbcMessageSource(),spec -> spec.poller(Pollers.fixedDelay(3000)));
    flowBuilder.channel(this.source.output());
    flowBuilder.transform(trans,"transform");
    return flowBuilder.get();

}

我在应用程序属性中的查询如下:

query: select * from kafka_test where LAST_UPDATE_TIME >(select LAST_POLL_TIME from poll_time)

update : UPDATE poll_time SET LAST_POLL_TIME = CURRENT_TIMESTAMP

这对我来说非常适合。我可以通过这种方法从数据库中获取 CDC。

我现在正在查看的问题如下:

仅仅为了维持轮询时间而创建一个表是一种负担。我正在寻找在 kafka 主题中维护最后一次投票时间,并在我进行下一次投票时从 kafka 主题中检索该时间。

我已经修改了jdbcMessageSource方法如下尝试:

public MessageSource<Object> jdbcMessageSource() {
    String query = "select * from kafka_test where LAST_UPDATE_TIME > '"+<Last poll time value read from kafka comes here>+"'";

    JdbcPollingChannelAdapter jdbcPollingChannelAdapter =
            new JdbcPollingChannelAdapter(this.dataSource, query);
    return jdbcPollingChannelAdapter;
}

但是 Spring Data Flow 只实例化 pollingFlow( )(请参见上面的代码)bean 一次。因此,首先运行的查询将保持不变。我想用每次轮询的新轮询时间来更新查询。

有没有办法可以编写自定义Integrationflow,以便在每次进行投票时更新此查询?

我已经为此尝试过IntegrationFlowContext,但没有成功。

提前致谢!!!

【问题讨论】:

    标签: spring-boot apache-kafka spring-integration spring-cloud-dataflow oracle-cdc


    【解决方案1】:

    在上述两个答案的帮助下,我能够找出方法。 编写 jdbc template 并将其包装为 bean 并将其用于 Integration Flow

    @EnableBinding(Source.class)
    @AllArgsConstructor
    public class StockSource {
    
      private DataSource dataSource;
    
      @Autowired
      private JdbcTemplate jdbcTemplate;
    
      private MessageChannelFactory messageChannelFactory;  // You can use normal message channel which is available in spring cloud data flow as well.
    
      private List<String> findAll() {
        jdbcTemplate = new JdbcTemplate(dataSource);
        String time = "10/24/60" . (this means 10 seconds for oracle DB)
        String query = << your query here like.. select * from test where (last_updated_time > time) >>;
        return jdbcTemplate.query(query, new RowMapper<String>() {
          @Override
          public String mapRow(ResultSet rs, int rowNum) throws SQLException {
              ...
              ...
              any row mapper operations that you want to do with you result after the poll.
              ...
              ...
              ...
            // Change the time here for the next poll to the DB. 
            return result;
          }
        });
      }
    
      @Bean
      public IntegrationFlow supplyPollingFlow() {
    
        IntegrationFlowBuilder flowBuilder = IntegrationFlows
            .from(this::findAll, spec -> {
              spec.poller(Pollers.fixedDelay(5000));
            });
        flowBuilder.channel(<<Your message channel>>);
        return flowBuilder.get();
      }
    
    }
    

    在我们的用例中,我们在 kafka 主题中保留了上次轮询时间。这是为了减少应用程序状态。现在对数据库的每个新轮询,都将在 where 条件下有一个新时间。

    P.S:您的消息代理 (kafka/rabbit mq) 应该在您的本地运行,或者如果托管在不同的平台上,则连接到它们。

    神速!!!

    【讨论】:

      【解决方案2】:

      有关标准适配器中动态查询的机制,请参见 Artem 的回答;然而,另一种选择是简单地将 JdbcTemplate 包装在 Bean 中并使用

      调用它
      IntegrationFlows.from(myPojo(), "runQuery", e -> ...)
          ...
      

      甚至是一个简单的 lambda

          .from(() -> jdbcTemplate...)
      

      【讨论】:

      • 请看我的回答,
      • @Gary Russell 非常感谢您的回答。这确实帮助我们解决了问题。
      【解决方案3】:

      我们有这个测试配置(抱歉,它是一个 XML):

      <inbound-channel-adapter query="select * from item where status=:status" channel="target"
                                   data-source="dataSource" select-sql-parameter-source="parameterSource"
                                   update="delete from item"/>
      
      
          <beans:bean id="parameterSource" factory-bean="parameterSourceFactory"
                      factory-method="createParameterSourceNoCache">
              <beans:constructor-arg value=""/>
          </beans:bean>
      
          <beans:bean id="parameterSourceFactory"
                      class="org.springframework.integration.jdbc.ExpressionEvaluatingSqlParameterSourceFactory">
              <beans:property name="parameterExpressions">
                  <beans:map>
                      <beans:entry key="status" value="@statusBean.which()"/>
                  </beans:map>
              </beans:property>
              <beans:property name="sqlParameterTypes">
                  <beans:map>
                      <beans:entry key="status" value="#{ T(java.sql.Types).INTEGER}"/>
                  </beans:map>
              </beans:property>
          </beans:bean>
      
          <beans:bean id="statusBean"
                      class="org.springframework.integration.jdbc.config.JdbcPollingChannelAdapterParserTests$Status"/>
      

      注意ExpressionEvaluatingSqlParameterSourceFactory 及其createParameterSourceNoCache() 工厂。此结果可用于select-sql-parameter-source

      JdbcPollingChannelAdapter 有一个setSelectSqlParameterSource 关于此事。

      因此,您配置了一个ExpressionEvaluatingSqlParameterSourceFactory,以便能够将某些查询参数解析为某个 bean 方法调用的表达式,从而从 Kafka 获取所需的值。那么createParameterSourceNoCache()会帮你得到一个预期的SqlParameterSource

      文档中也有一些信息:https://docs.spring.io/spring-integration/docs/current/reference/html/#jdbc-inbound-channel-adapter

      【讨论】:

        猜你喜欢
        • 1970-01-01
        • 2017-11-05
        • 1970-01-01
        • 1970-01-01
        • 1970-01-01
        • 1970-01-01
        • 1970-01-01
        • 2020-03-03
        • 1970-01-01
        相关资源
        最近更新 更多