diff --git a/bin/deltalogparse.py b/bin/deltalogparse.py index d177d9736ca25263079b636760f77a78e8ee4ab5..52a303ae8bfec88dbf698e3686833ee90dcbc4bc 100755 --- a/bin/deltalogparse.py +++ b/bin/deltalogparse.py @@ -6,36 +6,37 @@ import datetime import re import pprint import logging +import json +import mmap import lib_path import lib import dphysldap -process_name = 'deltalogparse' -debug = 2 -loglevel = logging.DEBUG update_every = 1 -log_path = '/var/log/ldap/delta.log' +log_level = logging.DEBUG +log_datefmt = '%Y-%m-%d %H:%M:%S' +log_path = '/var/log/ldap/' +log_file_log = log_path + 'deltalogparse.log' +log_file_deltalog = log_path + 'delta.log' search_base = 'cn=deltalog' object_classes = '(|(objectClass=auditModify)(objectClass=auditAdd)(objectClass=auditDelete))' -#logging.basicConfig( -# filename='/var/log/ldap/deltalogparse.log', -# level=loglevel, -# format='%(asctime)s ' + process_name + '[%(process)d]:%(levelname)s: %(message)s', -# datefmt='%Y-%m-%d %H:%M:%S' -#) - -log = logging.getLogger('deltalogparse') -log.setLevel(loglevel) -log_fh = logging.FileHandler('/var/log/ldap/deltalogparse.log') -log_fh.setLevel(loglevel) -log_formatter = logging.Formatter( - fmt='%(asctime)s ' + process_name + '[%(process)d]:%(levelname)s: %(message)s', - datefmt='%Y-%m-%d %H:%M:%S' -) +log = logging.getLogger('log') +log.setLevel(log_level) +log_fh = logging.FileHandler(log_file_log) +log_fh.setLevel(log_level) +log_formatter = logging.Formatter(fmt='{asctime} {levelname}: {message}', style='{', datefmt=log_datefmt) log_fh.setFormatter(log_formatter) log.addHandler(log_fh) +deltalog = logging.getLogger('deltalog') +deltalog.setLevel(logging.INFO) +deltalog_fh = logging.FileHandler(log_file_deltalog) +deltalog_fh.setLevel(logging.INFO) +deltalog_formatter = logging.Formatter(fmt='{asctime}: {message}', style='{', datefmt=log_datefmt) +deltalog_fh.setFormatter(deltalog_formatter) +deltalog.addHandler(deltalog_fh) + rgx_skip = re.compile( r'^(' r'heartbeatTimestamp|' @@ 
-43,55 +44,108 @@ rgx_skip = re.compile( r'modifiersName|' r'lastUse.*?|' r'(context|entry)CSN' - r'):' -) + r'):') rgx_filter = re.compile( r'^(' r'heartbeatTimestamp|' r'lastUse.*?|' r'(context|entry)CSN' - r'):' -) + r'):') + +def getlastline(fname): + """Using mmap to return a copy of the last line of a file""" + with open(fname) as source: + mapping = mmap.mmap(source.fileno(), 0, prot=mmap.PROT_READ) + return mapping[mapping.rfind(b'\n', 0, -1)+1:] -def is_skipped(attributes): - for attribute in attributes: + +def is_skipped(entry): + """Check if at least one of the reqMod attributes is interesting""" + for attribute in entry['attributes']['reqMod']: if not rgx_skip.search(attribute): - logging.info('hmm... interesting attribute: %s' % attribute) + log.info('interesting attribute: {}'.format(attribute)) return False return True -def deltalog(entry): +def filtered(entry): + """Filter out attributes not of interest""" + for attribute in entry['attributes']['reqMod']: + if rgx_filter.search(attribute): + log.info('filter attribute: {}'.format(attribute)) + entry['attributes']['reqMod'].remove(attribute) + + del entry['raw_dn'] + del entry['raw_attributes'] + + return entry + + +def log_action(action, entry): + """Log an action on a req""" req_type = entry['attributes']['reqType'][0] req_dn = entry['attributes']['reqDN'][0] + log.debug('entry:') + log.debug(pprint.pformat(entry, indent=1)) + log.info('{}: {} {}'.format(action, req_type, req_dn)) - attributes = dict(entry['attributes']) - entry['attributes'] = attributes - del entry['raw_attributes'] - log.debug(pprint.pformat(entry, indent=4)) - log.info('log: %s %s' % (req_type, req_dn)) +def write(entry): + """Write a req to file""" + req_type = entry['attributes']['reqType'][0] + req_dn = entry['attributes']['reqDN'][0] + + entry['attributes'] = dict(entry['attributes']) + + # debug + entry['raw_attributes'] = dict(entry['raw_attributes']) + log.debug('original entry:') + log.debug(pprint.pformat(entry, 
indent=1)) + + filtered_entry = filtered(entry) + + log_action('write', filtered_entry) + deltalog.info('{}'.format(json.dumps(filtered_entry))) def sleep(start_time): + """After hard work, take a nap for the rest of the second""" current_time = time.perf_counter() elapsed_time = current_time - start_time sleep_time = update_every - elapsed_time - log.info('runtime %0.3fs, sleeping %0.3fs' % (elapsed_time, sleep_time)) + log.info('runtime {0:.3f}s, sleeping {1:.3f}s'.format(elapsed_time, sleep_time)) if sleep_time > 0: time.sleep(sleep_time) def main(): - """Connect to slapd socket and parse accesslog""" + """Connect to slapd socket, search accesslog, write interesting changes""" slapd = dphysldap.Slapd() - - req_start = datetime.datetime.utcnow().strftime('%Y%m%d%H%M%S.%fZ') + try: + slapd.connect() + except: + log.exception('error: failed to connect! trying to reconnect...') + if slapd.reconnect(): + log.error('connected to slapd') + + try: + log.info('trying to read last req from {}'.format(log_file_deltalog)) + last_line = getlastline(log_file_deltalog).decode('utf-8') + req_json = last_line.split(': ', maxsplit=1)[1] + log.debug('last line: {}'.format(req_json)) + req = json.loads(req_json) + log.debug(pprint.pformat(req, indent=1)) + req_start = req['attributes']['reqStart'][0] + log.info('continuing from last logged req: {}'.format(req_start)) + except: + log.exception('error: while trying to retrieve last req') + req_start = datetime.datetime.utcnow().strftime('%Y%m%d%H%M%S.%fZ') + log.info('using current timestamp: {}'.format(req_start)) while True: start_time = time.perf_counter() @@ -101,7 +155,7 @@ def main(): try: response = slapd.search(search_base, search_filter) except: - log.error('socket error!') + log.exception('error: reading socket! 
trying to reconnect...') if slapd.reconnect(): log.error('socket reconnected.') @@ -114,15 +168,14 @@ def main(): req_type = entry['attributes']['reqType'][0] req_dn = entry['attributes']['reqDN'][0] - log.info(' '.join(['processing:', req_start, entry['dn']])) + log.info('processing: {}'.format(entry['dn'])) if req_type == 'modify': - req_mods = entry['attributes']['reqMod'] - if is_skipped(req_mods): - log.info('skip: %s %s' % (req_type, req_dn)) + if is_skipped(entry): + log_action('skip', entry) continue - deltalog(entry) + write(entry) sleep(start_time) diff --git a/lib/isg/dphysldap.py b/lib/isg/dphysldap.py index 123a5b988ccb25da0a8f2f6d0ff7b48ee61d04ef..9c2c5485d29865f7a951e366c095c54469d5103c 100644 --- a/lib/isg/dphysldap.py +++ b/lib/isg/dphysldap.py @@ -95,10 +95,11 @@ class Slapd(object): SLAPD connection to socket """ def __init__(self, socket=SLAPD_SOCKET, get_info=SLAPD_INFO): - self.server = ldap3.Server(socket, get_info=get_info) - self.connect() + self.socket = socket + self.get_info = get_info def connect(self): + self.server = ldap3.Server(self.socket, get_info=self.get_info) self.connection = ldap3.Connection( self.server, authentication=ldap3.SASL, @@ -106,8 +107,7 @@ class Slapd(object): sasl_credentials='', auto_bind='NONE', version=3, - client_strategy='SYNC' - ) + client_strategy='SYNC') self.connection.bind() def search(self, search_base, search_filter, search_scope=SLAPD_SEARCH_SCOPE,