Source code for analysis_engine.scripts.publish_from_s3_to_redis

#!/usr/bin/env python

"""

Publish the contents of an S3 key to a
Redis key

Steps:
------

1) Parse arguments
2) Download pricing data as a Celery task
3) Publish pricing data as a Celery task
4) Coming Soon - Start buy/sell analysis as Celery task(s)

"""

import argparse
import analysis_engine.work_tasks.publish_from_s3_to_redis \
    as task_publisher
from analysis_engine.work_tasks.get_celery_app import get_celery_app
from celery import signals
from spylunking.log.setup_logging import build_colorized_logger
from analysis_engine.api_requests import build_publish_from_s3_to_redis_request
from analysis_engine.consts import LOG_CONFIG_PATH
from analysis_engine.consts import TICKER
from analysis_engine.consts import TICKER_ID
from analysis_engine.consts import WORKER_BROKER_URL
from analysis_engine.consts import WORKER_BACKEND_URL
from analysis_engine.consts import WORKER_CELERY_CONFIG_MODULE
from analysis_engine.consts import INCLUDE_TASKS
from analysis_engine.consts import SSL_OPTIONS
from analysis_engine.consts import TRANSPORT_OPTIONS
from analysis_engine.consts import S3_ACCESS_KEY
from analysis_engine.consts import S3_SECRET_KEY
from analysis_engine.consts import S3_REGION_NAME
from analysis_engine.consts import S3_ADDRESS
from analysis_engine.consts import S3_SECURE
from analysis_engine.consts import S3_BUCKET
from analysis_engine.consts import S3_KEY
from analysis_engine.consts import REDIS_ADDRESS
from analysis_engine.consts import REDIS_KEY
from analysis_engine.consts import REDIS_PASSWORD
from analysis_engine.consts import REDIS_DB
from analysis_engine.consts import REDIS_EXPIRE
from analysis_engine.consts import get_status
from analysis_engine.consts import ppj
from analysis_engine.consts import is_celery_disabled


# Disable celery log hijacking
# https://github.com/celery/celery/issues/2509
@signals.setup_logging.connect
def setup_celery_logging(**kwargs):
    """No-op receiver for Celery's ``setup_logging`` signal.

    Registering any receiver on this signal keeps Celery from
    installing its own logging configuration, so the logger
    built by this script stays in effect.

    :param kwargs: signal arguments sent by Celery (ignored)
    """
    return None


# Module-level logger used by every message this script emits;
# it is built once at import time from the shared log config.
log = build_colorized_logger(
    log_config_path=LOG_CONFIG_PATH,
    name='pub-from-s3-to-redis')


def publish_from_s3_to_redis():
    """publish_from_s3_to_redis

    Download an S3 key and publish its contents to Redis

    Command line arguments override the defaults imported from
    ``analysis_engine.consts``. The resulting values are copied
    into a work dictionary created by
    ``build_publish_from_s3_to_redis_request()``.

    If ``is_celery_disabled()`` is true the task function is called
    directly in this process and its ``status`` and ``err`` fields are
    logged. Otherwise a Celery app is created and the task is sent
    by name to the broker with ``app.send_task``.

    :return: ``None``
    """

    log.info(
        'start - publish_from_s3_to_redis')

    parser = argparse.ArgumentParser(
        description=(
            'Download and store the latest stock pricing, '
            'news, and options chain data '
            'and store it in S3 and Redis. '
            'Once stored, this will also '
            'start the buy and sell trading analysis.'))
    parser.add_argument(
        '-t',
        help=(
            'ticker'),
        required=True,
        dest='ticker')
    parser.add_argument(
        '-i',
        help=(
            'optional - ticker id '
            'not used without a database'),
        required=False,
        dest='ticker_id')
    # NOTE(review): -l is parsed but never read below; the module
    # logger is already built at import time from LOG_CONFIG_PATH.
    parser.add_argument(
        '-l',
        help=(
            'optional - path to the log config file'),
        required=False,
        dest='log_config_path')
    parser.add_argument(
        '-b',
        help=(
            'optional - broker url for Celery'),
        required=False,
        dest='broker_url')
    parser.add_argument(
        '-B',
        help=(
            'optional - backend url for Celery'),
        required=False,
        dest='backend_url')
    parser.add_argument(
        '-k',
        help=(
            'optional - s3 access key'),
        required=False,
        dest='s3_access_key')
    parser.add_argument(
        '-s',
        help=(
            'optional - s3 secret key'),
        required=False,
        dest='s3_secret_key')
    parser.add_argument(
        '-a',
        help=(
            'optional - s3 address format: <host:port>'),
        required=False,
        dest='s3_address')
    parser.add_argument(
        '-S',
        help=(
            'optional - s3 ssl or not'),
        required=False,
        dest='s3_secure')
    parser.add_argument(
        '-u',
        help=(
            'optional - s3 bucket name'),
        required=False,
        dest='s3_bucket_name')
    parser.add_argument(
        '-g',
        help=(
            'optional - s3 region name'),
        required=False,
        dest='s3_region_name')
    parser.add_argument(
        '-p',
        help=(
            'optional - redis_password'),
        required=False,
        dest='redis_password')
    parser.add_argument(
        '-r',
        help=(
            'optional - redis_address format: <host:port>'),
        required=False,
        dest='redis_address')
    parser.add_argument(
        '-n',
        help=(
            'optional - redis and s3 key name'),
        required=False,
        dest='keyname')
    parser.add_argument(
        '-m',
        help=(
            'optional - redis database number (0 by default)'),
        required=False,
        dest='redis_db')
    parser.add_argument(
        '-x',
        help=(
            'optional - redis expiration in seconds'),
        required=False,
        dest='redis_expire')
    parser.add_argument(
        '-d',
        help=(
            'debug'),
        required=False,
        dest='debug',
        action='store_true')
    args = parser.parse_args()

    # Defaults come from analysis_engine.consts
    ticker = TICKER
    ticker_id = TICKER_ID
    ssl_options = SSL_OPTIONS
    transport_options = TRANSPORT_OPTIONS
    broker_url = WORKER_BROKER_URL
    backend_url = WORKER_BACKEND_URL
    celery_config_module = WORKER_CELERY_CONFIG_MODULE
    include_tasks = INCLUDE_TASKS
    s3_access_key = S3_ACCESS_KEY
    s3_secret_key = S3_SECRET_KEY
    s3_region_name = S3_REGION_NAME
    s3_address = S3_ADDRESS
    s3_secure = S3_SECURE
    s3_bucket_name = S3_BUCKET
    s3_key = S3_KEY
    redis_address = REDIS_ADDRESS
    redis_key = REDIS_KEY
    redis_password = REDIS_PASSWORD
    redis_db = REDIS_DB
    redis_expire = REDIS_EXPIRE
    debug = False

    # Any argument supplied on the command line replaces its default
    if args.ticker:
        ticker = args.ticker.upper()
    if args.ticker_id:
        # the ticker id must not replace the ticker symbol
        ticker_id = args.ticker_id
    if args.broker_url:
        broker_url = args.broker_url
    if args.backend_url:
        backend_url = args.backend_url
    if args.s3_access_key:
        s3_access_key = args.s3_access_key
    if args.s3_secret_key:
        s3_secret_key = args.s3_secret_key
    if args.s3_region_name:
        s3_region_name = args.s3_region_name
    if args.s3_address:
        s3_address = args.s3_address
    # NOTE(review): -S, -m and -x have no argparse type, so values
    # given on the command line are forwarded as strings - confirm
    # the task accepts that.
    if args.s3_secure:
        s3_secure = args.s3_secure
    if args.s3_bucket_name:
        s3_bucket_name = args.s3_bucket_name
    if args.keyname:
        # one key name is used for both the S3 key and the Redis key
        s3_key = args.keyname
        redis_key = args.keyname
    if args.redis_address:
        redis_address = args.redis_address
    if args.redis_password:
        redis_password = args.redis_password
    if args.redis_db:
        redis_db = args.redis_db
    if args.redis_expire:
        redis_expire = args.redis_expire
    if args.debug:
        debug = True

    work = build_publish_from_s3_to_redis_request()

    work['ticker'] = ticker
    work['ticker_id'] = ticker_id
    work['s3_bucket'] = s3_bucket_name
    work['s3_key'] = s3_key
    work['redis_key'] = redis_key
    work['s3_access_key'] = s3_access_key
    work['s3_secret_key'] = s3_secret_key
    work['s3_region_name'] = s3_region_name
    work['s3_address'] = s3_address
    work['s3_secure'] = s3_secure
    work['redis_address'] = redis_address
    work['redis_password'] = redis_password
    work['redis_db'] = redis_db
    work['redis_expire'] = redis_expire
    work['debug'] = debug
    work['label'] = f'ticker={ticker}'

    path_to_tasks = 'analysis_engine.work_tasks'
    task_name = (
        f'{path_to_tasks}.publish_from_s3_to_redis.publish_from_s3_to_redis')
    task_res = None
    if is_celery_disabled():
        # Run the task function synchronously in this process
        work['celery_disabled'] = True
        log.debug(f'starting without celery work={ppj(work)}')
        task_res = task_publisher.publish_from_s3_to_redis(
            work_dict=work)
        if debug:
            log.info(
                f'done - result={ppj(task_res)} task={task_name} '
                f'status={get_status(status=task_res["status"])} '
                f'err={task_res["err"]} label={work["label"]}')
        else:
            log.info(
                f'done - result task={task_name} '
                f'status={get_status(status=task_res["status"])} '
                f'err={task_res["err"]} label={work["label"]}')
        # if/else debug
    else:
        log.info(f'connecting to broker={broker_url} backend={backend_url}')

        # Get the Celery app
        app = get_celery_app(
            name=__name__,
            auth_url=broker_url,
            backend_url=backend_url,
            path_to_config_module=celery_config_module,
            ssl_options=ssl_options,
            transport_options=transport_options,
            include_tasks=include_tasks)

        log.info(f'calling task={task_name} - work={ppj(work)}')
        # The task is sent by name; this script does not wait for
        # or read the task's result
        job_id = app.send_task(
            task_name,
            (work,))
        log.info(f'calling task={task_name} - success job_id={job_id}')
    # end of if/else
# end of publish_from_s3_to_redis


if __name__ == '__main__':
    publish_from_s3_to_redis()