"""
**Handle Pricing Update Task**

Get the latest stock news, quotes, and options chains for a
ticker and publish the values to Redis and S3 for downstream analysis.

Writes pricing updates to S3 and Redis by
building a list of publishing sub-tasks:

**Sample work_dict request for this method**

`analysis_engine.api_requests.publish_pricing_update <https://
github.com/AlgoTraders/stock-analysis-engine/blob/master/
analysis_engine/api_requests.py#L218>`__

::

    work = {
        'ticker': ticker,
        'ticker_id': ticker_id,
        's3_bucket': s3_bucket_name,
        's3_key': s3_key,
        'redis_key': redis_key,
        'prepared_s3_key': s3_prepared_key,
        'prepared_s3_bucket': s3_prepared_bucket_name,
        'prepared_redis_key': redis_prepared_key,
        'ignore_columns': ignore_columns,
        's3_enabled': s3_enabled,
        'redis_enabled': redis_enabled
    }

.. tip:: This task uses the `analysis_engine.work_tasks.
    custom_task.CustomTask class <https://github.com/A
    lgoTraders/stock-analysis-engine/blob/master/anal
    ysis_engine/work_tasks/custom_task.py>`__ for
    task event handling.

**Supported Environment Variables**

::

    export DEBUG_RESULTS=1
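
**Example usage**

A minimal sketch of calling this task without celery. The dataset values
below are placeholders, and it is assumed that setting ``celery_disabled``
in the work dictionary (or disabling celery in the environment) makes
``analysis_engine.consts.is_celery_disabled`` return ``True``:

::

    import analysis_engine.work_tasks.handle_pricing_update_task as handler

    # assumption: 'celery_disabled' turns off celery for this run
    work = {
        'ticker': 'SPY',
        'ticker_id': 1,
        'pricing': {'close': 288.09},
        'news': {'news': []},
        'options': {'calls': '[]', 'puts': '[]'},
        'updated': '2019-01-01 00:00:00',
        'label': 'update-spy',
        'celery_disabled': True
    }
    res = handler.run_handle_pricing_update_task(work)

The returned dictionary carries ``status``, ``err`` and ``rec`` fields
(see the logging at the end of ``run_handle_pricing_update_task`` below).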

"""

import datetime
import celery.task as celery_task
import analysis_engine.consts as ae_consts
import analysis_engine.get_task_results as get_task_results
import analysis_engine.work_tasks.custom_task as custom_task
import analysis_engine.build_result as build_result
import analysis_engine.work_tasks.publish_pricing_update as publisher
import spylunking.log.setup_logging as log_utils

log = log_utils.build_colorized_logger(name=__name__)


@celery_task.task(
    bind=True,
    base=custom_task.CustomTask,
    queue='handle_pricing_update_task')
def handle_pricing_update_task(
        self,
        work_dict):
    """handle_pricing_update_task

    Writes pricing updates to S3 and Redis

    :param work_dict: dictionary for key/values
    """

    label = 'update_prices'

    log.info(f'task - {label} - start')

    ticker = ae_consts.TICKER
    ticker_id = 1
    rec = {
        'ticker': None,
        'ticker_id': None,
        'pricing_s3_bucket': None,
        'pricing_s3_key': None,
        'pricing_size': None,
        'pricing_redis_key': None,
        'news_s3_bucket': None,
        'news_s3_key': None,
        'news_size': None,
        'news_redis_key': None,
        'options_s3_bucket': None,
        'options_s3_key': None,
        'options_size': None,
        'options_redis_key': None
    }
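    # start with a NOT_RUN result; it is replaced with SUCCESS or ERR below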
    res = build_result.build_result(
        status=ae_consts.NOT_RUN,
        err=None,
        rec=rec)

    try:
        ticker = work_dict.get(
            'ticker',
            ae_consts.TICKER)
        ticker_id = int(work_dict.get(
            'ticker_id',
            1))

        rec['ticker'] = ticker
        rec['ticker_id'] = ticker_id

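        # required inputs - a missing 'pricing', 'news', 'options' or
        # 'updated' key raises a KeyError that is reported as an ERR
        # result below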
        pricing_data = work_dict['pricing']
        news_data = work_dict['news']
        options_data = work_dict['options']
        calls_data = options_data.get(
            'calls',
            ae_consts.EMPTY_DF_STR)
        puts_data = options_data.get(
            'puts',
            ae_consts.EMPTY_DF_STR)
        updated = work_dict['updated']
        label = work_dict.get(
            'label',
            label)

        cur_date = datetime.datetime.utcnow()
        cur_date_str = cur_date.strftime(
            '%Y_%m_%d_%H_%M_%S')

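        # default S3 keys embed the ticker, id, and a UTC timestamp so each
        # run writes a new snapshot instead of overwriting the previous one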
        pricing_s3_key = work_dict.get(
            'pricing_s3_key',
            f'pricing_ticker_{ticker}_id_{ticker_id}_date_{cur_date_str}')
        news_s3_key = work_dict.get(
            'news_s3_key',
            f'news_ticker_{ticker}_id_{ticker_id}_date_{cur_date_str}')
        options_s3_key = work_dict.get(
            'options_s3_key',
            f'options_ticker_{ticker}_id_{ticker_id}_date_{cur_date_str}')
        calls_s3_key = work_dict.get(
            'calls_s3_key',
            f'calls_ticker_{ticker}_id_{ticker_id}_date_{cur_date_str}')
        puts_s3_key = work_dict.get(
            'puts_s3_key',
            f'puts_ticker_{ticker}_id_{ticker_id}_date_{cur_date_str}')

        pricing_s3_bucket = work_dict.get(
            'pricing_s3_bucket',
            'pricing')
        news_s3_bucket = work_dict.get(
            'news_s3_bucket',
            'news')
        options_s3_bucket = work_dict.get(
            'options_s3_bucket',
            'options')

        pricing_by_ticker_redis_key = work_dict.get(
            'pricing_redis_key',
            f'price_{ticker}')
        news_by_ticker_redis_key = work_dict.get(
            'news_redis_key',
            f'news_{ticker}')
        options_by_ticker_redis_key = work_dict.get(
            'options_redis_key',
            f'options_{ticker}')
        calls_by_ticker_redis_key = work_dict.get(
            'calls_redis_key',
            f'calls_{ticker}')
        puts_by_ticker_redis_key = work_dict.get(
            'puts_redis_key',
            f'puts_{ticker}')

        pricing_size = len(str(
            pricing_data))
        news_size = len(str(
            news_data))
        options_size = len(str(
            options_data))
        calls_size = len(str(
            calls_data))
        puts_size = len(str(
            puts_data))

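        # build one publish payload per dataset (pricing, options, calls,
        # puts, news) - each shares the same 'updated' timestamp and label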
        payloads_to_publish = [
            {
                'ticker': ticker,
                'ticker_id': ticker_id,
                's3_bucket': pricing_s3_bucket,
                's3_key': pricing_s3_key,
                'data': pricing_data,
                'redis_key': pricing_by_ticker_redis_key,
                'size': pricing_size,
                'updated': updated,
                'label': label
            },
            {
                'ticker': ticker,
                'ticker_id': ticker_id,
                's3_bucket': options_s3_bucket,
                's3_key': options_s3_key,
                'data': options_data,
                'redis_key': options_by_ticker_redis_key,
                'size': options_size,
                'updated': updated,
                'label': label
            },
            {
                'ticker': ticker,
                'ticker_id': ticker_id,
                's3_bucket': options_s3_bucket,
                's3_key': calls_s3_key,
                'data': calls_data,
                'redis_key': calls_by_ticker_redis_key,
                'size': calls_size,
                'updated': updated,
                'label': label
            },
            {
                'ticker': ticker,
                'ticker_id': ticker_id,
                's3_bucket': options_s3_bucket,
                's3_key': puts_s3_key,
                'data': puts_data,
                'redis_key': puts_by_ticker_redis_key,
                'size': puts_size,
                'updated': updated,
                'label': label
            },
            {
                'ticker': ticker,
                'ticker_id': ticker_id,
                's3_bucket': news_s3_bucket,
                's3_key': news_s3_key,
                'data': news_data,
                'redis_key': news_by_ticker_redis_key,
                'size': news_size,
                'updated': updated,
                'label': label
            }
        ]

        rec['pricing_s3_bucket'] = pricing_s3_bucket
        rec['pricing_s3_key'] = pricing_s3_key
        rec['pricing_redis_key'] = pricing_by_ticker_redis_key
        rec['news_s3_bucket'] = news_s3_bucket
        rec['news_s3_key'] = news_s3_key
        rec['news_redis_key'] = news_by_ticker_redis_key
        rec['options_s3_bucket'] = options_s3_bucket
        rec['options_s3_key'] = options_s3_key
        rec['options_redis_key'] = options_by_ticker_redis_key

        total_payloads = len(payloads_to_publish)

        log.info(
            f'{label} ticker={ticker} processing payloads={total_payloads}')

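        # publish each payload in-process by calling the publisher task as
        # a plain function; per-payload status is logged but not used to
        # fail the parent task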
        for ridx, r in enumerate(payloads_to_publish):
            log.info(
                f'{label} ticker={ticker} update={ridx}/{total_payloads} '
                f'key={r["s3_key"]} redis_key={r["redis_key"]}')
            r['celery_disabled'] = False
            r['label'] = f'handle_pricing_update_task-{label}'
            payload_res = \
                publisher.task_publish_pricing_update(
                    work_dict=r)
            log.info(
                f'{label} ticker={ticker} update={ridx}/{total_payloads} '
                f'status={ae_consts.get_status(status=payload_res["status"])} '
                f's3_key={r["s3_key"]} redis_key={r["redis_key"]}')
        # end of for all payloads to publish

        res = build_result.build_result(
            status=ae_consts.SUCCESS,
            err=None,
            rec=rec)

    except Exception as e:
        res = build_result.build_result(
            status=ae_consts.ERR,
            err=(
                'failed - handle_pricing_update_task '
                f'dict={work_dict} with ex={e}'),
            rec=rec)
        log.error(f'{label} - {res["err"]}')
    # end of try/ex

    log.info(
        'task - handle_pricing_update_task done - '
        f'{label} - status={ae_consts.get_status(res["status"])}')

    return get_task_results.get_task_results(
        work_dict=work_dict,
        result=res)
# end of handle_pricing_update_task


def run_handle_pricing_update_task(
        work_dict):
    """run_handle_pricing_update_task

    Wrapper for running ``handle_pricing_update_task`` without celery
    (or publishing it to the celery queue when celery is enabled)

    :param work_dict: task data
    """

    label = work_dict.get(
        'label',
        '')

    log.info(f'run_handle_pricing_update_task - {label} - start')

    response = build_result.build_result(
        status=ae_consts.NOT_RUN,
        err=None,
        rec={})
    task_res = {}

    # allow running without celery
    if ae_consts.is_celery_disabled(
            work_dict=work_dict):
        work_dict['celery_disabled'] = True
        task_res = handle_pricing_update_task(
            work_dict)
        if task_res:
            response = task_res.get(
                'result',
                task_res)
            if ae_consts.ev('DEBUG_RESULTS', '0') == '1':
                response_details = response
                try:
                    response_details = ae_consts.ppj(response)
                except Exception:
                    response_details = response
                log.info(
                    f'{label} handle_pricing_update_task '
                    f'task result={response_details}')
        else:
            log.error(
                f'{label} celery was disabled but the task={response} '
                'did not return anything')
        # end of if response
    else:
        task_res = handle_pricing_update_task.delay(
            work_dict=work_dict)
        rec = {
            'task_id': task_res
        }
        response = build_result.build_result(
            status=ae_consts.SUCCESS,
            err=None,
            rec=rec)
    # if celery enabled

    if response:
        if ae_consts.ev('DEBUG_RESULTS', '0') == '1':
            log.info(
                f'run_handle_pricing_update_task - {label} - done '
                f'status={ae_consts.get_status(response["status"])} '
                f'err={response["err"]} rec={response["rec"]}')
        else:
            log.info(
                f'run_handle_pricing_update_task - {label} - done '
                f'status={ae_consts.get_status(response["status"])} '
                f'err={response["err"]}')
    else:
        log.info(
            f'run_handle_pricing_update_task - {label} - done '
            'no response')
    # end of if/else response

    return response
# end of run_handle_pricing_update_task