[發(fā)明專利]一種基于多線程編程及消息隊(duì)列的多線程并行處理方法有效
| 申請(qǐng)?zhí)枺?/td> | 201210316211.7 | 申請(qǐng)日: | 2012-08-31 |
| 公開(公告)號(hào): | CN102902512A | 公開(公告)日: | 2013-01-30 |
| 發(fā)明(設(shè)計(jì))人: | 吳慶;張清;趙開勇 | 申請(qǐng)(專利權(quán))人: | 浪潮電子信息產(chǎn)業(yè)股份有限公司 |
| 主分類號(hào): | G06F9/38 | 分類號(hào): | G06F9/38 |
| 代理公司: | 暫無信息 | 代理人: | 暫無信息 |
| 地址: | 250014 山東*** | 國省代碼: | 山東;37 |
| 權(quán)利要求書: | 查看更多 | 說明書: | 查看更多 |
| 摘要: | |||
| 搜索關(guān)鍵詞: | 一種 基于 多線程 編程 消息 隊(duì)列 并行 處理 方法 | ||
1.一種基于多線程編程及消息隊(duì)列的多線程并行處理方法,其特征在于在單節(jié)點(diǎn)內(nèi),創(chuàng)建三類pthread線程,分別為讀、計(jì)算、寫線程,并且各類線程數(shù)目靈活可配置,開辟多緩存,創(chuàng)建四個(gè)隊(duì)列,用于線程間通信,調(diào)配計(jì)算任務(wù)及管理緩存空間資源,具體步驟如下:
基于多緩沖和消息隊(duì)列建立任務(wù)分發(fā)機(jī)制,包括:
1)計(jì)算任務(wù)的劃分:任務(wù)劃分的基本單位總的計(jì)算任務(wù)數(shù)是TOTAL_JOB,它可以被劃分成多個(gè)子任務(wù),定義每個(gè)子任務(wù)大小為JOB_SIZE,定義靈活的任務(wù)劃分策略,軟件有自動(dòng)配置模式和用戶手動(dòng)配置模式;
2)任務(wù)分發(fā)、執(zhí)行策略,包括:
(1)子任務(wù)實(shí)際由讀線程來生成;讀線程定義每個(gè)子任務(wù)的信息tmp_msg,包括:job_begin,job_size,buf_id;
其中:job_begin是該任務(wù)計(jì)數(shù)編號(hào),通過它可以確定該任務(wù)的起始LINE號(hào)和CMP號(hào);
job_size定義了該任務(wù)的大小,其上限是預(yù)先已經(jīng)定義好的JOB_SIZE;
buf_id指明了該任務(wù)所在的BUF編號(hào);
(2)任務(wù)信息tmp_msg的類型實(shí)際上就是消息隊(duì)列成員的類型,被加入到各個(gè)隊(duì)列中;
3)子任務(wù)執(zhí)行所需資源的競(jìng)爭(zhēng)策略完成一個(gè)子任務(wù),需要如下幾個(gè)步驟:
讀線程根據(jù)當(dāng)前讀取進(jìn)度CURRENT_READ_STEP及總作業(yè)大TOTAL_STEP,確定當(dāng)前任務(wù)的起始job_begin,任務(wù)大小job_size,并且從空SR_BUF隊(duì)列SR_BUF_EMPTY_QUEUE中獲取一個(gè)空的SR_BUF_ID,將數(shù)據(jù)讀入SR_BUF_ID對(duì)應(yīng)的SR_BUF中,即SR_BUF[SR_BUF_ID],然后將新生成的任務(wù)信息保存至tmp_msg中,并將tmp_msg加入新計(jì)算任務(wù)隊(duì)列SR_BUF_FULL_QUEUE中;
計(jì)算線程需先從新計(jì)算任務(wù)隊(duì)列SR_BUF_FULL_QUEUE中獲取一個(gè)新計(jì)算任務(wù),然后再從空閑目標(biāo)緩沖隊(duì)列DR_BUF_EMPTY_QUEUE中獲取一個(gè)空閑DR_BUF_ID,之后才進(jìn)行計(jì)算,計(jì)算源數(shù)據(jù)為SR_BUF[SR_?BUF_ID],計(jì)算結(jié)果存放于DR_BUF[DR_BUF_ID]中,計(jì)算結(jié)束后,釋放SR_BUF_ID對(duì)應(yīng)的源數(shù)據(jù)緩存,即將SR_BUF_ID加入SR_BUF_EMPTY_QUEUE隊(duì)列中,并告知寫線程進(jìn)行輸出,即將tmp_msg加入到待輸出隊(duì)列DR_BUF_FULL_QUEUE中;
寫線程從待輸出任務(wù)隊(duì)列DR_BUF_FULL_QUEUE中獲取一個(gè)寫任務(wù)信息tmp_msg,該任務(wù)信息定義了數(shù)據(jù)存放的DR_BUF_ID以及該寫任務(wù)需要寫到的位置信息,即job_begin,以及寫任務(wù)的規(guī)模job_size,寫線程完成該輸出任務(wù)后,需要告知計(jì)算線程DR_BUF[DR_BUF_ID]中的數(shù)據(jù)已經(jīng)輸出完畢,可重新用于存放計(jì)算結(jié)果,?即將DR_BUF_ID加入DR_BUF_EMPTY_QUEUE隊(duì)列中;
多緩沖設(shè)計(jì)
設(shè)計(jì)多個(gè)源數(shù)據(jù)緩沖SR_BUF和目標(biāo)數(shù)據(jù)緩沖DR_BUF,緩沖的數(shù)目靈活可調(diào),為了以最少的緩沖達(dá)到最高的效能,緩沖的個(gè)數(shù)有一個(gè)臨限值,理論上,源緩沖與目標(biāo)緩沖的數(shù)目至少為計(jì)算線程數(shù)的2倍,即:
SR_BUF_NUM>=2*COMPUTE_THREAD_NUM,DR_BUF_NUM>=?2*COMPUTE_THREAD_NUM
考慮到實(shí)際生產(chǎn)中網(wǎng)絡(luò)資源的竟?fàn)幒筒环€(wěn)定因素,保證計(jì)算線程隨時(shí)都能獲得一個(gè)源緩沖和一個(gè)目標(biāo)緩沖,軟件為每個(gè)計(jì)算線程預(yù)留一個(gè)緩沖余量,默認(rèn)將源數(shù)據(jù)緩沖和目標(biāo)緩沖數(shù)都設(shè)置為計(jì)算線程數(shù)的3倍;
環(huán)形消息隊(duì)列設(shè)計(jì)
為了實(shí)現(xiàn)上述任務(wù)分發(fā)策略,設(shè)計(jì)以下四個(gè)隊(duì)列:
其中消息隊(duì)列中存放的消息數(shù)據(jù)類型定義如下:
SR_BUF_FULL_QUEUE:新的計(jì)算任務(wù)隊(duì)列新計(jì)算任務(wù)消息隊(duì)列,記錄作業(yè)信息JOB_INFO(包括JOB_BEGIN,JOB_SIZE,SR_BUF_ID),由讀線程寫入(生產(chǎn)),計(jì)算線程彈出(消費(fèi))當(dāng)讀線程向SR_BUF_ID讀入新數(shù)據(jù)時(shí),將JOB_INFO入隊(duì),計(jì)算線程彈出JOB_INFO時(shí),計(jì)算SR_BUF_ID對(duì)應(yīng)的源數(shù)據(jù);
SR_BUF_EMPTY_QUEUE:?存放當(dāng)前空閑SR_BUF_ID號(hào)源緩沖釋放消息隊(duì)列,與SR_BUF_FULL_QUEUE功能相反,由計(jì)算線程寫入,讀線程彈出,當(dāng)SR_BUF_ID對(duì)應(yīng)的任務(wù)計(jì)算完畢時(shí),釋放SR_BUF_ID,告知讀線程可對(duì)其更新數(shù)據(jù);
DR_BUF_EMPTY_QUEUE:?存放當(dāng)前空閑DR_BUF_ID號(hào)目標(biāo)緩沖為空消息隊(duì)列,記錄DR_BUF_ID號(hào),由寫線程寫入,計(jì)算線程彈出;
當(dāng)寫線程對(duì)DR_BUF_ID數(shù)據(jù)輸出完畢時(shí),將DR_BUF_ID入隊(duì),告知計(jì)算線程,該DR_BUF_ID輸出完畢可重新用于計(jì)算,計(jì)算線程彈出DR_BUF_ID時(shí),啟動(dòng)計(jì)算,并將結(jié)果寫入DR_BUF_ID對(duì)應(yīng)的目標(biāo)緩沖中;
DR_BUF_FULL_QUEUE:?新的寫任務(wù)隊(duì)列新寫任務(wù)消息隊(duì)列,記錄作業(yè)信息JOB_INFO,包括JOB_BEGIN,JOB_SIZE,DR_BUF_ID,由計(jì)算線程寫入,寫線程彈出;
當(dāng)計(jì)算線程向DR_BUF_ID讀入新數(shù)據(jù)時(shí),將JOB_INFO入隊(duì),寫線程彈出JOB_INFO時(shí),對(duì)DR_BUF_ID對(duì)應(yīng)的目標(biāo)數(shù)據(jù)進(jìn)行輸出;
線程設(shè)計(jì)
1)主線程設(shè)計(jì)
(1)主線程功能及運(yùn)行流程如下:
1.參數(shù)預(yù)處理;
2.定義讀、計(jì)算、寫線程數(shù);
3.定義源、目標(biāo)緩存數(shù)目,與計(jì)算線程數(shù)相關(guān);
4.定義任務(wù)劃分粒度,即子任務(wù)規(guī)模JOB_SIZE;
5.開辟源、目標(biāo)緩沖內(nèi)存空間;
6.創(chuàng)建并初始化消息隊(duì)列、鎖、信號(hào)量;
創(chuàng)建并啟動(dòng)讀、計(jì)算、寫線程;
8.等待所有線程退出;
9.其它處理;
10程序退出;
(2)線程偽代碼
1.INIT(PARA);
2.SET(THREAD_NUM);
3.SET(BUF_NUM);
4.SET(JOB_SIZE)
5.CREATE(OUTPUT_FILE_LIST);
6.MALLOC(BUF);
7.INIT(QUEUE);
8.INIT(MUTEX);
9.INIT(SEM);
10.INIT(PROGRESS,Total_Step);
11.CREATE(THREADS);
12.WHILE?State_Flag?&&?!wasCancelled?&&?progress<=Total_Step
13.IF?PROGRESSS.wasCancelled()
14.wasCancelled=true;
15.break;
16.ENDIF
17.IF?!?State_Flag
18.breadk
19.ENDIF
20.SEM_WAIT(progress_sm);
21.progress+=JOB_SIZE;
22.SET_PROGRESS(progress);
23.DONE
24.JOIN(THREADS);
25.IF?State_Flag?&&?!wasCancelled
26.CREATESEIS_INDEX(OUTPUT_FILE_LIST);
27.ELSE
28.DELETESEIS_OBJ(OUTPUT_FILE_LIST);
29.ERROR_PROCESS();
30.EXIT
31.ENDIF
32.DELETE(PROGRESS);
33.DELETE(?BUF);
34.DESTROY(MUTEX);
35.DESTROY(SEM);
36.PRINTF(LOG);
37.EXIT;
2)讀線程設(shè)計(jì)
(1)線程數(shù)設(shè)計(jì)
根據(jù)實(shí)際應(yīng)用需求,靈活設(shè)置讀線程數(shù),默認(rèn)只設(shè)置一個(gè)讀線程;
(2)線程功能及運(yùn)行流程
1.參數(shù)初始化;
2.檢查錯(cuò)誤標(biāo)志及用戶行為,如果出錯(cuò)或被用戶取消,則進(jìn)入步驟9,否則進(jìn)入步聚3;
3.檢查當(dāng)前任務(wù)進(jìn)度READ_CURRENT_STEP,判斷是否完成所有讀任務(wù),如果是,則進(jìn)入步驟9,否則進(jìn)入步聚4;
4.根據(jù)當(dāng)前讀進(jìn)度READ_CURRENT_STEP和總?cè)蝿?wù)數(shù)Total_Step,計(jì)算剩余任務(wù)數(shù)left_job,生成新任務(wù)起始job_begin及大小信息Job_size,job_size上限為JOB_SIZE,更新任務(wù)計(jì)數(shù)READ_CURRENT_STEP;
5.從SR_BUF_EMPTY_QUEUE隊(duì)列中獲得一個(gè)空閑SR_BUF_ID;
6.從源文件INPUT_FILE中讀取任務(wù)源數(shù)據(jù)至源數(shù)據(jù)緩存SR_BUF[SR_BUF_ID]中;
7.將該任務(wù)信息tmp_msg加入新計(jì)算任務(wù)隊(duì)列SR_BUF_FULL_QUEUE中;
8.返回步驟2;
9.線程退出;
3)線程偽代碼
1.INIT
2.WHILE?State_Flag?&&?!wasCancelled
3.IF?READ_CURRENT_STEP?<=?Total_Step
4.job_begin=READ_CURRENT_STEP;
5.job_left=Total_Step-READ_CURRENT_STEP;
6.IF?job_left>JOB_SIZE
7.job_size=JOB_SIZE;
8.ELSE
9.job_size=job_left;
10.READ_CURRENT_STEP?+=?job_size;
11.tmp_msg?=?SR_BUF_EMPTY_QUEUE.pop();
12.SR_BUF_id=tmp_msg.buf_id;
13.READ(INPUT_FILE,SR_BUF[SR_BUF_id])
14.tmp_msg(job_begin,job_size,SR_BUF_id);
15.SR_BUF_FULL_QUEUE.push(tmp_msg);
16.ELSE
17.pthread_exit();
18.ENDIF?
19.DONE
20.pthread_exit();
4)計(jì)算線程設(shè)計(jì)
線程數(shù)設(shè)計(jì):
默認(rèn)情況下,計(jì)算線程數(shù)為系統(tǒng)可用CPU核數(shù),即SYS_CPU_CORE_NUM,用戶通過宏COMPUTE_THREAD_NUM來定義計(jì)算線程數(shù);
線程功能及運(yùn)行流程:
1.參數(shù)初始化;
2.檢查錯(cuò)誤標(biāo)志及用戶行為,如果出錯(cuò)或被用戶取消,則進(jìn)入步聚10,否則進(jìn)入步聚3;
3.檢查當(dāng)前任務(wù)進(jìn)度COMPUTE_CURRENT_STEP,判斷是否完成所有讀任務(wù),如果是,則進(jìn)入步驟10,否則進(jìn)入步聚4;
4.從新計(jì)算任務(wù)隊(duì)列SR_BUF_FULL_QUEUE中獲取一個(gè)任務(wù)信息tmp_msg,其包含了任務(wù)的起始信息job_begin,大小信息job_size,以及任務(wù)數(shù)據(jù)存放源緩沖編號(hào)SR_BUF_ID,并更新任務(wù)計(jì)數(shù)COMPUTE_CURRENT_STEP;
5.從DR_BUF_EMPTY_QUEUE隊(duì)列中獲得一個(gè)空閑的DR_BUF_ID;
6.以SR_BUF[SR_BUF_ID]為數(shù)據(jù)輸入緩存,以DR_BUF[DR_BUF_ID]為數(shù)據(jù)輸出緩存進(jìn)行計(jì)算;
7.將SR_BUF_ID加入SR_BUF_EMPTY_QUEUE中,表示SR_BUF[SR_BUF_ID]所存的數(shù)據(jù)計(jì)算完畢,需要重新加載源數(shù)據(jù);
8.根據(jù)計(jì)算任務(wù)信息,生成寫任務(wù)信息,并將其加入到DR_BUF_FULL_QUEUE隊(duì)列中,表示需要寫線程進(jìn)行輸出;
9.返回步驟2;
10.線程退出;
線程偽代碼
1)INIT
2)WHILE?State_Flag?&&?!wasCancelled
3)IF?COMPUTE_CURRENT_STEP?<=?Total_Step
4)tmp_msg=SR_BUF_FULL_QUEUE.pop()
5)job_begin=tmp_msg.job_begin;
6)job_size=tmp_msg.job_size;
7)SR_BUF_id?=tmp_msg.buf_id;
8)COMPUTE_CURRENT_STEP+=job_size;
9)tmp_msg=?DR_BUF_EMPTY_QUEUE.pop();
10)?DR_BUF_id?=tmp_msg.buf_id;
11)COMPUTE(INPUT_FILE,SR_BUF[SR_BUF_id],DR_BUF[DR_BUF_id])
12)tmp_msg(-1,-1,SR_BUF_id);
13)?SR_BUF_EMPTY_QUEUE.push(tmp_msg);
14)?tmp_msg(job_begin,job_size,DR_BUF_id);
15)?DR_BUF_FULL_QUEUE.push(tmp_msg);
16)?ELSE
17)?pthread_exit();
18)ENDIF?
19)DONE
20)pthread_exit();
4)寫線程設(shè)計(jì)
(1)線程數(shù)設(shè)計(jì)
根據(jù)實(shí)際應(yīng)用需求,靈活設(shè)置寫線程數(shù),默認(rèn)只設(shè)置一個(gè)寫線程;
(2)線程功能及運(yùn)行流程
1.參數(shù)初始化;
2.檢查錯(cuò)誤標(biāo)志及用戶行為,如果出錯(cuò)或被用戶取消,則進(jìn)入步驟9,否則進(jìn)入步聚3;
3.檢查當(dāng)前任務(wù)進(jìn)度WRITE_CURRENT_STEP,判斷是否完成所有讀任務(wù),如果是,則進(jìn)入步驟9,否則進(jìn)入步聚4;
4.從新寫任務(wù)隊(duì)列DR_BUF_FULL_QUEUE中獲取一個(gè)寫任務(wù)信息tmp_msg,其包含了任務(wù)的起始信息job_begin,大小信息job_size,以及任務(wù)數(shù)據(jù)存放目標(biāo)緩沖編號(hào)DR_BUF_ID,并更新任務(wù)計(jì)數(shù)WRITE_CURRENT_STEP;
5.將目標(biāo)緩存DR_BUF[DR_BUF_ID]中的數(shù)據(jù)輸出至OUTPUT_FILE;
6.將DR_BUF_ID加入DR_BUF_EMPTY_QUEUE中,表示DR_BUF[DR_BUF_ID]所存的數(shù)據(jù)輸出完畢,需要重新加載計(jì)算結(jié)果;
7.向主線程發(fā)送更新進(jìn)度條信號(hào);
8.返回步驟2;
9.線程退出;
(4)線程偽代碼
1.INIT
2.WHILE?State_Flag?&&?!wasCancelled
3.IF?WRITE_CURRENT_STEP?<=?Total_Step
4.tmp_msg?=?DR_BUF_FULL_QUEUE.pop();
5.job_begin=tmp_msg.job_begin;
6.job_size=tmp_msg.job_size;
7.DR_BUF_id=tmp_msg.buf_id;
8.WRITE_CURRENT_STEP+=job_size;
9.WRITE(OUTPUT_FILE,DR_BUF[DR_BUF_id])
10.tmp_msg(-1,-1,DR_BUF_id);
11.DR_BUF_EMPTY_QUEUE.push(tmp_msg);
12.SEM_POST?(progress_sem);
13.ELSE
14.pthread_exit();
15.ENDIF?
16.DONE
17.pthread_exit()。
該專利技術(shù)資料僅供研究查看技術(shù)是否侵權(quán)等信息,商用須獲得專利權(quán)人授權(quán)。該專利全部權(quán)利屬于浪潮電子信息產(chǎn)業(yè)股份有限公司,未經(jīng)浪潮電子信息產(chǎn)業(yè)股份有限公司許可,擅自商用是侵權(quán)行為。如果您想購買此專利、獲得商業(yè)授權(quán)和技術(shù)合作,請(qǐng)聯(lián)系【客服】
本文鏈接:http://www.szxzyx.cn/pat/books/201210316211.7/1.html,轉(zhuǎn)載請(qǐng)聲明來源鉆瓜專利網(wǎng)。
- 上一篇:珠形飾品釘砂機(jī)
- 下一篇:異構(gòu)水龍頭的自定心夾具
- 多線程應(yīng)用系統(tǒng)的異常處理方法和異常處理裝置
- 一種面向片上網(wǎng)絡(luò)的多線程調(diào)度實(shí)現(xiàn)方法
- 基于計(jì)算機(jī)多線程多核顯微鏡細(xì)胞圖像快速掃描處理方法
- 一種基于同步鎖的多線程處理方法、終端以及存儲(chǔ)介質(zhì)
- 多線程并發(fā)處理系統(tǒng)及方法
- 海外控股估值流程控制方法、裝置、計(jì)算機(jī)設(shè)備及存儲(chǔ)介質(zhì)
- 讀數(shù)方法、電子裝置、計(jì)算機(jī)設(shè)備及存儲(chǔ)介質(zhì)
- 一種基于云平臺(tái)多線程調(diào)度的方法、系統(tǒng)、設(shè)備及介質(zhì)
- 一種基于云平臺(tái)的前端多線程調(diào)度方法和系統(tǒng)
- 多線程調(diào)度方法、裝置、電子設(shè)備及存儲(chǔ)介質(zhì)





