Source code for analysis_engine.td.fetch_api

"""
Fetch API calls wrapping Tradier

Supported environment variables:

::

    # verbose logging in this module
    export DEBUG_FETCH=1

"""

import json
import datetime
import requests
import pandas as pd
import analysis_engine.consts as ae_consts
import analysis_engine.utils as ae_utils
import analysis_engine.options_dates as opt_dates
import analysis_engine.url_helper as url_helper
import analysis_engine.dataset_scrub_utils as scrub_utils
import analysis_engine.td.consts as td_consts
import spylunking.log.setup_logging as log_utils

log = log_utils.build_colorized_logger(name=__name__)


def fetch_calls(
        ticker=None,
        work_dict=None,
        scrub_mode='sort-by-date',
        verbose=False):
    """fetch_calls

    Fetch Tradier option calls for a ticker and
    return a tuple: (status, ``pandas.DataFrame``)

    .. code-block:: python

        import analysis_engine.td.fetch_api as td_fetch

        # Please set the TD_TOKEN environment variable to your token
        calls_status, calls_df = td_fetch.fetch_calls(
            ticker='SPY')

        print(f'Fetched SPY Option Calls from Tradier status={calls_status}:')
        print(calls_df)

    :param ticker: string ticker to fetch
    :param work_dict: dictionary of args used by the automation
    :param scrub_mode: optional - string type of scrubbing handler to run
    :param verbose: optional - bool for debugging
    """
    label = 'fetch_calls'
    datafeed_type = td_consts.DATAFEED_TD_CALLS
    exp_date = None
    latest_pricing = {}
    latest_close = None

    if work_dict:
        ticker = work_dict.get(
            'ticker', ticker)
        label = work_dict.get(
            'label', label)
        exp_date = work_dict.get(
            'exp_date', exp_date)
        latest_pricing = work_dict.get(
            'latest_pricing', latest_pricing)
        latest_close = latest_pricing.get(
            'close', latest_close)

    log.debug(
        f'{label} - calls - close={latest_close} '
        f'ticker={ticker}')

    exp_date = opt_dates.option_expiration().strftime(
        ae_consts.COMMON_DATE_FORMAT)
    use_url = td_consts.TD_URLS['options'].format(
        ticker,
        exp_date)
    headers = td_consts.get_auth_headers()
    session = requests.Session()
    session.headers = headers
    res = url_helper.url_helper(sess=session).get(
        use_url)

    if res.status_code != requests.codes.OK:
        if res.status_code in [401, 403]:
            log.critical(
                'Please check the TD_TOKEN is correct '
                f'received {res.status_code} during '
                'fetch for: calls')
        else:
            log.info(
                f'failed to get call with response={res} '
                f'code={res.status_code} '
                f'text={res.text}')
        return ae_consts.EMPTY, pd.DataFrame([{}])

    records = json.loads(res.text)
    org_records = records.get(
        'options', {}).get(
            'option', [])

    if len(org_records) == 0:
        log.info(
            'failed to get call records '
            'text={}'.format(
                res.text))
        return ae_consts.EMPTY, pd.DataFrame([{}])

    options_list = []

    # assumes UTC conversion will work with the system clock
    created_minute = (
        datetime.datetime.utcnow() - datetime.timedelta(hours=5)).strftime(
            '%Y-%m-%d %H:%M:00')
    last_close_date = ae_utils.get_last_close_str(
        fmt='%Y-%m-%d %H:%M:00')

    # hit bug where dates were None
    if not last_close_date:
        last_close_date = created_minute

    for node in org_records:
        node['date'] = last_close_date
        node['created'] = created_minute
        node['ticker'] = ticker
        if (
                node['option_type'] == 'call' and
                node['expiration_type'] == 'standard' and
                float(node['bid']) > 0.01):
            node['opt_type'] = int(ae_consts.OPTION_CALL)
            node['exp_date'] = node['expiration_date']
            new_node = {}
            for col in td_consts.TD_OPTION_COLUMNS:
                if col in node:
                    if col in td_consts.TD_EPOCH_COLUMNS:
                        # trade_date can be None
                        if node[col] == 0:
                            new_node[col] = None
                        else:
                            new_node[col] = ae_utils.epoch_to_dt(
                                epoch=node[col]/1000,
                                use_utc=False,
                                convert_to_est=True).strftime(
                                    ae_consts.COMMON_TICK_DATE_FORMAT)
                            """
                            Debug epoch ms converter:
                            print('-----------')
                            print(col)
                            print(node[col])
                            print(new_node[col])
                            print('===========')
                            """
                        # if/else valid date
                    else:
                        new_node[col] = node[col]
                    # if date column to convert
                # if column is in the row
            # convert all columns
            options_list.append(new_node)
    # end of records

    full_df = pd.DataFrame(options_list).sort_values(
        by=[
            'strike'
        ],
        ascending=True)
    num_chains = len(full_df.index)

    df = None
    if latest_close:
        df_filter = (
            (full_df['strike'] >=
                (latest_close - ae_consts.OPTIONS_LOWER_STRIKE)) &
            (full_df['strike'] <=
                (latest_close + ae_consts.OPTIONS_UPPER_STRIKE)))
        df = full_df[df_filter].copy().sort_values(
            by=[
                'date',
                'strike'
            ]).reset_index()
    else:
        mid_chain_idx = int(num_chains / 2)
        low_idx = int(
            mid_chain_idx - ae_consts.MAX_OPTIONS_LOWER_STRIKE)
        high_idx = int(
            mid_chain_idx + ae_consts.MAX_OPTIONS_UPPER_STRIKE)
        if low_idx < 0:
            low_idx = 0
        if high_idx > num_chains:
            high_idx = num_chains
        df = full_df[low_idx:high_idx].copy().sort_values(
            by=[
                'date',
                'strike'
            ]).reset_index()

    scrubbed_df = scrub_utils.ingress_scrub_dataset(
        label=label,
        scrub_mode=scrub_mode,
        datafeed_type=datafeed_type,
        msg_format='df={} date_str={}',
        ds_id=ticker,
        date_str=exp_date,
        df=df)

    return ae_consts.SUCCESS, scrubbed_df
# end of fetch_calls
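
When ``work_dict`` includes ``latest_pricing['close']``, the function keeps only
strikes within ``ae_consts.OPTIONS_LOWER_STRIKE`` / ``ae_consts.OPTIONS_UPPER_STRIKE``
of that close instead of taking a window around the middle of the chain. Below is a
minimal sketch of driving that path directly, assuming ``TD_TOKEN`` is already
exported; the ``label`` value and the ``280.0`` close are hypothetical stand-ins for
values the automation would normally supply.

.. code-block:: python

    import analysis_engine.consts as ae_consts
    import analysis_engine.td.fetch_api as td_fetch

    # hypothetical values - the automation normally builds this dictionary
    work = {
        'ticker': 'SPY',
        'label': 'example-calls',
        'latest_pricing': {
            'close': 280.0
        }
    }

    calls_status, calls_df = td_fetch.fetch_calls(
        work_dict=work,
        verbose=True)

    if calls_status == ae_consts.SUCCESS:
        # strikes are filtered around the close when latest_pricing is set
        print(calls_df)
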
def fetch_puts(
        ticker=None,
        work_dict=None,
        scrub_mode='sort-by-date',
        verbose=False):
    """fetch_puts

    Fetch Tradier option puts for a ticker and
    return a tuple: (status, ``pandas.DataFrame``)

    .. code-block:: python

        import analysis_engine.td.fetch_api as td_fetch

        puts_status, puts_df = td_fetch.fetch_puts(
            ticker='SPY')

        print(f'Fetched SPY Option Puts from Tradier status={puts_status}:')
        print(puts_df)

    :param ticker: string ticker to fetch
    :param work_dict: dictionary of args used by the automation
    :param scrub_mode: optional - string type of scrubbing handler to run
    :param verbose: optional - bool for debugging
    """
    label = 'fetch_puts'
    datafeed_type = td_consts.DATAFEED_TD_PUTS
    exp_date = None
    latest_pricing = {}
    latest_close = None

    if work_dict:
        ticker = work_dict.get(
            'ticker', ticker)
        label = work_dict.get(
            'label', label)
        exp_date = work_dict.get(
            'exp_date', exp_date)
        latest_pricing = work_dict.get(
            'latest_pricing', latest_pricing)
        latest_close = latest_pricing.get(
            'close', latest_close)

    if verbose:
        log.info(
            f'{label} - puts - close={latest_close} '
            f'ticker={ticker}')

    exp_date = opt_dates.option_expiration().strftime(
        ae_consts.COMMON_DATE_FORMAT)
    use_url = td_consts.TD_URLS['options'].format(
        ticker,
        exp_date)
    headers = td_consts.get_auth_headers()
    session = requests.Session()
    session.headers = headers
    res = url_helper.url_helper(sess=session).get(
        use_url)

    if res.status_code != requests.codes.OK:
        if res.status_code in [401, 403]:
            log.critical(
                'Please check the TD_TOKEN is correct '
                f'received {res.status_code} during '
                'fetch for: puts')
        else:
            log.info(
                f'failed to get put with response={res} '
                f'code={res.status_code} '
                f'text={res.text}')
        return ae_consts.EMPTY, pd.DataFrame([{}])

    records = json.loads(res.text)
    org_records = records.get(
        'options', {}).get(
            'option', [])

    if len(org_records) == 0:
        log.info(
            'failed to get put records '
            'text={}'.format(
                res.text))
        return ae_consts.EMPTY, pd.DataFrame([{}])

    options_list = []

    # assumes UTC conversion will work with the system clock
    created_minute = (
        datetime.datetime.utcnow() - datetime.timedelta(hours=5)).strftime(
            '%Y-%m-%d %H:%M:00')
    last_close_date = ae_utils.get_last_close_str(
        fmt='%Y-%m-%d %H:%M:00')

    # hit bug where dates were None
    if not last_close_date:
        last_close_date = created_minute

    for node in org_records:
        node['date'] = last_close_date
        node['created'] = created_minute
        node['ticker'] = ticker
        if (
                node['option_type'] == 'put' and
                node['expiration_type'] == 'standard' and
                float(node['bid']) > 0.01):
            node['opt_type'] = int(ae_consts.OPTION_PUT)
            node['exp_date'] = node['expiration_date']
            new_node = {}
            for col in td_consts.TD_OPTION_COLUMNS:
                if col in node:
                    if col in td_consts.TD_EPOCH_COLUMNS:
                        # trade_date can be None
                        if node[col] == 0:
                            new_node[col] = None
                        else:
                            new_node[col] = ae_utils.epoch_to_dt(
                                epoch=node[col]/1000,
                                use_utc=False,
                                convert_to_est=True).strftime(
                                    ae_consts.COMMON_TICK_DATE_FORMAT)
                            """
                            Debug epoch ms converter:
                            print('-----------')
                            print(col)
                            print(node[col])
                            print(new_node[col])
                            print('===========')
                            """
                        # if/else valid date
                    else:
                        new_node[col] = node[col]
                    # if date column to convert
                # if column is in the row
            # convert all columns
            options_list.append(new_node)
    # end of records

    full_df = pd.DataFrame(options_list).sort_values(
        by=[
            'strike'
        ],
        ascending=True)
    num_chains = len(full_df.index)

    df = None
    if latest_close:
        df_filter = (
            (full_df['strike'] >=
                (latest_close - ae_consts.OPTIONS_LOWER_STRIKE)) &
            (full_df['strike'] <=
                (latest_close + ae_consts.OPTIONS_UPPER_STRIKE)))
        df = full_df[df_filter].copy().sort_values(
            by=[
                'date',
                'strike'
            ]).reset_index()
    else:
        mid_chain_idx = int(num_chains / 2)
        low_idx = int(
            mid_chain_idx - ae_consts.MAX_OPTIONS_LOWER_STRIKE)
        high_idx = int(
            mid_chain_idx + ae_consts.MAX_OPTIONS_UPPER_STRIKE)
        if low_idx < 0:
            low_idx = 0
        if high_idx > num_chains:
            high_idx = num_chains
        df = full_df[low_idx:high_idx].copy().sort_values(
            by=[
                'date',
                'strike'
            ]).reset_index()

    scrubbed_df = scrub_utils.ingress_scrub_dataset(
        label=label,
        scrub_mode=scrub_mode,
        datafeed_type=datafeed_type,
        msg_format='df={} date_str={}',
        ds_id=ticker,
        date_str=exp_date,
        df=df)

    return ae_consts.SUCCESS, scrubbed_df
# end of fetch_puts
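
Both fetchers share the same request, parse, and scrub flow, so callers typically
pull both sides of the chain together and branch on the returned status. A short
sketch, assuming ``TD_TOKEN`` is set and the Tradier endpoint is reachable:

.. code-block:: python

    import analysis_engine.consts as ae_consts
    import analysis_engine.td.fetch_api as td_fetch

    ticker = 'SPY'
    calls_status, calls_df = td_fetch.fetch_calls(ticker=ticker)
    puts_status, puts_df = td_fetch.fetch_puts(ticker=ticker)

    if (calls_status == ae_consts.SUCCESS and
            puts_status == ae_consts.SUCCESS):
        print(f'{ticker} calls={len(calls_df.index)} '
              f'puts={len(puts_df.index)}')
    else:
        # on failure each fetcher returns ae_consts.EMPTY
        # with a mostly-empty DataFrame
        print(f'fetch failed calls={calls_status} puts={puts_status}')
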