Celery Worker Tasks

Celery tasks are automatically processed by the workers. You can turn off Celery task publishing by setting the environment variable CELERY_DISABLED to 1 (by default, Celery task publishing is enabled).
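
For example, a minimal sketch (hedged; the engine's internal check may differ) of honoring this flag before publishing work through Celery:

import os

# assumes CELERY_DISABLED=1 means "do not publish tasks to the workers"
celery_enabled = os.getenv('CELERY_DISABLED', '0') != '1'
if celery_enabled:
    # publish the task to the workers (e.g. task.delay(work))
    pass
else:
    # run the task synchronously in-process instead
    pass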

Tip

All tasks share the analysis_engine.work_tasks.custom_task.CustomTask class for customizing event handling.

Handle Pricing Update Task

Get the latest stock news, quotes and options chains for a ticker and publish the values to redis and S3 for downstream analysis.

Writes pricing updates to S3 and Redis by building a list of publishing sub-tasks:

Sample work_dict request for this method

analysis_engine.api_requests.publish_pricing_update

work = {
    'ticker': ticker,
    'ticker_id': ticker_id,
    's3_bucket': s3_bucket_name,
    's3_key': s3_key,
    'redis_key': redis_key,
    'prepared_s3_key': s3_prepared_key,
    'prepared_s3_bucket': s3_prepared_bucket_name,
    'prepared_redis_key': redis_prepared_key,
    'ignore_columns': ignore_columns,
    's3_enabled': s3_enabled,
    'redis_enabled': redis_enabled
}
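
A hypothetical synchronous usage sketch built from the request above, calling the run_handle_pricing_update_task wrapper documented below (all field values are placeholders):

from analysis_engine.work_tasks.handle_pricing_update_task \
    import run_handle_pricing_update_task

# placeholder values for the request fields shown above
work = {
    'ticker': 'TSLA',
    'ticker_id': 1,
    's3_bucket': 'pricing',
    's3_key': 'TSLA-latest',
    'redis_key': 'TSLA-latest',
    's3_enabled': True,
    'redis_enabled': True
}
res = run_handle_pricing_update_task(work)
print(res)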

Tip

This task uses the analysis_engine.work_tasks.custom_task.CustomTask class for task event handling.

Supported Environment Variables

export DEBUG_RESULTS=1
analysis_engine.work_tasks.handle_pricing_update_task.run_handle_pricing_update_task(work_dict)[source]

Wrapper for running the task without Celery.

Parameters: work_dict – task data

(task) analysis_engine.work_tasks.handle_pricing_update_task.handle_pricing_update_task(work_dict)

Writes pricing updates to S3 and Redis.

Parameters: work_dict – dictionary of key/values

Get New Pricing Data Task

This will fetch data (pricing, financials, earnings, dividends, options, and more) from these sources:

  1. IEX
  2. Tradier
  3. Yahoo - disabled as of 2019/01/03

Detailed example for getting new pricing data

import datetime
from analysis_engine.api_requests \
    import build_get_new_pricing_request
from analysis_engine.work_tasks.get_new_pricing_data \
    import get_new_pricing_data

# build a request labeled with today's date
cur_date = datetime.datetime.now().strftime('%Y-%m-%d')
work = build_get_new_pricing_request(
    label=f'get-pricing-{cur_date}')
work['ticker'] = 'TSLA'
work['s3_bucket'] = 'pricing'
work['s3_key'] = f'{work["ticker"]}-{cur_date}'
work['redis_key'] = f'{work["ticker"]}-{cur_date}'
work['celery_disabled'] = True
res = get_new_pricing_data(work)
print('full result dictionary:')
print(res)
if res['data']:
    print(
        'named datasets returned as '
        'json-serialized pandas DataFrames:')
    for k in res['data']:
        print(f' - {k}')

Warning

When fetching pricing data from sources like IEX, please ensure the returned values are not raw pandas DataFrames, which can cause issues with Celery task results. Instead, it is preferred to return df.to_json() before sending the results to the results backend.
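
A minimal sketch of the recommended pattern, assuming the fetch returns a pandas DataFrame (fetch_pricing is a hypothetical stand-in):

import pandas as pd

def fetch_pricing(ticker):
    # hypothetical stand-in for an IEX or Tradier fetch
    return pd.DataFrame([{'ticker': ticker, 'close': 300.0}])

df = fetch_pricing('TSLA')
# serialize before the result reaches the Celery results backend
res = {'status': 'SUCCESS', 'data': df.to_json()}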

Tip

This task uses the analysis_engine.work_tasks.custom_task.CustomTask class for task event handling.

Sample work_dict request for this method

analysis_engine.api_requests.build_get_new_pricing_request

Supported Environment Variables

export DEBUG_RESULTS=1
analysis_engine.work_tasks.get_new_pricing_data.run_get_new_pricing_data(work_dict)[source]

Wrapper for running the task without Celery.

Parameters: work_dict – task data

(task) analysis_engine.work_tasks.get_new_pricing_data.get_new_pricing_data(work_dict)

Get ticker information on (see the toggle sketch after this list):

  • prices - turn off with work_dict['get_pricing'] = False
  • news - turn off with work_dict['get_news'] = False
  • options - turn off with work_dict['get_options'] = False

Parameters: work_dict – dictionary of key/values
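
For instance, assuming these toggles are plain keys on the request dictionary, a request can skip news and options:

work = {'ticker': 'TSLA'}
work['get_pricing'] = True   # keep quotes/pricing
work['get_news'] = False     # skip news
work['get_options'] = False  # skip options chains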

Publish Pricing Data Task

Publish new stock data to external services and systems (redis and s3), provided those systems are running and enabled.

  • redis - using redis-py
  • s3 - using boto3

Sample work_dict request for this method

analysis_engine.api_requests.publish_pricing_update

work_request = {
    'ticker': ticker,
    'ticker_id': ticker_id,
    'strike': use_strike,
    'contract': contract_type,
    's3_bucket': s3_bucket_name,
    's3_key': s3_key,
    'redis_key': redis_key,
    'data': use_data
}
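
A hypothetical synchronous call using the run_publish_pricing_update wrapper documented below (all values are placeholders):

from analysis_engine.work_tasks.publish_pricing_update \
    import run_publish_pricing_update

work_request = {
    'ticker': 'TSLA',
    'ticker_id': 1,
    's3_bucket': 'pricing',
    's3_key': 'TSLA-latest',
    'redis_key': 'TSLA-latest',
    'data': {'close': 300.0}  # placeholder payload to publish
}
res = run_publish_pricing_update(work_request)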

Tip

This task uses the analysis_engine.work_tasks.custom_task.CustomTask class for task event handling.

Supported Environment Variables

export DEBUG_RESULTS=1
analysis_engine.work_tasks.publish_pricing_update.run_publish_pricing_update(work_dict)[source]

Wrapper for running the task without Celery.

Parameters: work_dict – task data

(task) analysis_engine.work_tasks.publish_pricing_update.publish_pricing_update(work_dict)

Publish ticker data to S3 and Redis:

  • prices - turn off with work_dict['get_pricing'] = False
  • news - turn off with work_dict['get_news'] = False
  • options - turn off with work_dict['get_options'] = False

Parameters: work_dict – dictionary of key/values

Publish Data from S3 to Redis Task

Publish an S3 key with stock data to redis and s3 (if either is running and enabled)

  • redis - using redis-py
  • s3 - using boto3

Sample work_dict request for this method

analysis_engine.api_requests.build_publish_from_s3_to_redis_request

work_request = {
    'ticker': ticker,
    'ticker_id': ticker_id,
    's3_bucket': s3_bucket_name,
    's3_key': s3_key,
    'redis_key': redis_key,
    's3_enabled': s3_enabled,
    'redis_enabled': redis_enabled
}
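
The request can also be built with the api_requests helper named above; a hedged sketch assuming the builder returns a default request dictionary to customize, mirroring build_get_new_pricing_request earlier in this document:

from analysis_engine.api_requests \
    import build_publish_from_s3_to_redis_request
from analysis_engine.work_tasks.publish_from_s3_to_redis \
    import run_publish_from_s3_to_redis

# assumption: the builder returns a default dict; keys follow the sample above
work_request = build_publish_from_s3_to_redis_request()
work_request['ticker'] = 'TSLA'
work_request['s3_bucket'] = 'pricing'
work_request['s3_key'] = 'TSLA-latest'
work_request['redis_key'] = 'TSLA-latest'
res = run_publish_from_s3_to_redis(work_request)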

Tip

This task uses the analysis_engine.work_tasks.custom_task.CustomTask class for task event handling.

Supported Environment Variables

export DEBUG_RESULTS=1
analysis_engine.work_tasks.publish_from_s3_to_redis.run_publish_from_s3_to_redis(work_dict)[source]

Wrapper for running the task without Celery.

Parameters: work_dict – task data

(task) analysis_engine.work_tasks.publish_from_s3_to_redis.publish_from_s3_to_redis(work_dict)

Publish ticker data from S3 to Redis.

Parameters: work_dict – dictionary of key/values

Publish Aggregate Ticker Data from S3 Task

Publish an S3 key with aggregated stock data to redis and s3 (if either is running and enabled)

  • redis - using redis-py
  • s3 - using boto3

Sample work_dict request for this method

analysis_engine.api_requests.build_publish_ticker_aggregate_from_s3_request

work_request = {
    'ticker': ticker,
    'ticker_id': ticker_id,
    's3_bucket': s3_bucket_name,
    's3_compiled_bucket': s3_compiled_bucket_name,
    's3_key': s3_key,
    'redis_key': redis_key,
    's3_enabled': s3_enabled,
    'redis_enabled': redis_enabled
}
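
A hypothetical synchronous call with the run_publish_ticker_aggregate_from_s3 wrapper documented below (all field values are placeholders):

from analysis_engine.work_tasks.publish_ticker_aggregate_from_s3 \
    import run_publish_ticker_aggregate_from_s3

# placeholder values for the request fields shown above
work_request = {
    'ticker': 'TSLA',
    'ticker_id': 1,
    's3_bucket': 'pricing',
    's3_compiled_bucket': 'compileddatasets',
    's3_key': 'TSLA-latest',
    'redis_key': 'TSLA-latest',
    's3_enabled': True,
    'redis_enabled': True
}
res = run_publish_ticker_aggregate_from_s3(work_request)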

Tip

This task uses the analysis_engine.work_tasks.custom_task.CustomTask class for task event handling.

Supported Environment Variables

export DEBUG_RESULTS=1
analysis_engine.work_tasks.publish_ticker_aggregate_from_s3.run_publish_ticker_aggregate_from_s3(work_dict)[source]

Wrapper for running the task without Celery.

Parameters: work_dict – task data

(task) analysis_engine.work_tasks.publish_ticker_aggregate_from_s3.publish_ticker_aggregate_from_s3(work_dict)

Publish aggregated ticker data from S3 to Redis.

Parameters: work_dict – dictionary of key/values

Screener-Driven Analysis Task

Work in progress.

Supported Environment Variables

export DEBUG_RESULTS=1
analysis_engine.work_tasks.task_screener_analysis.run_screener_analysis(work_dict)[source]

Wrapper for running the task without Celery.

Parameters: work_dict – task data

(task) analysis_engine.work_tasks.task_screener_analysis.task_screener_analysis(work_dict)

Parameters: work_dict – task dictionary

Prepare Pricing Dataset

Prepare dataset for analysis. This task collapses nested json dictionaries into a csv file with a header row and stores the output file in s3 and redis automatically.

  • if key not in redis, load the key by the same name from s3
  • prepare dataset from redis key
  • the dataset will be stored as a dictionary with a pandas dataframe

Sample work_dict request for this method

analysis_engine.api_requests.build_prepare_dataset_request

work_request = {
    'ticker': ticker,
    'ticker_id': ticker_id,
    's3_bucket': s3_bucket_name,
    's3_key': s3_key,
    'redis_key': redis_key,
    'prepared_s3_key': s3_prepared_key,
    'prepared_s3_bucket': s3_prepared_bucket_name,
    'prepared_redis_key': redis_prepared_key,
    'ignore_columns': ignore_columns,
    's3_enabled': s3_enabled,
    'redis_enabled': redis_enabled
}
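
A hedged usage sketch with the build_prepare_dataset_request helper named above (assumes the builder returns a default request dictionary, as with the other api_requests helpers):

from analysis_engine.api_requests import build_prepare_dataset_request
from analysis_engine.work_tasks.prepare_pricing_dataset \
    import run_prepare_pricing_dataset

work_request = build_prepare_dataset_request()
work_request['ticker'] = 'TSLA'
res = run_prepare_pricing_dataset(work_request)
print(res)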

Tip

This task uses the analysis_engine.work_tasks.custom_task.CustomTask class for task event handling.

Supported Environment Variables

export DEBUG_PREPARE=1
export DEBUG_RESULTS=1
analysis_engine.work_tasks.prepare_pricing_dataset.run_prepare_pricing_dataset(work_dict)[source]

Wrapper for running the task without Celery.

Parameters: work_dict – task data

(task) analysis_engine.work_tasks.prepare_pricing_dataset.prepare_pricing_dataset(work_dict)

Prepare a dataset for analysis. Supports loading the dataset from s3 if it is not found in redis. Outputs the prepared artifact as a csv to s3 and redis.

Parameters: work_dict – dictionary of key/values

Custom Celery Task Handling

Define your own on_failure and on_success handlers with the analysis_engine.work_tasks.custom_task.CustomTask class.

Debug values with the environment variable:

export DEBUG_TASK=1
class analysis_engine.work_tasks.custom_task.CustomTask[source]
build_log_label_from_args(args)[source]
Parameters: args – list of celery args
on_failure(exc, task_id, args, kwargs, einfo)[source]

Handle custom actions when a task does not complete successfully. For example, if the task throws an exception, this on_failure method can customize how the failure is handled.

http://docs.celeryproject.org/en/latest/userguide/tasks.html#task-inheritance

Parameters:
  • exc – exception
  • task_id – task id
  • args – arguments passed into task
  • kwargs – keyword arguments passed into task
  • einfo – exception info
on_success(retval, task_id, args, kwargs)[source]

Handle custom actions when a task completes successfully.

http://docs.celeryproject.org/en/latest/reference/celery.app.task.html

Parameters:
  • retval – return value
  • task_id – celery task id
  • args – arguments passed into task
  • kwargs – keyword arguments passed into task
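
A minimal sketch of customizing these hooks by subclassing (the print calls are illustrative, not the class's actual behavior):

from analysis_engine.work_tasks.custom_task import CustomTask

class MyTask(CustomTask):
    def on_success(self, retval, task_id, args, kwargs):
        # illustrative: record successful completions
        print(f'task={task_id} succeeded retval={retval}')
        super().on_success(retval, task_id, args, kwargs)

    def on_failure(self, exc, task_id, args, kwargs, einfo):
        # illustrative: record failures with the raised exception
        print(f'task={task_id} failed exc={exc}')
        super().on_failure(exc, task_id, args, kwargs, einfo)

In standard Celery, a task class like this is attached with the base argument, for example @app.task(bind=True, base=MyTask).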

Get a Celery Application Helper

analysis_engine.work_tasks.get_celery_app.get_celery_app(name='worker', auth_url='redis://localhost:6379/11', backend_url='redis://localhost:6379/12', include_tasks=[], ssl_options=None, transport_options=None, path_to_config_module='analysis_engine.work_tasks.celery_config', worker_log_format='%(asctime)s: %(levelname)s %(message)s', **kwargs)[source]

Build a Celery app with support for environment variables that set endpoint locations.

  • export WORKER_BROKER_URL=redis://localhost:6379/11
  • export WORKER_BACKEND_URL=redis://localhost:6379/12
  • export WORKER_CELERY_CONFIG_MODULE=analysis_engine.work_tasks.celery_config

Note

Jupyter notebooks need to use the WORKER_CELERY_CONFIG_MODULE=analysis_engine.work_tasks.celery_service_config value, which uses hostnames resolvable with docker compose:

  • export WORKER_BROKER_URL=redis://redis:6379/11
  • export WORKER_BACKEND_URL=redis://redis:6379/12
Parameters:
  • name – name for this app
  • auth_url – Celery broker address (default is redis://localhost:6379/11 or the analysis_engine.consts.WORKER_BROKER_URL environment variable); required for distributing algorithms
  • backend_url – Celery backend address (default is redis://localhost:6379/12 or the analysis_engine.consts.WORKER_BACKEND_URL environment variable); required for distributing algorithms
  • include_tasks – list of modules containing tasks to add
  • ssl_options – security options dictionary (default is analysis_engine.consts.SSL_OPTIONS)
  • transport_options – transport options dictionary (default is analysis_engine.consts.TRANSPORT_OPTIONS)
  • path_to_config_module – config module for advanced Celery worker connectivity requirements (default is analysis_engine.work_tasks.celery_config or analysis_engine.consts.WORKER_CELERY_CONFIG_MODULE)
  • worker_log_format – format for logs
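
A minimal usage sketch with the documented defaults (include_tasks points at a task module named earlier in this section):

from analysis_engine.work_tasks.get_celery_app import get_celery_app

# broker and backend addresses can also come from WORKER_BROKER_URL
# and WORKER_BACKEND_URL as described above
app = get_celery_app(
    name='worker',
    auth_url='redis://localhost:6379/11',
    backend_url='redis://localhost:6379/12',
    include_tasks=[
        'analysis_engine.work_tasks.get_new_pricing_data'
    ])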