Source code for analysis_engine.algo

"""
Algorithms automatically provide the following
member variables to any custom algorithm that overrides
the ``analysis_engine.algo.BaseAlgo.process`` method.

By overriding the ``process()`` member method in a derived
class, you can quickly build algorithms that
determine **buy** and **sell** conditions from
any of the datasets automatically extracted
from the redis pipeline (a short sketch follows this list):

- ``self.df_daily``
- ``self.df_minute``
- ``self.df_calls``
- ``self.df_puts``
- ``self.df_quote``
- ``self.df_pricing``
- ``self.df_stats``
- ``self.df_peers``
- ``self.df_iex_news``
- ``self.df_financials``
- ``self.df_earnings``
- ``self.df_dividends``
- ``self.df_company``
- ``self.df_yahoo_news``
- ``self.df_tdcalls``
- ``self.df_tdputs``
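
For example, a derived algorithm can read any of these frames
inside its ``process()`` method. Here is a minimal sketch (the
class name and buy/sell rule are illustrative; it assumes the
engine already populated ``self.df_daily`` for the ticker):

.. code-block:: python

    from analysis_engine.algo import BaseAlgo

    class ExampleDailyCloseAlgo(BaseAlgo):
        # sketch - compare the latest close to the daily average
        def process(self, algo_id, ticker, dataset):
            # self.df_daily was extracted from the redis pipeline
            if self.df_daily.empty or 'close' not in self.df_daily:
                return
            avg_close = self.df_daily['close'].mean()
            # flip the flags the base class uses to build orders
            self.should_buy = self.latest_close < avg_close
            self.should_sell = self.latest_close > avg_close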

**Recent Pricing Information**

- ``self.latest_close``
- ``self.latest_high``
- ``self.latest_open``
- ``self.latest_low``
- ``self.latest_volume``
- ``self.today_close``
- ``self.today_high``
- ``self.today_open``
- ``self.today_low``
- ``self.today_volume``
- ``self.ask``
- ``self.bid``

**Latest Backtest Date and Intraday Minute**

- ``self.latest_min``
- ``self.backtest_date``

.. note:: **self.latest_min** - Latest minute row in ``self.df_minute``

.. note:: **self.backtest_date** - Latest dataset date which is considered the
    backtest date for historical testing with the data pipeline
    structure (it's the ``date`` key in the dataset node root level)
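
For example, a backtest can use ``self.backtest_date`` to skip
dates before a chosen start (a sketch; the cutoff date is
illustrative and dates are compared as ``YYYY-MM-DD`` strings):

.. code-block:: python

    # inside a derived process() - skip early dataset dates
    if self.backtest_date and self.backtest_date < '2018-11-01':
        return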

**Trading Strategy**

- ``self.trade_strategy = 'count'`` - if the number of indicators
    saying buy or sell reaches the buy/sell rules' ``min_indicators``
    threshold, the algorithm will trigger a buy or sell (see the
    sketch after this list)
- ``self.buy_reason`` - derived algorithms can attach a custom
    buy reason as a string to each trade order
- ``self.sell_reason`` - derived algorithms can attach a custom
    sell reason as a string to each trade order
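
For example, with the default ``count`` strategy a derived
``process()`` can attach a reason string before the trade
check runs (a sketch; the reason wording is illustrative):

.. code-block:: python

    # inside a derived process() using the 'count' strategy
    if self.num_latest_buys >= self.min_buy_indicators:
        self.buy_reason = (
            f'{self.num_latest_buys} buy indicators >= '
            f'{self.min_buy_indicators} minimum')
    elif self.num_latest_sells >= self.min_sell_indicators:
        self.sell_reason = (
            f'{self.num_latest_sells} sell indicators >= '
            f'{self.min_sell_indicators} minimum')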

**Timeseries**

- ``self.timeseries`` - use an algorithm config to set
    ``day`` or ``minute`` to process daily or intraday
    minute-by-minute datasets. Indicators will still
    have access to all datasets; this just makes it
    easier to use the helper within an indicator
    to quickly get the correct dataset (a minimal config
    sketch follows the helper call):

    .. code-block:: python

        df_status, use_df = self.get_subscribed_dataset(
            dataset=dataset)
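
Here is a minimal sketch of the relevant part of an algorithm
config dictionary (indicator entries are omitted for brevity):

.. code-block:: python

    config_dict = {
        # 'day' or 'minute' - which timeseries to process
        'timeseries': 'minute',
        # indicator definitions would normally be listed here
        'indicators': []
    }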

**Balance Information**

- ``self.balance`` - current algorithm account balance
- ``self.prev_bal`` - previous balance
- ``self.net_value`` - total value the algorithm currently
    holds since it started trading. This includes the
    ``self.num_owned`` shares valued at the
    ``self.latest_close`` price
- ``self.net_gain`` - amount the algorithm has
    made since starting, including owned shares
    valued at the ``self.latest_close`` price
    (see the sketch after this list)
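
Conceptually, based on the descriptions above, the two values
relate to the balance like this (a sketch of the arithmetic;
the engine maintains these members internally):

.. code-block:: python

    # sketch - how net_value and net_gain relate to the balance
    net_value = self.balance + (self.num_owned * self.latest_close)
    net_gain = net_value - self.starting_balance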

.. note:: If a key is not in the dataset, the
    algorithm's member variable will be an empty
    pandas DataFrame created with ``pandas.DataFrame([])``,
    except for ``self.pricing``, which is just a dictionary.
    Please ensure the engine successfully fetched
    and cached the dataset in redis using a tool like
    ``redis-cli`` with a query of ``keys *`` (or
    ``keys <TICKER>_*`` on large deployments). A defensive
    check inside ``process()`` is sketched below.
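
A defensive check for a missing dataset inside a derived
``process()`` might look like this (a sketch; it assumes a
``log`` handle is available in the derived module):

.. code-block:: python

    # guard against a dataset that was not cached for this date
    if self.df_minute.empty or 'close' not in self.df_minute:
        log.info(f'{ticker} - no minute data for {self.backtest_date}')
        return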

**Indicator Information**

- ``self.buy_rules`` - optional - custom dictionary for passing
    buy-side business rules to a custom algorithm
- ``self.sell_rules`` - optional - custom dictionary for passing
    sell-side business rules to a custom algorithm
- ``self.min_buy_indicators`` - set from the ``min_indicators``
    key in ``self.buy_rules`` - the minimum number of indicators
    that must detect a buy condition before a buy is triggered
- ``self.min_sell_indicators`` - set from the ``min_indicators``
    key in ``self.sell_rules`` - the minimum number of indicators
    that must detect a sell condition before a sell is triggered
- ``self.latest_ind_report`` - latest dictionary of values
    from ``IndicatorProcessor.process()``
- ``self.latest_buys`` - latest indicators saying buy
- ``self.latest_sells`` - latest indicators saying sell
- ``self.num_latest_buys`` - latest number of indicators saying buy
- ``self.num_latest_sells`` - latest number of indicators saying sell
- ``self.iproc`` - member variable for the ``IndicatorProcessor``
    that holds all of the custom algorithm indicators

Indicator buy and sell records in ``self.latest_buys`` and
``self.latest_sells`` have the following dictionary structure
(a usage sketch follows the structure):

.. code-block:: python

    {
        'name': indicator_name,
        'id': indicator_id,
        'report': indicator_report_dict,
        'cell': indicator_cell_number
    }
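
For example, a derived algorithm can log which indicators voted
to buy (a sketch):

.. code-block:: python

    # inside a derived process() - inspect the latest buy votes
    for buy_node in self.latest_buys:
        print(
            f'buy vote from {buy_node["name"]} '
            f'id={buy_node["id"]} cell={buy_node["cell"]}')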

**Supported environment variables**

::

    # to show debug and trace logging, export ``SHARED_LOG_CFG``
    # to a debug logger json file. To turn on debugging for this
    # library, you can export this variable to the repo's
    # included file with the command:
    export SHARED_LOG_CFG=/opt/sa/analysis_engine/log/debug-logging.json
"""

import os
import json
import datetime
import pandas as pd
import analysis_engine.consts as ae_consts
import analysis_engine.utils as ae_utils
import analysis_engine.load_history_dataset as load_history_utils
import analysis_engine.get_data_from_redis_key as redis_get
import analysis_engine.indicators.indicator_processor as ind_processor
import analysis_engine.build_trade_history_entry as history_utils
import analysis_engine.plot_trading_history as plot_trading_history
import analysis_engine.build_buy_order as buy_utils
import analysis_engine.build_sell_order as sell_utils
import analysis_engine.publish as publish
import analysis_engine.build_publish_request as build_publish_request
import analysis_engine.load_dataset as load_dataset
import analysis_engine.prepare_history_dataset as prepare_history
import analysis_engine.prepare_report_dataset as prepare_report
import spylunking.log.setup_logging as log_utils

log = log_utils.build_colorized_logger(name=__name__)


[docs]class BaseAlgo: """BaseAlgo Run an algorithm against multiple tickers at once through the redis dataframe pipeline provided by `analysis_engine.extract.extract <https://github.com/AlgoTraders/stock-analysis-engine/bl ob/master/analysis_engine/extract.py>`__. **Data Pipeline Structure** This algorithm can handle an extracted dictionary with structure: .. code-block:: python import pandas as pd from analysis_engine.algo import BaseAlgo ticker = 'SPY' demo_algo = BaseAlgo( ticker=ticker, balance=1000.00, commission=6.00, name=f'test-{ticker}') date = '2018-11-05' dataset_id = f'{ticker}_{date}' # mock the data pipeline in redis: data = { ticker: [ { 'id': dataset_id, 'date': date, 'data': { 'daily': pd.DataFrame([ { 'high': 280.01, 'low': 270.01, 'open': 275.01, 'close': 272.02, 'volume': 123, 'date': '2018-11-01 15:59:59' }, { 'high': 281.01, 'low': 271.01, 'open': 276.01, 'close': 273.02, 'volume': 124, 'date': '2018-11-02 15:59:59' }, { 'high': 282.01, 'low': 272.01, 'open': 277.01, 'close': 274.02, 'volume': 121, 'date': '2018-11-05 15:59:59' } ]), 'calls': pd.DataFrame([]), 'puts': pd.DataFrame([]), 'minute': pd.DataFrame([]), 'pricing': pd.DataFrame([]), 'quote': pd.DataFrame([]), 'news': pd.DataFrame([]), 'news1': pd.DataFrame([]), 'dividends': pd.DataFrame([]), 'earnings': pd.DataFrame([]), 'financials': pd.DataFrame([]), 'stats': pd.DataFrame([]), 'peers': pd.DataFrame([]), 'company': pd.DataFrame([]) } } ] } # run the algorithm demo_algo.handle_data(data=data) # get the algorithm results results = demo_algo.get_result() print(results) """ def __init__( self, ticker=None, balance=5000.0, commission=6.0, tickers=None, name=None, use_key=None, auto_fill=True, version=1, config_file=None, config_dict=None, output_dir=None, publish_to_slack=False, publish_to_s3=False, publish_to_redis=False, publish_input=True, publish_history=True, publish_report=True, load_from_s3_bucket=None, load_from_s3_key=None, load_from_redis_key=None, load_from_file=None, load_compress=False, load_publish=True, load_config=None, report_redis_key=None, report_s3_bucket=None, report_s3_key=None, report_file=None, report_compress=False, report_publish=True, report_config=None, history_redis_key=None, history_s3_bucket=None, history_s3_key=None, history_file=None, history_compress=False, history_publish=True, history_config=None, extract_redis_key=None, extract_s3_bucket=None, extract_s3_key=None, extract_file=None, extract_save_dir=None, extract_compress=False, extract_publish=True, extract_config=None, dataset_type=ae_consts.SA_DATASET_TYPE_ALGO_READY, serialize_datasets=ae_consts.DEFAULT_SERIALIZED_DATASETS, timeseries=None, trade_strategy=None, verbose=False, verbose_processor=False, verbose_indicators=False, verbose_trading=False, verbose_load=False, verbose_extract=False, verbose_history=False, verbose_report=False, inspect_datasets=False, raise_on_err=True, **kwargs): """__init__ Build an analysis algorithm Use an algorithm object to: 1) `Generate algorithm-ready datasets <https://gith ub.com/AlgoTraders/stock-analysis-engine#extra ct-algorithm-ready-datasets>`__ 2) Backtest trading theories with offline 3) Issue trading alerts from the latest fetched datasets **(Optional) Trading Parameters** :param ticker: single ticker string :param balance: starting capital balance (default is ``5000.00``) :param commission: cost for commission for a single buy or sell trade :param tickers: optional - list of ticker strings :param name: optional - log tracking name or algo name :param use_key: optional - key for 
saving output in s3, redis, file :param auto_fill: optional - boolean for auto filling buy and sell orders for backtesting (default is ``True``) :param version: optional - version tracking value (default is ``1``) **Derived Config Loading for Indicators and Custom Backtest Values** :param config_file: path to a json file containing custom algorithm object member values (like indicator configuration and predict future date units ahead for a backtest) :param config_dict: optional - dictionary that can be passed to derived class implementations of: ``def load_from_config(config_dict=config_dict)`` **Run a Backtest with an Algorithm-Ready Dataset in S3, Redis or a File** Use these arguments to load algorithm-ready datasets from supported sources (s3, redis or a file) :param load_from_s3_bucket: optional - string load the algo from an a previously-created s3 bucket holding an s3 key with an algorithm-ready dataset for use with: ``handle_data`` :param load_from_s3_key: optional - string load the algo from an a previously-created s3 key holding an algorithm-ready dataset for use with: ``handle_data`` :param load_from_redis_key: optional - string load the algo from a a previously-created redis key holding an algorithm-ready dataset for use with: ``handle_data`` :param load_from_file: optional - string path to a previously-created local file holding an algorithm-ready dataset for use with: ``handle_data`` :param load_compress: optional - boolean flag for toggling to decompress or not when loading an algorithm-ready dataset (``True`` means the dataset must be decompressed to load correctly inside an algorithm to run a backtest) :param load_publish: boolean - toggle publishing the load progress to slack, s3, redis or a file (default is ``True``) :param load_config: optional - dictionary for setting member variables to load an algorithm-ready dataset for backtesting (used by ``run_custom_algo``) **Algorithm Trade History Arguments** :param history_redis_key: optional - string where the algorithm trading history will be stored in an redis key :param history_s3_bucket: optional - string where the algorithm trading history will be stored in an s3 bucket :param history_s3_key: optional - string where the algorithm trading history will be stored in an s3 key :param history_file: optional - string file path for saving the ``trading history`` :param history_compress: optional - boolean flag for toggling to decompress or not when loading an algorithm-ready dataset (``True`` means the dataset will be compressed on publish) :param history_publish: boolean - toggle publishing the history to s3, redis or a file (default is ``True``) :param history_config: optional - dictionary for setting member variables to publish an algo ``trade history`` to s3, redis, a file or slack (used by ``run_custom_algo``) **Algorithm Trade Performance Report Arguments (Output Dataset)** :param report_redis_key: optional - string where the algorithm ``trading performance report`` (report) will be stored in an redis key :param report_s3_bucket: optional - string where the algorithm report will be stored in an s3 bucket :param report_s3_key: optional - string where the algorithm report will be stored in an s3 key :param report_file: optional - string file path for saving the ``trading performance report`` :param report_compress: optional - boolean flag for toggling to decompress or not when loading an algorithm-ready dataset (``True`` means the dataset will be compressed on publish) :param report_publish: boolean - toggle publishing the 
``trading performance report`` s3, redis or a file (default is ``True``) :param report_config: optional - dictionary for setting member variables to publish an algo ``result`` or ``trading performance report`` to s3, redis, a file or slack (used by ``run_custom_algo``) **Extract an Algorithm-Ready Dataset Arguments** :param extract_redis_key: optional - string where the algorithm report will be stored in an redis key :param extract_s3_bucket: optional - string where the algorithm report will be stored in an s3 bucket :param extract_s3_key: optional - string where the algorithm report will be stored in an s3 key :param extract_file: optional - string file path for saving the processed datasets as an``algorithm-ready`` dataset :param extract_save_dir: optional - string path to auto-generated files from the algo :param extract_compress: optional - boolean flag for toggling to decompress or not when loading an algorithm-ready dataset (``True`` means the dataset will be compressed on publish) :param extract_publish: boolean - toggle publishing the used ``algorithm-ready dataset`` to s3, redis or a file (default is ``True``) :param extract_config: optional - dictionary for setting member variables to publish an algo ``input`` dataset (the contents of ``data`` from ``self.handle_data(data=data)`` (used by ``run_custom_algo``) **Dataset Arguments** :param dataset_type: optional - dataset type (default is ``ae_consts.SA_DATASET_TYPE_ALGO_READY``) :param serialize_datasets: optional - list of dataset names to deserialize in the dataset (default is ``ae_consts.DEFAULT_SERIALIZED_DATASETS``) :param encoding: optional - string for data encoding **(Optional) Publishing arguments** :param publish_to_slack: optional - boolean for publishing to slack (default is ``False``) :param publish_to_s3: optional - boolean for publishing to s3 (default is ``False``) :param publish_to_redis: optional - boolean for publishing to redis (default is ``False``) :param publish_input: boolean - toggle publishing all input datasets to s3 and redis (default ``True``) :param publish_history: boolean - toggle publishing the history to s3 and redis (default ``True``) :param publish_report: boolean - toggle publishing any generated datasets to s3 and redis (default ``True``) **Timeseries** :param timeseries: optional - string to set ``day`` or ``minute`` backtesting or live trading (default is ``minute``) **Trading Strategy** :param trade_strategy: optional - string to set the type of ``Trading Strategy`` for backtesting or live trading (default is ``count``) **Debugging arguments** :param verbose: optional - boolean for showing verbose algorithm logs (default is ``False``) :param verbose_processor: optional - boolean for showing verbose ``IndicatorProcessor`` logs (default is ``False``) :param verbose_indicators: optional - boolean for showing verbose ``Indicator`` logs (default is ``False`` which means an ``Indicator`` can set ``'verbose': True`` to enable logging per individal ``Indicator``) :param verbose_trading: optional - boolean for logging in the trading functions including the reasons why a buy or sell was opened :param verbose_load: optional - boolean for debugging algorithm ready dataset loading :param verbose_extract: optional - boolean for debugging algorithm ready dataset extraction :param verbose_history: optional - boolean for debugging trading history dataset :param verbose_report: optional - boolean for debugging algorithm report :param inspect_datasets: optional - boolean for logging what is sent to the 
algorithm's ``process()`` function (default is ``False`` as this will slow processing down) :param raise_on_err: optional - boolean for unittests and developing algorithms with the ``analysis_engine.run_algo.run_algo`` helper. .. note:: When set to ``True`` exceptions will are raised to the calling functions :param output_dir: optional - string path to auto-generated files from the algo **Future Argument Placeholder** :param kwargs: optional - dictionary of keyword arguments """ self.buys = [] self.sells = [] self.num_shares = 0 self.tickers = tickers if not self.tickers: if ticker: self.tickers = [ ticker.upper() ] else: raise Exception('BaseAlgo - please set a ticker to use') self.balance = balance self.starting_balance = balance self.starting_close = 0.0 self.timeseries = timeseries if not self.timeseries: self.timeseries = 'minute' self.trade_strategy = trade_strategy if not self.trade_strategy: self.trade_strategy = 'count' self.timeseries_value = ae_consts.ALGO_TIMESERIES_MINUTE self.trade_horizon = 5 self.commission = commission self.result = None self.name = name self.num_owned = None self.num_buys = 0 self.num_sells = 0 self.ticker_buys = [] self.ticker_sell = [] self.trade_price = 0.0 self.today_high = 0.0 self.today_low = 0.0 self.today_open = 0.0 self.today_close = 0.0 self.today_volume = 0 self.latest_close = 0.0 self.latest_high = 0.0 self.latest_open = 0.0 self.latest_low = 0.0 self.latest_volume = 0 self.latest_min = None self.last_minute = None self.backtest_date = None self.ask = 0.0 self.bid = 0.0 self.prev_bal = None self.prev_num_owned = None self.ds_id = None self.trade_date = None self.trade_type = ae_consts.TRADE_SHARES self.buy_hold_units = 20 self.sell_hold_units = 20 self.spread_exp_date = None self.last_close = None self.order_history = [] self.config_file = config_file self.config_dict = config_dict self.positions = {} self.created_on_date = datetime.datetime.utcnow() self.created_date = self.created_on_date.strftime( ae_consts.COMMON_TICK_DATE_FORMAT) self.created_buy = False self.should_buy = False self.buy_strength = None self.buy_risk = None self.created_sell = False self.should_sell = False self.sell_strength = None self.sell_risk = None self.stop_loss = None self.trailing_stop_loss = None self.last_handle_data = None self.last_ds_id = None self.last_ds_date = None self.last_ds_data = None self.ds_date = None self.ds_data = None self.df_daily = pd.DataFrame([{}]) self.df_minute = pd.DataFrame([{}]) self.df_stats = pd.DataFrame([{}]) self.df_peers = pd.DataFrame([{}]) self.df_financials = pd.DataFrame([]) self.df_earnings = pd.DataFrame([{}]) self.df_dividends = pd.DataFrame([{}]) self.df_quote = pd.DataFrame([{}]) self.df_company = pd.DataFrame([{}]) self.df_iex_news = pd.DataFrame([{}]) self.df_yahoo_news = pd.DataFrame([{}]) self.df_calls = pd.DataFrame([{}]) self.df_puts = pd.DataFrame([{}]) self.df_pricing = pd.DataFrame([{}]) self.df_tdcalls = pd.DataFrame([{}]) self.df_tdputs = pd.DataFrame([{}]) self.empty_pd = pd.DataFrame([{}]) self.empty_pd_str = ae_consts.EMPTY_DF_STR self.note = None self.debug_msg = '' self.version = version self.verbose = verbose self.verbose_processor = verbose_processor self.verbose_indicators = verbose_indicators self.verbose_trading = verbose_trading self.verbose_load = verbose_load self.verbose_extract = verbose_extract self.verbose_history = verbose_history self.verbose_report = verbose_report self.inspect_datasets = inspect_datasets self.run_this_date = None self.verbose = ae_consts.ev( 'AE_DEBUG', '0') == '1' 
self.publish_to_slack = publish_to_slack self.publish_to_s3 = publish_to_s3 self.publish_to_redis = publish_to_redis self.publish_history = publish_history self.publish_report = publish_report self.publish_input = publish_input self.raise_on_err = raise_on_err if not self.publish_to_s3: self.publish_to_s3 = ae_consts.ENABLED_S3_UPLOAD if not self.publish_to_redis: self.publish_to_redis = ae_consts.ENABLED_REDIS_PUBLISH self.output_file_dir = None self.output_file_prefix = None if self.raise_on_err: if self.tickers and len(self.tickers): self.output_file_prefix = str( self.tickers[0]).upper() self.output_file_dir = '/opt/sa/tests/datasets/algo' if not self.name: self.name = 'myalgo' """ Load tracking connectivity for recording - input - trade history - algorithm-generated datasets """ # parse optional input args self.save_as_key = use_key if not self.save_as_key: self.save_as_key = ( f'{self.name.replace(" ", "")}-' f'{ae_utils.utc_now_str(fmt="%Y-%m-%d-%H-%M-%S.%f")}') self.output_file_dir = '/opt/sa/tests/datasets/algo' if not output_dir: self.output_file_dir = output_dir # set up default keys self.default_output_file = ( f'{self.output_file_dir}/{self.save_as_key}.json') self.default_s3_key = f'{self.save_as_key}.json' self.default_redis_key = f'{self.save_as_key}' self.default_load_output_file = ( f'{self.output_file_dir}/ready-{self.save_as_key}.json') self.default_history_output_file = ( f'{self.output_file_dir}/history-{self.save_as_key}.json') self.default_report_output_file = ( f'{self.output_file_dir}/report-{self.save_as_key}.json') self.default_extract_output_file = ( f'{self.output_file_dir}/extract-{self.save_as_key}.json') self.default_load_redis_key = ( f'algo:ready:{self.default_redis_key}') self.default_history_redis_key = ( f'algo:history:{self.default_redis_key}') self.default_report_redis_key = ( f'algo:output:{self.default_redis_key}') self.default_extract_redis_key = ( f'algo:extract:{self.default_redis_key}') if not load_config: load_config = build_publish_request.build_publish_request() if not extract_config: extract_config = build_publish_request.build_publish_request() if not history_config: history_config = build_publish_request.build_publish_request() if not report_config: report_config = build_publish_request.build_publish_request() if not load_from_s3_bucket: load_from_s3_bucket = ae_consts.ALGO_READY_DATASET_S3_BUCKET_NAME if not extract_s3_bucket: extract_s3_bucket = ae_consts.ALGO_EXTRACT_DATASET_S3_BUCKET_NAME if not history_s3_bucket: history_s3_bucket = ae_consts.ALGO_HISTORY_DATASET_S3_BUCKET_NAME if not report_s3_bucket: report_s3_bucket = ae_consts.ALGO_REPORT_DATASET_S3_BUCKET_NAME # Load the input dataset publishing member variables self.extract_output_dir = extract_config.get( 'output_dir', None) self.extract_output_file = extract_config.get( 'output_file', None) self.extract_label = extract_config.get( 'label', self.name) self.extract_convert_to_json = extract_config.get( 'convert_to_json', True) self.extract_compress = extract_config.get( 'compress', ae_consts.ALGO_INPUT_COMPRESS) self.extract_redis_enabled = extract_config.get( 'redis_enabled', False) self.extract_redis_address = extract_config.get( 'redis_address', ae_consts.ENABLED_S3_UPLOAD) self.extract_redis_db = extract_config.get( 'redis_db', ae_consts.REDIS_DB) self.extract_redis_password = extract_config.get( 'redis_password', ae_consts.REDIS_PASSWORD) self.extract_redis_expire = extract_config.get( 'redis_expire', ae_consts.REDIS_EXPIRE) self.extract_redis_serializer = 
extract_config.get( 'redis_serializer', 'json') self.extract_redis_encoding = extract_config.get( 'redis_encoding', 'utf-8') self.extract_s3_enabled = extract_config.get( 's3_enabled', False) self.extract_s3_address = extract_config.get( 's3_address', ae_consts.S3_ADDRESS) self.extract_s3_bucket = extract_config.get( 's3_bucket', extract_s3_bucket) self.extract_s3_access_key = extract_config.get( 's3_access_key', ae_consts.S3_ACCESS_KEY) self.extract_s3_secret_key = extract_config.get( 's3_secret_key', ae_consts.S3_SECRET_KEY) self.extract_s3_region_name = extract_config.get( 's3_region_name', ae_consts.S3_REGION_NAME) self.extract_s3_secure = extract_config.get( 's3_secure', ae_consts.S3_SECURE) self.extract_slack_enabled = extract_config.get( 'slack_enabled', False) self.extract_slack_code_block = extract_config.get( 'slack_code_block', False) self.extract_slack_full_width = extract_config.get( 'slack_full_width', False) self.extract_redis_key = extract_config.get( 'redis_key', extract_redis_key) self.extract_s3_key = extract_config.get( 's3_key', extract_s3_key) self.extract_verbose = extract_config.get( 'verbose', False) # load the trade history publishing member variables self.history_output_dir = history_config.get( 'output_dir', None) self.history_output_file = history_config.get( 'output_file', None) self.history_label = history_config.get( 'label', self.name) self.history_convert_to_json = history_config.get( 'convert_to_json', True) self.history_compress = history_config.get( 'compress', ae_consts.ALGO_HISTORY_COMPRESS) self.history_redis_enabled = history_config.get( 'redis_enabled', False) self.history_redis_address = history_config.get( 'redis_address', ae_consts.ENABLED_S3_UPLOAD) self.history_redis_db = history_config.get( 'redis_db', ae_consts.REDIS_DB) self.history_redis_password = history_config.get( 'redis_password', ae_consts.REDIS_PASSWORD) self.history_redis_expire = history_config.get( 'redis_expire', ae_consts.REDIS_EXPIRE) self.history_redis_serializer = history_config.get( 'redis_serializer', 'json') self.history_redis_encoding = history_config.get( 'redis_encoding', 'utf-8') self.history_s3_enabled = history_config.get( 's3_enabled', False) self.history_s3_address = history_config.get( 's3_address', ae_consts.S3_ADDRESS) self.history_s3_bucket = history_config.get( 's3_bucket', history_s3_bucket) self.history_s3_access_key = history_config.get( 's3_access_key', ae_consts.S3_ACCESS_KEY) self.history_s3_secret_key = history_config.get( 's3_secret_key', ae_consts.S3_SECRET_KEY) self.history_s3_region_name = history_config.get( 's3_region_name', ae_consts.S3_REGION_NAME) self.history_s3_secure = history_config.get( 's3_secure', ae_consts.S3_SECURE) self.history_slack_enabled = history_config.get( 'slack_enabled', False) self.history_slack_code_block = history_config.get( 'slack_code_block', False) self.history_slack_full_width = history_config.get( 'slack_full_width', False) self.history_redis_key = history_config.get( 'redis_key', history_redis_key) self.history_s3_key = history_config.get( 's3_key', history_s3_key) self.history_verbose = history_config.get( 'verbose', False) # Load publishing for algorithm-generated report member variables self.report_output_dir = report_config.get( 'output_dir', None) self.report_output_file = report_config.get( 'output_file', None) self.report_label = report_config.get( 'label', self.name) self.report_convert_to_json = report_config.get( 'convert_to_json', True) self.report_compress = report_config.get( 'compress', 
ae_consts.ALGO_REPORT_COMPRESS) self.report_redis_enabled = report_config.get( 'redis_enabled', False) self.report_redis_address = report_config.get( 'redis_address', ae_consts.ENABLED_S3_UPLOAD) self.report_redis_db = report_config.get( 'redis_db', ae_consts.REDIS_DB) self.report_redis_password = report_config.get( 'redis_password', ae_consts.REDIS_PASSWORD) self.report_redis_expire = report_config.get( 'redis_expire', ae_consts.REDIS_EXPIRE) self.report_redis_serializer = report_config.get( 'redis_serializer', 'json') self.report_redis_encoding = report_config.get( 'redis_encoding', 'utf-8') self.report_s3_enabled = report_config.get( 's3_enabled', False) self.report_s3_address = report_config.get( 's3_address', ae_consts.S3_ADDRESS) self.report_s3_bucket = report_config.get( 's3_bucket', report_s3_bucket) self.report_s3_access_key = report_config.get( 's3_access_key', ae_consts.S3_ACCESS_KEY) self.report_s3_secret_key = report_config.get( 's3_secret_key', ae_consts.S3_SECRET_KEY) self.report_s3_region_name = report_config.get( 's3_region_name', ae_consts.S3_REGION_NAME) self.report_s3_secure = report_config.get( 's3_secure', ae_consts.S3_SECURE) self.report_slack_enabled = report_config.get( 'slack_enabled', False) self.report_slack_code_block = report_config.get( 'slack_code_block', False) self.report_slack_full_width = report_config.get( 'slack_full_width', False) self.report_redis_key = report_config.get( 'redis_key', report_redis_key) self.report_s3_key = report_config.get( 's3_key', report_s3_key) self.report_verbose = report_config.get( 'verbose', False) self.loaded_dataset = None # load the algorithm-ready dataset input member variables self.dsload_output_dir = load_config.get( 'output_dir', None) self.dsload_output_file = load_config.get( 'output_file', None) self.dsload_label = load_config.get( 'label', self.name) self.dsload_convert_to_json = load_config.get( 'convert_to_json', True) self.dsload_compress = load_config.get( 'compress', load_compress) self.dsload_redis_enabled = load_config.get( 'redis_enabled', False) self.dsload_redis_address = load_config.get( 'redis_address', ae_consts.ENABLED_S3_UPLOAD) self.dsload_redis_db = load_config.get( 'redis_db', ae_consts.REDIS_DB) self.dsload_redis_password = load_config.get( 'redis_password', ae_consts.REDIS_PASSWORD) self.dsload_redis_expire = load_config.get( 'redis_expire', ae_consts.REDIS_EXPIRE) self.dsload_redis_serializer = load_config.get( 'redis_serializer', 'json') self.dsload_redis_encoding = load_config.get( 'redis_encoding', 'utf-8') self.dsload_s3_enabled = load_config.get( 's3_enabled', False) self.dsload_s3_address = load_config.get( 's3_address', ae_consts.S3_ADDRESS) self.dsload_s3_bucket = load_config.get( 's3_bucket', load_from_s3_bucket) self.dsload_s3_access_key = load_config.get( 's3_access_key', ae_consts.S3_ACCESS_KEY) self.dsload_s3_secret_key = load_config.get( 's3_secret_key', ae_consts.S3_SECRET_KEY) self.dsload_s3_region_name = load_config.get( 's3_region_name', ae_consts.S3_REGION_NAME) self.dsload_s3_secure = load_config.get( 's3_secure', ae_consts.S3_SECURE) self.dsload_slack_enabled = load_config.get( 'slack_enabled', False) self.dsload_slack_code_block = load_config.get( 'slack_code_block', False) self.dsload_slack_full_width = load_config.get( 'slack_full_width', False) self.dsload_redis_key = load_config.get( 'redis_key', load_from_redis_key) self.dsload_s3_key = load_config.get( 's3_key', load_from_s3_key) self.dsload_verbose = load_config.get( 'verbose', False) self.include_custom = {} 
self.load_from_external_source() if self.config_dict: self.timeseries = self.config_dict.get( 'timeseries', 'day').lower() self.trade_horizon = int(self.config_dict.get( 'trade_horizon', '5')) self.load_custom_datasets() # end of loading initial values from a config_dict before derived self.iproc = None self.iproc_label = 'no-iproc-label' self.num_indicators = 0 self.latest_ind_report = None self.latest_buys = [] # latest indicators saying buy self.latest_sells = [] # latest indicators saying sell self.num_latest_buys = 0 # latest number of indicators saying buy self.num_latest_sells = 0 # latest number of indicators saying sell self.min_buy_indicators = 0 self.min_sell_indicators = 0 self.buy_rules = {} self.sell_rules = {} self.buy_shares = None self.is_live_trading = False self.found_minute_data = False self.use_minute = None self.intraday_start_min = None self.intraday_end_min = None self.intraday_events = {} self.ignore_history_keys = [ ] self.ind_conf_ignore_keys = [ 'buys', 'date', 'id', 'sells', 'ticker' ] self.buy_reason = None self.sell_reason = None """ if this is in a juptyer notebook this will show the plots at the end of each day... please avoid with the command line as the plot's window will block the algorithm until the window is closed """ self.show_balance = ae_consts.ev( 'SHOW_ALGO_BALANCE', '0') == '1' self.show_log = False self.red_column = 'close' self.blue_column = 'balance' self.green_column = None self.orange_column = None self.net_value = self.starting_balance self.net_gain = self.net_value self.load_from_config( config_dict=self.config_dict) self.starting_balance = self.balance self.net_value = self.starting_balance self.net_gain = self.net_value self.timeseries = str(self.timeseries).lower() if self.timeseries == 'day': self.timeseries_value = ae_consts.ALGO_TIMESERIES_DAY elif self.timeseries == 'daily': self.timeseries_value = ae_consts.ALGO_TIMESERIES_DAY elif self.timeseries == 'minute': self.timeseries_value = ae_consts.ALGO_TIMESERIES_MINUTE elif self.timeseries == 'intraday': self.timeseries_value = ae_consts.ALGO_TIMESERIES_MINUTE else: self.timeseries_value = ae_consts.ALGO_TIMESERIES_MINUTE self.trade_strategy = str(self.trade_strategy).lower() self.trade_off_num_indicators = False if self.trade_strategy == 'count': self.trade_off_num_indicators = True else: self.trade_off_num_indicators = True self.indicator_datasets = [] self.determine_indicator_datasets() # build the IndicatorProcessor after loading # values from an optional config_dict self.iproc = self.get_indicator_processor() if self.iproc: if not hasattr(self.iproc, 'process'): err = ( f'{self.name} - Please implement a process() method ' 'in the IndicatorProcessor - the current object=' f'{self.iproc} is missing one.') log.critical(err) raise Exception(err) self.iproc_label = self.iproc.get_label() self.num_indicators = self.iproc.get_num_indicators() self.min_buy_indicators = self.buy_rules.get( 'min_indicators', self.num_indicators) self.min_sell_indicators = self.sell_rules.get( 'min_indicators', self.num_indicators) # if indicator_processor exists # end of __init__
    def determine_indicator_datasets(
            self):
        """determine_indicator_datasets

        Indicators are coupled to a dataset in the algorithm
        config file. This allows for identifying the exact
        datasets to pull from Redis to speed up backtesting.
        """
        if self.config_dict:
            for ind_node in self.config_dict.get('indicators', []):
                uses_dataset = ind_node.get('uses_data', 'minute')
                if uses_dataset not in self.indicator_datasets:
                    self.indicator_datasets.append(uses_dataset)
    # end of determine_indicator_datasets
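
    # Example (a sketch): an ``indicators`` entry in the algorithm
    # config_dict couples an indicator to a dataset through the
    # ``uses_data`` key (other indicator keys omitted):
    #
    #     config_dict = {
    #         'indicators': [
    #             {'uses_data': 'minute'}
    #         ]
    #     }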
    def get_indicator_datasets(
            self):
        """get_indicator_datasets"""
        return self.indicator_datasets
    # end of get_indicator_datasets
    def view_date_dataset_records(
            self,
            algo_id,
            ticker,
            node):
        """view_date_dataset_records

        View the dataset contents for a single node - use it
        with the algo config_dict by setting:

        ::

            "run_this_date": <string date YYYY-MM-DD>

        :param algo_id: string - algo identifier label for
            debugging datasets during specific dates
        :param ticker: string - ticker
        :param node: dataset to process
        """
        # this will happen twice
        self.load_from_dataset(
            ds_data=node)
        self.inspect_dataset(
            algo_id=algo_id,
            ticker=ticker,
            dataset=node)

        if self.timeseries == 'minute':
            if len(self.df_minute.index) <= 1:
                log.error('EMPTY minute dataset')
                if self.raise_on_err:
                    raise Exception('EMPTY minute dataset')
                return
            for i, row in self.df_minute.iterrows():
                log.info(f'minute={i} date={row["date"]} close={row["close"]}')
            log.info(f'minute df len={len(self.df_minute.index)}')
        elif self.timeseries == 'day':
            if len(self.df_daily.index) == 0:
                log.error('EMPTY daily dataset')
                if self.raise_on_err:
                    raise Exception('EMPTY daily dataset')
                return
            if hasattr(self.df_daily, 'to_json'):
                for i, row in self.df_daily.iterrows():
                    log.info(
                        f'day={i} date={row["date"]} close={row["close"]}')
                log.info(f'day df len={len(self.df_daily.index)}')
    # end of view_date_dataset_records
    def get_indicator_processor(
            self,
            existing_processor=None):
        """get_indicator_processor

        singleton for getting the indicator processor

        :param existing_processor: allow derived algos to
            build their own indicator processor and pass it
            to the base
        """
        if existing_processor:
            if self.verbose:
                log.info(
                    f'{self.name} - loading existing '
                    f'processor={existing_processor.get_name()}')
            self.iproc = existing_processor
        else:
            if self.iproc:
                return self.iproc
            if not self.config_dict:
                if self.verbose:
                    log.info(
                        f'{self.name} - is missing an algorithm config_dict '
                        'please add one to run indicators')
            else:
                self.iproc = ind_processor.IndicatorProcessor(
                    config_dict=self.config_dict,
                    label=f'{self.name}-prc',
                    verbose=self.verbose_processor)
        # if use new or existing
        return self.iproc
    # end of get_indicator_processor
    def get_indicator_process_last_indicator(
            self):
        """get_indicator_process_last_indicator

        Used to pull the indicator object back up to any
        created ``analysis_engine.algo.BaseAlgo`` objects

        .. tip:: this is for debugging data and code issues
            inside an indicator
        """
        return self.get_indicator_processor().get_last_ind_obj()
    # end of get_indicator_process_last_indicator
    def inspect_dataset(
            self,
            algo_id,
            ticker,
            dataset):
        """inspect_dataset

        Use this method inside of an algorithm's ``process()``
        method to view the available datasets in the redis cache

        :param algo_id: string - algo identifier label for
            debugging datasets during specific dates
        :param ticker: string - ticker
        :param dataset: a dictionary of identifiers (for debugging)
            and multiple ``pandas.DataFrame`` objects
        """
        log.info('--------------')
        log.info(
            f'process(algo_id={algo_id}, ticker={ticker}, data:')
        for k in dataset:
            log.info(f'main keys={k}')
        for k in dataset['data']:
            if hasattr(dataset['data'][k], 'to_json'):
                log.info(
                    f'data key={k} contains a pandas.DataFrame with '
                    f'rows={len(dataset["data"][k].index)}')
            else:
                log.info(
                    f'data key={k} contains a pandas.DataFrame with '
                    'rows=0')
    # end of inspect_dataset
[docs] def process( self, algo_id, ticker, dataset): """process Derive custom algorithm buy and sell conditions before placing orders. Just implement your own ``process`` method. :param algo_id: string - algo identifier label for debugging datasets during specific dates :param ticker: string - ticker :param dataset: a dictionary of identifiers (for debugging) and multiple pandas ``pandas.DataFrame`` objects. Dictionary where keys represent a label from one of the data sources (``IEX Cloud`` or ``Tradier``). Here is the supported dataset structure for the process method: .. note:: There are no required keys for ``data``, the list below is not hard-enforced by default. This is just a reference for what is available with the v1 engine. :: dataset = { 'id': <string TICKER_DATE - redis cache key>, 'date': <string DATE>, 'data': { 'daily': pd.DataFrame([]), 'minute': pd.DataFrame([]), 'quote': pd.DataFrame([]), 'stats': pd.DataFrame([]), 'peers': pd.DataFrame([]), 'news1': pd.DataFrame([]), 'financials': pd.DataFrame([]), 'earnings': pd.DataFrame([]), 'dividends': pd.DataFrame([]), 'calls': pd.DataFrame([]), 'puts': pd.DataFrame([]), 'pricing': pd.DataFrame([]), 'news': pd.DataFrame([]) } } example: :: dataset = { 'id': 'SPY_2018-11-02 'date': '2018-11-02', 'data': { 'daily': pd.DataFrame, 'minute': pd.DataFrame, 'calls': pd.DataFrame, 'puts': pd.DataFrame, 'news': pd.DataFrame } } """ use_date = self.trade_date num_rows = len(self.df_daily.index) if self.latest_min: use_date = self.latest_min if self.verbose or self.show_log: log.info( f'process {algo_id} - {use_date} - ' f'{self.name} ' f'bal={self.balance} ' f'net={self.net_gain} ' f'owned={self.num_owned} ' f'high={self.latest_high} ' f'low={self.latest_low} ' f'open={self.latest_open} ' f'close={self.latest_close} ' f'vol={self.latest_volume} ' f'cur_buy={self.num_latest_buys} ' f'min_buy={self.min_buy_indicators} ' f'num_buys={self.num_buys} ' f'cur_sell={self.num_latest_sells} ' f'min_sell={self.min_sell_indicators} ' f'num_sells={self.num_sells} ' f'rows={num_rows}') # flip these on to sell/buy # buys will not FILL if there's not enough funds to buy # sells will not FILL if there's nothing already owned self.should_sell = False self.should_buy = False """ Want to iterate over daily pricing data to determine buys or sells from the: self.df_daily dataset fetched from IEX? # loop over the rows in the daily dataset: for idx, row in self.df_daily.iterrows(): print(row) """
# end of process
[docs] def trade_off_indicator_buy_and_sell_signals( self, ticker, algo_id, reason_for_buy=None, reason_for_sell=None): """trade_off_indicator_buy_and_sell_signals Check if the minimum number of indicators for a buy or a sell were found. If there were, then commit the trade. .. code-block:: python if self.trade_off_num_indicators: if self.num_latest_buys >= self.min_buy_indicators: self.should_buy = True elif self.num_latest_sells >= self.min_sell_indicators: self.should_sell = True :param ticker: ticker symbol :param algo_id: string algo for tracking internal progress for debugging :param reason_for_buy: optional - string for tracking why the algo bought :param reason_for_sell: optional - string for tracking why the algo sold """ if self.trade_off_num_indicators: if self.num_latest_buys >= self.min_buy_indicators: self.should_buy = True elif self.num_latest_sells >= self.min_sell_indicators: self.should_sell = True if self.num_owned and self.should_sell: if self.verbose_trading or self.verbose: log.critical( 'TRADE - SELLDECISION - ' f'{algo_id} ' f'trade_off_num={self.trade_off_num_indicators} ' f'num_sells={self.num_latest_sells} > ' f'min_sells={self.min_sell_indicators} ' f'should_sell={self.should_sell}') self.create_sell_order( ticker=ticker, shares=self.num_owned, minute=self.use_minute, row={ 'name': algo_id, 'close': self.latest_close, 'date': self.trade_date }, is_live_trading=self.is_live_trading, reason=reason_for_buy) # if own shares and should sell # else if should buy: elif self.should_buy: if self.verbose_trading or self.verbose: log.critical( 'TRADE - BUYDECISION - ' f'{algo_id} ' f'trade_off_num={self.trade_off_num_indicators} ' f'num_buys={self.num_latest_buys} > ' f'min_buys={self.min_buy_indicators} ' f'should_buy={self.should_buy}') self.create_buy_order( ticker=ticker, shares=self.buy_shares, minute=self.use_minute, row={ 'name': algo_id, 'close': self.latest_close, 'date': self.trade_date }, is_live_trading=self.is_live_trading, reason=reason_for_sell)
# end of should_buy
# end of trade_off_indicator_buy_and_sell_signals
[docs] def load_from_external_source( self, path_to_file=None, s3_bucket=None, s3_key=None, redis_key=None): """load_from_external_source Load an algorithm-ready dataset for ``handle_data`` backtesting and trade performance analysis from: - Local file - S3 - Redis :param path_to_file: optional - path to local file :param s3_bucket: optional - s3 s3_bucket :param s3_key: optional - s3 key :param redis_key: optional - redis key """ if path_to_file: self.dsload_output_file = path_to_file if s3_key: self.dsload_s3_key = s3_key if s3_bucket: self.dsload_s3_bucket = s3_bucket self.dsload_s3_enabled = True if redis_key: self.dsload_redis_key = redis_key self.dsload_redis_enabled = True if (self.dsload_s3_key and self.dsload_s3_bucket and self.dsload_s3_enabled and not self.loaded_dataset): self.debug_msg = ( f'external load START - s3={self.dsload_s3_address}:' f'{self.dsload_s3_bucket}/{self.dsload_s3_key}') if self.verbose: log.info(self.debug_msg) self.loaded_dataset = load_dataset.load_dataset( s3_enabled=self.dsload_s3_enabled, s3_address=self.dsload_s3_address, s3_key=self.dsload_s3_key, s3_bucket=self.dsload_s3_bucket, s3_access_key=self.dsload_s3_access_key, s3_secret_key=self.dsload_s3_secret_key, s3_region_name=self.dsload_s3_region_name, s3_secure=self.dsload_s3_secure, compress=True, encoding=self.dsload_redis_encoding) if self.loaded_dataset: self.debug_msg = ( f'external load SUCCESS - s3={self.dsload_s3_address}:' f'{self.dsload_s3_bucket}/{self.dsload_s3_key}') else: self.debug_msg = ( f'external load FAILED - s3={self.dsload_s3_address}:' f'{self.dsload_s3_bucket}/{self.dsload_s3_key}') log.error(self.debug_msg) raise Exception(self.debug_msg) elif (self.dsload_redis_key and self.dsload_redis_enabled and not self.loaded_dataset): self.debug_msg = ( f'external load START - redis={self.dsload_redis_address}:' f'{self.dsload_redis_db}/{self.dsload_redis_key}') log.debug(self.debug_msg) self.loaded_dataset = load_dataset.load_dataset( redis_enabled=self.dsload_redis_enabled, redis_address=self.dsload_redis_address, redis_key=self.dsload_redis_key, redis_db=self.dsload_redis_db, redis_password=self.dsload_redis_password, redis_expire=self.dsload_redis_expire, redis_serializer=self.dsload_redis_serializer, redis_encoding=self.dsload_redis_encoding, compress=self.dsload_compress, encoding=self.dsload_redis_encoding) if self.loaded_dataset: self.debug_msg = ( 'external load SUCCESS - ' f'redis={self.dsload_redis_address}:' f'{self.dsload_redis_db}/{self.dsload_redis_key}') else: self.debug_msg = ( 'external load FAILED - ' f'redis={self.dsload_redis_address}:' f'{self.dsload_redis_db}/{self.dsload_redis_key}') log.error(self.debug_msg) raise Exception(self.debug_msg) elif (self.dsload_output_file and not self.loaded_dataset): if os.path.exists(self.dsload_output_file): self.debug_msg = ( f'external load START - file={self.dsload_output_file}') log.debug(self.debug_msg) self.loaded_dataset = load_dataset.load_dataset( path_to_file=self.dsload_output_file, compress=self.dsload_compress, encoding=self.extract_redis_encoding) if self.loaded_dataset: self.debug_msg = ( 'external load SUCCESS - ' f'file={self.dsload_output_file}') else: self.debug_msg = ( 'external load FAILED - ' f'file={self.dsload_output_file}') log.error(self.debug_msg) raise Exception(self.debug_msg) else: self.debug_msg = ( 'external load - did not find ' f'file={self.dsload_output_file}') log.error(self.debug_msg) raise Exception(self.debug_msg) # end of if supported external loader log.debug( 'external load END')
# end of load_from_external_source
[docs] def publish_report_dataset( self, **kwargs): """publish_report_dataset publish trade history datasets to caches (redis), archives (minio s3), a local file (``output_file``) and slack :param kwargs: keyword argument dictionary :return: tuple: ``status``, ``output_file`` """ # parse optional input args output_file = kwargs.get( 'output_file', None) label = kwargs.get( 'label', self.name) redis_enabled = kwargs.get( 'redis_enabled', False) redis_address = kwargs.get( 'redis_address', self.report_redis_address) redis_db = kwargs.get( 'redis_db', self.report_redis_db) redis_password = kwargs.get( 'redis_password', self.report_redis_password) redis_expire = kwargs.get( 'redis_expire', self.report_redis_expire) redis_serializer = kwargs.get( 'redis_serializer', self.report_redis_serializer) redis_encoding = kwargs.get( 'redis_encoding', self.report_redis_encoding) s3_enabled = kwargs.get( 's3_enabled', False) s3_address = kwargs.get( 's3_address', self.report_s3_address) s3_bucket = kwargs.get( 's3_bucket', self.report_s3_bucket) s3_access_key = kwargs.get( 's3_access_key', self.report_s3_access_key) s3_secret_key = kwargs.get( 's3_secret_key', self.report_s3_secret_key) s3_region_name = kwargs.get( 's3_region_name', self.report_s3_region_name) s3_secure = kwargs.get( 's3_secure', self.report_s3_secure) slack_enabled = kwargs.get( 'slack_enabled', self.report_slack_enabled) slack_code_block = kwargs.get( 'slack_code_block', self.report_slack_code_block) slack_full_width = kwargs.get( 'slack_full_width', self.report_slack_full_width) redis_key = kwargs.get( 'redis_key', self.report_redis_key) s3_key = kwargs.get( 's3_key', self.report_s3_key) verbose = kwargs.get( 'verbose', self.report_verbose) status = ae_consts.NOT_RUN if not self.publish_report: if self.verbose: log.info( 'report publish - disabled - ' f'{self.name} - tickers={self.tickers}') return status output_record = self.create_report_dataset() if output_file or s3_enabled or redis_enabled or slack_enabled: if self.verbose: log.info( f'report build json - {self.name} - ' f'tickers={self.tickers}') use_data = output_record num_bytes = len(str(use_data)) num_mb = ae_consts.get_mb(num_bytes) log.info( 'report publish - START - ' f'{self.name} - tickers={self.tickers} ' f'file={output_file} size={num_mb}MB ' f's3={s3_enabled} s3_key={s3_key} ' f'redis={redis_enabled} redis_key={redis_key} ' f'slack={slack_enabled}') publish_status = publish.publish( data=use_data, label=label, df_compress=True, compress=False, convert_to_dict=False, output_file=output_file, redis_enabled=redis_enabled, redis_key=redis_key, redis_address=redis_address, redis_db=redis_db, redis_password=redis_password, redis_expire=redis_expire, redis_serializer=redis_serializer, redis_encoding=redis_encoding, s3_enabled=s3_enabled, s3_key=s3_key, s3_address=s3_address, s3_bucket=s3_bucket, s3_access_key=s3_access_key, s3_secret_key=s3_secret_key, s3_region_name=s3_region_name, s3_secure=s3_secure, slack_enabled=slack_enabled, slack_code_block=slack_code_block, slack_full_width=slack_full_width, verbose=verbose) status = publish_status log.info( 'report publish - END - ' f'{ae_consts.get_status(status=status)} ' f'{self.name} - tickers={self.tickers} ' f'file={output_file} s3={s3_key} ' f'redis={redis_key} size={num_mb}MB') else: status = ae_consts.SUCCESS if self.verbose: log.info( f'{self.name} - report not publishing for ' f'output_file={output_file} s3_enabled={s3_enabled} ' f'redis_enabled={redis_enabled} ' f'slack_enabled={slack_enabled}') # end of handling for 
publish return status
# end of publish_report_dataset
[docs] def create_report_dataset( self): """create_report_dataset Create the ``Trading Performance Report`` dataset during the ``self.publish_input_dataset()`` member method. Inherited Algorithm classes can derive how they build a custom ``Trading Performance Report`` dataset before publishing by implementing this method in the derived class. """ if self.verbose: log.info('report - create start') if self.last_handle_data: data_for_tickers = self.get_supported_tickers_in_data( data=self.last_handle_data) else: data_for_tickers = self.tickers num_tickers = len(data_for_tickers) if num_tickers > 0: self.debug_msg = ( f'{self.name} handle - tickers={json.dumps(data_for_tickers)}') output_record = {} for ticker in data_for_tickers: if ticker not in output_record: output_record[ticker] = [] num_ticker_datasets = len(self.last_handle_data[ticker]) cur_idx = 1 for idx, node in enumerate(self.last_handle_data[ticker]): track_label = self.build_progress_label( progress=cur_idx, total=num_ticker_datasets) algo_id = f'{ticker} {track_label}' if self.verbose: log.info( f'{self.name} report - {algo_id} - ds={node["date"]}') new_node = { 'id': node['id'], 'date': node['date'], 'data': {} } output_record[ticker].append(new_node) cur_idx += 1 # end for all self.last_handle_data[ticker] # end of converting dataset return output_record
# end of create_report_dataset
[docs] def publish_trade_history_dataset( self, **kwargs): """publish_trade_history_dataset publish trade history datasets to caches (redis), archives (minio s3), a local file (``output_file``) and slack :param kwargs: keyword argument dictionary :return: tuple: ``status``, ``output_file`` """ # parse optional input args output_file = kwargs.get( 'output_file', None) label = kwargs.get( 'label', self.name) redis_enabled = kwargs.get( 'redis_enabled', False) redis_address = kwargs.get( 'redis_address', self.history_redis_address) redis_db = kwargs.get( 'redis_db', self.history_redis_db) redis_password = kwargs.get( 'redis_password', self.history_redis_password) redis_expire = kwargs.get( 'redis_expire', self.history_redis_expire) redis_serializer = kwargs.get( 'redis_serializer', self.history_redis_serializer) redis_encoding = kwargs.get( 'redis_encoding', self.history_redis_encoding) s3_enabled = kwargs.get( 's3_enabled', False) s3_address = kwargs.get( 's3_address', self.history_s3_address) s3_bucket = kwargs.get( 's3_bucket', self.history_s3_bucket) s3_access_key = kwargs.get( 's3_access_key', self.history_s3_access_key) s3_secret_key = kwargs.get( 's3_secret_key', self.history_s3_secret_key) s3_region_name = kwargs.get( 's3_region_name', self.history_s3_region_name) s3_secure = kwargs.get( 's3_secure', self.history_s3_secure) slack_enabled = kwargs.get( 'slack_enabled', self.history_slack_enabled) slack_code_block = kwargs.get( 'slack_code_block', self.history_slack_code_block) slack_full_width = kwargs.get( 'slack_full_width', self.history_slack_full_width) redis_key = kwargs.get( 'redis_key', self.history_redis_key) s3_key = kwargs.get( 's3_key', self.history_s3_key) verbose = kwargs.get( 'verbose', self.history_verbose) add_metrics_to_key = kwargs.get( 'add_metrics_to_key', False) status = ae_consts.NOT_RUN if not self.publish_history: log.info( 'history publish - disabled - ' f'{self.name} - tickers={self.tickers}') return status # end of screening for returning early output_record = self.create_history_dataset() if output_file or s3_enabled or redis_enabled or slack_enabled: if self.verbose: log.info( f'history build json - {self.name} - ' f'tickers={self.tickers}') # for mass trade history publishing, make it # easy to find the best-of runs if add_metrics_to_key: (self.num_owned, self.ticker_buys, self.ticker_sells) = self.get_ticker_positions( ticker=self.tickers[0]) status_str = 'NEGATIVE' if self.net_gain > 0: status_str = 'POSITIVE' now = datetime.datetime.utcnow() seconds = ae_consts.to_f(( now - self.created_on_date).total_seconds()) # https://stackoverflow.com/questions/6870824/ # what-is-the-maximum-length-of-a-filename-in-s3 # 1024 characters s3_key = ( f'{ae_consts.to_f(self.net_gain)}_netgain_' f'{ae_consts.to_f(self.net_value)}_netvalue_' f'{status_str}_' f'{ae_consts.to_f(self.starting_balance)}_startbalance_' f'{ae_consts.to_f(self.balance)}_endbalance_' f'{self.num_owned}_shares_' f'{ae_consts.to_f(self.latest_close)}_close_' f'{self.num_buys}_buys_{self.num_sells}_sells_' f'{self.min_buy_indicators}_minbuyinds_' f'{self.min_sell_indicators}_minsellinds_' f'{seconds}_seconds_{s3_key}')[0:1023] # end of if add metrics to key use_data = output_record num_bytes = len(str(use_data)) num_mb = ae_consts.get_mb(num_bytes) log.info( 'history publish - START - ' f'{self.name} - ticker={self.tickers[0]} ' f'file={output_file} size={num_mb}MB ' f's3={s3_address}/{s3_bucket} s3_key={s3_key} ' f'redis={redis_enabled} redis_key={redis_key} ' f'slack={slack_enabled}') 
publish_status = publish.publish( data=use_data, label=label, df_compress=True, compress=False, convert_to_dict=False, output_file=output_file, redis_enabled=redis_enabled, redis_key=redis_key, redis_address=redis_address, redis_db=redis_db, redis_password=redis_password, redis_expire=redis_expire, redis_serializer=redis_serializer, redis_encoding=redis_encoding, s3_enabled=s3_enabled, s3_key=s3_key, s3_address=s3_address, s3_bucket=s3_bucket, s3_access_key=s3_access_key, s3_secret_key=s3_secret_key, s3_region_name=s3_region_name, s3_secure=s3_secure, slack_enabled=slack_enabled, slack_code_block=slack_code_block, slack_full_width=slack_full_width, verbose=verbose) status = publish_status log.info( 'history publish - END - ' f'{ae_consts.get_status(status=status)} ' f'{self.name} - tickers={self.tickers} ' f'file={output_file} s3={s3_key} redis={redis_key} ' f'size={num_mb}MB') else: status = ae_consts.SUCCESS if self.verbose: log.info( f'{self.name} - history not publishing for ' f'output_file={output_file} s3_enabled={s3_enabled} ' f'redis_enabled={redis_enabled} ' f'slack_enabled={slack_enabled}') # end of handling for publish return status
# end of publish_trade_history_dataset
[docs] def create_history_dataset( self): """create_history_dataset Create the ``Trading History`` dataset during the ``self.publish_trade_history_dataset()`` member method. Inherited Algorithm classes can derive how they build a custom ``Trading History`` dataset before publishing by implementing this method in the derived class. """ if self.verbose: log.info('history - create start') if self.last_handle_data: data_for_tickers = self.get_supported_tickers_in_data( data=self.last_handle_data) else: data_for_tickers = self.tickers num_tickers = len(data_for_tickers) if num_tickers > 0: self.debug_msg = ( f'{self.name} handle - tickers={json.dumps(data_for_tickers)}') history_by_ticker = {} for ticker in data_for_tickers: ticker_history_rec_list = self.build_ticker_history( ticker=ticker, ignore_keys=self.ignore_history_keys) history_by_ticker[ticker] = ticker_history_rec_list # end for all tickers to filter output_record = { 'tickers': data_for_tickers, 'version': int(ae_consts.ALGO_HISTORY_VERSION), 'last_trade_date': ae_utils.get_last_close_str(), 'algo_config_dict': self.config_dict, 'algo_name': self.name, 'created': ae_utils.utc_now_str() } for ticker in data_for_tickers: if ticker not in output_record: output_record[ticker] = [] num_ticker_datasets = len(history_by_ticker[ticker]) cur_idx = 1 for idx, node in enumerate(history_by_ticker[ticker]): track_label = self.build_progress_label( progress=cur_idx, total=num_ticker_datasets) algo_id = f'{ticker} {track_label}' if self.verbose: log.info( f'''{self.name} history - {algo_id} - ds={node.get( 'minute', node.get( 'date', 'no-date-set'))}''') output_record[ticker].append(node) cur_idx += 1 # end for all self.last_handle_data[ticker] # end of converting dataset return output_record
# end of create_history_dataset
    def build_ticker_history(
            self,
            ticker,
            ignore_keys):
        """build_ticker_history

        For all records in ``self.order_history`` compile a
        filtered list of history records per ``ticker`` while
        pruning any keys that are in the list of ``ignore_keys``

        :param ticker: string ticker symbol
        :param ignore_keys: list of keys to not include in
            the history report
        """
        history_for_ticker = []
        for org_node in self.order_history:
            status = org_node.get('status', ae_consts.INVALID)
            if status != ae_consts.INVALID:
                # trade history dictionaries
                # will be permanently changed
                # unless an expensive deep copy is
                # used by a derived class with:
                # node = copy.deepcopy(org_node)
                node = org_node
                is_valid = True
                for i in ignore_keys:
                    node.pop(i, None)
                if is_valid:
                    history_for_ticker.append(node)
        # end of all order history records

        return history_for_ticker
# end of build_ticker_history
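# A minimal sketch of the deep-copy approach mentioned in the comment inside
# ``build_ticker_history``: a hypothetical derived class copies each order
# history node before pruning keys so ``self.order_history`` is not mutated.
# The class name is an assumption for the example.
import copy


class DeepCopyHistoryAlgo(BaseAlgo):

    def build_ticker_history(self, ticker, ignore_keys):
        history_for_ticker = []
        for org_node in self.order_history:
            if org_node.get('status', ae_consts.INVALID) != ae_consts.INVALID:
                # protect the original in-memory record
                node = copy.deepcopy(org_node)
                for i in ignore_keys:
                    node.pop(i, None)
                history_for_ticker.append(node)
        return history_for_ticker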
[docs] def publish_input_dataset( self, **kwargs): """publish_input_dataset publish input datasets to caches (redis), archives (minio s3), a local file (``output_file``) and slack :param kwargs: keyword argument dictionary :return: tuple: ``status``, ``output_file`` """ # parse optional input args output_file = kwargs.get( 'output_file', None) label = kwargs.get( 'label', self.name) redis_enabled = kwargs.get( 'redis_enabled', False) redis_address = kwargs.get( 'redis_address', self.extract_redis_address) redis_db = kwargs.get( 'redis_db', self.extract_redis_db) redis_password = kwargs.get( 'redis_password', self.extract_redis_password) redis_expire = kwargs.get( 'redis_expire', self.extract_redis_expire) redis_serializer = kwargs.get( 'redis_serializer', self.extract_redis_serializer) redis_encoding = kwargs.get( 'redis_encoding', self.extract_redis_encoding) s3_enabled = kwargs.get( 's3_enabled', False) s3_address = kwargs.get( 's3_address', self.extract_s3_address) s3_bucket = kwargs.get( 's3_bucket', self.extract_s3_bucket) s3_access_key = kwargs.get( 's3_access_key', self.extract_s3_access_key) s3_secret_key = kwargs.get( 's3_secret_key', self.extract_s3_secret_key) s3_region_name = kwargs.get( 's3_region_name', self.extract_s3_region_name) s3_secure = kwargs.get( 's3_secure', self.extract_s3_secure) slack_enabled = kwargs.get( 'slack_enabled', self.extract_slack_enabled) slack_code_block = kwargs.get( 'slack_code_block', self.extract_slack_code_block) slack_full_width = kwargs.get( 'slack_full_width', self.extract_slack_full_width) redis_key = kwargs.get( 'redis_key', self.extract_redis_key) s3_key = kwargs.get( 's3_key', self.extract_s3_key) verbose = kwargs.get( 'verbose', self.extract_verbose) status = ae_consts.NOT_RUN if not self.publish_input: log.info( 'input publish - disabled - ' f'{self.name} - tickers={self.tickers}') return status output_record = self.create_algorithm_ready_dataset() if output_file or s3_enabled or redis_enabled or slack_enabled: if self.verbose: log.info( f'input build json - {self.name} - tickers={self.tickers}') use_data = output_record num_bytes = len(str(use_data)) num_mb = ae_consts.get_mb(num_bytes) log.info( f'input publish - START - {self.name} - ' f'tickers={self.tickers} file={output_file} size={num_mb}MB ' f's3={s3_enabled} s3_key={s3_key} redis={redis_enabled} ' f'redis_key={redis_key} slack={slack_enabled}') publish_status = publish.publish( data=use_data, label=label, df_compress=True, compress=False, convert_to_dict=False, output_file=output_file, redis_enabled=redis_enabled, redis_key=redis_key, redis_address=redis_address, redis_db=redis_db, redis_password=redis_password, redis_expire=redis_expire, redis_serializer=redis_serializer, redis_encoding=redis_encoding, s3_enabled=s3_enabled, s3_key=s3_key, s3_address=s3_address, s3_bucket=s3_bucket, s3_access_key=s3_access_key, s3_secret_key=s3_secret_key, s3_region_name=s3_region_name, s3_secure=s3_secure, slack_enabled=slack_enabled, slack_code_block=slack_code_block, slack_full_width=slack_full_width, verbose=verbose) status = publish_status log.info( f'input publish - END - {ae_consts.get_status(status=status)} ' f'{self.name} - tickers={self.tickers} file={output_file} ' f's3={s3_key} redis={redis_key} size={num_mb}MB') else: status = ae_consts.SUCCESS if self.verbose: log.info( f'{self.name} - input not publishing for ' f'output_file={output_file} s3_enabled={s3_enabled} ' f'redis_enabled={redis_enabled} ' f'slack_enabled={slack_enabled}') # end of handling for publish return status
# end of publish_input_dataset
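# A minimal usage sketch, assuming ``algo`` is an already-run algorithm
# instance with ``publish_input`` enabled; the redis address, db and key are
# assumptions for the example. Publishing the Algorithm-Ready dataset lets a
# later run replay the exact same extracted data.
def example_publish_algorithm_ready(algo):
    status = algo.publish_input_dataset(
        redis_enabled=True,
        redis_address='localhost:6379',
        redis_db=0,
        redis_key='SPY_latest_algo_ready',
        s3_enabled=False,
        slack_enabled=False)
    return status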
[docs] def create_algorithm_ready_dataset( self): """create_algorithm_ready_dataset Create the ``Algorithm-Ready`` dataset during the ``self.publish_input_dataset()`` member method. Inherited Algorithm classes can derive how they build a custom ``Algorithm-Ready`` dataset before publishing by implementing this method in the derived class. """ if self.verbose: log.info('algo-ready - create start') data_for_tickers = self.get_supported_tickers_in_data( data=self.last_handle_data) num_tickers = len(data_for_tickers) if num_tickers > 0: self.debug_msg = ( f'{self.name} handle - tickers={json.dumps(data_for_tickers)}') output_record = {} for ticker in data_for_tickers: if ticker not in output_record: output_record[ticker] = [] num_ticker_datasets = len(self.last_handle_data[ticker]) cur_idx = 1 for idx, node in enumerate(self.last_handle_data[ticker]): track_label = self.build_progress_label( progress=cur_idx, total=num_ticker_datasets) algo_id = f'{ticker} {track_label}' if self.verbose: log.info( f'{self.name} convert - {algo_id} - ds={node["date"]}') new_node = { 'id': node['id'], 'date': node['date'], 'data': {} } # parse the dataset node and set member variables self.debug_msg = ( f'{ticker} START - convert load dataset ' f'id={node.get("id", "missing-id")}') self.load_from_dataset( ds_data=node) for ds_key in node['data']: empty_ds = self.empty_pd_str data_val = node['data'][ds_key] if ds_key not in new_node['data']: new_node['data'][ds_key] = empty_ds self.debug_msg = ( f'convert node={node} ds_key={ds_key}') if hasattr(data_val, 'to_json'): new_node['data'][ds_key] = data_val.to_json( orient='records', date_format='iso') else: if not data_val: new_node['data'][ds_key] = empty_ds else: new_node['data'][ds_key] = json.dumps( data_val) # if/else # for all dataset values in data self.debug_msg = ( f'{ticker} END - convert load dataset ' f'id={node.get("id", "missing-id")}') output_record[ticker].append(new_node) cur_idx += 1 # end for all self.last_handle_data[ticker] # end of converting dataset return output_record
# end of create_algorithm_ready_dataset
    def get_ticker_positions(
            self,
            ticker):
        """get_ticker_positions

        get the current positions for a ticker and
        return a tuple:
        ``num_owned (integer), buys (list), sells (list)``

        .. code-block:: python

            num_owned, buys, sells = self.get_ticker_positions(
                ticker=ticker)

        :param ticker: ticker to lookup
        """
        buys = None
        sells = None
        num_owned = None
        self.num_buys = 0
        self.num_sells = 0
        if ticker in self.positions:
            num_owned = self.positions[ticker].get(
                'shares', None)
            buys = self.positions[ticker].get(
                'buys', [])
            sells = self.positions[ticker].get(
                'sells', [])
            self.num_buys = len(buys)
            self.num_sells = len(sells)
        # if own the ticker

        self.net_value = ae_consts.to_f(self.balance)
        if self.latest_close and num_owned:
            self.net_value = ae_consts.to_f(
                self.balance + (
                    num_owned * self.latest_close))
        self.net_gain = ae_consts.to_f(
            self.net_value - self.starting_balance)

        return num_owned, buys, sells
# end of get_ticker_positions
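# A worked example of the net value math in ``get_ticker_positions``,
# assuming a balance of 1000.0, 5 owned shares, a latest close of 20.0 and a
# starting balance of 900.0: net_value = 1000.0 + (5 * 20.0) = 1100.0 and
# net_gain = 1100.0 - 900.0 = 200.0. The figures are assumptions for the
# example.
def example_net_value(balance, num_owned, latest_close, starting_balance):
    net_value = balance + (num_owned * latest_close)
    net_gain = net_value - starting_balance
    return net_value, net_gain


assert example_net_value(1000.0, 5, 20.0, 900.0) == (1100.0, 200.0)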
[docs] def get_trade_history_node( self): """get_trade_history_node Helper for quickly building a history node on a derived algorithm. Whatever member variables are in the base class ``analysis_engine.algo.BaseAlgo`` will be added automatically into the returned: ``historical transaction dictionary`` .. tip:: if you get a ``None`` back it means there could be a bug in how you are using the member variables (likely created an invalid math calculation) or could be a bug in the helper: `build_trade_history_entry <https:// github.com/AlgoTraders/stock-analysis-engine/blob/ma ster/analysis_engine/build_trade_history_entry.py>`__ """ history_dict = history_utils.build_trade_history_entry( ticker=self.ticker, algo_start_price=self.starting_close, original_balance=self.starting_balance, num_owned=self.num_owned, close=self.trade_price, balance=self.balance, commission=self.commission, date=self.trade_date, minute=self.use_minute, trade_type=self.trade_type, high=self.latest_high, low=self.latest_low, open_val=self.latest_open, volume=self.latest_volume, today_high=self.today_high, today_low=self.today_low, today_open_val=self.today_open, today_close=self.today_close, today_volume=self.today_volume, ask=self.ask, bid=self.bid, stop_loss=self.stop_loss, trailing_stop_loss=self.trailing_stop_loss, buy_hold_units=self.buy_hold_units, sell_hold_units=self.sell_hold_units, spread_exp_date=self.spread_exp_date, prev_balance=self.prev_bal, prev_num_owned=self.prev_num_owned, total_buys=self.num_buys, total_sells=self.num_sells, buy_triggered=self.should_buy, buy_strength=self.buy_strength, buy_risk=self.buy_risk, sell_triggered=self.should_sell, sell_strength=self.sell_strength, sell_risk=self.sell_risk, num_indicators_buy=self.num_latest_buys, num_indicators_sell=self.num_latest_sells, min_buy_indicators=self.min_buy_indicators, min_sell_indicators=self.min_sell_indicators, net_gain=self.net_gain, net_value=self.net_value, note=self.note, ds_id=self.ds_id, version=self.version) return history_dict
# end of get_trade_history_node
    def load_from_config(
            self,
            config_dict):
        """load_from_config

        support for replaying algorithms from a
        trading history

        :param config_dict: algorithm configuration values
            usually from a previous trading history or for
            quickly testing dataset theories in a development
            environment
        """
        if config_dict:
            if not self.verbose:
                self.verbose = config_dict.get('verbose', False)
            if self.verbose:
                for k in config_dict:
                    log.debug(
                        f'setting algo member={k} to '
                        f'config value={config_dict[k]}')
            # end of logging all config keys
            for k in config_dict:
                self.__dict__[k] = config_dict[k]
            # end of assigning config keys to member variables
        # end if config dict set
    # end of load_from_config
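# A minimal replay sketch, assuming ``algo`` is a constructed algorithm
# instance; the config keys shown (``timeseries`` and ``verbose``) are used
# elsewhere in this module, and any key in the dictionary becomes a member
# variable on the algorithm.
def example_replay_from_config(algo):
    saved_config = {
        'timeseries': 'day',
        'verbose': True,
    }
    algo.load_from_config(config_dict=saved_config)
    return algo.get_name()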
    def get_name(self):
        """get_name"""
        return self.name
# end of get_name
    def get_result(self):
        """get_result"""

        self.debug_msg = (
            'building results')
        finished_date = ae_utils.utc_now_str()
        self.result = {
            'name': self.name,
            'created': self.created_date,
            'updated': finished_date,
            'open_positions': self.positions,
            'buys': self.get_buys(),
            'sells': self.get_sells(),
            'num_processed': len(self.order_history),
            'history': self.order_history,
            'balance': self.balance,
            'commission': self.commission
        }

        return self.result
# end of get_result
    def get_debug_msg(
            self):
        """get_debug_msg

        debug algorithms that failed by viewing
        the last ``self.debug_msg`` they set
        """
        return self.debug_msg
# end of get_debug_msg
    def get_tickers(
            self):
        """get_tickers"""
        return self.tickers
# end of get_tickers
    def get_balance(
            self):
        """get_balance"""
        return self.balance
# end of get_balance
    def get_commission(
            self):
        """get_commission"""
        return self.commission
# end of get_commission
    def get_buys(
            self):
        """get_buys"""
        return self.buys
# end of get_buys
    def get_sells(
            self):
        """get_sells"""
        return self.sells
# end of get_sells
    def get_history_dataset(
            self):
        """get_history_dataset"""
        return prepare_history.prepare_history_dataset(
            data=self.create_history_dataset(),
            convert_to_dict=False)
# end of get_history_dataset
    def get_report_dataset(
            self):
        """get_report_dataset"""
        return prepare_report.prepare_report_dataset(
            data=self.create_report_dataset(),
            convert_to_dict=False)
# end of get_report_dataset
    def get_owned_shares(
            self,
            ticker):
        """get_owned_shares

        :param ticker: ticker to lookup
        """
        num_owned = 0
        if ticker in self.positions:
            num_owned = self.positions[ticker].get(
                'shares', None)
        return num_owned
# end of get_owned_shares
[docs] def create_buy_order( self, ticker, row, minute=None, shares=None, reason=None, orient='records', date_format='iso', is_live_trading=False): """create_buy_order create a buy order at the close or ask price .. note:: setting the ``minute`` is required to build a minute-by-minute ``Trading History`` :param ticker: string ticker :param shares: optional - integer number of shares to buy if None buy max number of shares at the ``close`` with the available ``balance`` amount. :param row: ``dictionary`` or ``pandas.DataFrame`` row record that will be converted to a json-serialized string :param minute: optional - string datetime when the order minute the order was placed. For ``day`` timeseries this is the close of trading (16:00:00 for the day) and for ``minute`` timeseries the value will be the latest minute from the ``self.df_minute`` ``pandas.DataFrame``. Normally this value should be set to the ``self.use_minute``, and the format is ``ae_consts.COMMON_TICK_DATE_FORMAT`` :param reason: optional - reason for creating the order which is useful for troubleshooting order histories :param orient: optional - pandas orient for ``row.to_json()`` :param date_format: optional - pandas date_format parameter for ``row.to_json()`` :param is_live_trading: optional - bool for filling trades for live trading or for backtest tuning filled (default ``False`` which is backtest mode) """ close = row['close'] required_amount_for_a_buy = close + self.commission if required_amount_for_a_buy > self.balance: if self.verbose_trading: log.info( f'{self.name} - buy - not enough funds={self.balance} < ' f'required={required_amount_for_a_buy} with ' f'shares={self.num_owned}') return dataset_date = row['date'] use_date = dataset_date if minute: use_date = minute if self.verbose_trading: log.info( f'{self.name} - buy start {use_date} {ticker}@{close} - ' f'shares={shares}') new_buy = None order_details = row if hasattr(row, 'to_json'): order_details = row.to_json( orient=orient, date_format=date_format), try: num_owned = self.get_owned_shares( ticker=ticker) new_buy = buy_utils.build_buy_order( ticker=ticker, close=close, num_owned=num_owned, shares=shares, balance=self.balance, commission=self.commission, date=dataset_date, minute=minute, use_key=f'{ticker}_{dataset_date}', details=order_details, is_live_trading=is_live_trading, reason=reason) prev_shares = num_owned if not prev_shares: prev_shares = 0 prev_bal = ae_consts.to_f(self.balance) if new_buy['status'] == ae_consts.TRADE_FILLED: if ticker in self.positions: self.positions[ticker]['shares'] = int( new_buy['shares']) self.positions[ticker]['buys'].append( new_buy) (self.num_owned, self.ticker_buys, self.ticker_sells) = self.get_ticker_positions( ticker=ticker) self.created_buy = True else: self.positions[ticker] = { 'shares': new_buy['shares'], 'buys': [ new_buy ], 'sells': [] } self.balance = new_buy['balance'] if self.verbose_trading: log.info( f'{self.name} - buy end {use_date} {ticker}@{close} ' f'{ae_consts.get_status(status=new_buy["status"])} ' f'shares={new_buy["shares"]} ' f'cost={new_buy["buy_price"]} bal={self.balance} ' f'prev_shares={prev_shares} prev_bal={prev_bal}') else: if self.verbose_trading: log.info( f'{self.name} - buy fail {use_date} {ticker}@{close} ' f'{ae_consts.get_status(status=new_buy["status"])} ' f'shares={num_owned} cost={new_buy["buy_price"]} ' f'bal={self.balance} ') # end of if trade worked or not # update the buys self.buys.append(new_buy) (self.num_owned, self.ticker_buys, self.ticker_sells) = self.get_ticker_positions( 
                ticker=ticker)

            # record the ticker's event if it's a minute timeseries
            if minute:
                self.last_history_dict = self.get_trade_history_node()
                if self.latest_ind_report:
                    for k in self.latest_ind_report:
                        if k not in self.ind_conf_ignore_keys:
                            self.last_history_dict[k] = (
                                self.latest_ind_report[k])
        except Exception as e:
            self.debug_msg = (
                f'{self.name} - buy {ticker}@{close} - FAILED with ex={e}')
            log.error(self.debug_msg)
            if self.raise_on_err:
                raise e
        # end of try/ex
    # end of create_buy_order
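# A minimal sketch of a derived algorithm issuing a buy from its ``process()``
# hook, assuming the daily dataset rows include ``date`` and ``close``
# columns; the class name, buy condition and share count are assumptions for
# the example. The ``create_buy_order`` keyword arguments come from the
# method above.
class BuyLastCloseAlgo(BaseAlgo):

    def process(self, algo_id, ticker, dataset):
        if not hasattr(self.df_daily, 'index') or len(self.df_daily.index) == 0:
            return
        if not self.num_owned:
            self.create_buy_order(
                ticker=ticker,
                row=self.df_daily.iloc[-1],
                shares=1,
                reason='example - no shares owned yet',
                minute=self.use_minute)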
[docs] def create_sell_order( self, ticker, row, minute=None, shares=None, reason=None, orient='records', date_format='iso', is_live_trading=False): """create_sell_order create a sell order at the close or ask price .. note:: setting the ``minute`` is required to build a minute-by-minute ``Trading History`` :param ticker: string ticker :param shares: optional - integer number of shares to sell if None sell all owned shares at the ``close`` :param row: ``pandas.DataFrame`` row record that will be converted to a json-serialized string :param minute: optional - string datetime when the order minute the order was placed. For ``day`` timeseries this is the close of trading (16:00:00 for the day) and for ``minute`` timeseries the value will be the latest minute from the ``self.df_minute`` ``pandas.DataFrame``. Normally this value should be set to the ``self.use_minute``, and the format is ``ae_consts.COMMON_TICK_DATE_FORMAT`` :param reason: optional - reason for creating the order which is useful for troubleshooting order histories :param orient: optional - pandas orient for ``row.to_json()`` :param date_format: optional - pandas date_format parameter for ``row.to_json()`` :param is_live_trading: optional - bool for filling trades for live trading or for backtest tuning filled (default ``False`` which is backtest mode) """ close = row['close'] required_amount_for_a_sell = self.commission if required_amount_for_a_sell > self.balance: if self.verbose_trading: log.info( f'{self.name} - sell - not enough funds={self.balance} < ' f'required={required_amount_for_a_sell} with ' f'shareds={self.num_owned}') return dataset_date = row['date'] use_date = dataset_date if minute: use_date = minute if self.verbose_trading: log.info( f'{self.name} - sell start {use_date} {ticker}@{close}') new_sell = None order_details = row if hasattr(row, 'to_json'): order_details = row.to_json( orient=orient, date_format=date_format), try: num_owned = self.get_owned_shares( ticker=ticker) new_sell = sell_utils.build_sell_order( ticker=ticker, close=close, num_owned=num_owned, shares=shares, balance=self.balance, commission=self.commission, date=dataset_date, minute=minute, use_key=f'{ticker}_{dataset_date}', details=order_details, is_live_trading=is_live_trading, reason=reason) prev_shares = num_owned if not prev_shares: prev_shares = 0 prev_bal = ae_consts.to_f(self.balance) if new_sell['status'] == ae_consts.TRADE_FILLED: if ticker in self.positions: self.positions[ticker]['shares'] = int( new_sell['shares']) self.positions[ticker]['sells'].append( new_sell) (self.num_owned, self.ticker_buys, self.ticker_sells) = self.get_ticker_positions( ticker=ticker) self.created_sell = True else: self.positions[ticker] = { 'shares': new_sell['shares'], 'buys': [], 'sells': [ new_sell ] } self.balance = new_sell['balance'] if self.verbose_trading: log.info( f'{self.name} - sell end {use_date} {ticker}@{close} ' f'{ae_consts.get_status(status=new_sell["status"])} ' f'shares={num_owned} cost={new_sell["sell_price"]} ' f'bal={self.balance} prev_shares={prev_shares} ' f'prev_bal={prev_bal}') else: if self.verbose_trading: log.info( f'{self.name} - sell fail {use_date} {ticker}@{close} ' f'{ae_consts.get_status(status=new_sell["status"])} ' f'shares={num_owned} cost={new_sell["sell_price"]} ' f'bal={self.balance}') # end of if trade worked or not # update the sells self.sells.append(new_sell) (self.num_owned, self.ticker_buys, self.ticker_sells) = self.get_ticker_positions( ticker=ticker) # record the ticker's event if it's a minute 
timeseries if minute: self.last_history_dict = self.get_trade_history_node() if self.latest_ind_report: for k in self.latest_ind_report: if k not in self.ind_conf_ignore_keys: self.last_history_dict[k] = ( self.latest_ind_report[k]) except Exception as e: self.debug_msg = ( f'{self.name} - sell {ticker}@{close} - FAILED with ex={e}') log.error(self.debug_msg) if self.raise_on_err: raise e
        # end of try/ex
    # end of create_sell_order
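# A minimal sketch of the matching sell side, assuming the same daily dataset
# layout as the buy example above; per the docstring, ``shares=None`` sells
# all owned shares. The class name and sell condition are assumptions.
class SellAllSharesAlgo(BaseAlgo):

    def process(self, algo_id, ticker, dataset):
        num_owned = self.get_owned_shares(ticker=ticker)
        has_daily = (
            hasattr(self.df_daily, 'index') and len(self.df_daily.index) > 0)
        if num_owned and has_daily:
            self.create_sell_order(
                ticker=ticker,
                row=self.df_daily.iloc[-1],
                shares=None,  # None sells all owned shares
                reason='example - liquidate position',
                minute=self.use_minute)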
    def build_progress_label(
            self,
            progress,
            total):
        """build_progress_label

        create a progress label string for the logs

        :param progress: progress counter
        :param total: total number of counts
        """
        percent_done = ae_consts.get_percent_done(
            progress=progress,
            total=total)
        progress_label = f'{percent_done} {progress}/{total}'
        return progress_label
# end of build_progress_label
[docs] def get_supported_tickers_in_data( self, data): """get_supported_tickers_in_data For all updates found in ``data`` compare to the supported list of ``self.tickers`` to make sure the updates are relevant for this algorithm. :param data: new data stream to process in this algo """ data_for_tickers = [] for ticker in self.tickers: if ticker in data: data_for_tickers.append( ticker) # end of finding tickers for this algo return data_for_tickers
# end of get_supported_tickers_in_data
[docs] def load_from_dataset( self, ds_data): """load_from_dataset Load the member variables from the extracted ``ds_data`` dataset. algorithms automatically provide the following member variables to ``myalgo.process()`` for quickly building algorithms: - ``self.df_daily`` - ``self.df_minute`` - ``self.df_calls`` - ``self.df_puts`` - ``self.df_quote`` - ``self.df_pricing`` - ``self.df_stats`` - ``self.df_peers`` - ``self.df_iex_news`` - ``self.df_financials`` - ``self.df_earnings`` - ``self.df_dividends`` - ``self.df_company`` - ``self.df_yahoo_news`` - ``self.df_tdcalls`` - ``self.df_tdputs`` .. note:: If a key is not in the dataset, the algorithms's member variable will be an empty ``pandas.DataFrame([])``. Please ensure the engine cached the dataset in redis using a tool like ``redis-cli`` to verify the values are in memory. :param ds_data: extracted, structured dataset from redis """ # back up for debugging/tracking/comparing self.last_ds_id = self.ds_id self.last_ds_date = self.ds_date self.last_ds_data = self.ds_data # load the new one self.ds_data = ds_data self.ds_id = self.ds_data.get( 'id', 'missing-ID') self.ds_date = self.ds_data.get( 'date', 'missing-DATE') self.ds_data = self.ds_data.get( 'data', 'missing-DATA') self.df_daily = self.ds_data.get( 'daily', self.empty_pd) self.df_minute = self.ds_data.get( 'minute', self.empty_pd) self.df_stats = self.ds_data.get( 'stats', self.empty_pd) self.df_peers = self.ds_data.get( 'peers', self.empty_pd) self.df_financials = self.ds_data.get( 'financials', self.empty_pd) self.df_earnings = self.ds_data.get( 'earnings', self.empty_pd) self.df_dividends = self.ds_data.get( 'dividends', self.empty_pd) self.df_quote = self.ds_data.get( 'quote', self.empty_pd) self.df_company = self.ds_data.get( 'company', self.empty_pd) self.df_iex_news = self.ds_data.get( 'news1', self.empty_pd) self.df_yahoo_news = self.ds_data.get( 'news', self.empty_pd) self.df_calls = self.ds_data.get( 'calls', self.empty_pd) self.df_puts = self.ds_data.get( 'puts', self.empty_pd) self.df_pricing = self.ds_data.get( 'pricing', {}) self.df_tdcalls = self.ds_data.get( 'tdcalls', self.empty_pd) self.df_tdputs = self.ds_data.get( 'tdputs', self.empty_pd) self.latest_min = None self.backtest_date = self.ds_date self.found_minute_data = False if not hasattr(self.df_daily, 'index'): self.df_daily = self.empty_pd if not hasattr(self.df_minute, 'index'): self.df_minute = self.empty_pd else: if 'date' in self.df_minute: self.latest_min = self.df_minute['date'].iloc[-1] self.found_minute_data = True if not self.found_minute_data: if ae_consts.is_df(self.df_tdcalls): if 'date' in self.df_tdcalls: self.latest_min = self.df_tdcalls['date'].iloc[-1] self.found_minute_data = True if ae_consts.is_df(self.df_tdputs): if 'date' in self.df_tdputs: self.latest_min = self.df_tdputs['date'].iloc[-1] self.found_minute_data = True if not hasattr(self.df_stats, 'index'): self.df_stats = self.empty_pd if not hasattr(self.df_peers, 'index'): self.df_peers = self.empty_pd if not hasattr(self.df_financials, 'index'): self.df_financials = self.empty_pd if not hasattr(self.df_earnings, 'index'): self.df_earnings = self.empty_pd if not hasattr(self.df_dividends, 'index'): self.df_dividends = self.empty_pd if not hasattr(self.df_quote, 'index'): self.df_quote = self.empty_pd if not hasattr(self.df_company, 'index'): self.df_company = self.empty_pd if not hasattr(self.df_iex_news, 'index'): self.df_iex_news = self.empty_pd if not hasattr(self.df_yahoo_news, 'index'): self.df_yahoo_news = self.empty_pd if 
not hasattr(self.df_calls, 'index'): self.df_calls = self.empty_pd if not hasattr(self.df_puts, 'index'): self.df_puts = self.empty_pd if not hasattr(self.df_pricing, 'index'): self.df_pricing = self.empty_pd if not hasattr(self.df_tdcalls, 'index'): self.df_tdcalls = self.empty_pd if not hasattr(self.df_tdputs, 'index'): self.df_tdputs = self.empty_pd # set internal values: self.trade_date = self.ds_date self.created_buy = False self.created_sell = False self.should_buy = False self.should_sell = False # by default assume close of trading for the day self.use_minute = f'{self.trade_date} 16:00:00' try: if hasattr(self.df_daily, 'index'): columns = list(self.df_daily.columns.values) if 'high' in columns: self.today_high = float( self.df_daily.iloc[-1]['high']) self.latest_high = self.today_high if 'low' in columns: self.today_low = float( self.df_daily.iloc[-1]['low']) self.latest_low = self.today_low if 'open' in columns: self.today_open = float( self.df_daily.iloc[-1]['open']) self.latest_open = self.today_open if 'close' in columns: self.today_close = float( self.df_daily.iloc[-1]['close']) self.trade_price = self.today_close self.latest_close = self.trade_price if not self.starting_close: self.starting_close = self.today_close if 'volume' in columns: self.today_volume = int( self.df_daily.iloc[-1]['volume']) self.latest_volume = self.today_volume if hasattr(self.df_minute, 'index'): columns = list(self.df_minute.columns.values) if 'high' in columns: self.latest_high = float( self.df_minute.iloc[-1]['high']) if 'low' in columns: self.latest_low = float( self.df_minute.iloc[-1]['low']) if 'open' in columns: self.latest_open = float( self.df_minute.iloc[-1]['open']) if 'close' in columns: self.latest_close = float( self.df_minute.iloc[-1]['close']) self.trade_price = self.latest_close if not self.starting_close: self.starting_close = self.latest_close if 'volume' in columns: self.latest_volume = int( self.df_minute.iloc[-1]['volume']) except Exception as e: self.debug_msg = ( f'{self.name} handle - FAILED getting latest prices ' f'for algo={self.ds_id} - ds={self.ds_date} ex={e}') log.error(self.debug_msg) if self.raise_on_err: raise e
        # end of trying to get the latest prices out of the datasets
    # end of load_from_dataset
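# A minimal sketch, assuming a hand-built dataset node; the ticker, date and
# pricing values are assumptions for the example. Any dataset key that is
# missing from ``data`` simply becomes an empty ``pandas.DataFrame([])``
# member after loading.
import pandas as pd


def example_load_minimal_node(algo):
    node = {
        'id': 'SPY_2018-11-05',
        'date': '2018-11-05',
        'data': {
            'daily': pd.DataFrame([{
                'date': '2018-11-05',
                'open': 271.0,
                'high': 273.0,
                'low': 270.0,
                'close': 272.5,
                'volume': 1000000,
            }]),
        },
    }
    algo.load_from_dataset(ds_data=node)
    return algo.latest_close, algo.backtest_date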
    def reset_for_next_run(
            self):
        """reset_for_next_run

        work in progress - clean up all internal member variables
        for another run

        .. note:: random or probabilistic predictions may not
            create the same trading history output file
        """
        self.debug_msg = ''
        self.loaded_dataset = None
        self.last_history_dict = None
        self.last_handle_data = None
        self.order_history = []
        self.use_minute = None
        self.intraday_start_min = None
        self.intraday_end_min = None
        self.intraday_events = {}
# end of reset_for_next_run
    def populate_intraday_events_dict(
            self,
            start_min,
            end_min):
        """populate_intraday_events_dict

        For tracking intraday buy/sell/news events with indicators
        use this method to build a dictionary where keys are
        the minutes between ``start_min`` and ``end_min``.

        :param start_min: start datetime for building the
            ``self.intraday_events`` dictionary keys
        :param end_min: end datetime for building the
            ``self.intraday_events`` dictionary keys
        """
        self.intraday_events = {}
        if not self.found_minute_data:
            return
        if end_min < start_min:
            raise Exception(
                'Invalid end_min must be greater than start_min - '
                'self.populate_intraday_events_dict('
                f'start_min={start_min}, end_min={end_min}) '
                f'algo={self.name}')
        num_minutes = ((end_min - start_min).total_seconds() / 60.0) + 1
        if num_minutes > 1440:
            raise Exception(
                f'Invalid number of minutes={num_minutes} between '
                f'start_min={start_min} and end_min={end_min} is more than '
                'the number of minutes in a single day: 1440 '
                f'algo={self.name}')
        log.info(f'num_minutes={num_minutes} between: {start_min} - {end_min}')
        self.intraday_start_min = start_min
        self.intraday_end_min = end_min
        cur_min = start_min
        while cur_min <= end_min:
            min_str = cur_min.strftime(ae_consts.COMMON_TICK_DATE_FORMAT)
            self.intraday_events[min_str] = {}
            for t in self.tickers:
                self.intraday_events[min_str][t] = []
            cur_min += datetime.timedelta(minutes=1)
        # end of while minutes to add to the self.intraday_events dict
    # end of populate_intraday_events_dict
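# A minimal usage sketch, assuming minute data was already loaded (the method
# returns early when ``self.found_minute_data`` is ``False``); the session
# start and end datetimes are assumptions. Each minute in the range becomes a
# key holding an empty event list per ticker.
import datetime


def example_build_intraday_events(algo):
    start_min = datetime.datetime(2018, 11, 5, 9, 30)
    end_min = datetime.datetime(2018, 11, 5, 16, 0)
    algo.populate_intraday_events_dict(
        start_min=start_min,
        end_min=end_min)
    return len(algo.intraday_events)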
[docs] def record_trade_history_for_dataset( self, node): """record_trade_history_for_dataset Build a daily or minute-by-minute trading history To run an algorithm minute-by-minute set the configuration to use: .. code-block:: python 'timeseries': 'minute' :param node: cached dataset dictionary node """ # if set to minutes, but this dataset is missing minute-data # then record as if it was a daily use_day_timeseries = ( self.timeseries_value == ae_consts.ALGO_TIMESERIES_DAY) use_minute_timeseries = ( self.timeseries_value == ae_consts.ALGO_TIMESERIES_MINUTE) if use_day_timeseries or ( not self.found_minute_data and use_minute_timeseries): self.use_minute = f'{self.trade_date} 16:00:00' self.last_history_dict = self.get_trade_history_node() if self.last_history_dict: if self.latest_ind_report: for k in self.latest_ind_report: if k not in self.ind_conf_ignore_keys: self.last_history_dict[k] = ( self.latest_ind_report[k]) self.order_history.append(self.last_history_dict) # end of if day timeseries elif (use_minute_timeseries and self.found_minute_data): # add the end of day point to the history self.last_history_dict = self.get_trade_history_node() if self.last_history_dict: if self.latest_ind_report: for k in self.latest_ind_report: if k not in self.ind_conf_ignore_keys: self.last_history_dict[k] = ( self.latest_ind_report[k]) self.order_history.append(self.last_history_dict) else: raise Exception( f'Unsupported self.timeseries={self.timeseries} and ' f'self.found_minute_data={self.found_minute_data} - ' 'please use timeseries=day or timeseries=minute or ' 'timeseries=intraday and ensure the ' 'datasets have \'minute\' data')
        # end of processing trading history for this dataset
    # end of record_trade_history_for_dataset
[docs] def handle_data( self, data): """handle_data process new data for the algorithm using a multi-ticker mapping structure :param data: dictionary of extracted data from the redis pipeline with a structure: :: ticker = 'SPY' # string usually: YYYY-MM-DD date = '2018-11-05' # redis cache key for the dataset format: <ticker>_<date> dataset_id = f'{ticker}_{date}' dataset = { ticker: [ { 'id': dataset_id, 'date': date, 'data': { 'daily': pd.DataFrame([]), 'minute': pd.DataFrame([]), 'quote': pd.DataFrame([]), 'stats': pd.DataFrame([]), 'peers': pd.DataFrame([]), 'news1': pd.DataFrame([]), 'financials': pd.DataFrame([]), 'earnings': pd.DataFrame([]), 'dividends': pd.DataFrame([]), 'calls': pd.DataFrame([]), 'puts': pd.DataFrame([]), 'pricing': pd.DataFrame([]), 'news': pd.DataFrame([]) } } ] } """ self.debug_msg = ( f'{self.name} handle - start') if self.loaded_dataset: if self.verbose: log.info( f'{self.name} handle - using existing dataset ' f'file={self.dsload_output_file} ' f's3={self.dsload_s3_key} ' f'redis={self.dsload_redis_key}') data = self.loaded_dataset data_for_tickers = self.get_supported_tickers_in_data( data=data) num_tickers = len(data_for_tickers) if num_tickers > 0: self.debug_msg = ( f'{self.name} handle - ' f'tickers={json.dumps(data_for_tickers)}') for ticker in data_for_tickers: num_ticker_datasets = len(data[ticker]) cur_idx = 1 for idx, node in enumerate(data[ticker]): node_date = node.get('date', 'missing-date') track_label = self.build_progress_label( progress=cur_idx, total=num_ticker_datasets) algo_id = ( f'{ticker} {track_label}') self.debug_msg = ( f'{self.name} handle - {algo_id} - ' f'id={node["id"]} ds={node_date}') valid_run = False if self.run_this_date: if node_date == self.run_this_date: log.critical( f'{self.name} handle - starting at ' f'date={node_date} with just this dataset: ') log.info( f'{node["data"]}') valid_run = True self.verbose = True self.verbose_trading = True if self.inspect_dataset: self.view_date_dataset_records( algo_id=algo_id, ticker=ticker, node=node) else: valid_run = True if valid_run: self.ticker = ticker self.prev_bal = self.balance self.prev_num_owned = self.num_owned (self.num_owned, self.ticker_buys, self.ticker_sells) = self.get_ticker_positions( ticker=ticker) use_daily_timeseries = ( self.timeseries_value == ae_consts.ALGO_TIMESERIES_DAY) node['data']['custom'] = self.include_custom if use_daily_timeseries: self.handle_daily_dataset( algo_id=algo_id, ticker=ticker, node=node) else: self.handle_minute_dataset( algo_id=algo_id, ticker=ticker, node=node, start_row=node.get('start_row', 0)) # end of processing datasets for day vs minute # if not debugging a specific dataset in the cache if (self.show_balance and (self.num_buys > 0 or self.num_sells > 0)): self.debug_msg = ( f'{self.name} handle - plot start balance') self.plot_trading_history_with_balance( algo_id=algo_id, ticker=ticker, node=node) self.debug_msg = ( f'{self.name} handle - plot done balance') # if showing plots while the algo runs if self.verbose: log.info( f'{self.name} done {node_date}') cur_idx += 1 # for all supported tickers # store the last handle dataset self.last_handle_data = data self.debug_msg = ( f'{self.name} handle - end tickers={num_tickers}')
# end of handle_data
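# A minimal end-to-end sketch, assuming ``algo`` was constructed for the
# 'SPY' ticker with a daily timeseries config; the dataset dictionary mirrors
# the structure documented in ``handle_data`` above and the pricing values
# are assumptions for the example.
import pandas as pd


def example_run_one_day(algo):
    ticker = 'SPY'
    date = '2018-11-05'
    data = {
        ticker: [
            {
                'id': f'{ticker}_{date}',
                'date': date,
                'data': {
                    'daily': pd.DataFrame([{
                        'date': date,
                        'open': 271.0,
                        'high': 273.0,
                        'low': 270.0,
                        'close': 272.5,
                        'volume': 1000000,
                    }]),
                    'minute': pd.DataFrame([]),
                },
            },
        ],
    }
    algo.handle_data(data=data)
    return algo.get_result()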
[docs] def handle_daily_dataset( self, algo_id, ticker, node): """handle_daily_dataset handle running the algorithm with daily values This method will call ``BaseAlgo.process()`` once per day which is also utilizing the daily caching strategy :param algo_id: string - algo identifier label for debugging datasets during specific dates :param ticker: string - ticker :param node: dataset to process """ # parse the dataset node and set member variables self.debug_msg = ( f'{ticker} START - load dataset id={node.get("id", "missing-id")}') self.load_from_dataset( ds_data=node) self.debug_msg = ( f'{ticker} END - load dataset id={node.get("id", "missing-id")}') """ Indicator Processor processes the dataset: df_daily """ self.latest_buys = [] self.latest_sells = [] if self.iproc: self.debug_msg = f'{ticker} BASEALGO-START - indicator processing' self.latest_ind_report = self.iproc.process( algo_id=algo_id, ticker=self.ticker, dataset=node) self.latest_buys = self.latest_ind_report.get( 'buys', []) self.latest_sells = self.latest_ind_report.get( 'sells', []) self.debug_msg = f'{ticker} BASEALGO-END - indicator processing' # end of indicator processing self.num_latest_buys = len(self.latest_buys) self.num_latest_sells = len(self.latest_sells) """ Call the Algorithm's process() method """ self.debug_msg = ( f'{ticker} START - process id={node.get("id", "missing-id")}') self.process( algo_id=algo_id, ticker=self.ticker, dataset=node) self.debug_msg = ( f'{ticker} END - process id={node.get("id", "missing-id")}') """ Execute trades based off self.trade_strategy """ self.debug_msg = ( f'{ticker} START - trade id={node.get("id", "missing-id")}') self.trade_off_indicator_buy_and_sell_signals( ticker=ticker, algo_id=algo_id, reason_for_buy=self.buy_reason, reason_for_sell=self.sell_reason) self.debug_msg = ( f'{ticker} END - trade id={node.get("id", "missing-id")}') """ Record the Trading History record analysis/review using: myalgo.get_result() """ self.debug_msg = ( f'{ticker} START - history id={node.get("id", "missing-id")}') self.record_trade_history_for_dataset( node=node) self.debug_msg = ( f'{ticker} END - history id={node.get("id", "missing-id")}')
# end of handle_daily_dataset
    def prepare_for_new_indicator_run(
            self):
        """prepare_for_new_indicator_run

        Call this for non-daily datasets specifically
        if the algorithm is using ``minute`` timeseries
        """
        self.prev_bal = self.balance
        self.prev_num_owned = self.num_owned
        self.should_buy = False
        self.should_sell = False
        self.num_latest_buys = 0
        self.num_latest_sells = 0
# end of prepare_for_new_indicator_run
[docs] def handle_minute_dataset( self, algo_id, ticker, node, start_row=0): """handle_minute_dataset handle running the algorithm with daily values This method will call ``BaseAlgo.process()`` once per day which is also utilizing the daily caching strategy :param algo_id: string - algo identifier label for debugging datasets during specific dates :param ticker: string - ticker :param node: dataset to process :param start_row: start row default is ``0`` """ # parse the dataset node and set member variables node_id = node.get('id', 'missing-id') node_date = node.get('date', 'missing-date') self.debug_msg = ( f'{ticker} START - load dataset id={node_id}') self.load_from_dataset( ds_data=node) self.debug_msg = ( f'{ticker} END - load dataset id={node_id}') if not self.found_minute_data: if self.verbose: log.error( f'algo={self.name} is missing minute data for ' f'day={node_date}') """ Record the Trading History record analysis/review using: myalgo.get_result() """ self.use_minute = ( f'{self.trade_date} 16:00:00') self.debug_msg = ( f'{ticker} START - saving for missing minute history ' f'id={node_id}') self.record_trade_history_for_dataset( node=node) self.debug_msg = ( f'{ticker} END - for missing history id={node_id}') return num_rows = len(self.df_minute.index) if num_rows == 0: log.warn( f'no minute data for {ticker} on: {node_id} ' f'rows={num_rows}') return # if no minute data found for minute_idx, row in self.df_minute[start_row:].iterrows(): # map the latest values for the algo to use # as if the minute was the latest trading time # as it iterates minute-by-minute self.latest_min = row.get('date', None) if not self.latest_min: log.warn( f'no cached minute data found in cache for {ticker} ' f'on: {node_id} rows={num_rows}') return self.latest_high = row.get('high', None) self.latest_low = row.get('low', None) self.latest_open = row.get('open', None) self.latest_close = row.get('close', None) self.latest_volume = row.get('volume', None) self.trade_price = self.latest_close self.use_minute = self.latest_min.strftime( ae_consts.COMMON_TICK_DATE_FORMAT) self.show_log = False # log every 5 days just to see progress if self.last_minute: num_day_since_last_log = ( self.latest_min - self.last_minute).days if num_day_since_last_log > 5: self.show_log = True self.last_minute = self.latest_min else: # start on monday if self.latest_min.weekday() == 0: self.last_minute = self.latest_min if not self.starting_close: self.starting_close = self.latest_close # allow algos to set these custom strings # for tracking why a buy and sell happened self.buy_reason = None self.sell_reason = None self.prepare_for_new_indicator_run() track_label = self.build_progress_label( progress=(minute_idx + 1), total=num_rows) minute_algo_id = ( f'{algo_id} at minute ' f'{self.latest_min} - {track_label}') (self.num_owned, self.ticker_buys, self.ticker_sells) = self.get_ticker_positions( ticker=ticker) """ Indicator Processor processes the dataset: minute df """ self.latest_buys = [] self.latest_sells = [] if self.iproc: self.debug_msg = ( f'{ticker} START - indicator processing ' f'daily [0-{minute_idx + 1}]') # prune off the minutes that are not the latest node['data']['minute'] = self.df_minute.iloc[0:(minute_idx+1)] self.latest_ind_report = self.iproc.process( algo_id=minute_algo_id, ticker=self.ticker, dataset=node) self.latest_buys = self.latest_ind_report.get( 'buys', []) self.latest_sells = self.latest_ind_report.get( 'sells', []) self.debug_msg = ( f'{ticker} END - indicator processing') # end of indicator 
processing self.num_latest_buys = len(self.latest_buys) self.num_latest_sells = len(self.latest_sells) if self.inspect_datasets: self.inspect_dataset( algo_id=algo_id, ticker=ticker, dataset=node) """ Call the Algorithm's process() method """ self.debug_msg = ( f'{ticker} START - process id={node_id}') self.process( algo_id=algo_id, ticker=self.ticker, dataset=node) self.debug_msg = ( f'{ticker} END - process id={node_id}') """ Execute trades based off self.trade_strategy """ self.debug_msg = ( f'{ticker} START - trade id={node_id}') self.trade_off_indicator_buy_and_sell_signals( ticker=ticker, algo_id=algo_id, reason_for_buy=self.buy_reason, reason_for_sell=self.sell_reason) self.debug_msg = ( f'{ticker} END - trade id={node_id}') """ Record the Trading History record analysis/review using: myalgo.get_result() """ self.debug_msg = ( f'{ticker} START - history id={node_id}') self.record_trade_history_for_dataset( node=node) self.debug_msg = ( f'{ticker} END - history id={node_id}')
        # end for all rows in the minute dataset
    # end of handle_minute_dataset
[docs] def plot_trading_history_with_balance( self, algo_id, ticker, node): """ This will live plot the trading history after each day is done :param algo_id: string - algo identifier label for debugging datasets during specific dates :param ticker: string - ticker :param node: dataset to process """ trading_history_dict = self.get_history_dataset() history_df = trading_history_dict[ticker] if not hasattr(history_df, 'to_json'): return first_date = history_df['date'].iloc[0] end_date = history_df['date'].iloc[-1] title = ( f'Trading History {ticker} for Algo ' f'{trading_history_dict["algo_name"]}\n' f'Backtest dates from {first_date} to {end_date}') use_xcol = 'date' use_as_date_format = '%d\n%b' if self.config_dict['timeseries'] == 'minute': use_xcol = 'minute' use_as_date_format = '%d %H:%M:%S\n%b' xlabel = f'Dates vs {trading_history_dict["algo_name"]} values' ylabel = f'Algo {trading_history_dict["algo_name"]}\nvalues' df_filter = (history_df['close'] > 0.01) # set default columns: red = self.red_column blue = self.blue_column green = self.green_column orange = self.orange_column plot_trading_history.plot_trading_history( title=title, df=history_df, red=red, blue=blue, green=green, orange=orange, date_col=use_xcol, date_format=use_as_date_format, xlabel=xlabel, ylabel=ylabel, df_filter=df_filter, show_plot=True, dropna_for_all=True)
# end of plot_trading_history_with_balance
[docs] def load_custom_datasets( self): """load_custom_datasets Handler for loading custom datasets for indicators .. tip:: Custom datasets allow indicators to analyze more than the default pricing data provided by ``IEX Cloud`` and ``Tradier``. This is helpful for building indicators to analyze and train AI from a previous algorithm ``Trading History``. """ label = f'load_custom_ds' ticker = self.tickers[0] ds_list = self.config_dict.get( 'custom_datasets', []) redis_address = ae_consts.REDIS_ADDRESS redis_db = ae_consts.REDIS_DB redis_password = ae_consts.REDIS_PASSWORD for node in ds_list: ds_key = node['ds_key'] redis_loc = node.get('redis_loc', None) s3_loc = node.get('s3_loc', None) ds_type = node.get('type', 'trade_history').lower() label = f'load_custom_ds_{ds_key}', log.info( f'loading {ds_key} {ds_type} from ' f'redis={redis_loc} ' f's3={s3_loc}') custom_ds = None publish_to_redis = False if redis_loc: if ds_type == 'trade_history': get_result = redis_get.get_data_from_redis_key( label=label, host=redis_address.split(':')[0], port=redis_address.split(':')[-1], password=redis_password, db=redis_db, key=redis_loc, decompress_df=True) if get_result['status'] == ae_consts.SUCCESS: publish_to_redis = False custom_ds = get_result['rec']['data'] else: log.info( f'did not find {ds_key} {ds_type} in redis=' f'{redis_loc}') publish_to_redis = True # if need to load from redis if ( s3_loc and publish_to_redis): s3_bucket = s3_loc.split('/')[2] s3_key = s3_loc.split('/')[-1] if ds_type == 'trade_history': load_res = load_history_utils.load_history_dataset( s3_bucket=s3_bucket, s3_key=s3_key) custom_ds = load_res[ticker] publish_to_redis = True # if need to download from s3 if ds_type == 'trade_history': if ticker in custom_ds: custom_ticker_df = pd.DataFrame( custom_ds[ticker]) if ae_consts.is_df(df=custom_ticker_df): date_cols = [ 'date', 'minute' ] for d in date_cols: if d in custom_ticker_df: custom_ticker_df[d] = pd.to_datetime( custom_ticker_df[d]) if 'minute' in custom_ticker_df: custom_ticker_df.sort_values( by=[ 'minute' ], ascending=True) elif 'date' in custom_ticker_df: custom_ticker_df.sort_values( by=[ 'date' ], ascending=True) custom_ds[ticker] = custom_ticker_df self.include_custom[ds_key] = custom_ds if publish_to_redis: log.info( f'publishing {ds_key} to redis={redis_loc}') publish.publish( data=custom_ds, df_compress=True, convert_to_json=False, compress=False, label=f'load_custom_ds_{ds_key}', redis_enabled=True, redis_key=redis_loc, redis_address=redis_address, redis_db=redis_db, redis_password=redis_password, s3_enabled=False, slack_enabled=False, verbose=False)
            # end of publishing to redis for speeding up next run
        # end of for all custom datasets to load
    # end of load_custom_datasets
# end of BaseAlgo
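# A minimal config sketch for ``load_custom_datasets``, assuming a previously
# published trading history; the ``ds_key``, redis key and s3 path are
# assumptions for the example. Each loaded dataset is exposed to indicators
# through ``self.include_custom[ds_key]``.
example_custom_dataset_config = {
    'custom_datasets': [
        {
            'ds_key': 'previous_history',
            'type': 'trade_history',
            'redis_loc': 'SPY_previous_trading_history',
            's3_loc': 's3://algohistory/SPY_previous_trading_history',
        },
    ],
}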