Skip to content
Snippets Groups Projects
Commit c4493c11 authored by Sven Mäder's avatar Sven Mäder :speech_balloon:
Browse files

Add xymon home check

parent b3e8fd0d
No related branches found
No related tags found
No related merge requests found
#!/usr/bin/env python3
import os
import sys
import pwd
import grp
import stat
import glob
import lib_path
import lib
import posix1e
import dphysldap
import pyxymon as pymon
CHECK_NAME = 'permissions'
CHECK_VERSION = 2
LIFETIME = 30
home_dir = '/export/home1/*'
owner = []
permission = []
acl = []
home_dirs = 0
users = {}
nis_homes = {}
nis_shares = {}
bad_home_directory = []
no_ldap_user = []
bad_nis_map = []
no_nis_map = []
class Home(object):
"""
Holds info about a home directory
"""
def __init__(self, name, path, st):
self.name = name
self.path = path
self.st = st
@property
def uid(self):
return self.st.st_uid
@property
def gid(self):
return self.st.st_gid
@property
def uname(self):
return pwd.getpwuid(self.uid).pw_name
@property
def gname(self):
return grp.getgrgid(self.gid).gr_name
@property
def filemode(self):
return stat.filemode(self.st.st_mode)
@property
def permission(self):
return ' '.join([self.filemode, self.path])
def __str__(self):
return ' '.join([self.filemode, self.uname, self.gname, self.path])
def search_ldap():
ldap = dphysldap.Ldap()
ldap_users = dphysldap.Users(ldap, ['uid', 'uidNumber', 'gidNumber', 'homeDirectory'])
entries = dphysldap.Entries(ldap, ['cn', 'nisMapEntry'])
auto_home = 'nisMapName=auto.home,ou=automount,dc=phys,dc=ethz,dc=ch'
ldap_users.search('*')
for user in ldap_users:
users[user['uid'][0]] = user['homeDirectory'][0]
entries.search('cn: *, nisMapEntry: phd-home*', ['nisObject'], base=auto_home)
for entry in entries:
nis_homes[entry['cn'][0]] = entry['nisMapEntry'][0]
entries.search('cn: *, nisMapEntry: != phd-home*', ['nisObject'], base=auto_home)
for entry in entries:
nis_shares[entry['cn'][0]] = entry['nisMapEntry'][0]
def check_homes(top):
global home_dirs
if not os.path.isdir(top):
return
for f in os.listdir(top):
pathname = os.path.join(top, f)
if not os.path.isdir(pathname):
continue
st = os.stat(pathname)
home = Home(f, pathname, st)
home_dirs += 1
if bad_owner(home):
owner.append(home)
if bad_permission(home):
permission.append(home)
if posix1e.has_extended(home.path):
acl.append(home)
if home.name in users:
if users[home.name][6:] != home.name:
bad_home_directory.append(': '.join([home.name, users[home.name]]))
del users[home.name]
else:
no_ldap_user.append(home)
if home.name in nis_homes:
if nis_homes[home.name].split(':', maxsplit=1)[1] != home.path:
bad_nis_map.append(': '.join([home.name, nis_homes[home.name]]))
del nis_homes[home.name]
else:
no_nis_map.append(home)
def check_shares():
for user in list(users.keys()):
if user in nis_shares:
del nis_shares[user]
del users[user]
def bad_owner(home):
if home.name == home.uname and home.name == home.gname:
return False
return True
def bad_permission(home):
# d---------
if home.st.st_mode == stat.S_IFDIR:
return False
# drwx------
elif home.st.st_mode == stat.S_IFDIR | stat.S_IRUSR | stat.S_IWUSR | stat.S_IXUSR:
return False
return True
def list_homes(homes):
for home in homes:
print(home)
def run_check(xymon):
search_ldap()
title = 'statistics'
content = ''
content += 'ldap users: ' + str(len(users)) + '<br/>'
content += 'ldap nismaps (home): ' + str(len(nis_homes)) + '<br/>'
content += 'ldap nismaps (share): ' + str(len(nis_shares)) + '<br/>'
for path in glob.glob(home_dir):
check_homes(path)
check_shares()
content += 'home dirs: ' + str(home_dirs) + '<br/>'
content += 'strange ldap users: ' + str(len(users)) + '<br/>'
content += 'orphaned nis homes: ' + str(len(nis_homes)) + '<br/>'
content += 'orphaned nis shares: ' + str(len(nis_shares)) + '<br/>'
content += 'bad homeDirectory: ' + str(len(bad_home_directory)) + '<br/>'
content += 'no user for home: ' + str(len(no_ldap_user)) + '<br/>'
content += 'bad nismaps (home): ' + str(len(bad_nis_map)) + '<br/>'
content += 'no nismap for home: ' + str(len(no_nis_map)) + '<br/>'
xymon.section(title, content)
if owner:
title = 'bad owner or group'
content = 'home must be owned by the respective user and the group his user-private-group<br/><br/>'
for home in owner:
content += ''.join([str(home), '<br/>'])
xymon.section(title, content)
xymon.color = pymon.STATUS_CRITICAL
if permission:
title = 'bad permissions'
content = 'home permission not <code>drwx------</code> (active user) or <code>d---------</code> (blocked user)<br/><br/>'
for home in permission:
content += ''.join([str(home.permission), '<br/>'])
xymon.section(title, content)
xymon.color = pymon.STATUS_CRITICAL
if acl:
title = 'bad acls'
content = 'home has posix.1e extended ACLs<br/>check acls using `getfacl`, which stands for `get fucking ACL`<br/><br/>'
for home in acl:
extacl = posix1e.ACL(file=home.path)
content += ''.join([home.path, '<br/>'])
content += ''.join([str(extacl), '<br/>'])
xymon.section(title, content)
xymon.color = pymon.STATUS_CRITICAL
if users:
title = 'strange ldap users'
content = 'these users seem to not have a home directory on the filesystem<br/><br/>'
for k, v in users.items():
content += ': '.join([k, v]) + '<br/>'
xymon.section(title, content)
xymon.color = pymon.STATUS_CRITICAL
if nis_homes:
title = 'orphaned nis homes'
content = 'these nismaps do not have a home directory on the filesystem<br/><br/>'
for k, v in nis_homes.items():
content += ': '.join([k, v]) + '<br/>'
xymon.section(title, content)
xymon.color = pymon.STATUS_CRITICAL
if nis_shares:
title = 'orphaned nis shares'
content = 'these nismaps do not have a user in ldap<br/><br/>'
for k, v in nis_shares.items():
content += ': '.join([k, v]) + '<br/>'
xymon.section(title, content)
xymon.color = pymon.STATUS_CRITICAL
if bad_home_directory:
title = 'bad homeDirectory'
content = 'the homeDirectory attributes home name does not match the username'
for home in bad_home_directory:
content += ''.join([str(home), '<br/>'])
xymon.section(title, content)
xymon.color = pymon.STATUS_CRITICAL
if no_ldap_user:
title = 'no user for home'
content = 'home directory without a corresponding ldap user'
for home in no_ldap_user:
content += ''.join([str(home), '<br/>'])
xymon.section(title, content)
xymon.color = pymon.STATUS_CRITICAL
if bad_nis_map:
title = 'bad nismaps (home)'
content = 'nismap does not match home path on the filesystem'
for home in bad_nis_map:
content += ''.join([str(home), '<br/>'])
xymon.section(title, content)
xymon.color = pymon.STATUS_CRITICAL
if no_nis_map:
title = 'no nismap for home'
content = 'home directory without a corresponding nismap entry'
for home in no_nis_map:
content += ''.join([str(home), '<br/>'])
xymon.section(title, content)
xymon.color = pymon.STATUS_CRITICAL
def main():
"""Run xymon check"""
xymon = pymon.XymonClient(CHECK_NAME)
check_script = os.path.basename(__file__)
# The default criticity is set to 'pymon.STATUS_OK'
xymon.lifetime = LIFETIME
xymon.title('home ownership and permissions')
try:
run_check(xymon)
except Exception as e:
xymon.color = pymon.STATUS_WARNING
xymon.section('Exception', e)
xymon.footer(check_script, CHECK_VERSION)
xymon.send()
if __name__ == '__main__':
main()
sys.exit(0)
# -*- coding: utf-8 -*-
"""Helper class intended for creation of Xymon Extension Modules in Python.
This simple Python module provides a simple helper class that aims simplify
the creation of Xymon Extension Modules in Python.
"""
__author__ = "Davide Madrisan <davide.madrisan.gmail.com>"
__copyright__ = "Copyright 2017 Davide Madrisan"
__license__ = "GPL-3.0"
__status__ = "Stable"
__version__ = "3"
STATUS_OK = '&green'
STATUS_WARNING = '&yellow'
STATUS_CRITICAL = '&red'
__all__ = ['XymonClient',
'STATUS_OK', 'STATUS_WARNING', 'STATUS_CRITICAL']
from datetime import datetime
import os
import socket
_ALL_COLORS = (STATUS_OK, STATUS_WARNING, STATUS_CRITICAL)
"""list of all the allower colors (criticity levels)"""
class XymonMessage(object):
"""Class for rendering the Xymon messages that will be sent to the server.
Note:
This class is not intended to be used directly from your code.
"""
def __init__(self):
self._message = ''
self._footer = None
self._color = STATUS_OK
"""default criticity"""
@staticmethod
def _get_date():
"""Return the current date."""
return datetime.now().strftime('%c')
@staticmethod
def _get_machine():
"""Get the environment variable `MACHINE` exported by Xymon.
Raises:
RuntimeError: If `MACHINE` is not set.
"""
xymon_machine = os.environ.get('MACHINE')
if not xymon_machine:
raise RuntimeError('The environment variable MACHINE is not set')
return xymon_machine
@property
def color(self):
"""Return the current color (message criticity level)."""
return self._color
@color.setter
def color(self, value):
"""Set the color (message criticity level) to `value`.
Note:
The color is not updated when `value` has a criticity
lower than the current one `self._color`.
Attributes:
value (str): The new color to be set.
The following colors are the only valid ones:
- pyxymon.STATUS_OK
- pyxymon.STATUS_WARNING
- pyxymon.STATUS_CRITICAL
Raises:
ValueError: If `value` is not a valid color string.
"""
if value not in _ALL_COLORS:
raise ValueError('Illegal color for xymon: {0}'.format(value))
current_color_index = _ALL_COLORS.index(self._color)
new_color_index = _ALL_COLORS.index(value)
if new_color_index > current_color_index:
self._color = value
def title(self, text):
"""Set the message title.
Attributes:
text (str): The string containing the title.
"""
self._message += '<br><h1>{0}</h1><hr><br>'.format(text)
def section(self, title, body):
"""Add a section to the Xymon message.
Attributes:
title (str): The string containing the title of this section.
body (str): The content of the section.
"""
self._message += (
'<h2>{0}</h2><p>{1}</p><br>'.format(title, body))
def footer(self, check_filename, check_version):
"""Add a footer the the Xymon message.
Attributes:
check_filename (str): The name of the check script.
check_version (str): The version of the check script.
"""
self._footer = (
'<br>'
'<center>xymon script: {0} version {1}</center>'.format(
check_filename, check_version))
def _render(self, test):
"""Return the message string in a format accepted by the Xymon server.
Attributes:
test (str): The string containing the name of the Xymon test.
Raises:
RuntimeError: If `self._color` is an illegal color
(this should never happen).
"""
date = self._get_date()
machine = self._get_machine()
if self._color not in _ALL_COLORS:
raise RuntimeError(
'Illegal color for xymon: {0}'.format(self._color))
html = (self._message if not self._footer else
self._message + self._footer)
return 'status {0}.{1} {2} {3}\n{4}\n'.format(
machine, test, self._color[1:], date, html)
class XymonClient(XymonMessage):
"""Class for managing and sending the final message to the Xymon server.
Attributes:
test (str): Name of the Xymon test.
Usage:
import os
import pyxymon as pymon
check_name = 'mytest'
check_version = 1
check_filename = os.path.basename(__file__)
xymon = pymon.XymonClient(check_name)
# do your logic...
# you can set the criticity of the final xymon message by using:
# xymon.color = pymon.STATUS_WARNING
# or
# xymon.color = pymon.STATUS_CRITICAL
# The criticity is set by default to 'pymon.STATUS_OK'
xymon.title('Title in the xymon check page')
xymon.section('Section Title',
'Text containing the lines you want to display')
# You can add here other sections, if required.
xymon.footer(check_filename, check_version)
xymon.send()
"""
def __init__(self, test):
XymonMessage.__init__(self)
self.test = test
"""Name of the Xymon test"""
@staticmethod
def _get_xymon_server_name():
"""Return the content of the environment variable XYMSRV.
Raises:
RuntimeError: If `XYMSRV` is not set.
"""
xymon_server = os.environ.get('XYMSRV')
if not xymon_server:
RuntimeError('The environment variable XYMSRV is not set')
return os.environ.get('XYMSRV')
@staticmethod
def _get_xymon_server_port():
"""Return the content of the environment variable XYMONDPORT.
Note:
The default Xymon port (1984) is returned, when such a variable
does not exist.
"""
xymon_port = os.environ.get('XYMONDPORT', 1984)
return int(xymon_port)
def send(self):
"""Send a rendered message to the xymon server.
Note:
The server and port are read from the environment variables
XYMSRV and XYMONDPORT (default set to 1984 when not found).
"""
server = self._get_xymon_server_name()
port = self._get_xymon_server_port()
sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
sock.connect((server, port))
xymon_string = self._render(self.test)
sock.send(xymon_string.encode('utf-8'))
sock.close()
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment