今亮點!Spring Integration 對MongoDB的支持

2022-12-13 11:18:00 來源:51CTO博客

2.1版本引入了對MongoDB的支持:“高性能,開源,面向文檔的數據庫”。


(資料圖片)

您需要將此依賴項包含在項目中:

    org.springframework.integration    spring-integration-mongodb    6.0.0

要下載、安裝和運行 MongoDB,請參閱MongoDB 文檔。

連接到 MongoDb

阻塞還是被動?

從版本 5.3 開始,Spring Integration 提供了對反應式 MongoDB 驅動程序的支持,以便在訪問 MongoDB 時啟用非阻塞 I/O。 要啟用反應式支持,請將 MongoDB 反應式流驅動程序添加到依賴項中:

    org.mongodb    mongodb-driver-reactivestreams

對于常規同步客戶端,您需要將其各自的驅動程序添加到依賴項中:

    org.mongodb    mongodb-driver-sync

它們都在框架中,以獲得更好的最終用戶選擇支持。??optional??

要開始與MongoDB交互,您首先需要連接到它。 Spring 集成建立在另一個 Spring 項目Spring Data MongoDB提供的支持之上。 它提供了名為 and 的工廠類,簡化了與 MongoDB 客戶端 API 的集成。??MongoDatabaseFactory????ReactiveMongoDatabaseFactory??

Spring Data 默認提供阻塞 MongoDB 驅動程序,但您可以通過包含上述依賴項來選擇被動使用。

用??MongoDatabaseFactory??

要連接到MongoDB,您可以使用接口的實現。??MongoDatabaseFactory??

以下示例演示如何使用:??SimpleMongoClientDatabaseFactory??

MongoDatabaseFactory mongoDbFactory =        new SimpleMongoClientDatabaseFactory(com.mongodb.client.MongoClients.create(), "test");

??SimpleMongoClientDatabaseFactory??采用兩個參數:一個實例和一個指定數據庫名稱的參數。 如果需要配置屬性(如 、 等),可以使用基礎類提供的構造函數之一傳遞這些屬性。 有關如何配置 MongoDB 的更多信息,請參閱Spring-Data-MongoDB參考。??MongoClient????String????host????port????MongoClients??

用??ReactiveMongoDatabaseFactory??

要使用反應式驅動程序連接到MongoDB,您可以使用接口的實現。??ReactiveMongoDatabaseFactory??

以下示例演示如何使用:??SimpleReactiveMongoDatabaseFactory??

ReactiveMongoDatabaseFactory mongoDbFactory =        new SimpleReactiveMongoDatabaseFactory(com.mongodb.reactivestreams.client.MongoClients.create(), "test");

MongoDB消息存儲

企業集成模式(EIP) 一書中所述,郵件存儲允許您保留消息。 如果可靠性是一個問題,則在處理能夠緩沖消息(、、、和其他)的組件時,這樣做會很有用。 在 Spring 集成中,該策略還為聲明檢查模式提供了基礎,EIP 中也有描述。??QueueChannel????aggregator????resequencer????MessageStore??

Spring Integration的MongoDB模塊提供了,它是策略(主要由聲明檢查模式使用)和策略(主要由聚合器和重新排序器模式使用)的實現。??MongoDbMessageStore????MessageStore????MessageGroupStore??

以下示例將 配置為使用 a 和 :??MongoDbMessageStore????QueueChannel????aggregator??

        

前面的示例是一個簡單的 Bean 配置,它需要 a 作為構造函數參數。??MongoDbFactory??

通過使用 Spring Data Mongo 映射機制將具有所有嵌套屬性的 Mongo 文檔展開。 當您需要訪問 或 進行審核或分析(例如,針對存儲的消息)時,它非常有用。??MongoDbMessageStore????Message????payload????headers??

使用自定義實現將實例存儲為 MongoDB 文檔,并且 的屬性(和值)存在一些限制。??MongoDbMessageStore????MappingMongoConverter????Message????payload????header????Message??

從版本 5.1.6 開始,可以使用傳播到內部實現中的自定義轉換器進行配置。 有關更多信息,請參閱 JavaDocs。??MongoDbMessageStore????MappingMongoConverter????MongoDbMessageStore.setCustomConverters(Object… customConverters)??

Spring Integration 3.0 引入了 . 它同時實現 和 接口。 此類可以作為構造函數參數接收 ,例如,您可以使用它配置自定義 . 另一個構造函數需要 a 和 a ,這允許您為實例及其屬性提供一些自定義轉換。 請注意,默認情況下,使用 標準 Java 序列化來向 MongoDB 寫入和讀取實例(請參閱),并且依賴于 中其他屬性的默認值。 它從提供的和 . 由 存儲的集合的默認名稱是 。 我們建議使用此實現在消息包含復雜數據類型時創建可靠且靈活的解決方案。??ConfigurableMongoDbMessageStore????MessageStore????MessageGroupStore????MongoTemplate????WriteConcern????MappingMongoConverter????MongoDbFactory????Message????ConfigurableMongoDbMessageStore????Message????MongoDbMessageBytesConverter????MongoTemplate????MongoTemplate????MongoDbFactory????MappingMongoConverter????ConfigurableMongoDbMessageStore????configurableStoreMessages??

MongoDB通道消息存儲

4.0版引入了新的. 它針對在實例中使用進行了優化。 使用 ,您可以在實例中使用它來實現持久化消息的優先級順序輪詢。 優先級 MongoDB 文檔字段從 () 消息標頭填充。??MongoDbChannelMessageStore????MessageGroupStore????QueueChannel????priorityEnabled = true????????IntegrationMessageHeaderAccessor.PRIORITY????priority??

此外,所有MongoDB實例現在都有一個文檔字段。 該值是對同一集合中的簡單文檔的操作的結果,該文檔是按需創建的。 當消息存儲在同一毫秒內時,該字段用于在操作中提供先進先出 (FIFO) 消息順序(如果已配置,則在優先級范圍內)。??MessageStore????sequence????MessageGroup????sequence????$inc????sequence????sequence????poll??

我們不建議對優先級和非優先級使用相同的 Bean,因為該選項適用于整個存儲區。 但是,這兩種類型都可以使用相同的方法,因為來自存儲的消息輪詢已排序并使用索引。 要配置該方案,可以從一個消息存儲庫 Bean 擴展另一個消息存儲庫 Bean,如以下示例所示:??MongoDbChannelMessageStore????priorityEnabled????collection????MongoDbChannelMessageStore??

                

MongoDB 元數據存儲

Spring Integration 4.2引入了一個新的基于MongoDB的(參見元數據存儲)實現。 可以使用 在應用程序重新啟動期間維護元數據狀態。 您可以將此新實現與適配器一起使用,例如:??MetadataStore????MongoDbMetadataStore????MetadataStore??

要指示這些適配器使用 new ,請聲明一個 Bean 名稱為 的 Spring Bean。 源入站通道適配器自動拾取并使用聲明的 . 下面的示例演示如何聲明名稱為 : 的 Bean:???MongoDbMetadataStore?????metadataStore?????MongoDbMetadataStore?????metadataStore???

@Beanpublic MetadataStore metadataStore(MongoDbFactory factory) {    return new MongoDbMetadataStore(factory, "integrationMetadataStore");}

還實現了,讓它在多個應用程序實例之間可靠地共享,其中只允許一個實例存儲或修改鍵的值。 所有這些操作都是原子的,這要歸功于MongoDB的保證。??MongoDbMetadataStore????ConcurrentMetadataStore??

MongoDB 入站通道適配器

MongoDB 入站通道適配器是一個輪詢使用者,它從 MongoDB 讀取數據并將其作為有效負載發送。 以下示例演示如何配置 MongoDB 入站通道適配器:??Message??

    

如前面的配置所示,您可以使用該元素并為各種屬性提供值來配置 MongoDb 入站通道適配器,例如:??inbound-channel-adapter??

??query??:JSON 查詢(請參閱MongoDB 查詢))??query-expression??:計算結果為 JSON 查詢字符串(如上面的屬性)或 的實例的 SpEL 表達式。 與屬性互斥。queryo.s.data.mongodb.core.query.Queryquery??entity-class??:有效負載對象的類型。 如果未提供,則返回 a。com.mongodb.DBObject??collection-name??或 :標識要使用的 MongoDB 集合的名稱。collection-name-expression??mongodb-factory??:對實例的引用o.s.data.mongodb.MongoDbFactory??mongo-template??:對實例的引用o.s.data.mongodb.core.MongoTemplate所有其他入站適配器通用的其他屬性(例如“通道”)。

You cannot set bothand.??mongo-template????mongodb-factory??

前面的示例相對簡單且靜態,因為它具有 的文本值,并使用 的默認名稱。 有時,您可能需要根據某些條件在運行時更改這些值。 為此,請使用它們的等效項 ( 和 ),其中提供的表達式可以是任何有效的 SpEL 表達式。??query????collection????-expression????query-expression????collection-name-expression??

此外,您可能希望對從MongoDB讀取的成功處理數據進行一些后處理。 例如;您可能希望在處理文檔后移動或刪除文檔。 您可以使用 Spring Integration 2.2 添加的事務同步功能來執行此操作,如以下示例所示:

                                

以下示例顯示了前面示例中引用的內容:??DocumentCleaner??

public class DocumentCleaner {    public void remove(MongoOperations mongoOperations, Object target, String collectionName) {        if (target instanceof List){            List documents = (List) target;            for (Object document : documents) {                mongoOperations.remove(new BasicQuery(JSON.serialize(document)), collectionName);            }        }    }}

可以使用元素將輪詢器聲明為事務性輪詢器。 此元素可以引用真正的事務管理器(例如,如果流的其他部分調用 JDBC)。 如果你沒有“真實”事務,你可以使用 ,它是 Spring 的實現,可以在沒有實際事務時使用 Mongo 適配器的事務同步功能。??transactional????o.s.i.transaction.PseudoTransactionManager????PlatformTransactionManager??

這樣做不會使MongoDB本身成為事務。 它允許在成功(提交)或失敗(回滾)之前或之后執行操作的同步。

輪詢器是事務性的后,您可以設置 on 元素的實例。 A 創建 的實例。 為方便起見,我們公開了一個基于 SpEL 的默認表達式,允許您配置 SpEL 表達式,其執行與事務協調(同步)。 支持提交前、提交后和回滾后的表達式,以及發送評估結果(如果有)的每個事件的通道。 對于每個子元素,可以指定 和 屬性。 如果僅存在該屬性,則接收到的消息將作為特定同步方案的一部分發送到那里。 如果僅存在該屬性,并且表達式的結果為非 null 值,則會生成一條消息,并將結果作為有效負載發送到默認通道 (),并顯示在日志中(在級別上)。 如果您希望評估結果轉到特定渠道,請添加屬性。 如果表達式的結果為 null 或 void,則不會生成任何消息。??o.s.i.transaction.TransactionSynchronizationFactory????transactional????TransactionSynchronizationFactory????TransactionSynchronization????TransactionSynchronizationFactory????expression????channel????channel????expression????NullChannel????DEBUG????channel??

有關事務同步的詳細信息,請參閱事務同步。

從版本 5.5 開始,可以使用 配置 ,該 必須使用 MongoDb 語法計算為 a 或實例。 它可以用作上述后處理過程的替代方法,并修改從集合中提取的那些實體,因此在下一個輪詢周期中不會再次從集合中提取它們(假設更新更改了查詢中使用的某些值)。 當集群中使用同一集合的多個實例時,仍建議使用事務來實現執行隔離和數據一致性。??MongoDbMessageSource????updateExpression????String????update????org.springframework.data.mongodb.core.query.Update????MongoDbMessageSource??

MongoDB 更改流入站通道適配器

從版本 5.3 開始,該模塊引入了 - Spring Data API 的響應式實現。 此組件生成 a 的消息,默認情況下以 of 作為有效負載,并生成一些與更改流相關的標頭(請參閱)。 建議將其與 作為按需訂閱和下游事件消費的 AS 結合使用。??spring-integration-mongodb????MongoDbChangeStreamMessageProducer????MessageProducerSupport????ReactiveMongoOperations.changeStream(String, ChangeStreamOptions, Class)????Flux????body????ChangeStreamEvent????MongoHeaders????MongoDbChangeStreamMessageProducer????FluxMessageChannel????outputChannel??

此通道適配器的 Java DSL 配置可能如下所示:

@BeanIntegrationFlow changeStreamFlow(ReactiveMongoOperations mongoTemplate) {    return IntegrationFlow.from(            MongoDb.changeStreamInboundChannelAdapter(mongoTemplate)                    .domainType(Person.class)                    .collection("person")                    .extractBody(false))            .channel(MessageChannels.flux())            .get();}

當 停止,或下游取消訂閱,或 MongoDb 更改流生成 時,已完成。 通道適配器可以再次啟動,并創建新的源數據,并在 中自動訂閱。 如果需要使用來自其他地方的更改流事件,則可以重新配置此通道適配器以在啟動之間使用新選項。??MongoDbChangeStreamMessageProducer????OperationType.INVALIDATE????Publisher????Publisher????MessageProducerSupport.subscribeToPublisher(Publisher>)??

有關更改流支持的更多信息,請參閱Spring Data MongoDb文檔。

MongoDB 出站通道適配器

MongoDB 出站通道適配器允許您將消息有效負載寫入 MongoDB 文檔存儲,如以下示例所示:

如前面的配置所示,您可以使用該元素配置 MongoDB 出站通道適配器,為各種屬性提供值,例如:??outbound-channel-adapter??

??collection-name??或 :標識要使用的 MongoDb 集合的名稱。collection-name-expression??mongo-converter??:對實例的引用有助于將原始 Java 對象轉換為 JSON 文檔表示形式。o.s.data.mongodb.core.convert.MongoConverter??mongodb-factory??:對 實例的引用。o.s.data.mongodb.MongoDbFactory??mongo-template??:對 實例的引用。 注意:您不能同時設置mongo模板和mongodb出廠設置。o.s.data.mongodb.core.MongoTemplate所有入站適配器中通用的其他屬性(例如“通道”)。

前面的示例相對簡單和靜態,因為它具有 的文字值。 有時,您可能需要根據某些條件在運行時更改此值。 為此,請使用 ,其中提供的表達式是任何有效的 SpEL 表達式。??collection-name????collection-name-expression??

MongoDB Outbound Gateway

版本 5.0 引入了 MongoDB 出站網關。 它允許您通過向其請求通道發送消息來查詢數據庫。 然后,網關將響應發送到回復通道。 可以使用消息負載和標頭來指定查詢和集合名稱,如以下示例所示:

@SpringBootApplicationpublic class MongoDbJavaApplication {    public static void main(String[] args) {        new SpringApplicationBuilder(MongoDbJavaApplication.class)            .web(false)            .run(args);    }    @Autowired    private MongoDbFactory;    @Autowired    private MongoConverter;    @Bean    public IntegrationFlow gatewaySingleQueryFlow() {        return f -> f                .handle(queryOutboundGateway())                .channel(c -> c.queue("retrieveResults"));    }    private MongoDbOutboundGatewaySpec queryOutboundGateway() {        return MongoDb.outboundGateway(this.mongoDbFactory, this.mongoConverter)                .query("{name : "Bob"}")                .collectionNameFunction(m -> m.getHeaders().get("collection"))                .expectSingleResult(true)                .entityClass(Person.class);    }}

您可以將以下屬性與 MongoDB 出站網關一起使用:

??collection-name??或 :標識要使用的 MongoDB 集合的名稱。collection-name-expression??mongo-converter??:對實例的引用有助于將原始 Java 對象轉換為 JSON 文檔表示形式。o.s.data.mongodb.core.convert.MongoConverter??mongodb-factory??:對 實例的引用。o.s.data.mongodb.MongoDbFactory??mongo-template??:對 實例的引用。 注意:不能同時設置 和 。o.s.data.mongodb.core.MongoTemplatemongo-templatemongodb-factory??entity-class??:要傳遞給 MongoTemplate 中的 and 方法的實體類的完全限定名。 如果未提供此屬性,則默認值為 。find(..)findOne(..)org.bson.Document??query??或 :指定 MongoDB 查詢。 有關更多查詢示例,請參閱MongoDB 文檔。query-expression??collection-callback??:對 實例的引用。 最好是自 5.0.11 起具有請求消息上下文的實例。 有關更多信息,請參閱其 Javadocs。 注意:您不能同時擁有這兩個屬性和任何查詢屬性。org.springframework.data.mongodb.core.CollectionCallbacko.s.i.mongodb.outbound.MessageCollectionCallbackcollection-callback

作為 and 屬性的替代方法,可以通過將該屬性用作對功能接口實現的引用來指定其他數據庫操作。 以下示例指定計數操作:??query????query-expression????collectionCallback????MessageCollectionCallback??

private MongoDbOutboundGatewaySpec collectionCallbackOutboundGateway() {    return MongoDb.outboundGateway(this.mongoDbFactory, this.mongoConverter)            .collectionCallback((collection, requestMessage) -> collection.count())            .collectionName("myCollection");}

MongoDB 反應式通道適配器

從版本 5.3 開始,提供了 和 實現。 它們基于來自 Spring 數據,需要依賴關系。??ReactiveMongoDbStoringMessageHandler????ReactiveMongoDbMessageSource????ReactiveMongoOperations????org.mongodb:mongodb-driver-reactivestreams??

這是當集成流定義中涉及反應式流組合時,框架中本機支持的實現。 有關詳細信息,請參閱ReactiveMessageHandler。??ReactiveMongoDbStoringMessageHandler????ReactiveMessageHandler??

從配置的角度來看,與許多其他標準通道適配器沒有區別。 例如,對于Java DSL,可以使用這樣的通道適配器:

@Beanpublic IntegrationFlow reactiveMongoDbFlow(ReactiveMongoDatabaseFactory mongoDbFactory) {    return f -> f            .channel(MessageChannels.flux())            .handle(MongoDb.reactiveOutboundChannelAdapter(mongoDbFactory));}

在此示例中,我們將通過提供的連接到 MongoDb,并將請求消息中的數據存儲到具有名稱的默認集合中。 真正的操作將從內部創建的反應性流組合中按需執行。??ReactiveMongoDatabaseFactory????data????ReactiveStreamsConsumer??

這是基于提供的 or 和 MongoDb 查詢(或表達式)、調用或操作的實現,根據具有預期類型的選項來轉換查詢結果。 當(或根據選項)訂閱所生成消息的有效負載時,按需執行查詢執行和結果評估。 框架可以在拆分器時自動(本質上)訂閱這樣的有效負載,并在下游使用。 否則,目標應用程序有責任訂閱下游終結點中的輪詢發布者。??ReactiveMongoDbMessageSource????AbstractMessageSource????ReactiveMongoDatabaseFactory????ReactiveMongoOperations????find()????findOne()????expectSingleResult????entityClass????Publisher????Flux????Mono????expectSingleResult????flatMap????FluxMessageChannel??

使用Java DSL,這樣的通道適配器可以配置為:

@Beanpublic IntegrationFlow reactiveMongoDbFlow(ReactiveMongoDatabaseFactory mongoDbFactory) {    return IntegrationFlow            .from(MongoDb.reactiveInboundChannelAdapter(mongoDbFactory, "{"name" : "Name"}")                            .entityClass(Person.class),                    c -> c.poller(Pollers.fixedDelay(1000)))            .split()            .channel(c -> c.flux("output"))            .get();}

從版本 5.5 開始,可以使用 . 它具有與 阻塞 . 有關更多信息,請參閱MongoDB 入站通道適配器和 JavaDocs。??ReactiveMongoDbMessageSource????updateExpression????MongoDbMessageSource????AbstractMongoDbMessageSourceSpec??

標簽: 通道適配器 可以使用 有效負載

上一篇:快訊:小樂樂改數字題
下一篇:機器學習 -- 分類