【新要聞】Spring Integration對Redis的支持

2022-12-13 14:18:50 來源:51CTO博客

Spring Integration 2.1引入了對Redis?的支持:“一個開源的高級鍵值存儲”。 這種支持以基于 Redis 以及發布-訂閱消息傳遞適配器的形式出現,Redis 通過其PUBLISH、SUBSCRIBE和UNSUBSCRIBE?命令支持這些適配器。??MessageStore??

您需要將此依賴項包含在項目中:

    org.springframework.integration    spring-integration-redis    6.0.0

您還需要包含 Redis 客戶端依賴項,例如 Lettuce。


(相關資料圖)

要下載、安裝和運行 Redis,請參閱Redis 文檔。

連接到 Redis

要開始與 Redis 交互,您首先需要連接到它。 Spring Integration 使用另一個 Spring 項目 SpringData Redis提供的支持,該項目提供了典型的 Spring 構造:和 。 這些抽象簡化了與多個 Redis 客戶端 Java API 的集成。 目前,Spring Data Redis 支持Jedis和Lettuce。??ConnectionFactory????Template??

用??RedisConnectionFactory??

要連接到 Redis,您可以使用接口的實現之一。 以下清單顯示了接口定義:??RedisConnectionFactory??

public interface RedisConnectionFactory extends PersistenceExceptionTranslator {    /**     * Provides a suitable connection for interacting with Redis.     * @return connection for interacting with Redis.     */    RedisConnection getConnection();}

以下示例顯示了如何在 Java 中創建:??LettuceConnectionFactory??

LettuceConnectionFactory cf = new LettuceConnectionFactory();cf.afterPropertiesSet();

下面的示例展示了如何在 Spring 的 XML 配置中創建:??LettuceConnectionFactory??

    

的實現提供了一組屬性,例如端口和主機,您可以根據需要設置這些屬性。 擁有 的實例后,可以創建 的實例并將其注入 .??RedisConnectionFactory????RedisConnectionFactory????RedisTemplate????RedisConnectionFactory??

用??RedisTemplate??

與 Spring 中的其他模板類(例如 和 )一樣,它是一個簡化 Redis 數據訪問代碼的輔助類。 有關及其變體的更多信息(例如),請參閱Spring Data Redis 文檔。??JdbcTemplate????JmsTemplate????RedisTemplate????RedisTemplate????StringRedisTemplate??

以下示例演示如何在 Java 中創建 的實例:??RedisTemplate??

RedisTemplate rt = new RedisTemplate();rt.setConnectionFactory(redisConnectionFactory);

下面的示例展示了如何在 Spring 的 XML 配置中創建 的實例:??RedisTemplate??

    

使用 Redis 發送消息

如簡介中所述,Redis 通過其 、 和命令提供對發布-訂閱消息傳遞的支持。 與JMS和AMQP一樣,Spring Integration提供了消息通道和適配器,用于通過Redis發送和接收消息。??PUBLISH????SUBSCRIBE????UNSUBSCRIBE??

瑞迪斯發布/訂閱頻道

與 JMS 類似,在某些情況下,生產者和使用者都打算成為同一應用程序的一部分,在同一進程中運行。 可以使用一對入站和出站通道適配器來實現此目的。 但是,與Spring Integration的JMS支持一樣,有一種更簡單的方法來解決這個用例。 您可以創建發布-訂閱通道,如以下示例所示:

A 的行為很像 Spring 集成主命名空間中的普通元素。 它可以由任何終結點的 和屬性引用。 不同之處在于,此通道由 Redis 主題名稱支持:屬性指定的值。 但是,與 JMS 不同的是,本主題不必提前創建,甚至不必由 Redis 自動創建。 在 Redis 中,主題是扮演地址角色的簡單值。 生成者和使用者可以使用與其主題名稱相同的值進行通信。 對此通道的簡單訂閱意味著可以在生產和消費終結點之間實現異步發布-訂閱消息傳遞。 但是,與通過在簡單的 Spring 集成元素中添加元素創建的異步消息通道不同,消息不會存儲在內存中隊列中。 相反,這些消息通過 Redis 傳遞,這使您可以依賴它對持久性和集群的支持以及與其他非 Java 平臺的互操作性。??publish-subscribe-channel????????input-channel????output-channel????String????topic-name????String????String??????????

Redis 入站通道適配器

Redis 入站通道適配器 () 以與其他入站適配器相同的方式將傳入的 Redis 消息調整為 Spring 消息。 它接收特定于平臺的消息(在本例中為 Redis),并使用策略將它們轉換為 Spring 消息。 以下示例演示如何配置 Redis 入站通道適配器:??RedisInboundChannelAdapter????MessageConverter??

    

前面的示例顯示了 Redis 入站通道適配器的簡單但完整的配置。 請注意,前面的配置依賴于熟悉的 Spring 范式,即自動發現某些 bean。 在這種情況下,將隱式注入適配器。 可以改用屬性顯式指定它。??redisConnectionFactory????connection-factory??

另請注意,上述配置為適配器注入了自定義 . 該方法類似于 JMS,其中實例用于在 Redis 消息和 Spring 集成消息有效負載之間進行轉換。 默認值為 .??MessageConverter????MessageConverter????SimpleMessageConverter??

入站適配器可以訂閱多個主題名稱,因此屬性中的值集以逗號分隔。??topics??

從 V3.0 開始,除了現有屬性之外,入站適配器現在還具有該屬性。 此屬性包含一組以逗號分隔的 Redis 主題模式。 有關 Redis 發布-訂閱的更多信息,請參閱Redis 發布/訂閱。??topics????topic-patterns??

入站適配器可以使用 對 Redis 消息正文進行反序列化。 的屬性可以設置為空字符串,這將生成屬性的值。 在這種情況下,Redis 消息的原始正文作為消息有效負載提供。??RedisSerializer????serializer????????null????RedisSerializer????byte[]??

從 V5.0 開始,您可以使用 的屬性向入站適配器提供實例。 此外,收到的 Spring 集成消息現在具有指示已發布消息來源的標頭:主題或模式。 您可以將此下游用于路由邏輯。??Executor????task-executor????????RedisHeaders.MESSAGE_SOURCE??

Redis 出站通道適配器

Redis 出站通道適配器將傳出的 Spring 集成消息調整為 Redis 消息,方式與其他出站適配器相同。 它接收 Spring 集成消息,并使用策略將它們轉換為特定于平臺的消息(在本例中為 Redis)。 以下示例演示如何配置 Redis 出站通道適配器:??MessageConverter??

    

該配置與 Redis 入站通道適配器并行。 適配器隱式注入 ,該 定義為其 Bean 名稱。 此示例還包括可選(和自定義)(bean)。??RedisConnectionFactory????redisConnectionFactory????MessageConverter????testConverter??

從 Spring Integration 3.0 開始,提供了該屬性的替代方法:您可以使用該屬性在運行時確定消息的 Redis 主題。 這些屬性是互斥的。??????topic????topic-expression??

Redis 隊列入站通道適配器

Spring Integration 3.0引入了一個隊列入站通道適配器,用于從Redis列表中“彈出”消息。 默認情況下,它使用“右彈出”,但您可以將其配置為使用“左彈出”。 適配器是消息驅動的。 它使用內部偵聽器線程,不使用輪詢器。

以下清單顯示了 的所有可用屬性:??queue-inbound-channel-adapter??

組件 Bean 名稱。 如果未提供該屬性,則會在應用程序上下文中創建并注冊 ,并將此屬性作為 Bean 名稱。 在這種情況下,端點本身使用 Bean 名稱加 . (如果 Bean 名稱為 ,則端點注冊為 。??channel????DirectChannel????id????id????.adapter????thing1????thing1.adapter??

要從此終端節點向其發送實例的實例。??MessageChannel????Message??

一個屬性,用于指定此終結點是否應在應用程序上下文啟動后自動啟動。 默認為 .??SmartLifecycle????true??

一個屬性,用于指定啟動此終結點的階段。 默認為 .??SmartLifecycle????0??

對 Bean 的引用。 默認為 .??RedisConnectionFactory????redisConnectionFactory??

執行基于隊列的“pop”操作以獲取 Redis 消息的 Redis 列表的名稱。

從終端節點的偵聽任務收到異常時向其發送實例的實例。 默認情況下,基礎使用應用程序上下文中的默認值。??MessageChannel????ErrorMessage????MessagePublishingErrorHandler????errorChannel??

豆引用。 它可以是一個空字符串,這意味著“沒有序列化程序”。 在這種情況下,來自入站 Redis 消息的原始消息將作為有效負載發送到 。 默認情況下,它是一個 .??RedisSerializer????byte[]????channel????Message????JdkSerializationRedisSerializer??

“pop”操作等待隊列中的 Redis 消息的超時(以毫秒為單位)。 默認值為 1 秒。

偵聽器任務在重新啟動偵聽器任務之前,在“pop”操作出現異常后應休眠的時間(以毫秒為單位)。

指定此終端節點是否期望 Redis 隊列中的數據包含整個實例。 如果此屬性設置為 ,則不能為空字符串,因為消息需要某種形式的反序列化(默認情況下為 JDK 序列化)。 其默認值為 。??Message????true????serializer????false??

對 Spring (或標準 JDK 1.5+) bean 的引用。 它用于基礎偵聽任務。 它默認為 .??TaskExecutor????Executor????SimpleAsyncTaskExecutor??

指定此端點應使用“右彈出”(當)還是“左彈出”(當)從 Redis 列表中讀取消息。 如果 ,則 Redis 列表在與默認 Redis 隊列出站通道適配器一起使用時充當隊列。 將其設置為與通過“右推”寫入列表的軟件一起使用,或實現類似堆棧的消息順序。 其默認值為 。 從版本 4.3 開始。??true????false????true????FIFO????false????true??

必須配置多個線程進行處理;否則,當 在出現錯誤后嘗試重新啟動偵聽器任務時,可能會出現死鎖。 可用于處理這些錯誤,以避免重新啟動,但最好不要將應用程序暴露在可能的死鎖情況下。 有關可能的實現,請參閱 Spring 框架參考手冊?。??task-executor????RedisQueueMessageDrivenEndpoint????errorChannel????TaskExecutor??

Redis 隊列出站通道適配器

Spring Integration 3.0引入了一個隊列出站通道適配器,用于從Spring Integration消息“推送”到Redis列表。 默認情況下,它使用“左推”,但您可以將其配置為使用“右推”。 以下清單顯示了 Redis 的所有可用屬性:??queue-outbound-channel-adapter??

組件 Bean 名稱。 如果未提供該屬性,則會在應用程序上下文中創建并注冊 ,并將此屬性作為 Bean 名稱。 在這種情況下,端點注冊的 Bean 名稱為 加 。 (如果 Bean 名稱為 ,則端點注冊為 。??channel????DirectChannel????id????id????.adapter????thing1????thing1.adapter??

此終結點從中接收實例的 。??MessageChannel????Message??

對 Bean 的引用。 默認為 .??RedisConnectionFactory????redisConnectionFactory??

執行基于隊列的“推送”操作以發送 Redis 消息的 Redis 列表的名稱。 此屬性與 互斥。??queue-expression??

用于確定 Redis 列表名稱的 SpEL。 它使用運行時的傳入作為變量。 此屬性與 互斥。??Expression????Message????#root????queue??

豆類引用。 它默認為 . 但是,對于有效負載,如果未提供引用,則使用 a。??RedisSerializer????JdkSerializationRedisSerializer????String????StringRedisSerializer????serializer??

指定此終端節點是應僅將有效負載發送還是將整個負載發送到 Redis 隊列。 默認為 .??Message????true??

指定此端點應使用“左推”(當)還是“右推”(當)將消息寫入 Redis 列表。 如果為 ,則 Redis 列表在與默認 Redis 隊列入站通道適配器一起使用時充當隊列。 將其設置為與以“左彈出”從列表中讀取的軟件一起使用,或實現類似堆棧的消息順序。 默認為 . 從版本 4.3 開始。??true????false????true????FIFO????false????true??

瑞迪斯應用程序事件

從 Spring Integration 3.0 開始,Redis 模塊提供了 的實現,而 又是一個 . 封裝了 Redis 操作的異常(端點是事件的“源”)。 例如,在捕獲操作中的異常后發出這些事件。 例外可以是任何泛型或 . 使用 處理這些事件對于確定后臺 Redis 任務的問題和采取管理操作非常有用。??IntegrationEvent????org.springframework.context.ApplicationEvent????RedisExceptionEvent????????BoundListOperations.rightPop????org.springframework.data.redis.RedisSystemException????org.springframework.data.redis.RedisConnectionFailureException??????

紅與紅消息存儲

企業集成模式(EIP) 一書中所述,郵件存儲允許您保留消息。 當考慮可靠性時,這在處理具有緩沖消息功能的組件(聚合器、重新排序器等)時非常有用。 在 Spring 集成中,該策略還為聲明檢查模式提供了基礎,EIP 中也有描述。??MessageStore??

Spring 集成的 Redis 模塊提供了 . 以下示例演示如何將其與聚合器一起使用:??RedisMessageStore??

    

前面的示例是一個 Bean 配置,它需要 a 作為構造函數參數。??RedisConnectionFactory??

缺省情況下,使用 Java 序列化來序列化消息。 但是,如果要使用其他序列化技術(如 JSON),可以通過設置 .??RedisMessageStore????valueSerializer????RedisMessageStore??

從版本 4.3.10 開始,框架分別為實例和實例提供了 Jackson 序列化程序和解序列化程序實現 — 和 。 必須使用的選項配置它們。 此外,還應設置為每個序列化的復雜對象添加類型信息(如果您信任源)。 然后在反序列化期間使用該類型信息。 該框架提供了一個名為 的實用程序方法,該方法已隨前面提到的所有屬性和序列化程序一起提供。 此實用程序方法附帶一個參數,用于限制 Java 包進行反序列化以避免安全漏洞。 默認的受信任軟件包:、、。 若要在 中管理 JSON 序列化,必須以類似于以下示例的方式對其進行配置:??Message????MessageHeaders????MessageJacksonDeserializer????MessageHeadersJacksonSerializer????SimpleModule????ObjectMapper????enableDefaultTyping????ObjectMapper????JacksonJsonUtils.messagingAwareMapper()????trustedPackages????java.util????java.lang????org.springframework.messaging.support????org.springframework.integration.support????org.springframework.integration.message????org.springframework.integration.store????RedisMessageStore??

RedisMessageStore store = new RedisMessageStore(redisConnectionFactory);ObjectMapper mapper = JacksonJsonUtils.messagingAwareMapper();RedisSerializer serializer = new GenericJackson2JsonRedisSerializer(mapper);store.setValueSerializer(serializer);

從版本 4.3.12 開始,支持允許區分同一 Redis 服務器上的存儲實例的選項。??RedisMessageStore????prefix??

Redis 通道消息存儲庫

前面顯示的內容將每個組維護為單個鍵(組 ID)下的值。 雖然您可以使用它來支持持久性,但為此目的提供了一個專用的(從版本 4.0 開始)。 此存儲在每個通道、發送消息和接收消息時使用 。 默認情況下,此存儲也使用 JDK 序列化,但您可以修改值序列化程序,如前所述。??RedisMessageStore????QueueChannel????RedisChannelMessageStore????LIST????LPUSH????RPOP??

我們建議使用此存儲支持通道,而不是使用常規 . 以下示例定義了一個 Redis 消息存儲庫,并在具有隊列的通道中使用它:??RedisMessageStore??

        

用于存儲數據的鍵的格式為:(在前面的示例中,)。??:????redisMessageStore:somePersistentQueueChannel??

此外,還提供了子類。 當您將其與 一起使用時,將按 (FIFO) 優先級順序接收消息。 它使用標準標頭并支持優先級值 ()。 具有其他優先級的消息(以及沒有優先級的消息)將按FIFO順序在任何具有優先級的消息之后檢索。??RedisChannelPriorityMessageStore????QueueChannel????IntegrationMessageHeaderAccessor.PRIORITY????0 - 9??

這些存儲僅實現而不實現 。 它們只能用于支持.??BasicMessageGroupStore????MessageGroupStore????QueueChannel??

紅地元數據存儲

Spring Integration 3.0引入了一個新的基于Redis的MetadataStore(參見Metadata Store)實現。 可以使用 來維護跨應用程序重新啟動的狀態。 您可以將此新實現與適配器一起使用,例如:??RedisMetadataStore????MetadataStore????MetadataStore??

飼料文件郵票自來水龍

要指示這些適配器使用 new ,請聲明一個名為 的 Spring Bean。 源入站通道適配器和源入站通道適配器都自動拾取并使用聲明的 . 下面的示例演示如何聲明這樣的 Bean:??RedisMetadataStore????metadataStore????RedisMetadataStore??

    

由RedisProperties支持。 與它的交互使用BoundHashOperations,這反過來又需要整個存儲區。 在 的情況下,這扮演了一個區域的角色,這在分布式環境中很有用,當多個應用程序使用相同的 Redis 服務器時。 缺省情況下,此值為 。??RedisMetadataStore????key????Properties????MetadataStore????key????key????MetaData??

從版本 4.0 開始,此存儲實現了 ,使其在多個應用程序實例之間可靠地共享,其中只允許一個實例存儲或修改鍵的值。??ConcurrentMetadataStore??

您不能將 (例如,在 中)與 Redis 集群一起使用,因為當前不支持原子性命令。??RedisMetadataStore.replace()????AbstractPersistentAcceptOnceFileListFilter????WATCH??

Redis 存儲入站通道適配器

Redis 存儲入站通道適配器是一個輪詢使用者,它從 Redis 集合讀取數據并將其作為有效負載發送。 以下示例演示如何配置 Redis 存儲入站通道適配器:??Message??

    

前面的示例演示如何使用元素配置 Redis 存儲入站通道適配器,為各種屬性提供值,例如:??store-inbound-channel-adapter??

??key??或:正在使用的集合的密鑰的名稱。key-expression??collection-type??:此適配器支持的集合類型的枚舉。 支持的集合包括 、、 、 和 。LISTSETZSETPROPERTIESMAP??connection-factory??:對 實例的引用。o.s.data.redis.connection.RedisConnectionFactory??redis-template??:對 實例的引用。o.s.data.redis.core.RedisTemplate所有其他入站適配器通用的其他屬性(例如“通道”)。

不能同時設置 和 。??redis-template????connection-factory??

默認情況下,適配器使用 . 這將使用鍵、值、哈希鍵和哈希值的實例。 如果 Redis 存儲包含使用其他技術序列化的對象,則必須提供配置了適當序列化程序的對象。 例如,如果使用設置為 的 Redis 存儲出站適配器寫入存儲,則必須提供如下所示的配置:??StringRedisTemplate????StringRedisSerializer????RedisTemplate????extract-payload-elements????false????RedisTemplate??

                                    

對鍵和哈希鍵使用序列化程序,對值和哈希值使用默認的 JDK 序列化序列化程序。??RedisTemplate????String??

因為它具有 的文本值,所以前面的示例相對簡單且靜態。 有時,您可能需要根據某些條件在運行時更改密鑰的值。 為此,請改用,其中提供的表達式可以是任何有效的 SpEL 表達式。??key????key-expression??

此外,您可能希望對從 Redis 集合讀取的成功處理數據執行一些后處理。 例如,您可能希望在處理完值后移動或刪除該值。 您可以使用 Spring Integration 2.2 中添加的事務同步功能來執行此操作。 以下示例使用和事務同步:??key-expression??

                                          

可以使用元素將輪詢器聲明為事務性輪詢器。 此元素可以引用真正的事務管理器(例如,如果流的其他部分調用 JDBC)。 如果您沒有“真實”事務,則可以使用 ,這是 Spring 的實現,可以在沒有實際事務時使用 Redis 適配器的事務同步功能。??transactional????o.s.i.transaction.PseudoTransactionManager????PlatformTransactionManager??

這不會使 Redis 活動本身成為事務性活動。 它允許在成功(提交)或失敗(回滾)之前或之后執行操作的同步。

輪詢器是事務性的后,您可以設置 on 元素的實例。 創建 的實例。 為方便起見,我們公開了一個默認的基于 SpEL 的 ,它允許您配置 SpEL 表達式,其執行與事務協調(同步)。 支持提交前、提交后和回滾后的表達式,以及發送評估結果(如果有)的通道(每種事件一個)。 對于每個子元素,可以指定 和 屬性。 如果僅存在該屬性,則接收到的消息將作為特定同步方案的一部分發送到那里。 如果僅存在該屬性,并且表達式的結果為非 null 值,則會生成一條消息,并將結果作為有效負載發送到默認通道 (),并顯示在日志中(在級別)。 如果您希望評估結果轉到特定渠道,請添加屬性。 如果表達式的結果為 null 或 void,則不會生成任何消息。??o.s.i.transaction.TransactionSynchronizationFactory????transactional????TransactionSynchronizationFactory????TransactionSynchronization????TransactionSynchronizationFactory????expression????channel????channel????expression????NullChannel????DEBUG????channel??

有關事務同步的詳細信息,請參閱事務同步。

RedisStore 出站通道適配器

RedisStore 出站通道適配器允許您將消息負載寫入 Redis 集合,如以下示例所示:

前面的配置 Redis 存儲出站通道適配器使用該元素。 它為各種屬性提供值,例如:??store-inbound-channel-adapter??

??key??或:正在使用的集合的密鑰的名稱。key-expression??extract-payload-elements??:如果設置為 (默認值) 并且有效負載是“多值”對象(即 a 或 a)的實例,則使用 “addAll” 和 “putAll” 語義存儲該對象。 否則,如果設置為 ,則有效負載將存儲為單個條目,而不管其類型如何。 如果有效負載不是“多值”對象的實例,則忽略此屬性的值,并且有效負載始終存儲為單個條目。trueCollectionMapfalse??collection-type??:此適配器支持的類型的枚舉。 支持的集合包括 、、 、 和 。CollectionLISTSETZSETPROPERTIESMAP??map-key-expression??:返回所存儲條目的密鑰名稱的 SpEL 表達式。 僅當 is or 和 "extract-payload-elements" 為 false 時,它才適用。collection-typeMAPPROPERTIES??connection-factory??:對 實例的引用。o.s.data.redis.connection.RedisConnectionFactory??redis-template??:對 實例的引用。o.s.data.redis.core.RedisTemplate所有其他入站適配器通用的其他屬性(例如“通道”)。

不能同時設置 和 。??redis-template????connection-factory??

默認情況下,適配器使用 . 這將使用鍵、值、哈希鍵和哈希值的實例。 但是,如果設置為 ,則將使用具有鍵和哈希鍵的實例以及值和哈希值的實例 s。 使用 JDK 序列化程序時,了解 Java 序列化用于所有值非常重要,無論該值是否實際上是集合。 如果需要對值的序列化進行更多控制,請考慮提供自己的值,而不是依賴這些默認值。??StringRedisTemplate????StringRedisSerializer????extract-payload-elements????false????RedisTemplate????StringRedisSerializer????JdkSerializationRedisSerializer????RedisTemplate??

由于它具有 和其他屬性的文本值,因此前面的示例相對簡單且靜態。 有時,您可能需要根據某些條件在運行時動態更改值。 為此,請使用它們的等效項(、 等),其中提供的表達式可以是任何有效的 SpEL 表達式。??key????-expression????key-expression????map-key-expression??

Redis 出站命令網關

Spring Integration 4.0 引入了 Redis 命令網關,允許您使用通用方法執行任何標準的 Redis 命令。 以下清單顯示了 Redis 出站網關的可用屬性:??RedisConnection#execute??

此終結點從中接收實例的 。??MessageChannel????Message??

此終結點發送回復實例的位置。??MessageChannel????Message??

指定此出站網關是否必須返回非空值。 默認為 . 當 Redis 返回值時拋出 A。??true????ReplyRequiredException????null??

等待回復消息發送的超時(以毫秒為單位)。 它通常用于基于隊列的有限回復通道。

對 Bean 的引用。 默認為 . 它與“redis-template”屬性互斥。??RedisConnectionFactory????redisConnectionFactory??

對 Bean 的引用。 它與“連接工廠”屬性互斥。??RedisTemplate??

對 實例的引用。 如有必要,它用于將每個命令參數序列化為 byte[]。??org.springframework.data.redis.serializer.RedisSerializer??

返回命令鍵的 SpEL 表達式。 它默認為郵件頭。 它不得計算為 。??redis_command????null??

計算為命令參數的逗號分隔的 SpEL 表達式。 與屬性互斥。 如果這兩個屬性都未提供,則 將用作命令參數。 參數表達式的計算結果可以為“null”以支持可變數量的參數。??arguments-strategy????payload??

一個標志,用于指定在配置時間時,評估的 Redis 命令字符串是否可用作表達式評估上下文中的變量。 否則,將忽略此屬性。??boolean????#cmd????o.s.i.redis.outbound.ExpressionArgumentsStrategy????argument-expressions??

對 實例的引用。 它與屬性互斥。 如果這兩個屬性都未提供,則 將用作命令參數。??o.s.i.redis.outbound.ArgumentsStrategy????argument-expressions????payload??

您可以將 用作通用組件來執行任何所需的 Redis 操作。 以下示例演示如何從 Redis 原子序數獲取遞增值:????

有效負載應具有 的名稱 ,該名稱可能由 Bean 定義提供。??Message????redisCounter????org.springframework.data.redis.support.atomic.RedisAtomicInteger??

該方法具有泛型作為其返回類型。 實際結果取決于命令類型。 例如,返回一個 . 有關命令及其參數和結果類型的更多信息,請參閱Redis 規范。??RedisConnection#execute????Object????MGET????List??

Redis 隊列出站網關

Spring 集成引入了 Redis 隊列出站網關來執行請求和回復場景。 它將對話推送到提供的 ,將該值作為其鍵推送到 Redis 列表,并等待來自鍵為 . 每個交互使用不同的 UUID。 以下清單顯示了 Redis 出站網關的可用屬性:??UUID????queue????UUID????UUID" plus ".reply??

此終結點從中接收實例的 。??MessageChannel????Message??

此終結點發送回復實例的位置。??MessageChannel????Message??

指定此出站網關是否必須返回非空值。 默認情況下,此值為。 否則,當 Redis 返回值時會拋出 a。??false????ReplyRequiredException????null??

等待回復消息發送的超時(以毫秒為單位)。 它通常用于基于隊列的有限回復通道。

對 Bean 的引用。 默認為 . 它與“redis-template”屬性互斥。??RedisConnectionFactory????redisConnectionFactory??

出站網關向其發送對話的 Redis 列表的名稱。??UUID??

注冊多個網關時此出站網關的順序。

豆引用。 它可以是一個空字符串,這意味著“沒有序列化程序”。 在這種情況下,來自入站 Redis 消息的原始消息將作為有效負載發送到 。 默認情況下,它是一個 .??RedisSerializer????byte[]????channel????Message????JdkSerializationRedisSerializer??

指定此終端節點是否期望 Redis 隊列中的數據包含整個實例。 如果此屬性設置為 ,則不能為空字符串,因為消息需要某種形式的反序列化(默認情況下為 JDK 序列化)。??Message????true????serializer??

Redis 隊列入站網關

Spring Integration 4.1 引入了 Redis 隊列入站網關來執行請求和回復場景。 它從提供的 中彈出一個對話,從 Redis 列表中彈出值作為其鍵,并使用加號鍵將回復推送到 Redis 列表。 以下清單顯示了 Redis 隊列入站網關的可用屬性:??UUID????queue????UUID????UUID????.reply??

此終端節點發送從 Redis 數據創建的實例的位置。??MessageChannel????Message??

此終結點從中等待回復實例的位置。 可選 - 標頭仍在使用中。??MessageChannel????Message????replyChannel??

對 Spring(或標準 JDK)bean 的引用。 它用于基礎偵聽任務。 它默認為 .??TaskExecutor????Executor????SimpleAsyncTaskExecutor??

等待回復消息發送的超時(以毫秒為單位)。 它通常用于基于隊列的有限回復通道。

對 Bean 的引用。 默認為 . 它與“redis-template”屬性互斥。??RedisConnectionFactory????redisConnectionFactory??

對話的 Redis 列表的名稱。??UUID??

注冊多個網關時此入站網關的順序。

豆引用。 它可以是一個空字符串,這意味著“沒有序列化程序”。 在這種情況下,來自入站 Redis 消息的原始消息將作為有效負載發送到 。 它默認為 . (請注意,在 4.3 之前的版本中,它是默認的。 要恢復該行為,請提供對 的引用。??RedisSerializer????byte[]????channel????Message????JdkSerializationRedisSerializer????StringRedisSerializer????StringRedisSerializer??

等待獲取接收消息的超時(以毫秒為單位)。 它通常用于基于隊列的有限請求通道。

指定此終端節點是否期望 Redis 隊列中的數據包含整個實例。 如果此屬性設置為 ,則不能為空字符串,因為消息需要某種形式的反序列化(默認情況下為 JDK 序列化)。??Message????true????serializer??

在重新啟動偵聽器任務之前,偵聽器任務在“右彈出”操作出現異常后應休眠的時間(以毫秒為單位)。

必須配置多個線程進行處理;否則,當 在出現錯誤后嘗試重新啟動偵聽器任務時,可能會出現死鎖。 可用于處理這些錯誤,以避免重新啟動,但最好不要將應用程序暴露在可能的死鎖情況下。 有關可能的實現,請參閱 Spring 框架參考手冊?。??task-executor????RedisQueueMessageDrivenEndpoint????errorChannel????TaskExecutor??

Redis 流出站通道適配器

Spring Integration 5.4 引入了反應式 Redis 流出站通道適配器,用于將消息負載寫入 Redis 流。 出站通道適配器用于向流中添加 。 以下示例演示如何將 Java 配置和服務類用于 Redis 流出站通道適配器。??ReactiveStreamOperations.add(…)????Record??

@Bean@ServiceActivator(inputChannel = "messageChannel")public ReactiveRedisStreamMessageHandler reactiveValidatorMessageHandler(        ReactiveRedisConnectionFactory reactiveRedisConnectionFactory) {    ReactiveRedisStreamMessageHandler reactiveStreamMessageHandler =        new ReactiveRedisStreamMessageHandler(reactiveRedisConnectionFactory, "myStreamKey");     reactiveStreamMessageHandler.setSerializationContext(serializationContext);     reactiveStreamMessageHandler.setHashMapper(hashMapper);     reactiveStreamMessageHandler.setExtractPayload(true);     return reactiveStreamMessageHandler;}

構造使用 和流名稱的實例以添加記錄。 另一個構造函數變體基于 SpEL 表達式,用于根據請求消息評估流密鑰。??ReactiveRedisStreamMessageHandler????ReactiveRedisConnectionFactory??

設置用于在添加到流之前序列化記錄鍵和值。??RedisSerializationContext??

Set,提供Java類型和Redis哈希/映射之間的契約。??HashMapper??

如果為“true”,通道適配器將從請求消息中提取有效負載,以便添加流記錄。 或者使用整條消息作為值。 默認為 .??true??

Redis 流入站通道適配器

Spring Integration 5.4 引入了 Reactive Stream 入站通道適配器,用于從 Redis 流讀取消息。 入站通道適配器使用 或基于自動確認標志從 Redis 流讀取記錄。 以下示例演示如何將 Java 配置用于 Redis 流入站通道適配器。??StreamReceiver.receive(…)????StreamReceiver.receiveAutoAck()??

@Beanpublic ReactiveRedisStreamMessageProducer reactiveRedisStreamProducer(       ReactiveRedisConnectionFactory reactiveRedisConnectionFactory) {ReactiveRedisStreamMessageProducer messageProducer =            new ReactiveRedisStreamMessageProducer(reactiveRedisConnectionFactory, "myStreamKey");     messageProducer.setStreamReceiverOptions(                 StreamReceiver.StreamReceiverOptions.builder()                      .pollTimeout(Duration.ofMillis(100))                      .build());    messageProducer.setAutoStartup(true);     messageProducer.setAutoAck(false);     messageProducer.setCreateConsumerGroup(true);     messageProducer.setConsumerGroup("my-group");     messageProducer.setConsumerName("my-consumer");     messageProducer.setOutputChannel(fromRedisStreamChannel);     messageProducer.setReadOffset(ReadOffset.latest());     messageProducer.extractPayload(true);     return messageProducer;}

構造一個使用和流式傳輸密鑰讀取記錄的實例。??ReactiveRedisStreamMessageProducer????ReactiveRedisConnectionFactory??

A 使用反應式基礎設施使用 Redis 流。??StreamReceiver.StreamReceiverOptions??

一個屬性,用于指定此終結點是否應在應用程序上下文啟動后自動啟動。 默認為 . 如果 ,應手動啟動。??SmartLifecycle????true????false????RedisStreamMessageProducer????messageProducer.start()??

如果為 ,則不會自動確認收到的消息。 消息的確認將延遲到使用消息的客戶端。 默認為 .??false????true??

如果為 ,將創建一個使用者組。 在創建消費者組流期間,還將創建(如果尚不存在)。 使用者組跟蹤消息傳遞并區分使用者。 默認為 .??true????false??

設置使用者組名稱。 它默認為定義的 Bean 名稱。

設置使用者名稱。 從組 讀取消息。??my-consumer????my-group??

要從此終結點向其發送消息的消息通道。

定義要讀取消息的偏移量。 默認為 .??ReadOffset.latest()??

如果為“true”,通道適配器將從 中提取有效負載值。 否則,整體將用作有效負載。 默認為 .??Record????Record????true??

如果設置為 ,則 Redis 驅動程序不會自動確認 Redis 流中的標頭,而是將標頭添加到消息中,以實例作為值生成。 目標集成流的責任是在基于此類記錄為消息完成業務邏輯時調用其回調。 即使在反序列化期間發生異常并進行了配置,也需要類似的邏輯。 因此,目標錯誤處理程序必須決定確認或納克此類失敗的消息。 與 一起,還將這些標頭填充到消息中以生成:、 和 。??autoAck????false????Record????IntegrationMessageHeaderAccessor.ACKNOWLEDGMENT_CALLBACK????SimpleAcknowledgment????acknowledge()????errorChannel????IntegrationMessageHeaderAccessor.ACKNOWLEDGMENT_CALLBACK????ReactiveRedisStreamMessageProducer????RedisHeaders.STREAM_KEY????RedisHeaders.STREAM_MESSAGE_ID????RedisHeaders.CONSUMER_GROUP????RedisHeaders.CONSUMER??

從版本 5.5 開始,您可以在 上顯式配置選項,包括新引入的函數,如果 Redis Stream 使用者在發生反序列化錯誤時應繼續輪詢,則需要該函數。 默認函數將消息發送到錯誤通道(如果提供),并可能確認失敗的消息,如上所述。 所有這些都與外部提供的.??StreamReceiver.StreamReceiverOptionsBuilder????ReactiveRedisStreamMessageProducer????onErrorResume????StreamReceiver.StreamReceiverOptionsBuilder????StreamReceiver.StreamReceiverOptions??

瑞迪斯鎖注冊表

Spring Integration 4.0 引入了 . 某些組件(例如,聚合器和重新排序器)使用從實例獲取的鎖來確保一次只有一個線程操作一個組。 在單個組件中執行此功能。 您現在可以在這些組件上配置外部鎖定注冊表。 當您將它與共享 一起使用時,您可以使用 跨多個應用程序實例提供此功能,以便一次只有一個實例可以操作該組。??RedisLockRegistry????LockRegistry????DefaultLockRegistry????MessageGroupStore????RedisLockRegistry??

當一個本地線程釋放一個鎖時,另一個本地線程通常可以立即獲取該鎖。 如果線程使用不同的注冊表實例釋放鎖,則最多可能需要 100 毫秒才能獲取鎖。

若要避免“掛起”鎖(當服務器發生故障時),此注冊表中的鎖將在默認的 60 秒后過期,但您可以在注冊表上配置此值。 鎖的持有時間通常要短得多。

由于密鑰可能會過期,因此嘗試解鎖過期的鎖會導致引發異常。 但是,受此類鎖保護的資源可能已受到損害,因此應將此類異常視為嚴重。 應將過期時間設置為足夠大的值以防止出現這種情況,但應將其設置得足夠低,以便在服務器發生故障后可以在合理的時間內恢復鎖定。

從版本 5.0 開始,實現 ,這將刪除上次獲取的鎖,這些鎖當前未鎖定。??RedisLockRegistry????ExpirableLockRegistry????age??

5.5.6 版本的字符串,支持通過 . 有關更多信息,請參閱其 JavaDocs。??RedisLockRegistry????RedisLockRegistry.locks????RedisLockRegistry.setCacheCapacity()??

字符串 對于版本 5.5.13,公開了一個選項,用于確定應在哪種模式下進行 Redis 鎖獲取:??RedisLockRegistry????setRedisLockType(RedisLockType)??

??RedisLockType.SPIN_LOCK??- 鎖通過周期循環(100ms)獲取,檢查是否可以獲取鎖。 違約。??RedisLockType.PUB_SUB_LOCK??- 鎖是通過 redis 發布訂閱獲取的。

發布-訂閱是首選模式 - 客戶端 Redis 服務器之間的網絡抖動更少,性能更高 - 當訂閱收到有關在其他進程中解鎖的通知時,會立即獲取鎖定。 但是,Redis 不支持主/副本連接中的發布-訂閱(例如在 AWS ElastiCache 環境中),因此選擇繁忙旋轉模式作為默認值,以使注冊表在任何環境中工作。

標簽: 通道適配器 有效負載 應用程序

上一篇:今日播報!【Shell文本三劍客--awk】
下一篇:全球要聞:[ Linux ] 一篇帶你理解Linux下線程概念