《ceph源碼分析之讀寫操作流程(2)》由會(huì)員分享,可在線閱讀,更多相關(guān)《ceph源碼分析之讀寫操作流程(2)(9頁珍藏版)》請(qǐng)?jiān)谘b配圖網(wǎng)上搜索。
1、
ceph源碼分析之讀寫操作流程(2)
上一篇介紹了ceph存儲(chǔ)在上兩層的消息邏輯,這一篇主要介紹一下讀寫操作在底兩層的流程。下圖是上一篇消息流程的一個(gè)總結(jié)。上在ceph中,讀寫操作由于分布式存儲(chǔ)的原因,故走了不同流程。
對(duì)于讀操作而言:
1.客戶端直接計(jì)算出存儲(chǔ)數(shù)據(jù)所屬于的主osd,直接給主osd上發(fā)送消息。
2.主osd收到消息后,可以調(diào)用Filestore直接讀取處在底層文件系統(tǒng)中的主pg里面的內(nèi)容然后返回給客戶端。具體調(diào)用函數(shù)在ReplicatedPG::do_osd_ops中實(shí)現(xiàn)。讀操作代碼流程如圖:如我們之前說的,當(dāng)確定讀操作為主osd的消息時(shí)(CEPH_MS
2、G_OSD_OP類型),會(huì)調(diào)用到ReplicatePG::do_osd_op函數(shù),該函數(shù)對(duì)類型做進(jìn)一步判斷,當(dāng)發(fā)現(xiàn)為讀類型(CEPH_OSD_OP_READ)時(shí),會(huì)調(diào)用FileStore中的函數(shù)對(duì)磁盤上數(shù)據(jù)進(jìn)行讀。
[cpp] view plain copy int ReplicatedPG::do_osd_ops(OpContext *ctx, vector<OSDOp>& ops) { …… switch (op.op) { …… case CEPH_OSD_OP_READ: ++ct
3、x->num_read; { // read into a buffer bufferlist bl; int r = osd->store->read(coll, soid, op.extent.offset, op.extent.length, bl);
授課:XXX
// 調(diào)用FileStore::read從底層文件系統(tǒng)讀取 …… } case CEPH_OSD_OP_WRITE
4、: ++ctx->num_write; { ……//寫操作只是做準(zhǔn)備工作,并不實(shí)際的寫 } …… } } FileStore::read函數(shù)是底層具體的實(shí)現(xiàn),會(huì)通過調(diào)用系統(tǒng)函數(shù)如::open,::pread,::close等函數(shù)來完成具體的操作。[cpp] view plain copy int FileStore::read( coll_t cid, const ghobject_t& oid, uint64_t offset, siz
5、e_t len, bufferlist& bl, bool allow_eio) { …… int r = lfn_open(cid, oid, false, &fd); …… got = safe_pread(**fd, bptr.c_str(), len, offset); //FileStore::safe_pread中調(diào)用了::pread …… lfn_close(fd); …… } 而對(duì)于寫操作而言,由于要保證數(shù)據(jù)寫入的同步性就會(huì)復(fù)雜很多:
1.首先客戶端會(huì)將數(shù)據(jù)發(fā)送給
6、主osd,
2.主osd同樣要先進(jìn)行寫操作預(yù)處理,完成后它要發(fā)送寫消息給其他的從osd,讓他們對(duì)副本pg進(jìn)行更改,
3.從osd通過FileJournal完成寫操作到Journal中后發(fā)送消息告訴主osd說完成,進(jìn)入5
授課:XXX
4.當(dāng)主osd收到所有的從osd完成寫操作的消息后,會(huì)通過FileJournal完成自身的寫操作到Journal中。完成后會(huì)通知客戶端,已經(jīng)完成了寫操作。
5.主osd,從osd的線程開始工作調(diào)用Filestore將Journal中的數(shù)據(jù)寫入到底層文件系統(tǒng)中。在介紹寫操作的流程前,需要先介紹一下ceph中的callback函數(shù)。
Context
7、類定義在src/include文件中,該類是一個(gè)回調(diào)函數(shù)類的抽象類,繼承它的類只要在子類實(shí)現(xiàn)它的finish函數(shù),在finish函數(shù)調(diào)用自己需要回調(diào)的函數(shù),就可以完成回調(diào)。
[cpp] view plain copy class Context { Context(const Context& other); const Context& operator=(const Context& other); protected: virtual void finish(int r) = 0; public: Context(
8、) {} virtual ~Context() {} // we want a virtual destructor!!! virtual void complete(int r) { finish(r); delete this; } }; Finisher類是在src/common中定義的一個(gè)專門查看操作是否結(jié)束的一個(gè)類。在這個(gè)類里面擁有一個(gè)線程finisher_thread和一個(gè)類型為Context指針的隊(duì)列finisher_queue。當(dāng)一個(gè)操作線程完成自己的操作后,會(huì)將Context類型對(duì)象送入隊(duì)列。此時(shí)f
9、inisher_thread線程循環(huán)監(jiān)視著自己的finisher_queue隊(duì)列,當(dāng)發(fā)現(xiàn)了有新進(jìn)入的Context時(shí),會(huì)調(diào)用這個(gè)Context::complete函數(shù),這個(gè)函數(shù)則會(huì)調(diào)用到Context子類自己實(shí)現(xiàn)的finish函數(shù)。來處理操作完成后的后續(xù)工作。
授課:XXX
[cpp] view plain copy class Finisher { CephContext *cct; …… vector<Context*> finisher_queue; …… void *finisher_thread_entry()
10、; struct FinisherThread : public Thread { Finisher *fin; FinisherThread(Finisher *f) : fin(f) {} void* entry() { return (void*)fin->finisher_thread_entry(); } } finisher_thread; …… } void *Finisher::finisher_thread_entry() { …… while(!f
11、inisher_stop){ while(!finisher_queue.empty()){ …… vector<Context*> ls ls.swap(finisher_queue); for (vector<Context*>::iterator p = ls.begin(); p != ls.end(); ++p) { if (*p) {
12、 //這里面調(diào)用Context子類實(shí)現(xiàn)的finish函數(shù) (*p)->complete(0); } } } } } 在寫操作中涉及了多個(gè)線程和消息隊(duì)列的協(xié)同工作,需要注意的是一個(gè)類擁有一個(gè)Finisher成員時(shí),以為著它同時(shí)獲得了一個(gè)隊(duì)列和一個(gè)執(zhí)行線程。
授課:XXX
OSD中處理讀寫操作是線程池和消息隊(duì)列(有很多,其他暫時(shí)不討論):
[cpp] view plain copy ThreadPool op_tp
13、; ThreadPool::WorkQueueVal<pair<PGRef,OpRequestRef>, PGRef> &op_wq;
FileJournal中擁有的線程和消息隊(duì)列:
[cpp] view plain copy Write write_thread; deque<write_item> writeq;
其父類Journal中擁有線程和消息隊(duì)列(引用自之前說的JournalingObjectStore類中):
[cpp] view plain copy Finisher finisher_
14、thread;
FileStore中擁有的線程和消息隊(duì)列:
[cpp] view plain copy ThreadPool op_tp; OpWQ op_wq;//Filestore中實(shí)現(xiàn),繼承自ThreadPool::WorkQueue<OpSequencer> Finisher ondisk_finisher; Finisher op_finisher; 前一章說過在前兩層,OSD根據(jù)不同的消息類型,選擇了主OSD處理和從OSD的處理,以下介紹的寫流程,就是在收到具體寫操作以后,本地OSD開始的工作。
授課:X
15、XX
寫的邏輯流程圖如圖:從圖中我們可以看到寫操作分為以下幾步:
1.OSD::op_tp線程從OSD::op_wq中拿出來操作如本文開始的圖上描述,具體代碼流是 ReplicatePG::apply_repop中創(chuàng)建回調(diào)類C_OSD_OpCommit和C_OSD_OpApplied
FileStore::queue_transactions中創(chuàng)建了回調(diào)類C_JournaledAhead
2.FileJournal::write_thread線程從FileJournal::writeq中拿出來操作,主要就是寫數(shù)據(jù)到具體的journal中,具體代碼流:3.Journ
16、al::Finisher.finisher_thread線程從Journal::Finisher.finish_queue中拿出來操作,通過調(diào)用C_JournalAhead留下的回調(diào)函數(shù)FileStore:_journaled_ahead,該線程開始工作兩件事:首先入底層FileStore::op_wq通知開始寫,再入FileStore::ondisk_finisher.finisher_queue通知可以返回。具體代碼流:4.FileStore::ondisk_finisher.finisher_thread線程從FileStore::ondisk_finisher.finisher_que
17、ue中拿出來操作,通過調(diào)用C_OSD_OpCommit留下來的回調(diào)函數(shù)ReplicatePG::op_commit,通知客戶端寫操作成功5.FileStore::op_tp線程池從FileStore::op_wq中拿出操作(此處的OP_WQ繼承了父類ThreadPool::WorkQueue重寫了_process和_process_finish等函數(shù),所以不同于OSD::op_wq,它有自己的工作流程),首先調(diào)用FileStore::_do_op,完成后調(diào)用FileStore::_finish_op。6. FileStore::op_finisher.finisher_thread線程從FileStore::op_finisher.finisher_queue中拿出來操作,通過調(diào)用C_OSD_OpApplied留下來的回調(diào)函數(shù)ReplicatePG::op_applied,通知數(shù)據(jù)可讀。
(注:可編輯下載,若有不當(dāng)之處,請(qǐng)指正,謝謝!)
授課:XXX