Ceilometer 中 meter 的流程(二)

介绍

接着上一篇 meter 的采集,这一篇写的是 meter 的存储。

meter 的存储

存储 meter 整个流程大概如下:

  • agent 收集到 meter 的 samples,进行 push
  • push时,对 samples 进行 transform
  • 获得 pipeline 的 publisher,进行 publish_samples

流程

publisher

接上一节,在方法 poll_and_publish() 中,使用 with as 来调用 publisher()

1
2
3
4
5
6
7
8
9
10
with self.publishers[source.name] as publisher:
try:
samples = list(pollster.obj.get_samples(
manager=self.manager,
cache=cache,
resources=(source_resources or
pollster_resources or
agent_resources)
))
publisher(samples)

这里的publisher就是类PublishContext
ceilometer/pipeline.py:PublishContext

1
2
3
4
5
6
7
8
9
10
11
12
class PublishContext(object):

def enter(self):
def p(samples):
for p in self.pipelines:
p.publish_samples(self.context,
samples)
return p

def exit(self, exc_type, exc_value, traceback):
for p in self.pipelines:
p.flush(self.context)

在方法 enter() 中 self.context 维护 admin 用户的认证信息;p 是 ceilometer/pipeline.py:Pipeline 的实例

1
2
3
4
5
6
class Pipeline(object):

def publish_samples(self, ctxt, samples):
supported = [s for s in samples if self.source.support_meter(s.name)]
self.sink.publish_samples(ctxt, supported)

sink 维护 source 及其对应的 publishers

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
class Sink(object):

def _publish_samples(self, start, ctxt, samples):
transformed_samples = []
for sample in samples:
sample = self._transform_sample(start, ctxt, sample)
if sample:
transformed_samples.append(sample)

if transformed_samples:
for p in self.publishers:
try:
p.publish_samples(ctxt, transformed_samples)

def publish_samples(self, ctxt, samples):
for meter_name, samples in itertools.groupby(
sorted(samples, key=operator.attrgetter(‘name’)),
operator.attrgetter(‘name’)):
self._publish_samples(0, ctxt, samples)

  • 先对 samples 根据 meter_name 进行升序排列,再根据 meter_name 进行分组
  • 方法 _publish_samples() 中将 samples 进行 transform,然后遍历 publishers 再分别进行 publish
  • publishers 的初始化
    1
    2
    3
    4
    5
    6
    7
    8
    9
    self.publishers = []
    for p in cfg[‘publishers’]:
    if ‘://‘ not in p:
    # Support old format without URL
    p = p + “://“
    try:
    self.publishers.append(publisher.get_publisher(p))
    except Exception:
    LOG.exception(_(“Unable to load publisher %s”), p)

publisher的加载使用的是 stevedore 的 driver 方式,根据 pipeline.yarm 文件中的 publisher 的配置,使用的是 notifier,对应的是

1
2
3
ceilometer.publisher =

notifier = ceilometer.publisher.messaging:NotifierPublisher

使用 rabbitmq 的 topic 方式 push

总结

参考