【问题标题】:How to send email reactive in spring web-flux如何在 spring web-flux 中发送电子邮件响应式
【发布时间】:2018-12-08 01:37:42
【问题描述】:

我希望在我的新 Spring 应用程序中保持完全响应式。因此我在 MongoDB 中使用 web-flux/reactor 和 ReactiveRepository。

您知道如何将 java-mail 响应式集成到技术堆栈中吗?有什么选择吗?

【问题讨论】:

    标签: spring-boot jakarta-mail spring-webflux reactive-streams


    【解决方案1】:

    我发现并仍在使用的唯一有用的非阻塞 SMTP 客户端是 https://vertx.io/docs/vertx-mail-client/java/
    我什至将它与 spring-webflux 和 mongodb-driver-reactivestreams 集成,以便它们共享同一个 Netty EventLoopGroup。

    Mono.create<MailResult> { sink ->
      mailClient.sendMail(email) { asyncResult ->
        if (asyncResult.succeeded()) {
          sink.success(asyncResult.result()
        } else {
          sink.error(asyncResult.cause()
        }
      }
    }
    

    【讨论】:

    • 你能分享一个例子吗?
    • @RicardKollcaku,我添加了代码示例。这是你要求的吗?
    • 但这是否在 netty 事件循环中运行,我尝试使用 blockhund 来检测阻塞,它给出异常并且是顶点线程而不是反应器线程。和 blockhund 检测阻塞
    【解决方案2】:

    如何使用microsoft graph api,并使用microsoft的exahnge服务器发送电子邮件 https://docs.microsoft.com/en-us/graph/use-the-api。 它不是原始问题的答案,但我想知道可以在那里应用相同的概念,或者是否有人使用此 API 有类似的东西。

    【讨论】:

      【解决方案3】:

      为了发送电子邮件并且仍然是非阻塞,您可以在另一个线程中运行有关发送电子邮件的代码。如果您使用的是 Spring WebFlux,这可以很容易地完成,只需将用于发送电子邮件的代码包装在 Mono(A Reactor 库发布器)的以下工厂方法中。

      Mono.fromCallable()

      Mono.fromRunnable()

      短代码

      Mono.fromCallable(()-> sendEmail())
          .subscribe();
      

      sendEmail() 是您发送电子邮件的函数。

      这也是文档中推荐的 - How Do I Wrap a Synchronous, Blocking Call?

      长代码

      以下是我在应用程序中使用的完整示例代码 -

          Mono.fromCallable(() -> {
                  try {
                      MimeMessageHelper helper = new MimeMessageHelper(sender.createMimeMessage());
                      helper.setTo(to);
                      helper.setSubject(subject);
                      helper.setText(body);
                      sender.send(helper.getMimeMessage());
                      log.info("Email send successfully, subject {} , to {}", subject, to);
                      return true;
                  } catch (Exception e) {
                      log.warn("Failed to send email with subject {}, due to {}",subject,e.getMessage(), e});
                      return false;
                  }
      
       )
      .subscribe(result -> log.info("Mail sent {}", result));
      

      当它是响应式堆栈时,永远不要忘记订阅:D

      【讨论】:

      • 我们不应该使用 subscribe(Schedulers.boundedElastic())
      • 据我了解,我们这里有两种情况:1)reactor 将在单独的线程中作为作业运行此代码,虽然这不会阻塞当前线程,但仍会阻塞使用的线程游泳池? 2) reactor 将在事件循环中运行这段代码,这意味着如果这段代码是阻塞的,它会阻塞整个事件循环?我想在这两种情况下,我们都需要一个对 IO 级别做出反应的客户端,以真正避免阻塞并节省线程。
      【解决方案4】:

      我也在寻找响应式 SMTP 客户端。

      我设法找到了它;)

      这里是:https://github.com/HubSpot/NioSmtpClient

      来自自述文件:

      基于 Netty 的 Java 高性能 SMTP 客户端。该客户端经过充分测试并在 HubSpot 大量使用。

      我已经在本地环境中验证了它,它真的很被动! 但是,它使用 completableFuture 而不是 Mono 或 Flux,因此需要手动包装它。

      总的来说,这个库看起来不错,但我认为如果作者提供一些facade 会更好,这将简化 SDK 的使用。 (无论如何它是开源的,所以我们可以改进它)。

      这里是一个例子,如何使用它(codeStyle 没关系,这只是一个例子):

      private static final String MESSAGE_DATA = "From: <sender@gmail.com\r\n" +
              "To: <recipient@gmail.com>\r\n" +
              "Subject: test mail\r\n\r\n" +
              "Hello stackOverFlow!";
      
      public static void main(String[] args) {
          final SmtpSessionFactory smtpSessionFactory = createSmtpSessionFactory();
      
          final SmtpSessionConfig smtpSessionConfig = SmtpSessionConfig.builder().remoteAddress(InetSocketAddress.createUnresolved("smtp.gmail.com", 587)).build();
          Mono.fromFuture(smtpSessionFactory.connect(smtpSessionConfig))
                  .flatMap(connection -> doInSession(connection, req(EHLO, "gmail.com")))
                  .flatMap(connection -> Mono.fromFuture(connection.getSession().startTls()))
                  .flatMap(connection -> Mono.fromFuture(connection.getSession().authLogin("sender@gmail.com", "SomeStrongPasswordLike123456")))
                  .flatMap(connection -> doInSession(connection, req(MAIL, "FROM:<" + "sender@gmail.com" + ">")))
                  .flatMap(connection -> doInSession(connection, req(RCPT, "TO:<" + "recipient@gmail.com" + ">")))
                  .flatMap(connection -> doInSession(connection, req(DATA)))
                  .map(connection -> connection.getSession()
                          .send(MessageContent.of(Unpooled.wrappedBuffer(MESSAGE_DATA.getBytes(StandardCharsets.UTF_8)))))
                  .flatMap(Mono::fromFuture)
                  .flatMap(connection -> doInSession(connection, req(QUIT)))
                  .flatMap(connection -> Mono.fromFuture(connection.getSession().close()))
                  .block();
      }
      
      private static SmtpSessionFactory createSmtpSessionFactory() {
          ThreadFactory threadFactory = new ThreadFactoryBuilder().setDaemon(true).setNameFormat("niosmtpclient-%d").build();
          final SmtpSessionFactoryConfig config = SmtpSessionFactoryConfig.builder()
                  .eventLoopGroup(new NioEventLoopGroup(4, threadFactory))
                  .executor(Executors.newCachedThreadPool(threadFactory))
                  .build();
          return new SmtpSessionFactory(config);
      }
      
      private static Mono<SmtpClientResponse> doInSession(SmtpClientResponse connection, SmtpRequest request) {
          return Mono.fromFuture(connection.getSession().send(request));
      }
      
      private static SmtpRequest req(SmtpCommand command, CharSequence... arguments) {
          return new DefaultSmtpRequest(command, arguments);
      }
      

      它是如何工作的(一般):

      1. 我们定义了会话工厂(见方法createSmtpSessionFactory
      2. 我们打开与服务器的对话(连接)
      3. 我们启动 TLS
      4. 我们使用我们的凭据进行身份验证
      5. 我们将发件人的电子邮件地址告诉服务器
      6. 我们将收件人的电子邮件地址告诉服务器
      7. 我们开始数据发送阶段
      8. 我们发送数据(数据必须遵循某种模式。发件人:...收件人:...主题:...。请参阅MESSAGE_DATA 变量)
      9. 我们通知服务器我们正在完成对话。
      10. 我们关闭会话

      【讨论】:

        【解决方案5】:

        我找到了a solution。它使用spring-boot-starter-data-mongodb-reactive 和 Mailgun 或 SendGrid 等外部​​服务的 API。关键是使用响应式 WebClient :

        1. 构建 WebClient 实例(例如,使用 Sendgrid API):

          String endpoint = “https://api.sendgrid.com/v3/“;
          String sendUri = endpoint + “mail/send”;
          
          WebClient client = WebClient.builder().filter(…).clientConnector(new ReactorClientHttpConnector(HttpClient.create())).baseUrl(endpoint).build()
          
        2. 实现响应对象:

          @Data
          class Response implements Serializable {
              private boolean status;
              private String id;
              private String message;
          }
          
          @Data
          class NotificationStatusResponse implements Serializable {
          
              private LocalDateTime timestamp;
              private int status;
              private String message;
              private String traceId;
              private String responseId;
              private String providerResponseId;
              private String providerResponseMessage;
          }
          
        3. 发送您的信息:

          public Mono<NotificationStatusResponse> send(NotificationMessage<EmailId> email) throws NotificationSendFailedException {
          
              Mono<NotificationStatusResponse> response = null;
              try {
                  MultiValueMap<String, Object> formMap = new LinkedMultiValueMap<>(); // email parameters here: from, to, subject, html etc.
                  response = client.post().uri(sendUri)
                  .header("Authorization", "Basic " + “your credentials here”)
                  .contentType(MediaType.MULTIPART_FORM_DATA).syncBody(formMap).retrieve()
                  .bodyToMono(Response.class).map(this::getNotificationStatusResponse)
                  .doOnSuccess(message -> log.debug("sent email successfully"))
                  .doOnError((error -> log.error("email failed ", error)));
              } catch (WebClientException webClientException) {
                  throw new NotificationSendFailedException("webClientException received", webClientException);
              }
              return response;
          
              NotificationStatusResponse getNotificationStatusResponse(Response response) {
                  NotificationStatusResponse notificationStatusResponse = new NotificationStatusResponse();
                  notificationStatusResponse.setStatus(200);
                  notificationStatusResponse.setTimestamp(LocalDateTime.now());
                  notificationStatusResponse.setProviderResponseId(response.getId());
                  notificationStatusResponse.setProviderResponseMessage(response.getMessage());
                  return notificationStatusResponse;
              }
          }
          

        【讨论】:

          猜你喜欢
          • 1970-01-01
          • 1970-01-01
          • 1970-01-01
          • 2011-07-14
          • 1970-01-01
          • 2013-09-21
          • 1970-01-01
          • 1970-01-01
          相关资源
          最近更新 更多