2023-09-15 19:37:47 +00:00

27 lines
839 B
Python

import logging, json, re
from helpers import aws_helper, tar_helper, common_helper
def camel_to_snake(name):
    """Convert a CamelCase / mixedCase identifier to snake_case.

    Examples: ``'CreateUser'`` -> ``'create_user'``,
    ``'HTTPResponse'`` -> ``'http_response'``.

    NOTE(review): ``Base.service_attributes`` referenced this name, but it
    was neither defined nor imported in this module, so the call raised
    ``NameError``. If ``helpers.common_helper`` already ships an equivalent
    helper, consider delegating to it instead -- confirm.

    Args:
        name: The identifier to convert.

    Returns:
        The lower-cased, underscore-separated form of ``name``.
    """
    # Pass 1: put an underscore in front of every capitalised word
    # ("HTTPResponse" -> "HTTP_Response").
    partial = re.sub(r'(.)([A-Z][a-z]+)', r'\1_\2', name)
    # Pass 2: split the remaining lower/digit -> upper transitions
    # ("getHTTP" -> "get_HTTP"), then normalise the case.
    return re.sub(r'([a-z0-9])([A-Z])', r'\1_\2', partial).lower()


class Base:
    """Base object wrapping an ingest source.

    ``service_attributes`` reads ``self.ACTION``, which this class does not
    define; presumably subclasses provide it as a class attribute -- confirm.
    """

    def __init__(self, source, ingest_type='message'):
        """Store the source and the kind of ingest it represents.

        Args:
            source: The object later passed to the ``aws_helper`` message
                accessors (key / bucket lookup).
            ingest_type: Label for the ingest kind; defaults to ``'message'``.
        """
        self.ingest_type = ingest_type
        self.source = source

    def service_attributes(self):
        """Load the action's attributes JSON from the tar referenced by the source.

        Side effects: sets ``self.key`` (the key taken from the source
        message) and ``self.tar`` (the tar built from the fetched object).

        Returns:
            The parsed content of ``<snake_case ACTION>_attributes.json``
            found inside the tar (expected to be a dict).
        """
        logging.info('Retrieving attributes from message...')

        # Fetch the S3 object named by the message's key and bucket.
        self.key = aws_helper.get_key_from_message(self.source)
        bucket = aws_helper.get_bucket_from_message(self.source)
        s3_object = aws_helper.get_object(self.key, bucket)

        # Turn the raw object into a tar.
        self.tar = tar_helper.raw_to_tar(s3_object)

        # The attributes file is named after the snake_case form of ACTION.
        attributes_name = f'{camel_to_snake(self.ACTION)}_attributes.json'
        attributes_file = tar_helper.extract_file_from_tar(
            self.tar, attributes_name)
        return json.loads(attributes_file.read())