diff --git a/rt.py b/rt.py index d793040858ad7e0564906273079661385881122f..f0e799e0b01196388b05332b172cfbb4555d57d6 100755 --- a/rt.py +++ b/rt.py @@ -27,6 +27,11 @@ class RT(Plugin): regex_properties = re.compile(r'([a-zA-z]+): (.+)') regex_history = re.compile(r'([0-9]+): (.+)') regex_entry = re.compile(r'([a-zA-z]+): (.+(?:\n {8}.*)*)', re.MULTILINE) + interesting = [ + 'Ticket created', + 'Correspondence added', + 'Comments added', + ] async def start(self) -> None: self.on_external_config_update() @@ -36,8 +41,8 @@ class RT(Plugin): self.prefix = self.config['prefix'] self.whitelist = set(self.config['whitelist']) self.url = self.config['url'] - self.api = '{}/REST/1.0/'.format(self.url) - self.display = '{}/Ticket/Display.html'.format(self.url) + self.rest = f'{self.url}/REST/1.0/' + self.display = f'{self.url}/Ticket/Display.html' self.login = {'user': self.config['user'], 'pass': self.config['pass']} self.filter_properties = set(self.config['filter_properties']) self.filter_entry = set(self.config['filter_entry']) @@ -46,81 +51,74 @@ class RT(Plugin): def get_config_class(cls) -> Type[BaseProxyConfig]: return Config + def is_valid_number(self, number: str) -> bool: + return True if self.regex_number.match(number) else False + + def filter_dict(self, raw: dict, keys: Set) -> dict: + return {k: v for k, v in raw.items() if k in keys} + + def markdown_link(self, number: str) -> str: + return f'[rt#{number}]({self.display}?id={number})' + + def html_link(self, number: str) -> str: + return f'<a href="{self.display}?id={number}">rt#{number}</a>' + async def can_manage(self, evt: MessageEvent) -> bool: if evt.sender in self.whitelist: return True return False - async def get_username(self, evt: MessageEvent) -> str: + async def _username(self, evt: MessageEvent) -> str: return evt.sender[1:].split(':')[0] - async def get_member_names(self, room_id: RoomID) -> Dict[UserID, str]: + async def _member_mxids(self, room_id: RoomID) -> Dict[UserID, str]: room_members = await self.client.get_joined_members(room_id) - member_names = {} + member_mxids = {} for mxid in room_members.keys(): - displayname = await self._get_displayname(room_id, mxid) - member_names[displayname] = mxid - return member_names - - def is_valid_number(self, number: str) -> bool: - if self.regex_number.match(number): - return True - return False - - def filter_dict(self, raw: dict, keys: Set) -> dict: - return {k: v for k, v in raw.items() if k in keys} - - async def get_markdown_link(self, number: str) -> str: - link = '{}/Ticket/Display.html?id={}'.format(self.config['url'], number) - markdown = '[rt#{}]({})'.format(number, link) - return markdown - - async def get_html_link(self, number: str) -> str: - link = '{}/Ticket/Display.html?id={}'.format(self.config['url'], number) - html = '<a href="{}">rt#{}</a>'.format(link, number) - return html + displayname = await self._displayname(room_id, mxid) + member_mxids[displayname] = mxid + return member_mxids - async def _get_displayname(self, room_id: RoomID, user_id: UserID) -> str: + async def _displayname(self, room_id: RoomID, user_id: UserID) -> str: event = await self.client.get_state_event(room_id, EventType.ROOM_MEMBER, user_id) return event.displayname async def _properties(self, number: str) -> dict: - await self.http.post(self.api, data=self.login, headers=self.headers) - api_show = '{}ticket/{}/show'.format(self.api, number) - async with self.http.get(api_show, headers=self.headers) as response: + await self.http.post(self.rest, data=self.login, headers=self.headers) + rest = f'{self.rest}ticket/{number}/show' + async with self.http.get(rest, headers=self.headers) as response: content = await response.text() raw = dict(self.regex_properties.findall(content)) return self.filter_dict(raw, self.filter_properties) async def _edit(self, number: str, properties: dict) -> None: - api_edit = '{}ticket/{}/edit'.format(self.api, number) - content = {'content': '\n'.join(['{}: {}'.format(k, v) for k, v in properties.items()])} + rest = f'{self.rest}ticket/{number}/edit' + content = {'content': '\n'.join([f'{k}: {v}' for k, v in properties.items()])} data = {**self.login, **content} - await self.http.post(api_edit, data=data, headers=self.headers) + await self.http.post(rest, data=data, headers=self.headers) async def _comment(self, number: str, comment: str) -> None: - api_comment = '{}ticket/{}/comment'.format(self.api, number) - content = {'content': 'id: {}\nAction: comment\nText: {}'.format(number, comment)} + rest = f'{self.rest}ticket/{number}/comment' + content = {'content': f'id: {number}\nAction: comment\nText: {comment}'} data = {**self.login, **content} - await self.http.post(api_comment, data=data, headers=self.headers) + await self.http.post(rest, data=data, headers=self.headers) async def _history(self, number: str) -> dict: - await self.http.post(self.api, data=self.login, headers=self.headers) - api_history = '{}ticket/{}/history'.format(self.api, number) - async with self.http.get(api_history, headers=self.headers) as response: + await self.http.post(self.rest, data=self.login, headers=self.headers) + rest = f'{self.rest}ticket/{number}/history' + async with self.http.get(rest, headers=self.headers) as response: content = await response.text() - ticket = dict(self.regex_history.findall(content)) - return ticket + return dict(self.regex_history.findall(content)) async def _entry(self, number: str, entry: str) -> dict: - await self.http.post(self.api, data=self.login, headers=self.headers) - api_entry = '{}ticket/{}/history/id/{}'.format(self.api, number, entry) - async with self.http.get(api_entry, headers=self.headers) as response: + await self.http.post(self.rest, data=self.login, headers=self.headers) + rest = f'{self.rest}ticket/{number}/history/id/{entry}' + async with self.http.get(rest, headers=self.headers) as response: content = await response.text() raw = dict(self.regex_entry.findall(content)) entry = self.filter_dict(raw, self.filter_entry) if 'Content' in entry and '\n' in entry['Content']: - block = ' \n```\n' + entry['Content'].replace('\n ', '\n').rstrip() + '\n```' + block = ' \n```\n' + entry['Content'].replace('\n' + ' ' * 9, '\n').rstrip() + '\n```' entry['Content'] = block return entry @@ -128,16 +126,15 @@ class RT(Plugin): async def handler(self, evt: MessageEvent, subs: List[Tuple[str, str]]) -> None: await evt.mark_read() msg_lines = [] - await self.http.post(self.api, data=self.login, headers=self.headers) + await self.http.post(self.rest, data=self.login, headers=self.headers) for sub in subs: number = sub[4] - api_show = '{}ticket/{}/show'.format(self.api, number) - async with self.http.get(api_show, headers=self.headers) as response: + rest = f'{self.rest}ticket/{number}/show' + async with self.http.get(rest, headers=self.headers) as response: content = await response.text() ticket = dict(self.regex_properties.findall(content)) - markdown_link = await self.get_markdown_link(number) markdown = '{} is **{}** in **{}** from {} \n{}'.format( - markdown_link, + self.markdown_link(number), ticket['Status'], ticket['Queue'], ticket['Creator'], @@ -146,7 +143,7 @@ class RT(Plugin): msg_lines.append(markdown) if msg_lines: - await evt.respond('\n'.join(msg_lines)) + await evt.respond(' \n'.join(msg_lines)) @command.new(name=lambda self: self.prefix, help='Manage RT tickets', require_subcommand=True) @@ -160,10 +157,8 @@ class RT(Plugin): return await evt.mark_read() properties_dict = await self._properties(number) - properties_list = ['{}: {}'.format(k, v) for k, v in properties_dict.items()] - markdown_link = await self.get_markdown_link(number) - markdown = '{} properties: \n{}'.format(markdown_link, ' \n'.join(properties_list)) - await evt.respond(markdown) + properties = ' \n'.join([f'{k}: {v}' for k, v in properties_dict.items()]) + await evt.respond(f'{self.markdown_link(number)} properties: \n{properties}') @rt.subcommand('resolve', aliases=('r', 'res'), help='Mark the ticket as resolved.') @command.argument('number', 'ticket number', parser=str) @@ -172,8 +167,7 @@ class RT(Plugin): return await evt.mark_read() await self._edit(number, {'Status': 'resolved'}) - markdown_link = await self.get_markdown_link(number) - await evt.respond('{} resolved 😃'.format(markdown_link)) + await evt.respond(f'{self.markdown_link(number)} resolved 😃') @rt.subcommand('open', aliases=('o', 'op'), help='Mark the ticket as open.') @command.argument('number', 'ticket number', parser=str) @@ -182,8 +176,7 @@ class RT(Plugin): return await evt.mark_read() await self._edit(number, {'Status': 'open'}) - markdown_link = await self.get_markdown_link(number) - await evt.respond('{} opened ðŸ˜ï¸'.format(markdown_link)) + await evt.respond(f'{self.markdown_link(number)} opened ðŸ˜ï¸') @rt.subcommand('stall', aliases=('st', 'sta'), help='Mark the ticket as stalled.') @command.argument('number', 'ticket number', parser=str) @@ -192,8 +185,7 @@ class RT(Plugin): return await evt.mark_read() await self._edit(number, {'Status': 'stalled'}) - markdown_link = await self.get_markdown_link(number) - await evt.respond('{} stalled 😴'.format(markdown_link)) + await evt.respond(f'{self.markdown_link(number)} stalled 😴') @rt.subcommand('delete', aliases=('d', 'del'), help='Mark the ticket as deleted.') @command.argument('number', 'ticket number', parser=str) @@ -202,10 +194,9 @@ class RT(Plugin): return await evt.mark_read() await self._edit(number, {'Status': 'deleted'}) - markdown_link = await self.get_markdown_link(number) - await evt.respond('{} deleted 🤬'.format(markdown_link)) + await evt.respond(f'{self.markdown_link(number)} deleted 🤬') - @rt.subcommand('autoresolve', help='Enable automatic ticket resolve mode.') + @rt.subcommand('autoresolve', help='Ask the bot to automatically answer and resolve tickets.') async def autoresolve(self, evt: MessageEvent) -> None: if not await self.can_manage(evt): return @@ -220,8 +211,7 @@ class RT(Plugin): return await evt.mark_read() await self._comment(number, comment) - markdown_link = await self.get_markdown_link(number) - await evt.respond('{} comment added 🤓'.format(markdown_link)) + await evt.respond(f'{self.markdown_link(number)} comment added 🤓') @rt.subcommand('history', aliases=('h', 'hist'), help='Get a list of all history entries.') @command.argument('number', 'ticket number', parser=str) @@ -230,40 +220,32 @@ class RT(Plugin): return await evt.mark_read() history_dict = await self._history(number) - history_list = ['{}: {}'.format(k, v) for k, v in history_dict.items()] - markdown_link = await self.get_markdown_link(number) - markdown = '{} history entries: \n{}'.format(markdown_link, ' \n'.join(history_list)) - await evt.respond(markdown) + history = ' \n'.join([f'{k}: {v}' for k, v in history_dict.items()]) + await evt.respond(f'{self.markdown_link(number)} history entries: \n{history}') @rt.subcommand('entry', aliases=('e', 'ent'), help='Gets a single history entry.') @command.argument('number', 'ticket number', parser=str) - @command.argument('entry', 'history entry number', parser=str) - async def entry(self, evt: MessageEvent, number: str, entry: str) -> None: + @command.argument('entryid', 'history entry number', parser=str) + async def entry(self, evt: MessageEvent, number: str, entryid: str) -> None: if not await self.can_manage(evt) or not self.is_valid_number(number): return await evt.mark_read() - entry_dict = await self._entry(number, entry) - entry_list = ['{}: {}'.format(k, v) for k, v in entry_dict.items()] - markdown_link = await self.get_markdown_link(number) - markdown = '{} history entry {}: \n{}'.format(markdown_link, entry, - ' \n'.join(entry_list)) - await evt.respond(markdown) - - @rt.subcommand('last', aliases=('l', 'la'), help='Gets the last correspondence entry.') + entry_dict = await self._entry(number, entryid) + entry = ' \n'.join([f'{k}: {v}' for k, v in entry_dict.items()]) + await evt.respond(f'{self.markdown_link(number)} history entry {entryid}: \n{entry}') + + @rt.subcommand('last', aliases=('l', 'la'), help='Gets the last entry.') @command.argument('number', 'ticket number', parser=str) async def last(self, evt: MessageEvent, number: str) -> None: if not await self.can_manage(evt) or not self.is_valid_number(number): return await evt.mark_read() - history_dict = await self._history(number) - correspondences = {k: v for k, v in history_dict.items() if 'Corr' in v or 'Tick' in v} - entry = max(correspondences, key=int) - entry_dict = await self._entry(number, entry) - entry_list = ['{}: {}'.format(k, v) for k, v in entry_dict.items()] - markdown_link = await self.get_markdown_link(number) - markdown = '{} history entry {}: \n{}'.format(markdown_link, entry, - ' \n'.join(entry_list)) - await evt.respond(markdown) + history = await self._history(number) + mails = {k: v for k, v in history.items() if any(i in v for i in self.interesting)} + entryid = max(mails, key=int) + entry_dict = await self._entry(number, entryid) + entry = ' \n'.join([f'{k}: {v}' for k, v in entry_dict.items()]) + await evt.respond(f'{self.markdown_link(number)} history entry {entryid}: \n{entry}') @rt.subcommand('show', aliases=('s', 'sh'), help='Show all information about the ticket.') @command.argument('number', 'ticket number', parser=str) @@ -272,47 +254,32 @@ class RT(Plugin): return await evt.mark_read() prop_dict = await self._properties(number) - prop_list = ['{}: {}'.format(k, v) for k, v in prop_dict.items()] - link = await self.get_markdown_link(number) - await evt.respond('{} properties: \n{}'.format(link, ' \n'.join(prop_list))) - history_dict = await self._history(number) - for entry in history_dict.keys(): - entry_dict = await self._entry(number, entry) - entry_list = ['{}: {}'.format(k, v) for k, v in entry_dict.items()] - await evt.respond('history entry {}: \n{}'.format(entry, ' \n'.join(entry_list))) - - @rt.subcommand('steal', aliases=('st', 'ste'), help='Steal the ticket.') - @command.argument('number', 'ticket number', parser=str) - async def steal(self, evt: MessageEvent, number: str) -> None: - if not await self.can_manage(evt) or not self.is_valid_number(number): - return - await evt.mark_read() - username = await self.get_username(evt) - displayname = await self._get_displayname(evt.room_id, evt.sender) - await self._edit(number, {'Owner': username}) - html_link = await self.get_html_link(number) - content = TextMessageEventContent( - msgtype=MessageType.NOTICE, format=Format.HTML, - body=f'{displayname} has stolen rt#{number} 😈', - formatted_body=f'<a href="https://matrix.to/#/{evt.sender}">{evt.sender}</a> ' - f'has stolen {html_link} 😈') - await evt.respond(content) - - @rt.subcommand('take', aliases=('t', 'ta'), help='Take the ticket.') + props = ' \n'.join([f'{k}: {v}' for k, v in prop_dict.items()]) + await evt.respond(f'{self.markdown_link(number)} properties: \n{props}') + history = await self._history(number) + for entryid, entry_text in history.items(): + if any(i in entry_text for i in self.interesting + ['Requestor']): + if 'Requestor' in entry_text: + await evt.respond(f'history entry {entryid}: {entry_text}') + continue + entry_dict = await self._entry(number, entryid) + entry = ' \n'.join([f'{k}: {v}' for k, v in entry_dict.items()]) + await evt.respond(f'history entry {entryid}: \n{entry}') + + @rt.subcommand('take', aliases=('t', 'ta', 'steal'), help='Take or steal the ticket.') @command.argument('number', 'ticket number', parser=str) async def take(self, evt: MessageEvent, number: str) -> None: if not await self.can_manage(evt) or not self.is_valid_number(number): return await evt.mark_read() - username = await self.get_username(evt) - displayname = await self._get_displayname(evt.room_id, evt.sender) + username = await self._username(evt) + displayname = await self._displayname(evt.room_id, evt.sender) await self._edit(number, {'Owner': username}) - html_link = await self.get_html_link(number) content = TextMessageEventContent( msgtype=MessageType.NOTICE, format=Format.HTML, body=f'{displayname} took rt#{number} ðŸ‘ï¸', formatted_body=f'<a href="https://matrix.to/#/{evt.sender}">{evt.sender}</a> ' - f'took {html_link} ðŸ‘ï¸') + f'took {self.html_link(number)} ðŸ‘ï¸') await evt.respond(content) @rt.subcommand('give', aliases=('g', 'gi', 'assign'), help='Give the ticket to somebody.') @@ -322,18 +289,23 @@ class RT(Plugin): if not await self.can_manage(evt) or not self.is_valid_number(number): return await evt.mark_read() - members = await self.get_member_names(evt.room_id) - if user not in members.keys(): + member_mxids = await self._member_mxids(evt.room_id) + if user[0] == '@': + if ':' in user: + user = {v: k for k, v in member_mxids.items()}[user] + else: + user = user[1:] + if user not in member_mxids.keys() and user not in member_mxids.values(): await evt.respond(f'hmm... **{user}** is not the in room 🤔') return - displayname = await self._get_displayname(evt.room_id, evt.sender) - target_mxid = members[user] + displayname = await self._displayname(evt.room_id, evt.sender) + target_mxid = member_mxids[user] target_username = target_mxid[1:].split(':')[0] await self._edit(number, {'Owner': target_username}) - html_link = await self.get_html_link(number) content = TextMessageEventContent( msgtype=MessageType.NOTICE, format=Format.HTML, body=f'{displayname} assigned rt#{number} to {user} 😜', - formatted_body=f'<a href="https://matrix.to/#/{evt.sender}">{evt.sender}</a> assigned ' - f'{html_link} to <a href="https://matrix.to/#/{target_mxid}">{target_mxid}</a> 😜') + formatted_body=f'<a href="https://matrix.to/#/{evt.sender}">{evt.sender}</a> ' + f'assigned {self.html_link(number)} to ' + f'<a href="https://matrix.to/#/{target_mxid}">{target_mxid}</a> 😜') await evt.respond(content)