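[Posted]: 2021-08-19 16:00:53
[Question]:
I want to use a tumbling window in my program (non-keyed data). It processes streaming data, but it only handles about 300 messages/second, and I want to raise that to at least 5K/second. To do this, I want to use a 2-second tumbling window to speed it up, but I'm not sure how to apply it in my case.
Note: I use the GeoMesa HBase platform to store the messages. Also, I haven't pasted my entire application code here, because I only need the window function; the code below should be enough to understand the problem.
Here is my Flink code: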
public class Tranport {
    public static void main(String[] args) throws Exception {
        // fetch runtime arguments
        String bootstrapServers = "xx.xx.xxx.xxx:xxxx";
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        // Set up the Consumer and create a datastream from this source
        Properties properties = new Properties();
        properties.setProperty("bootstrap.servers", bootstrapServers);
        properties.setProperty("group.id", "group_id");
        final FlinkKafkaConsumer<String> flinkConsumer = new FlinkKafkaConsumer<>("lc", new SimpleStringSchema(), properties);
        flinkConsumer.setStartFromTimestamp(Long.parseLong("0"));
        DataStream<String> readingStream = env.addSource(flinkConsumer);
        readingStream.rebalance().map(new RichMapFunction<String, String>() {
            private static final long serialVersionUID = -2547861355L; // random number
            DataStore lc_live = null;
            SimpleFeatureType sft_live;
            SimpleFeatureBuilder SFbuilderLive; // feature builder for live
            List<SimpleFeature> lc_live_features; // features buffered for live

            @Override
            public void open(Configuration parameters) throws Exception {
                System.out.println("In open method.");
                // --- GEOMESA, GEOTOOLS APPROACH ---//
                // define connection parameters to xxx GeoMesa-HBase DataStore
                Map<String, Serializable> params_live = new HashMap<>();
                params_live.put("xxxx", "xxx"); // HBase table name
                params_live.put("xxxx", "xxxx");
                try {
                    lc_live = DataStoreFinder.getDataStore(params_live);
                    if (lc_live == null) {
                        System.out.println("Could not connect to live");
                    } else {
                        System.out.println("Successfully connected to live");
                    }
                } catch (IOException e) {
                    e.printStackTrace();
                }
                // create simple feature type for x table in HBASE
                StringBuilder attributes1 = new StringBuilder();
                attributes1.append("xxx:String,");
                attributes1.append("xxx:Long,");
                attributes1.append("source:String,");
                attributes1.append("xxx:String,");
                attributes1.append("xxx:Double,");
                attributes1.append("status:String,");
                attributes1.append("forecast:Double,");
                attributes1.append("carsCount:Integer,");
                attributes1.append("*xxx:Point:srid=4326");
                // assign the declared field sft_live (the original assigned an undeclared sft_history)
                sft_live = SimpleFeatureTypes.createType("xxxx", attributes1.toString());
                try {
                    lc_live.createSchema(sft_live);
                } catch (IOException e) {
                    e.printStackTrace();
                }
                // Initialize the variables
                numberOfMessagesProcessed = 0;
                numberOfMessagesFailed = 0;
                numberOfMessagesSkipped = 0;
                // for lc_Live
                lc_live_features = new ArrayList<>();
                SFbuilderLive = new SimpleFeatureBuilder(sft_live);
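Here I want to create a tumbling window function (windowAll) that collects all messages streamed within a 2-second window and pushes them into the array list I create below.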
            // live GeoMesa-HBase DataStore
            // copy the list into a local variable and empty the list for the next iteration
            List<SimpleFeature> LocalFeatures = lc_live_features;
            lc_live_features = new ArrayList<>();
            LocalFeatures = Collections.unmodifiableList(LocalFeatures);
            // append every buffered feature through a single writer, which is closed automatically
            try (FeatureWriter<SimpleFeatureType, SimpleFeature> writer = lc_live.getFeatureWriterAppend(sft_live.getTypeName(), Transaction.AUTO_COMMIT)) {
                System.out.println("Writing " + LocalFeatures.size() + " features to live");
                for (SimpleFeature feature : LocalFeatures) {
                    SimpleFeature toWrite = writer.next();
                    toWrite.setAttributes(feature.getAttributes());
                    // keep the feature ID supplied by the application
                    ((FeatureIdImpl) toWrite.getIdentifier()).setID(feature.getID());
                    toWrite.getUserData().put(Hints.USE_PROVIDED_FID, Boolean.TRUE);
                    toWrite.getUserData().putAll(feature.getUserData());
                    writer.write();
                }
            } catch (IOException e) {
                e.printStackTrace();
            }
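For reference, a non-keyed 2-second tumbling window in Flink is usually written as `readingStream.windowAll(TumblingProcessingTimeWindows.of(Time.seconds(2)))` followed by a `ProcessAllWindowFunction`, which receives all elements of one window as a single `Iterable`. Keep in mind that a `windowAll` operator runs with parallelism 1. The sketch below is not Flink code. It is a plain-Java illustration, assuming non-negative millisecond timestamps, of how messages would fall into such windows; `TumblingWindowSketch`, `windowStart`, and `assign` are hypothetical names introduced only for this example.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;

public class TumblingWindowSketch {

    /** Start of the tumbling window that contains timestampMs (assumes timestampMs >= 0). */
    static long windowStart(long timestampMs, long windowSizeMs) {
        return timestampMs - (timestampMs % windowSizeMs);
    }

    /** Groups payloads by the start time of the window their timestamp falls into. */
    static Map<Long, List<String>> assign(long[] timestampsMs, String[] payloads, long windowSizeMs) {
        Map<Long, List<String>> windows = new TreeMap<>();
        for (int i = 0; i < timestampsMs.length; i++) {
            long start = windowStart(timestampsMs[i], windowSizeMs);
            windows.computeIfAbsent(start, k -> new ArrayList<>()).add(payloads[i]);
        }
        return windows;
    }

    public static void main(String[] args) {
        long windowSizeMs = 2000L; // 2-second windows
        long[] timestamps = {0L, 500L, 1999L, 2000L, 4100L};
        String[] messages = {"a", "b", "c", "d", "e"};

        Map<Long, List<String>> windows = assign(timestamps, messages, windowSizeMs);
        for (Map.Entry<Long, List<String>> e : windows.entrySet()) {
            // Each entry is what one window function call would receive as its batch.
            System.out.println("window [" + e.getKey() + ", " + (e.getKey() + windowSizeMs)
                    + ") -> " + e.getValue());
        }
    }
}
```

Each map entry corresponds to one batch that could be written to the data store with a single `FeatureWriter`, instead of opening a writer per message.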
[Comments]:
-
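It sounds like you are hoping to improve performance by using windows. That is very unlikely to work. I suggest using a profiler to see where the time is going.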
-
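@DavidAnderson Yes, I want to improve its performance. The application processes and stores a large volume of streaming data, and it repeats the whole process for every single message, which is why it takes so long. I want to push the messages into an array list every 2 seconds and then process them all at once, which should improve performance.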
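The batching idea described in the last comment can be sketched in plain Java as a buffer that hands its contents to a bulk sink once a time interval has elapsed. This is only an illustration under stated assumptions: `TimedBatchBuffer`, its injectable clock, and the sink callback are hypothetical and not part of Flink or GeoMesa. Whether batching actually raises throughput depends on where the time is spent, which is what profiling would show.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.Consumer;
import java.util.function.LongSupplier;

public class TimedBatchBuffer<T> {
    private final long intervalMs;
    private final LongSupplier clock;      // injectable clock, e.g. System::currentTimeMillis
    private final Consumer<List<T>> sink;  // bulk writer, called once per batch
    private List<T> buffer = new ArrayList<>();
    private long intervalStartMs;

    public TimedBatchBuffer(long intervalMs, LongSupplier clock, Consumer<List<T>> sink) {
        this.intervalMs = intervalMs;
        this.clock = clock;
        this.sink = sink;
        this.intervalStartMs = clock.getAsLong();
    }

    /** Adds one message; flushes the previous batch first if the interval has elapsed. */
    public void add(T message) {
        if (clock.getAsLong() - intervalStartMs >= intervalMs) {
            flush();
        }
        buffer.add(message);
    }

    /** Hands the buffered messages to the sink (if any) and starts a new interval. */
    public void flush() {
        List<T> batch = buffer;
        buffer = new ArrayList<>();
        intervalStartMs = clock.getAsLong();
        if (!batch.isEmpty()) {
            sink.accept(batch);
        }
    }

    public static void main(String[] args) {
        long[] now = {0L};                          // fake clock so the run is deterministic
        List<Integer> batchSizes = new ArrayList<>();
        TimedBatchBuffer<String> buf =
                new TimedBatchBuffer<>(2000L, () -> now[0], b -> batchSizes.add(b.size()));

        buf.add("m1");
        now[0] = 1000L;
        buf.add("m2");
        now[0] = 2500L;                             // interval elapsed: m1 and m2 are flushed
        buf.add("m3");
        buf.flush();                                // final flush for the remaining message
        System.out.println(batchSizes);             // prints [2, 1]
    }
}
```

Note that this buffer only checks the clock when a new message arrives, so a quiet stream delays the flush; a window operator in a stream processor fires on a timer instead.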
Tags: java apache-kafka apache-flink windowing geomesa