"""
Dataset Fetch API
"""
import os
import json
import analysis_engine.consts as ae_consts
import analysis_engine.utils as ae_utils
import analysis_engine.iex.consts as iex_consts
import analysis_engine.work_tasks.get_new_pricing_data as price_utils
import analysis_engine.iex.extract_df_from_redis as iex_extract_utils
import analysis_engine.yahoo.extract_df_from_redis as yahoo_extract_utils
import analysis_engine.td.extract_df_from_redis as td_extract_utils
import analysis_engine.api_requests as api_requests
import spylunking.log.setup_logging as log_utils
log = log_utils.build_colorized_logger(name=__name__)
def fetch(
        ticker=None,
        tickers=None,
        fetch_mode=None,
        iex_datasets=None,
        redis_enabled=True,
        redis_address=None,
        redis_db=None,
        redis_password=None,
        redis_expire=None,
        s3_enabled=True,
        s3_address=None,
        s3_bucket=None,
        s3_access_key=None,
        s3_secret_key=None,
        s3_region_name=None,
        s3_secure=False,
        celery_disabled=True,
        broker_url=None,
        result_backend=None,
        label=None,
        verbose=False):
    """fetch

    Fetch all supported datasets for a stock ``ticker`` or
    a list of ``tickers`` and returns a dictionary. Once
    run, the datasets will all be cached in Redis and archived
    in Minio (S3) by default.

    Python example:

    .. code-block:: python

        from analysis_engine.fetch import fetch
        d = fetch(ticker='NFLX')
        print(d)
        for k in d['NFLX']:
            print(f'dataset key: {k}')

    By default, it synchronously automates:

    - fetching all datasets
    - caching all datasets in Redis
    - archiving all datasets in Minio (S3)
    - returns all datasets in a single dictionary

    This was created for reducing the amount of typing in
    Jupyter notebooks. It can be set up for use with a
    distributed engine as well with the optional arguments
    depending on your connectivity requirements.

    .. note:: Please ensure Redis and Minio are running
              before trying to extract tickers

    **Stock tickers to fetch**

    :param ticker: single stock ticker/symbol/ETF to fetch
    :param tickers: optional - list of tickers to fetch

    **(Optional) Data sources, datafeeds and datasets to gather**

    :param fetch_mode: data sources - default is ``all`` (both IEX
        and Yahoo), ``iex`` for only IEX, ``yahoo`` for only Yahoo.
    :param iex_datasets: list of strings for gathering specific `IEX
        datasets <https://iexcloud.io/>`__
        which are set as consts: ``analysis_engine.iex.consts.FETCH_*``.

    **(Optional) Redis connectivity arguments**

    :param redis_enabled: bool - toggle for auto-caching all
        datasets in Redis
        (default is ``True``)
    :param redis_address: Redis connection string format: ``host:port``
        (default is ``localhost:6379``)
    :param redis_db: Redis db to use
        (default is ``0``)
    :param redis_password: optional - Redis password
        (default is ``None``)
    :param redis_expire: optional - Redis expire value
        (default is ``None``)

    **(Optional) Minio (S3) connectivity arguments**

    :param s3_enabled: bool - toggle for auto-archiving on Minio (S3)
        (default is ``True``)
    :param s3_address: Minio S3 connection string format: ``host:port``
        (default is ``localhost:9000``)
    :param s3_bucket: S3 Bucket for storing the artifacts
        (default is ``dev``) which should be viewable on a browser:
        http://localhost:9000/minio/dev/
    :param s3_access_key: S3 Access key
        (default is ``trexaccesskey``)
    :param s3_secret_key: S3 Secret key
        (default is ``trex123321``)
    :param s3_region_name: S3 region name
        (default is ``us-east-1``)
    :param s3_secure: Transmit using tls encryption
        (default is ``False``)

    **(Optional) Celery worker broker connectivity arguments**

    :param celery_disabled: bool - toggle synchronous mode or publish
        to an engine connected to the `Celery broker and backend
        <https://github.com/celery/celery#transports-and-backends>`__
        (default is ``True`` - synchronous mode without an engine
        or need for a broker or backend for Celery)
    :param broker_url: Celery broker url
        (default is ``redis://0.0.0.0:6379/13``)
    :param result_backend: Celery backend url
        (default is ``redis://0.0.0.0:6379/14``)
    :param label: tracking log label

    **(Optional) Debugging**

    :param verbose: bool - show fetch warnings
        and other debug logging (default is False)

    **Supported environment variables**

    ::

        export REDIS_ADDRESS="localhost:6379"
        export REDIS_DB="0"
        export S3_ADDRESS="localhost:9000"
        export S3_BUCKET="dev"
        export AWS_ACCESS_KEY_ID="trexaccesskey"
        export AWS_SECRET_ACCESS_KEY="trex123321"
        export AWS_DEFAULT_REGION="us-east-1"
        export S3_SECURE="0"
        export WORKER_BROKER_URL="redis://0.0.0.0:6379/13"
        export WORKER_BACKEND_URL="redis://0.0.0.0:6379/14"
    """
    rec = {}
    extract_records = []

    # normalize single-ticker vs list-of-tickers into one list
    use_tickers = tickers
    if ticker:
        use_tickers = [ticker]
    else:
        if not use_tickers:
            use_tickers = []

    # fall back to this list if the consts do not define any datasets
    default_iex_datasets = [
        'daily',
        'minute',
        'quote',
        'stats',
        'peers',
        'news',
        'financials',
        'earnings',
        'dividends',
        'company'
    ]

    use_iex_datasets = iex_consts.FETCH_DATASETS
    if len(use_iex_datasets) == 0:
        use_iex_datasets = default_iex_datasets
    if not iex_datasets:
        iex_datasets = use_iex_datasets
    if not fetch_mode:
        fetch_mode = 'all'

    # resolve connectivity values from the environment when not provided
    if redis_enabled:
        if not redis_address:
            redis_address = os.getenv(
                'REDIS_ADDRESS',
                'localhost:6379')
        if not redis_password:
            redis_password = os.getenv(
                'REDIS_PASSWORD',
                None)
        if not redis_db:
            redis_db = int(os.getenv(
                'REDIS_DB',
                '0'))
        if not redis_expire:
            redis_expire = os.getenv(
                'REDIS_EXPIRE',
                None)
    if s3_enabled:
        if not s3_address:
            s3_address = os.getenv(
                'S3_ADDRESS',
                'localhost:9000')
        if not s3_access_key:
            s3_access_key = os.getenv(
                'AWS_ACCESS_KEY_ID',
                'trexaccesskey')
        if not s3_secret_key:
            s3_secret_key = os.getenv(
                'AWS_SECRET_ACCESS_KEY',
                'trex123321')
        if not s3_region_name:
            s3_region_name = os.getenv(
                'AWS_DEFAULT_REGION',
                'us-east-1')
        if not s3_secure:
            s3_secure = os.getenv(
                'S3_SECURE',
                '0') == '1'
        if not s3_bucket:
            s3_bucket = os.getenv(
                'S3_BUCKET',
                'dev')
    if not broker_url:
        broker_url = os.getenv(
            'WORKER_BROKER_URL',
            'redis://0.0.0.0:6379/13')
    if not result_backend:
        result_backend = os.getenv(
            'WORKER_BACKEND_URL',
            'redis://0.0.0.0:6379/14')
    if not label:
        label = 'get-latest'

    num_tickers = len(use_tickers)
    last_close_str = ae_utils.get_last_close_str()

    if iex_datasets:
        log.info(
            f'{label} - getting latest for tickers={num_tickers} '
            f'iex={json.dumps(iex_datasets)}')
    else:
        log.info(f'{label} - getting latest for tickers={num_tickers}')

    # fetch phase - pull every dataset per ticker and record the
    # requests that succeeded so the extract phase can read them back
    for ticker in use_tickers:
        ticker_key = f'{ticker}_{last_close_str}'

        fetch_req = api_requests.build_get_new_pricing_request()
        fetch_req['base_key'] = ticker_key
        fetch_req['celery_disabled'] = celery_disabled
        fetch_req['ticker'] = ticker
        fetch_req['label'] = label
        fetch_req['fetch_mode'] = fetch_mode
        fetch_req['iex_datasets'] = iex_datasets
        fetch_req['s3_enabled'] = s3_enabled
        fetch_req['s3_bucket'] = s3_bucket
        fetch_req['s3_address'] = s3_address
        fetch_req['s3_secure'] = s3_secure
        fetch_req['s3_region_name'] = s3_region_name
        fetch_req['s3_access_key'] = s3_access_key
        fetch_req['s3_secret_key'] = s3_secret_key
        fetch_req['s3_key'] = ticker_key
        fetch_req['redis_enabled'] = redis_enabled
        fetch_req['redis_address'] = redis_address
        fetch_req['redis_password'] = redis_password
        fetch_req['redis_db'] = redis_db
        fetch_req['redis_key'] = ticker_key
        fetch_req['redis_expire'] = redis_expire

        log.info(
            f'{label} - fetching ticker={ticker} last_close={last_close_str} '
            f'redis_address={fetch_req["redis_address"]} '
            f's3_address={fetch_req["s3_address"]}')

        fetch_res = price_utils.run_get_new_pricing_data(
            work_dict=fetch_req)
        if fetch_res['status'] == ae_consts.SUCCESS:
            log.info(
                f'{label} - fetched ticker={ticker} '
                'preparing for extraction')
            extract_req = fetch_req
            extract_records.append(extract_req)
        else:
            log.warning(
                f'{label} - failed getting ticker={ticker} data '
                f'status={ae_consts.get_status(status=fetch_res["status"])} '
                f'err={fetch_res["err"]}')
        # end of if worked or not
    # end for all tickers to fetch

    """
    Extract Datasets
    """

    iex_daily_status = ae_consts.FAILED
    iex_minute_status = ae_consts.FAILED
    iex_quote_status = ae_consts.FAILED
    iex_stats_status = ae_consts.FAILED
    iex_peers_status = ae_consts.FAILED
    iex_news_status = ae_consts.FAILED
    iex_financials_status = ae_consts.FAILED
    iex_earnings_status = ae_consts.FAILED
    iex_dividends_status = ae_consts.FAILED
    iex_company_status = ae_consts.FAILED
    yahoo_news_status = ae_consts.FAILED
    yahoo_options_status = ae_consts.FAILED
    yahoo_pricing_status = ae_consts.FAILED
    td_calls_status = ae_consts.FAILED
    td_puts_status = ae_consts.FAILED

    iex_daily_df = None
    iex_minute_df = None
    iex_quote_df = None
    iex_stats_df = None
    iex_peers_df = None
    iex_news_df = None
    iex_financials_df = None
    iex_earnings_df = None
    iex_dividends_df = None
    iex_company_df = None
    yahoo_option_calls_df = None
    yahoo_option_puts_df = None
    yahoo_pricing_df = None
    yahoo_news_df = None
    td_calls_df = None
    td_puts_df = None

    extract_iex = True
    if fetch_mode not in ['all', 'iex']:
        extract_iex = False

    extract_yahoo = True
    if fetch_mode not in ['all', 'yahoo']:
        extract_yahoo = False

    extract_td = True
    if fetch_mode not in ['all', 'td']:
        extract_td = False

    for service_dict in extract_records:
        ticker_data = {}
        ticker = service_dict['ticker']

        extract_req = api_requests.get_ds_dict(
            ticker=ticker,
            base_key=service_dict.get('base_key', None),
            ds_id=label,
            service_dict=service_dict)

        # NOTE(review): each branch uses ``or extract_iex`` so an explicit
        # iex_datasets entry still extracts even when fetch_mode excludes
        # IEX - confirm this is the intended precedence
        if 'daily' in iex_datasets or extract_iex:
            iex_daily_status, iex_daily_df = \
                iex_extract_utils.extract_daily_dataset(
                    extract_req)
            if iex_daily_status != ae_consts.SUCCESS:
                if verbose:
                    log.warning(f'unable to fetch iex_daily={ticker}')
        if 'minute' in iex_datasets or extract_iex:
            iex_minute_status, iex_minute_df = \
                iex_extract_utils.extract_minute_dataset(
                    extract_req)
            if iex_minute_status != ae_consts.SUCCESS:
                if verbose:
                    log.warning(f'unable to fetch iex_minute={ticker}')
        if 'quote' in iex_datasets or extract_iex:
            iex_quote_status, iex_quote_df = \
                iex_extract_utils.extract_quote_dataset(
                    extract_req)
            if iex_quote_status != ae_consts.SUCCESS:
                if verbose:
                    log.warning(f'unable to fetch iex_quote={ticker}')
        if 'stats' in iex_datasets or extract_iex:
            # fixed: was unpacking into iex_stats_df twice which left the
            # status unset and overwrote the dataframe
            iex_stats_status, iex_stats_df = \
                iex_extract_utils.extract_stats_dataset(
                    extract_req)
            if iex_stats_status != ae_consts.SUCCESS:
                if verbose:
                    log.warning(f'unable to fetch iex_stats={ticker}')
        if 'peers' in iex_datasets or extract_iex:
            # fixed: was unpacking into iex_peers_df twice which left the
            # status unset and overwrote the dataframe
            iex_peers_status, iex_peers_df = \
                iex_extract_utils.extract_peers_dataset(
                    extract_req)
            if iex_peers_status != ae_consts.SUCCESS:
                if verbose:
                    log.warning(f'unable to fetch iex_peers={ticker}')
        if 'news' in iex_datasets or extract_iex:
            iex_news_status, iex_news_df = \
                iex_extract_utils.extract_news_dataset(
                    extract_req)
            if iex_news_status != ae_consts.SUCCESS:
                if verbose:
                    log.warning(f'unable to fetch iex_news={ticker}')
        if 'financials' in iex_datasets or extract_iex:
            iex_financials_status, iex_financials_df = \
                iex_extract_utils.extract_financials_dataset(
                    extract_req)
            if iex_financials_status != ae_consts.SUCCESS:
                if verbose:
                    log.warning(f'unable to fetch iex_financials={ticker}')
        if 'earnings' in iex_datasets or extract_iex:
            # fixed: was calling extract_dividends_dataset (copy-paste)
            iex_earnings_status, iex_earnings_df = \
                iex_extract_utils.extract_earnings_dataset(
                    extract_req)
            if iex_earnings_status != ae_consts.SUCCESS:
                if verbose:
                    log.warning(f'unable to fetch iex_earnings={ticker}')
        if 'dividends' in iex_datasets or extract_iex:
            iex_dividends_status, iex_dividends_df = \
                iex_extract_utils.extract_dividends_dataset(
                    extract_req)
            if iex_dividends_status != ae_consts.SUCCESS:
                if verbose:
                    log.warning(f'unable to fetch iex_dividends={ticker}')
        if 'company' in iex_datasets or extract_iex:
            # fixed: was calling extract_dividends_dataset (copy-paste)
            iex_company_status, iex_company_df = \
                iex_extract_utils.extract_company_dataset(
                    extract_req)
            if iex_company_status != ae_consts.SUCCESS:
                if verbose:
                    log.warning(f'unable to fetch iex_company={ticker}')
        # end of iex extracts

        if extract_yahoo:
            yahoo_options_status, yahoo_option_calls_df = \
                yahoo_extract_utils.extract_option_calls_dataset(
                    extract_req)
            # fixed: check the calls status before it is overwritten by
            # the puts extraction so a calls failure is also reported
            if yahoo_options_status != ae_consts.SUCCESS:
                if verbose:
                    log.warning(f'unable to fetch yahoo_options={ticker}')
            yahoo_options_status, yahoo_option_puts_df = \
                yahoo_extract_utils.extract_option_puts_dataset(
                    extract_req)
            if yahoo_options_status != ae_consts.SUCCESS:
                if verbose:
                    log.warning(f'unable to fetch yahoo_options={ticker}')
            yahoo_pricing_status, yahoo_pricing_df = \
                yahoo_extract_utils.extract_pricing_dataset(
                    extract_req)
            if yahoo_pricing_status != ae_consts.SUCCESS:
                if verbose:
                    log.warning(f'unable to fetch yahoo_pricing={ticker}')
            yahoo_news_status, yahoo_news_df = \
                yahoo_extract_utils.extract_yahoo_news_dataset(
                    extract_req)
            if yahoo_news_status != ae_consts.SUCCESS:
                if verbose:
                    log.warning(f'unable to fetch yahoo_news={ticker}')
        # end of yahoo extracts

        if extract_td:
            td_calls_status, td_calls_df = \
                td_extract_utils.extract_option_calls_dataset(
                    extract_req)
            if td_calls_status != ae_consts.SUCCESS:
                if verbose:
                    log.warning(f'unable to fetch tdcalls={ticker}')
            td_puts_status, td_puts_df = \
                td_extract_utils.extract_option_puts_dataset(
                    extract_req)
            if td_puts_status != ae_consts.SUCCESS:
                if verbose:
                    log.warning(f'unable to fetch tdputs={ticker}')
        # td extracts

        # 'news1' holds IEX news; 'news' holds Yahoo news (existing
        # public key layout - do not rename)
        ticker_data['daily'] = iex_daily_df
        ticker_data['minute'] = iex_minute_df
        ticker_data['quote'] = iex_quote_df
        ticker_data['stats'] = iex_stats_df
        ticker_data['peers'] = iex_peers_df
        ticker_data['news1'] = iex_news_df
        ticker_data['financials'] = iex_financials_df
        ticker_data['earnings'] = iex_earnings_df
        ticker_data['dividends'] = iex_dividends_df
        ticker_data['company'] = iex_company_df
        ticker_data['calls'] = yahoo_option_calls_df
        ticker_data['puts'] = yahoo_option_puts_df
        ticker_data['pricing'] = yahoo_pricing_df
        ticker_data['news'] = yahoo_news_df
        ticker_data['tdcalls'] = td_calls_df
        ticker_data['tdputs'] = td_puts_df

        rec[ticker] = ticker_data
    # end of for service_dict in extract_records

    return rec
# end of fetch