get ability estimation from source

This commit is contained in:
brmnjsh
2023-09-15 14:29:55 +00:00
parent f22d1c4cdb
commit 7a14856775
6 changed files with 78 additions and 4 deletions

View File

@ -1,9 +1,12 @@
import logging
from services.base import Base
from models.ability_estimation import AbilityEstimation
class AbilityEstimationService(Base):
ACTION = 'abilityEstimation'
def process(self):
logging.info('Ability Estimation Service to be implemented...')
attributes = self.service_attributes()
ability_estimation = AbilityEstimation.parse_obj(attributes)
results = ability_estimation.calculate()
return None

View File

@ -1,5 +1,25 @@
import logging, json, re
from helpers import aws_helper, tar_helper
class Base:
def __init__(self, source, ingest_type='message'):
self.ingest_type = ingest_type
self.source = source
def service_attributes(self):
logging.info('Retrieving attributes from message...')
# get s3 object
self.key = aws_helper.get_key_from_message(self.source)
s3_object = aws_helper.get_object(
self.key, aws_helper.get_bucket_from_message(self.source))
# convert to tar
self.tar = tar_helper.raw_to_tar(s3_object)
# get attributes file and convert to dict
action_snake = re.sub(r'(?<!^)(?=[A-Z])', '_', self.ACTION).lower()
attributes = json.loads(
tar_helper.extract_file_from_tar(
self.tar, f'{action_snake}_attributes.json').read())