Source code for analysis_engine.send_to_slack

"""
Helper for extracting details from Celery task and
sending it to a slack webhook.

Supported environment variables:

::

    # slack webhook
    export SLACK_WEBHOOK=https://hooks.slack.com/services/

"""

import os
import json
import requests
import tabulate as tb
import analysis_engine.consts as ae_consts
import spylunking.log.setup_logging as log_utils

log = log_utils.build_colorized_logger(name=__name__)
publish_msg_error = (
    'please add a SLACK_WEBHOOK environment variable to publish messages')
message_types = {
    'success': [{"color": "good", "title": "SUCCESS"}],
    'failure': [{"color": "danger", "title": "FAILED"}],
    'message': [{"title": "MESSAGE"}]
}


[docs]def post_success(msg, jupyter=False, block=False, full_width=False): """Post a SUCCESS message to slack :param msg: A string, list, or dict to send to slack """ return get_response('success', msg, jupyter=jupyter, block=block, full_width=full_width)
[docs]def post_failure(msg, jupyter=False, block=False, full_width=False): """Post a FAILURE message to slack :param msg: A string, list, or dict to send to slack """ return get_response('failure', msg, jupyter=jupyter, block=block, full_width=full_width)
[docs]def post_message(msg, jupyter=False, block=False, full_width=False): """Post any message to slack :param msg: A string, list, or dict to send to slack """ return get_response('message', msg, jupyter=jupyter, block=block, full_width=full_width)
def get_response(msg_type, msg, jupyter=False, block=False, full_width=False): """Attempt to create and send any message type to slack :param msg_type: A string of either 'success', 'failure', or 'message' :param msg: A string, list, or dict to send to slack """ response = {'status': ae_consts.FAILED} if msg_type in message_types: if not os.getenv('SLACK_WEBHOOK', False): log.info(f'post_{msg_type} - {publish_msg_error}') return response if msg: attachments = [{"attachments": message_types[msg_type]}] fields = parse_msg(msg, block=block) if fields: if full_width: attachments.append({"text": fields[0].pop("value")}) else: attachments[0]["attachments"][0]["fields"] = fields response = post(attachments, jupyter=jupyter) return response
[docs]def parse_msg(msg, block=False): """Create an array of fields for slack from the msg type :param msg: A string, list, or dict to massage for sending to slack """ if type(msg) is str: if block: return [{"value": f"```{msg}```"}] return [{"value": msg}] elif type(msg) is list: if block: string_list = '\n'.join(f"{str(x)}" for x in msg) return [{"value": f"```{string_list}```"}] return [{"value": str(x)} for x in msg] elif type(msg) is dict: if block: string_dict = '\n'.join( f"{str(k)}: {str(v)}" for k, v in msg.items()) return [{"value": f"```{string_dict}```"}] return [{"value": f"{str(k)}: {str(v)}"} for k, v in msg.items()] return None
[docs]def post(attachments, jupyter=False): """Send created attachments to slack :param attachments: Values to post to slack """ SLACK_WEBHOOK = ae_consts.ev('SLACK_WEBHOOK', None) result = {'status': ae_consts.FAILED} if not os.getenv('SLACK_WEBHOOK', False): log.info(f'post - {publish_msg_error}') return result if attachments and SLACK_WEBHOOK: try: # if not jupyter: # log.debug(f'Attempting to post attachments={attachments} ' # 'to slack_webhook exists') for attachment in attachments: r = requests.post(SLACK_WEBHOOK, data=json.dumps(attachment)) if str(r.status_code) == "200": # log.info(( # f'''Successful post of attachment={ # attachment if not jupyter else # True if attachment else False} ''' # 'to slack_webhook')) result['status'] = ae_consts.SUCCESS else: log.error( f'''Failed to post attachment={ attachment if not jupyter else True if attachment else False} ''' f'with status_code={r.status_code}') result['status'] = ae_consts.FAILED break except Exception as e: log.error( f'''Failed to post attachments={ attachments if not jupyter else True if attachments else False} ''' f'with ex={e}') result['status'] = ae_consts.ERR result['err'] = e else: log.info( 'Skipping post to slack due to missing ' f'''attachments={ attachments if not jupyter else True if attachments else False} or SLACK_WEBHOOK ''' f'missing={False if SLACK_WEBHOOK else True}') return result
[docs]def post_df( df, columns=None, block=True, jupyter=True, full_width=True, tablefmt='github'): """post_df Post a ``pandas.DataFrame`` to Slack :param df: ``pandas.DataFrame`` object :param columns: ordered list of columns to for the table header row (``None`` by default) :param block: bool for post as a Slack-formatted block ```like this``` (``True`` by default) :param jupyter: bool for jupyter attachment handling (``True`` by default) :param full_width: bool to ensure the width is preserved the Slack message (``True`` by default) :param tablefmt: string for table format (``github`` by default). Additional format values can be found on: https://bitbucket.org/astanin/python-tabulate """ if not os.getenv('SLACK_WEBHOOK', False): log.info(f'post_df - {publish_msg_error}') return if not hasattr(df, 'index'): log.debug('post_df - no df ') return log.debug( f'post_df - df.index={len(df.index)} ' f'columns={columns} fmt={tablefmt}') msg = None if columns: msg = tb.tabulate( df[columns], headers=columns, tablefmt=tablefmt) else: msg = tb.tabulate( df, tablefmt=tablefmt) # end of if/else post_success( msg=msg, block=block, jupyter=jupyter, full_width=full_width)
# end of post_df def post_plot(plot, filename='tmp', title=None): """post_plot Post a matlibplot plot to Slack :param plot: matlibplot pyplot figure :param filename: filename of plot :param title: Title for slack plot postiing """ result = {'status': ae_consts.FAILED} if not os.getenv('SLACK_ACCESS_TOKEN', False): log.info( 'post_plot - please add a SLACK_ACCESS_TOKEN environment ' 'variable to publish plots') return result if not filename: log.info( 'post_plot - no filename provided' ) return result url = 'https://slack.com/api/files.upload' filename = f'{filename.split(".")[0]}.png' channels = [f'#{channel}' for channel in os.getenv( 'SLACK_PUBLISH_PLOT_CHANNELS', 'general').split(',')] try: log.info(f'post_plot - temporarily saving plot: {filename}') plot.savefig(filename) tmp_file = { 'file': (filename, open(filename, 'rb'), 'png') } payload = { 'channels': channels, 'filename': filename, 'title': title if title else filename, 'token': os.getenv('SLACK_ACCESS_TOKEN'), } log.info(f'post_plot - sending payload: {payload} to url: {url}') r = requests.post(url, params=payload, files=tmp_file) log.info(f'post_plot - removing temporarily saved plot: {filename}') os.remove(filename) if str(r.status_code) == "200": log.info(f'post_plot - posted plot to slack channels: {channels}') result['status'] = ae_consts.SUCCESS except Exception as e: log.info(f'post_plot - failed with Exception: {e}') result['status'] = ae_consts.ERR result['err'] = e return result # end of post_plot