Radosgw-agent 源码分析
介绍
在同步数据时,分别在 source 和 destination 上抓包,并对照源码进行分析。
源码分析
radosgw-agent/cli.py:main()
1 | def main(): |
prepare_sync
以 meta_syncer 为例
/radosgw-agent/sync.py:class IncrementalSyncer(Syncer)
1 | def prepare(self): |
对于上面的代码:
- 获取 dest 的 bound:
marker, retries = self.get_worker_bound(shard_num)
获取 dest 的 bound,对应的 API 是 /admin/replica_log?bounds&type=data&id=0
得到的结果是:
{
  "marker": "1_1453184408.808048_5.1",
  "oldest_time": "0.000000",
  "markers": [
    {
      "entity": "radosgw-agent",
      "position_marker": "1_1453184408.808048_5.1",
      "position_time": "0.000000",
      "items_in_progress": [ ]
    }
  ]
}
或者:
{"Code":"NoSuchKey"}
此时 marker、retries 都为空。
- 获取 src 的 log_entries:
last_marker, log_entries = self.get_log_entries(shard_num, marker)
获取 src 的 log_entries,对应的 API 是 /admin/log?marker=%20&type=metadata&id=0&max-entries=1000
得到的结果包含一个 marker 和许多 entries:
{
  "marker": "1_1453461773.098122_674.1",
  "truncated": false,
  "entries": [
    {
      "id": "1_1448948065.089679_11.1",
      "section": "bucket.instance",
      "name": "pengdake_test:center-1.5447.2",
      "timestamp": "2015-12-01 05:34:25.089679Z",
      "data": {
        "read_version": {
          "tag": "",
          "ver": 0
        },
        "write_version": {
          "tag": "_yl8VxMaeEnZZpEAZE-M9vwX",
          "ver": 1
        },
        "status": {
          "status": "write"
        }
      }
    }, …
sync
以 incremental 方式为例,见 radosgw-agent/sync.py:
1 | def incremental_sync(meta_syncer, data_syncer, num_workers, lock_timeout, |
1. 初始化
其中 meta_syncer.sync(num_workers, lock_timeout) 最终调用的是 radosgw-agent/sync.py:class Syncer() 中的 sync() 方法。
2. 同步
def sync(self, num_workers, log_lock_time):
    """Sync every work item from generate_work() using worker processes.

    Starts ``num_workers`` instances of ``self.worker_cls``, puts every item
    from ``self.generate_work()`` on a shared work queue followed by one
    ``None`` poison pill per worker, then reads exactly one result per item
    from the result queue. Successful items are passed to
    ``complete_item``; failures are logged.

    :param num_workers: number of worker processes to start
    :param log_lock_time: forwarded to every worker (the caller passes its
                          ``lock_timeout`` here)
    """
    # Initialise the multiprocessing queues and start the workers.
    workQueue = multiprocessing.Queue()
    resultQueue = multiprocessing.Queue()

    processes = [self.worker_cls(workQueue,
                                 resultQueue,
                                 log_lock_time,
                                 self.src,
                                 self.dest,
                                 daemon_id=self.daemon_id,
                                 max_entries=self.max_entries,
                                 object_sync_timeout=self.object_sync_timeout,
                                 )
                 for i in xrange(num_workers)]
    for process in processes:
        # daemonic children are terminated when the parent process exits
        process.daemon = True
        process.start()

    self.wait_until_ready()

    log.info('Starting sync')
    # enqueue the shards to be synced
    num_items = 0
    for item in self.generate_work():
        num_items += 1
        workQueue.put(item)

    # add a poison pill for each worker
    for i in xrange(num_workers):
        workQueue.put(None)

    # pull the results out as they are produced
    # NOTE(review): this loop is quoted verbatim from upstream and is buggy.
    # `shard_num, retries = item` rebinds `retries` to each item's own
    # per-shard retry list, so the initial `{}` is only seen when there are
    # no items. A failed shard number is appended to that item's list, and
    # the final `if retries:` only inspects the last item. Collecting
    # failures in a separate list (e.g. `failed_shards = []`) would fix it.
    retries = {}
    for i in xrange(num_items):
        result, item = resultQueue.get()
        shard_num, retries = item
        if result == worker.RESULT_SUCCESS:
            log.debug('synced item %r successfully', item)
            self.complete_item(shard_num, retries)
        else:
            log.error('error syncing shard %d', shard_num)
            retries.append(shard_num)

        log.info('%d/%d items processed', i + 1, num_items)
    if retries:
        log.error('Encountered errors syncing these %d shards: %r',
                  len(retries), retries)
对于上面 sync() 代码中的 self.generate_work(),
其实现是:
def generate_work(self):
    """Return an iterator over the items of ``self.shard_work``.

    Each item has the form ``(shard_num, (log_entries, retries))``, built in
    prepare(). Syncer.sync() puts every item on the work queue.
    (Python 2 code: ``dict.iteritems`` returns a lazy iterator.)
    """
    return self.shard_work.iteritems()
这里的 shard_work 就是上述 prepare 中生成的 shard_work,其中每一项的格式是:
shard_num, (log_entries, retries)
将其放入队列后,worker 进程(在前面已通过 process.start() 启动)就会从队列中取得一个 work 进行处理,其执行的方法是
radosgw-agent/worker.py:class IncrementalMixin() 中的 run():
def run(self):
    """Main loop of an incremental-sync worker process.

    Takes ``(shard_num, (log_entries, retries))`` items from
    ``self.work_queue`` until it receives the ``None`` poison pill. For each
    shard it locks the shard's log, syncs the log entries plus the
    previously failed keys in ``retries``, unlocks the log, and reports
    ``(result, (shard_num, new_retries))`` on ``self.result_queue``.
    """
    self.prepare_lock()
    while True:
        # take one shard_work item
        item = self.work_queue.get()
        if item is None:
            # poison pill queued by Syncer.sync(): no more work
            dev_log.info('process %s is done. Exiting', self.ident)
            break

        shard_num, (log_entries, retries) = item

        log.info('%s is processing shard number %d',
                 self.ident, shard_num)

        # first, lock the log
        try:
            self.lock_shard(shard_num)
        except SkipShard:
            # NOTE(review): nothing is put on result_queue here, yet
            # Syncer.sync() waits for one result per item; presumably
            # lock_shard() queues a result before raising SkipShard -- verify.
            continue

        result = RESULT_SUCCESS
        try:
            # sync the log entries and the keys left over from last time
            new_retries = self.sync_entries(log_entries, retries)
        except Exception:
            log.exception('syncing entries for shard %d failed',
                          shard_num)
            result = RESULT_ERROR
            new_retries = []

        # finally, unlock the log
        self.unlock_shard()
        self.result_queue.put((result, (shard_num, new_retries)))
        log.info('finished processing shard %d', shard_num)
如上代码中的 new_retries = self.sync_entries(log_entries, retries),
对应的是
radosgw-agent/worker.py:class IncrementalMixin() 中的 sync_entries():
def sync_entries(self, log_entries, retries):
    """Sync every metadata key mentioned in a shard's log or retry list.

    :param log_entries: raw JSON log entries of one shard, each parsed with
                        ``_meta_entry_from_json``
    :param retries: keys that failed on a previous pass, as
                    ``"section/name"`` strings
    :returns: list of ``"section/name"`` keys whose sync returned
              RESULT_ERROR this time, to be retried on the next pass
    :raises KeyError: if a log entry is missing a required key (the
                      offending log is logged first)
    """
    try:
        # parse every raw log entry
        entries = [_meta_entry_from_json(entry) for entry in log_entries]
    except KeyError:
        log.error('log containing bad key is: %s', log_entries)
        raise

    new_retries = []
    # de-duplicate on (section, name): a key changed many times is synced once
    mentioned = set([(entry.section, entry.name) for entry in entries])
    # retries are stored as "section/name"; split them back into pairs
    split_retries = [tuple(entry.split('/', 1)) for entry in retries]
    for section, name in mentioned.union(split_retries):
        sync_result = self.sync_meta(section, name)
        # failed keys are returned so that the next pass retries them
        if sync_result == RESULT_ERROR:
            new_retries.append(section + '/' + name)

    return new_retries
如上代码中的 sync_result = self.sync_meta(section, name),
对应的是
radosgw-agent/worker.py:class MetadataWorker(Worker) 中的 sync_meta():
3. 获取处理结果
def sync_meta(self, section, name):
    """Copy one metadata key from the source (master) to the destination.

    If the key no longer exists on the source, it is deleted on the
    destination instead.

    :returns: RESULT_SUCCESS or RESULT_ERROR (see notes below for the
              delete path)
    """
    log.debug('syncing metadata type %s key "%s"', section, name)
    try:
        # fetch this metadata key from the source
        metadata = client.get_metadata(self.src_conn, section, name)
    except NotFound:
        log.debug('%s "%s" not found on master, deleting from secondary',
                  section, name)
        try:
            # gone on the source, so delete it on the destination too
            client.delete_metadata(self.dest_conn, section, name)
        except NotFound:
            # Since this error is handled appropriately, return success
            return RESULT_SUCCESS
        # NOTE(review): a successful delete falls through and returns None.
        # sync_entries() only checks `== RESULT_ERROR`, so it is treated as
        # success. A non-NotFound error from delete_metadata() is NOT caught
        # by the `except Exception` below (it is raised inside a handler).
        # It propagates to run(), which then reports the whole shard as
        # failed with an empty retry list.
    except Exception as e:
        log.warn('error getting metadata for %s "%s": %s',
                 section, name, e, exc_info=True)
        return RESULT_ERROR
    else:
        try:
            # write the fetched metadata to the destination
            client.update_metadata(self.dest_conn, section, name, metadata)
            return RESULT_SUCCESS
        except Exception as e:
            log.warn('error updating metadata for %s "%s": %s',
                     section, name, e, exc_info=True)
            return RESULT_ERROR
其代码是:
# Excerpt from Syncer.sync(): read exactly one result per queued item.
# NOTE(review): quoted verbatim from upstream, including its bug.
# `shard_num, retries = item` rebinds `retries` to each item's own retry
# list, so failed shard numbers are appended to that list instead of being
# accumulated. The final `if retries:` only sees the last item. A separate
# list (e.g. `failed_shards = []`) would fix it.
retries = {}
for i in xrange(num_items):
    result, item = resultQueue.get()
    shard_num, retries = item
    if result == worker.RESULT_SUCCESS:
        log.debug('synced item %r successfully', item)
        self.complete_item(shard_num, retries)
    else:
        log.error('error syncing shard %d', shard_num)
        retries.append(shard_num)

    log.info('%d/%d items processed', i + 1, num_items)
if retries:
    log.error('Encountered errors syncing these %d shards: %r',
              len(retries), retries)
从 queue 中取得一个处理结果:
若失败,记录错误日志;
若成功,执行 self.complete_item(shard_num, retries),
其实现如下:
# radosgw-agent/sync.py: class Syncer(object)
def complete_item(self, shard_num, retries):
    """Called when syncing a single item completes successfully

    Posts the shard's bound (its marker from ``self.shard_info`` and the
    keys still to retry) to the destination with
    ``client.set_worker_bound``. It does nothing when the shard has no
    marker. A failure to post is logged and ignored, so some work may be
    repeated on the next pass.
    """
    # marker of this shard; a missing/empty marker means nothing to record
    marker = self.shard_info.get(shard_num)
    if not marker:
        return
    try:
        # post this shard's bound to the destination
        data = [dict(name=retry, time=worker.DEFAULT_TIME)
                for retry in retries]
        client.set_worker_bound(self.dest_conn,
                                self.type,
                                marker,
                                worker.DEFAULT_TIME,
                                self.daemon_id,
                                shard_num,
                                data)
    except Exception:
        log.warn('could not set worker bounds, may repeat some work.'
                 'Traceback:', exc_info=True)
总结
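- 在「2. 同步」中,sync_entries() 通过 return new_retries 返回同步失败的 key(格式为 section/name);若全部成功,则返回空列表。
- 在「3. 获取处理结果」中,从 queue 中取得每个 shard 的 retries。对于成功的 shard,complete_item() 根据 shard_num 把该 shard 的 marker 和 retries post 到 dest 的 admin/replica_log。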