diff --git a/.docker-compose/Dockerfile b/.docker-compose/Dockerfile index 703424a..62f8a78 100644 --- a/.docker-compose/Dockerfile +++ b/.docker-compose/Dockerfile @@ -4,12 +4,14 @@ RUN apt-get update RUN apt-get -y install coinor-cbc RUN python -m pip install pulp RUN python -m pip install pydantic -RUN python -m pip install pySqsListener RUN python -m pip install daemonize +RUN python -m pip install sqspy RUN mkdir /app WORKDIR /app +ENV LOCAL_DEV True + # Bundle app source COPY . /app diff --git a/Dockerfile b/Dockerfile index 703424a..58839d7 100644 --- a/Dockerfile +++ b/Dockerfile @@ -4,8 +4,8 @@ RUN apt-get update RUN apt-get -y install coinor-cbc RUN python -m pip install pulp RUN python -m pip install pydantic -RUN python -m pip install pySqsListener RUN python -m pip install daemonize +RUN python -m pip install sqspy RUN mkdir /app WORKDIR /app diff --git a/Jenkinsfile b/Jenkinsfile index 5eef75c..307591b 100644 --- a/Jenkinsfile +++ b/Jenkinsfile @@ -1,25 +1,36 @@ -def label = "docker-${UUID.randomUUID().toString()}" - -podTemplate(label: label, inheritFrom: 'base') { - node(label) { - stage('Checkout Repository') { - container('base') { - checkout scm - } +pipeline { + agent { + kubernetes { + label "kaniko-${UUID.randomUUID().toString()}" + inheritFrom 'kaniko' } - - stage('Login to Dockerhub') { - withCredentials([usernamePassword(credentialsId: 'DockerHubAccessYardstick', usernameVariable: 'USER', passwordVariable: 'PASS')]) { - container('base') { - sh "docker login --username ${USER} --password ${PASS}" + } + environment { + REPOSITORY = 'yardstick/measure-solver' + } + stages { + stage('Build Kaniko image') { + steps { + withCredentials([usernamePassword(credentialsId: 'DockerHubAccessYardstick', usernameVariable: 'USER', passwordVariable: 'PASS')]) { + container('kaniko') { + checkout scm + // Setup docker credentials + sh 'echo "{\\"auths\\":{\\"https://index.docker.io/v1/\\":{\\"auth\\":\\"$(printf "%s:%s" "$USER" "$PASS" | base64 | tr -d 
\'\n\')\\"}}}" > /kaniko/.docker/config.json' + // Execute kaniko build + sh """ + /kaniko/executor -f `pwd`/Dockerfile \ + -c `pwd` \ + --insecure=true \ + --insecure-registry=docker-registry.default:5000 \ + --cache=true \ + --cache-repo=docker-registry.default:5000/${REPOSITORY} \ + --destination ${env.REPOSITORY}:\$(echo ${BRANCH_NAME} | grep -Eo 'feature/([A-Za-z]+-[0-9]*)' | grep -Eo '[A-Za-z]+-[0-9]*' || \ + echo ${BRANCH_NAME} | grep -Eo '(release|hotfix)/[[:digit:]]+\\.[[:digit:]]+\\.[[:digit:]]+' | grep -Eo '[[:digit:]]+\\.[[:digit:]]+\\.[[:digit:]]+' || \ + echo ${BRANCH_NAME} | grep -Eo 'YASDEV-([[:digit:]]*)') + """ + } } } } - - stage('Build Docker image') { - container('base') { - sh "make display full branch=${BRANCH_NAME}" - } - } } -} \ No newline at end of file +} diff --git a/app/helpers/aws_helper.py b/app/helpers/aws_helper.py index a7869af..893298a 100644 --- a/app/helpers/aws_helper.py +++ b/app/helpers/aws_helper.py @@ -1,39 +1,42 @@ import boto3 -import os import json -session = boto3.Session( - aws_access_key_id=os.environ['AWS_ACCESS_KEY_ID'], - aws_secret_access_key=os.environ['AWS_SECRET_ACCESS_KEY'] -) +from lib.application_configs import ApplicationConfigs -s3 = session.resource('s3', region_name=os.environ['AWS_REGION']) -sqs = session.client('sqs', region_name=os.environ['AWS_REGION']) +session = boto3.Session( + aws_access_key_id=ApplicationConfigs.aws_access_key_id, + aws_secret_access_key=ApplicationConfigs.aws_secret_key) + +# ToDo: Figure out a much better way of doing this. 
+# LocalStack wants endpoint_url, while prod doesnt :( +if ApplicationConfigs.local_dev_env: + s3 = session.resource('s3', region_name=ApplicationConfigs.region_name, endpoint_url=ApplicationConfigs.endpoint_url) + sqs = session.client('sqs', region_name=ApplicationConfigs.region_name, endpoint_url=ApplicationConfigs.endpoint_url) +else: + s3 = session.resource('s3', region_name=ApplicationConfigs.region_name) + sqs = session.client('sqs', region_name=ApplicationConfigs.region_name) def get_key_from_message(body): - return body['Records'][0]['s3']['object']['key'] + return body['Records'][0]['s3']['object']['key'] + def get_bucket_from_message(body): - return body['Records'][0]['s3']['bucket']['name'] + return body['Records'][0]['s3']['bucket']['name'] + def get_object(key, bucket): - return s3.Object( - bucket_name=bucket, - key=key - ).get()['Body'].read() + return s3.Object(bucket_name=bucket, key=key).get()['Body'].read() + def file_stream_upload(buffer, name, bucket): - return s3.Bucket(bucket).upload_fileobj(buffer, name) + return s3.Bucket(bucket).upload_fileobj(buffer, name) + def receive_message(queue, message_num=1, wait_time=1): - return sqs.receive_message( - QueueUrl=queue, - MaxNumberOfMessages=message_num, - WaitTimeSeconds=wait_time - ) + return sqs.receive_message(QueueUrl=queue, + MaxNumberOfMessages=message_num, + WaitTimeSeconds=wait_time) + def delete_message(queue, receipt): - return sqs.delete_message( - QueueUrl=queue, - ReceiptHandle=receipt - ) + return sqs.delete_message(QueueUrl=queue, ReceiptHandle=receipt) diff --git a/app/helpers/common_helper.py b/app/helpers/common_helper.py new file mode 100644 index 0000000..d74ec0e --- /dev/null +++ b/app/helpers/common_helper.py @@ -0,0 +1,14 @@ + +def boolean_to_int(value: bool) -> int: + if value: + return 1 + else: + return 0 + + +def is_float(element: str) -> bool: + try: + float(element) + return True + except ValueError: + return False diff --git a/app/helpers/csv_helper.py 
b/app/helpers/csv_helper.py index 1f6699c..c25d0a4 100644 --- a/app/helpers/csv_helper.py +++ b/app/helpers/csv_helper.py @@ -1,5 +1,6 @@ import csv import io + def file_stream_reader(f): - return csv.reader(io.StringIO(f.read().decode('ascii'))) + return csv.reader(io.StringIO(f.read().decode('ascii'))) diff --git a/app/helpers/service_helper.py b/app/helpers/service_helper.py index 2c9aa2e..cfae641 100644 --- a/app/helpers/service_helper.py +++ b/app/helpers/service_helper.py @@ -1,90 +1,132 @@ import csv import io import re +from tokenize import String +from typing import Tuple -def items_csv_to_dict(items_csv_reader): - items = [] - headers = [] +from helpers import common_helper +from models.item import Item +from models.solver_run import SolverRun - # get headers and items - for key, row in enumerate(items_csv_reader): - if key == 0: - headers = row - else: - item = { 'attributes': [] } +def csv_to_item(items_csv_reader, solver_run): + items = [] + headers = [] - # ensure that the b param is formatted correctly - if len(re.findall(".", row[len(headers) - 1])) >= 3: - for key, col in enumerate(headers): - if key == 0: - item[col] = row[key] - if key == 2: - # make sure passage id exists - if row[key]: - item['passage_id'] = row[key] - # b param - tmep fix! 
use irt model b param for proper reference - elif key == len(headers) - 1: - item['b_param'] = row[key] - elif key > 2 and key < len(headers) - 1: - item['attributes'].append({ - 'id': col, - 'value': row[key], - 'type': 'metadata' - }) + # get headers and items + for key, row in enumerate(items_csv_reader): + if key == 0: + headers = row + else: + item = {'attributes': []} - items.append(item) + # ensure that the b param is formatted correctly + if row[len(headers) - 1] != '' and common_helper.is_float(row[len(headers) - 1]): + for key, col in enumerate(headers): + if solver_run.irt_model.formatted_b_param() == col: + value = float(row[key]) + item['b_param'] = value + elif solver_run.get_constraint( + col) and solver_run.get_constraint( + col).reference_attribute.type == 'bundle': + if row[key]: + item[solver_run.get_constraint( + col).reference_attribute.id] = row[key] + elif solver_run.get_constraint(col): + constraint = solver_run.get_constraint(col) + item['attributes'].append({ + 'id': + col, + 'value': + row[key], + 'type': + constraint.reference_attribute.type + }) + else: + if row[key]: + item[col] = row[key] + + # confirm item is only added if it meets the criteria of 100% constraints as a pre-filter + valid_item = True + item = Item.parse_obj(item) + for constraint in solver_run.constraints: + if item.attribute_exists(constraint.reference_attribute) == False and constraint.minimum == 100: + valid_item = False + + if valid_item: items.append(item) + + return items - return items def solution_to_file(buffer, total_form_items, forms): - wr = csv.writer(buffer, dialect='excel', delimiter=',') + wr = csv.writer(buffer, dialect='excel', delimiter=',') - # write header row for first row utilizing the total items all forms will have - # fill the rows with the targets and cut score then the items - header = ['status'] + # write header row for first row utilizing the total items all forms will have + # fill the rows with the targets and cut score then the items + 
header = ['status'] - for result in forms[0].tif_results: - header += [f'tif @ {round(result.theta, 2)}'] + for result in forms[0].tif_results: + header += [f'tif @ {round(result.theta, 2)}'] - for result in forms[0].tcc_results: - header += [f'tcc @ {round(result.theta, 2)}'] + for result in forms[0].tcc_results: + header += [f'tcc @ {round(result.theta, 2)}'] - header += ['cut score'] + [x + 1 for x in range(total_form_items)] - wr.writerow(header) + header += ['cut score'] + [x + 1 for x in range(total_form_items)] + wr.writerow(header) - # add each form as row to processed csv - for form in forms: - row = [form.status] + # add each form as row to processed csv + for form in forms: + row = [form.status] - for result in form.tif_results + form.tcc_results: - row += [f'target - {result.value}\nresult - {round(result.result, 2)}'] + for result in form.tif_results + form.tcc_results: + row += [ + f'target - {result.value}\nresult - {round(result.result, 2)}' + ] - # provide generated items and cut score - row += [round(form.cut_score, 2)] + [item.id for item in form.items] - wr.writerow(row) + # provide generated items and cut score + row += [round(form.cut_score, 2)] + [item.id for item in form.items] + wr.writerow(row) - buff2 = io.BytesIO(buffer.getvalue().encode()) + buff2 = io.BytesIO(buffer.getvalue().encode()) + + return buff2 - return buff2 def error_to_file(buffer, error): - wr = csv.writer(buffer, dialect='excel', delimiter=',') - wr.writerow(['status']) - wr.writerow([error.args[0]]) + wr = csv.writer(buffer, dialect='excel', delimiter=',') + wr.writerow(['status']) + wr.writerow([error.args[0]]) + + return io.BytesIO(buffer.getvalue().encode()) - return io.BytesIO(buffer.getvalue().encode()) def key_to_uuid(key): - return re.split("_", key)[0] + return re.split("_", key)[0] -def solution_items(variables, solver_run): - form_items = [] - for v in variables: - if v.varValue > 0: - item_id = v.name.replace('Item_', '') - item = solver_run.get_item(item_id) 
- # add item to list and then remove from master item list - form_items.append(item) +def solution_items(variables: list, solver_run: SolverRun) -> Tuple[list]: + form_items = [] + solver_variables = [] - return form_items + for v in variables: + if v.varValue > 0: + solver_variables.append(v.name) + + if 'Item_' in v.name: + item_id = v.name.replace('Item_', '') + item = solver_run.get_item(int(item_id)) + # add item to list and then remove from master item list + if item: form_items.append(item) + elif 'Bundle_' in v.name: + bundle_id = v.name.replace('Bundle_', '') + bundle = solver_run.get_bundle(int(bundle_id)) + + if bundle: + for item in bundle.items: + if item: form_items.append(item) + + return form_items, solver_variables + +def print_problem_variables(problem): + # Uncomment this as needed in local dev + # print(problem); + for v in problem.variables(): print(v.name, "=", v.varValue) diff --git a/app/helpers/solver_helper.py b/app/helpers/solver_helper.py index 1ff0585..1b1c613 100644 --- a/app/helpers/solver_helper.py +++ b/app/helpers/solver_helper.py @@ -1,56 +1,86 @@ -from pulp import lpSum +from pulp import lpSum, LpProblem from random import randint, sample + import logging +from helpers.common_helper import * + +from models.bundle import Bundle +from models.solver_run import SolverRun +from models.item import Item + from lib.errors.item_generation_error import ItemGenerationError -def build_constraints(solver_run, problem, items): - try: - total_form_items = solver_run.total_form_items - constraints = solver_run.constraints +def build_constraints(solver_run: SolverRun, problem: LpProblem, + items: list[Item], bundles: list[Bundle], selected_items: list[Item], selected_bundles: list[Bundle]) -> LpProblem: + logging.info('Creating Constraints...') - for constraint in constraints: - attribute = constraint.reference_attribute - min = constraint.minimum - max = constraint.maximum + try: + total_form_items = solver_run.total_form_items + constraints = 
solver_run.constraints - if attribute.type == 'metadata': - con = dict(zip([item.id for item in solver_run.items], - [item.attribute_exists(attribute) - for item in solver_run.items])) - problem += lpSum([con[item.id] - * items[item.id] - for item in solver_run.items]) >= round(total_form_items * (min / 100)), f'{attribute.id} - {attribute.value} - min' - problem += lpSum([con[item.id] - * items[item.id] - for item in solver_run.items]) <= round(total_form_items * (max / 100)), f'{attribute.id} - {attribute.value} - max' - elif attribute.type == 'bundle': - # TODO: account for many different bundle types, since the id condition in L33 could yield duplicates - total_bundles = randint(constraint.minimum, constraint.maximum) - selected_bundles = sample(solver_run.bundles, total_bundles) - total_bundle_items = 0 + for constraint in constraints: + attribute = constraint.reference_attribute + min = constraint.minimum + max = constraint.maximum - for bundle in selected_bundles: - con = dict(zip([item.id for item in solver_run.items], - [(getattr(item, bundle.type, False) == bundle.id) - for item in solver_run.items])) - problem += lpSum([con[item.id] - * items[item.id] - for item in solver_run.items]) == bundle.count, f'Bundle constraint for {bundle.type} ({bundle.id})' - total_bundle_items += bundle.count + if attribute.type == 'metadata': + logging.info('Metadata Constraint Generating...') - # make sure all other items added to the form - # are not a part of any bundle - # currently only supports single bundle constraints, will need refactoring for multiple bundle constraints - con = dict(zip([item.id for item in solver_run.items], - [(getattr(item, attribute.id, None) == None) - for item in solver_run.items])) - problem += lpSum([con[item.id] - * items[item.id] - for item in solver_run.items]) == solver_run.total_form_items - total_bundle_items, f'Remaining items are not of a bundle type' + problem += lpSum( + [ + len(bundle.items_with_attribute(attribute)) * 
bundles[bundle.id] for bundle in selected_bundles + ] + + [ + item.attribute_exists(attribute).real * items[item.id] for item in selected_items + ] + ) >= round(total_form_items * (min / 100)), f'{attribute.id} - {attribute.value} - min' + + problem += lpSum( + [ + len(bundle.items_with_attribute(attribute)) * bundles[bundle.id] for bundle in selected_bundles + ] + + [ + item.attribute_exists(attribute).real * items[item.id] for item in selected_items + ] + ) <= round(total_form_items * (max / 100)), f'{attribute.id} - {attribute.value} - max' + elif attribute.type == 'bundle': + logging.info('Bundles Constraint Generating...') + # TODO: account for many different bundle types, since the id condition in L33 could yield duplicates + if selected_bundles != None: + # make sure the total bundles used in generated form is limited between min-max set + problem += lpSum([ + bundles[bundle.id] for bundle in selected_bundles + ]) == randint(int(constraint.minimum), + int(constraint.maximum)) + + logging.info('Constraints Created...') + return problem + except ValueError as error: + logging.error(error) + raise ItemGenerationError( + "Bundle min and/or max larger than bundle amount provided", + error.args[0]) - return problem - except ValueError as error: - logging.error(error) - raise ItemGenerationError("Bundle min and/or max larger than bundle amount provided", error.args[0]) +def get_random_bundles(total_form_items: int, + bundles: list[Bundle], + min: int, + max: int, + found_bundles=False) -> list[Bundle]: + selected_bundles = None + total_bundle_items = 0 + total_bundles = randint(min, max) + logging.info(f'Selecting Bundles (total of {total_bundles})...') + + while found_bundles == False: + selected_bundles = sample(bundles, total_bundles) + total_bundle_items = sum(bundle.count for bundle in selected_bundles) + + if total_bundle_items <= total_form_items: + found_bundles = True + + if found_bundles == True: + return selected_bundles + else: + return 
get_random_bundles(total_form_items, total_bundles - 1, bundles) diff --git a/app/helpers/tar_helper.py b/app/helpers/tar_helper.py index 1bfc0f5..a161f6e 100644 --- a/app/helpers/tar_helper.py +++ b/app/helpers/tar_helper.py @@ -1,9 +1,11 @@ import io import tarfile + def raw_to_tar(raw_object): - tarball = io.BytesIO(raw_object) - return tarfile.open(fileobj=tarball, mode='r:gz') + tarball = io.BytesIO(raw_object) + return tarfile.open(fileobj=tarball, mode='r:gz') + def extract_file_from_tar(tar, file_name): - return tar.extractfile(tar.getmember(file_name)) + return tar.extractfile(tar.getmember(file_name)) diff --git a/app/lib/application_configs.py b/app/lib/application_configs.py new file mode 100644 index 0000000..676e58b --- /dev/null +++ b/app/lib/application_configs.py @@ -0,0 +1,12 @@ +import os +from pydantic.dataclasses import dataclass + +@dataclass +class ApplicationConfigs(): + region_name = os.environ.get('AWS_REGION', 'ca-central-1') + aws_access_key_id = os.environ.get('AWS_ACCESS_KEY_ID', '') + aws_secret_key = os.environ.get('AWS_SECRET_ACCESS_KEY', '') + sqs_queue = os.environ.get('SQS_QUEUE', '') + endpoint_url = os.environ.get('ENDPOINT_URL', '') + s3_processed_bucket = os.environ.get('S3_PROCESSED_BUCKET', 'measure-local-solver-processed') + local_dev_env = os.environ.get('LOCAL_DEV', False) == 'True' diff --git a/app/lib/errors/item_generation_error.py b/app/lib/errors/item_generation_error.py index f15fd6d..db159f2 100644 --- a/app/lib/errors/item_generation_error.py +++ b/app/lib/errors/item_generation_error.py @@ -1,2 +1,2 @@ class ItemGenerationError(Exception): - pass + pass diff --git a/app/lib/irt/item_information_function.py b/app/lib/irt/item_information_function.py index 9e75197..cd9953f 100644 --- a/app/lib/irt/item_information_function.py +++ b/app/lib/irt/item_information_function.py @@ -3,22 +3,28 @@ import logging from lib.irt.models.three_parameter_logistic import ThreeParameterLogistic from 
lib.errors.item_generation_error import ItemGenerationError -class ItemInformationFunction(): - def __init__(self, irt_model): - self.model_data = irt_model - # determines the amount of information for a given question at a given theta (ability level) - # further detailed on page 161, equation 4 here: - # https://www.ncbi.nlm.nih.gov/pmc/articles/PMC5978482/pdf/10.1177_0146621615613308.pdf - def calculate(self, **kwargs): - try: - if self.model_data.model == '3PL': - p = ThreeParameterLogistic(self.model_data, kwargs).result() - q = 1 - p - return (self.model_data.a_param * q * (p - self.model_data.c_param)**2) / (p * ((1 - self.model_data.c_param)**2)) - else: - # potentially error out - raise ItemGenerationError("irt model not supported or provided") - except ZeroDivisionError as error: - logging.error(error) - raise ItemGenerationError("params not well formatted", error.args[0]) +class ItemInformationFunction(): + + def __init__(self, irt_model): + self.model_data = irt_model + + # determines the amount of information for a given question at a given theta (ability level) + # further detailed on page 161, equation 4 here: + # https://www.ncbi.nlm.nih.gov/pmc/articles/PMC5978482/pdf/10.1177_0146621615613308.pdf + def calculate(self, **kwargs): + try: + if self.model_data.model == '3PL': + p = ThreeParameterLogistic(self.model_data, kwargs).result() + q = 1 - p + return (self.model_data.a_param * q * + (p - self.model_data.c_param)**2) / (p * ( + (1 - self.model_data.c_param)**2)) + else: + # potentially error out + raise ItemGenerationError( + "irt model not supported or provided") + except ZeroDivisionError as error: + logging.error(error) + raise ItemGenerationError("params not well formatted", + error.args[0]) diff --git a/app/lib/irt/item_response_function.py b/app/lib/irt/item_response_function.py index 3d64df9..6cd8ced 100644 --- a/app/lib/irt/item_response_function.py +++ b/app/lib/irt/item_response_function.py @@ -1,12 +1,14 @@ from 
lib.irt.models.three_parameter_logistic import ThreeParameterLogistic from lib.errors.item_generation_error import ItemGenerationError -class ItemResponseFunction(): - def __init__(self, irt_model): - self.model_data = irt_model - def calculate(self, **kwargs): - if self.model_data.model == '3PL': - return ThreeParameterLogistic(self.model_data, kwargs).result() - else: - raise ItemGenerationError("irt model not supported or provided") +class ItemResponseFunction(): + + def __init__(self, irt_model): + self.model_data = irt_model + + def calculate(self, **kwargs): + if self.model_data.model == '3PL': + return ThreeParameterLogistic(self.model_data, kwargs).result() + else: + raise ItemGenerationError("irt model not supported or provided") diff --git a/app/lib/irt/models/three_parameter_logistic.py b/app/lib/irt/models/three_parameter_logistic.py index 755331e..5fd4956 100644 --- a/app/lib/irt/models/three_parameter_logistic.py +++ b/app/lib/irt/models/three_parameter_logistic.py @@ -1,16 +1,18 @@ class ThreeParameterLogistic: - def __init__(self, model_params, kwargs): - self.model_params = model_params - # check if exists, if not error out - self.b_param = kwargs['b_param'] - self.e = 2.71828 - self.theta = kwargs['theta'] - # contains the primary 3pl function, determining the probably of an inidividual - # that an individual at a certain theta would get a particular question correct - # detailed further on page 161, equation 1 here: - # https://www.ncbi.nlm.nih.gov/pmc/articles/PMC5978482/pdf/10.1177_0146621615613308.pdf - def result(self): - a = self.model_params.a_param - c = self.model_params.c_param - return c + (1 - c) * (1 / (1 + self.e**(-a * (self.theta - self.b_param)))) + def __init__(self, model_params, kwargs): + self.model_params = model_params + # check if exists, if not error out + self.b_param = kwargs['b_param'] + self.e = 2.71828 + self.theta = kwargs['theta'] + + # contains the primary 3pl function, determining the probably of an inidividual + 
# that an individual at a certain theta would get a particular question correct + # detailed further on page 161, equation 1 here: + # https://www.ncbi.nlm.nih.gov/pmc/articles/PMC5978482/pdf/10.1177_0146621615613308.pdf + def result(self): + a = self.model_params.a_param + c = self.model_params.c_param + return c + (1 - c) * (1 / (1 + self.e**(-a * + (self.theta - self.b_param)))) diff --git a/app/lib/irt/test_information_function.py b/app/lib/irt/test_information_function.py index b91f9b5..043d536 100644 --- a/app/lib/irt/test_information_function.py +++ b/app/lib/irt/test_information_function.py @@ -1,19 +1,22 @@ from lib.irt.item_information_function import ItemInformationFunction + class TestInformationFunction(): - def __init__(self, irt_model): - self.irt_model = irt_model - self.iif = ItemInformationFunction(irt_model) - # determins the amount of information - # at a certain theta (ability level) of the sum of a question set correct - # detailed further on page 166, equation 4 here: - # https://www.ncbi.nlm.nih.gov/pmc/articles/PMC5978482/pdf/10.1177_0146621615613308.pdf - def calculate(self, items, **kwargs): - sum = 0 + def __init__(self, irt_model): + self.irt_model = irt_model + self.iif = ItemInformationFunction(irt_model) - for item in items: - result = self.iif.calculate(b_param=item.b_param, theta=kwargs['theta']) - sum += result + # determins the amount of information + # at a certain theta (ability level) of the sum of a question set correct + # detailed further on page 166, equation 4 here: + # https://www.ncbi.nlm.nih.gov/pmc/articles/PMC5978482/pdf/10.1177_0146621615613308.pdf + def calculate(self, items, **kwargs): + sum = 0 - return sum + for item in items: + result = self.iif.calculate(b_param=item.b_param, + theta=kwargs['theta']) + sum += result + + return sum diff --git a/app/lib/irt/test_response_function.py b/app/lib/irt/test_response_function.py index d06aa83..3aaa958 100644 --- a/app/lib/irt/test_response_function.py +++ 
b/app/lib/irt/test_response_function.py @@ -1,20 +1,23 @@ from lib.irt.item_response_function import ItemResponseFunction + # otherwise known as the Test Characteristic Curve (TCC) class TestResponseFunction(): - def __init__(self, irt_model): - self.irt_model = irt_model - self.irf = ItemResponseFunction(irt_model) - # determins the probably of an inidividual - # at a certain theta (ability level) would get a sum of questions correct - # detailed further on page 166, equation 3 here: - # https://www.ncbi.nlm.nih.gov/pmc/articles/PMC5978482/pdf/10.1177_0146621615613308.pdf - def calculate(self, items, **kwargs): - sum = 0 + def __init__(self, irt_model): + self.irt_model = irt_model + self.irf = ItemResponseFunction(irt_model) - for item in items: - result = self.irf.calculate(b_param=item.b_param, theta=kwargs['theta']) - sum += result + # determine the probability of an individual + # at a certain theta (ability level) would get a sum of questions correct + # detailed further on page 166, equation 3 here: + # https://www.ncbi.nlm.nih.gov/pmc/articles/PMC5978482/pdf/10.1177_0146621615613308.pdf + def calculate(self, items, **kwargs): + sum = 0 - return sum + for item in items: + result = self.irf.calculate(b_param=item.b_param, + theta=kwargs['theta']) + sum += result + + return sum diff --git a/app/main.py b/app/main.py index ae05cb2..1459f59 100644 --- a/app/main.py +++ b/app/main.py @@ -1,36 +1,57 @@ import os, sys, logging +from lib.application_configs import ApplicationConfigs from services.loft_service import LoftService from helpers import aws_helper from daemonize import Daemonize -from sqs_listener import SqsListener +from sqspy import Consumer -logging.basicConfig(stream=sys.stdout, level=logging.INFO, format="%(levelname)s %(asctime)s - %(message)s") +logging.basicConfig(stream=sys.stdout, + level=logging.INFO, + format="%(levelname)s %(asctime)s - %(message)s") -class ServiceListener(SqsListener): - def handle_message(self, body, attributes, 
messages_attributes): - # gather/manage/process data based on the particular needs - logging.info('Incoming message: %s', body) - service = LoftService(body) - service.process() +class ServiceListener(Consumer): + + def handle_message(self, body, attributes, messages_attributes): + # gather/manage/process data based on the particular needs + logging.info('Incoming message: %s', body) + + service = LoftService(body) + service.process() + + logging.info('Process complete for %s', service.file_name) - logging.info('Process complete for %s', service.file_name) def main(): - logging.info('Starting Solver Service (v1.1.0)...') - listener = ServiceListener( - os.environ['SQS_QUEUE'], - region_name=os.environ['AWS_REGION'], - aws_access_key=os.environ['AWS_ACCESS_KEY_ID'], - aws_secret_key=os.environ['AWS_SECRET_ACCESS_KEY'], - queue_url=os.environ['SQS_QUEUE'] - ) - listener.listen() + logging.info('Starting Solver Service: Tokyo Drift (v1.2)...') + + # ToDo: Figure out a much better way of doing this. 
+ # LocalStack wants 'endpoint_url', while prod doesnt :( + if ApplicationConfigs.local_dev_env: + listener = ServiceListener( + None, + ApplicationConfigs.sqs_queue, + create_queue=False, + region_name=ApplicationConfigs.region_name, + aws_access_key=ApplicationConfigs.aws_access_key_id, + aws_secret_key=ApplicationConfigs.aws_secret_key, + endpoint_url=ApplicationConfigs.endpoint_url) + else: + listener = ServiceListener( + None, + ApplicationConfigs.sqs_queue, + create_queue=False, + region_name=ApplicationConfigs.region_name, + aws_access_key=ApplicationConfigs.aws_access_key_id, + aws_secret_key=ApplicationConfigs.aws_secret_key) + + listener.listen() + if __name__ == '__main__': - myname=os.path.basename(sys.argv[0]) - pidfile='/tmp/%s' % myname - daemon = Daemonize(app=myname,pid=pidfile, action=main, foreground=True) - daemon.start() + myname = os.path.basename(sys.argv[0]) + pidfile = '/tmp/%s' % myname + daemon = Daemonize(app=myname, pid=pidfile, action=main, foreground=True) + daemon.start() diff --git a/app/models/advanced_options.py b/app/models/advanced_options.py index 1d7d420..849818d 100644 --- a/app/models/advanced_options.py +++ b/app/models/advanced_options.py @@ -1,11 +1,12 @@ from pydantic import BaseModel from typing import List, Optional, Dict + class AdvancedOptions(BaseModel): - linearity_check: Optional[bool] - show_progress: Optional[bool] - max_solution_time: Optional[int] - brand_bound_tolerance: Optional[float] - max_forms: Optional[int] - precision: Optional[float] - extra_param_range: Optional[List[Dict]] + linearity_check: Optional[bool] + show_progress: Optional[bool] + max_solution_time: Optional[int] + brand_bound_tolerance: Optional[float] + max_forms: Optional[int] + precision: Optional[float] + extra_param_range: Optional[List[Dict]] diff --git a/app/models/attribute.py b/app/models/attribute.py index 1bdc837..73be39a 100644 --- a/app/models/attribute.py +++ b/app/models/attribute.py @@ -1,7 +1,8 @@ from pydantic import 
BaseModel from typing import Optional + class Attribute(BaseModel): - value: Optional[str] - type: Optional[str] - id: str + value: Optional[str] + type: Optional[str] + id: str diff --git a/app/models/bundle.py b/app/models/bundle.py index 48eea7e..82fb127 100644 --- a/app/models/bundle.py +++ b/app/models/bundle.py @@ -1,6 +1,48 @@ from pydantic import BaseModel +from typing import List + +from lib.irt.test_information_function import TestInformationFunction +from lib.irt.test_response_function import TestResponseFunction + +from models.attribute import Attribute +from models.item import Item +from models.irt_model import IRTModel class Bundle(BaseModel): - id: int - count: int - type: str + id: int + count: int + items: List[Item] + type: str + + def tif(self, irt_model: IRTModel, theta: float) -> float: + return TestInformationFunction(irt_model).calculate(self.items, theta=theta) + + def trf(self, irt_model: IRTModel, theta: float) -> float: + return TestResponseFunction(irt_model).calculate(self.items, theta=theta) + + def tif_trf_sum(self, solver_run): + return self.__trf_sum(solver_run) + self.__tif_sum(solver_run) + + def items_with_attribute(self, attribute: Attribute) -> List[Item]: + items = [] + + for item in self.items: + if item.attribute_exists(attribute): items.append(item) + + return items + + def __tif_sum(self, solver_run): + total = 0 + + for target in solver_run.objective_function.tcc_targets: + total += self.tif(solver_run.irt_model, target.theta) + + return total + + def __trf_sum(self, solver_run): + total = 0 + + for target in solver_run.objective_function.tcc_targets: + total += self.trf(solver_run.irt_model, target.theta) + + return total diff --git a/app/models/constraint.py b/app/models/constraint.py index 28a3939..ba9286c 100644 --- a/app/models/constraint.py +++ b/app/models/constraint.py @@ -2,7 +2,8 @@ from pydantic import BaseModel from models.attribute import Attribute + class Constraint(BaseModel): - reference_attribute: 
Attribute - minimum: float - maximum: float + reference_attribute: Attribute + minimum: float + maximum: float diff --git a/app/models/form.py b/app/models/form.py index 72863ae..e2842f2 100644 --- a/app/models/form.py +++ b/app/models/form.py @@ -1,26 +1,31 @@ from pydantic import BaseModel -from typing import List +from typing import List, TypeVar, Type from helpers import irt_helper +from models.solver_run import SolverRun from models.item import Item from models.target import Target from lib.irt.test_response_function import TestResponseFunction -class Form(BaseModel): - items: List[Item] - cut_score: float - tif_results: List[Target] - tcc_results: List[Target] - status: str = 'Not Optimized' +_T = TypeVar("_T") - @classmethod - def create(cls, items, solver_run, status): - return cls( - items=items, - cut_score=TestResponseFunction(solver_run.irt_model).calculate(items, theta=solver_run.theta_cut_score), - tif_results=irt_helper.generate_tif_results(items, solver_run), - tcc_results=irt_helper.generate_tcc_results(items, solver_run), - status=status - ) +class Form(BaseModel): + items: List[Item] + cut_score: float + tif_results: List[Target] + tcc_results: List[Target] + status: str = 'Not Optimized' + solver_variables: List[str] + + @classmethod + def create(cls: Type[_T], items: list, solver_run: SolverRun, status: str, solver_variables: list) -> _T: + return cls( + items=items, + cut_score=TestResponseFunction(solver_run.irt_model).calculate( + items, theta=solver_run.theta_cut_score), + tif_results=irt_helper.generate_tif_results(items, solver_run), + tcc_results=irt_helper.generate_tcc_results(items, solver_run), + status=status, + solver_variables=solver_variables) diff --git a/app/models/irt_model.py b/app/models/irt_model.py index 2596686..2f79776 100644 --- a/app/models/irt_model.py +++ b/app/models/irt_model.py @@ -1,8 +1,13 @@ from pydantic import BaseModel from typing import Dict + class IRTModel(BaseModel): - a_param: float - b_param: Dict = 
{"schema_bson_id": str, "field_bson_id": str} - c_param: float - model: str + a_param: float + b_param: Dict = {"schema_bson_id": str, "field_bson_id": str} + c_param: float + model: str + + def formatted_b_param(self): + return self.b_param['schema_bson_id'] + '-' + self.b_param[ + 'field_bson_id'] diff --git a/app/models/item.py b/app/models/item.py index 48b5000..260d612 100644 --- a/app/models/item.py +++ b/app/models/item.py @@ -7,25 +7,47 @@ from lib.irt.item_response_function import ItemResponseFunction from lib.irt.item_information_function import ItemInformationFunction class Item(BaseModel): - id: int - passage_id: Optional[int] - attributes: List[Attribute] - b_param: float = 0.00 + id: int + passage_id: Optional[int] + workflow_state: Optional[str] + attributes: List[Attribute] + b_param: float = 0.00 - def iif(self, solver_run, theta): - return ItemInformationFunction(solver_run.irt_model).calculate(b_param=self.b_param,theta=theta) + def iif(self, solver_run, theta): + return ItemInformationFunction(solver_run.irt_model).calculate(b_param=self.b_param, theta=theta) - def irf(self, solver_run, theta): - return ItemResponseFunction(solver_run.irt_model).calculate(b_param=self.b_param,theta=theta) + def irf(self, solver_run, theta): + return ItemResponseFunction(solver_run.irt_model).calculate(b_param=self.b_param, theta=theta) - def get_attribute(self, ref_attribute): - for attribute in self.attributes: - if attribute.id == ref_attribute.id and attribute.value == ref_attribute.value: - return attribute.value - return False + def get_attribute(self, ref_attribute: Attribute) -> Attribute or None: + for attribute in self.attributes: + if self.attribute_exists(ref_attribute): + return attribute - def attribute_exists(self, ref_attribute): - for attribute in self.attributes: - if attribute.id == ref_attribute.id and attribute.value == ref_attribute.value: - return True - return False + return None + + def attribute_exists(self, ref_attribute: Attribute) -> 
bool: + for attribute in self.attributes: + if attribute.id == ref_attribute.id and attribute.value.lower( + ) == ref_attribute.value.lower(): + return True + return False + + def iif_irf_sum(self, solver_run): + return self.__iif_sum(solver_run) + self.__irf_sum(solver_run) + + def __iif_sum(self, solver_run): + total = 0 + + for target in solver_run.objective_function.tif_targets: + total += self.iif(solver_run, target.theta) + + return total + + def __irf_sum(self, solver_run): + total = 0 + + for target in solver_run.objective_function.tif_targets: + total += self.irf(solver_run, target.theta) + + return total diff --git a/app/models/objective_function.py b/app/models/objective_function.py index 62f96cb..5081f1d 100644 --- a/app/models/objective_function.py +++ b/app/models/objective_function.py @@ -3,11 +3,48 @@ from typing import Dict, List, AnyStr from models.target import Target + class ObjectiveFunction(BaseModel): - # minimizing tif/tcc target value is only option currently - # as we add more we can build this out to be more dynamic - # likely with models representing each objective function type - tif_targets: List[Target] - tcc_targets: List[Target] - objective: AnyStr = "minimize" - weight: Dict = {'tif': 1, 'tcc': 1} + # minimizing tif/tcc target value is only option currently + # as we add more we can build this out to be more dynamic + # likely with models representing each objective function type + tif_targets: List[Target] + tcc_targets: List[Target] + target_variance_percentage: int = 10 + objective: AnyStr = "minimize" + weight: Dict = {'tif': 1, 'tcc': 1} + + def increment_targets_drift(self, + limit: float or bool, + all: bool = False, + amount: float = 0.1, + targets: list[Target] = []) -> bool: + if all: + for target in self.tif_targets: + target.drift = round(target.drift + amount, 2) + for target in self.tcc_targets: + target.drift = round(target.drift + amount, 2) + else: + for target in targets: + target.drift = round(target.drift + 
amount, 2) + print(self.tif_targets) + print(self.tcc_targets) + return amount + + def update_targets_drift(self, amount: float = 0.0): + for target in self.tif_targets: + target.drift = round(amount, 2) + for target in self.tcc_targets: + target.drift = round(amount, 2) + + def minimum_drift(self) -> float: + minimum_drift = 0.0 + + for target in self.all_targets(): + if target.drift < minimum_drift: + minimum_drift = target.drift + + return minimum_drift + + def all_targets(self) -> list[Target]: + return self.tif_targets + self.tcc_targets diff --git a/app/models/solution.py b/app/models/solution.py index d895574..0f7c2ea 100644 --- a/app/models/solution.py +++ b/app/models/solution.py @@ -3,6 +3,7 @@ from typing import List from models.form import Form + class Solution(BaseModel): - response_id: int - forms: List[Form] + response_id: int + forms: List[Form] diff --git a/app/models/solver_run.py b/app/models/solver_run.py index 3926b58..ad54659 100644 --- a/app/models/solver_run.py +++ b/app/models/solver_run.py @@ -1,5 +1,8 @@ from pydantic import BaseModel -from typing import List, Optional +from typing import List, Literal, Optional + +import logging +import random from models.item import Item from models.constraint import Constraint @@ -8,58 +11,119 @@ from models.bundle import Bundle from models.objective_function import ObjectiveFunction from models.advanced_options import AdvancedOptions + class SolverRun(BaseModel): - items: List[Item] - bundles: Optional[Bundle] - constraints: List[Constraint] - irt_model: IRTModel - objective_function: ObjectiveFunction - total_form_items: int - total_forms: int = 1 - theta_cut_score: float = 0.00 - advanced_options: Optional[AdvancedOptions] - engine: str + items: List[Item] = [] + bundles: list[Bundle] = [] + constraints: List[Constraint] + irt_model: IRTModel + objective_function: ObjectiveFunction + total_form_items: int + total_forms: int = 1 + theta_cut_score: float = 0.00 + drift_style: Literal['constant', 
'variable'] = 'constant' + advanced_options: Optional[AdvancedOptions] + engine: str - def get_item(self, item_id): - for item in self.items: - if str(item.id) == item_id: - return item - return False + def get_item(self, item_id: int) -> Item or None: + for item in self.items: + if item.id == item_id: + return item - def remove_items(self, items): - self.items = [item for item in self.items if item not in items] - return True + def get_bundle(self, bundle_id: int) -> Bundle or None: + for bundle in self.bundles: + if bundle.id == bundle_id: + return bundle - def generate_bundles(self): - bundle_constraints = (constraint.reference_attribute for constraint in self.constraints if constraint.reference_attribute.type == 'bundle') + def get_constraint_by_type(self, type: str) -> Constraint or None: + for constraint in self.constraints: + if type == constraint.reference_attribute.type: + return constraint - for bundle_constraint in bundle_constraints: - type_attribute = bundle_constraint.id + def remove_items(self, items: list[Item]) -> bool: + self.items = [item for item in self.items if item not in items] + return True - for item in self.items: - attribute_id = getattr(item, type_attribute, None) + def generate_bundles(self): + logging.info('Generating Bundles...') + # confirms bundle constraints exists + bundle_constraints = ( + constraint.reference_attribute for constraint in self.constraints + if constraint.reference_attribute.type == 'bundle') - # make sure the item has said attribute - if attribute_id != None: - # if there are pre-existing bundles, add new or increment existing - # else create array with new bundle - if self.bundles != None: - # get index of the bundle in the bundles list if exists or None if it doesn't - bundle_index = next((index for (index, bundle) in enumerate(self.bundles) if bundle.id == attribute_id and bundle.type == type_attribute), None) + for bundle_constraint in bundle_constraints: + type_attribute = bundle_constraint.id - # if the 
index doesn't exist add the new bundle of whatever type - # else increment the count of the current bundle - if bundle_index == None: - self.bundles.append(Bundle( - id=attribute_id, - count=1, - type=type_attribute - )) - else: - self.bundles[bundle_index].count += 1 - else: - self.bundles = [Bundle( - id=attribute_id, - count=1, - type=type_attribute - )] + for item in self.items: + attribute_id = getattr(item, type_attribute, None) + + # make sure the item has said attribute + if attribute_id != None: + # if there are pre-existing bundles, add new or increment existing + # else create array with new bundle + if self.bundles != None: + # get index of the bundle in the bundles list if exists or None if it doesn't + bundle_index = next( + (index + for (index, bundle) in enumerate(self.bundles) + if bundle.id == attribute_id + and bundle.type == type_attribute), None) + + # if the index doesn't exist add the new bundle of whatever type + # else increment the count of the current bundle + if bundle_index == None: + self.bundles.append( + Bundle(id=attribute_id, + count=1, + items=[item], + type=type_attribute)) + else: + self.bundles[bundle_index].count += 1 + self.bundles[bundle_index].items.append(item) + else: + self.bundles = [ + Bundle(id=attribute_id, + count=1, + items=[item], + type=type_attribute) + ] + # temporary compensator for bundle item limits, since we shouldn't be using cases with less than 3 items + # ideally this should be in the bundles model as a new attribute to handle "constraints of constraints" + logging.info('Removing bundles with items < 3') + for k, v in enumerate(self.bundles): + bundle = self.bundles[k] + if bundle.count < 3: del self.bundles[k] + + logging.info('Bundles Generated...') + + def get_constraint(self, name: str) -> Constraint: + return next((constraint for constraint in self.constraints + if constraint.reference_attribute.id == name), None) + + def unbundled_items(self) -> list: + # since the only bundles are based on 
passage id currently + # in the future when we have more than just passage based bundles + # we'll need to develop a more sophisticated way of handling this concern + bundle_constraints = ( + constraint.reference_attribute for constraint in self.constraints + if constraint.reference_attribute.type == 'bundle') + + if len(list(bundle_constraints)) > 0: + return [item for item in self.items if item.passage_id == None] + else: + return self.items + + def select_items_by_percent(self, percent: int) -> list[Item]: + items = self.unbundled_items() + total_items = len(items) + selected_items_amount = round(total_items - (total_items * + (percent / 100))) + + return random.sample(items, selected_items_amount) + + def select_bundles_by_percent(self, percent: int) -> list[Bundle]: + total_bundles = len(self.bundles) + selected_bundles_amount = round(total_bundles - (total_bundles * + (percent / 100))) + + return random.sample(self.bundles, selected_bundles_amount) diff --git a/app/models/target.py b/app/models/target.py index 3999d83..f58f49a 100644 --- a/app/models/target.py +++ b/app/models/target.py @@ -2,6 +2,21 @@ from pydantic import BaseModel from typing import Optional class Target(BaseModel): - theta: float - value: float - result: Optional[float] + theta: float + value: float + result: Optional[float] + drift: float = 0.0 + + @classmethod + def max_drift(cls) -> int: + return 15 + + @classmethod + def max_drift_increment(cls) -> int: + return 1 # 10% + + def minimum(self) -> float: + return self.value - (self.value * self.drift) + + def maximum(self) -> float: + return self.value + (self.value * self.drift) diff --git a/app/services/base.py b/app/services/base.py index 77f72f7..eb7a248 100644 --- a/app/services/base.py +++ b/app/services/base.py @@ -1,4 +1,5 @@ class Base: - def __init__(self, source, ingest_type='message'): - self.ingest_type = ingest_type - self.source = source + + def __init__(self, source, ingest_type='message'): + self.ingest_type = 
ingest_type + self.source = source diff --git a/app/services/loft_service.py b/app/services/loft_service.py index 3afee56..d854291 100644 --- a/app/services/loft_service.py +++ b/app/services/loft_service.py @@ -1,118 +1,259 @@ import os, json, random, io, logging -from pulp import LpProblem, LpVariable, LpMinimize, LpStatus, lpSum +from pulp import LpProblem, LpVariable, LpMinimize, LpMaximize, LpAffineExpression, LpConstraint, LpStatus, lpSum +from lib.application_configs import ApplicationConfigs from helpers import aws_helper, tar_helper, csv_helper, service_helper, solver_helper from lib.errors.item_generation_error import ItemGenerationError from models.solver_run import SolverRun from models.solution import Solution from models.form import Form +from models.item import Item +from models.target import Target from services.base import Base + class LoftService(Base): - def process(self): - try: - self.solver_run = SolverRun.parse_obj(self.retreive_attributes_from_message()) - self.solver_run.generate_bundles() - self.solution = self.generate_solution() - self.result = self.stream_to_s3_bucket() - except ItemGenerationError as error: - self.result = self.stream_to_s3_bucket(error) - except TypeError as error: - logging.error(error) - self.result = self.stream_to_s3_bucket(ItemGenerationError("Provided params causing error in calculation results")) - def retreive_attributes_from_message(self): - logging.info('Retrieving attributes from message...') - # get s3 object - self.key = aws_helper.get_key_from_message(self.source) - s3_object = aws_helper.get_object(self.key, aws_helper.get_bucket_from_message(self.source)) + def process(self): + try: + self.solver_run = self.create_solver_run_from_attributes() + self.solver_run.generate_bundles() + self.solution = self.generate_solution() + self.result = self.stream_to_s3_bucket() + except ItemGenerationError as error: + self.result = self.stream_to_s3_bucket(error) + except TypeError as error: + logging.error(error) + 
self.result = self.stream_to_s3_bucket( + ItemGenerationError( + "Provided params causing error in calculation results")) - # convert to tar - self.tar = tar_helper.raw_to_tar(s3_object) + def create_solver_run_from_attributes(self) -> SolverRun: + logging.info('Retrieving attributes from message...') + # get s3 object + self.key = aws_helper.get_key_from_message(self.source) + s3_object = aws_helper.get_object( + self.key, aws_helper.get_bucket_from_message(self.source)) - # get attributes file and convert to dict - attributes = json.loads(tar_helper.extract_file_from_tar(self.tar , 'solver_run_attributes.json').read()) + # convert to tar + self.tar = tar_helper.raw_to_tar(s3_object) - # get items file and convert to dict - items_csv = tar_helper.extract_file_from_tar(self.tar , 'items.csv') - items_csv_reader = csv_helper.file_stream_reader(items_csv) + # get attributes file and convert to dict + attributes = json.loads( + tar_helper.extract_file_from_tar( + self.tar, 'solver_run_attributes.json').read()) - # add items to attributes dict - attributes['items'] = service_helper.items_csv_to_dict(items_csv_reader) - logging.info('Processed Attributes...') + # create solver run + solver_run = SolverRun.parse_obj(attributes) - return attributes + # get items file and convert to dict + items_csv = tar_helper.extract_file_from_tar(self.tar, 'items.csv') + items_csv_reader = csv_helper.file_stream_reader(items_csv) - def generate_solution(self): - # unsolved solution - solution = Solution( - response_id=random.randint(100, 5000), - forms=[] - ) + # add items to solver run + solver_run.items = service_helper.csv_to_item(items_csv_reader, + solver_run) - # counter for number of forms - f = 0 + logging.info('Processed Attributes...') - # iterate for number of forms that require creation - # currently creates distinc forms with no item overlap - while f < self.solver_run.total_forms: - # setup vars - items = LpVariable.dicts( - "Item", [item.id for item in 
self.solver_run.items], lowBound=1, upBound=1, cat='Binary') - problem_objection_functions = [] + return solver_run - # create problem - problem = LpProblem("ata-form-generate", LpMinimize) + def generate_solution(self) -> Solution: + logging.info('Generating Solution...') - # dummy objective function, because it just makes things easierâ„¢ - problem += lpSum([items[item.id] - for item in self.solver_run.items]) + solution = Solution(response_id=random.randint(100, 5000), + forms=[]) # unsolved solution - # constraints - problem += lpSum([items[item.id] - for item in self.solver_run.items]) == self.solver_run.total_form_items, 'Total form items' + # iterate for number of forms that require creation + for form_count in range(self.solver_run.total_forms): + form_number = form_count + 1 + current_drift = 0 # FF Tokyo Drift - # dynamic constraints - problem = solver_helper.build_constraints(self.solver_run, problem, items) + # adding an element of randomness to the items and bundles used + selected_items = self.solver_run.select_items_by_percent(30) + selected_bundles = self.solver_run.select_bundles_by_percent( + 30) - # multi-objective constraints - for target in self.solver_run.objective_function.tif_targets: - problem += lpSum([item.iif(self.solver_run, target.theta)*items[item.id] - for item in self.solver_run.items]) <= target.value, f'min tif theta ({target.theta}) target value {target.value}' + # setup common Solver variables + items = LpVariable.dicts("Item", + [item.id for item in selected_items], + lowBound=0, + upBound=1, + cat='Binary') + bundles = LpVariable.dicts( + "Bundle", [bundle.id for bundle in selected_bundles], + lowBound=0, + upBound=1, + cat='Binary') - for target in self.solver_run.objective_function.tcc_targets: - problem += lpSum([item.irf(self.solver_run, target.theta)*items[item.id] - for item in self.solver_run.items]) <= target.value, f'min tcc theta ({target.theta}) target value {target.value}' + logging.info(f'Generating Solution for 
Form {form_number}') - # solve problem - problem.solve() + while current_drift <= Target.max_drift(): + drift_percent = current_drift / 100 + self.solver_run.objective_function.update_targets_drift( + drift_percent) - # add return items and create as a form - form_items = service_helper.solution_items(problem.variables(), self.solver_run) + # create problem + problem = LpProblem('ata-form-generate', LpMinimize) - # add form to solution - solution.forms.append(Form.create(form_items, self.solver_run, LpStatus[problem.status])) + # objective function + problem += lpSum([ + bundle.count * bundles[bundle.id] + for bundle in selected_bundles + ] + [ + items[item.id] + for item in selected_items + ]) - # successfull form, increment - f += 1 + # Form Constraints + problem += lpSum( + [ + bundle.count * bundles[bundle.id] + for bundle in selected_bundles + ] + [ + 1 * items[item.id] + for item in selected_items + ] + ) == self.solver_run.total_form_items, f'Total bundle form items for form {form_number}' - return solution + # Dynamic constraints.. 
currently we only support Metadata and Bundles(Cases/Passages) + problem = solver_helper.build_constraints( + self.solver_run, problem, items, bundles, selected_items, selected_bundles) - def stream_to_s3_bucket(self, error = None): - self.file_name = f'{service_helper.key_to_uuid(self.key)}.csv' - solution_file = None - # setup writer buffer and write processed forms to file - buffer = io.StringIO() + # form uniqueness constraints + # for form in solution.forms: + # form_item_options = [ + # bundles[bundle.id] + # for bundle in selected_bundles + # ] + [ + # items[item.id] + # for item in selected_items + # ] + # problem += len( + # set(form.solver_variables) + # & set(form_item_options)) / float( + # len( + # set(form.solver_variables) + # | set(form_item_options))) * 100 >= 10 - if error: - logging.info('Streaming %s error response to s3 bucket - %s', self.file_name, os.environ['S3_PROCESSED_BUCKET']) - solution_file = service_helper.error_to_file(buffer, error) - else: - logging.info('Streaming %s to s3 bucket - %s', self.file_name, os.environ['S3_PROCESSED_BUCKET']) - solution_file = service_helper.solution_to_file(buffer, self.solver_run.total_form_items, self.solution.forms) + logging.info('Creating TIF and TCC Elastic constraints') - # upload generated file to s3 and return result - return aws_helper.file_stream_upload(solution_file, self.file_name, os.environ['S3_PROCESSED_BUCKET']) + # Behold our very own Elastic constraints! 
+ for tif_target in self.solver_run.objective_function.tif_targets: + problem += lpSum([ + bundle.tif(self.solver_run.irt_model, tif_target.theta) + * bundles[bundle.id] + for bundle in selected_bundles + ] + [ + item.iif(self.solver_run, tif_target.theta) * + items[item.id] + for item in selected_items + ]) >= tif_target.minimum( + ), f'Min TIF theta({tif_target.theta}) at target {tif_target.value} drift at {current_drift}%' + problem += lpSum([ + bundle.tif(self.solver_run.irt_model, tif_target.theta) + * bundles[bundle.id] + for bundle in selected_bundles + ] + [ + item.iif(self.solver_run, tif_target.theta) * + items[item.id] + for item in selected_items + ]) <= tif_target.maximum( + ), f'Max TIF theta({tif_target.theta}) at target {tif_target.value} drift at {current_drift}%' + + for tcc_target in self.solver_run.objective_function.tcc_targets: + problem += lpSum([ + bundle.trf(self.solver_run.irt_model, tcc_target.theta) + * bundles[bundle.id] + for bundle in selected_bundles + ] + [ + item.irf(self.solver_run, tcc_target.theta) * + items[item.id] + for item in selected_items + ]) >= tcc_target.minimum( + ), f'Min TCC theta({tcc_target.theta}) at target {tcc_target.value} drift at {current_drift}%' + problem += lpSum([ + bundle.trf(self.solver_run.irt_model, tcc_target.theta) + * bundles[bundle.id] + for bundle in selected_bundles + ] + [ + item.irf(self.solver_run, tcc_target.theta) * + items[item.id] + for item in selected_items + ]) <= tcc_target.maximum( + ), f'Max TCC theta({tcc_target.theta}) at target {tcc_target.value} drift at {current_drift}%' + + logging.info( + f'Solving for Form {form_number} with a drift of {current_drift}%' + ) + problem.solve() + + if LpStatus[problem.status] == 'Infeasible': + logging.info( + f'attempt infeasible for drift of {current_drift}%') + + if current_drift >= Target.max_drift( + ): # this is the last attempt, so lets finalize the solution + if ApplicationConfigs.local_dev_env: + 
service_helper.print_problem_variables(problem) + + logging.info( + f'No feasible solution found for Form {form_number}!' + ) + + self.add_form_to_solution(problem, solution) + + break + + current_drift += Target.max_drift_increment() + else: + if ApplicationConfigs.local_dev_env: + service_helper.print_problem_variables(problem) + + logging.info( + f'Optimal solution found with drift of {current_drift}%!' + ) + + self.add_form_to_solution(problem, solution) + + break + + logging.info('Solution Generated.') + + return solution + + def add_form_to_solution(self, problem: LpProblem, solution: Solution): + # add return items and create as a form + form_items, solver_variables = service_helper.solution_items( + problem.variables(), self.solver_run) + form = Form.create(form_items, self.solver_run, + LpStatus[problem.status], solver_variables) + + solution.forms.append(form) + + logging.info('Form generated and added to solution...') + + def stream_to_s3_bucket(self, error=None): + self.file_name = f'{service_helper.key_to_uuid(self.key)}.csv' + solution_file = None + # setup writer buffer and write processed forms to file + buffer = io.StringIO() + + if error: + logging.info('Streaming %s error response to s3 bucket - %s', + self.file_name, + ApplicationConfigs.s3_processed_bucket) + solution_file = service_helper.error_to_file(buffer, error) + else: + logging.info('Streaming %s to s3 bucket - %s', self.file_name, + ApplicationConfigs.s3_processed_bucket) + solution_file = service_helper.solution_to_file( + buffer, self.solver_run.total_form_items, self.solution.forms) + + # upload generated file to s3 and return result + return aws_helper.file_stream_upload( + solution_file, self.file_name, + ApplicationConfigs.s3_processed_bucket) diff --git a/app/services/solver_sandbox.py b/app/services/solver_sandbox.py new file mode 100644 index 0000000..97ac5fd --- /dev/null +++ b/app/services/solver_sandbox.py @@ -0,0 +1,252 @@ +# Local Dev Sandbox for the solver +# Useful for 
testing some concepts and functionality +# and offers a much faster feedback loop than the usual end-to-end process in Local Dev +# +# How to use: +# 1. run `compose exec meazure-solver bash` +# 2. run `python` +# 3. import this file in the python repl by `from services.solver_sandbox import SolverSandbox` +# 4. run any of the methds below e.g. `SolverSandbox.yas_elastic()` + +import logging + +from pulp import LpProblem, LpVariable, LpInteger, LpMinimize, LpMaximize, LpAffineExpression, LpConstraint, LpStatus, lpSum +from services.loft_service import LoftService +from lib.application_configs import ApplicationConfigs + +class SolverSandbox: + def loft_service(body = {}): + if ApplicationConfigs.local_dev_env: + body = {'Records': [{'eventVersion': '2.1', 'eventSource': 'aws:s3', 'awsRegion': 'us-east-1', 'eventTime': '2022-03-17T13:51:22.708Z', 'eventName': 'ObjectCreated:Put', 'userIdentity': {'principalId': 'AIDAJDPLRKLG7UEXAMPLE'}, 'requestParameters': {'sourceIPAddress': '127.0.0.1'}, 'responseElements': {'x-amz-request-id': '25ecd478', 'x-amz-id-2': 'eftixk72aD6Ap51TnqcoF8eFidJG9Z/2'}, 's3': {'s3SchemaVersion': '1.0', 'configurationId': 'testConfigRule', 'bucket': {'name': 'measure-local-solver-ingest', 'ownerIdentity': {'principalId': 'A3NL1KOZZKExample'}, 'arn': 'arn:aws:s3:::measure-local-solver-ingest'}, 'object': {'key': '40f23de0-8827-013a-a353-0242ac120010_solver_run.tar.gz', 'size': 491, 'eTag': '"2b423d91e80d931302192e781b6bd47c"', 'versionId': None, 'sequencer': '0055AED6DCD90281E5'}}}]} + + # CPNRE item bank with metadata and cases + # body = {'Records': [{'eventVersion': '2.1', 'eventSource': 'aws:s3', 'awsRegion': 'us-east-1', 'eventTime': '2022-03-23T18:49:42.979Z', 'eventName': 'ObjectCreated:Put', 'userIdentity': {'principalId': 'AIDAJDPLRKLG7UEXAMPLE'}, 'requestParameters': {'sourceIPAddress': '127.0.0.1'}, 'responseElements': {'x-amz-request-id': 'c4efd257', 'x-amz-id-2': 'eftixk72aD6Ap51TnqcoF8eFidJG9Z/2'}, 's3': {'s3SchemaVersion': '1.0', 
'configurationId': 'testConfigRule', 'bucket': {'name': 'measure-local-solver-ingest', 'ownerIdentity': {'principalId': 'A3NL1KOZZKExample'}, 'arn': 'arn:aws:s3:::measure-local-solver-ingest'}, 'object': {'key': 'e8f38480-8d07-013a-5ee6-0242ac120010_solver_run.tar.gz', 'size': 12716, 'eTag': '"94189c36aef04dde3babb462442c3af3"', 'versionId': None, 'sequencer': '0055AED6DCD90281E5'}}}]} + + # LOFT item bank with metadata and cases + body = {'Records': [{'eventVersion': '2.1', 'eventSource': 'aws:s3', 'awsRegion': 'us-east-1', 'eventTime': '2022-03-22T19:36:53.568Z', 'eventName': 'ObjectCreated:Put', 'userIdentity': {'principalId': 'AIDAJDPLRKLG7UEXAMPLE'}, 'requestParameters': {'sourceIPAddress': '127.0.0.1'}, 'responseElements': {'x-amz-request-id': '61f320d0', 'x-amz-id-2': 'eftixk72aD6Ap51TnqcoF8eFidJG9Z/2'}, 's3': {'s3SchemaVersion': '1.0', 'configurationId': 'testConfigRule', 'bucket': {'name': 'measure-local-solver-ingest', 'ownerIdentity': {'principalId': 'A3NL1KOZZKExample'}, 'arn': 'arn:aws:s3:::measure-local-solver-ingest'}, 'object': {'key': '5971f500-8c45-013a-5d13-0242ac120010_solver_run.tar.gz', 'size': 619, 'eTag': '"a3cbba098e9f6a445cba6014e47ccaf9"', 'versionId': None, 'sequencer': '0055AED6DCD90281E5'}}}]} + + # Latest CPNRE Item Bank with metadata and cases + body = {'Records': [{'eventVersion': '2.1', 'eventSource': 'aws:s3', 'awsRegion': 'us-east-1', 'eventTime': '2022-03-24T15:47:54.652Z', 'eventName': 'ObjectCreated:Put', 'userIdentity': {'principalId': 'AIDAJDPLRKLG7UEXAMPLE'}, 'requestParameters': {'sourceIPAddress': '127.0.0.1'}, 'responseElements': {'x-amz-request-id': '1969b1ed', 'x-amz-id-2': 'eftixk72aD6Ap51TnqcoF8eFidJG9Z/2'}, 's3': {'s3SchemaVersion': '1.0', 'configurationId': 'testConfigRule', 'bucket': {'name': 'measure-local-solver-ingest', 'ownerIdentity': {'principalId': 'A3NL1KOZZKExample'}, 'arn': 'arn:aws:s3:::measure-local-solver-ingest'}, 'object': {'key': 'ab40ca20-8db7-013a-a88f-0242ac120013_solver_run.tar.gz', 'size': 
24111, 'eTag': '"718a1a17b5dd5219b8e179bfd1ddf1ca"', 'versionId': None, 'sequencer': '0055AED6DCD90281E5'}}}]} + + # Latest LOFT Item Bank with metadata and cases with target variance + body = {'Records': [{'eventVersion': '2.1', 'eventSource': 'aws:s3', 'awsRegion': 'us-east-1', 'eventTime': '2022-03-25T18:03:18.829Z', 'eventName': 'ObjectCreated:Put', 'userIdentity': {'principalId': 'AIDAJDPLRKLG7UEXAMPLE'}, 'requestParameters': {'sourceIPAddress': '127.0.0.1'}, 'responseElements': {'x-amz-request-id': '204c718f', 'x-amz-id-2': 'eftixk72aD6Ap51TnqcoF8eFidJG9Z/2'}, 's3': {'s3SchemaVersion': '1.0', 'configurationId': 'testConfigRule', 'bucket': {'name': 'measure-local-solver-ingest', 'ownerIdentity': {'principalId': 'A3NL1KOZZKExample'}, 'arn': 'arn:aws:s3:::measure-local-solver-ingest'}, 'object': {'key': 'beb35dc0-8e93-013a-5807-0242ac120013_solver_run.tar.gz', 'size': 24112, 'eTag': '"a5a4aad0eb8c9d9af2aad9684437022a"', 'versionId': None, 'sequencer': '0055AED6DCD90281E5'}}}]} + + LoftService(body).process() + + def yosh_loop(): + Items = [1,2,3,4,5] + tif = { + 1: 0.2, + 2: 0.5, + 3: 0.3, + 4: 0.8, + 5: 0.1 + } + iif = { + 1: 0.09, + 2: 0.2, + 3: 0.113, + 4: 0.3, + 5: 0.1 + } + drift = 0.0 + drift_limit = 0.2 + iif_target = 0.5 + tif_target = 0.9 + item_vars = LpVariable.dicts("Item", Items, cat="Binary") + while drift <= drift_limit: + prob = LpProblem("tif_tcc_test", LpMinimize) + prob += lpSum([(tif[i] + iif[i]) * item_vars[i] for i in Items]), "TifTccSum" + prob += lpSum([item_vars[i] for i in Items]) == 3, "TotalItems" + prob += lpSum([tif[i] * item_vars[i] for i in Items]) >= tif_target - (tif_target * drift), 'TifMin' + prob += lpSum([tif[i] * item_vars[i] for i in Items]) <= tif_target + (tif_target * drift), 'TifMax' + prob += lpSum([iif[i] * item_vars[i] for i in Items]) >= iif_target - (iif_target * drift), 'TccMin' + prob += lpSum([iif[i] * item_vars[i] for i in Items]) <= iif_target + (iif_target * drift), 'TccMax' + prob.solve() + print(prob) + if 
LpStatus[prob.status] == "Infeasible": + print('attempt infeasible') + for v in prob.variables(): + print(v.name, "=", v.varValue) + + drift += 0.02 + else: + print(f"solution found with drift of {drift}!") + for v in prob.variables(): + print(v.name, "=", v.varValue) + break + + def yas_elastic(tif_targets, tcc_targets): # [50, 55, 46], [60, 40, 50] + Items = [1,2,3,4,5,6,7,8,9,10] + + iif = { + 1: 5, + 2: 5, + 3: 5, + 4: 10, + 5: 10, + 6: 10, + 7: 15, + 8: 20, + 9: 20, + 10: 20 + } + # --- + irf = { + 1: 5, + 2: 5, + 3: 5, + 4: 10, + 5: 10, + 6: 10, + 7: 15, + 8: 20, + 9: 20, + 10: 20 + } + + items = LpVariable.dicts('Item', Items, cat='Binary') + drift = 0 + max_drift = 25# 25% elasticity + + while drift <= max_drift: + drift_percent = drift / 100 + problem = LpProblem('TIF_TCC', LpMinimize) + + # objective function + problem += lpSum([items[i] for i in Items]) + + # Constraint 1 + problem += lpSum([items[i] for i in Items]) == 5, 'TotalItems' + + # Our own "Elastic Constraints" + for tif_target in tif_targets: + print(f"Calculating TIF target of {tif_target} with drift of {drift}%") + problem += lpSum( + [iif[i] * items[i] for i in Items] + ) >= tif_target - (tif_target * drift_percent) + problem += lpSum( + [iif[i] * items[i] for i in Items] + ) <= tif_target + (tif_target * drift_percent) + + for tcc_target in tcc_targets: + print(f"Calculating TIF target of {tcc_target} with drift of {drift}%") + problem += lpSum( + [irf[i] * items[i] for i in Items] + ) >= tcc_target - (tcc_target * drift_percent) + problem += lpSum( + [irf[i] * items[i] for i in Items] + ) <= tcc_target + (tcc_target * drift_percent) + + problem.solve() + + if LpStatus[problem.status] == 'Infeasible': + print(f"attempt infeasible for drift of {drift}") + + for v in problem.variables(): print(v.name, "=", v.varValue) + + # if drift == max_drift: breakpoint(); + + print(problem.objective.value()) + print(problem.constraints) + print(problem.objective) + + drift += 1 + else: + 
# Implementation of the Whiskas Cat problem, with elastic constraints
# https://www.coin-or.org/PuLP/CaseStudies/a_blending_problem.html
# https://stackoverflow.com/questions/27278691/how-can-an-elastic-subproblem-in-pulp-be-used-as-a-constraint?noredirect=1&lq=1
def whiskas():
    """Minimise the cost of a 100g can of cat food subject to nutritional
    requirements, with the GEL+BEEF cap expressed as an elastic sub-problem
    (10% free violation, then a penalty of 100 per unit)."""
    INGREDIENTS = ['CHICKEN', 'BEEF', 'MUTTON', 'RICE', 'WHEAT', 'GEL']

    # Cost of each ingredient.
    costs = {'CHICKEN': 0.013, 'BEEF': 0.008, 'MUTTON': 0.010,
             'RICE': 0.002, 'WHEAT': 0.005, 'GEL': 0.001}

    # Nutrient fraction contributed by each ingredient.
    protein = {'CHICKEN': 0.100, 'BEEF': 0.200, 'MUTTON': 0.150,
               'RICE': 0.000, 'WHEAT': 0.040, 'GEL': 0.000}
    fat = {'CHICKEN': 0.080, 'BEEF': 0.100, 'MUTTON': 0.110,
           'RICE': 0.010, 'WHEAT': 0.010, 'GEL': 0.000}
    fibre = {'CHICKEN': 0.001, 'BEEF': 0.005, 'MUTTON': 0.003,
             'RICE': 0.100, 'WHEAT': 0.150, 'GEL': 0.000}
    salt = {'CHICKEN': 0.002, 'BEEF': 0.005, 'MUTTON': 0.007,
            'RICE': 0.002, 'WHEAT': 0.008, 'GEL': 0.000}

    logging.info('Running Test...')

    # Create the problem and one continuous, non-negative variable per ingredient.
    problem = LpProblem("The Whiskas Problem", LpMinimize)
    ingr = LpVariable.dicts("Ingr", INGREDIENTS, 0)

    # Objective: total cost of the can.
    problem += lpSum(costs[i] * ingr[i] for i in INGREDIENTS), "Total Cost of Ingredients per can"

    # The percentages must account for the whole can.
    problem += lpSum(ingr[i] for i in INGREDIENTS) == 100, "PercentagesSum"

    # Nutritional requirements, applied data-driven in the original order:
    # (constraint name, per-ingredient levels, sense, bound)
    requirements = [
        ("ProteinRequirement", protein, '>=', 8.0),
        ("FatRequirement", fat, '>=', 6.0),
        ("FibreRequirement", fibre, '<=', 2.0),
        ("SaltRequirement", salt, '<=', 0.4),
    ]
    for cname, levels, sense, bound in requirements:
        total = lpSum(levels[i] * ingr[i] for i in INGREDIENTS)
        if sense == '>=':
            problem += total >= bound, cname
        else:
            problem += total <= bound, cname

    # ELASTICIZE the GEL+BEEF cap of 30 instead of enforcing it rigidly.
    gel_beef = LpAffineExpression([(ingr['GEL'], 1), (ingr['BEEF'], 1)])
    cap = LpConstraint(e=gel_beef, sense=-1, name='GelBeefTotal', rhs=30)
    problem.extend(cap.makeElasticSubProblem(penalty=100, proportionFreeBound=.10))

    print(problem)

    # Solve and report status, the chosen blend, and the optimal cost.
    problem.solve()
    print("Status:", LpStatus[problem.status])
    for v in problem.variables():
        print(v.name, "=", v.varValue)
    print("Total Cost of Ingredients per can = ", problem.objective.value())