Hadoop源碼分析(5HDFS數(shù)據(jù)流)

上傳人:熏** 文檔編號(hào):50507246 上傳時(shí)間:2022-01-20 格式:DOCX 頁(yè)數(shù):33 大小:660.42KB
收藏 版權(quán)申訴 舉報(bào) 下載
Hadoop源碼分析(5HDFS數(shù)據(jù)流)_第1頁(yè)
第1頁(yè) / 共33頁(yè)
Hadoop源碼分析(5HDFS數(shù)據(jù)流)_第2頁(yè)
第2頁(yè) / 共33頁(yè)
Hadoop源碼分析(5HDFS數(shù)據(jù)流)_第3頁(yè)
第3頁(yè) / 共33頁(yè)

下載文檔到電腦,查找使用更方便

0 積分

下載資源

還剩頁(yè)未讀,繼續(xù)閱讀

資源描述:

《Hadoop源碼分析(5HDFS數(shù)據(jù)流)》由會(huì)員分享,可在線閱讀,更多相關(guān)《Hadoop源碼分析(5HDFS數(shù)據(jù)流)(33頁(yè)珍藏版)》請(qǐng)?jiān)谘b配圖網(wǎng)上搜索。

1、Hadoop源碼分析(5 HDFS數(shù)據(jù)流) 作者:張孟志 日期:2013-07-30 下面我們可以來(lái)開(kāi)始分析DataNode上的動(dòng)態(tài)行為。首先我們來(lái)分析DataXceiverServer和DataXceiver。DataNode上數(shù)據(jù)塊的接受/發(fā)送并沒(méi)有采用我們前面介紹的RPC機(jī)制,原因很簡(jiǎn)單,RPC是一個(gè)命令式的接口,而DataNode處理數(shù)據(jù)部分,往往是一種流式機(jī)制。DataXceiverServer和DataXceiver就是這個(gè)機(jī)制的實(shí)現(xiàn)。其中,DataXceiver還依賴于兩個(gè)輔助類:BlockSender和BlockReceiver。下面是類圖: DataXceiver

2、Server很簡(jiǎn)單,它打開(kāi)一個(gè)端口,然后每接收到一個(gè)連接,就創(chuàng)建一個(gè)DataXceiver,服務(wù)于該連接,并記錄該連接的socket,對(duì)應(yīng)的實(shí)現(xiàn)在DataXceiverServer的run方法里。當(dāng)系統(tǒng)關(guān)閉時(shí),DataXceiverServer將關(guān)閉監(jiān)聽(tīng)的socket和所有DataXceiver的socket,這樣就導(dǎo)致了DataXceiver出錯(cuò)并結(jié)束線程。 DataXceiver才是真正干活的地方,目前,DataXceiver支持的操作總共有六條,分別是: OP_WRITE_BLOCK (80):寫(xiě)數(shù)據(jù)塊 OP_READ_BLOCK (81):讀數(shù)據(jù)塊 OP_READ_METADA

3、TA (82):讀數(shù)據(jù)塊元文件 OP_REPLACE_BLOCK (83):替換一個(gè)數(shù)據(jù)塊 OP_COPY_BLOCK (84):拷貝一個(gè)數(shù)據(jù)塊 OP_BLOCK_CHECKSUM (85):讀數(shù)據(jù)塊檢驗(yàn)碼 DataXceiver首先讀取客戶端的版本號(hào)并檢驗(yàn),然后再讀取一個(gè)字節(jié)的操作碼,并轉(zhuǎn)入相關(guān)的子程序進(jìn)行處理。 下面以$HADOOP_HOME/bin/hadoop fs -put 或$HADOOP_HOME/bin/hadoop fs -copyFromLocal (這兩個(gè)命令是一樣的)命令來(lái)介紹整個(gè)寫(xiě)入的流

4、程(OP_WRITE_BLOCK (80):寫(xiě)數(shù)據(jù)塊)。 客戶端首先向namenode申請(qǐng)寫(xiě)入文件(這里僅討論新寫(xiě)入一個(gè)文件的情況,目前Hadoop也支持append寫(xiě)入方式,但暫不討論); namenode接收請(qǐng)求后僅在namdnode端創(chuàng)建一個(gè)無(wú)對(duì)應(yīng)block的文件并在整個(gè)hdfs中維護(hù); 接著datanode建立以本地要上傳的文件作為輸入流,namenode返回的路徑作為輸出流,利用IOUtils.copyBytes()函數(shù)開(kāi)始上傳數(shù)據(jù)文件; 在client準(zhǔn)備好發(fā)送、響應(yīng)的兩個(gè)隊(duì)列和數(shù)據(jù)的packet后,向namenode申請(qǐng)對(duì)應(yīng)的datenodes和blocks;nameno

5、de為其分配datanodes和創(chuàng)建新的blocks; 然后由client建立到第一個(gè)datanode的連接(默認(rèn)會(huì)有3份備份)開(kāi)始正式寫(xiě)入數(shù)據(jù),并由每個(gè)datanode建立到下一個(gè)datanode的連接完成數(shù)據(jù)備份直到最后一個(gè)datanode; 同時(shí)每個(gè)datanode接受下一個(gè)datanode寫(xiě)入數(shù)據(jù)是否成功的響應(yīng)(ACK),并最終傳給client(以上過(guò)程類似于管線),釋放資源或?qū)﹀e(cuò)誤進(jìn)行處理。 數(shù)據(jù)流圖如圖所示: 步驟一:客戶端通過(guò)對(duì)DistributedFileSystem對(duì)象調(diào)用create()方法來(lái)創(chuàng)建文件; 步驟二:DistributedFileSystem對(duì)n

6、amenode創(chuàng)建一個(gè)RPC調(diào)用,在文件系統(tǒng)的命名空間中創(chuàng)建一個(gè)新文件。此時(shí)該文件中沒(méi)有對(duì)應(yīng)的數(shù)據(jù)塊;namenode執(zhí)行各種檢查以確保這個(gè)文件不存在,并且客戶端有創(chuàng)建該文件的權(quán)限; 步驟三:DistributedFileSystem向客戶端返回一個(gè)負(fù)責(zé)處理datanode和namenode間通信的FSDataOutputStream對(duì)象,由此客戶端開(kāi)始寫(xiě)入數(shù)據(jù)。DFSOutputStream將文件分成一個(gè)個(gè)數(shù)據(jù)包,并寫(xiě)入稱為數(shù)據(jù)隊(duì)列(data queue)的內(nèi)部隊(duì)列; 步驟四: DataStreamer處理數(shù)據(jù)隊(duì)列,根據(jù)datenode列表來(lái)要求namenode分配適合的新塊來(lái)存儲(chǔ)數(shù)

7、據(jù)備份。一組datanode組成一條管線(pipeline)。DataStreamer將數(shù)據(jù)包流式傳輸?shù)焦芫€中的第1個(gè)datanode,再根據(jù)管線節(jié)點(diǎn)順序傳輸數(shù)據(jù)包。 步驟五:DFSOutputStream維護(hù)一個(gè)稱為確認(rèn)隊(duì)列(ack queue)的內(nèi)部數(shù)據(jù)包隊(duì)列等待datanode的收到確認(rèn)回執(zhí)。收到管道中所有datanode確認(rèn)信息后,該數(shù)據(jù)包從確認(rèn)隊(duì)列刪除。 步驟六、步驟七:客戶端完成數(shù)據(jù)的寫(xiě)入后,會(huì)對(duì)數(shù)據(jù)流調(diào)用close()方法。該操作將剩余的所有數(shù)據(jù)包寫(xiě)入datanode管線中,并在聯(lián)系namenode且發(fā)送文件寫(xiě)入完成信號(hào)之前,等待確認(rèn)。Namenode以知道文件由哪些塊組

8、成(步驟四),所以在返回成功前只需要等待數(shù)據(jù)塊進(jìn)行最小量的復(fù)制。 下面就來(lái)詳細(xì)介紹以上過(guò)程在hadoop中的代碼實(shí)現(xiàn): 一、client端申請(qǐng) org.apache.hadoop.fs. FsShell: public int run(String argv[]) throws Exception { …… if ("-put".equals(cmd) || "-copyFromLocal".equals(cmd)) { Path[] srcs = new Path[argv.length-2]; for (int j=0 ; i < argv.

9、length-1 ;) srcs[j++] = new Path(argv[i++]); copyFromLocal(srcs, argv[i++]); } …… } org.apache.hadoop.fs. FsShell: void copyFromLocal(Path[] srcs, String dstf) throws IOException { Path dstPath = new Path(dstf); FileSystem dstFs = dstPath.getFileSystem(get

10、Conf()); if (srcs.length == 1 && srcs[0].toString().equals("-")) copyFromStdin(dstPath, dstFs); else dstFs.copyFromLocalFile(false, false, srcs, dstPath); } org.apache.hadoop.fs. FileSystem: public void copyFromLocalFile(boolean delSrc, boolean overwrite,

11、 Path[] srcs, Path dst) throws IOException { Configuration conf = getConf(); FileUtil.copy(getLocal(conf), srcs, this, dst, delSrc, overwrite, conf); } org.apache.hadoop.fs. FileUtil: public static boolean copy(FileSystem srcFS, Path[] srcs,

12、 FileSystem dstFS, Path dst, boolean deleteSource, boolean overwrite, Configuration conf) throws IOException { …… if (srcs.length == 1) return copy(srcFS, srcs[0], dstFS, dst, deleteSour

13、ce, overwrite, conf); …… for (Path src : srcs) { try { if (!copy(srcFS, src, dstFS, dst, deleteSource, overwrite, conf)) returnVal = false; } catch (IOException e) { gotException = true; exceptions.append(e.getMessage()); exceptions.appe

14、nd("\n"); } } …… return returnVal; } 在FsShell類中,定義了hadoop命令的解析,有run()函數(shù)檢查hadoop shell命令的正確性并初始化shell,然后針對(duì)不同命令調(diào)用不同處理函數(shù)。本文是進(jìn)入-put和-copyFromLocal的處理,接著調(diào)用copyFromLocal()函數(shù);該函數(shù)會(huì)判斷輸入源是shell標(biāo)準(zhǔn)輸入還是本地文件,本文進(jìn)入本地文件,即調(diào)用copyFromLocalFile()函數(shù),經(jīng)過(guò)參數(shù)的完善,再調(diào)用FileUtil.copy()函數(shù);該函數(shù)會(huì)判斷輸入文件是一個(gè)還是多個(gè),我們?yōu)楹?jiǎn)單起見(jiàn),按

15、一個(gè)處理,從而進(jìn)入另一個(gè)重載的copy()函數(shù)。 org.apache.hadoop.fs. FileUtil: public static boolean copy(FileSystem srcFS, Path src, FileSystem dstFS, Path dst, boolean deleteSource, boolean overwrite,

16、 Configuration conf) throws IOException { dst = checkDest(src.getName(), dstFS, dst, overwrite); if (srcFS.getFileStatus(src).isDir()) { checkDependencies(srcFS, src, dstFS, dst); if (!dstFS.mkdirs(dst)) { return false; } FileStatus contents[] = srcFS

17、.listStatus(src); for (int i = 0; i < contents.length; i++) { copy(srcFS, contents[i].getPath(), dstFS, new Path(dst, contents[i].getPath().getName()), deleteSource, overwrite, conf); } } else if (srcFS.isFile(src)) { InputStream in=nul

18、l; OutputStream out = null; try { in = srcFS.open(src); out = dstFS.create(dst, overwrite); IOUtils.copyBytes(in, out, conf, true); 這里分開(kāi)兩個(gè)步驟介紹。 } catch (IOException e) { IOUtils.closeStream(out); IOUtils.closeStream(in); t

19、hrow e; } } else { throw new IOException(src.toString() + ": No such file or directory"); } if (deleteSource) { return srcFS.delete(src, true); } else { return true; } } 從該copy()函數(shù)的參數(shù)可以看出,該函數(shù)包括輸入文件系統(tǒng)、源路徑、目標(biāo)文件系統(tǒng)、目標(biāo)路徑、是否刪除源文件(本例不刪除)、是否覆蓋目標(biāo)文件(本例是新建

20、,不覆蓋),以及conf配置,是處理不同文件系統(tǒng)間復(fù)制文件的任務(wù)。處理過(guò)程中,程序會(huì)判斷源路徑是目錄還是文件(本例是文件),然后對(duì)源文件建立輸入流(in),目標(biāo)文件路徑建立輸入流(out),再調(diào)用IOUtils.copyBytes()函數(shù)寫(xiě)入數(shù)據(jù)。 這時(shí)候需要話分兩頭,各表一支,先說(shuō)dstFS.create()函數(shù) 查看整體序列圖 。 create()函數(shù)在FileSystem類中也有多個(gè)版本,一級(jí)級(jí)封裝,最后調(diào)用了一個(gè)抽象函數(shù)FSDataOutputStream create()函數(shù),返回一個(gè)FSDateOutputStream類型(如圖2所示),我們這里的FS就是分布式文件系統(tǒng)

21、HDFS,故進(jìn)入DistributedFileSystem類的create()方法。在改方法的返回語(yǔ)句中可以看到,返回的FSDataOutputStream中包含了一個(gè)OutputStream類型(dfs.create()函數(shù)返回類型),該類型實(shí)際上是DFSOutputStream類型。 dfs.create()實(shí)際上調(diào)用的是DFSClient類中的create()函數(shù),該函數(shù)返回的OutputStream即時(shí)DFSOutputStream,DFSOutputStream的構(gòu)造函數(shù)里做了兩件事情,一是遠(yuǎn)程調(diào)用namenode去創(chuàng)建一個(gè)文件,另一個(gè)是streamer.start(),即啟動(dòng)

22、了一個(gè)pipeline,用于寫(xiě)數(shù)據(jù),也就是一個(gè)DataStreamer,處理data queue,每個(gè)block默認(rèn)64M,默認(rèn)按每64K為一個(gè)packet(共計(jì)1000個(gè)packets/block)交給DataStreamer處理。 二、namenode創(chuàng)建文件 org.apache.hadoop.hdfs.server.namenode. NameNode: public void create(String src, FsPermission masked, String cl

23、ientName, boolean overwrite, boolean createParent, short replication, long blockSize ) throws IOException { String clientMachine = getClientMach

24、ine(); if (stateChangeLog.isDebugEnabled()) { stateChangeLog.debug("*DIR* NameNode.create: file " +src+" for "+clientName+" at "+clientMachine); } if (!checkPathLength(src)) { throw new IOException("create: Pathname too long. Limit "

25、 + MAX_PATH_LENGTH + " characters, " + MAX_PATH_DEPTH + " levels."); } namesystem.startFile(src, new PermissionStatus(UserGroupInformation.getCurrentUser().getShortUserName(), null, masked), clientName, clientMachine, overwrite, createParen

26、t, replication, blockSize); myMetrics.incrNumFilesCreated(); myMetrics.incrNumCreateFileOps(); } org.apache.hadoop.hdfs.server.namenode. FSNamesystem void startFile(String src, PermissionStatus permissions, String holder, String clientMachine, boo

27、lean overwrite, boolean createParent, short replication, long blockSize ) throws IOException { startFileInternal(src, permissions, holder, clientMachine, overwrite, false, createParent, replication, blockSize); getEditLog().logSync(); if (auditL

28、og.isInfoEnabled() && isExternalInvocation()) { final HdfsFileStatus stat = dir.getFileInfo(src); logAuditEvent(UserGroupInformation.getCurrentUser(), Server.getRemoteIp(), "create", src, null, stat); } } org.apache.hadoop.hdfs.serve

29、r.namenode. FSNamesystem private synchronized void startFileInternal(String src, PermissionStatus permissions, String holder, String clientMachine,

30、 boolean overwrite, boolean append, boolean createParent, short replication,

31、 long blockSize ) throws IOException { …… DatanodeDescriptor clientNode = host2DataNodeMap.getDatanodeByHost(clientMachine); if (append) { // // Replace current node with a INodeUnderConstruction. // Rec

32、reate in-memory lease record. // INodeFile node = (INodeFile) myFile; INodeFileUnderConstruction cons = new INodeFileUnderConstruction( node.getLocalNameBytes(), node.getReplication(),

33、 node.getModificationTime(), node.getPreferredBlockSize(), node.getBlocks(), node.getPermissionStatus(),

34、 holder, clientMachine, clientNode); dir.replaceNode(src, node, cons); leaseManager.addLease(cons.clientName, src); } else { // Now we can add the name to the filesystem. This file

35、has no // blocks associated with it. // checkFsObjectLimit(); // increment global generation stamp long genstamp = nextGenerationStamp(); INodeFileUnderConstruction newNode = dir.addFile(src, permissions, replication, blockSize, holde

36、r, clientMachine, clientNode, genstamp); if (newNode == null) { throw new IOException("DIR* NameSystem.startFile: " + "Unable to add file to namespace."); } leaseManager.addLease(newNode.clientName, src); if (NameNode.st

37、ateChangeLog.isDebugEnabled()) { NameNode.stateChangeLog.debug("DIR* NameSystem.startFile: " +"add "+src+" to namespace for "+holder); } } …… } namenode的create()函數(shù)調(diào)用FSNameSystem類的startFileInternale()函數(shù)。從該函數(shù)中可以看出,該版本的hadoop是支持append文

38、件的,會(huì)以一個(gè)以創(chuàng)建的INode替換目前node,并在內(nèi)存中更新;而新寫(xiě)入的文件則是創(chuàng)建一個(gè)新的文件,狀態(tài)為under construction,但并沒(méi)有blocks與之對(duì)應(yīng),并生成文件的stamp。 接下來(lái)就返回到client的IOUtils.copyBytes()函數(shù)處,下面來(lái)表第二支。 三、client端準(zhǔn)備數(shù)據(jù)&詢問(wèn)block IOUtils調(diào)用重載的copyBytes()函數(shù),并在FSOutputSummer類中生成帶checksum的writeChecksumChunk()函數(shù),由于繼承關(guān)系,最終還是調(diào)用DFSClient類中的DFSOutputStream類中的writ

39、eChunk()函數(shù)。參考繼承關(guān)系如下: 由圖可見(jiàn),DFSOutputStream類繼承FSOutputSummer類,故由DFSOutputStream類來(lái)實(shí)現(xiàn)writeChunk()函數(shù);而且從圖中可以看出,DistributedFileSystem類的create()方法返回是一個(gè)包含了DFSOutputStream類的FSDataOutputStream類型。 接著繼續(xù)討論DFSOutputStream類的writeChunk()函數(shù)。首先介紹下寫(xiě)入數(shù)據(jù)流程,如前所述,client將block分為一個(gè)個(gè)的packet,按默認(rèn)寫(xiě)入3份至datanode1、datanode2和da

40、tanode3: client建立與datanode1的連接,將packet1寫(xiě)入datanode1; datanode1建立與datanode2的連接,將packet1寫(xiě)入datanode2,并寫(xiě)到本地,同時(shí)client將packet2寫(xiě)入datanode1; datanode2建立與datanode3的連接,將packet1寫(xiě)入datanode3,并寫(xiě)到本地,同時(shí)client將packet3寫(xiě)入datanode1,datanode1將packet2寫(xiě)入datanode2; 如此挨個(gè)流式寫(xiě)入到各datanode中 …… 當(dāng)datanode3寫(xiě)入packet1完成,檢查無(wú)誤后,發(fā)送a

41、ck信息給datanode2,datanode2再將ack發(fā)給datanode1,直到client,這才表示一個(gè)packet發(fā)送成功。 org.apache.hadoop.hdfs.DFSClient.DFSOutputStream // @see FSOutputSummer#writeChunk() @Override protected synchronized void writeChunk(byte[] b, int offset, int len, byte[] checksum)

42、 throws IOException { checkOpen(); isClosed(); int cklen = checksum.length; int bytesPerChecksum = this.checksum.getBytesPerChecksum(); if (len > bytesPerChecksum) { throw new IOException("writeChunk() buffer size is " + len +

43、 " is larger than supported bytesPerChecksum " + bytesPerChecksum); } if (checksum.length != this.checksum.getChecksumSize()) { throw new IOException("writeChunk() checksum size is supposed to be " +

44、 this.checksum.getChecksumSize() + " but found to be " + checksum.length); } synchronized (dataQueue) { // If queue is full, then wait till we can create enough space while (!closed && dataQueue.size() + ackQueue.si

45、ze() > maxPackets) { try { dataQueue.wait(); } catch (InterruptedException e) { } } isClosed(); if (currentPacket == null) { currentPacket = new Packet(packetSize, chunksPerPacket,

46、 bytesCurBlock); if (LOG.isDebugEnabled()) { LOG.debug("DFSClient writeChunk allocating new packet seqno=" + currentPacket.seqno + ", src=" + src + ", packetSize=" + packetSize +

47、 ", chunksPerPacket=" + chunksPerPacket + ", bytesCurBlock=" + bytesCurBlock); } } currentPacket.writeChecksum(checksum, 0, cklen); currentPacket.writeData(b, offset, len); currentPacket.numChunks++; bytesCurBlock +

48、= len; // If packet is full, enqueue it for transmission // if (currentPacket.numChunks == currentPacket.maxChunks || bytesCurBlock == blockSize) { if (LOG.isDebugEnabled()) { LOG.debug("DFSClient writeChunk packet full seqno=" +

49、 currentPacket.seqno + ", src=" + src + ", bytesCurBlock=" + bytesCurBlock + ", blockSize=" + blockSize + ", appendChunk=" + appendChunk); } // // if we allo

50、cated a new packet because we encountered a block // boundary, reset bytesCurBlock. // if (bytesCurBlock == blockSize) { currentPacket.lastPacketInBlock = true; bytesCurBlock = 0; lastFlushOffset = 0; } enq

51、ueueCurrentPacket(); // If this was the first write after reopening a file, then the above // write filled up any partial chunk. Tell the summer to generate full // crc chunks from now on. if (appendChunk) { appendChunk = false;

52、 resetChecksumChunk(bytesPerChecksum); } int psize = Math.min((int)(blockSize-bytesCurBlock), writePacketSize); computePacketChunkSize(psize, bytesPerChecksum); } } //LOG.debug("DFSClient writeChunk done length " + len + // "

53、checksum length " + cklen); } org.apache.hadoop.hdfs.DFSClient.DFSOutputStream private synchronized void enqueueCurrentPacket() { synchronized (dataQueue) { if (currentPacket == null) return; dataQueue.addLast(currentPacket); dataQueue.notifyAll();

54、 lastQueuedSeqno = currentPacket.seqno; currentPacket = null; } } DFSOutputStream中維護(hù)了兩個(gè)重要的隊(duì)列: org.apache.hadoop.hdfs.DFSClient.DFSOutputStream private LinkedList dataQueue = new LinkedList(); private LinkedList ackQueue = new LinkedList(); 負(fù)責(zé)

55、保證以上流水線持續(xù)進(jìn)行。 writeChunk()會(huì)先檢查data queue是否已滿,若滿就等到有足夠的空間來(lái)創(chuàng)建隊(duì)列中下一個(gè)packet。若currentPacket為空,則創(chuàng)建一個(gè)new Packet,并將該packet寫(xiě)上checksum、數(shù)據(jù)、序號(hào)等信息;接著檢查該packet是否已滿,若滿則放入data queue,并喚醒等待data queue的傳輸線程,即DataStreamer。 org.apache.hadoop.hdfs.DFSClient.DFSOutputStream.DataStreamer public void run() { long

56、lastPacket = 0; while (!closed && clientRunning) { // if the Responder encountered an error, shutdown Responder if (hasError && response != null) { try { response.close(); response.join(); response = null;

57、 } catch (InterruptedException e) { } } Packet one = null; synchronized (dataQueue) { // process IO errors if any boolean doSleep = processDatanodeError(hasError, false); // wait for a packet to be

58、 sent. long now = System.currentTimeMillis(); while ((!closed && !hasError && clientRunning && dataQueue.size() == 0 && (blockStream == null || ( blockStream != null && now - lastPacket < timeoutValue/2)))

59、 || doSleep) { long timeout = timeoutValue/2 - (now-lastPacket); timeout = timeout <= 0 ? 1000 : timeout; try { dataQueue.wait(timeout); now = System.currentTimeMillis(); } catch (Interrupte

60、dException e) { } doSleep = false; } if (closed || hasError || !clientRunning) { continue; } try { // get packet to be sent. if (dataQueue.isEmpty()) {

61、 one = new Packet(); // heartbeat packet } else { one = dataQueue.getFirst(); // regular data packet } long offsetInBlock = one.offsetInBlock; // get new block from namenode. if (blockStr

62、eam == null) { LOG.debug("Allocating new block"); nodes = nextBlockOutputStream(src); this.setName("DataStreamer for file " + src + " block " + block); response = new ResponseProcessor(nodes);

63、 response.start(); } if (offsetInBlock >= blockSize) { throw new IOException("BlockSize " + blockSize + " is smaller than data size. " + " Offset of packet in block

64、" + offsetInBlock + " Aborting file " + src); } ByteBuffer buf = one.getBuffer(); // move packet from dataQueue to ackQueue if (!one.isHeartbea

65、tPacket()) { dataQueue.removeFirst(); dataQueue.notifyAll(); synchronized (ackQueue) { ackQueue.addLast(one); ackQueue.notifyAll(); } } // write out

66、 data to remote datanode blockStream.write(buf.array(), buf.position(), buf.remaining()); if (one.lastPacketInBlock) { blockStream.writeInt(0); // indicate end-of-block } blockStream.flush(); lastPacket = System.currentTimeMillis(); if (LOG.isDebugEnabled()) { LOG.debug("DataStreamer block " + block + " wrote packet seqno:" + on

展開(kāi)閱讀全文
溫馨提示:
1: 本站所有資源如無(wú)特殊說(shuō)明,都需要本地電腦安裝OFFICE2007和PDF閱讀器。圖紙軟件為CAD,CAXA,PROE,UG,SolidWorks等.壓縮文件請(qǐng)下載最新的WinRAR軟件解壓。
2: 本站的文檔不包含任何第三方提供的附件圖紙等,如果需要附件,請(qǐng)聯(lián)系上傳者。文件的所有權(quán)益歸上傳用戶所有。
3.本站RAR壓縮包中若帶圖紙,網(wǎng)頁(yè)內(nèi)容里面會(huì)有圖紙預(yù)覽,若沒(méi)有圖紙預(yù)覽就沒(méi)有圖紙。
4. 未經(jīng)權(quán)益所有人同意不得將文件中的內(nèi)容挪作商業(yè)或盈利用途。
5. 裝配圖網(wǎng)僅提供信息存儲(chǔ)空間,僅對(duì)用戶上傳內(nèi)容的表現(xiàn)方式做保護(hù)處理,對(duì)用戶上傳分享的文檔內(nèi)容本身不做任何修改或編輯,并不能對(duì)任何下載內(nèi)容負(fù)責(zé)。
6. 下載文件中如有侵權(quán)或不適當(dāng)內(nèi)容,請(qǐng)與我們聯(lián)系,我們立即糾正。
7. 本站不保證下載資源的準(zhǔn)確性、安全性和完整性, 同時(shí)也不承擔(dān)用戶因使用這些下載資源對(duì)自己和他人造成任何形式的傷害或損失。

相關(guān)資源

更多
正為您匹配相似的精品文檔
關(guān)于我們 - 網(wǎng)站聲明 - 網(wǎng)站地圖 - 資源地圖 - 友情鏈接 - 網(wǎng)站客服 - 聯(lián)系我們

copyright@ 2023-2025  zhuangpeitu.com 裝配圖網(wǎng)版權(quán)所有   聯(lián)系電話:18123376007

備案號(hào):ICP2024067431號(hào)-1 川公網(wǎng)安備51140202000466號(hào)


本站為文檔C2C交易模式,即用戶上傳的文檔直接被用戶下載,本站只是中間服務(wù)平臺(tái),本站所有文檔下載所得的收益歸上傳人(含作者)所有。裝配圖網(wǎng)僅提供信息存儲(chǔ)空間,僅對(duì)用戶上傳內(nèi)容的表現(xiàn)方式做保護(hù)處理,對(duì)上載內(nèi)容本身不做任何修改或編輯。若文檔所含內(nèi)容侵犯了您的版權(quán)或隱私,請(qǐng)立即通知裝配圖網(wǎng),我們立即給予刪除!