[發明專利]一種數據寫入Kafka的方法、裝置及設備有效
| 申請號: | 202011283123.2 | 申請日: | 2020-11-17 |
| 公開(公告)號: | CN112328602B | 公開(公告)日: | 2023-03-31 |
| 發明(設計)人: | 周朝衛;毛春陽 | 申請(專利權)人: | 中盈優創資訊科技有限公司 |
| 主分類號: | G06F16/22 | 分類號: | G06F16/22;G06F16/23;G06F16/27 |
| 代理公司: | 上海嘉藍專利代理事務所(普通合伙) 31407 | 代理人: | 盧化宇 |
| 地址: | 201800 上海市嘉定區安*** | 國省代碼: | 上海;31 |
| 權利要求書: | 查看更多 | 說明書: | 查看更多 |
| 摘要: | |||
| 搜索關鍵詞: | 一種 數據 寫入 kafka 方法 裝置 設備 | ||
本發明的實施例提供了一種數據寫入Kafka的方法、裝置及設備。所述方法包括檢索批次號,存在且任務執行狀態為任務開始,則合并標識為肯定標識;存在且任務執行狀態為任務成功,則設置合并標識為否定標識;不存在,則插入當前任務的批次號及其狀態信息,設置合并標識為否定標識;讀取最近批次的Topic的偏移量,從偏移量開始消費數據源Kafka的數據;每條消費的數據對應生成一個Key,將消費的數據和對應的Key寫入目標存儲Kafka的Topic;根據合并標識對數據進行合并,并保存。以此方式,保證在異常場景下,程序重啟導致的數據重新處理,再次寫入時不會導致數據的重復,從而實現數據存儲一致性。
技術領域
本發明的實施例一般涉及數據處理領域,并且更具體地,涉及一種數據寫入Kafka的方法、裝置及設備。
背景技術
在現實場景中,經常需要通過Spark Structured Streaming讀取Kafka,對讀取的數據進行清洗處理等操作,例如進行字符串的截取、數據類型的轉換、日期格式轉換等。然后再將處理后的數據寫入Kafka。
然而,將數據寫入Kafka的過程中,寫入后的數據不能更改和刪除;而且SparkStructured Streaming消費Kafka的Topic,并反寫Kafka的Topic,只能保證數據不丟失,無法保證數據不重復寫入。例如,數據保存到Kafka后,沒來得及保存offset,此時程序重啟,將從上上次保存的offset開始消費,如此,就重復消費了上一次的數據,導致數據重復。
發明內容
根據本發明的實施例,提供了一種數據寫入Kafka的方案。
在本發明的第一方面,提供了一種數據寫入Kafka方法。該方法包括:
步驟1:分批次消費數據源Kafka的Topic的數據,在數據庫中檢索當前任務的批次號,如果數據庫中存在所述批次號,則執行步驟2;否則,向所述數據庫中插入當前任務的批次號及其狀態信息,并設置合并標識為否定標識,執行步驟3;
步驟2:如果當前批次任務的狀態為任務開始,則設置合并標識為肯定標識,執行步驟3;如果當前批次任務的狀態為任務成功,則設置合并標識為否定標識,執行步驟6;
步驟3:讀取最近批次的數據源Kafka的Topic的偏移量,從所述偏移量開始消費數據源Kafka的Topic的數據;
步驟4:每條消費的數據對應生成一個Key,將所述消費的數據和對應的Key寫入目標存儲Kafka的Topic;
步驟5:判斷所述合并標識是否為肯定標識,如果是,則對數據進行合并,執行步驟6;否則,更新當前批次任務的狀態信息,執行步驟6;
步驟6:保存當前批次的數據源Kafka的Topic的偏移量。
進一步地,所述每條消費的數據對應生成一個Key,包括:
將每條數據對應的Topic名稱、分區以及偏移量的字段按特定順序順次拼接為一個字符串,形成該條數據的Key。
進一步地,在所述對數據進行合并時,判斷待合并的數據中是否存在相同的Key,如果是,則從Key相同的數據中,根據每條數據的寫入時間戳,保留最新的一條數據;否則,在數據合并時保留每條數據。
進一步地,所述狀態信息包括批次號和任務執行狀態,用于標識當前批次任務的執行狀態;所述任務執行狀態包括任務開始和任務成功。
在本發明的第二方面,提供了一種數據寫入Kafka裝置。該裝置包括:
第一判斷模塊,用于分批次消費數據源Kafka的Topic的數據,在數據庫中檢索當前任務的批次號,如果數據庫中存在所述批次號,則調用第二判斷模塊;否則,向所述數據庫中插入當前任務的批次號及其狀態信息,并設置合并標識為否定標識,調用消費模塊;
該專利技術資料僅供研究查看技術是否侵權等信息,商用須獲得專利權人授權。該專利全部權利屬于中盈優創資訊科技有限公司,未經中盈優創資訊科技有限公司許可,擅自商用是侵權行為。如果您想購買此專利、獲得商業授權和技術合作,請聯系【客服】
本文鏈接:http://www.szxzyx.cn/pat/books/202011283123.2/2.html,轉載請聲明來源鉆瓜專利網。
- 上一篇:太陽能空氣源熱泵聯合裝置
- 下一篇:一種有效降低烤煙硫含量的施肥方法
- 數據顯示系統、數據中繼設備、數據中繼方法、數據系統、接收設備和數據讀取方法
- 數據記錄方法、數據記錄裝置、數據記錄媒體、數據重播方法和數據重播裝置
- 數據發送方法、數據發送系統、數據發送裝置以及數據結構
- 數據顯示系統、數據中繼設備、數據中繼方法及數據系統
- 數據嵌入裝置、數據嵌入方法、數據提取裝置及數據提取方法
- 數據管理裝置、數據編輯裝置、數據閱覽裝置、數據管理方法、數據編輯方法以及數據閱覽方法
- 數據發送和數據接收設備、數據發送和數據接收方法
- 數據發送裝置、數據接收裝置、數據收發系統、數據發送方法、數據接收方法和數據收發方法
- 數據發送方法、數據再現方法、數據發送裝置及數據再現裝置
- 數據發送方法、數據再現方法、數據發送裝置及數據再現裝置





