Skip to content

What to monitor? ALL THE THINGS #9

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 3 commits into from
Jan 5, 2014
Merged
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
321 changes: 243 additions & 78 deletions mongodb.py
Original file line number Diff line number Diff line change
Expand Up @@ -3,28 +3,186 @@
#

import collectd
import types
from pymongo import Connection
from distutils.version import StrictVersion as V
from copy import deepcopy


# Maps that mirror the key/nested-key structure of the data returned by mongodb commands.
# Each leaf value is the collectd type name (see types.db) used to dispatch that metric;
# the collectd type instance is the nested key path to the leaf, with the key strings joined with ".".
# NOTE: some metrics are more complicated and thus are handled specially (ex. connPoolStats hosts data)
# Type map for the document returned by db.runCommand({connPoolStats: 1}).
# Leaves are collectd type names. The "hosts" and "replicaSets" sections of
# the response are not described here; they need special handling.
CONNECTION_POOL_STATUS_METRICS = {
    # connections created, broken down by connection kind
    "createdByType": {
        "master": "total_connections",
        "set": "total_connections",
        "sync": "total_connections",
    },
    # pool-wide totals
    "totalAvailable": "gauge",
    "totalCreated": "total_connections",
    "numDBClientConnection": "gauge",
    "numAScopedConnection": "gauge",
}

# Type map for the document returned by db.runCommand({dbStats: 1}).
# Leaves are collectd type names; the structure is flat (no nested keys).
DBSTATS_METRICS = {
    # counts
    "collections": "gauge",
    "objects": "gauge",
    "numExtents": "gauge",
    "indexes": "gauge",
    "nsSizeMB": "gauge",
    # sizes
    "avgObjSize": "bytes",
    "dataSize": "bytes",
    "storageSize": "bytes",
    "indexSize": "bytes",
    "fileSize": "bytes",
}

# Type map for the document returned by db.runCommand({serverStatus: 1}).
# Leaves are collectd type names ("gauge" or "derive" here). The nesting
# follows the nesting of the serverStatus response key for key.
SERVER_STATUS_METRICS = {
    "backgroundFlushing": {
        "last_ms": "gauge",
    },
    "connections": {
        "current": "gauge",
        "available": "gauge",
    },
    "cursors": {
        "totalOpen": "gauge",
        "timedout": "derive",
    },
    "globalLock": {
        "currentQueue": {
            "total": "gauge",
            "readers": "gauge",
            "writers": "gauge",
        },
        "activeClients": {
            "total": "gauge",
            "readers": "gauge",
            "writers": "gauge",
        },
    },
    # connect() re-nests this entry under a "btree" key for servers older than 2.4
    "indexCounters": {
        "accesses": "derive",
        "hits": "derive",
        "misses": "derive",
    },
    "locks": {
        # "." is the entry for the global lock
        ".": {
            "timeLockedMicros": {
                "R": "derive",
                "W": "derive",
            },
            "timeAcquiringMicros": {
                "R": "derive",
                "W": "derive",
            },
        },
        # special handling for other configured DBs
    },
    "opcounters": {
        "insert": "derive",
        "query": "derive",
        "update": "derive",
        "delete": "derive",
        "getmore": "derive",
        "command": "derive",
    },
    "recordStats": {
        "accessesNotInMemory": "derive",
        "pageFaultExceptionsThrown": "derive",
        # do we want db specificity?
    },
    "mem": {
        "bits": "gauge",
        "resident": "gauge",
        "virtual": "gauge",
        "mapped": "gauge",
        "mappedWithJournal": "gauge",
    },
    "metrics": {
        "document": {
            "deleted": "derive",
            "inserted": "derive",
            "returned": "derive",
            "updated": "derive",
        },
    },
}


class MongoDB(object):


def __init__(self):
self.plugin_name = "mongo"
self.mongo_host = "127.0.0.1"
self.mongo_port = 27017
self.mongo_db = ["admin", ]
self.mongo_dbs = ["admin", ]
self.mongo_user = None
self.mongo_password = None

self.lockTotalTime = None
self.lockTime = None
self.accesses = None
self.misses = None
self.includeConnPoolMetrics = None
self.includeServerStatsMetrics = None
self.includeDbstatsMetrics = None


def connect(self):
    """Open the MongoDB connection and adapt the metric map to the server version.

    Registered as the collectd init callback. Stores the connection on
    ``self.mongo_client``. Servers older than 2.4 get the ``indexCounters``
    entry of SERVER_STATUS_METRICS re-nested under a ``btree`` key.
    """
    self.mongo_client = Connection(host=self.mongo_host, port=self.mongo_port, slave_okay=True)
    if not self.mongo_client.alive():
        collectd.error("mongodb plugin failed to connect to %s:%d" % (self.mongo_host, self.mongo_port))
        # Without a live connection the serverStatus call below cannot succeed.
        return

    db = self.mongo_client[self.mongo_dbs[0]]
    # Same credential handling as the publish_* methods, so the version probe
    # also works on servers that require authentication.
    if self.mongo_user and self.mongo_password:
        db.authenticate(self.mongo_user, self.mongo_password)
    server_status = db.command("serverStatus")

    # StrictVersion rejects suffixed builds such as "2.6.0-rc1"; compare the
    # numeric part only.
    version = server_status["version"].split("-")[0]
    at_least_2_4 = V(version) >= V("2.4.0")

    index_counters = SERVER_STATUS_METRICS.get("indexCounters")
    # The "btree" check keeps the rewrite idempotent: a second connect() must
    # not wrap the already re-nested map once more.
    if not at_least_2_4 and index_counters is not None and "btree" not in index_counters:
        SERVER_STATUS_METRICS["indexCounters"] = {"btree": index_counters}


def disconnect(self):
    """Close the MongoDB connection if one is open.

    Registered as the collectd shutdown callback. Safe to call when
    connect() never ran or failed before assigning ``mongo_client``.
    """
    # getattr: the attribute may be missing entirely if connect() was never reached.
    client = getattr(self, "mongo_client", None)
    if client is None:
        return
    if client.alive():
        client.disconnect()
    # Drop the reference so a repeated shutdown call is a no-op.
    self.mongo_client = None


def submit(self, type, instance, value, db=None):
def config(self, obj):
    """Apply the plugin's collectd configuration block to this instance.

    Registered as the collectd config callback. ``obj.children`` is walked
    once; each child's ``key`` selects the setting and its ``values`` supply
    the data. Unrecognised keys are reported with a warning and skipped.
    """
    for node in obj.children:
        key = node.key
        values = node.values
        # Rendered once for the log lines: values is a sequence, so it cannot
        # be concatenated onto a str directly.
        shown = ", ".join(str(value) for value in values)

        if key == "Port":
            self.mongo_port = int(values[0])
            collectd.info("mongodb plugin: Port %d" % self.mongo_port)
        elif key == "Host":
            self.mongo_host = values[0]
            collectd.info("mongodb plugin: Host %s" % self.mongo_host)
        elif key == "User":
            self.mongo_user = values[0]
        elif key == "Password":
            # Deliberately not logged.
            self.mongo_password = values[0]
        elif key == "Databases":
            self.mongo_dbs = values
            collectd.info("mongodb plugin: Databases " + shown)
        elif key == "ConnectionPoolStatus":
            self.includeConnPoolMetrics = values
            collectd.info("mongodb plugin: ConnectionPoolStatus " + shown)
        elif key == "ServerStats":
            self.includeServerStatsMetrics = values
            collectd.info("mongodb plugin: ServerStats " + shown)
        elif key == "DBStats":
            self.includeDbstatsMetrics = values
            collectd.info("mongodb plugin: DBStats " + shown)
        else:
            collectd.warning("mongodb plugin: Unkown configuration key %s" % key)


def submit(self, instance, type, value, db=None):
# actually a recursive submit call to dive deeper into nested dict data
# since the leaf value in the nested dicts is the type, we check on the type type :-)
if db:
plugin_instance = '%s-%s' % (self.mongo_port, db)
plugin_instance = "%s-%s" % (self.mongo_port, db)
else:
plugin_instance = str(self.mongo_port)
v = collectd.Values()
Expand All @@ -35,91 +193,98 @@ def submit(self, type, instance, value, db=None):
v.values = [value, ]
v.dispatch()

def do_server_status(self):
con = Connection(host=self.mongo_host, port=self.mongo_port, slave_okay=True)
db = con[self.mongo_db[0]]

def recursive_submit(self, type_tree, data_tree, instance_name=None, db=None):
    """Walk a type map and a data document in parallel and submit every leaf.

    ``type_tree`` is a (possibly nested) dict whose leaves are collectd type
    names; ``data_tree`` is the matching command result. Each leaf present in
    both trees is passed to ``self.submit`` with the "."-joined key path as
    the instance name. Keys in the type map that the data lacks are skipped.

    ``instance_name`` is the key path accumulated so far (None at the root);
    ``db`` is forwarded unchanged to ``self.submit``.
    """
    type_is_branch = isinstance(type_tree, dict)
    data_is_branch = isinstance(data_tree, dict)

    if type_is_branch and data_is_branch:
        # Still inside both trees: descend into every key the data also has.
        for type_name, type_value in type_tree.items():
            if type_name not in data_tree:
                # may want to log this but some mongodb setups may not have anything to report
                continue
            if instance_name:
                next_instance_name = instance_name + "." + type_name
            else:
                next_instance_name = type_name
            self.recursive_submit(type_value, data_tree[type_name], next_instance_name, db=db)
    elif type_is_branch or data_is_branch:
        # One side is a leaf while the other is still a dict. %-formatting is
        # used because instance_name is None when this happens at the root.
        collectd.warning(
            "mongodb plugin: type tree and data tree structure differ for data instance: %s"
            % (instance_name,))
    else:
        # Both sides are leaves: type_tree is the collectd type, data_tree the value.
        self.submit(instance_name, type_tree, data_tree, db)


def publish_connection_pool_metrics(self):
    """Run connPoolStats and submit the metrics in CONNECTION_POOL_STATUS_METRICS.

    When ``self.includeConnPoolMetrics`` is set, only those root keys are
    submitted; otherwise the whole map is used.
    """
    # connPoolStats produces the same results regardless of db used
    db = self.mongo_client[self.mongo_dbs[0]]
    if self.mongo_user and self.mongo_password:
        db.authenticate(self.mongo_user, self.mongo_password)

    conn_pool_stats = db.command("connPoolStats")
    if self.includeConnPoolMetrics:
        metrics_to_collect = {}
        # Iterate the filter directly: config() stores a plain sequence of
        # key names, which has no iterkeys().
        for root_metric_key in self.includeConnPoolMetrics:
            # Skip keys the server did not report and keys the map does not
            # know, instead of failing with a KeyError.
            if root_metric_key in conn_pool_stats and root_metric_key in CONNECTION_POOL_STATUS_METRICS:
                metrics_to_collect[root_metric_key] = deepcopy(CONNECTION_POOL_STATUS_METRICS[root_metric_key])
    else:
        metrics_to_collect = CONNECTION_POOL_STATUS_METRICS

    self.recursive_submit(metrics_to_collect, conn_pool_stats)

# memory
for t in ['resident', 'virtual', 'mapped']:
self.submit('memory', t, server_status['mem'][t])

# connections
self.submit('connections', 'connections', server_status['connections']['current'])
def publish_dbstats(self):
    """Run dbStats on every configured database and submit DBSTATS_METRICS.

    When ``self.includeDbstatsMetrics`` is set, only those root keys are
    submitted; otherwise the whole map is used. Each submission carries the
    database name so the values are reported per database.
    """
    for db_name in self.mongo_dbs:
        db = self.mongo_client[db_name]
        if self.mongo_user and self.mongo_password:
            db.authenticate(self.mongo_user, self.mongo_password)

        dbstats = db.command("dbStats")
        if self.includeDbstatsMetrics:
            metrics_to_collect = {}
            # Iterate the filter directly: config() stores a plain sequence
            # of key names, which has no iterkeys().
            for root_metric_key in self.includeDbstatsMetrics:
                # Skip keys the server did not report and keys the map does
                # not know, instead of failing with a KeyError.
                if root_metric_key in dbstats and root_metric_key in DBSTATS_METRICS:
                    metrics_to_collect[root_metric_key] = deepcopy(DBSTATS_METRICS[root_metric_key])
        else:
            metrics_to_collect = DBSTATS_METRICS

        self.recursive_submit(metrics_to_collect, dbstats, db=db_name)


def publish_server_status(self):
    """Run serverStatus and submit the metrics in SERVER_STATUS_METRICS.

    When ``self.includeServerStatsMetrics`` is set, only those root keys are
    submitted; otherwise the whole map is used. The global lock entry, keyed
    "." in the map, is reported under the name "GLOBAL", and the same lock
    metrics are requested for every configured database.
    """
    # serverStatus produces the same results regardless of db used
    db = self.mongo_client[self.mongo_dbs[0]]
    if self.mongo_user and self.mongo_password:
        db.authenticate(self.mongo_user, self.mongo_password)

    server_status = db.command("serverStatus")
    if self.includeServerStatsMetrics:
        metrics_to_collect = {}
        # Iterate the filter directly: config() stores a plain sequence of
        # key names, which has no iterkeys().
        for root_metric_key in self.includeServerStatsMetrics:
            # Skip keys the server did not report and keys the map does not
            # know, instead of failing with a KeyError.
            if root_metric_key in server_status and root_metric_key in SERVER_STATUS_METRICS:
                metrics_to_collect[root_metric_key] = deepcopy(SERVER_STATUS_METRICS[root_metric_key])
    else:
        # Deep copy: the lock entries are rewritten below and the module-level
        # map must stay untouched for the next read.
        metrics_to_collect = deepcopy(SERVER_STATUS_METRICS)

    # "locks" is absent when a filter excludes it, hence .get() rather than [].
    lock_types = metrics_to_collect.get("locks")
    if isinstance(lock_types, dict) and "." in lock_types:
        # rename "." lock to be "GLOBAL"
        lock_template = lock_types.pop(".")
        lock_types["GLOBAL"] = lock_template
        for db_name in self.mongo_dbs:
            lock_types[db_name] = deepcopy(lock_template)

        # recursive_submit only descends into keys present in BOTH trees, so
        # the data needs the same rename or the global lock is never reported.
        lock_data = server_status.get("locks")
        if isinstance(lock_data, dict) and "." in lock_data:
            renamed_locks = dict(lock_data)
            renamed_locks["GLOBAL"] = renamed_locks.pop(".")
            # Shallow copies keep the command result itself unmodified.
            server_status = dict(server_status)
            server_status["locks"] = renamed_locks

    self.recursive_submit(metrics_to_collect, server_status)

# stats sizes
self.submit('file_size', 'storage', db_stats['storageSize'], mongo_db)
self.submit('file_size', 'index', db_stats['indexSize'], mongo_db)
self.submit('file_size', 'data', db_stats['dataSize'], mongo_db)

con.disconnect()
def publish_data(self):
    """Read callback: publish server status, connection pool and dbStats metrics, in that order."""
    publisher_names = (
        "publish_server_status",
        "publish_connection_pool_metrics",
        "publish_dbstats",
    )
    for publisher_name in publisher_names:
        # Looked up immediately before the call, one at a time.
        getattr(self, publisher_name)()

def config(self, obj):
for node in obj.children:
if node.key == 'Port':
self.mongo_port = int(node.values[0])
elif node.key == 'Host':
self.mongo_host = node.values[0]
elif node.key == 'User':
self.mongo_user = node.values[0]
elif node.key == 'Password':
self.mongo_password = node.values[0]
elif node.key == 'Database':
self.mongo_db = node.values
else:
collectd.warning("mongodb plugin: Unkown configuration key %s" % node.key)

# Single plugin instance shared by all collectd callbacks.
mongodb = MongoDB()
# publish_data is the only read callback: it drives the server status,
# connection pool and dbStats publishers. The earlier do_server_status
# callback is no longer defined by the class, so it is not registered.
collectd.register_read(mongodb.publish_data)
collectd.register_config(mongodb.config)
collectd.register_init(mongodb.connect)
collectd.register_shutdown(mongodb.disconnect)