如基于 SNS HTTP(S) 订阅的 Amazon Doc 中所述...more
在您将 HTTP 或 HTTPS 终端节点订阅到某个主题之前,您必须确保 HTTP 或 HTTPS 终端节点可以处理 Amazon SNS 用来发送订阅确认和通知消息的 HTTP POST 请求。当您订阅 HTTP 终端节点时,Amazon SNS 会向其发送订阅确认请求。当您创建订阅时,您的终端节点必须准备好接收和处理此请求,因为 Amazon SNS 会在那时发送此请求。在您确认订阅之前,Amazon SNS 不会向终端节点发送通知。确认订阅后,Amazon SNS 将在订阅主题执行发布操作时向终端节点发送通知。
在处理SubscriptionConfirmation请求之前,状态始终处于Pending Confirmation状态,不会收到任何Notification消息。
这是自动处理 SubscriptionConfirmation、Notification 和 UnsubscribeConfirmation 消息类型请求和 Content-Type 对于上述由 AWS SNS 触发的请求,总是 text/plain 格式。
Spring-boot方式
为 SNS 消费者使用 Spring-boot 和以下代码的主要优点是您不需要依赖任何 AWS 指定的配置及其依赖项。如果您的微服务是使用 spring-boot 而不是 spring-cloud-aws-messaging 构建的,那么这是一种实用的方法。您的消费者端应用程序需要执行以下步骤。
在请求处理方法中加入如下注解来处理
SubscriptionConfirmation 和 UnsubscribeConfirmation 消息类型。
@Retention(RetentionPolicy.RUNTIME)
@Target({ElementType.METHOD})
public @interface SNSSubscriptionUnSubscriptionConfirmation {
}
如果请求处理方法使用@SNSSubscriptionUnSubscriptionConfirmation, 注释,则将触发以下方面,它将根据 Http(s) 处理 SubscriptionConfirmation 和 UnsubscribeConfirmation 消息类型) 请求头x-amz-sns-message-type。
@Aspect
@Component
@Slf4j
public class SNSSubscriptionUnSubscriptionActivation {
@Before(value = "@annotation(SNSSubscriptionUnSubscriptionConfirmation)")
public void SNSSubscriptionUnSubscriptionConfirmationActivation(JoinPoint joinPoint) {
Object[] args = joinPoint.getArgs();
HttpServletRequest httpServletRequest = (HttpServletRequest)args[0];
String requestBody = (String)args[1];
String messageType = httpServletRequest.getHeader("x-amz-sns-message-type");
String topicArn = httpServletRequest.getHeader("x-amz-sns-topic-arn");
if(!StringUtils.isEmpty(messageType)){
if("SubscriptionConfirmation".equals(messageType)){
activateSNSSubscriptionUnSubscription(requestBody, messageType, topicArn, "SubscribeURL");
} else if("UnsubscribeConfirmation".equals(messageType)){
activateSNSSubscriptionUnSubscription(requestBody, messageType, topicArn, "UnsubscribeURL");
}
}
}
private void activateSNSSubscriptionUnSubscription(String requestBody, final String messageType, String topicArn, String subscribeUnsubscribeURLKey) {
log.info(messageType + " payload: {}", requestBody);
JsonMapper mapper = JsonMapper.builder()
.configure(JsonReadFeature.ALLOW_UNQUOTED_FIELD_NAMES, true)
.build();
try {
Map<String, String> maps = mapper.readValue(requestBody, new TypeReference<Map<String, String>>() {
});
String subscribeUnsubscribeURL = maps.get(subscribeUnsubscribeURLKey);
RestTemplate restTemplate = new RestTemplate();
//Manually activating the subscribe and UnsubscribeURL requests by making direct HTTP call using rest controller
ResponseEntity<Void> response = restTemplate.exchange(subscribeUnsubscribeURL, HttpMethod.GET, null, Void.class);
if (response.getStatusCode().is2xxSuccessful())
log.info("topicArn: {} messageType: {} Successful: {}", topicArn, messageType, response.getStatusCode());
else {
log.error("topicArn: {} messageType: {} failure Status: {}", topicArn, messageType, response.getStatusCode());
}
} catch (JsonProcessingException e){
log.error("topicArn: {} messageType: {} failure error: {}", topicArn, messageType, e.getMessage());
}catch(HttpClientErrorException e) {
log.error("topicArn: {} messageType: {} failure error: {}", topicArn, messageType, e.getResponseBodyAsString());
}
}
}
控制器的 handler 方法处理 Notification messageType,因为其余两个 messageType 由 SNSSubscriptionUnSubscriptionActivation 处理,作为带有 @SNSSubscriptionUnSubscriptionConfirmation() 注释的请求处理程序方法。
@Slf4j
@RestController
public class AWSSNSConsumerController {
@PostMapping(value = "subscribed-endpoint", consumes = MediaType.TEXT_PLAIN_VALUE)
@SNSSubscriptionUnSubscriptionConfirmation()
public ResponseEntity<Void> notification(HttpServletRequest httpServletRequest, @RequestBody() String requestBody) {
log.info("Notification payload: {}", requestBody);
String topicArn = httpServletRequest.getHeader("x-amz-sns-topic-arn");
String messageType = httpServletRequest.getHeader("x-amz-sns-message-type");
if ("Notification".equals(messageType)) {
JsonMapper mapper = JsonMapper.builder()
.configure(JsonReadFeature.ALLOW_UNQUOTED_FIELD_NAMES, true)
.build();
try {
Map<String, String> maps = mapper.readValue(requestBody, new TypeReference<Map<String, String>>() {
});
String message = maps.get("Message");
log.info("topic : {} message: {} ", topicArn, message);
} catch (JsonProcessingException e) {
log.error("topic : {} Notification failure error: {}", topicArn, e.getMessage());
} catch (HttpClientErrorException e) {
log.error("topic : {} Notification failure error: {}", topicArn, e.getResponseBodyAsString());
}
}
return new ResponseEntity<>(HttpStatus.NO_CONTENT);
}
}
如果您使用的是 spring-cloud-aws,请关注这个很棒的 tutorial