diff --git a/app/helpers/aws_helper.py b/app/helpers/aws_helper.py index 893298a..38eb4fe 100644 --- a/app/helpers/aws_helper.py +++ b/app/helpers/aws_helper.py @@ -1,6 +1,7 @@ import boto3 -import json +import io +from typing import Union from lib.application_configs import ApplicationConfigs session = boto3.Session( @@ -10,33 +11,32 @@ session = boto3.Session( # ToDo: Figure out a much better way of doing this. # LocalStack wants endpoint_url, while prod doesnt :( if ApplicationConfigs.local_dev_env: - s3 = session.resource('s3', region_name=ApplicationConfigs.region_name, endpoint_url=ApplicationConfigs.endpoint_url) + s3 = session.client('s3', region_name=ApplicationConfigs.region_name, endpoint_url=ApplicationConfigs.endpoint_url) sqs = session.client('sqs', region_name=ApplicationConfigs.region_name, endpoint_url=ApplicationConfigs.endpoint_url) else: - s3 = session.resource('s3', region_name=ApplicationConfigs.region_name) + s3 = session.client('s3', region_name=ApplicationConfigs.region_name) sqs = session.client('sqs', region_name=ApplicationConfigs.region_name) -def get_key_from_message(body): +def get_key_from_message(body: dict) -> str: return body['Records'][0]['s3']['object']['key'] - -def get_bucket_from_message(body): +def get_bucket_from_message(body: dict) -> str: return body['Records'][0]['s3']['bucket']['name'] +def get_object(key: str, bucket: str) -> bytes: + return s3.get_object( + Bucket=bucket, + Key=key, + )['Body'].read() -def get_object(key, bucket): - return s3.Object(bucket_name=bucket, key=key).get()['Body'].read() +def get_object_tag(key: str, bucket: str, tag_key: str) -> Union[str, None]: + tags = get_object_tags(key, bucket) + return next((tag for tag in tags if tag['Key'] == tag_key), None) + +def get_object_tags(key: str, bucket: str) -> list: + tags = s3.get_object_tagging(Bucket=bucket, Key=key)['TagSet'] + return tags -def file_stream_upload(buffer, name, bucket): - return s3.Bucket(bucket).upload_fileobj(buffer, name) - - -def receive_message(queue, message_num=1, wait_time=1): - return sqs.receive_message(QueueUrl=queue, - MaxNumberOfMessages=message_num, - WaitTimeSeconds=wait_time) - - -def delete_message(queue, receipt): - return sqs.delete_message(QueueUrl=queue, ReceiptHandle=receipt) +def file_stream_upload(buffer: io.BytesIO, name: str, bucket: str, action: str = None): + return s3.upload_fileobj(buffer, bucket, name, ExtraArgs={'Tagging': f'action={action}'}) diff --git a/app/main.py b/app/main.py index b2dfc00..ff2cc6f 100644 --- a/app/main.py +++ b/app/main.py @@ -1,7 +1,8 @@ import os, sys, logging from lib.application_configs import ApplicationConfigs -from services.loft_service import LoftService +from services.form_generation_service import FormGenerationService +from services.ability_estimation_service import AbilityEstimationService from helpers import aws_helper from daemonize import Daemonize @@ -13,19 +14,36 @@ logging.basicConfig(stream=sys.stdout, class ServiceListener(Consumer): + # add new services here + SERVICES = { + FormGenerationService.ACTION: FormGenerationService, + AbilityEstimationService.ACTION: AbilityEstimationService + } def handle_message(self, body, attributes, messages_attributes): # gather/manage/process data based on the particular needs logging.info('Incoming message: %s', body) - service = LoftService(body) - service.process() + # have to switch to either this or the AbilityEstimationService + # but depending on if we can add the tagset to the sqs message body + # we either extract that and then delegate to the appropriate service + # or get the action tag from the object through the aws helper function + key = aws_helper.get_key_from_message(body) + bucket = aws_helper.get_bucket_from_message(body) + action = aws_helper.get_object_tag(key, bucket, 'action')['Value'] - logging.info('Process complete for %s', service.file_name) + logging.info(f'Process starting for {action}') + if action in self.SERVICES: + service = self.SERVICES[action](body) + service.process() + + logging.info('Process complete for %s', service.file_name) + else: + logging.error(f'action of type {action} does not exist.') def main(): - logging.info('Starting Solver Service: Tokyo Drift (v1.3.1)...') + logging.info('Starting IRT Service: That Was Rash (v1.4.0)...') # ToDo: Figure out a much better way of doing this. # LocalStack wants 'endpoint_url', while prod doesnt :( diff --git a/app/models/advanced_options.py b/app/models/advanced_options.py index 849818d..c9b7213 100644 --- a/app/models/advanced_options.py +++ b/app/models/advanced_options.py @@ -3,10 +3,10 @@ from typing import List, Optional, Dict class AdvancedOptions(BaseModel): - linearity_check: Optional[bool] - show_progress: Optional[bool] - max_solution_time: Optional[int] - brand_bound_tolerance: Optional[float] - max_forms: Optional[int] - precision: Optional[float] - extra_param_range: Optional[List[Dict]] + linearity_check: Optional[bool] = None + show_progress: Optional[bool] = None + max_solution_time: Optional[int] = None + brand_bound_tolerance: Optional[float] = None + max_forms: Optional[int] = None + precision: Optional[float] = None + extra_param_range: Optional[List[Dict]] = None diff --git a/app/models/item.py b/app/models/item.py index f9f0a65..96ac372 100644 --- a/app/models/item.py +++ b/app/models/item.py @@ -8,9 +8,9 @@ from lib.irt.item_information_function import ItemInformationFunction class Item(BaseModel): id: int - position: Optional[int] - passage_id: Optional[int] - workflow_state: Optional[str] + position: Optional[int] = None + passage_id: Optional[int] = None + workflow_state: Optional[str] = None attributes: List[Attribute] b_param: float = 0.00 diff --git a/app/models/target.py b/app/models/target.py index f58f49a..1ad9fa4 100644 --- a/app/models/target.py +++ b/app/models/target.py @@ -4,7 +4,7 @@ from typing import Optional class Target(BaseModel): theta: float value: float - result: Optional[float] + result: Optional[float] = None drift: float = 0.0 @classmethod diff --git a/app/services/ability_estimation_service.py b/app/services/ability_estimation_service.py new file mode 100644 index 0000000..763c91c --- /dev/null +++ b/app/services/ability_estimation_service.py @@ -0,0 +1,9 @@ +import logging + +from services.base import Base + +class AbilityEstimationService(Base): + ACTION = 'abilityEstimation' + + def process(self): + logging.info('Ability Estimation Service to be implemented...') diff --git a/app/services/loft_service.py b/app/services/form_generation_service.py similarity index 98% rename from app/services/loft_service.py rename to app/services/form_generation_service.py index 0174158..d913200 100644 --- a/app/services/loft_service.py +++ b/app/services/form_generation_service.py @@ -1,4 +1,4 @@ -import os, json, random, io, logging +import json, random, io, logging from pulp import LpProblem, LpVariable, LpMinimize, LpMaximize, LpAffineExpression, LpConstraint, LpStatus, lpSum @@ -9,13 +9,12 @@ from lib.errors.item_generation_error import ItemGenerationError from models.solver_run import SolverRun from models.solution import Solution from models.form import Form -from models.item import Item from models.target import Target from services.base import Base - -class LoftService(Base): +class FormGenerationService(Base): + ACTION = 'formGeneration' def process(self): try: @@ -256,4 +255,4 @@ class LoftService(Base): # upload generated file to s3 and return result return aws_helper.file_stream_upload( solution_file, self.file_name, - ApplicationConfigs.s3_processed_bucket) + ApplicationConfigs.s3_processed_bucket, self.ACTION)