【问题标题】:Spring Autowire configuration in flinkflink 中的 Spring Autowire 配置
【发布时间】:2020-01-29 18:17:21
【问题描述】:

我正在尝试使用 flink 和 springboot 的组合,但我遇到了一些问题。 假设我有这个流程。

  1. 获取json字符串,其中一个字段日期包含日期字符串。
  2. 使用map函数和ObjectMapper将其解析为LocalDateTime的对象
  3. 打印

这是描述我的探针的简单用例。

所以,我有代表包含 LocalDateTime 字段的 Word 类。

@Data
public class Word {
    @JsonDeserialize(using = LocalDateTimeSerde.class)
    LocalDateTime date;
}

LocalDateTimeDeserlization 看起来像这样(我想自动装配应用配置):

@RequiredArgsConstructor(onConstructor = @__(@Autowired))
@JsonComponent
public class LocalDateTimeSerde extends JsonDeserializer<LocalDateTime> {
    private final AppConf conf;

    @Override
    public LocalDateTime deserialize(JsonParser jsonParser, DeserializationContext deserializationContext) throws IOException, JsonProcessingException {
        DateTimeFormatter formatter = DateTimeFormatter.ofPattern(this.conf.getDateFormatter());
        return LocalDateTime.parse(jsonParser.getText(), formatter);
    }
}

AppConf.java表示应用的配置是:

@Data
@Configuration
@ConfigurationProperties(value = "app")
public class AppConf {
    private String dateFormatter;

}

DemoApplication.java

final StreamExecutionEnvironment env = StreamExecutionEnvironment.createLocalEnvironment(1);
String example = "{\"date\":\"2019-01-29 00:00\"}";
var stream = env
        .fromElements(example)
        .map(x->new ObjectMapper().readValue(x,Word.class))
        .returns(Word.class);
stream.print();

env.execute("Demo App");

我得到的例外是:

Caused by: java.lang.IllegalArgumentException: Class com.example.demo.LocalDateTimeSerde has no default (no arg) constructor

这里的主要问题是反序列化的代码是在TaskManager上运行的,而那里springboot没有参与,所以它没有将AppConf注入到类中。

添加@NoArgsConstructor 并不能解决问题

我想我知道为什么会发生这种情况(因为 flink master 将类序列化给工作人员,然后 springboot 不“扫描组件”并获得控制权。

有什么解决办法吗?我真的很想在worker的函数中将spring和flink结合起来。

谢谢。

【问题讨论】:

    标签: spring spring-boot apache-flink autowired flink-streaming


    【解决方案1】:

    总的来说,我个人认为混合这些概念并不是一个好主意。最简单的解决方案是仅在作业管理器上使用AutoWired,并在进入 Flink-land 时使用显式依赖注入。

    例如,您可以在 DemoApplication 中提取日期模式并将其设置在 ObjectMapper 上。 (不要忘记在你的真实代码中只初始化一次 ObjectMapper!)

    如果你真的想使用 AutoWiring。我想您需要手动触发任务管理器上的自动装配。有一个related post specifically for ObjectMapper

    【讨论】:

      猜你喜欢
      • 1970-01-01
      • 2014-09-04
      • 1970-01-01
      • 1970-01-01
      • 1970-01-01
      • 1970-01-01
      • 2016-02-07
      • 1970-01-01
      • 2014-10-30
      相关资源
      最近更新 更多