
Spring 集成提供了通道適配器,用于使用高級消息隊列協議 (AMQP) 接收和發送消息。
您需要將此依賴項包含在項目中:
(資料圖片僅供參考)
org.springframework.integration spring-integration-amqp 6.0.0
以下適配器可用:
入站通道適配器入站網關出站通道適配器出站網關異步出站網關RabbitMQ 流隊列入站通道適配器RabbitMQ 流隊列出站通道適配器Spring 集成還提供了點對點消息通道和由 AMQP 交換和隊列支持的發布-訂閱消息通道。
為了提供AMQP支持,Spring Integration依賴于(Spring AMQP),它將Spring的核心概念應用于基于AMQP的消息傳遞解決方案的開發。 Spring AMQP提供了與Spring JMS類似的語義。
雖然提供的AMQP通道適配器僅用于單向消息傳遞(發送或接收),但Spring Integration還提供了用于請求-回復操作的入站和出站AMQP網關。
提示: 您應該熟悉Spring AMQP項目的參考文檔。 它提供了有關Spring與AMQP集成的更深入的信息,特別是RabbitMQ。
以下清單顯示了 AMQP 入站通道適配器的可能配置選項:
@Beanpublic IntegrationFlow amqpInbound(ConnectionFactory connectionFactory) { return IntegrationFlow.from(Amqp.inboundAdapter(connectionFactory, "aName")) .handle(m -> System.out.println(m.getPayload())) .get();}
容器 請注意,在使用XML配置外部容器時,不能使用Spring AMQP命名空間來定義容器。 這是因為命名空間至少需要一個元素。 在此環境中,偵聽器位于適配器內部。 因此,您必須使用常規 Spring 定義來定義容器,如以下示例所示:? |
盡管 Spring Integration JMS 和 AMQP 支持相似,但存在重要差異。 JMS 入站通道適配器正在使用底層,并且需要配置的輪詢器。 AMQP 入站通道適配器使用 和 是消息驅動的。 在這方面,它更類似于 JMS 消息驅動的通道適配器。? |
從版本 5.5 開始,可以使用在內部調用重試操作時使用的策略進行配置。 有關更多信息,請參閱 JavaDocs。??AmqpInboundChannelAdapter?
???org.springframework.amqp.rabbit.retry.MessageRecoverer?
???RecoveryCallback?
???setMessageRecoverer()?
?
有關批處理消息的更多信息,請參閱Spring AMQP 文檔。
要使用 Spring 集成生成批處理消息,只需使用 .??BatchingRabbitTemplate?
?
接收批處理消息時,默認情況下,偵聽器容器提取每個片段消息,適配器將為每個片段生成 。 從版本 5.2 開始,如果容器的屬性設置為 ,則由適配器執行去批處理,并生成一個有效負載為片段有效負載列表(如果適用,轉換后)。??Message>?
???deBatchingEnabled?
???false?
???Message
?>?
默認值為 ,但可以在適配器上覆蓋。??BatchingStrategy?
???SimpleBatchingStrategy?
?
當重試操作需要恢復時,必須與批處理一起使用。? |
版本 5.0.1 引入了輪詢通道適配器,允許您按需獲取單個消息 — 例如,使用 或 輪詢器。 有關詳細信息,請參閱延遲確認可輪詢消息源。??MessageSourcePollingTemplate?
?
它當前不支持 XML 配置。
以下示例顯示如何配置:??AmqpMessageSource?
?
@Beanpublic IntegrationFlow flow() { return IntegrationFlow.from(Amqp.inboundPolledAdapter(connectionFactory(), DSL_QUEUE), e -> e.poller(Pollers.fixedDelay(1_000)).autoStartup(false)) .handle(p -> { ... }) .get();}
有關配置屬性,請參閱Javadoc。
請參閱批處理消息。
對于輪詢適配器,沒有偵聽器容器,批處理消息始終是反批處理的(如果支持這樣做)。??BatchingStrategy?
?
入站網關支持入站通道適配器上的所有屬性(除了“通道”被“請求通道”替換),以及一些其他屬性。 以下清單顯示了可用的屬性:
@Bean // return the upper cased payloadpublic IntegrationFlow amqpInboundGateway(ConnectionFactory connectionFactory) { return IntegrationFlow.from(Amqp.inboundGateway(connectionFactory, "foo")) .transform(String.class, String::toUpperCase) .get();}
此適配器的唯一 ID。 自選。 |
將轉換后的消息發送到的消息通道。 必填。 |
對接收 AMQP 消息時要使用的 的引用。 自選。 默認情況下,只有標準的 AMQP 屬性(例如 )被復制到 Spring Integration 和從 Spring Integration 復制。 默認情況下,不會將 AMQP 中的任何用戶定義的標頭復制到 AMQP 消息或從 AMQP 消息復制。 如果提供了“請求標頭名稱”或“回復標頭名稱”,則不允許使用。? |
要從 AMQP 請求映射到 的 AMQP 標頭名稱的逗號分隔列表。 僅當未提供“標頭映射器”引用時,才能提供此屬性。 此列表中的值也可以是與標頭名稱匹配的簡單模式(例如 或或)。? |
要映射到 AMQP 回復消息的 AMQP 消息屬性中的名稱的逗號分隔列表。 所有標準標頭(例如)都映射到 AMQP 消息屬性,而用戶定義的標頭映射到“標頭”屬性。 僅當未提供“標頭映射器”引用時,才能提供此屬性。 此列表中的值也可以是要與標頭名稱(例如,或或)匹配的簡單模式。? |
消息通道,其中應回復消息。 自選。 |
設置用于從回復通道接收消息的基礎。 如果未指定,此屬性默認為 (1 秒)。 僅當容器線程在發送回復之前移交給另一個線程時,才適用。? |
自定義的 Bean 引用(以便更好地控制要發送的回復消息)。 您可以提供 的替代實現。? |
當 沒有屬性時要使用的。 如果未指定此選項,則提供 no,請求消息中不存在任何屬性,并且 拋出 AN 是因為無法路由回復。 如果未指定此選項并提供外部選項,則不會引發異常。 您必須指定此選項或配置默認值,并在該模板上, 如果您預計請求消息中不存在任何屬性的情況。? |
請參閱入站通道適配器中有關配置屬性的說明。??listener-container?
?
從版本 5.5 開始,可以使用在內部調用重試操作時使用的策略進行配置。 有關更多信息,請參閱 JavaDocs。??AmqpInboundChannelAdapter?
???org.springframework.amqp.rabbit.retry.MessageRecoverer?
???RecoveryCallback?
???setMessageRecoverer()?
?
請參閱批處理消息。
默認情況下,入站終端節點使用應答方式,這意味著容器會在下游集成流完成(或使用 或 將消息傳遞給另一個線程)時自動確認消息。 將模式設置為配置使用者,以便根本不使用確認(代理在發送消息后立即自動確認消息)。 設置模式以允許用戶代碼在處理過程中的某個其他點確認消息。 為了支持此功能,在此模式下,終結點分別在 和 中提供 和 標頭。??AUTO?
???QueueChannel?
???ExecutorChannel?
???NONE?
???MANUAL?
???Channel?
???deliveryTag?
???amqp_channel?
???amqp_deliveryTag?
?
您可以對 執行任何有效的 Rabbit 命令,但通常只使用 and(或)。 為了不干擾容器的操作,不應保留對通道的引用,而應僅在當前消息的上下文中使用它。??Channel?
???basicAck?
???basicNack?
???basicReject?
?
由于 是對“活動”對象的引用,因此它不能序列化,如果持久保存消息,則會丟失。? |
以下示例演示如何使用確認:??MANUAL?
?
@ServiceActivator(inputChannel = "foo", outputChannel = "bar")public Object handle(@Payload String payload, @Header(AmqpHeaders.CHANNEL) Channel channel, @Header(AmqpHeaders.DELIVERY_TAG) Long deliveryTag) throws Exception { // Do some processing if (allOK) { channel.basicAck(deliveryTag, false); // perhaps do some more processing } else { channel.basicNack(deliveryTag, false, true); } return someResultForDownStreamProcessing;}
以下出站終結點具有許多類似的配置選項。 從版本 5.2 開始,已添加。 通常,當啟用發布者確認時,代理將快速返回一個 ack(或 nack),該 ack(或 nack)將被發送到相應的通道。 如果在收到確認之前關閉了通道,Spring AMQP 框架將合成一個 nack。 “丟失”確認不應發生,但如果設置此屬性,則終結點將定期檢查它們,并在時間過去而未收到確認時合成 nack。??confirm-timeout?
?
以下示例顯示了 AMQP 出站通道適配器的可用屬性:
@Beanpublic IntegrationFlow amqpOutbound(AmqpTemplate amqpTemplate, MessageChannel amqpOutboundChannel) { return IntegrationFlow.from(amqpOutboundChannel) .handle(Amqp.outboundAdapter(amqpTemplate) .routingKey("queue1")) // default exchange - route to queue "queue1" .get();}
此適配器的唯一 ID。 自選。 |
消息通道,消息應發送到該通道,以便將其轉換并發布到 AMQP 交換。 必填。 |
對已配置的 AMQP 模板的 Bean 引用。 可選(默認為 )。? |
向其發送消息的 AMQP 交換的名稱。 如果未提供,消息將發送到默認的無名稱交換。 與“交換名稱表達”相互排斥。 自選。 |
一個 SpEL 表達式,計算該表達式以確定消息發送到的 AMQP 交換的名稱,并將消息作為根對象。 如果未提供,消息將發送到默認的無名稱交換。 與“交易所名稱”相互排斥。 自選。 |
注冊多個使用者時此使用者的順序,從而啟用負載平衡和故障轉移。 可選(默認為 )。? |
發送消息時使用的固定路由密鑰。 默認情況下,這是一個空的 . 與“路由密鑰表達式”互斥。 自選。? |
一個 SpEL 表達式,計算該表達式以確定發送消息時要使用的路由密鑰,消息作為根對象(例如,“payload.key”)。 默認情況下,這是一個空的 . 與“路由密鑰”互斥。 自選。? |
郵件的默認傳遞方式:或 。 如果設置了傳遞方式,則覆蓋。 如果存在 Spring 集成消息標頭,則設置該值。 如果未提供此屬性,并且標頭映射器未設置此屬性,則默認值取決于 使用的基礎 Spring AMQP。 如果根本不自定義,則默認值為 。 自選。? |
定義相關性數據的表達式。 如果提供,這會將基礎 AMQP 模板配置為接收發布者確認。 需要專用和屬性設置為 . 收到發布者確認并提供相關數據時,將根據確認類型將其寫入 或 。 確認的有效負載是相關數據,由此表達式定義。 郵件的“amqp_publishConfirm”標頭設置為 () 或 ()。 示例:和 。 版本 4.1 引入了消息標頭。 它包含用于發布者確認的“nack”。 從版本 4.2 開始,如果表達式解析為實例(例如 ),則在 / 通道上發出的消息基于該消息,并添加其他標頭。 以前,無論類型如何,都會使用相關數據作為其有效負載創建新消息。 另請參閱發布者確認和返回的替代機制?。 自選。? |
正 () 發布者確認發送到的通道。 有效負載是由 定義的關聯數據。 如果表達式為 或 ,則消息是從原始消息構建的,標頭設置為 。 另請參閱發布者確認和返回的替代機制?。 可選(默認值為 )。? |
將負 () 發布者確認發送到的通道。 有效負載是由 定義的關聯數據(如果未配置)。 如果表達式為 或 ,則消息是從原始消息構建的,標頭設置為 。 當存在 時,消息是帶有有效負載的消息。 另請參閱發布者確認和返回的替代機制?。 可選(默認值為 )。? |
設置后,如果在此時間內未收到發布者確認(以毫秒為單位),適配器將合成否定確認 (nack)。 每 50% 檢查一次掛起的確認,因此發送 nack 的實際時間將在此值的 1 倍到 1.5 倍之間。 另請參閱發布者確認和返回的替代機制。 默認 none (不會生成 nacks)。 |
設置為 true 時,調用線程將阻塞,等待發布者確認。 這需要配置確認以及 . 線程將阻塞長達 (或默認為 5 秒)。 如果發生超時,將拋出 。 如果啟用了返回并返回了消息,或者在等待確認時發生任何其他異常,則將拋出 a 并顯示相應的消息。? |
返回的消息發送到的通道。 提供后,基礎 AMQP 模板配置為向適配器返回無法傳遞的消息。 如果未進行配置,則從從 AMQP 接收的數據構造消息,并具有以下附加標頭:、、、。 當存在 時,消息是帶有有效負載的消息。 另請參閱發布者確認和返回的替代機制?。 自選。? |
對用于在發送返回或否定確認的消息時生成實例的實現的引用。? |
對發送 AMQP 消息時要使用的 的引用。 默認情況下,只有標準的 AMQP 屬性(例如 )被復制到 Spring 集成 中。 任何用戶定義的標頭都不會通過默認的“DefaultAmqpHeaderMapper”復制到消息中。 如果提供了“請求標頭名稱”,則不允許。 自選。? |
要從 映射到 AMQP 消息的 AMQP 標頭名稱的逗號分隔列表。 如果提供了“標頭映射器”引用,則不允許。 此列表中的值也可以是與標頭名稱匹配的簡單模式(例如 或或)。? |
設置為 時,端點將在應用程序上下文初始化期間嘗試連接到代理。 這允許對錯誤配置進行“快速故障”檢測,但如果代理關閉,也會導致初始化失敗。 當(默認值)時,當發送第一條消息時,將建立連接(如果它尚不存在,因為其他組件建立了它)。? |
設置為 時,類型的有效負載將作為離散消息在單個調用范圍內在同一通道上發送。 需要 . when 為 true,在發送消息后調用。 使用事務模板,發送將在新事務或已啟動事務(如果存在)中執行。? |
返回通道 使用 a 需要將屬性設置為 的 和將屬性設置為 的 。 將多個出站終結點與返回符一起使用時,每個終結點都需要一個單獨的終結點。? |
以下清單顯示了 AMQP 出站網關的可能屬性:
@Beanpublic IntegrationFlow amqpOutbound(AmqpTemplate amqpTemplate) { return f -> f.handle(Amqp.outboundGateway(amqpTemplate) .routingKey("foo")) // default exchange - route to queue "foo" .get();}@MessagingGateway(defaultRequestChannel = "amqpOutbound.input")public interface MyGateway { String sendToRabbit(String data);}
此適配器的唯一 ID。 自選。 |
消息發送到的消息通道,消息被轉換并發布到 AMQP 交換。 必填。 |
對已配置的 AMQP 模板的 Bean 引用。 可選(默認為 )。? |
應向其發送消息的 AMQP 交換的名稱。 如果未提供,消息將發送到默認的無名稱 cxchange。 與“交換名稱表達”相互排斥。 自選。 |
一個 SpEL 表達式,計算該表達式以確定應將消息發送到的 AMQP 交換的名稱,并將消息作為根對象。 如果未提供,消息將發送到默認的無名稱交換。 與“交易所名稱”相互排斥。 自選。 |
注冊多個使用者時此使用者的順序,從而啟用負載平衡和故障轉移。 可選(默認為 )。? |
從 AMQP 隊列接收并轉換回復后應發送到的消息通道。 自選。 |
網關在向 發送回復消息時等待的時間。 這僅適用于 can 阻止的情況,例如容量限制當前已滿的 。 默認為無窮大。? |
當 時,如果屬性中未收到回復消息,網關將引發異常。 默認值為 。? |
發送消息時使用的。 默認情況下,這是一個空的 . 與“路由密鑰表達式”互斥。 自選。? |
一個 SpEL 表達式,經過計算以確定發送消息時使用的表達式,將消息作為根對象(例如,“payload.key”)。 默認情況下,這是一個空的 . 與“路由密鑰”互斥。 自選。? |
郵件的默認傳遞方式:或 。 如果設置了傳遞方式,則覆蓋。 如果存在 Spring 集成消息標頭,則設置該值。 如果未提供此屬性,并且標頭映射器未設置此屬性,則默認值取決于 使用的基礎 Spring AMQP。 如果根本不自定義,則默認值為 。 自選。? |
從版本 4.2 開始。 定義相關數據的表達式。 如果提供,這會將基礎 AMQP 模板配置為接收發布者確認。 需要專用和屬性設置為 . 收到發布者確認并提供關聯數據時,將根據確認類型將其寫入 或 。 確認的有效負載是相關數據,由此表達式定義。 郵件的標頭“amqp_publishConfirm”設置為 () 或 ()。 為了確認,Spring 集成提供了一個額外的標頭。 示例:和 。 如果表達式解析為實例(例如 ),則消息 在 / 通道上發出的基于該消息,并添加了其他標頭。 以前,無論類型如何,都會使用相關數據作為其有效負載創建新消息。 另請參閱發布者確認和返回的代機制?。 自選。? |
向其發送正 () 發布者確認的通道。 有效負載是由 定義的關聯數據。 如果表達式為 或 ,則消息是從原始消息構建的,標頭設置為 。 另請參閱發布者確認和返回的替代機制?。 可選(默認值為 )。? |
將負 () 發布者確認發送到的通道。 有效負載是由定義的關聯數據(如果未配置)。 如果表達式為 或 ,則消息是從原始消息構建的,標頭設置為 。 當存在 時,消息是帶有有效負載的消息。 另請參閱發布者確認和返回的替代機制?。 可選(默認值為 )。? |
設置后,如果在此時間內未收到發布者確認(以毫秒為單位),網關將合成否定確認 (nack)。 每 50% 檢查一次掛起的確認,因此發送 nack 的實際時間將在此值的 1 倍到 1.5 倍之間。 默認 none (不會生成 nacks)。 |
返回的消息發送到的通道。 提供后,基礎 AMQP 模板配置為向適配器返回無法傳遞的消息。 如果未進行配置,則根據從 AMQP 接收的數據構造消息,并具有以下附加標頭:、、 和 。 當存在 時,消息是帶有有效負載的消息。 另請參閱發布者確認和返回的替代機制?。 自選。? |
對用于在發送返回或否定確認的消息時生成實例的實現的引用。? |
設置為 時,端點將在應用程序上下文初始化期間嘗試連接到代理。 這允許在代理關閉時通過記錄錯誤消息來“快速失敗”檢測錯誤配置。 當(默認值)時,當發送第一條消息時,將建立連接(如果它尚不存在,因為其他組件建立了它)。? |
返回通道 使用 a 需要將屬性設置為 的 和將屬性設置為 的 。 將多個出站終結點與返回符一起使用時,每個終結點都需要一個單獨的終結點。? |
基礎的默認值為 5 秒。 如果需要更長的超時,則必須在 上配置它。? |
請注意,出站適配器和出站網關配置之間的唯一區別是屬性的設置。??expectReply?
?
上一節中討論的網關是同步的,因為發送線程掛起,直到 收到回復(或發生超時)。 Spring Integration 版本 4.3 添加了一個異步網關,該網關使用 from Spring AMQP。 發送消息時,線程會在發送操作完成后立即返回,收到消息后,將在模板的偵聽器容器線程上發送回復。 在輪詢器線程上調用網關時,這可能很有用。 線程已釋放,可用于框架中的其他任務。??AsyncRabbitTemplate?
?
以下清單顯示了 AMQP 異步出站網關的可能配置選項:
@Configurationpublic class AmqpAsyncApplication { @Bean public IntegrationFlow asyncAmqpOutbound(AsyncRabbitTemplate asyncRabbitTemplate) { return f -> f .handle(Amqp.asyncOutboundGateway(asyncRabbitTemplate) .routingKey("queue1")); // default exchange - route to queue "queue1" } @MessagingGateway(defaultRequestChannel = "asyncAmqpOutbound.input") public interface MyGateway { String sendToRabbit(String data); }}
此適配器的唯一 ID。 自選。 |
消息通道,消息應發送到該通道,以便將其轉換并發布到 AMQP 交換。 必填。 |
對已配置的 Bean 引用。 可選(默認為 )。? |
應向其發送消息的 AMQP 交換的名稱。 如果未提供,消息將發送到默認的無名稱交換。 與“交換名稱表達”相互排斥。 自選。 |
一個 SpEL 表達式,計算該表達式以確定消息發送到的 AMQP 交換的名稱,并將消息作為根對象。 如果未提供,消息將發送到默認的無名稱交換。 與“交易所名稱”相互排斥。 自選。 |
注冊多個使用者時此使用者的順序,從而啟用負載平衡和故障轉移。 可選(默認為 )。? |
從 AMQP 隊列接收并轉換回復后應發送到的消息通道。 自選。 |
網關在向 發送回復消息時等待的時間。 這僅適用于 can 阻止的情況,例如容量限制當前已滿的 。 默認值為無窮大。? |
如果在屬性中未收到回復消息,并且此設置為 ,網關將向入站消息的標頭發送錯誤消息。 如果在屬性中未收到回復消息,并且此設置為 ,則網關會將錯誤消息發送到默認值(如果可用)。 默認為 .? |
發送消息時要使用的路由密鑰。 默認情況下,這是一個空的 . 與“路由密鑰表達式”互斥。 自選。? |
一個 SpEL 表達式,經過計算以確定發送消息時要使用的路由密鑰, 將消息作為根對象(例如,“有效負載.key”)。 默認情況下,這是一個空的 . 與“路由密鑰”互斥。 自選。? |
郵件的默認傳遞方式:或 。 如果設置了傳遞方式,則覆蓋。 如果存在 Spring 集成消息標頭 (),則設置該值。 如果未提供此屬性,并且標頭映射器未設置此屬性,則默認值取決于 使用的基礎 Spring AMQP。 如果未自定義,則缺省值為 。 自選。? |
定義相關性數據的表達式。 如果提供,這會將基礎 AMQP 模板配置為接收發布者確認。 需要專用 和 ,其屬性設置為 。 收到發布者確認并提供相關數據時,確認將寫入 或 ,具體取決于確認類型。 確認的有效負載是此表達式定義的相關數據,消息的“amqp_publishConfirm”標頭設置為 () 或 ()。 例如,提供了一個額外的標頭 ()。 例子:。 如果表達式解析為實例(例如“#this”),則在 / 通道上發出的消息將基于該消息,并添加其他標頭。 另請參閱發布者確認和返回的替代機制?。 自選。? |
向其發送正 () 發布者確認的通道。 有效負載是由 定義的關聯數據。 要求基礎數據庫將其屬性設置為 。 另請參閱發布者確認和返回的替代機制?。 可選(默認值為 )。? |
從版本 4.2 開始。 將負 () 發布者確認發送到的通道。 有效負載是由 定義的關聯數據。 要求基礎數據庫將其屬性設置為 。 另請參閱發布者確認和返回的替代機制?。 可選(默認值為 )。? |
設置后,如果在此時間內未收到發布者確認(以毫秒為單位),網關將合成否定確認 (nack)。 每 50% 檢查一次掛起的確認,因此發送 nack 的實際時間將在此值的 1 倍到 1.5 倍之間。 另請參閱發布者確認和返回的替代機制。 默認 none (不會生成 nacks)。 |
返回的消息發送到的通道。 提供后,基礎 AMQP 模板配置為將無法傳遞的消息返回到網關。 該消息是根據從 AMQP 接收的數據構造的,具有以下附加標頭:、、 和 。 要求基礎數據庫將其屬性設置為 。 另請參閱發布者確認和返回的替代機制?。 自選。? |
設置為 時,端點在應用程序上下文初始化期間嘗試連接到代理。 這樣做允許“快速失敗”檢測錯誤配置,方法是在代理關閉時記錄錯誤消息。 當(默認值)建立連接時(如果由于建立了其他組件而不存在 it) 發送第一條消息時。? |
另請參閱異步服務激活器以獲取詳細信息。
兔子模板 當您使用確認和退貨時,我們建議將有線成專用的。 否則,可能會遇到意想不到的副作用。? |
將連接工廠配置為發布者確認并返回時,上述部分將討論消息通道的配置,以便異步接收確認和返回。 從版本 5.4 開始,還有一個通常更易于使用的附加機制。
在這種情況下,請勿配置 或 確認和返回通道。 相反,在標頭中添加一個實例;然后,您可以通過檢查已發送消息的實例中的未來狀態來等待稍后的結果。 在將來完成之前,將始終填充該字段(如果返回消息)。??confirm-correlation-expression?
???CorrelationData?
???AmqpHeaders.PUBLISH_CONFIRM_CORRELATION?
???CorrelationData?
???returnedMessage?
?
CorrelationData corr = new CorrelationData("someId"); // <--- Unique "id" is required for returnssomeFlow.getInputChannel().send(MessageBuilder.withPayload("test") .setHeader("rk", "someKeyThatWontRoute") .setHeader(AmqpHeaders.PUBLISH_CONFIRM_CORRELATION, corr) .build());...try { Confirm Confirm = corr.getFuture().get(10, TimeUnit.SECONDS); Message returned = corr.getReturnedMessage(); if (returned !- null) { // message could not be routed }}catch { ... }
為了提高性能,您可能希望發送多條消息并稍后等待確認,而不是一次發送一條。 返回的消息是轉換后的原始消息;您可以使用所需的任何其他數據對 A 進行子類。??CorrelationData?
?
到達通道適配器或網關的入站消息將使用消息轉換器轉換為有效負載。 默認情況下,使用 a,用于處理 java 序列化和文本。 默認情況下,標頭使用 映射。 如果發生轉換錯誤,并且未定義錯誤通道,則會將異常引發到容器,并由偵聽器容器的錯誤處理程序處理。 默認錯誤處理程序將轉換錯誤視為致命錯誤,消息將被拒絕(如果隊列已如此配置,則路由到死信交換)。 如果定義了錯誤通道,則有效負載為 具有屬性(無法轉換的 Spring AMQP 消息)和 . 如果容器是(默認值),并且錯誤流使用錯誤而不引發異常,則將確認原始消息。 如果錯誤流引發異常,則異常類型與容器的錯誤處理程序一起確定消息是否重新排隊。 如果容器配置了 ,則有效負載是具有附加屬性和 的 。 這使錯誤流能夠調用 或(或)消息,以控制其處置。??spring-messaging?
???Message>?
???SimpleMessageConverter?
???DefaultHeaderMapper.inboundMapper()?
???ErrorMessage?
???ListenerExecutionFailedException?
???failedMessage?
???cause?
???AcknowledgeMode?
???AUTO?
???AcknowledgeMode.MANUAL?
???ManualAckListenerExecutionFailedException?
???channel?
???deliveryTag?
???basicAck?
???basicNack?
???basicReject?
?
Spring AMQP 1.4 引入了 ,其中實際轉換器的選擇基于 在傳入內容類型消息屬性上。 這可由入站終端節點使用。??ContentTypeDelegatingMessageConverter?
?
從 Spring 集成版本 4.3 開始,您也可以在出站端點上使用 ,標頭指定使用哪個轉換器。??ContentTypeDelegatingMessageConverter?
???contentType?
?
以下示例配置了一個 ,默認轉換器為 (處理 Java 序列化和純文本),以及一個 JSON 轉換器:??ContentTypeDelegatingMessageConverter?
???SimpleMessageConverter?
?
將標頭設置為 to 的消息發送到會導致選擇 JSON 轉換器。??ctRequestChannel?
???contentType?
???application/json?
?
這適用于出站通道適配器和網關。
從版本 5.0 開始,添加到出站郵件的標頭永遠不會被映射標頭覆蓋(默認情況下)。 以前,只有當消息轉換器是 (在這種情況下,首先映射標頭以便可以選擇正確的轉換器)時才會出現這種情況。 對于其他轉換器,例如 ,映射標頭將覆蓋轉換器添加的任何標頭。 當出站郵件具有一些剩余的標頭(可能來自入站通道適配器)并且正確的出站被錯誤地覆蓋時,這會導致問題。 解決方法是在將消息發送到出站終結點之前使用標頭篩選器刪除標頭。? 但是,在某些情況下,需要以前的行為 - 例如,當包含 JSON 的有效負載不知道內容并將消息屬性設置為,但應用程序希望通過設置發送到出站終結點的消息標頭來覆蓋該行為。 正是這樣做的(默認情況下)。? 現在,在出站通道適配器和網關(以及 AMQP 支持的通道)上調用了一個屬性。 設置此選項可還原覆蓋轉換器添加的屬性的行為。? 從版本 5.1.9 開始,當我們生成回復并希望覆蓋轉換器填充的標頭時,提供了類似的情況。 有關更多信息,請參閱其 JavaDocs。? |
Spring AMQP 版本 1.6 引入了一種機制,允許為出站消息指定默認用戶 ID。 始終可以設置標頭,該標頭現在優先于默認值。 這可能對郵件收件人有用。 對于入站郵件,如果郵件發布者設置了該屬性,則該屬性將在標頭中可用。 請注意,RabbitMQ會驗證用戶 ID 是連接的實際用戶 ID,還是連接允許模擬。??AmqpHeaders.USER_ID?
???AmqpHeaders.RECEIVED_USER_ID?
?
要為出站消息配置缺省用戶標識,請在 上配置該標識,并將出站適配器或網關配置為使用該模板。 同樣,要在回復上設置用戶 ID 屬性,請將適當配置的模板注入入站網關。 有關更多信息,請參閱Spring AMQP 文檔。??RabbitTemplate?
?
Spring AMQP 支持RabbitMQ 延遲消息交換插件。 對于入站郵件,標頭映射到標頭。 設置標頭會導致在出站郵件中設置相應的標頭。 還可以在出站終結點上指定 and 屬性(使用 XML 配置時)。 這些屬性優先于標頭。??x-delay?
???AmqpHeaders.RECEIVED_DELAY?
???AMQPHeaders.DELAY?
???x-delay?
???delay?
???delayExpression?
???delay-expression?
???AmqpHeaders.DELAY?
?
有兩種消息通道實現可用。 一個是點對點,另一個是發布-訂閱。 這兩個通道都為基礎和(如本章前面的通道適配器和網關所示)提供了廣泛的配置屬性。 但是,我們在此處顯示的示例具有最小配置。 瀏覽 XML 架構以查看可用屬性。??AmqpTemplate?
???SimpleMessageListenerContainer?
?
點對點通道可能類似于以下示例:
在幕后,前面的示例導致聲明一個命名,并且此通道發送到該通道(從技術上講,通過使用與此名稱匹配的路由密鑰發送到無名稱的直接交換)。 此通道還在此 上注冊消費者。 如果希望通道是“可輪詢的”而不是消息驅動的,請為標志提供值 ,如以下示例所示:??Queue?
???si.p2pChannel?
???Queue?
???Queue?
???Queue?
???message-driven?
???false?
?
發布-訂閱通道可能如下所示:
在后臺,前面的示例會導致聲明名為的扇出交換,并且此通道發送到該扇出交換。 此通道還聲明一個以服務器命名的獨占、自動刪除、非持久,并將其綁定到扇出交換,同時注冊使用者以接收消息。 發布-訂閱-通道沒有“可輪詢”選項。 它必須是消息驅動的。??si.fanout.pubSubChannel?
???Queue?
???Queue?
?
從版本 4.1 開始,AMQP 支持的消息通道(與 一起)支持單獨配置 和 對于 . 請注意,以前是默認的。 現在,默認情況下,它適用于 .??channel-transacted?
???template-channel-transacted?
???transactional?
???AbstractMessageListenerContainer?
???RabbitTemplate?
???channel-transacted?
???true?
???false?
???AbstractMessageListenerContainer?
?
在版本 4.3 之前,AMQP 支持的通道僅支持具有有效負載和標頭的消息。 整個消息被轉換(序列化)并發送到 RabbitMQ。 現在,您可以將屬性(或使用 Java 配置時)設置為 。 當此標志為 時,將轉換消息有效負載并映射標頭,其方式類似于使用通道適配器時的方式。 這種安排允許 AMQP 支持的通道與不可序列化的有效負載一起使用(可能與其他消息轉換器一起使用,例如 )。 有關默認映射標頭的更多信息,請參閱AMQP 消息標頭。 您可以通過提供使用 和 屬性的自定義映射器來修改映射。 現在,您還可以指定 ,用于在沒有標頭時設置傳遞模式。 默認情況下,Spring AMQP 使用交付模式。??Serializable?
???extract-payload?
???setExtractPayload()?
???true?
???true?
???Jackson2JsonMessageConverter?
???outbound-header-mapper?
???inbound-header-mapper?
???default-delivery-mode?
???amqp_deliveryMode?
???MessageProperties?
???PERSISTENT?
?
與其他支持持久性的通道一樣,支持 AMQP 的通道旨在提供消息持久性以避免消息丟失。 它們不用于將工作分發給其他對等應用程序。 為此,請改用通道適配器。 |
從版本 5.0 開始,可輪詢通道現在會阻止指定的輪詢器線程(默認值為 1 秒)。 以前,與其他實現不同,如果沒有可用的消息,線程會立即返回到調度程序,而不管接收超時如何。 阻止比使用 a 檢索消息(沒有超時)要昂貴一些,因為必須創建一個使用者來接收每條消息。 若要恢復以前的行為,請將輪詢器的 0 設置為 0。? |
以下示例顯示如何使用 Java 配置配置通道:
@Beanpublic AmqpChannelFactoryBean pollable(ConnectionFactory connectionFactory) { AmqpChannelFactoryBean factoryBean = new AmqpChannelFactoryBean(); factoryBean.setConnectionFactory(connectionFactory); factoryBean.setQueueName("foo"); factoryBean.setPubSub(false); return factoryBean;}@Beanpublic AmqpChannelFactoryBean messageDriven(ConnectionFactory connectionFactory) { AmqpChannelFactoryBean factoryBean = new AmqpChannelFactoryBean(true); factoryBean.setConnectionFactory(connectionFactory); factoryBean.setQueueName("bar"); factoryBean.setPubSub(false); return factoryBean;}@Beanpublic AmqpChannelFactoryBean pubSub(ConnectionFactory connectionFactory) { AmqpChannelFactoryBean factoryBean = new AmqpChannelFactoryBean(true); factoryBean.setConnectionFactory(connectionFactory); factoryBean.setQueueName("baz"); factoryBean.setPubSub(false); return factoryBean;}
以下示例顯示如何使用 Java DSL 配置通道:
@Beanpublic IntegrationFlow pollableInFlow(ConnectionFactory connectionFactory) { return IntegrationFlow.from(...) ... .channel(Amqp.pollableChannel(connectionFactory) .queueName("foo")) ... .get();}@Beanpublic IntegrationFlow messageDrivenInFow(ConnectionFactory connectionFactory) { return IntegrationFlow.from(...) ... .channel(Amqp.channel(connectionFactory) .queueName("bar")) ... .get();}@Beanpublic IntegrationFlow pubSubInFlow(ConnectionFactory connectionFactory) { return IntegrationFlow.from(...) ... .channel(Amqp.publishSubscribeChannel(connectionFactory) .queueName("baz")) ... .get();}
Spring 集成 AMQP 適配器會自動映射所有 AMQP 屬性和標頭。 (這是對 4.3 的更改 - 以前,僅映射標準標頭)。 默認情況下,這些屬性通過使用DefaultAmqpHeaderMapper復制到 Spring Integration 中。MessageHeaders
您可以傳入自己的特定于 AMQP 的標頭映射程序的實現,因為適配器具有支持這樣做的屬性。
AMQP 消息屬性中的任何用戶定義的標頭都將復制到 AMQP 消息或從 AMQP 消息復制,除非 的 或屬性顯式否定。 默認情況下,對于出站映射器,不映射任何標頭。 請參閱本節后面出現的警告,了解原因。??requestHeaderNames?
???replyHeaderNames?
???DefaultAmqpHeaderMapper?
???x-*?
?
要覆蓋默認值并恢復到 4.3 之前的行為,請在屬性中使用 和。??STANDARD_REQUEST_HEADERS?
???STANDARD_REPLY_HEADERS?
?
映射用戶定義的標頭時,這些值還可以包含要匹配的簡單通配符模式(例如 或)。 匹配所有標頭。? |
從版本 4.1 開始,(超類)允許為 和 屬性(除了現有的 和 )配置令牌,以映射所有用戶定義的標頭。??AbstractHeaderMapper?
???DefaultAmqpHeaderMapper?
???NON_STANDARD_HEADERS?
???requestHeaderNames?
???replyHeaderNames?
???STANDARD_REQUEST_HEADERS?
???STANDARD_REPLY_HEADERS?
?
該類標識由 :??org.springframework.amqp.support.AmqpHeaders?
???DefaultAmqpHeaderMapper?
?
?amqp_appId?
???amqp_clusterId?
???amqp_contentEncoding?
???amqp_contentLength?
???content-type?
?(請參閱內容類型標頭)??amqp_correlationId?
???amqp_delay?
???amqp_deliveryMode?
???amqp_deliveryTag?
???amqp_expiration?
???amqp_messageCount?
???amqp_messageId?
???amqp_receivedDelay?
???amqp_receivedDeliveryMode?
???amqp_receivedExchange?
???amqp_receivedRoutingKey?
???amqp_redelivered?
???amqp_replyTo?
???amqp_timestamp?
???amqp_type?
???amqp_userId?
???amqp_publishConfirm?
???amqp_publishConfirmNackCause?
???amqp_returnReplyCode?
???amqp_returnReplyText?
???amqp_returnExchange?
???amqp_returnRoutingKey?
???amqp_channel?
???amqp_consumerTag?
???amqp_consumerQueue?
?如本節前面所述,使用 標頭映射模式 是復制所有標頭的常用方法。 但是,這可能會產生一些意想不到的副作用,因為某些 RabbitMQ 專有屬性/標頭也會被復制。 例如,使用聯合身份驗證?時,收到的消息可能具有一個名為 的屬性,該屬性包含發送消息的節點。 如果對入站網關上的請求和回復標頭映射使用通配符,則會復制此標頭,這可能會導致聯合身份驗證出現一些問題。 此回復消息可能會聯合回發送代理,發送代理可能會認為消息正在循環,因此以靜默方式丟棄它。 如果您希望使用通配符標頭映射的便利性,則可能需要篩選出下游流中的一些標頭。 例如,為了避免將標頭復制回回復,您可以在將回復發送到 AMQP 入站網關之前使用。 或者,可以顯式列出實際要映射的屬性,而不是使用通配符。 出于這些原因,對于入站消息,映射器(默認情況下)不映射任何標頭。 它也不會將 映射到標頭,以避免該標頭從入站消息傳播到出站消息。 相反,此標頭映射到 ,不會映射到輸出。? |
從版本 4.3 開始,標頭映射中的模式可以通過在模式前面加上 來否定。 否定模式獲得優先級,因此諸如 不映射(也不是 )之類的列表。 標準標頭加上 和 被映射。 否定技術可能很有用,例如,當 JSON 反序列化邏輯以不同的方式在接收方下游完成時,不映射傳入消息的 JSON 類型標頭。 為此,應為入站通道適配器/網關的標頭映射器配置模式。??!?
???STANDARD_REQUEST_HEADERS,thing1,ba*,!thing2,!thing3,qux,!thing1?
???thing1?
???thing2?
???thing3?
???bad?
???qux?
???!json_*?
?
如果您有一個用戶定義的標頭,該標頭以您希望映射的標頭開頭,則需要對其進行轉義,如下所示:。 現在已映射名為標題的標頭。? |
從版本 5.1 開始,如果出站消息中不存在相應的 or 標頭,則將分別回退到映射和 to。 入站屬性將像以前一樣映射到標頭。 當消息使用者使用有狀態重試時填充屬性很有用。? |
?contentType?
?與其他標頭不同,不以 ;這允許跨不同技術透明地傳遞 contentType 標頭。 例如,發送到 RabbitMQ 隊列的入站 HTTP 消息。??AmqpHeaders.CONTENT_TYPE?
???amqp_?
?
標頭映射到Spring AMQP的屬性,隨后映射到RabbitMQ的屬性。??contentType?
???MessageProperties.contentType?
???content_type?
?
在版本 5.1 之前,此標頭也映射為映射中的條目;這是不正確的,此外,該值可能是錯誤的,因為基礎 Spring AMQP 消息轉換器可能已更改內容類型。 這樣的更改將反映在 first-class 屬性中,但不反映在 RabbitMQ 標頭映射中。 入站映射忽略標頭映射值。 不再映射到標頭映射中的條目。??MessageProperties.headers?
???content_type?
???contentType?
?
本節介紹入站和出站消息的消息排序。
如果需要對入站消息進行嚴格排序,則必須將入站偵聽器容器的屬性配置為 。 這是因為,如果消息失敗并重新傳遞,它將在現有的預取消息之后到達。 從 Spring AMQP 2.0 版本開始,默認為提高性能。 嚴格的訂購要求是以性能下降為代價的。??prefetchCount?
???1?
???prefetchCount?
???250?
?
請考慮以下集成流程:
@Beanpublic IntegrationFlow flow(RabbitTemplate template) { return IntegrationFlow.from(Gateway.class) .split(s -> s.delimiters(",")) .transform(String::toUpperCase) .handle(Amqp.outboundAdapter(template).routingKey("rk")) .get();}
假設我們發送消息 ,并發送到網關。 雖然消息 、 很可能是按順序發送的,但不能保證。 這是因為模板為每個發送操作從緩存中“借用”一個通道,并且不能保證對每條消息使用相同的通道。 一種解決方案是在拆分器之前啟動事務,但是在 RabbitMQ 中事務成本高昂,并且會使性能降低數百倍。??A?
???B?
???C?
???A?
???B?
???C?
?
為了以更有效的方式解決這個問題,從版本 5.1 開始,Spring 集成提供了 這是一個 . 請參閱處理消息建議。 在拆分器之前應用時,它可確保在同一通道上執行所有下游操作,并且可以選擇等到收到所有已發送消息的發布者確認(如果連接工廠配置為確認)。 以下示例演示如何使用:??BoundRabbitChannelAdvice?
???HandleMessageAdvice?
???BoundRabbitChannelAdvice?
?
@Beanpublic IntegrationFlow flow(RabbitTemplate template) { return IntegrationFlow.from(Gateway.class) .split(s -> s.delimiters(",") .advice(new BoundRabbitChannelAdvice(template, Duration.ofSeconds(10)))) .transform(String::toUpperCase) .handle(Amqp.outboundAdapter(template).routingKey("rk")) .get();}
請注意,在建議和出站適配器中使用了相同的(實現)。 該建議在模板的方法中運行下游流,以便所有操作都在同一通道上運行。 如果提供了可選的超時,則當流完成時,建議將調用該方法,如果在指定時間內未收到確認,則會引發異常。??RabbitTemplate?
???RabbitOperations?
???invoke?
???waitForConfirmsOrDie?
?
下游流(、 和其他流)中不得有線程切換。? |
要試驗 AMQP 適配器,請查看 Spring 集成示例 git 存儲庫中提供的示例,網址為https://github.com/SpringSource/spring-integration-samples
目前,一個示例通過使用出站通道適配器和入站通道適配器演示了 Spring 集成 AMQP 適配器的基本功能。 由于示例中的 AMQP 代理實現使用RabbitMQ。
為了運行該示例,您需要一個正在運行的 RabbitMQ 實例。 僅具有基本默認值的本地安裝就足夠了。 有關詳細的 RabbitMQ 安裝過程,請參閱https://www.rabbitmq.com/install.html |
啟動示例應用程序后,在命令提示符下輸入一些文本,包含該輸入文本的消息將調度到 AMQP 隊列。 作為回報,Spring Integration 檢索該消息并將其打印到控制臺。
下圖說明了此示例中使用的一組基本 Spring 集成組件。
AMQP 示例圖像的彈簧集成圖::images/spring-integration-amqp-sample-graph.png[]
版本 6.0 引入了對 RabbitMQ 流隊列的支持。
這些終結點的 DSL 工廠類是 。??Rabbit?
?
@BeanIntegrationFlow flow(Environment env) { @Bean IntegrationFlow simpleStream(Environment env) { return IntegrationFlow.from(RabbitStream.inboundAdapter(env) .configureContainer(container -> container.queueName("my.stream"))) // ... .get(); } @Bean IntegrationFlow superStream(Environment env) { return IntegrationFlow.from(RabbitStream.inboundAdapter(env) .configureContainer(container -> container.superStream("my.stream", "my.consumer"))) // ... .get(); }}
@BeanIntegrationFlow outbound(RabbitStreamTemplate template) { return f -> f // ... .handle(RabbitStream.outboundStreamAdapter(template));}