【问题标题】:receive sqs message attributes using the camel dsl?使用camel dsl接收sqs消息属性?
【发布时间】:2015-01-31 11:12:13
【问题描述】:

有人知道如何在 java 中使用 Camel DSL 接收 SQS 消息属性吗? 我收到以下错误:

“无法创建路由收款人路由:路由(批量路由)[[From[aws-sqs://myqueue?amazonSQSEndpoint=...因为无法解析端点:aws-sqs://myqueue?amazonSQSEndpoint= sqs.us-west-1.amazonaws.com&accessKey=*****&secretKey=******************&maxMessagesPerPoll=1&messageAttributeNames=%5BuserID%5 由于:找不到适合属性的设置器:messageAttributeNames,因为没有相同类型的设置器方法:java.lang.String 也没有类型转换:没有类型转换器可用于从类型:java.lang.String 转换为所需类型:java.util .Collection 值为 [userID] "

请找到我的代码

StringBuilder QueueURI = new StringBuilder();
QueueURI(PropertyUtils.AWS_SQS)
        .append(propertyUtils.queueName)
        .append(PropertyUtils.AMAZON_SQS_REGION)
        .append(propertyUtils.sqsRegion);
QueueURI(PropertyUtils.AWS_ACCESS_KEY).append(
         propertyUtils.awsAccessKey);
QueueURI(PropertyUtils.AWS_SECRET_KEY).append(
         propertyUtils.awsSecretKey);
QueueURI(PropertyUtils.MAX_MESSAGES_PER_POLL_1);
QueueURI("&messageAttributeNames=");


Collection<String> collection = new ArrayList<String>();
collection.add("userID");

//aws-sqs://myqueue?amazonSQSEndpoint=sqs.us-west-1.amazonaws.com&accessKey=*****&secretKey=****************&maxMessagesPerPoll=1&messageAttributeNames=[userID]

from(QueueURI.ToString() + collection)
       .routeId("batch route")
       .process(userValidator);

【问题讨论】:

    标签: java amazon-web-services apache-camel amazon-sqs


    【解决方案1】:

    默认情况下,Camel 没有 java.lang.String => java.util.Collection TypeConverter。您可以实现一个 org.apache.camel.TypeConverter,然后可以将其注册到 CamelContext 的 TypeConverterRegistry。

    我使用的是 Spring,所以我利用了 Spring 的转换支持:

    import org.apache.camel.Exchange;
    import org.apache.camel.TypeConversionException;
    import org.apache.camel.support.TypeConverterSupport;
    import org.springframework.core.convert.ConversionService;
    import org.springframework.core.convert.support.DefaultConversionService;
    
    public class TypeConverterBridge extends TypeConverterSupport {
        private ConversionService cs = new DefaultConversionService();
    
        @Override
        public <T> T convertTo(Class<T> type, Exchange exchange, Object value) throws TypeConversionException {
            if (cs.canConvert(value.getClass(), type)) {
                return cs.convert(value, type);
            }
            return null;
        }
    }
    

    然后用我的 CamelContext 注册 TypeConverter:

    camelContext.getTypeConverterRegistry().addFallbackTypeConverter(new TypeConverterBridge(), false);
    

    【讨论】:

      【解决方案2】:

      您可以在名为 CamelAwsSqsAttributes 的标头中找到 SQS 消息的属性,如下所述:http://camel.apache.org/aws-sqs.html

      该标题是一个Map&lt;String, String&gt;,其中包含您要查找的内容。如果您想查看它们,可以执行以下操作:

      ...
      from(QueueURI.ToString() + collection)
         .routeId("batch route")
         .log("Attributes: ${header.CamelAwsSqsAttributes}")
         .process(userValidator); 
      

      【讨论】:

      • 一种方法是定义一个@Bean public Collection sqsAttributeNames() {} 并将 &messageAttributeNames=#sqsAttributeNames 添加到 sqs url
      猜你喜欢
      • 1970-01-01
      • 2014-09-26
      • 1970-01-01
      • 1970-01-01
      • 1970-01-01
      • 2014-11-10
      • 1970-01-01
      • 2019-03-23
      • 2020-10-31
      相关资源
      最近更新 更多