[發明專利]一種處理消息的方法及裝置在審
| 申請號: | 202210459808.0 | 申請日: | 2022-04-24 |
| 公開(公告)號: | CN114706700A | 公開(公告)日: | 2022-07-05 |
| 發明(設計)人: | 郭方宇 | 申請(專利權)人: | 京東科技信息技術有限公司 |
| 主分類號: | G06F9/54 | 分類號: | G06F9/54 |
| 代理公司: | 中原信達知識產權代理有限責任公司 11219 | 代理人: | 楊倩;郝紅玉 |
| 地址: | 100176 北京市北京經濟技*** | 國省代碼: | 北京;11 |
| 權利要求書: | 查看更多 | 說明書: | 查看更多 |
| 摘要: | |||
| 搜索關鍵詞: | 一種 處理 消息 方法 裝置 | ||
本發明公開了一種處理消息的方法及裝置,涉及計算機技術領域。該方法的一具體實施方式包括:獲取消息消費請求,所述消息消費請求指示了待消費主題;根據所述消息消費請求,從所述待消費主題對應的一個或多個分區中拉取待消費的消息;將所述待消費的消息存儲至所述分區對應的消息鏈表中;對所述待消費的消息進行處理,并將所述消息鏈表中已處理的所述消息標記為已完成狀態;在所述消息鏈表中確定連續被標記為已完成狀態的多個消息中的最后一個目標消息,將所述目標消息的標識提交給服務端。該實施方式能夠避免消息丟失,并且能夠滿足高并發場景下的高吞吐量處理。
技術領域
本發明涉及計算機技術領域,尤其涉及一種處理消息的方法及裝置。
背景技術
客戶端從Kafka集群中某主題(Topic)對應的消息集合中消費消息時,在Topic中每個分區會記錄客戶端當前消費的位置,即消息的offset。在客戶端的應用程序重啟后,會從上一次標記的offset繼續消費消息。
由于目前消息的offset為客戶端自動管理,在客戶端出現異常時,可能會導致已經提交了offset的消息,實際并沒有被處理,從而導致消息丟失。
發明內容
有鑒于此,本發明實施例提供一種處理消息的方法及裝置,通過將從分區中拉取出的待消費消息存儲至對應的消息鏈表中,并將消息鏈表中已處理的消息標記為已完成狀態,然后將消息鏈表中連續被標記為已完成狀態的多個消息中的最后一個目標消息提交給服務端。由此,消息鏈表中處于目標消息之前的消息一定已處理完成,在客戶端異常后,可以根據服務端存儲的目標消息的標識繼續處理消息,從而避免消息丟失。
為實現上述目的,根據本發明實施例的一個方面,提供了一種處理消息的方法。
本發明實施例的一種處理消息的方法包括:獲取消息消費請求,所述消息消費請求指示了待消費主題;
根據所述消息消費請求,從所述待消費主題對應的一個或多個分區中拉取待消費的消息;
將所述待消費的消息存儲至所述分區對應的消息鏈表中;
對所述待消費的消息進行處理,并將所述消息鏈表中已處理的所述消息標記為已完成狀態;
在所述消息鏈表中確定連續被標記為已完成狀態的多個消息中的最后一個目標消息,將所述目標消息的標識提交給服務端。
可選地,所述對所述待消費的消息進行處理,包括:
將所述待消費的消息放入協程池,以使所述協程池中的多個協程對所述待消費的消息進行異步處理。
可選地,在對所述待消費的消息進行處理之前,還包括:
將所述消息鏈表中待消費的消息標記為未完成狀態;
所述將所述消息鏈表中已處理完成的所述消息標記為已完成狀態,包括:
在所述協程將所述消息處理完成后,將所述消息由未完成狀態更新為已完成狀態。
可選地,該方法還包括:確定所述協程是否在預設時長內將所述消息處理完成;
如果否,終止所述協程對所述消息的處理。
可選地,所述消息鏈表與所述分區一一對應;所述將所述待消費的消息存儲至所述分區對應的消息鏈表中,包括:
針對每個所述分區:根據所述待消費的消息在所述分區的順序,將所述待消費的消息存儲至所述分區對應的消息鏈表中。
可選地,該方法還包括:
關閉發送所述消息消費請求的客戶端中的自動標識處理服務。
可選地,在所述確定所述消息鏈表中連續被標記為已完成狀態的多個消息中的最后一個目標消息之后,還包括:
該專利技術資料僅供研究查看技術是否侵權等信息,商用須獲得專利權人授權。該專利全部權利屬于京東科技信息技術有限公司,未經京東科技信息技術有限公司許可,擅自商用是侵權行為。如果您想購買此專利、獲得商業授權和技術合作,請聯系【客服】
本文鏈接:http://www.szxzyx.cn/pat/books/202210459808.0/2.html,轉載請聲明來源鉆瓜專利網。
- 上一篇:一種建筑工程用抬升式路燈起吊裝置
- 下一篇:一種單運放恒壓均流控制電路





