[發(fā)明專利]基于Spark Streaming讀取Kafka數(shù)據(jù)的處理方法有效
| 申請(qǐng)?zhí)枺?/td> | 201611069230.9 | 申請(qǐng)日: | 2016-11-29 |
| 公開(公告)號(hào): | CN106776855B | 公開(公告)日: | 2020-03-13 |
| 發(fā)明(設(shè)計(jì))人: | 程永新;謝濤;王仁錚 | 申請(qǐng)(專利權(quán))人: | 上海輕維軟件有限公司 |
| 主分類號(hào): | G06F16/182 | 分類號(hào): | G06F16/182;G06F16/18;G06F16/13;G06F16/28 |
| 代理公司: | 上海科律專利代理事務(wù)所(特殊普通合伙) 31290 | 代理人: | 袁亞軍;金碎平 |
| 地址: | 200331 上海市普陀區(qū)*** | 國省代碼: | 上海;31 |
| 權(quán)利要求書: | 查看更多 | 說明書: | 查看更多 |
| 摘要: | |||
| 搜索關(guān)鍵詞: | 基于 spark streaming 讀取 kafka 數(shù)據(jù) 處理 方法 | ||
1.一種基于Spark Streaming讀取Kafka數(shù)據(jù)的處理方法,其特征在于,包括如下步驟:
S1)利用Kafka將數(shù)據(jù)存儲(chǔ)在話題中,每個(gè)話題均包含若干可配置數(shù)量的分區(qū);
S2)利用Spark Streaming把實(shí)時(shí)輸入數(shù)據(jù)流以時(shí)間片為單位切分成塊,每個(gè)塊均生成一個(gè)Spark Job處理;
S3)預(yù)先根據(jù)Kafka數(shù)據(jù)失敗記錄數(shù),設(shè)置SparkStreaming補(bǔ)數(shù)調(diào)度時(shí)間;
S4)實(shí)時(shí)監(jiān)控SparkStreaming讀取Kafka數(shù)據(jù)的處理過程;
S5) 根據(jù)Kafka數(shù)據(jù)失敗記錄數(shù)和調(diào)度時(shí)間,通過SparkStreaming重新讀取失敗丟失的Kafka數(shù)據(jù);
所述步驟S3)使用關(guān)系型數(shù)據(jù)庫創(chuàng)建兩張數(shù)據(jù)庫表,分別為調(diào)度表和失敗記錄數(shù)表,所述調(diào)度表中存放調(diào)度編號(hào)id,開始時(shí)間,結(jié)束時(shí)間,狀態(tài)和創(chuàng)建時(shí)間信息,所述失敗數(shù)記錄表中存放失敗記錄id,偏移量,Kafka話題,Kafka節(jié)點(diǎn)列表信息,所述調(diào)度表中的調(diào)度編號(hào)id和失敗數(shù)記錄表的失敗記錄id為主外鍵關(guān)系。
2.如權(quán)利要求1所述的基于Spark Streaming讀取Kafka數(shù)據(jù)的處理方法,其特征在于,所述步驟S4)包括:在SparkStreaming讀取Kafka數(shù)據(jù)過程中,如果對(duì)應(yīng)的Kafka話題數(shù)據(jù)不為空,則獲取到正在從Kafka讀取到數(shù)據(jù)的偏移量,并將該數(shù)據(jù)偏移量、Kafka話題以及Kafka節(jié)點(diǎn)列表信息入庫到關(guān)系型數(shù)據(jù)庫失敗數(shù)記錄表中,如果數(shù)據(jù)處理異常,則修改數(shù)據(jù)表中的狀態(tài)為失敗。
3. 如權(quán)利要求2所述的基于Spark Streaming讀取Kafka數(shù)據(jù)的處理方法,其特征在于,所述步驟S4)中SparkStreaming通過Direct方式直接連接到Kafka節(jié)點(diǎn)上,并通過createDirectStream 方法獲取到正在從Kafka讀取到數(shù)據(jù)的偏移量,同時(shí)將調(diào)度表中的狀態(tài)標(biāo)識(shí)為正在進(jìn)行中;當(dāng)SparkStreaming對(duì)接Kafka讀取處理數(shù)據(jù)過程中,出現(xiàn)異常造成程序不能正常執(zhí)行,則修改調(diào)度表中的狀態(tài)為失敗。
4. 如權(quán)利要求3所述的基于Spark Streaming讀取Kafka數(shù)據(jù)的處理方法,其特征在于,所述步驟S5)包括:首先根據(jù)調(diào)度表狀態(tài)字段作為查詢條件,掃描調(diào)度表,根據(jù)創(chuàng)建時(shí)間字段作為排序降序,得到最早的調(diào)度記錄,然后獲得調(diào)度編號(hào)id,以該字段作為查詢失敗數(shù)記錄表?xiàng)l件,獲得所有Kafka失敗記錄數(shù),再根據(jù)Kafka話題和偏移量重新讀取Kafka數(shù)據(jù)。
5. 如權(quán)利要求2所述的基于Spark Streaming讀取Kafka數(shù)據(jù)的處理方法,其特征在于,所述步驟S4)先讀取關(guān)系數(shù)據(jù)庫中調(diào)度表和失敗數(shù)記錄表緩存到內(nèi)存中,再通過線程定時(shí)更新緩存中的數(shù)據(jù)進(jìn)行實(shí)時(shí)監(jiān)控。
該專利技術(shù)資料僅供研究查看技術(shù)是否侵權(quán)等信息,商用須獲得專利權(quán)人授權(quán)。該專利全部權(quán)利屬于上海輕維軟件有限公司,未經(jīng)上海輕維軟件有限公司許可,擅自商用是侵權(quán)行為。如果您想購買此專利、獲得商業(yè)授權(quán)和技術(shù)合作,請(qǐng)聯(lián)系【客服】
本文鏈接:http://www.szxzyx.cn/pat/books/201611069230.9/1.html,轉(zhuǎn)載請(qǐng)聲明來源鉆瓜專利網(wǎng)。
- 一種Spark平臺(tái)性能自動(dòng)優(yōu)化方法
- 一種Spark作業(yè)的提交方法及裝置
- Spark性能優(yōu)化控制方法、裝置、設(shè)備及存儲(chǔ)介質(zhì)
- spark任務(wù)的提交方法、裝置和服務(wù)器
- Spark任務(wù)的提交方法、系統(tǒng)、客戶端及服務(wù)端
- 一種提交并守護(hù)spark任務(wù)的方法及裝置
- 用戶任務(wù)的處理方法、裝置、電子設(shè)備和計(jì)算機(jī)可讀介質(zhì)
- Spark任務(wù)處理方法及裝置
- 一種Spark應(yīng)用部署管理方法及相關(guān)設(shè)備
- 數(shù)據(jù)處理方法、裝置、電子設(shè)備、存儲(chǔ)介質(zhì)及程序產(chǎn)品
- 一種基于spark streaming的大數(shù)據(jù)流處理方法和系統(tǒng)
- 一種基于大數(shù)據(jù)分布式編程框架的大數(shù)據(jù)預(yù)統(tǒng)系統(tǒng)及方法
- 基于SparkStreaming的智能配電柜、節(jié)能系統(tǒng)及方法
- 讀取RabbitMQ數(shù)據(jù)的方法及裝置
- 用于spark streaming的資源動(dòng)態(tài)分配和反饋方法及裝置
- 監(jiān)控告警系統(tǒng)及方法
- 一種基于Spark_Streaming程序的運(yùn)行環(huán)境控制方法
- 一種數(shù)據(jù)處理方法、裝置、設(shè)備及存儲(chǔ)介質(zhì)
- Spark Streaming程序的運(yùn)行系統(tǒng)及方法
- Spark批次時(shí)間修改方法、裝置、設(shè)備和存儲(chǔ)介質(zhì)





