【问题标题】:Issue getting MessageAttributes in spring-cloud SNS/SQS listener在 spring-cloud SNS/SQS 监听器中获取 MessageAttributes 的问题
【发布时间】:2019-07-07 11:02:41
【问题描述】:

我正在使用 spring-boot-1.5.10 和 spring-cloud 并使用 spring-cloud-starter-aws-messaging。我能够发送和接收消息,但无法获取 SNS 消息属性。任何帮助都会非常显着。请在下面找到代码,

pom.xml

<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
    <modelVersion>4.0.0</modelVersion>
    <parent>
        <groupId>org.springframework.boot</groupId>
        <artifactId>spring-boot-starter-parent</artifactId>
        <version>1.5.19.RELEASE</version>
        <relativePath/> <!-- lookup parent from repository -->
    </parent>
    <groupId>com.aws.sample</groupId>
    <artifactId>aws</artifactId>
    <version>0.0.1-SNAPSHOT</version>
    <name>aws</name>
    <description>Demo project for Spring Boot</description>

    <properties>
        <java.version>1.8</java.version>
        <spring-cloud.version>Edgware.SR5</spring-cloud.version>
    </properties>

    <dependencies>
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-data-mongodb</artifactId>
        </dependency>
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-web</artifactId>
        </dependency>
        <dependency>
            <groupId>org.springframework.cloud</groupId>
            <artifactId>spring-cloud-starter-aws</artifactId>
        </dependency>
        <dependency>
            <groupId>org.springframework.cloud</groupId>
            <artifactId>spring-cloud-starter-aws-messaging</artifactId>
        </dependency>

        <dependency>
            <groupId>org.projectlombok</groupId>
            <artifactId>lombok</artifactId>
            <optional>true</optional>
        </dependency>
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-test</artifactId>
            <scope>test</scope>
        </dependency>
    </dependencies>

    <dependencyManagement>
        <dependencies>
            <dependency>
                <groupId>org.springframework.cloud</groupId>
                <artifactId>spring-cloud-dependencies</artifactId>
                <version>${spring-cloud.version}</version>
                <type>pom</type>
                <scope>import</scope>
            </dependency>
        </dependencies>
    </dependencyManagement>

    <build>
        <plugins>
            <plugin>
                <groupId>org.springframework.boot</groupId>
                <artifactId>spring-boot-maven-plugin</artifactId>
            </plugin>
        </plugins>
    </build>

</project>

controller.java

@RestController
@RequestMapping(value = "/sns")
@AllArgsConstructor
public class SimpleSnsController {

    private NotificationMessagingTemplate notificationMessagingTemplate;

    @PostMapping("/saveEmployee")
    public String save(@RequestBody Employee employee){
        Map<String,Object> headers = new HashMap<>();
        headers.put("subject", "send employee details to sqs");
        headers.put("name","murugan");
        headers.put("traceId","sample");
        //notificationMessagingTemplate.sendNotification("sample-sns", employee, "send employee details to sqs");

        notificationMessagingTemplate.convertAndSend("sample-sns", employee, headers);
        return "success";
    }

    //@SqsListener(value = "sample-queue")
    @SqsListener(value = "${sqs.consumer.name}")
    public void receiveSnsSqs(String message, @NotificationMessage Employee employee) {
        System.out.println("SNS Consumer received the message::"+message);
        System.out.println("SNS Consumer received the notificationMessage::"+employee);
        //Here i would like to get the message attribute
    }
}

收到的输出消息:

{
  "Type" : "Notification",
  "MessageId" : "ba9dab52-aae8-5940-a3e2-ff8c8458ef52",
  "TopicArn" : "arn:aws:sns:XXX",
  "Message" : "{\"name\":\"David\",\"age\":\"31\",\"designation\":\"developer\"}",
  "Timestamp" : "2019-02-13T14:40:48.501Z",
  "SignatureVersion" : "1",
  "Signature" : "XXX",
  "SigningCertURL" : "XXX",
  "UnsubscribeURL" : "XXX",
  "MessageAttributes" : {
    "traceId" : {"Type":"String","Value":"sample"},
    "subject" : {"Type":"String","Value":"send employee details to sqs"},
    "name" : {"Type":"String","Value":"murugan"},
    "id" : {"Type":"String","Value":"68bf17f2-0f88-4cc5-0609-0ccd42b19ce4"},
    "SenderId" : {"Type":"String","Value":"David"},
    "contentType" : {"Type":"String","Value":"application/json;charset=UTF-8"},
    "timestamp" : {"Type":"Number.java.lang.Long","Value":"1550068848349"}
  }
}

我想获取我在 SNS 生产者中设置的消费者中的消息属性,例如 name、traceId。我浏览了很多但找不到任何解决方案。任何帮助都会非常重要。

【问题讨论】:

  • 听起来它是作为消息的一部分出现的,您只需将其解析出来 - 为什么使用基本的 json 解析器无法为您完成这项工作?
  • 是的,我可以使用它....如果消息来自 sqs 他们我可以使用@Header 注释来获取这些标题,但这里我们使用的是 SNS,所以我们不能使用该注释。 ...
  • 我想我不明白 - 您正在寻找像 'name' 和 'traceId' 这样的属性,但我在您的输出消息中看到它们。
  • 是的,我得到的是一个 JSON 字符串......而且我必须解析它似乎......没有办法使用 @headers 注释或任何其他更简单的方式获取属性

标签: spring spring-cloud amazon-sqs amazon-sns spring-cloud-aws


【解决方案1】:

尝试启用原始消息传递。 它不会对原始SNS消息进行包装,并允许您通过普通注解@Header、@Headers获取消息和标头

https://docs.aws.amazon.com/sns/latest/dg/sns-large-payload-raw-message-delivery.html

如果您不能使用原始消息传递,我制作了一个新注释来帮助检索通知标头

@NotificationHeader

import org.springframework.core.annotation.AliasFor;
import org.springframework.messaging.handler.annotation.ValueConstants;

import java.lang.annotation.ElementType;
import java.lang.annotation.Retention;
import java.lang.annotation.RetentionPolicy;
import java.lang.annotation.Target;

@Retention(RetentionPolicy.RUNTIME)
@Target(ElementType.PARAMETER)
public @interface NotificationHeader {
    /**
     * Alias for {@link #name}.
     */
    @AliasFor("name")
    String value() default "";

    /**
     * The name of the request header to bind to.
     */
    @AliasFor("value")
    String name() default "";

    /**
     * Whether the header is required.
     * <p>Default is {@code true}, leading to an exception if the header is
     * missing. Switch this to {@code false} if you prefer a {@code null}
     * value in case of a header missing.
     * @see #defaultValue
     */
    boolean required() default true;

    /**
     * The default value to use as a fallback.
     * <p>Supplying a default value implicitly sets {@link #required} to {@code false}.
     */
    String defaultValue() default ValueConstants.DEFAULT_NONE;
}

*NotificationHeaderArgumentResolver

import org.springframework.beans.factory.config.ConfigurableBeanFactory;
import org.springframework.cloud.aws.messaging.support.NotificationMessageArgumentResolver;
import org.springframework.core.MethodParameter;
import org.springframework.core.convert.ConversionService;
import org.springframework.lang.Nullable;
import org.springframework.messaging.Message;
import org.springframework.messaging.MessageHeaders;
import org.springframework.messaging.converter.MessageConverter;
import org.springframework.messaging.handler.annotation.support.HeaderMethodArgumentResolver;
import org.springframework.util.Assert;

public class NotificationHeaderArgumentResolver extends HeaderMethodArgumentResolver {

    private NotificationMessageArgumentResolver notificationArgumentResolver;

    public NotificationHeaderArgumentResolver(ConversionService cs, ConfigurableBeanFactory beanFactory) {
        super(cs, beanFactory);

        notificationArgumentResolver = new NotificationMessageArgumentResolver(new NoOptMessageConverter());
    }

    @Override
    public boolean supportsParameter(MethodParameter parameter) {
        return parameter.hasParameterAnnotation(NotificationHeader.class);
    }

    @Override
    @Nullable
    protected Object resolveArgumentInternal(MethodParameter parameter, Message<?> message, String name)
            throws Exception {

        Message notificationMessage = (Message) notificationArgumentResolver.resolveArgument(parameter, message);

        return super.resolveArgumentInternal(parameter, notificationMessage, name);
    }

    @Override
    protected NamedValueInfo createNamedValueInfo(MethodParameter parameter) {
        NotificationHeader annotation = parameter.getParameterAnnotation(NotificationHeader.class);
        Assert.state(annotation != null, "No Header annotation");
        return new HeaderNamedValueInfo(annotation);
    }

    private static class HeaderNamedValueInfo extends NamedValueInfo {

        private HeaderNamedValueInfo(NotificationHeader annotation) {
            super(annotation.name(), annotation.required(), annotation.defaultValue());
        }
    }

    public static class NoOptMessageConverter implements MessageConverter {
        @Override
        public Message<?> toMessage(Object payload, @Nullable MessageHeaders headers) {
            return null;
        }

        @Override
        public Object fromMessage(Message<?> message, Class<?> targetClass) {
            return message;
        }
    }
}

*NotificationHeaderConfiguration

    @Bean
    public QueueMessageHandlerFactory queueMessageHandlerFactory() {
        QueueMessageHandlerFactory queueMessageHandlerFactory = new QueueMessageHandlerFactory();

        queueMessageHandlerFactory.setArgumentResolvers(Collections.singletonList(new NotificationHeaderArgumentResolver(null, null)));

        return queueMessageHandlerFactory;
    }

【讨论】:

  • “原始消息传递”配置为 SNS 订阅的一部分。
  • @NotificationHeader 效果很好。想知道为什么 Spring Cloud 消息没有涵盖它
猜你喜欢
  • 2019-08-05
  • 2021-10-15
  • 2018-11-11
  • 2018-01-31
  • 2020-02-02
  • 2023-03-23
  • 2022-01-13
  • 2015-02-27
  • 2011-02-25
相关资源
最近更新 更多