【发布时间】:2021-08-08 13:55:10
【问题描述】:
我们可以使用 Logstash-Jdbc 插件将数据从 Oracle/任何 db 同步到弹性。但是,我找不到任何方法来操作这个 jdbc 插件中来自 DB 的数据。我想在我的 Spring Boot 应用程序中使用 Logstash/any 插件来做同样的事情,我想在保存到弹性之前操作数据和列名。
【问题讨论】:
标签: oracle spring-boot elasticsearch jdbc logstash
我们可以使用 Logstash-Jdbc 插件将数据从 Oracle/任何 db 同步到弹性。但是,我找不到任何方法来操作这个 jdbc 插件中来自 DB 的数据。我想在我的 Spring Boot 应用程序中使用 Logstash/any 插件来做同样的事情,我想在保存到弹性之前操作数据和列名。
【问题讨论】:
标签: oracle spring-boot elasticsearch jdbc logstash
有许多Logstash 输入插件,您可以使用 Logstash 内部的 grok 过滤器进行基本的流处理,但是我提供使用 Kafka 输入插件来进行流处理,将您的数据发送到 Logstash。
使用您的 Kafka 代理创建消费者,并在您的 Spring 项目中使用发布者类发布您的文档,然后使用 Logstash 配置输入将您的数据更多地摄取到您的索引中,在此路线图中,您在 Apache 的帮助下拥有强大的消费者-发布者管道卡夫卡。
找一个例子如下,
<!--Kafka-->
<dependency>
<groupId>org.springframework.kafka</groupId>
<artifactId>spring-kafka</artifactId>
<version>2.5.8.RELEASE</version>
</dependency>
处理完文档和数据后,创建一个发布者类来发布这些文档
public class DriverProducer {
@Autowired
KafkaTemplate<Integer, String > kafkaTemplate;
@Autowired
ObjectMapper objectMapper;
public void messenger(Object convey) throws JsonProcessingException {
String message=objectMapper.writeValueAsString(convey);
ListenableFuture<SendResult<Integer,String>> listenableFuture=kafkaTemplate.sendDefault(null, message);
listenableFuture.addCallback(new ListenableFutureCallback<SendResult<Integer, String>>() {
@Override
public void onFailure(Throwable throwable) {
failHandler(null, message, throwable);
}
@Override
public void onSuccess(SendResult<Integer, String> result) {
successHandler(result);
}
});
}
private void failHandler(Integer key, String message, Throwable throwable){
//log.error("Unable to send the message for following Error :"+throwable.getMessage());
try {
throw throwable;
}
catch (Throwable anotherThrowable){
//log.error("**Supreme Error on throwing the throwable**"+anotherThrowable.getMessage());
}
}
private void successHandler (SendResult<Integer,String> result){
//log.info("Message sent successfully :"+ result);
}
@AllArgsConstructor
@NoArgsConstructor
@Getter
@Setter
public static class Convey{
private SagaSequence sequence;
private Integer key;
private Date date;
}
您的 Logstash 配置文件可能如下所示
input {
kafka{
group_id => "35834"
topics => ["Second-Topic"]
bootstrap_servers => "localhost:9092"
codec => json
}
}
filter {
}
output {
file {
path => "/SOMEPATH"
}
elasticsearch {
hosts => ["localhost:9200"]
document_type => "_doc"
index => "logger"
}
stdout { codec => rubydebug
}
}
【讨论】: