[發(fā)明專利]讀取RabbitMQ數(shù)據(jù)的方法及裝置有效
| 申請?zhí)枺?/td> | 201811311846.1 | 申請日: | 2018-11-06 |
| 公開(公告)號: | CN111143082B | 公開(公告)日: | 2023-05-16 |
| 發(fā)明(設計)人: | 劉丹;張殿臣;潘競旭;田宜喜;張玉魁;宋穎 | 申請(專利權)人: | 航天信息股份有限公司 |
| 主分類號: | G06F9/54 | 分類號: | G06F9/54;G06F16/2455;G06F16/25 |
| 代理公司: | 北京同達信恒知識產權代理有限公司 11291 | 代理人: | 黃志華 |
| 地址: | 100195 北京市*** | 國省代碼: | 北京;11 |
| 權利要求書: | 查看更多 | 說明書: | 查看更多 |
| 摘要: | |||
| 搜索關鍵詞: | 讀取 rabbitmq 數(shù)據(jù) 方法 裝置 | ||
本發(fā)明涉及大數(shù)據(jù)技術領域,公開了一種基于Spark?Streaming讀取RabbitMQ數(shù)據(jù)的方法及裝置,用于實現(xiàn)Spark?Streaming從RabbitMQ數(shù)據(jù)讀取數(shù)據(jù)進行處理,使Spark?Streaming接收到的數(shù)據(jù)流更加穩(wěn)定和準確,并且避免了Spark?Streaming停止運行后數(shù)據(jù)的丟失。所述方法包括:通過RabbitMQ的接收器Receiver讀取所述RabbitMQ中的數(shù)據(jù),并將所述數(shù)據(jù)存儲在指定內存中;通過Spark?Streaming的Receiver從所述指定內存中讀取所述數(shù)據(jù),并對所述數(shù)據(jù)進行批處理。
技術領域
本發(fā)明涉及大數(shù)據(jù)技術領域,尤其涉及一種基于Spark?Streaming讀取RabbitMQ數(shù)據(jù)的方法及裝置。
背景技術
大數(shù)據(jù)時代,大數(shù)據(jù)的處理技術不斷的發(fā)展,大量的數(shù)據(jù)通過隊列進行數(shù)據(jù)的收集和消費。Kafka和RabbitMQ同為消息代理,且都以分布式部署為目的。Kafka是LinkedIn開源的分布式發(fā)布-訂閱消息系統(tǒng),其主要特點是基于Pull的模式來處理消息消費,追求高吞吐量,但對消息的重復、丟失、錯誤沒有嚴格要求。RabbitMQ是使用Erlang語言開發(fā)的開源消息隊列系統(tǒng),基于AMQP協(xié)議來實現(xiàn),相較于kafka而言,RabbitMQ可用性、穩(wěn)定性、可靠性更高。大數(shù)據(jù)時代對數(shù)據(jù)的可靠性、穩(wěn)定性、準確性要求也越來越高,因此,越來越多的公司選擇RabbitMQ作為消息代理。
發(fā)明內容
本發(fā)明實施例提供了一種基于Spark?Streaming讀取RabbitMQ數(shù)據(jù)的方法及裝置,用于實現(xiàn)Spark?Streaming從RabbitMQ數(shù)據(jù)讀取數(shù)據(jù)進行處理,使得Spark?Streaming接收到的數(shù)據(jù)流更加穩(wěn)定和準確,并且避免了Spark?Streaming停止運行后數(shù)據(jù)的丟失。
第一方面,本發(fā)明實施例提供了一種基于Spark?Streaming讀取RabbitMQ數(shù)據(jù)的方法,包括:
通過RabbitMQ的接收器Receiver讀取所述RabbitMQ中的數(shù)據(jù),并將所述數(shù)據(jù)存儲在指定內存中;
通過Spark?Streaming的Receiver從所述指定內存中讀取所述數(shù)據(jù),并對所述數(shù)據(jù)進行批處理。
可選的,所述通過RabbitMQ的Receiver讀取所述RabbitMQ中的數(shù)據(jù),具體包括:
確定所述RabbitMQ的receiver讀取所述RabbitMQ中的數(shù)據(jù)的時間節(jié)點是否到達預先設置的所述RabbitMQ的Receiver停止讀取所述RabbitMQ中的數(shù)據(jù)的第一時間;
若未到達所述第一時間,則通過所述RabbitMQ的receiver讀取所述RabbitMQ中的數(shù)據(jù)。
可選的,所述通過Spark?Streaming的receiver從所述指定內存中讀取所述數(shù)據(jù),還包括:
啟動所述Spark?Streaming;
確定所述Spark?Streaming的Receiver從所述指定內存中讀取所述數(shù)據(jù)的時間節(jié)點是否到達預先設置的所述Spark?Streaming停止運行的第二時間,其中,所述第二時間晚于所述第一時間;
若未到達所述第二時間,則通過Spark?Streaming的Receiver從所述指定內存中讀取所述數(shù)據(jù)。
可選的,若確定所述RabbitMQ的Receiver讀取所述RabbitMQ中的數(shù)據(jù)的時間節(jié)點到達所述第一時間時,所述方法還包括:確定所述RabbitMQ的Receiver停止從所述RabbitMQ中讀取數(shù)據(jù);
若確定所述Spark?Streaming的Receiver從所述指定內存中讀取所述數(shù)據(jù)的時間節(jié)點到達所述第二時間,所述方法還包括:停止運行所述Spark?Streaming。
該專利技術資料僅供研究查看技術是否侵權等信息,商用須獲得專利權人授權。該專利全部權利屬于航天信息股份有限公司,未經航天信息股份有限公司許可,擅自商用是侵權行為。如果您想購買此專利、獲得商業(yè)授權和技術合作,請聯(lián)系【客服】
本文鏈接:http://www.szxzyx.cn/pat/books/201811311846.1/2.html,轉載請聲明來源鉆瓜專利網。
- 上一篇:一種可升降式機器人檢修架
- 下一篇:場景檢測裝置及方法
- 一種基于RabbitMQ和MongoDB的下載系統(tǒng)
- 一種RabbitMQ安全配置的自動化檢測方法
- 一種保持RabbitMQ服務的方法、系統(tǒng)及相關裝置
- 一種恢復RabbitMQ網絡分區(qū)的方法及系統(tǒng)
- 實現(xiàn)消息隊列重連的方法、裝置、存儲介質及設備
- 一種基于Docker容器的容器自動化管理方法
- 一種管理和監(jiān)控RabbitMq消息隊列的方法及系統(tǒng)
- 一種基于RabbitMQ集群的服務節(jié)點彈性伸縮方法
- 一種基于RabbitMQ集群熱修復的方法、系統(tǒng)、設備及介質
- 一種RabbitMQ集群的遷移方法及計算機系統(tǒng)
- 數(shù)據(jù)顯示系統(tǒng)、數(shù)據(jù)中繼設備、數(shù)據(jù)中繼方法、數(shù)據(jù)系統(tǒng)、接收設備和數(shù)據(jù)讀取方法
- 數(shù)據(jù)記錄方法、數(shù)據(jù)記錄裝置、數(shù)據(jù)記錄媒體、數(shù)據(jù)重播方法和數(shù)據(jù)重播裝置
- 數(shù)據(jù)發(fā)送方法、數(shù)據(jù)發(fā)送系統(tǒng)、數(shù)據(jù)發(fā)送裝置以及數(shù)據(jù)結構
- 數(shù)據(jù)顯示系統(tǒng)、數(shù)據(jù)中繼設備、數(shù)據(jù)中繼方法及數(shù)據(jù)系統(tǒng)
- 數(shù)據(jù)嵌入裝置、數(shù)據(jù)嵌入方法、數(shù)據(jù)提取裝置及數(shù)據(jù)提取方法
- 數(shù)據(jù)管理裝置、數(shù)據(jù)編輯裝置、數(shù)據(jù)閱覽裝置、數(shù)據(jù)管理方法、數(shù)據(jù)編輯方法以及數(shù)據(jù)閱覽方法
- 數(shù)據(jù)發(fā)送和數(shù)據(jù)接收設備、數(shù)據(jù)發(fā)送和數(shù)據(jù)接收方法
- 數(shù)據(jù)發(fā)送裝置、數(shù)據(jù)接收裝置、數(shù)據(jù)收發(fā)系統(tǒng)、數(shù)據(jù)發(fā)送方法、數(shù)據(jù)接收方法和數(shù)據(jù)收發(fā)方法
- 數(shù)據(jù)發(fā)送方法、數(shù)據(jù)再現(xiàn)方法、數(shù)據(jù)發(fā)送裝置及數(shù)據(jù)再現(xiàn)裝置
- 數(shù)據(jù)發(fā)送方法、數(shù)據(jù)再現(xiàn)方法、數(shù)據(jù)發(fā)送裝置及數(shù)據(jù)再現(xiàn)裝置





