Source code for analysis_engine.work_tasks.publish_ticker_aggregate_from_s3

"""
**Publish Aggregate Ticker Data from S3 Task**

Publish an S3 key with aggregated stock data to redis
and s3 (if either of them is running and enabled)

- redis - using `redis-py <https://github.com/andymccurdy/redis-py>`__
- s3 - using boto3

**Sample work_dict request for this method**

`analysis_engine.api_requests.build_publish_ticker_aggregate_from_s3_request
<https://github.com/AlgoTraders/stock-analysis-engine/blob/master/analysis_engine/api_requests.py#L426>`__

::

    work_request = {
        'ticker': ticker,
        'ticker_id': ticker_id,
        's3_bucket': s3_bucket_name,
        's3_compiled_bucket': s3_compiled_bucket_name,
        's3_key': s3_key,
        'redis_key': redis_key,
        's3_enabled': s3_enabled,
        'redis_enabled': redis_enabled
    }

.. tip:: This task uses the `analysis_engine.work_tasks.custom_task.CustomTask
    class <https://github.com/AlgoTraders/stock-analysis-engine/blob/master/analysis_engine/work_tasks/custom_task.py>`__
    for task event handling.

**Supported Environment Variables**

::

    export DEBUG_RESULTS=1
    export DEBUG_S3=1
    export DEBUG_REDIS=1
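
**Example**

A minimal sketch for running this task without celery using the
``run_publish_ticker_aggregate_from_s3`` wrapper defined below. It assumes
the defaults built by ``build_publish_ticker_aggregate_from_s3_request``
(ticker, buckets, keys) match your local S3 and redis endpoints:

::

    import analysis_engine.api_requests as api_requests
    import analysis_engine.work_tasks.publish_ticker_aggregate_from_s3 as pub

    # assumes the builder's defaults are valid for your environment
    work_request = api_requests.build_publish_ticker_aggregate_from_s3_request()
    res = pub.run_publish_ticker_aggregate_from_s3(work_dict=work_request)
    print(res)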

"""

import boto3
import json
import re
import redis
import zlib
import celery.task as celery_task
import analysis_engine.consts as ae_consts
import analysis_engine.build_result as build_result
import analysis_engine.get_task_results as get_task_results
import analysis_engine.work_tasks.custom_task as custom_task
import analysis_engine.set_data_in_redis_key as redis_set
import spylunking.log.setup_logging as log_utils
import analysis_engine.s3_read_contents_from_key as s3_read_contents_from_key

log = log_utils.build_colorized_logger(name=__name__)


@celery_task(
    bind=True,
    base=custom_task.CustomTask,
    queue='publish_ticker_aggregate_from_s3')
def publish_ticker_aggregate_from_s3(
        self,
        work_dict):
    """publish_ticker_aggregate_from_s3

    Publish Aggregated Ticker Data from S3 to Redis

    :param work_dict: dictionary for key/values
    """

    label = 'pub-tic-agg-s3-to-redis'

    log.info(f'task - {label} - start work_dict={work_dict}')

    ticker = ae_consts.TICKER
    ticker_id = ae_consts.TICKER_ID
    rec = {
        'ticker': None,
        'ticker_id': None,
        's3_read_enabled': True,
        's3_upload_enabled': True,
        'redis_enabled': True,
        's3_bucket': None,
        's3_compiled_bucket': None,
        's3_key': None,
        'redis_key': None,
        'updated': None
    }
    res = build_result.build_result(
        status=ae_consts.NOT_RUN,
        err=None,
        rec=rec)

    try:
        ticker = work_dict.get(
            'ticker',
            ae_consts.TICKER)
        ticker_id = int(work_dict.get(
            'ticker_id',
            ae_consts.TICKER_ID))

        if not ticker:
            res = build_result.build_result(
                status=ae_consts.ERR,
                err='missing ticker',
                rec=rec)
            return res

        label = work_dict.get(
            'label',
            label)
        s3_key = work_dict.get(
            's3_key',
            None)
        s3_bucket_name = work_dict.get(
            's3_bucket',
            'pricing')
        s3_compiled_bucket_name = work_dict.get(
            's3_compiled_bucket',
            'compileddatasets')
        redis_key = work_dict.get(
            'redis_key',
            None)
        updated = work_dict.get(
            'updated',
            None)
        enable_s3_upload = work_dict.get(
            's3_upload_enabled',
            ae_consts.ENABLED_S3_UPLOAD)
        enable_redis_publish = work_dict.get(
            'redis_enabled',
            ae_consts.ENABLED_REDIS_PUBLISH)
        serializer = work_dict.get(
            'serializer',
            'json')
        encoding = work_dict.get(
            'encoding',
            'utf-8')
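        # encoding is used when decoding the s3 reads and compressing the
        # upload below; serializer and encoding are also passed through to
        # the redis publish helper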

        enable_s3_read = True
        # initialize here so the upload/publish checks below do not hit a
        # NameError when the read finds no matching keys
        data = []
        keys = []

        rec['ticker'] = ticker
        rec['ticker_id'] = ticker_id
        rec['s3_bucket'] = s3_bucket_name
        rec['s3_compiled_bucket'] = s3_compiled_bucket_name
        rec['s3_key'] = s3_key
        rec['redis_key'] = redis_key
        rec['updated'] = updated
        rec['s3_read_enabled'] = enable_s3_read
        rec['s3_upload_enabled'] = enable_s3_upload
        rec['redis_enabled'] = enable_redis_publish

        if enable_s3_read:
            log.info(f'{label} parsing s3 values')
            access_key = work_dict.get(
                's3_access_key',
                ae_consts.S3_ACCESS_KEY)
            secret_key = work_dict.get(
                's3_secret_key',
                ae_consts.S3_SECRET_KEY)
            region_name = work_dict.get(
                's3_region_name',
                ae_consts.S3_REGION_NAME)
            service_address = work_dict.get(
                's3_address',
                ae_consts.S3_ADDRESS)
            secure = work_dict.get(
                's3_secure',
                ae_consts.S3_SECURE) == '1'

            endpoint_url = f'http{"s" if secure else ""}://{service_address}'

            log.info(
                f'{label} building s3 endpoint_url={endpoint_url} '
                f'region={region_name}')

            s3 = boto3.resource(
                's3',
                endpoint_url=endpoint_url,
                aws_access_key_id=access_key,
                aws_secret_access_key=secret_key,
                region_name=region_name,
                config=boto3.session.Config(
                    signature_version='s3v4')
            )
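            # the resource can point at any s3-compatible endpoint via
            # endpoint_url and signs requests with the v4 signature;
            # credentials and region come from work_dict or ae_consts defaults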

            try:
                log.info(f'{label} checking bucket={s3_bucket_name} exists')
                if s3.Bucket(s3_bucket_name) not in s3.buckets.all():
                    log.info(f'{label} creating bucket={s3_bucket_name}')
                    s3.create_bucket(
                        Bucket=s3_bucket_name)
            except Exception as e:
                log.info(
                    f'{label} failed creating bucket={s3_bucket_name} '
                    f'with ex={e}')
            # end of try/ex for creating bucket

            try:
                log.info(f'{label} checking bucket={s3_bucket_name} keys')
                date_keys = []
                keys = []
                # matches {TICKER}_YYYY-MM-DD keys (e.g. SPY_2019-01-18)
                reg = r'^.*_\d{4}-(0?[1-9]|1[012])-(0?[1-9]|[12][0-9]|3[01])$'
                for bucket in s3.buckets.all():
                    for key in bucket.objects.all():
                        if (ticker.lower() in key.key.lower() and
                                bool(re.compile(reg).search(key.key))):
                            keys.append(key.key)
                            date_keys.append(
                                key.key.split(f'{ticker}_')[1])
            except Exception as e:
                log.info(
                    f'{label} failed to get bucket={s3_bucket_name} '
                    f'keys with ex={e}')
            # end of try/ex for getting bucket keys

            if keys:
                data = []
                for idx, key in enumerate(keys):
                    try:
                        log.info(
                            f'{label} reading from s3={s3_bucket_name}/{key} '
                            f'updated={updated}')
                        loop_data = s3_read_contents_from_key.\
                            s3_read_contents_from_key(
                                s3=s3,
                                s3_bucket_name=s3_bucket_name,
                                s3_key=key,
                                encoding=encoding,
                                convert_as_json=True)
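                        # each matching key is decoded with the configured
                        # encoding and parsed as json before aggregation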

                        initial_size_value = \
                            len(str(loop_data)) / 1024000
                        initial_size_str = ae_consts.to_f(initial_size_value)
                        if ae_consts.ev('DEBUG_S3', '0') == '1':
                            log.info(
                                f'{label} read s3={s3_bucket_name}/{key} '
                                f'data={ae_consts.ppj(loop_data)}')
                        else:
                            log.info(
                                f'{label} read s3={s3_bucket_name}/{key} data '
                                f'size={initial_size_str} MB')
                        data.append({date_keys[idx]: loop_data})
                    except Exception as e:
                        err = (
                            f'{label} failed reading bucket={s3_bucket_name} '
                            f'key={key} ex={e}')
                        log.error(
                            err)
                        res = build_result.build_result(
                            status=ae_consts.NOT_RUN,
                            err=err,
                            rec=rec)
                    # end of try/ex for reading this key
            else:
                log.info(
                    f'{label} No keys found in S3 '
                    f'bucket={s3_bucket_name} for ticker={ticker}')
        else:
            log.info(
                f'{label} SKIP S3 read bucket={s3_bucket_name} '
                f'ticker={ticker}')
        # end of if enable_s3_read
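        # data is a list of {'YYYY-MM-DD': dataset} dicts, one per matching
        # key, and stays empty when no keys matched the ticker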

        if data and enable_s3_upload:
            try:
                log.info(
                    f'{label} checking bucket={s3_compiled_bucket_name} '
                    'exists')
                if s3.Bucket(s3_compiled_bucket_name) not in s3.buckets.all():
                    log.info(
                        f'{label} creating bucket={s3_compiled_bucket_name}')
                    s3.create_bucket(
                        Bucket=s3_compiled_bucket_name)
            except Exception as e:
                log.info(
                    f'{label} failed creating '
                    f'bucket={s3_compiled_bucket_name} with ex={e}')
            # end of try/ex for creating bucket

            try:
                cmpr_data = zlib.compress(json.dumps(data).encode(encoding), 9)
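                # level 9 gives the smallest payload at the cost of
                # compression speed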

                if ae_consts.ev('DEBUG_S3', '0') == '1':
                    log.info(
                        f'{label} uploading to '
                        f's3={s3_compiled_bucket_name}/{s3_key} '
                        f'data={ae_consts.ppj(data)} updated={updated}')
                else:
                    sizes = {'MB': 1024000,
                             'GB': 1024000000,
                             'TB': 1024000000000,
                             'PB': 1024000000000000}
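                    # walk the units from MB upward and keep the first one
                    # where the converted value stays at or under 1024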
                    initial_size_value = len(str(data))
                    org_data_size = 'MB'
                    for key in sizes.keys():
                        size = float(initial_size_value) / float(sizes[key])
                        if size > 1024:
                            continue
                        org_data_size = key
                        initial_size_value = size
                        break
                    initial_size_str = ae_consts.to_f(initial_size_value)

                    cmpr_data_size_value = len(cmpr_data)
                    cmpr_data_size = 'MB'
                    for key in sizes.keys():
                        size = float(cmpr_data_size_value) / float(sizes[key])
                        if size > 1024:
                            continue
                        cmpr_data_size = key
                        cmpr_data_size_value = size
                        break
                    cmpr_size_str = ae_consts.to_f(cmpr_data_size_value)
                    log.info(
                        f'{label} uploading to '
                        f's3={s3_compiled_bucket_name}/{s3_key} data '
                        f'original_size={initial_size_str} {org_data_size} '
                        f'compressed_size={cmpr_size_str} {cmpr_data_size} '
                        f'updated={updated}')
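                # upload the compressed aggregate to the compiled bucket
                # under the requested s3_key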
                s3.Bucket(s3_compiled_bucket_name).put_object(
                    Key=s3_key,
                    Body=cmpr_data)
            except Exception as e:
                log.error(
                    f'{label} failed '
                    f'uploading bucket={s3_compiled_bucket_name} '
                    f'key={s3_key} ex={e}')
            # end of try/ex for uploading to s3
        else:
            log.info(
                f'{label} SKIP S3 upload '
                f'bucket={s3_compiled_bucket_name} key={s3_key}')
        # end of if enable_s3_upload

        if data and enable_redis_publish:
            redis_address = work_dict.get(
                'redis_address',
                ae_consts.REDIS_ADDRESS)
            redis_key = work_dict.get(
                'redis_key',
                ae_consts.REDIS_KEY)
            redis_password = work_dict.get(
                'redis_password',
                ae_consts.REDIS_PASSWORD)
            redis_db = work_dict.get(
                'redis_db',
                None)
            if not redis_db:
                redis_db = ae_consts.REDIS_DB
            redis_expire = None
            if 'redis_expire' in work_dict:
                redis_expire = work_dict.get(
                    'redis_expire',
                    ae_consts.REDIS_EXPIRE)
            log.info(
                f'redis enabled address={redis_address}@{redis_db} '
                f'key={redis_key}')
            redis_host = redis_address.split(':')[0]
            redis_port = redis_address.split(':')[1]
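            # redis_address is expected as host:port (e.g. localhost:6379)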
            try:
                if ae_consts.ev('DEBUG_REDIS', '0') == '1':
                    log.info(
                        f'{label} publishing redis={redis_host}:{redis_port} '
                        f'db={redis_db} key={redis_key} updated={updated} '
                        f'expire={redis_expire} data={ae_consts.ppj(data)}')
                else:
                    log.info(
                        f'{label} publishing redis={redis_host}:{redis_port} '
                        f'db={redis_db} key={redis_key} '
                        f'updated={updated} expire={redis_expire}')
                # end of if/else

                rc = redis.Redis(
                    host=redis_host,
                    port=redis_port,
                    password=redis_password,
                    db=redis_db)

                redis_set_res = redis_set.set_data_in_redis_key(
                    label=label,
                    client=rc,
                    key=redis_key,
                    data=data,
                    serializer=serializer,
                    encoding=encoding,
                    expire=redis_expire,
                    px=None,
                    nx=False,
                    xx=False)
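                # the px/nx/xx arguments follow the redis SET option names;
                # only the optional expire from the request is used here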

                log.info(
                    f'{label} redis_set '
                    f'status={ae_consts.get_status(redis_set_res["status"])} '
                    f'err={redis_set_res["err"]}')

            except Exception as e:
                log.error(
                    f'{label} failed - redis publish to '
                    f'key={redis_key} ex={e}')
            # end of try/ex for publishing to redis
        else:
            log.info(f'{label} SKIP REDIS publish key={redis_key}')
        # end of if enable_redis_publish

        res = build_result.build_result(
            status=ae_consts.SUCCESS,
            err=None,
            rec=rec)

    except Exception as e:
        res = build_result.build_result(
            status=ae_consts.ERR,
            err=(f'failed - publish_from_s3 dict={work_dict} with ex={e}'),
            rec=rec)
        log.error(f'{label} - {res["err"]}')
    # end of try/ex

    log.info(
        'task - publish_from_s3 done - '
        f'{label} - status={ae_consts.get_status(res["status"])}')

    return get_task_results.get_task_results(
        work_dict=work_dict,
        result=res)
# end of publish_ticker_aggregate_from_s3


def run_publish_ticker_aggregate_from_s3(
        work_dict):
    """run_publish_ticker_aggregate_from_s3

    Celery wrapper for running without celery

    :param work_dict: task data
    """

    label = work_dict.get(
        'label',
        '')

    log.info(f'run_publish_ticker_aggregate_from_s3 - {label} - start')

    response = build_result.build_result(
        status=ae_consts.NOT_RUN,
        err=None,
        rec={})
    task_res = {}

    # allow running without celery
    if ae_consts.is_celery_disabled(
            work_dict=work_dict):
        work_dict['celery_disabled'] = True
        task_res = publish_ticker_aggregate_from_s3(
            work_dict=work_dict)
        if task_res:
            response = task_res.get(
                'result',
                task_res)
            if ae_consts.ev('DEBUG_RESULTS', '0') == '1':
                response_details = response
                try:
                    response_details = ae_consts.ppj(response)
                except Exception:
                    response_details = response
                log.info(f'{label} task result={response_details}')
        else:
            log.error(
                f'{label} celery was disabled but the task={response} '
                'did not return anything')
        # end of if response
    else:
        task_res = publish_ticker_aggregate_from_s3.delay(
            work_dict=work_dict)
        rec = {
            'task_id': task_res
        }
        response = build_result.build_result(
            status=ae_consts.SUCCESS,
            err=None,
            rec=rec)
    # if celery enabled

    if response:
        if ae_consts.ev('DEBUG_RESULTS', '0') == '1':
            log.info(
                f'run_publish_ticker_aggregate_from_s3 - {label} - done '
                f'status={ae_consts.get_status(response["status"])} '
                f'err={response["err"]} rec={response["rec"]}')
        else:
            log.info(
                f'run_publish_ticker_aggregate_from_s3 - {label} - done '
                f'status={ae_consts.get_status(response["status"])} '
                f'err={response["err"]}')
    else:
        log.info(
            f'run_publish_ticker_aggregate_from_s3 - {label} - done '
            'no response')
    # end of if/else response

    return response
# end of run_publish_ticker_aggregate_from_s3