觀速訊丨精華推薦 | 【深入淺出 RocketMQ原理及實戰】「底層源碼挖掘系列」透徹剖析貫穿RocketMQ的消費者端的運行核心的流程(上篇)

2023-01-25 20:21:05 來源:51CTO博客

精華推薦 | 【深入淺出 RocketMQ原理及實戰】「底層源碼挖掘系列」透徹剖析貫穿RocketMQ的消費者端的運行核心的流程

上篇:分析對應總體消費流程的判斷和校驗以及限流控制和回調等處理流程分析下篇:分析基于上篇的總體流程的底層的消息通訊以及拉去處理數據傳輸流程分析

RocketMQ的消息模型

RocketMQ的基礎消息模型是發布-訂閱(Pub/Sub)是一種消息范式,消息的發送者(稱為發布者、生產者、Producer)會將消息直接發送給特定的接收者(稱為訂閱者、消費者、Comsumer),如下圖所示。

消息通過生產者發送到某一個Topic,如果需要訂閱該Topic并消費里面的消息的話,就要創建對應的消費者進行消費,而本文主要會進行介紹對應的消息隊列的消費者。

本文主旨

本文主要會針對于RocketMQ的消費者Consumer的功能原理進行分析和介紹,消費者主要會通過以推(push),拉(pull)兩種模式對消息進行消費。同時也支持集群方式和廣播方式的消費。提供實時消息訂閱機制,可以滿足大多數用戶的需求。


(資料圖片僅供參考)

RocketMQ提供Push模式也提供了Pull模式

MQ的消費模式可以大致分為兩種,一種是推Push,一種是拉Pull。

Push模式處理消費消費

Push是服務端主動推送消息給客戶端,優點是及時性較好,但如果客戶端沒有做好流控,一旦服務端推送大量消息到客戶端時,就會導致客戶端消息堆積甚至崩潰。

DefaultMQPushConsumer的使用和初始化

Push模式主要通過初始化DefaultMQPushConsumer對象進行消費數據信息,案例代碼如下所示。

DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("consumer_group");// 設置NameServer地址 consumer.setNamesrvAddr("localhost:9876");//訂閱一個或多個topic,并指定tag過濾條件,這里指定*表示接收所有tag的消息consumer.subscribe("TopicTest", "*");//注冊回調接口來處理從Broker中收到的消息consumer.registerMessageListener(new MessageListenerConcurrently() { @Override public ConsumeConcurrentlyStatus consumeMessage(List msgs, ConsumeConcurrentlyContext context) {        // 返回消息消費狀態,ConsumeConcurrentlyStatus.CONSUME_SUCCESS為消費成功        return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;  }});// 啟動Consumerconsumer.start();
消費的位點配置

消費端的消費的位點計算值,可以在啟動前進行配置,主要方法可以通過下面代碼進行配置。

consumer.setConsumeFromWhere(ConsumeFromWhere.CONSUME_FROM_FIRST_OFFSET);
CONSUME_FROM_LAST_OFFSET:第一次啟動從隊列最后位置消費,后續再啟動接著上次消費的進度開始消費CONSUME_FROM_FIRST_OFFSET:第一次啟動從隊列初始位置消費,后續再啟動接著上次消費的進度開始消費CONSUME_FROM_TIMESTAMP:第一次啟動從指定時間點位置消費,后續再啟動接著上次消費的進度開始消費。

注意:第一次啟動是指從來沒有消費過消息的消費者,如果該消費者消費過,那么會在broker端記錄該消費者的消費位置,如果該消費者掛了再啟動,那么自動從上次消費的進度開始。

消費的模式的配置

消費模式主要分為:集群消費(Clustering)和廣播消費(Broadcasting)這兩種。

集群消費模式下,相同Consumer Group的每個Consumer實例平均分攤消息。廣播消費模式下,相同Consumer Group的每個Consumer實例都接收全量的消息。
consumer.setMessageModel(MessageModel.BROADCASTING);
CLUSTERING:默認模式,同一個ConsumerGroup,每個consumer只消費所訂閱消息的一部分內容,同一個ConsumerGroup里所有的Consumer消息加起來才是所訂閱topic整體,從而達到負載均衡的目的。BROADCASTING:同一個ConsumerGroup每個consumer都消費到所訂閱topic所有消息,也就是一個消費會被多次分發,被多個consumer消費。
DefaultMQPushConsumer的運行原理和流程

DefaultMQPushConsumerImpl中各個對象的主要功能如下:

平衡和分配隊列組件實現類-RebalancePushImpl

RebalancePushImpl:主要負責進行分配對應當前服務實例的消費者會從當前消費的topic中的那個Queue中進行消費消息;此外當消費者宕機或者下線的時候,還會執行rebalance再次平衡和分配給其他消費者對應的隊列控制。

長連接進行拉去消息組件實現類-PullAPIWrapper

PullAPIWrapper:主要與broker服務端建立長連接,一直進行定時從broker服務端處拉取消息數據,默認為:32條消息,之后還會調用ConsumeMessageService實現類,進行用戶注冊的Listener執行消息消費邏輯。

看一下consumer.registerMessageListener的源碼,如下所示。

/** * Register a callback to execute on message arrival for concurrent consuming. * @param messageListener message handling callback. */ @Override public void registerMessageListener(MessageListenerConcurrently messageListener) {    this.messageListener = messageListener;    this.defaultMQPushConsumerImpl.registerMessageListener(messageListener); }
回調用戶的注冊的MessageListener組件實現類-ConsumeMessageService

ConsumeMessageService:實現所謂的"Push-被動"消費機制;從Broker拉取的消息后,封裝成ConsumeRequest提交給ConsumeMessageSerivce,此service負責回調用戶的Listener消費消息。

存儲Offset的消費記錄的位移組件實現類--OffsetStore

OffsetStore:維護當前consumer的消費記錄(offset);有兩種實現,Local和Rmote,Local存儲在本地磁盤上,適用于BROADCASTING廣播消費模式;而Remote則將消費進度存儲在Broker上,適用于CLUSTERING集群消費模式;

綜合門面功能接口供各個Service組件實現類--MQClientFactory

MQClientFactory:負責管理client(consumer、producer),并提供多中功能接口供各個Service(Rebalance、PullMessage等)調用;大部分邏輯均在這個類中完成,總體流程架構如下圖所示。

DefaultMQPushConsumerImpl的start方法的源碼

我們先來看一下對應的DefaultMQPushConsumerImpl類的start方法源碼,源碼可以看出主要實現過程在consumer.start后調用DefaultMQPushConsumerImpl的同步start方法,如下所示。

public synchronized void start() throws MQClientException {        switch (this.serviceState) {            case CREATE_JUST:                log.info("the consumer [{}] start beginning. messageModel={}, isUnitMode={}", this.defaultMQPushConsumer.getConsumerGroup(),                    this.defaultMQPushConsumer.getMessageModel(), this.defaultMQPushConsumer.isUnitMode());                this.serviceState = ServiceState.START_FAILED;                this.checkConfig();                this.copySubscription();                if (this.defaultMQPushConsumer.getMessageModel() == MessageModel.CLUSTERING) {                    this.defaultMQPushConsumer.changeInstanceNameToPID();                }                this.mQClientFactory = MQClientManager.getInstance().getAndCreateMQClientInstance(this.defaultMQPushConsumer, this.rpcHook);                this.rebalanceImpl.setConsumerGroup(this.defaultMQPushConsumer.getConsumerGroup());                this.rebalanceImpl.setMessageModel(this.defaultMQPushConsumer.getMessageModel());                this.rebalanceImpl.setAllocateMessageQueueStrategy(this.defaultMQPushConsumer.getAllocateMessageQueueStrategy());                this.rebalanceImpl.setmQClientFactory(this.mQClientFactory);                this.pullAPIWrapper = new PullAPIWrapper(                    mQClientFactory,                    this.defaultMQPushConsumer.getConsumerGroup(), isUnitMode());                this.pullAPIWrapper.registerFilterMessageHook(filterMessageHookList);                if (this.defaultMQPushConsumer.getOffsetStore() != null) {                    this.offsetStore = this.defaultMQPushConsumer.getOffsetStore();                } else {                    switch (this.defaultMQPushConsumer.getMessageModel()) {                        case BROADCASTING:                            this.offsetStore = new LocalFileOffsetStore(this.mQClientFactory, this.defaultMQPushConsumer.getConsumerGroup());                            break;                        case CLUSTERING:                            this.offsetStore = new RemoteBrokerOffsetStore(this.mQClientFactory, this.defaultMQPushConsumer.getConsumerGroup());                            break;                        default:                            break;                    }                    this.defaultMQPushConsumer.setOffsetStore(this.offsetStore);                }                this.offsetStore.load();                if (this.getMessageListenerInner() instanceof MessageListenerOrderly) {                    this.consumeOrderly = true;                    this.consumeMessageService =                        new ConsumeMessageOrderlyService(this, (MessageListenerOrderly) this.getMessageListenerInner());                } else if (this.getMessageListenerInner() instanceof MessageListenerConcurrently) {                    this.consumeOrderly = false;                    this.consumeMessageService =                        new ConsumeMessageConcurrentlyService(this, (MessageListenerConcurrently) this.getMessageListenerInner());                }                this.consumeMessageService.start();                boolean registerOK = mQClientFactory.registerConsumer(this.defaultMQPushConsumer.getConsumerGroup(), this);                if (!registerOK) {                    this.serviceState = ServiceState.CREATE_JUST;                    this.consumeMessageService.shutdown();                    throw new MQClientException("The consumer group[" + this.defaultMQPushConsumer.getConsumerGroup()                        + "] has been created before, specify another name please." + FAQUrl.suggestTodo(FAQUrl.GROUP_NAME_DUPLICATE_URL),                        null);                }                mQClientFactory.start();                log.info("the consumer [{}] start OK.", this.defaultMQPushConsumer.getConsumerGroup());                this.serviceState = ServiceState.RUNNING;                break;            case RUNNING:            case START_FAILED:            case SHUTDOWN_ALREADY:                throw new MQClientException("The PushConsumer service state not OK, maybe started once, "                    + this.serviceState                    + FAQUrl.suggestTodo(FAQUrl.CLIENT_SERVICE_NOT_OK),                    null);            default:                break;        }        this.updateTopicSubscribeInfoWhenSubscriptionChanged();        this.mQClientFactory.checkClientInBroker();        this.mQClientFactory.sendHeartbeatToAllBrokerWithLock();        this.mQClientFactory.rebalanceImmediately();}
DefaultMQPushConsumerImpl的start方法的源碼

主要我們能關注重點的代碼和組件的start,通過mQClientFactory.start();發我們發現他調用了很多組件的start方法:

this.mQClientAPIImpl.start():主要用于開啟請求-響應的網絡通道對象。this.startScheduledTask():主要開啟多個定時任務的功能this.pullMessageService.start():主要開啟拉取數據的業務組件。this.rebalanceService.start():主要開啟rebalance業務服務組件。this.defaultMQProducer.getDefaultMQProducerImpl().start(false):開啟push服務的對象組件作為門面。
public void start() throws MQClientException {        synchronized (this) {            switch (this.serviceState) {                case CREATE_JUST:                    this.serviceState = ServiceState.START_FAILED;                    // If not specified,looking address from name server                    if (null == this.clientConfig.getNamesrvAddr()) {                        this.mQClientAPIImpl.fetchNameServerAddr();                    }                    // Start request-response channel                    this.mQClientAPIImpl.start();                    // Start various schedule tasks                    this.startScheduledTask();                    // Start pull service                    this.pullMessageService.start();                    // Start rebalance service                    this.rebalanceService.start();                    // Start push service                    this.defaultMQProducer.getDefaultMQProducerImpl().start(false);                    log.info("the client factory [{}] start OK", this.clientId);                    this.serviceState = ServiceState.RUNNING;                    break;                case RUNNING:                    break;                case SHUTDOWN_ALREADY:                    break;                case START_FAILED:                    throw new MQClientException("The Factory object[" + this.getClientId() + "] has been created before, and failed.", null);                default:                    break;            }        }    }

重點我們先來主要看pullMessageService.start(),通過這里我們發現RocketMQ的Push模式底層其實也是通過pull實現的,接下來我們先來分析一下pullMessageService中的pullMessage方法的源碼。

private void pullMessage(final PullRequest pullRequest) {        final MQConsumerInner consumer = this.mQClientFactory.selectConsumer(pullRequest.getConsumerGroup());        if (consumer != null) {            DefaultMQPushConsumerImpl impl = (DefaultMQPushConsumerImpl) consumer;            impl.pullMessage(pullRequest);        } else {            log.warn("No matched consumer for the PullRequest {}, drop it", pullRequest);        }}
DefaultMQPushConsumerImpl的pullMessage方法的源碼

源碼中需要進行根據消費組進行篩選對應的消費組,以方便選對應的消費組件DefaultMQPushConsumerImpl,如下圖所示。

final MQConsumerInner consumer = this.mQClientFactory.selectConsumer(pullRequest.getConsumerGroup());

最后還是通過DefaultMQPushConsumerImpl類的pullMessage方法來進行消息的邏輯處理,

DefaultMQPushConsumerImpl impl = (DefaultMQPushConsumerImpl) consumer;impl.pullMessage(pullRequest);
DefaultMQPushConsumerImpl的邏輯限流控制流程

總結一下針對于限流的總體流程控制:

首先拉去消息數據的時候會先去判斷對應的ProcessQueue的對象元素是否還存在訂閱關系或者被刪除了,從而進行跳過那些不應該被消費的數據。
final ProcessQueue processQueue = pullRequest.getProcessQueue();if (processQueue.isDropped()) {     log.info("the pull request[{}] is dropped.", pullRequest.toString());     return;}

上面的邏輯是先會判斷和校驗PullRequest對象中的ProcessQueue對象的dropped是否為true(在RebalanceService線程中為topic下的MessageQueue創建拉取消息請求時要維護對應的ProcessQueue對象,若Consumer不再訂閱該topic則會將該對象的dropped置為true);若是則認為該請求是已經取消的,則直接跳出該方法。


更新PullRequest對象中的ProcessQueue對象的時間戳(ProcessQueue.lastPullTimestamp)為當前時間戳。此外會判斷當前的Consumer消費者組件是否運行中,主要是通過DefaultMQPushConsumerImpl.serviceState是否為RUNNING。
pullRequest.getProcessQueue().setLastPullTimestamp(System.currentTimeMillis());try {    this.makeSureStateOK();} catch (MQClientException e) {    log.warn("pullMessage exception, consumer state not ok", e);    this.executePullRequestLater(pullRequest, PULL_TIME_DELAY_MILLS_WHEN_EXCEPTION);    return;}if (this.isPause()) {  log.warn("consumer was paused, execute pull request later. instanceName={}, group={}", this.defaultMQPushConsumer.getInstanceName(), this.defaultMQPushConsumer.getConsumerGroup());  this.executePullRequestLater(pullRequest, PULL_TIME_DELAY_MILLS_WHEN_SUSPEND);  return;}

如果運行狀態或者是暫停狀態this.isPause()=false(DefaultMQPushConsumerImpl.pause=true),則會進行執行??PullMessageService.executePullRequestLater(PullRequest pullRequest, long timeDelay)??方法延遲再拉取消息,其中timeDelay=3000;

該方法的目的是在3秒之后再次將該PullRequest對象放入PullMessageService. pullRequestQueue隊列中;并跳出該方法


主要進行消費者端進行速度和控制消費速度的流控。若ProcessQueue對象的msgCount大于了消費端的流控閾值,默認值為1000,主要通過??DefaultMQPushConsumer.pullThresholdForQueue??的執行進行判斷。當調用的??processQueue.getMsgCount().get()??的數值大于??DefaultMQPushConsumer.pullThresholdForQueue?? 的值時候會進行 PullMessageService.executePullRequestLater方法。
long cachedMessageCount = processQueue.getMsgCount().get();        long cachedMessageSizeInMiB = processQueue.getMsgSize().get() / (1024 * 1024);        if (cachedMessageCount > this.defaultMQPushConsumer.getPullThresholdForQueue()) {            this.executePullRequestLater(pullRequest, PULL_TIME_DELAY_MILLS_WHEN_FLOW_CONTROL);            if ((queueFlowControlTimes++ % 1000) == 0) {                log.warn(                    "the cached message count exceeds the threshold {}, so do flow control, minOffset={}, maxOffset={}, count={}, size={} MiB, pullRequest={}, flowControlTimes={}",                    this.defaultMQPushConsumer.getPullThresholdForQueue(), processQueue.getMsgTreeMap().firstKey(), processQueue.getMsgTreeMap().lastKey(), cachedMessageCount, cachedMessageSizeInMiB, pullRequest, queueFlowControlTimes);            }            return;        }
主要進行消費者端進行速度和控制消費速度的流控。主要會通過?? this.defaultMQPushConsumer.getPullThresholdSizeForQueue()??與進行計算消息的內存空間的總大小進行對比,單位是M,當大于系統定義的?? this.defaultMQPushConsumer.getPullThresholdSizeForQueue()??的閾值大小的時候,則會進行限流處理。
if (cachedMessageSizeInMiB > this.defaultMQPushConsumer.getPullThresholdSizeForQueue()) {            this.executePullRequestLater(pullRequest, PULL_TIME_DELAY_MILLS_WHEN_FLOW_CONTROL);            if ((queueFlowControlTimes++ % 1000) == 0) {                log.warn(                    "the cached message size exceeds the threshold {} MiB, so do flow control, minOffset={}, maxOffset={}, count={}, size={} MiB, pullRequest={}, flowControlTimes={}",                    this.defaultMQPushConsumer.getPullThresholdSizeForQueue(), processQueue.getMsgTreeMap().firstKey(), processQueue.getMsgTreeMap().lastKey(), cachedMessageCount, cachedMessageSizeInMiB, pullRequest, queueFlowControlTimes);            }            return;        }

以上3和4步驟中的(DELAY_MILLS_WHEN_FLOW_CONTROL)50毫秒之后,才會將該PullRequest請求放入PullMessageService.pullRequestQueue隊列中。從而實現看限流的能力。


當上面的直接限流之后,還會有跨度限流的控制,首先系統還會判斷當前的消費方式是否順序消費(即DefaultMQPushConsumerImpl.consumeOrderly等于false)。
if (!this.consumeOrderly) {            if (processQueue.getMaxSpan() > this.defaultMQPushConsumer.getConsumeConcurrentlyMaxSpan()) {                this.executePullRequestLater(pullRequest, PULL_TIME_DELAY_MILLS_WHEN_FLOW_CONTROL);                if ((queueMaxSpanFlowControlTimes++ % 1000) == 0) {                    log.warn(                        "the queue"s messages, span too long, so do flow control, minOffset={}, maxOffset={}, maxSpan={}, pullRequest={}, flowControlTimes={}",                        processQueue.getMsgTreeMap().firstKey(), processQueue.getMsgTreeMap().lastKey(), processQueue.getMaxSpan(),                        pullRequest, queueMaxSpanFlowControlTimes);                }                return;            }        }

則檢查ProcessQueue對象的??msgTreeMap:TreeMap?? 變量的第一個key值與最后一個key值之間的的差值,該key值表示查詢的隊列偏移量queueoffset;

若queueoffset差值大于閾值(DefaultMQPushConsumer. consumeConcurrentlyMaxSpan指定,默認是2000),則調用PullMessageService.executePullRequestLater方法,在50毫秒之后再該PullRequest請求放入PullMessageService.pullRequestQueue隊列中。


PullRequest.messageQueue對象的topic值為參數RebalanceImpl.subscriptionInner,ConcurrentHashMap, SubscriptionData>中獲取對應的SubscriptionData對象,若該對象為null,考慮到并發的關系,調用executePullRequestLater方法。
final SubscriptionData subscriptionData = this.rebalanceImpl.getSubscriptionInner().get(pullRequest.getMessageQueue().getTopic());        if (null == subscriptionData) {            this.executePullRequestLater(pullRequest, PULL_TIME_DELAY_MILLS_WHEN_EXCEPTION);            log.warn("find the consumer"s subscription failed, {}", pullRequest);            return; }
DefaultMQPushConsumerImpl的消費模式分類

如果當前的消費者屬于集群模式(RebalanceImpl.messageModel等于CLUSTERING)。

PullRequest對象的MessageQueue變量值,type =READ_FROM_MEMORY(從內存中獲取消費進度offset值)為參數調用DefaultMQPushConsumerImpl的offsetStore對象。實際代表著RemoteBrokerOffsetStore對象的readOffset(MessageQueue mq, ReadOffsetType type)方法從本地內存中獲取消費進度offset值。
boolean commitOffsetEnable = false;        long commitOffsetValue = 0L;        if (MessageModel.CLUSTERING == this.defaultMQPushConsumer.getMessageModel()) {            commitOffsetValue = this.offsetStore.readOffset(pullRequest.getMessageQueue(), ReadOffsetType.READ_FROM_MEMORY);            if (commitOffsetValue > 0) {                commitOffsetEnable = true;            }   }

若該offset值大于0,則置臨時變量commitOffsetEnable等于true否則為false;該offset值作為pullKernelImpl方法中的commitOffset參數,在Broker端拉取消息之后根據commitOffsetEnable參數值決定是否用該offset更新消息進度。

該readOffset方法的邏輯是:以入參MessageQueue對象從RemoteBrokerOffsetStore.offsetTable:ConcurrentHashMap 變量中獲取消費進度偏移量;若該偏移量不為null則返回該值,否則返回-1;


DefaultMQPushConsumerImpl的訂閱模型

當每次拉取消息之后需要更新訂閱關系(由DefaultMQPushConsumer. postSubscriptionWhenPull參數表示,默認為false)并且以topic值參數從RebalanceImpl.subscriptionInner獲取的SubscriptionData對象的classFilterMode等于false(默認為false),則將sysFlag標記的第3個字節置為1,否則該字節置為0;

SubscriptionData sd = this.rebalanceImpl.getSubscriptionInner().get(pullRequest.getMessageQueue().getTopic());        if (sd != null) {            if (this.defaultMQPushConsumer.isPostSubscriptionWhenPull() && !sd.isClassFilterMode()) {                subExpression = sd.getSubString();            }            classFilter = sd.isClassFilterMode();        }        int sysFlag = PullSysFlag.buildSysFlag(            commitOffsetEnable, // commitOffset            true, // suspend            subExpression != null, // subscription            classFilter // class filter );

該sysFlag標記的第1個字節置為commitOffsetEnable的值;第2個字節(suspend標記)置為1;第4個字節置為classFilterMode的值。

DefaultMQPushConsumerImpl的底層客戶端如何拉取消息的通信方法

調用底層的拉取消息API接口,方法進行消息拉取操作。

PullAPIWrapper.pullKernelImpl(MessageQueue mq, String subExpression, long subVersion,long offset, int maxNums, int sysFlag,long commitOffset,long brokerSuspendMaxTimeMillis, long timeoutMillis, CommunicationMode communicationMode, PullCallback pullCallback)

內部會存在將回調類PullCallback傳入該方法中,當采用異步方式拉取消息時,在收到響應之后會回調該回調類的方法。

try {            this.pullAPIWrapper.pullKernelImpl(                pullRequest.getMessageQueue(),                subExpression,                subscriptionData.getExpressionType(),                subscriptionData.getSubVersion(),                pullRequest.getNextOffset(),                this.defaultMQPushConsumer.getPullBatchSize(),                sysFlag,                commitOffsetValue,                BROKER_SUSPEND_MAX_TIME_MILLIS,                CONSUMER_TIMEOUT_MILLIS_WHEN_SUSPEND,                // 消息的通信方式為異步                CommunicationMode.ASYNC,                pullCallback            );        } catch (Exception e) {            log.error("pullKernelImpl exception", e);            this.executePullRequestLater(pullRequest, PULL_TIME_DELAY_MILLS_WHEN_EXCEPTION);        }

初始化匿名內部類PullCallback,實現了onSuccess/onException方法; 該方法只有在異步請求的情況下才會回調;

PullCallback pullCallback = new PullCallback() {            @Override            public void onSuccess(PullResult pullResult) {                if (pullResult != null) {                    pullResult = DefaultMQPushConsumerImpl.this.pullAPIWrapper.processPullResult(pullRequest.getMessageQueue(), pullResult,                        subscriptionData);                    switch (pullResult.getPullStatus()) {                        case FOUND:                            long prevRequestOffset = pullRequest.getNextOffset();                            pullRequest.setNextOffset(pullResult.getNextBeginOffset());                            long pullRT = System.currentTimeMillis() - beginTimestamp;                            DefaultMQPushConsumerImpl.this.getConsumerStatsManager().incPullRT(pullRequest.getConsumerGroup(),                                pullRequest.getMessageQueue().getTopic(), pullRT);                            long firstMsgOffset = Long.MAX_VALUE;                            if (pullResult.getMsgFoundList() == null || pullResult.getMsgFoundList().isEmpty()) {                                DefaultMQPushConsumerImpl.this.executePullRequestImmediately(pullRequest);                            } else {                                firstMsgOffset = pullResult.getMsgFoundList().get(0).getQueueOffset();                                DefaultMQPushConsumerImpl.this.getConsumerStatsManager().incPullTPS(pullRequest.getConsumerGroup(),                                    pullRequest.getMessageQueue().getTopic(), pullResult.getMsgFoundList().size());                                boolean dispatchToConsume = processQueue.putMessage(pullResult.getMsgFoundList());                                DefaultMQPushConsumerImpl.this.consumeMessageService.submitConsumeRequest(                                    pullResult.getMsgFoundList(),                                    processQueue,                                    pullRequest.getMessageQueue(),                                    dispatchToConsume);                                if (DefaultMQPushConsumerImpl.this.defaultMQPushConsumer.getPullInterval() > 0) {                                    DefaultMQPushConsumerImpl.this.executePullRequestLater(pullRequest,                                        DefaultMQPushConsumerImpl.this.defaultMQPushConsumer.getPullInterval());                                } else {                                    DefaultMQPushConsumerImpl.this.executePullRequestImmediately(pullRequest);                                }                            }                            if (pullResult.getNextBeginOffset() < prevRequestOffset                                || firstMsgOffset < prevRequestOffset) {                                log.warn(                                    "[BUG] pull message result maybe data wrong, nextBeginOffset: {} firstMsgOffset: {} prevRequestOffset: {}",                                    pullResult.getNextBeginOffset(),                                    firstMsgOffset,                                    prevRequestOffset);                            }                            break;                        case NO_NEW_MSG:                            pullRequest.setNextOffset(pullResult.getNextBeginOffset());                            DefaultMQPushConsumerImpl.this.correctTagsOffset(pullRequest);                            DefaultMQPushConsumerImpl.this.executePullRequestImmediately(pullRequest);                            break;                        case NO_MATCHED_MSG:                            pullRequest.setNextOffset(pullResult.getNextBeginOffset());                            DefaultMQPushConsumerImpl.this.correctTagsOffset(pullRequest);                            DefaultMQPushConsumerImpl.this.executePullRequestImmediately(pullRequest);                            break;                        case OFFSET_ILLEGAL:                            log.warn("the pull request offset illegal, {} {}",                                pullRequest.toString(), pullResult.toString());                            pullRequest.setNextOffset(pullResult.getNextBeginOffset());                            pullRequest.getProcessQueue().setDropped(true);                            DefaultMQPushConsumerImpl.this.executeTaskLater(new Runnable() {                                @Override                                public void run() {                                    try {                                        DefaultMQPushConsumerImpl.this.offsetStore.updateOffset(pullRequest.getMessageQueue(),                                            pullRequest.getNextOffset(), false);                                        DefaultMQPushConsumerImpl.this.offsetStore.persist(pullRequest.getMessageQueue());                                        DefaultMQPushConsumerImpl.this.rebalanceImpl.removeProcessQueue(pullRequest.getMessageQueue());                                        log.warn("fix the pull request offset, {}", pullRequest);                                    } catch (Throwable e) {                                        log.error("executeTaskLater Exception", e);                                    }                                }                            }, 10000);                            break;                        default:                            break;                    }                }            }            @Override            public void onException(Throwable e) {                if (!pullRequest.getMessageQueue().getTopic().startsWith(MixAll.RETRY_GROUP_TOPIC_PREFIX)) {                    log.warn("execute the pull request exception", e);                }                DefaultMQPushConsumerImpl.this.executePullRequestLater(pullRequest, PULL_TIME_DELAY_MILLS_WHEN_EXCEPTION);            }        };

主要針對于拉去底層消息的狀態進行分狀態處理,針對于this.pullAPIWrapper.pullKernelImpl的方法,我會在【下篇】進行介紹,此處不做講述分析。

此外針對于PullStatus狀態的分析-FOUND狀態的處理,主要更新本地的offset值,以及流程控制等。如下所示。

long prevRequestOffset = pullRequest.getNextOffset();pullRequest.setNextOffset(pullResult.getNextBeginOffset());long pullRT = System.currentTimeMillis() - beginTimestamp;                           DefaultMQPushConsumerImpl.this.getConsumerStatsManager().incPullRT(pullRequest.getConsumerGroup(),                                pullRequest.getMessageQueue().getTopic(), pullRT);long firstMsgOffset = Long.MAX_VALUE;if (pullResult.getMsgFoundList() == null || pullResult.getMsgFoundList().isEmpty()) {    DefaultMQPushConsumerImpl.this.executePullRequestImmediately(pullRequest);} else {    firstMsgOffset = pullResult.getMsgFoundList().get(0).getQueueOffset();                            DefaultMQPushConsumerImpl.this.getConsumerStatsManager().incPullTPS(pullRequest.getConsumerGroup(),                                    pullRequest.getMessageQueue().getTopic(), pullResult.getMsgFoundList().size());                                boolean dispatchToConsume = processQueue.putMessage(pullResult.getMsgFoundList());                                DefaultMQPushConsumerImpl.this.consumeMessageService.submitConsumeRequest(                                    pullResult.getMsgFoundList(),                                    processQueue,                                    pullRequest.getMessageQueue(),                                    dispatchToConsume);                                if (DefaultMQPushConsumerImpl.this.defaultMQPushConsumer.getPullInterval() > 0) {                                    DefaultMQPushConsumerImpl.this.executePullRequestLater(pullRequest,                                        DefaultMQPushConsumerImpl.this.defaultMQPushConsumer.getPullInterval());                                } else {                                    DefaultMQPushConsumerImpl.this.executePullRequestImmediately(pullRequest);                                }                            }                            if (pullResult.getNextBeginOffset() < prevRequestOffset                                || firstMsgOffset < prevRequestOffset) {                                log.warn(                                    "[BUG] pull message result maybe data wrong, nextBeginOffset: {} firstMsgOffset: {} prevRequestOffset: {}",                                    pullResult.getNextBeginOffset(),                                    firstMsgOffset,                                    prevRequestOffset);                            }                            break;
此外針對于PullStatus狀態的分析-NO_NEW_MSG狀態的處理,主要進行更新topic標簽的offset的數據值、以及下一次拉去消息的offset值。
pullRequest.setNextOffset(pullResult.getNextBeginOffset());DefaultMQPushConsumerImpl.this.correctTagsOffset(pullRequest);DefaultMQPushConsumerImpl.this.executePullRequestImmediately(pullRequest);
此外針對于PullStatus狀態的分析-NO_MATCHED_MSG:狀態的處理,主要進行更新topic標簽的offset的數據值、以及下一次拉去消息的offset值。
pullRequest.setNextOffset(pullResult.getNextBeginOffset());DefaultMQPushConsumerImpl.this.correctTagsOffset(pullRequest);DefaultMQPushConsumerImpl.this.executePullRequestImmediately(pullRequest);
此外針對于PullStatus狀態的分析-OFFSET_ILLEGAL:狀態的處理,重新同步遠程的offset數據值。
pullRequest.getProcessQueue().setDropped(true);DefaultMQPushConsumerImpl.this.executeTaskLater(new Runnable() {   @Override   public void run() {     try {     DefaultMQPushConsumerImpl.this.offsetStore.updateOffset(pullRequest.getMessageQueue(),                                            pullRequest.getNextOffset(), false);                                        DefaultMQPushConsumerImpl.this.offsetStore.persist(pullRequest.getMessageQueue());                                     DefaultMQPushConsumerImpl.this.rebalanceImpl.removeProcessQueue(pullRequest.getMessageQueue());                                        log.warn("fix the pull request offset, {}", pullRequest);                                    } catch (Throwable e) {                                        log.error("executeTaskLater Exception", e);                                    }                                } }, 10000);

標簽: 如下圖所示 我們先來

上一篇:Nginx與LUA(7)
下一篇:天天熱點!【學懂Java】(六)常用類