ceilometer-central 服务

分析

1
2
3
def main():
    """Entry point of the ceilometer-central agent.

    Prepares the service environment via ``service.prepare_service()``,
    then launches a ``manager.AgentManager`` through ``os_service.launch``
    and calls ``wait()`` on the launcher that is returned.
    """
    service.prepare_service()
    os_service.launch(manager.AgentManager()).wait()

第一步准备服务service.prepare_service()

1
2
3
4
5
6
7
8
9
10
11
12
def prepare_service(argv=None):
    """Set up translation, logging, configuration and messaging.

    :param argv: Command-line argument list. When ``None``, ``sys.argv``
                 is used. The first element is skipped when the arguments
                 are handed to ``cfg.CONF``.
    :returns: None
    """
    # Set up internationalization (lazy translation).
    gettextutils.install('ceilometer')
    gettextutils.enable_lazy()

    # Set the default log levels, adding entries for stevedore and
    # keystoneclient on top of the configured defaults.
    log_levels = (cfg.CONF.default_log_levels +
                  ['stevedore=INFO', 'keystoneclient=INFO'])
    cfg.set_defaults(log.log_opts,
                     default_log_levels=log_levels)

    if argv is None:
        argv = sys.argv
    cfg.CONF(argv[1:], project='ceilometer')
    log.setup('ceilometer')

    # Set up the default message queue.
    messaging.setup()

第二步启动服务os_service.launch(manager.AgentManager()).wait()

1
2
3
4
5
6
7
8
9
def launch(service, workers=1):
    """Start ``service`` under a launcher and return that launcher.

    :param service: The service object to launch.
    :param workers: Number of workers. ``None`` or ``1`` uses a
                    ``ServiceLauncher``; any other value uses a
                    ``ProcessLauncher`` and is forwarded to it.
    :returns: The launcher instance the service was handed to.
    """
    if workers is None or workers == 1:
        launcher = ServiceLauncher()
        launcher.launch_service(service)
    else:
        launcher = ProcessLauncher()
        launcher.launch_service(service, workers=workers)

    return launcher
1
2
3
4
5
6
7
8
9
10
class ServiceLauncher(Launcher):
    """Launcher whose ``wait()`` loops until an exit status is produced."""

    def wait(self, ready_callback=None):
        """Wait for exit or a signal, restarting on SIGHUP-as-daemon.

        :param ready_callback: Passed through unchanged to
                               ``_wait_for_exit_or_signal``.
        :returns: The status reported by ``_wait_for_exit_or_signal`` for
                  the first signal that does not trigger a restart.
        """
        systemd.notify_once()
        while True:
            self.handle_signal()
            status, signo = self._wait_for_exit_or_signal(ready_callback)
            # Only keep looping (and restart) when the helper says this
            # was a SIGHUP received while running as a daemon.
            if not _is_sighup_and_daemon(signo):
                return status
            self.restart()

最终是调用这里的wait()

launcher.launch_service(service)

1
2
3
4
5
6
7
8
9
def launch_service(self, service):
    """Load and start the given service.

    Copies this launcher's ``backdoor_port`` onto the service and then
    registers the service with ``self.services``.

    :param service: The service you would like to start.
    :returns: None

    """
    service.backdoor_port = self.backdoor_port
    self.services.add(service)
1
2
3
4
5
6
7
8
9
10
class Services(object):
    """Container that tracks services and runs each one in a thread group."""

    def __init__(self):
        """Create the empty service list, thread group and done event."""
        self.services = []
        self.tg = threadgroup.ThreadGroup()
        self.done = event.Event()

    def add(self, service):
        """Record ``service`` and schedule it on the thread group.

        :param service: The service to track and run.
        :returns: None
        """
        self.services.append(service)
        # NOTE(review): run_service is not part of this excerpt; it is
        # presumably defined elsewhere on this class -- confirm.
        self.tg.add_thread(self.run_service, service, self.done)

这里是将各个服务放到了线程组中。

ceilometer/ceilometer/agent.py

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19

class AgentManager(os_service.Service):
    """Service that schedules polling tasks on its thread group."""

    def start(self):
        """Set up the pipeline, join coordination and register timers.

        One timer is added per ``(interval, task)`` pair returned by
        ``setup_polling_tasks()``, each invoking ``self.interval_task``.
        A further timer drives the partition coordinator's heartbeat.
        """
        self.pipeline_manager = publish_pipeline.setup_pipeline()

        self.partition_coordinator.start()
        self.join_partitioning_groups()

        # allow time for coordination if necessary
        delay_start = self.partition_coordinator.is_active()

        for interval, task in six.iteritems(self.setup_polling_tasks()):
            self.tg.add_timer(interval,
                              self.interval_task,
                              initial_delay=interval if delay_start else None,
                              task=task)
        # Heartbeat timer is registered once, after the polling timers.
        self.tg.add_timer(cfg.CONF.coordination.heartbeat,
                          self.partition_coordinator.heartbeat)

这里定时去执行task，注意self.interval_task

具体某项的采集

举例
image = ceilometer.image.glance:ImagePollster

对应到ceilometer/image/glance.py

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
class ImagePollster(_Base):
    """Pollster that emits one 'image' gauge sample per image found."""

    def get_samples(self, manager, cache, resources):
        """Yield a ``sample.Sample`` for every image on every endpoint.

        :param manager: Object whose ``keystone`` attribute is passed to
                        ``_iter_images``.
        :param cache: Passed through unchanged to ``_iter_images``.
        :param resources: Iterable of endpoints to query.
        :returns: A generator of ``sample.Sample`` objects with volume 1.
        """
        for endpoint in resources:
            for image in self._iter_images(manager.keystone, cache, endpoint):
                yield sample.Sample(
                    name='image',
                    type=sample.TYPE_GAUGE,
                    unit='image',
                    volume=1,
                    user_id=None,
                    project_id=image.owner,
                    resource_id=image.id,
                    timestamp=timeutils.isotime(),
                    resource_metadata=self.extract_image_metadata(image),
                )

self._iter_images(manager.keystone, cache, endpoint)
这是一个生成器,其中根据endpoint建立glanceclient去获取image信息。