Source Code¶
These documents describe how the Stock Analysis Engine works and how to develop with it. Please refer to the repository for the latest source code examples:
Example API Requests¶
Helpers and examples for building the API requests that each Celery task supports:
- analysis_engine.work_tasks.get_new_pricing_data
- analysis_engine.work_tasks.handle_pricing_update_task
- analysis_engine.work_tasks.publish_pricing_update
analysis_engine.api_requests.get_ds_dict(ticker, base_key=None, ds_id=None, label=None, service_dict=None)[source]¶
Get a dictionary with all cache keys for a ticker and return the dictionary. Use this method to decouple your apps from the underlying cache key implementations (if you do not need them).
Parameters:
- ticker – ticker
- base_key – optional - base key that is prepended to all cache keys
- ds_id – optional - dataset id (useful for an external database id)
- label – optional - tracking label in the logs
- service_dict – optional - parent call functions and Celery tasks can use this dictionary to seed the common service routes and endpoints. Refer to analysis_engine.consts.SERVICE_VALS for the keys this helper automatically copies over.
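For example, a minimal sketch of building the cache-key dictionary for one ticker (the base_key and label below are placeholders, and the exact keys in the returned dictionary depend on the installed version):

from analysis_engine.api_requests import get_ds_dict

# hypothetical base key and label - adjust for your own app
spy_keys = get_ds_dict(
    ticker='SPY',
    base_key='SPY_2019-01-02',
    label='my-app')

# the returned dictionary maps dataset names to cache keys
print(spy_keys)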
analysis_engine.api_requests.build_get_new_pricing_request(label=None)[source]¶
Build a sample Celery task API request for: analysis_engine.work_tasks.get_new_pricing_data
Used for testing: run_get_new_pricing_data(work)
Parameters: label – log label to use
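A short sketch of driving the task runner with the sample request in a test (the ticker override below assumes the work dictionary uses a 'ticker' key - inspect the returned dictionary to confirm):

from analysis_engine.api_requests import build_get_new_pricing_request

# build a sample work request for the get_new_pricing_data task
work = build_get_new_pricing_request(label='unit-test')

# customize the request before handing it to the task runner
# (key name is an assumption)
work['ticker'] = 'SPY'
print(work)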
analysis_engine.api_requests.build_publish_pricing_request(label=None)[source]¶
Build a sample Celery task API request for: analysis_engine.work_tasks.publish_pricing_update
Used for testing: run_publish_pricing_update(work)
Parameters: label – log label to use
analysis_engine.api_requests.build_cache_ready_pricing_dataset(label=None)[source]¶
Build a cache-ready pricing dataset to replicate the get_new_pricing_data task
Parameters: label – log label to use
analysis_engine.api_requests.build_publish_from_s3_to_redis_request(label=None)[source]¶
Build a sample Celery task API request for: analysis_engine.work_tasks.publish_from_s3_to_redis
Used for testing: run_publish_from_s3_to_redis(work)
Parameters: label – log label to use
analysis_engine.api_requests.build_prepare_dataset_request(label=None)[source]¶
Build a sample Celery task API request for: analysis_engine.work_tasks.prepare_pricing_dataset
Used for testing: run_prepare_pricing_dataset(work)
Parameters: label – log label to use
analysis_engine.api_requests.build_analyze_dataset_request(label=None)[source]¶
Build a sample Celery task API request for: analysis_engine.work_tasks.analyze_pricing_dataset
Used for testing: run_analyze_pricing_dataset(work)
Parameters: label – log label to use
analysis_engine.api_requests.build_screener_analysis_request(ticker=None, tickers=None, fv_urls=None, fetch_mode='iex', iex_datasets=['daily', 'minute', 'quote', 'stats', 'peers', 'news', 'financials', 'earnings', 'dividends', 'company'], determine_sells=None, determine_buys=None, label='screener')[source]¶
Build a dictionary request for the task: analysis_engine.work_tasks.run_screener_analysis
Parameters:
- ticker – ticker to add to the analysis
- tickers – tickers to add to the analysis
- fv_urls – finviz urls
- fetch_mode – supports pulling from iex, yahoo, or all (defaults to iex)
- iex_datasets – datasets to fetch from iex (defaults to analysis_engine.consts.IEX_DATASETS_DEFAULT)
- determine_sells – string custom Celery task name for handling sell-side processing
- determine_buys – string custom Celery task name for handling buy-side processing
- label – log tracking label
Returns: initial request dictionary:
req = {
    'tickers': use_tickers,
    'fv_urls': use_urls,
    'fetch_mode': fetch_mode,
    'iex_datasets': iex_datasets,
    's3_bucket': s3_bucket_name,
    's3_enabled': s3_enabled,
    'redis_enabled': redis_enabled,
    'determine_sells': determine_sells,
    'determine_buys': determine_buys,
    'label': label
}
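For example, a sketch of building a screener request for two tickers (the finviz url is only a placeholder):

from analysis_engine.api_requests import build_screener_analysis_request

# build a screener-analysis request using the default iex datasets
req = build_screener_analysis_request(
    tickers=['SPY', 'AMZN'],
    fv_urls=['https://finviz.com/screener.ashx?v=111'],
    fetch_mode='iex',
    label='screener-example')

print(req['tickers'], req['fetch_mode'])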
Read from S3 as a String¶
Wrapper for downloading an S3 key as a string
analysis_engine.s3_read_contents_from_key.s3_read_contents_from_key(s3, s3_bucket_name, s3_key, encoding='utf-8', convert_as_json=True, compress=False)[source]¶
Download the S3 key contents as a string. This will raise exceptions.
Parameters:
- s3 – existing S3 object
- s3_bucket_name – bucket name
- s3_key – S3 key
- encoding – utf-8 by default
- convert_as_json – auto-convert the contents to a dict
- compress – decompress the contents using zlib
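A minimal sketch of reading a key back using the development defaults documented in the Constants section below; whether the helper expects a boto3 resource or client is an assumption, so check the source if this does not match:

import boto3
from analysis_engine.s3_read_contents_from_key import s3_read_contents_from_key

# connect to the local S3 endpoint using the development defaults
s3 = boto3.resource(
    's3',
    endpoint_url='http://0.0.0.0:9000',
    aws_access_key_id='trexaccesskey',
    aws_secret_access_key='trex123321',
    region_name='us-east-1')

data = s3_read_contents_from_key(
    s3=s3,
    s3_bucket_name='pricing',
    s3_key='test_key',
    convert_as_json=True)  # returns a dict when the key holds json
print(data)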
Get Task Results¶
Debug by setting the environment variable:
export DEBUG_TASK=1
analysis_engine.get_task_results.get_task_results(work_dict=None, result=None, **kwargs)[source]¶
If celery is disabled by the environment key export CELERY_DISABLED=1, or requested with work_dict['celery_disabled'] = True, then return the task result dictionary; otherwise return None.
This method is useful for allowing tests to override the returned payloads during task chaining using @mock.patch.
Parameters:
- work_dict – task work dictionary
- result – task result dictionary
- kwargs – keyword arguments
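A hedged sketch of how a caller gets its payload back synchronously when celery is disabled through the request (the result-dictionary shape here is only illustrative):

from analysis_engine.get_task_results import get_task_results

# a work request that asks for a synchronous, in-process result
work = {
    'ticker': 'SPY',
    'celery_disabled': True}

# illustrative result payload a task might produce
result = {
    'status': 0,
    'err': None,
    'rec': {'ticker': 'SPY'}}

# returns the result dictionary because celery_disabled is True;
# with celery enabled (and CELERY_DISABLED unset) it returns None
print(get_task_results(work_dict=work, result=result))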
Constants¶
Utility methods, constants, and helper functions
Algorithm Environment Variables
ALGO_MODULE_PATH = ev(
'ALGO_MODULE_PATH',
'/opt/sa/analysis_engine/mocks/example_algo_minute.py')
ALGO_BASE_MODULE_PATH = ev(
'ALGO_BASE_MODULE_PATH',
'/opt/sa/analysis_engine/algo.py')
ALGO_MODULE_NAME = ev(
'ALGO_MODULE_NAME',
'example_algo_minute')
ALGO_VERSION = ev(
'ALGO_VERSION',
'1')
ALGO_BUYS_S3_BUCKET_NAME = ev(
'ALGO_BUYS_S3_BUCKET_NAME',
'algobuys')
ALGO_SELLS_S3_BUCKET_NAME = ev(
'ALGO_SELLS_S3_BUCKET_NAME',
'algosells')
ALGO_RESULT_S3_BUCKET_NAME = ev(
'ALGO_RESULT_S3_BUCKET_NAME',
'algoresults')
ALGO_READY_DATASET_S3_BUCKET_NAME = ev(
'ALGO_READY_DATASET_S3_BUCKET_NAME',
'algoready')
ALGO_EXTRACT_DATASET_S3_BUCKET_NAME = ev(
'ALGO_EXTRACT_DATASET_S3_BUCKET_NAME',
'algoready')
ALGO_HISTORY_DATASET_S3_BUCKET_NAME = ev(
'ALGO_HISTORY_DATASET_S3_BUCKET_NAME',
'algohistory')
ALGO_REPORT_DATASET_S3_BUCKET_NAME = ev(
'ALGO_REPORT_DATASET_S3_BUCKET_NAME',
'algoreport')
ALGO_BACKUP_DATASET_S3_BUCKET_NAME = ev(
'ALGO_BACKUP_DATASET_S3_BUCKET_NAME',
'algobackup')
ALGO_READY_DIR = ev(
'ALGO_READY_DIR',
'/tmp')
ALGO_EXTRACT_DIR = ev(
'ALGO_EXTRACT_DIR',
'/tmp')
ALGO_HISTORY_DIR = ev(
'ALGO_HISTORY_DIR',
'/tmp')
ALGO_REPORT_DIR = ev(
'ALGO_REPORT_DIR',
'/tmp')
ALGO_LOAD_DIR = ev(
'ALGO_LOAD_DIR',
'/tmp')
ALGO_BACKUP_DIR = ev(
'ALGO_BACKUP_DIR',
'/tmp')
ALGO_READY_REDIS_ADDRESS = ev(
'ALGO_READY_REDIS_ADDRESS',
'localhost:6379')
ALGO_EXTRACT_REDIS_ADDRESS = ev(
'ALGO_EXTRACT_REDIS_ADDRESS',
'localhost:6379')
ALGO_HISTORY_REDIS_ADDRESS = ev(
'ALGO_HISTORY_REDIS_ADDRESS',
'localhost:6379')
ALGO_REPORT_REDIS_ADDRESS = ev(
'ALGO_REPORT_REDIS_ADDRESS',
'localhost:6379')
ALGO_BACKUP_REDIS_ADDRESS = ev(
'ALGO_BACKUP_REDIS_ADDRESS',
'localhost:6379')
ALGO_HISTORY_VERSION = ev(
'ALGO_HISTORY_VERSION',
'1')
ALGO_REPORT_VERSION = ev(
'ALGO_REPORT_VERSION',
'1')
Stock and Analysis Environment Variables
TICKER = ev(
'TICKER',
'SPY')
TICKER_ID = int(ev(
'TICKER_ID',
'1'))
DEFAULT_TICKERS = ev(
'DEFAULT_TICKERS',
'SPY,AMZN,TSLA,NFLX').split(',')
NEXT_EXP = opt_dates.option_expiration()
NEXT_EXP_STR = NEXT_EXP.strftime('%Y-%m-%d')
Logging Environment Variables
LOG_CONFIG_PATH = ev(
'LOG_CONFIG_PATH',
'./analysis_engine/log/logging.json')
Slack Environment Variables
SLACK_WEBHOOK = ev(
'SLACK_WEBHOOK',
None)
SLACK_ACCESS_TOKEN = ev(
'SLACK_ACCESS_TOKEN',
None
)
SLACK_PUBLISH_PLOT_CHANNELS = ev(
'SLACK_PUBLISH_PLOT_CHANNELS',
None
)
PROD_SLACK_ALERTS = ev(
'PROD_SLACK_ALERTS',
'0')
Celery Environment Variables
SSL_OPTIONS = {}
TRANSPORT_OPTIONS = {}
WORKER_BROKER_URL = ev(
'WORKER_BROKER_URL',
'redis://localhost:6379/11')
WORKER_BACKEND_URL = ev(
'WORKER_BACKEND_URL',
'redis://localhost:6379/12')
WORKER_CELERY_CONFIG_MODULE = ev(
'WORKER_CELERY_CONFIG_MODULE',
'analysis_engine.work_tasks.celery_config')
WORKER_TASKS = ev(
'WORKER_TASKS',
('analysis_engine.work_tasks.task_run_algo'))
INCLUDE_TASKS = WORKER_TASKS.split(',')
Supported S3 Environment Variables
ENABLED_S3_UPLOAD = ev(
'ENABLED_S3_UPLOAD',
'0') == '1'
S3_ACCESS_KEY = ev(
'AWS_ACCESS_KEY_ID',
'trexaccesskey')
S3_SECRET_KEY = ev(
'AWS_SECRET_ACCESS_KEY',
'trex123321')
S3_REGION_NAME = ev(
'AWS_DEFAULT_REGION',
'us-east-1')
S3_ADDRESS = ev(
'S3_ADDRESS',
'0.0.0.0:9000')
S3_SECURE = ev(
'S3_SECURE',
'0') == '1'
S3_BUCKET = ev(
'S3_BUCKET',
'pricing')
S3_COMPILED_BUCKET = ev(
'S3_COMPILED_BUCKET',
'compileddatasets')
S3_KEY = ev(
'S3_KEY',
'test_key')
DAILY_S3_BUCKET_NAME = ev(
'DAILY_S3_BUCKET_NAME',
'daily')
MINUTE_S3_BUCKET_NAME = ev(
'MINUTE_S3_BUCKET_NAME',
'minute')
QUOTE_S3_BUCKET_NAME = ev(
'QUOTE_S3_BUCKET_NAME',
'quote')
STATS_S3_BUCKET_NAME = ev(
'STATS_S3_BUCKET_NAME',
'stats')
PEERS_S3_BUCKET_NAME = ev(
'PEERS_S3_BUCKET_NAME',
'peers')
NEWS_S3_BUCKET_NAME = ev(
'NEWS_S3_BUCKET_NAME',
'news')
FINANCIALS_S3_BUCKET_NAME = ev(
'FINANCIALS_S3_BUCKET_NAME',
'financials')
EARNINGS_S3_BUCKET_NAME = ev(
'EARNINGS_S3_BUCKET_NAME',
'earnings')
DIVIDENDS_S3_BUCKET_NAME = ev(
'DIVIDENDS_S3_BUCKET_NAME',
'dividends')
COMPANY_S3_BUCKET_NAME = ev(
'COMPANY_S3_BUCKET_NAME',
'company')
PREPARE_S3_BUCKET_NAME = ev(
'PREPARE_S3_BUCKET_NAME',
'prepared')
ANALYZE_S3_BUCKET_NAME = ev(
'ANALYZE_S3_BUCKET_NAME',
'analyzed')
SCREENER_S3_BUCKET_NAME = ev(
'SCREENER_S3_BUCKET_NAME',
'screener-data')
PRICING_S3_BUCKET_NAME = ev(
'PRICING_S3_BUCKET_NAME',
'pricing')
OPTIONS_S3_BUCKET_NAME = ev(
'OPTIONS_S3_BUCKET_NAME',
'options')
Supported Redis Environment Variables
ENABLED_REDIS_PUBLISH = ev(
'ENABLED_REDIS_PUBLISH',
'0') == '1'
REDIS_ADDRESS = ev(
'REDIS_ADDRESS',
'localhost:6379')
REDIS_KEY = ev(
'REDIS_KEY',
'test_redis_key')
REDIS_PASSWORD = ev(
'REDIS_PASSWORD',
None)
REDIS_DB = int(ev(
'REDIS_DB',
'0'))
REDIS_EXPIRE = ev(
'REDIS_EXPIRE',
None)
analysis_engine.consts.get_indicator_type_as_int(val=None)[source]¶
Convert the string to the INDICATOR_TYPE_MAPPING integer value
Parameters: val – string to look up in the INDICATOR_TYPE_MAPPING dictionary
analysis_engine.consts.get_indicator_category_as_int(val=None)[source]¶
Convert the string to the INDICATOR_CATEGORY_MAPPING integer value
Parameters: val – string to look up in the INDICATOR_CATEGORY_MAPPING dictionary
analysis_engine.consts.get_indicator_uses_data_as_int(val=None)[source]¶
Convert the string to the INDICATOR_USES_DATA_MAPPING integer value
Parameters: val – string to look up in the INDICATOR_USES_DATA_MAPPING dictionary
analysis_engine.consts.get_algo_timeseries_from_int(val)[source]¶
Convert the integer value to the timeseries string found in the analysis_engine.consts.ALGO_TIMESERIES dictionary
Parameters: val – integer value for finding the string timeseries label
analysis_engine.consts.is_celery_disabled(work_dict=None)[source]¶
Parameters: work_dict – request to check
analysis_engine.consts.get_status(status)[source]¶
Return the string label for an integer status code, which should be one of the status codes above.
Parameters: status – integer status code
analysis_engine.consts.ppj(json_data)[source]¶
Parameters: json_data – dictionary to convert to a pretty-printed, multi-line string
analysis_engine.consts.to_float_str(val)[source]¶
Convert the float to a string with 2 decimal points of precision
Parameters: val – float to change to a 2-decimal string
analysis_engine.consts.to_f(val)[source]¶
Truncate the float to 2 decimal points of precision
Parameters: val – float to change
analysis_engine.consts.get_mb(num)[source]¶
Convert the number of bytes (as an integer) to megabytes with 2 decimal points of precision
Parameters: num – integer - number of bytes
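A few illustrative calls to the numeric helpers above (the exact return formatting is an assumption based on the 2-decimal behavior described):

from analysis_engine.consts import to_float_str, to_f, get_mb

print(to_float_str(3.14159))  # expected: '3.14'
print(to_f(3.14159))          # expected: 3.14
print(get_mb(5242880))        # 5 MB worth of bytes -> expected: 5.0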
analysis_engine.consts.ev(k, v)[source]¶
Parameters:
- k – environment variable key
- v – environment variable value
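Given how ev is used throughout the constants above (a key plus a fallback value), it likely behaves like os.getenv with a default; a minimal sketch of equivalent behavior (the real helper may add logging or type handling):

import os

def ev(k, v):
    # return the environment variable if set, otherwise the fallback
    return os.getenv(k, v)

print(ev('TICKER', 'SPY'))  # 'SPY' unless TICKER is exported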
analysis_engine.consts.get_percent_done(progress, total)[source]¶
Calculate the percentage done to 2 decimal points of precision
Parameters:
- progress – progress counter
- total – total number of counts
analysis_engine.consts.get_redis_host_and_port(addr=None, req=None)[source]¶
Parse the env REDIS_ADDRESS, the addr string, or a dictionary req and return a tuple of (host (str), port (int))
Parameters:
- addr – optional - string redis address to parse; format is host:port
- req – optional - dictionary where the host and port are under the keys redis_host and redis_port
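A short usage sketch, assuming the parsing behavior described above:

from analysis_engine.consts import get_redis_host_and_port

# parse an explicit address string
host, port = get_redis_host_and_port(addr='localhost:6379')

# or pull the values out of a request dictionary
host, port = get_redis_host_and_port(
    req={'redis_host': 'localhost', 'redis_port': 6379})

print(host, port)  # localhost 6379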