[發(fā)明專利]一種Kafka消息唯一消費(fèi)方法、系統(tǒng)、服務(wù)器及存儲(chǔ)介質(zhì)有效
| 申請(qǐng)?zhí)枺?/td> | 201811330631.4 | 申請(qǐng)日: | 2018-11-09 |
| 公開(公告)號(hào): | CN109493076B | 公開(公告)日: | 2022-06-17 |
| 發(fā)明(設(shè)計(jì))人: | 張振鐸 | 申請(qǐng)(專利權(quán))人: | 武漢斗魚網(wǎng)絡(luò)科技有限公司 |
| 主分類號(hào): | G06Q20/40 | 分類號(hào): | G06Q20/40 |
| 代理公司: | 武漢河山金堂專利事務(wù)所(普通合伙) 42212 | 代理人: | 胡清堂;陳懿 |
| 地址: | 430000 湖北省武漢市東湖開*** | 國(guó)省代碼: | 湖北;42 |
| 權(quán)利要求書: | 查看更多 | 說(shuō)明書: | 查看更多 |
| 摘要: | |||
| 搜索關(guān)鍵詞: | 一種 kafka 消息 唯一 消費(fèi) 方法 系統(tǒng) 服務(wù)器 存儲(chǔ) 介質(zhì) | ||
1.一種Kafka消息唯一消費(fèi)的方法,其特征在于,所述方法包括以下步驟:
S1、Kafka消息分發(fā)平臺(tái)包括多個(gè)Topic消息類,每個(gè)Topic消息類包括多個(gè)Partition分塊,每個(gè)Partition分塊包括消費(fèi)者的消費(fèi)記錄Offset,通過(guò)Redis中Hash結(jié)構(gòu),創(chuàng)建Hash結(jié)構(gòu)的多個(gè)第一Key值,所述第一Key值用于記錄消費(fèi)者對(duì)每個(gè)Topic消息類所有的Partition分塊消費(fèi)記錄Offset;
S2、在Redis的Hash結(jié)構(gòu)中創(chuàng)建第二Key值,所述第二Key值用于存儲(chǔ)任一Topic消息任一Partition分塊內(nèi)獲取的批量的消息;
S3、通過(guò)步驟S2獲取的批量消息,存儲(chǔ)于Redis的第二Key值中,消費(fèi)者根據(jù)需求對(duì)所述存儲(chǔ)的消息進(jìn)行消費(fèi),并等待消息消費(fèi)完畢后更新步驟S1中第一Key值中的消費(fèi)記錄Offset,并進(jìn)行刪除處理;
S4、消費(fèi)者消費(fèi)中若發(fā)生宕機(jī),重新啟動(dòng)后,需確認(rèn)步驟S2中Redis的Key值中是否有消息,若有消息則繼續(xù)消費(fèi),若沒(méi)有消息,則重復(fù)步驟S3繼續(xù)進(jìn)行消息處理;
其中,所述步驟S3中所述批量消息處理完后進(jìn)行的刪除處理包括以下步驟:
S31、若消息消費(fèi)失敗,則根據(jù)失敗消息的ID,通過(guò)第二Key值重新獲取所述失敗的消息并進(jìn)行消費(fèi)處理,直至消費(fèi)成功;
S32、若批量消息全部消費(fèi)成功,記錄所述消費(fèi)成功的MsgOffsetEnd,并刪除Redis的第二Key值,并將MsgOffsetEnd消費(fèi)成功的數(shù)據(jù)在所述第一Key值中對(duì)應(yīng)管理的消費(fèi)組的Topic的Partition的Offset進(jìn)行更新,其中,所述MsgOffsetEnd表示Kafka消息分發(fā)平臺(tái)拉取消息的結(jié)束消費(fèi)記錄offset。
2.根據(jù)權(quán)利要求1所述一種Kafka消息唯一消費(fèi)的方法,其特征在于,所述每個(gè)Topic建立的第一Key值結(jié)構(gòu)為Topic_consumerGroup,所述Topic_consumerGroup結(jié)構(gòu)中包括的多個(gè)第一Field與所述Topic的所有Partition一一對(duì)應(yīng),第一Field的第一Value值為所述Topic的Partition的Offset值。
3.根據(jù)權(quán)利要求2所述一種Kafka消息唯一消費(fèi)的方法,其特征在于,所述每個(gè)Topic創(chuàng)建第二Key值,所述第二Key值的結(jié)構(gòu)包括Kafka平臺(tái)的拉取消息的開始消費(fèi)記錄offset、所述Kafka消息分發(fā)平臺(tái)拉取消息的結(jié)束消費(fèi)記錄offset、以及所述第一Key值結(jié)構(gòu);
所述第二Key值中第二Field為當(dāng)前消費(fèi)記錄offset的值,第二Value為消息的具體內(nèi)容。
4.根據(jù)權(quán)利要求1所述一種Kafka消息唯一消費(fèi)的方法,其特征在于,所述步驟S32中若MsgOffsetEnd消費(fèi)記錄數(shù)據(jù)在第一Key值中的Offset中需要更新的位置已經(jīng)被其他Topic的消費(fèi)數(shù)據(jù)更新,則不再更新MsgOffsetEnd消費(fèi)記錄數(shù)據(jù),若未更新,則進(jìn)行更新,并將更新后的第一Key值中的Offset值更新至Kaflka平臺(tái)。
該專利技術(shù)資料僅供研究查看技術(shù)是否侵權(quán)等信息,商用須獲得專利權(quán)人授權(quán)。該專利全部權(quán)利屬于武漢斗魚網(wǎng)絡(luò)科技有限公司,未經(jīng)武漢斗魚網(wǎng)絡(luò)科技有限公司許可,擅自商用是侵權(quán)行為。如果您想購(gòu)買此專利、獲得商業(yè)授權(quán)和技術(shù)合作,請(qǐng)聯(lián)系【客服】
本文鏈接:http://www.szxzyx.cn/pat/books/201811330631.4/1.html,轉(zhuǎn)載請(qǐng)聲明來(lái)源鉆瓜專利網(wǎng)。
- 同類專利
- 專利分類
G06Q 專門適用于行政、商業(yè)、金融、管理、監(jiān)督或預(yù)測(cè)目的的數(shù)據(jù)處理系統(tǒng)或方法;其他類目不包含的專門適用于行政、商業(yè)、金融、管理、監(jiān)督或預(yù)測(cè)目的的處理系統(tǒng)或方法
G06Q20-00 支付體系結(jié)構(gòu)、方案或協(xié)議
G06Q20-02 .涉及中立的第三方,例如認(rèn)證機(jī)構(gòu)、公證人或可信的第三方[TTP]
G06Q20-04 .支付電路
G06Q20-08 .支付體系結(jié)構(gòu)
G06Q20-22 .支付方案或模式
G06Q20-30 .以特定設(shè)備的使用為特征的
- Kafka集群的數(shù)據(jù)傳送方法和裝置
- KAFKA消息隊(duì)列數(shù)監(jiān)控方法、裝置、電子設(shè)備及存儲(chǔ)介質(zhì)
- KAFKA主題監(jiān)控方法、裝置、電子設(shè)備及存儲(chǔ)介質(zhì)
- 基于容器的集群遷移方法及裝置
- kafka消息的測(cè)試方法、裝置、計(jì)算機(jī)設(shè)備及存儲(chǔ)介質(zhì)
- 基于kafka集群的數(shù)據(jù)寫入方法、裝置、電子設(shè)備、存儲(chǔ)介質(zhì)
- 一種可視化部署并管理kafka集群的方法
- kafka消息存儲(chǔ)系統(tǒng)、方法、裝置及計(jì)算機(jī)可讀存儲(chǔ)介質(zhì)
- 一種基于kafka-connect的數(shù)據(jù)傳遞方法
- 一種將Kafka長(zhǎng)連接消費(fèi)轉(zhuǎn)換成服務(wù)的方法





