是一個分布式運算程序編程框架。核心功能是將用戶編寫的業務邏輯代碼和自帶的默認組件整合成一個完整的分布式程序,并發運行在一個hadoop集群上。
(1)優點
1>易于編程:以普通程序的編程方法加上使用MapReduce提供的接口,可以快速完成分布式程序的編寫。
2>良好的擴展性:計算資源得不到滿足時,可以通過簡單的增加計算機器來擴展計算能力
3>高容錯性:如果一個任務所在計算節點掛了,上面的計算任務可以自動轉移到另外的節點上執行,即故障自動轉移,這個過程是內部完成的,無需人工干預
4>適合PB級別以上數據的離線處理
(2)缺點
1>實時計算:無法像MySQL一樣在毫秒級或者秒級返回計算結果
2>流式計算:流式計算的輸入數據是動態的,而MapReduce要求輸入數據是靜態的,已經持久化在存儲上的。
3>DAG(有向無環圖)計算:多個應用程序存在依賴關系,后一個應用程序的輸入為前一個的輸出,這種情況下,MapReduce的性能很低。因為MapReduce的每個階段的輸出結果都會先寫入到磁盤中,大量的磁盤IO會造成性能的急劇下降。
核心思想就是分為map和reduce兩個階段。
1)首先將輸出的數據進行切片處理,然后各個切片數據分給獨立的一個map task任務。map內部根據業務邏輯對數據進行統計處理。每個map task之間互不影響。
2)接著就是將所有map task 的輸出作為 reduce task的輸入(reduce task的數量與分區有關,后面細講),將各個map task的局部統計匯總成全局統計,最終完成結果輸出
3)MapReduce編程模型中只能由一個map和reduce階段,多個MapReduce程序只能串行運行,無法并行運行
基本概述:
1)當我們編寫完MR作業后,需要通過JobClient來提交一個job,提交的信息會發送到JobTracker模塊,這個模塊是第一代MapReduce計算框架的核心之一,它負責與集群中的其他節點維持心跳,為提交的作業分配資源,管理提交的作業的正常運作(失敗,重啟等)。
2)第一代MapReduce的另一個核心的功能是TaskTracker,在各個TaskTracker安裝節點上,它的主要功能是監控自己所在節點的資源使用情況。
3)TaskTracker監控當前節點的Tasks的運行情況,其中包含Map Task和Reduce Task,最后由Reduce Task到Reduce階段,將結果輸送到HDFS的文件系統中;其中的具體流程如圖中描述的1-7步驟。TaskTracker在監控期間,需要把這些信息通過心跳機制發送給JobTracker,JobTracker收集到這些信息后,給新提交的作業分配其他的資源,避免重復資源分配。
缺點:
1)JobTracker是第一代MapReduce的入口點,若是JobTracker服務宕機,整個服務將會癱瘓,存在單點問題。
2)JobTracker負責的事情太多,完成來太多的任務,占用過多的資源,當Job數非常多的時候,會消耗很多內存,容易出現性能瓶頸。
3)對TaskTracker而言,Task擔當的角色過于簡單,沒有考慮到CPU及內存的使用情況,若存在多個大內存的Task被集中調度,容易出現內存溢出。
4)另外,TaskTracker把資源強制分為map task slot和reduce task slot,若是MR任務中只存在其中一個(map或是reduce),會出現資源浪費的情況,資源利用率低。也就是說資源是靜態分配的
V2比起V1最大的不同就是增加了 yarn 這個組件。
架構重構的基本思想在于將JobTracker的兩個核心的功能單獨分離成獨立的組件了。分離后的組件分別為資源管理(Applications Manager)和任務調度器(Resource Scheduler)。新的資源管理器(Resource Manager)管理整個系統的資源分配,而每一個Node Manager下的App Master(Application Master)負責對應的調度和協調工作(每個MapReduce任務都有一個對應的app master),而在實際中,App Master從Resource Manager上獲得資源,讓Node Manager來協同工作和任務監控。
對比于MR V1中的Task的監控,重啟等內熱都交由App Master來處理,Resource Manager提供中心服務,負責資源的分配與調度。Node Manager負責維護Container的狀態,并將收集的信息上報給Resource Manager,以及負責和Resource Manager維持心跳。
優點:
1)減少資源消耗,讓監控每一個作業更加分布式了。
2)加入了yarn之后,支持更多的編程模型,比如spark等
3)將資源以內存量的概念來描述,比V1中的slot更加合理,而且資源都是動態分配
4)資源的調度和分配更加有層次化,RM負責總的資源管理和調度,每個節點上的appMaster負責當前節點的資源管理和調度
其中上面從第7步到16步稱為shuffle機制,
1)maptask收集我們的map()方法輸出的kv對,放到內存緩沖區中
2)從內存緩沖區不斷溢出本地磁盤文件,可能會溢出多個文件
3)多個溢出文件會被合并成大的溢出文件
4)在溢出過程中,及合并的過程中,都要調用partitioner進行分區和針對key進行排序
5)reducetask根據自己的分區號,去各個maptask機器上取相應的結果分區數據
6)reducetask會取到同一個分區的來自不同maptask的結果文件,reducetask會將這些文件再進行合并(歸并排序)
7)合并成大文件后,shuffle的過程也就結束了,后面進入reducetask的邏輯運算過程(從文件中取出一個一個的鍵值對group,調用用戶自定義的reduce()方法)
由MapReduce的工作流程可以知道,maptask的數量決定于切片的數量,所以我們看看切片的原理。
在MapReduce的工作流程中,在對數據進行map運算前,會先對數據進行切片處理,然后每一片交給一個獨立的map task進行處理。那么map task是如何獲取到切片實現類的呢?
首先 MapTask是以 run 方法為入口開始map任務的。
/*MapTask.java*/publicvoidrun(JobConfjob,TaskUmbilicalProtocolumbilical)throwsIOException,ClassNotFoundException,InterruptedException{//此處省略好多代碼,直接看這個方法,其實就是新舊api的兼容this.runNewMapper(job,this.splitMetaInfo,umbilical,reporter);}else{this.runOldMapper(job,this.splitMetaInfo,umbilical,reporter);}this.done(umbilical,reporter);}}//下面是runNewMapper方法private<INKEY,INVALUE,OUTKEY,OUTVALUE>voidrunNewMapper(JobConfjob,TaskSplitIndexsplitIndex,TaskUmbilicalProtocolumbilical,TaskReporterreporter)throwsIOException,ClassNotFoundException,InterruptedException{................//這里就看到獲取inputFormat的實現類,關鍵就在于taskContext這對象,它的類是TaskAttemptContextImplInputFormat<INKEY,INVALUE>inputFormat=(InputFormat)ReflectionUtils.newInstance(taskContext.getInputFormatClass(),job);}/*TaskAttemptContextImpl.java繼承JobContextImpl類JobContextImpl實現了JobContext接口,該接口定義很多set和get方法,用于配置job上下文對象的*/publicclassJobContextImplimplementsJobContext{publicClass<?extendsInputFormat<?,?>>getInputFormatClass()throwsClassNotFoundException{//可以看到這里就是從conf對象中獲取inputformat.class,默認值就是TextInputFormatreturnthis.conf.getClass("mapreduce.job.inputformat.class",TextInputFormat.class);}}
由此我們可以看到,默認處理輸入數據的類是 TextInputFormat,但是這個類并沒有實現切片方法,在它的父類 FileInputFormat中實現了切片方法:
/*FileInputFormat.java*/publicList<InputSplit>getSplits(JobContextjob)throwsIOException{StopWatchsw=(newStopWatch()).start();longminSize=Math.max(this.getFormatMinSplitSize(),getMinSplitSize(job));longmaxSize=getMaxSplitSize(job);//這個就是存儲切片信息的數組List<InputSplit>splits=newArrayList();//獲取輸入路徑的所有文件List<FileStatus>files=this.listStatus(job);Iteratori$=files.iterator();while(true){while(true){while(i$.hasNext()){FileStatusfile=(FileStatus)i$.next();Pathpath=file.getPath();longlength=file.getLen();if(length!=0L){BlockLocation[]blkLocations;if(fileinstanceofLocatedFileStatus){//獲取文件塊信息blkLocations=((LocatedFileStatus)file).getBlockLocations();}else{FileSystemfs=path.getFileSystem(job.getConfiguration());blkLocations=fs.getFileBlockLocations(file,0L,length);}//從這里開始就正式切片if(this.isSplitable(job,path)){longblockSize=file.getBlockSize();//獲取切片大小longsplitSize=this.computeSplitSize(blockSize,minSize,maxSize);longbytesRemaining;intblkIndex;//循環對文件進行切片,可以看到這里是判斷文件剩余部分是否大于1.1倍的切片大小的for(bytesRemaining=length;(double)bytesRemaining/(double)splitSize>1.1D;bytesRemaining-=splitSize){blkIndex=this.getBlockIndex(blkLocations,length-bytesRemaining);//將文件,切片起始和終止位置,切片大小,切片的block所在主機等記錄到切片數組中作為切片信息。splits.add(this.makeSplit(path,length-bytesRemaining,splitSize,blkLocations[blkIndex].getHosts(),blkLocations[blkIndex].getCachedHosts()));}//這里是將文件最后的內容作為最后一個切片添加到切片規劃中if(bytesRemaining!=0L){blkIndex=this.getBlockIndex(blkLocations,length-bytesRemaining);splits.add(this.makeSplit(path,length-bytesRemaining,bytesRemaining,blkLocations[blkIndex].getHosts(),blkLocations[blkIndex].getCachedHosts()));}}else{splits.add(this.makeSplit(path,0L,length,blkLocations[0].getHosts(),blkLocations[0].getCachedHosts()));}}else{splits.add(this.makeSplit(path,0L,length,newString[0]));}}job.getConfiguration().setLong("mapreduce.input.fileinputformat.numinputfiles",(long)files.size());sw.stop();if(LOG.isDebugEnabled()){LOG.debug("Total#ofsplitsgeneratedbygetSplits:"+splits.size()+",TimeTaken:"+sw.now(TimeUnit.MILLISECONDS));}returnsplits;}}}/*這個方法是決定切片大小的,簡單說主要決定于maxsize和blocksize的大小,maxsize>blockSize,則splitsize=blockSizemaxsize<blockSize,則splitsize=maxsizeminSize>blockSize,則splitsize=minSizeminSize<blockSize,則splitsize=blockSize當然要注意的是,maxsize需要是永遠大于minSize的*/protectedlongcomputeSplitSize(longblockSize,longminSize,longmaxSize){returnMath.max(minSize,Math.min(maxSize,blockSize));}
上面的切片值是規劃而已,并沒有真正的切片,而是當job提交給yarn執行的之后,才會真正按照切片規劃進行數據讀取。上述切片的特點總結如下:
1)按照文件的內容的長度進行切片
2)切片是按照每個文件獨立進行切片,并不會將所有文件當做一個整體去切片,這樣有缺點(后面講)
3)切片大小:默認為blocksize,計算機制如上,這里不重復
FileInputFormat.setMaxInputSplitSize();maxsizeFileInputFormat.setMinIutputSplitSize();minsize
可以通過設置兩個值來改變切片大小
4)切片的方式:根據源碼,每次切片時,都會判斷切完剩下的部分是否大于splitSize的1.1倍,如果不大于,那么此時切片就終止,并將剩下的部分作為最后一個切片。
我們從(2)中可以知道,TextInputFormat(FileInputFormat)切片時是按照文件進行切片的,也就是說一個文件至少是一個切片,無論文件的大小是多大。而如果有大量的小文件,那么就會生成很多個maptask,處理效率很低。對于這種情況,解決方案為:
1)從數據源頭上解決,將數據合并后再上傳至HDFS,不產生大量小文件
2)如果必須處理大量小文件,那么就采用CombineTextInputFormat來進行切片。
切片邏輯如下(源碼挺長的,下面直接說我研究源碼后的結果):
首先CombineTextInputFormat沒有實現 getSplit() 方法,而是由它的父類 CombineFileInputformat實現的,它會將一個目錄下的多個文件作為一個整體的數據源進行切片,切片的大小取決于 MaxSplitSize 設定的最大切片大小大小,單位是byte。切片邏輯為
totalSize<=1.5*MaxSplitSize1片,splitSize=totalSize1.5*MaxSplitSize<totalsize<2*MaxSplitSize2片,splitSize=MaxSplitSizetotalsize>2*MaxSplitSizen片,splitSize=MaxSplitSize要注意的是:如果總的數據大小遠大于MaxSplitSize時,切到最后一片的時候,會判斷切片后,剩下的部分是否大于2倍MaxSplitSize,如果不大于,就算作一片,如果大于就兩片
使用 CombineTextInputFormat 作為InpuFormat的操作類:
//設置InputFormat的類為CombineTextInputFormatjob.setInputFormatClass(CombineTextInputFormat.class);//分別設置切片最大值和最小值CombineTextInputFormat.setMaxInputSplitSize(job,4194304);//4mCombineTextInputFormat.setMinInputSplitSize(job,2097152);//2m
前面說到,map task的數量決定于切片的數量,那么reduce task的數量決定于什么呢?決定于分區的數量。
1)首先需要自定義一個分區類,并繼承 Partitioner<key,value>
2)重寫public Int getPartition() 方法。返回的是分區號
3)在job中設置自定義的類為分區類,否則默認的分區類就是HashPartitioner
job.setPartitionerClass(CustomPartitioner.class);
4)設置reduce task數量,一般和分區數相同 ,
job.setNumReduceTasks(N);
注意:分區數和reduce task數的聯系
如果reduceTask的數量> getPartition的結果數,則會多產生幾個空的輸出文件part-r-000xx;
如果1<reduceTask的數量<getPartition的結果數,則有一部分分區數據無處安放,會Exception;
如果reduceTask的數量=1,則不管mapTask端輸出多少個分區文件,最終結果都交給這一個reduceTask,最終也就只會產生一個結果文件 part-r-00000;
publicclassProvincePartitionerextendsPartitioner<Text,FlowBean>{@OverridepublicintgetPartition(Textkey,FlowBeanvalue,intnumPartitions){//1獲取電話號碼的前三位StringpreNum=key.toString().substring(0,3);//默認分區號,如果都不符合下面條件,則KV劃分到這個分區intpartition=4;//2判斷是哪個省if("136".equals(preNum)){partition=0;}elseif("137".equals(preNum)){partition=1;}elseif("138".equals(preNum)){partition=2;}elseif("139".equals(preNum)){partition=3;}returnpartition;}}
到此,關于“MapReduce基本原理是什么”的學習就結束了,希望能夠解決大家的疑惑。理論與實踐的搭配能更好的幫助大家學習,快去試試吧!若想繼續學習更多相關知識,請繼續關注本站網站,小編會繼續努力為大家帶來更多實用的文章!
本文由 貴州做網站公司 整理發布,部分圖文來源于互聯網,如有侵權,請聯系我們刪除,謝謝!
c語言中正確的字符常量是用一對單引號將一個字符括起表示合法的字符常量。例如‘a’。數值包括整型、浮點型。整型可用十進制,八進制,十六進制。八進制前面要加0,后面...
2022年天津專場考試原定于3月19日舉行,受疫情影響確定延期,但目前延期后的考試時間推遲。 符合報名條件的考生,須在規定時間登錄招考資訊網(www.zha...
:喜歡聽,樂意看。指很受歡迎?!巴卣官Y料”喜聞樂見:[ xǐ wén lè jiàn ]詳細解釋1. 【解釋】:喜歡聽,樂意看。指很受歡迎。2. 【示例】:這是...
廣發信用卡申請進度查詢如何查?1、電話查詢:打信用卡客服電話或者登陸網銀,輸入身份證號查詢進度。2、微信查詢:關注銀行公眾號,查詢申請進度。3、網銀查詢:登錄銀行信用卡中心頁面,然后點擊“辦卡進度查詢”功能;4、進入“信用卡辦卡進度查詢”頁面,輸入證件號碼及驗證碼就能查詢到銀行信用卡申請進度詳細情況。5、銀行柜臺查詢:通過銀行柜臺或者聯系當地的發卡機...
毛利率法就是指依據當期銷售凈額乘以上期具體毛利率算當期市場銷售毛利,并據以計算發出庫存商品和期終結存存貨成本的一種方式。這一方法常見于商品批發為企業計算當期商品銷售成本和期終庫存商品成本。商品流通企業因為商品類型多,一般來講,和類似商品的毛利率基本相同,選用毛利率法可緩降低工作量。毛利率法的計算公式1、 毛利率=銷售毛利/銷售凈額* 100%;2、銷售毛利=銷售凈額*毛利率;3、銷售凈額=商品銷售...
【資料圖】關于溫州的二手電腦市場在哪啊的知識大家了解嗎?以下就是小編整理的關于溫州的二手電腦市場在哪啊的介紹,希望可以給到大家一些參考,一起來了解下吧!1、在錦繡公路和吳橋公路的交叉口再往城南立交橋方向,走50米左右。2、從城南立交橋往物華天寶方向,過了蔬菜批發市場,再過了橋就到了。坐公交208可以到,公交108也可以,還有27路是路過二手電腦市場。...