Source code for analysis_engine.algo_runner

"""
A class for running backtests and the latest pricing data
with an automated publishing of the ``Trading History`` to S3
"""

import os
import datetime
import json
import pandas as pd
import analysis_engine.consts as ae_consts
import analysis_engine.utils as ae_utils
import analysis_engine.build_dataset_node as build_dataset_node
import analysis_engine.load_history_dataset as load_history_utils
import analysis_engine.run_custom_algo as run_custom_algo
import analysis_engine.publish as publish
import analysis_engine.algo as base_algo
import spylunking.log.setup_logging as log_utils

log = log_utils.build_colorized_logger(name=__name__)


class AlgoRunner:
    """AlgoRunner

    Run an algorithm backtest, or run with the latest pricing data,
    and publish the compressed trading history to s3 which can be
    used to train AI

    **Full Backtest**

    .. code-block:: python

        import analysis_engine.algo_runner as algo_runner
        runner = algo_runner.AlgoRunner('SPY')
        runner.start()

    **Run Algorithm with Latest Pricing Data**

    .. code-block:: python

        import analysis_engine.algo_runner as algo_runner
        import analysis_engine.plot_trading_history as plot
        ticker = 'SPY'
        runner = algo_runner.AlgoRunner(ticker)
        # run the algorithm with the latest 200 minutes:
        df = runner.latest()
        print(df[['minute', 'close']].tail(5))
        plot.plot_trading_history(
            title=(
                f'{ticker} - ${df["close"].iloc[-1]} '
                f'at: {df["minute"].iloc[-1]}'),
            df=df)
    """

    def __init__(
            self,
            ticker,
            algo_config=None,
            start_date=None,
            end_date=None,
            history_loc=None,
            predictions_loc=None,
            run_on_engine=False,
            verbose_algo=False,
            verbose_processor=False,
            verbose_indicators=False,
            **kwargs):
        """__init__

        constructor

        :param ticker: string ticker
        :param algo_config: optional - string path to a config file
            (default is ``./cfg/default_algo.json``)
        :param history_loc: optional - string trading history location
            (default is ``s3://algohistory/trade_history_{ticker}``)
        :param predictions_loc: optional - string predictions location
        :param start_date: optional - string start date
        :param end_date: optional - string end date
        :param run_on_engine: optional - bool flag for running locally or
            distributed on an engine
            (default is ``False`` - local)
        :param verbose_algo: optional - bool flag for debugging the algo
            (default is ``False``)
        :param verbose_processor: optional - bool flag for debugging the
            algo's indicator processor
            (default is ``False``)
        :param verbose_indicators: optional - bool flag for debugging the
            algo's indicators
            (default is ``False``)
        :param kwargs: keyword args dictionary
        """
        self.ticker = None
        if ticker:
            self.ticker = str(ticker).upper()
        self.start_date = start_date
        self.end_date = end_date
        self.start_day = None
        self.end_day = None
        self.run_on_engine = run_on_engine
        self.algo_history_loc = (
            f's3://algohistory/trade_history_{self.ticker}')
        self.algo_predictions_loc = (
            f's3://predictions/{self.ticker}')
        if history_loc:
            self.algo_history_loc = history_loc
        if predictions_loc:
            self.algo_predictions_loc = predictions_loc

        self.pt_s3_access_key = ae_consts.ev(
            'PREDICTIONS_S3_ACCESS_KEY',
            ae_consts.S3_ACCESS_KEY)
        self.pt_s3_secret_key = ae_consts.ev(
            'PREDICTIONS_S3_SECRET_KEY',
            ae_consts.S3_SECRET_KEY)
        self.pt_s3_address = ae_consts.ev(
            'PREDICTIONS_S3_ADDRESS',
            ae_consts.S3_ADDRESS)
        self.pt_s3_secure = str(ae_consts.ev(
            'PREDICTIONS_S3_SECURE',
            ae_consts.S3_SECURE)) == '1'
        self.pt_s3_region = ae_consts.ev(
            'PREDICTIONS_S3_REGION_NAME',
            ae_consts.S3_REGION_NAME)
        self.pt_s3_bucket = self.algo_predictions_loc.split('/')[-2]
        self.pt_s3_key = self.algo_predictions_loc.split('/')[-1]

        self.config_dict = None
        self.use_config_file = algo_config
        if not self.use_config_file:
            if os.path.exists('./cfg/default_algo.json'):
                self.use_config_file = (
                    './cfg/default_algo.json')
            elif os.path.exists('/opt/sa/cfg/default_algo.json'):
                self.use_config_file = (
                    '/opt/sa/cfg/default_algo.json')
            else:
                log.critical(
                    'Failed: missing algo_config argument pointing to a '
                    'config file')
                return
        with open(self.use_config_file, 'r') as config_file:
            self.config_dict = json.loads(config_file.read())
        self.algo_mod_path = self.config_dict.get(
            'algo_path',
            ae_consts.ALGO_MODULE_PATH)
        if not os.path.exists(self.algo_mod_path):
            log.critical(
                'missing algorithm module file from config: '
                f'{self.algo_mod_path}')
            return

        self.balance = 10000.0
        self.commission = 6.0
        self.use_start_date = None
        self.use_end_date = None
        self.inspect_datasets = False
        self.history_json_file = None
        self.run_this_date = None
        self.algo_obj = None
        self.algo_report_loc = None
        self.algo_extract_loc = None
        self.backtest_loc = None
        self.raise_on_err = True

        self.ssl_options = ae_consts.SSL_OPTIONS
        self.transport_options = ae_consts.TRANSPORT_OPTIONS
        self.broker_url = ae_consts.WORKER_BROKER_URL
        self.backend_url = ae_consts.WORKER_BACKEND_URL
        self.path_to_config_module = ae_consts.WORKER_CELERY_CONFIG_MODULE
        self.include_tasks = ae_consts.INCLUDE_TASKS

        self.load_from_s3_bucket = None
        self.load_from_s3_key = None
        self.load_from_redis_key = None
        self.load_from_file = None
        self.load_compress = True
        self.load_publish = True
        self.load_config = None
        self.report_redis_key = None
        self.report_s3_bucket = None
        self.report_s3_key = None
        self.report_file = None
        self.report_compress = True
        self.report_publish = False
        self.report_config = None
        self.history_redis_key = None
        self.history_s3_bucket = None
        self.history_s3_key = None
        self.history_file = None
        self.history_compress = True
        self.history_publish = True
        self.history_config = None
        self.extract_redis_key = None
        self.extract_s3_bucket = None
        self.extract_s3_key = None
        self.extract_file = None
        self.extract_save_dir = None
        self.extract_compress = False
        self.extract_publish = False
        self.extract_config = None

        self.s3_enabled = True
        self.s3_access_key = ae_consts.S3_ACCESS_KEY
        self.s3_secret_key = ae_consts.S3_SECRET_KEY
        self.s3_region_name = ae_consts.S3_REGION_NAME
        self.s3_address = ae_consts.S3_ADDRESS
        self.s3_bucket = ae_consts.S3_BUCKET
        self.s3_key = None
        self.s3_secure = ae_consts.S3_SECURE
        self.redis_enabled = True
        self.redis_address = ae_consts.REDIS_ADDRESS
        self.redis_key = None
        self.redis_password = ae_consts.REDIS_PASSWORD
        self.redis_db = ae_consts.REDIS_DB
        self.redis_expire = ae_consts.REDIS_EXPIRE
        self.redis_serializer = 'json'
        self.redis_encoding = 'utf-8'
        self.publish_to_s3 = True
        self.publish_to_redis = True
        self.publish_to_slack = False
        self.slack_enabled = False
        self.slack_code_block = False
        self.slack_full_width = False

        self.dataset_type = ae_consts.SA_DATASET_TYPE_ALGO_READY
        self.serialize_datasets = ae_consts.DEFAULT_SERIALIZED_DATASETS
        self.compress = False
        self.encoding = 'utf-8'
        self.debug = False

        self.num_rows = 0
        self.auto_fill = True
        self.timeseries = 'minute'
        self.trade_strategy = 'count'
        self.algo_history_s3_bucket = None
        self.algo_history_s3_key = None

        if self.start_date:
            self.use_start_date = f'{str(self.start_date)} 00:00:00'
            datetime.datetime.strptime(
                self.start_date,
                ae_consts.COMMON_DATE_FORMAT)
        if self.end_date:
            self.use_end_date = f'{str(self.end_date)} 00:00:00'
            datetime.datetime.strptime(
                self.end_date,
                ae_consts.COMMON_DATE_FORMAT)

        """
        Finalize the algo config
        """

        self.debug = False
        self.verbose_algo = verbose_algo
        self.verbose_processor = verbose_processor
        self.verbose_indicators = verbose_indicators

        if self.config_dict:
            if self.ticker:
                self.config_dict['ticker'] = self.ticker
            else:
                self.ticker = (
                    str(self.config_dict['ticker']).upper())
            self.config_dict['balance'] = self.balance
            self.config_dict['commission'] = self.commission

        if self.verbose_algo:
            self.config_dict['verbose'] = self.verbose_algo
        if self.verbose_processor:
            self.config_dict['verbose_processor'] = self.verbose_processor
        if self.verbose_indicators:
            self.config_dict['verbose_indicators'] = self.verbose_indicators
        if self.inspect_datasets:
            self.config_dict['inspect_datasets'] = self.inspect_datasets
        if self.run_this_date:
            self.config_dict['run_this_date'] = self.run_this_date

        if self.backtest_loc:
            if 's3://' in self.backtest_loc:
                self.load_from_s3_bucket = self.backtest_loc.split('/')[-2]
                self.load_from_s3_key = self.backtest_loc.split('/')[-1]
            elif 'redis://' in self.backtest_loc:
                self.load_from_redis_key = self.backtest_loc.split('/')[-1]
            elif 'file:/' in self.backtest_loc:
                self.load_from_file = self.backtest_loc.split(':')[-1]
            else:
                log.error(
                    'invalid -b <backtest dataset location> specified. '
                    f'{self.backtest_loc} '
                    'please use either: '
                    '-b file:/opt/sa/tests/datasets/algo/SPY-latest.json or '
                    '-b s3://algoready/SPY-latest.json or '
                    '-b redis://SPY-latest')
            self.load_publish = True

        if self.algo_history_loc:
            if 's3://' in self.algo_history_loc:
                self.history_s3_bucket = self.algo_history_loc.split('/')[-2]
                self.history_s3_key = self.algo_history_loc.split('/')[-1]
            elif 'redis://' in self.algo_history_loc:
                self.history_redis_key = self.algo_history_loc.split('/')[-1]
            elif 'file:/' in self.algo_history_loc:
                self.history_file = self.algo_history_loc.split(':')[-1]
            else:
                log.error(
                    'invalid -p <trading history location> specified. '
                    f'{self.algo_history_loc} '
                    'please use either: '
                    '-p file:/opt/sa/tests/datasets/algo/SPY-latest.json or '
                    '-p s3://algoready/SPY-latest.json or '
                    '-p redis://SPY-latest')
            self.history_publish = True

        if self.algo_report_loc:
            if 's3://' in self.algo_report_loc:
                self.report_s3_bucket = self.algo_report_loc.split('/')[-2]
                self.report_s3_key = self.algo_report_loc.split('/')[-1]
            elif 'redis://' in self.algo_report_loc:
                self.report_redis_key = self.algo_report_loc.split('/')[-1]
            elif 'file:/' in self.algo_report_loc:
                self.report_file = self.algo_report_loc.split(':')[-1]
            else:
                log.error(
                    'invalid -o <algo report location> specified. '
                    f'{self.algo_report_loc} '
                    'please use either: '
                    '-o file:/opt/sa/tests/datasets/algo/SPY-latest.json or '
                    '-o s3://algoready/SPY-latest.json or '
                    '-o redis://SPY-latest')
            self.report_publish = True

        if self.algo_extract_loc:
            if 's3://' in self.algo_extract_loc:
                self.extract_s3_bucket = self.algo_extract_loc.split('/')[-2]
                self.extract_s3_key = self.algo_extract_loc.split('/')[-1]
            elif 'redis://' in self.algo_extract_loc:
                self.extract_redis_key = self.algo_extract_loc.split('/')[-1]
            elif 'file:/' in self.algo_extract_loc:
                self.extract_file = self.algo_extract_loc.split(':')[-1]
            else:
                log.error(
                    'invalid -e <extract dataset location> specified. '
                    f'{self.algo_extract_loc} '
                    'please use either: '
                    '-e file:/opt/sa/tests/datasets/algo/SPY-latest.json or '
                    '-e s3://algoready/SPY-latest.json or '
                    '-e redis://SPY-latest')
            self.extract_publish = True

        self.use_name = self.config_dict.get(
            'name',
            'missing-algo-name')
        self.auto_fill = self.config_dict.get(
            'auto_fill',
            self.auto_fill)
        self.timeseries = self.config_dict.get(
            'timeseries',
            self.timeseries)
        self.trade_strategy = self.config_dict.get(
            'trade_strategy',
            self.trade_strategy)

        self.iex_datasets = ae_consts.IEX_DATASETS_DEFAULT
        self.common_fetch_vals = {}
        self.common_fetch_vals['ticker'] = self.ticker
        self.common_fetch_vals['celery_disabled'] = True
        self.common_fetch_vals['label'] = 'lp'
        self.common_fetch_vals['iex_datasets'] = self.iex_datasets
        self.common_fetch_vals['s3_enabled'] = False
        self.common_fetch_vals['s3_address'] = None
        self.common_fetch_vals['s3_bucket'] = None
        self.common_fetch_vals['s3_key'] = None
        self.common_fetch_vals['s3_access_key'] = None
        self.common_fetch_vals['s3_secret_key'] = None
        self.common_fetch_vals['s3_region_name'] = None
        self.common_fetch_vals['s3_secure'] = None
        self.common_fetch_vals['redis_enabled'] = True
        self.common_fetch_vals['redis_address'] = self.redis_address
        self.common_fetch_vals['redis_password'] = self.redis_password
        self.common_fetch_vals['redis_expire'] = None
        self.common_fetch_vals['redis_db'] = self.redis_db

        self.history_df = None
        self.slack_enabled = False
    # end of __init__
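
    # A minimal construction sketch (the config path and history location
    # below are example values that mirror the defaults documented in
    # __init__ above):
    #
    #     runner = AlgoRunner(
    #         ticker='SPY',
    #         algo_config='/opt/sa/cfg/default_algo.json',
    #         history_loc='s3://algohistory/trade_history_SPY')
    #
    # Note the PREDICTIONS_S3_* environment variables, read through
    # ae_consts.ev() above, override the matching S3_* defaults for the
    # predictions publish target.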
    def start(
            self):
        """start

        Start the algorithm backtest
        """
        log.info(
            'starting algo '
            f's3://{self.history_s3_bucket}/{self.history_s3_key}')
        self.algo_res = run_custom_algo.run_custom_algo(
            mod_path=self.algo_mod_path,
            ticker=self.config_dict['ticker'],
            balance=self.config_dict['balance'],
            commission=self.config_dict['commission'],
            name=self.use_name,
            start_date=self.use_start_date,
            end_date=self.use_end_date,
            auto_fill=self.auto_fill,
            config_dict=self.config_dict,
            load_from_s3_bucket=self.load_from_s3_bucket,
            load_from_s3_key=self.load_from_s3_key,
            load_from_redis_key=self.load_from_redis_key,
            load_from_file=self.load_from_file,
            load_compress=self.load_compress,
            load_publish=self.load_publish,
            load_config=self.load_config,
            report_redis_key=self.report_redis_key,
            report_s3_bucket=self.report_s3_bucket,
            report_s3_key=self.report_s3_key,
            report_file=self.report_file,
            report_compress=self.report_compress,
            report_publish=self.report_publish,
            report_config=self.report_config,
            history_redis_key=self.history_redis_key,
            history_s3_bucket=self.history_s3_bucket,
            history_s3_key=self.history_s3_key,
            history_file=self.history_file,
            history_compress=self.history_compress,
            history_publish=self.history_publish,
            history_config=self.history_config,
            extract_redis_key=self.extract_redis_key,
            extract_s3_bucket=self.extract_s3_bucket,
            extract_s3_key=self.extract_s3_key,
            extract_file=self.extract_file,
            extract_save_dir=self.extract_save_dir,
            extract_compress=self.extract_compress,
            extract_publish=self.extract_publish,
            extract_config=self.extract_config,
            publish_to_slack=self.publish_to_slack,
            publish_to_s3=self.publish_to_s3,
            publish_to_redis=self.publish_to_redis,
            dataset_type=self.dataset_type,
            serialize_datasets=self.serialize_datasets,
            compress=self.compress,
            encoding=self.encoding,
            redis_enabled=self.redis_enabled,
            redis_key=self.redis_key,
            redis_address=self.redis_address,
            redis_db=self.redis_db,
            redis_password=self.redis_password,
            redis_expire=self.redis_expire,
            redis_serializer=self.redis_serializer,
            redis_encoding=self.redis_encoding,
            s3_enabled=self.s3_enabled,
            s3_key=self.s3_key,
            s3_address=self.s3_address,
            s3_bucket=self.s3_bucket,
            s3_access_key=self.s3_access_key,
            s3_secret_key=self.s3_secret_key,
            s3_region_name=self.s3_region_name,
            s3_secure=self.s3_secure,
            slack_enabled=self.slack_enabled,
            slack_code_block=self.slack_code_block,
            slack_full_width=self.slack_full_width,
            dataset_publish_extract=self.extract_publish,
            dataset_publish_history=self.history_publish,
            dataset_publish_report=self.report_publish,
            run_on_engine=self.run_on_engine,
            auth_url=self.broker_url,
            backend_url=self.backend_url,
            include_tasks=self.include_tasks,
            ssl_options=self.ssl_options,
            transport_options=self.transport_options,
            path_to_config_module=self.path_to_config_module,
            timeseries=self.timeseries,
            trade_strategy=self.trade_strategy,
            verbose=self.verbose_algo,
            raise_on_err=self.raise_on_err)
        self.wait_for_algo_to_finish()
    # end of start
    def wait_for_algo_to_finish(
            self):
        """wait_for_algo_to_finish

        wait until the algorithm finishes
        """
        self.algo_history_s3_bucket = None
        self.algo_history_s3_key = None
        self.task_id = None
        if self.run_on_engine:
            self.task_id = self.algo_res.get(
                'rec', {}).get('task_id', None)
            if self.task_id:
                log.info(
                    f'waiting - algo task_id={self.task_id} to finish')
                res = self.task_id.get()
                history_config = res.get(
                    'algo_req', {}).get(
                        'history_config', None)
                self.algo_history_s3_bucket = history_config.get(
                    's3_bucket', None)
                self.algo_history_s3_key = history_config.get(
                    's3_key', None)
                self.load_trading_history()
        else:
            history_records = self.algo_res.get(
                'rec', {}).get(
                    'history', None)
            self.algo_history_s3_bucket = self.history_s3_bucket
            self.algo_history_s3_key = self.history_s3_key
            self.history_df = pd.DataFrame(history_records)
            self.determine_latest_times_in_history()

        self.num_rows = len(self.history_df.index)
        log.info(
            f'loaded history rows={self.num_rows} '
            f'dates={self.start_date} to {self.end_date}')
    # end of wait_for_algo_to_finish
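
    # Engine-mode sketch (assumes a Celery engine is running and reachable
    # via the broker/backend URLs from analysis_engine.consts):
    #
    #     runner = AlgoRunner('SPY', run_on_engine=True)
    #     runner.start()  # start() calls wait_for_algo_to_finish(),
    #                     # which blocks on task_id.get() and then pulls
    #                     # the published history back down from s3
    #     df = runner.get_history()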
    def determine_latest_times_in_history(
            self):
        """determine_latest_times_in_history

        determine the latest minute or day in the pricing dataset
        and convert the ``date`` and ``minute`` columns to
        ``datetime`` objects
        """
        self.history_df['date'] = pd.to_datetime(
            self.history_df['date'])
        self.start_day = self.history_df['date'].iloc[0]
        self.end_day = self.history_df['date'].iloc[-1]
        self.start_date = self.history_df['date'].iloc[0]
        self.end_date = self.history_df['date'].iloc[-1]
        if 'minute' in self.history_df:
            self.history_df['minute'] = pd.to_datetime(
                self.history_df['minute'])
            self.start_date = self.history_df['minute'].iloc[0]
            self.end_date = self.history_df['minute'].iloc[-1]
    # end of determine_latest_times_in_history
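
    # After this runs, the 'date' column (and 'minute', when present) hold
    # pandas Timestamps, so for a minute-level history end_date is the last
    # processed bar; an illustrative (hypothetical) value:
    #
    #     runner.get_latest_minute()  # e.g. Timestamp('2019-02-15 15:59:00')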
    def load_trading_history(
            self,
            s3_access_key=None,
            s3_secret_key=None,
            s3_address=None,
            s3_region=None,
            s3_bucket=None,
            s3_key=None,
            s3_secure=ae_consts.NOT_SET,
            **kwargs):
        """load_trading_history

        Helper for loading an algorithm ``Trading History`` from S3

        :param s3_access_key: access key
        :param s3_secret_key: secret
        :param s3_address: address
        :param s3_region: region
        :param s3_bucket: bucket
        :param s3_key: key
        :param s3_secure: secure flag
        :param kwargs: support for keyword arg dict
        """
        use_s3_access_key = self.s3_access_key
        use_s3_secret_key = self.s3_secret_key
        use_s3_address = self.s3_address
        use_s3_region = self.s3_region_name
        use_s3_bucket = self.s3_bucket
        use_s3_key = self.s3_key
        use_s3_secure = self.s3_secure

        if s3_access_key:
            use_s3_access_key = s3_access_key
        if s3_secret_key:
            use_s3_secret_key = s3_secret_key
        if s3_address:
            use_s3_address = s3_address
        if s3_region:
            use_s3_region = s3_region
        if s3_bucket:
            use_s3_bucket = s3_bucket
        if s3_key:
            use_s3_key = s3_key
        if s3_secure != ae_consts.NOT_SET:
            use_s3_secure = s3_secure

        if s3_key and s3_bucket:
            log.info(
                f'using - td s3://{use_s3_address}/'
                f'{use_s3_bucket}/{use_s3_key}')
        else:
            log.info(
                f'load - td s3://{use_s3_address}/'
                f'{use_s3_bucket}/{use_s3_key}')
        load_res = load_history_utils.load_history_dataset(
            s3_access_key=use_s3_access_key,
            s3_secret_key=use_s3_secret_key,
            s3_address=use_s3_address,
            s3_region_name=use_s3_region,
            s3_bucket=use_s3_bucket,
            s3_key=use_s3_key,
            s3_secure=use_s3_secure)
        if self.ticker not in load_res:
            log.critical(
                'failed to load history: '
                f's3://{use_s3_bucket}/{use_s3_key}')
            self.history_df = None
            return
        self.history_df = load_res[self.ticker]
        self.determine_latest_times_in_history()
        log.info(
            f'found - td s3://{use_s3_address}/'
            f'{use_s3_bucket}/{use_s3_key} '
            f'rows={len(self.history_df.index)}')
    # end of load_trading_history
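
    # Sketch for re-loading a previously published history without
    # re-running a backtest (the bucket/key shown are example values
    # following the s3://algohistory/trade_history_{ticker} convention
    # documented in __init__):
    #
    #     runner.load_trading_history(
    #         s3_bucket='algohistory',
    #         s3_key='trade_history_SPY')
    #     df = runner.get_history()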
    def get_history(
            self):
        """get_history"""
        return self.history_df
    # end of get_history
    def get_latest_day(
            self):
        """get_latest_day"""
        if 'date' in self.history_df:
            return self.end_day
        else:
            return None
    # end of get_latest_day
    def get_latest_minute(
            self):
        """get_latest_minute"""
        if 'minute' in self.history_df:
            return self.end_date
        else:
            return None
    # end of get_latest_minute
    def publish_trading_history(
            self,
            records_for_history,
            pt_s3_access_key=None,
            pt_s3_secret_key=None,
            pt_s3_address=None,
            pt_s3_region=None,
            pt_s3_bucket=None,
            pt_s3_key=None,
            pt_s3_secure=ae_consts.NOT_SET,
            **kwargs):
        """publish_trading_history

        Helper for publishing a trading history
        to another S3 service like AWS

        :param records_for_history: list of dictionaries
            for the history file
        :param pt_s3_access_key: access key
        :param pt_s3_secret_key: secret
        :param pt_s3_address: address
        :param pt_s3_region: region
        :param pt_s3_bucket: bucket
        :param pt_s3_key: key
        :param pt_s3_secure: secure flag
        :param kwargs: support for keyword arg dict
        """
        use_s3_access_key = self.pt_s3_access_key
        use_s3_secret_key = self.pt_s3_secret_key
        use_s3_address = self.pt_s3_address
        use_s3_region = self.pt_s3_region
        use_s3_bucket = self.pt_s3_bucket
        use_s3_key = self.pt_s3_key
        use_s3_secure = self.pt_s3_secure

        use_s3_enabled = kwargs.get(
            's3_enabled', True)
        use_redis_enabled = kwargs.get(
            'redis_enabled', False)
        use_redis_address = kwargs.get(
            'redis_address', None)
        use_redis_db = kwargs.get(
            'redis_db', None)
        use_redis_key = kwargs.get(
            'redis_key', None)
        use_redis_password = kwargs.get(
            'redis_password', None)
        use_redis_expire = kwargs.get(
            'redis_expire', None)
        use_redis_serializer = kwargs.get(
            'redis_serializer', 'json')
        use_redis_encoding = kwargs.get(
            'redis_encoding', 'utf-8')
        verbose = kwargs.get(
            'verbose', False)

        if pt_s3_access_key:
            use_s3_access_key = pt_s3_access_key
        if pt_s3_secret_key:
            use_s3_secret_key = pt_s3_secret_key
        if pt_s3_address:
            use_s3_address = pt_s3_address
        if pt_s3_region:
            use_s3_region = pt_s3_region
        if pt_s3_bucket:
            use_s3_bucket = pt_s3_bucket
        if pt_s3_key:
            use_s3_key = pt_s3_key
        if pt_s3_secure != ae_consts.NOT_SET:
            use_s3_secure = pt_s3_secure

        rec = {
            'tickers': self.ticker,
            'version': int(ae_consts.ALGO_HISTORY_VERSION),
            'last_trade_date': ae_utils.get_last_close_str(),
            'algo_config_dict': self.config_dict,
            'algo_name': self.use_name,
            'created': ae_utils.utc_now_str(),
            self.ticker: records_for_history
        }
        num_bytes = len(str(rec))
        num_mb = ae_consts.get_mb(num_bytes)
        msg = (
            f'publish - {self.ticker} - {rec["last_trade_date"]} '
            # f'{use_s3_access_key} with: {use_s3_secret_key} '
            f's3_loc={use_s3_address}/{use_s3_bucket}/{use_s3_key} '
            f'mb={num_mb}MB')
        log.info(msg)
        publish.publish(
            data=rec,
            label='pub',
            df_compress=True,
            compress=False,
            convert_to_dict=False,
            output_file=None,
            redis_enabled=use_redis_enabled,
            redis_key=use_redis_key,
            redis_address=use_redis_address,
            redis_db=use_redis_db,
            redis_password=use_redis_password,
            redis_expire=use_redis_expire,
            redis_serializer=use_redis_serializer,
            redis_encoding=use_redis_encoding,
            s3_enabled=use_s3_enabled,
            s3_key=use_s3_key,
            s3_address=use_s3_address,
            s3_bucket=use_s3_bucket,
            s3_access_key=use_s3_access_key,
            s3_secret_key=use_s3_secret_key,
            s3_region_name=use_s3_region,
            s3_secure=use_s3_secure,
            slack_enabled=False,
            verbose=verbose)
    # end of publish_trading_history
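
    # Publish sketch - push the in-memory history records to a second S3
    # endpoint such as AWS (the address, bucket, and key are example values;
    # credentials default to the PREDICTIONS_S3_* settings from __init__
    # unless overridden here):
    #
    #     runner.publish_trading_history(
    #         records_for_history=runner.get_history().to_dict('records'),
    #         pt_s3_address='s3.us-east-1.amazonaws.com',
    #         pt_s3_bucket='predictions',
    #         pt_s3_key='SPY')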
    def latest(
            self,
            date_str=None,
            start_row=-200,
            extract_iex=True,
            extract_yahoo=False,
            extract_td=True,
            verbose=False,
            **kwargs):
        """latest

        Run the algorithm with the latest pricing data. Also
        supports running a backtest for a historical date in
        the pricing history (format ``YYYY-MM-DD``)

        :param date_str: optional - string start date ``YYYY-MM-DD``
            default is the latest close date
        :param start_row: negative number of rows back
            from the end of the list in the data
            default is ``-200`` where this means the algorithm
            will process the latest 200 rows in the minute
            dataset
        :param extract_iex: bool flag for extracting from ``IEX``
        :param extract_yahoo: bool flag for extracting from ``Yahoo``
            which is disabled as of 1/2019
        :param extract_td: bool flag for extracting from ``Tradier``
        :param verbose: bool flag for logs
        :param kwargs: keyword arg dict
        """
        use_date_str = date_str
        if not use_date_str:
            use_date_str = ae_utils.get_last_close_str()

        log.info(
            'creating algo')
        self.algo_obj = base_algo.BaseAlgo(
            ticker=self.config_dict['ticker'],
            balance=self.config_dict['balance'],
            commission=self.config_dict['commission'],
            name=self.use_name,
            start_date=self.use_start_date,
            end_date=self.use_end_date,
            auto_fill=self.auto_fill,
            config_dict=self.config_dict,
            load_from_s3_bucket=self.load_from_s3_bucket,
            load_from_s3_key=self.load_from_s3_key,
            load_from_redis_key=self.load_from_redis_key,
            load_from_file=self.load_from_file,
            load_compress=self.load_compress,
            load_publish=self.load_publish,
            load_config=self.load_config,
            report_redis_key=self.report_redis_key,
            report_s3_bucket=self.report_s3_bucket,
            report_s3_key=self.report_s3_key,
            report_file=self.report_file,
            report_compress=self.report_compress,
            report_publish=self.report_publish,
            report_config=self.report_config,
            history_redis_key=self.history_redis_key,
            history_s3_bucket=self.history_s3_bucket,
            history_s3_key=self.history_s3_key,
            history_file=self.history_file,
            history_compress=self.history_compress,
            history_publish=self.history_publish,
            history_config=self.history_config,
            extract_redis_key=self.extract_redis_key,
            extract_s3_bucket=self.extract_s3_bucket,
            extract_s3_key=self.extract_s3_key,
            extract_file=self.extract_file,
            extract_save_dir=self.extract_save_dir,
            extract_compress=self.extract_compress,
            extract_publish=self.extract_publish,
            extract_config=self.extract_config,
            publish_to_slack=self.publish_to_slack,
            publish_to_s3=self.publish_to_s3,
            publish_to_redis=self.publish_to_redis,
            dataset_type=self.dataset_type,
            serialize_datasets=self.serialize_datasets,
            compress=self.compress,
            encoding=self.encoding,
            redis_enabled=self.redis_enabled,
            redis_key=self.redis_key,
            redis_address=self.redis_address,
            redis_db=self.redis_db,
            redis_password=self.redis_password,
            redis_expire=self.redis_expire,
            redis_serializer=self.redis_serializer,
            redis_encoding=self.redis_encoding,
            s3_enabled=self.s3_enabled,
            s3_key=self.s3_key,
            s3_address=self.s3_address,
            s3_bucket=self.s3_bucket,
            s3_access_key=self.s3_access_key,
            s3_secret_key=self.s3_secret_key,
            s3_region_name=self.s3_region_name,
            s3_secure=self.s3_secure,
            slack_enabled=self.slack_enabled,
            slack_code_block=self.slack_code_block,
            slack_full_width=self.slack_full_width,
            dataset_publish_extract=self.extract_publish,
            dataset_publish_history=self.history_publish,
            dataset_publish_report=self.report_publish,
            run_on_engine=self.run_on_engine,
            auth_url=self.broker_url,
            backend_url=self.backend_url,
            include_tasks=self.include_tasks,
            ssl_options=self.ssl_options,
            transport_options=self.transport_options,
            path_to_config_module=self.path_to_config_module,
            timeseries=self.timeseries,
            trade_strategy=self.trade_strategy,
            verbose=False,
            raise_on_err=self.raise_on_err)

        log.info(
            'run latest - start')
        ticker = self.config_dict['ticker']
        dataset_id = f'{ticker}_{use_date_str}'
        self.common_fetch_vals['base_key'] = dataset_id
        verbose_extract = self.config_dict.get(
            'verbose_extract', False)
        indicator_datasets = self.algo_obj.get_indicator_datasets()
        ticker_data = build_dataset_node.build_dataset_node(
            ticker=ticker,
            date=use_date_str,
            datasets=indicator_datasets,
            service_dict=self.common_fetch_vals,
            verbose=verbose_extract)

        algo_data_req = {
            ticker: [
                {
                    'id': dataset_id,  # id is currently the cache key in redis
                    'date': use_date_str,  # used to confirm dates in asc order
                    'data': ticker_data,
                    'start_row': start_row
                }
            ]
        }

        if verbose:
            log.info(
                f'extract - {dataset_id} '
                f'dataset={len(algo_data_req[ticker])}')

        # this could be a separate celery task
        try:
            if verbose:
                log.info(
                    f'handle_data START - {dataset_id}')
            self.algo_obj.handle_data(
                data=algo_data_req)
            if verbose:
                log.info(
                    f'handle_data END - {dataset_id}')
        except Exception as e:
            a_name = self.algo_obj.get_name()
            a_debug_msg = self.algo_obj.get_debug_msg()
            if not a_debug_msg:
                a_debug_msg = 'debug message not set'
            # a_config_dict = ae_consts.ppj(self.algo_obj.config_dict)
            msg = (
                f'{dataset_id} - algo={a_name} '
                f'encountered exception in handle_data tickers={ticker} '
                f'from ex={e} '
                f'and failed during operation: {a_debug_msg}')
            log.critical(msg)
        # end of try/ex

        log.info(
            'run latest - create history')
        history_ds = self.algo_obj.create_history_dataset()
        self.history_df = pd.DataFrame(history_ds[ticker])
        self.determine_latest_times_in_history()

        self.num_rows = len(self.history_df.index)

        if verbose:
            log.info(self.history_df[['minute', 'close']].tail(5))

        log.info(
            f'run latest minute={self.end_date} - '
            f'rows={self.num_rows} - done')

        return self.get_history()
    # end of latest

# end of AlgoRunner
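

if __name__ == '__main__':
    # Minimal usage sketch, assuming the stack's Redis and Minio/S3 services
    # are reachable at the analysis_engine.consts defaults and a
    # default_algo.json config is present:
    runner = AlgoRunner('SPY')
    # process the latest ~200 minute bars and print the tail:
    latest_df = runner.latest()
    if latest_df is not None:
        print(latest_df[['minute', 'close']].tail(5))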