diff --git a/bin/deltalogcat.py b/bin/deltalogcat.py
new file mode 100755
index 0000000000000000000000000000000000000000..267bdf23ad32412e96070aaed950c215b513ad28
--- /dev/null
+++ b/bin/deltalogcat.py
@@ -0,0 +1,191 @@
+#!/usr/bin/env python3
+
+import sys
+import time
+import subprocess
+import select
+import pprint
+import json
+import argparse
+import mmap
+import lib_path
+import lib
+import dphysldap
+import tabulate
+
+update_every = 0.001
+log_path = '/var/log/ldap/'
+log_file_deltalog = log_path + 'delta.log'
+indent = 16
+
+FMT = 'plain'
+tabulate.PRESERVE_WHITESPACE = True
+
+REQ_ATTRS = frozenset({
+    'reqAuthzID',
+    'reqEntryUUID',
+    'entryCSN'
+})
+
+NODES = {
+    0: 'phd-aa1',
+    1: 'phd-aa2',
+    2: 'phd-aa3'
+}
+
+
+def getlastline(fname):
+    """Using mmap to return a copy of the last line of a file"""
+    with open(fname) as source:
+        mapping = mmap.mmap(source.fileno(), 0, prot=mmap.PROT_READ)
+        return mapping[mapping.rfind(b'\n', 0, -1)+1:]
+
+
+def sleep(start_time):
+    """After hard work, take a nap for the rest of the second"""
+    current_time = time.perf_counter()
+    elapsed_time = current_time - start_time
+    sleep_time = update_every - elapsed_time
+
+    #print('runtime {0:.3f}s, sleeping {1:.3f}s'.format(elapsed_time, sleep_time))
+
+    if sleep_time > 0:
+        time.sleep(sleep_time)
+
+
+def parse(line):
+    """Parse line and print in pretty format"""
+    req_json = line.split(' >>> ', maxsplit=1)[1]
+    req = json.loads(req_json)
+
+    sid = int(req['attributes']['entryCSN'][0].split('#')[2])
+
+    print()
+    print('{0:<{indent}}{1}'.format('req:', req['dn'], indent=indent))
+    print('{0:<{indent}}{1}'.format('node:', NODES[sid], indent=indent))
+    print('{0:<{indent}}{1}'.format('changetype:', req['attributes']['reqType'][0], indent=indent))
+    print('{0:<{indent}}{1}'.format('entry:', req['attributes']['reqDN'][0], indent=indent))
+
+    if 'reqMod' in req['attributes']:
+        table = list()
+
+        for req_mod in req['attributes']['reqMod']:
+            row = [ ' ' * (indent - 2) ]
+            row.extend(req_mod.split(' ', maxsplit=1))
+            table.append(row)
+
+        print('modifications:')
+        print(tabulate.tabulate(table, tablefmt=FMT))
+
+        del req['attributes']['reqMod']
+
+    entry = dphysldap.Entry(req['attributes'])
+    table = list()
+
+    for key, value in entry.items():
+        if key in REQ_ATTRS:
+            row = [ ' ' * (indent - 2), ''.join([key, ':']), str(value) ]
+            table.append(row)
+
+    print('req attrs:')
+    print(tabulate.tabulate(table, tablefmt=FMT))
+
+    print()
+    print('------------------------------------------------------------------------------------------')
+
+
+def get_input_method(arg):
+    """Determine the input method"""
+    if arg['follow']:
+        if arg['file']:
+            tail_file(arg['file'])
+        else:
+            tail_file(log_file_deltalog)
+    else:
+        if arg['file']:
+            read_file(arg['file'])
+        else:
+            read_stdin()
+
+
+def tail_file(log_file):
+    """Open the file in tail mode"""
+    fin = subprocess.Popen(['tail', '-F', log_file], stdout=subprocess.PIPE, stderr=subprocess.PIPE)
+    poll = select.poll()
+    poll.register(fin.stdout)
+
+    try:
+        while True:
+            start_time = time.perf_counter()
+
+            if poll.poll(1):
+                line = fin.stdout.readline().decode('utf-8')
+
+                parse(line)
+
+            sleep(start_time)
+
+    except KeyboardInterrupt:
+        sys.stdout.flush()
+        pass
+
+
+def read_file(log_file):
+    """Read the whole file"""
+    with open(log_file, 'r') as fin:
+        while True:
+            line = fin.readline()
+
+            if not line:
+                break
+
+            parse(line)
+
+
+def read_stdin():
+    """Read from stdin"""
+    fin = sys.stdin
+
+    try:
+        while True:
+            line = fin.readline()
+
+            if not line:
+                break
+
+            parse(line)
+
+    except KeyboardInterrupt:
+        sys.stdout.flush()
+        pass
+
+
+def main():
+    """Open file in non-blocking mode and parse JSON"""
+    parser = argparse.ArgumentParser(add_help=False, description='Parse deltalog from file or stdin')
+    parser.add_argument(
+        '-f', '--follow',
+        dest='follow', action='store_const', const=True,
+        help='Output appended data as the file grows'
+    )
+    parser.add_argument(
+        'file',
+        nargs='?', type=str,
+        help='File to open'
+    )
+    parser.add_argument(
+        '-h', '--help',
+        action='help',
+        help='Show this help message and exit'
+    )
+    arg = vars(parser.parse_args())
+
+    get_input_method(arg)
+
+    sys.exit(0)
+
+
+if __name__ == '__main__':
+    main()
+    sys.exit(0)
diff --git a/bin/deltalogparse.py b/bin/deltalogparse.py
new file mode 100755
index 0000000000000000000000000000000000000000..9abff03bbb9096a7afebcec1401eda6a330185ee
--- /dev/null
+++ b/bin/deltalogparse.py
@@ -0,0 +1,312 @@
+#!/usr/bin/env python3
+
+import sys
+import fcntl
+import time
+import datetime
+import re
+import pprint
+import logging
+import json
+import mmap
+import base64
+import lib_path
+import lib
+import dphysldap
+
+# log levels: CRITICAL | ERROR | WARNING | INFO | DEBUG | NOTSET
+# default: WARNING
+log_level = logging.INFO
+
+script_name = 'deltalogparse'
+deltalog_name = 'delta'
+log_path = '/var/log/ldap/'
+update_every = 1
+lock_file = '/var/run/' + script_name + '.lock'
+log_datefmt = '%Y-%m-%d %H:%M:%S'
+log_file_log = log_path + script_name + '.log'
+log_file_deltalog = log_path + deltalog_name + '.log'
+search_base = 'cn=deltalog'
+object_classes = '(|(objectClass=auditModify)(objectClass=auditAdd)(objectClass=auditDelete)(objectClass=auditModRDN))'
+log = None
+deltalog = None
+
+rgx_skip = re.compile(
+    r'^('
+    r'heartbeatTimestamp|'
+    r'modifyTimestamp|'
+    r'modifiersName|'
+    r'lastUse.*?|'
+    r'(context|entry)CSN'
+    r'):')
+
+rgx_filter = re.compile(
+    r'^('
+    r'heartbeatTimestamp|'
+    r'lastUse.*?|'
+    r'(context|entry)CSN'
+    r'):')
+
+rgx_redacted = re.compile(
+    r'^('
+    r'krbPrincipalKey|'
+    r'preBlockPassword|'
+    r'userPassword'
+    r'):')
+
+# attribute key and delimiter, e.g. 'userPassword:= ' (dash escaped to keep the character class literal)
+rgx_attr_key = re.compile(r'^(?P<key>.*?)(?P<delimiter>:[=+\-:]? ?)')
+
+
+def lock(lock_file):
+    flock = open(lock_file, 'w')
+
+    try:
+        fcntl.lockf(flock, fcntl.LOCK_EX | fcntl.LOCK_NB)
+        return flock
+    except (IOError, OSError):
+        print('error: another instance is running')
+        sys.exit(1)
+
+
+def unlock(lock):
+    fcntl.lockf(lock, fcntl.LOCK_UN)
+
+
+def init_log():
+    global log, deltalog
+    log = logging.getLogger('log')
+    log.setLevel(log_level)
+    log_fh = logging.FileHandler(log_file_log)
+    log_fh.setLevel(log_level)
+    log_formatter = logging.Formatter(fmt='{asctime} {levelname}: {message}', style='{', datefmt=log_datefmt)
+    log_fh.setFormatter(log_formatter)
+    log.addHandler(log_fh)
+
+    deltalog = logging.getLogger('deltalog')
+    deltalog.setLevel(logging.INFO)
+    deltalog_fh = logging.FileHandler(log_file_deltalog)
+    deltalog_fh.setLevel(logging.INFO)
+    deltalog_formatter = logging.Formatter(fmt='{asctime}: {message}', style='{', datefmt=log_datefmt)
+    deltalog_fh.setFormatter(deltalog_formatter)
+    deltalog.addHandler(deltalog_fh)
+
+
+def getlastline(fname):
+    """Using mmap to return a copy of the last line of a file"""
+    with open(fname) as source:
+        mapping = mmap.mmap(source.fileno(), 0, prot=mmap.PROT_READ)
+        return mapping[mapping.rfind(b'\n', 0, -1)+1:]
+
+
+def is_skipped(entry):
+    """Check if at least one of the reqMod attributes is interesting"""
+    for attribute in entry['attributes']['reqMod']:
+        try:
+            if not rgx_skip.search(attribute):
+                log.info('interesting attribute: {}'.format(attribute))
+                return False
+        except:
+            log.debug('caught exception: while checking skipped, assume interesting attribute: {}'.format(attribute))
+            return False
+
+    return True
+
+
+def filtered(entry):
+    """
+    Filter attributes:
+    - not of interest: remove
+    - sensitive: redact
+    - binary values: encode base64
+    """
+    if 'reqMod' in entry['attributes']:
+        # indices of attributes to drop; removing while iterating would skip
+        # elements and invalidate the indices used for in-place redaction
+        filtered_indices = []
+
+        for index, attribute in enumerate(entry['attributes']['reqMod']):
+            is_bytes = False
+
+            if type(attribute) is bytes:
+                is_bytes = True
+                log.info('this is a bytes object, decoding it...')
+                log.debug('encoded: {}'.format(attribute))
+                decoded = attribute.decode('utf-8', errors='ignore')
+                attribute = decoded
+
+            try:
+                if rgx_filter.search(attribute):
+                    log.info('filter attribute: {}'.format(attribute))
+                    filtered_indices.append(index)
+
+                elif rgx_redacted.search(attribute):
+                    match = rgx_attr_key.match(attribute)
+
+                    if match:
+                        key = match.group('key')
+                        delimiter = match.group('delimiter')
+
+                        if match.group('delimiter')[-1:] == ' ':
+                            redacted = delimiter.join([key, '[REDACTED]'])
+                            log.info('redact attribute: {}'.format(redacted))
+                        else:
+                            redacted = ''.join([key, delimiter])
+                            log.info('redact attribute (nothing to redact): {}'.format(redacted))
+
+                        entry['attributes']['reqMod'][index] = redacted
+
+                    else:
+                        log.error('error: matching rgx_attr_key on redacted attribute')
+                        unknown_redacted = 'unknownAttribute: [REDACTED]'
+                        log.error('using: {}'.format(unknown_redacted))
+                        entry['attributes']['reqMod'][index] = unknown_redacted
+
+                elif is_bytes:
+                    match = rgx_attr_key.match(attribute)
+
+                    if match:
+                        key_delimit = ''.join([match.group('key'), match.group('delimiter')])
+                        key_delimit_bytes = key_delimit.encode('utf-8')
+                        log.debug('encoded key_delimit_bytes: {}'.format(key_delimit_bytes))
+                        value_bytes = entry['attributes']['reqMod'][index][len(key_delimit_bytes):]
+
+                        if value_bytes:
+                            log.debug('value_bytes: {}'.format(value_bytes))
+                            value_base64 = base64.b64encode(value_bytes).decode('utf-8')
+                            log.debug('value_base64: {}'.format(value_base64))
+                            half_encoded_base64 = ''.join([key_delimit, value_base64])
+                            log.info('bytes attribute: {}'.format(half_encoded_base64))
+                            entry['attributes']['reqMod'][index] = half_encoded_base64
+
+                        else:
+                            log.warning('warning: bytes attribute has no value after the key/delimiter')
+                            encoded_base64 = base64.b64encode(entry['attributes']['reqMod'][index]).decode('utf-8')
+                            log.error('bytes attribute, using base64 encoding: {}'.format(encoded_base64))
+                            entry['attributes']['reqMod'][index] = encoded_base64
+
+                    else:
+                        log.error('error: matching rgx_attr_key on bytes attribute')
+                        encoded_base64 = base64.b64encode(entry['attributes']['reqMod'][index]).decode('utf-8')
+                        log.error('bytes attribute, using full base64 encoding: {}'.format(encoded_base64))
+                        entry['attributes']['reqMod'][index] = encoded_base64
+
+            except:
+                log.exception('error: caught exception: while filtering attribute: {}'.format(attribute))
+
+        # delete filtered attributes in reverse order so the remaining indices stay valid
+        for index in reversed(filtered_indices):
+            del entry['attributes']['reqMod'][index]
+
+    del entry['raw_dn']
+    del entry['raw_attributes']
+
+    return entry
+
+
+def log_action(action, entry):
+    """Log an action on a req"""
+    req_type = entry['attributes']['reqType'][0]
+    req_dn = entry['attributes']['reqDN'][0]
+    if action != 'skip':
+        log.debug('entry:')
+        log.debug(pprint.pformat(entry, indent=1))
+        log.info('{}: {} {}'.format(action, req_type, req_dn))
+    else:
+        log.debug('{}: {} {}'.format(action, req_type, req_dn))
+
+
+def write(entry):
+    """Write a req to file"""
+    req_type = entry['attributes']['reqType'][0]
+    req_dn = entry['attributes']['reqDN'][0]
+
+    entry['attributes'] = dict(entry['attributes'])
+
+    # debug
+    entry['raw_attributes'] = dict(entry['raw_attributes'])
+    log.debug('original entry:')
+    log.debug(pprint.pformat(entry, indent=1))
+
+    filtered_entry = filtered(entry)
+
+    log_action('write', filtered_entry)
+
+    try:
+        deltalog.info('{} {} >>> {}'.format(req_type, req_dn, json.dumps(filtered_entry)))
+    except:
+        log.exception('error: writing entry: {} >>> {}: {}'.format(entry['dn'], req_type, req_dn))
+
+
+def sleep(start_time):
+    """After hard work, take a nap for the rest of the second"""
+    current_time = time.perf_counter()
+    elapsed_time = current_time - start_time
+    sleep_time = update_every - elapsed_time
+
+    log.debug('runtime {0:.3f}s, sleeping {1:.3f}s'.format(elapsed_time, sleep_time))
+
+    if sleep_time > 0:
+        time.sleep(sleep_time)
+
+
+def main():
+    """Connect to slapd socket, search accesslog, write interesting changes"""
+    flock = lock(lock_file)
+
+    init_log()
+
+    slapd = dphysldap.Slapd()
+    try:
+        slapd.connect()
+    except:
+        log.exception('error: failed to connect! trying to reconnect...')
+        if slapd.reconnect():
+            log.error('connected to slapd')
+
+    try:
+        log.error('trying to read last req from {}'.format(log_file_deltalog))
+        last_line = getlastline(log_file_deltalog).decode('utf-8')
+        #req_json = last_line.split(': ', maxsplit=1)[1]
+        req_json = last_line.split(' >>> ', maxsplit=1)[1]
+        log.debug('last line: {}'.format(req_json.rstrip()))
+        req = json.loads(req_json)
+        log.debug('last line pretty:')
+        log.debug(pprint.pformat(req, indent=1))
+        req_start = req['attributes']['reqStart'][0]
+        log.error('continuing from last logged req: {}'.format(req_start))
+    except:
+        log.exception('error: trying to retrieve last req')
+        req_start = datetime.datetime.utcnow().strftime('%Y%m%d%H%M%S.%fZ')
+        log.error('using current timestamp: {}'.format(req_start))
+
+    while True:
+        start_time = time.perf_counter()
+
+        search_filter = ''.join(['(&', object_classes, '(reqStart>=', req_start, ')', ')'])
+
+        try:
+            response = slapd.search(search_base, search_filter)
+        except:
+            log.exception('error: reading socket! trying to reconnect...')
+            if slapd.reconnect():
+                log.error('socket reconnected.')
+            continue
+
+        if response:
+            for entry in slapd.response():
+                if entry['attributes']['reqStart'][0] == req_start:
+                    continue
+
+                req_start = entry['attributes']['reqStart'][0]
+                req_type = entry['attributes']['reqType'][0]
+                req_dn = entry['attributes']['reqDN'][0]
+
+                log.debug('processing: {}'.format(entry['dn']))
+
+                if req_type == 'modify':
+                    if is_skipped(entry):
+                        log_action('skip', entry)
+                        continue
+
+                write(entry)
+
+        sleep(start_time)
+
+    unlock(flock)
+
+
+if __name__ == '__main__':
+    main()
+    sys.exit(0)
diff --git a/lib/isg/dphysldap.py b/lib/isg/dphysldap.py
index 171b160d8486b3332815cdcc5f13b965b58f9fc3..9c2c5485d29865f7a951e366c095c54469d5103c 100644
--- a/lib/isg/dphysldap.py
+++ b/lib/isg/dphysldap.py
@@ -2,6 +2,7 @@
 
 import ssl
 import collections
+import time
 import lib_path
 import lib
 import ldap3
@@ -11,6 +12,10 @@ FMT = 'simple'
 SERVERS = ['phd-aa1.ethz.ch', 'phd-aa2.ethz.ch', 'phd-aa3.ethz.ch']
 BASE = 'dc=phys,dc=ethz,dc=ch'
 CA_CERTS = '/etc/ssl/certs/ca-certificates.crt'
+SLAPD_SOCKET = 'ldapi:///var/run/slapd/ldapi'
+SLAPD_INFO = [ldap3.ALL, ldap3.OFFLINE_SLAPD_2_4]
+SLAPD_SEARCH_SCOPE = ldap3.SUBTREE
+SLAPD_SEARCH_ATTRS = [ldap3.ALL_ATTRIBUTES, ldap3.ALL_OPERATIONAL_ATTRIBUTES]
 
 
 class AttributeValue(list):
@@ -85,6 +90,49 @@ class User(Entry):
         Entry.__init__(self, *args, **kwargs)
 
 
+class Slapd(object):
+    """
+    SLAPD connection to socket
+    """
+    def __init__(self, socket=SLAPD_SOCKET, get_info=SLAPD_INFO):
+        self.socket = socket
+        self.get_info = get_info
+
+    def connect(self):
+        self.server = ldap3.Server(self.socket, get_info=self.get_info)
+        self.connection = ldap3.Connection(
+            self.server,
+            authentication=ldap3.SASL,
+            sasl_mechanism=ldap3.EXTERNAL,
+            sasl_credentials='',
+            auto_bind='NONE',
+            version=3,
+            client_strategy='SYNC')
+        self.connection.bind()
+
+    def search(self, search_base, search_filter, search_scope=SLAPD_SEARCH_SCOPE,
+               attributes=SLAPD_SEARCH_ATTRS):
+        response = self.connection.search(search_base, search_filter,
+                                          search_scope=search_scope,
+                                          attributes=attributes)
+        return response
+
+    def response(self):
+        return self.connection.response
+
+    def reconnect(self, interval=1, retries=0):
+        forever = True if not retries else False
+
+        while forever or retries > 0:
+            try:
+                self.connect()
+                return True
+            except ldap3.core.exceptions.LDAPSocketOpenError:
+                retries -= 1
+                time.sleep(interval)
+
+        return False
+
+
 class Ldap(object):
     """
     LDAP connection to random server in pool