MapReduce源碼分析總結(jié)
《MapReduce源碼分析總結(jié)》由會(huì)員分享,可在線閱讀,更多相關(guān)《MapReduce源碼分析總結(jié)(22頁珍藏版)》請(qǐng)?jiān)谘b配圖網(wǎng)上搜索。
1、 參考: 1 caibinbupt的源代碼分析 2 coderplay的avaeye 3 Javen-Studio 咖啡小屋 一 MapReduce概述 Map/Reduce是一個(gè)用于大規(guī)模數(shù)據(jù)處理的分布式計(jì)算模型,它最初是由Google工程師設(shè)計(jì)并實(shí)現(xiàn)的,Google已經(jīng)將它完整的MapReduce論文公開發(fā)布了。其中對(duì)它的定義是,Map/Reduce是一個(gè)編程模型(programming model),是一個(gè)用于處理和生成大規(guī)模數(shù)據(jù)集(processing and generating large data sets)的相關(guān)的實(shí)現(xiàn)。用戶定義一個(gè)map函數(shù)來處理一個(gè)k
2、ey/value對(duì)以生成一批中間的key/value對(duì),再定義一個(gè)reduce函數(shù)將所有這些中間的有著相同key的values合并起來。很多現(xiàn)實(shí)世界中的任務(wù)都可用這個(gè)模型來表達(dá)。
二 MapReduce工作原理
Map-Reduce框架的運(yùn)作完全基于
3、執(zhí)行排序操作。
一個(gè)Map-Reduce任務(wù)的執(zhí)行過程以及數(shù)據(jù)輸入輸出的類型如下所示:
Map:
4、個(gè)key/value對(duì),key取偏移量,value為行內(nèi)容。 如下是map1的輸入數(shù)據(jù): 推薦精選 Key1 Value1 0 Hello World Bye World 如下是map2的輸入數(shù)據(jù): Key1 Value1 0 Hello Hadoop GoodBye Hadoop 2 map輸出/combine輸入 如下是map1的輸出結(jié)果 Key2 Value2 Hello 1 World 1 Bye 1 World 1 如下是map2的輸出結(jié)果 Key2 Value2 Hello 1 Hadoop 1 GoodBye 1
5、 Hadoop 1 3 combine輸出 推薦精選 Combiner類實(shí)現(xiàn)將相同key的值合并起來,它也是一個(gè)Reducer的實(shí)現(xiàn)。 如下是combine1的輸出 Key2 Value2 Hello 1 World 2 Bye 1 如下是combine2的輸出 Key2 Value2 Hello 1 Hadoop 2 GoodBye 1 4 reduce輸出 Reducer類實(shí)現(xiàn)將相同key的值合并起來。 如下是reduce的輸出 Key2 Value2 Hello 2 World 2 Bye 1 Hadoop 2
6、 GoodBye 1 推薦精選 三 MapReduce框架結(jié)構(gòu) 1 角色 1.1 JobTracker JobTracker是一個(gè)master服務(wù), JobTracker負(fù)責(zé)調(diào)度job的每一個(gè)子任務(wù)task運(yùn)行于TaskTracker上,并監(jiān)控它們,如果發(fā)現(xiàn)有失敗的task就重新運(yùn)行它。一般情況應(yīng)該把JobTracker部署在單獨(dú)的機(jī)器上。 1.2 TaskTracker TaskTracker是運(yùn)行于多個(gè)節(jié)點(diǎn)上的slaver服務(wù)。TaskTracker則負(fù)責(zé)直接執(zhí)行每一個(gè)task。TaskTracker都需要運(yùn)行在HDFS的DataNode上, 1.3 JobClien
7、t 每一個(gè)job都會(huì)在用戶端通過JobClient類將應(yīng)用程序以及配置參數(shù)打包成jar文件存儲(chǔ)在HDFS,并把路徑提交到JobTracker,然后由JobTracker創(chuàng)建每一個(gè)Task(即MapTask和ReduceTask)并將它們分發(fā)到各個(gè)TaskTracker服務(wù)中去執(zhí)行。 2 數(shù)據(jù)結(jié)構(gòu) 2.1 Mapper和Reducer 運(yùn)行于Hadoop的MapReduce應(yīng)用程序最基本的組成部分包括一個(gè)Mapper和一個(gè)Reducer類,以及一個(gè)創(chuàng)建JobConf的執(zhí)行程序,在一些應(yīng)用中還可以包括一個(gè)Combiner類,它實(shí)際也是Reducer的實(shí)現(xiàn)。 2.2 JobInProgre
8、ss JobClient提交job后,JobTracker會(huì)創(chuàng)建一個(gè)JobInProgress來跟蹤和調(diào)度這個(gè)job,并把它添加到j(luò)ob隊(duì)列里。JobInProgress會(huì)根據(jù)提交的job jar中定義的輸入數(shù)據(jù)集(已分解成FileSplit)創(chuàng)建對(duì)應(yīng)的一批TaskInProgress用于監(jiān)控和調(diào)度MapTask,同時(shí)在創(chuàng)建指定數(shù)目的TaskInProgress用于監(jiān)控和調(diào)度ReduceTask,缺省為1個(gè)ReduceTask。 2.3 TaskInProgress JobTracker啟動(dòng)任務(wù)時(shí)通過每一個(gè)TaskInProgress來launchTask,這時(shí)會(huì)把Task對(duì)象(即Map
9、Task和ReduceTask)序列化寫入相應(yīng)的TaskTracker服務(wù)中,TaskTracker收到后會(huì)創(chuàng)建對(duì)應(yīng)的TaskInProgress(此TaskInProgress實(shí)現(xiàn)非JobTracker中使用的TaskInProgress,作用類似)用于監(jiān)控和調(diào)度該Task。啟動(dòng)具體的Task進(jìn)程是通過TaskInProgress管理的TaskRunner對(duì)象來運(yùn)行的。TaskRunner會(huì)自動(dòng)裝載job jar,并設(shè)置好環(huán)境變量后啟動(dòng)一個(gè)獨(dú)立的java child進(jìn)程來執(zhí)行Task,即MapTask或者ReduceTask,但它們不一定運(yùn)行在同一個(gè)TaskTracker中。 2.4 Ma
10、pTask和ReduceTask
一個(gè)完整的job會(huì)自動(dòng)依次執(zhí)行Mapper、Combiner(在JobConf指定了Combiner時(shí)執(zhí)行)和Reducer,其中Mapper和Combiner是由MapTask調(diào)用執(zhí)行,Reducer則由ReduceTask調(diào)用,Combiner實(shí)際也是Reducer接口類的實(shí)現(xiàn)。Mapper會(huì)根據(jù)job jar中定義的輸入數(shù)據(jù)集按
11、集。MapTask的任務(wù)全完成即交給ReduceTask進(jìn)程調(diào)用Reducer處理,生成最終結(jié)果
12、有任務(wù)可做,如果有,讓其派發(fā)任務(wù)給它執(zhí)行。如果JobTracker的作業(yè)隊(duì)列不為空, 則TaskTracker發(fā)送的心跳將會(huì)獲得JobTracker給它派發(fā)的任務(wù)。這是一道pull過程。slave節(jié)點(diǎn)的TaskTracker接到任務(wù)后在其本地發(fā)起Task,執(zhí)行任務(wù)。以下是簡略示意圖: 推薦精選 下面詳細(xì)介紹一下Map/Reduce處理一個(gè)工作的流程。 四JobClient 在編寫MapReduce程序時(shí)通常是上是這樣寫的: Configuration conf = new Configuration(); // 讀取hadoop配置 Job job = new Job(co
13、nf, "作業(yè)名稱"); // 實(shí)例化一道作業(yè) job.setMapperClass(Mapper類型); job.setCombinerClass(Combiner類型); job.setReducerClass(Reducer類型); job.setOutputKeyClass(輸出Key的類型); job.setOutputValueClass(輸出Value的類型); FileInputFormat.addInputPath(job, new Path(輸入hdfs路徑)); FileOutputFormat.setOutputPath(job, new Path(輸出h
14、dfs路徑)); // 其它初始化配置 JobClient.runJob(job); 1 配置Job JobConf是用戶描述一個(gè)job的接口。下面的信息是MapReduce過程中一些較關(guān)鍵的定制信息: 推薦精選 2 JobClient.runJob():運(yùn)行Job并分解輸入數(shù)據(jù)集 一個(gè)MapReduce的Job會(huì)通過JobClient類根據(jù)用戶在JobConf類中定義的InputFormat實(shí)現(xiàn)類來將輸入的數(shù)據(jù)集分解成一批小的數(shù)據(jù)集,每一個(gè)小數(shù)據(jù)集會(huì)對(duì)應(yīng)創(chuàng)建一個(gè)MapTask來處理。JobClient會(huì)使用缺省的FileInputFormat類調(diào)用FileInputFo
15、rmat.getSplits()方法生成小數(shù)據(jù)集,如果判斷數(shù)據(jù)文件是isSplitable()的話,會(huì)將大的文件分解成小的FileSplit,當(dāng)然只是記錄文件在HDFS里的路徑及偏移量和Split大小。這些信息會(huì)統(tǒng)一打包到j(luò)obFile的jar中。 JobClient然后使用submitJob(job)方法向 master提交作業(yè)。submitJob(job)內(nèi)部是通過submitJobInternal(job)方法完成實(shí)質(zhì)性的作業(yè)提交。submitJobInternal(job)方法首先會(huì)向hadoop分布系統(tǒng)文件系統(tǒng)hdfs依次上傳三個(gè)文件: job.jar, job.split和job
16、.xml。 job.xml: 作業(yè)配置,例如Mapper, Combiner, Reducer的類型,輸入輸出格式的類型等。 job.jar: jar包,里面包含了執(zhí)行此任務(wù)需要的各種類,比如 Mapper,Reducer等實(shí)現(xiàn)。 job.split: 文件分塊的相關(guān)信息,比如有數(shù)據(jù)分多少個(gè)塊,塊的大小(默認(rèn)64m)等。 這三個(gè)文件在hdfs上的路徑由hadoop-default.xml文件中的mapreduce系統(tǒng)路徑mapred.system.dir屬性 + jobid決定。mapred.system.dir屬性默認(rèn)是/tmp/hadoop-user_name/mapred
17、/system。寫完這三個(gè)文 件之后, 此方法會(huì)通過RPC調(diào)用master節(jié)點(diǎn)上的JobTracker.submitJob(job)方法,此時(shí)作業(yè)已經(jīng)提交完成。 推薦精選 3 提交Job jobFile的提交過程是通過RPC模塊(有單獨(dú)一章來詳細(xì)介紹)來實(shí)現(xiàn)的。大致過程是,JobClient類中通過RPC實(shí)現(xiàn)的Proxy接口調(diào)用JobTracker的submitJob()方法,而JobTracker必須實(shí)現(xiàn)JobSubmissionProtocol接口。 JobTracker創(chuàng)建job成功后會(huì)給JobClient傳回一個(gè)JobStatus對(duì)象用于記錄job的狀態(tài)信息,如執(zhí)行時(shí)間、M
18、ap和Reduce任務(wù)完成的比例等。JobClient會(huì)根據(jù)這個(gè)JobStatus對(duì)象創(chuàng)建一個(gè)NetworkedJob的RunningJob對(duì)象,用于定時(shí)從JobTracker獲得執(zhí)行過程的統(tǒng)計(jì)數(shù)據(jù)來監(jiān)控并打印到用戶的控制臺(tái)。 與創(chuàng)建Job過程相關(guān)的類和方法如下圖所示 五 JobTracker 上面已經(jīng)提到,job是統(tǒng)一由JobTracker來調(diào)度的,具體的Task分發(fā)給各個(gè)TaskTracker節(jié)點(diǎn)來執(zhí)行。下面來詳細(xì)解析執(zhí)行過程,首先先從JobTracker收到JobClient的提交請(qǐng)求開始。 1 JobTracker初始化Job 推薦精選 1.1 JobTracker
19、.submitJob() 收到請(qǐng)求 當(dāng)JobTracker接收到新的job請(qǐng)求(即submitJob()函數(shù)被調(diào)用)后,會(huì)創(chuàng)建一個(gè)JobInProgress對(duì)象并通過它來管理和調(diào)度任務(wù)。JobInProgress在創(chuàng)建的時(shí)候會(huì)初始化一系列與任務(wù)有關(guān)的參數(shù),調(diào)用到FileSystem,把在JobClient端上傳的所有任務(wù)文件下載到本地的文件系統(tǒng)中的臨時(shí)目錄里。這其中包括上傳的*.jar文件包、記錄配置信息的xml、記錄分割信息的文件。 1.2 JobTracker.JobInitThread 通知初始化線程 JobTracker 中的監(jiān)聽器類EagerTaskInitialization
20、Listener負(fù)責(zé)任務(wù)Task的初始化。JobTracker使用jobAdded(job)加入job到EagerTaskInitializationListener中一個(gè)專門管理需要初始化的隊(duì)列里,即一個(gè)list成員變量jobInitQueue里。resortInitQueue方法根據(jù)作業(yè)的優(yōu)先級(jí)排序。然后調(diào)用notifyAll()函數(shù),會(huì)喚起一個(gè)用于初始化job的線程JobInitThread來處理。JobInitThread收到信號(hào)后即取出最靠前的job,即優(yōu)先級(jí)別最高的job,調(diào)用TaskTrackerManager的initJob最終調(diào)用JobInProgress.initTasks
21、()執(zhí)行真正的初始化工作。 1.3 JobInProgress.initTasks() 初始化TaskInProgress 任務(wù)Task分兩種: MapTask 和reduceTask,它們的管理對(duì)象都是TaskInProgress 。 首先JobInProgress會(huì)創(chuàng)建Map的監(jiān)控對(duì)象。在initTasks()函數(shù)里通過調(diào)用JobClient的readSplitFile()獲得已分解的輸入數(shù)據(jù)的RawSplit列表,然后根據(jù)這個(gè)列表創(chuàng)建對(duì)應(yīng)數(shù)目的Map執(zhí)行管理對(duì)象TaskInProgress。在這個(gè)過程中,還會(huì)記錄該RawSplit塊對(duì)應(yīng)的所有在HDFS里的blocks所在的Data
22、Node節(jié)點(diǎn)的host,這個(gè)會(huì)在RawSplit創(chuàng)建時(shí)通過FileSplit的getLocations()函數(shù)獲取,該函數(shù)會(huì)調(diào)用DistributedFileSystem的getFileCacheHints()獲得(這個(gè)細(xì)節(jié)會(huì)在HDFS中講解)。當(dāng)然如果是存儲(chǔ)在本地文件系統(tǒng)中,即使用LocalFileSystem時(shí)當(dāng)然只有一個(gè)location即“l(fā)ocalhost”了。 創(chuàng)建這些TaskInProgress對(duì)象完畢后,initTasks()方法會(huì)通 過createCache()方法為這些TaskInProgress對(duì)象產(chǎn)生一個(gè)未執(zhí)行任務(wù)的Map緩存nonRunningMapCache。sla
23、ve端的 TaskTracker向master發(fā)送心跳時(shí),就可以直接從這個(gè)cache中取任務(wù)去執(zhí)行。 其次JobInProgress會(huì)創(chuàng)建Reduce的監(jiān)控對(duì)象,這個(gè)比較簡單,根據(jù)JobConf里指定的Reduce數(shù)目創(chuàng)建,缺省只創(chuàng)建1個(gè)Reduce任務(wù)。監(jiān)控和調(diào)度Reduce任務(wù)的是TaskInProgress類,不過構(gòu)造方法有所不同,TaskInProgress會(huì)根據(jù)不同參數(shù)分別創(chuàng)建具體的MapTask或者ReduceTask。同樣地,initTasks()也會(huì)通過createCache()方法產(chǎn)生nonRunningReduceCache成員。 JobInProgress創(chuàng)建完Tas
24、kInProgress后,最后構(gòu)造JobStatus并記錄job正在執(zhí)行中,然后再調(diào)用JobHistory.JobInfo.logStarted()記錄job的執(zhí)行日志。到這里JobTracker里初始化job的過程全部結(jié)束。 推薦精選 2 JobTracker調(diào)度Job hadoop默認(rèn)的調(diào)度器是FIFO策略的JobQueueTaskScheduler,它有兩個(gè)成員變量 jobQueueJobInProgressListener與上面說的eagerTaskInitializationListener。JobQueueJobInProgressListener是JobTracke
25、r的另一個(gè)監(jiān)聽器類,它包含了一個(gè)映射,用來管理和調(diào)度所有的JobInProgress。jobAdded(job)同時(shí)會(huì)加入job到JobQueueJobInProgressListener中的映射。 JobQueueTaskScheduler最重要的方法是assignTasks ,他實(shí)現(xiàn)了工作調(diào)度。具體實(shí)現(xiàn):JobTracker 接到TaskTracker 的heartbeat() 調(diào)用后,首先會(huì)檢查上一個(gè)心跳響應(yīng)是否完成,是沒要求啟動(dòng)或重啟任務(wù),如果一切正常,則會(huì)處理心跳。首先它會(huì)檢查 TaskTracker 端還可以做多少個(gè) map 和 reduce 任務(wù),將要派發(fā)的任務(wù)數(shù)是否超出這個(gè)數(shù)
26、,是否超出集群的任務(wù)平均剩余可負(fù)載數(shù)。如果都沒超出,則為此 TaskTracker 分配一個(gè) MapTask 或 ReduceTask 。產(chǎn)生 Map 任務(wù)使用 JobInProgress 的 obtainNewMapTask() 方法,實(shí)質(zhì)上最后調(diào)用了 JobInProgress 的 findNewMapTask() 訪問 nonRunningMapCache 。 上面講解任務(wù)初始化時(shí)說過,createCache()方法會(huì)在網(wǎng)絡(luò)拓?fù)浣Y(jié)構(gòu)上掛上需要執(zhí)行的TaskInProgress。findNewMapTask()從近到遠(yuǎn)一層一層地尋找,首先是同一節(jié)點(diǎn),然后在尋找同一機(jī)柜上的節(jié)點(diǎn),接著尋找相
27、同數(shù)據(jù)中心下的節(jié)點(diǎn),直到找了maxLevel層結(jié)束。這樣的話,在JobTracker給TaskTracker派發(fā)任務(wù)的時(shí)候,可以迅速找到最近的TaskTracker,讓它執(zhí)行任務(wù)。 最終生成一個(gè)Task類對(duì)象,該對(duì)象被封裝在一個(gè)LanuchTaskAction 中,發(fā)回給TaskTracker,讓它去執(zhí)行任務(wù)。 產(chǎn)生 Reduce 任務(wù)過程類似,使用 JobInProgress.obtainNewReduceTask() 方法,實(shí)質(zhì)上最后調(diào)用了 JobInProgress 的 findNewReduceTask() 訪問 nonRuningReduceCache。 推薦精選 六
28、 TaskTracker 1 TaskTracker加載Task到子進(jìn)程 Task的執(zhí)行實(shí)際是由TaskTracker發(fā)起的,TaskTracker會(huì)定期(缺省為10秒鐘,參見MRConstants類中定義的HEARTBEAT_INTERVAL變量)與JobTracker進(jìn)行一次通信,報(bào)告自己Task的執(zhí)行狀態(tài),接收J(rèn)obTracker的指令等。如果發(fā)現(xiàn)有自己需要執(zhí)行的新任務(wù)也會(huì)在這時(shí)啟動(dòng),即是在TaskTracker調(diào)用JobTracker的heartbeat()方法時(shí)進(jìn)行,此調(diào)用底層是通過IPC層調(diào)用Proxy接口實(shí)現(xiàn)。下面一一簡單介紹下每個(gè)步驟。 1.1 TaskTracker.r
29、un() 連接JobTracker TaskTracker的啟動(dòng)過程會(huì)初始化一系列參數(shù)和服務(wù),然后嘗試連接JobTracker(即必須實(shí)現(xiàn)InterTrackerProtocol接口),如果連接斷開,則會(huì)循環(huán)嘗試連接JobTracker,并重新初始化所有成員和參數(shù)。 1.2 TaskTracker.offerService() 主循環(huán) 如果連接JobTracker服務(wù)成功,TaskTracker就會(huì)調(diào)用offerService()函數(shù)進(jìn)入主執(zhí)行循環(huán)中。這個(gè)循環(huán)會(huì)每隔10秒與JobTracker通訊一次,調(diào)用transmitHeartBeat(),獲得HeartbeatResponse信息
30、。然后調(diào)用HeartbeatResponse的getActions()函數(shù)獲得JobTracker傳過來的所有指令即一個(gè)TaskTrackerAction數(shù)組。再遍歷這個(gè)數(shù)組,如果是一個(gè)新任務(wù)指令即LaunchTaskAction則調(diào)用調(diào)用addToTaskQueue加入到待執(zhí)行隊(duì)列,否則加入到tasksToCleanup隊(duì)列,交給一個(gè)taskCleanupThread線程來處理,如執(zhí)行 推薦精選 KillJobAction或者KillTaskAction等。 1.3 TaskTracker.transmitHeartBeat() 獲取JobTracker指令 在transmitH
31、eartBeat()函數(shù)處理中,TaskTracker會(huì)創(chuàng)建一個(gè)新的TaskTrackerStatus對(duì)象記錄目前任務(wù)的執(zhí)行狀況,檢查目前執(zhí)行的Task數(shù)目以及本地磁盤的空間使用情況等,如果可以接收新的Task則設(shè)置heartbeat()的askForNewTask參數(shù)為true。然后通過IPC接口調(diào)用JobTracker的heartbeat()方法發(fā)送過去,heartbeat()返回值TaskTrackerAction數(shù)組。 1.4 TaskTracker.addToTaskQueue,交給TaskLauncher處理 TaskLauncher是用來處理新任務(wù)的線程類,包含了一個(gè)待運(yùn)行任
32、務(wù)的隊(duì)列 tasksToLaunch。TaskTracker.addToTaskQueue會(huì)調(diào)用TaskTracker的registerTask,創(chuàng)建TaskInProgress對(duì)象來調(diào)度和監(jiān)控任務(wù),并把它加入到runningTasks隊(duì)列中。同時(shí)將這個(gè)TaskInProgress加到tasksToLaunch 中,并notifyAll()喚醒一個(gè)線程運(yùn)行,該線程從隊(duì)列tasksToLaunch取出一個(gè)待運(yùn)行任務(wù),調(diào)用TaskTracker的startNewTask運(yùn)行任務(wù)。 1.5 TaskTracker.startNewTask() 啟動(dòng)新任務(wù) 調(diào)用localizeJob()真正初始化
33、Task并開始執(zhí)行。 1.6 TaskTracker.localizeJob() 初始化job目錄等 此函數(shù)主要任務(wù)是初始化工作目錄workDir,再將job jar包從HDFS復(fù)制到本地文件系統(tǒng)中,調(diào)用RunJar.unJar()將包解壓到工作目錄。然后創(chuàng)建一個(gè)RunningJob并調(diào)用addTaskToJob()函數(shù)將它添加到runningJobs監(jiān)控隊(duì)列中。addTaskToJob方法把一個(gè)任務(wù)加入到該任務(wù)屬于的runningJob的tasks列表中。如果該任務(wù)屬于的runningJob不存在,先新建,加到runningJobs中。完成后即調(diào)用launchTaskForJob()開始
34、執(zhí)行Task。 1.7 TaskTracker.launchTaskForJob() 執(zhí)行任務(wù) 啟動(dòng)Task的工作實(shí)際是調(diào)用TaskTracker$TaskInProgress的launchTask()函數(shù)來執(zhí)行的。 1.8 TaskTracker$TaskInProgress.launchTask() 執(zhí)行任務(wù) 執(zhí)行任務(wù)前先調(diào)用localizeTask()更新一下jobConf文件并寫入到本地目錄中。然后通過調(diào)用Task的createRunner()方法創(chuàng)建TaskRunner對(duì)象并調(diào)用其start()方法最后啟動(dòng)Task獨(dú)立的java執(zhí)行子進(jìn)程。 1.9 Task.createR
35、unner() 創(chuàng)建啟動(dòng)Runner對(duì)象 Task有兩個(gè)實(shí)現(xiàn)版本,即MapTask和ReduceTask,它們分別用于創(chuàng)建Map和Reduce任務(wù)。MapTask會(huì)創(chuàng)建MapTaskRunner來啟動(dòng)Task子進(jìn)程,而ReduceTask則創(chuàng)建ReduceTaskRunner來啟動(dòng)。 推薦精選 1.10 TaskRunner.start() 啟動(dòng)子進(jìn)程 TaskRunner負(fù)責(zé)將一個(gè)任務(wù)放到一個(gè)進(jìn)程里面來執(zhí)行。它會(huì)調(diào)用run()函數(shù)來處理,主要的工作就是初始化啟動(dòng)java子進(jìn)程的一系列環(huán)境變量,包括設(shè)定工作目錄workDir,設(shè)置CLASSPATH環(huán)境變量等。然后裝載job jar
36、包。JvmManager用于管理該TaskTracker上所有運(yùn)行的Task子進(jìn)程。每一個(gè)進(jìn)程都是由JvmRunner來管理的,它也是位于單獨(dú)線程中的。JvmManager的launchJvm方法,根據(jù)任務(wù)是map還是reduce,生成對(duì)應(yīng)的JvmRunner并放到對(duì)應(yīng)JvmManagerForType的進(jìn)程容器中進(jìn)行管理。JvmManagerForType的reapJvm() 分配一個(gè)新的JVM進(jìn)程。如果JvmManagerForType槽滿,就尋找idle的進(jìn)程,如果是同Job的直接放進(jìn)去,否則殺死這個(gè)進(jìn)程,用一個(gè)新的進(jìn)程代替。如果槽沒有滿,那么就啟動(dòng)新的子進(jìn)程。生成新的進(jìn)程使用spaw
37、nNewJvm方法。spawnNewJvm使用JvmRunner線程的run方法,run方法用于生成一個(gè)新的進(jìn)程并運(yùn)行它,具體實(shí)現(xiàn)是調(diào)用runChild。 2 子進(jìn)程執(zhí)行MapTask 真實(shí)的執(zhí)行載體,是Child,它包含一個(gè) main函數(shù),進(jìn)程執(zhí)行,會(huì)將相關(guān)參數(shù)傳進(jìn)來,它會(huì)拆解這些參數(shù),通過getTask(jvmId)向父進(jìn)程索取任務(wù),并且構(gòu)造出相關(guān)的Task實(shí)例,然后使用Task的run()啟動(dòng)任務(wù)。 2.1 run 方法相當(dāng)簡單,配置完系統(tǒng)的TaskReporter后,就根據(jù)情況執(zhí)行runJobCleanupTask,runJobSetupTask,runTaskCleanupT
38、ask或執(zhí)行Mapper。由于MapReduce現(xiàn)在有兩套API,MapTask需要支持這兩套API,使得MapTask執(zhí)行Mapper分為runNewMapper和runOldMapper,我們分析runOldMapper。 2.2 runOldMapper runOldMapper最開始部分是構(gòu)造Mapper處理的InputSplit,然后就開始創(chuàng)建Mapper的RecordReader,最終得到map的輸入。之后構(gòu)造Mapper的輸出,是通過MapOutputCollector進(jìn)行的,也分兩種情況,如果沒有Reducer,那么,用DirectMapOutputCollector,否則
39、,用MapOutputBuffer。
構(gòu)造完Mapper的輸入輸出,通過構(gòu)造配置文件中配置的MapRunnable,就可以執(zhí)行Mapper了。目前系統(tǒng)有兩個(gè)MapRunnable:MapRunner和MultithreadedMapRunner。MapRunner是單線程執(zhí)行器,比較簡單,他會(huì)使用反射機(jī)制生成用戶定義的Mapper接口實(shí)現(xiàn)類,作為他的一個(gè)成員。
2.3 MapRunner的run方法
會(huì)先創(chuàng)建對(duì)應(yīng)的key,value對(duì)象,然后,對(duì)InputSplit的每一對(duì)
40、llector收集每次處理kv對(duì)后得到的新的kv對(duì),把他們spill到文件或者放到內(nèi)存,以做進(jìn)一步的處理,比如排序,combine等。 2.4 OutputCollector OutputCollector的作用是收集每次調(diào)用map后得到的新的kv對(duì),寧把他們spill到文件或者放到內(nèi)存,以做進(jìn)一步的處理,比如排序,combine等。 推薦精選 MapOutputCollector 有兩個(gè)子類:MapOutputBuffer和DirectMapOutputCollector。 DirectMapOutputCollector用在不需要Reduce階段的時(shí)候。如果Mapper后續(xù)有r
41、educe任務(wù),系統(tǒng)會(huì)使用MapOutputBuffer做為輸出, MapOutputBuffer使用了一個(gè)緩沖區(qū)對(duì)map的處理結(jié)果進(jìn)行緩存,放在內(nèi)存中,又使用幾個(gè)數(shù)組對(duì)這個(gè)緩沖區(qū)進(jìn)行管理。 在適當(dāng)?shù)臅r(shí)機(jī),緩沖區(qū)中的數(shù)據(jù)會(huì)被spill到硬盤中。 向硬盤中寫數(shù)據(jù)的時(shí)機(jī): (1)當(dāng)內(nèi)存緩沖區(qū)不能容下一個(gè)太大的kv對(duì)時(shí)。spillSingleRecord方法。 (2)內(nèi)存緩沖區(qū)已滿時(shí)。SpillThread線程。 (3)Mapper的結(jié)果都已經(jīng)collect了,需要對(duì)緩沖區(qū)做最后的清理。Flush方法。 2.5 spillThread線程:將緩沖區(qū)中的數(shù)據(jù)spill到硬盤中。
42、(1)需要spill時(shí)調(diào)用函數(shù)sortAndSpill,按照partition和key做排序。默認(rèn)使用的是快速排序QuickSort。 (2)如果沒有combiner,則直接輸出記錄,否則,調(diào)用CombinerRunner的combine,先做combin然后輸出。 3 子進(jìn)程執(zhí)行ReduceTask ReduceTask.run方法開始和MapTask類似,包括initialize()初始化 ,runJobCleanupTask(),runJobSetupTask(),runTaskCleanupTask()。之后進(jìn)入正式的工作,主要有這么三個(gè)步驟:Copy、Sort、Reduce。
43、 推薦精選 3.1 Copy 就是從執(zhí)行各個(gè)Map任務(wù)的服務(wù)器那里,收羅到map的輸出文件??截惖娜蝿?wù),是由ReduceTask.ReduceCopier 類來負(fù)責(zé)。 3.1.1 類圖: 3.1.2 流程: 使用ReduceCopier.fetchOutputs開始 (1)索取任務(wù)。使用GetMapEventsThread線程。該線程的run方法不停的調(diào)用getMapCompletionEvents方法,該方法又使用RPC調(diào)用TaskUmbilicalProtocol協(xié)議的getMapCompletionEvents,方法使用所屬的jobID向其父TaskTracker詢問此
44、作業(yè)個(gè)Map任務(wù)的完成狀況(TaskTracker要向JobTracker詢問后再轉(zhuǎn)告給它...)。返回一個(gè)數(shù)組TaskCompletionEvent events[]。TaskCompletionEvent包含taskid和ip地址之類的信息。 (2)當(dāng)獲取到相關(guān)Map任務(wù)執(zhí)行服務(wù)器的信息后,有一個(gè)線程MapOutputCopier開啟,做具體的拷貝工作。 它會(huì)在一個(gè)單獨(dú)的線程內(nèi),負(fù)責(zé)某個(gè)Map任務(wù)服務(wù)器上文件的拷貝工作。MapOutputCopier的run循環(huán)調(diào)用copyOutput,copyOutput又調(diào)用getMapOutput,使用HTTP遠(yuǎn)程拷貝。 (3)getMapOutp
45、ut遠(yuǎn)程拷貝過來的內(nèi)容(當(dāng)然也可以是本地了...),作為MapOutput對(duì)象存在,它可以在內(nèi)存中也可以序列化在磁盤上,這個(gè)根據(jù)內(nèi)存使用狀況來自動(dòng)調(diào)節(jié)。 (4) 同時(shí),還有一個(gè)內(nèi)存Merger線程InMemFSMergeThread和一個(gè)文件Merger線程LocalFSMerger在同步工作,它們將下載過來的文件(可能在內(nèi)存中,簡單的統(tǒng)稱為文件...),做著歸并排序,以此,節(jié)約時(shí)間,降低輸入文件的數(shù)量,為后續(xù)的排序工作減 負(fù)。InMemFSMergeThread的run循環(huán)調(diào)用doInMemMerge, 該方法使用工具類Merger實(shí)現(xiàn)歸并,如果需要combine,則combinerRbi
46、ne。 3.2 Sort 排序工作,就相當(dāng)于上述排序工作的一個(gè)延續(xù)。它會(huì)在所有的文件都拷貝完畢后進(jìn)行。使用工具類Merger歸并所有的文件。經(jīng)過這一個(gè)流程,一個(gè)合并了所有所需Map任務(wù)輸出文件的新文件產(chǎn)生了。而那些從其他各個(gè)服務(wù)器網(wǎng)羅過來的 Map任務(wù)輸出文件,全部刪除了。 推薦精選 3.3Reduce Reduce任務(wù)的最后一個(gè)階段。他會(huì)準(zhǔn)備好 keyClass("mapred.output.key.class"或"mapred.mapoutput.key.class"), valueClass("mapred.mapoutput.value.class"或"mapred.ou
47、tput.value.class")和 Comparator(“mapred.output.value.groupfn.class”或 “parator.class”)。最后調(diào)用runOldReducer方法。(也是兩套API,我們分析runOldReducer) 3.3.1 runOldReducer (1)輸出方面。 它會(huì)準(zhǔn)備一個(gè)OutputCollector收集輸出,與MapTask不同,這個(gè)OutputCollector更為簡單,僅僅是打開一個(gè)RecordWriter,collect一次,write一次。最大的不同在于,這次傳入RecordWriter的文件系統(tǒng),基本都是分布式文件系統(tǒng), 或者說是HDFS。 (2)輸入方面,ReduceTask會(huì)用準(zhǔn)備好的KeyClass、ValueClass、KeyComparator等等之類的自定義類,構(gòu)造出Reducer所需的鍵類型, 和值的迭代類型Iterator(一個(gè)鍵到了這里一般是對(duì)應(yīng)一組值)。 (3)有了輸入,有了輸出,不斷循環(huán)調(diào)用自定義的Reducer,最終,Reduce階段完成。 推薦精選 (注:可編輯下載,若有不當(dāng)之處,請(qǐng)指正,謝謝!) 推薦精選
- 溫馨提示:
1: 本站所有資源如無特殊說明,都需要本地電腦安裝OFFICE2007和PDF閱讀器。圖紙軟件為CAD,CAXA,PROE,UG,SolidWorks等.壓縮文件請(qǐng)下載最新的WinRAR軟件解壓。
2: 本站的文檔不包含任何第三方提供的附件圖紙等,如果需要附件,請(qǐng)聯(lián)系上傳者。文件的所有權(quán)益歸上傳用戶所有。
3.本站RAR壓縮包中若帶圖紙,網(wǎng)頁內(nèi)容里面會(huì)有圖紙預(yù)覽,若沒有圖紙預(yù)覽就沒有圖紙。
4. 未經(jīng)權(quán)益所有人同意不得將文件中的內(nèi)容挪作商業(yè)或盈利用途。
5. 裝配圖網(wǎng)僅提供信息存儲(chǔ)空間,僅對(duì)用戶上傳內(nèi)容的表現(xiàn)方式做保護(hù)處理,對(duì)用戶上傳分享的文檔內(nèi)容本身不做任何修改或編輯,并不能對(duì)任何下載內(nèi)容負(fù)責(zé)。
6. 下載文件中如有侵權(quán)或不適當(dāng)內(nèi)容,請(qǐng)與我們聯(lián)系,我們立即糾正。
7. 本站不保證下載資源的準(zhǔn)確性、安全性和完整性, 同時(shí)也不承擔(dān)用戶因使用這些下載資源對(duì)自己和他人造成任何形式的傷害或損失。
最新文檔
- 市教育局冬季運(yùn)動(dòng)會(huì)安全工作預(yù)案
- 2024年秋季《思想道德與法治》大作業(yè)及答案3套試卷
- 2024年教師年度考核表個(gè)人工作總結(jié)(可編輯)
- 2024年xx村兩委涉案資金退還保證書
- 2024年憲法宣傳周活動(dòng)總結(jié)+在機(jī)關(guān)“弘揚(yáng)憲法精神推動(dòng)發(fā)改工作高質(zhì)量發(fā)展”專題宣講報(bào)告會(huì)上的講話
- 2024年XX村合作社年報(bào)總結(jié)
- 2024-2025年秋季第一學(xué)期初中歷史上冊(cè)教研組工作總結(jié)
- 2024年小學(xué)高級(jí)教師年終工作總結(jié)匯報(bào)
- 2024-2025年秋季第一學(xué)期初中物理上冊(cè)教研組工作總結(jié)
- 2024年xx鎮(zhèn)交通年度總結(jié)
- 2024-2025年秋季第一學(xué)期小學(xué)語文教師工作總結(jié)
- 2024年XX村陳規(guī)陋習(xí)整治報(bào)告
- 2025年學(xué)校元旦迎新盛典活動(dòng)策劃方案
- 2024年學(xué)校周邊安全隱患自查報(bào)告
- 2024年XX鎮(zhèn)農(nóng)村規(guī)劃管控述職報(bào)告