文章目录1. Publish / Subscribe发布 / 订阅2、引入依赖3、生产者代码编写3.1 创建交换机3.2 声明两个队列3.3 绑定队列和交换机3.4 发送消息3.5 完整代码4、消费者代码编写4.1 消费者一4.1 消费者二5. 观察结果1. Publish / Subscribe发布 / 订阅在发布 / 订阅模型中多了一个 Exchange 角色。Exchange 常见有三种类型分别代表不同的路由规则Fanout广播将消息交给所有绑定到交换机的队列Publish / Subscribe 模式Direct定向把消息交给符合指定 routing key 的队列Routing 模式Topic通配符把消息交给符合 routing pattern路由模式的队列Topics 模式它们分别对应不同的工作模式我们来看看 Publish / Subscribe 模式。步骤1、引入依赖2、编写生产者代码3、编写消费者代码2、引入依赖先引入 rabbitmq 的依赖!-- Source: https://mvnrepository.com/artifact/com.rabbitmq/amqp-client --dependencygroupIdcom.rabbitmq/groupIdartifactIdamqp-client/artifactIdversion5.20.0/versionscopecompile/scope/dependency3、生产者代码编写和前面两个的区别是需要创建交换机并且绑定队列和交换机。3.1 创建交换机需要用到下面的函数来创建exchangeDeclare(Stringexchange,BuiltinExchangeTypetype,booleandurable,booleanautoDelete,booleaninternal,MapString,Objectarguments)参数exchange交换机名称type交换机类型DIRECT(“direct”)定向、直连、routingFANOUT(“fanout”)扇形广播每个队列都能收到消息TOPIC(“topic”)通配符HEADERS(“headers”)参数匹配(工作用的较少)durable是否持久化true - 持久化false - 非持久化持久化可以将交换器存盘在服务器重启的时候不会丢失相关信息autoDelete自动删除自动删除的前提是至少有一个队列或者交换器与这个交换器绑定之后所有与这个交换器绑定的队列或者交换器都与此解绑。而不是这种理解当与此交换器连接的客户端都断开时RabbitMQ 会自动删除本交换器。internal内部使用一般为 false如果设置为 true表示内部使用客户端程序无法直接发送消息到这个交换器中只能通过交换器路由到交换器这种方式。arguments参数先在 Constants 函数中定义一个交换机和两个队列// 发布订阅模式publicstaticfinalStringFANOUT_EXCHANGEfanout.exchange;// 声明交换机publicstaticfinalStringFANOUT_QUEUE1fanout.queue1;// 声明队列publicstaticfinalStringFANOUT_QUEUE2fanout.queue2;// 声明队列接着在生产者中编写代码// 3. 声明交换机使用内置的交换机即可channel.exchangeDeclare(Constants.FANOUT_EXCHANGE,BuiltinExchangeType.FANOUT,true);3.2 声明两个队列后面验证是否两个队列都能收到消息。// 4. 声明队列// 如果没有一个这样的队列会自动创建如果有则不创建channel.queueDeclare(Constants.FANOUT_QUEUE1,true,false,false,null);channel.queueDeclare(Constants.FANOUT_QUEUE2,true,false,false,null);3.3 绑定队列和交换机需要用到的函数如下queueBind(Stringqueue,Stringexchange,StringroutingKey)参数如下queue队列名称exchange交换机名称routingKey路由key路由规则如果交换机类型为 fanout且 routingkey 设置为表示每个消费者都可以收到全部信息。代码如下所示// 5. 绑定队列和交换机channel.queueBind(Constants.FANOUT_QUEUE1,Constants.FANOUT_EXCHANGE,);channel.queueBind(Constants.FANOUT_QUEUE2,Constants.FANOUT_EXCHANGE,);3.4 发送消息需要用到下面的函数basicPublish(Stringexchange,StringroutingKey,AMQP.BasicPropertiesprops,byte[]body)参数说明Exchange交换机名称routingKey如果交换机类型为 fanout且 routingkey 设置为表示每个消费者都可以收到全部信息。代码如下所示// 6. 发送消息StringmsgHello fanout... ;channel.basicPublish(Constants.FANOUT_EXCHANGE,,null,msg.getBytes());System.out.println(消息发送成功);3.5 完整代码如下所示packagefanout;importcom.rabbitmq.client.BuiltinExchangeType;importcom.rabbitmq.client.Channel;importcom.rabbitmq.client.Connection;importcom.rabbitmq.client.ConnectionFactory;importconstant.Constants;importjava.io.IOException;importjava.util.concurrent.TimeoutException;publicclassProducer{publicstaticvoidmain(String[]args)throwsIOException,TimeoutException{// 1. 建立连接ConnectionFactoryfactorynewConnectionFactory();factory.setHost(Constants.HOST);// MQ所在的服务器地址factory.setPort(Constants.PORT);// 端口号factory.setUsername(Constants.USERNAME);// 账号factory.setPassword(Constants.PASSWORD);// 密码factory.setVirtualHost(Constants.VIRTUAL_HOST);// 虚拟主机Connectionconnectionfactory.newConnection();// 2. 开启 channel 通道Channelchannelconnection.createChannel();// 3. 声明交换机使用内置的交换机即可channel.exchangeDeclare(Constants.FANOUT_EXCHANGE,BuiltinExchangeType.FANOUT,true);// 4. 声明队列channel.queueDeclare(Constants.FANOUT_QUEUE1,true,false,false,null);channel.queueDeclare(Constants.FANOUT_QUEUE2,true,false,false,null);// 5. 绑定队列和交换机channel.queueBind(Constants.FANOUT_QUEUE1,Constants.FANOUT_EXCHANGE,);channel.queueBind(Constants.FANOUT_QUEUE2,Constants.FANOUT_EXCHANGE,);// 6. 发送消息StringmsgHello fanout... ;channel.basicPublish(Constants.FANOUT_EXCHANGE,,null,msg.getBytes());System.out.println(消息发送成功);// 7. 资源释放channel.close();connection.close();}}4、消费者代码编写交换机和队列的绑定关系及声明已经在生产方写完所以消费者不需要再写了。这里只需要去掉声明队列的代码就可以了。1、创建Channel2、接收消息并处理4.1 消费者一代码如下所示packagefanout;importcom.rabbitmq.client.*;importconstant.Constants;importjava.io.IOException;importjava.util.concurrent.TimeoutException;publicclassConsumer1{publicstaticvoidmain(String[]args)throwsIOException,TimeoutException{// 1. 建立连接ConnectionFactoryfactorynewConnectionFactory();factory.setHost(Constants.HOST);// MQ所在的服务器地址factory.setPort(Constants.PORT);// 端口号factory.setUsername(Constants.USERNAME);// 账号factory.setPassword(Constants.PASSWORD);// 密码factory.setVirtualHost(Constants.VIRTUAL_HOST);// 虚拟主机Connectionconnectionfactory.newConnection();// 2. 开启 channel 通道Channelchannelconnection.createChannel();// 3. 声明队列channel.queueDeclare(Constants.FANOUT_QUEUE1,true,false,false,null);// 4. 接收消息并消费DefaultConsumerconsumernewDefaultConsumer(channel){// 从队列中收到消息后, 就会执行的方法OverridepublicvoidhandleDelivery(StringconsumerTag,Envelopeenvelope,AMQP.BasicPropertiesproperties,byte[]body)throwsIOException{// 收到消息以后就进行打印System.out.println(接收到消息: newString(body));}};channel.basicConsume(Constants.FANOUT_QUEUE1,true,consumer);// 5. 不需要释放资源}}4.1 消费者二代码如下所示packagefanout;importcom.rabbitmq.client.*;importconstant.Constants;importjava.io.IOException;importjava.util.concurrent.TimeoutException;publicclassConsumer2{publicstaticvoidmain(String[]args)throwsIOException,TimeoutException{// 1. 建立连接ConnectionFactoryfactorynewConnectionFactory();factory.setHost(Constants.HOST);// MQ所在的服务器地址factory.setPort(Constants.PORT);// 端口号factory.setUsername(Constants.USERNAME);// 账号factory.setPassword(Constants.PASSWORD);// 密码factory.setVirtualHost(Constants.VIRTUAL_HOST);// 虚拟主机Connectionconnectionfactory.newConnection();// 2. 开启 channel 通道Channelchannelconnection.createChannel();// 3. 声明队列channel.queueDeclare(Constants.FANOUT_QUEUE2,true,false,false,null);// 4. 接收消息并消费DefaultConsumerconsumernewDefaultConsumer(channel){// 从队列中收到消息后, 就会执行的方法OverridepublicvoidhandleDelivery(StringconsumerTag,Envelopeenvelope,AMQP.BasicPropertiesproperties,byte[]body)throwsIOException{// 收到消息以后就进行打印System.out.println(接收到消息: newString(body));}};channel.basicConsume(Constants.FANOUT_QUEUE2,true,consumer);// 5. 不需要释放资源}}5. 观察结果先运行生产者可以看到两个队列分别有了一条消息。然后在 Exchange 中找到咱们自己创建的交换机 fanout.exchange可以看到 fanout.exchange 中多了队列绑定关系。然后运行 Consumer 1 代码然后运行 Consumer 2 代码可以看到生产者只发送了 1 次消息而两个消费者都收到了该消息。