Source code for analysis_engine.yahoo.extract_df_from_redis

"""
Extract a Yahoo dataset from Redis (S3 support coming soon) and
load it into a ``pandas.DataFrame``

Supported environment variables:

::

    # verbose logging in this module
    export DEBUG_EXTRACT=1

    # verbose logging for just Redis operations in this module
    export DEBUG_REDIS_EXTRACT=1

    # verbose logging for just S3 operations in this module
    export DEBUG_S3_EXTRACT=1

    # to show debug and trace logging, export ``SHARED_LOG_CFG``
    # pointing at a debug logger json file. To turn on debugging for
    # this library, you can export the variable to the repo's
    # included file with the command:
    export SHARED_LOG_CFG=/opt/sa/analysis_engine/log/debug-logging.json

"""

import pandas as pd
import analysis_engine.consts as ae_consts
import analysis_engine.utils as ae_utils
import analysis_engine.dataset_scrub_utils as scrub_utils
import analysis_engine.get_data_from_redis_key as redis_get
import analysis_engine.yahoo.consts as yahoo_consts
import spylunking.log.setup_logging as log_utils

log = log_utils.build_colorized_logger(name=__name__)


def extract_pricing_dataset(
        work_dict,
        scrub_mode='sort-by-date'):
    """extract_pricing_dataset

    Extract the Yahoo pricing data for a ticker and
    return it as a ``pandas.DataFrame``

    :param work_dict: dictionary of args
    :param scrub_mode: type of scrubbing handler to run
    """
    label = work_dict.get('label', 'extract')
    ds_id = work_dict.get('ticker')
    df_type = yahoo_consts.DATAFEED_PRICING_YAHOO
    df_str = yahoo_consts.get_datafeed_str_yahoo(df_type=df_type)
    redis_key = work_dict.get(
        'redis_key',
        work_dict.get('pricing', 'missing-redis-key'))
    s3_key = work_dict.get(
        's3_key',
        work_dict.get('pricing', 'missing-s3-key'))
    redis_host = work_dict.get(
        'redis_host',
        None)
    redis_port = work_dict.get(
        'redis_port',
        None)
    redis_db = work_dict.get(
        'redis_db',
        ae_consts.REDIS_DB)

    log.debug(
        f'{label} - {df_str} - start - redis_key={redis_key} s3_key={s3_key}')

    if not redis_host and not redis_port:
        redis_host = ae_consts.REDIS_ADDRESS.split(':')[0]
        redis_port = ae_consts.REDIS_ADDRESS.split(':')[1]

    df = None
    status = ae_consts.NOT_RUN
    try:
        redis_rec = redis_get.get_data_from_redis_key(
            label=label,
            host=redis_host,
            port=redis_port,
            db=redis_db,
            password=work_dict.get('password', None),
            key=redis_key,
            decompress_df=True)

        status = redis_rec['status']
        log.debug(
            f'{label} - {df_str} redis get data key={redis_key} '
            f'status={ae_consts.get_status(status=status)}')

        if status == ae_consts.SUCCESS:
            log.debug(f'{label} - {df_str} redis convert pricing to json')
            cached_dict = redis_rec['rec']['data']
            log.debug(f'{label} - {df_str} redis convert pricing to df')
            try:
                df = pd.DataFrame(
                    cached_dict,
                    index=[0])
            except Exception:
                log.debug(
                    f'{label} - {df_str} redis_key={redis_key} '
                    'no pricing df found')
                return ae_consts.EMPTY, None
            # end of try/ex to convert to df
            log.debug(
                f'{label} - {df_str} redis_key={redis_key} done '
                'convert pricing to df')
        else:
            log.debug(
                f'{label} - {df_str} did not find valid redis pricing '
                f'in redis_key={redis_key} '
                f'status={ae_consts.get_status(status=status)}')
    except Exception as e:
        log.debug(
            f'{label} - {df_str} - ds_id={ds_id} failed getting pricing from '
            f'redis={redis_host}:{redis_port}@{redis_db} '
            f'key={redis_key} ex={e}')
        return ae_consts.ERR, None
    # end of try/ex extract from redis

    log.debug(
        f'{label} - {df_str} ds_id={ds_id} extract scrub={scrub_mode}')

    scrubbed_df = scrub_utils.extract_scrub_dataset(
        label=label,
        scrub_mode=scrub_mode,
        datafeed_type=df_type,
        msg_format='df={} date_str={}',
        ds_id=ds_id,
        df=df)

    status = ae_consts.SUCCESS

    return status, scrubbed_df
# end of extract_pricing_dataset
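
# Example usage (a minimal sketch, not part of the original module): it
# assumes Redis already holds a pricing record written by the Yahoo fetch
# tasks; the ticker, label, and 'SPY_pricing' key below are hypothetical
# placeholders.
#
#     import analysis_engine.consts as ae_consts
#     import analysis_engine.yahoo.extract_df_from_redis as yahoo_extract
#
#     work = {
#         'ticker': 'SPY',              # hypothetical ticker
#         'label': 'pricing-demo',      # hypothetical log label
#         'redis_key': 'SPY_pricing',   # hypothetical key holding pricing data
#         'redis_host': 'localhost',
#         'redis_port': 6379,
#         'redis_db': ae_consts.REDIS_DB
#     }
#     status, pricing_df = yahoo_extract.extract_pricing_dataset(
#         work_dict=work)
#     if status == ae_consts.SUCCESS:
#         print(pricing_df)
#     else:
#         print(ae_consts.get_status(status=status))
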

def extract_yahoo_news_dataset(
        work_dict,
        scrub_mode='sort-by-date'):
    """extract_yahoo_news_dataset

    Extract the Yahoo news data for a ticker and
    return it as a ``pandas.DataFrame``

    :param work_dict: dictionary of args
    :param scrub_mode: type of scrubbing handler to run
    """
    label = work_dict.get('label', 'extract')
    ds_id = work_dict.get('ticker')
    df_type = yahoo_consts.DATAFEED_NEWS_YAHOO
    df_str = yahoo_consts.get_datafeed_str_yahoo(df_type=df_type)
    redis_key = work_dict.get(
        'redis_key',
        work_dict.get('news', 'missing-redis-key'))
    s3_key = work_dict.get(
        's3_key',
        work_dict.get('news', 'missing-s3-key'))
    redis_host = work_dict.get(
        'redis_host',
        None)
    redis_port = work_dict.get(
        'redis_port',
        None)
    redis_db = work_dict.get(
        'redis_db',
        ae_consts.REDIS_DB)

    log.debug(
        f'{label} - {df_str} - start - redis_key={redis_key} s3_key={s3_key}')

    if not redis_host and not redis_port:
        redis_host = ae_consts.REDIS_ADDRESS.split(':')[0]
        redis_port = ae_consts.REDIS_ADDRESS.split(':')[1]

    df = None
    status = ae_consts.NOT_RUN
    try:
        redis_rec = redis_get.get_data_from_redis_key(
            label=label,
            host=redis_host,
            port=redis_port,
            db=redis_db,
            password=work_dict.get('password', None),
            key=redis_key,
            decompress_df=True)

        status = redis_rec['status']
        log.debug(
            f'{label} - {df_str} redis get data key={redis_key} '
            f'status={ae_consts.get_status(status=status)}')

        if status == ae_consts.SUCCESS:
            cached_dict = redis_rec['rec']['data']
            log.debug(f'{label} - {df_str} redis convert news to df')
            try:
                df = pd.DataFrame(
                    cached_dict)
            except Exception:
                log.debug(
                    f'{label} - {df_str} redis_key={redis_key} '
                    'no news df found')
                return ae_consts.EMPTY, None
            # end of try/ex to convert to df
            log.debug(
                f'{label} - {df_str} redis_key={redis_key} done '
                f'convert news to df')
        else:
            log.debug(
                f'{label} - {df_str} did not find valid redis news '
                f'in redis_key={redis_key} '
                f'status={ae_consts.get_status(status=status)}')
    except Exception as e:
        log.debug(
            f'{label} - {df_str} - ds_id={ds_id} failed getting news from '
            f'redis={redis_host}:{redis_port}@{redis_db} key={redis_key} '
            f'ex={e}')
        return ae_consts.ERR, None
    # end of try/ex extract from redis

    log.debug(f'{label} - {df_str} ds_id={ds_id} extract scrub={scrub_mode}')

    scrubbed_df = scrub_utils.extract_scrub_dataset(
        label=label,
        scrub_mode=scrub_mode,
        datafeed_type=df_type,
        msg_format='df={} date_str={}',
        ds_id=ds_id,
        df=df)

    status = ae_consts.SUCCESS

    return status, scrubbed_df
# end of extract_yahoo_news_dataset

def extract_option_calls_dataset(
        work_dict,
        scrub_mode='sort-by-date'):
    """extract_option_calls_dataset

    Extract the Yahoo options calls for a ticker and
    return it as a ``pandas.DataFrame``

    :param work_dict: dictionary of args
    :param scrub_mode: type of scrubbing handler to run
    """
    label = f'{work_dict.get("label", "extract")}-calls'
    ds_id = work_dict.get('ticker')
    df_type = yahoo_consts.DATAFEED_OPTIONS_YAHOO
    df_str = yahoo_consts.get_datafeed_str_yahoo(df_type=df_type)
    redis_key = work_dict.get(
        'redis_key',
        work_dict.get('calls', 'missing-redis-key'))
    s3_key = work_dict.get(
        's3_key',
        work_dict.get('calls', 'missing-s3-key'))
    redis_host = work_dict.get(
        'redis_host',
        None)
    redis_port = work_dict.get(
        'redis_port',
        None)
    redis_db = work_dict.get(
        'redis_db',
        ae_consts.REDIS_DB)

    log.debug(
        f'{label} - {df_str} - start - redis_key={redis_key} s3_key={s3_key}')

    if not redis_host and not redis_port:
        redis_host = ae_consts.REDIS_ADDRESS.split(':')[0]
        redis_port = ae_consts.REDIS_ADDRESS.split(':')[1]

    exp_date_str = None
    calls_df = None
    status = ae_consts.NOT_RUN
    try:
        redis_rec = redis_get.get_data_from_redis_key(
            label=label,
            host=redis_host,
            port=redis_port,
            db=redis_db,
            password=work_dict.get('password', None),
            key=redis_key,
            decompress_df=True)

        status = redis_rec['status']
        log.debug(
            f'{label} - {df_str} redis get data key={redis_key} '
            f'status={ae_consts.get_status(status=status)}')

        if status == ae_consts.SUCCESS:
            calls_json = None
            if 'calls' in redis_rec['rec']['data']:
                calls_json = redis_rec['rec']['data']['calls']
            else:
                calls_json = redis_rec['rec']['data']
            log.debug(f'{label} - {df_str} redis convert calls to df')
            exp_date_str = None
            try:
                calls_df = pd.read_json(
                    calls_json,
                    orient='records')
                exp_epoch_value = calls_df['expiration'].iloc[-1]
                exp_date_str = ae_utils.convert_epoch_to_datetime_string(
                    epoch=exp_epoch_value,
                    fmt=ae_consts.COMMON_DATE_FORMAT,
                    use_utc=True)
            except Exception:
                log.debug(
                    f'{label} - {df_str} redis_key={redis_key} '
                    'no calls df found')
                return ae_consts.EMPTY, None
            # end of try/ex to convert to df
            log.debug(
                f'{label} - {df_str} redis_key={redis_key} '
                f'calls={len(calls_df.index)} exp_date={exp_date_str}')
        else:
            log.debug(
                f'{label} - {df_str} did not find valid redis option calls '
                f'in redis_key={redis_key} '
                f'status={ae_consts.get_status(status=status)}')
    except Exception as e:
        log.debug(
            f'{label} - {df_str} - ds_id={ds_id} failed getting option calls '
            f'from redis={redis_host}:{redis_port}@{redis_db} '
            f'key={redis_key} ex={e}')
        return ae_consts.ERR, None
    # end of try/ex extract from redis

    log.debug(f'{label} - {df_str} ds_id={ds_id} extract scrub={scrub_mode}')

    scrubbed_df = scrub_utils.extract_scrub_dataset(
        label=label,
        scrub_mode=scrub_mode,
        datafeed_type=df_type,
        msg_format='df={} date_str={}',
        ds_id=ds_id,
        df=calls_df)

    status = ae_consts.SUCCESS

    return status, scrubbed_df
# end of extract_option_calls_dataset

def extract_option_puts_dataset(
        work_dict,
        scrub_mode='sort-by-date'):
    """extract_option_puts_dataset

    Extract the Yahoo options puts for a ticker and
    return it as a ``pandas.DataFrame``

    :param work_dict: dictionary of args
    :param scrub_mode: type of scrubbing handler to run
    """
    label = f'{work_dict.get("label", "extract")}-puts'
    ds_id = work_dict.get('ticker')
    df_type = yahoo_consts.DATAFEED_OPTIONS_YAHOO
    df_str = yahoo_consts.get_datafeed_str_yahoo(df_type=df_type)
    redis_key = work_dict.get(
        'redis_key',
        work_dict.get('puts', 'missing-redis-key'))
    s3_key = work_dict.get(
        's3_key',
        work_dict.get('puts', 'missing-s3-key'))
    redis_host = work_dict.get(
        'redis_host',
        None)
    redis_port = work_dict.get(
        'redis_port',
        None)
    redis_db = work_dict.get(
        'redis_db',
        ae_consts.REDIS_DB)

    log.debug(
        f'{label} - {df_str} - start - redis_key={redis_key} s3_key={s3_key}')

    if not redis_host and not redis_port:
        redis_host = ae_consts.REDIS_ADDRESS.split(':')[0]
        redis_port = ae_consts.REDIS_ADDRESS.split(':')[1]

    exp_date_str = None
    puts_df = None
    status = ae_consts.NOT_RUN
    try:
        redis_rec = redis_get.get_data_from_redis_key(
            label=label,
            host=redis_host,
            port=redis_port,
            db=redis_db,
            password=work_dict.get('password', None),
            key=redis_key,
            decompress_df=True)

        status = redis_rec['status']
        log.debug(
            f'{label} - {df_str} redis get data key={redis_key} '
            f'status={ae_consts.get_status(status=status)}')

        if status == ae_consts.SUCCESS:
            puts_json = None
            if 'puts' in redis_rec['rec']['data']:
                puts_json = redis_rec['rec']['data']['puts']
            else:
                puts_json = redis_rec['rec']['data']
            log.debug(f'{label} - {df_str} redis convert puts to df')
            try:
                puts_df = pd.read_json(
                    puts_json,
                    orient='records')
                exp_epoch_value = puts_df['expiration'].iloc[-1]
                exp_date_str = ae_utils.convert_epoch_to_datetime_string(
                    epoch=exp_epoch_value,
                    fmt=ae_consts.COMMON_DATE_FORMAT,
                    use_utc=True)
            except Exception:
                log.debug(
                    f'{label} - {df_str} redis_key={redis_key} '
                    'no puts df found')
                return ae_consts.EMPTY, None
            # end of try/ex to convert to df
            log.debug(
                f'{label} - {df_str} redis_key={redis_key} '
                f'puts={len(puts_df.index)} exp_date={exp_date_str}')
        else:
            log.debug(
                f'{label} - {df_str} did not find valid redis option puts '
                f'in redis_key={redis_key} '
                f'status={ae_consts.get_status(status=status)}')
    except Exception as e:
        log.debug(
            f'{label} - {df_str} - ds_id={ds_id} failed getting option puts '
            f'from redis={redis_host}:{redis_port}@{redis_db} '
            f'key={redis_key} ex={e}')
        return ae_consts.ERR, None
    # end of try/ex extract from redis

    log.debug(f'{label} - {df_str} ds_id={ds_id} extract scrub={scrub_mode}')

    scrubbed_df = scrub_utils.extract_scrub_dataset(
        label=label,
        scrub_mode=scrub_mode,
        datafeed_type=df_type,
        msg_format='df={} date_str={}',
        ds_id=ds_id,
        df=puts_df)

    status = ae_consts.SUCCESS

    return status, scrubbed_df
# end of extract_option_puts_dataset
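
# Example usage (a minimal sketch, not part of the original module): the
# option extractors share the same ``work_dict`` contract and fall back to
# the 'calls'/'puts' keys when 'redis_key' is not set. The ticker and the
# 'SPY_calls'/'SPY_puts' keys below are hypothetical placeholders.
#
#     import analysis_engine.consts as ae_consts
#     import analysis_engine.yahoo.extract_df_from_redis as yahoo_extract
#
#     work = {
#         'ticker': 'SPY',          # hypothetical ticker
#         'calls': 'SPY_calls',     # hypothetical redis key for call options
#         'puts': 'SPY_puts',       # hypothetical redis key for put options
#         'redis_host': 'localhost',
#         'redis_port': 6379
#     }
#     call_status, calls_df = yahoo_extract.extract_option_calls_dataset(
#         work_dict=work)
#     put_status, puts_df = yahoo_extract.extract_option_puts_dataset(
#         work_dict=work)
#     if (call_status == ae_consts.SUCCESS
#             and put_status == ae_consts.SUCCESS):
#         # 'expiration' is the only column the extractors above rely on;
#         # other columns depend on the upstream Yahoo dataset
#         print(calls_df['expiration'].head())
#         print(puts_df['expiration'].head())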