【问题标题】:How to write data from flink pipeline to redis efficiently如何高效地将数据从 flink 管道写入 redis
【发布时间】:2018-12-27 15:41:16
【问题描述】:

我正在 Apache flink sql api 中构建管道。 管道执行简单的投影查询。但是,我需要在查询之前和查询之后编写一次元组(确切地说是每个元组中的一些元素)。 事实证明,我用来写入 redis 的代码会严重降低性能。即 flink 以非常小的数据速率产生背压。 我的代码有什么问题以及如何改进。请有任何建议。

当我停止写redis之前和之后的表现都很出色。 这是我的管道代码:

public class QueryExample {
    public static Long throughputCounterAfter=new Long("0");
    public static void main(String[] args) {
        int k_partitions = 10;
        reamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime);
        env.setParallelism(5 * 32);
        Properties props = new Properties();
        props.setProperty("zookeeper.connect", "zookeeper-node-01:2181");
        props.setProperty("bootstrap.servers", "kafka-node-01:9092,kafka-node-02:9092,kafka-node-03:9092");
        // not to be shared with another job consuming the same topic
        props.setProperty("group.id", "flink-group");
        props.setProperty("enable.auto.commit","false");
        FlinkKafkaConsumer011<String> purchasesConsumer=new FlinkKafkaConsumer011<String>("purchases",
                new SimpleStringSchema(),
                props);

        DataStream<String> purchasesStream = env
                .addSource(purchasesConsumer)
                .setParallelism(Math.min(5 * 32, k_partitions));
        DataStream<Tuple4<Integer, Integer, Integer, Long>> purchaseWithTimestampsAndWatermarks =
                purchasesStream
                        .flatMap(new PurchasesParser())
                        .assignTimestampsAndWatermarks(new BoundedOutOfOrdernessTimestampExtractor<Tuple4<Integer, Integer, Integer, Long>>(Time.seconds(10)) {

                            @Override
                            public long extractTimestamp(Tuple4<Integer, Integer, Integer, Long> element) {
                                return element.getField(3);
                            }
                        });

        Table purchasesTable = tEnv.fromDataStream(purchaseWithTimestampsAndWatermarks, "userID, gemPackID,price, rowtime.rowtime");
        tEnv.registerTable("purchasesTable", purchasesTable);

        purchaseWithTimestampsAndWatermarks.flatMap(new WriteToRedis());
        Table result = tEnv.sqlQuery("SELECT  userID, gemPackID, rowtime from purchasesTable");
        DataStream<Tuple2<Boolean, Row>> queryResultAsDataStream = tEnv.toRetractStream(result, Row.class);
        queryResultAsDataStream.flatMap(new WriteToRedis());
        try {
            env.execute("flink SQL");

        } catch (Exception e) {
            e.printStackTrace();
        }
    }




/**
 * write to redis
 */
public static class WriteToRedis extends RichFlatMapFunction<Tuple4<Integer, Integer, Integer, Long>, String> {
    RedisReadAndWrite redisReadAndWrite;

    @Override
    public void open(Configuration parameters) {
        LOG.info("Opening connection with Jedis to {}", "redis");
        this.redisReadAndWrite = new RedisReadAndWrite("redis",6379);

    }

    @Override
    public void flatMap(Tuple4<Integer, Integer, Integer, Long> input, Collector<String> out) throws Exception {
        this.redisReadAndWrite.write(input.f0+":"+input.f3+"","time_seen", TimeUnit.NANOSECONDS.toMillis(System.nanoTime())+"");
    }
}
}


public class RedisReadAndWrite {
    private Jedis flush_jedis;

    public RedisReadAndWrite(String redisServerName , int port) {
        flush_jedis=new Jedis(redisServerName,port);
    }


    public void write(String key,String field, String value) {
        flush_jedis.hset(key,field,value);

    }
}

补充部分: 我尝试了第二个实现,使用 Jedis 批量编写 toredis 的过程函数。但是我收到以下错误。 org.apache.flink.runtime.client.JobExecutionException: redis.clients.jedis.exceptions.JedisConnectionException: java.net.SocketException: 套接字未连接。我试图使批处理消息的数量更小,但一段时间后我仍然收到错误。

下面是流程函数的实现:

/** * 使用进程函数写入redis */

public static class WriteToRedisAfterQueryProcessFn extends ProcessFunction<Tuple2<Boolean, Row>, String> {
    Long timetoFlush;
    @Override
    public void open(Configuration parameters) {
        flush_jedis=new Jedis("redis",6379,1800);
        p = flush_jedis.pipelined();
        this.timetoFlush=System.currentTimeMillis()-initialTime;
    }

    @Override
    public void processElement(Tuple2<Boolean, Row> input, Context context, Collector<String> collector) throws Exception {
        p.hset(input.f1.getField(0)+":"+new Instant(input.f1.getField(2)).getMillis()+"","time_updated",TimeUnit.NANOSECONDS.toMillis(System.nanoTime())+"");
        throughputAccomulationcount++;
        System.out.println(throughputAccomulationcount);
        if(throughputAccomulationcount==50000){
            throughputAccomulationcount=0L;
            p.sync();
        }
    }
}

【问题讨论】:

    标签: redis apache-flink flink-streaming backpressure flink-sql


    【解决方案1】:

    毫无疑问,您遇到的性能不佳是因为您在每次写入时都向 redis 发出同步请求。 @kkrugler 已经提到了异步 i/o,这是针对这种情况的常见补救措施。这将需要切换到支持异步操作的 redis 客户端之一。

    在使用外部服务时,另一种常用的解决方案是将多组写入批处理在一起。使用 jedis,您可以使用 pipelining。例如,您可以将 WriteToRedis RichFlatMapFunction 替换为 ProcessFunction,该函数以一定大小的批次对 redis 进行流水线写入,并根据需要依赖超时来刷新其缓冲区。你可以使用 Flink 的 ListState 作为缓冲区。

    【讨论】:

    • 非常感谢@David Anderson 的回复。我尝试了第二个建议,您可以在问题的附加部分(流程功能的实现)中看到。但是我收到以下错误。 org.apache.flink.runtime.client.JobExecutionException: redis.clients.jedis.exceptions.JedisConnectionException: java.net.SocketException: 套接字未连接。我试图使批处理消息的数量更小,但一段时间后我仍然收到错误
    • 50000 是一个非常大的批量大小。我敢打赌,50 比 50000 效果更好。但是不管批量大小,你不应该对偶尔出现的错误感到惊讶。要让这样的事情不间断地运行,您必须准备好在失败时重试。
    • 此外,它的编写方式可能会在发生异常时丢失飞行中的记录。这就是为什么我建议将它们保持在 Flink 状态(ListState),直到它们被成功发送到 redis。
    • 非常感谢。我在不使用管道的情况下将其作为批处理和线程处理,并且效果很好。
    【解决方案2】:

    通常在写入外部服务时,这会成为 Flink 工作流程的瓶颈。提高性能的最简单方法是通过AsyncFunction 对工作流的该部分进行多线程处理。详情请见this documentation

    -- 肯

    【讨论】:

    • 谢谢肯。 AsyncFunction 是一个非常好的解决方案,很高兴知道。但是,我选择对应用程序要求的消息进行批处理和线程化。
    • 如果您正在执行自己的多线程,请注意,这通常会使支持检查点(以及因此可重新启动/可恢复的工作流程)变得更加困难。
    猜你喜欢
    • 1970-01-01
    • 2016-04-14
    • 2018-07-30
    • 2012-01-28
    • 2017-11-19
    • 1970-01-01
    • 2019-08-19
    • 2022-09-20
    • 2015-12-14
    相关资源
    最近更新 更多