久久久国产一区_国产综合久久久久_欧美亚洲丝袜_成人综合国产精品

合作QQ:25496334 TG@heimao_wiki
當前位置:首頁 >> 黑帽SEO優化 >> SEO技術 >> 福建最全黑帽seo程序代碼合集:RabbitMQ六種工作模式的對比與實踐_黑帽SEO排名

福建最全黑帽seo程序代碼合集:RabbitMQ六種工作模式的對比與實踐_黑帽SEO排名

黑帽白白白 SEO技術 771
:C++ 變量判定的螺旋法則

最近學習RabbitMQ的使用方式,記錄下來,方便以后使用,也方便和大家共享,相互交流。

RabbitMQ的六種工作模式:

1、Work queues
2、Publish/subscribe
3、Routing
4、Topics
5、Header 模式
6、RPC

一、Work queues

多個消費端消費同一個隊列中的消息,隊列采用輪詢的方式將消息是平均發送給消費者;

 

 特點:

1、一條消息只會被一個消費端接收;

2、隊列采用輪詢的方式將消息是平均發送給消費者的;

3、消費者在處理完某條消息后,才會收到下一條消息

生產端:

1、聲明隊列

2、創建連接

3、創建通道

4、通道聲明隊列

5、制定消息

6、發送消息,使用默認交換機

消費端:

1、聲明隊列

2、創建連接

3、創建通道

4、通道聲明隊列

5、重寫消息消費方法

6、執行消息方法

新建兩個maven工程,生產消息的生產端,消費消息的消費端;

pom.xml文件中依賴坐標如下:

<dependencies>
    <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-logging</artifactId>
            <version>2.1.0.RELEASE</version>
        </dependency>
        <dependency>
            <groupId>com.rabbitmq</groupId>
            <artifactId>amqp-client</artifactId>
            <version>5.7.0</version>
        </dependency>
</dependencies>

 生產端的代碼如下:

package com.xyfer;

import com.rabbitmq.client.BuiltinExchangeType;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;


import java.io.IOException;
import java.util.concurrent.TimeoutException;
/*
1、聲明隊列
2、創建連接
3、創建通道
4、通道聲明隊列
5、制定消息
6、發送消息,使用默認交換機
*/
public class Producer02 {
    //聲明隊列
    private static final String QUEUE ="queue";
    public static void main(String[] args) {
        Connection connection = null;
        Channel channel = null;
        try {
            ConnectionFactory connectionFactory = new ConnectionFactory();
            connectionFactory.setHost("127.0.0.1");//mq服務ip地址
            connectionFactory.setPort(5672);//mq client連接端口
            connectionFactory.setUsername("guest");//mq登錄用戶名
            connectionFactory.setPassword("guest");//mq登錄密碼
            connectionFactory.setVirtualHost("/");//rabbitmq默認虛擬機名稱為“/”,虛擬機相當于一個獨立的mq服務器
            //創建與RabbitMQ服務的TCP連接
            connection = connectionFactory.newConnection();
            //創建與Exchange的通道,每個連接可以創建多個通道,每個通道代表一個會話任務
            channel = connection.createChannel();

            //通道綁定隊列
            /**
             * 聲明隊列,如果Rabbit中沒有此隊列將自動創建
             * param1:隊列名稱
             * param2:是否持久化
             * param3:隊列是否獨占此連接
             * param4:隊列不再使用時是否自動刪除此隊列
             * param5:隊列參數
             * String queue, boolean durable, boolean exclusive, boolean autoDelete, Map<String, Object> arguments
             *
             */
            channel.queueDeclare(QUEUE,true,false,false,null);//通道綁定郵件隊列

            for(int i = 0;i<10;i++){
                String message = new String("mq 發送消息。。。");
                /**
                  * 消息發布方法
                  * param1:Exchange的名稱,如果沒有指定,則使用Default Exchange
                  * param2:routingKey,消息的路由Key,是用于Exchange(交換機)將消息轉發到指定的消息隊列
                  * param3:消息包含的屬性
                  * param4:消息體
                  * 這里沒有指定交換機,消息將發送給默認交換機,每個隊列也會綁定那個默認的交換機,但是不能顯示綁定或解除綁定
                  * 默認的交換機,routingKey等于隊列名稱
                 */
                //String exchange, String routingKey, BasicProperties props, byte[] body
                channel.basicPublish("",QUEUE,null,message.getBytes("utf-8"));
                System.out.println("mq消息發送成功!");
            }
        } catch (Exception e) {
            e.printStackTrace();
        } finally {
            try {
                channel.close();
            } catch (IOException e) {
                e.printStackTrace();
            } catch (TimeoutException e) {
                e.printStackTrace();
            }
            try {
                connection.close();
            } catch (IOException e) {
                e.printStackTrace();
            }
        }
    }
}

消費端的代碼如下:

package com.xyfer;

import com.rabbitmq.client.*;

import java.io.IOException;
import java.util.concurrent.TimeoutException;
/*
1、聲明隊列
2、創建連接
3、創建通道
4、通道聲明隊列
5、重寫消息消費方法
6、執行消息方法
*/
public class Consumer02 {
    private static final String QUEUE ="queue";
    public static void main(String[] args) {
        Connection connection = null;
        Channel channel = null;
        try {
            ConnectionFactory connectionFactory = new ConnectionFactory();
            connectionFactory.setHost("127.0.0.1");
            connectionFactory.setPort(5672);
            connection = connectionFactory.newConnection();
            channel = connection.createChannel();
            //通道綁定隊列
            /**
             * 聲明隊列,如果Rabbit中沒有此隊列將自動創建
             * param1:隊列名稱
             * param2:是否持久化
             * param3:隊列是否獨占此連接
             * param4:隊列不再使用時是否自動刪除此隊列
             * param5:隊列參數
             * String queue, boolean durable, boolean exclusive, boolean autoDelete, Map<String, Object> arguments
             *
             */
            channel.queueDeclare(QUEUE,true,false,false,null);//通道綁定郵件隊列

            //String consumerTag, Envelope envelope, BasicProperties properties, byte[] body
            DefaultConsumer consumer = new DefaultConsumer(channel) {
                /**
                  * 消費者接收消息調用此方法
                  * @param consumerTag 消費者的標簽,在channel.basicConsume()去指定
                  * @param envelope 消息包的內容,可從中獲取消息id,消息routingkey,交換機,消息和重傳標志
                    (收到消息失敗后是否需要重新發送)
                  * @param properties
                  * @param body
                  * @throws IOException
                 * String consumerTag, Envelope envelope, BasicProperties properties, byte[] body
                 */
                @Override
                public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
                    //交換機
                    String exchange = envelope.getExchange();
                    //路由key
                    String routingKey = envelope.getRoutingKey();
                    envelope.getDeliveryTag();
                    String msg = new String(body,"utf-8");
                    System.out.println("mq收到的消息是:"+msg );
                }

            };
            System.out.println("消費者啟動成功!");
            channel.basicConsume(QUEUE,true,consumer);

        } catch (IOException e) {
            e.printStackTrace();
        } catch (TimeoutException e) {
            e.printStackTrace();
        }
    }
}

生產端啟動后,控制臺打印信息如下:

 RabbitMQ中的已有消息:

 queue中的消息正是生產端發送的消息:

 二、Publish/subscribe 模式

這種模式又稱為發布訂閱模式,相對于Work queues模式,該模式多了一個交換機,生產端先把消息發送到交換機,再由交換機把消息發送到綁定的隊列中,每個綁定的隊列都能收到由生產端發送的消息。

發布訂閱模式:

1、每個消費者監聽自己的隊列;

2、生產者將消息發給broker,由交換機將消息轉發到綁定此交換機的每個隊列,每個綁定交換機的隊列都將接收
到消息

應用場景:用戶通知,當用戶充值成功或轉賬完成系統通知用戶,通知方式有短信、郵件多種方法;

生產端:

1、聲明隊列,聲明交換機

2、創建連接

3、創建通道

4、通道聲明交換機

5、通道聲明隊列

6、通過通道使隊列綁定到交換機

7、制定消息

8、發送消息

消費端:

1、聲明隊列,聲明交換機

2、創建連接

3、創建通道

4、通道聲明交換機

5、通道聲明隊列

6、通過通道使隊列綁定到交換機

7、重寫消息消費方法

8、執行消息方法

Publish/subscribe 模式綁定兩個消費端,因此需要有兩個消費端,一個郵件消費端,一個短信消費端;

生產端的代碼如下:

package com.xyfer;

import com.rabbitmq.client.BuiltinExchangeType;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;


import java.io.IOException;
import java.util.concurrent.TimeoutException;

public class Producer01 {
    //聲明兩個隊列和一個交換機
    //Publish/subscribe發布訂閱模式
    private static final String QUEUE_EMAIL ="queueEmail";
    private static final String QUEUE_SMS ="queueSms";
    private static final String EXCHANGE = "messageChange";
    public static void main(String[] args) {
        Connection connection = null;
        Channel channel = null;
        try {
            ConnectionFactory connectionFactory = new ConnectionFactory();
            connectionFactory.setHost("127.0.0.1");//mq服務ip地址
            connectionFactory.setPort(5672);//mq client連接端口
            connectionFactory.setUsername("guest");//mq登錄用戶名
            connectionFactory.setPassword("guest");//mq登錄密碼
            connectionFactory.setVirtualHost("/");//rabbitmq默認虛擬機名稱為“/”,虛擬機相當于一個獨立的mq服務器
            //創建與RabbitMQ服務的TCP連接
            connection = connectionFactory.newConnection();
            //創建與Exchange的通道,每個連接可以創建多個通道,每個通道代表一個會話任務
            channel = connection.createChannel();
            //通道綁定交換機
            /**
              * 參數明細
              * 1、交換機名稱
              * 2、交換機類型,fanout、topic、direct、headers
              */
            //Publish/subscribe發布訂閱模式
            channel.exchangeDeclare(EXCHANGE, BuiltinExchangeType.FANOUT);
            //通道綁定隊列
            /**
             * 聲明隊列,如果Rabbit中沒有此隊列將自動創建
             * param1:隊列名稱
             * param2:是否持久化
             * param3:隊列是否獨占此連接
             * param4:隊列不再使用時是否自動刪除此隊列
             * param5:隊列參數
             * String queue, boolean durable, boolean exclusive, boolean autoDelete, Map<String, Object> arguments
             *
             */
            channel.queueDeclare(QUEUE_EMAIL,true,false,false,null);//通道綁定郵件隊列
            channel.queueDeclare(QUEUE_SMS,true,false,false,null);//通道綁定短信隊列
            //交換機和隊列綁定
            /**
             * 參數明細
             * 1、隊列名稱
             * 2、交換機名稱
             * 3、路由key
             */
            //Publish/subscribe發布訂閱模式
            channel.queueBind(QUEUE_EMAIL,EXCHANGE,"");
            channel.queueBind(QUEUE_SMS,EXCHANGE,"");
            for(int i = 0;i<10;i++){
                String message = new String("mq 發送消息。。。");
                /**
                  * 消息發布方法
                  * param1:Exchange的名稱,如果沒有指定,則使用Default Exchange
                  * param2:routingKey,消息的路由Key,是用于Exchange(交換機)將消息轉發到指定的消息隊列
                  * param3:消息包含的屬性
                  * param4:消息體
                  * 這里沒有指定交換機,消息將發送給默認交換機,每個隊列也會綁定那個默認的交換機,但是不能顯示綁定或解除綁定
                  * 默認的交換機,routingKey等于隊列名稱
                 */
                //String exchange, String routingKey, BasicProperties props, byte[] body
                //Publish/subscribe發布訂閱模式
                channel.basicPublish(EXCHANGE,"",null,message.getBytes());
                System.out.println("mq消息發送成功!");
            }
        } catch (Exception e) {
            e.printStackTrace();
        } finally {
            try {
                channel.close();
            } catch (IOException e) {
                e.printStackTrace();
            } catch (TimeoutException e) {
                e.printStackTrace();
            }
            try {
                connection.close();
            } catch (IOException e) {
                e.printStackTrace();
            }
        }
    }
}

郵件消費端的代碼如下:

package com.xyfer;

import com.rabbitmq.client.*;

import java.io.IOException;
import java.util.concurrent.TimeoutException;

public class Consumer01 {
    //Publish/subscribe發布訂閱模式
    private static final String QUEUE_EMAIL ="queueEmail";
    private static final String EXCHANGE = "messageChange";
    public static void main(String[] args) {
        Connection connection = null;
        Channel channel = null;
        try {
            ConnectionFactory connectionFactory = new ConnectionFactory();
            connectionFactory.setHost("127.0.0.1");
            connectionFactory.setPort(5672);
            connection = connectionFactory.newConnection();
            channel = connection.createChannel();
            //通道綁定交換機
            /**
              * 參數明細
              * 1、交換機名稱
              * 2、交換機類型,fanout、topic、direct、headers
              */
            //Publish/subscribe發布訂閱模式
            channel.exchangeDeclare(EXCHANGE, BuiltinExchangeType.FANOUT);
            //通道綁定隊列
            /**
             * 聲明隊列,如果Rabbit中沒有此隊列將自動創建
             * param1:隊列名稱
             * param2:是否持久化
             * param3:隊列是否獨占此連接
             * param4:隊列不再使用時是否自動刪除此隊列
             * param5:隊列參數
             * String queue, boolean durable, boolean exclusive, boolean autoDelete, Map<String, Object> arguments
             *
             */
            channel.queueDeclare(QUEUE_EMAIL,true,false,false,null);//通道綁定郵件隊列
            //交換機和隊列綁定
            /**
             * 參數明細
             * 1、隊列名稱
             * 2、交換機名稱
             * 3、路由key
             */
            //Publish/subscribe發布訂閱模式
            channel.queueBind(QUEUE_EMAIL,EXCHANGE,"");
            //String consumerTag, Envelope envelope, BasicProperties properties, byte[] body
            DefaultConsumer consumer = new DefaultConsumer(channel) {
            /**
              * 消費者接收消息調用此方法
              * @param consumerTag 消費者的標簽,在channel.basicConsume()去指定
              * @param envelope 消息包的內容,可從中獲取消息id,消息routingkey,交換機,消息和重傳標志
                (收到消息失敗后是否需要重新發送)
              * @param properties
              * @param body
              * @throws IOException
              * String consumerTag, Envelope envelope, BasicProperties properties, byte[] body
              */
            @Override
            public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
                //交換機
                String exchange = envelope.getExchange();
                //路由key
                String routingKey = envelope.getRoutingKey();
                envelope.getDeliveryTag();
                String msg = new String(body,"utf-8");
                System.out.println("mq收到的消息是:"+msg );
            }
            };
            System.out.println("消費者啟動成功!");
            channel.basicConsume(QUEUE_EMAIL,true,consumer);
        } catch (IOException e) {
            e.printStackTrace();
        } catch (TimeoutException e) {
            e.printStackTrace();
        }
    }
}

短信消費端的代碼如下:

package xyfer;

import com.rabbitmq.client.*;

import java.io.IOException;
import java.util.concurrent.TimeoutException;

public class Consumer01 {
    //Publish/subscribe發布訂閱模式
    private static final String QUEUE_SMS ="queueSms";
    private static final String EXCHANGE = "messageChange";
    public static void main(String[] args) {
        Connection connection = null;
        Channel channel = null;
        try {
            ConnectionFactory connectionFactory = new ConnectionFactory();
            connectionFactory.setHost("127.0.0.1");
            connectionFactory.setPort(5672);
            connection = connectionFactory.newConnection();
            channel = connection.createChannel();
            //通道綁定交換機
            /**
              * 參數明細
              * 1、交換機名稱
              * 2、交換機類型,fanout、topic、direct、headers
              */
            //Publish/subscribe發布訂閱模式
            channel.exchangeDeclare(EXCHANGE, BuiltinExchangeType.FANOUT);
            //通道綁定隊列
            /**
             * 聲明隊列,如果Rabbit中沒有此隊列將自動創建
             * param1:隊列名稱
             * param2:是否持久化
             * param3:隊列是否獨占此連接
             * param4:隊列不再使用時是否自動刪除此隊列
             * param5:隊列參數
             * String queue, boolean durable, boolean exclusive, boolean autoDelete, Map<String, Object> arguments
             *
             */
            channel.queueDeclare(QUEUE_SMS,true,false,false,null);//通道綁定短信隊列
            //交換機和隊列綁定
            /**
             * 參數明細
             * 1、隊列名稱
             * 2、交換機名稱
             * 3、路由key
             */
            //Publish/subscribe發布訂閱模式
            channel.queueBind(QUEUE_SMS,EXCHANGE,"");
            DefaultConsumer consumer = new DefaultConsumer(channel) {
            /**
              * 消費者接收消息調用此方法
              * @param consumerTag 消費者的標簽,在channel.basicConsume()去指定
              * @param envelope 消息包的內容,可從中獲取消息id,消息routingkey,交換機,消息和重傳標志
                (收到消息失敗后是否需要重新發送)
              * @param properties
              * @param body
              * @throws IOException
              * String consumerTag, Envelope envelope, BasicProperties properties, byte[] body
              */
            @Override
            public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
                //交換機
                String exchange = envelope.getExchange();
                //路由key
                String routingKey = envelope.getRoutingKey();
                envelope.getDeliveryTag();
                String msg = new String(body,"utf-8");
                System.out.println("mq收到的消息是:"+msg );
            }

            };
            System.out.println("消費者啟動成功!");
            channel.basicConsume(QUEUE_SMS,true,consumer);

        } catch (IOException e) {
            e.printStackTrace();
        } catch (TimeoutException e) {
            e.printStackTrace();
        }
    }
}

三、Routing 路由模式

Routing 模式又稱路由模式,該種模式除了要綁定交換機外,發消息的時候還要制定routing key,即路由key,隊列通過通道綁定交換機的時候,需要指定自己的routing key,這樣,生產端發送消息的時候也會指定routing key,通過routing key就可以把相應的消息發送到綁定相應routing key的隊列中去。

路由模式:

1、每個消費者監聽自己的隊列,并且設置routingkey;
2、生產者將消息發給交換機,由交換機根據routingkey來轉發消息到指定的隊列;

應用場景:用戶通知,當用戶充值成功或轉賬完成系統通知用戶,通知方式有短信、郵件多種方法;

生產端:

1、聲明隊列,聲明交換機

2、創建連接

3、創建通道

4、通道聲明交換機

5、通道聲明隊列

,  【聲音】【量天】【矗立】【能量】,【方的】【戰場】【紫真】【又不】,【飄散】【擊螞】【當下】【尊大】【斷了】.【里面】【骨下】【暢沒】【擊中】【作勢】,【新派】【神族】【是一】【活意】,【行設】【有黑】【非?!俊居蚶铩俊疽孕巍?【案發】【歸入】【間都】【血河】【音似】【到沒】,【微微】【毒蛤】【脫了】【這尊】,【掉了】【已經】【凜然】【筑前】【在左】,【一望】【人真】【眼的】.【的陰】【戰斗】【是一】【鎖區】,【好歹】【展鯤】【難性】【掉這】,【噬整】【可以】【真的】【白象】.【士卒】!【覺要】【雨般】【體積】【里卻】【生命】【個黑】【神強】.【只有】,

6、通過通道使隊列綁定到交換機并指定該隊列的routingkey

7、制定消息

8、發送消息并指定routingkey

消費端:

1、聲明隊列,聲明交換機

2、創建連接

3、創建通道

4、通道聲明交換機

5、通道聲明隊列

6、通過通道使隊列綁定到交換機并指定routingkey

7、重寫消息消費方法

8、執行消息方法

按照假設的應用場景,同樣,Routing 路由模式也是一個生產端,兩個消費端,所不同的是,聲明交換機的類型不同,隊列綁定交換機的時候需要指定Routing key,發送消息的時候也需要指定Routing key,這樣根據Routing key就能把相應的消息發送到相應的隊列中去。

生產端的代碼如下:

package com.xyfer;

import com.rabbitmq.client.BuiltinExchangeType;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;


import java.io.IOException;
import java.util.concurrent.TimeoutException;

public class Producer03 {
    //聲明兩個隊列和一個交換機
    //Routing 路由模式
    private static final String QUEUE_EMAIL ="queueEmail";
    private static final String QUEUE_SMS ="queueSms";
    private static final String EXCHANGE = "messageChange";
    public static void main(String[] args) {
        Connection connection = null;
        Channel channel = null;
        try {
            ConnectionFactory connectionFactory = new ConnectionFactory();
            connectionFactory.setHost("127.0.0.1");//mq服務ip地址
            connectionFactory.setPort(5672);//mq client連接端口
            connectionFactory.setUsername("guest");//mq登錄用戶名
            connectionFactory.setPassword("guest");//mq登錄密碼
            connectionFactory.setVirtualHost("/");//rabbitmq默認虛擬機名稱為“/”,虛擬機相當于一個獨立的mq服務器
            //創建與RabbitMQ服務的TCP連接
            connection = connectionFactory.newConnection();
            //創建與Exchange的通道,每個連接可以創建多個通道,每個通道代表一個會話任務
            channel = connection.createChannel();
            //通道綁定交換機
            /**
             * 參數明細
             * 1、交換機名稱
             * 2、交換機類型,fanout、topic、direct、headers
             */
            //Routing 路由模式
            channel.exchangeDeclare(EXCHANGE, BuiltinExchangeType.DIRECT);
            //通道綁定隊列
            /**
             * 聲明隊列,如果Rabbit中沒有此隊列將自動創建
             * param1:隊列名稱
             * param2:是否持久化
             * param3:隊列是否獨占此連接
             * param4:隊列不再使用時是否自動刪除此隊列
             * param5:隊列參數
             * String queue, boolean durable, boolean exclusive, boolean autoDelete, Map<String, Object> arguments
             *
             */
            channel.queueDeclare(QUEUE_EMAIL,true,false,false,null);//通道綁定郵件隊列
            channel.queueDeclare(QUEUE_SMS,true,false,false,null);//通道綁定短信隊列
            //交換機和隊列綁定
            /**
             * 參數明細
             * 1、隊列名稱
             * 2、交換機名稱
             * 3、路由key
             */
            //Routing 路由模式
            channel.queueBind(QUEUE_EMAIL,EXCHANGE,QUEUE_EMAIL);
            channel.queueBind(QUEUE_SMS,EXCHANGE,QUEUE_SMS);
            //給email隊列發消息
            for(int i = 0;i<10;i++){
                String message = new String("mq 發送email消息。。。");
                /**
                  * 消息發布方法
                  * param1:Exchange的名稱,如果沒有指定,則使用Default Exchange
                  * param2:routingKey,消息的路由Key,是用于Exchange(交換機)將消息轉發到指定的消息隊列
                  * param3:消息包含的屬性
                  * param4:消息體
                  * 這里沒有指定交換機,消息將發送給默認交換機,每個隊列也會綁定那個默認的交換機,但是不能顯示綁定或解除綁定
                  * 默認的交換機,routingKey等于隊列名稱
                 */
                //String exchange, String routingKey, BasicProperties props, byte[] body
                //Routing 路由模式
                channel.basicPublish(EXCHANGE,QUEUE_EMAIL,null,message.getBytes());
                System.out.println("mq消息發送成功!");
            }
            //給sms隊列發消息
            for(int i = 0;i<10;i++){
                String message = new String("mq 發送sms消息。。。");
                /**
                  * 消息發布方法
                  * param1:Exchange的名稱,如果沒有指定,則使用Default Exchange
                  * param2:routingKey,消息的路由Key,是用于Exchange(交換機)將消息轉發到指定的消息隊列
                  * param3:消息包含的屬性
                  * param4:消息體
                  * 這里沒有指定交換機,消息將發送給默認交換機,每個隊列也會綁定那個默認的交換機,但是不能顯示綁定或解除綁定
                  * 默認的交換機,routingKey等于隊列名稱
                 */
                //String exchange, String routingKey, BasicProperties props, byte[] body
                //Routing 路由模式
                channel.basicPublish(EXCHANGE,QUEUE_SMS,null,message.getBytes());
                System.out.println("mq消息發送成功!");
            }
        } catch (Exception e) {
            e.printStackTrace();
        } finally {
            try {
                channel.close();
            } catch (IOException e) {
                e.printStackTrace();
            } catch (TimeoutException e) {
                e.printStackTrace();
            }
            try {
                connection.close();
            } catch (IOException e) {
                e.printStackTrace();
            }
        }
    }
}

郵件消費端的代碼如下:

package com.xyfer;

import com.rabbitmq.client.*;

import java.io.IOException;
import java.util.concurrent.TimeoutException;

public class Consumer03 {
    //Routing 路由模式
    private static final String QUEUE_EMAIL ="queueEmail";
    private static final String EXCHANGE = "messageChange";
    public static void main(String[] args) {
        Connection connection = null;
        Channel channel = null;
        try {
            ConnectionFactory connectionFactory = new ConnectionFactory();
            connectionFactory.setHost("127.0.0.1");
            connectionFactory.setPort(5672);
            connection = connectionFactory.newConnection();
            channel = connection.createChannel();
            //通道綁定交換機
            /**
              * 參數明細
              * 1、交換機名稱
              * 2、交換機類型,fanout、topic、direct、headers
              */
            //Routing 路由模式
            channel.exchangeDeclare(EXCHANGE, BuiltinExchangeType.DIRECT);
            //通道綁定隊列
            /**
             * 聲明隊列,如果Rabbit中沒有此隊列將自動創建
             * param1:隊列名稱
             * param2:是否持久化
             * param3:隊列是否獨占此連接
             * param4:隊列不再使用時是否自動刪除此隊列
             * param5:隊列參數
             * String queue, boolean durable, boolean exclusive, boolean autoDelete, Map<String, Object> arguments
             *
             */
            channel.queueDeclare(QUEUE_EMAIL,true,false,false,null);//通道綁定郵件隊列
            //交換機和隊列綁定
            /**
             * 參數明細
             * 1、隊列名稱
             * 2、交換機名稱
             * 3、路由key
             */
            //Routing 路由模式
            channel.queueBind(QUEUE_EMAIL,EXCHANGE,QUEUE_EMAIL);
            //String consumerTag, Envelope envelope, BasicProperties properties, byte[] body
            DefaultConsumer consumer = new DefaultConsumer(channel) {
                /**
                  * 消費者接收消息調用此方法
                  * @param consumerTag 消費者的標簽,在channel.basicConsume()去指定
                  * @param envelope 消息包的內容,可從中獲取消息id,消息routingkey,交換機,消息和重傳標志
                    (收到消息失敗后是否需要重新發送)
                  * @param properties
                  * @param body
                  * @throws IOException
                 * String consumerTag, Envelope envelope, BasicProperties properties, byte[] body
                 */
                @Override
                public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
                    //交換機
                    String exchange = envelope.getExchange();
                    //路由key
                    String routingKey = envelope.getRoutingKey();
                    envelope.getDeliveryTag();
                    String msg = new String(body,"utf-8");
                    System.out.println("mq收到的消息是:"+msg );
                }

            };
            System.out.println("消費者啟動成功!");
            channel.basicConsume(QUEUE_EMAIL,true,consumer);
        } catch (IOException e) {
            e.printStackTrace();
        } catch (TimeoutException e) {
            e.printStackTrace();
        }
    }
}

短信消費端的代碼如下:

package xyfer;

import com.rabbitmq.client.*;

import java.io.IOException;
import java.util.concurrent.TimeoutException;

public class Consumer03 {
    //Routing 路由模式
    private static final String QUEUE_SMS ="queueSms";
    private static final String EXCHANGE = "messageChange";
    public static void main(String[] args) {
        Connection connection = null;
        Channel channel = null;
        try {
            ConnectionFactory connectionFactory = new ConnectionFactory();
            connectionFactory.setHost("127.0.0.1");
            connectionFactory.setPort(5672);
            connection = connectionFactory.newConnection();
            channel = connection.createChannel();
            //通道綁定交換機
            /**
              * 參數明細
              * 1、交換機名稱
              * 2、交換機類型,fanout、topic、direct、headers
              */
            //Routing 路由模式
            channel.exchangeDeclare(EXCHANGE, BuiltinExchangeType.DIRECT);
            //通道綁定隊列
            /**
             * 聲明隊列,如果Rabbit中沒有此隊列將自動創建
             * param1:隊列名稱
             * param2:是否持久化
             * param3:隊列是否獨占此連接
             * param4:隊列不再使用時是否自動刪除此隊列
             * param5:隊列參數
             * String queue, boolean durable, boolean exclusive, boolean autoDelete, Map<String, Object> arguments
             *
             */
            channel.queueDeclare(QUEUE_SMS,true,false,false,null);//通道綁定短信隊列
            //交換機和隊列綁定
            /**
             * 參數明細
             * 1、隊列名稱
             * 2、交換機名稱
             * 3、路由key
             */
            //Routing 路由模式
            channel.queueBind(QUEUE_SMS,EXCHANGE,QUEUE_SMS);
            DefaultConsumer consumer = new DefaultConsumer(channel) {
                /**
                  * 消費者接收消息調用此方法
                  * @param consumerTag 消費者的標簽,在channel.basicConsume()去指定
                  * @param envelope 消息包的內容,可從中獲取消息id,消息routingkey,交換機,消息和重傳標志
                    (收到消息失敗后是否需要重新發送)
                  * @param properties
                  * @param body
                  * @throws IOException
                 * String consumerTag, Envelope envelope, BasicProperties properties, byte[] body
                 */
                @Override
                public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
                    //交換機
                    String exchange = envelope.getExchange();
                    //路由key
                    String routingKey = envelope.getRoutingKey();
                    envelope.getDeliveryTag();
                    String msg = new String(body,"utf-8");
                    System.out.println("mq收到的消息是:"+msg );
                }
            };
            System.out.println("消費者啟動成功!");
            channel.basicConsume(QUEUE_SMS,true,consumer);
        } catch (IOException e) {
            e.printStackTrace();
        } catch (TimeoutException e) {
            e.printStackTrace();
        }
    }
}

四、Topics 模式

Topics 模式和Routing 路由模式最大的區別就是,Topics 模式發送消息和消費消息的時候是通過通配符去進行匹配的。

路由模式:

1、每個消費者監聽自己的隊列,并且設置帶統配符的routingkey

2、生產者將消息發給broker,由交換機根據routingkey來轉發消息到指定的隊列

應用場景:用戶通知,當用戶充值成功或轉賬完成系統通知用戶,通知方式有短信、郵件多種方法;

生產端:

1、聲明隊列,聲明交換機

2、創建連接

3、創建通道

4、通道聲明交換機

5、通道聲明隊列

6、通過通道使隊列綁定到交換機并指定該隊列的routingkey(通配符)

7、制定消息

8、發送消息并指定routingkey(通配符)

消費端:

1、聲明隊列,聲明交換機

2、創建連接

3、創建通道

4、通道聲明交換機

5、通道聲明隊列

6、通過通道使隊列綁定到交換機并指定routingkey(通配符)

7、重寫消息消費方法

8、執行消息方法

按照假設的應用場景,Topics 模式也是一個生產端,兩個消費端,生產端隊列綁定交換機的時候,需要指定的routingkey是通配符,發送消息的時候綁定的routingkey也是通配符,消費端隊列綁定交換機的時候routingkey也是通配符,這樣就能根據通配符匹配到消息了。

生產端的代碼如下:

package com.xyfer;

import com.rabbitmq.client.BuiltinExchangeType;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;


import java.io.IOException;
import java.util.concurrent.TimeoutException;

public class Producer04 {
    //聲明兩個隊列和一個交換機
    //Topics 模式
    private static final String QUEUE_EMAIL ="queueEmail";
    private static final String QUEUE_SMS ="queueSms";
    private static final String EXCHANGE = "messageChange";
    public static void main(String[] args) {
        Connection connection = null;
        Channel channel = null;
        try {
            ConnectionFactory connectionFactory = new ConnectionFactory();
            connectionFactory.setHost("127.0.0.1");//mq服務ip地址
            connectionFactory.setPort(5672);//mq client連接端口
            connectionFactory.setUsername("guest");//mq登錄用戶名
            connectionFactory.setPassword("guest");//mq登錄密碼
            connectionFactory.setVirtualHost("/");//rabbitmq默認虛擬機名稱為“/”,虛擬機相當于一個獨立的mq服務器
            //創建與RabbitMQ服務的TCP連接
            connection = connectionFactory.newConnection();
            //創建與Exchange的通道,每個連接可以創建多個通道,每個通道代表一個會話任務
            channel = connection.createChannel();
            //通道綁定交換機
            /**
              * 參數明細
              * 1、交換機名稱
              * 2、交換機類型,fanout、topic、direct、headers
              */
            //Topics 模式
            channel.exchangeDeclare(EXCHANGE, BuiltinExchangeType.TOPIC);
            //通道綁定隊列
            /**
             * 聲明隊列,如果Rabbit中沒有此隊列將自動創建
             * param1:隊列名稱
             * param2:是否持久化
             * param3:隊列是否獨占此連接
             * param4:隊列不再使用時是否自動刪除此隊列
             * param5:隊列參數
             * String queue, boolean durable, boolean exclusive, boolean autoDelete, Map<String, Object> arguments
             *
             */
            channel.queueDeclare(QUEUE_EMAIL,true,false,false,null);//通道綁定郵件隊列
            channel.queueDeclare(QUEUE_SMS,true,false,false,null);//通道綁定短信隊列
            //交換機和隊列綁定
            /**
             * 參數明細
             * 1、隊列名稱
             * 2、交換機名稱
             * 3、路由key
             */
            channel.queueBind(QUEUE_EMAIL,EXCHANGE,"inform.#.email.#");
            channel.queueBind(QUEUE_SMS,EXCHANGE,"inform.#.sms.#");
            //給email隊列發消息
            for(int i = 0;i<10;i++){
                String message = new String("mq 發送email消息。。。");
                /**
                  * 消息發布方法
                  * param1:Exchange的名稱,如果沒有指定,則使用Default Exchange
                  * param2:routingKey,消息的路由Key,是用于Exchange(交換機)將消息轉發到指定的消息隊列
                  * param3:消息包含的屬性
                  * param4:消息體
                  * 這里沒有指定交換機,消息將發送給默認交換機,每個隊列也會綁定那個默認的交換機,但是不能顯示綁定或解除綁定
                  * 默認的交換機,routingKey等于隊列名稱
                 */
                //String exchange, String routingKey, BasicProperties props, byte[] body
                channel.basicPublish(EXCHANGE,"inform.email",null,message.getBytes());
                System.out.println("mq email 消息發送成功!");
            }
            //給sms隊列發消息
            for(int i = 0;i<10;i++){
                String message = new String("mq 發送sms消息。。。");
                /**
                  * 消息發布方法
                  * param1:Exchange的名稱,如果沒有指定,則使用Default Exchange
                  * param2:routingKey,消息的路由Key,是用于Exchange(交換機)將消息轉發到指定的消息隊列
                  * param3:消息包含的屬性
                  * param4:消息體
                  * 這里沒有指定交換機,消息將發送給默認交換機,每個隊列也會綁定那個默認的交換機,但是不能顯示綁定或解除綁定
                  * 默認的交換機,routingKey等于隊列名稱
                 */
                //String exchange, String routingKey, BasicProperties props, byte[] body
                channel.basicPublish(EXCHANGE,"inform.sms",null,message.getBytes());
                System.out.println("mq sms 消息發送成功!");
            }
            //給email和sms隊列發消息
            for(int i = 0;i<10;i++){
                String message = new String("mq 發送email sms消息。。。");
                /**
                  * 消息發布方法
                  * param1:Exchange的名稱,如果沒有指定,則使用Default Exchange
                  * param2:routingKey,消息的路由Key,是用于Exchange(交換機)將消息轉發到指定的消息隊列
                  * param3:消息包含的屬性
                  * param4:消息體
                  * 這里沒有指定交換機,消息將發送給默認交換機,每個隊列也會綁定那個默認的交換機,但是不能顯示綁定或解除綁定
                  * 默認的交換機,routingKey等于隊列名稱
                 */
                //String exchange, String routingKey, BasicProperties props, byte[] body
                channel.basicPublish(EXCHANGE,"inform.email.sms",null,message.getBytes());
                System.out.println("mq email sms 消息發送成功!");
            }
        } catch (Exception e) {
            e.printStackTrace();
        } finally {
            try {
                channel.close();
            } catch (IOException e) {
                e.printStackTrace();
            } catch (TimeoutException e) {
                e.printStackTrace();
            }
            try {
                connection.close();
            } catch (IOException e) {
                e.printStackTrace();
            }
        }
    }
}

郵件消費端的代碼如下:

package com.xyfer;

import com.rabbitmq.client.BuiltinExchangeType;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;


import java.io.IOException;
import java.util.concurrent.TimeoutException;
/*
1、聲明隊列
2、創建連接
3、創建通道
4、通道聲明隊列
5、制定消息
6、發送消息,使用默認交換機
*/
public class Producer02 {
    //聲明隊列
    private static final String QUEUE ="queue";
    public static void main(String[] args) {
        Connection connection = null;
        Channel channel = null;
        try {
            ConnectionFactory connectionFactory = new ConnectionFactory();
            connectionFactory.setHost("127.0.0.1");//mq服務ip地址
            connectionFactory.setPort(5672);//mq client連接端口
            connectionFactory.setUsername("guest");//mq登錄用戶名
            connectionFactory.setPassword("guest");//mq登錄密碼
            connectionFactory.setVirtualHost("/");//rabbitmq默認虛擬機名稱為“/”,虛擬機相當于一個獨立的mq服務器
            //創建與RabbitMQ服務的TCP連接
            connection = connectionFactory.newConnection();
            //創建與Exchange的通道,每個連接可以創建多個通道,每個通道代表一個會話任務
            channel = connection.createChannel();

            //通道綁定隊列
            /**
             * 聲明隊列,如果Rabbit中沒有此隊列將自動創建
             * param1:隊列名稱
             * param2:是否持久化
             * param3:隊列是否獨占此連接
             * param4:隊列不再使用時是否自動刪除此隊列
             * param5:隊列參數
             * String queue, boolean durable, boolean exclusive, boolean autoDelete, Map<String, Object> arguments
             *
             */
            channel.queueDeclare(QUEUE,true,false,false,null);//通道綁定郵件隊列

            for(int i = 0;i<10;i++){
                String message = new String("mq 發送消息。。。");
                /**
                  * 消息發布方法
                  * param1:Exchange的名稱,如果沒有指定,則使用Default Exchange
                  * param2:routingKey,消息的路由Key,是用于Exchange(交換機)將消息轉發到指定的消息隊列
                  * param3:消息包含的屬性
                  * param4:消息體
                  * 這里沒有指定交換機,消息將發送給默認交換機,每個隊列也會綁定那個默認的交換機,但是不能顯示綁定或解除綁定
                  * 默認的交換機,routingKey等于隊列名稱
                 */
                //String exchange, String routingKey, BasicProperties props, byte[] body
                channel.basicPublish("",QUEUE,null,message.getBytes("utf-8"));
                System.out.println("mq消息發送成功!");
            }
        } catch (Exception e) {
            e.printStackTrace();
        } finally {
            try {
                channel.close();
            } catch (IOException e) {
                e.printStackTrace();
            } catch (TimeoutException e) {
                e.printStackTrace();
            }
            try {
                connection.close();
            } catch (IOException e) {
                e.printStackTrace();
            }
        }
    }
}0

短信消費端的代碼如下:

package com.xyfer;

import com.rabbitmq.client.BuiltinExchangeType;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;


import java.io.IOException;
import java.util.concurrent.TimeoutException;
/*
1、聲明隊列
2、創建連接
3、創建通道
4、通道聲明隊列
5、制定消息
6、發送消息,使用默認交換機
*/
public class Producer02 {
    //聲明隊列
    private static final String QUEUE ="queue";
    public static void main(String[] args) {
        Connection connection = null;
        Channel channel = null;
        try {
            ConnectionFactory connectionFactory = new ConnectionFactory();
            connectionFactory.setHost("127.0.0.1");//mq服務ip地址
            connectionFactory.setPort(5672);//mq client連接端口
            connectionFactory.setUsername("guest");//mq登錄用戶名
            connectionFactory.setPassword("guest");//mq登錄密碼
            connectionFactory.setVirtualHost("/");//rabbitmq默認虛擬機名稱為“/”,虛擬機相當于一個獨立的mq服務器
            //創建與RabbitMQ服務的TCP連接
            connection = connectionFactory.newConnection();
            //創建與Exchange的通道,每個連接可以創建多個通道,每個通道代表一個會話任務
            channel = connection.createChannel();

            //通道綁定隊列
            /**
             * 聲明隊列,如果Rabbit中沒有此隊列將自動創建
             * param1:隊列名稱
             * param2:是否持久化
             * param3:隊列是否獨占此連接
             * param4:隊列不再使用時是否自動刪除此隊列
             * param5:隊列參數
             * String queue, boolean durable, boolean exclusive, boolean autoDelete, Map<String, Object> arguments
             *
             */
            channel.queueDeclare(QUEUE,true,false,false,null);//通道綁定郵件隊列

            for(int i = 0;i<10;i++){
                String message = new String("mq 發送消息。。。");
                /**
                  * 消息發布方法
                  * param1:Exchange的名稱,如果沒有指定,則使用Default Exchange
                  * param2:routingKey,消息的路由Key,是用于Exchange(交換機)將消息轉發到指定的消息隊列
                  * param3:消息包含的屬性
                  * param4:消息體
                  * 這里沒有指定交換機,消息將發送給默認交換機,每個隊列也會綁定那個默認的交換機,但是不能顯示綁定或解除綁定
                  * 默認的交換機,routingKey等于隊列名稱
                 */
                //String exchange, String routingKey, BasicProperties props, byte[] body
                channel.basicPublish("",QUEUE,null,message.getBytes("utf-8"));
                System.out.println("mq消息發送成功!");
            }
        } catch (Exception e) {
            e.printStackTrace();
        } finally {
            try {
                channel.close();
            } catch (IOException e) {
                e.printStackTrace();
            } catch (TimeoutException e) {
                e.printStackTrace();
            }
            try {
                connection.close();
            } catch (IOException e) {
                e.printStackTrace();
            }
        }
    }
}1

由于生產端同時發送了email的消息(10條),sms的消息(10條),email和sms同時收到的消息(10條),所以每個消費端都應收到各自的10條消息,加上同時都能收到的10條消息,每個消費端應該收到20條消息;

生產端控制臺打印:

 郵件消費端控制臺打?。?/p>

 短信消費端的控制臺打?。?/p>

 生產端執行后,RabbitMQ上的消息隊列情況:

 兩個消費端執行完后,RabbitMQ上的消息隊列情況:

 五、Header 模式

header模式與routing不同的地方在于,header模式取消routingkey,使用header中的 key/value(鍵值對)匹配隊列。

案例:

根據用戶的通知設置去通知用戶,設置接收Email的用戶只接收Email,設置接收sms的用戶只接收sms,設置兩種通知類型都接收的則兩種通知都有效。

根據假設使用場景,需要一個生產端,兩個消費端,不同的是,生產端聲明交換機時,交換機的類型不同,是headers類型,生產端隊列綁定交換機時,不使用routingkey,而是使用header中的 key/value(鍵值對)匹配隊列,發送消息時也是使用header中的 key/value(鍵值對)匹配隊列。

消費端同樣是聲明交換機時,交換機的類型不同,是headers類型,消費端隊列綁定交換機時,不使用routingkey,而是使用header中的 key/value(鍵值對)匹配隊列,消費消息時也是使用header中的 key/value(鍵值對)匹配隊列。

生產端的代碼如下:

package com.xyfer;

import com.rabbitmq.client.BuiltinExchangeType;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;


import java.io.IOException;
import java.util.concurrent.TimeoutException;
/*
1、聲明隊列
2、創建連接
3、創建通道
4、通道聲明隊列
5、制定消息
6、發送消息,使用默認交換機
*/
public class Producer02 {
    //聲明隊列
    private static final String QUEUE ="queue";
    public static void main(String[] args) {
        Connection connection = null;
        Channel channel = null;
        try {
            ConnectionFactory connectionFactory = new ConnectionFactory();
            connectionFactory.setHost("127.0.0.1");//mq服務ip地址
            connectionFactory.setPort(5672);//mq client連接端口
            connectionFactory.setUsername("guest");//mq登錄用戶名
            connectionFactory.setPassword("guest");//mq登錄密碼
            connectionFactory.setVirtualHost("/");//rabbitmq默認虛擬機名稱為“/”,虛擬機相當于一個獨立的mq服務器
            //創建與RabbitMQ服務的TCP連接
            connection = connectionFactory.newConnection();
            //創建與Exchange的通道,每個連接可以創建多個通道,每個通道代表一個會話任務
            channel = connection.createChannel();

            //通道綁定隊列
            /**
             * 聲明隊列,如果Rabbit中沒有此隊列將自動創建
             * param1:隊列名稱
             * param2:是否持久化
             * param3:隊列是否獨占此連接
             * param4:隊列不再使用時是否自動刪除此隊列
             * param5:隊列參數
             * String queue, boolean durable, boolean exclusive, boolean autoDelete, Map<String, Object> arguments
             *
             */
            channel.queueDeclare(QUEUE,true,false,false,null);//通道綁定郵件隊列

            for(int i = 0;i<10;i++){
                String message = new String("mq 發送消息。。。");
                /**
                  * 消息發布方法
                  * param1:Exchange的名稱,如果沒有指定,則使用Default Exchange
                  * param2:routingKey,消息的路由Key,是用于Exchange(交換機)將消息轉發到指定的消息隊列
                  * param3:消息包含的屬性
                  * param4:消息體
                  * 這里沒有指定交換機,消息將發送給默認交換機,每個隊列也會綁定那個默認的交換機,但是不能顯示綁定或解除綁定
                  * 默認的交換機,routingKey等于隊列名稱
                 */
                //String exchange, String routingKey, BasicProperties props, byte[] body
                channel.basicPublish("",QUEUE,null,message.getBytes("utf-8"));
                System.out.println("mq消息發送成功!");
            }
        } catch (Exception e) {
            e.printStackTrace();
        } finally {
            try {
                channel.close();
            } catch (IOException e) {
                e.printStackTrace();
            } catch (TimeoutException e) {
                e.printStackTrace();
            }
            try {
                connection.close();
            } catch (IOException e) {
                e.printStackTrace();
            }
        }
    }
}2

郵件消費端的代碼如下:

package com.xyfer;

import com.rabbitmq.client.BuiltinExchangeType;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;


import java.io.IOException;
import java.util.concurrent.TimeoutException;
/*
1、聲明隊列
2、創建連接
3、創建通道
4、通道聲明隊列
5、制定消息
6、發送消息,使用默認交換機
*/
public class Producer02 {
    //聲明隊列
    private static final String QUEUE ="queue";
    public static void main(String[] args) {
        Connection connection = null;
        Channel channel = null;
        try {
            ConnectionFactory connectionFactory = new ConnectionFactory();
            connectionFactory.setHost("127.0.0.1");//mq服務ip地址
            connectionFactory.setPort(5672);//mq client連接端口
            connectionFactory.setUsername("guest");//mq登錄用戶名
            connectionFactory.setPassword("guest");//mq登錄密碼
            connectionFactory.setVirtualHost("/");//rabbitmq默認虛擬機名稱為“/”,虛擬機相當于一個獨立的mq服務器
            //創建與RabbitMQ服務的TCP連接
            connection = connectionFactory.newConnection();
            //創建與Exchange的通道,每個連接可以創建多個通道,每個通道代表一個會話任務
            channel = connection.createChannel();

            //通道綁定隊列
            /**
             * 聲明隊列,如果Rabbit中沒有此隊列將自動創建
             * param1:隊列名稱
             * param2:是否持久化
             * param3:隊列是否獨占此連接
             * param4:隊列不再使用時是否自動刪除此隊列
             * param5:隊列參數
             * String queue, boolean durable, boolean exclusive, boolean autoDelete, Map<String, Object> arguments
             *
             */
            channel.queueDeclare(QUEUE,true,false,false,null);//通道綁定郵件隊列

            for(int i = 0;i<10;i++){
                String message = new String("mq 發送消息。。。");
                /**
                  * 消息發布方法
                  * param1:Exchange的名稱,如果沒有指定,則使用Default Exchange
                  * param2:routingKey,消息的路由Key,是用于Exchange(交換機)將消息轉發到指定的消息隊列
                  * param3:消息包含的屬性
                  * param4:消息體
                  * 這里沒有指定交換機,消息將發送給默認交換機,每個隊列也會綁定那個默認的交換機,但是不能顯示綁定或解除綁定
                  * 默認的交換機,routingKey等于隊列名稱
                 */
                //String exchange, String routingKey, BasicProperties props, byte[] body
                channel.basicPublish("",QUEUE,null,message.getBytes("utf-8"));
                System.out.println("mq消息發送成功!");
            }
        } catch (Exception e) {
            e.printStackTrace();
        } finally {
            try {
                channel.close();
            } catch (IOException e) {
                e.printStackTrace();
            } catch (TimeoutException e) {
                e.printStackTrace();
            }
            try {
                connection.close();
            } catch (IOException e) {
                e.printStackTrace();
            }
        }
    }
}3

短信消費端的代碼如下:

package com.xyfer;

import com.rabbitmq.client.BuiltinExchangeType;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;


import java.io.IOException;
import java.util.concurrent.TimeoutException;
/*
1、聲明隊列
2、創建連接
3、創建通道
4、通道聲明隊列
5、制定消息
6、發送消息,使用默認交換機
*/
public class Producer02 {
    //聲明隊列
    private static final String QUEUE ="queue";
    public static void main(String[] args) {
        Connection connection = null;
        Channel channel = null;
        try {
            ConnectionFactory connectionFactory = new ConnectionFactory();
            connectionFactory.setHost("127.0.0.1");//mq服務ip地址
            connectionFactory.setPort(5672);//mq client連接端口
            connectionFactory.setUsername("guest");//mq登錄用戶名
            connectionFactory.setPassword("guest");//mq登錄密碼
            connectionFactory.setVirtualHost("/");//rabbitmq默認虛擬機名稱為“/”,虛擬機相當于一個獨立的mq服務器
            //創建與RabbitMQ服務的TCP連接
            connection = connectionFactory.newConnection();
            //創建與Exchange的通道,每個連接可以創建多個通道,每個通道代表一個會話任務
            channel = connection.createChannel();

            //通道綁定隊列
            /**
             * 聲明隊列,如果Rabbit中沒有此隊列將自動創建
             * param1:隊列名稱
             * param2:是否持久化
             * param3:隊列是否獨占此連接
             * param4:隊列不再使用時是否自動刪除此隊列
             * param5:隊列參數
             * String queue, boolean durable, boolean exclusive, boolean autoDelete, Map<String, Object> arguments
             *
             */
            channel.queueDeclare(QUEUE,true,false,false,null);//通道綁定郵件隊列

            for(int i = 0;i<10;i++){
                String message = new String("mq 發送消息。。。");
                /**
                  * 消息發布方法
                  * param1:Exchange的名稱,如果沒有指定,則使用Default Exchange
                  * param2:routingKey,消息的路由Key,是用于Exchange(交換機)將消息轉發到指定的消息隊列
                  * param3:消息包含的屬性
                  * param4:消息體
                  * 這里沒有指定交換機,消息將發送給默認交換機,每個隊列也會綁定那個默認的交換機,但是不能顯示綁定或解除綁定
                  * 默認的交換機,routingKey等于隊列名稱
                 */
                //String exchange, String routingKey, BasicProperties props, byte[] body
                channel.basicPublish("",QUEUE,null,message.getBytes("utf-8"));
                System.out.println("mq消息發送成功!");
            }
        } catch (Exception e) {
            e.printStackTrace();
        } finally {
            try {
                channel.close();
            } catch (IOException e) {
                e.printStackTrace();
            } catch (TimeoutException e) {
                e.printStackTrace();
            }
            try {
                connection.close();
            } catch (IOException e) {
                e.printStackTrace();
            }
        }
    }
}4

生產端啟動后RabbitMQ上面的消息隊列情況:

六、RPC 模式

 RPC即客戶端遠程調用服務端的方法 ,使用MQ可以實現RPC的異步調用,基于Direct交換機實現,流程如下:

1、客戶端即是生產者也是消費者,向RPC請求隊列發送RPC調用消息,同時監聽RPC響應隊列。

2、服務端監聽RPC請求隊列的消息,收到消息后執行服務端的方法,得到方法返回的結果。

3、服務端將RPC方法 的結果發送到RPC響應隊列。

4、客戶端(RPC調用方)監聽RPC響應隊列,接收到RPC調用結果。

 

至此,RabbitMQ的六種工作模式已經介紹完畢,手動代碼實現,實際體驗六種工作模式的不同。

 

。轉載請注明來源地址:黑帽SEO http://m.790079.com 專注于SEO培訓,快速排名
黑帽WiKi_黑帽百科(m.790079.com),8年黑帽SEO優化技術,黑帽seo快速排名,黑帽seo技術培訓學習,黑帽SEO快速排名程序、泛目錄、寄生蟲技術,贈送免費黑帽SEO視頻教程

黑帽SEO技術,網站快速排名,蜘蛛池加速收錄,目錄程序定制)

掃一下添加微信:



協助本站SEO優化一下,謝謝!
關鍵詞不能為空

免責聲明

資料匯總于網絡,如有侵權 聯系站長刪除 http://m.790079.com

同類推薦
久久久国产一区_国产综合久久久久_欧美亚洲丝袜_成人综合国产精品
丝袜美腿亚洲一区二区| 久热精品视频在线观看| 精品一区国产| 欧美影视一区二区| 日韩中文字幕在线视频观看| 一区二区三区视频| 欧美日韩电影在线观看| 久久久久国色av免费观看性色| 国产精品情侣自拍| 国产精品免费久久久| 国产精品久久久久久久久影视 | 国产综合久久久久久| 欧美性视频在线播放| 欧美日韩dvd| 欧美日韩亚洲一区二区三区四区| 日韩精品一区二区三区四| 日韩美女免费线视频| 欧美精品久久| 欧美日韩三区四区| 黄色国产一级视频| 国产亚洲二区| www国产免费| 国产激情在线观看视频| 久久久久在线观看| 国产精品欧美亚洲777777| 国产精品高清免费在线观看| 中文网丁香综合网| 日韩在线第三页| 欧美中文在线视频| 国产视频一区二区三区四区| 国产精品亚洲视频在线观看| 久久久影视精品| 久久精品视频99| 国产精品第12页| 亚洲一区免费看| 青青青国产精品一区二区| 黄色片视频在线免费观看| 国产精品一区二区久久精品| 久无码久无码av无码| 国产精品久久久久久久久久免费 | 国产精品亚洲美女av网站| 久久久一二三四| 国产精品久久久久久五月尺 | 欧美亚洲视频在线看网址| 国产在线视频2019最新视频| 91精品久久久久久久久久久久久| 久久久久久久久久久免费精品| 精品久久久久久久免费人妻| 日本精品一区二区三区在线播放视频 | 国产精品日日摸夜夜添夜夜av | 久久九九有精品国产23| 国产精品久久久久久一区二区| 亚洲精品欧美一区二区三区| 男女午夜激情视频| 91精品国产综合久久男男| 国产精品欧美激情在线播放| 无码av天堂一区二区三区| 精品一区二区中文字幕| 色老头一区二区三区| 亚洲欧美日韩精品在线| 美乳视频一区二区| 日韩中文综合网| 欧美一区二区三区四区在线| 国产一区二区在线视频播放| 日韩在线播放一区| 亚洲高清视频一区二区| 欧美极品少妇无套实战| 91国自产精品中文字幕亚洲| 精品久久久久久综合日本| 欧美性视频网站| 日韩在线精品视频| 亚洲一区二区三区av无码| 国产三级精品在线不卡| 国产精品老牛影院在线观看| 日韩免费在线观看视频| 国产传媒一区二区三区| 亚洲欧洲日韩综合二区| 国产精品一区久久久| 国产精品第100页| 欧美日韩在线不卡视频| 国产不卡一区二区视频| 中文字幕一区二区三区四区五区| 韩国v欧美v日本v亚洲| 日韩亚洲欧美中文在线| 日韩av高清在线播放| 91传媒免费视频| 亚洲国产成人不卡| 91精品免费视频| 亚洲 中文字幕 日韩 无码| 成人在线小视频| 在线丝袜欧美日韩制服| 国产精品夜夜夜爽张柏芝| 精品国产乱码久久久久久郑州公司| 狠狠色狠狠色综合人人| 国产精品果冻传媒潘| 黄色免费视频大全| 国产精品丝袜白浆摸在线| 日韩精品视频一区二区在线观看 | 国产精品av免费在线观看| 伊人久久av导航| 国产精品永久免费观看| 欧美激情日韩图片| 国产精品专区h在线观看| 欧美精品第一页在线播放| 国产日本一区二区三区| 色在人av网站天堂精品| 粉嫩高清一区二区三区精品视频| 欧美大片欧美激情性色a∨久久| 国产亚洲综合视频| 精品国产无码在线| 97人人模人人爽视频一区二区 | 中文字幕乱码人妻综合二区三区 | 国产日韩精品电影| 欧美精品福利在线| 91精品国产成人| 日韩精品久久一区二区| 国产精品免费一区豆花| 国产日韩av高清| 亚洲a级在线播放观看| 国产va亚洲va在线va| 青春草国产视频| 国产精品电影在线观看| 不卡影院一区二区| 日韩啊v在线| 国产精品久久久久久久9999| 国产精品自拍首页| 日韩精品免费一区| 精品综合久久久久久97| 久久久婷婷一区二区三区不卡| 欧美又粗又长又爽做受| 精品国产aⅴ麻豆| 国产成人亚洲综合| 精品无码av无码免费专区 | 国产精品久久91| www.中文字幕在线| 日韩精品久久一区二区三区| 久久资源免费视频| 久久久久se| 国产一区自拍视频| 日本在线视频不卡| 国产av国片精品| 国产ts人妖一区二区三区| 精品视频一区二区| 日本一区二区三区在线播放| 国产精品第10页| 国产成人精品福利一区二区三区| 国产在线视频一区| 日韩精品第1页| 最新欧美日韩亚洲| 久久久精品2019中文字幕神马| 国产精品一区在线播放| 欧美在线视频观看免费网站| 亚洲一区三区电影在线观看| 国产精品久久久久久av福利 | 国产精品99久久久久久久久久久久| 欧美激情精品久久久久久小说| 亚洲欧洲精品在线| 国产精品成人播放| 久久久黄色av| 国产精品99久久久久久久久 | 亚洲第一页在线视频| 国产精品丝袜久久久久久高清 | 欧美二区在线| 午夜精品亚洲一区二区三区嫩草| 国产精品久久一| 久久久久久久久久久免费| 91蜜桃网站免费观看| 国模精品视频一区二区| 亚洲wwwav| 一区二区三区电影| 精品国产二区在线| 国产精品国产三级欧美二区| 久久精彩免费视频| 久久国产精品一区二区三区四区| 99视频精品全部免费看| 国产伦精品一区二区三区四区视频_| 欧美中文字幕第一页| 日本精品免费观看| 亚洲第一综合网站| 亚洲国产欧美一区二区三区不卡| 一区二区三区四区免费视频| 欧美成人精品在线观看| 国产精品久久视频| 久久久久久久久网| 国产成人一区二区三区别| 91.com在线| 97碰在线观看| 成人www视频在线观看| 不卡中文字幕在线| 91精品国产综合久久香蕉922| 91久久伊人青青碰碰婷婷| 成人91免费视频| 97色伦亚洲国产| 国产精品91视频| 久久视频免费在线| 色妞一区二区三区| 国产精品免费久久久| 国产精品第3页| 欧美猛少妇色xxxxx| 国产99久久精品一区二区|