【问题标题】:Using Spring AMQP, my receiver always misses the first message after it is started使用 Spring AMQP,我的接收器在启动后总是错过第一条消息
【发布时间】:2015-11-30 23:28:25
【问题描述】:

我正在使用 Spring Boot 1.3.0.RELEASE 和 spring-boot-starter-amqp v 1.3.0。 Rabbit 代理的版本是 RabbitMQ 3.5.6、Erlang 18.1。

我编写了一个小型 AMQP 消息侦听器应用程序,它只接受消息并将它们写入数据库。它很大程度上基于Messaging with Rabbit MQ Spring Guide.

我的更改是将 TopicExchange 转换为 FanoutExchange 并添加对 setConcurrentConsumers() 的调用。而且我加了JDBC,也去掉了发送消息的代码。

除了一个问题,它运行良好;每次我启动它,它总是错过第一条消息。发送应用程序没有重新启动,并且非常可靠,当我使发送应用程序发送消息时,此应用程序错过了我发送的第一个消息。在那之后,它似乎都得到了它们。

代码贴在下面,感谢您的帮助。

@SpringBootApplication
public class Application implements CommandLineRunner 
{
    @Autowired
    AnnotationConfigApplicationContext context;

    @Autowired
    ConfigurationService cs;

    @Bean
    Queue queue() 
    {
        return new Queue(cs.getRabbitQueue(), false, false, true);
    }

    @Bean
    Binding binding(Queue queue, FanoutExchange exchange) 
    {
        return BindingBuilder.bind(queue).to(exchange);
    }


    // Added by JWA
    @Bean
    public ConnectionFactory connectionFactory()
    {
        CachingConnectionFactory cf = new CachingConnectionFactory(cs.getRabbitHost());

        cf.setUsername(cs.getRabbitUserName());
        cf.setPassword(cs.getRabbitPassword());
        cf.setVirtualHost(cs.getRabbitVirtualHost());

        return cf;
    }

    @Bean
    public FanoutExchange fanout()
    {
        return new FanoutExchange("logs", false, false);
    }


    @Bean
    SimpleMessageListenerContainer container(ConnectionFactory connectionFactory, MessageListenerAdapter listenerAdapter) 
    {
        SimpleMessageListenerContainer container = new SimpleMessageListenerContainer();
        container.setConnectionFactory(connectionFactory);
        container.setQueueNames(cs.getRabbitQueue());
        container.setConcurrentConsumers(cs.getRabbitNumListeners());
        container.setMessageListener(listenerAdapter);

        return container;
    }

    @Bean
    Receiver receiver() 
    {
        return new Receiver();
    }

    @Bean
    MessageListenerAdapter listenerAdapter(Receiver receiver) 
    {
        return new MessageListenerAdapter(receiver, "receiveMessage");
    }

    public static void main(String[] args) throws InterruptedException 
    {
        SpringApplication.run(Application.class, args);
    }

    @Override
    public void run(String... args) throws Exception 
    {
            // Nothing to do here
    }

}


public class Receiver 
{
    @Autowired 
    JdbcTemplate jdbcTemplate;

    private final static String sql = "INSERT INTO msc_reporting_log (eventtime, rectype, userid, prospect_key, userip, userhostname, phase, decision, reason, loghost, sourcehost) values (?, ?, ?, HEXTORAW(?), ?, ?, ?, ?, ?, ?, ?)";

    private String hn;

    public void receiveMessage(String message) 
    {
        System.out.println("Received: " + message);

        LogMessage lm = extractJson(message);

        logWithLogger(lm);
        logToDatabase(lm);
    }

    private void logWithLogger(LogMessage lm)
    {
        String msg = "MESSAGE_RECEIVED," + lm;
        Logger.getGlobal().info(msg);
    }

    private void logToDatabase(LogMessage m)
    {
        jdbcTemplate.update(sql, m.getEventTime(), m.getType(), m.getUserId(), m.getUserProspectKey(), m.getUserIP(), m.getUserHostName(), m.getPhase(), m.getDecision(), m.getReason(), hn, m.getSourceHostName());
    }

    private LogMessage extractJson(String m)
    {
        try
        {
            ObjectMapper mapper = new ObjectMapper();
            LogMessage lm = mapper.readValue(m,  LogMessage.class);
            return lm;
        }
        catch(JsonMappingException jme)
        {
            Logger.getGlobal().log(Level.SEVERE, "Error mapping JSON", jme);
            jme.printStackTrace();
        }
        catch(JsonParseException jpe)
        {
            Logger.getGlobal().log(Level.SEVERE, "Error parsing JSON", jpe);
            jpe.printStackTrace();
        }
        catch(IOException ioe)
        {
            Logger.getGlobal().log(Level.SEVERE, "IO Error while extracting JSON", ioe);
            ioe.printStackTrace();
        }

        return null;
    }

}

更新

就这个错误发生的时间而言,发布者程序在我启动这个监听器之前就已经运行了。我启动监听器。然后我让发布者发送一条消息,通常是通过不正确的登录。发布者生成消息,而接收者应用似乎根本没有注意到。我再做一次,然后收到第二条消息。

我已修改接收器应用程序以改用匿名队列,因为我想运行此记录器应用程序的多个实例以实现冗余。问题仍然存在。这是新的接收者代码、发布者代码和接收者 pom:

接收器 POM:

<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
    xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
    <modelVersion>4.0.0</modelVersion>

    <groupId>edu.xxxxx.ua</groupId>
    <artifactId>DecisionsLogger</artifactId>
    <version>0.1.0</version>

    <parent>
        <groupId>org.springframework.boot</groupId>
        <artifactId>spring-boot-starter-parent</artifactId>
        <version>1.3.0.RELEASE</version>
    </parent>

    <properties>
        <java.version>1.8</java.version>
    </properties>

    <dependencies>
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-amqp</artifactId>
            </dependency>
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-jdbc</artifactId>
        </dependency>
        <dependency>
            <groupId>com.oracle</groupId>
            <artifactId>ojdbc7</artifactId>
            <version>12.1.0.1</version>
        </dependency>

        <dependency>
            <groupId>org.codehaus.jackson</groupId>
            <artifactId>jackson-mapper-asl</artifactId>
            <version>1.9.7</version>
        </dependency>

    </dependencies>


    <build>
        <plugins>
            <plugin>
                <groupId>org.springframework.boot</groupId>
                <artifactId>spring-boot-maven-plugin</artifactId>
            </plugin>
        </plugins>
    </build>

    <repositories>
        <repository>
            <id>spring-releases</id>
            <name>Spring Releases</name>
            <url>https://repo.spring.io/libs-release</url>
        </repository>
    </repositories>
    <pluginRepositories>
        <pluginRepository>
            <id>spring-releases</id>
            <name>Spring Releases</name>
            <url>https://repo.spring.io/libs-release</url>
        </pluginRepository>
    </pluginRepositories>
</project>

新应用,和上一个有同样的问题:

@SpringBootApplication
public class Application implements CommandLineRunner 
{
    @Autowired
    AnnotationConfigApplicationContext context;

    @Autowired
    ConfigurationService cs;

    public String getLocalHostname()
    {
        return cs.getLocalHostName();
    }

    @Bean
    Queue queue() 
    {
        //return new Queue(cs.getRabbitQueue(), false, false, true);
        return new AnonymousQueue();
    }

    @Bean
    Binding binding(FanoutExchange exchange) 
    {
        return BindingBuilder.bind(queue()).to(exchange);
    }

    @Bean
    public ConnectionFactory connectionFactory()
    {
        CachingConnectionFactory cf = new CachingConnectionFactory(cs.getRabbitHost());

        cf.setUsername(cs.getRabbitUserName());
        cf.setPassword(cs.getRabbitPassword());
        cf.setVirtualHost(cs.getRabbitVirtualHost());

        return cf;
    }

    @Bean
    public FanoutExchange fanout()
    {
        return new FanoutExchange(cs.getRabbitExchange(), false, false);
    }


    @Bean
    public SimpleRabbitListenerContainerFactory rabbitListenerContainerFactory(ConnectionFactory connectionFactory) 
    {
        SimpleRabbitListenerContainerFactory factory = new SimpleRabbitListenerContainerFactory();
        factory.setConnectionFactory(connectionFactory);
        factory.setConcurrentConsumers(cs.getRabbitMinListeners());
        factory.setMaxConcurrentConsumers(cs.getRabbitMaxListeners());
        return factory;
    }   


    @Bean
    Receiver receiver() 
    {
        return new Receiver();
    }

    @Bean
    MessageListenerAdapter listenerAdapter(Receiver receiver) 
    {
        return new MessageListenerAdapter(receiver, "receiveMessage");
    }

    public static void main(String[] args) throws InterruptedException 
    {
        SpringApplication.run(Application.class, args);
    }

    @Override
    public void run(String... args) throws Exception 
    {
        while(true) {}
            // Nothing to do here
    }

}

发布者代码:

@Autowired
private RabbitTemplate rabbitTemplate;

与上面的@Autowired 在同一类中的其他地方:

private void send(Message m)
{
    if(!isActive)
        return;


    if(rabbitTemplate == null)
    {
        DecLogger.DEC.fine(() -> "Unable to send Rabbit Message - rabbitTemplate is null");
        return;
    }

    if(configSvc.getRabbitQueue() == null)
    {
        DecLogger.DEC.fine(() -> "Unable to send Rabbit Message - queueName is null");
        return;
    }


    ObjectMapper mapper = new ObjectMapper();       

    String time = ZonedDateTime.now().toString();
    m.setEventTime(time);

    try
    {
        String tmpStr = mapper.writeValueAsString(m);

        rabbitTemplate.convertAndSend(configSvc.getRabbitExchange(), configSvc.getRabbitQueue(), tmpStr);

        DecLogger.DEC.finest(() -> "Sent Rabbit Message: " + tmpStr);
    }
    catch(Exception e)
    {
        DecLogger.DEC.fine(() -> "Failed sending Rabbit Message");
        DecLogger.DEC.fine(() -> "Exception: " + e);
    }
}

上面使用的Message类不是来自Spring Framework(我应该重命名它):

abstract class Message 
{
    @JsonProperty
    private String eventTime;

    @JsonProperty 
    private String type;

    @JsonProperty
    protected String sourceHostName;


    public Message(String type, String sourceHostName)
    {
        setType(type);
        setSourceHostName(sourceHostName);
    }

    public void setEventTime(String time)
    {
        this.eventTime = time;
    }

    private void setType(String type)
    {
        this.type = type;
    }

    private void setSourceHostName(String sourceHostName)
    {
        this.sourceHostName = sourceHostName;
    }


}

问题已解决

问题出在生产者方面。这一行:

        rabbitTemplate.convertAndSend(configSvc.getRabbitExchange(), configSvc.getRabbitQueue(), tmpStr);

被替换为这一行:

        rabbitTemplate.convertAndSend(configSvc.getRabbitExchange(), "", tmpStr);

唯一的变化是第二个参数。显然在 convertAndSend() 的第二个参数中提供了路由键导致了这个问题。

【问题讨论】:

    标签: spring-boot spring-amqp


    【解决方案1】:

    你所看到的对我来说毫无意义;我的第一步是打开调试日志并观察消息流——这样你就可以看到消息是否到达了容器。如果没有,您需要寻找其他地方。

    要查找的另一件事是管理控制台中处于未确认状态的消息。这意味着在某个地方还有另一个消费者;您可以在管理控制台上看到消费者。

    旁白:为什么要混合启动 jar 版本?我并不是说这是问题所在,但我会使用 1.2.7 启动器和启动 1.2.7 或将所有内容切换到 1.3.0。

    我看到了问题:

    return new Queue(cs.getRabbitQueue(), false, false, true);
    

    这会创建一个auto-delete 队列。

    因此,当您的应用未运行时,队列会消失,并且发布到 fanout 不会执行任何操作。

    Boot 提供的RabbitAdmin 会在应用启动时重新创建队列。

    试试:

    return new Queue(cs.getRabbitQueue());
    

    应用停止后队列将保留。

    【讨论】:

    • 我编辑了答案 - 当您的应用停止时,队列正在被删除。
    • 嗨,加里,非常感谢您的回复,但我不关注。我启动我的应用程序,我看到它创建了一个队列(我正在使用 rabbit 服务器上的基于 Web 的管理控制台观看它)。然后我会发送一条消息,而我的接收器应用程序中没有任何迹象表明消息已到达。我发送了另一个,它按预期工作。在发送消息和尝试接收消息之间,我从未停止过应用程序。
    • 另外,我打算在应用程序未运行时让队列消失。实际上,自从我发布该注释以改用 AnonymousQueue 类后,我对应用程序进行了相当大的更改,因为我需要多个消费者来接收消息。我得到了那个工作,但是仍然会发生丢失第一条消息的相同问题。当我明天回去工作时,我会尝试你的修复并查看我的新代码,并尝试理解你的意思。非常感谢!
    • 哦!我明白您对引导 jar 版本的意思,感谢您指出这一点。明天我会降级启动器。
    • 这真的没有意义。一定有其他事情发生。感谢您发布完整的应用程序。我将它粘贴到我的,那时我注意到自动删除队列。你能发布你的出版商吗?编辑问题,不要尝试在评论中添加它。还请提供您的确切测试程序/时间。
    猜你喜欢
    • 1970-01-01
    • 2017-01-19
    • 1970-01-01
    • 1970-01-01
    • 1970-01-01
    • 2014-07-08
    • 1970-01-01
    • 1970-01-01
    • 2018-06-24
    相关资源
    最近更新 更多