
RocketMQ的基礎消息模型是發布-訂閱(Pub/Sub)是一種消息范式,消息的發送者(稱為發布者、生產者、Producer)會將消息直接發送給特定的接收者(稱為訂閱者、消費者、Comsumer),如下圖所示。
消息通過生產者發送到某一個Topic,如果需要訂閱該Topic并消費里面的消息的話,就要創建對應的消費者進行消費,而本文主要會進行介紹對應的消息隊列的消費者。
本文主要會針對于RocketMQ的消費者Consumer的功能原理進行分析和介紹,消費者主要會通過以推(push),拉(pull)兩種模式對消息進行消費。同時也支持集群方式和廣播方式的消費。提供實時消息訂閱機制,可以滿足大多數用戶的需求。
(資料圖片僅供參考)
MQ的消費模式可以大致分為兩種,一種是推Push,一種是拉Pull。
Push是服務端主動推送消息給客戶端,優點是及時性較好,但如果客戶端沒有做好流控,一旦服務端推送大量消息到客戶端時,就會導致客戶端消息堆積甚至崩潰。
Push模式主要通過初始化DefaultMQPushConsumer對象進行消費數據信息,案例代碼如下所示。
DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("consumer_group");// 設置NameServer地址 consumer.setNamesrvAddr("localhost:9876");//訂閱一個或多個topic,并指定tag過濾條件,這里指定*表示接收所有tag的消息consumer.subscribe("TopicTest", "*");//注冊回調接口來處理從Broker中收到的消息consumer.registerMessageListener(new MessageListenerConcurrently() { @Override public ConsumeConcurrentlyStatus consumeMessage(Listmsgs, ConsumeConcurrentlyContext context) { // 返回消息消費狀態,ConsumeConcurrentlyStatus.CONSUME_SUCCESS為消費成功 return ConsumeConcurrentlyStatus.CONSUME_SUCCESS; }});// 啟動Consumerconsumer.start();
消費端的消費的位點計算值,可以在啟動前進行配置,主要方法可以通過下面代碼進行配置。
consumer.setConsumeFromWhere(ConsumeFromWhere.CONSUME_FROM_FIRST_OFFSET);CONSUME_FROM_LAST_OFFSET:第一次啟動從隊列最后位置消費,后續再啟動接著上次消費的進度開始消費CONSUME_FROM_FIRST_OFFSET:第一次啟動從隊列初始位置消費,后續再啟動接著上次消費的進度開始消費CONSUME_FROM_TIMESTAMP:第一次啟動從指定時間點位置消費,后續再啟動接著上次消費的進度開始消費。
注意:第一次啟動是指從來沒有消費過消息的消費者,如果該消費者消費過,那么會在broker端記錄該消費者的消費位置,如果該消費者掛了再啟動,那么自動從上次消費的進度開始。
消費模式主要分為:集群消費(Clustering)和廣播消費(Broadcasting)這兩種。
集群消費模式下,相同Consumer Group的每個Consumer實例平均分攤消息。廣播消費模式下,相同Consumer Group的每個Consumer實例都接收全量的消息。consumer.setMessageModel(MessageModel.BROADCASTING);CLUSTERING:默認模式,同一個ConsumerGroup,每個consumer只消費所訂閱消息的一部分內容,同一個ConsumerGroup里所有的Consumer消息加起來才是所訂閱topic整體,從而達到負載均衡的目的。BROADCASTING:同一個ConsumerGroup每個consumer都消費到所訂閱topic所有消息,也就是一個消費會被多次分發,被多個consumer消費。
DefaultMQPushConsumerImpl中各個對象的主要功能如下:
RebalancePushImpl:主要負責進行分配對應當前服務實例的消費者會從當前消費的topic中的那個Queue中進行消費消息;此外當消費者宕機或者下線的時候,還會執行rebalance再次平衡和分配給其他消費者對應的隊列控制。
PullAPIWrapper:主要與broker服務端建立長連接,一直進行定時從broker服務端處拉取消息數據,默認為:32條消息,之后還會調用ConsumeMessageService實現類,進行用戶注冊的Listener執行消息消費邏輯。
看一下consumer.registerMessageListener的源碼,如下所示。
/** * Register a callback to execute on message arrival for concurrent consuming. * @param messageListener message handling callback. */ @Override public void registerMessageListener(MessageListenerConcurrently messageListener) { this.messageListener = messageListener; this.defaultMQPushConsumerImpl.registerMessageListener(messageListener); }
ConsumeMessageService:實現所謂的"Push-被動"消費機制;從Broker拉取的消息后,封裝成ConsumeRequest提交給ConsumeMessageSerivce,此service負責回調用戶的Listener消費消息。
OffsetStore:維護當前consumer的消費記錄(offset);有兩種實現,Local和Rmote,Local存儲在本地磁盤上,適用于BROADCASTING廣播消費模式;而Remote則將消費進度存儲在Broker上,適用于CLUSTERING集群消費模式;
MQClientFactory:負責管理client(consumer、producer),并提供多中功能接口供各個Service(Rebalance、PullMessage等)調用;大部分邏輯均在這個類中完成,總體流程架構如下圖所示。
我們先來看一下對應的DefaultMQPushConsumerImpl類的start方法源碼,源碼可以看出主要實現過程在consumer.start后調用DefaultMQPushConsumerImpl的同步start方法,如下所示。
public synchronized void start() throws MQClientException { switch (this.serviceState) { case CREATE_JUST: log.info("the consumer [{}] start beginning. messageModel={}, isUnitMode={}", this.defaultMQPushConsumer.getConsumerGroup(), this.defaultMQPushConsumer.getMessageModel(), this.defaultMQPushConsumer.isUnitMode()); this.serviceState = ServiceState.START_FAILED; this.checkConfig(); this.copySubscription(); if (this.defaultMQPushConsumer.getMessageModel() == MessageModel.CLUSTERING) { this.defaultMQPushConsumer.changeInstanceNameToPID(); } this.mQClientFactory = MQClientManager.getInstance().getAndCreateMQClientInstance(this.defaultMQPushConsumer, this.rpcHook); this.rebalanceImpl.setConsumerGroup(this.defaultMQPushConsumer.getConsumerGroup()); this.rebalanceImpl.setMessageModel(this.defaultMQPushConsumer.getMessageModel()); this.rebalanceImpl.setAllocateMessageQueueStrategy(this.defaultMQPushConsumer.getAllocateMessageQueueStrategy()); this.rebalanceImpl.setmQClientFactory(this.mQClientFactory); this.pullAPIWrapper = new PullAPIWrapper( mQClientFactory, this.defaultMQPushConsumer.getConsumerGroup(), isUnitMode()); this.pullAPIWrapper.registerFilterMessageHook(filterMessageHookList); if (this.defaultMQPushConsumer.getOffsetStore() != null) { this.offsetStore = this.defaultMQPushConsumer.getOffsetStore(); } else { switch (this.defaultMQPushConsumer.getMessageModel()) { case BROADCASTING: this.offsetStore = new LocalFileOffsetStore(this.mQClientFactory, this.defaultMQPushConsumer.getConsumerGroup()); break; case CLUSTERING: this.offsetStore = new RemoteBrokerOffsetStore(this.mQClientFactory, this.defaultMQPushConsumer.getConsumerGroup()); break; default: break; } this.defaultMQPushConsumer.setOffsetStore(this.offsetStore); } this.offsetStore.load(); if (this.getMessageListenerInner() instanceof MessageListenerOrderly) { this.consumeOrderly = true; this.consumeMessageService = new ConsumeMessageOrderlyService(this, (MessageListenerOrderly) this.getMessageListenerInner()); } else if (this.getMessageListenerInner() instanceof MessageListenerConcurrently) { this.consumeOrderly = false; this.consumeMessageService = new ConsumeMessageConcurrentlyService(this, (MessageListenerConcurrently) this.getMessageListenerInner()); } this.consumeMessageService.start(); boolean registerOK = mQClientFactory.registerConsumer(this.defaultMQPushConsumer.getConsumerGroup(), this); if (!registerOK) { this.serviceState = ServiceState.CREATE_JUST; this.consumeMessageService.shutdown(); throw new MQClientException("The consumer group[" + this.defaultMQPushConsumer.getConsumerGroup() + "] has been created before, specify another name please." + FAQUrl.suggestTodo(FAQUrl.GROUP_NAME_DUPLICATE_URL), null); } mQClientFactory.start(); log.info("the consumer [{}] start OK.", this.defaultMQPushConsumer.getConsumerGroup()); this.serviceState = ServiceState.RUNNING; break; case RUNNING: case START_FAILED: case SHUTDOWN_ALREADY: throw new MQClientException("The PushConsumer service state not OK, maybe started once, " + this.serviceState + FAQUrl.suggestTodo(FAQUrl.CLIENT_SERVICE_NOT_OK), null); default: break; } this.updateTopicSubscribeInfoWhenSubscriptionChanged(); this.mQClientFactory.checkClientInBroker(); this.mQClientFactory.sendHeartbeatToAllBrokerWithLock(); this.mQClientFactory.rebalanceImmediately();}
主要我們能關注重點的代碼和組件的start,通過mQClientFactory.start();發我們發現他調用了很多組件的start方法:
this.mQClientAPIImpl.start():主要用于開啟請求-響應的網絡通道對象。this.startScheduledTask():主要開啟多個定時任務的功能this.pullMessageService.start():主要開啟拉取數據的業務組件。this.rebalanceService.start():主要開啟rebalance業務服務組件。this.defaultMQProducer.getDefaultMQProducerImpl().start(false):開啟push服務的對象組件作為門面。public void start() throws MQClientException { synchronized (this) { switch (this.serviceState) { case CREATE_JUST: this.serviceState = ServiceState.START_FAILED; // If not specified,looking address from name server if (null == this.clientConfig.getNamesrvAddr()) { this.mQClientAPIImpl.fetchNameServerAddr(); } // Start request-response channel this.mQClientAPIImpl.start(); // Start various schedule tasks this.startScheduledTask(); // Start pull service this.pullMessageService.start(); // Start rebalance service this.rebalanceService.start(); // Start push service this.defaultMQProducer.getDefaultMQProducerImpl().start(false); log.info("the client factory [{}] start OK", this.clientId); this.serviceState = ServiceState.RUNNING; break; case RUNNING: break; case SHUTDOWN_ALREADY: break; case START_FAILED: throw new MQClientException("The Factory object[" + this.getClientId() + "] has been created before, and failed.", null); default: break; } } }
重點我們先來主要看pullMessageService.start(),通過這里我們發現RocketMQ的Push模式底層其實也是通過pull實現的,接下來我們先來分析一下pullMessageService中的pullMessage方法的源碼。
private void pullMessage(final PullRequest pullRequest) { final MQConsumerInner consumer = this.mQClientFactory.selectConsumer(pullRequest.getConsumerGroup()); if (consumer != null) { DefaultMQPushConsumerImpl impl = (DefaultMQPushConsumerImpl) consumer; impl.pullMessage(pullRequest); } else { log.warn("No matched consumer for the PullRequest {}, drop it", pullRequest); }}
源碼中需要進行根據消費組進行篩選對應的消費組,以方便選對應的消費組件DefaultMQPushConsumerImpl,如下圖所示。
final MQConsumerInner consumer = this.mQClientFactory.selectConsumer(pullRequest.getConsumerGroup());
最后還是通過DefaultMQPushConsumerImpl類的pullMessage方法來進行消息的邏輯處理,
DefaultMQPushConsumerImpl impl = (DefaultMQPushConsumerImpl) consumer;impl.pullMessage(pullRequest);
總結一下針對于限流的總體流程控制:
首先拉去消息數據的時候會先去判斷對應的ProcessQueue的對象元素是否還存在訂閱關系或者被刪除了,從而進行跳過那些不應該被消費的數據。final ProcessQueue processQueue = pullRequest.getProcessQueue();if (processQueue.isDropped()) { log.info("the pull request[{}] is dropped.", pullRequest.toString()); return;}
上面的邏輯是先會判斷和校驗PullRequest對象中的ProcessQueue對象的dropped是否為true(在RebalanceService線程中為topic下的MessageQueue創建拉取消息請求時要維護對應的ProcessQueue對象,若Consumer不再訂閱該topic則會將該對象的dropped置為true);若是則認為該請求是已經取消的,則直接跳出該方法。
pullRequest.getProcessQueue().setLastPullTimestamp(System.currentTimeMillis());try { this.makeSureStateOK();} catch (MQClientException e) { log.warn("pullMessage exception, consumer state not ok", e); this.executePullRequestLater(pullRequest, PULL_TIME_DELAY_MILLS_WHEN_EXCEPTION); return;}if (this.isPause()) { log.warn("consumer was paused, execute pull request later. instanceName={}, group={}", this.defaultMQPushConsumer.getInstanceName(), this.defaultMQPushConsumer.getConsumerGroup()); this.executePullRequestLater(pullRequest, PULL_TIME_DELAY_MILLS_WHEN_SUSPEND); return;}
如果運行狀態或者是暫停狀態this.isPause()=false(DefaultMQPushConsumerImpl.pause=true),則會進行執行??PullMessageService.executePullRequestLater(PullRequest pullRequest, long timeDelay)?
?方法延遲再拉取消息,其中timeDelay=3000;
該方法的目的是在3秒之后再次將該PullRequest對象放入PullMessageService. pullRequestQueue隊列中;并跳出該方法
?DefaultMQPushConsumer.pullThresholdForQueue?
?的執行進行判斷。當調用的??processQueue.getMsgCount().get()?
?的數值大于??DefaultMQPushConsumer.pullThresholdForQueue?
? 的值時候會進行 PullMessageService.executePullRequestLater方法。long cachedMessageCount = processQueue.getMsgCount().get(); long cachedMessageSizeInMiB = processQueue.getMsgSize().get() / (1024 * 1024); if (cachedMessageCount > this.defaultMQPushConsumer.getPullThresholdForQueue()) { this.executePullRequestLater(pullRequest, PULL_TIME_DELAY_MILLS_WHEN_FLOW_CONTROL); if ((queueFlowControlTimes++ % 1000) == 0) { log.warn( "the cached message count exceeds the threshold {}, so do flow control, minOffset={}, maxOffset={}, count={}, size={} MiB, pullRequest={}, flowControlTimes={}", this.defaultMQPushConsumer.getPullThresholdForQueue(), processQueue.getMsgTreeMap().firstKey(), processQueue.getMsgTreeMap().lastKey(), cachedMessageCount, cachedMessageSizeInMiB, pullRequest, queueFlowControlTimes); } return; }主要進行消費者端進行速度和控制消費速度的流控。主要會通過?
? this.defaultMQPushConsumer.getPullThresholdSizeForQueue()?
?與進行計算消息的內存空間的總大小進行對比,單位是M,當大于系統定義的?? this.defaultMQPushConsumer.getPullThresholdSizeForQueue()?
?的閾值大小的時候,則會進行限流處理。if (cachedMessageSizeInMiB > this.defaultMQPushConsumer.getPullThresholdSizeForQueue()) { this.executePullRequestLater(pullRequest, PULL_TIME_DELAY_MILLS_WHEN_FLOW_CONTROL); if ((queueFlowControlTimes++ % 1000) == 0) { log.warn( "the cached message size exceeds the threshold {} MiB, so do flow control, minOffset={}, maxOffset={}, count={}, size={} MiB, pullRequest={}, flowControlTimes={}", this.defaultMQPushConsumer.getPullThresholdSizeForQueue(), processQueue.getMsgTreeMap().firstKey(), processQueue.getMsgTreeMap().lastKey(), cachedMessageCount, cachedMessageSizeInMiB, pullRequest, queueFlowControlTimes); } return; }
以上3和4步驟中的(DELAY_MILLS_WHEN_FLOW_CONTROL)50毫秒之后,才會將該PullRequest請求放入PullMessageService.pullRequestQueue隊列中。從而實現看限流的能力。
if (!this.consumeOrderly) { if (processQueue.getMaxSpan() > this.defaultMQPushConsumer.getConsumeConcurrentlyMaxSpan()) { this.executePullRequestLater(pullRequest, PULL_TIME_DELAY_MILLS_WHEN_FLOW_CONTROL); if ((queueMaxSpanFlowControlTimes++ % 1000) == 0) { log.warn( "the queue"s messages, span too long, so do flow control, minOffset={}, maxOffset={}, maxSpan={}, pullRequest={}, flowControlTimes={}", processQueue.getMsgTreeMap().firstKey(), processQueue.getMsgTreeMap().lastKey(), processQueue.getMaxSpan(), pullRequest, queueMaxSpanFlowControlTimes); } return; } }
則檢查ProcessQueue對象的??msgTreeMap:TreeMap
? 變量的第一個key值與最后一個key值之間的的差值,該key值表示查詢的隊列偏移量queueoffset;
若queueoffset差值大于閾值(DefaultMQPushConsumer. consumeConcurrentlyMaxSpan指定,默認是2000),則調用PullMessageService.executePullRequestLater方法,在50毫秒之后再該PullRequest請求放入PullMessageService.pullRequestQueue隊列中。
final SubscriptionData subscriptionData = this.rebalanceImpl.getSubscriptionInner().get(pullRequest.getMessageQueue().getTopic()); if (null == subscriptionData) { this.executePullRequestLater(pullRequest, PULL_TIME_DELAY_MILLS_WHEN_EXCEPTION); log.warn("find the consumer"s subscription failed, {}", pullRequest); return; }
如果當前的消費者屬于集群模式(RebalanceImpl.messageModel等于CLUSTERING)。
PullRequest對象的MessageQueue變量值,type =READ_FROM_MEMORY(從內存中獲取消費進度offset值)為參數調用DefaultMQPushConsumerImpl的offsetStore對象。實際代表著RemoteBrokerOffsetStore對象的readOffset(MessageQueue mq, ReadOffsetType type)方法從本地內存中獲取消費進度offset值。boolean commitOffsetEnable = false; long commitOffsetValue = 0L; if (MessageModel.CLUSTERING == this.defaultMQPushConsumer.getMessageModel()) { commitOffsetValue = this.offsetStore.readOffset(pullRequest.getMessageQueue(), ReadOffsetType.READ_FROM_MEMORY); if (commitOffsetValue > 0) { commitOffsetEnable = true; } }
若該offset值大于0,則置臨時變量commitOffsetEnable等于true否則為false;該offset值作為pullKernelImpl方法中的commitOffset參數,在Broker端拉取消息之后根據commitOffsetEnable參數值決定是否用該offset更新消息進度。
該readOffset方法的邏輯是:以入參MessageQueue對象從RemoteBrokerOffsetStore.offsetTable:ConcurrentHashMap
當每次拉取消息之后需要更新訂閱關系(由DefaultMQPushConsumer. postSubscriptionWhenPull參數表示,默認為false)并且以topic值參數從RebalanceImpl.subscriptionInner獲取的SubscriptionData對象的classFilterMode等于false(默認為false),則將sysFlag標記的第3個字節置為1,否則該字節置為0;
SubscriptionData sd = this.rebalanceImpl.getSubscriptionInner().get(pullRequest.getMessageQueue().getTopic()); if (sd != null) { if (this.defaultMQPushConsumer.isPostSubscriptionWhenPull() && !sd.isClassFilterMode()) { subExpression = sd.getSubString(); } classFilter = sd.isClassFilterMode(); } int sysFlag = PullSysFlag.buildSysFlag( commitOffsetEnable, // commitOffset true, // suspend subExpression != null, // subscription classFilter // class filter );
該sysFlag標記的第1個字節置為commitOffsetEnable的值;第2個字節(suspend標記)置為1;第4個字節置為classFilterMode的值。
調用底層的拉取消息API接口,方法進行消息拉取操作。
PullAPIWrapper.pullKernelImpl(MessageQueue mq, String subExpression, long subVersion,long offset, int maxNums, int sysFlag,long commitOffset,long brokerSuspendMaxTimeMillis, long timeoutMillis, CommunicationMode communicationMode, PullCallback pullCallback)
內部會存在將回調類PullCallback傳入該方法中,當采用異步方式拉取消息時,在收到響應之后會回調該回調類的方法。
try { this.pullAPIWrapper.pullKernelImpl( pullRequest.getMessageQueue(), subExpression, subscriptionData.getExpressionType(), subscriptionData.getSubVersion(), pullRequest.getNextOffset(), this.defaultMQPushConsumer.getPullBatchSize(), sysFlag, commitOffsetValue, BROKER_SUSPEND_MAX_TIME_MILLIS, CONSUMER_TIMEOUT_MILLIS_WHEN_SUSPEND, // 消息的通信方式為異步 CommunicationMode.ASYNC, pullCallback ); } catch (Exception e) { log.error("pullKernelImpl exception", e); this.executePullRequestLater(pullRequest, PULL_TIME_DELAY_MILLS_WHEN_EXCEPTION); }
初始化匿名內部類PullCallback,實現了onSuccess/onException方法; 該方法只有在異步請求的情況下才會回調;
PullCallback pullCallback = new PullCallback() { @Override public void onSuccess(PullResult pullResult) { if (pullResult != null) { pullResult = DefaultMQPushConsumerImpl.this.pullAPIWrapper.processPullResult(pullRequest.getMessageQueue(), pullResult, subscriptionData); switch (pullResult.getPullStatus()) { case FOUND: long prevRequestOffset = pullRequest.getNextOffset(); pullRequest.setNextOffset(pullResult.getNextBeginOffset()); long pullRT = System.currentTimeMillis() - beginTimestamp; DefaultMQPushConsumerImpl.this.getConsumerStatsManager().incPullRT(pullRequest.getConsumerGroup(), pullRequest.getMessageQueue().getTopic(), pullRT); long firstMsgOffset = Long.MAX_VALUE; if (pullResult.getMsgFoundList() == null || pullResult.getMsgFoundList().isEmpty()) { DefaultMQPushConsumerImpl.this.executePullRequestImmediately(pullRequest); } else { firstMsgOffset = pullResult.getMsgFoundList().get(0).getQueueOffset(); DefaultMQPushConsumerImpl.this.getConsumerStatsManager().incPullTPS(pullRequest.getConsumerGroup(), pullRequest.getMessageQueue().getTopic(), pullResult.getMsgFoundList().size()); boolean dispatchToConsume = processQueue.putMessage(pullResult.getMsgFoundList()); DefaultMQPushConsumerImpl.this.consumeMessageService.submitConsumeRequest( pullResult.getMsgFoundList(), processQueue, pullRequest.getMessageQueue(), dispatchToConsume); if (DefaultMQPushConsumerImpl.this.defaultMQPushConsumer.getPullInterval() > 0) { DefaultMQPushConsumerImpl.this.executePullRequestLater(pullRequest, DefaultMQPushConsumerImpl.this.defaultMQPushConsumer.getPullInterval()); } else { DefaultMQPushConsumerImpl.this.executePullRequestImmediately(pullRequest); } } if (pullResult.getNextBeginOffset() < prevRequestOffset || firstMsgOffset < prevRequestOffset) { log.warn( "[BUG] pull message result maybe data wrong, nextBeginOffset: {} firstMsgOffset: {} prevRequestOffset: {}", pullResult.getNextBeginOffset(), firstMsgOffset, prevRequestOffset); } break; case NO_NEW_MSG: pullRequest.setNextOffset(pullResult.getNextBeginOffset()); DefaultMQPushConsumerImpl.this.correctTagsOffset(pullRequest); DefaultMQPushConsumerImpl.this.executePullRequestImmediately(pullRequest); break; case NO_MATCHED_MSG: pullRequest.setNextOffset(pullResult.getNextBeginOffset()); DefaultMQPushConsumerImpl.this.correctTagsOffset(pullRequest); DefaultMQPushConsumerImpl.this.executePullRequestImmediately(pullRequest); break; case OFFSET_ILLEGAL: log.warn("the pull request offset illegal, {} {}", pullRequest.toString(), pullResult.toString()); pullRequest.setNextOffset(pullResult.getNextBeginOffset()); pullRequest.getProcessQueue().setDropped(true); DefaultMQPushConsumerImpl.this.executeTaskLater(new Runnable() { @Override public void run() { try { DefaultMQPushConsumerImpl.this.offsetStore.updateOffset(pullRequest.getMessageQueue(), pullRequest.getNextOffset(), false); DefaultMQPushConsumerImpl.this.offsetStore.persist(pullRequest.getMessageQueue()); DefaultMQPushConsumerImpl.this.rebalanceImpl.removeProcessQueue(pullRequest.getMessageQueue()); log.warn("fix the pull request offset, {}", pullRequest); } catch (Throwable e) { log.error("executeTaskLater Exception", e); } } }, 10000); break; default: break; } } } @Override public void onException(Throwable e) { if (!pullRequest.getMessageQueue().getTopic().startsWith(MixAll.RETRY_GROUP_TOPIC_PREFIX)) { log.warn("execute the pull request exception", e); } DefaultMQPushConsumerImpl.this.executePullRequestLater(pullRequest, PULL_TIME_DELAY_MILLS_WHEN_EXCEPTION); } };
主要針對于拉去底層消息的狀態進行分狀態處理,針對于this.pullAPIWrapper.pullKernelImpl的方法,我會在【下篇】進行介紹,此處不做講述分析。
此外針對于PullStatus狀態的分析-FOUND狀態的處理,主要更新本地的offset值,以及流程控制等。如下所示。
long prevRequestOffset = pullRequest.getNextOffset();pullRequest.setNextOffset(pullResult.getNextBeginOffset());long pullRT = System.currentTimeMillis() - beginTimestamp; DefaultMQPushConsumerImpl.this.getConsumerStatsManager().incPullRT(pullRequest.getConsumerGroup(), pullRequest.getMessageQueue().getTopic(), pullRT);long firstMsgOffset = Long.MAX_VALUE;if (pullResult.getMsgFoundList() == null || pullResult.getMsgFoundList().isEmpty()) { DefaultMQPushConsumerImpl.this.executePullRequestImmediately(pullRequest);} else { firstMsgOffset = pullResult.getMsgFoundList().get(0).getQueueOffset(); DefaultMQPushConsumerImpl.this.getConsumerStatsManager().incPullTPS(pullRequest.getConsumerGroup(), pullRequest.getMessageQueue().getTopic(), pullResult.getMsgFoundList().size()); boolean dispatchToConsume = processQueue.putMessage(pullResult.getMsgFoundList()); DefaultMQPushConsumerImpl.this.consumeMessageService.submitConsumeRequest( pullResult.getMsgFoundList(), processQueue, pullRequest.getMessageQueue(), dispatchToConsume); if (DefaultMQPushConsumerImpl.this.defaultMQPushConsumer.getPullInterval() > 0) { DefaultMQPushConsumerImpl.this.executePullRequestLater(pullRequest, DefaultMQPushConsumerImpl.this.defaultMQPushConsumer.getPullInterval()); } else { DefaultMQPushConsumerImpl.this.executePullRequestImmediately(pullRequest); } } if (pullResult.getNextBeginOffset() < prevRequestOffset || firstMsgOffset < prevRequestOffset) { log.warn( "[BUG] pull message result maybe data wrong, nextBeginOffset: {} firstMsgOffset: {} prevRequestOffset: {}", pullResult.getNextBeginOffset(), firstMsgOffset, prevRequestOffset); } break;此外針對于PullStatus狀態的分析-NO_NEW_MSG狀態的處理,主要進行更新topic標簽的offset的數據值、以及下一次拉去消息的offset值。
pullRequest.setNextOffset(pullResult.getNextBeginOffset());DefaultMQPushConsumerImpl.this.correctTagsOffset(pullRequest);DefaultMQPushConsumerImpl.this.executePullRequestImmediately(pullRequest);此外針對于PullStatus狀態的分析-NO_MATCHED_MSG:狀態的處理,主要進行更新topic標簽的offset的數據值、以及下一次拉去消息的offset值。
pullRequest.setNextOffset(pullResult.getNextBeginOffset());DefaultMQPushConsumerImpl.this.correctTagsOffset(pullRequest);DefaultMQPushConsumerImpl.this.executePullRequestImmediately(pullRequest);此外針對于PullStatus狀態的分析-OFFSET_ILLEGAL:狀態的處理,重新同步遠程的offset數據值。
pullRequest.getProcessQueue().setDropped(true);DefaultMQPushConsumerImpl.this.executeTaskLater(new Runnable() { @Override public void run() { try { DefaultMQPushConsumerImpl.this.offsetStore.updateOffset(pullRequest.getMessageQueue(), pullRequest.getNextOffset(), false); DefaultMQPushConsumerImpl.this.offsetStore.persist(pullRequest.getMessageQueue()); DefaultMQPushConsumerImpl.this.rebalanceImpl.removeProcessQueue(pullRequest.getMessageQueue()); log.warn("fix the pull request offset, {}", pullRequest); } catch (Throwable e) { log.error("executeTaskLater Exception", e); } } }, 10000);