Source code for analysis_engine.work_tasks.get_new_pricing_data

"""
**Get New Pricing Data Task**

This will fetch data (pricing, financials, earnings, dividends, options,
and more) from these sources:

#.  IEX

#.  Tradier

#.  Yahoo - disabled as of 2019/01/03

**Detailed example for getting new pricing data**

.. code-block:: python

    import datetime
    import build_get_new_pricing_request
    from analysis_engine.api_requests
    from analysis_engine.work_tasks.get_new_pricing_data
        import get_new_pricing_data

    # store data
    cur_date = datetime.datetime.now().strftime('%Y-%m-%d')
    work = build_get_new_pricing_request(
        label=f'get-pricing-{cur_date}')
    work['ticker'] = 'TSLA'
    work['s3_bucket'] = 'pricing'
    work['s3_key'] = f'{work["ticker"]}-{cur_date}'
    work['redis_key'] = f'{work["ticker"]}-{cur_date}'
    work['celery_disabled'] = True
    res = get_new_pricing_data(
        work)
    print('full result dictionary:')
    print(res)
    if res['data']:
        print(
            'named datasets returned as '
            'json-serialized pandas DataFrames:')
        for k in res['data']:
            print(f' - {k}')

.. warning:: When fetching pricing data from sources like IEX,
    Please ensure the returned values are
    not serialized pandas Dataframes to prevent
    issues with celery task results. Instead
    it is preferred to returned a ``df.to_json()``
    before sending the results into the
    results backend.

.. tip:: This task uses the `analysis_engine.work_tasks.
    custom_task.CustomTask class <https://github.com/A
    lgoTraders/stock-analysis-engine/blob/master/anal
    ysis_engine/work_tasks/custom_task.py>`__ for
    task event handling.

**Sample work_dict request for this method**

`analysis_engine.api_requests.build_get_new_pricing_request <https://
github.com/AlgoTraders/stock-analysis-engine/blob/master/
analysis_engine/api_requests.py#L49>`__

**Supported Environment Variables**

::

    export DEBUG_RESULTS=1

"""

import datetime
import copy
import celery
import analysis_engine.consts as ae_consts
import analysis_engine.utils as ae_utils
import analysis_engine.iex.consts as iex_consts
import analysis_engine.iex.get_pricing_on_date as iex_pricing
import analysis_engine.td.consts as td_consts
import analysis_engine.build_result as build_result
import analysis_engine.get_task_results as get_task_results
import analysis_engine.work_tasks.custom_task as custom_task
import analysis_engine.options_dates as opt_dates
import analysis_engine.work_tasks.publish_pricing_update as publisher
import analysis_engine.yahoo.get_data as yahoo_data
import analysis_engine.iex.get_data as iex_data
import analysis_engine.td.get_data as td_data
import analysis_engine.send_to_slack as slack_utils
import spylunking.log.setup_logging as log_utils

log = log_utils.build_colorized_logger(name=__name__)


@celery.task(
    bind=True,
    base=custom_task.CustomTask,
    queue='get_new_pricing_data')
def get_new_pricing_data(
        self,
        work_dict):
    """get_new_pricing_data

    Get Ticker information on:

    - prices - turn off with ``work_dict.get_pricing = False``
    - news - turn off with ``work_dict.get_news = False``
    - options - turn off with ``work_dict.get_options = False``

    :param work_dict: dictionary for key/values
    """

    label = 'get_new_pricing_data'

    log.debug(
        f'task - {label} - start '
        f'work_dict={work_dict}')

    num_success = 0
    ticker = ae_consts.TICKER
    ticker_id = ae_consts.TICKER_ID
    rec = {
        'pricing': None,
        'options': None,
        'calls': None,
        'puts': None,
        'news': None,
        'daily': None,
        'minute': None,
        'quote': None,
        'stats': None,
        'peers': None,
        'iex_news': None,
        'financials': None,
        'earnings': None,
        'dividends': None,
        'company': None,
        'exp_date': None,
        'publish_pricing_update': None,
        'num_success': num_success,
        'date': ae_utils.utc_now_str(),
        'updated': None,
        'version': ae_consts.DATASET_COLLECTION_VERSION
    }
    res = {
        'status': ae_consts.NOT_RUN,
        'err': None,
        'rec': rec
    }

    try:
        ticker = work_dict.get(
            'ticker',
            ticker)
        ticker_id = work_dict.get(
            'ticker_id',
            ae_consts.TICKER_ID)
        s3_bucket = work_dict.get(
            's3_bucket',
            ae_consts.S3_BUCKET)
        s3_key = work_dict.get(
            's3_key',
            ae_consts.S3_KEY)
        redis_key = work_dict.get(
            'redis_key',
            ae_consts.REDIS_KEY)
        exp_date = work_dict.get(
            'exp_date',
            None)
        cur_date = ae_utils.last_close()
        cur_strike = work_dict.get(
            'strike',
            None)
        contract_type = str(work_dict.get(
            'contract',
            'C')).upper()
        label = work_dict.get(
            'label',
            label)
        iex_datasets = work_dict.get(
            'iex_datasets',
            iex_consts.DEFAULT_FETCH_DATASETS)
        td_datasets = work_dict.get(
            'td_datasets',
            td_consts.DEFAULT_FETCH_DATASETS_TD)
        fetch_mode = work_dict.get(
            'fetch_mode',
            ae_consts.FETCH_MODE_ALL)
        iex_token = work_dict.get(
            'iex_token',
            iex_consts.IEX_TOKEN)
        td_token = work_dict.get(
            'td_token',
            td_consts.TD_TOKEN)
        str_fetch_mode = str(fetch_mode).lower()
        backfill_date = work_dict.get(
            'backfill_date',
            None)
        verbose = work_dict.get(
            'verbose',
            False)

        # control flags to deal with feed issues:
        get_iex_data = True
        get_td_data = True

        if (
                fetch_mode == ae_consts.FETCH_MODE_ALL
                or str_fetch_mode == 'initial'):
            get_iex_data = True
            get_td_data = True
            iex_datasets = ae_consts.IEX_INITIAL_DATASETS
        elif (
                fetch_mode == ae_consts.FETCH_MODE_ALL
                or str_fetch_mode == 'all'):
            get_iex_data = True
            get_td_data = True
            iex_datasets = ae_consts.IEX_DATASETS_DEFAULT
        elif (
                fetch_mode == ae_consts.FETCH_MODE_YHO
                or str_fetch_mode == 'yahoo'):
            get_iex_data = False
            get_td_data = False
        elif (
                fetch_mode == ae_consts.FETCH_MODE_IEX
                or str_fetch_mode == 'iex-all'):
            get_iex_data = True
            get_td_data = False
            iex_datasets = ae_consts.IEX_DATASETS_DEFAULT
        elif (
                fetch_mode == ae_consts.FETCH_MODE_IEX
                or str_fetch_mode == 'iex'):
            get_iex_data = True
            get_td_data = False
            iex_datasets = ae_consts.IEX_INTRADAY_DATASETS
        elif (
                fetch_mode == ae_consts.FETCH_MODE_INTRADAY
                or str_fetch_mode == 'intra'):
            get_iex_data = True
            get_td_data = True
            iex_datasets = ae_consts.IEX_INTRADAY_DATASETS
        elif (
                fetch_mode == ae_consts.FETCH_MODE_DAILY
                or str_fetch_mode == 'daily'):
            get_iex_data = True
            get_td_data = False
            iex_datasets = ae_consts.IEX_DAILY_DATASETS
        elif (
                fetch_mode == ae_consts.FETCH_MODE_WEEKLY
                or str_fetch_mode == 'weekly'):
            get_iex_data = True
            get_td_data = False
            iex_datasets = ae_consts.IEX_WEEKLY_DATASETS
        elif (
                fetch_mode == ae_consts.FETCH_MODE_TD
                or str_fetch_mode == 'td'):
            get_iex_data = False
            get_td_data = True
        else:
            get_iex_data = False
            get_td_data = False

            fetch_arr = str_fetch_mode.split(',')
            found_fetch = False
            iex_datasets = []
            for fetch_name in fetch_arr:
                if fetch_name not in iex_datasets:
                    if fetch_name == 'iex_min':
                        iex_datasets.append('minute')
                    elif fetch_name == 'min':
                        iex_datasets.append('minute')
                    elif fetch_name == 'minute':
                        iex_datasets.append('minute')
                    elif fetch_name == 'day':
                        iex_datasets.append('daily')
                    elif fetch_name == 'daily':
                        iex_datasets.append('daily')
                    elif fetch_name == 'iex_day':
                        iex_datasets.append('daily')
                    elif fetch_name == 'quote':
                        iex_datasets.append('quote')
                    elif fetch_name == 'iex_quote':
                        iex_datasets.append('quote')
                    elif fetch_name == 'iex_stats':
                        iex_datasets.append('stats')
                    elif fetch_name == 'stats':
                        iex_datasets.append('stats')
                    elif fetch_name == 'peers':
                        iex_datasets.append('peers')
                    elif fetch_name == 'iex_peers':
                        iex_datasets.append('peers')
                    elif fetch_name == 'news':
                        iex_datasets.append('news')
                    elif fetch_name == 'iex_news':
                        iex_datasets.append('news')
                    elif fetch_name == 'fin':
                        iex_datasets.append('financials')
                    elif fetch_name == 'iex_fin':
                        iex_datasets.append('financials')
                    elif fetch_name == 'earn':
                        iex_datasets.append('earnings')
                    elif fetch_name == 'iex_earn':
                        iex_datasets.append('earnings')
                    elif fetch_name == 'div':
                        iex_datasets.append('dividends')
                    elif fetch_name == 'iex_div':
                        iex_datasets.append('dividends')
                    elif fetch_name == 'comp':
                        iex_datasets.append('company')
                    elif fetch_name == 'iex_comp':
                        iex_datasets.append('company')
                    elif fetch_name == 'td':
                        get_td_data = True
                    else:
                        log.warn(
                            'unsupported IEX dataset '
                            f'{fetch_name}')
            found_fetch = (
                len(iex_datasets) != 0)
            if not found_fetch:
                log.error(
                    f'{label} - unsupported '
                    f'fetch_mode={fetch_mode} value')
            else:
                get_iex_data = True
                log.debug(
                    f'{label} - '
                    f'fetching={len(iex_datasets)} '
                    f'{iex_datasets} '
                    f'fetch_mode={fetch_mode}')
        # end of screening custom fetch_mode settings

        num_tokens = 0

        if get_iex_data:
            if not iex_token:
                log.warn(
                    f'{label} - '
                    'please set a valid IEX Cloud Account token ('
                    'https://iexcloud.io/cloud-login/#/register'
                    ') to fetch data from IEX Cloud. It must be '
                    'set as an environment variable like: '
                    'export IEX_TOKEN=<token>')
                get_iex_data = False
            else:
                num_tokens += 1
        # sanity check - disable IEX fetch if the token is not set
        if get_td_data:
            missing_td_token = [
                'MISSING_TD_TOKEN',
                'SETYOURTDTOKEN',
                'SETYOURTRADIERTOKENHERE'
            ]
            if td_token in missing_td_token:
                log.warn(
                    f'{label} - '
                    'please set a valid Tradier Account token ('
                    'https://developer.tradier.com/user/sign_up'
                    ') to fetch pricing data from Tradier. It must be '
                    'set as an environment variable like: '
                    'export TD_TOKEN=<token>')
                get_td_data = False
            else:
                num_tokens += 1
        # sanity check - disable Tradier fetch if the token is not set

        """
        as of Thursday, Jan. 3, 2019:
        https://developer.yahoo.com/yql/
        Important EOL Notice: As of Thursday, Jan. 3, 2019
        the YQL service at query.yahooapis.com will be retired
        """
        get_yahoo_data = False

        if (
                not get_iex_data and
                not get_td_data and
                not get_yahoo_data):
            err = None
            if num_tokens == 0:
                res['status'] = ae_consts.MISSING_TOKEN
                err = (
                    f'Please set a valid IEX_TOKEN or TD_TOKEN '
                    f'environment variable')
            else:
                err = (
                    f'Please set at least one supported datafeed from '
                    f'either: '
                    f'IEX Cloud (fetch -t TICKER -g iex) or '
                    f'Tradier (fetch -t TICKER -g td) '
                    f'for '
                    f'ticker={ticker} '
                    f'cur_date={cur_date} '
                    f'IEX enabled={get_iex_data} '
                    f'TD enabled={get_td_data} '
                    f'YHO enabled={get_yahoo_data}')
                res['status'] = ae_consts.ERR
                res['err'] = err
            return get_task_results.get_task_results(
                work_dict=work_dict,
                result=res)
        # end of checking that there is at least 1 feed on

        if not exp_date:
            exp_date = opt_dates.option_expiration(
                date=exp_date)
        else:
            exp_date = datetime.datetime.strptime(
                exp_date,
                '%Y-%m-%d')

        rec['updated'] = cur_date.strftime('%Y-%m-%d %H:%M:%S')
        log.debug(
            f'{label} getting pricing for ticker={ticker} '
            f'cur_date={cur_date} exp_date={exp_date} '
            f'IEX={get_iex_data} '
            f'TD={get_td_data} '
            f'YHO={get_yahoo_data}')

        yahoo_rec = {
            'ticker': ticker,
            'pricing': None,
            'options': None,
            'calls': None,
            'puts': None,
            'news': None,
            'exp_date': None,
            'publish_pricing_update': None,
            'date': None,
            'updated': None
        }

        # disabled on 2019-01-03
        if get_yahoo_data:
            log.debug(f'{label} YHO ticker={ticker}')
            yahoo_res = yahoo_data.get_data_from_yahoo(
                work_dict=work_dict)
            status_str = ae_consts.get_status(status=yahoo_res['status'])
            if yahoo_res['status'] == ae_consts.SUCCESS:
                yahoo_rec = yahoo_res['rec']
                msg = (
                    f'{label} YHO ticker={ticker} '
                    f'redis_key={redis_key}_[price,news,options] '
                    f'status={status_str} err={yahoo_res["err"]}')
                if ae_consts.ev('SHOW_SUCCESS', '0') == '1':
                    log.info(msg)
                else:
                    log.debug(msg)
                rec['pricing'] = yahoo_rec.get('pricing', '{}')
                rec['news'] = yahoo_rec.get('news', '{}')
                rec['options'] = yahoo_rec.get('options', '{}')
                rec['calls'] = rec['options'].get(
                    'calls', ae_consts.EMPTY_DF_STR)
                rec['puts'] = rec['options'].get(
                    'puts', ae_consts.EMPTY_DF_STR)
                num_success += 1
            else:
                log.error(
                    f'{label} failed YHO ticker={ticker} '
                    f'status={status_str} err={yahoo_res["err"]}')
        # end of get from yahoo

        if get_iex_data:
            num_iex_ds = len(iex_datasets)
            log.debug(f'{label} IEX datasets={num_iex_ds}')
            for idx, ft_type in enumerate(iex_datasets):
                dataset_field = iex_consts.get_ft_str(
                    ft_type=ft_type)

                log.debug(
                    f'{label} IEX={idx}/{num_iex_ds} '
                    f'field={dataset_field} ticker={ticker}')
                iex_label = f'{label}-{dataset_field}'
                iex_req = copy.deepcopy(work_dict)
                iex_req['label'] = iex_label
                iex_req['ft_type'] = ft_type
                iex_req['field'] = dataset_field
                iex_req['ticker'] = ticker
                iex_req['backfill_date'] = backfill_date
                iex_req['verbose'] = verbose

                iex_res = iex_data.get_data_from_iex(
                    work_dict=iex_req)

                status_str = (
                    ae_consts.get_status(status=iex_res['status']))
                if iex_res['status'] == ae_consts.SUCCESS:
                    iex_rec = iex_res['rec']
                    rk = f'{redis_key}_{dataset_field}'
                    if backfill_date:
                        rk = (
                            f'{ticker}_{backfill_date}_'
                            f'{dataset_field}')
                    msg = (
                        f'{label} IEX ticker={ticker} '
                        f'redis_key={rk} '
                        f'field={dataset_field} '
                        f'status={status_str} '
                        f'err={iex_res["err"]}')
                    if ae_consts.ev('SHOW_SUCCESS', '0') == '1':
                        log.info(msg)
                    else:
                        log.debug(msg)
                    if dataset_field == 'news':
                        rec['iex_news'] = iex_rec['data']
                    else:
                        rec[dataset_field] = iex_rec['data']
                    num_success += 1
                else:
                    log.debug(
                        f'{label} failed IEX ticker={ticker} '
                        f'field={dataset_field} '
                        f'status={status_str} err={iex_res["err"]}')
                # end of if/else succcess
            # end idx, ft_type in enumerate(iex_datasets):
        # end of if get_iex_data

        if get_td_data:
            num_td_ds = len(td_datasets)

            latest_pricing = None
            try:
                latest_pricing = iex_pricing.get_pricing_on_date(
                    ticker=ticker,
                    date_str=None,
                    label=label)
            except Exception as e:
                log.critical(
                    f'failed to get {ticker} iex latest pricing data '
                    f'with ex={e}')
            # end of trying to extract the latest pricing data from redis

            log.debug(
                f'{label} TD datasets={num_td_ds} '
                f'pricing={latest_pricing}')

            for idx, ft_type in enumerate(td_datasets):
                dataset_field = td_consts.get_ft_str_td(
                    ft_type=ft_type)
                log.debug(
                    f'{label} TD={idx}/{num_td_ds} '
                    f'field={dataset_field} ticker={ticker}')
                td_label = (
                    f'{label}-{dataset_field}')
                td_req = copy.deepcopy(work_dict)
                td_req['label'] = td_label
                td_req['ft_type'] = ft_type
                td_req['field'] = dataset_field
                td_req['ticker'] = ticker
                td_req['latest_pricing'] = latest_pricing
                td_res = td_data.get_data_from_td(
                    work_dict=td_req)

                status_str = (
                    ae_consts.get_status(status=td_res['status']))
                if td_res['status'] == ae_consts.SUCCESS:
                    td_rec = td_res['rec']
                    msg = (
                        f'{label} TD ticker={ticker} '
                        f'redis_key={redis_key}_{dataset_field} '
                        f'field={dataset_field} '
                        f'status={status_str} '
                        f'err={td_res["err"]}')
                    if ae_consts.ev('SHOW_SUCCESS', '0') == '1':
                        log.info(msg)
                    else:
                        log.debug(msg)
                    if dataset_field == 'tdcalls':
                        rec['tdcalls'] = td_rec['data']
                    if dataset_field == 'tdputs':
                        rec['tdputs'] = td_rec['data']
                    else:
                        rec[dataset_field] = td_rec['data']
                    num_success += 1
                else:
                    log.critical(
                        f'{label} failed TD ticker={ticker} '
                        f'field={dataset_field} '
                        f'status={status_str} err={td_res["err"]}')
                # end of if/else succcess
            # end idx, ft_type in enumerate(td_datasets):
        # end of if get_td_data

        rec['num_success'] = num_success

        update_req = {
            'data': rec
        }
        update_req['ticker'] = ticker
        update_req['ticker_id'] = ticker_id
        update_req['strike'] = cur_strike
        update_req['contract'] = contract_type
        update_req['s3_enabled'] = work_dict.get(
            's3_enabled',
            ae_consts.ENABLED_S3_UPLOAD)
        update_req['redis_enabled'] = work_dict.get(
            'redis_enabled',
            ae_consts.ENABLED_REDIS_PUBLISH)
        update_req['s3_bucket'] = s3_bucket
        update_req['s3_key'] = s3_key
        update_req['s3_access_key'] = work_dict.get(
            's3_access_key',
            ae_consts.S3_ACCESS_KEY)
        update_req['s3_secret_key'] = work_dict.get(
            's3_secret_key',
            ae_consts.S3_SECRET_KEY)
        update_req['s3_region_name'] = work_dict.get(
            's3_region_name',
            ae_consts.S3_REGION_NAME)
        update_req['s3_address'] = work_dict.get(
            's3_address',
            ae_consts.S3_ADDRESS)
        update_req['s3_secure'] = work_dict.get(
            's3_secure',
            ae_consts.S3_SECURE)
        update_req['redis_key'] = redis_key
        update_req['redis_address'] = work_dict.get(
            'redis_address',
            ae_consts.REDIS_ADDRESS)
        update_req['redis_password'] = work_dict.get(
            'redis_password',
            ae_consts.REDIS_PASSWORD)
        update_req['redis_db'] = int(work_dict.get(
            'redis_db',
            ae_consts.REDIS_DB))
        update_req['redis_expire'] = work_dict.get(
            'redis_expire',
            ae_consts.REDIS_EXPIRE)
        update_req['updated'] = rec['updated']
        update_req['label'] = label
        update_req['celery_disabled'] = True
        update_status = ae_consts.NOT_SET

        if backfill_date:
            update_req['redis_key'] = (
                f'{ticker}_{backfill_date}')
            update_req['s3_key'] = (
                f'{ticker}_{backfill_date}')

        try:
            update_res = publisher.run_publish_pricing_update(
                work_dict=update_req)
            update_status = update_res.get(
                'status',
                ae_consts.NOT_SET)
            status_str = ae_consts.get_status(status=update_status)
            if ae_consts.ev('DEBUG_RESULTS', '0') == '1':
                log.debug(
                    f'{label} update_res '
                    f'status={status_str} '
                    f'data={ae_consts.ppj(update_res)}')
            else:
                log.debug(
                    f'{label} run_publish_pricing_update '
                    f'status={status_str}')
            # end of if/else

            rec['publish_pricing_update'] = update_res
            res = build_result.build_result(
                status=ae_consts.SUCCESS,
                err=None,
                rec=rec)
        except Exception as f:
            err = (
                f'{label} publisher.run_publish_pricing_update failed '
                f'with ex={f}')
            log.error(err)
            res = build_result.build_result(
                status=ae_consts.ERR,
                err=err,
                rec=rec)
        # end of trying to publish results to connected services

    except Exception as e:
        res = build_result.build_result(
            status=ae_consts.ERR,
            err=(
                'failed - get_new_pricing_data '
                f'dict={work_dict} with ex={e}'),
            rec=rec)
        log.error(
            f'{label} - {res["err"]}')
    # end of try/ex

    if ae_consts.ev('DATASET_COLLECTION_SLACK_ALERTS', '0') == '1':
        env_name = 'DEV'
        if ae_consts.ev('PROD_SLACK_ALERTS', '1') == '1':
            env_name = 'PROD'
        done_msg = (
            f'Dataset collected ticker=*{ticker}* on '
            f'env=*{env_name}* '
            f'redis_key={redis_key} s3_key={s3_key} '
            f'IEX={get_iex_data} '
            f'TD={get_td_data} '
            f'YHO={get_yahoo_data}')
        log.debug(f'{label} sending slack msg={done_msg}')
        if res['status'] == ae_consts.SUCCESS:
            slack_utils.post_success(
                msg=done_msg,
                block=False,
                jupyter=True)
        else:
            slack_utils.post_failure(
                msg=done_msg,
                block=False,
                jupyter=True)
        # end of if/else success
    # end of publishing to slack

    log.debug(
        'task - get_new_pricing_data done - '
        f'{label} - status={ae_consts.get_status(res["status"])}')

    return get_task_results.get_task_results(
        work_dict=work_dict,
        result=res)
# end of get_new_pricing_data


[docs]def run_get_new_pricing_data( work_dict): """run_get_new_pricing_data Celery wrapper for running without celery :param work_dict: task data """ label = work_dict.get( 'label', '') log.debug(f'run_get_new_pricing_data - {label} - start') response = build_result.build_result( status=ae_consts.NOT_RUN, err=None, rec={}) task_res = {} # allow running without celery if ae_consts.is_celery_disabled( work_dict=work_dict): work_dict['celery_disabled'] = True task_res = get_new_pricing_data( work_dict) if task_res: response = task_res.get( 'result', task_res) if ae_consts.ev('DEBUG_RESULTS', '0') == '1': response_details = response try: response_details = ae_consts.ppj(response) except Exception: response_details = response log.debug( f'{label} task result={response_details}') else: log.error( f'{label} celery was disabled but the task={response} ' 'did not return anything') # end of if response else: task_res = get_new_pricing_data.delay( work_dict=work_dict) rec = { 'task_id': task_res } response = build_result.build_result( status=ae_consts.SUCCESS, err=None, rec=rec) # if celery enabled if response: status_str = ae_consts.get_status(response['status']) if ae_consts.ev('DEBUG_RESULTS', '0') == '1': log.debug( f'run_get_new_pricing_data - {label} - done ' f'status={status_str} ' f'err={response["err"]} ' f'rec={response["rec"]}') else: log.debug( f'run_get_new_pricing_data - {label} - done ' f'status={status_str} ' f'rec={response["rec"]}') else: log.debug( f'run_get_new_pricing_data - {label} - done ' 'no response') # end of if/else response return response
# end of run_get_new_pricing_data