【问题标题】:Read data from Redis to Flink从 Redis 读取数据到 Flink
【发布时间】:2017-05-26 03:35:08
【问题描述】:

我一直在尝试寻找一个连接器来将数据从 Redis 读取到 Flink。 Flink 的文档包含对连接器写入 Redis 的描述。我需要在我的 Flink 作业中从 Redis 读取数据。在Using Apache Flink for data streaming 中,Fabian 提到可以从 Redis 读取数据。可以用于什么目的的连接器?

【问题讨论】:

    标签: redis apache-flink flink-streaming


    【解决方案1】:

    目前Flink Redis Connector不可用,但可以通过扩展RichSinkFunction/SinkFunction类来实现。

    public class RedisSink extends RichSinkFunction<String> {
    
      @Override
      public void open(Configuration parameters) throws Exception {
          //open redis connection
      }
    
      @Override
      public void invoke(String map) throws Exception {
         //sink data to redis
      }
    
      @Override
      public void close() throws Exception {
         super.close();
      }
    
    }
    

    【讨论】:

      【解决方案2】:

      我们正在生产中运行一个大致像这样的产品

      class RedisSource extends RichSourceFunction[SomeDataType] {
      
        var client: RedisClient = _
      
        override def open(parameters: Configuration) = {
          client = RedisClient() // init connection etc
        }
      
        @volatile var isRunning = true
      
        override def cancel(): Unit = {
          isRunning = false
          client.close()
        }
      
        override def run(ctx: SourceContext[SomeDataType]): Unit = while (isRunning) {
            for {
              data <- ??? // get some data from the redis client
            } yield ctx.collect(SomeDataType(data))
      
        }
      } 
      

      我认为这真的取决于你需要从 redis 中获取什么。以上可用于从列表/队列中获取消息,转换/推送,然后将其从队列中删除。 Redis 还支持 Pub/Sub,因此可以订阅、抓取 SourceConext 并将消息推送到下游。

      【讨论】:

      • 我怎样才能调用这个类...请帮助我
      【解决方案3】:

      关于为 Apache Flink 提供流式 redis 源连接器的讨论已经很多(请参阅 FLINK-3033),但没有可用的连接器。不过,实现它应该不难。

      【讨论】:

        【解决方案4】:

        让您的 Flink 程序使用 Jedis 与 Redis 对话的挑战之一是将适当的库放入您提交给 Flink 的 JAR 文件中。如果没有这个,你会得到调用堆栈,表明某些类是未定义的。这是我创建的 Maven pom.xml 的 sn-p,用于将 Redis 及其依赖组件 apache commons-pool2 移动到我的 JAR 中。

            <build>
            <plugins>
                <plugin>
                    <groupId>org.apache.maven.plugins</groupId>
                    <artifactId>maven-dependency-plugin</artifactId>
                    <version>2.9</version>
                    <executions>
                        <execution>
                            <id>unpack</id>
                            <!-- executed just before the package phase -->
                            <!-- https://ci.apache.org/projects/flink/flink-docs-release-1.3/dev/linking.html -->
                            <phase>prepare-package</phase>
                            <goals>
                                <goal>unpack</goal>
                            </goals>
                            <configuration>
                                <artifactItems>
                                    <artifactItem>
                                        <groupId>org.apache.commons</groupId>
                                        <artifactId>commons-pool2</artifactId>
                                        <version>2.4.2</version>
                                        <type>jar</type>
                                        <overWrite>false</overWrite>
                                        <outputDirectory>${project.build.directory}/classes</outputDirectory>
                                        <includes>org/apache/commons/**</includes>
                                    </artifactItem>
                                    <artifactItem>
                                        <groupId>redis.clients</groupId>
                                        <artifactId>jedis</artifactId>
                                        <version>2.9.0</version>
                                        <type>jar</type>
                                        <overWrite>false</overWrite>
                                        <outputDirectory>${project.build.directory}/classes</outputDirectory>
                                        <includes>redis/clients/**</includes>
                                    </artifactItem>
        
                                </artifactItems>
                            </configuration>
                        </execution>
                    </executions>
                </plugin>
        
            </plugins>
        </build>
        

        【讨论】:

          猜你喜欢
          • 1970-01-01
          • 2016-11-08
          • 1970-01-01
          • 1970-01-01
          • 2019-08-01
          • 1970-01-01
          • 2017-08-21
          • 1970-01-01
          • 2018-01-31
          相关资源
          最近更新 更多