Ceilometer 中 meter 的流程(二)
介绍
接着上一篇 meter 的采集,这一篇写的是 meter 的存储。
meter 的存储
存储 meter 整个流程大概如下:
- agent 收集到 meter 的 samples,进行 push
- push时,对 samples 进行 transform
- 获得 pipeline 的 publisher,进行 publish_samples
流程
publisher
接上一节,在方法 poll_and_publish() 中,使用 with as 来调用 publisher()
1 | with self.publishers[source.name] as publisher: |
这里的publisher就是类PublishContext
ceilometer/pipeline.py:PublishContext
1 | class PublishContext(object): |
在方法 enter() 中 self.context 维护 admin 用户的认证信息;p 是 ceilometer/pipeline.py:Pipeline 的实例
1
2
3
4
5
6class Pipeline(object):
…
def publish_samples(self, ctxt, samples):
supported = [s for s in samples if self.source.support_meter(s.name)]
self.sink.publish_samples(ctxt, supported)
…
sink 维护 source 及其对应的 publishers
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19class Sink(object):
…
def _publish_samples(self, start, ctxt, samples):
transformed_samples = []
for sample in samples:
sample = self._transform_sample(start, ctxt, sample)
if sample:
transformed_samples.append(sample)
if transformed_samples:
for p in self.publishers:
try:
p.publish_samples(ctxt, transformed_samples)
…
def publish_samples(self, ctxt, samples):
for meter_name, samples in itertools.groupby(
sorted(samples, key=operator.attrgetter(‘name’)),
operator.attrgetter(‘name’)):
self._publish_samples(0, ctxt, samples)
- 先对 samples 根据 meter_name 进行升序排列,再根据 meter_name 进行分组
- 方法 _publish_samples() 中将 samples 进行 transform,然后遍历 publishers 再分别进行 publish
- publishers 的初始化
1
2
3
4
5
6
7
8
9self.publishers = []
for p in cfg[‘publishers’]:
if ‘://‘ not in p:
# Support old format without URL
p = p + “://“
try:
self.publishers.append(publisher.get_publisher(p))
except Exception:
LOG.exception(_(“Unable to load publisher %s”), p)
publisher的加载使用的是 stevedore 的 driver 方式,根据 pipeline.yarm 文件中的 publisher 的配置,使用的是 notifier,对应的是
1 | ceilometer.publisher = |
使用 rabbitmq 的 topic 方式 push