ceilometer-alarm 源码

介绍

以一个alarm为例

+---------------------------+-----------------------------------------------------+
| Property                  | Value                                               |
+---------------------------+-----------------------------------------------------+
| alarm_actions             | [u'log://']                                         |
| alarm_id                  | a020c017-b2a9-422e-b1bc-c414c98091d1                |
| comparison_operator       | gt                                                  |
| description               | instance running hot                                |
| enabled                   | True                                                |
| evaluation_periods        | 3                                                   |
| exclude_outliers          | False                                               |
| insufficient_data_actions | []                                                  |
| meter_name                | cpu_util                                            |
| name                      | cpu_high                                            |
| ok_actions                | []                                                  |
| period                    | 600                                                 |
| project_id                |                                                     |
| query                     | resource_id == 0da3f008-60d7-4b10-a7eb-b143a842a25b |
| repeat_actions            | False                                               |
| state                     | ok                                                  |
| statistic                 | avg                                                 |
| threshold                 | 70.0                                                |
| type                      | threshold                                           |
| user_id                   | 3dbf0919d60d4025842e6ea149e4aeba                    |
+---------------------------+-----------------------------------------------------+

此 alarm 为 threshold 类型，其求值入口是 ceilometer/alarm/evaluator/threshold.py 中的 evaluate 方法：

def evaluate(self, alarm):
    """Evaluate a threshold alarm and transition its state if needed.

    Bounds the alarm's query to the evaluation window, fetches and
    sanitizes the statistics, and -- when there is enough data --
    compares every statistic against the threshold before handing the
    results to ``_transition``.

    :param alarm: the alarm to evaluate; ``alarm.rule`` must provide
        'query', 'comparison_operator', 'statistic' and 'threshold'.
    """
    # Skip evaluation entirely when the alarm is outside its time
    # constraint.
    if not self.within_time_constraint(alarm):
        LOG.debug(_('Attempted to evaluate alarm %s, but it is not '
                    'within its time constraint.') % alarm.alarm_id)
        return

    # Restrict the alarm's query to the evaluation time window.
    query = self._bound_duration(
        alarm,
        alarm.rule['query']
    )

    # Fetch the statistics for that window, then let _sanitize clean
    # them up.
    statistics = self._sanitize(
        alarm,
        self._statistics(alarm, query)
    )

    if self._sufficient(alarm, statistics):
        def _compare(stat):
            op = COMPARATORS[alarm.rule['comparison_operator']]
            value = getattr(stat, alarm.rule['statistic'])
            limit = alarm.rule['threshold']
            LOG.debug(_('comparing value %(value)s against threshold'
                        ' %(limit)s') %
                      {'value': value, 'limit': limit})
            return op(value, limit)

        # Build a real list rather than a lazy map(): _transition scans
        # the results more than once and indexes the last one, which a
        # Python 3 map iterator cannot support.
        self._transition(alarm,
                         statistics,
                         [_compare(stat) for stat in statistics])

先分析查询条件的构造：

# Bound the alarm's query to the evaluation time window.
query = self._bound_duration(
    alarm,
    alarm.rule['query']
)

这里的 alarm.rule['query'] 就是上表中的 resource_id == 0da3f008-60d7-4b10-a7eb-b143a842a25b。它是一个查询条件列表，所以下面的 _bound_duration 可以直接用 extend 往里追加时间条件。

再看函数 _bound_duration：

@classmethod
def _bound_duration(cls, alarm, constraints):
    """Bound the duration of the statistics query.

    Adds ``timestamp <= now`` and ``timestamp >= start`` constraints, where
    ``start = now - period * (evaluation_periods + look_back)``.

    :param alarm: the alarm whose rule supplies 'period',
        'evaluation_periods' and, optionally, 'exclude_outliers'.
    :param constraints: list of query constraints; it is extended IN
        PLACE and also returned.
    :returns: ``constraints`` with the two timestamp bounds appended.
    """
    now = timeutils.utcnow()
    # when exclusion of weak datapoints is enabled, we extend
    # the look-back period so as to allow a clearer sample count
    # trend to be established
    look_back = (cls.look_back if not alarm.rule.get('exclude_outliers')
                 else alarm.rule['evaluation_periods'])

    # Window length in seconds: one period per evaluated period plus the
    # look-back periods.
    window = (alarm.rule['period'] *
              (alarm.rule['evaluation_periods'] + look_back))
    start = now - datetime.timedelta(seconds=window)
    LOG.debug(_('query stats from %(start)s to '
                '%(now)s') % {'start': start, 'now': now})
    after = dict(field='timestamp', op='ge', value=start.isoformat())
    before = dict(field='timestamp', op='le', value=now.isoformat())
    # NOTE(review): this mutates the caller's list. evaluate() passes
    # alarm.rule['query'] itself, so the alarm's own query gains these
    # bounds. That is harmless only if alarms are re-fetched every
    # evaluation cycle; confirm before reusing alarm objects.
    constraints.extend([before, after])
    return constraints

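这个函数计算 evaluate 的查询时间窗口：

  • 结束时间 before 为 now；
  • 开始时间 after 为 start = now - period * (evaluation_periods + look_back)，其中：

    • evaluation_periods：参与评估的周期数（本例为 3，每个周期 period 为 600 秒）；
    • look_back：额外回看的周期数，用来容忍数据上报/入库（reporting/ingestion）的延迟，属于可以忽略的细节；若 alarm 开启了 exclude_outliers，look_back 会改为 evaluation_periods，以便得到更清晰的样本数趋势；
  • 最后把这两个时间过滤条件追加到 query 中并返回。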

接下来是函数 _sanitize：

# Fetch the raw statistics for the bounded query and run them through
# _sanitize before any threshold comparison happens.
statistics = self._sanitize(alarm, self._statistics(alarm, query))

_sanitize 负责判断是否需要清洗数据（用方差的方式剔除离群值）；
它的输入数据来自函数 _statistics：

def _statistics(self, alarm, query):
    """Retrieve statistics for the alarm's meter over the bounded query.

    :param alarm: the alarm whose rule supplies 'meter_name' and 'period'.
    :param query: the constraint list produced by ``_bound_duration``.
    :returns: the statistics returned by the API client, or an empty
        list if the request raised (the failure is logged).
    """
    LOG.debug(_('stats query %s') % query)
    try:
        return self._client.statistics.list(
            meter_name=alarm.rule['meter_name'], q=query,
            period=alarm.rule['period'])
    except Exception:
        # Best effort: log the failure and report no statistics instead
        # of propagating the error out of the evaluation cycle.
        LOG.exception(_('alarm stats retrieval failed'))
        return []

这里传进来的 query 包括 resource_id == 0da3f008-60d7-4b10-a7eb-b143a842a25b 以及时间范围条件 [before, after]；
这里的 _client 是 ceilometerclient/v2/client.py 中的客户端，通过分析，最终定位到 ceilometerclient/v2/statistics.py 中的 list 方法：

def list(self, meter_name, q=None, period=None, groupby=None,
         aggregates=None):
    """List statistics for ``meter_name``.

    :param meter_name: name of the meter to aggregate.
    :param q: optional query constraints, passed through to ``build_url``.
    :param period: optional period (seconds) used to bucket samples.
    :param groupby: a field name, or a list of field names, to group by.
    :param aggregates: aggregate specifications, converted into URL
        parameters by ``_build_aggregates``.
    :returns: whatever ``self._list`` returns for the built URL.
    """
    groupby = groupby or []
    aggregates = aggregates or []
    params = ['period=%s' % period] if period else []
    # Accept a single field name as shorthand for a one-element list.
    if isinstance(groupby, six.string_types):
        groupby = [groupby]
    params.extend('groupby=%s' % field for field in groupby)
    params.extend(self._build_aggregates(aggregates))
    return self._list(options.build_url(
        '/v2/meters/' + meter_name + '/statistics',
        q, params))

它根据 meter_name、period、groupby、aggregates 和 query 构建出请求 URL（/v2/meters/<meter_name>/statistics），再交给 ceilometerclient/common/base.py 中 base.Manager 的方法 _list：

def _list(self, url, response_key=None, obj_class=None, body=None,
expect_single=False):
try:
resp = self.api.get(url)
except exceptions.NotFound:
raise exc.HTTPNotFound
if not resp.content:
raise exc.HTTPNotFound
body = resp.json()

if obj_class is None:
obj_class = self.resource_class

if response_key:
try:
data = body[response_key]
except KeyError:
return []
else:
data = body
if expect_single:
data = [data]
return [obj_class(self, res, loaded=True) for res in data if res]

_list 用 GET 请求取回数据，把每条非空记录包装成 resource_class（或传入的 obj_class）对象后返回；
最后回到 evaluate 方法中的 _sufficient：

# Only compare against the threshold when there is enough data; the
# per-statistic results then drive _transition.
if self._sufficient(alarm, statistics):
    def _compare(stat):
        op = COMPARATORS[alarm.rule['comparison_operator']]
        value = getattr(stat, alarm.rule['statistic'])
        limit = alarm.rule['threshold']
        LOG.debug(_('comparing value %(value)s against threshold'
                    ' %(limit)s') %
                  {'value': value, 'limit': limit})
        return op(value, limit)

    # A list rather than a lazy map(): _transition scans the results
    # more than once and indexes the last one.
    self._transition(alarm,
                     statistics,
                     [_compare(stat) for stat in statistics])

这里先判断数据是否充足：
不充足时，alarm 状态会被刷新为 insufficient data；
充足时，用 _compare 逐条比较：取出 statistic 指定的统计值（本例为 avg），按 comparison_operator（本例为 gt）与 threshold（本例为 70.0）比较，再把比较结果交给方法 _transition：

def _transition(self, alarm, statistics, compared):
    """Move the alarm to a new state based on per-statistic comparisons.

    :param alarm: the alarm being evaluated.
    :param statistics: the sanitized statistics the comparisons came from.
    :param compared: one truth value per statistic, true where the
        threshold condition held (any iterable is accepted).
    """
    # Materialize once: the results are scanned by all() and any() and
    # the last element is indexed below, which a one-shot iterator
    # (e.g. a Python 3 map object) would not survive.
    compared = list(compared)
    # True when every statistic satisfied the threshold condition.
    distilled = all(compared)
    # True when the statistics agree: all satisfied it, or none did.
    unequivocal = distilled or not any(compared)
    # Whether the alarm's current state is UNKNOWN.
    unknown = alarm.state == evaluator.UNKNOWN
    # With repeat_actions set, the alarm is refreshed even when its state
    # does not change.
    continuous = alarm.repeat_actions

    if unequivocal:
        # Unanimous result: ALARM if every statistic breached, else OK.
        state = evaluator.ALARM if distilled else evaluator.OK
        reason, reason_data = self._reason(alarm, statistics,
                                           distilled, state)
        # Refresh on a real state change, or always when actions repeat.
        if alarm.state != state or continuous:
            self._refresh(alarm, state, reason, reason_data)
    elif unknown or continuous:
        # Mixed result: an UNKNOWN alarm follows the most recent
        # comparison; otherwise the current state is kept and refreshed
        # only because repeat_actions is set.
        trending_state = evaluator.ALARM if compared[-1] else evaluator.OK
        state = trending_state if unknown else alarm.state
        reason, reason_data = self._reason(alarm, statistics,
                                           distilled, state)
        self._refresh(alarm, state, reason, reason_data)

接下来看函数 _refresh（ceilometer/alarm/evaluator/__init__.py）：

def _refresh(self, alarm, state, reason, reason_data):
    """Refresh alarm state.

    Persists ``state`` through the API only when it differs from the
    alarm's current state, then updates the local ``alarm.state`` and
    notifies the configured notifier (if any). Any exception is logged
    and swallowed.

    :param alarm: the alarm to update.
    :param state: the target state.
    :param reason: human-readable reason, used in the log and notification.
    :param reason_data: structured reason data passed to the notifier.
    """
    try:
        previous = alarm.state
        if previous != state:
            LOG.info(_('alarm %(id)s transitioning to %(state)s because '
                       '%(reason)s') % {'id': alarm.alarm_id,
                                        'state': state,
                                        'reason': reason})

            self._client.alarms.set_state(alarm.alarm_id, state=state)
        alarm.state = state
        # Notify even when the state is unchanged: that path is reached
        # when the alarm repeats its actions.
        if self.notifier:
            self.notifier.notify(alarm, previous, reason, reason_data)
    except Exception:
        # retry will occur naturally on the next evaluation
        # cycle (unless alarm state reverts in the meantime)
        LOG.exception(_('alarm state update failed'))

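这里才是真正设置状态的地方：对比之前的状态和目标状态，不一样则记录日志并调用 set_state 更新；一样则不调用 API。无论哪种情况，都会更新本地的 alarm.state，并在配置了 notifier 时发送通知（因此开启 repeat_actions 后，状态不变也会重复触发通知）。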