[發(fā)明專利]基于延遲時間的分布式消息處理方法、裝置和系統(tǒng)在審
| 申請?zhí)枺?/td> | 202110244835.1 | 申請日: | 2021-03-05 |
| 公開(公告)號: | CN112905359A | 公開(公告)日: | 2021-06-04 |
| 發(fā)明(設(shè)計)人: | 郝健 | 申請(專利權(quán))人: | 上海中通吉網(wǎng)絡(luò)技術(shù)有限公司 |
| 主分類號: | G06F9/54 | 分類號: | G06F9/54;G06F9/50;G06F16/22;G06F16/27 |
| 代理公司: | 北京細軟智谷知識產(chǎn)權(quán)代理有限責任公司 11471 | 代理人: | 劉明華 |
| 地址: | 201799 上*** | 國省代碼: | 上海;31 |
| 權(quán)利要求書: | 查看更多 | 說明書: | 查看更多 |
| 摘要: | |||
| 搜索關(guān)鍵詞: | 基于 延遲時間 分布式 消息 處理 方法 裝置 系統(tǒng) | ||
1.一種基于延遲時間的分布式消息處理方法,其特征在于,包括:
獲取待延遲消費數(shù)據(jù)和所述待延遲消費數(shù)據(jù)對應的延遲時長;所述待延遲消費數(shù)據(jù)和所述延遲時長均攜帶有所述待延遲消費數(shù)據(jù)被發(fā)送的初始時間片;
將所述延遲時長存儲到配置中心,將所述待延遲消費數(shù)據(jù)存儲到分布式存儲集群;
根據(jù)當前時間和所述配置中心存儲的所有延遲時長,確定每個所述初始時間片對應的索引時間片;
按照預設(shè)檢測周期,對所述分布式存儲集群進行檢測,將得到的每個與所述索引時間片相匹配的所述待延遲消費數(shù)據(jù)均作為待消費數(shù)據(jù);
將所有所述待消費數(shù)據(jù)進行立即消費。
2.根據(jù)權(quán)利要求1所述的基于延遲時間的分布式消息處理方法,其特征在于,所述將所述待消費數(shù)據(jù)進行立即消費,包括:
獲取預先存儲到所述配置中心的執(zhí)行線程數(shù);
利用根據(jù)所述執(zhí)行線程數(shù)設(shè)置的執(zhí)行線程,對所述待消費數(shù)據(jù)進行消費。
3.根據(jù)權(quán)利要求1所述的基于延遲時間的分布式消息處理方法,其特征在于,還包括:
接收所述待消費數(shù)據(jù)的消費反饋;
若所述待消費數(shù)據(jù)的消費反饋表示消費未成功,將所述待消費數(shù)據(jù)的消費次數(shù)加一,并判斷所述消費次數(shù)是否超出預先存儲到所述配置中心的預設(shè)重試次數(shù);
若所述消費次數(shù)超出所述預設(shè)重試次數(shù),則將所述待消費數(shù)據(jù)從所述分布式存儲集群轉(zhuǎn)移到預先設(shè)置的死信隊列中,以便對所述待消費數(shù)據(jù)進行人工干預處理;
若所述消費次數(shù)未超出所述預設(shè)重試次數(shù),則重新將所述待消費數(shù)據(jù)存儲到所述分布式存儲集群中,以便對所述待消費數(shù)據(jù)進行重新消費。
4.根據(jù)權(quán)利要求3所述的基于延遲時間的分布式消息處理方法,其特征在于,所述接收所述待消費數(shù)據(jù)的消費反饋之后,還包括:
若所述待消費數(shù)據(jù)的消息反饋表示消費成功,則將所述待消費數(shù)據(jù)從所述分布式存儲集群移除。
5.根據(jù)權(quán)利要求4所述的基于延遲時間的分布式消息處理方法,其特征在于,還包括:
將所述消息反饋表示消費成功的所述待消費數(shù)據(jù)作為已消費數(shù)據(jù);
將所有所述已消費數(shù)據(jù)存儲到已消費信息隊列。
6.一種基于延遲時間的分布式消息處理裝置,其特征在于,包括:
獲取模塊,用于獲取待延遲消費數(shù)據(jù)和所述待延遲消費數(shù)據(jù)對應的延遲時長;所述待延遲消費數(shù)據(jù)和所述延遲時長均攜帶有所述待延遲消費數(shù)據(jù)被發(fā)送的初始時間片;
存儲模塊,用于將所述延遲時長存儲到配置中心,將所述待延遲消費數(shù)據(jù)存儲到分布式存儲集群;
確定模塊,用于根據(jù)當前時間和所述配置中心存儲的所有延遲時長,確定每個所述初始時間片對應的索引時間片;
檢測模塊,用于按照預設(shè)檢測周期,對所述分布式存儲集群進行檢測,將得到的每個與所述索引時間片相匹配的所述待延遲消費數(shù)據(jù)均作為待消費數(shù)據(jù);
消費模塊,用于將所有所述待消費數(shù)據(jù)進行立即消費。
7.根據(jù)權(quán)利要求6所述的基于延遲時間的分布式消息處理裝置,其特征在于,所述消費模塊包括:線程獲取單元和線程消費單元;
所述線程獲取單元,用于獲取預先存儲到所述配置中心的執(zhí)行線程數(shù);
所述線程消費單元,用于利用根據(jù)所述執(zhí)行線程數(shù)設(shè)置的執(zhí)行線程,對所述待消費數(shù)據(jù)進行消費。
8.根據(jù)權(quán)利要求6所述的基于延遲時間的分布式消息處理裝置,其特征在于,還包括:接收模塊、判斷模塊和轉(zhuǎn)移模塊;
所述接收模塊,用于接收所述待消費數(shù)據(jù)的消費反饋;
所述判斷模塊,用于若所述待消費數(shù)據(jù)的消費反饋表示消費未成功,將所述待消費數(shù)據(jù)的消費次數(shù)加一,并判斷所述消費次數(shù)是否超出預先存儲到所述配置中心的預設(shè)重試次數(shù);
所述轉(zhuǎn)移模塊,用于若所述消費次數(shù)超出所述預設(shè)重試次數(shù),則將所述待消費數(shù)據(jù)從所述分布式存儲集群轉(zhuǎn)移到預先設(shè)置的死信隊列中,以便對所述待消費數(shù)據(jù)進行人工干預處理;若所述消費次數(shù)未超出所述預設(shè)重試次數(shù),則重新將所述待消費數(shù)據(jù)存儲到所述分布式存儲集群中,以便對所述待消費數(shù)據(jù)進行重新消費。
該專利技術(shù)資料僅供研究查看技術(shù)是否侵權(quán)等信息,商用須獲得專利權(quán)人授權(quán)。該專利全部權(quán)利屬于上海中通吉網(wǎng)絡(luò)技術(shù)有限公司,未經(jīng)上海中通吉網(wǎng)絡(luò)技術(shù)有限公司許可,擅自商用是侵權(quán)行為。如果您想購買此專利、獲得商業(yè)授權(quán)和技術(shù)合作,請聯(lián)系【客服】
本文鏈接:http://www.szxzyx.cn/pat/books/202110244835.1/1.html,轉(zhuǎn)載請聲明來源鉆瓜專利網(wǎng)。





