
"""
**Prepare Pricing Dataset**

Prepare a dataset for analysis. This task collapses
nested JSON dictionaries into a CSV file with a header
row and automatically stores the output file in S3 and Redis.

- if the key is not in Redis, load the key with the same name from S3
- prepare the dataset from the Redis key
- the dataset is stored as a dictionary holding a pandas dataframe

**Sample work_dict request for this method**

`analysis_engine.api_requests.build_prepare_dataset_request <https://
github.com/AlgoTraders/stock-analysis-engine/blob/master/
analysis_engine/api_requests.py#L300>`__

::

    work_request = {
        'ticker': ticker,
        'ticker_id': ticker_id,
        's3_bucket': s3_bucket_name,
        's3_key': s3_key,
        'redis_key': redis_key,
        'prepared_s3_key': s3_prepared_key,
        'prepared_s3_bucket': s3_prepared_bucket_name,
        'prepared_redis_key': redis_prepared_key,
        'ignore_columns': ignore_columns,
        's3_enabled': s3_enabled,
        'redis_enabled': redis_enabled
    }
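
A minimal sketch of running this task synchronously, assuming a
local Redis and S3 endpoint are reachable and that
``build_prepare_dataset_request()`` works with its defaults
(adjust the generated request before dispatching)::

    import analysis_engine.api_requests as api_requests
    import analysis_engine.consts as ae_consts
    import analysis_engine.work_tasks.prepare_pricing_dataset as prep

    # build a default request and run without celery
    req = api_requests.build_prepare_dataset_request()
    req['celery_disabled'] = True
    res = prep.run_prepare_pricing_dataset(req)
    print(ae_consts.get_status(res['status']), res['err'])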

.. tip:: This task uses the `analysis_engine.work_tasks.
    custom_task.CustomTask class <https://github.com/
    AlgoTraders/stock-analysis-engine/blob/master/
    analysis_engine/work_tasks/custom_task.py>`__ for
    task event handling.

**Supported Environment Variables**

::

    export DEBUG_PREPARE=1
    export DEBUG_RESULTS=1
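
``DEBUG_PREPARE=1`` logs the full dataset contents during the
flatten step, and ``DEBUG_RESULTS=1`` logs the full task response.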

"""

import datetime
import redis
import celery.task as celery_task
import analysis_engine.consts as ae_consts
import analysis_engine.build_result as build_result
import analysis_engine.api_requests as api_requests
import analysis_engine.get_data_from_redis_key as redis_get
import analysis_engine.get_task_results as get_task_results
import analysis_engine.work_tasks.custom_task as custom_task
import analysis_engine.work_tasks.publish_from_s3_to_redis as s3_to_redis
import analysis_engine.dict_to_csv as dict_to_csv
import spylunking.log.setup_logging as log_utils

log = log_utils.build_colorized_logger(name=__name__)


@celery_task(
    bind=True,
    base=custom_task.CustomTask,
    queue='prepare_pricing_dataset')
def prepare_pricing_dataset(
        self,
        work_dict):
    """prepare_pricing_dataset

    Prepare a dataset for analysis. Supports loading the dataset
    from S3 if it is not found in Redis. Publishes the prepared
    artifact as a CSV to S3 and Redis.

    :param work_dict: dictionary of key/value arguments
    """

    label = 'prepare'

    log.info(f'task - {label} - start work_dict={work_dict}')

    initial_data = None

    ticker = ae_consts.TICKER
    ticker_id = ae_consts.TICKER_ID
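    # result record returned to the caller - fields are
    # populated as the task progresses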
    rec = {
        'ticker': None,
        'ticker_id': None,
        's3_enabled': True,
        'redis_enabled': True,
        's3_bucket': None,
        's3_key': None,
        'redis_key': None,
        'prepared_s3_key': None,
        'prepared_s3_bucket': None,
        'prepared_redis_key': None,
        'prepared_data': None,
        'prepared_size': None,
        'initial_data': None,
        'initial_size': None,
        'ignore_columns': None,
        'updated': None
    }
    res = build_result.build_result(
        status=ae_consts.NOT_RUN,
        err=None,
        rec=rec)

    try:
        ticker = work_dict.get(
            'ticker',
            ae_consts.TICKER)
        ticker_id = int(work_dict.get(
            'ticker_id',
            ae_consts.TICKER_ID))

        if not ticker:
            res = build_result.build_result(
                status=ae_consts.ERR,
                err='missing ticker',
                rec=rec)
            return res

        label = work_dict.get(
            'label',
            label)
        s3_key = work_dict.get(
            's3_key',
            None)
        s3_bucket_name = work_dict.get(
            's3_bucket',
            'pricing')
        s3_access_key = work_dict.get(
            's3_access_key',
            ae_consts.S3_ACCESS_KEY)
        s3_secret_key = work_dict.get(
            's3_secret_key',
            ae_consts.S3_SECRET_KEY)
        s3_region_name = work_dict.get(
            's3_region_name',
            ae_consts.S3_REGION_NAME)
        s3_address = work_dict.get(
            's3_address',
            ae_consts.S3_ADDRESS)
        s3_secure = work_dict.get(
            's3_secure',
            ae_consts.S3_SECURE) == '1'
        redis_address = work_dict.get(
            'redis_address',
            ae_consts.REDIS_ADDRESS)
        redis_key = work_dict.get(
            'redis_key',
            ae_consts.REDIS_KEY)
        redis_password = work_dict.get(
            'redis_password',
            ae_consts.REDIS_PASSWORD)
        redis_db = work_dict.get(
            'redis_db',
            None)
        if not redis_db:
            redis_db = ae_consts.REDIS_DB
        redis_expire = None
        if 'redis_expire' in work_dict:
            redis_expire = work_dict.get(
                'redis_expire',
                ae_consts.REDIS_EXPIRE)
        updated = work_dict.get(
            'updated',
            datetime.datetime.utcnow().strftime(
                '%Y_%m_%d_%H_%M_%S'))
        prepared_s3_key = work_dict.get(
            'prepared_s3_key',
            f'{ticker}_{updated}.csv')
        prepared_s3_bucket = work_dict.get(
            'prepared_s3_bucket',
            'prepared')
        prepared_redis_key = work_dict.get(
            'prepared_redis_key',
            'prepared')
        ignore_columns = work_dict.get(
            'ignore_columns',
            None)
        log.info(
            f'{label} redis enabled address={redis_address}@{redis_db} '
            f'key={redis_key} '
            f'prepare_s3={prepared_s3_bucket}:{prepared_s3_key} '
            f'prepare_redis={prepared_redis_key} '
            f'ignore_columns={ignore_columns}')
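        # redis_address is formatted as <host>:<port>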
        redis_host = redis_address.split(':')[0]
        redis_port = redis_address.split(':')[1]

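        # s3 and redis publishing are always enabled for this task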
        enable_s3 = True
        enable_redis_publish = True

        rec['ticker'] = ticker
        rec['ticker_id'] = ticker_id
        rec['s3_bucket'] = s3_bucket_name
        rec['s3_key'] = s3_key
        rec['redis_key'] = redis_key
        rec['prepared_s3_key'] = prepared_s3_key
        rec['prepared_s3_bucket'] = prepared_s3_bucket
        rec['prepared_redis_key'] = prepared_redis_key
        rec['updated'] = updated
        rec['s3_enabled'] = enable_s3
        rec['redis_enabled'] = enable_redis_publish

        try:
            log.info(
                f'{label} connecting redis={redis_host}:{redis_port} '
                f'db={redis_db} key={redis_key} '
                f'updated={updated} expire={redis_expire}')
            rc = redis.Redis(
                host=redis_host,
                port=redis_port,
                password=redis_password,
                db=redis_db)
        except Exception as e:
            err = (
                f'{label} failed - redis connection to '
                f'address={redis_host}:{redis_port} '
                f'db={redis_db} key={redis_key} ex={e}')
            res = build_result.build_result(
                status=ae_consts.ERR,
                err=err,
                rec=rec)
            return res
        # end of try/ex for connecting to redis

        initial_data_res = redis_get.get_data_from_redis_key(
            label=label,
            client=rc,
            key=redis_key)

        log.info(
            f'{label} get redis key={redis_key} '
            f'status={ae_consts.get_status(initial_data_res["status"])} '
            f'err={initial_data_res["err"]}')

        initial_data = initial_data_res['rec'].get(
            'data',
            None)

        if enable_s3 and not initial_data:

            log.info(
                f'{label} failed to find redis_key={redis_key} trying s3 from '
                f's3_key={s3_key} s3_bucket={s3_bucket_name} '
                f's3_address={s3_address}')

            get_from_s3_req = \
                api_requests.build_publish_from_s3_to_redis_request()

            get_from_s3_req['s3_enabled'] = enable_s3
            get_from_s3_req['s3_access_key'] = s3_access_key
            get_from_s3_req['s3_secret_key'] = s3_secret_key
            get_from_s3_req['s3_region_name'] = s3_region_name
            get_from_s3_req['s3_address'] = s3_address
            get_from_s3_req['s3_secure'] = s3_secure
            get_from_s3_req['s3_key'] = s3_key
            get_from_s3_req['s3_bucket'] = s3_bucket_name
            get_from_s3_req['redis_key'] = redis_key
            get_from_s3_req['label'] = f'{label}-run_publish_from_s3_to_redis'

            log.info(f'{label} load from s3={s3_key} to redis={redis_key}')

            try:
                # run in synchronous mode:
                get_from_s3_req['celery_disabled'] = True
                task_res = s3_to_redis.run_publish_from_s3_to_redis(
                    get_from_s3_req)
                if task_res.get(
                        'status',
                        ae_consts.ERR) == ae_consts.SUCCESS:
                    log.info(
                        f'{label} loaded s3={s3_bucket_name}:{s3_key} '
                        f'to redis={redis_key} retrying')
                    initial_data_res = redis_get.get_data_from_redis_key(
                        label=label,
                        client=rc,
                        key=redis_key)

                    log.info(
                        f'{label} get redis try=2 key={redis_key} status='
                        f'{ae_consts.get_status(initial_data_res["status"])} '
                        f'err={initial_data_res["err"]}')

                    initial_data = initial_data_res['rec'].get(
                        'data',
                        None)
                else:
                    err = (
                        f'{label} ERR failed loading from '
                        f'bucket={s3_bucket_name} s3_key={s3_key} to '
                        f'redis_key={redis_key} with res={task_res}')
                    log.error(err)
                    res = build_result.build_result(
                        status=ae_consts.ERR,
                        err=err,
                        rec=rec)
                    return res
            except Exception as e:
                err = (
                    f'{label} extract from s3 and publish to redis failed '
                    f'loading data from bucket={s3_bucket_name} in '
                    f's3_key={s3_key} with publish to redis_key={redis_key} '
                    f'with ex={e}')
                log.error(err)
                res = build_result.build_result(
                    status=ae_consts.ERR,
                    err=err,
                    rec=rec)
                return res
            # end of try/ex for publishing from s3->redis
        # end of if enable_s3

        if not initial_data:
            err = (
                f'{label} did not find any data to prepare in '
                f'redis_key={redis_key} or s3_key={s3_key} in '
                f'bucket={s3_bucket_name}')
            log.error(err)
            res = build_result.build_result(
                status=ae_consts.ERR,
                err=err,
                rec=rec)
            return res

        initial_data_num_chars = len(str(initial_data))
        initial_size_value = None
        initial_size_str = None
        if initial_data_num_chars < ae_consts.PREPARE_DATA_MIN_SIZE:
            err = (
                f'{label} not enough data={initial_data_num_chars} in '
                f'redis_key={redis_key} or s3_key={s3_key} in '
                f'bucket={s3_bucket_name}')
            log.error(err)
            res = build_result.build_result(
                status=ae_consts.ERR,
                err=err,
                rec=rec)
            return res
        else:
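            # approximate megabytes (1024 * 1000 characters)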
            initial_size_value = initial_data_num_chars / 1024000
            initial_size_str = ae_consts.to_f(initial_size_value)
            if ae_consts.ev('DEBUG_PREPARE', '0') == '1':
                log.info(
                    f'{label} initial - redis_key={redis_key} '
                    f'data={str(initial_data)}')
            else:
                log.info(
                    f'{label} initial - redis_key={redis_key} data '
                    f'size={initial_size_str} MB')
        # end of trying to get initial_data

        rec['initial_data'] = initial_data
        rec['initial_size'] = initial_data_num_chars

        prepare_data = None

        try:
            if ae_consts.ev('DEBUG_PREPARE', '0') == '1':
                log.info(
                    f'{label} data={ae_consts.ppj(initial_data)} - flatten - '
                    f'{initial_size_str} MB from redis_key={redis_key}')
            else:
                log.info(
                    f'{label} flatten - {initial_size_str} MB from '
                    f'redis_key={redis_key}')
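            # collapse nested dictionaries into csv-ready rows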
            prepare_data = dict_to_csv.flatten_dict(
                data=initial_data)
        except Exception as e:
            prepare_data = None
            err = (
                f'{label} flatten - convert to csv failed with ex={e} '
                f'redis_key={redis_key}')
            log.error(err)
            res = build_result.build_result(
                status=ae_consts.ERR,
                err=err,
                rec=rec)
            return res
        # end of try/ex

        if not prepare_data:
            err = (
                f'{label} flatten - did not return any data from '
                f'redis_key={redis_key} or s3_key={s3_key} in '
                f'bucket={s3_bucket_name}')
            log.error(err)
            res = build_result.build_result(
                status=ae_consts.ERR,
                err=err,
                rec=rec)
            return res
        # end of prepare_data

        prepare_data_num_chars = len(str(prepare_data))
        prepare_size_value = None

        if prepare_data_num_chars < ae_consts.PREPARE_DATA_MIN_SIZE:
            err = (
                f'{label} prepare - there is not enough '
                f'data={prepare_data_num_chars} in redis_key={redis_key}')
            log.error(err)
            res = build_result.build_result(
                status=ae_consts.ERR,
                err=err,
                rec=rec)
            return res
        else:
            prepare_size_value = prepare_data_num_chars / 1024000
            prepare_size_str = ae_consts.to_f(prepare_size_value)
            if ae_consts.ev('DEBUG_PREPARE', '0') == '1':
                log.info(
                    f'{label} prepare - redis_key={redis_key} '
                    f'data={ae_consts.ppj(prepare_data)}')
            else:
                log.info(
                    f'{label} prepare - redis_key={redis_key} data '
                    f'size={prepare_size_str} MB')
        # end of checking the size of the prepared data

        rec['prepared_data'] = prepare_data
        rec['prepared_size'] = prepare_data_num_chars

        res = build_result.build_result(
            status=ae_consts.SUCCESS,
            err=None,
            rec=rec)

        rc = None  # release the redis client

    except Exception as e:
        res = build_result.build_result(
            status=ae_consts.ERR,
            err=(
                'failed - prepare_pricing_dataset '
                f'dict={work_dict} with ex={e}'),
            rec=rec)
        log.error(f'{label} - {res["err"]}')
    # end of try/ex

    log.info(
        'task - prepare_pricing_dataset done - '
        f'{label} - status={ae_consts.get_status(res["status"])}')

    return get_task_results.get_task_results(
        work_dict=work_dict,
        result=res)
# end of prepare_pricing_dataset


def run_prepare_pricing_dataset(
        work_dict):
    """run_prepare_pricing_dataset

    Celery wrapper for running without celery

    :param work_dict: task data
    """

    label = work_dict.get(
        'label',
        '')

    log.info(f'run_prepare_pricing_dataset - {label} - start')

    response = build_result.build_result(
        status=ae_consts.NOT_RUN,
        err=None,
        rec={})
    task_res = {}

    # allow running without celery
    if ae_consts.is_celery_disabled(
            work_dict=work_dict):
        work_dict['celery_disabled'] = True
        task_res = prepare_pricing_dataset(
            work_dict)
        if task_res:
            response = task_res.get(
                'result',
                task_res)
            if ae_consts.ev('DEBUG_RESULTS', '0') == '1':
                response_details = response
                try:
                    response_details = ae_consts.ppj(response)
                except Exception:
                    response_details = response
                log.info(f'{label} task result={response_details}')
        else:
            log.error(
                f'{label} celery was disabled but the task={response} '
                'did not return anything')
        # end of if response
    else:
        task_res = prepare_pricing_dataset.delay(
            work_dict=work_dict)
        rec = {
            'task_id': task_res
        }
        response = build_result.build_result(
            status=ae_consts.SUCCESS,
            err=None,
            rec=rec)
    # if celery enabled

    if response:
        if ae_consts.ev('DEBUG_RESULTS', '0') == '1':
            log.info(
                f'run_prepare_pricing_dataset - {label} - done '
                f'status={ae_consts.get_status(response["status"])} '
                f'err={response["err"]} rec={response["rec"]}')
        else:
            log.info(
                f'run_prepare_pricing_dataset - {label} - done '
                f'status={ae_consts.get_status(response["status"])} '
                f'err={response["err"]}')
    else:
        log.info(
            f'run_prepare_pricing_dataset - {label} - done '
            'no response')
    # end of if/else response

    return response
# end of run_prepare_pricing_dataset