ceilometer-central 服务
分析
1
2
3def main():
service.prepare_service()
os_service.launch(manager.AgentManager()).wait()
第一步准备服务service.prepare_service()
1
2
3
4
5
6
7
8
9
10
11
12def prepare_service(argv=None):
gettextutils.install(‘ceilometer’)
gettextutils.enable_lazy() #定义国际化
log_levels = (cfg.CONF.default_log_levels +
[‘stevedore=INFO’, ‘keystoneclient=INFO’])
cfg.set_defaults(log.log_opts,
default_log_levels=log_levels) #设置日志等级
if argv is None:
argv = sys.argv
cfg.CONF(argv[1:], project=‘ceilometer’)
log.setup(‘ceilometer’)
messaging.setup() #设置默认消息队列
第二步启动服务os_service.launch(manager.AgentManager()).wait()
1
2
3
4
5
6
7
8
9def launch(service, workers=1):
if workers is None or workers == 1:
launcher = ServiceLauncher()
launcher.launch_service(service)
else:
launcher = ProcessLauncher()
launcher.launch_service(service, workers=workers)
return launcher1
2
3
4
5
6
7
8
9
10class ServiceLauncher(Launcher):
…
def wait(self, ready_callback=None):
systemd.notify_once()
while True:
self.handle_signal()
status, signo = self._wait_for_exit_or_signal(ready_callback)
if not _is_sighup_and_daemon(signo):
return status
self.restart()
最终是调用这里的wait()
launcher.launch_service(service)
1
2
3
4
5
6
7
8
9def launch_service(self, service):
“””Load and start the given service.
:param service: The service you would like to start.
:returns: None
“””
service.backdoor_port = self.backdoor_port
self.services.add(service)1
2
3
4
5
6
7
8
9
10class Services(object):
def init(self):
self.services = []
self.tg = threadgroup.ThreadGroup()
self.done = event.Event()
def add(self, service):
self.services.append(service)
self.tg.add_thread(self.run_service, service, self.done)
这里是将各个服务放到了线程组中。
ceilometer/ceilometer/agent.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19…
class AgentManager(os_service.Service):
…
def start(self):
self.pipeline_manager = publish_pipeline.setup_pipeline()
self.partition_coordinator.start()
self.join_partitioning_groups()
# allow time for coordination if necessary
delay_start = self.partition_coordinator.is_active()
for interval, task in six.iteritems(self.setup_polling_tasks()):
self.tg.add_timer(interval,
self.interval_task,
initial_delay=interval if delay_start else None,
task=task)
self.tg.add_timer(cfg.CONF.coordination.heartbeat,
self.partition_coordinator.heartbeat)
这里定时去之执行task,注意self.interval_task
。
具体某项的采集
举例image = ceilometer.image.glance:ImagePollster
对应到ceilometer/image/glance.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15class ImagePollster(_Base):
def get_samples(self, manager, cache, resources):
for endpoint in resources:
for image in self._iter_images(manager.keystone, cache, endpoint):
yield sample.Sample(
name=‘image’,
type=sample.TYPE_GAUGE,
unit=‘image’,
volume=1,
user_id=None,
project_id=image.owner,
resource_id=image.id,
timestamp=timeutils.isotime(),
resource_metadata=self.extract_image_metadata(image),
)
self._iter_images(manager.keystone, cache, endpoint)
这是一个生成器,其中根据endpoint建立glanceclient去获取image信息。