【发布时间】:2020-01-29 18:17:21
【问题描述】:
我正在尝试使用 flink 和 springboot 的组合,但我遇到了一些问题。 假设我有这个流程。
- 获取json字符串,其中一个字段日期包含日期字符串。
- 使用map函数和ObjectMapper将其解析为LocalDateTime的对象
- 打印
这是描述我的探针的简单用例。
所以,我有代表包含 LocalDateTime 字段的 Word 类。
@Data
public class Word {
@JsonDeserialize(using = LocalDateTimeSerde.class)
LocalDateTime date;
}
LocalDateTimeDeserlization 看起来像这样(我想自动装配应用配置):
@RequiredArgsConstructor(onConstructor = @__(@Autowired))
@JsonComponent
public class LocalDateTimeSerde extends JsonDeserializer<LocalDateTime> {
private final AppConf conf;
@Override
public LocalDateTime deserialize(JsonParser jsonParser, DeserializationContext deserializationContext) throws IOException, JsonProcessingException {
DateTimeFormatter formatter = DateTimeFormatter.ofPattern(this.conf.getDateFormatter());
return LocalDateTime.parse(jsonParser.getText(), formatter);
}
}
AppConf.java表示应用的配置是:
@Data
@Configuration
@ConfigurationProperties(value = "app")
public class AppConf {
private String dateFormatter;
}
DemoApplication.java:
final StreamExecutionEnvironment env = StreamExecutionEnvironment.createLocalEnvironment(1);
String example = "{\"date\":\"2019-01-29 00:00\"}";
var stream = env
.fromElements(example)
.map(x->new ObjectMapper().readValue(x,Word.class))
.returns(Word.class);
stream.print();
env.execute("Demo App");
我得到的例外是:
Caused by: java.lang.IllegalArgumentException: Class com.example.demo.LocalDateTimeSerde has no default (no arg) constructor
这里的主要问题是反序列化的代码是在TaskManager上运行的,而那里springboot没有参与,所以它没有将AppConf注入到类中。
添加@NoArgsConstructor 并不能解决问题
我想我知道为什么会发生这种情况(因为 flink master 将类序列化给工作人员,然后 springboot 不“扫描组件”并获得控制权。
有什么解决办法吗?我真的很想在worker的函数中将spring和flink结合起来。
谢谢。
【问题讨论】:
标签: spring spring-boot apache-flink autowired flink-streaming