【问题标题】:Implementing Spring + Apache Flink project with Postgres使用 Postgres 实现 Spring + Apache Flink 项目
【发布时间】:2020-05-26 16:55:25
【问题描述】:

我有一个使用 apache flink 处理数据流信号的 SpringBoot gradle 项目。当一个新信号通过数据流时,我想使用已经创建的 postgres 数据库表中的 ID 查询查找(即 findById() )它的详细信息,以便获取有关信号的其他信息并丰富数据。我想避免使用 spring 依赖项来执行查找(即 Autowire 存储库),并希望坚持使用 flink 实现进行查找。

我在哪里可以指定如何添加 postgres 连接配置信息,例如端口、数据库、url、用户名、密码等...(为简单起见,可以假设 postgres db 在我的机器上是本地的)。是否像在 application.properties 文件中添加配置一样简单?如果是这样,我如何编写查询方法以在非主键值搜索时查找 postgres 表中的记录?

一些在线资源建议使用此框架代码,但我不确定它如何/id 适合我的用例。 (我创建了一个 EventEntity 模型,其中包含我正在查找的表中的所有参数/列)。

like so

    public class DatabaseMapper extends RichFlatMapFunction<String, EventEntity> {

        // Declare DB connection & query statements

        public void open(Configuration parameters) throws Exception {
            //Initialize DB connection
            //prepare query statements
        }

        @Override
        public void flatMap(String value, Collector<EventEntity> out) throws Exception {

        }
    }

【问题讨论】:

    标签: java postgresql spring-boot apache-flink flink-streaming


    【解决方案1】:

    您的示例代码是正确的。您可以在open() 方法中为 PostgreSQL 设置所有自定义初始化和准备代码。然后,您可以在 flatMap() 函数中使用预先配置的字段。

    这是 Redis 操作的一个示例

    • 我在这里使用了 RichAsyncFunction,我建议您按照建议的最佳做法执行相同的操作。阅读此处了解更多信息:https://ci.apache.org/projects/flink/flink-docs-release-1.10/dev/stream/operators/asyncio.html)
    • 您可以在构造方法中传递配置参数并在初始化过程中使用它

          public static class AsyncRedisOperations extends RichAsyncFunction<Object,Object> {
      
              private JedisPool jedisPool;
              private Configuration redisConf;
      
              public AsyncRedisOperations(Configuration redisConf) {
                this.redisConf = redisConf;
              } 
      
              @Override
              public void open(Configuration parameters) {
      
                JedisPoolConfig jedisPoolConfig = new JedisPoolConfig();
                jedisPoolConfig.setMaxTotal(this.redisConf.getInteger("pool", 8));
                jedisPoolConfig.setMaxIdle(this.redisConf.getInteger("pool", 8));
                jedisPoolConfig.setMaxWaitMillis(this.redisConf.getInteger("maxWait", 0));
      
                JedisPool jedisPool = new JedisPool(jedisPoolConfig,
                  this.redisConf.getString("host", "192.168.10.10"),
                  this.redisConf.getInteger("port", 6379), 5000);
      
                try {
                  this.jedisPool = jedisPool;
                  this.logger.info("Redis connected: " + jedisPool.getResource().isConnected());
                } catch (Exception e) {
                  this.logger.error(BaseUtil.append("Exception while connecting Redis"));
                }
      
              }
      
              @Override
              public void asyncInvoke(Object in, ResultFuture<Object> out) {
      
                try (Jedis jedis = this.jedisPool.getResource()) {
                  String key = jedis.get(key);
                  this.logger.info("Redis Key: " + key);
                } 
      
              }
          }      
      

    【讨论】:

    • 这很有帮助,谢谢!
    • 谢谢。您能否提供一个更大的示例来说明如何将 Apache Flink 与 Spring Boot 结合使用。这将非常有用。
    猜你喜欢
    • 1970-01-01
    • 2016-02-26
    • 1970-01-01
    • 1970-01-01
    • 2016-09-08
    • 1970-01-01
    • 1970-01-01
    • 1970-01-01
    • 2020-02-11
    相关资源
    最近更新 更多