網(wǎng)上有很多關(guān)于pos機發(fā)送數(shù)據(jù),你來簡單回答一下RocketMQ的默認發(fā)送流程的知識,也有很多人為大家解答關(guān)于pos機發(fā)送數(shù)據(jù)的問題,今天pos機之家(www.shbwcl.net)為大家整理了關(guān)于這方面的知識,讓我們一起來看下吧!
本文目錄一覽:
pos機發(fā)送數(shù)據(jù)
今天我們就開始學(xué)習(xí)下默認消息發(fā)送流程,學(xué)習(xí)他的實現(xiàn)思路,也幫助我們工作中,遇到了問題不會手足無措。
思考問題消息發(fā)送者是如何做負載均衡的?消息發(fā)送者是如何保證高可用的?消息發(fā)送批量消息如何保證一致性的?默認發(fā)送流程-工作原理源碼入口:org.apache.rocketMq.client.producer.DefaultMQProducer#Send(org.apache.rocketmq.common.message.Message)
啟動Demo:
DefaultMQProducer producer = new DefaultMQProducer("group1"); producer.setNamesrvAddr("xxx:9876"); producer.start();Message msg = new Message("TopicTest" /* Topic */, "TagA" /* Tag */, ("Hello RocketMQ " + i).getBytes(RemotingHelper.DEFAULT_CHARSET) /* Message body */);SendResult sendResult = producer.send(msg);
流程:
1.校驗主題,設(shè)置主題
msg.setTopic(withNamespace(msg.getTopic()));
public String withNamespace(String resource) { return NamespaceUtil.wrapNamespace(this.getNamespace(), resource);}
2.默認發(fā)送方式為同步發(fā)送,默認超時時間為3s
private int sendMsgTimeout = 3000;
public SendResult send(Message msg, long timeout) throws MQClientException, RemotingException, MQBrokerException, InterruptedException { return this.sendDefaultImpl(msg, CommunicationMode.SYNC, null, timeout);}
3.確認 producer service 運行狀態(tài)是否為運行中
入口: org.apache.rocketmq.client.impl.producer.DefaultMQProducerImpl#makeSureStateOK
//檢查狀態(tài),如果不是RUNNING狀態(tài)則拋出異常private void makeSureStateOK() throws MQClientException { if (this.serviceState != ServiceState.RUNNING) { throw new MQClientException("The producer service state not OK, " + this.serviceState + FAQUrl.suggestTodo(FAQUrl.CLIENT_SERVICE_NOT_OK), null); }}
4.校驗信息
topic長達是否大于TOPIC_MAX_LENGTH,topic是否為空是否通過正則校驗,body是否為空,body大小是否超過4Mpublic static void checkTopic(String topic) throws MQClientException { if (UtilAll.isBlank(topic)) { throw new MQClientException("The specified topic is blank", null); } if (topic.length() > TOPIC_MAX_LENGTH) { throw new MQClientException( String.format("The specified topic is longer than topic max length %d.", TOPIC_MAX_LENGTH), null); } if (isTopicOrGroupIllegal(topic)) { throw new MQClientException(String.format( "The specified topic[%s] contains illegal characters, allowing only %s", topic, "^[%|a-zA-Z0-9_-]+$"), null); }}
// body if (null == msg.getBody()) { throw new MQClientException(ResponseCode.MESSAGE_ILLEGAL, "the message body is null"); } if (0 == msg.getBody().length) { throw new MQClientException(ResponseCode.MESSAGE_ILLEGAL, "the message body length is zero"); } if (msg.getBody().length > defaultMQProducer.getMaxMessageSize()) { throw new MQClientException(ResponseCode.MESSAGE_ILLEGAL, "the message body size over max value, MAX: " + defaultMQProducer.getMaxMessageSize());
5.找到主題發(fā)布的信息,未找到則拋出異常
入口: org.apache.rocketmq.client.impl.producer.DefaultMQProducerImpl#tryToFindTopicPublishInfo
消息生產(chǎn)者更新和維護路由信息緩存
private TopicPublishInfo tryToFindTopicPublishInfo(final String topic) { TopicPublishInfo topicPublishInfo = this.topicPublishInfoTable.get(topic); if (null == topicPublishInfo || !topicPublishInfo.ok()) { this.topicPublishInfoTable.putIfAbsent(topic, new TopicPublishInfo()); // 消息生產(chǎn)者更新和維護路由信息緩存 this.mQClientFactory.updateTopicRouteInfoFromNameServer(topic); topicPublishInfo = this.topicPublishInfoTable.get(topic); } if (topicPublishInfo.isHaveTopicRouterInfo() || topicPublishInfo.ok()) { return topicPublishInfo; } else { this.mQClientFactory.updateTopicRouteInfoFromNameServer(topic, true, this.defaultMQProducer); topicPublishInfo = this.topicPublishInfoTable.get(topic); return topicPublishInfo; }}
6.通過TopicPublishInfo 找到對應(yīng)的MessageQueue下的,BrokerName信息
入口: org.apache.rocketmq.client.impl.producer.DefaultMQProducerImpl#selectOneMessageQueue
獲取到BrokerName對應(yīng)的MessageQueue信息
public MessageQueue selectOneMessageQueue(final String lastBrokerName) { if (lastBrokerName == null) { return selectOneMessageQueue(); } else { for (int i = 0; i < this.messageQueueList.size(); i++) { int index = this.sendWhichQueue.incrementAndGet(); int pos = Math.abs(index) % this.messageQueueList.size(); if (pos < 0) pos = 0; MessageQueue mq = this.messageQueueList.get(pos); if (!mq.getBrokerName().equals(lastBrokerName)) { return mq; } } return selectOneMessageQueue(); }}
如果lastBrokerName為null,通過對 sendWhichQueue 方法獲取一個隊列
取余,然后從messageQueueList中獲取一個MessageQueue
public MessageQueue selectOneMessageQueue() { int index = this.sendWhichQueue.incrementAndGet(); int pos = Math.abs(index) % this.messageQueueList.size(); if (pos < 0) pos = 0; return this.messageQueueList.get(pos);}
7.最后消息發(fā)送
入口: org.apache.rocketmq.client.impl.producer.DefaultMQProducerImpl#sendKernelImpl
1.根絕BrokerName獲取到broker地址
在啟動階段,對BrokerAddrTable信息進行了維護
public String findBrokerAddressInPublish(final String brokerName) { HashMap<Long/* brokerId */, String/* address */> map = this.brokerAddrTable.get(brokerName); if (map != null && !map.isEmpty()) { return map.get(MixAll.MASTER_ID); } return null;}
如果未找到,則通過主題查找主題信息,通過更新路由信息后,在嘗試獲取,如果還未找到則拋出異常
if (null == brokerAddr) { // 1.1 如果未找到,則通過主題查找主題信息,通過更新路由信息后,在嘗試獲取,如果還未找到則拋出異常 tryToFindTopicPublishInfo(mq.getTopic()); brokerAddr = this.mQClientFactory.findBrokerAddressInPublish(mq.getBrokerName());}
2.為消息分配全局唯一ID
// 為消息分配全局唯一IDif (!(msg instanceof Messagebatch)) { MessageClientIDSetter.setUniqID(msg);}
在RocketMQ消息發(fā)送-請求與響應(yīng)文章中,我們已經(jīng)學(xué)習(xí)了請求參數(shù)中,創(chuàng)建了全局唯一的MsgId,可以回頭看一看
3.注冊鉤子消息發(fā)送鉤子函數(shù)
這里主要做了三件事情,確認MsgType類型、是否為延遲消息、調(diào)用鉤子函數(shù)內(nèi)的方法
if (this.hasSendMessageHook()) { context = new SendMessageContext(); context.setProducer(this); context.setProducerGroup(this.defaultMQProducer.getProducerGroup()); context.setCommunicationMode(communicationMode); context.setBornHost(this.defaultMQProducer.getClientIP()); context.setBrokerAddr(brokerAddr); context.setMessage(msg); context.setMq(mq); context.setNamespace(this.defaultMQProducer.getNamespace()); String isTrans = msg.getProperty(MessageConst.PROPERTY_TRANSACTION_PREPARED); // 3.1 通過isTrans來確定MsgType類型 if ("true".equals(isTrans)) { context.setMsgType(MessageType.Trans_Msg_Half); } // 3.2 如果msg里面 __STARTDELIVERTIME 或者 DELAY 不為空,則設(shè)置為延遲消息 if (msg.getProperty("__STARTDELIVERTIME") != null || msg.getProperty(MessageConst.PROPERTY_DELAY_TIME_LEVEL) != null) { context.setMsgType(MessageType.Delay_Msg); } // 3.3 調(diào)用鉤子函數(shù)里的方法 this.executeSendMessageHookBefore(context);}
4.設(shè)置發(fā)送信息請求頭SendMessageRequestHeader
最后根據(jù)默認發(fā)送方式,進行消息的發(fā)送
主要利用NettyRemotingClient進行發(fā)送,這里就先不展開來說了 入口: MQClientAPIImpl.sendMessage()
問題答復(fù)消息發(fā)送者是如何做負載均衡的?默認采用輪詢,每一個消息發(fā)送者全局會維護一個 Topic 上一次選擇的隊列,然后基于這個序號進行遞增輪詢AllocateMessageQueueAveragely平均分配,按照總數(shù)除以消費者個數(shù)進行,對每個消費者進行分配AllocateMessageQueueAveragelyByCircle 輪流平均分配,按照消費者個數(shù),進行輪詢分配消息發(fā)送者是如何保證高可用的?在上面的步驟中通過TopicPublishInfo 找到對應(yīng)的MessageQueue下的,BrokerName信息,利用參數(shù)sendLatencyFaultEnable來開啟關(guān)閉故障規(guī)避機制sendLatencyFaultEnable 設(shè)置為 true:開啟延遲規(guī)避機制,一旦消息發(fā)送失敗會將 broker-a “悲觀”地認為在接下來的一段時間內(nèi)該 Broker 不可用,在為未來某一段時間內(nèi)所有的客戶端不會向該 Broker 發(fā)送消息。使用本次消息發(fā)送延遲時間來計算Broker故障規(guī)避時長,不參與消息發(fā)送隊列負載final String notBestBroker = latencyFaultTolerance.pickOneAtLeast();int writeQueueNums = tpInfo.getQueueIdByBroker(notBestBroker);if (writeQueueNums > 0) { final MessageQueue mq = tpInfo.selectOneMessageQueue(); if (notBestBroker != null) { mq.setBrokerName(notBestBroker); mq.setQueueId(tpInfo.getSendWhichQueue().incrementAndGet() % writeQueueNums); } return mq;}
但是這樣子做可能帶來的后果是Broker沒有可用的情況,或者是某個Broker數(shù)據(jù)激增,增加消費者的壓力,所以默認不開啟規(guī)避機制,遇到消息發(fā)送失敗,規(guī)避 broker-a,但是在下一次消息發(fā)送時,即再次調(diào)用broker-a。
消息發(fā)送批量消息如何保證一致性的?將一個Topic下的消息,通過batch方法包一起發(fā)送客戶端ID與使用陷阱摘自丁威老師的文章
總結(jié)這段時間主要學(xué)習(xí)了RocketMQ的消息發(fā)送,主要是以源碼為主,深入了解了消息發(fā)送的啟動和消息發(fā)送的流程,以及認識到客戶端ID與使用陷阱 一圖總結(jié)
作者:叫我小郭_鏈接:https://juejin.cn/post/7105315713157431332來源:稀土掘金
以上就是關(guān)于pos機發(fā)送數(shù)據(jù),你來簡單回答一下RocketMQ的默認發(fā)送流程的知識,后面我們會繼續(xù)為大家整理關(guān)于pos機發(fā)送數(shù)據(jù)的知識,希望能夠幫助到大家!
