Source code for analysis_engine.run_algo

Run an Algo

**Supported environment variables**


    export REDIS_ADDRESS="localhost:6379"
    export REDIS_DB="0"
    export S3_ADDRESS="localhost:9000"
    export S3_BUCKET="dev"
    export AWS_ACCESS_KEY_ID="trexaccesskey"
    export AWS_SECRET_ACCESS_KEY="trex123321"
    export AWS_DEFAULT_REGION="us-east-1"
    export S3_SECURE="0"
    export WORKER_BROKER_URL="redis://"
    export WORKER_BACKEND_URL="redis://"

    # to show debug, trace logging please export ``SHARED_LOG_CFG``
    # to a debug logger json file. To turn on debugging for this
    # library, you can export this variable to the repo's
    # included file with the command:
    export SHARED_LOG_CFG=/opt/sa/analysis_engine/log/debug-logging.json

import os
import datetime
import json
import analysis_engine.consts as ae_consts
import analysis_engine.algo as base_algo
import analysis_engine.utils as ae_utils
import analysis_engine.build_algo_request as algo_utils
import analysis_engine.build_dataset_node as build_ds_node
import analysis_engine.build_result as build_result
import analysis_engine.api_requests as api_requests
import spylunking.log.setup_logging as log_utils

log = log_utils.build_colorized_logger(name=__name__)

[docs]def run_algo( ticker=None, tickers=None, algo=None, # optional derived ``analysis_engine.algo.Algo`` instance balance=None, # float starting base capital commission=None, # float for single trade commission for buy or sell start_date=None, # string YYYY-MM-DD HH:MM:SS end_date=None, # string YYYY-MM-DD HH:MM:SS datasets=None, # string list of identifiers num_owned_dict=None, # not supported cache_freq='daily', # 'minute' not supported auto_fill=True, load_config=None, report_config=None, history_config=None, extract_config=None, use_key=None, extract_mode='all', iex_datasets=None, redis_enabled=True, redis_address=None, redis_db=None, redis_password=None, redis_expire=None, redis_key=None, s3_enabled=True, s3_address=None, s3_bucket=None, s3_access_key=None, s3_secret_key=None, s3_region_name=None, s3_secure=False, s3_key=None, celery_disabled=True, broker_url=None, result_backend=None, label=None, name=None, timeseries=None, trade_strategy=None, verbose=False, publish_to_slack=True, publish_to_s3=True, publish_to_redis=True, extract_datasets=None, config_file=None, config_dict=None, version=1, raise_on_err=True, **kwargs): """run_algo Run an algorithm with steps: 1) Extract redis keys between dates 2) Compile a data pipeline dictionary (call it ``data``) 3) Call algorithm's ``myalgo.handle_data(data=data)`` .. note:: If no ``algo`` is set, the ``analysis_engine.algo.BaseAlgo`` algorithm is used. .. note:: Please ensure Redis and Minio are running before trying to extract tickers **Stock tickers to extract** :param ticker: single stock ticker/symbol/ETF to extract :param tickers: optional - list of tickers to extract :param use_key: optional - extract historical key from Redis **Algo Configuration** :param algo: derived instance of ``analysis_engine.algo.Algo`` object :param balance: optional - float balance parameter can also be set on the ``algo`` object if not set on the args :param commission: float for single trade commission for buy or sell. can also be set on the ``algo`` objet :param start_date: string ``YYYY-MM-DD_HH:MM:SS`` cache value :param end_date: string ``YYYY-MM-DD_HH:MM:SS`` cache value :param dataset_types: list of strings that are ``iex`` or ``yahoo`` datasets that are cached. :param cache_freq: optional - depending on if you are running data feeds on a ``daily`` cron (default) vs every ``minute`` (or faster) :param num_owned_dict: not supported yet :param auto_fill: optional - boolean for auto filling buy/sell orders for backtesting (default is ``True``) :param trading_calendar: ``trading_calendar.TradingCalendar`` object, by default ``analysis_engine.calendars. always_open.AlwaysOpen`` trading calendar # TradingCalendar by ``TFSExchangeCalendar`` :param config_file: path to a json file containing custom algorithm object member values (like indicator configuration and predict future date units ahead for a backtest) :param config_dict: optional - dictionary that can be passed to derived class implementations of: ``def load_from_config(config_dict=config_dict)`` **Timeseries** :param timeseries: optional - string to set ``day`` or ``minute`` backtesting or live trading (default is ``minute``) **Trading Strategy** :param trade_strategy: optional - string to set the type of ``Trading Strategy`` for backtesting or live trading (default is ``count``) **Algorithm Dataset Loading, Extracting, Reporting and Trading History arguments** :param load_config: optional - dictionary for setting member variables to load an agorithm-ready dataset from a file, s3 or redis :param report_config: optional - dictionary for setting member variables to publish an algo ``trading performance report`` to s3, redis, a file or slack :param history_config: optional - dictionary for setting member variables to publish an algo ``trade history`` to s3, redis, a file or slack :param extract_config: optional - dictionary for setting member variables to publish an algo ``trading performance report`` to s3, redis, a file or slack **(Optional) Data sources, datafeeds and datasets to gather** :param iex_datasets: list of strings for gathering specific `IEX datasets <>`__ which are set as consts: ``analysis_engine.iex.consts.FETCH_*``. **(Optional) Redis connectivity arguments** :param redis_enabled: bool - toggle for auto-caching all datasets in Redis (default is ``True``) :param redis_address: Redis connection string format is ``host:port`` (default is ``localhost:6379``) :param redis_db: Redis db to use (default is ``0``) :param redis_password: optional - Redis password (default is ``None``) :param redis_expire: optional - Redis expire value (default is ``None``) :param redis_key: optional - redis key not used (default is ``None``) **(Optional) Minio (S3) connectivity arguments** :param s3_enabled: bool - toggle for auto-archiving on Minio (S3) (default is ``True``) :param s3_address: Minio S3 connection string format ``host:port`` (default is ``localhost:9000``) :param s3_bucket: S3 Bucket for storing the artifacts (default is ``dev``) which should be viewable on a browser: http://localhost:9000/minio/dev/ :param s3_access_key: S3 Access key (default is ``trexaccesskey``) :param s3_secret_key: S3 Secret key (default is ``trex123321``) :param s3_region_name: S3 region name (default is ``us-east-1``) :param s3_secure: Transmit using tls encryption (default is ``False``) :param s3_key: optional s3 key not used (default is ``None``) **(Optional) Celery worker broker connectivity arguments** :param celery_disabled: bool - toggle synchronous mode or publish to an engine connected to the `Celery broker and backend <>`__ (default is ``True`` - synchronous mode without an engine or need for a broker or backend for Celery) :param broker_url: Celery broker url (default is ``redis://``) :param result_backend: Celery backend url (default is ``redis://``) :param label: tracking log label :param publish_to_slack: optional - boolean for publishing to slack (coming soon) :param publish_to_s3: optional - boolean for publishing to s3 (coming soon) :param publish_to_redis: optional - boolean for publishing to redis (coming soon) **(Optional) Debugging** :param verbose: bool - show extract warnings and other debug logging (default is False) :param raise_on_err: optional - boolean for unittests and developing algorithms with the ``analysis_engine.run_algo.run_algo`` helper. When set to ``True`` exceptions will are raised to the calling functions :param kwargs: keyword arguments dictionary """ # dictionary structure with a list sorted on: ascending dates # algo_data_req[ticker][list][dataset] = pd.DataFrame algo_data_req = {} extract_requests = [] return_algo = False # return created algo objects for use by caller rec = {} msg = None use_tickers = tickers use_balance = balance use_commission = commission if ticker: use_tickers = [ticker] else: if not use_tickers: use_tickers = [] # if these are not set as args, but the algo object # has them, use them instead: if algo: if len(use_tickers) == 0: use_tickers = algo.get_tickers() if not use_balance: use_balance = algo.get_balance() if not use_commission: use_commission = algo.get_commission() default_iex_datasets = [ 'daily', 'minute', 'quote', 'stats', 'peers', 'news', 'financials', 'earnings', 'dividends', 'company' ] if not iex_datasets: iex_datasets = default_iex_datasets if redis_enabled: if not redis_address: redis_address = os.getenv( 'REDIS_ADDRESS', 'localhost:6379') if not redis_password: redis_password = os.getenv( 'REDIS_PASSWORD', None) if not redis_db: redis_db = int(os.getenv( 'REDIS_DB', '0')) if not redis_expire: redis_expire = os.getenv( 'REDIS_EXPIRE', None) if s3_enabled: if not s3_address: s3_address = os.getenv( 'S3_ADDRESS', 'localhost:9000') if not s3_access_key: s3_access_key = os.getenv( 'AWS_ACCESS_KEY_ID', 'trexaccesskey') if not s3_secret_key: s3_secret_key = os.getenv( 'AWS_SECRET_ACCESS_KEY', 'trex123321') if not s3_region_name: s3_region_name = os.getenv( 'AWS_DEFAULT_REGION', 'us-east-1') if not s3_secure: s3_secure = os.getenv( 'S3_SECURE', '0') == '1' if not s3_bucket: s3_bucket = os.getenv( 'S3_BUCKET', 'dev') if not broker_url: broker_url = os.getenv( 'WORKER_BROKER_URL', 'redis://') if not result_backend: result_backend = os.getenv( 'WORKER_BACKEND_URL', 'redis://') if not label: label = 'run-algo' num_tickers = len(use_tickers) last_close_str = ae_utils.get_last_close_str() if iex_datasets: if verbose: f'{label} - tickers={num_tickers} ' f'iex={json.dumps(iex_datasets)}') else: if verbose:'{label} - tickers={num_tickers}') ticker_key = use_key if not ticker_key: ticker_key = f'{ticker}_{last_close_str}' if not algo: algo = base_algo.BaseAlgo( ticker=None, tickers=use_tickers, balance=use_balance, commission=use_commission, config_dict=config_dict, name=label, auto_fill=auto_fill, timeseries=timeseries, trade_strategy=trade_strategy, publish_to_slack=publish_to_slack, publish_to_s3=publish_to_s3, publish_to_redis=publish_to_redis, raise_on_err=raise_on_err) return_algo = True # the algo object is stored # in the result at: res['rec']['algo'] if not algo: msg = f'{label} - missing algo object' log.error(msg) return build_result.build_result( status=ae_consts.EMPTY, err=msg, rec=rec) if raise_on_err: log.debug(f'{label} - enabling algo exception raises') algo.raise_on_err = True indicator_datasets = algo.get_indicator_datasets() if len(indicator_datasets) == 0: indicator_datasets = ae_consts.BACKUP_DATASETS f'using all datasets={indicator_datasets}') verbose_extract = False if config_dict: verbose_extract = config_dict.get('verbose_extract', False) common_vals = {} common_vals['base_key'] = ticker_key common_vals['celery_disabled'] = celery_disabled common_vals['ticker'] = ticker common_vals['label'] = label common_vals['iex_datasets'] = iex_datasets common_vals['s3_enabled'] = s3_enabled common_vals['s3_bucket'] = s3_bucket common_vals['s3_address'] = s3_address common_vals['s3_secure'] = s3_secure common_vals['s3_region_name'] = s3_region_name common_vals['s3_access_key'] = s3_access_key common_vals['s3_secret_key'] = s3_secret_key common_vals['s3_key'] = ticker_key common_vals['redis_enabled'] = redis_enabled common_vals['redis_address'] = redis_address common_vals['redis_password'] = redis_password common_vals['redis_db'] = redis_db common_vals['redis_key'] = ticker_key common_vals['redis_expire'] = redis_expire use_start_date_str = start_date use_end_date_str = end_date last_close_date = ae_utils.last_close() end_date_val = None cache_freq_fmt = ae_consts.COMMON_TICK_DATE_FORMAT if not use_end_date_str: use_end_date_str = last_close_date.strftime( cache_freq_fmt) end_date_val = ae_utils.get_date_from_str( date_str=use_end_date_str, fmt=cache_freq_fmt) start_date_val = None if not use_start_date_str: start_date_val = end_date_val - datetime.timedelta( days=60) use_start_date_str = start_date_val.strftime( cache_freq_fmt) else: start_date_val = datetime.datetime.strptime( use_start_date_str, ae_consts.COMMON_TICK_DATE_FORMAT) total_dates = (end_date_val - start_date_val).days if end_date_val < start_date_val: msg = ( f'{label} - invalid dates - start_date={start_date_val} is after ' f'end_date={end_date_val}') raise Exception(msg) if verbose: f'{label} - days={total_dates} ' f'start={use_start_date_str} ' f'end={use_end_date_str} ' f'datasets={indicator_datasets}') for ticker in use_tickers: req = algo_utils.build_algo_request( ticker=ticker, use_key=use_key, start_date=use_start_date_str, end_date=use_end_date_str, datasets=datasets, balance=use_balance, cache_freq=cache_freq, timeseries=timeseries, trade_strategy=trade_strategy, label=label) ticker_key = f'{ticker}_{last_close_str}' common_vals['ticker'] = ticker common_vals['base_key'] = ticker_key common_vals['redis_key'] = ticker_key common_vals['s3_key'] = ticker_key for date_key in req['extract_datasets']: date_req = api_requests.get_ds_dict( ticker=ticker, base_key=date_key, ds_id=label, service_dict=common_vals) node_date_key = date_key.replace( f'{ticker}_', '') extract_requests.append({ 'id': date_key, 'ticker': ticker, 'date_key': date_key, 'date': node_date_key, 'req': date_req}) # end of for all ticker in use_tickers first_extract_date = None last_extract_date = None total_extract_requests = len(extract_requests) cur_idx = 1 for idx, extract_node in enumerate(extract_requests): extract_ticker = extract_node['ticker'] extract_date = extract_node['date'] ds_node_id = extract_node['id'] if not first_extract_date: first_extract_date = extract_date last_extract_date = extract_date perc_progress = ae_consts.get_percent_done( progress=cur_idx, total=total_extract_requests) percent_label = ( f'{label} ' f'ticker={extract_ticker} ' f'date={extract_date} ' f'{perc_progress} ' f'{idx}/{total_extract_requests} ' f'{indicator_datasets}') if verbose: f'extracting - {percent_label}') ticker_bt_data = build_ds_node.build_dataset_node( ticker=extract_ticker, date=extract_date, service_dict=common_vals, datasets=indicator_datasets, log_label=label, verbose=verbose_extract) if ticker not in algo_data_req: algo_data_req[ticker] = [] algo_data_req[ticker].append({ 'id': ds_node_id, # id is currently the cache key in redis 'date': extract_date, # used to confirm dates in asc order 'data': ticker_bt_data }) if verbose: f'extract - {percent_label} ' f'dataset={len(algo_data_req[ticker])}') cur_idx += 1 # end of for service_dict in extract_requests # this could be a separate celery task status = ae_consts.NOT_RUN if len(algo_data_req) == 0: msg = ( f'{label} - nothing to test - no data found for ' f'tickers={use_tickers} ' f'between {first_extract_date} and {last_extract_date}') return build_result.build_result( status=ae_consts.EMPTY, err=msg, rec=rec) # this could be a separate celery task try: if verbose: f'handle_data START - {percent_label} from ' f'{first_extract_date} to {last_extract_date}') algo.handle_data( data=algo_data_req) if verbose: f'handle_data END - {percent_label} from ' f'{first_extract_date} to {last_extract_date}') except Exception as e: a_name = algo.get_name() a_debug_msg = algo.get_debug_msg() if not a_debug_msg: a_debug_msg = 'debug message not set' a_config_dict = ae_consts.ppj(algo.config_dict) msg = ( f'{percent_label} - algo={a_name} ' f'encountered exception in handle_data tickers={use_tickers} ' f'from {first_extract_date} to {last_extract_date} ex={e} ' f'and failed during operation: {a_debug_msg}') if raise_on_err: if algo: try: ind_obj = \ algo.get_indicator_process_last_indicator() if ind_obj: ind_obj_path = ind_obj.get_path_to_module() ind_obj_config = ae_consts.ppj( ind_obj.get_config()) found_error_hint = False if hasattr(ind_obj.use_df, 'to_json'): if len(ind_obj.use_df.index) == 0: log.critical( f'indicator failure report for ' f'last module: ' f'{ind_obj_path} ' f'indicator={ind_obj.get_name()} ' f'config={ind_obj_config} ' f'dataset={ind_obj.use_df.head(5)} ' f'name_of_dataset={ind_obj.uses_data}') log.critical( f'--------------------------------------' f'--------------------------------------') log.critical( f'Please check if this indicator: ' f'{ind_obj_path} ' f'supports Empty Dataframes') log.critical( f'--------------------------------------' f'--------------------------------------') found_error_hint = True # indicator error hints if not found_error_hint: log.critical( f'indicator failure report for last module: ' f'{ind_obj_path} ' f'indicator={ind_obj.get_name()} ' f'config={ind_obj_config} ' f'dataset={ind_obj.use_df.head(5)} ' f'name_of_dataset={ind_obj.uses_data}') except Exception as f: log.critical( f'failed to pull indicator processor ' f'last indicator for debugging ' f'from ex={e} with parsing ex={f}') # end of ignoring non-supported ways of creating # indicator processors log.error(msg) log.error( f'algo failure report: ' f'algo={a_name} handle_data() ' f'config={a_config_dict} ') log.critical( f'algo failed during operation: {a_debug_msg}') raise e else: log.error(msg) return build_result.build_result( status=ae_consts.ERR, err=msg, rec=rec) # end of try/ex # this could be a separate celery task try: if verbose: f'get_result START - {percent_label} from ' f'{first_extract_date} to {last_extract_date}') rec = algo.get_result() status = ae_consts.SUCCESS if verbose: f'get_result END - {percent_label} from ' f'{first_extract_date} to {last_extract_date}') except Exception as e: msg = ( f'{percent_label} - algo={algo.get_name()} encountered exception ' f'in get_result tickers={use_tickers} from ' f'{first_extract_date} to {last_extract_date} ex={e}') if raise_on_err: if algo: log.error( f'algo={algo.get_name()} failed in get_result with ' f'debug_msg={algo.get_debug_msg()}') log.error(msg) raise e else: log.error(msg) return build_result.build_result( status=ae_consts.ERR, err=msg, rec=rec) # end of try/ex if return_algo: rec['algo'] = algo return build_result.build_result( status=status, err=msg, rec=rec)
# end of run_algo