Source code for analysis_engine.work_tasks.get_celery_app

"""
Get a Celery Application Helper
"""

import os
import celery
import spylunking.log.setup_logging as log_utils

log = log_utils.build_colorized_logger(name=__name__)


[docs]def get_celery_app( name=os.getenv( 'APP_NAME', 'worker'), auth_url=os.getenv( 'WORKER_BROKER_URL', 'redis://localhost:6379/11'), backend_url=os.getenv( 'WORKER_BACKEND_URL', 'redis://localhost:6379/12'), include_tasks=[], ssl_options=None, transport_options=None, path_to_config_module=os.getenv( 'WORKER_CELERY_CONFIG_MODULE', 'analysis_engine.work_tasks.celery_config'), worker_log_format=os.getenv( 'WORKER_LOG_FORMAT', '%(asctime)s: %(levelname)s %(message)s'), **kwargs): """get_celery_app Build a Celery app with support for environment variables to set endpoints locations. - export WORKER_BROKER_URL=redis://localhost:6379/11 - export WORKER_BACKEND_URL=redis://localhost:6379/12 - export WORKER_CELERY_CONFIG_MODULE=analysis_engine.work_tasks.cel ery_config .. note:: Jupyter notebooks need to use the ``WORKER_CELERY_CONFIG_MODULE=analysis_engine.work_tasks.celery service_config`` value which uses resolvable hostnames with docker compose: - export WORKER_BROKER_URL=redis://redis:6379/11 - export WORKER_BACKEND_URL=redis://redis:6379/12 :param name: name for this app :param auth_url: Celery broker address (default is ``redis://localhost:6379/11`` or ``analysis_engine.consts.WORKER_BROKER_URL`` environment variable) this is required for distributing algorithms :param backend_url: Celery backend address (default is ``redis://localhost:6379/12`` or ``analysis_engine.consts.WORKER_BACKEND_URL`` environment variable) this is required for distributing algorithms :param include_tasks: list of modules containing tasks to add :param ssl_options: security options dictionary (default is ``analysis_engine.consts.SSL_OPTIONS``) :param trasport_options: transport options dictionary (default is ``analysis_engine.consts.TRANSPORT_OPTIONS``) :param path_to_config_module: config module for advanced Celery worker connectivity requirements (default is ``analysis_engine.work_tasks.celery_config`` or ``analysis_engine.consts.WORKER_CELERY_CONFIG_MODULE``) :param worker_log_format: format for logs """ if len(include_tasks) == 0: log.error(f'creating celery app={name} MISSING tasks={include_tasks}') else: log.info(f'creating celery app={name} tasks={include_tasks}') # get the Celery application app = celery.Celery( name, broker_url=auth_url, result_backend=backend_url, include=include_tasks) app.config_from_object( path_to_config_module, namespace='CELERY') app.conf.update(kwargs) if transport_options: log.info(f'loading transport_options={transport_options}') app.conf.update(**transport_options) # custom tranport options if ssl_options: log.info(f'loading ssl_options={ssl_options}') app.conf.update(**ssl_options) # custom ssl options if len(include_tasks) > 0: app.autodiscover_tasks(include_tasks) return app
# end of get_celery_app