Radosgw-agent 源码分析

介绍

在同步数据的时候,分别在 source 和 destination 上抓包,并对照源码进行分析。

源码分析

radosgw-agent/cli.py:main()

def main():
    """Entry point of radosgw-agent (abridged excerpt of radosgw-agent/cli.py:main)."""
    # Set up the logger and log level (elided in this excerpt).

    # Parse args to obtain the configuration (elided in this excerpt).

    # Fetch the region map from the destination; the API used is /admin/config.
    try:
        region_map = client.get_region_map(dest_conn)
    except AgentError:
        # Placeholder: the handler body is elided in this excerpt; re-raising
        # keeps the sketch valid without silently swallowing the error.
        raise

    # Configure the endpoints; this step checks for master or secondary.
    client.configure_endpoints(region_map, dest, src, args.metadata_only)
    # test_server (elided in this excerpt)

    # Data sync
    ## Initialize meta_syncer and data_syncer (elided in this excerpt)

    ## prepare_sync
    sync.prepare_sync(meta_syncer, args.prepare_error_delay)
    if not args.metadata_only:
        sync.prepare_sync(data_syncer, args.prepare_error_delay)

    ## sync
    if args.sync_scope == 'full':
        log.info('syncing all metadata')
        meta_syncer.sync(args.num_workers, args.lock_timeout)
        if not args.metadata_only:
            log.info('syncing all data')
            data_syncer.sync(args.num_workers, args.lock_timeout)
        log.info('Finished full sync. Check logs to see any issues that '
                 'incremental sync will retry.')
    else:
        sync.incremental_sync(meta_syncer, data_syncer,
                              args.num_workers,
                              args.lock_timeout,
                              args.incremental_sync_delay,
                              args.metadata_only,
                              args.prepare_error_delay)

prepare_sync

以 meta_syncer 为例
/radosgw-agent/sync.py:class IncrementalSyncer(Syncer)

def prepare(self):
    """Snapshot, per source log shard, the work to do on the next sync.

    For every shard this records the destination's bound (marker and
    retries), the source log entries fetched from that marker, and the last
    marker returned by the source.
    """
    # Number of log shards on src, obtained via
    #   self.num_shards = client.num_log_shards(self.src_conn, self.type)
    # which calls the API /admin/log?type=metadata and gets a result like
    #   {"num_objects": 64}
    self.init_num_shards()
    self.shard_info = {}
    self.shard_work = {}
    for shard_num in xrange(self.num_shards):
        # Bound stored on dest: marker to resume from, plus items to retry.
        marker, retries = self.get_worker_bound(shard_num)
        # Log entries read from src starting at `marker`, and the last marker.
        last_marker, log_entries = self.get_log_entries(shard_num, marker)
        self.shard_work[shard_num] = log_entries, retries
        self.shard_info[shard_num] = last_marker
    self.prepared_at = time.time()

对于上面代码

  1. 获取 dest 的 bound
    # Fetch this shard's bound (marker, retries) from the destination.
    marker, retries = self.get_worker_bound(shard_num)

    获取 dest 的 bound,对应的 api 是 /admin/replica_log?bounds&type=data&id=0(这里抓到的是 data 类型的请求;meta_syncer 发出的请求中 type 应为 metadata)。
    若 dest 上已有该 shard 的 bound,得到的结果是
    {
        "marker": "1_1453184408.808048_5.1",
        "oldest_time": "0.000000",
        "markers": [
            {
                "entity": "radosgw-agent",
                "position_marker": "1_1453184408.808048_5.1",
                "position_time": "0.000000",
                "items_in_progress": []
            }
        ]
    }



    若 dest 上还没有该 shard 的 bound,则返回
    {"Code": "NoSuchKey"}

    此时 marker、retries 都为空。
  2. 获取 src 的 log_entries
    # Fetch this shard's log entries from the source, starting at `marker`.
    last_marker, log_entries = self.get_log_entries(shard_num, marker)

    获取 src 的 log_entries,对应的 api 是 /admin/log?marker=%20&type=metadata&id=0&max-entries=1000
    得到的结果包含一个 marker 和许多 entries:
    {
        "marker": "1_1453461773.098122_674.1",
        "truncated": false,
        "entries": [
            {
                "id": "1_1448948065.089679_11.1",
                "section": "bucket.instance",
                "name": "pengdake_test:center-1.5447.2",
                "timestamp": "2015-12-01 05:34:25.089679Z",
                "data": {
                    "read_version": {
                        "tag": "",
                        "ver": 0
                    },
                    "write_version": {
                        "tag": "_yl8VxMaeEnZZpEAZE-M9vwX",
                        "ver": 1
                    },
                    "status": {
                        "status": "write"
                    }
                }
            },
            ...
        ]
    }

sync

考虑 incremental 方式,radosgw-agent/sync.py

def incremental_sync(meta_syncer, data_syncer, num_workers, lock_timeout,
                     incremental_sync_delay, metadata_only, error_delay):
    """Run a continuous incremental sync.

    This runs forever, pausing incremental_sync_delay seconds between syncs.
    Errors from a sync pass are logged and the pass is retried next loop.
    """
    while True:
        try:
            meta_syncer.sync(num_workers, lock_timeout)
            if not metadata_only:
                data_syncer.sync(num_workers, lock_timeout)
        except Exception:
            # Deliberate best effort: log and try again on the next pass.
            log.warn('error doing incremental sync, will try again. Traceback:',
                     exc_info=True)

        # prepare data before sleeping due to rgw_log_bucket_window
        if not metadata_only:
            prepare_sync(data_syncer, error_delay)
        log.info('waiting %d seconds until next sync',
                 incremental_sync_delay)
        time.sleep(incremental_sync_delay)
        prepare_sync(meta_syncer, error_delay)
  1. 初始化
    其中 meta_syncer.sync(num_workers, lock_timeout) 最终调用的是 radosgw-agent/sync.py 中 class Syncer 的 sync 方法:

    def sync(self, num_workers, log_lock_time):
        """Sync every prepared shard using ``num_workers`` worker processes.

        Work items from ``generate_work()`` are fed to the workers through a
        queue, followed by one ``None`` poison pill per worker. One result per
        work item is then read back. Successful shards are passed to
        ``complete_item``, and failed shard numbers are reported at the end.
        """
        # Create the multiprocessing queues and start the workers.
        workQueue = multiprocessing.Queue()
        resultQueue = multiprocessing.Queue()
        processes = [self.worker_cls(workQueue,
                                     resultQueue,
                                     log_lock_time,
                                     self.src,
                                     self.dest,
                                     daemon_id=self.daemon_id,
                                     max_entries=self.max_entries,
                                     object_sync_timeout=self.object_sync_timeout,
                                     )
                     for i in xrange(num_workers)]
        for process in processes:
            process.daemon = True
            process.start()
        self.wait_until_ready()
        log.info('Starting sync')

        # enqueue the shards to be synced
        num_items = 0
        for item in self.generate_work():
            num_items += 1
            workQueue.put(item)

        # add a poison pill for each worker
        for i in xrange(num_workers):
            workQueue.put(None)

        # pull the results out as they are produced
        # NOTE: the original code initialised `retries = {}` and then rebound
        # the same name to each result's retry list, so `retries.append(...)`
        # appended to a worker's list. As a result, the final error report
        # reflected only the last result. Failed shards are now collected
        # separately.
        failed_shards = []
        for i in xrange(num_items):
            result, item = resultQueue.get()
            shard_num, retries = item
            if result == worker.RESULT_SUCCESS:
                log.debug('synced item %r successfully', item)
                self.complete_item(shard_num, retries)
            else:
                log.error('error syncing shard %d', shard_num)
                failed_shards.append(shard_num)
            log.info('%d/%d items processed', i + 1, num_items)
        if failed_shards:
            log.error('Encountered errors syncing these %d shards: %r',
                      len(failed_shards), failed_shards)
    2. 同步
    上面 1 中代码里的 self.generate_work() 对应的是:
    def generate_work(self):
        """Return an iterator of ``(shard_num, (log_entries, retries))`` pairs."""
        # shard_work is populated by prepare(); iteritems() is the Python 2 API.
        return self.shard_work.iteritems()

    这里的 shard_work 就是上述 prepare 中生成的 shard_work,generate_work 产生的每个 work 的格式是
    shard_num, (log_entries, retries)

    将其放入队列后,已通过 process.start() 启动的 worker 进程会从队列中取出一个 work 进行处理,对应的方法是
    radosgw-agent/worker.py:class IncrementalMixin()
    def run(self):
        """Worker loop: process shard work items until a ``None`` poison pill.

        For each item, the shard's log is locked and its entries plus pending
        retries are synced. The log is then unlocked and
        ``(result, (shard_num, new_retries))`` is put on the result queue.
        """
        self.prepare_lock()
        while True:
            # Take one work item (produced from shard_work).
            item = self.work_queue.get()
            if item is None:
                # Poison pill: no more work for this process.
                dev_log.info('process %s is done. Exiting', self.ident)
                break
            shard_num, (log_entries, retries) = item
            log.info('%s is processing shard number %d',
                     self.ident, shard_num)
            # first, lock the log
            try:
                self.lock_shard(shard_num)
            except SkipShard:
                # NOTE(review): nothing is queued on this path, yet Syncer.sync
                # reads exactly one result per enqueued item. Presumably
                # lock_shard queues its own result before raising SkipShard;
                # confirm this, otherwise the syncer would block.
                continue
            result = RESULT_SUCCESS
            try:
                # Sync this shard's log entries together with earlier retries.
                new_retries = self.sync_entries(log_entries, retries)
            except Exception:
                log.exception('syncing entries for shard %d failed',
                              shard_num)
                result = RESULT_ERROR
                new_retries = []
            # finally, unlock the log
            self.unlock_shard()
            self.result_queue.put((result, (shard_num, new_retries)))
            log.info('finished processing shard %d', shard_num)

    如上代码中new_retries = self.sync_entries(log_entries, retries)对应的是
    radosgw-agent/worker.py:class IncrementalMixin()
    def sync_entries(self, log_entries, retries):
        """Sync every metadata key mentioned in the log or in ``retries``.

        ``retries`` holds ``"section/name"`` strings. Each distinct
        ``(section, name)`` pair is synced once. Returns the
        ``"section/name"`` strings whose sync failed, so they can be retried
        later; the list is empty when everything succeeded. A KeyError raised
        while parsing the entries is logged and re-raised.
        """
        try:
            # Parse each raw log entry.
            entries = [_meta_entry_from_json(entry) for entry in log_entries]
        except KeyError:
            log.error('log containing bad key is: %s', log_entries)
            raise
        new_retries = []
        # (section, name) of every entry, de-duplicated.
        mentioned = {(entry.section, entry.name) for entry in entries}
        # Split the "section/name" retry strings back into (section, name).
        split_retries = [tuple(entry.split('/', 1)) for entry in retries]
        # Sync the union of logged keys and pending retries.
        for section, name in mentioned.union(split_retries):
            sync_result = self.sync_meta(section, name)
            # Failed keys are returned so they are retried next time.
            if sync_result == RESULT_ERROR:
                new_retries.append(section + '/' + name)
        return new_retries

    如上代码中sync_result = self.sync_meta(section, name)对应的是
    radosgw-agent/worker.py:class MetadataWorker(Worker)
    def sync_meta(self, section, name):
        """Copy one metadata key from the source to the destination.

        If the key is not found on the source, it is deleted from the
        destination. Always returns RESULT_SUCCESS or RESULT_ERROR.
        """
        log.debug('syncing metadata type %s key "%s"', section, name)
        try:
            # Fetch this metadata key from the source.
            metadata = client.get_metadata(self.src_conn, section, name)
        except NotFound:
            log.debug('%s "%s" not found on master, deleting from secondary',
                      section, name)
            try:
                # Missing on the source, so delete it from the destination.
                client.delete_metadata(self.dest_conn, section, name)
            except NotFound:
                # Since this error is handled appropriately, return success
                return RESULT_SUCCESS
            except Exception as e:
                # The original code let this escape: a sibling `except` clause
                # does not catch errors raised inside a handler. The error then
                # failed the whole shard in the worker and discarded its
                # retries, so it is now reported per key instead.
                log.warn('error deleting metadata for %s "%s": %s',
                         section, name, e, exc_info=True)
                return RESULT_ERROR
            # The original code fell through here and returned None.
            return RESULT_SUCCESS
        except Exception as e:
            log.warn('error getting metadata for %s "%s": %s',
                     section, name, e, exc_info=True)
            return RESULT_ERROR
        else:
            try:
                # Write the source's metadata to the destination.
                client.update_metadata(self.dest_conn, section, name, metadata)
                return RESULT_SUCCESS
            except Exception as e:
                log.warn('error updating metadata for %s "%s": %s',
                         section, name, e, exc_info=True)
                return RESULT_ERROR
    3. 获取处理结果
    其代码是
    # Excerpt from Syncer.sync: read one result per enqueued work item.
    # NOTE: the original code initialised `retries = {}` and then rebound the
    # same name to each result's retry list, so `retries.append(...)` mutated
    # a worker's list and the final report reflected only the last result.
    # Failed shards are now collected separately.
    failed_shards = []
    for i in xrange(num_items):
        result, item = resultQueue.get()
        shard_num, retries = item
        if result == worker.RESULT_SUCCESS:
            log.debug('synced item %r successfully', item)
            self.complete_item(shard_num, retries)
        else:
            log.error('error syncing shard %d', shard_num)
            failed_shards.append(shard_num)
        log.info('%d/%d items processed', i + 1, num_items)
    if failed_shards:
        log.error('Encountered errors syncing these %d shards: %r',
                  len(failed_shards), failed_shards)

    从 queue 中取得一个处理结果:
    若失败,报错,
    若成功,执行self.complete_item(shard_num, retries)
    radosgw-agent/sync.py:class Syncer(object)

    def complete_item(self, shard_num, retries):
        """Called when syncing a single item completes successfully.

        Posts the shard's bound (its recorded marker plus the entries still
        to retry) to the destination. Failures are logged, not raised.
        """
        # Marker recorded for this shard; nothing to post if it is missing/empty.
        marker = self.shard_info.get(shard_num)
        if not marker:
            return
        try:
            # Post this shard's bound, carrying the entries still to retry.
            data = [dict(name=retry, time=worker.DEFAULT_TIME)
                    for retry in retries]
            client.set_worker_bound(self.dest_conn, self.type, marker,
                                    worker.DEFAULT_TIME, self.daemon_id,
                                    shard_num, data)
        except Exception:
            # Best effort: a failed update only means some work may repeat.
            # (The space before 'Traceback:' was missing in the original.)
            log.warn('could not set worker bounds, may repeat some work. '
                     'Traceback:', exc_info=True)
    

总结

  • 在 sync 的「2. 同步」中,sync_entries 会 return new_retries:同步失败的条目会被返回,若没有失败则返回空列表;
    获取处理结果时,从 queue 中取出该 retries,再根据 shard_num 通过 post admin/replica_log 设置该 shard 的 bound(见 complete_item)。