1,问题思考,在spark里面不管是离线还是实时去使用广播变量,都比较简单,可以广播一些静态变量、静态文件,可是在flink里面怎么使用呢?
2,在网上查找了一段代码并在本地实践,Flink 可以有两种广播方式:
1)广播配置文件 或者静态变量
我使用的是Flink的分布式缓存 具体代码如下
package com.coder.flink.core.cache
import java.io.{File, FileInputStream}
import java.util
import java.util.Properties
import org.apache.flink.api.common.functions.RichMapFunction
import org.apache.commons.io.FileUtils
import org.apache.flink.api.common.functions.RichMapFunction
import org.apache.flink.configuration.Configuration
import org.apache.flink.api.scala.ExecutionEnvironment
import org.apache.flink.api.scala._
/**
 * Demonstrates Flink's distributed cache: a local file is registered with the
 * ExecutionEnvironment, shipped to every TaskManager, and read back inside a
 * RichMapFunction by its registered name.
 */
object CacheProperties {
  def main(args: Array[String]): Unit = {
    val env = ExecutionEnvironment.getExecutionEnvironment
    // Register a local properties file in the distributed cache under the
    // name "too"; every task can retrieve a local copy of it at runtime.
    env.registerCachedFile("D:/wxgz-local/resources_ceshi/too.properties", "too")
    val weight: DataSet[String] = env.fromElements("a", "b")
    weight.map(new RichMapFunction[String, String] {
      // open() runs once per parallel task before any map() call, so the
      // cached file is loaded a single time instead of once per element.
      override def open(parameters: Configuration): Unit = {
        super.open(parameters)
        val file: File = getRuntimeContext.getDistributedCache.getFile("too")
        val prop = new Properties
        // FIX: the original never closed the FileInputStream, leaking a file
        // handle per task; close it in a finally block.
        val in = new FileInputStream(file)
        try {
          prop.load(in)
        } finally {
          in.close()
        }
        val value = prop.getProperty("cycle")
        println(s"value = ${value}")
      }

      override def map(value: String): String = {
        "11"
      }
    }).print()
    // print() on a DataSet already triggers job execution, so the explicit
    // env.execute() stays commented out.
    // env.execute("test cache")
  }
}
2)配置流实现,这个是网上找的demo 简化了,代码如下,个人觉得没spark广播变量那么简单好用:
package com.coder.flink.core.broadcast;
import com.coder.flink.core.utils.RedisUtils;
import org.apache.flink.api.common.state.MapStateDescriptor;
import org.apache.flink.api.common.typeinfo.BasicTypeInfo;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.datastream.BroadcastStream;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.co.BroadcastProcessFunction;
import org.apache.flink.streaming.api.functions.source.RichSourceFunction;
import org.apache.flink.util.Collector;
import redis.clients.jedis.Jedis;
import java.util.Date;
import java.util.HashMap;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.TimeUnit;
/**
 * Author: miao
 * Date: 2019-04-01 15:17
 * Description: demonstrates dynamic configuration updates via a broadcast
 * stream. A parallelism-1 "config" source periodically emits a small map,
 * which is broadcast to every parallel instance of the processing operator;
 * a second source simulates the main (e.g. Kafka) data stream.
 */
public class BroadcastStreamDemo_test {
    public static void main(String[] args) throws Exception {
        // Build the stream processing environment.
        final StreamExecutionEnvironment environment = StreamExecutionEnvironment.getExecutionEnvironment();
        // Run the job with a parallelism of 4.
        environment.setParallelism(4);
        // Descriptor shared by the broadcast side and the process function.
        final MapStateDescriptor<String, String> CONFIG_KEYWORDS = new MapStateDescriptor<>(
                "config-keywords",
                BasicTypeInfo.STRING_TYPE_INFO,
                BasicTypeInfo.STRING_TYPE_INFO);

        // Custom broadcast (configuration) source, parallelism 1.
        BroadcastStream<Map<String, String>> broadcastStream = environment.addSource(new RichSourceFunction<Map<String, String>>() {
            private volatile boolean isRunning = true;
            // FIX: parameterize the previously raw ConcurrentHashMap type.
            private final Map<String, String> map = new ConcurrentHashMap<>(16);

            /**
             * Config source: every 3 seconds pick one of three entries based
             * on the current timestamp and broadcast it downstream.
             * (The original comments claimed 30s / 6s; the code sleeps 3s.)
             * @param ctx source context used to emit config snapshots
             * @throws Exception if interrupted while sleeping
             */
            @Override
            public void run(SourceContext<Map<String, String>> ctx) throws Exception {
                while (isRunning) {
                    long newTime = System.currentTimeMillis();
                    TimeUnit.SECONDS.sleep(3);
                    map.clear();
                    // FIX: newTime % 3 can only be 0, 1 or 2, so the original
                    // trailing else branch putting "D" was unreachable dead code.
                    switch ((int) (newTime % 3)) {
                        case 0:
                            map.put("A", "AAAA");
                            break;
                        case 1:
                            map.put("B", "BBBB");
                            break;
                        default:
                            map.put("C", "CCCC");
                            break;
                    }
                    // FIX: emit a defensive copy. The original collected the
                    // same mutable map instance every cycle and then cleared /
                    // mutated it, so downstream operators could observe a map
                    // changing underneath them.
                    ctx.collect(new HashMap<>(map));
                    System.out.println("发送的Map:" + map.toString());
                }
            }

            @Override
            public void cancel() {
                isRunning = false;
            }
        }).setParallelism(1).broadcast(CONFIG_KEYWORDS);

        // Custom data source simulating incoming Kafka messages, parallelism 1.
        DataStream<String> dataStream = environment.addSource(new RichSourceFunction<String>() {
            private volatile boolean isRunning = true;
            // Test messages standing in for a real Kafka topic.
            private final String[] dataSet = new String[]{
                    "你喜欢武藤兰吗",
                    "永井流奈也还可以呢",
                    "青木真麻你了解过吗",
                    "坂本真綾也是很不错的呢",
                    "佐藤遥希呢,很漂亮呢"
            };

            /**
             * Emits one random message from dataSet every 3 seconds.
             * @param ctx source context used to emit messages
             * @throws Exception if interrupted while sleeping
             */
            @Override
            public void run(SourceContext<String> ctx) throws Exception {
                int size = dataSet.length;
                while (isRunning) {
                    TimeUnit.SECONDS.sleep(3);
                    int seed = (int) (Math.random() * size);
                    ctx.collect(dataSet[seed]);
                    System.out.println("kafka接收数据:" + dataSet[seed]);
                }
            }

            @Override
            public void cancel() {
                isRunning = false;
            }
        }).setParallelism(1);

        // Connect the data stream with the broadcast stream and print results.
        dataStream.connect(broadcastStream).process(new BroadcastProcessFunction<String, Map<String, String>, String>() {
            // Latest config snapshot. NOTE(review): this demo caches the config
            // in a plain field rather than in the operator's broadcast state,
            // so it is not checkpointed/restored by Flink — fine for a demo,
            // not for production.
            private volatile Map<String, String> keywords;

            @Override
            public void open(Configuration parameters) throws Exception {
                super.open(parameters);
                // Bootstrap the keywords from Redis so the operator has a
                // value before the first broadcast element arrives.
                if (keywords == null) {
                    System.out.println("2222 = " + 2222);
                    // FIX: the original never closed the Jedis connection,
                    // leaking it; release it in a finally block.
                    Jedis jedis = RedisUtils.getJedis();
                    try {
                        Map<String, String> resultMap = jedis.hgetAll("flink_test3");
                        System.out.println("resultMap.toString() = " + resultMap.toString());
                        keywords = resultMap;
                    } finally {
                        jedis.close();
                    }
                }
            }

            @Override
            public void processBroadcastElement(Map<String, String> value, Context ctx, Collector<String> out) throws Exception {
                // Replace the cached config with the newly broadcast map.
                keywords = value;
            }

            @Override
            public void processElement(String value, ReadOnlyContext ctx, Collector<String> out) throws Exception {
                out.collect("kafka的消息:" + value + ", Map的数据:" + keywords);
            }
        }).print();

        // Streaming jobs are built lazily; execute() actually starts the job.
        environment.execute();
    }
}