【问题标题】:Create Exchange and queues in Qpid broker在 Qpid 代理中创建 Exchange 和队列
【发布时间】:2018-10-26 02:46:51
【问题描述】:

您好,我正在使用嵌入式代理 Qpid 测试 Spring 集成项目。但问题是我如何在 qpid 中进行队列和交换。我认为 rabbit-config.xml 会在 qpid 代理中进行队列和交换,但无济于事。我的流程是创建队列并在 qpid 代理中交换将消息传递给它们,并且绑定到这些队列的入站 amqp 适配器将获取消息,我可以继续测试

错误:队列:在 VirtualHost“默认”上找不到“push.customer.arkona.controller.search”。

    qpid-config.json:

    {   "name": "EmbeddedBroker",   "modelVersion": "2.0",   "storeVersion" : 1,   "authenticationproviders" : [ {
        "name" : "noPassword",
        "type" : "Anonymous",
        "secureOnlyMechanisms": []
            },
        {
          "name" : "passwordFile",
          "type" : "PlainPasswordFile",
          "path" : "${qpid.home_dir}${file.separator}src${file.separator}main${file.separator}resources${file.separator}password.properties",
          "secureOnlyMechanisms": []
        }    ],   "ports" : [
        {
          "name": "AMQP",
          "port": "${qpid.amqp_port}",
          "authenticationProvider": "passwordFile",
          "protocols": [
            "AMQP_0_10",
            "AMQP_0_8",
            "AMQP_0_9",
            "AMQP_0_9_1"
          ]
        }],
        "virtualhostnodes" : [ {
        "name" : "default",
        "type" : "JSON",
        "defaultVirtualHostNode" : "true",
        "virtualHostInitialConfiguration" : "${qpid.initial_config_virtualhost_config}",
         "storeType" : "DERBY"   
} 
] 
}

password.properties 有

客人:客人

我创建了一个单独的配置文件来运行我的测试。这是rabbitmq配置。除此之外,我还有一个 rabbit-context xml 文件,其中定义了所有队列、交换。

@Configuration
@Profile("qpid")
public class QpidConfig {

    String amqpPort = "5672";

    //String qpidHomeDir = "complete";
    String configFileName = "src/main/resources/qpid-config.json";

    @Bean
    BrokerOptions brokerOptions() {

        File tmpFolder= Files.createTempDir();

        //small hack, because userDir is not same when running Application and ApplicationTest
        //it leads to some issue locating the files after, so hacking it here
        String userDir=System.getProperty("user.dir").toString();

        File file = new File(userDir);
        String homePath = file.getAbsolutePath();

        BrokerOptions brokerOptions=new BrokerOptions();

        brokerOptions.setConfigProperty("qpid.work_dir", tmpFolder.getAbsolutePath());
        brokerOptions.setConfigProperty("qpid.amqp_port",amqpPort);
        brokerOptions.setConfigProperty("qpid.home_dir", homePath);
        brokerOptions.setInitialConfigurationLocation(homePath + "/"+configFileName);

        return brokerOptions;
    }

    @SuppressWarnings("rawtypes")
    @Bean
    Broker broker() throws Exception {

            org.apache.qpid.server.Broker broker = new org.apache.qpid.server.Broker();
        broker.startup(brokerOptions());
        return (Broker) broker;
    }

    private ConnectionFactory connectionFactory() {
        ConnectionFactory factory = new ConnectionFactory();

        factory.setUsername("admin");
        factory.setPassword("admin");
        factory.setHost("127.0.0.1");
        factory.setPort(5672);
        return factory;
    }

    @Bean(name ="rabbitConnectionFactory")
    public CachingConnectionFactory rabbitConnectionFactory(){
        return new CachingConnectionFactory(connectionFactory());
    }

    @Bean(name="rabbitTemplate")
    public RabbitTemplate rabbitTemplate(){
        return new RabbitTemplate(rabbitConnectionFactory());
    }

    @Bean(name ="arkonaHeaderMapper")
    public DefaultAmqpHeaderMapper syncerHeaderMapper() {
        DefaultAmqpHeaderMapper amqpHeaderMapper = DefaultAmqpHeaderMapper.inboundMapper();
        amqpHeaderMapper.setRequestHeaderNames("*");
        amqpHeaderMapper.setReplyHeaderNames("*");
        return amqpHeaderMapper;
    }


}

编辑

我的 rabbit-context.xml

    <?xml version="1.0" encoding="UTF-8"?>
<beans xmlns="http://www.springframework.org/schema/beans"
    xmlns:rabbit="http://www.springframework.org/schema/rabbit"
    xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
    xsi:schemaLocation="http://www.springframework.org/schema/beans http://www.springframework.org/schema/beans/spring-beans.xsd
    http://www.springframework.org/schema/rabbit http://www.springframework.org/schema/rabbit/spring-rabbit.xsd">

    <rabbit:queue name="pull.appt.arkona.scheduler.adapter" />
    <rabbit:queue name="pull.appt.arkona.adapter.processor" />

    <rabbit:queue name="pull.customer.arkona.to.lookup" />
    <rabbit:queue name="pull.customer.arkona.lookup.processor" />

    <rabbit:queue name="pull.customer.arkona.scheduler.adapter" />

    <rabbit:queue name="pull.ro.arkona.to.lookup" />
    <rabbit:queue name="pull.ro.arkona.adapter.processor" />

    <rabbit:queue name="pull.ro.arkona.scheduler.adapter" />

    <rabbit:queue name="pull.closed.arkona.scheduler.adapter" />
    <rabbit:queue name="pull.parts.arkona.scheduler.adapter" />

    <rabbit:queue name="pull.closed.arkona.adapter.processor" />
    <rabbit:queue name="pull.parts.arkona.adapter.processor" />

    <rabbit:queue name="pull.vehicle.arkona.to.lookup" />
    <rabbit:queue name="pull.vehicle.arkona.lookup.processor" />

    <rabbit:direct-exchange name="dms.arkona.exchange" durable="true">
        <rabbit:bindings>
            <rabbit:binding queue="pull.appt.arkona.scheduler.adapter" key="pull.appt.arkona.scheduler.adapter.key"></rabbit:binding>
            <rabbit:binding queue="pull.appt.arkona.adapter.processor" key="pull.appt.arkona.adapter.processor.key"></rabbit:binding>
            <rabbit:binding queue="pull.customer.arkona.to.lookup" key="pull.customer.arkona.to.lookup.key"></rabbit:binding>
            <rabbit:binding queue="pull.customer.arkona.lookup.processor" key="pull.customer.arkona.lookup.processor.key"></rabbit:binding>
            <rabbit:binding queue="pull.customer.arkona.scheduler.adapter" key="pull.customer.arkona.scheduler.adapter.key"></rabbit:binding>
            <rabbit:binding queue="pull.ro.arkona.to.lookup" key="pull.ro.arkona.to.lookup.key"></rabbit:binding>
            <rabbit:binding queue="pull.ro.arkona.adapter.processor" key="pull.ro.arkona.adapter.processor.key"></rabbit:binding>
            <rabbit:binding queue="pull.ro.arkona.scheduler.adapter" key="pull.ro.arkona.scheduler.adapter.key"></rabbit:binding>
            <rabbit:binding queue="pull.vehicle.arkona.to.lookup" key="pull.vehicle.arkona.to.lookup.key"></rabbit:binding>
            <rabbit:binding queue="pull.vehicle.arkona.lookup.processor" key="pull.vehicle.arkona.lookup.processor.key"></rabbit:binding>
            <rabbit:binding queue="pull.closed.arkona.scheduler.adapter" key="pull.closed.arkona.scheduler.adapter.key"></rabbit:binding>
            <rabbit:binding queue="pull.closed.arkona.adapter.processor" key="pull.closed.arkona.adapter.processor.key"></rabbit:binding>
            <rabbit:binding queue="pull.parts.arkona.scheduler.adapter" key="pull.parts.arkona.scheduler.adapter.key"></rabbit:binding>
            <rabbit:binding queue="pull.parts.arkona.adapter.processor" key="pull.parts.arkona.adapter.processor.key"></rabbit:binding>
        </rabbit:bindings>
    </rabbit:direct-exchange>
</beans>

【问题讨论】:

    标签: unit-testing junit rabbitmq spring-integration qpid


    【解决方案1】:

    您的应用程序上下文中是否有RabbitAdmin? (它检测队列/交换/绑定,并在建立连接时声明它们)。

    我刚刚使用 QPID 6.1.2 测试了 Spring Integration AMQP Sample,它创建的一切正常...

    <!-- Infrastructure -->
    
    <rabbit:connection-factory id="connectionFactory" host="xx.xx.xx.xx" virtual-host="default" />
    
    <rabbit:template id="amqpTemplate" connection-factory="connectionFactory" />
    
    <rabbit:admin connection-factory="connectionFactory" />
    
    <rabbit:queue name="si.test.queue" />
    
    <rabbit:direct-exchange name="si.test.exchange">
        <rabbit:bindings>
            <rabbit:binding queue="si.test.queue" key="si.test.binding" />
        </rabbit:bindings>
    </rabbit:direct-exchange>
    

    编辑

    启动应用对我来说也很好用...

    @SpringBootApplication
    public class So50364236Application {
    
        public static void main(String[] args) {
            SpringApplication.run(So50364236Application.class, args);
        }
    
        @Bean
        public ApplicationRunner runner(RabbitTemplate template) {
            return args -> template.convertAndSend("so50364236", "foo");
        }
    
        @Bean
        public Queue queue() {
            return new Queue("so50364236");
        }
    
        @RabbitListener(queues = "so50364236")
        public void listen(String in) {
            System.out.println(in);
        }
    
    }
    

    spring.rabbitmq.addresses=xx.x.x.x
    spring.rabbitmq.virtual-host=default
    

    2018-05-16 13:17:25.013  INFO 34714 --- [           main] com.example.So50364236Application        : Started So50364236Application in 1.151 seconds (JVM running for 1.579)
    foo
    

    我在代理的管理页面上看到了队列。

    EDIT2

    这是另一个在 XML 文件中声明队列的启动应用程序;使用 QPID 6.1.6 嵌入式...

    qpid-config.json

    {
        "name": "EmbeddedBroker",
        "modelVersion": "2.0",
        "storeVersion": 1,
        "authenticationproviders": [
            {
                "name": "noPassword",
                "type": "Anonymous",
                "secureOnlyMechanisms": []
            },
            {
                "name": "passwordFile",
                "type": "PlainPasswordFile",
                "path": "${qpid.home_dir}${file.separator}etc${file.separator}passwd",
                "secureOnlyMechanisms": []
            }
        ],
        "ports": [
            {
                "name": "AMQP",
                "port": "${qpid.amqp_port}",
                "authenticationProvider": "passwordFile",
                "protocols": [
                    "AMQP_0_10",
                    "AMQP_0_8",
                    "AMQP_0_9",
                    "AMQP_0_9_1"
                ]
            }
        ],
        "virtualhostnodes": [
            {
                "name": "default",
                "type": "JSON",
                "defaultVirtualHostNode": "true",
                "virtualHostInitialConfiguration": "${qpid.initial_config_virtualhost_config}",
                "storeType": "DERBY"
            }
        ]
    }
    

    config.xml

    <?xml version="1.0" encoding="UTF-8"?>
    <beans xmlns="http://www.springframework.org/schema/beans"
        xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
        xmlns:rabbit="http://www.springframework.org/schema/rabbit"
        xsi:schemaLocation="http://www.springframework.org/schema/beans http://www.springframework.org/schema/beans/spring-beans.xsd
            http://www.springframework.org/schema/rabbit http://www.springframework.org/schema/rabbit/spring-rabbit.xsd">
    
        <rabbit:queue name="so50364236b" />
    
    </beans>
    

    application.properties

    spring.rabbitmq.addresses=localhost:8888
    

    启动应用

    @SpringBootApplication
    @ImportResource("config.xml")
    public class So50364236Application {
    
        public static void main(String[] args) {
            new SpringApplicationBuilder(So50364236Application.class)
                .web(WebApplicationType.NONE)
                .run(args);
        }
    
        @Bean
        public ApplicationRunner runner(RabbitTemplate template) {
            return args -> template.convertAndSend("so50364236b", "foo");
        }
    
        @Bean
        BrokerOptions brokerOptions() throws Exception {
    
            Path tmpFolder = Files.createTempDirectory("qpidWork");
            Path homeFolder = Files.createTempDirectory("qpidHome");
            File etc = new File(homeFolder.toFile(), "etc");
            etc.mkdir();
            FileOutputStream fos = new FileOutputStream(new File(etc, "passwd"));
            fos.write("guest:guest\n".getBytes());
            fos.close();
    
            BrokerOptions brokerOptions = new BrokerOptions();
    
            brokerOptions.setConfigProperty("qpid.work_dir", tmpFolder.toAbsolutePath().toString());
            brokerOptions.setConfigProperty("qpid.amqp_port", "8888");
            brokerOptions.setConfigProperty("qpid.home_dir", homeFolder.toAbsolutePath().toString());
            Resource config = new ClassPathResource("qpid-config.json");
            brokerOptions.setInitialConfigurationLocation(config.getFile().getAbsolutePath());
    
            return brokerOptions;
        }
    
        @Bean
        Broker broker() throws Exception {
            org.apache.qpid.server.Broker broker = new org.apache.qpid.server.Broker();
            broker.startup(brokerOptions());
            return broker;
        }
    
        @RabbitListener(queues = "so50364236b")
        public void listen(String in) {
            System.out.println(in);
        }
    
    }
    

    [Broker] BRK-1004 : Qpid Broker Ready
    received: foo
    

    也许您正在做的事情导致未声明引导的管理员。不清楚为什么要添加自己的连接工厂和模板;您是否也尝试过添加自己的RabbitAdmin

    【讨论】:

    • 好吧,你的意思是说我需要为 rabbit admin 创建另一个 rabbit-context.xml?因为我已经有了 rabbit-context
    • 我正在使用 spring-boot,并认为它会自动使用这些绑定创建 rabbit admin bean
    • 一个启动应用程序也适合我。请参阅我的答案的编辑。编辑您的问题以显示您的引导代码和配置。
    • 每当我运行测试时,日志都会显示它从 rabbit-context main] osbfxml.XmlBeanDefinitionReader 加载 bean 定义:从类路径资源加载 XML bean 定义 [rabbitmq-context. xml]
    • 仍然收到错误 reply-code=404, reply-text=Queue: 'push.customer.arkona.controller.add' not found on VirtualHost 'default'。 i>
    猜你喜欢
    • 2013-09-10
    • 2019-12-14
    • 1970-01-01
    • 2018-12-14
    • 1970-01-01
    • 1970-01-01
    • 1970-01-01
    • 2018-10-28
    • 2022-01-20
    相关资源
    最近更新 更多