【问题标题】:Spring Integration not forwarding a custom header?Spring Integration 不转发自定义标头?
【发布时间】:2017-01-23 11:50:40
【问题描述】:

我在my-transformer 中添加了一些标题:

public Message<?> transform(final Message<?> message) {        
    List<Item> items = doStuff(message);

    final MessageBuilder<?> messageBuilder = MessageBuilder
            .withPayload(message.getPayload())
            .copyHeadersIfAbsent(message.getHeaders());


    for (final Item item : items) {
        messageBuilder.setHeader(item.getHeaderName(), item.getValue());
    }

    return messageBuilder.build();
}

并且我编写了一个集成测试来确认我的标头出现在输出通道上:

 public static class HeaderTest extends TransformerTest {
    @Test
    public void test() throws Exception {
        channels.input().send(new GenericMessage<>(TransformerTest.EXAMPLE_PAYLOAD));
        final Message<?> out = this.collector.forChannel(this.channels.output()).poll(10, TimeUnit.SECONDS);

        assertThat(out, HeaderMatcher.hasHeader("header-test", notNullValue()));
    }
}

但是,当我创建如下流时:

http --port=1234 | my-transformer | log --expression=toString()

并发送了相同的EXAMPLE_PAYLOAD 我在logs 日志中收到了以下消息:GenericMessage [payload=..., headers={kafka_offset=0, id=f0a0727c-9351-274c-58b3-edee9ccbf6ce, kafka_receivedPartitionId=0, contentType=text/plain;charset=UTF-8, kafka_receivedTopic=myTopic.my-transformer, timestamp=1485171448947}]

为什么邮件标题中没有我的header-test

-- 编辑--

所以,如果我理解正确,我应该这样做:

public class MyTransformer implements Transformer {

    private final EmbeddedHeadersMessageConverter converter = new EmbeddedHeadersMessageConverter();

    @Override
    public Message<?> transform(final Message<?> message) {
        List<Item> items = doStuff(message);

        final MessageBuilder<byte[]> messageBuilder = MessageBuilder
                .withPayload(((String) message.getPayload()).getBytes())
                .copyHeadersIfAbsent(message.getHeaders());

        final int itemsSize = items.size();
        final String[] headerNames = new String[itemsSize];
        for (int i = 0; i < itemsSize; i++) {
            final Item item = items.get(i);
            messageBuilder.setHeader(item.getHeaderName(), item.getValue());
            headerNames[i] = item.getHeaderName();
        }

        final Message<byte[]> msg = messageBuilder.build();

        final byte[] rawMessageWithEmbeddedHeaders;
        try {
            rawMessageWithEmbeddedHeaders = converter.embedHeaders(new MessageValues(msg), headerNames);
        } catch (final Exception e) {
            throw new HeaderEmbeddingException(String.format("Cannot embed headers from '%s' into message: %s", items, msg), e);
        }

        return new GenericMessage<>(rawMessageWithEmbeddedHeaders);
    }
}

application.properties 中设置spring.cloud.stream.bindings.output.producer.headerMode=raw,然后在接收端转换消息有效负载?或者我可以让接收方自动转换消息负载吗?

【问题讨论】:

    标签: java spring spring-integration


    【解决方案1】:

    您没有说您使用的是 Spring XD 还是 Spring Cloud DataFlow,但每种情况下的解决方案都是相似的。

    由于 kafka 没有对 headers 的原生支持,我们必须将它们嵌入到消息负载中。由于我们不想传输不必要的标头,因此您必须通过在 servers.yml 中为 Spring XD 或 application.yml(或 .properties)中为 Spring Cloud Stream 设置标头名称来选择要传输的标头应用程序。

    编辑

    很遗憾,不支持模式。一种选择是自己使用EmbeddedHeadersMessageConverter,并将kafka mode 设置为raw(在变压器的输出目标上)。原始模式意味着活页夹不会嵌入标题。

    这样,下一个应用程序(不带模式raw)应该能够对标头进行解码,就好像它们已由转换器中的活页夹编码一样。 Javadocs here.

    您被限制为 255 个标题。

    【讨论】:

    • 糟糕,忘了说我使用的是 Spring CDF。问题是我事先不知道我的标题名称。有没有办法选择我的转换器中的所有标头?
    • 不幸的是,不支持像* 这样的模式。我已经为我的答案添加了解决方法。
    • 我觉得不错。
    • 那我就不能自动转换消息负载了吗?因为my-transformer 之后的应用程序正在获取GenericMessage [payload=byte[18], headers={kafka_offset=0, id=9fb76cde-0ee9-b5ed-0148-3315085b207c, kafka_receivedPartitionId=0, kafka_receivedTopic=example.my-transformer, header-test=something, timestamp=1485187162288}],而我的转换器的输入是 JSON。
    • 呃,是的;当然,绑定器完成的标头嵌入是在消息转换/序列化之后。我得试一试,看看能不能想出一个变通办法。我添加了an issue to support header patterns in Spring Cloud Stream
    猜你喜欢
    • 1970-01-01
    • 1970-01-01
    • 1970-01-01
    • 1970-01-01
    • 1970-01-01
    • 1970-01-01
    • 2013-11-13
    • 2013-11-28
    • 1970-01-01
    相关资源
    最近更新 更多