2023-09-15 16:51:54 +00:00

28 lines
885 B
Python

import logging, json, re
from helpers import aws_helper, tar_helper
class Base:
    """Base ingest handler that loads an attributes JSON file from a tar in S3.

    ``service_attributes`` reads ``self.ACTION``, which is not defined in this
    class -- presumably subclasses provide it as a CamelCase action name
    (TODO confirm against the subclasses).
    """

    def __init__(self, source, ingest_type='message'):
        """Remember the ingest source and the kind of ingest.

        Args:
            source: Object later handed to ``aws_helper`` to look up the S3
                key and bucket.
            ingest_type: Stored unchanged on the instance; defaults to
                ``'message'``.
        """
        self.source = source
        self.ingest_type = ingest_type

    def service_attributes(self):
        """Fetch the tar referenced by ``self.source`` and parse its attributes file.

        Side effects: sets ``self.key`` (value returned by
        ``aws_helper.get_key_from_message``) and ``self.tar`` (value returned
        by ``tar_helper.raw_to_tar``).

        Returns:
            The value produced by ``json.loads`` for the file named
            ``<action_snake>_attributes.json`` inside the tar (expected to be
            a dict -- verify against the producers of that file).
        """
        logging.info('Retrieving attributes from message...')

        # Locate and download the S3 object described by the source.
        self.key = aws_helper.get_key_from_message(self.source)
        bucket = aws_helper.get_bucket_from_message(self.source)
        raw_object = aws_helper.get_object(self.key, bucket)

        # Turn the downloaded object into a tar handle kept on the instance.
        self.tar = tar_helper.raw_to_tar(raw_object)

        # Insert '_' before every uppercase letter except one at the very
        # start, then lowercase: e.g. 'FooBar' -> 'foo_bar'.
        action_snake = re.sub(r'(?<!^)(?=[A-Z])', '_', self.ACTION).lower()

        # Pull the per-action attributes file out of the tar and decode it.
        attributes_file = tar_helper.extract_file_from_tar(
            self.tar, f'{action_snake}_attributes.json')
        return json.loads(attributes_file.read())