Merge pull request #35 from yardstick/feature/QUANT-2992

This commit is contained in:
brmnjsh 2023-09-15 14:37:23 -04:00 committed by GitHub
commit e218ce4053
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
7 changed files with 67 additions and 41 deletions

View File

@ -1,6 +1,7 @@
import boto3 import boto3
import json import io
from typing import Union
from lib.application_configs import ApplicationConfigs from lib.application_configs import ApplicationConfigs
session = boto3.Session( session = boto3.Session(
@ -10,33 +11,32 @@ session = boto3.Session(
# ToDo: Figure out a much better way of doing this. # ToDo: Figure out a much better way of doing this.
# LocalStack wants endpoint_url, while prod doesnt :( # LocalStack wants endpoint_url, while prod doesnt :(
if ApplicationConfigs.local_dev_env: if ApplicationConfigs.local_dev_env:
s3 = session.resource('s3', region_name=ApplicationConfigs.region_name, endpoint_url=ApplicationConfigs.endpoint_url) s3 = session.client('s3', region_name=ApplicationConfigs.region_name, endpoint_url=ApplicationConfigs.endpoint_url)
sqs = session.client('sqs', region_name=ApplicationConfigs.region_name, endpoint_url=ApplicationConfigs.endpoint_url) sqs = session.client('sqs', region_name=ApplicationConfigs.region_name, endpoint_url=ApplicationConfigs.endpoint_url)
else: else:
s3 = session.resource('s3', region_name=ApplicationConfigs.region_name) s3 = session.client('s3', region_name=ApplicationConfigs.region_name)
sqs = session.client('sqs', region_name=ApplicationConfigs.region_name) sqs = session.client('sqs', region_name=ApplicationConfigs.region_name)
def get_key_from_message(body): def get_key_from_message(body: dict) -> str:
return body['Records'][0]['s3']['object']['key'] return body['Records'][0]['s3']['object']['key']
def get_bucket_from_message(body: dict) -> str:
def get_bucket_from_message(body):
return body['Records'][0]['s3']['bucket']['name'] return body['Records'][0]['s3']['bucket']['name']
def get_object(key: str, bucket: str) -> bytes:
return s3.get_object(
Bucket=bucket,
Key=key,
)['Body'].read()
def get_object(key, bucket): def get_object_tag(key: str, bucket: str, tag_key: str) -> Union[str, None]:
return s3.Object(bucket_name=bucket, key=key).get()['Body'].read() tags = get_object_tags(key, bucket)
return next((tag for tag in tags if tag['Key'] == tag_key), None)
def get_object_tags(key: str, bucket: str) -> list:
tags = s3.get_object_tagging(Bucket=bucket, Key=key)['TagSet']
return tags
def file_stream_upload(buffer, name, bucket): def file_stream_upload(buffer: io.BytesIO, name: str, bucket: str, action: str = None):
return s3.Bucket(bucket).upload_fileobj(buffer, name) return s3.upload_fileobj(buffer, bucket, name, ExtraArgs={'Tagging': f'action={action}'})
def receive_message(queue, message_num=1, wait_time=1):
return sqs.receive_message(QueueUrl=queue,
MaxNumberOfMessages=message_num,
WaitTimeSeconds=wait_time)
def delete_message(queue, receipt):
return sqs.delete_message(QueueUrl=queue, ReceiptHandle=receipt)

View File

@ -1,7 +1,8 @@
import os, sys, logging import os, sys, logging
from lib.application_configs import ApplicationConfigs from lib.application_configs import ApplicationConfigs
from services.loft_service import LoftService from services.form_generation_service import FormGenerationService
from services.ability_estimation_service import AbilityEstimationService
from helpers import aws_helper from helpers import aws_helper
from daemonize import Daemonize from daemonize import Daemonize
@ -13,19 +14,36 @@ logging.basicConfig(stream=sys.stdout,
class ServiceListener(Consumer): class ServiceListener(Consumer):
# add new services here
SERVICES = {
FormGenerationService.ACTION: FormGenerationService,
AbilityEstimationService.ACTION: AbilityEstimationService
}
def handle_message(self, body, attributes, messages_attributes): def handle_message(self, body, attributes, messages_attributes):
# gather/manage/process data based on the particular needs # gather/manage/process data based on the particular needs
logging.info('Incoming message: %s', body) logging.info('Incoming message: %s', body)
service = LoftService(body) # have to switch to either this or the AbilityEstimationService
# but depending on if we can add the tagset to the sqs message body
# we either extract that and then delegate to the appropriate service
# or get the action tag from the object through the aws helper function
key = aws_helper.get_key_from_message(body)
bucket = aws_helper.get_bucket_from_message(body)
action = aws_helper.get_object_tag(key, bucket, 'action')['Value']
logging.info(f'Process starting for {action}')
if action in self.SERVICES:
service = self.SERVICES[action](body)
service.process() service.process()
logging.info('Process complete for %s', service.file_name) logging.info('Process complete for %s', service.file_name)
else:
logging.error(f'action of type {action} does not exist.')
def main(): def main():
logging.info('Starting Solver Service: Tokyo Drift (v1.3.1)...') logging.info('Starting IRT Service: That Was Rash (v1.4.0)...')
# ToDo: Figure out a much better way of doing this. # ToDo: Figure out a much better way of doing this.
# LocalStack wants 'endpoint_url', while prod doesnt :( # LocalStack wants 'endpoint_url', while prod doesnt :(

View File

@ -3,10 +3,10 @@ from typing import List, Optional, Dict
class AdvancedOptions(BaseModel): class AdvancedOptions(BaseModel):
linearity_check: Optional[bool] linearity_check: Optional[bool] = None
show_progress: Optional[bool] show_progress: Optional[bool] = None
max_solution_time: Optional[int] max_solution_time: Optional[int] = None
brand_bound_tolerance: Optional[float] brand_bound_tolerance: Optional[float] = None
max_forms: Optional[int] max_forms: Optional[int] = None
precision: Optional[float] precision: Optional[float] = None
extra_param_range: Optional[List[Dict]] extra_param_range: Optional[List[Dict]] = None

View File

@ -8,9 +8,9 @@ from lib.irt.item_information_function import ItemInformationFunction
class Item(BaseModel): class Item(BaseModel):
id: int id: int
position: Optional[int] position: Optional[int] = None
passage_id: Optional[int] passage_id: Optional[int] = None
workflow_state: Optional[str] workflow_state: Optional[str] = None
attributes: List[Attribute] attributes: List[Attribute]
b_param: float = 0.00 b_param: float = 0.00

View File

@ -4,7 +4,7 @@ from typing import Optional
class Target(BaseModel): class Target(BaseModel):
theta: float theta: float
value: float value: float
result: Optional[float] result: Optional[float] = None
drift: float = 0.0 drift: float = 0.0
@classmethod @classmethod

View File

@ -0,0 +1,9 @@
import logging
from services.base import Base
class AbilityEstimationService(Base):
ACTION = 'abilityEstimation'
def process(self):
logging.info('Ability Estimation Service to be implemented...')

View File

@ -1,4 +1,4 @@
import os, json, random, io, logging import json, random, io, logging
from pulp import LpProblem, LpVariable, LpMinimize, LpMaximize, LpAffineExpression, LpConstraint, LpStatus, lpSum from pulp import LpProblem, LpVariable, LpMinimize, LpMaximize, LpAffineExpression, LpConstraint, LpStatus, lpSum
@ -9,13 +9,12 @@ from lib.errors.item_generation_error import ItemGenerationError
from models.solver_run import SolverRun from models.solver_run import SolverRun
from models.solution import Solution from models.solution import Solution
from models.form import Form from models.form import Form
from models.item import Item
from models.target import Target from models.target import Target
from services.base import Base from services.base import Base
class FormGenerationService(Base):
class LoftService(Base): ACTION = 'formGeneration'
def process(self): def process(self):
try: try:
@ -256,4 +255,4 @@ class LoftService(Base):
# upload generated file to s3 and return result # upload generated file to s3 and return result
return aws_helper.file_stream_upload( return aws_helper.file_stream_upload(
solution_file, self.file_name, solution_file, self.file_name,
ApplicationConfigs.s3_processed_bucket) ApplicationConfigs.s3_processed_bucket, self.ACTION)