Celery Worker Tasks¶
Celery tasks are automatically processed by the workers. You can turn off celery task publishing by setting the environment variable CELERY_DISABLED=1 (by default, celery task publishing is enabled).
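As a minimal sketch, such a flag can be checked with a one-line environment lookup; this is illustrative only, and the engine's own helper may differ:

```python
import os

# Illustrative sketch only: how a CELERY_DISABLED-style flag is
# typically read; the engine's own constant/helper may differ.
def celery_is_disabled(env=None):
    """Return True when CELERY_DISABLED=1 turns off task publishing."""
    env = os.environ if env is None else env
    return env.get('CELERY_DISABLED', '0') == '1'
```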
Tip
All tasks share the analysis_engine.work_tasks.custom_task.CustomTask class for customizing event handling.
Handle Pricing Update Task
Get the latest stock news, quotes and options chains for a ticker and publish the values to redis and S3 for downstream analysis.
Writes pricing updates to S3 and Redis by building a list of publishing sub-tasks:
Sample work_dict request for this method
analysis_engine.api_requests.publish_pricing_update
work = {
    'ticker': ticker,
    'ticker_id': ticker_id,
    's3_bucket': s3_bucket_name,
    's3_key': s3_key,
    'redis_key': redis_key,
    'prepared_s3_key': s3_prepared_key,
    'prepared_s3_bucket': s3_prepared_bucket_name,
    'prepared_redis_key': redis_prepared_key,
    'ignore_columns': ignore_columns,
    's3_enabled': s3_enabled,
    'redis_enabled': redis_enabled
}
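As a sketch, the request above could be filled in with concrete values like these; the bucket and key names here are hypothetical examples, not the engine's defaults:

```python
# Hypothetical values for the work_dict fields above; the bucket and
# key names are illustrative, not the engine's defaults.
ticker = 'SPY'
cur_date = '2019-01-05'
work = {
    'ticker': ticker,
    'ticker_id': 1,
    's3_bucket': 'pricing',
    's3_key': f'{ticker}-{cur_date}',
    'redis_key': f'{ticker}-{cur_date}',
    'prepared_s3_key': f'{ticker}-{cur_date}-prepared',
    'prepared_s3_bucket': 'prepared-pricing',
    'prepared_redis_key': f'{ticker}-{cur_date}-prepared',
    'ignore_columns': None,
    's3_enabled': True,
    'redis_enabled': True,
}
```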
Tip
This task uses the analysis_engine.work_tasks.custom_task.CustomTask class for task event handling.
Supported Environment Variables
export DEBUG_RESULTS=1
analysis_engine.work_tasks.handle_pricing_update_task.run_handle_pricing_update_task(work_dict)[source]¶
Celery wrapper for running without celery
Parameters: work_dict – task data
(task) analysis_engine.work_tasks.handle_pricing_update_task.handle_pricing_update_task(work_dict)¶
Writes pricing updates to S3 and Redis
Parameters: work_dict – dictionary for key/values
Get New Pricing Data Task
This will fetch data (pricing, financials, earnings, dividends, options, and more) from these sources:
- IEX
- Tradier
- Yahoo - disabled as of 2019/01/03
Detailed example for getting new pricing data
import datetime
from analysis_engine.api_requests import build_get_new_pricing_request
from analysis_engine.work_tasks.get_new_pricing_data import get_new_pricing_data

# store data
cur_date = datetime.datetime.now().strftime('%Y-%m-%d')
work = build_get_new_pricing_request(
    label=f'get-pricing-{cur_date}')
work['ticker'] = 'TSLA'
work['s3_bucket'] = 'pricing'
work['s3_key'] = f'{work["ticker"]}-{cur_date}'
work['redis_key'] = f'{work["ticker"]}-{cur_date}'
work['celery_disabled'] = True
res = get_new_pricing_data(
    work)
print('full result dictionary:')
print(res)
if res['data']:
    print(
        'named datasets returned as '
        'json-serialized pandas DataFrames:')
    for k in res['data']:
        print(f' - {k}')
Warning
When fetching pricing data from sources like IEX, please ensure the returned values are not pandas DataFrames, to prevent issues with celery task results. Instead, return df.to_json() before sending the results to the results backend.
Tip
This task uses the analysis_engine.work_tasks.custom_task.CustomTask class for task event handling.
Sample work_dict request for this method
analysis_engine.api_requests.build_get_new_pricing_request
Supported Environment Variables
export DEBUG_RESULTS=1
analysis_engine.work_tasks.get_new_pricing_data.run_get_new_pricing_data(work_dict)[source]¶
Celery wrapper for running without celery
Parameters: work_dict – task data
(task) analysis_engine.work_tasks.get_new_pricing_data.get_new_pricing_data(work_dict)¶
Get Ticker information on:
- prices - turn off with work_dict.get_pricing = False
- news - turn off with work_dict.get_news = False
- options - turn off with work_dict.get_options = False
Parameters: work_dict – dictionary for key/values
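The toggles above are plain keys on the work dictionary. For example, to fetch only pricing data, a sketch (key names taken from the toggles documented above, values illustrative):

```python
# Sketch: fetch only pricing data by turning off news and options.
# Key names follow the toggles documented above; values are examples.
work = {'ticker': 'SPY'}
work['get_pricing'] = True
work['get_news'] = False
work['get_options'] = False
```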
Publish Pricing Data Task
Publish new stock data to external services and systems (redis and s3) provided the system(s) are running and enabled.
- redis - using redis-py
- s3 - using boto3
Sample work_dict request for this method
analysis_engine.api_requests.publish_pricing_update
work_request = {
    'ticker': ticker,
    'ticker_id': ticker_id,
    'strike': use_strike,
    'contract': contract_type,
    's3_bucket': s3_bucket_name,
    's3_key': s3_key,
    'redis_key': redis_key,
    'data': use_data
}
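As an example of the strike and contract fields, a hypothetical request for publishing a single options contract's data could look like this; all values and the 'C'/'P' convention here are illustrative assumptions, not the engine's defaults:

```python
# Hypothetical request for publishing one options contract's data;
# the strike, contract, bucket, and key values are examples only.
ticker = 'SPY'
work_request = {
    'ticker': ticker,
    'ticker_id': 1,
    'strike': 286.0,
    'contract': 'C',        # assumed convention: 'C' calls, 'P' puts
    's3_bucket': 'pricing',
    's3_key': f'{ticker}-latest',
    'redis_key': f'{ticker}-latest',
    'data': {'to_publish': []},
}
```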
Tip
This task uses the analysis_engine.work_tasks.custom_task.CustomTask class for task event handling.
Supported Environment Variables
export DEBUG_RESULTS=1
analysis_engine.work_tasks.publish_pricing_update.run_publish_pricing_update(work_dict)[source]¶
Celery wrapper for running without celery
Parameters: work_dict – task data
(task) analysis_engine.work_tasks.publish_pricing_update.publish_pricing_update(work_dict)¶
Publish Ticker Data to S3 and Redis
- prices - turn off with work_dict.get_pricing = False
- news - turn off with work_dict.get_news = False
- options - turn off with work_dict.get_options = False
Parameters: work_dict – dictionary for key/values
Publish Data from S3 to Redis Task
Publish an S3 key with stock data to redis and s3 (if either of them is running and enabled)
- redis - using redis-py
- s3 - using boto3
Sample work_dict request for this method
analysis_engine.api_requests.build_publish_from_s3_to_redis_request
work_request = {
    'ticker': ticker,
    'ticker_id': ticker_id,
    's3_bucket': s3_bucket_name,
    's3_key': s3_key,
    'redis_key': redis_key,
    's3_enabled': s3_enabled,
    'redis_enabled': redis_enabled
}
Tip
This task uses the analysis_engine.work_tasks.custom_task.CustomTask class for task event handling.
Supported Environment Variables
export DEBUG_RESULTS=1
analysis_engine.work_tasks.publish_from_s3_to_redis.run_publish_from_s3_to_redis(work_dict)[source]¶
Celery wrapper for running without celery
Parameters: work_dict – task data
(task) analysis_engine.work_tasks.publish_from_s3_to_redis.publish_from_s3_to_redis(work_dict)¶
Publish Ticker Data from S3 to Redis
Parameters: work_dict – dictionary for key/values
Publish Aggregate Ticker Data from S3 Task
Publish an S3 key with aggregated stock data to redis and s3 (if either of them is running and enabled)
- redis - using redis-py
- s3 - using boto3
Sample work_dict request for this method
analysis_engine.api_requests.build_publish_ticker_aggregate_from_s3_request
work_request = {
    'ticker': ticker,
    'ticker_id': ticker_id,
    's3_bucket': s3_bucket_name,
    's3_compiled_bucket': s3_compiled_bucket_name,
    's3_key': s3_key,
    'redis_key': redis_key,
    's3_enabled': s3_enabled,
    'redis_enabled': redis_enabled
}
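A sketch with hypothetical values, showing the extra compiled bucket that distinguishes this task from the plain publish-from-s3 task; the bucket and key names are examples only:

```python
# Illustrative request showing the extra compiled bucket this task
# writes aggregated datasets into; names and values are examples.
work_request = {
    'ticker': 'SPY',
    'ticker_id': 1,
    's3_bucket': 'pricing',
    's3_compiled_bucket': 'compileddatasets',
    's3_key': 'SPY-latest',
    'redis_key': 'SPY-latest',
    's3_enabled': True,
    'redis_enabled': True,
}
```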
Tip
This task uses the analysis_engine.work_tasks.custom_task.CustomTask class for task event handling.
Supported Environment Variables
export DEBUG_RESULTS=1
analysis_engine.work_tasks.publish_ticker_aggregate_from_s3.run_publish_ticker_aggregate_from_s3(work_dict)[source]¶
Celery wrapper for running without celery
Parameters: work_dict – task data
(task) analysis_engine.work_tasks.publish_ticker_aggregate_from_s3.publish_ticker_aggregate_from_s3(work_dict)¶
Publish Aggregated Ticker Data from S3 to Redis
Parameters: work_dict – dictionary for key/values
Screener Analysis Task
Work in progress - screener-driven analysis task.
Supported Environment Variables
export DEBUG_RESULTS=1
analysis_engine.work_tasks.task_screener_analysis.run_screener_analysis(work_dict)[source]¶
Celery wrapper for running without celery
Parameters: work_dict – task data
(task) analysis_engine.work_tasks.task_screener_analysis.task_screener_analysis(work_dict)¶
Parameters: work_dict – task dictionary
Prepare Pricing Dataset
Prepare dataset for analysis. This task collapses nested json dictionaries into a csv file with a header row and stores the output file in s3 and redis automatically.
- if key not in redis, load the key by the same name from s3
- prepare dataset from redis key
- the dataset will be stored as a dictionary with a pandas dataframe
Sample work_dict request for this method
analysis_engine.api_requests.build_prepare_dataset_request
work_request = {
    'ticker': ticker,
    'ticker_id': ticker_id,
    's3_bucket': s3_bucket_name,
    's3_key': s3_key,
    'redis_key': redis_key,
    'prepared_s3_key': s3_prepared_key,
    'prepared_s3_bucket': s3_prepared_bucket_name,
    'prepared_redis_key': redis_prepared_key,
    'ignore_columns': ignore_columns,
    's3_enabled': s3_enabled,
    'redis_enabled': redis_enabled
}
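As a sketch of the ignore_columns field, which lets the prepare step drop columns before building the csv; the column names below are hypothetical examples:

```python
# Partial, illustrative request: ignore_columns drops the named
# columns before the csv is built. Column names are hypothetical.
work_request = {
    'ticker': 'SPY',
    'redis_key': 'SPY-latest',
    'prepared_redis_key': 'SPY-latest-prepared',
    'ignore_columns': ['date', 'exp_date'],
}
```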
Tip
This task uses the analysis_engine.work_tasks.custom_task.CustomTask class for task event handling.
Supported Environment Variables
export DEBUG_PREPARE=1
export DEBUG_RESULTS=1
analysis_engine.work_tasks.prepare_pricing_dataset.run_prepare_pricing_dataset(work_dict)[source]¶
Celery wrapper for running without celery
Parameters: work_dict – task data
(task) analysis_engine.work_tasks.prepare_pricing_dataset.prepare_pricing_dataset(work_dict)¶
Prepare dataset for analysis. Supports loading the dataset from s3 if it is not found in redis. Outputs the prepared artifact as a csv to s3 and redis.
Parameters: work_dict – dictionary for key/values
Custom Celery Task Handling¶
Define your own on_failure and on_success task handlers with the analysis_engine.work_tasks.custom_task.CustomTask class.
Debug values with the environment variable:
export DEBUG_TASK=1
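The hook signatures look like the sketch below. A real implementation inherits from celery.Task (as CustomTask does); a plain class with return values is used here only to show the shape of the hooks:

```python
# Minimal sketch of the on_failure/on_success hook signatures that
# CustomTask overrides. A real subclass inherits from celery.Task;
# a plain class is used here just to illustrate the hook shape.
class LoggingHooks:
    def on_failure(self, exc, task_id, args, kwargs, einfo):
        return f'task {task_id} failed: {exc}'

    def on_success(self, retval, task_id, args, kwargs):
        return f'task {task_id} succeeded with {retval}'
```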
class analysis_engine.work_tasks.custom_task.CustomTask[source]¶
on_failure(exc, task_id, args, kwargs, einfo)[source]¶
Handle custom actions when a task does not complete successfully. For example, if the task throws an exception, this on_failure method can customize how to handle exceptional cases.
http://docs.celeryproject.org/en/latest/userguide/tasks.html#task-inheritance
Parameters: - exc – exception
- task_id – task id
- args – arguments passed into task
- kwargs – keyword arguments passed into task
- einfo – exception info
on_success(retval, task_id, args, kwargs)[source]¶
Handle custom actions when a task completes successfully.
http://docs.celeryproject.org/en/latest/reference/celery.app.task.html
Parameters: - retval – return value
- task_id – celery task id
- args – arguments passed into task
- kwargs – keyword arguments passed into task
Get a Celery Application Helper
analysis_engine.work_tasks.get_celery_app.get_celery_app(name='worker', auth_url='redis://localhost:6379/11', backend_url='redis://localhost:6379/12', include_tasks=[], ssl_options=None, transport_options=None, path_to_config_module='analysis_engine.work_tasks.celery_config', worker_log_format='%(asctime)s: %(levelname)s %(message)s', **kwargs)[source]¶
Build a Celery app with support for environment variables to set endpoint locations.
- export WORKER_BROKER_URL=redis://localhost:6379/11
- export WORKER_BACKEND_URL=redis://localhost:6379/12
- export WORKER_CELERY_CONFIG_MODULE=analysis_engine.work_tasks.celery_config
Note
Jupyter notebooks need to use the WORKER_CELERY_CONFIG_MODULE=analysis_engine.work_tasks.celery_service_config value, which uses resolvable hostnames with docker compose:
- export WORKER_BROKER_URL=redis://redis:6379/11
- export WORKER_BACKEND_URL=redis://redis:6379/12
Parameters:
- name – name for this app
- auth_url – Celery broker address (default is redis://localhost:6379/11 or the analysis_engine.consts.WORKER_BROKER_URL environment variable); this is required for distributing algorithms
- backend_url – Celery backend address (default is redis://localhost:6379/12 or the analysis_engine.consts.WORKER_BACKEND_URL environment variable); this is required for distributing algorithms
- include_tasks – list of modules containing tasks to add
- ssl_options – security options dictionary (default is analysis_engine.consts.SSL_OPTIONS)
- transport_options – transport options dictionary (default is analysis_engine.consts.TRANSPORT_OPTIONS)
- path_to_config_module – config module for advanced Celery worker connectivity requirements (default is analysis_engine.work_tasks.celery_config or the analysis_engine.consts.WORKER_CELERY_CONFIG_MODULE environment variable)
- worker_log_format – format for logs
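A minimal sketch of the env-var fallback behavior described above; when the variables are unset, the documented localhost defaults apply:

```python
import os

# Sketch of the env-var fallback the helper supports; names come
# from the docs above, defaults are the documented localhost URLs.
broker_url = os.getenv('WORKER_BROKER_URL', 'redis://localhost:6379/11')
backend_url = os.getenv('WORKER_BACKEND_URL', 'redis://localhost:6379/12')
```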