Source code for analysis_engine.prepare_dict_for_algo

"""
Helper for converting a dictionary to an algorithm-ready
dataset
"""

import json
import zlib
import pandas as pd
import analysis_engine.consts as ae_consts
import spylunking.log.setup_logging as log_utils

log = log_utils.build_colorized_logger(name=__name__)


def prepare_dict_for_algo(
        data,
        compress=False,
        encoding='utf-8',
        convert_to_dict=False,
        dataset_names=None):
    """prepare_dict_for_algo

    :param data: string (or ``zlib``-compressed bytes) holding the
        contents of an algorithm-ready file, s3 key or redis key
    :param compress: optional - boolean flag to decompress ``data``
        before parsing (default is ``False``; algorithms use
        ``zlib`` for compression)
    :param encoding: optional - string encoding used when decoding
        decompressed ``data`` (default is ``'utf-8'``)
    :param convert_to_dict: optional - bool to parse ``data`` with
        ``json.loads``; use ``False`` for s3 and ``True`` for files
    :param dataset_names: optional - list of string keys for each
        dataset node in:
        ``dataset[ticker][0]['data'][dataset_names[0]]``
    """
    log.debug('start')
    use_data = None
    parsed_data = None
    data_as_dict = None

    # optionally decompress the payload before parsing
    if compress:
        log.debug('decompressing')
        parsed_data = zlib.decompress(data).decode(encoding)
    else:
        parsed_data = data

    if not parsed_data:
        log.error('failed parsing')
        return None

    log.debug('loading as dict')
    use_data = {}
    if convert_to_dict:
        data_as_dict = json.loads(parsed_data)
    else:
        data_as_dict = parsed_data
    if len(data_as_dict) == 0:
        log.error(
            'empty algorithm-ready dictionary')
        return use_data

    # placeholder for supported dataset keys with no serialized rows
    empty_pd = pd.DataFrame([{}])

    use_serialized_datasets = dataset_names
    if not use_serialized_datasets:
        use_serialized_datasets = ae_consts.DEFAULT_SERIALIZED_DATASETS
    log.info(f'converting serialized_datasets={use_serialized_datasets}')

    num_datasets = 0
    for ticker in data_as_dict:
        if ticker not in use_data:
            use_data[ticker] = []
        for node in data_as_dict[ticker]:
            new_node = {
                'id': node['id'],
                'date': node['date'],
                'data': {}
            }
            # deserialize each supported dataset back into a DataFrame
            for ds_key in node['data']:
                if ds_key in use_serialized_datasets:
                    new_node['data'][ds_key] = empty_pd
                    if node['data'][ds_key]:
                        new_node['data'][ds_key] = pd.read_json(
                            node['data'][ds_key],
                            orient='records')
                        num_datasets += 1
                # end of if supported dataset key
            # end for all datasets in this node
            use_data[ticker].append(new_node)
        # end for all date nodes to load for this ticker
    # end for all tickers in the dataset

    if num_datasets:
        log.info(f'found datasets={num_datasets}')
    else:
        log.error(f'did not find any datasets={num_datasets}')

    return use_data
# end of prepare_dict_for_algo
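

if __name__ == '__main__':
    # Minimal usage sketch, not part of the original module: it builds a
    # small zlib-compressed, JSON-serialized algorithm-ready payload and
    # converts it back into DataFrames. The 'SPY' ticker, the node
    # id/date values and the 'daily' dataset name below are illustrative
    # assumptions, not values taken from the original source.
    sample = {
        'SPY': [
            {
                'id': 'SPY_2019-01-02',
                'date': '2019-01-02',
                'data': {
                    # serialized DataFrame rows (orient='records')
                    'daily': json.dumps([
                        {'close': 250.18, 'volume': 126925200}
                    ])
                }
            }
        ]
    }
    compressed = zlib.compress(json.dumps(sample).encode('utf-8'))
    algo_ready = prepare_dict_for_algo(
        data=compressed,
        compress=True,
        convert_to_dict=True,
        dataset_names=['daily'])
    # prints a one-row DataFrame with close and volume columns
    print(algo_ready['SPY'][0]['data']['daily'])
# end of __main__ usage sketch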