From 715432cb283a80e6ab60ae021e23b1c5c550bd7e Mon Sep 17 00:00:00 2001
From: =?UTF-8?q?Sven=20M=C3=A4der?= <maeder@phys.ethz.ch>
Date: Wed, 11 Apr 2018 13:24:18 +0200
Subject: [PATCH] Add deltalog

---
 bin/deltalogparse.py | 137 ++++++++++++++++++++++++++++++-------------
 lib/isg/dphysldap.py |   8 +--
 2 files changed, 99 insertions(+), 46 deletions(-)

diff --git a/bin/deltalogparse.py b/bin/deltalogparse.py
index d177d97..52a303a 100755
--- a/bin/deltalogparse.py
+++ b/bin/deltalogparse.py
@@ -6,36 +6,37 @@ import datetime
 import re
 import pprint
 import logging
+import json
+import mmap
 import lib_path
 import lib
 import dphysldap
 
-process_name = 'deltalogparse'
-debug = 2
-loglevel = logging.DEBUG
 update_every = 1
-log_path = '/var/log/ldap/delta.log'
+log_level = logging.DEBUG
+log_datefmt = '%Y-%m-%d %H:%M:%S'
+log_path = '/var/log/ldap/'
+log_file_log = log_path + 'deltalogparse.log'
+log_file_deltalog = log_path + 'delta.log'
 search_base = 'cn=deltalog'
 object_classes = '(|(objectClass=auditModify)(objectClass=auditAdd)(objectClass=auditDelete))'
 
-#logging.basicConfig(
-#    filename='/var/log/ldap/deltalogparse.log',
-#    level=loglevel,
-#    format='%(asctime)s ' + process_name + '[%(process)d]:%(levelname)s: %(message)s',
-#    datefmt='%Y-%m-%d %H:%M:%S'
-#)
-
-log = logging.getLogger('deltalogparse')
-log.setLevel(loglevel)
-log_fh = logging.FileHandler('/var/log/ldap/deltalogparse.log')
-log_fh.setLevel(loglevel)
-log_formatter = logging.Formatter(
-    fmt='%(asctime)s ' + process_name + '[%(process)d]:%(levelname)s: %(message)s',
-    datefmt='%Y-%m-%d %H:%M:%S'
-)
+log = logging.getLogger('log')
+log.setLevel(log_level)
+log_fh = logging.FileHandler(log_file_log)
+log_fh.setLevel(log_level)
+log_formatter = logging.Formatter(fmt='{asctime} {levelname}: {message}', style='{', datefmt=log_datefmt)
 log_fh.setFormatter(log_formatter)
 log.addHandler(log_fh)
 
+deltalog = logging.getLogger('deltalog')
+deltalog.setLevel(logging.INFO)
+deltalog_fh = logging.FileHandler(log_file_deltalog)
+deltalog_fh.setLevel(logging.INFO)
+deltalog_formatter = logging.Formatter(fmt='{asctime}: {message}', style='{', datefmt=log_datefmt)
+deltalog_fh.setFormatter(deltalog_formatter)
+deltalog.addHandler(deltalog_fh)
+
 rgx_skip = re.compile(
     r'^('
     r'heartbeatTimestamp|'
@@ -43,55 +44,108 @@ rgx_skip = re.compile(
     r'modifiersName|'
     r'lastUse.*?|'
     r'(context|entry)CSN'
-    r'):'
-)
+    r'):')
 
 rgx_filter = re.compile(
     r'^('
     r'heartbeatTimestamp|'
     r'lastUse.*?|'
     r'(context|entry)CSN'
-    r'):'
-)
+    r'):')
+
 
+def getlastline(fname):
+    """Use mmap to return a copy of the last line of a file (as bytes)"""
+    with open(fname) as source:
+        mapping = mmap.mmap(source.fileno(), 0, prot=mmap.PROT_READ)
+    return mapping[mapping.rfind(b'\n', 0, -1)+1:]
 
-def is_skipped(attributes):
-    for attribute in attributes:
+
+def is_skipped(entry):
+    """Return True only if none of the reqMod attributes is interesting"""
+    for attribute in entry['attributes']['reqMod']:
         if not rgx_skip.search(attribute):
-            logging.info('hmm... interesting attribute: %s' % attribute)
+            log.info('interesting attribute: {}'.format(attribute))
             return False
 
     return True
 
 
-def deltalog(entry):
+def filtered(entry):
+    """Strip filtered reqMod attributes and the raw_dn/raw_attributes keys from entry, in place"""
+    for attribute in entry['attributes']['reqMod']:
+        if rgx_filter.search(attribute):
+            log.info('filter attribute: {}'.format(attribute))
+            entry['attributes']['reqMod'].remove(attribute)
+
+    del entry['raw_dn']
+    del entry['raw_attributes']
+
+    return entry
+
+
+def log_action(action, entry):
+    """Log an action on a req"""
     req_type = entry['attributes']['reqType'][0]
     req_dn = entry['attributes']['reqDN'][0]
+    log.debug('entry:')
+    log.debug(pprint.pformat(entry, indent=1))
+    log.info('{}: {} {}'.format(action, req_type, req_dn))
 
-    attributes = dict(entry['attributes'])
-    entry['attributes'] = attributes
-    del entry['raw_attributes']
 
-    log.debug(pprint.pformat(entry, indent=4))
-    log.info('log: %s %s' % (req_type, req_dn))
+def write(entry):
+    """Write a req to file"""
+    req_type = entry['attributes']['reqType'][0]
+    req_dn = entry['attributes']['reqDN'][0]
+
+    entry['attributes'] = dict(entry['attributes'])
+
+    # debug: turn raw_attributes into a plain dict, then dump the unfiltered entry
+    entry['raw_attributes'] = dict(entry['raw_attributes'])
+    log.debug('original entry:')
+    log.debug(pprint.pformat(entry, indent=1))
+
+    filtered_entry = filtered(entry)
+
+    log_action('write', filtered_entry)
+    deltalog.info('{}'.format(json.dumps(filtered_entry)))
 
 
 def sleep(start_time):
+    """Sleep for whatever is left of the update_every interval since start_time"""
     current_time = time.perf_counter()
     elapsed_time = current_time - start_time
     sleep_time = update_every - elapsed_time
 
-    log.info('runtime %0.3fs, sleeping %0.3fs' % (elapsed_time, sleep_time))
+    log.info('runtime {0:.3f}s, sleeping {1:.3f}s'.format(elapsed_time, sleep_time))
 
     if sleep_time > 0:
         time.sleep(sleep_time)
 
 
 def main():
-    """Connect to slapd socket and parse accesslog"""
+    """Connect to slapd socket, search accesslog, write interesting changes"""
     slapd = dphysldap.Slapd()
-
-    req_start = datetime.datetime.utcnow().strftime('%Y%m%d%H%M%S.%fZ')
+    try:
+        slapd.connect()
+    except:
+        log.exception('error: failed to connect! trying to reconnect...')
+        if slapd.reconnect():
+            log.error('connected to slapd')
+
+    try:
+        log.info('trying to read last req from {}'.format(log_file_deltalog))
+        last_line = getlastline(log_file_deltalog).decode('utf-8')
+        req_json = last_line.split(': ', maxsplit=1)[1]
+        log.debug('last line: {}'.format(req_json))
+        req = json.loads(req_json)
+        log.debug(pprint.pformat(req, indent=1))
+        req_start = req['attributes']['reqStart'][0]
+        log.info('continuing from last logged req: {}'.format(req_start))
+    except:
+        log.exception('error: while trying to retrieve last req')
+        req_start = datetime.datetime.utcnow().strftime('%Y%m%d%H%M%S.%fZ')
+        log.info('using current timestamp: {}'.format(req_start))
 
     while True:
         start_time = time.perf_counter()
@@ -101,7 +155,7 @@ def main():
         try:
             response = slapd.search(search_base, search_filter)
         except:
-            log.error('socket error!')
+            log.exception('error: reading socket! trying to reconnect...')
             if slapd.reconnect():
                 log.error('socket reconnected.')
 
@@ -114,15 +168,14 @@ def main():
                 req_type = entry['attributes']['reqType'][0]
                 req_dn = entry['attributes']['reqDN'][0]
 
-                log.info(' '.join(['processing:', req_start, entry['dn']]))
+                log.info('processing: {}'.format(entry['dn']))
 
                 if req_type == 'modify':
-                    req_mods = entry['attributes']['reqMod']
-                    if is_skipped(req_mods):
-                        log.info('skip: %s %s' % (req_type, req_dn))
+                    if is_skipped(entry):
+                        log_action('skip', entry)
                         continue
 
-                deltalog(entry)
+                write(entry)
 
         sleep(start_time)
 
diff --git a/lib/isg/dphysldap.py b/lib/isg/dphysldap.py
index 123a5b9..9c2c548 100644
--- a/lib/isg/dphysldap.py
+++ b/lib/isg/dphysldap.py
@@ -95,10 +95,11 @@ class Slapd(object):
     SLAPD connection to socket
     """
     def __init__(self, socket=SLAPD_SOCKET, get_info=SLAPD_INFO):
-        self.server = ldap3.Server(socket, get_info=get_info)
-        self.connect()
+        self.socket = socket
+        self.get_info = get_info
 
     def connect(self):
+        self.server = ldap3.Server(self.socket, get_info=self.get_info)
         self.connection = ldap3.Connection(
             self.server,
             authentication=ldap3.SASL,
@@ -106,8 +107,7 @@ class Slapd(object):
             sasl_credentials='',
             auto_bind='NONE',
             version=3,
-            client_strategy='SYNC'
-        )
+            client_strategy='SYNC')
         self.connection.bind()
 
     def search(self, search_base, search_filter, search_scope=SLAPD_SEARCH_SCOPE,
-- 
GitLab