【问题标题】:Can we use Logstash in SpringBoot to sync data from RDBMS我们可以在 Spring Boot 中使用 Logstash 从 RDBMS 同步数据吗
【发布时间】:2021-08-08 13:55:10
【问题描述】:

我们可以使用 Logstash-Jdbc 插件将数据从 Oracle/任何 db 同步到弹性。但是,我找不到任何方法来操作这个 jdbc 插件中来自 DB 的数据。我想在我的 Spring Boot 应用程序中使用 Logstash/any 插件来做同样的事情,我想在保存到弹性之前操作数据和列名。

【问题讨论】:

    标签: oracle spring-boot elasticsearch jdbc logstash


    【解决方案1】:

    有许多Logstash 输入插件,您可以使用 Logstash 内部的 grok 过滤器进行基本的流处理,但是我提供使用 Kafka 输入插件来进行流处理,将您的数据发送到 Logstash。

    使用您的 Kafka 代理创建消费者,并在您的 Spring 项目中使用发布者类发布您的文档,然后使用 Logstash 配置输入将您的数据更多地摄取到您的索引中,在此路线图中,您在 Apache 的帮助下拥有强大的消费者-发布者管道卡夫卡。

    找一个例子如下,

            <!--Kafka-->
            <dependency>
                <groupId>org.springframework.kafka</groupId>
                <artifactId>spring-kafka</artifactId>
                <version>2.5.8.RELEASE</version>
            </dependency>

    处理完文档和数据后,创建一个发布者类来发布这些文档

    public class DriverProducer {
    
        @Autowired
        KafkaTemplate<Integer, String > kafkaTemplate;
    
        @Autowired
        ObjectMapper objectMapper;
    
    
        public void messenger(Object convey) throws JsonProcessingException {
    
            String message=objectMapper.writeValueAsString(convey);
    
            ListenableFuture<SendResult<Integer,String>> listenableFuture=kafkaTemplate.sendDefault(null, message);
    
            listenableFuture.addCallback(new ListenableFutureCallback<SendResult<Integer, String>>() {
                @Override
                public void onFailure(Throwable throwable) {
                failHandler(null, message, throwable);
                }
    
                @Override
                public void onSuccess(SendResult<Integer, String> result) {
                successHandler(result);
                }
            });
        }
        private void failHandler(Integer key, String message, Throwable throwable){
            //log.error("Unable to send the message for following Error :"+throwable.getMessage());
    
            try {
                throw throwable;
            }
            catch (Throwable anotherThrowable){
                //log.error("**Supreme Error on throwing the throwable**"+anotherThrowable.getMessage());
            }
        }
    
        private void successHandler (SendResult<Integer,String> result){
    
            //log.info("Message sent successfully :"+ result);
        }
    
        @AllArgsConstructor
        @NoArgsConstructor
        @Getter
        @Setter
        public static class Convey{
    
            private SagaSequence sequence;
            private Integer key;
            private Date date;
      }

    您的 Logstash 配置文件可能如下所示

    input {
        kafka{
            group_id => "35834"
            topics => ["Second-Topic"]
            bootstrap_servers => "localhost:9092"
            codec => json
        }
    }
    
    filter {
    
    }
    
    output {
        file {
            path => "/SOMEPATH"
        }
        elasticsearch {
            hosts => ["localhost:9200"]
            document_type => "_doc"
            index => "logger"
        }
        stdout { codec => rubydebug
        }
    }

    【讨论】:

      猜你喜欢
      • 2020-06-04
      • 2016-06-17
      • 2021-05-14
      • 2020-01-14
      • 1970-01-01
      • 1970-01-01
      • 1970-01-01
      • 1970-01-01
      • 2022-01-08
      相关资源
      最近更新 更多