[發明專利]大規模并發數據流處理系統及其處理方法有效
| 申請號: | 201110135906.0 | 申請日: | 2011-05-25 |
| 公開(公告)號: | CN102200906A | 公開(公告)日: | 2011-09-28 |
| 發明(設計)人: | 陳慶奎;那麗春;劉伯承;王海峰;郝聚濤;霍歡;趙海燕;莊松林;丁曉東 | 申請(專利權)人: | 上海理工大學 |
| 主分類號: | G06F9/38 | 分類號: | G06F9/38;G06F9/48 |
| 代理公司: | 上海申匯專利代理有限公司 31001 | 代理人: | 吳寶根 |
| 地址: | 200093 *** | 國省代碼: | 上海;31 |
| 權利要求書: | 查看更多 | 說明書: | 查看更多 |
| 摘要: | |||
| 搜索關鍵詞: | 大規模 并發 數據流 處理 系統 及其 方法 | ||
1.一種大規模并發數據流處理系統,其特征在于,包括:
數據流單元緩沖區,是一個二元組DSB(DSUB,MR),其中DSB為數據流單元緩沖區,DSUB及MR均是由p個元素構成的一維數組,p為并發數據流中的數據流數量,DSUB中的每個數組元素為一個DSU,MR中的每個數組元素是一個取值為0或1的整型數,該數組用于數據流流水處理的同步標志;
所述DSU是指數據流單元,一個數據流單元是一個九元組DSU(id,sno,segno,seq,t,type,prog,data,odata),其中DSU為數據流單元,id為該DSU的標識符,且該id具有唯一性,sno為該DSU的數據流號,segno為該DSU的數據流段號,seq為該DSU的在segno數據流段中的單元序號,用于表示其在數據流段中的位置,t為一個時間印,用于記載該DSU被處理的時刻,type為該DSU的類型,data為該DSU所承載的數據對象,odata為該DSU處理后的輸出數據對象,prog是該DSU的data的處理程序;
?所述數據流段是由多個seq連續的DSU構成的序列,記為DSS={DSU1,DSU2,DSU3,…,DSUn,DSUE},其中DSS為數據流段,每個DSS均有一個數據流段號segno被分別存儲在構成該DSS的每個DSU中,DSS序列尾的DSUE為該DSS的結束標志,是一個type為常量值EOS的DSU,其prog、data、odata均為空;
?????所述數據流是由多個segno連續的DSS構成的序列,記為DS={DSS1,DSS2,DSS3,…,DSSo},每個DS均有一個數據流號sno被分別存儲在構成該DS的各個DSS的DSU中;
所述并發數據流由多個并發傳輸的DS構成,每個DS均以DSU作為并發處理的單位,并以DSS作為多個數據流并發同步的單元;
數據流單元聚類隊列池,由|TS|個DSU隊列構成,記為CPOOL={DSUQ1,DSUQ2,…,DSUQ|TS|},其中CPOOL為數據流單元聚類隊列池,DSUQ為數據流單元聚類隊列,TS為應用系統數據流單元類型集合,該集合是DSU類型的集合,TS中的元素個數為m,則|TS|=m,同一個DSU隊列由同類型的DSU構成,這些DSU來自p個并發數據流的當前處理單元,有:????????????????????????????????????????????????;
數據流單元映射表,由多個表元構成,記為MapM(nu,sno,segno,seq,t,qso,?qoffset),其中MapM為數據流單元映射表,nu為序號,sno為數據流號,segno為數據段號,seq為數據流單元號,t為時間印,qso為聚類隊列號,qoffset為聚類隊列內部元素位置號;
流處理器池,由多個GPU構成,所述GPU為二元組GPU(KernelP,D_BUFF),?其中KernelP為該GPU當前執行SPMD任務的計算核心部件,D_BUFF為KernelP執行SPMD操作的多個DSU集合;
數據流讀取部件,用于讀取數據流;
DSU聚類分配部件,用于對數據流單元緩沖區中當前被處理的數據流單元進行分類;
任務調度部件,用于將數據流單元聚類隊列池中的就緒隊列加載至流處理器池中的GPU上執行流計算;
計算后處理部件,用于將GPU計算的DSU的odata按MapM的標志回歸到DSU所在的數據流。
2.根據權利要求1所述的大規模并發數據流處理系統的處理方法,其特征在于:
數據流讀取部件重復執行以下步驟直至并發數據流中的DS讀取完畢:
1)根據并發數據流的個數,在DSB中為每個DS分配一個單元,并初始化DSB的MR,置MR[i]值為0,其中1≤i≤p,p為并發數據流的個數;
2)讀取并發數據流中所有DS的當前DSS;
3)掃描并發數據流,對i=1,2,…,p,對DSi做步驟4的處理,所述DSi是指第i個DS;
4)如果MR[i]值為1,則轉至步驟3處理下一個DS的DSU;
如果MR[i]值為0,則提取DSi的當前處理DSU,并判斷當前處理DSU的type,如果當前處理DSU的type值為EOS,則DSi的當前DSS結束,則置MR[i]為1,并轉至步驟3處理下一個DS的DSU,反之則判斷DSUB[i]是否為空,如DSUB[i]為空,則把當前處理DSU存入DSUB[i];
5)如果DSUB的所有元素均置滿數據,則等待至DSUB的所有元素都被DSU聚類分配部件置為空;
6)如果DSB中的MR的所有元素都為1,則轉至步驟1處理并發數據流中所有DS的下一個DSS,反之則轉向步驟2繼續處理當前DSS的DSU;
?DSU聚類分配部件重復執行以下步驟:
1)判別DSB的DSUB中是否置滿數據,如果未滿則重復本步驟,反之則轉至步驟2;
2)判別是否收到來自任務調度部件的“數據流處理完畢”消息,如果未收到則重復本步驟,反之則轉至步驟3;
3)對i=1,2,…,p,分類處理DSUB[i],其分類處理步驟如下:
如果DSUB[i]的type值不是EOS,則將DSUB[i]加入CPOOL的第w個數據流聚類隊列DSUQw中,其中w值等于DSUB[i]的type值;然后獲取DSUB[i]在DSUQw的位置下標,記為pos,并置MapM[i]的nu值為i,置MapM[i]的sno值為i,置MapM[i]的segno值為DSUB[i]的segno值,置MapM[i]的seq值為DSUB[i]的seq值,置MapM[i]的t值為DSUB[i]的t值,置MapM[i]的qso值為w值,置MapM[i]的qoffset值為pos,然后置DSUB[i]為空;
4)向任務調度部件發送“數據流聚類隊列構建完畢”消息;
任務調度部件執行以下步驟:
1)判別是否收到來自DSU聚類分配部件的“數據流聚類隊列構建完畢”消息,如果未收到則重復本步驟,反之則轉至步驟2;
2)為流處理器池中的各個GPU配置一個工作標志數組work,并對i=1,2,3,…,q,置work[i]=0,其中q為流處理器池中的GPU數量;
3)從CPOOL中提取q個DSUQ以及每個隊列所對應GPU的KernelP,構成任務對(DSUQ1,Kernel1),(DSUQ2,Kernel2),…,(DSUQq,Kernelq);
4)對i=1,2,…,q,分別加載(DSUQi,Kerneli)到GPUi執行步驟5,其中GPUi是指第i個GPU;
5)向GPUi的存儲器申請DSUQi大小的存儲單元D_BUFFi,然后將DSQUi的內容加載到D_BUFFi,然后再提交Kerneli?及D_BUFFi到GPUi執行;
6)監控所有GPU的執行狀況,如果GPUi執行完畢,則向計算后處理部件發送“GPUi數據流處理完畢”消息,并從CPOOL中提取下一個未被執行的任務對(DSUQi,Kerneli)后轉至步驟5;如果CPOOL中的所有DSUQ都被加載執行完畢,則向DSU聚類分配部件發送“數據流處理完畢”消息,并對所有的i置work[i]=0,然后再轉至步驟1;
計算后處理部件執行以下步驟:
1)判別是否收到來自任務調度部件的“GPUi數據流處理完畢”消息,如果未收到則重復本步驟,反之則轉至步驟2;
2)向內存申請D_BUFFi大小空間的POST_DSUQ,所述POST_DSUQ的結構與DSUQ的結構一致;
3)先將D_BUFFi的內容加載到POST_DSUQ,再釋放D_BUFFi的空間;
4)掃描POST_DSUQ中的每個DSU,將DSU按照MapM記載的位置映射信息還原到相應的DS中,保持原有DS的順序,并把結果寫入RS;
5)轉至步驟1;
GPUi上的Kernel執行以下步驟:
1)獲取Kerneli?及D_BUFFi,并計算出D_BUFFi中的DSU數量記為g;
2)在GPUi的各個物理流處理單元分配DSU,每個物理流處理單元得到個DSU,其中h為GPUi的物理流處理單元數量;
3)所有物理流處理單元并行地對其分配到的DSU執行Kerneli進行處理,并輸出計算結果到其所處理的DSU的odata;
4)GPUi計算結束。
該專利技術資料僅供研究查看技術是否侵權等信息,商用須獲得專利權人授權。該專利全部權利屬于上海理工大學,未經上海理工大學許可,擅自商用是侵權行為。如果您想購買此專利、獲得商業授權和技術合作,請聯系【客服】
本文鏈接:http://www.szxzyx.cn/pat/books/201110135906.0/1.html,轉載請聲明來源鉆瓜專利網。
- 上一篇:一種同步聯系人信息的方法、裝置和系統
- 下一篇:組合物





