Build Custom Algorithms Using the Base Algorithm Class

Algorithms automatically provide the following member variables to any custom algorithm that derives the analysis_engine.algo.BaseAlgo.process method.

By overriding the process() member method in an inherited class, you can quickly build algorithms that determine buy and sell conditions from any of the automatically extracted datasets from the redis pipeline (see the sketch after this list):

  • self.df_daily
  • self.df_minute
  • self.df_calls
  • self.df_puts
  • self.df_quote
  • self.df_pricing
  • self.df_stats
  • self.df_peers
  • self.df_iex_news
  • self.df_financials
  • self.df_earnings
  • self.df_dividends
  • self.df_company
  • self.df_yahoo_news
  • self.df_tdcalls
  • self.df_tdputs
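
As a minimal sketch (the MyCloseAlgo class and its dip-buy condition are illustrative, not part of the engine; create_buy_order is documented below), a derived algorithm might look like:

from analysis_engine.algo import BaseAlgo


class MyCloseAlgo(BaseAlgo):
    """illustrative only - buy when the latest close dips
    below the previous daily close"""

    def process(self, algo_id, ticker, dataset):
        # self.df_daily is auto-loaded from the dataset node
        if len(self.df_daily.index) < 2:
            return
        prev_close = self.df_daily['close'].iloc[-2]
        if self.latest_close < prev_close:
            self.buy_reason = 'close dipped below previous close'
            self.create_buy_order(
                ticker=ticker,
                row=self.df_daily.iloc[-1],
                reason=self.buy_reason)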

Recent Pricing Information

  • self.latest_close
  • self.latest_high
  • self.latest_open
  • self.latest_low
  • self.latest_volume
  • self.today_close
  • self.today_high
  • self.today_open
  • self.today_low
  • self.today_volume
  • self.ask
  • self.bid

Latest Backtest Date and Intraday Minute

  • self.latest_min
  • self.backtest_date

Note

self.latest_min - Latest minute row in self.df_minute

Note

self.backtest_date - Latest dataset date which is considered the backtest date for historical testing with the data pipeline structure (it’s the date key in the dataset node root level)

Trading Strategy

  • self.trade_strategy = 'count' - if the number of indicators
    saying buy or sell meets or exceeds the buy/sell rules' min_indicators, the algorithm will trigger a buy or sell (see the config sketch after this list)
  • self.buy_reason - derived algorithms can attach custom
    buy reasons as a string to each trade order
  • self.sell_reason - derived algorithms can attach custom
    sell reasons as a string to each trade order
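
A hypothetical algorithm config snippet wiring these together (key names follow the rules described above; the exact schema may differ):

# illustrative config only - the exact schema may differ
config_dict = {
    'name': 'demo-count-strategy',
    'timeseries': 'day',
    'trade_strategy': 'count',
    'buy_rules': {
        'min_indicators': 3   # buy when >= 3 indicators say buy
    },
    'sell_rules': {
        'min_indicators': 2   # sell when >= 2 indicators say sell
    }
}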

Timeseries

  • self.timeseries - use an algorithm config to set

    day or minute to process daily or intraday minute-by-minute datasets. Indicators will still have access to all datasets; this just makes it easier to use the helper within an indicator to quickly get the correct dataset:

    df_status, use_df = self.get_subscribed_dataset(
        dataset=dataset)
    

Balance Information

  • self.balance - current algorithm account balance
  • self.prev_bal - previous balance
  • self.net_value - total value the algorithm has
    remaining since it started trading; this includes the self.num_owned shares valued at the self.latest_close price
  • self.net_gain - amount the algorithm has
    made since it started trading, including owned shares valued at the self.latest_close price (see the arithmetic sketch after this list)
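
As an arithmetic sketch (an assumption based on the descriptions above; the engine computes these values internally):

# illustrative values only - the engine computes these internally
starting_balance = 5000.00   # balance the algorithm started with
balance = 4200.00            # cash remaining after trades
num_owned = 3                # shares currently held
latest_close = 280.01        # latest close price

net_value = balance + (num_owned * latest_close)   # 5040.03
net_gain = net_value - starting_balance            # 40.03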

Note

If a key is not in the dataset, the algorithm's member variable will be an empty pandas DataFrame created with pandas.DataFrame([]), except self.pricing, which is just a dictionary. Please ensure the engine successfully fetched and cached the dataset in redis; verify with a tool like redis-cli and a query of keys * (or keys <TICKER>_* on large deployments).

Indicator Information

  • self.buy_rules - optional - custom dictionary for passing
    buy-side business rules to a custom algorithm
  • self.sell_rules - optional - custom dictionary for passing
    sale-side business rules to a custom algorithm
  • self.min_buy_indicators - set when self.buy_rules
    defines the minimum number of indicators that must detect a value within a buy condition before buying
  • self.min_sell_indicators - set when self.sell_rules
    defines the minimum number of indicators that must detect a value within a sell condition before selling
  • self.latest_ind_report - latest dictionary of values
    from the IndicatorProcessor.process()
  • self.latest_buys - latest indicators saying buy
  • self.latest_sells - latest indicators saying sell
  • self.num_latest_buys - latest number of indicators saying buy
  • self.num_latest_sells - latest number of indicators saying sell
  • self.iproc - member variable for the IndicatorProcessor
    that holds all of the custom algorithm indicators

Indicator buy and sell records in self.latest_buys and self.latest_sells have a dictionary structure (a helper sketch follows the structure):

{
    'name': indicator_name,
    'id': indicator_id,
    'report': indicator_report_dict,
    'cell': indicator_cell_number
}
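
For example, a small helper (illustrative only, not part of the engine's API) could walk these records after a processing run:

def log_latest_buys(algo):
    # keys come from the record structure above
    for buy_rec in algo.latest_buys:
        print(
            f'indicator {buy_rec["name"]} ({buy_rec["id"]}) '
            f'cell={buy_rec["cell"]} says buy')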

Supported environment variables

# To show debug and trace logging, export ``SHARED_LOG_CFG``
# pointing to a debug logger json file. To turn on debugging for
# this library, you can export the variable to the repo's
# included file with the command:
export SHARED_LOG_CFG=/opt/sa/analysis_engine/log/debug-logging.json
class analysis_engine.algo.BaseAlgo(ticker=None, balance=5000.0, commission=6.0, tickers=None, name=None, use_key=None, auto_fill=True, version=1, config_file=None, config_dict=None, output_dir=None, publish_to_slack=False, publish_to_s3=False, publish_to_redis=False, publish_input=True, publish_history=True, publish_report=True, load_from_s3_bucket=None, load_from_s3_key=None, load_from_redis_key=None, load_from_file=None, load_compress=False, load_publish=True, load_config=None, report_redis_key=None, report_s3_bucket=None, report_s3_key=None, report_file=None, report_compress=False, report_publish=True, report_config=None, history_redis_key=None, history_s3_bucket=None, history_s3_key=None, history_file=None, history_compress=False, history_publish=True, history_config=None, extract_redis_key=None, extract_s3_bucket=None, extract_s3_key=None, extract_file=None, extract_save_dir=None, extract_compress=False, extract_publish=True, extract_config=None, dataset_type=20000, serialize_datasets=['daily', 'minute', 'quote', 'stats', 'peers', 'news1', 'financials', 'earnings', 'dividends', 'company', 'news', 'calls', 'puts', 'pricing', 'tdcalls', 'tdputs'], timeseries=None, trade_strategy=None, verbose=False, verbose_processor=False, verbose_indicators=False, verbose_trading=False, verbose_load=False, verbose_extract=False, verbose_history=False, verbose_report=False, inspect_datasets=False, raise_on_err=True, **kwargs)[source]

Run an algorithm against multiple tickers at once through the redis dataframe pipeline provided by analysis_engine.extract.extract.

Data Pipeline Structure

This algorithm can handle an extracted dictionary with structure:

import pandas as pd
from analysis_engine.algo import BaseAlgo
ticker = 'SPY'
demo_algo = BaseAlgo(
    ticker=ticker,
    balance=1000.00,
    commission=6.00,
    name=f'test-{ticker}')
date = '2018-11-05'
dataset_id = f'{ticker}_{date}'
# mock the data pipeline in redis:
data = {
    ticker: [
        {
            'id': dataset_id,
            'date': date,
            'data': {
                'daily': pd.DataFrame([
                    {
                        'high': 280.01,
                        'low': 270.01,
                        'open': 275.01,
                        'close': 272.02,
                        'volume': 123,
                        'date': '2018-11-01 15:59:59'
                    },
                    {
                        'high': 281.01,
                        'low': 271.01,
                        'open': 276.01,
                        'close': 273.02,
                        'volume': 124,
                        'date': '2018-11-02 15:59:59'
                    },
                    {
                        'high': 282.01,
                        'low': 272.01,
                        'open': 277.01,
                        'close': 274.02,
                        'volume': 121,
                        'date': '2018-11-05 15:59:59'
                    }
                ]),
                'calls': pd.DataFrame([]),
                'puts': pd.DataFrame([]),
                'minute': pd.DataFrame([]),
                'pricing': pd.DataFrame([]),
                'quote': pd.DataFrame([]),
                'news': pd.DataFrame([]),
                'news1': pd.DataFrame([]),
                'dividends': pd.DataFrame([]),
                'earnings': pd.DataFrame([]),
                'financials': pd.DataFrame([]),
                'stats': pd.DataFrame([]),
                'peers': pd.DataFrame([]),
                'company': pd.DataFrame([])
            }
        }
    ]
}

# run the algorithm
demo_algo.handle_data(data=data)

# get the algorithm results
results = demo_algo.get_result()

print(results)
build_progress_label(progress, total)[source]

create a progress label string for the logs

Parameters:
  • progress – progress counter
  • total – total number of counts
build_ticker_history(ticker, ignore_keys)[source]

For all records in self.order_history, compile a filtered list of history records per ticker while pruning any keys that are in the list of ignore_keys

Parameters:
  • ticker – string ticker symbol
  • ignore_keys – list of keys to not include in the history report
create_algorithm_ready_dataset()[source]

Create the Algorithm-Ready dataset during the self.publish_input_dataset() member method. Inherited Algorithm classes can customize how they build a custom Algorithm-Ready dataset before publishing by implementing this method in the derived class.

create_buy_order(ticker, row, minute=None, shares=None, reason=None, orient='records', date_format='iso', is_live_trading=False)[source]

create a buy order at the close or ask price

Note

setting the minute is required to build a minute-by-minute Trading History

Parameters:
  • ticker – string ticker
  • shares – optional - integer number of shares to buy if None buy max number of shares at the close with the available balance amount.
  • row – dictionary or pandas.DataFrame row record that will be converted to a json-serialized string
  • minute – optional - string datetime for the minute when the order was placed. For day timeseries this is the close of trading (16:00:00 for the day) and for minute timeseries the value will be the latest minute from the self.df_minute pandas.DataFrame. Normally this value should be set to self.use_minute, and the format is ae_consts.COMMON_TICK_DATE_FORMAT
  • reason – optional - reason for creating the order which is useful for troubleshooting order histories
  • orient – optional - pandas orient for row.to_json()
  • date_format – optional - pandas date_format parameter for row.to_json()
  • is_live_trading – optional - bool for filling trades in live trading versus backtest tuning (default False, which is backtest mode)
create_history_dataset()[source]

Create the Trading History dataset during the self.publish_trade_history_dataset() member method. Inherited Algorithm classes can customize how they build a custom Trading History dataset before publishing by implementing this method in the derived class.

create_report_dataset()[source]

Create the Trading Performance Report dataset during the self.publish_report_dataset() member method. Inherited Algorithm classes can customize how they build a custom Trading Performance Report dataset before publishing by implementing this method in the derived class.

create_sell_order(ticker, row, minute=None, shares=None, reason=None, orient='records', date_format='iso', is_live_trading=False)[source]

create a sell order at the close or ask price (see the usage sketch after the parameter list)

Note

setting the minute is required to build a minute-by-minute Trading History

Parameters:
  • ticker – string ticker
  • shares – optional - integer number of shares to sell if None sell all owned shares at the close
  • row – pandas.DataFrame row record that will be converted to a json-serialized string
  • minute – optional - string datetime for the minute when the order was placed. For day timeseries this is the close of trading (16:00:00 for the day) and for minute timeseries the value will be the latest minute from the self.df_minute pandas.DataFrame. Normally this value should be set to self.use_minute, and the format is ae_consts.COMMON_TICK_DATE_FORMAT
  • reason – optional - reason for creating the order which is useful for troubleshooting order histories
  • orient – optional - pandas orient for row.to_json()
  • date_format – optional - pandas date_format parameter for row.to_json()
  • is_live_trading – optional - bool for filling trades in live trading versus backtest tuning (default False, which is backtest mode)
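
A usage sketch inside a derived process() method (illustrative; it assumes the indicator counters documented above are populated):

# inside a derived process() - illustrative only
if self.num_latest_buys >= self.min_buy_indicators:
    self.create_buy_order(
        ticker=ticker,
        row=self.df_daily.iloc[-1],   # latest daily row record
        minute=self.use_minute,       # required for minute-by-minute history
        reason='indicator count reached the buy threshold')
elif self.num_latest_sells >= self.min_sell_indicators:
    self.create_sell_order(
        ticker=ticker,
        row=self.df_daily.iloc[-1],
        minute=self.use_minute,
        reason='indicator count reached the sell threshold')
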
determine_indicator_datasets()[source]

Indicators are coupled to a dataset in the algorithm config file. This allows for identifying the exact datasets to pull from Redis to speed up backtesting.

get_balance()[source]
get_buys()[source]
get_commission()[source]
get_debug_msg()[source]

debug algorithms that failed by viewing the last self.debug_msg they set

get_history_dataset()[source]
get_indicator_datasets()[source]
get_indicator_process_last_indicator()[source]

Used to pull the last indicator object back up into any created analysis_engine.algo.BaseAlgo objects

Tip

this is for debugging data and code issues inside an indicator

get_indicator_processor(existing_processor=None)[source]

singleton for getting the indicator processor

Parameters:existing_processor – allow derived algos to build their own indicator processor and pass it to the base
get_name()[source]
get_owned_shares(ticker)[source]
Parameters:ticker – ticker to lookup
get_report_dataset()[source]
get_result()[source]
get_sells()[source]
get_supported_tickers_in_data(data)[source]

For all updates found in data, compare against the supported list of self.tickers to make sure the updates are relevant for this algorithm.

Parameters:data – new data stream to process in this algo
get_ticker_positions(ticker)[source]

get the current positions for a ticker and return a tuple: num_owned (integer), buys (list), sells (list)

num_owned, buys, sells = self.get_ticker_positions(
    ticker=ticker)
Parameters:ticker – ticker to lookup
get_tickers()[source]
get_trade_history_node()[source]

Helper for quickly building a history node on a derived algorithm. Whatever member variables are in the base class analysis_engine.algo.BaseAlgo will be added automatically into the returned historical transaction dictionary

Tip

if you get None back, there could be a bug in how you are using the member variables (likely an invalid math calculation) or a bug in the helper: build_trade_history_entry

handle_daily_dataset(algo_id, ticker, node)[source]

handle running the algorithm with daily values

This method will call BaseAlgo.process() once per day while utilizing the daily caching strategy

Parameters:
  • algo_id – string - algo identifier label for debugging datasets during specific dates
  • ticker – string - ticker
  • node – dataset to process
handle_data(data)[source]

process new data for the algorithm using a multi-ticker mapping structure

Parameters:data

dictionary of extracted data from the redis pipeline with a structure:

import pandas as pd

ticker = 'SPY'
# string usually: YYYY-MM-DD
date = '2018-11-05'
# redis cache key for the dataset format: <ticker>_<date>
dataset_id = f'{ticker}_{date}'
dataset = {
    ticker: [
        {
            'id': dataset_id,
            'date': date,
            'data': {
                'daily': pd.DataFrame([]),
                'minute': pd.DataFrame([]),
                'quote': pd.DataFrame([]),
                'stats': pd.DataFrame([]),
                'peers': pd.DataFrame([]),
                'news1': pd.DataFrame([]),
                'financials': pd.DataFrame([]),
                'earnings': pd.DataFrame([]),
                'dividends': pd.DataFrame([]),
                'calls': pd.DataFrame([]),
                'puts': pd.DataFrame([]),
                'pricing': pd.DataFrame([]),
                'news': pd.DataFrame([])
            }
        }
    ]
}
handle_minute_dataset(algo_id, ticker, node, start_row=0)[source]

handle running the algorithm with intraday minute values

This method will call BaseAlgo.process() once per minute row in self.df_minute (starting at start_row) instead of once per day

Parameters:
  • algo_id – string - algo identifier label for debugging datasets during specific dates
  • ticker – string - ticker
  • node – dataset to process
  • start_row – start row default is 0
inspect_dataset(algo_id, ticker, dataset)[source]

Use this method inside of an algorithm’s process() method to view the available datasets in the redis cache

Parameters:
  • algo_id – string - algo identifier label for debugging datasets during specific dates
  • ticker – string - ticker
  • dataset – a dictionary of identifiers (for debugging) and multiple pandas.DataFrame objects
load_custom_datasets()[source]

Handler for loading custom datasets for indicators

Tip

Custom datasets allow indicators to analyze more than the default pricing data provided by IEX Cloud and Tradier. This is helpful for building indicators that analyze and train AI models from a previous algorithm's Trading History.

load_from_config(config_dict)[source]

support for replaying algorithms from a trading history

Parameters:config_dict – algorithm configuration values usually from a previous trading history or for quickly testing dataset theories in a development environment
load_from_dataset(ds_data)[source]

Load the member variables from the extracted ds_data dataset.

Algorithms automatically provide the following member variables to myalgo.process() for quickly building algorithms:

  • self.df_daily
  • self.df_minute
  • self.df_calls
  • self.df_puts
  • self.df_quote
  • self.df_pricing
  • self.df_stats
  • self.df_peers
  • self.df_iex_news
  • self.df_financials
  • self.df_earnings
  • self.df_dividends
  • self.df_company
  • self.df_yahoo_news
  • self.df_tdcalls
  • self.df_tdputs

Note

If a key is not in the dataset, the algorithm's member variable will be an empty pandas.DataFrame([]). Please ensure the engine cached the dataset in redis; use a tool like redis-cli to verify the values are in memory.

Parameters:ds_data – extracted, structured dataset from redis
load_from_external_source(path_to_file=None, s3_bucket=None, s3_key=None, redis_key=None)[source]

Load an algorithm-ready dataset for handle_data backtesting and trade performance analysis (see the usage sketch after the parameter list) from:

  • Local file
  • S3
  • Redis
Parameters:
  • path_to_file – optional - path to local file
  • s3_bucket – optional - s3 bucket
  • s3_key – optional - s3 key
  • redis_key – optional - redis key
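
A usage sketch (assuming the demo_algo from the earlier example; the file path is hypothetical):

# load a previously published algorithm-ready dataset from a local file
demo_algo.load_from_external_source(
    path_to_file='/tmp/SPY-latest.json')
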
plot_trading_history_with_balance(algo_id, ticker, node)[source]

This will live-plot the trading history after each day is done

Parameters:
  • algo_id – string - algo identifier label for debugging datasets during specific dates
  • ticker – string - ticker
  • node – dataset to process
populate_intraday_events_dict(start_min, end_min)[source]

For tracking intraday buy/sell/news events with indicators, use this method to build a dictionary whose keys are the minutes between start_min and end_min. If both are None, the minutes from self.df_minute are used.

Parameters:
  • start_min – start datetime for building the self.intraday_events dictionary keys
  • end_min – end datetime for building the self.intraday_events dictionary keys
prepare_for_new_indicator_run()[source]

Call this for non-daily datasets, specifically if the algorithm is using a minute timeseries

process(algo_id, ticker, dataset)[source]

Derive custom algorithm buy and sell conditions before placing orders by implementing your own process method.

Parameters:
  • algo_id – string - algo identifier label for debugging datasets during specific dates
  • ticker – string - ticker
  • dataset

    a dictionary of identifiers (for debugging) and multiple pandas.DataFrame objects. Dictionary keys represent a label from one of the data sources (IEX Cloud or Tradier). Here is the supported dataset structure for the process method:

    Note

    There are no required keys for data; the list below is not hard-enforced by default. This is just a reference for what is available with the v1 engine.

    dataset = {
        'id': <string TICKER_DATE - redis cache key>,
        'date': <string DATE>,
        'data': {
            'daily': pd.DataFrame([]),
            'minute': pd.DataFrame([]),
            'quote': pd.DataFrame([]),
            'stats': pd.DataFrame([]),
            'peers': pd.DataFrame([]),
            'news1': pd.DataFrame([]),
            'financials': pd.DataFrame([]),
            'earnings': pd.DataFrame([]),
            'dividends': pd.DataFrame([]),
            'calls': pd.DataFrame([]),
            'puts': pd.DataFrame([]),
            'pricing': pd.DataFrame([]),
            'news': pd.DataFrame([])
        }
    }
    

    example:

    dataset = {
        'id': 'SPY_2018-11-02',
        'date': '2018-11-02',
        'data': {
            'daily': pd.DataFrame,
            'minute': pd.DataFrame,
            'calls': pd.DataFrame,
            'puts': pd.DataFrame,
            'news': pd.DataFrame
        }
    }
    
publish_input_dataset(**kwargs)[source]

publish input datasets to caches (redis), archives (minio s3), a local file (output_file) and slack

Parameters:kwargs – keyword argument dictionary
Returns:tuple: status, output_file
publish_report_dataset(**kwargs)[source]

publish trade performance report datasets to caches (redis), archives (minio s3), a local file (output_file) and slack

Parameters:kwargs – keyword argument dictionary
Returns:tuple: status, output_file
publish_trade_history_dataset(**kwargs)[source]

publish trade history datasets to caches (redis), archives (minio s3), a local file (output_file) and slack

Parameters:kwargs – keyword argument dictionary
Returns:tuple: status, output_file
record_trade_history_for_dataset(node)[source]

Build a daily or minute-by-minute trading history

To run an algorithm minute-by-minute set the configuration to use:

'timeseries': 'minute'
Parameters:node – cached dataset dictionary node
reset_for_next_run()[source]

work in progress - clean up all internal member variables for another run

Note

random or probabilistic predictions may not create the same trading history output file

sell_reason = None

if this is in a jupyter notebook this will show the plots at the end of each day. Please avoid this on the command line, as the plot window will block the algorithm until the window is closed

trade_off_indicator_buy_and_sell_signals(ticker, algo_id, reason_for_buy=None, reason_for_sell=None)[source]

Check if the minimum number of indicators for a buy or a sell was found. If so, commit the trade.

if self.trade_off_num_indicators:
    if self.num_latest_buys >= self.min_buy_indicators:
        self.should_buy = True
    elif self.num_latest_sells >= self.min_sell_indicators:
        self.should_sell = True
Parameters:
  • ticker – ticker symbol
  • algo_id – string algo for tracking internal progress for debugging
  • reason_for_buy – optional - string for tracking why the algo bought
  • reason_for_sell – optional - string for tracking why the algo sold
view_date_dataset_records(algo_id, ticker, node)[source]

View the dataset contents for a single node - use it with the algo config_dict by setting:

"run_this_date": <string date YYYY-MM-DD>
Parameters:
  • algo_id – string - algo identifier label for debugging datasets during specific dates
  • ticker – string - ticker
  • node – dataset to process