From 7e572115647b586c303a4a0ef9f17337194db22e Mon Sep 17 00:00:00 2001 From: brmnjsh Date: Sun, 3 Sep 2023 14:59:53 +0000 Subject: [PATCH 01/13] updating to new pydantic requirements for values --- app/main.py | 2 +- app/models/advanced_options.py | 14 +++++++------- app/models/item.py | 6 +++--- app/models/target.py | 2 +- 4 files changed, 12 insertions(+), 12 deletions(-) diff --git a/app/main.py b/app/main.py index b2dfc00..beb2f17 100644 --- a/app/main.py +++ b/app/main.py @@ -25,7 +25,7 @@ class ServiceListener(Consumer): def main(): - logging.info('Starting Solver Service: Tokyo Drift (v1.3.1)...') + logging.info('Starting Solver Service: That Was Rash (v1.4.0)...') # ToDo: Figure out a much better way of doing this. # LocalStack wants 'endpoint_url', while prod doesnt :( diff --git a/app/models/advanced_options.py b/app/models/advanced_options.py index 849818d..c9b7213 100644 --- a/app/models/advanced_options.py +++ b/app/models/advanced_options.py @@ -3,10 +3,10 @@ from typing import List, Optional, Dict class AdvancedOptions(BaseModel): - linearity_check: Optional[bool] - show_progress: Optional[bool] - max_solution_time: Optional[int] - brand_bound_tolerance: Optional[float] - max_forms: Optional[int] - precision: Optional[float] - extra_param_range: Optional[List[Dict]] + linearity_check: Optional[bool] = None + show_progress: Optional[bool] = None + max_solution_time: Optional[int] = None + brand_bound_tolerance: Optional[float] = None + max_forms: Optional[int] = None + precision: Optional[float] = None + extra_param_range: Optional[List[Dict]] = None diff --git a/app/models/item.py b/app/models/item.py index f9f0a65..96ac372 100644 --- a/app/models/item.py +++ b/app/models/item.py @@ -8,9 +8,9 @@ from lib.irt.item_information_function import ItemInformationFunction class Item(BaseModel): id: int - position: Optional[int] - passage_id: Optional[int] - workflow_state: Optional[str] + position: Optional[int] = None + passage_id: Optional[int] = None + workflow_state: Optional[str] = None attributes: List[Attribute] b_param: float = 0.00 diff --git a/app/models/target.py b/app/models/target.py index f58f49a..1ad9fa4 100644 --- a/app/models/target.py +++ b/app/models/target.py @@ -4,7 +4,7 @@ from typing import Optional class Target(BaseModel): theta: float value: float - result: Optional[float] + result: Optional[float] = None drift: float = 0.0 @classmethod From 386ab255158e2a0384bd682465eaf163e9725a23 Mon Sep 17 00:00:00 2001 From: brmnjsh Date: Sun, 3 Sep 2023 20:28:54 +0000 Subject: [PATCH 02/13] refactor to use s3 client instead of s3 resource, add tag gathering --- app/helpers/aws_helper.py | 26 ++++++++++++++++++++++---- 1 file changed, 22 insertions(+), 4 deletions(-) diff --git a/app/helpers/aws_helper.py b/app/helpers/aws_helper.py index 893298a..e86d30e 100644 --- a/app/helpers/aws_helper.py +++ b/app/helpers/aws_helper.py @@ -10,10 +10,10 @@ session = boto3.Session( # ToDo: Figure out a much better way of doing this. # LocalStack wants endpoint_url, while prod doesnt :( if ApplicationConfigs.local_dev_env: - s3 = session.resource('s3', region_name=ApplicationConfigs.region_name, endpoint_url=ApplicationConfigs.endpoint_url) + s3 = session.client('s3', region_name=ApplicationConfigs.region_name, endpoint_url=ApplicationConfigs.endpoint_url) sqs = session.client('sqs', region_name=ApplicationConfigs.region_name, endpoint_url=ApplicationConfigs.endpoint_url) else: - s3 = session.resource('s3', region_name=ApplicationConfigs.region_name) + s3 = session.client('s3', region_name=ApplicationConfigs.region_name) sqs = session.client('sqs', region_name=ApplicationConfigs.region_name) def get_key_from_message(body): @@ -25,11 +25,29 @@ def get_bucket_from_message(body): def get_object(key, bucket): - return s3.Object(bucket_name=bucket, key=key).get()['Body'].read() + return s3.get_object( + Bucket=bucket, + Key=key, + )['Body'].read() + +def get_object_tag(key, bucket, tag_key): + tags = get_object_tags(key, bucket) + tag_index = 0 + + while tag_index < len(tags): + tag = tags[tag_index] + if tag['Key'] == tag_key: + return tag + else: + return None + +def get_object_tags(key, bucket): + tags = s3.get_object_tagging(Bucket=bucket, Key=key)['TagSet'] + return tags def file_stream_upload(buffer, name, bucket): - return s3.Bucket(bucket).upload_fileobj(buffer, name) + return s3.upload_fileobj(buffer, bucket, name) def receive_message(queue, message_num=1, wait_time=1): From d90f51faca2bfbe9cafe8da990a66fb0a86e676f Mon Sep 17 00:00:00 2001 From: brmnjsh Date: Sun, 3 Sep 2023 21:38:46 +0000 Subject: [PATCH 03/13] add more typing, add tag to file upload, and rename form generation service --- app/helpers/aws_helper.py | 26 ++++++------------- app/main.py | 6 ++--- ..._service.py => form_generation_service.py} | 4 +-- 3 files changed, 13 insertions(+), 23 deletions(-) rename app/services/{loft_service.py => form_generation_service.py} (99%) diff --git a/app/helpers/aws_helper.py b/app/helpers/aws_helper.py index e86d30e..ff7d757 100644 --- a/app/helpers/aws_helper.py +++ b/app/helpers/aws_helper.py @@ -1,5 +1,5 @@ import boto3 -import json +import io from lib.application_configs import ApplicationConfigs @@ -16,21 +16,21 @@ else: s3 = session.client('s3', region_name=ApplicationConfigs.region_name) sqs = session.client('sqs', region_name=ApplicationConfigs.region_name) -def get_key_from_message(body): +def get_key_from_message(body: dict): return body['Records'][0]['s3']['object']['key'] -def get_bucket_from_message(body): +def get_bucket_from_message(body: dict): return body['Records'][0]['s3']['bucket']['name'] -def get_object(key, bucket): +def get_object(key: str, bucket: str): return s3.get_object( Bucket=bucket, Key=key, )['Body'].read() -def get_object_tag(key, bucket, tag_key): +def get_object_tag(key: str, bucket: str, tag_key: str): tags = get_object_tags(key, bucket) tag_index = 0 @@ -41,20 +41,10 @@ def get_object_tag(key, bucket, tag_key): else: return None -def get_object_tags(key, bucket): +def get_object_tags(key: str, bucket: str): tags = s3.get_object_tagging(Bucket=bucket, Key=key)['TagSet'] return tags -def file_stream_upload(buffer, name, bucket): - return s3.upload_fileobj(buffer, bucket, name) - - -def receive_message(queue, message_num=1, wait_time=1): - return sqs.receive_message(QueueUrl=queue, - MaxNumberOfMessages=message_num, - WaitTimeSeconds=wait_time) - - -def delete_message(queue, receipt): - return sqs.delete_message(QueueUrl=queue, ReceiptHandle=receipt) +def file_stream_upload(buffer: io.BytesIO, name: str, bucket: str, action: str = None): + return s3.upload_fileobj(buffer, bucket, name, ExtraArgs={'Tagging': f'action={action}'}) diff --git a/app/main.py b/app/main.py index beb2f17..a1280c6 100644 --- a/app/main.py +++ b/app/main.py @@ -1,7 +1,7 @@ import os, sys, logging from lib.application_configs import ApplicationConfigs -from services.loft_service import LoftService +from services.form_generation_service import FormGenerationService from helpers import aws_helper from daemonize import Daemonize @@ -18,14 +18,14 @@ class ServiceListener(Consumer): # gather/manage/process data based on the particular needs logging.info('Incoming message: %s', body) - service = LoftService(body) + service = FormGenerationService(body) service.process() logging.info('Process complete for %s', service.file_name) def main(): - logging.info('Starting Solver Service: That Was Rash (v1.4.0)...') + logging.info('Starting IRT Service: That Was Rash (v1.4.0)...') # ToDo: Figure out a much better way of doing this. # LocalStack wants 'endpoint_url', while prod doesnt :( diff --git a/app/services/loft_service.py b/app/services/form_generation_service.py similarity index 99% rename from app/services/loft_service.py rename to app/services/form_generation_service.py index 0174158..92fd8ef 100644 --- a/app/services/loft_service.py +++ b/app/services/form_generation_service.py @@ -15,7 +15,7 @@ from models.target import Target from services.base import Base -class LoftService(Base): +class FormGenerationService(Base): def process(self): try: @@ -256,4 +256,4 @@ class LoftService(Base): # upload generated file to s3 and return result return aws_helper.file_stream_upload( solution_file, self.file_name, - ApplicationConfigs.s3_processed_bucket) + ApplicationConfigs.s3_processed_bucket, 'formGeneration') From 0e0504581775f9660c23f8056f9c12fde93118a0 Mon Sep 17 00:00:00 2001 From: brmnjsh Date: Sun, 3 Sep 2023 21:40:35 +0000 Subject: [PATCH 04/13] add return types --- app/helpers/aws_helper.py | 10 +++++----- 1 file changed, 5 insertions(+), 5 deletions(-) diff --git a/app/helpers/aws_helper.py b/app/helpers/aws_helper.py index ff7d757..3c5e029 100644 --- a/app/helpers/aws_helper.py +++ b/app/helpers/aws_helper.py @@ -16,21 +16,21 @@ else: s3 = session.client('s3', region_name=ApplicationConfigs.region_name) sqs = session.client('sqs', region_name=ApplicationConfigs.region_name) -def get_key_from_message(body: dict): +def get_key_from_message(body: dict) -> str: return body['Records'][0]['s3']['object']['key'] -def get_bucket_from_message(body: dict): +def get_bucket_from_message(body: dict) -> str: return body['Records'][0]['s3']['bucket']['name'] -def get_object(key: str, bucket: str): +def get_object(key: str, bucket: str) -> bytes: return s3.get_object( Bucket=bucket, Key=key, )['Body'].read() -def get_object_tag(key: str, bucket: str, tag_key: str): +def get_object_tag(key: str, bucket: str, tag_key: str) -> str | None: tags = get_object_tags(key, bucket) tag_index = 0 @@ -41,7 +41,7 @@ def get_object_tag(key: str, bucket: str, tag_key: str): else: return None -def get_object_tags(key: str, bucket: str): +def get_object_tags(key: str, bucket: str) -> list: tags = s3.get_object_tagging(Bucket=bucket, Key=key)['TagSet'] return tags From fc42f0c5c17031fadd4a447404c91a291cd10563 Mon Sep 17 00:00:00 2001 From: brmnjsh Date: Sun, 3 Sep 2023 21:47:13 +0000 Subject: [PATCH 05/13] python < 3.10 need to use Union for multiple return types --- app/helpers/aws_helper.py | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/app/helpers/aws_helper.py b/app/helpers/aws_helper.py index 3c5e029..c8c31b4 100644 --- a/app/helpers/aws_helper.py +++ b/app/helpers/aws_helper.py @@ -1,6 +1,7 @@ import boto3 import io +from typing import Union from lib.application_configs import ApplicationConfigs session = boto3.Session( @@ -30,7 +31,7 @@ def get_object(key: str, bucket: str) -> bytes: Key=key, )['Body'].read() -def get_object_tag(key: str, bucket: str, tag_key: str) -> str | None: +def get_object_tag(key: str, bucket: str, tag_key: str) -> Union[str, None]: tags = get_object_tags(key, bucket) tag_index = 0 From 4ceeadbbb836d8e2257c5995192ed007181425c9 Mon Sep 17 00:00:00 2001 From: brmnjsh Date: Sun, 3 Sep 2023 21:55:48 +0000 Subject: [PATCH 06/13] minor formatting fixes, add ability estimation service class --- app/main.py | 5 +++++ app/services/ability_estimation_service.py | 4 ++++ app/services/form_generation_service.py | 1 - 3 files changed, 9 insertions(+), 1 deletion(-) create mode 100644 app/services/ability_estimation_service.py diff --git a/app/main.py b/app/main.py index a1280c6..8ba0aaa 100644 --- a/app/main.py +++ b/app/main.py @@ -2,6 +2,7 @@ import os, sys, logging from lib.application_configs import ApplicationConfigs from services.form_generation_service import FormGenerationService +from services.ability_estimation_service import AbilityEstimationService from helpers import aws_helper from daemonize import Daemonize @@ -18,6 +19,10 @@ class ServiceListener(Consumer): # gather/manage/process data based on the particular needs logging.info('Incoming message: %s', body) + # have to swith to either this or the AbilityEstimationService + # but depending on if we can add the tagset to the sqs message body + # we either extract that and then delegate to the appropriate service + # or get the action tag from the object through the aws helper function service = FormGenerationService(body) service.process() diff --git a/app/services/ability_estimation_service.py b/app/services/ability_estimation_service.py new file mode 100644 index 0000000..5dafdac --- /dev/null +++ b/app/services/ability_estimation_service.py @@ -0,0 +1,4 @@ +from services.base import Base + +class AbilityEstimationService(Base): + pass diff --git a/app/services/form_generation_service.py b/app/services/form_generation_service.py index 92fd8ef..f62d6e5 100644 --- a/app/services/form_generation_service.py +++ b/app/services/form_generation_service.py @@ -14,7 +14,6 @@ from models.target import Target from services.base import Base - class FormGenerationService(Base): def process(self): From 8f182ef170d0164e3a82dd72e5eb3858ef47d427 Mon Sep 17 00:00:00 2001 From: brmnjsh Date: Sun, 3 Sep 2023 21:56:32 +0000 Subject: [PATCH 07/13] type --- app/main.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/app/main.py b/app/main.py index 8ba0aaa..617d886 100644 --- a/app/main.py +++ b/app/main.py @@ -19,7 +19,7 @@ class ServiceListener(Consumer): # gather/manage/process data based on the particular needs logging.info('Incoming message: %s', body) - # have to swith to either this or the AbilityEstimationService + # have to switch to either this or the AbilityEstimationService # but depending on if we can add the tagset to the sqs message body # we either extract that and then delegate to the appropriate service # or get the action tag from the object through the aws helper function From 7e68606cc179862a200e9b2ce3c3db82e81b614e Mon Sep 17 00:00:00 2001 From: brmnjsh Date: Mon, 4 Sep 2023 19:50:29 +0000 Subject: [PATCH 08/13] added methodology for delegating to service, fixed some formatting, cleaned up dangling imports --- app/helpers/aws_helper.py | 2 -- app/main.py | 16 +++++++++++++++- app/services/ability_estimation_service.py | 6 +++++- app/services/form_generation_service.py | 3 +-- 4 files changed, 21 insertions(+), 6 deletions(-) diff --git a/app/helpers/aws_helper.py b/app/helpers/aws_helper.py index c8c31b4..cfcf7f6 100644 --- a/app/helpers/aws_helper.py +++ b/app/helpers/aws_helper.py @@ -20,11 +20,9 @@ else: def get_key_from_message(body: dict) -> str: return body['Records'][0]['s3']['object']['key'] - def get_bucket_from_message(body: dict) -> str: return body['Records'][0]['s3']['bucket']['name'] - def get_object(key: str, bucket: str) -> bytes: return s3.get_object( Bucket=bucket, diff --git a/app/main.py b/app/main.py index 617d886..3374c2f 100644 --- a/app/main.py +++ b/app/main.py @@ -23,7 +23,21 @@ class ServiceListener(Consumer): # but depending on if we can add the tagset to the sqs message body # we either extract that and then delegate to the appropriate service # or get the action tag from the object through the aws helper function - service = FormGenerationService(body) + service = None + key = aws_helper.get_key_from_message(body) + bucket = aws_helper.get_bucket_from_message(body) + action = aws_helper.get_object_tag(key, bucket, 'action')['Value'] + + logging.info(f'Process starting for {action}') + + if action == 'formGeneration': + service = FormGenerationService(body) + elif action == 'abilityEstimation': + service = AbilityEstimationService(body) + else: + logging.error(f'action of type {action} does not exist.') + return + service.process() logging.info('Process complete for %s', service.file_name) diff --git a/app/services/ability_estimation_service.py b/app/services/ability_estimation_service.py index 5dafdac..daaa308 100644 --- a/app/services/ability_estimation_service.py +++ b/app/services/ability_estimation_service.py @@ -1,4 +1,8 @@ +import logging + from services.base import Base class AbilityEstimationService(Base): - pass + + def process(self): + logging.info('Ability Estimation Service to be implemented...') diff --git a/app/services/form_generation_service.py b/app/services/form_generation_service.py index f62d6e5..95ce8e8 100644 --- a/app/services/form_generation_service.py +++ b/app/services/form_generation_service.py @@ -1,4 +1,4 @@ -import os, json, random, io, logging +import json, random, io, logging from pulp import LpProblem, LpVariable, LpMinimize, LpMaximize, LpAffineExpression, LpConstraint, LpStatus, lpSum @@ -9,7 +9,6 @@ from lib.errors.item_generation_error import ItemGenerationError from models.solver_run import SolverRun from models.solution import Solution from models.form import Form -from models.item import Item from models.target import Target from services.base import Base From 55a0f7c254afeb48f64d5434aaf560368617ea71 Mon Sep 17 00:00:00 2001 From: brmnjsh Date: Tue, 5 Sep 2023 18:25:51 +0000 Subject: [PATCH 09/13] make tagging more general --- app/services/ability_estimation_service.py | 1 + app/services/form_generation_service.py | 3 ++- 2 files changed, 3 insertions(+), 1 deletion(-) diff --git a/app/services/ability_estimation_service.py b/app/services/ability_estimation_service.py index daaa308..763c91c 100644 --- a/app/services/ability_estimation_service.py +++ b/app/services/ability_estimation_service.py @@ -3,6 +3,7 @@ import logging from services.base import Base class AbilityEstimationService(Base): + ACTION = 'abilityEstimation' def process(self): logging.info('Ability Estimation Service to be implemented...') diff --git a/app/services/form_generation_service.py b/app/services/form_generation_service.py index 95ce8e8..d913200 100644 --- a/app/services/form_generation_service.py +++ b/app/services/form_generation_service.py @@ -14,6 +14,7 @@ from models.target import Target from services.base import Base class FormGenerationService(Base): + ACTION = 'formGeneration' def process(self): try: @@ -254,4 +255,4 @@ class FormGenerationService(Base): # upload generated file to s3 and return result return aws_helper.file_stream_upload( solution_file, self.file_name, - ApplicationConfigs.s3_processed_bucket, 'formGeneration') + ApplicationConfigs.s3_processed_bucket, self.ACTION) From dfeb65c900ab82580291822c443375b34294174a Mon Sep 17 00:00:00 2001 From: brmnjsh Date: Wed, 6 Sep 2023 15:37:42 +0000 Subject: [PATCH 10/13] make more functional v2 --- app/main.py | 22 +++++++++++----------- 1 file changed, 11 insertions(+), 11 deletions(-) diff --git a/app/main.py b/app/main.py index 3374c2f..f0f3580 100644 --- a/app/main.py +++ b/app/main.py @@ -14,6 +14,11 @@ logging.basicConfig(stream=sys.stdout, class ServiceListener(Consumer): + # add new services here + SERVICES = { + FormGenerationService.ACTION: FormGenerationService, + AbilityEstimationService.ACTION: AbilityEstimationService + } def handle_message(self, body, attributes, messages_attributes): # gather/manage/process data based on the particular needs @@ -23,25 +28,20 @@ class ServiceListener(Consumer): # but depending on if we can add the tagset to the sqs message body # we either extract that and then delegate to the appropriate service # or get the action tag from the object through the aws helper function - service = None + # service = None key = aws_helper.get_key_from_message(body) bucket = aws_helper.get_bucket_from_message(body) action = aws_helper.get_object_tag(key, bucket, 'action')['Value'] logging.info(f'Process starting for {action}') - if action == 'formGeneration': - service = FormGenerationService(body) - elif action == 'abilityEstimation': - service = AbilityEstimationService(body) + if action in self.SERVICES: + service = self.SERVICES[action](body) + service.process() + + logging.info('Process complete for %s', service.file_name) else: logging.error(f'action of type {action} does not exist.') - return - - service.process() - - logging.info('Process complete for %s', service.file_name) - def main(): logging.info('Starting IRT Service: That Was Rash (v1.4.0)...') From 91e1640697c8e45ecb43ab0ff105eab3a2619046 Mon Sep 17 00:00:00 2001 From: brmnjsh Date: Fri, 8 Sep 2023 12:51:37 +0200 Subject: [PATCH 11/13] Update app/main.py Co-authored-by: Owen <84334068+owenhtr@users.noreply.github.com> --- app/main.py | 1 - 1 file changed, 1 deletion(-) diff --git a/app/main.py b/app/main.py index f0f3580..ff2cc6f 100644 --- a/app/main.py +++ b/app/main.py @@ -28,7 +28,6 @@ class ServiceListener(Consumer): # but depending on if we can add the tagset to the sqs message body # we either extract that and then delegate to the appropriate service # or get the action tag from the object through the aws helper function - # service = None key = aws_helper.get_key_from_message(body) bucket = aws_helper.get_bucket_from_message(body) action = aws_helper.get_object_tag(key, bucket, 'action')['Value'] From 2ae15f923756575ab93bd3c11bb8a52dedf7d92a Mon Sep 17 00:00:00 2001 From: brmnjsh Date: Fri, 8 Sep 2023 15:51:42 +0000 Subject: [PATCH 12/13] enumerate --- app/helpers/aws_helper.py | 5 ++--- 1 file changed, 2 insertions(+), 3 deletions(-) diff --git a/app/helpers/aws_helper.py b/app/helpers/aws_helper.py index cfcf7f6..cccda41 100644 --- a/app/helpers/aws_helper.py +++ b/app/helpers/aws_helper.py @@ -31,10 +31,9 @@ def get_object(key: str, bucket: str) -> bytes: def get_object_tag(key: str, bucket: str, tag_key: str) -> Union[str, None]: tags = get_object_tags(key, bucket) - tag_index = 0 - while tag_index < len(tags): - tag = tags[tag_index] + for i, tag in enumerate(tags): + tag = tags[i] if tag['Key'] == tag_key: return tag else: From 4788012ce6a5d411c04e33df0f8c6eefd3417cc7 Mon Sep 17 00:00:00 2001 From: brmnjsh Date: Mon, 11 Sep 2023 13:23:04 +0000 Subject: [PATCH 13/13] make more pythonic --- app/helpers/aws_helper.py | 8 +------- 1 file changed, 1 insertion(+), 7 deletions(-) diff --git a/app/helpers/aws_helper.py b/app/helpers/aws_helper.py index cccda41..38eb4fe 100644 --- a/app/helpers/aws_helper.py +++ b/app/helpers/aws_helper.py @@ -31,13 +31,7 @@ def get_object(key: str, bucket: str) -> bytes: def get_object_tag(key: str, bucket: str, tag_key: str) -> Union[str, None]: tags = get_object_tags(key, bucket) - - for i, tag in enumerate(tags): - tag = tags[i] - if tag['Key'] == tag_key: - return tag - else: - return None + return next((tag for tag in tags if tag['Key'] == tag_key), None) def get_object_tags(key: str, bucket: str) -> list: tags = s3.get_object_tagging(Bucket=bucket, Key=key)['TagSet']