Source Code

These documents describe how to develop with and understand the Stock Analysis Engine. Please refer to the repository for the latest source code examples:

https://github.com/AlgoTraders/stock-analysis-engine/

Example API Requests

Helpers and examples for building the API request dictionaries that each Celery task supports:

  • analysis_engine.work_tasks.get_new_pricing_data
  • analysis_engine.work_tasks.handle_pricing_update_task
  • analysis_engine.work_tasks.publish_pricing_update
analysis_engine.api_requests.get_ds_dict(ticker, base_key=None, ds_id=None, label=None, service_dict=None)[source]

Build and return a dictionary containing all cache keys for a ticker. Use this method to decouple your apps from the underlying cache key implementation (if you do not need direct access to the keys).

Parameters:
  • ticker – ticker
  • base_key – optional - base key that is prepended in all cache keys
  • ds_id – optional - dataset id (useful for external database id)
  • label – optional - tracking label in the logs
  • service_dict – optional - parent call functions and Celery tasks can use this dictionary to seed the common service routes and endpoints. Refer to analysis_engine.consts.SERVICE_VALS for automatically-copied over keys by this helper.
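To illustrate the decoupling idea, here is a minimal, self-contained sketch of a get_ds_dict-style helper. The key names (daily, minute, news) and the key format are hypothetical and are not the library's actual cache keys:

```python
def build_ds_dict(ticker, base_key=None, ds_id=None, label=None):
    """Sketch of a cache-key builder; key names are hypothetical."""
    base = base_key if base_key else ticker
    return {
        'ticker': ticker,
        'ds_id': ds_id,
        'label': label,
        # callers use these keys instead of hand-building cache strings
        'daily': f'{base}_daily',
        'minute': f'{base}_minute',
        'news': f'{base}_news',
    }

req = build_ds_dict('SPY', label='demo')
print(req['daily'])  # SPY_daily
```

If the cache key format ever changes, only the helper changes; callers keep reading the same dictionary keys.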
analysis_engine.api_requests.build_get_new_pricing_request(label=None)[source]

Build a sample Celery task API request: analysis_engine.work_tasks.get_new_pricing_data

Used for testing: run_get_new_pricing_data(work)

Parameters:label – log label to use
analysis_engine.api_requests.build_publish_pricing_request(label=None)[source]

Build a sample Celery task API request: analysis_engine.work_tasks.publish_pricing_update

Used for testing: run_publish_pricing_update(work)

Parameters:label – log label to use
analysis_engine.api_requests.build_cache_ready_pricing_dataset(label=None)[source]

Build a cache-ready pricing dataset to replicate the get_new_pricing_data task

Parameters:label – log label to use
analysis_engine.api_requests.build_publish_from_s3_to_redis_request(label=None)[source]

Build a sample Celery task API request: analysis_engine.work_tasks.publish_from_s3_to_redis

Used for testing: run_publish_from_s3_to_redis(work)

Parameters:label – log label to use
analysis_engine.api_requests.build_prepare_dataset_request(label=None)[source]

Build a sample Celery task API request: analysis_engine.work_tasks.prepare_pricing_dataset

Used for testing: run_prepare_pricing_dataset(work)

Parameters:label – log label to use
analysis_engine.api_requests.build_analyze_dataset_request(label=None)[source]

Build a sample Celery task API request: analysis_engine.work_tasks.analyze_pricing_dataset

Used for testing: run_analyze_pricing_dataset(work)

Parameters:label – log label to use
analysis_engine.api_requests.build_screener_analysis_request(ticker=None, tickers=None, fv_urls=None, fetch_mode='iex', iex_datasets=['daily', 'minute', 'quote', 'stats', 'peers', 'news', 'financials', 'earnings', 'dividends', 'company'], determine_sells=None, determine_buys=None, label='screener')[source]

Build a dictionary request for the task: analysis_engine.work_tasks.run_screener_analysis

Parameters:
  • ticker – ticker to add to the analysis
  • tickers – tickers to add to the analysis
  • fv_urls – finviz urls
  • fetch_mode – supports pulling from iex, yahoo, all (defaults to iex)
  • iex_datasets – datasets to fetch from iex (defaults to analysis_engine.consts.IEX_DATASETS_DEFAULT)
  • determine_sells – string custom Celery task name for handling sell-side processing
  • determine_buys – string custom Celery task name for handling buy-side processing
  • label – log tracking label
Returns:

initial request dictionary:

req = {
    'tickers': use_tickers,
    'fv_urls': use_urls,
    'fetch_mode': fetch_mode,
    'iex_datasets': iex_datasets,
    's3_bucket': s3_bucket_name,
    's3_enabled': s3_enabled,
    'redis_enabled': redis_enabled,
    'determine_sells': determine_sells,
    'determine_buys': determine_buys,
    'label': label
}

Read from S3 as a String

Wrapper for downloading an S3 key as a string

analysis_engine.s3_read_contents_from_key.s3_read_contents_from_key(s3, s3_bucket_name, s3_key, encoding='utf-8', convert_as_json=True, compress=False)[source]

Download the S3 key contents as a string. This will raise exceptions on failure.

Parameters:
  • s3 – existing S3 object
  • s3_bucket_name – bucket name
  • s3_key – S3 key
  • encoding – utf-8 by default
  • convert_as_json – auto-convert the contents to a dict
  • compress – decompress using zlib
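The decode path can be sketched without a live S3 connection. This assumes the downloaded body arrives as raw bytes and mirrors the compress and convert_as_json options above; it is a simplified illustration, not the library's implementation:

```python
import json
import zlib

def decode_s3_contents(raw_bytes, encoding='utf-8',
                       convert_as_json=True, compress=False):
    """Decode downloaded S3 bytes per the options described above."""
    if compress:
        # payload was stored zlib-compressed
        raw_bytes = zlib.decompress(raw_bytes)
    contents = raw_bytes.decode(encoding)
    if convert_as_json:
        # auto-convert the string to a dict
        return json.loads(contents)
    return contents

payload = zlib.compress(json.dumps({'ticker': 'SPY'}).encode('utf-8'))
print(decode_s3_contents(payload, compress=True))  # {'ticker': 'SPY'}
```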

Get Task Results

Debug by setting the environment variable:

export DEBUG_TASK=1
analysis_engine.get_task_results.get_task_results(work_dict=None, result=None, **kwargs)[source]

If Celery is disabled by the environment variable export CELERY_DISABLED=1 or requested with work_dict['celery_disabled'] = True, then return the task result dictionary; otherwise return None.

This method is useful for allowing tests to override the returned payloads during task chaining using @mock.patch.

Parameters:
  • work_dict – task work dictionary
  • result – task result dictionary
  • kwargs – keyword arguments
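The celery-disabled check described above can be sketched as follows. This is a simplified stand-in for illustration, not the actual implementation:

```python
import os

def get_task_results_sketch(work_dict=None, result=None):
    """Return the result dict when Celery is disabled, else None."""
    env_disabled = os.getenv('CELERY_DISABLED', '0') == '1'
    req_disabled = bool(work_dict and work_dict.get('celery_disabled'))
    if env_disabled or req_disabled:
        return result
    # a live Celery worker handles the result instead
    return None
```

Because the result is returned synchronously when Celery is disabled, tests can patch the task chain and inspect payloads directly.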

Constants

Utility methods and constants

Algorithm Environment Variables

ALGO_MODULE_PATH = ev(
    'ALGO_MODULE_PATH',
    '/opt/sa/analysis_engine/mocks/example_algo_minute.py')
ALGO_BASE_MODULE_PATH = ev(
    'ALGO_BASE_MODULE_PATH',
    '/opt/sa/analysis_engine/algo.py')
ALGO_MODULE_NAME = ev(
    'ALGO_MODULE_NAME',
    'example_algo_minute')
ALGO_VERSION = ev(
    'ALGO_VERSION',
    '1')
ALGO_BUYS_S3_BUCKET_NAME = ev(
    'ALGO_BUYS_S3_BUCKET_NAME',
    'algobuys')
ALGO_SELLS_S3_BUCKET_NAME = ev(
    'ALGO_SELLS_S3_BUCKET_NAME',
    'algosells')
ALGO_RESULT_S3_BUCKET_NAME = ev(
    'ALGO_RESULT_S3_BUCKET_NAME',
    'algoresults')
ALGO_READY_DATASET_S3_BUCKET_NAME = ev(
    'ALGO_READY_DATASET_S3_BUCKET_NAME',
    'algoready')
ALGO_EXTRACT_DATASET_S3_BUCKET_NAME = ev(
    'ALGO_EXTRACT_DATASET_S3_BUCKET_NAME',
    'algoready')
ALGO_HISTORY_DATASET_S3_BUCKET_NAME = ev(
    'ALGO_HISTORY_DATASET_S3_BUCKET_NAME',
    'algohistory')
ALGO_REPORT_DATASET_S3_BUCKET_NAME = ev(
    'ALGO_REPORT_DATASET_S3_BUCKET_NAME',
    'algoreport')
ALGO_BACKUP_DATASET_S3_BUCKET_NAME = ev(
    'ALGO_BACKUP_DATASET_S3_BUCKET_NAME',
    'algobackup')
ALGO_READY_DIR = ev(
    'ALGO_READY_DIR',
    '/tmp')
ALGO_EXTRACT_DIR = ev(
    'ALGO_EXTRACT_DIR',
    '/tmp')
ALGO_HISTORY_DIR = ev(
    'ALGO_HISTORY_DIR',
    '/tmp')
ALGO_REPORT_DIR = ev(
    'ALGO_REPORT_DIR',
    '/tmp')
ALGO_LOAD_DIR = ev(
    'ALGO_LOAD_DIR',
    '/tmp')
ALGO_BACKUP_DIR = ev(
    'ALGO_BACKUP_DIR',
    '/tmp')
ALGO_READY_REDIS_ADDRESS = ev(
    'ALGO_READY_REDIS_ADDRESS',
    'localhost:6379')
ALGO_EXTRACT_REDIS_ADDRESS = ev(
    'ALGO_EXTRACT_REDIS_ADDRESS',
    'localhost:6379')
ALGO_HISTORY_REDIS_ADDRESS = ev(
    'ALGO_HISTORY_REDIS_ADDRESS',
    'localhost:6379')
ALGO_REPORT_REDIS_ADDRESS = ev(
    'ALGO_REPORT_REDIS_ADDRESS',
    'localhost:6379')
ALGO_BACKUP_REDIS_ADDRESS = ev(
    'ALGO_BACKUP_REDIS_ADDRESS',
    'localhost:6379')
ALGO_HISTORY_VERSION = ev(
    'ALGO_HISTORY_VERSION',
    '1')
ALGO_REPORT_VERSION = ev(
    'ALGO_REPORT_VERSION',
    '1')

Stock and Analysis Environment Variables

TICKER = ev(
    'TICKER',
    'SPY')
TICKER_ID = int(ev(
    'TICKER_ID',
    '1'))
DEFAULT_TICKERS = ev(
    'DEFAULT_TICKERS',
    'SPY,AMZN,TSLA,NFLX').split(',')
NEXT_EXP = opt_dates.option_expiration()
NEXT_EXP_STR = NEXT_EXP.strftime('%Y-%m-%d')

Logging Environment Variables

LOG_CONFIG_PATH = ev(
    'LOG_CONFIG_PATH',
    './analysis_engine/log/logging.json')

Slack Environment Variables

SLACK_WEBHOOK = ev(
    'SLACK_WEBHOOK',
    None)
SLACK_ACCESS_TOKEN = ev(
    'SLACK_ACCESS_TOKEN',
    None
)
SLACK_PUBLISH_PLOT_CHANNELS = ev(
    'SLACK_PUBLISH_PLOT_CHANNELS',
    None
)
PROD_SLACK_ALERTS = ev(
    'PROD_SLACK_ALERTS',
    '0')

Celery Environment Variables

SSL_OPTIONS = {}
TRANSPORT_OPTIONS = {}
WORKER_BROKER_URL = ev(
    'WORKER_BROKER_URL',
    'redis://localhost:6379/11')
WORKER_BACKEND_URL = ev(
    'WORKER_BACKEND_URL',
    'redis://localhost:6379/12')
WORKER_CELERY_CONFIG_MODULE = ev(
    'WORKER_CELERY_CONFIG_MODULE',
    'analysis_engine.work_tasks.celery_config')
WORKER_TASKS = ev(
    'WORKER_TASKS',
    ('analysis_engine.work_tasks.task_run_algo'))
INCLUDE_TASKS = WORKER_TASKS.split(',')

Supported S3 Environment Variables

ENABLED_S3_UPLOAD = ev(
    'ENABLED_S3_UPLOAD',
    '0') == '1'
S3_ACCESS_KEY = ev(
    'AWS_ACCESS_KEY_ID',
    'trexaccesskey')
S3_SECRET_KEY = ev(
    'AWS_SECRET_ACCESS_KEY',
    'trex123321')
S3_REGION_NAME = ev(
    'AWS_DEFAULT_REGION',
    'us-east-1')
S3_ADDRESS = ev(
    'S3_ADDRESS',
    '0.0.0.0:9000')
S3_SECURE = ev(
    'S3_SECURE',
    '0') == '1'
S3_BUCKET = ev(
    'S3_BUCKET',
    'pricing')
S3_COMPILED_BUCKET = ev(
    'S3_COMPILED_BUCKET',
    'compileddatasets')
S3_KEY = ev(
    'S3_KEY',
    'test_key')
DAILY_S3_BUCKET_NAME = ev(
    'DAILY_S3_BUCKET_NAME',
    'daily')
MINUTE_S3_BUCKET_NAME = ev(
    'MINUTE_S3_BUCKET_NAME',
    'minute')
QUOTE_S3_BUCKET_NAME = ev(
    'QUOTE_S3_BUCKET_NAME',
    'quote')
STATS_S3_BUCKET_NAME = ev(
    'STATS_S3_BUCKET_NAME',
    'stats')
PEERS_S3_BUCKET_NAME = ev(
    'PEERS_S3_BUCKET_NAME',
    'peers')
NEWS_S3_BUCKET_NAME = ev(
    'NEWS_S3_BUCKET_NAME',
    'news')
FINANCIALS_S3_BUCKET_NAME = ev(
    'FINANCIALS_S3_BUCKET_NAME',
    'financials')
EARNINGS_S3_BUCKET_NAME = ev(
    'EARNINGS_S3_BUCKET_NAME',
    'earnings')
DIVIDENDS_S3_BUCKET_NAME = ev(
    'DIVIDENDS_S3_BUCKET_NAME',
    'dividends')
COMPANY_S3_BUCKET_NAME = ev(
    'COMPANY_S3_BUCKET_NAME',
    'company')
PREPARE_S3_BUCKET_NAME = ev(
    'PREPARE_S3_BUCKET_NAME',
    'prepared')
ANALYZE_S3_BUCKET_NAME = ev(
    'ANALYZE_S3_BUCKET_NAME',
    'analyzed')
SCREENER_S3_BUCKET_NAME = ev(
    'SCREENER_S3_BUCKET_NAME',
    'screener-data')
PRICING_S3_BUCKET_NAME = ev(
    'PRICING_S3_BUCKET_NAME',
    'pricing')
OPTIONS_S3_BUCKET_NAME = ev(
    'OPTIONS_S3_BUCKET_NAME',
    'options')

Supported Redis Environment Variables

ENABLED_REDIS_PUBLISH = ev(
    'ENABLED_REDIS_PUBLISH',
    '0') == '1'
REDIS_ADDRESS = ev(
    'REDIS_ADDRESS',
    'localhost:6379')
REDIS_KEY = ev(
    'REDIS_KEY',
    'test_redis_key')
REDIS_PASSWORD = ev(
    'REDIS_PASSWORD',
    None)
REDIS_DB = int(ev(
    'REDIS_DB',
    '0'))
REDIS_EXPIRE = ev(
    'REDIS_EXPIRE',
    None)
analysis_engine.consts.get_indicator_type_as_int(val=None)[source]

Convert the string to the INDICATOR_TYPE_MAPPING integer value

Parameters:val – string to look up in the INDICATOR_TYPE_MAPPING dictionary
analysis_engine.consts.get_indicator_category_as_int(val=None)[source]

Convert the string to the INDICATOR_CATEGORY_MAPPING integer value

Parameters:val – string to look up in the INDICATOR_CATEGORY_MAPPING dictionary
analysis_engine.consts.get_indicator_uses_data_as_int(val=None)[source]

Convert the string to the INDICATOR_USES_DATA_MAPPING integer value

Parameters:val – string to look up in the INDICATOR_USES_DATA_MAPPING dictionary
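The three mapping getters above follow the same pattern: look up a string key in a constants dictionary and return its integer value. A minimal sketch, using a hypothetical mapping (the real mapping values live in analysis_engine.consts):

```python
# hypothetical mapping for illustration only
EXAMPLE_MAPPING = {
    'momentum': 0,
    'overlap': 1,
    'volume': 2,
}

def get_mapping_as_int(val=None, mapping=None):
    """Look up the string key and return its integer value (or None)."""
    if not val:
        return None
    return (mapping or EXAMPLE_MAPPING).get(str(val).lower())

print(get_mapping_as_int('Momentum'))  # 0
```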
analysis_engine.consts.get_algo_timeseries_from_int(val)[source]

Convert the integer value to the timeseries string found in the analysis_engine.consts.ALGO_TIMESERIES dictionary

Parameters:val – integer value for finding the string timeseries label
analysis_engine.consts.is_celery_disabled(work_dict=None)[source]

Check if Celery is disabled by the environment variable export CELERY_DISABLED=1 or by work_dict['celery_disabled'] = True.

Parameters:work_dict – request to check
analysis_engine.consts.get_status(status)[source]

Return the string label for an integer status code, which should be one of the status constants defined in this module.

Parameters:status – integer status code
analysis_engine.consts.ppj(json_data)[source]
Parameters:json_data – dictionary to convert to a pretty-printed, multi-line string
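ppj is a pretty-print helper; a minimal equivalent using only the standard library is shown below. The exact indentation and key ordering are assumptions about the helper's behavior:

```python
import json

def ppj_sketch(json_data):
    """Pretty-print a dictionary as a sorted, multi-line JSON string."""
    return json.dumps(json_data, sort_keys=True, indent=4,
                      separators=(',', ': '))

print(ppj_sketch({'ticker': 'SPY', 'status': 'ok'}))
```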
analysis_engine.consts.to_float_str(val)[source]

Convert the float to a string with 2 decimal points of precision

Parameters:val – float to change to a 2-decimal string
analysis_engine.consts.to_f(val)[source]

Truncate the float to 2 decimal points of precision

Parameters:val – float to change
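Both helpers deal with 2-decimal precision. A sketch of the likely behavior follows; whether the real helpers round or truncate at the second decimal is an assumption:

```python
def to_float_str_sketch(val):
    """Format a float as a string with 2 decimal places."""
    return f'{float(val):.2f}'

def to_f_sketch(val):
    """Reduce a float to 2 decimal places of precision."""
    return float(to_float_str_sketch(val))

print(to_float_str_sketch(3.14159))  # 3.14
print(to_f_sketch(3.14159))          # 3.14
```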
analysis_engine.consts.get_mb(num)[source]

Convert a number of bytes (as an integer) to megabytes with 2 decimal points of precision

Parameters:num – integer - number of bytes
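A sketch of the bytes-to-megabytes conversion, assuming 1 MB = 1024 * 1024 bytes (the divisor the real helper uses is an assumption):

```python
def get_mb_sketch(num):
    """Convert a byte count to megabytes with 2 decimal places."""
    return round(num / (1024.0 * 1024.0), 2)

print(get_mb_sketch(5242880))  # 5.0
```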
analysis_engine.consts.ev(k, v)[source]
Parameters:
  • k – environment variable key
  • v – environment variable value
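ev is the environment-variable reader used throughout the constants above. Its observed usage matches os.environ.get with a default; a minimal equivalent (an assumption about the implementation, not a copy of it):

```python
import os

def ev(k, v):
    """Return the environment variable k, or the default v if unset."""
    return os.environ.get(k, v)

os.environ['DEMO_TICKER'] = 'SPY'
print(ev('DEMO_TICKER', 'AAPL'))       # SPY
print(ev('DEMO_MISSING_KEY', 'AAPL'))  # AAPL
```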
analysis_engine.consts.get_percent_done(progress, total)[source]

Calculate the percentage done to 2 decimal points of precision

Parameters:
  • progress – progress counter
  • total – total number of counts
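A sketch of the percentage calculation; the zero-total guard is an assumption about how the helper avoids division by zero:

```python
def get_percent_done_sketch(progress, total):
    """Percentage complete, rounded to 2 decimal places."""
    if not total:
        return 0.0
    return round(float(progress) / float(total) * 100.0, 2)

print(get_percent_done_sketch(1, 3))  # 33.33
```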
analysis_engine.consts.get_redis_host_and_port(addr=None, req=None)[source]

Parse the REDIS_ADDRESS environment variable, an addr string, or a req dictionary and return a (host (str), port (int)) tuple

Parameters:
  • addr – optional - string redis address to parse format is host:port
  • req – optional - dictionary where the host and port are under the keys redis_host and redis_port
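The parsing described above can be sketched as follows. The precedence (req first, then addr, then the REDIS_ADDRESS environment variable) and the default port are assumptions:

```python
import os

def get_redis_host_and_port_sketch(addr=None, req=None):
    """Return (host, port) from req, addr, or the REDIS_ADDRESS env var."""
    if req and 'redis_host' in req:
        return (req['redis_host'], int(req.get('redis_port', 6379)))
    use_addr = addr or os.getenv('REDIS_ADDRESS', 'localhost:6379')
    host, port = use_addr.split(':')
    return (host, int(port))

print(get_redis_host_and_port_sketch(addr='redis-master:6379'))
# ('redis-master', 6379)
```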