天天短訊!Spring AMQP項目

2022-12-19 10:06:01 來源:51CTO博客

Spring AMQP項目將Spring的核心概念應用于基于AMQP的消息傳遞解決方案的開發。 我們提供了一個“模板”作為發送和接收消息的高級抽象。 我們還為消息驅動的 POJO 提供支持。 這些庫有助于管理 AMQP 資源,同時促進依賴關系注入和聲明性配置的使用。 在所有這些情況下,你可以看到與 Spring 框架中的 JMS 支持的相似之處。 有關其他與項目相關的信息,請訪問Spring AMQP項目主頁。


(資料圖片僅供參考)

2. 最新消息

2.1. 自 2.4 以來 3.0 的變化

2.1.1. Java 17, Spring Framework 6.0

此版本需要 Spring Framework 6.0 和 Java 17

2.1.2. 遠程處理

不再支持遠程處理功能(使用 RMI)。

2.1.3. 觀察

現在支持使用千分尺啟用計時器觀察和跟蹤。 有關更多信息,請參閱千分尺觀察。

2.1.4. 原生鏡像

提供了對創建本機映像的支持。 有關詳細信息,請參閱本機映像。

2.1.5. 異步兔子模板

現在返回 s 而不是 s。 有關詳細信息,請參閱異步兔子模板?。??AsyncRabbitTemplate????CompletableFuture????ListenableFuture??

2.1.6. 流支持更改

??RabbitStreamOperations???并且方法現在返回而不是 .??RabbitStreamTemplate????CompletableFuture????ListenableFuture??

現在支持超級流和單個活躍使用者。

有關更多信息,請參閱使用 RabbitMQ 流插件。

2.1.7. 變更??@RabbitListener??

批處理偵聽器現在可以同時使用 . 批處理消息傳遞適配器現在可確保該方法適用于使用批處理。 將容器工廠設置為 時,該屬性也設置為 。 有關詳細信息,請參閱批處理@RabbitListener。??Collection????List????consumerBatchEnabled????true????batchListener????true??

??MessageConverter??s 現在可以返回空值;這目前由 . 有關詳細信息,請參閱從消息轉換??Optional.empty()????Jackson2JsonMessageConverter??

現在,您可以通過容器工廠而不是通過 上的屬性配置 。 有關詳細信息,請參閱回復管理。??ReplyPostProcessor????@RabbitListener??

2.1.8. 連接工廠變更

中的默認值現在為 。這會導致在提供多個地址時連接到隨機主機。 有關詳細信息,請參閱連接到集群。??addressShuffleMode????AbstractConnectionFactory????RANDOM??

不再使用 RabbitMQ 庫來確定哪個節點是隊列的領導者。 有關詳細信息,請參閱隊列相關性和LocalizedQueueConnectionFactory。??LocalizedQueueConnectionFactory????http-client??

3. 簡介

參考文檔的第一部分是對Spring AMQP和基本概念的高級概述。 它包括一些代碼片段,可幫助您盡快啟動并運行。

3.1. 不耐煩的人快速瀏覽

3.1.1. 簡介

這是開始春季AMQP的五分鐘導覽。

先決條件:安裝并運行 RabbitMQ 代理 (https://www.rabbitmq.com/download.html)。 然后獲取 spring-rabbit JAR 及其所有依賴項 - 最簡單的方法是在構建工具中聲明依賴項。 例如,對于 Maven,您可以執行類似于以下內容的操作:

  org.springframework.amqp  spring-rabbit  3.0.0

對于 Gradle,您可以執行類似以下步驟的操作:

compile "org.springframework.amqp:spring-rabbit:3.0.0"
兼容性

Spring 框架版本的最低依賴關系是 5.2.0。

Java 客戶機庫的最低版本為 5.7.0。??amqp-client??

流隊列的最小 Java 客戶機庫為 0.7.0。??stream-client??

非常非常快

本節提供最快的介紹。

首先,添加以下語句以使本節后面的示例正常工作:??import??

以下示例使用普通的命令式 Java 發送和接收消息:

ConnectionFactory connectionFactory = new CachingConnectionFactory();AmqpAdmin admin = new RabbitAdmin(connectionFactory);admin.declareQueue(new Queue("myqueue"));AmqpTemplate template = new RabbitTemplate(connectionFactory);template.convertAndSend("myqueue", "foo");String foo = (String) template.receiveAndConvert("myqueue");

請注意,原生 Java Rabbit 客戶端中還有一個。 我們在前面的代碼中使用了 Spring 抽象。 它緩存通道(以及可選的連接)以供重用。 我們依賴于代理中的默認交換(因為在發送中沒有指定任何交換),以及所有隊列通過其名稱默認綁定到默認交換(因此,我們可以在發送中使用隊列名稱作為路由密鑰)。 這些行為在 AMQP 規范中定義。??ConnectionFactory??

使用 XML 配置

以下示例與前面的示例相同,但將資源配置外部化為 XML:

ApplicationContext context =    new GenericXmlApplicationContext("classpath:/rabbit-context.xml");AmqpTemplate template = context.getBean(AmqpTemplate.class);template.convertAndSend("myqueue", "foo");String foo = (String) template.receiveAndConvert("myqueue");
                

缺省情況下,聲明會自動查找 、 的 bean,并代表用戶將它們聲明給代理。 因此,您不需要在簡單的 Java 驅動程序中顯式使用該 Bean。 有很多選項可用于配置 XML 架構中組件的屬性。 您可以使用 XML 編輯器的自動完成功能來瀏覽它們并查看其文檔。??????Queue????Exchange????Binding??

使用爪噠配置

以下示例重復與前面的示例相同的示例,但使用了 Java 中定義的外部配置:

ApplicationContext context =    new AnnotationConfigApplicationContext(RabbitConfiguration.class);AmqpTemplate template = context.getBean(AmqpTemplate.class);template.convertAndSend("myqueue", "foo");String foo = (String) template.receiveAndConvert("myqueue");........@Configurationpublic class RabbitConfiguration {    @Bean    public CachingConnectionFactory connectionFactory() {        return new CachingConnectionFactory("localhost");    }    @Bean    public RabbitAdmin amqpAdmin() {        return new RabbitAdmin(connectionFactory());    }    @Bean    public RabbitTemplate rabbitTemplate() {        return new RabbitTemplate(connectionFactory());    }    @Bean    public Queue myQueue() {       return new Queue("myqueue");    }}
具有 Spring 引導自動配置和異步 POJO 偵聽器

Spring 引導會自動配置基礎結構 bean,如以下示例所示:

@SpringBootApplicationpublic class Application {    public static void main(String[] args) {        SpringApplication.run(Application.class, args);    }    @Bean    public ApplicationRunner runner(AmqpTemplate template) {        return args -> template.convertAndSend("myqueue", "foo");    }    @Bean    public Queue myQueue() {        return new Queue("myqueue");    }    @RabbitListener(queues = "myqueue")    public void listen(String in) {        System.out.println(in);    }}

4. 參考資料

參考文檔的這一部分詳細介紹了構成Spring AMQP的各種組件。主要章節介紹了開發 AMQP 應用程序的核心類。 本部分還包括有關示例應用程序的章節。

4.1. 使用彈簧 AMQP

本章探討了使用 Spring AMQP 開發應用程序的基本組件接口和類。

4.1.1. AMQP 抽象

Spring AMQP 由兩個模塊組成(每個模塊在發行版中由一個 JAR 表示):和 。 “spring-amqp”模塊包含該軟件包。 在該包中,您可以找到表示核心 AMQP“模型”的類。 我們的目的是提供不依賴于任何特定 AMQP 代理實現或客戶端庫的通用抽象。 最終用戶代碼可以跨供應商實現更具可移植性,因為它可以僅針對抽象層進行開發。 然后,這些抽象由特定于代理的模塊實現,例如“spring-rabbit”。 目前只有一個 RabbitMQ 實現。 但是,除了RabbitMQ之外,這些抽象已經在.NET中使用Apache Qpid進行了驗證。 由于 AMQP 在協議級別運行,原則上可以將 RabbitMQ 客戶端與任何支持相同協議版本的代理一起使用,但我們目前不測試任何其他代理。??spring-amqp????spring-rabbit????org.springframework.amqp.core??

本概述假定您已經熟悉 AMQP 規范的基礎知識。 如果沒有,請查看其他資源中列出的資源

??Message??

0-9-1 AMQP 規范未定義類或接口。 相反,在執行諸如 之類的操作時,內容將作為字節數組參數傳遞,其他屬性將作為單獨的參數傳入。 Spring AMQP將類定義為更通用的AMQP域模型表示的一部分。 該類的目的是將主體和屬性封裝在單個實例中,以便 API 可以反過來更簡單。 下面的示例演示類定義:??Message????basicPublish()????Message????Message????Message??

public class Message {    private final MessageProperties messageProperties;    private final byte[] body;    public Message(byte[] body, MessageProperties messageProperties) {        this.body = body;        this.messageProperties = messageProperties;    }    public byte[] getBody() {        return this.body;    }    public MessageProperties getMessageProperties() {        return this.messageProperties;    }}

該接口定義了幾個常見屬性,例如“messageId”、“timestamp”、“內容類型”等。 您還可以通過調用該方法,使用用戶定義的“標頭”擴展這些屬性。??MessageProperties????setHeader(String key, Object value)??

從版本 、、 和 開始,如果消息正文是序列化的 Java 對象,則在執行操作(例如在日志消息中)時不再反序列化(缺省情況下)。 這是為了防止不安全的反序列化。 默認情況下,只有 和 類被反序列化。 要恢復到以前的行為,可以通過調用 來添加允許的類/包模式。 支持簡單的通配符,例如??com.something。??無法反序列化的主體在日志消息中表示。??1.5.7????1.6.11????1.7.4????2.0.0????Serializable????toString()????java.util????java.lang????Message.addAllowedListPatterns(…)????, *.MyClass????byte[]??

交換

該接口表示 AMQP 交換,即消息生產者發送到的內容。 經紀商虛擬主機中的每個交易所都有一個唯一的名稱以及一些其他屬性。 以下示例顯示了該接口:??Exchange????Exchange??

public interface Exchange {    String getName();    String getExchangeType();    boolean isDurable();    boolean isAutoDelete();    Map getArguments();}

如您所見,an 也有一個由 中定義的常量表示的“類型”。 基本類型包括:、、 和 。 在核心包中,您可以找到每種類型的接口實現。 這些類型的行為在如何處理隊列綁定方面有所不同。 例如,交換允許隊列由固定路由密鑰(通常是隊列的名稱)綁定。 交換支持具有路由模式的綁定,這些模式可能分別包含“*”和“#”通配符,分別表示“恰好一個”和“零個或多個”。 交易所發布到綁定到它的所有隊列,而不考慮任何路由密鑰。 有關這些和其他 Exchange 類型的詳細信息,請參閱其他資源。??Exchange????ExchangeTypes????direct????topic????fanout????headers????Exchange????Exchange????Direct????Topic????Fanout??

AMQP規范還要求任何經紀商提供沒有名稱的“默認”直接交易所。 聲明的所有隊列都綁定到該默認值,其名稱作為路由密鑰。 您可以在AmqpTemplate?中了解有關默認 Exchange 在 Spring AMQP 中的用法的更多信息。??Exchange??

隊列

該類表示消息使用者從中接收消息的組件。 與各種類一樣,我們的實現旨在成為此核心 AMQP 類型的抽象表示。 以下清單顯示了該類:??Queue????Exchange????Queue??

public class Queue  {    private final String name;    private volatile boolean durable;    private volatile boolean exclusive;    private volatile boolean autoDelete;    private volatile Map arguments;    /**     * The queue is durable, non-exclusive and non auto-delete.     *     * @param name the name of the queue.     */    public Queue(String name) {        this(name, true, false, false);    }    // Getters and Setters omitted for brevity}

請注意,構造函數采用隊列名稱。 根據實現,管理模板可能會提供用于生成唯一命名隊列的方法。 此類隊列可用作“回復”地址或其他臨時情況。 因此,自動生成隊列的“獨占”和“自動刪除”屬性都將設置為“true”。

有關使用命名空間支持(包括隊列參數)聲明隊列的信息,請參閱配置代理中有關隊列的部分。

捆綁

假設生產者發送到交易所,消費者從隊列接收,將隊列連接到交易所的綁定對于通過消息傳遞連接這些生產者和消費者至關重要。 在Spring AMQP中,我們定義了一個類來表示這些連接。 本節回顧將隊列綁定到交易所的基本選項。??Binding??

您可以使用固定路由密鑰將隊列綁定到 ,如以下示例所示:??DirectExchange??

new Binding(someQueue, someDirectExchange, "foo.bar");

您可以使用路由模式將隊列綁定到 ,如以下示例所示:??TopicExchange??

new Binding(someQueue, someTopicExchange, "foo.*");

您可以將隊列綁定到沒有路由密鑰的 ,如以下示例所示:??FanoutExchange??

new Binding(someQueue, someFanoutExchange);

我們還提供了一個促進“流暢 API”樣式,如以下示例所示:??BindingBuilder??

Binding b = BindingBuilder.bind(someQueue).to(someTopicExchange).with("foo.*");

為清楚起見,前面的示例顯示了該類,但是當對“bind()”方法使用靜態導入時,此樣式效果很好。??BindingBuilder??

就其本身而言,類的實例僅保存有關連接的數據。 換句話說,它不是一個“活動”組件。 但是,正如稍后將在配置代理中看到的那樣,該類可以使用實例來實際觸發代理上的綁定操作。 此外,正如您在同一部分中所看到的,您可以通過在類中使用 Spring 的注釋來定義實例。 還有一個方便的基類,它進一步簡化了生成與 AMQP 相關的 Bean 定義的方法,并識別隊列、交換和綁定,以便在應用程序啟動時在 AMQP 代理上聲明它們。??Binding????AmqpAdmin????Binding????Binding????@Bean????@Configuration??

也在核心包中定義。 作為實際 AMQP 消息傳遞中涉及的主要組件之一,它在其自己的部分中進行了詳細討論(請參閱AmqpTemplate)。??AmqpTemplate??

4.1.2. 連接和資源管理

雖然我們在上一節中描述的 AMQP 模型是通用的并且適用于所有實現,但當我們進入資源管理時,細節特定于代理實現。 因此,在本節中,我們將重點介紹僅存在于“spring-rabbit”模塊中的代碼,因為此時,RabbitMQ 是唯一受支持的實現。

管理與 RabbitMQ 代理的連接的核心組件是接口。 實現的職責是提供 的實例,該實例是 的包裝器。??ConnectionFactory????ConnectionFactory????org.springframework.amqp.rabbit.connection.Connection????com.rabbitmq.client.Connection??

選擇連接工廠

有三家連接工廠可供選擇

??PooledChannelConnectionFactory????ThreadChannelConnectionFactory????CachingConnectionFactory??

前兩個是在 2.3 版中添加的。

對于大多數用例,應使用 。 如果要確保嚴格的消息排序而不需要使用作用域內操作,則可以使用 。 如果要使用相關的發布者確認,或者希望通過其打開多個連接,則應使用 。??PooledChannelConnectionFactory????ThreadChannelConnectionFactory????CachingConnectionFactory????CacheMode??

所有三個工廠都支持簡單的發布者確認。

將 配置為使用單獨的連接時,現在可以從版本 2.3.2 開始,將發布連接工廠配置為其他類型。 默認情況下,發布工廠的類型相同,在主工廠上設置的任何屬性也會傳播到發布工廠。??RabbitTemplate??

??PooledChannelConnectionFactory??

此工廠基于 Apache Pool2 管理單個連接和兩個通道池。 一個池用于事務通道,另一個池用于非事務通道。 池是默認配置的;提供回調以配置池;有關更多信息,請參閱 Apache 文檔。??GenericObjectPool??

Apache jar 必須位于類路徑上才能使用此工廠。??commons-pool2??

@BeanPooledChannelConnectionFactory pcf() throws Exception {    ConnectionFactory rabbitConnectionFactory = new ConnectionFactory();    rabbitConnectionFactory.setHost("localhost");    PooledChannelConnectionFactory pcf = new PooledChannelConnectionFactory(rabbitConnectionFactory);    pcf.setPoolConfigurer((pool, tx) -> {        if (tx) {            // configure the transactional pool        }        else {            // configure the non-transactional pool        }    });    return pcf;}
??ThreadChannelConnectionFactory??

此工廠管理單個連接和兩個連接,一個用于事務通道,另一個用于非事務通道。 此工廠確保同一線程上的所有操作都使用相同的通道(只要它保持打開狀態)。 這有助于嚴格的消息排序,而無需作用域內操作。 為避免內存泄漏,如果應用程序使用許多生存期較短的線程,則必須調用工廠的線程來釋放通道資源。 從版本 2.3.7 開始,線程可以將其通道傳輸到另一個線程。 有關詳細信息,請參閱多線程環境中的嚴格消息排序。??ThreadLocal????closeThreadChannel()??

??CachingConnectionFactory??

提供的第三個實現是 ,默認情況下,它建立可由應用程序共享的單個連接代理。 共享連接是可能的,因為使用 AMQP 進行消息傳遞的“工作單元”實際上是一個“通道”(在某些方面,這類似于 JMS 中的連接和會話之間的關系)。 連接實例提供了一個方法。 該實現支持緩存這些通道,并根據通道是否為事務性為通道維護單獨的緩存。 創建 的實例時,可以通過構造函數提供“主機名”。 您還應該提供“用戶名”和“密碼”屬性。 若要配置通道緩存的大小(默認值為 25),可以調用該方法。??CachingConnectionFactory????createChannel????CachingConnectionFactory????CachingConnectionFactory????setChannelCacheSize()??

從版本 1.3 開始,您可以配置緩存連接以及僅通道。 在這種情況下,每次調用 都會創建一個新連接(或從緩存中檢索空閑連接)。 關閉連接會將其返回到緩存(如果尚未達到緩存大小)。 在此類連接上創建的通道也會被緩存。 在某些環境中使用單獨的連接可能很有用,例如從 HA 群集使用,在 與負載均衡器配合使用,以連接到不同的集群成員等。 要緩存連接,請將 設置為 。??CachingConnectionFactory????createConnection()????cacheMode????CacheMode.CONNECTION??

這不會限制連接數。 相反,它指定允許多少個空閑的打開連接。

從版本 1.5.5 開始,提供了一個名為的新屬性。 設置此屬性時,它將限制允許的連接總數。 設置后,如果達到限制,則用于等待連接變為空閑狀態。 如果超過該時間,則拋出 。??connectionLimit????channelCheckoutTimeLimit????AmqpTimeoutException??

當緩存模式為 時,自動聲明隊列和其他 (請參閱交換、隊列和綁定的自動聲明?)不受支持。??CONNECTION??

此外,在撰寫本文時,庫默認為每個連接創建一個固定的線程池(默認大小:線程)。 使用大量連接時,應考慮在 上設置自定義。 然后,所有連接都可以使用相同的執行器,并且可以共享其線程。 執行程序的線程池應不受限制或針對預期用途進行適當設置(通常,每個連接至少一個線程)。 如果在每個連接上創建了多個通道,則池大小會影響并發性,因此變量(或簡單緩存)線程池執行器將是最合適的。??amqp-client????Runtime.getRuntime().availableProcessors() * 2????executor????CachingConnectionFactory??

重要的是要了解緩存大小(默認情況下)不是限制,而只是可以緩存的通道數。 如果緩存大小為 10,則實際上可以使用任意數量的通道。 如果使用的通道超過 10 個,并且它們都返回到緩存中,則緩存中將有 10 個通道。 其余部分是物理關閉的。

從版本 1.6 開始,默認通道緩存大小已從 1 增加到 25。 在高容量、多線程環境中,小緩存意味著以高速率創建和關閉通道。 增加默認緩存大小可以避免此開銷。 您應該通過 RabbitMQ 管理 UI 監控正在使用的通道,并考慮進一步增加緩存大小,如果您 看到許多頻道正在創建和關閉。 緩存僅按需增長(以滿足應用程序的并發要求),因此此更改不會 影響現有的小批量應用程序。

從版本 1.4.2 開始,具有一個名為 的屬性。 當此屬性大于零時,將限制可在連接上創建的通道數。 如果達到限制,調用線程將阻塞,直到通道可用或達到此超時,在這種情況下,將拋出 a。??CachingConnectionFactory????channelCheckoutTimeout????channelCacheSize????AmqpTimeoutException??

框架內使用的通道(例如,)將可靠地返回到緩存中。 如果在框架外部創建通道,(例如, 通過直接訪問連接并調用 ),您必須可靠地(通過關閉)返回它們,也許在一個塊中,以避免通道不足。??RabbitTemplate????createChannel()????finally??

以下示例演示如何創建新的 :??connection??

CachingConnectionFactory connectionFactory = new CachingConnectionFactory("somehost");connectionFactory.setUsername("guest");connectionFactory.setPassword("guest");Connection connection = connectionFactory.createConnection();

使用 XML 時,配置可能類似于以下示例:

            

還有一個僅在框架的單元測試代碼中可用的實現。 它比 更簡單,因為它不緩存通道,但由于缺乏性能和彈性,它不適合在簡單測試之外的實際使用。 如果出于某種原因需要實現自己的基類,基類可能會提供一個很好的起點。??SingleConnectionFactory????CachingConnectionFactory????ConnectionFactory????AbstractConnectionFactory??

通過使用 rabbit 命名空間可以快速方便地創建 A,如下所示:??ConnectionFactory??

在大多數情況下,此方法更可取,因為框架可以為您選擇最佳默認值。 創建的實例是 . 請記住,通道的默認緩存大小為 25。 如果要緩存更多通道,請通過設置“channelCacheSize”屬性來設置更大的值。 在 XML 中,它如下所示:??CachingConnectionFactory??

                

此外,使用命名空間,您可以添加“channel-cache-size”屬性,如下所示:

默認緩存模式為 ,但您可以將其配置為緩存連接。 在下面的示例中,我們使用:??CHANNEL????connection-cache-size??

可以使用命名空間提供主機和端口屬性,如下所示:

或者,如果在群集環境中運行,則可以使用 addresses 屬性,如下所示:

有關 的信息,請參閱連接到群集。??address-shuffle-mode??

以下示例具有自定義線程工廠,該工廠的線程名稱前綴為 :??rabbitmq-??

    
地址解析器

從版本 2.1.15 開始,您現在可以使用 解析連接地址。 這將覆蓋 和 屬性的任何設置。??AddressResolver????addresses????host/port??

命名連接

從版本 1.7 開始,提供了用于注入 . 生成的名稱用于目標 RabbitMQ 連接的特定于應用程序的標識。 如果 RabbitMQ 服務器支持連接名稱,則連接名稱將顯示在管理 UI 中。 此值不必是唯一的,也不能用作連接標識符,例如,在 HTTP API 請求中。 此值應該是人類可讀的,并且是鍵下的一部分。 您可以使用簡單的 Lambda,如下所示:??ConnectionNameStrategy????AbstractionConnectionFactory????ClientProperties????connection_name??

connectionFactory.setConnectionNameStrategy(connectionFactory -> "MY_CONNECTION");

該參數可用于通過某些邏輯區分目標連接名稱。 默認情況下,的 、表示對象的十六進制字符串和內部計數器用于生成 . 命名空間組件也隨屬性一起提供。??ConnectionFactory????beanName????AbstractConnectionFactory????connection_name????????connection-name-strategy??

的實現將連接名稱設置為應用程序屬性。 可以將其聲明為 并將其注入到連接工廠中,如以下示例所示:??SimplePropertyValueConnectionNameStrategy????@Bean??

@Beanpublic SimplePropertyValueConnectionNameStrategy cns() {    return new SimplePropertyValueConnectionNameStrategy("spring.application.name");}@Beanpublic ConnectionFactory rabbitConnectionFactory(ConnectionNameStrategy cns) {    CachingConnectionFactory connectionFactory = new CachingConnectionFactory();    ...    connectionFactory.setConnectionNameStrategy(cns);    return connectionFactory;}

該屬性必須存在于應用程序上下文的 .??Environment??

使用 Spring 引導及其自動配置的連接工廠時,只需聲明 . 引導自動檢測 bean 并將其連接到工廠。??ConnectionNameStrategy????@Bean??

阻止的連接和資源限制

連接可能會被阻止,無法與內存警報對應的代理進行交互。 從版本 2.0 開始,可以為 提供要通知連接阻止和取消阻止事件的實例。 此外,分別通過其內部實現發出 a 和 。 這些允許您提供應用程序邏輯,以便對代理上的問題做出適當的反應,并(例如)采取一些糾正措施。??org.springframework.amqp.rabbit.connection.Connection????com.rabbitmq.client.BlockedListener????AbstractConnectionFactory????ConnectionBlockedEvent????ConnectionUnblockedEvent????BlockedListener??

當應用程序配置了單個 ,就像默認情況下使用 Spring 引導自動配置一樣,當連接被代理阻止時,應用程序將停止工作。 當它被經紀人阻止時,它的任何客戶都會停止工作。 如果我們在同一個應用程序中有生產者和消費者,當生產者阻止連接(因為代理上不再有資源)并且消費者無法釋放它們(因為連接被阻止)時,我們最終可能會陷入死鎖。 為了緩解此問題,我們建議再有一個具有相同選項的單獨實例 - 一個用于生產者,一個用于使用者。 對于在使用者線程上執行的事務生成者,不可能單獨,因為它們應重用與使用者事務關聯的。??CachingConnectionFactory????CachingConnectionFactory????CachingConnectionFactory????Channel??

從版本 2.0.2 開始,具有自動使用第二個連接工廠的配置選項,除非正在使用事務。 有關詳細信息,請參閱使用單獨的連接。 對于發布者連接與主策略相同,并附加到調用方法的結果中。??RabbitTemplate????ConnectionNameStrategy????.publisher??

從版本 1.7.7 開始,提供了 an,當無法創建 a 時會拋出該 (例如,因為達到限制并且緩存中沒有可用通道)。 您可以在 中使用此異常在 進行一些退避后恢復操作。??AmqpResourceNotAvailableException????SimpleConnection.createChannel()????Channel????channelMax????RetryPolicy??

配置基礎客戶端連接工廠

使用 Rabbit 客戶端的實例。 在 上設置等效屬性時,會傳遞許多配置屬性(、 和 等)。 若要設置其他屬性(例如),可以定義 Rabbit 工廠的實例,并使用 的相應構造函數提供對它的引用。 使用命名空間(如前所述)時,需要在屬性中提供對已配置工廠的引用。 為方便起見,提供了一個工廠 Bean 來幫助在 Spring 應用程序上下文中配置連接工廠,如下一節所述。??CachingConnectionFactory????ConnectionFactory????host????port????userName????password????requestedHeartBeat????connectionTimeout????CachingConnectionFactory????clientProperties????CachingConnectionFactory????connection-factory??

默認情況下,4.0.x 客戶端啟用自動恢復。 雖然與此功能兼容,但Spring AMQP具有自己的恢復機制,并且通常不需要客戶端恢復功能。 我們建議禁用自動恢復,以避免在代理可用但連接尚未恢復時獲取實例。 您可能會注意到此異常,例如,在 中配置 時,即使故障轉移到集群中的另一個代理也是如此。 由于自動恢復連接在計時器上恢復,因此使用 Spring AMQP 的恢復機制可以更快地恢復連接。 從版本 1.7.1 開始,Spring AMQP 禁用自動恢復,除非您顯式創建自己的 RabbitMQ 連接工廠并將其提供給 . 默認情況下,由 創建的 RabbitMQ 實例還禁用了該選項。??amqp-client????AutoRecoverConnectionNotCurrentlyOpenException????RetryTemplate????RabbitTemplate????amqp-client????CachingConnectionFactory????ConnectionFactory????RabbitConnectionFactoryBean??

??RabbitConnectionFactoryBean??和配置 SSL

從版本 1.4 開始,提供了方便的功能,以便使用依賴關系注入在基礎客戶端連接工廠上方便地配置 SSL 屬性。 其他二傳手委托給底層工廠。 以前,您必須以編程方式配置 SSL 選項。 以下示例演示如何配置:??RabbitConnectionFactoryBean????RabbitConnectionFactoryBean??

        

有關配置 SSL 的信息,請參閱RabbitMQ 文檔。 省略 和 配置以在不進行證書驗證的情況下通過 SSL 進行連接。 下一個示例演示如何提供密鑰和信任存儲區配置。??keyStore????trustStore??

該屬性是一個 Spring 指向包含以下鍵的屬性文件:??sslPropertiesLocation????Resource??

keyStore=file:/secret/keycert.p12trustStore=file:/secret/trustStorekeyStore.passPhrase=secrettrustStore.passPhrase=secret

和是春天指著商店。 通常,此屬性文件由操作系統保護,應用程序具有讀取訪問權限。??keyStore????truststore????Resources??

從 Spring AMQP 版本 1.5 開始,您可以直接在工廠 Bean 上設置這些屬性。 如果同時提供了離散屬性 和,則后者中的屬性將覆蓋 離散值。??sslPropertiesLocation??

從版本 2.0 開始,默認情況下會驗證服務器證書,因為它更安全。 如果出于某種原因希望跳過此驗證,請將工廠 Bean 的屬性設置為 。 從版本 2.1 開始,現在默認調用。 要恢復到以前的行為,請將該屬性設置為 。??skipServerCertificateValidation????true????RabbitConnectionFactoryBean????enableHostnameVerification()????enableHostnameVerification????false??

從版本 2.2.5 開始,工廠 Bean 將始終使用 TLS v1.2;以前,它在某些情況下使用 v1.1,在其他情況下使用 v1.2(取決于其他屬性)。 如果由于某種原因需要使用 v1.1,請設置屬性:。??sslAlgorithm????setSslAlgorithm("TLSv1.1")??

連接到群集

要連接到群集,請在 上配置屬性:??addresses????CachingConnectionFactory??

@Beanpublic CachingConnectionFactory ccf() {    CachingConnectionFactory ccf = new CachingConnectionFactory();    ccf.setAddresses("host1:5672,host2:5672,host3:5672");    return ccf;}

從版本 3.0 開始,每當建立新連接時,基礎連接工廠將通過選擇隨機地址來嘗試連接到主機。 若要恢復到以前嘗試從第一個連接到最后一個的行為,請將該屬性設置為 。??addressShuffleMode????AddressShuffleMode.NONE??

從版本 2.3 開始,添加了隨機模式,這意味著在創建連接后,第一個地址將移動到末尾。 如果您希望從所有節點上的所有分片消費,您可能希望將此模式與RabbitMQ 分片插件一起使用,并具有合適的并發性。??INORDER????CacheMode.CONNECTION??

@Beanpublic CachingConnectionFactory ccf() {    CachingConnectionFactory ccf = new CachingConnectionFactory();    ccf.setAddresses("host1:5672,host2:5672,host3:5672");    ccf.setAddressShuffleMode(AddressShuffleMode.INORDER);    return ccf;}
路由連接工廠

從 1.3 版開始,引入了 。 此工廠提供了一種機制,用于為多個映射配置映射,并在運行時確定某些映射的目標。 通常,實現會檢查線程綁定上下文。 為方便起見,Spring AMQP 提供了 ,它從 獲取當前線程綁定。 以下示例顯示了如何在 XML 和 Java 中配置 a:??AbstractRoutingConnectionFactory????ConnectionFactories????ConnectionFactory????lookupKey????SimpleRoutingConnectionFactory????lookupKey????SimpleResourceHolder????SimpleRoutingConnectionFactory??

                                                
public class MyService {    @Autowired    private RabbitTemplate rabbitTemplate;    public void service(String vHost, String payload) {        SimpleResourceHolder.bind(rabbitTemplate.getConnectionFactory(), vHost);        rabbitTemplate.convertAndSend(payload);        SimpleResourceHolder.unbind(rabbitTemplate.getConnectionFactory());    }}

使用后取消綁定資源非常重要。 有關更多信息,請參見 的JavaDoc。??AbstractRoutingConnectionFactory??

從版本 1.4 開始,支持 SpEL 和屬性,它們在每個 AMQP 協議交互操作(、、或)上進行評估,解析為所提供的值。 您可以使用 Bean 引用,例如在表達式中。 對于操作,要發送的消息是根評估對象。 對于操作,是根評估對象。??RabbitTemplate????sendConnectionFactorySelectorExpression????receiveConnectionFactorySelectorExpression????send????sendAndReceive????receive????receiveAndReply????lookupKey????AbstractRoutingConnectionFactory????@vHostResolver.getVHost(#root)????send????receive????queueName??

路由算法如下:如果選擇器表達式被計算或被計算到,或者提供的不是 的實例,則一切都像以前一樣工作,依賴于提供的實現。 如果評估結果不是 ,但沒有目標,并且 配置了 。 在 的情況下,它確實回退到基于 的實現。 但是,如果拋出 ,則拋出 an。??null????null????ConnectionFactory????AbstractRoutingConnectionFactory????ConnectionFactory????null????ConnectionFactory????lookupKey????AbstractRoutingConnectionFactory????lenientFallback = true????AbstractRoutingConnectionFactory????routing????determineCurrentLookupKey()????lenientFallback = false????IllegalStateException??

命名空間支持還提供組件上的 and 屬性。??send-connection-factory-selector-expression????receive-connection-factory-selector-expression??????

此外,從版本 1.4 開始,您可以在偵聽器容器中配置路由連接工廠。 在這種情況下,隊列名稱列表將用作查找鍵。 例如,如果使用 配置容器,則查找鍵為 (請注意,鍵中沒有空格)。??setQueueNames("thing1", "thing2")????[thing1,thing]"??

從版本 1.6.9 開始,可以通過在偵聽器容器上使用向查找鍵添加限定符。 例如,這樣做可以偵聽具有相同名稱但位于不同虛擬主機中的隊列(每個虛擬主機都有一個連接工廠)。??setLookupKeyQualifier??

例如,對于查找鍵限定符和偵聽隊列的容器,您可以向其注冊目標連接工廠的查找鍵可以是 。??thing1????thing2????thing1[thing2]??

目標(和默認,如果提供)連接工廠必須具有相同的發布者確認和返回設置。 請參閱發布者確認并返回。

從版本 2.4.4 開始,可以禁用此驗證。 如果確認和返回之間的值需要不相等,則可以使用 來關閉驗證。 請注意,添加到的第一個連接工廠將確定 和 的一般值。??AbstractRoutingConnectionFactory#setConsistentConfirmsReturns????AbstractRoutingConnectionFactory????confirms????returns??

如果您遇到要檢查的某些消息確認/返回而其他消息則不會確認/返回的情況,這可能會很有用。 例如:

@Beanpublic RabbitTemplate rabbitTemplate() {    final com.rabbitmq.client.ConnectionFactory cf = new com.rabbitmq.client.ConnectionFactory();    cf.setHost("localhost");    cf.setPort(5672);    CachingConnectionFactory cachingConnectionFactory = new CachingConnectionFactory(cf);    cachingConnectionFactory.setPublisherConfirmType(CachingConnectionFactory.ConfirmType.CORRELATED);    PooledChannelConnectionFactory pooledChannelConnectionFactory = new PooledChannelConnectionFactory(cf);    final Map connectionFactoryMap = new HashMap<>(2);    connectionFactoryMap.put("true", cachingConnectionFactory);    connectionFactoryMap.put("false", pooledChannelConnectionFactory);    final AbstractRoutingConnectionFactory routingConnectionFactory = new SimpleRoutingConnectionFactory();    routingConnectionFactory.setConsistentConfirmsReturns(false);    routingConnectionFactory.setDefaultTargetConnectionFactory(pooledChannelConnectionFactory);    routingConnectionFactory.setTargetConnectionFactories(connectionFactoryMap);    final RabbitTemplate rabbitTemplate = new RabbitTemplate(routingConnectionFactory);    final Expression sendExpression = new SpelExpressionParser().parseExpression(            "messageProperties.headers["x-use-publisher-confirms"] ?: false");    rabbitTemplate.setSendConnectionFactorySelectorExpression(sendExpression);}

這樣,帶有標頭的消息將通過緩存連接發送,您可以確保消息傳遞。 有關確保郵件傳遞的詳細信息,請參閱發布者確認并返回。??x-use-publisher-confirms: true??

隊列相關性和??LocalizedQueueConnectionFactory??

在集群中使用 HA 隊列時,為了獲得最佳性能,您可能需要連接到物理代理 潛在顧客隊列所在的位置。 可以配置多個代理地址。 這是為了進行故障轉移,客戶端會嘗試按順序進行連接。 使用管理插件提供的 REST API 來確定哪個節點是隊列的引導節點。 然后,它創建(或從緩存中檢索)僅連接到該節點的節點。 如果連接失敗,則確定新的引導節點,使用者連接到該節點。 配置了默認連接工廠,以防無法確定隊列的物理位置,在這種情況下,它會正常連接到群集。??CachingConnectionFactory????LocalizedQueueConnectionFactory????CachingConnectionFactory????LocalizedQueueConnectionFactory??

是 和 使用隊列名稱作為查找鍵,如上面的路由連接工廠中所述。??LocalizedQueueConnectionFactory????RoutingConnectionFactory????SimpleMessageListenerContainer??

因此(使用隊列名稱進行查找),僅當容器配置為偵聽單個隊列時,才能使用 。??LocalizedQueueConnectionFactory??

必須在每個節點上啟用 RabbitMQ 管理插件。

此連接工廠適用于長期連接,例如 使用的 . 它不適用于短連接用途,例如使用 a,因為在建立連接之前調用 REST API 會產生開銷。 此外,對于發布操作,隊列是未知的,并且消息無論如何都會發布到所有集群成員,因此查找節點的邏輯幾乎沒有價值。??SimpleMessageListenerContainer????RabbitTemplate??

以下示例配置顯示了如何配置工廠:

@Autowiredprivate ConfigurationProperties props;@Beanpublic CachingConnectionFactory defaultConnectionFactory() {    CachingConnectionFactory cf = new CachingConnectionFactory();    cf.setAddresses(this.props.getAddresses());    cf.setUsername(this.props.getUsername());    cf.setPassword(this.props.getPassword());    cf.setVirtualHost(this.props.getVirtualHost());    return cf;}@Beanpublic LocalizedQueueConnectionFactory queueAffinityCF(        @Qualifier("defaultConnectionFactory") ConnectionFactory defaultCF) {    return new LocalizedQueueConnectionFactory(defaultCF,            StringUtils.commaDelimitedListToStringArray(this.props.getAddresses()),            StringUtils.commaDelimitedListToStringArray(this.props.getAdminUris()),            StringUtils.commaDelimitedListToStringArray(this.props.getNodes()),            this.props.getVirtualHost(), this.props.getUsername(), this.props.getPassword(),            false, null);}

請注意,前三個參數是 、 和 的數組。 這些是位置性的,因為當容器嘗試連接到隊列時,它會使用管理 API 來確定哪個節點是隊列的引導節點,并連接到與該節點位于同一陣列位置的地址。??addresses????adminUris????nodes??

從版本 3.0 開始,RabbitMQ 不再用于訪問 Rest API。 相反,默認情況下,如果使用 from Spring Webflux 在類路徑上;否則使用 a。??http-client????WebClient????spring-webflux????RestTemplate??

添加到類路徑:??WebFlux??

例 1.馬文

  org.springframework.amqp  spring-rabbit

例 2.格拉德爾

compile "org.springframework.amqp:spring-rabbit"

您還可以通過實現和重寫其 和(可選)方法來使用其他 REST 技術。??LocalizedQueueConnectionFactory.NodeLocator????createClient, ``restCall????close??

lqcf.setNodeLocator(new NodeLocator() {    @Override    public MyClient createClient(String userName, String password) {        ...    }    @Override    public HashMap restCall(MyClient client, URI uri) {        ...    });});

該框架提供 和 ,默認值如上所述。??WebFluxNodeLocator????RestTemplateNodeLocator??

發布商確認并返回

通過將屬性設置為“true”和“true”來支持已確認(具有相關性)和返回的消息。??CachingConnectionFactory????publisherConfirmType????ConfirmType.CORRELATED????publisherReturns??

設置這些選項后,工廠創建的實例將包裝在 中,用于方便回調。 當獲得這樣的通道時,客戶端可以向 . 該實現包含用于路由確認或返回到相應偵聽器的邏輯。 以下各節將進一步介紹這些功能。??Channel????PublisherCallbackChannel????PublisherCallbackChannel.Listener????Channel????PublisherCallbackChannel??

另請參閱??作用域內操作??。??simplePublisherConfirms??

有關更多背景信息,請參閱 RabbitMQ 團隊的博客文章,標題為“介紹發布者確認”。

連接和通道偵聽器

連接工廠支持注冊和實現。 這允許您接收連接和通道相關事件的通知。 (建立連接時 使用 A 執行聲明 - 有關詳細信息,請參閱交換、隊列和綁定的自動聲明)。 以下清單顯示了接口定義:??ConnectionListener????ChannelListener????ConnectionListener????RabbitAdmin????ConnectionListener??

@FunctionalInterfacepublic interface ConnectionListener {    void onCreate(Connection connection);    default void onClose(Connection connection) {    }    default void onShutDown(ShutdownSignalException signal) {    }}

從版本 2.0 開始,可以為對象提供實例,以便在連接阻止和取消阻止事件時收到通知。 以下示例顯示了通道偵聽器接口定義:??org.springframework.amqp.rabbit.connection.Connection????com.rabbitmq.client.BlockedListener??

@FunctionalInterfacepublic interface ChannelListener {    void onCreate(Channel channel, boolean transactional);    default void onShutDown(ShutdownSignalException signal) {    }}

請參閱發布是異步的 — 如何檢測成功和失敗,了解您可能希望注冊 的一種方案。??ChannelListener??

記錄通道關閉事件

版本 1.5 引入了一種機制,使用戶能夠控制日志記錄級別。

使用默認策略記錄通道關閉,如下所示:??CachingConnectionFactory??

不記錄正常通道關閉 (200 OK)。如果通道由于被動隊列聲明失敗而關閉,則會在調試級別記錄該通道。如果頻道因非特定消費者狀況而被拒絕而關閉,則會記錄在 信息級別。basic.consume所有其他記錄在錯誤級別。

若要修改此行為,可以將自定義注入到 in 屬性中。??ConditionalExceptionLogger????CachingConnectionFactory????closeExceptionLogger??

另請參閱使用者事件。

運行時緩存屬性

與 1.6 版本相同,現在通過該方法提供緩存統計信息。 這些統計信息可用于調整緩存,以在生產中對其進行優化。 例如,高水位線可用于確定是否應增加緩存大小。 如果它等于緩存大小,則可能需要考慮進一步增加。 下表描述了這些屬性:??CachingConnectionFactory????getCacheProperties()????CacheMode.CHANNEL??

表 1.CacheMode.CHANNEL 的緩存屬性

財產

意義

connectionName

由 生成的連接的名稱。??ConnectionNameStrategy??

channelCacheSize

當前配置的允許空閑的最大通道數。

localPort

連接的本地端口(如果可用)。 這可用于與 RabbitMQ 管理 UI 上的連接和通道相關聯。

idleChannelsTx

當前處于空閑(緩存)狀態的事務通道數。

idleChannelsNotTx

當前處于空閑(緩存)狀態的非事務性通道數。

idleChannelsTxHighWater

并發空閑(緩存)的最大事務通道數。

idleChannelsNotTxHighWater

非事務性通道的最大數量已同時處于空閑狀態(緩存)。

下表描述了這些屬性:??CacheMode.CONNECTION??

表 2.CacheMode.CONNECTION 的緩存屬性

財產

意義

connectionName:

由 生成的連接的名稱。??ConnectionNameStrategy??

openConnections

表示與代理的連接的連接對象的數量。

channelCacheSize

當前配置的允許空閑的最大通道數。

connectionCacheSize

當前配置的允許空閑的最大連接數。

idleConnections

當前處于空閑狀態的連接數。

idleConnectionsHighWater

并發空閑的最大連接數。

idleChannelsTx:

此連接當前處于空閑(緩存)狀態的事務通道數。 您可以使用屬性名稱的一部分與 RabbitMQ 管理 UI 上的連接和通道相關聯。??localPort??

idleChannelsNotTx:

此連接當前處于空閑(緩存)狀態的非事務性通道數。 屬性名稱的一部分可用于與 RabbitMQ 管理 UI 上的連接和通道相關聯。??localPort??

idleChannelsTxHighWater:

并發空閑(緩存)的最大事務通道數。 屬性名稱的 localPort 部分可用于與 RabbitMQ 管理 UI 上的連接和通道相關聯。

idleChannelsNotTxHighWater:

非事務性通道的最大數量已同時處于空閑狀態(緩存)。 您可以使用屬性名稱的一部分與 RabbitMQ 管理 UI 上的連接和通道相關聯。??localPort??

屬性(或)也包括在內。??cacheMode????CHANNEL????CONNECTION??

圖1.JVisualVM 示例

RabbitMQ 自動連接/拓撲恢復

自Spring AMQP的第一個版本以來,該框架在代理發生故障時提供了自己的連接和通道恢復。 此外,如配置代理中所述,在重新建立連接時,會重新聲明任何基礎結構 Bean(隊列和其他)。 因此,它不依賴于庫現在提供的自動恢復。 默認情況下,已啟用自動恢復。 兩種恢復機制之間存在一些不兼容之處,因此默認情況下,Spring 將底層的屬性設置為 。 即使該屬性是 ,Spring 也會通過立即關閉任何恢復的連接來有效地禁用它。??RabbitAdmin????amqp-client????amqp-client????automaticRecoveryEnabled????RabbitMQ connectionFactory????false????true??

缺省情況下,只有定義為 bean 的元素(隊列、交換、綁定)才會在連接失敗后重新聲明。 有關如何更改該行為的信息,請參閱恢復自動刪除聲明。

4.1.3. 添加自定義客戶端連接屬性

現在允許您訪問基礎連接工廠以允許,例如, 設置自定義客戶端屬性。 以下示例演示如何執行此操作:??CachingConnectionFactory??

connectionFactory.getRabbitConnectionFactory().getClientProperties().put("thing1", "thing2");

查看連接時,這些屬性將顯示在 RabbitMQ 管理 UI 中。

4.1.4.??AmqpTemplate??

與 Spring 框架和相關項目提供的許多其他高級抽象一樣,Spring AMQP 提供了一個發揮核心作用的“模板”。 定義主操作的接口稱為 。 這些操作涵蓋了發送和接收消息的一般行為。 換句話說,它們不是任何實現所獨有的——因此名稱中的“AMQP”。 另一方面,該接口的實現與AMQP協議的實現相關聯。 與本身是接口級 API 的 JMS 不同,AMQP 是一種線級協議。 該協議的實現提供自己的客戶端庫,因此模板接口的每個實現都依賴于特定的客戶端庫。 目前,只有一個實現:。 在下面的示例中,我們經常使用 . 但是,當您查看配置示例或實例化模板或調用資源庫的任何代碼摘錄時,您可以看到實現類型(例如,)。??AmqpTemplate????RabbitTemplate????AmqpTemplate????RabbitTemplate??

如前所述,該接口定義了發送和接收消息的所有基本操作。 我們將分別在發送消息和接收消息中探討消息發送和接收。??AmqpTemplate??

另請參閱異步兔子模板。

添加重試功能

從版本 1.3 開始,您現在可以將 配置為使用 來幫助處理代理連接問題。 有關完整信息,請參閱春季重試項目。 以下只是一個使用指數退避策略和默認值的示例,該策略在將異常拋給調用方之前進行了三次嘗試。??RabbitTemplate????RetryTemplate????SimpleRetryPolicy??

下面的示例使用 XML 命名空間:

                                                            

以下示例使用 Java 中的注釋:??@Configuration??

@Beanpublic RabbitTemplate rabbitTemplate() {    RabbitTemplate template = new RabbitTemplate(connectionFactory());    RetryTemplate retryTemplate = new RetryTemplate();    ExponentialBackOffPolicy backOffPolicy = new ExponentialBackOffPolicy();    backOffPolicy.setInitialInterval(500);    backOffPolicy.setMultiplier(10.0);    backOffPolicy.setMaxInterval(10000);    retryTemplate.setBackOffPolicy(backOffPolicy);    template.setRetryTemplate(retryTemplate);    return template;}

從版本 1.4 開始,除了屬性之外,該選項在 上受支持。 它用作 的第二個參數。??retryTemplate????recoveryCallback????RabbitTemplate????RetryTemplate.execute(RetryCallback retryCallback, RecoveryCallback recoveryCallback)??

這在一定程度上受到限制,因為重試上下文僅包含字段。 對于更復雜的用例,您應該使用外部,以便可以通過上下文的屬性將其他信息傳達給 。 以下示例演示如何執行此操作:??RecoveryCallback????lastThrowable????RetryTemplate????RecoveryCallback??

retryTemplate.execute(    new RetryCallback() {        @Override        public Object doWithRetry(RetryContext context) throws Exception {            context.setAttribute("message", message);            return rabbitTemplate.convertAndSend(exchange, routingKey, message);        }    }, new RecoveryCallback() {        @Override        public Object recover(RetryContext context) throws Exception {            Object message = context.getAttribute("message");            Throwable t = context.getLastThrowable();            // Do something with message            return null;        }    });}

在這種情況下,您不會將 注入 .??RetryTemplate????RabbitTemplate??

發布是異步的 — 如何檢測成功和失敗

發布消息是一種異步機制,默認情況下,RabbitMQ 會丟棄無法路由的消息。 若要成功發布,可以接收異步確認,如相關發布者確認和返回中所述。 請考慮兩種故障情況:

發布到交易所,但沒有匹配的目標隊列。發布到不存在的交易所。

第一種情況包含在發布商退貨中,如相關發布商確認和退貨中所述。

對于第二種情況,將丟棄消息,并且不會生成任何返回。 基礎通道已關閉,但出現異常。 默認情況下,會記錄此異常,但您可以向 注冊 以獲取此類事件的通知。 以下示例演示如何添加:??ChannelListener????CachingConnectionFactory????ConnectionListener??

this.connectionFactory.addConnectionListener(new ConnectionListener() {    @Override    public void onCreate(Connection connection) {    }    @Override    public void onShutDown(ShutdownSignalException signal) {        ...    }});

您可以檢查信號的屬性以確定發生的問題。??reason??

要檢測發送線程上的異常,可以在 上檢測到異常。 但是,事務會顯著降低性能,因此在僅為這一個用例啟用事務之前,請仔細考慮這一點。??setChannelTransacted(true)????RabbitTemplate????txCommit()??

相關發布者確認并返回

支持發布者的實現確認并返回。??RabbitTemplate????AmqpTemplate??

對于返回的消息,模板的屬性必須設置為或必須為特定消息計算。 此功能需要將其屬性設置為 (請參閱發布者確認并返回)。 返回由客戶端通過調用注冊 發送到客戶端。 回調必須實現以下方法:??mandatory????true????mandatory-expression????true????CachingConnectionFactory????publisherReturns????true????RabbitTemplate.ReturnsCallback????setReturnsCallback(ReturnsCallback callback)??

void returnedMessage(ReturnedMessage returned);

具有以下屬性:??ReturnedMessage??

??message??- 返回的消息本身??replyCode??- 指示退貨原因的代碼??replyText??- 返回的文本原因 - 例如NO_ROUTE??exchange??- 消息發送到的交易所??routingKey??- 使用的路由密鑰

每個 僅支持一個 。 另請參閱回復超時。??ReturnsCallback????RabbitTemplate??

對于發布者確認(也稱為發布者確認),模板需要 將其屬性設置為 。 確認通過它通過調用注冊 發送到客戶端。 回調必須實現此方法:??CachingConnectionFactory????publisherConfirm????ConfirmType.CORRELATED????RabbitTemplate.ConfirmCallback????setConfirmCallback(ConfirmCallback callback)??

void confirm(CorrelationData correlationData, boolean ack, String cause);

是客戶端在發送原始消息時提供的對象。 對于 為 true,對于 為 false。 例如,如果生成 時可用,則原因可能包含 的原因。 例如,將消息發送到不存在的交易所時。 在這種情況下,經紀人關閉通道。 關閉的原因包含在 . 在 1.4 版中添加了。??CorrelationData????ack????ack????nack????nack????nack????nack????cause????cause??

只有一個受 .??ConfirmCallback????RabbitTemplate??

當兔子模板發送操作完成時,通道將關閉。 這排除了在連接工廠緩存已滿時接收確認或返回(當緩存中有空間時,通道未物理關閉,返回和確認正常進行)。 當緩存已滿時,框架將關閉延遲最多五秒鐘,以便有時間接收確認和返回。 使用確認時,收到最后一次確認時通道將關閉。 僅使用回車時,通道將保持打開狀態整整五秒鐘。 通常建議將連接工廠的值設置為足夠大的值,以便將發布消息的通道返回到緩存而不是關閉。 您可以使用 RabbitMQ 管理插件監控通道使用情況。 如果您看到通道正在快速打開和關閉,則應考慮增加緩存大小以減少服務器上的開銷。??channelCacheSize??

在版本 2.1 之前,為發布者確認啟用的通道在收到確認之前返回到緩存。 其他一些進程可以簽出通道并執行一些導致通道關閉的操作,例如將消息發布到不存在的交換。 這可能會導致確認丟失。 版本 2.1 及更高版本在確認未完成時不再將通道返回到緩存。 在每次操作后對通道執行邏輯。 通常,這意味著一次只有一個確認未完成。??RabbitTemplate????close()??

從版本 2.2 開始,將在連接工廠的某個線程上調用回調。 這是為了避免在回調中執行 Rabbit 操作時出現潛在的死鎖。 對于以前的版本,回調直接在連接 I/O 線程上調用;如果執行某些 RPC 操作(例如打開新通道),這將死鎖,因為 I/O 線程會阻塞等待結果,但結果需要由 I/O 線程本身處理。 對于這些版本,有必要將工作(例如發送消息)移交給回調中的另一個線程。 這不再是必需的,因為框架現在將回調調用傳遞給執行器。??executor????amqp-client??

只要返回回調在 60 秒或更短的時間內執行,仍然保持在 ack 之前接收返回消息的保證。 確認計劃在返回回調退出后或 60 秒后(以先到者為準)傳遞。

該對象具有可用于獲取結果的 ,而不是在模板上使用 。 以下示例演示如何配置實例:??CorrelationData????CompletableFuture????ConfirmCallback????CorrelationData??

CorrelationData cd1 = new CorrelationData();this.templateWithConfirmsEnabled.convertAndSend("exchange", queue.getName(), "foo", cd1);assertTrue(cd1.getFuture().get(10, TimeUnit.SECONDS).isAck());

由于它是 ,因此您可以在準備就緒時將結果用于異步回調。 該對象是具有 2 個屬性的簡單 bean:和(例如實例)。 不會為代理生成的實例填充原因。 它是為框架生成的實例填充的(例如,在實例未完成時關閉連接)。??CompletableFuture????get()????whenComplete()????Confirm????ack????reason????nack????nack????nack????ack??

此外,當同時啟用確認和返回時,將填充返回的消息,只要 具有唯一的 ;默認情況下,從版本 2.3 開始,情況始終如此。 保證在將來使用 .??CorrelationData????CorrelationData????id????ack??

另請參閱作用域內操作,了解等待發布者確認的更簡單機制。

作用域內操作

通常,使用模板時,會將 從緩存中簽出(或創建),用于操作,然后返回到緩存中以供重用。 在多線程環境中,無法保證下一個操作使用相同的通道。 但是,有時您可能希望更好地控制通道的使用,并確保在同一通道上執行所有操作。??Channel??

從版本 2.0 開始,提供了一個名為的新方法,該方法帶有 . 在回調范圍內和提供的參數上執行的任何操作都使用相同的專用 ,該 將在末尾關閉(不返回到緩存)。 如果通道是 ,則在收到所有確認后,它將返回到緩存中(請參閱相關發布服務器確認和返回)。??invoke????OperationsCallback????RabbitOperations????Channel????PublisherCallbackChannel??

@FunctionalInterfacepublic interface OperationsCallback {    T doInRabbit(RabbitOperations operations);}

為什么可能需要這樣做的一個示例是,如果您希望在底層 . Spring API 以前沒有公開此方法,因為如前所述,通道通常是緩存和共享的。 現在提供 和 ,委托給 范圍內使用的專用通道。 出于顯而易見的原因,這些方法不能在該范圍之外使用。??waitForConfirms()????Channel????RabbitTemplate????waitForConfirms(long timeout)????waitForConfirmsOrDie(long timeout)????OperationsCallback??

請注意,其他地方提供了允許您將確認與請求相關聯的更高級別的抽象(請參閱相關發布者確認和返回)。 如果只想等到代理確認交割,則可以使用以下示例中顯示的技術:

Collection messages = getMessagesToSend();Boolean result = this.template.invoke(t -> {    messages.forEach(m -> t.convertAndSend(ROUTE, m));    t.waitForConfirmsOrDie(10_000);    return true;});

如果您希望在 范圍內的同一通道上調用操作,則必須使用用于操作的相同通道構造 admin。??RabbitAdmin????OperationsCallback????RabbitTemplate????invoke??

如果模板操作已在現有事務范圍內執行,則前面的討論沒有實際意義,例如,在事務處理偵聽器容器線程上運行并在事務處理模板上執行操作時。 在這種情況下,操作在該通道上執行,并在線程返回到容器時提交。 在這種情況下沒有必要使用。??invoke??

以這種方式使用 consure 時,實際上并不需要為將確認與請求相關聯而設置的大部分基礎結構(除非還啟用了返回)。 從版本 2.2 開始,連接工廠支持名為 的新屬性。 如果設置為 ,則可以避免基礎結構,并且可以更有效地進行確認處理。??publisherConfirmType????ConfirmType.SIMPLE??

此外,在發送的消息中設置屬性。 如果要檢查(或記錄或以其他方式使用)特定確認,可以使用重載方法執行此操作,如以下示例所示:??RabbitTemplate????publisherSequenceNumber????MessageProperties????invoke??

public  T invoke(OperationsCallback action, com.rabbitmq.client.ConfirmCallback acks,        com.rabbitmq.client.ConfirmCallback nacks);

這些對象(for 和實例)是 Rabbit 客戶端回調,而不是模板回調。??ConfirmCallback????ack????nack??

以下示例日志和實例:??ack????nack??

Collection messages = getMessagesToSend();Boolean result = this.template.invoke(t -> {    messages.forEach(m -> t.convertAndSend(ROUTE, m));    t.waitForConfirmsOrDie(10_000);    return true;}, (tag, multiple) -> {        log.info("Ack: " + tag + ":" + multiple);}, (tag, multiple) -> {        log.info("Nack: " + tag + ":" + multiple);}));

作用域內操作綁定到線程。 有關多線程環境中嚴格排序的討論,請參閱多線程環境中的嚴格消息排序。

多線程環境中的嚴格消息排序

作用域內操作中的討論僅適用于在同一線程上執行操作的情況。

請考慮以下情況:

??thread-1??將消息發送到隊列并將工作交給thread-2??thread-2??將消息發送到同一隊列

由于 RabbitMQ 的異步性質和緩存通道的使用;不確定是否將使用相同的通道,因此無法保證消息到達隊列的順序。 (在大多數情況下,它們會按順序到達,但無序交付的概率不為零)。 要解決此用例,您可以使用具有大小的有界通道緩存(與 一起)來確保消息始終發布在同一通道上,并保證順序。 為此,如果連接工廠有其他用途(如使用者),則應為模板使用專用連接工廠,或將模板配置為使用嵌入在主連接工廠中的發布者連接工廠(請參閱使用單獨的連接)。??1????channelCheckoutTimeout??

這最好用一個簡單的 Spring 引導應用程序來說明:

@SpringBootApplicationpublic class Application {  private static final Logger log = LoggerFactory.getLogger(Application.class);  public static void main(String[] args) {    SpringApplication.run(Application.class, args);  }  @Bean  TaskExecutor exec() {    ThreadPoolTaskExecutor exec = new ThreadPoolTaskExecutor();    exec.setCorePoolSize(10);    return exec;  }  @Bean  CachingConnectionFactory ccf() {    CachingConnectionFactory ccf = new CachingConnectionFactory("localhost");    CachingConnectionFactory publisherCF = (CachingConnectionFactory) ccf.getPublisherConnectionFactory();    publisherCF.setChannelCacheSize(1);    publisherCF.setChannelCheckoutTimeout(1000L);    return ccf;  }  @RabbitListener(queues = "queue")  void listen(String in) {    log.info(in);  }  @Bean  Queue queue() {    return new Queue("queue");  }  @Bean  public ApplicationRunner runner(Service service, TaskExecutor exec) {    return args -> {      exec.execute(() -> service.mainService("test"));    };  }}@Componentclass Service {  private static final Logger LOG = LoggerFactory.getLogger(Service.class);  private final RabbitTemplate template;  private final TaskExecutor exec;  Service(RabbitTemplate template, TaskExecutor exec) {    template.setUsePublisherConnection(true);    this.template = template;    this.exec = exec;  }  void mainService(String toSend) {    LOG.info("Publishing from main service");    this.template.convertAndSend("queue", toSend);    this.exec.execute(() -> secondaryService(toSend.toUpperCase()));  }  void secondaryService(String toSend) {    LOG.info("Publishing from secondary service");    this.template.convertAndSend("queue", toSend);  }}

即使發布是在兩個不同的線程上執行的,它們也將使用相同的通道,因為緩存的上限為單個通道。

從版本 2.3.7 開始,支持 使用 and 方法將線程的通道傳輸到另一個線程。 第一個方法返回一個上下文,該上下文傳遞給調用第二個方法的第二個線程。 線程可以綁定非事務通道或事務通道(或兩者之一);除非使用兩個連接工廠,否則不能單獨傳輸它們。 示例如下:??ThreadChannelConnectionFactory????prepareContextSwitch????switchContext??

@SpringBootApplicationpublic class Application {  private static final Logger log = LoggerFactory.getLogger(Application.class);  public static void main(String[] args) {    SpringApplication.run(Application.class, args);  }  @Bean  TaskExecutor exec() {    ThreadPoolTaskExecutor exec = new ThreadPoolTaskExecutor();    exec.setCorePoolSize(10);    return exec;  }  @Bean  ThreadChannelConnectionFactory tccf() {    ConnectionFactory rabbitConnectionFactory = new ConnectionFactory();    rabbitConnectionFactory.setHost("localhost");    return new ThreadChannelConnectionFactory(rabbitConnectionFactory);  }  @RabbitListener(queues = "queue")  void listen(String in) {    log.info(in);  }  @Bean  Queue queue() {    return new Queue("queue");  }  @Bean  public ApplicationRunner runner(Service service, TaskExecutor exec) {    return args -> {      exec.execute(() -> service.mainService("test"));    };  }}@Componentclass Service {  private static final Logger LOG = LoggerFactory.getLogger(Service.class);  private final RabbitTemplate template;  private final TaskExecutor exec;  private final ThreadChannelConnectionFactory connFactory;  Service(RabbitTemplate template, TaskExecutor exec,      ThreadChannelConnectionFactory tccf) {    this.template = template;    this.exec = exec;    this.connFactory = tccf;  }  void mainService(String toSend) {    LOG.info("Publishing from main service");    this.template.convertAndSend("queue", toSend);    Object context = this.connFactory.prepareSwitchContext();    this.exec.execute(() -> secondaryService(toSend.toUpperCase(), context));  }  void secondaryService(String toSend, Object threadContext) {    LOG.info("Publishing from secondary service");    this.connFactory.switchContext(threadContext);    this.template.convertAndSend("queue", toSend);    this.connFactory.closeThreadChannel();  }}

調用 后,如果當前線程執行更多操作,它們將在新通道上執行。 當不再需要線程綁定通道時,關閉它非常重要。??prepareSwitchContext??

消息傳遞集成

從 1.4 版開始,(建立在 之上)提供了與 Spring 框架消息抽象的集成,即 . 這允許您使用抽象發送和接收消息。 這種抽象被其他Spring項目使用,例如Spring Integration和Spring的STOMP支持。 涉及兩個消息轉換器:一個用于在 Spring 消息傳遞和 Spring AMQP 的抽象之間進行轉換,另一個用于在 Spring AMQP 的抽象和底層 RabbitMQ 客戶端庫所需的格式之間進行轉換。 默認情況下,消息負載由提供實例的消息轉換器轉換。 或者,您可以使用其他一些有效負載轉換器注入自定義,如以下示例所示:??RabbitMessagingTemplate????RabbitTemplate????org.springframework.messaging.Message????spring-messaging????Message????Message????Message????Message????RabbitTemplate????MessagingMessageConverter??

MessagingMessageConverter amqpMessageConverter = new MessagingMessageConverter();amqpMessageConverter.setPayloadConverter(myPayloadConverter);rabbitMessagingTemplate.setAmqpMessageConverter(amqpMessageConverter);
已驗證的用戶標識

從版本 1.6 開始,該模板現在支持 ( 使用 Java 配置時)。 如果發送了消息,那么在計算此表達式后將設置用戶 id 屬性(如果尚未設置)。 評估的根對象是要發送的消息。??user-id-expression????userIdExpression??

以下示例演示如何使用該屬性:??user-id-expression??

第一個示例是文字表達式。 第二個從應用程序上下文中的連接工廠 Bean 獲取屬性。??username??

使用單獨的連接

從版本 2.0.2 開始,可以將屬性設置為盡可能使用與偵聽器容器使用的連接不同的連接。 這是為了避免當生產者因任何原因被阻止時,消費者被阻止。 為此,連接工廠維護第二個內部連接工廠;默認情況下,它與主工廠的類型相同,但如果希望使用不同的工廠類型進行發布,則可以顯式設置。 如果 rabbit 模板在偵聽器容器啟動的事務中運行,則無論此設置如何,都將使用該容器的通道。??usePublisherConnection????true??

通常,不應將此設置為 的模板使用 。 使用采用連接工廠的構造函數。 如果使用采用模板的其他構造函數,請確保模板的屬性為 。 這是因為,管理員通常用于聲明偵聽器容器的隊列。 使用屬性設置為 的模板意味著獨占隊列(如 )將在與偵聽器容器使用的連接不同的連接上聲明。 在這種情況下,容器無法使用隊列。??RabbitAdmin????true????RabbitAdmin????false????true????AnonymousQueue??

標簽: 應用程序 詳細信息 緩存大小

上一篇:java之增強for和迭代器精選
下一篇:關系型數據庫設計三大范式