"""
Restore an algorithm dataset from a file, s3 or redis back into redis
to ensure all datasets are ready for algorithmic backtesting
Supported Datasets:
- ``SA_DATASET_TYPE_ALGO_READY`` - Algorithm-ready datasets
"""
import analysis_engine.consts as ae_consts
import analysis_engine.load_dataset as load_dataset
import analysis_engine.show_dataset as show_dataset
import analysis_engine.get_data_from_redis_key as redis_utils
import analysis_engine.publish as publish
import spylunking.log.setup_logging as log_utils
log = log_utils.build_colorized_logger(name=__name__)
def restore_dataset(
show_summary=True,
force_restore=False,
algo_dataset=None,
dataset_type=ae_consts.SA_DATASET_TYPE_ALGO_READY,
serialize_datasets=ae_consts.DEFAULT_SERIALIZED_DATASETS,
path_to_file=None,
compress=False,
encoding='utf-8',
redis_enabled=True,
redis_key=None,
redis_address=None,
redis_db=None,
redis_password=None,
redis_expire=None,
redis_serializer='json',
redis_encoding='utf-8',
redis_output_db=None,
s3_enabled=True,
s3_key=None,
s3_address=None,
s3_bucket=None,
s3_access_key=None,
s3_secret_key=None,
s3_region_name=None,
s3_secure=False,
slack_enabled=False,
slack_code_block=False,
slack_full_width=False,
datasets_compressed=True,
verbose=False):
"""restore_dataset
Restore missing dataset nodes in redis from an algorithm-ready
dataset file on disk. Use this to restore redis from scratch.
:param show_summary: optional - show a summary of the algorithm-ready
dataset using ``analysis_engine.show_dataset.show_dataset``
(default is ``True``)
:param force_restore: optional - boolean - publish whatever is in
the algorithm-ready dataset into redis. If ``False`` this will
ensure that datasets are only set in redis if they are not already
set
:param algo_dataset: optional - already loaded algorithm-ready dataset
:param dataset_type: optional - dataset type
(default is ``SA_DATASET_TYPE_ALGO_READY``)
:param serialize_datasets: optional - list of dataset names to
deserialize in the dataset
:param path_to_file: optional - path to an algorithm-ready dataset
in a file
:param compress: optional - boolean flag for decompressing
the contents of the ``path_to_file`` if necessary
(default is ``False`` and algorithms
use ``zlib`` for compression)
:param encoding: optional - string for data encoding
**(Optional) Redis connectivity arguments**
:param redis_enabled: bool - toggle for auto-caching all
datasets in Redis
(default is ``True``)
:param redis_key: string - key to save the data in redis
(default is ``None``)
:param redis_address: Redis connection string format: ``host:port``
(default is ``localhost:6379``)
:param redis_db: Redis db to use
(default is ``0``)
:param redis_password: optional - Redis password
(default is ``None``)
:param redis_expire: optional - Redis expire value
(default is ``None``)
:param redis_serializer: not used yet - support for future
pickle objects in redis
:param redis_encoding: format of the encoded key in redis
:param redis_output_db: optional - integer publish to a separate
redis database
**(Optional) Minio (S3) connectivity arguments**
:param s3_enabled: bool - toggle for auto-archiving on Minio (S3)
(default is ``True``)
    :param s3_key: string - key to save the data in s3
        (default is ``None``)
:param s3_address: Minio S3 connection string format: ``host:port``
(default is ``localhost:9000``)
:param s3_bucket: S3 Bucket for storing the artifacts
(default is ``dev``) which should be viewable on a browser:
http://localhost:9000/minio/dev/
:param s3_access_key: S3 Access key
(default is ``trexaccesskey``)
:param s3_secret_key: S3 Secret key
(default is ``trex123321``)
:param s3_region_name: S3 region name
(default is ``us-east-1``)
:param s3_secure: Transmit using tls encryption
(default is ``False``)
**(Optional) Slack arguments**
:param slack_enabled: optional - boolean for
publishing to slack
    :param slack_code_block: optional - boolean for
        publishing as a code block in slack
    :param slack_full_width: optional - boolean for
        publishing to slack using the full
        width allowed
    **Additional arguments**
    :param datasets_compressed: optional - boolean for
        publishing the dataframes as compressed strings
        (default is ``True``)
    :param verbose: optional - bool for increasing
        logging verbosity
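
    **Usage example**

    A minimal sketch of restoring a dataset file into a local redis;
    the file path and connection values below are placeholders (not
    defaults shipped with this module) and assume this module is
    importable as ``analysis_engine.restore_dataset``::

        import analysis_engine.restore_dataset as restore_utils

        restored = restore_utils.restore_dataset(
            path_to_file='/tmp/SPY-latest.json',
            redis_address='localhost:6379',
            redis_db=0,
            verbose=True)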
"""
use_ds = algo_dataset
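    # derive the redis host and port from the library-wide REDIS_ADDRESS
    # default unless a redis_address override was passed in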
redis_host = ae_consts.REDIS_ADDRESS.split(':')[0]
redis_port = int(ae_consts.REDIS_ADDRESS.split(':')[1])
if redis_address:
redis_host = redis_address.split(':')[0]
redis_port = int(redis_address.split(':')[1])
if show_summary:
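        # show_dataset prints a summary and also returns the loaded
        # dataset, so reuse its return value instead of loading twice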
use_ds = show_dataset.show_dataset(
dataset_type=dataset_type,
compress=compress,
encoding=redis_encoding,
path_to_file=path_to_file,
s3_key=s3_key,
s3_address=s3_address,
s3_bucket=s3_bucket,
s3_access_key=s3_access_key,
s3_secret_key=s3_secret_key,
s3_region_name=s3_region_name,
s3_secure=s3_secure,
redis_key=redis_key,
redis_address=redis_address,
redis_db=redis_db,
redis_password=redis_password,
redis_expire=redis_expire,
redis_serializer=redis_serializer,
serialize_datasets=serialize_datasets)
# end of if show_summary
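    # fall back to loading the dataset directly if the summary
    # path did not return one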
if not use_ds:
log.info(
f'loading from file={path_to_file} s3={s3_key} redis={redis_key}')
use_ds = load_dataset.load_dataset(
dataset_type=dataset_type,
compress=compress,
encoding=redis_encoding,
path_to_file=path_to_file,
s3_key=s3_key,
s3_address=s3_address,
s3_bucket=s3_bucket,
s3_access_key=s3_access_key,
s3_secret_key=s3_secret_key,
s3_region_name=s3_region_name,
s3_secure=s3_secure,
redis_key=redis_key,
redis_address=redis_address,
redis_db=redis_db,
redis_password=redis_password,
redis_expire=redis_expire,
redis_serializer=redis_serializer,
serialize_datasets=serialize_datasets)
# load if not loaded
if not use_ds:
log.error(
f'unable to load a dataset from file={path_to_file} '
f's3={s3_key} redis={redis_key}')
return None
log.info('restore - start')
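    # first pass - count the serialized child datasets so progress
    # can be reported while restoring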
total_to_restore = 0
for ticker in use_ds:
for ds_node in use_ds[ticker]:
for ds_key in ds_node['data']:
if ds_key in serialize_datasets:
total_to_restore += 1
# end of counting total_to_restore
log.info(f'restore - records={total_to_restore}')
num_done = 0
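    # second pass - restore each parent record and its nested
    # child datasets into redis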
for ticker in use_ds:
for ds_node in use_ds[ticker]:
ds_parent_key = ds_node['id']
log.info(
f'restore - parent_key={ds_parent_key} - '
f'''{ae_consts.get_percent_done(
progress=num_done,
total=total_to_restore)} {num_done}/{total_to_restore}''')
if verbose:
print(ds_parent_key)
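            # check if the parent record already exists in redis
            # before deciding whether to publish it again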
cache_res = redis_utils.get_data_from_redis_key(
host=redis_host,
port=redis_port,
password=redis_password,
db=redis_db,
key=ds_parent_key,
decompress_df=datasets_compressed,
serializer=redis_serializer,
encoding=redis_encoding,
expire=redis_expire,
label=f'restore-{ds_parent_key}')
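            # skip parent keys that already hold a non-trivial payload
            # unless force_restore is set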
should_restore = False
if (not force_restore and
cache_res['status'] == ae_consts.SUCCESS and
'data' in cache_res['rec'] and
cache_res['rec']['data'] and
len(cache_res['rec']['data']) > 10):
should_restore = False
else:
should_restore = True
if should_restore:
log.info(
f' - parent {ds_parent_key} restore')
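                # rebuild the parent record, serializing any dataframes
                # to json strings before publishing to redis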
new_parent_rec = {
'exp_date': None,
'publish_pricing_update': None,
'date': ds_node['date'],
'updated': None,
'version': ae_consts.DATASET_COLLECTION_VERSION
}
for sname in serialize_datasets:
if sname in ds_node['data']:
if hasattr(
ds_node['data'][sname],
'index'):
new_parent_rec[sname] = \
ds_node['data'][sname].to_json(
orient='records',
date_format='iso')
else:
new_parent_rec[sname] = \
ds_node['data'][sname]
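                # publish the rebuilt parent record to redis only
                # (s3 publishing is disabled for restores)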
publish.publish(
data=new_parent_rec,
redis_enabled=True,
redis_key=ds_parent_key,
redis_db=redis_output_db,
redis_address=redis_address,
redis_password=redis_password,
redis_expire=redis_expire,
redis_serializer=redis_serializer,
redis_encoding=redis_encoding,
s3_enabled=False,
output_file=None,
df_compress=datasets_compressed,
verbose=verbose)
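            # restore each nested child dataset under its own
            # <parent_key>_<dataset_name> redis key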
for ds_key in ds_node['data']:
if ds_key in serialize_datasets:
new_key = f'{ds_parent_key}_{ds_key}'
if hasattr(
ds_node['data'][ds_key],
'index'):
loaded_df = ds_node['data'][ds_key]
if len(loaded_df.index) > 0:
if verbose:
print(f' - checking: {new_key}')
cache_res = redis_utils.get_data_from_redis_key(
host=redis_host,
port=redis_port,
password=redis_password,
db=redis_db,
key=new_key,
decompress_df=datasets_compressed,
serializer=redis_serializer,
encoding=redis_encoding,
expire=redis_expire,
label=f'restore-{new_key}')
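                        # nested keys that already hold a non-trivial
                        # cached payload are skipped unless
                        # force_restore is set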
should_restore = False
success_status = (
cache_res['status'] == ae_consts.SUCCESS)
if (not force_restore and
success_status and
'data' in cache_res['rec'] and
cache_res['rec']['data'] and
len(cache_res['rec']['data']) > 10):
should_restore = False
else:
if (str(cache_res['rec']['data']) !=
ae_consts.EMPTY_DF_STR):
should_restore = True
if should_restore:
log.info(
'restore nested dataset: '
f'{ds_parent_key} to: {new_key}')
publish.publish(
data=loaded_df,
redis_enabled=True,
redis_key=new_key,
redis_db=redis_output_db,
redis_address=redis_address,
redis_password=redis_password,
redis_expire=redis_expire,
redis_serializer=redis_serializer,
redis_encoding=redis_encoding,
s3_enabled=False,
output_file=None,
df_compress=datasets_compressed,
verbose=verbose)
else:
if verbose:
print(f' - checking: {new_key} - SKIP')
if verbose:
print(f' - {new_key} - no data to sync')
# end of is a dataframe
# else:
# end of handling dataframe vs dictionary
num_done += 1
# end of for all datasets
print('-----------------------------------')
# end for all dataset to restore
log.info(f'restore - done - num_done={num_done} total={total_to_restore}')
return use_ds
# end of restore_dataset