# Core modules
import os
import re
import logging
import subprocess
import sys
import time
import datetime
import socket
import modules
from util import get_os, get_uuid, md5, Timer
from config import get_version
from checks import gethostname
import checks.system.unix as u
import checks.system.win32 as w32
from checks.nagios import Nagios
from checks.build import Hudson
from checks.db.mysql import MySql
from checks.db.couch import CouchDb
from checks.db.mcache import Memcache
from checks.queue import RabbitMq
from checks.ganglia import Ganglia
from checks.cassandra import Cassandra
from checks.datadog import Dogstreams, DdForwarder
from checks.db.elastic import ElasticSearch, ElasticSearchClusterStatus
from checks.wmi_check import WMICheck
from checks.ec2 import EC2
from checks.check_status import CheckStatus, CollectorStatus, EmitterStatus
from resources.processes import Processes as ResProcesses
log = logging.getLogger(__name__)
class Collector(object):
"""
The collector is responsible for collecting data from each check and
    passing it along to the emitters, which send it on to its final destination.
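
    A minimal usage sketch (log_emitter is hypothetical; emitters are plain
    callables with the (payload, log, agentConfig) signature that _emit()
    uses below):

        def log_emitter(payload, log, agentConfig):
            log.info("would emit %s metrics" % len(payload['metrics']))

        collector = Collector(agentConfig, [log_emitter], systemStats)
        collector.run()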
"""
def __init__(self, agentConfig, emitters, systemStats):
self.agentConfig = agentConfig
        # System stats are generated by config.get_system_stats.
        self.agentConfig['system_stats'] = systemStats
        # The agent config is passed into each check; system_stats can be
        # accessed through it.
self.os = get_os()
self.plugins = None
self.emitters = emitters
self.metadata_interval = int(agentConfig.get('metadata_interval', 10 * 60))
self.metadata_start = time.time()
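        # Set a process-wide default socket timeout so checks doing network
        # I/O cannot hang the collector loop indefinitely.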
socket.setdefaulttimeout(15)
self.run_count = 0
self.continue_running = True
self.metadata_cache = None
# Unix System Checks
self._unix_system_checks = {
'disk': u.Disk(log),
'io': u.IO(log),
'load': u.Load(log),
'memory': u.Memory(log),
'network': u.Network(log),
'processes': u.Processes(log),
'cpu': u.Cpu(log)
}
        # Win32 System Checks
self._win32_system_checks = {
'disk': w32.Disk(log),
'io': w32.IO(log),
'proc': w32.Processes(log),
'memory': w32.Memory(log),
'network': w32.Network(log),
'cpu': w32.Cpu(log)
}
# Old-style metric checks
self._couchdb = CouchDb(log)
self._mysql = MySql(log)
self._rabbitmq = RabbitMq()
self._ganglia = Ganglia(log)
self._cassandra = Cassandra()
self._dogstream = Dogstreams.init(log, self.agentConfig)
self._ddforwarder = DdForwarder(log, self.agentConfig)
self._ec2 = EC2(log)
# Metric Checks
self._metrics_checks = [
ElasticSearch(log),
WMICheck(log),
Memcache(log),
]
# Custom metric checks
for module_spec in [s.strip() for s in self.agentConfig.get('custom_checks', '').split(',')]:
            if not module_spec:
                continue
            try:
                self._metrics_checks.append(modules.load(module_spec, 'Check')(log))
                log.info("Registered custom check %s" % module_spec)
            except Exception:
                log.exception('Unable to load custom check module %s' % module_spec)
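        # Each module named in the comma-separated `custom_checks` setting is
        # expected to expose a `Check` class that takes the logger; its
        # check(agentConfig) method is called on every run and should return
        # a list of metrics (see the metrics checks loop in run()).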
# Event Checks
self._event_checks = [
ElasticSearchClusterStatus(log),
Nagios(socket.gethostname()),
Hudson()
]
# Resource Checks
self._resources_checks = [
            ResProcesses(log, self.agentConfig)
]
def stop(self):
"""
Tell the collector to stop at the next logical point.
"""
# This is called when the process is being killed, so
# try to stop the collector as soon as possible.
# Most importantly, don't try to submit to the emitters
# because the forwarder is quite possibly already killed
# in which case we'll get a misleading error in the logs.
# Best to not even try.
self.continue_running = False
def run(self, checksd=None):
"""
Collect data from each check and submit their data.
"""
timer = Timer()
self.run_count += 1
log.debug("Starting collection run #%s" % self.run_count)
payload = self._build_payload()
metrics = payload['metrics']
events = payload['events']
# Run the system checks. Checks will depend on the OS
if self.os == 'windows':
# Win32 system checks
try:
metrics.extend(self._win32_system_checks['disk'].check(self.agentConfig))
metrics.extend(self._win32_system_checks['memory'].check(self.agentConfig))
metrics.extend(self._win32_system_checks['cpu'].check(self.agentConfig))
metrics.extend(self._win32_system_checks['network'].check(self.agentConfig))
metrics.extend(self._win32_system_checks['io'].check(self.agentConfig))
metrics.extend(self._win32_system_checks['proc'].check(self.agentConfig))
except Exception:
log.exception('Unable to fetch Windows system metrics.')
else:
# Unix system checks
sys_checks = self._unix_system_checks
diskUsage = sys_checks['disk'].check(self.agentConfig)
if diskUsage and len(diskUsage) == 2:
payload["diskUsage"] = diskUsage[0]
payload["inodes"] = diskUsage[1]
load = sys_checks['load'].check(self.agentConfig)
payload.update(load)
memory = sys_checks['memory'].check(self.agentConfig)
if memory:
payload.update({
'memPhysUsed' : memory.get('physUsed'),
'memPhysPctUsable' : memory.get('physPctUsable'),
'memPhysFree' : memory.get('physFree'),
'memPhysTotal' : memory.get('physTotal'),
'memPhysUsable' : memory.get('physUsable'),
'memSwapUsed' : memory.get('swapUsed'),
'memSwapFree' : memory.get('swapFree'),
'memSwapPctFree' : memory.get('swapPctFree'),
'memSwapTotal' : memory.get('swapTotal'),
'memCached' : memory.get('physCached'),
'memBuffers': memory.get('physBuffers'),
'memShared': memory.get('physShared')
})
ioStats = sys_checks['io'].check(self.agentConfig)
if ioStats:
payload['ioStats'] = ioStats
processes = sys_checks['processes'].check(self.agentConfig)
payload.update({'processes': processes})
networkTraffic = sys_checks['network'].check(self.agentConfig)
payload.update({'networkTraffic': networkTraffic})
cpuStats = sys_checks['cpu'].check(self.agentConfig)
if cpuStats:
payload.update(cpuStats)
# Run old-style checks
mysqlStatus = self._mysql.check(self.agentConfig)
rabbitmq = self._rabbitmq.check(log, self.agentConfig)
couchdb = self._couchdb.check(self.agentConfig)
gangliaData = self._ganglia.check(self.agentConfig)
cassandraData = self._cassandra.check(log, self.agentConfig)
dogstreamData = self._dogstream.check(self.agentConfig)
ddforwarderData = self._ddforwarder.check(self.agentConfig)
if gangliaData is not False and gangliaData is not None:
payload['ganglia'] = gangliaData
if cassandraData is not False and cassandraData is not None:
payload['cassandra'] = cassandraData
# MySQL Status
if mysqlStatus:
payload.update(mysqlStatus)
# RabbitMQ
if rabbitmq:
payload['rabbitMQ'] = rabbitmq
# CouchDB
if couchdb:
payload['couchDB'] = couchdb
# dogstream
if dogstreamData:
dogstreamEvents = dogstreamData.get('dogstreamEvents', None)
if dogstreamEvents:
                if 'dogstream' in events:
events['dogstream'].extend(dogstreamEvents)
else:
events['dogstream'] = dogstreamEvents
del dogstreamData['dogstreamEvents']
payload.update(dogstreamData)
# metrics about the forwarder
if ddforwarderData:
payload['datadog'] = ddforwarderData
# Process the event checks.
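        # Each event check exposes a `key` attribute naming its bucket in the
        # events payload.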
for event_check in self._event_checks:
event_data = event_check.check(log, self.agentConfig)
if event_data:
events[event_check.key] = event_data
# Resources checks
if self.os != 'windows':
has_resource = False
for resources_check in self._resources_checks:
resources_check.check()
snaps = resources_check.pop_snapshots()
if snaps:
has_resource = True
res_value = { 'snaps': snaps,
'format_version': resources_check.get_format_version() }
res_format = resources_check.describe_format_if_needed()
if res_format is not None:
res_value['format_description'] = res_format
payload['resources'][resources_check.RESOURCE_KEY] = res_value
if has_resource:
payload['resources']['meta'] = {
'api_key': self.agentConfig['api_key'],
'host': payload['internalHostname'],
}
# newer-style checks (not checks.d style)
for metrics_check in self._metrics_checks:
res = metrics_check.check(self.agentConfig)
if res:
metrics.extend(res)
# checks.d checks
check_statuses = []
checksd = checksd or []
for check in checksd:
if not self.continue_running:
return
log.info("Running check %s" % check.name)
instance_statuses = []
metric_count = 0
event_count = 0
try:
# Run the check.
instance_statuses = check.run()
# Collect the metrics and events.
current_check_metrics = check.get_metrics()
current_check_events = check.get_events()
# Save them for the payload.
metrics.extend(current_check_metrics)
if current_check_events:
if check.name not in events:
events[check.name] = current_check_events
else:
events[check.name] += current_check_events
# Save the status of the check.
metric_count = len(current_check_metrics)
event_count = len(current_check_events)
            except Exception:
log.exception("Error running check %s" % check.name)
check_status = CheckStatus(check.name, instance_statuses, metric_count, event_count)
check_statuses.append(check_status)
# Store the metrics and events in the payload.
payload['metrics'] = metrics
payload['events'] = events
collect_duration = timer.step()
emitter_statuses = self._emit(payload)
emit_duration = timer.step()
# Persist the status of the collection run.
try:
CollectorStatus(check_statuses, emitter_statuses, self.metadata_cache).persist()
except Exception:
log.exception("Error persisting collector status")
log.debug("Finished run #%s. Collection time: %ss. Emit time: %ss" %
(self.run_count, round(collect_duration, 2), round(emit_duration, 2)))
def _emit(self, payload):
""" Send the payload via the emitters. """
statuses = []
for emitter in self.emitters:
            # Don't try to send to an emitter if we're stopping.
if not self.continue_running:
return statuses
name = emitter.__name__
emitter_status = EmitterStatus(name)
try:
emitter(payload, log, self.agentConfig)
            except Exception as e:
                log.exception("Error running emitter: %s" % name)
emitter_status = EmitterStatus(name, e)
statuses.append(emitter_status)
return statuses
def _is_first_run(self):
return self.run_count <= 1
def _build_payload(self):
"""
        Return a dictionary that contains all of the generic payload data.
"""
payload = {
'collection_timestamp': time.time(),
'os' : self.os,
'python': sys.version,
'agentVersion' : self.agentConfig['version'],
'apiKey': self.agentConfig['api_key'],
'events': {},
'metrics': [],
'resources': {},
'internalHostname' : gethostname(self.agentConfig),
'uuid' : get_uuid(),
}
# Include system stats on first postback
if self._is_first_run():
payload['systemStats'] = self.agentConfig.get('system_stats', {})
# Also post an event in the newsfeed
payload['events']['System'] = [{'api_key': self.agentConfig['api_key'],
'host': payload['internalHostname'],
'timestamp': int(time.mktime(datetime.datetime.now().timetuple())),
'event_type':'Agent Startup',
'msg_text': 'Version %s' % get_version()
}]
# Periodically send the host metadata.
if self._is_first_run() or self._should_send_metadata():
payload['meta'] = self._get_metadata()
self.metadata_cache = payload['meta']
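            # Keep the metadata around so it can be reported with the
            # collector status (see CollectorStatus.persist() in run()).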
# Add static tags from the configuration file
if self.agentConfig['tags'] is not None:
payload['tags'] = self.agentConfig['tags']
return payload
def _get_metadata(self):
metadata = self._ec2.get_metadata()
if metadata.get('hostname'):
metadata['ec2-hostname'] = metadata.get('hostname')
        # If hostname is set in the configuration file, use that instead of
        # gethostname: gethostname cannot distinguish two hosts that share a
        # short name (x.domain1 vs. x.domain2), so both would be aliased to
        # the same host (see #157).
if self.agentConfig.get('hostname'):
metadata['agent-hostname'] = self.agentConfig.get('hostname')
metadata['hostname'] = metadata['agent-hostname']
else:
try:
metadata["hostname"] = socket.gethostname()
            except Exception:
pass
try:
metadata["fqdn"] = socket.getfqdn()
        except Exception:
pass
return metadata
def _should_send_metadata(self):
# If the interval has passed, send the metadata again
now = time.time()
if now - self.metadata_start >= self.metadata_interval:
log.debug('Metadata interval has passed. Sending metadata.')
self.metadata_start = now
return True
return False