Source code for analysis_engine.run_custom_algo

"""
This is a wrapper for running your own custom algorithms

.. note:: Please refer to the `sa.py <https://
    github.com/AlgoTraders/stock-analysis-engine/blob/master/
    analysis_engine/scripts/sa.py>`__
    for the latest usage examples.

Example with the command line tool:

::

    bt -t SPY -g /opt/sa/analysis_engine/mocks/example_algo_minute.py

"""

import os
import inspect
import types
import importlib.machinery
import datetime
import json
import analysis_engine.consts as ae_consts
import analysis_engine.build_algo_request as build_algo_request
import analysis_engine.build_publish_request as build_publish_request
import analysis_engine.build_result as build_result
import analysis_engine.run_algo as run_algo
import analysis_engine.work_tasks.get_celery_app as get_celery_app
import analysis_engine.algo as ae_algo
import spylunking.log.setup_logging as log_utils

# Module-level logger used by everything in this file; it is created with
# this module's import name (``__name__``).
log = log_utils.build_colorized_logger(name=__name__)


def _find_algo_class(custom_algo_module, module_name):
    """Return the algorithm class defined in a loaded custom module.

    Only classes whose ``__module__`` equals ``module_name`` are
    considered, which skips imported classes and non-class members such
    as the module's own ``__name__`` string. Classes deriving from
    ``analysis_engine.algo.BaseAlgo`` are preferred; if none derive from
    it, the first class defined in the module is returned.

    :param custom_algo_module: module object that already executed the
        custom algorithm source file
    :param module_name: name the module was loaded under
    :return: the class to instantiate or ``None`` if the module does
        not define any class
    """
    local_classes = [
        member
        for _, member in inspect.getmembers(
            custom_algo_module, inspect.isclass)
        if getattr(member, '__module__', None) == module_name
    ]
    for candidate in local_classes:
        if issubclass(candidate, ae_algo.BaseAlgo):
            return candidate
    return local_classes[0] if local_classes else None
# end of _find_algo_class


def _apply_publish_targets(
        publish_config,
        output_file,
        redis_key,
        s3_bucket,
        s3_key,
        publish_to_redis,
        publish_to_s3,
        disable_s3_if_unset=False):
    """Copy the requested destinations into a publish config dictionary.

    The ``publish_config`` dictionary is modified in place.

    :param publish_config: dictionary to update
    :param output_file: optional - file path destination
    :param redis_key: optional - redis key destination (only applied
        when ``publish_to_redis`` is truthy)
    :param s3_bucket: optional - s3 bucket destination
    :param s3_key: optional - s3 key destination (bucket and key are
        only applied together and when ``publish_to_s3`` is truthy)
    :param publish_to_redis: bool - redis publishing toggle
    :param publish_to_s3: bool - s3 publishing toggle
    :param disable_s3_if_unset: bool - set ``s3_enabled`` to ``False``
        when no s3 destination was applied
    :return: ``True`` if at least one destination was applied
    """
    should_publish = False
    if output_file:
        publish_config['output_file'] = output_file
        should_publish = True
    if redis_key and publish_to_redis:
        publish_config['redis_key'] = redis_key
        publish_config['redis_enabled'] = True
        should_publish = True
    if s3_bucket and s3_key and publish_to_s3:
        publish_config['s3_bucket'] = s3_bucket
        publish_config['s3_key'] = s3_key
        publish_config['s3_enabled'] = True
        should_publish = True
    elif disable_s3_if_unset:
        publish_config['s3_enabled'] = False
    return should_publish
# end of _apply_publish_targets


def _describe_publish_targets(publish_config):
    """Build the log description of where a dataset will be published.

    Redis and s3 are switched off in ``publish_config`` (in place) when
    their destination values are incomplete.

    :param publish_config: publish config dictionary
    :return: string starting with ``publish`` followed by each usable
        destination
    """
    use_log = 'publish'
    if (publish_config['redis_address'] and
            publish_config['redis_db'] >= 0 and
            publish_config['redis_key']):
        use_log += (
            f' redis://{publish_config["redis_address"]}'
            f'@{publish_config["redis_db"]}/{publish_config["redis_key"]}')
    else:
        publish_config['redis_enabled'] = False
    if (publish_config['s3_address'] and
            publish_config['s3_bucket'] and
            publish_config['s3_key']):
        use_log += (
            f' s3://{publish_config["s3_address"]}'
            f'/{publish_config["s3_bucket"]}/{publish_config["s3_key"]}')
    else:
        publish_config['s3_enabled'] = False
    if publish_config['output_file']:
        use_log += f' file:{publish_config["output_file"]}'
    return use_log
# end of _describe_publish_targets


def run_custom_algo(
        mod_path,
        ticker='SPY',
        balance=50000,
        commission=6.0,
        start_date=None,
        end_date=None,
        name='myalgo',
        auto_fill=True,
        config_file=None,
        config_dict=None,
        load_from_s3_bucket=None,
        load_from_s3_key=None,
        load_from_redis_key=None,
        load_from_file=None,
        load_compress=False,
        load_publish=True,
        load_config=None,
        report_redis_key=None,
        report_s3_bucket=None,
        report_s3_key=None,
        report_file=None,
        report_compress=False,
        report_publish=True,
        report_config=None,
        history_redis_key=None,
        history_s3_bucket=None,
        history_s3_key=None,
        history_file=None,
        history_compress=False,
        history_publish=True,
        history_config=None,
        extract_redis_key=None,
        extract_s3_bucket=None,
        extract_s3_key=None,
        extract_file=None,
        extract_save_dir=None,
        extract_compress=False,
        extract_publish=True,
        extract_config=None,
        publish_to_s3=True,
        publish_to_redis=True,
        publish_to_slack=True,
        dataset_type=ae_consts.SA_DATASET_TYPE_ALGO_READY,
        serialize_datasets=ae_consts.DEFAULT_SERIALIZED_DATASETS,
        compress=False,
        encoding='utf-8',
        redis_enabled=True,
        redis_key=None,
        redis_address=None,
        redis_db=None,
        redis_password=None,
        redis_expire=None,
        redis_serializer='json',
        redis_encoding='utf-8',
        s3_enabled=True,
        s3_key=None,
        s3_address=None,
        s3_bucket=None,
        s3_access_key=None,
        s3_secret_key=None,
        s3_region_name=None,
        s3_secure=False,
        slack_enabled=False,
        slack_code_block=False,
        slack_full_width=False,
        timeseries=None,
        trade_strategy=None,
        verbose=False,
        debug=False,
        dataset_publish_extract=False,
        dataset_publish_history=False,
        dataset_publish_report=False,
        run_on_engine=False,
        auth_url=ae_consts.WORKER_BROKER_URL,
        backend_url=ae_consts.WORKER_BACKEND_URL,
        include_tasks=ae_consts.INCLUDE_TASKS,
        ssl_options=ae_consts.SSL_OPTIONS,
        transport_options=ae_consts.TRANSPORT_OPTIONS,
        path_to_config_module=ae_consts.WORKER_CELERY_CONFIG_MODULE,
        raise_on_err=True):
    """run_custom_algo

    Run a custom algorithm that derives the
    ``analysis_engine.algo.BaseAlgo`` class

    .. note:: Make sure to only have **1**
        class defined in an algo module. Imports from
        other modules should work just fine.

    **Algorithm arguments**

    :param mod_path: file path to custom
        algorithm class module
    :param ticker: ticker symbol
    :param balance: float - starting balance capital
        for creating buys and sells
    :param commission: float - cost per buy or sell
    :param name: string - name for tracking algorithm
        in the logs
    :param start_date: string - start date for backtest with
        format ``YYYY-MM-DD HH:MM:SS`` (when not set it defaults to
        75 days before the end date)
    :param end_date: end date for backtest with
        format ``YYYY-MM-DD HH:MM:SS`` (when not set it defaults to
        the current utc time)
    :param auto_fill: optional - boolean for auto filling
        buy and sell orders for backtesting
        (default is ``True``)
    :param config_file: path to a json file
        containing custom algorithm object
        member values (like indicator configuration and
        predict future date units ahead for a backtest)
    :param config_dict: optional - dictionary that
        can be passed to derived class implementations
        of: ``def load_from_config(config_dict=config_dict)``

    **Timeseries**

    :param timeseries: optional - string to
        set ``day`` or ``minute`` backtesting
        or live trading
        (default is ``minute``)

    **Trading Strategy**

    :param trade_strategy: optional - string to
        set the type of ``Trading Strategy``
        for backtesting or live trading
        (default is ``count``)

    **Running Distributed Algorithms on the Engine Workers**

    :param run_on_engine: optional - boolean
        flag for publishing custom algorithms
        to Celery ae workers for distributing
        algorithm workloads
        (default is ``False`` which will run algos locally)
        this is required for distributing algorithms
    :param auth_url: Celery broker address
        (default is ``redis://localhost:6379/11``
        or ``analysis_engine.consts.WORKER_BROKER_URL``
        environment variable)
        this is required for distributing algorithms
    :param backend_url: Celery backend address
        (default is ``redis://localhost:6379/12``
        or ``analysis_engine.consts.WORKER_BACKEND_URL``
        environment variable)
        this is required for distributing algorithms
    :param include_tasks: list of modules containing tasks to add
        (default is ``analysis_engine.consts.INCLUDE_TASKS``)
    :param ssl_options: security options dictionary
        (default is ``analysis_engine.consts.SSL_OPTIONS``)
    :param transport_options: transport options dictionary
        (default is ``analysis_engine.consts.TRANSPORT_OPTIONS``)
    :param path_to_config_module: config module for advanced
        Celery worker connectivity requirements
        (default is ``analysis_engine.work_tasks.celery_config``
        or ``analysis_engine.consts.WORKER_CELERY_CONFIG_MODULE``)

    **Load Algorithm-Ready Dataset From Source**

    Use these arguments to load algorithm-ready datasets
    from supported sources (file, s3 or redis)

    :param load_from_s3_bucket: optional - string load the algo from a
        previously-created s3 bucket holding an s3 key with an
        algorithm-ready dataset for use with:
        ``handle_data``
    :param load_from_s3_key: optional - string load the algo from a
        previously-created s3 key holding an
        algorithm-ready dataset for use with:
        ``handle_data``
    :param load_from_redis_key: optional - string load the algo from a
        previously-created redis key holding an
        algorithm-ready dataset for use with:
        ``handle_data``
    :param load_from_file: optional - string path to
        a previously-created local file holding an
        algorithm-ready dataset for use with:
        ``handle_data``
    :param load_compress: optional - boolean
        flag for toggling to decompress
        or not when loading an algorithm-ready
        dataset (``True`` means the dataset
        must be decompressed to load correctly inside
        an algorithm to run a backtest)
    :param load_publish: boolean - toggle publishing
        the load progress to slack, s3, redis or a file
        (default is ``True``)
    :param load_config: optional - dictionary
        for setting member variables to load
        an algorithm-ready dataset from
        a file, s3 or redis

    **Publishing Control Bool Flags**

    :param publish_to_s3: optional - boolean for
        toggling publishing to s3 on/off
        (default is ``True``)
    :param publish_to_redis: optional - boolean for
        publishing to redis on/off
        (default is ``True``)
    :param publish_to_slack: optional - boolean for
        publishing to slack
        (default is ``True``)

    **Algorithm Trade History Arguments**

    :param history_redis_key: optional - string
        where the algorithm trading history will be stored in
        a redis key
    :param history_s3_bucket: optional - string
        where the algorithm trading history will be stored in
        an s3 bucket
    :param history_s3_key: optional - string
        where the algorithm trading history will be stored in
        an s3 key
    :param history_file: optional - string key
        where the algorithm trading history will be stored in
        a file serialized as a json-string
    :param history_compress: optional - boolean
        flag for toggling to decompress
        or not when loading an algorithm-ready
        dataset (``True`` means the dataset
        will be compressed on publish)
    :param history_publish: boolean - toggle publishing
        the history to s3, redis or a file
        (default is ``True``)
    :param history_config: optional - dictionary
        for setting member variables to publish
        an algo ``trade history`` to s3, redis, a file
        or slack

    **Algorithm Trade Performance Report Arguments (Output Dataset)**

    :param report_redis_key: optional - string
        where the algorithm ``trading performance report`` (report)
        will be stored in a redis key
    :param report_s3_bucket: optional - string
        where the algorithm report will be stored in
        an s3 bucket
    :param report_s3_key: optional - string
        where the algorithm report will be stored in
        an s3 key
    :param report_file: optional - string key
        where the algorithm report will be stored in
        a file serialized as a json-string
    :param report_compress: optional - boolean
        flag for toggling to decompress
        or not when loading an algorithm-ready
        dataset (``True`` means the dataset
        will be compressed on publish)
    :param report_publish: boolean - toggle publishing
        the ``trading performance report`` s3, redis or a file
        (default is ``True``)
    :param report_config: optional - dictionary
        for setting member variables to publish
        an algo ``trading performance report`` to s3,
        redis, a file or slack

    **Extract an Algorithm-Ready Dataset Arguments**

    :param extract_redis_key: optional - string
        where the algorithm report will be stored in
        a redis key
    :param extract_s3_bucket: optional - string
        where the algorithm report will be stored in
        an s3 bucket
    :param extract_s3_key: optional - string
        where the algorithm report will be stored in
        an s3 key
    :param extract_file: optional - string key
        where the algorithm report will be stored in
        a file serialized as a json-string
    :param extract_save_dir: optional - string
        path to auto-generated files from the algo
    :param extract_compress: optional - boolean
        flag for toggling to decompress
        or not when loading an algorithm-ready
        dataset (``True`` means the dataset
        will be compressed on publish)
    :param extract_publish: boolean - toggle publishing
        the used ``algorithm-ready dataset`` to s3, redis or a file
        (default is ``True``)
    :param extract_config: optional - dictionary
        for setting member variables to publish
        an algo ``trading performance report`` to s3,
        redis, a file or slack

    **Dataset Arguments**

    :param dataset_type: optional - dataset type
        (default is ``SA_DATASET_TYPE_ALGO_READY``)
    :param serialize_datasets: optional - list of dataset names to
        deserialize in the dataset
        (default is ``DEFAULT_SERIALIZED_DATASETS``)
    :param encoding: optional - string for data encoding

    **Publish Algorithm Datasets to S3, Redis or a File**

    :param dataset_publish_extract: optional - bool
        for publishing the algorithm's
        ``algorithm-ready``
        dataset to: s3, redis or file
    :param dataset_publish_history: optional - bool
        for publishing the algorithm's
        ``trading history``
        dataset to: s3, redis or file
    :param dataset_publish_report: optional - bool
        for publishing the algorithm's
        ``trading performance report``
        dataset to: s3, redis or file

    **Redis connectivity arguments**

    :param redis_enabled: bool - toggle for auto-caching all
        datasets in Redis
        (default is ``True``)
    :param redis_key: string - key to save the data in redis
        (default is ``None``)
    :param redis_address: Redis connection string format: ``host:port``
        (default is ``localhost:6379``)
    :param redis_db: Redis db to use
        (default is ``0``)
    :param redis_password: optional - Redis password
        (default is ``None``)
    :param redis_expire: optional - Redis expire value
        (default is ``None``)
    :param redis_serializer: not used yet - support for future
        pickle objects in redis
    :param redis_encoding: format of the encoded key in redis

    **Minio (S3) connectivity arguments**

    :param s3_enabled: bool - toggle for auto-archiving on Minio (S3)
        (default is ``True``)
    :param s3_key: string - key to save the data in s3
        (default is ``None``)
    :param s3_address: Minio S3 connection string format: ``host:port``
        (default is ``localhost:9000``)
    :param s3_bucket: S3 Bucket for storing the artifacts
        (default is ``dev``) which should be viewable on a browser:
        http://localhost:9000/minio/dev/
    :param s3_access_key: S3 Access key
        (default is ``trexaccesskey``)
    :param s3_secret_key: S3 Secret key
        (default is ``trex123321``)
    :param s3_region_name: S3 region name
        (default is ``us-east-1``)
    :param s3_secure: Transmit using tls encryption
        (default is ``False``)

    **Slack arguments**

    :param slack_enabled: optional - boolean for
        publishing to slack
    :param slack_code_block: optional - boolean for
        publishing as a code block in slack
    :param slack_full_width: optional - boolean for
        publishing to slack using the full
        width allowed

    **Debugging arguments**

    :param debug: optional - bool for debug tracking
    :param verbose: optional - bool for increasing
        logging
    :param raise_on_err: boolean - set this to ``False`` on prod
        to ensure exceptions do not interrupt services.
        With the default (``True``) any exceptions from the library
        and your own algorithm are sent back out immediately exiting
        the backtest.

    :return: result dictionary. Failures return the value of
        ``build_result.build_result`` with ``status=ERR`` and the
        reason in ``err``. With ``run_on_engine`` the ``rec`` holds
        the ``algo_req`` and the ``task_id`` returned by
        ``send_task``. Otherwise the ``run_algo.run_algo`` result is
        returned with the algorithm object stored under ``algo``.
    :raises Exception: if the ``config_file`` exists but could not be
        read or parsed as json
    """

    module_name = 'BaseAlgo'
    custom_algo_module = None
    custom_algo_class = None
    new_algo_object = None
    use_custom_algo = False
    should_publish_extract_dataset = False
    should_publish_history_dataset = False
    should_publish_report_dataset = False

    use_config_file = None
    use_config_dict = config_dict
    if config_file and os.path.exists(config_file):
        use_config_file = config_file
        if not config_dict:
            try:
                # context manager so the file handle is always closed
                with open(config_file, 'r') as config_fh:
                    use_config_dict = json.loads(config_fh.read())
            except Exception as e:
                msg = (
                    f'failed parsing json config_file={config_file} '
                    f'with ex={e}')
                log.error(msg)
                raise Exception(msg) from e
    # end of loading the config_file

    if mod_path:
        module_name = mod_path.split('/')[-1]
        loader = importlib.machinery.SourceFileLoader(
            module_name,
            mod_path)
        custom_algo_module = types.ModuleType(
            loader.name)
        loader.exec_module(
            custom_algo_module)
        use_custom_algo = True

        # look for a real class defined in the module - matching on
        # the member's string form also matched the module's
        # ``__name__`` value which is not callable as an algorithm
        custom_algo_class = _find_algo_class(
            custom_algo_module=custom_algo_module,
            module_name=module_name)
        if custom_algo_class is None:
            err = (
                'analysis_engine.run_custom_algo.run_custom_algo was unable '
                f'to find custom algorithm module={custom_algo_module} with '
                f'provided path to \n file: {mod_path} \n'
                '\n'
                'Please confirm '
                'that the class inherits from the BaseAlgo class like:\n'
                '\n'
                'import analysis_engine.algo\n'
                'class MyAlgo(analysis_engine.algo.BaseAlgo):\n '
                '\n'
                'If it is then please file an issue on github:\n '
                'https://github.com/AlgoTraders/stock-analysis-engine/'
                'issues/new \n\nFor now this error results in a shutdown'
                '\n')
            if verbose or debug:
                log.error(err)
            return build_result.build_result(
                status=ae_consts.ERR,
                err=err,
                rec=None)
        # if the module does not define an algorithm class
    # if loading a custom algorithm module from a file on disk

    use_start_date = start_date
    use_end_date = end_date
    end_anchor = end_date
    if not use_end_date:
        end_anchor = datetime.datetime.utcnow()
        use_end_date = end_anchor.strftime(
            ae_consts.COMMON_TICK_DATE_FORMAT)
    if not use_start_date:
        if isinstance(end_anchor, str):
            # a caller-supplied end_date is a string which cannot be
            # used for date math until it is parsed
            end_anchor = datetime.datetime.strptime(
                end_anchor,
                ae_consts.COMMON_TICK_DATE_FORMAT)
        use_start_date = (
            end_anchor - datetime.timedelta(days=75)).strftime(
                ae_consts.COMMON_TICK_DATE_FORMAT)
        if verbose:
            log.info(
                f'{name} {ticker} setting default start_date={use_start_date}')

    # arguments shared by every publish request built below
    shared_publish_args = {
        'ticker': ticker,
        'output_file': None,
        's3_bucket': None,
        's3_key': None,
        'redis_key': None,
        'redis_enabled': publish_to_redis,
        'redis_address': redis_address,
        'redis_db': redis_db,
        'redis_password': redis_password,
        'redis_expire': redis_expire,
        'redis_serializer': redis_serializer,
        'redis_encoding': redis_encoding,
        's3_enabled': publish_to_s3,
        's3_address': s3_address,
        's3_access_key': s3_access_key,
        's3_secret_key': s3_secret_key,
        's3_region_name': s3_region_name,
        's3_secure': s3_secure,
        'slack_enabled': publish_to_slack,
        'slack_code_block': slack_code_block,
        'slack_full_width': slack_full_width,
        'verbose': verbose
    }

    # Load an algorithm-ready dataset from:
    # file, s3, or redis
    if not load_config:
        load_config = build_publish_request.build_publish_request(
            compress=load_compress,
            label=f'load-{name}',
            **shared_publish_args)
        if load_from_file:
            load_config['output_file'] = load_from_file
        if load_from_redis_key:
            load_config['redis_key'] = load_from_redis_key
            load_config['redis_enabled'] = True
        if load_from_s3_bucket and load_from_s3_key:
            load_config['s3_bucket'] = load_from_s3_bucket
            load_config['s3_key'] = load_from_s3_key
            load_config['s3_enabled'] = True
    # end of building load_config dictionary if not already set

    # Automatically save all datasets to an algorithm-ready:
    # file, s3, or redis
    if not extract_config:
        extract_config = build_publish_request.build_publish_request(
            compress=extract_compress,
            label=f'extract-{name}',
            **shared_publish_args)
        # only the extract config turns s3 off when no s3
        # destination was requested
        should_publish_extract_dataset = _apply_publish_targets(
            publish_config=extract_config,
            output_file=extract_file,
            redis_key=extract_redis_key,
            s3_bucket=extract_s3_bucket,
            s3_key=extract_s3_key,
            publish_to_redis=publish_to_redis,
            publish_to_s3=publish_to_s3,
            disable_s3_if_unset=True)
    # end of building extract_config dictionary if not already set

    # Automatically save the trading performance report:
    # file, s3, or redis
    if not report_config:
        report_config = build_publish_request.build_publish_request(
            compress=report_compress,
            label=f'report-{name}',
            **shared_publish_args)
        should_publish_report_dataset = _apply_publish_targets(
            publish_config=report_config,
            output_file=report_file,
            redis_key=report_redis_key,
            s3_bucket=report_s3_bucket,
            s3_key=report_s3_key,
            publish_to_redis=publish_to_redis,
            publish_to_s3=publish_to_s3)
    # end of building report_config dictionary if not already set

    # Automatically save the trade history:
    # file, s3, or redis
    if not history_config:
        # use the history flag - this used to pass report_compress
        history_config = build_publish_request.build_publish_request(
            compress=history_compress,
            label=f'history-{name}',
            **shared_publish_args)
        should_publish_history_dataset = _apply_publish_targets(
            publish_config=history_config,
            output_file=history_file,
            redis_key=history_redis_key,
            s3_bucket=history_s3_bucket,
            s3_key=history_s3_key,
            publish_to_redis=publish_to_redis,
            publish_to_s3=publish_to_s3)
    # end of building history_config dictionary if not already set

    if verbose:
        # never log credentials
        remove_vals = [
            's3_access_key',
            's3_secret_key',
            'redis_password'
        ]
        configs_to_log = (
            ('extract', extract_config),
            ('report', report_config),
            ('trade history', history_config),
            ('load', load_config),
        )
        for config_label, config_values in configs_to_log:
            debug_config = {
                k: config_values[k]
                for k in config_values
                if k not in remove_vals
            }
            log.info(
                f'{name} {ticker} using {config_label} config '
                f'{ae_consts.ppj(debug_config)}')
        log.info(
            f'{name} {ticker} - building algo request')
    # end of verbose

    algo_req = build_algo_request.build_algo_request(
        ticker=ticker,
        balance=balance,
        commission=commission,
        start_date=use_start_date,
        end_date=use_end_date,
        timeseries=timeseries,
        trade_strategy=trade_strategy,
        config_file=use_config_file,
        config_dict=use_config_dict,
        load_config=load_config,
        history_config=history_config,
        report_config=report_config,
        extract_config=extract_config,
        label=name)

    algo_req['name'] = name
    algo_req['should_publish_extract_dataset'] = should_publish_extract_dataset
    algo_req['should_publish_history_dataset'] = should_publish_history_dataset
    algo_req['should_publish_report_dataset'] = should_publish_report_dataset

    algo_res = build_result.build_result(
        status=ae_consts.NOT_RUN,
        err=None,
        rec=None)

    if run_on_engine:
        rec = {
            'algo_req': algo_req,
            'task_id': None
        }
        task_name = (
            'analysis_engine.work_tasks.'
            'task_run_algo.task_run_algo')
        if verbose:
            log.info(f'starting distributed algo task={task_name}')
        elif debug:
            log.info(
                'starting distributed algo by publishing to '
                f'task={task_name} broker={auth_url} backend={backend_url}')

        # Get the Celery app
        app = get_celery_app.get_celery_app(
            name=__name__,
            auth_url=auth_url,
            backend_url=backend_url,
            path_to_config_module=path_to_config_module,
            ssl_options=ssl_options,
            transport_options=transport_options,
            include_tasks=include_tasks)

        if debug:
            log.info(
                f'calling distributed algo task={task_name} '
                f'request={ae_consts.ppj(algo_req)}')
        elif verbose:
            log.info(f'calling distributed algo task={task_name}')

        job_id = app.send_task(
            task_name,
            (algo_req,))
        if verbose:
            log.info(f'calling task={task_name} - success job_id={job_id}')
        rec['task_id'] = job_id
        algo_res = build_result.build_result(
            status=ae_consts.SUCCESS,
            err=None,
            rec=rec)
        return algo_res
    # end of run_on_engine

    if use_custom_algo:
        if verbose:
            log.info(
                f'inspecting {custom_algo_module} for class {module_name}')
            log.info(f'start {name} with {custom_algo_class}')
        new_algo_object = custom_algo_class(
            **algo_req)
    else:
        new_algo_object = ae_algo.BaseAlgo(
            **algo_req)
    # if using a custom module path or the BaseAlgo

    if new_algo_object:
        # heads up - logging this might have passwords in the algo_req
        # log.debug(
        #     f'{name} algorithm request: {algo_req}')
        if verbose:
            log.info(
                f'{name} - run ticker={ticker} from {use_start_date} '
                f'to {use_end_date}')
        algo_res = run_algo.run_algo(
            algo=new_algo_object,
            raise_on_err=raise_on_err,
            **algo_req)
        algo_res['algo'] = new_algo_object
        if verbose:
            log.info(
                f'{name} - run ticker={ticker} from {use_start_date} '
                f'to {use_end_date}')
            if custom_algo_module:
                log.info(
                    f'{name} - done run_algo '
                    f'custom_algo_module={custom_algo_module} '
                    f'module_name={module_name} ticker={ticker} '
                    f'from {use_start_date} to {use_end_date}')
            else:
                log.info(
                    f'{name} - done run_algo BaseAlgo ticker={ticker} '
                    f'from {use_start_date} to {use_end_date}')
    else:
        err = (
            'missing a derived analysis_engine.algo.BaseAlgo '
            f'class in the module file={mod_path} for ticker={ticker} '
            f'algo_name={name}')
        return build_result.build_result(
            status=ae_consts.ERR,
            err=err,
            rec=None)
    # end of finding a valid algorithm object

    algo = algo_res.get(
        'algo',
        None)

    if not algo:
        err = (
            f'failed creating algorithm object - ticker={ticker} '
            f'status={ae_consts.get_status(status=algo_res["status"])} '
            f'error={algo_res["err"]} algo name={name} '
            f'custom_algo_module={custom_algo_module} '
            f'module_name={module_name} '
            f'from {use_start_date} to {use_end_date}')
        return build_result.build_result(
            status=ae_consts.ERR,
            err=err,
            rec=None)

    if should_publish_extract_dataset or dataset_publish_extract:
        use_log = _describe_publish_targets(
            publish_config=extract_config)

        if verbose:
            log.info(
                f'{name} - publish - start ticker={ticker} '
                f'algorithm-ready {use_log}')

        publish_status = algo.publish_input_dataset(
            **extract_config)
        if publish_status != ae_consts.SUCCESS:
            # hand the failure reason back to the caller in err
            err = (
                'failed to publish algorithm-ready datasets '
                f'with status {ae_consts.get_status(status=publish_status)} '
                f'attempted to {use_log}')
            log.error(err)
            return build_result.build_result(
                status=ae_consts.ERR,
                err=err,
                rec=None)

        if verbose:
            log.info(
                f'{name} - publish - done ticker={ticker} '
                f'algorithm-ready {use_log}')
    # if publish the algorithm-ready dataset

    if should_publish_history_dataset or dataset_publish_history:
        use_log = _describe_publish_targets(
            publish_config=history_config)

        if verbose:
            log.info(
                f'{name} - publish - start ticker={ticker} trading '
                f'history {use_log}')

        publish_status = algo.publish_trade_history_dataset(
            **history_config)
        if publish_status != ae_consts.SUCCESS:
            # hand the failure reason back to the caller in err
            err = (
                'failed to publish trading history datasets '
                f'with status {ae_consts.get_status(status=publish_status)} '
                f'attempted to {use_log}')
            log.error(err)
            return build_result.build_result(
                status=ae_consts.ERR,
                err=err,
                rec=None)

        if verbose:
            log.info(
                f'{name} - publish - done ticker={ticker} trading '
                f'history {use_log}')
    # if publish an trading history dataset

    if should_publish_report_dataset or dataset_publish_report:
        use_log = _describe_publish_targets(
            publish_config=report_config)

        if verbose:
            log.info(
                f'{name} - publishing ticker={ticker} trading performance '
                f'report {use_log}')

        publish_status = algo.publish_report_dataset(
            **report_config)
        if publish_status != ae_consts.SUCCESS:
            # hand the failure reason back to the caller in err
            err = (
                'failed to publish trading performance report datasets '
                f'with status {ae_consts.get_status(status=publish_status)} '
                f'attempted to {use_log}')
            log.error(err)
            return build_result.build_result(
                status=ae_consts.ERR,
                err=err,
                rec=None)

        if verbose:
            log.info(
                f'{name} - publish - done ticker={ticker} trading performance '
                f'report {use_log}')
    # if publish an trading performance report dataset

    if verbose:
        log.info(
            f'{name} - done publishing datasets for ticker={ticker} '
            f'from {use_start_date} to {use_end_date}')

    return algo_res
# end of run_custom_algo