Source code for analysis_engine.mocks.mock_boto3_s3

"""
Mock boto3 s3 objects
"""

import os
import json
import analysis_engine.consts as ae_consts
import spylunking.log.setup_logging as log_utils

log = log_utils.build_colorized_logger(name=__name__)


[docs]def mock_s3_read_contents_from_key_ev( s3, s3_bucket_name, s3_key, encoding, convert_as_json): """mock_s3_read_contents_from_key :param s3: s3 client :param s3_bucket_name: bucket name :param s3_key: key :param encoding: utf-8 :param convert_as_json: convert to json """ env_key = 'TEST_S3_CONTENTS' str_contents = ae_consts.ev( env_key, None) log.info( f'returning mock s3={s3_bucket_name}:{s3_key} contents={str_contents} ' f'encoding={encoding} json={convert_as_json} env_key={env_key}') if not str_contents: return str_contents if convert_as_json: return json.loads(str_contents)
# end of mock_s3_read_contents_from_key_ev
[docs]def mock_publish_from_s3_to_redis( work_dict): """mock_publish_from_s3_to_redis :param work_dict: dictionary for driving the task """ env_key = 'TEST_S3_CONTENTS' redis_key = work_dict.get( 'redis_key', env_key) str_dict = ae_consts.ev( env_key, None) log.info( 'mock_publish_from_s3_to_redis - ' f'setting key={redis_key} value={str_dict}') data = None if str_dict: os.environ[redis_key] = str_dict data = str_dict.encode('utf-8') else: os.environ[redis_key] = '' data = None status = ae_consts.SUCCESS err = None return { 'status': status, 'err': err, 'rec': { 'data': data } }
# end of mock_publish_from_s3_to_redis
[docs]def mock_publish_from_s3_to_redis_err( work_dict): """mock_publish_from_s3_to_redis_err :param work_dict: dictionary for driving the task """ env_key = 'TEST_S3_CONTENTS' redis_key = work_dict.get( 'redis_key', env_key) str_dict = ae_consts.ev( env_key, None) log.info( 'mock_publish_from_s3_to_redis_err - ' f'setting key={redis_key} value={str_dict}') data = None if str_dict: os.environ[redis_key] = str_dict data = str_dict.encode('utf-8') else: os.environ[redis_key] = '' data = None status = ae_consts.ERR err = None return { 'status': status, 'err': err, 'rec': { 'data': data } }
# end of mock_publish_from_s3_to_redis_err
[docs]def mock_publish_from_s3_exception( work_dict): """mock_publish_from_s3_exception :param work_dict: dictionary for driving the task """ raise Exception( 'test mock_publish_from_s3_exception')
# end of mock_publish_from_s3_exception
[docs]class MockBotoS3Bucket: """MockBotoS3Bucket""" def __init__( self, name): """__init__ build mock bucket :param name: name of the bucket """ self.name = name self.datas = [] # payloads uploaded to s3 self.keys = [] # keys uploaded to s3 # end of __init__
[docs] def put_object( self, Key=None, Body=None): """put_object :param Key: new Key name :param Body: new Payload in Key """ log.debug( f'mock - MockBotoS3Bucket.put_object(Key={Key}, ' f'Body={Body})') self.keys.append(Key) self.datas.append(Body)
# end of put_object # end of MockBotoS3Bucket
[docs]class MockBotoS3AllBuckets: """MockBotoS3AllBuckets""" def __init__( self): """__init__""" self.buckets = {} # end of __init__
[docs] def add( self, bucket_name): """add :param bucket_name: bucket name to add """ if bucket_name not in self.buckets: log.info( f'adding bucket={bucket_name} total={len(self.buckets) + 1}') self.buckets[bucket_name] = MockBotoS3Bucket( name=bucket_name) return self.buckets[bucket_name]
# end of add
[docs] def all( self): """all""" return self.buckets
# end of all # end of MockBotoS3AllBuckets
[docs]class MockBotoS3: """MockBotoS3""" def __init__( self, name='mock_s3', endpoint_url=None, aws_access_key_id=None, aws_secret_access_key=None, region_name=None, config=None): """__init__ build mock object :param name: name of client :param endpoint_url: endpoint url :param aws_access_key_id: aws access key :param aws_secret_access_key: aws secret key :param region_name: region name :param config: config object """ self.name = name self.endpoint_url = endpoint_url self.aws_access_key_id = aws_access_key_id self.aws_secret_access_key = aws_secret_access_key self.region_name = region_name self.config = config self.buckets = MockBotoS3AllBuckets() self.keys = [] # end of __init__
[docs] def Bucket( self, name): """Bucket :param name: name of new bucket """ log.info( f'MockBotoS3.Bucket({name})') return self.buckets.add( bucket_name=name)
# end of Bucket
[docs] def create_bucket( self, Bucket=None): """create_bucket :param bucket_name: name of the new bucket """ log.info( f'mock - MockBotoS3.create_bucket(Bucket={Bucket})') return self.buckets.add( bucket_name=Bucket)
# end of create_bucket # end of MockBotoS3
[docs]def build_boto3_resource( name='mock_s3', endpoint_url=None, aws_access_key_id=None, aws_secret_access_key=None, region_name=None, config=None): """build_boto3_resource :param name: name of client :param endpoint_url: endpoint url :param aws_access_key_id: aws access key :param aws_secret_access_key: aws secret key :param region_name: region name :param config: config object """ if 's3' in name.lower(): return MockBotoS3( name='mock_s3', endpoint_url=endpoint_url, aws_access_key_id=aws_access_key_id, aws_secret_access_key=aws_secret_access_key, region_name=region_name, config=config) else: return MockBotoS3( name='mock_s3', endpoint_url=endpoint_url, aws_access_key_id=aws_access_key_id, aws_secret_access_key=aws_secret_access_key, region_name=region_name, config=config)
# end of build_boto3_resource