Merge pull request #29 from yardstick/feature/QUANT-1415-Loft-Cases-Fix

QUANT-1415 Loft Cases Fix
This commit is contained in:
brmnjsh 2022-03-29 11:46:16 -04:00 committed by GitHub
commit c7da2862b2
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
30 changed files with 1156 additions and 425 deletions

View File

@ -4,12 +4,14 @@ RUN apt-get update
RUN apt-get -y install coinor-cbc RUN apt-get -y install coinor-cbc
RUN python -m pip install pulp RUN python -m pip install pulp
RUN python -m pip install pydantic RUN python -m pip install pydantic
RUN python -m pip install pySqsListener
RUN python -m pip install daemonize RUN python -m pip install daemonize
RUN python -m pip install sqspy
RUN mkdir /app RUN mkdir /app
WORKDIR /app WORKDIR /app
ENV LOCAL_DEV True
# Bundle app source # Bundle app source
COPY . /app COPY . /app

View File

@ -4,8 +4,8 @@ RUN apt-get update
RUN apt-get -y install coinor-cbc RUN apt-get -y install coinor-cbc
RUN python -m pip install pulp RUN python -m pip install pulp
RUN python -m pip install pydantic RUN python -m pip install pydantic
RUN python -m pip install pySqsListener
RUN python -m pip install daemonize RUN python -m pip install daemonize
RUN python -m pip install sqspy
RUN mkdir /app RUN mkdir /app
WORKDIR /app WORKDIR /app

View File

@ -1,39 +1,42 @@
import boto3 import boto3
import os
import json import json
session = boto3.Session( from lib.application_configs import ApplicationConfigs
aws_access_key_id=os.environ['AWS_ACCESS_KEY_ID'],
aws_secret_access_key=os.environ['AWS_SECRET_ACCESS_KEY']
)
s3 = session.resource('s3', region_name=os.environ['AWS_REGION']) session = boto3.Session(
sqs = session.client('sqs', region_name=os.environ['AWS_REGION']) aws_access_key_id=ApplicationConfigs.aws_access_key_id,
aws_secret_access_key=ApplicationConfigs.aws_secret_key)
# ToDo: Figure out a much better way of doing this.
# LocalStack wants endpoint_url, while prod doesnt :(
if ApplicationConfigs.local_dev_env:
s3 = session.resource('s3', region_name=ApplicationConfigs.region_name, endpoint_url=ApplicationConfigs.endpoint_url)
sqs = session.client('sqs', region_name=ApplicationConfigs.region_name, endpoint_url=ApplicationConfigs.endpoint_url)
else:
s3 = session.resource('s3', region_name=ApplicationConfigs.region_name)
sqs = session.client('sqs', region_name=ApplicationConfigs.region_name)
def get_key_from_message(body): def get_key_from_message(body):
return body['Records'][0]['s3']['object']['key'] return body['Records'][0]['s3']['object']['key']
def get_bucket_from_message(body): def get_bucket_from_message(body):
return body['Records'][0]['s3']['bucket']['name'] return body['Records'][0]['s3']['bucket']['name']
def get_object(key, bucket): def get_object(key, bucket):
return s3.Object( return s3.Object(bucket_name=bucket, key=key).get()['Body'].read()
bucket_name=bucket,
key=key
).get()['Body'].read()
def file_stream_upload(buffer, name, bucket): def file_stream_upload(buffer, name, bucket):
return s3.Bucket(bucket).upload_fileobj(buffer, name) return s3.Bucket(bucket).upload_fileobj(buffer, name)
def receive_message(queue, message_num=1, wait_time=1): def receive_message(queue, message_num=1, wait_time=1):
return sqs.receive_message( return sqs.receive_message(QueueUrl=queue,
QueueUrl=queue, MaxNumberOfMessages=message_num,
MaxNumberOfMessages=message_num, WaitTimeSeconds=wait_time)
WaitTimeSeconds=wait_time
)
def delete_message(queue, receipt): def delete_message(queue, receipt):
return sqs.delete_message( return sqs.delete_message(QueueUrl=queue, ReceiptHandle=receipt)
QueueUrl=queue,
ReceiptHandle=receipt
)

View File

@ -0,0 +1,14 @@
def boolean_to_int(value: bool) -> int:
if value:
return 1
else:
return 0
def is_float(element: str) -> bool:
try:
float(element)
return True
except ValueError:
return False

View File

@ -1,5 +1,6 @@
import csv import csv
import io import io
def file_stream_reader(f): def file_stream_reader(f):
return csv.reader(io.StringIO(f.read().decode('ascii'))) return csv.reader(io.StringIO(f.read().decode('ascii')))

View File

@ -1,90 +1,132 @@
import csv import csv
import io import io
import re import re
from tokenize import String
from typing import Tuple
def items_csv_to_dict(items_csv_reader): from helpers import common_helper
items = [] from models.item import Item
headers = [] from models.solver_run import SolverRun
# get headers and items def csv_to_item(items_csv_reader, solver_run):
for key, row in enumerate(items_csv_reader): items = []
if key == 0: headers = []
headers = row
else:
item = { 'attributes': [] }
# ensure that the b param is formatted correctly # get headers and items
if len(re.findall(".", row[len(headers) - 1])) >= 3: for key, row in enumerate(items_csv_reader):
for key, col in enumerate(headers): if key == 0:
if key == 0: headers = row
item[col] = row[key] else:
if key == 2: item = {'attributes': []}
# make sure passage id exists
if row[key]:
item['passage_id'] = row[key]
# b param - tmep fix! use irt model b param for proper reference
elif key == len(headers) - 1:
item['b_param'] = row[key]
elif key > 2 and key < len(headers) - 1:
item['attributes'].append({
'id': col,
'value': row[key],
'type': 'metadata'
})
items.append(item) # ensure that the b param is formatted correctly
if row[len(headers) - 1] != '' and common_helper.is_float(row[len(headers) - 1]):
for key, col in enumerate(headers):
if solver_run.irt_model.formatted_b_param() == col:
value = float(row[key])
item['b_param'] = value
elif solver_run.get_constraint(
col) and solver_run.get_constraint(
col).reference_attribute.type == 'bundle':
if row[key]:
item[solver_run.get_constraint(
col).reference_attribute.id] = row[key]
elif solver_run.get_constraint(col):
constraint = solver_run.get_constraint(col)
item['attributes'].append({
'id':
col,
'value':
row[key],
'type':
constraint.reference_attribute.type
})
else:
if row[key]:
item[col] = row[key]
# confirm item is only added if it meets the criteria of 100% constraints as a pre-filter
valid_item = True
item = Item.parse_obj(item)
for constraint in solver_run.constraints:
if item.attribute_exists(constraint.reference_attribute) == False and constraint.minimum == 100:
valid_item = False
if valid_item: items.append(item)
return items
return items
def solution_to_file(buffer, total_form_items, forms): def solution_to_file(buffer, total_form_items, forms):
wr = csv.writer(buffer, dialect='excel', delimiter=',') wr = csv.writer(buffer, dialect='excel', delimiter=',')
# write header row for first row utilizing the total items all forms will have # write header row for first row utilizing the total items all forms will have
# fill the rows with the targets and cut score then the items # fill the rows with the targets and cut score then the items
header = ['status'] header = ['status']
for result in forms[0].tif_results: for result in forms[0].tif_results:
header += [f'tif @ {round(result.theta, 2)}'] header += [f'tif @ {round(result.theta, 2)}']
for result in forms[0].tcc_results: for result in forms[0].tcc_results:
header += [f'tcc @ {round(result.theta, 2)}'] header += [f'tcc @ {round(result.theta, 2)}']
header += ['cut score'] + [x + 1 for x in range(total_form_items)] header += ['cut score'] + [x + 1 for x in range(total_form_items)]
wr.writerow(header) wr.writerow(header)
# add each form as row to processed csv # add each form as row to processed csv
for form in forms: for form in forms:
row = [form.status] row = [form.status]
for result in form.tif_results + form.tcc_results: for result in form.tif_results + form.tcc_results:
row += [f'target - {result.value}\nresult - {round(result.result, 2)}'] row += [
f'target - {result.value}\nresult - {round(result.result, 2)}'
]
# provide generated items and cut score # provide generated items and cut score
row += [round(form.cut_score, 2)] + [item.id for item in form.items] row += [round(form.cut_score, 2)] + [item.id for item in form.items]
wr.writerow(row) wr.writerow(row)
buff2 = io.BytesIO(buffer.getvalue().encode()) buff2 = io.BytesIO(buffer.getvalue().encode())
return buff2
return buff2
def error_to_file(buffer, error): def error_to_file(buffer, error):
wr = csv.writer(buffer, dialect='excel', delimiter=',') wr = csv.writer(buffer, dialect='excel', delimiter=',')
wr.writerow(['status']) wr.writerow(['status'])
wr.writerow([error.args[0]]) wr.writerow([error.args[0]])
return io.BytesIO(buffer.getvalue().encode())
return io.BytesIO(buffer.getvalue().encode())
def key_to_uuid(key): def key_to_uuid(key):
return re.split("_", key)[0] return re.split("_", key)[0]
def solution_items(variables, solver_run):
form_items = []
for v in variables: def solution_items(variables: list, solver_run: SolverRun) -> Tuple[list]:
if v.varValue > 0: form_items = []
item_id = v.name.replace('Item_', '') solver_variables = []
item = solver_run.get_item(item_id)
# add item to list and then remove from master item list
form_items.append(item)
return form_items for v in variables:
if v.varValue > 0:
solver_variables.append(v.name)
if 'Item_' in v.name:
item_id = v.name.replace('Item_', '')
item = solver_run.get_item(int(item_id))
# add item to list and then remove from master item list
if item: form_items.append(item)
elif 'Bundle_' in v.name:
bundle_id = v.name.replace('Bundle_', '')
bundle = solver_run.get_bundle(int(bundle_id))
if bundle:
for item in bundle.items:
if item: form_items.append(item)
return form_items, solver_variables
def print_problem_variables(problem):
# Uncomment this as needed in local dev
# print(problem);
for v in problem.variables(): print(v.name, "=", v.varValue)

View File

@ -1,56 +1,86 @@
from pulp import lpSum from pulp import lpSum, LpProblem
from random import randint, sample from random import randint, sample
import logging import logging
from helpers.common_helper import *
from models.bundle import Bundle
from models.solver_run import SolverRun
from models.item import Item
from lib.errors.item_generation_error import ItemGenerationError from lib.errors.item_generation_error import ItemGenerationError
def build_constraints(solver_run, problem, items): def build_constraints(solver_run: SolverRun, problem: LpProblem,
try: items: list[Item], bundles: list[Bundle], selected_items: list[Item], selected_bundles: list[Bundle]) -> LpProblem:
total_form_items = solver_run.total_form_items logging.info('Creating Constraints...')
constraints = solver_run.constraints
for constraint in constraints: try:
attribute = constraint.reference_attribute total_form_items = solver_run.total_form_items
min = constraint.minimum constraints = solver_run.constraints
max = constraint.maximum
if attribute.type == 'metadata': for constraint in constraints:
con = dict(zip([item.id for item in solver_run.items], attribute = constraint.reference_attribute
[item.attribute_exists(attribute) min = constraint.minimum
for item in solver_run.items])) max = constraint.maximum
problem += lpSum([con[item.id]
* items[item.id]
for item in solver_run.items]) >= round(total_form_items * (min / 100)), f'{attribute.id} - {attribute.value} - min'
problem += lpSum([con[item.id]
* items[item.id]
for item in solver_run.items]) <= round(total_form_items * (max / 100)), f'{attribute.id} - {attribute.value} - max'
elif attribute.type == 'bundle':
# TODO: account for many different bundle types, since the id condition in L33 could yield duplicates
total_bundles = randint(constraint.minimum, constraint.maximum)
selected_bundles = sample(solver_run.bundles, total_bundles)
total_bundle_items = 0
for bundle in selected_bundles: if attribute.type == 'metadata':
con = dict(zip([item.id for item in solver_run.items], logging.info('Metadata Constraint Generating...')
[(getattr(item, bundle.type, False) == bundle.id)
for item in solver_run.items]))
problem += lpSum([con[item.id]
* items[item.id]
for item in solver_run.items]) == bundle.count, f'Bundle constraint for {bundle.type} ({bundle.id})'
total_bundle_items += bundle.count
# make sure all other items added to the form problem += lpSum(
# are not a part of any bundle [
# currently only supports single bundle constraints, will need refactoring for multiple bundle constraints len(bundle.items_with_attribute(attribute)) * bundles[bundle.id] for bundle in selected_bundles
con = dict(zip([item.id for item in solver_run.items], ] +
[(getattr(item, attribute.id, None) == None) [
for item in solver_run.items])) item.attribute_exists(attribute).real * items[item.id] for item in selected_items
problem += lpSum([con[item.id] ]
* items[item.id] ) >= round(total_form_items * (min / 100)), f'{attribute.id} - {attribute.value} - min'
for item in solver_run.items]) == solver_run.total_form_items - total_bundle_items, f'Remaining items are not of a bundle type'
problem += lpSum(
[
len(bundle.items_with_attribute(attribute)) * bundles[bundle.id] for bundle in selected_bundles
] +
[
item.attribute_exists(attribute).real * items[item.id] for item in selected_items
]
) <= round(total_form_items * (max / 100)), f'{attribute.id} - {attribute.value} - max'
elif attribute.type == 'bundle':
logging.info('Bundles Constraint Generating...')
# TODO: account for many different bundle types, since the id condition in L33 could yield duplicates
if selected_bundles != None:
# make sure the total bundles used in generated form is limited between min-max set
problem += lpSum([
bundles[bundle.id] for bundle in selected_bundles
]) == randint(int(constraint.minimum),
int(constraint.maximum))
logging.info('Constraints Created...')
return problem
except ValueError as error:
logging.error(error)
raise ItemGenerationError(
"Bundle min and/or max larger than bundle amount provided",
error.args[0])
return problem def get_random_bundles(total_form_items: int,
except ValueError as error: bundles: list[Bundle],
logging.error(error) min: int,
raise ItemGenerationError("Bundle min and/or max larger than bundle amount provided", error.args[0]) max: int,
found_bundles=False) -> list[Bundle]:
selected_bundles = None
total_bundle_items = 0
total_bundles = randint(min, max)
logging.info(f'Selecting Bundles (total of {total_bundles})...')
while found_bundles == False:
selected_bundles = sample(bundles, total_bundles)
total_bundle_items = sum(bundle.count for bundle in selected_bundles)
if total_bundle_items <= total_form_items:
found_bundles = True
if found_bundles == True:
return selected_bundles
else:
return get_random_bundles(total_form_items, total_bundles - 1, bundles)

View File

@ -1,9 +1,11 @@
import io import io
import tarfile import tarfile
def raw_to_tar(raw_object): def raw_to_tar(raw_object):
tarball = io.BytesIO(raw_object) tarball = io.BytesIO(raw_object)
return tarfile.open(fileobj=tarball, mode='r:gz') return tarfile.open(fileobj=tarball, mode='r:gz')
def extract_file_from_tar(tar, file_name): def extract_file_from_tar(tar, file_name):
return tar.extractfile(tar.getmember(file_name)) return tar.extractfile(tar.getmember(file_name))

View File

@ -0,0 +1,12 @@
import os
from pydantic.dataclasses import dataclass
@dataclass
class ApplicationConfigs():
region_name = os.environ.get('AWS_REGION', 'ca-central-1')
aws_access_key_id = os.environ.get('AWS_ACCESS_KEY_ID', '')
aws_secret_key = os.environ.get('AWS_SECRET_ACCESS_KEY', '')
sqs_queue = os.environ.get('SQS_QUEUE', '')
endpoint_url = os.environ.get('ENDPOINT_URL', '')
s3_processed_bucket = os.environ.get('S3_PROCESSED_BUCKET', 'measure-local-solver-processed')
local_dev_env = os.environ.get('LOCAL_DEV', False) == 'True'

View File

@ -1,2 +1,2 @@
class ItemGenerationError(Exception): class ItemGenerationError(Exception):
pass pass

View File

@ -3,22 +3,28 @@ import logging
from lib.irt.models.three_parameter_logistic import ThreeParameterLogistic from lib.irt.models.three_parameter_logistic import ThreeParameterLogistic
from lib.errors.item_generation_error import ItemGenerationError from lib.errors.item_generation_error import ItemGenerationError
class ItemInformationFunction():
def __init__(self, irt_model):
self.model_data = irt_model
# determines the amount of information for a given question at a given theta (ability level) class ItemInformationFunction():
# further detailed on page 161, equation 4 here:
# https://www.ncbi.nlm.nih.gov/pmc/articles/PMC5978482/pdf/10.1177_0146621615613308.pdf def __init__(self, irt_model):
def calculate(self, **kwargs): self.model_data = irt_model
try:
if self.model_data.model == '3PL': # determines the amount of information for a given question at a given theta (ability level)
p = ThreeParameterLogistic(self.model_data, kwargs).result() # further detailed on page 161, equation 4 here:
q = 1 - p # https://www.ncbi.nlm.nih.gov/pmc/articles/PMC5978482/pdf/10.1177_0146621615613308.pdf
return (self.model_data.a_param * q * (p - self.model_data.c_param)**2) / (p * ((1 - self.model_data.c_param)**2)) def calculate(self, **kwargs):
else: try:
# potentially error out if self.model_data.model == '3PL':
raise ItemGenerationError("irt model not supported or provided") p = ThreeParameterLogistic(self.model_data, kwargs).result()
except ZeroDivisionError as error: q = 1 - p
logging.error(error) return (self.model_data.a_param * q *
raise ItemGenerationError("params not well formatted", error.args[0]) (p - self.model_data.c_param)**2) / (p * (
(1 - self.model_data.c_param)**2))
else:
# potentially error out
raise ItemGenerationError(
"irt model not supported or provided")
except ZeroDivisionError as error:
logging.error(error)
raise ItemGenerationError("params not well formatted",
error.args[0])

View File

@ -1,12 +1,14 @@
from lib.irt.models.three_parameter_logistic import ThreeParameterLogistic from lib.irt.models.three_parameter_logistic import ThreeParameterLogistic
from lib.errors.item_generation_error import ItemGenerationError from lib.errors.item_generation_error import ItemGenerationError
class ItemResponseFunction():
def __init__(self, irt_model):
self.model_data = irt_model
def calculate(self, **kwargs): class ItemResponseFunction():
if self.model_data.model == '3PL':
return ThreeParameterLogistic(self.model_data, kwargs).result() def __init__(self, irt_model):
else: self.model_data = irt_model
raise ItemGenerationError("irt model not supported or provided")
def calculate(self, **kwargs):
if self.model_data.model == '3PL':
return ThreeParameterLogistic(self.model_data, kwargs).result()
else:
raise ItemGenerationError("irt model not supported or provided")

View File

@ -1,16 +1,18 @@
class ThreeParameterLogistic: class ThreeParameterLogistic:
def __init__(self, model_params, kwargs):
self.model_params = model_params
# check if exists, if not error out
self.b_param = kwargs['b_param']
self.e = 2.71828
self.theta = kwargs['theta']
# contains the primary 3pl function, determining the probably of an inidividual def __init__(self, model_params, kwargs):
# that an individual at a certain theta would get a particular question correct self.model_params = model_params
# detailed further on page 161, equation 1 here: # check if exists, if not error out
# https://www.ncbi.nlm.nih.gov/pmc/articles/PMC5978482/pdf/10.1177_0146621615613308.pdf self.b_param = kwargs['b_param']
def result(self): self.e = 2.71828
a = self.model_params.a_param self.theta = kwargs['theta']
c = self.model_params.c_param
return c + (1 - c) * (1 / (1 + self.e**(-a * (self.theta - self.b_param)))) # contains the primary 3pl function, determining the probably of an inidividual
# that an individual at a certain theta would get a particular question correct
# detailed further on page 161, equation 1 here:
# https://www.ncbi.nlm.nih.gov/pmc/articles/PMC5978482/pdf/10.1177_0146621615613308.pdf
def result(self):
a = self.model_params.a_param
c = self.model_params.c_param
return c + (1 - c) * (1 / (1 + self.e**(-a *
(self.theta - self.b_param))))

View File

@ -1,19 +1,22 @@
from lib.irt.item_information_function import ItemInformationFunction from lib.irt.item_information_function import ItemInformationFunction
class TestInformationFunction(): class TestInformationFunction():
def __init__(self, irt_model):
self.irt_model = irt_model
self.iif = ItemInformationFunction(irt_model)
# determins the amount of information def __init__(self, irt_model):
# at a certain theta (ability level) of the sum of a question set correct self.irt_model = irt_model
# detailed further on page 166, equation 4 here: self.iif = ItemInformationFunction(irt_model)
# https://www.ncbi.nlm.nih.gov/pmc/articles/PMC5978482/pdf/10.1177_0146621615613308.pdf
def calculate(self, items, **kwargs):
sum = 0
for item in items: # determins the amount of information
result = self.iif.calculate(b_param=item.b_param, theta=kwargs['theta']) # at a certain theta (ability level) of the sum of a question set correct
sum += result # detailed further on page 166, equation 4 here:
# https://www.ncbi.nlm.nih.gov/pmc/articles/PMC5978482/pdf/10.1177_0146621615613308.pdf
def calculate(self, items, **kwargs):
sum = 0
return sum for item in items:
result = self.iif.calculate(b_param=item.b_param,
theta=kwargs['theta'])
sum += result
return sum

View File

@ -1,20 +1,23 @@
from lib.irt.item_response_function import ItemResponseFunction from lib.irt.item_response_function import ItemResponseFunction
# otherwise known as the Test Characteristic Curve (TCC) # otherwise known as the Test Characteristic Curve (TCC)
class TestResponseFunction(): class TestResponseFunction():
def __init__(self, irt_model):
self.irt_model = irt_model
self.irf = ItemResponseFunction(irt_model)
# determins the probably of an inidividual def __init__(self, irt_model):
# at a certain theta (ability level) would get a sum of questions correct self.irt_model = irt_model
# detailed further on page 166, equation 3 here: self.irf = ItemResponseFunction(irt_model)
# https://www.ncbi.nlm.nih.gov/pmc/articles/PMC5978482/pdf/10.1177_0146621615613308.pdf
def calculate(self, items, **kwargs):
sum = 0
for item in items: # determine the probability of an individual
result = self.irf.calculate(b_param=item.b_param, theta=kwargs['theta']) # at a certain theta (ability level) would get a sum of questions correct
sum += result # detailed further on page 166, equation 3 here:
# https://www.ncbi.nlm.nih.gov/pmc/articles/PMC5978482/pdf/10.1177_0146621615613308.pdf
def calculate(self, items, **kwargs):
sum = 0
return sum for item in items:
result = self.irf.calculate(b_param=item.b_param,
theta=kwargs['theta'])
sum += result
return sum

View File

@ -1,36 +1,57 @@
import os, sys, logging import os, sys, logging
from lib.application_configs import ApplicationConfigs
from services.loft_service import LoftService from services.loft_service import LoftService
from helpers import aws_helper from helpers import aws_helper
from daemonize import Daemonize from daemonize import Daemonize
from sqs_listener import SqsListener from sqspy import Consumer
logging.basicConfig(stream=sys.stdout, level=logging.INFO, format="%(levelname)s %(asctime)s - %(message)s") logging.basicConfig(stream=sys.stdout,
level=logging.INFO,
format="%(levelname)s %(asctime)s - %(message)s")
class ServiceListener(SqsListener):
def handle_message(self, body, attributes, messages_attributes):
# gather/manage/process data based on the particular needs
logging.info('Incoming message: %s', body)
service = LoftService(body) class ServiceListener(Consumer):
service.process()
def handle_message(self, body, attributes, messages_attributes):
# gather/manage/process data based on the particular needs
logging.info('Incoming message: %s', body)
service = LoftService(body)
service.process()
logging.info('Process complete for %s', service.file_name)
logging.info('Process complete for %s', service.file_name)
def main(): def main():
logging.info('Starting Solver Service (v1.1.0)...') logging.info('Starting Solver Service: Tokyo Drift (v1.2)...')
listener = ServiceListener(
os.environ['SQS_QUEUE'], # ToDo: Figure out a much better way of doing this.
region_name=os.environ['AWS_REGION'], # LocalStack wants 'endpoint_url', while prod doesnt :(
aws_access_key=os.environ['AWS_ACCESS_KEY_ID'], if ApplicationConfigs.local_dev_env:
aws_secret_key=os.environ['AWS_SECRET_ACCESS_KEY'], listener = ServiceListener(
queue_url=os.environ['SQS_QUEUE'] None,
) ApplicationConfigs.sqs_queue,
listener.listen() create_queue=False,
region_name=ApplicationConfigs.region_name,
aws_access_key=ApplicationConfigs.aws_access_key_id,
aws_secret_key=ApplicationConfigs.aws_secret_key,
endpoint_url=ApplicationConfigs.endpoint_url)
else:
listener = ServiceListener(
None,
ApplicationConfigs.sqs_queue,
create_queue=False,
region_name=ApplicationConfigs.region_name,
aws_access_key=ApplicationConfigs.aws_access_key_id,
aws_secret_key=ApplicationConfigs.aws_secret_key)
listener.listen()
if __name__ == '__main__': if __name__ == '__main__':
myname=os.path.basename(sys.argv[0]) myname = os.path.basename(sys.argv[0])
pidfile='/tmp/%s' % myname pidfile = '/tmp/%s' % myname
daemon = Daemonize(app=myname,pid=pidfile, action=main, foreground=True) daemon = Daemonize(app=myname, pid=pidfile, action=main, foreground=True)
daemon.start() daemon.start()

View File

@ -1,11 +1,12 @@
from pydantic import BaseModel from pydantic import BaseModel
from typing import List, Optional, Dict from typing import List, Optional, Dict
class AdvancedOptions(BaseModel): class AdvancedOptions(BaseModel):
linearity_check: Optional[bool] linearity_check: Optional[bool]
show_progress: Optional[bool] show_progress: Optional[bool]
max_solution_time: Optional[int] max_solution_time: Optional[int]
brand_bound_tolerance: Optional[float] brand_bound_tolerance: Optional[float]
max_forms: Optional[int] max_forms: Optional[int]
precision: Optional[float] precision: Optional[float]
extra_param_range: Optional[List[Dict]] extra_param_range: Optional[List[Dict]]

View File

@ -1,7 +1,8 @@
from pydantic import BaseModel from pydantic import BaseModel
from typing import Optional from typing import Optional
class Attribute(BaseModel): class Attribute(BaseModel):
value: Optional[str] value: Optional[str]
type: Optional[str] type: Optional[str]
id: str id: str

View File

@ -1,6 +1,48 @@
from pydantic import BaseModel from pydantic import BaseModel
from typing import List
from lib.irt.test_information_function import TestInformationFunction
from lib.irt.test_response_function import TestResponseFunction
from models.attribute import Attribute
from models.item import Item
from models.irt_model import IRTModel
class Bundle(BaseModel): class Bundle(BaseModel):
id: int id: int
count: int count: int
type: str items: List[Item]
type: str
def tif(self, irt_model: IRTModel, theta: float) -> float:
return TestInformationFunction(irt_model).calculate(self.items, theta=theta)
def trf(self, irt_model: IRTModel, theta: float) -> float:
return TestResponseFunction(irt_model).calculate(self.items, theta=theta)
def tif_trf_sum(self, solver_run):
return self.__trf_sum(solver_run) + self.__tif_sum(solver_run)
def items_with_attribute(self, attribute: Attribute) -> List[Item]:
items = []
for item in self.items:
if item.attribute_exists(attribute): items.append(item)
return items
def __tif_sum(self, solver_run):
total = 0
for target in solver_run.objective_function.tcc_targets:
total += self.tif(solver_run.irt_model, target.theta)
return total
def __trf_sum(self, solver_run):
total = 0
for target in solver_run.objective_function.tcc_targets:
total += self.trf(solver_run.irt_model, target.theta)
return total

View File

@ -2,7 +2,8 @@ from pydantic import BaseModel
from models.attribute import Attribute from models.attribute import Attribute
class Constraint(BaseModel): class Constraint(BaseModel):
reference_attribute: Attribute reference_attribute: Attribute
minimum: float minimum: float
maximum: float maximum: float

View File

@ -1,26 +1,31 @@
from pydantic import BaseModel from pydantic import BaseModel
from typing import List from typing import List, TypeVar, Type
from helpers import irt_helper from helpers import irt_helper
from models.solver_run import SolverRun
from models.item import Item from models.item import Item
from models.target import Target from models.target import Target
from lib.irt.test_response_function import TestResponseFunction from lib.irt.test_response_function import TestResponseFunction
class Form(BaseModel): _T = TypeVar("_T")
items: List[Item]
cut_score: float
tif_results: List[Target]
tcc_results: List[Target]
status: str = 'Not Optimized'
@classmethod class Form(BaseModel):
def create(cls, items, solver_run, status): items: List[Item]
return cls( cut_score: float
items=items, tif_results: List[Target]
cut_score=TestResponseFunction(solver_run.irt_model).calculate(items, theta=solver_run.theta_cut_score), tcc_results: List[Target]
tif_results=irt_helper.generate_tif_results(items, solver_run), status: str = 'Not Optimized'
tcc_results=irt_helper.generate_tcc_results(items, solver_run), solver_variables: List[str]
status=status
) @classmethod
def create(cls: Type[_T], items: list, solver_run: SolverRun, status: str, solver_variables: list) -> _T:
return cls(
items=items,
cut_score=TestResponseFunction(solver_run.irt_model).calculate(
items, theta=solver_run.theta_cut_score),
tif_results=irt_helper.generate_tif_results(items, solver_run),
tcc_results=irt_helper.generate_tcc_results(items, solver_run),
status=status,
solver_variables=solver_variables)

View File

@ -1,8 +1,13 @@
from pydantic import BaseModel from pydantic import BaseModel
from typing import Dict from typing import Dict
class IRTModel(BaseModel): class IRTModel(BaseModel):
a_param: float a_param: float
b_param: Dict = {"schema_bson_id": str, "field_bson_id": str} b_param: Dict = {"schema_bson_id": str, "field_bson_id": str}
c_param: float c_param: float
model: str model: str
def formatted_b_param(self):
return self.b_param['schema_bson_id'] + '-' + self.b_param[
'field_bson_id']

View File

@ -7,25 +7,47 @@ from lib.irt.item_response_function import ItemResponseFunction
from lib.irt.item_information_function import ItemInformationFunction from lib.irt.item_information_function import ItemInformationFunction
class Item(BaseModel): class Item(BaseModel):
id: int id: int
passage_id: Optional[int] passage_id: Optional[int]
attributes: List[Attribute] workflow_state: Optional[str]
b_param: float = 0.00 attributes: List[Attribute]
b_param: float = 0.00
def iif(self, solver_run, theta): def iif(self, solver_run, theta):
return ItemInformationFunction(solver_run.irt_model).calculate(b_param=self.b_param,theta=theta) return ItemInformationFunction(solver_run.irt_model).calculate(b_param=self.b_param, theta=theta)
def irf(self, solver_run, theta): def irf(self, solver_run, theta):
return ItemResponseFunction(solver_run.irt_model).calculate(b_param=self.b_param,theta=theta) return ItemResponseFunction(solver_run.irt_model).calculate(b_param=self.b_param, theta=theta)
def get_attribute(self, ref_attribute): def get_attribute(self, ref_attribute: Attribute) -> Attribute or None:
for attribute in self.attributes: for attribute in self.attributes:
if attribute.id == ref_attribute.id and attribute.value == ref_attribute.value: if self.attribute_exists(ref_attribute):
return attribute.value return attribute
return False
def attribute_exists(self, ref_attribute): return None
for attribute in self.attributes:
if attribute.id == ref_attribute.id and attribute.value == ref_attribute.value: def attribute_exists(self, ref_attribute: Attribute) -> bool:
return True for attribute in self.attributes:
return False if attribute.id == ref_attribute.id and attribute.value.lower(
) == ref_attribute.value.lower():
return True
return False
def iif_irf_sum(self, solver_run):
return self.__iif_sum(solver_run) + self.__irf_sum(solver_run)
def __iif_sum(self, solver_run):
total = 0
for target in solver_run.objective_function.tif_targets:
total += self.iif(solver_run, target.theta)
return total
def __irf_sum(self, solver_run):
total = 0
for target in solver_run.objective_function.tif_targets:
total += self.irf(solver_run, target.theta)
return total

View File

@ -3,11 +3,48 @@ from typing import Dict, List, AnyStr
from models.target import Target from models.target import Target
class ObjectiveFunction(BaseModel): class ObjectiveFunction(BaseModel):
# minimizing tif/tcc target value is only option currently # minimizing tif/tcc target value is only option currently
# as we add more we can build this out to be more dynamic # as we add more we can build this out to be more dynamic
# likely with models representing each objective function type # likely with models representing each objective function type
tif_targets: List[Target] tif_targets: List[Target]
tcc_targets: List[Target] tcc_targets: List[Target]
objective: AnyStr = "minimize" target_variance_percentage: int = 10
weight: Dict = {'tif': 1, 'tcc': 1} objective: AnyStr = "minimize"
weight: Dict = {'tif': 1, 'tcc': 1}
def increment_targets_drift(self,
limit: float or bool,
all: bool = False,
amount: float = 0.1,
targets: list[Target] = []) -> bool:
if all:
for target in self.tif_targets:
target.drift = round(target.drift + amount, 2)
for target in self.tcc_targets:
target.drift = round(target.drift + amount, 2)
else:
for target in targets:
target.drift = round(target.drift + amount, 2)
print(self.tif_targets)
print(self.tcc_targets)
return amount
def update_targets_drift(self, amount: float = 0.0):
for target in self.tif_targets:
target.drift = round(amount, 2)
for target in self.tcc_targets:
target.drift = round(amount, 2)
def minimum_drift(self) -> float:
minimum_drift = 0.0
for target in self.all_targets():
if target.drift < minimum_drift:
minimum_drift = target.drift
return minimum_drift
def all_targets(self) -> list[Target]:
return self.tif_targets + self.tcc_targets

View File

@ -3,6 +3,7 @@ from typing import List
from models.form import Form from models.form import Form
class Solution(BaseModel): class Solution(BaseModel):
response_id: int response_id: int
forms: List[Form] forms: List[Form]

View File

@ -1,5 +1,8 @@
from pydantic import BaseModel from pydantic import BaseModel
from typing import List, Optional from typing import List, Literal, Optional
import logging
import random
from models.item import Item from models.item import Item
from models.constraint import Constraint from models.constraint import Constraint
@ -8,58 +11,119 @@ from models.bundle import Bundle
from models.objective_function import ObjectiveFunction from models.objective_function import ObjectiveFunction
from models.advanced_options import AdvancedOptions from models.advanced_options import AdvancedOptions
class SolverRun(BaseModel): class SolverRun(BaseModel):
items: List[Item] items: List[Item] = []
bundles: Optional[Bundle] bundles: list[Bundle] = []
constraints: List[Constraint] constraints: List[Constraint]
irt_model: IRTModel irt_model: IRTModel
objective_function: ObjectiveFunction objective_function: ObjectiveFunction
total_form_items: int total_form_items: int
total_forms: int = 1 total_forms: int = 1
theta_cut_score: float = 0.00 theta_cut_score: float = 0.00
advanced_options: Optional[AdvancedOptions] drift_style: Literal['constant', 'variable'] = 'constant'
engine: str advanced_options: Optional[AdvancedOptions]
engine: str
def get_item(self, item_id): def get_item(self, item_id: int) -> Item or None:
for item in self.items: for item in self.items:
if str(item.id) == item_id: if item.id == item_id:
return item return item
return False
def remove_items(self, items): def get_bundle(self, bundle_id: int) -> Bundle or None:
self.items = [item for item in self.items if item not in items] for bundle in self.bundles:
return True if bundle.id == bundle_id:
return bundle
def generate_bundles(self): def get_constraint_by_type(self, type: str) -> Constraint or None:
bundle_constraints = (constraint.reference_attribute for constraint in self.constraints if constraint.reference_attribute.type == 'bundle') for constraint in self.constraints:
if type == constraint.reference_attribute.type:
return constraint
for bundle_constraint in bundle_constraints: def remove_items(self, items: list[Item]) -> bool:
type_attribute = bundle_constraint.id self.items = [item for item in self.items if item not in items]
return True
for item in self.items: def generate_bundles(self):
attribute_id = getattr(item, type_attribute, None) logging.info('Generating Bundles...')
# confirms bundle constraints exists
bundle_constraints = (
constraint.reference_attribute for constraint in self.constraints
if constraint.reference_attribute.type == 'bundle')
# make sure the item has said attribute for bundle_constraint in bundle_constraints:
if attribute_id != None: type_attribute = bundle_constraint.id
# if there are pre-existing bundles, add new or increment existing
# else create array with new bundle
if self.bundles != None:
# get index of the bundle in the bundles list if exists or None if it doesn't
bundle_index = next((index for (index, bundle) in enumerate(self.bundles) if bundle.id == attribute_id and bundle.type == type_attribute), None)
# if the index doesn't exist add the new bundle of whatever type for item in self.items:
# else increment the count of the current bundle attribute_id = getattr(item, type_attribute, None)
if bundle_index == None:
self.bundles.append(Bundle( # make sure the item has said attribute
id=attribute_id, if attribute_id != None:
count=1, # if there are pre-existing bundles, add new or increment existing
type=type_attribute # else create array with new bundle
)) if self.bundles != None:
else: # get index of the bundle in the bundles list if exists or None if it doesn't
self.bundles[bundle_index].count += 1 bundle_index = next(
else: (index
self.bundles = [Bundle( for (index, bundle) in enumerate(self.bundles)
id=attribute_id, if bundle.id == attribute_id
count=1, and bundle.type == type_attribute), None)
type=type_attribute
)] # if the index doesn't exist add the new bundle of whatever type
# else increment the count of the current bundle
if bundle_index == None:
self.bundles.append(
Bundle(id=attribute_id,
count=1,
items=[item],
type=type_attribute))
else:
self.bundles[bundle_index].count += 1
self.bundles[bundle_index].items.append(item)
else:
self.bundles = [
Bundle(id=attribute_id,
count=1,
items=[item],
type=type_attribute)
]
# temporary compensator for bundle item limits, since we shouldn't be using cases with less than 3 items
# ideally this should be in the bundles model as a new attribute to handle "constraints of constraints"
logging.info('Removing bundles with items < 3')
for k, v in enumerate(self.bundles):
bundle = self.bundles[k]
if bundle.count < 3: del self.bundles[k]
logging.info('Bundles Generated...')
def get_constraint(self, name: str) -> Constraint:
return next((constraint for constraint in self.constraints
if constraint.reference_attribute.id == name), None)
def unbundled_items(self) -> list:
# since the only bundles are based on passage id currently
# in the future when we have more than just passage based bundles
# we'll need to develop a more sophisticated way of handling this concern
bundle_constraints = (
constraint.reference_attribute for constraint in self.constraints
if constraint.reference_attribute.type == 'bundle')
if len(list(bundle_constraints)) > 0:
return [item for item in self.items if item.passage_id == None]
else:
return self.items
def select_items_by_percent(self, percent: int) -> list[Item]:
items = self.unbundled_items()
total_items = len(items)
selected_items_amount = round(total_items - (total_items *
(percent / 100)))
return random.sample(items, selected_items_amount)
def select_bundles_by_percent(self, percent: int) -> list[Bundle]:
total_bundles = len(self.bundles)
selected_bundles_amount = round(total_bundles - (total_bundles *
(percent / 100)))
return random.sample(self.bundles, selected_bundles_amount)

View File

@ -2,6 +2,21 @@ from pydantic import BaseModel
from typing import Optional from typing import Optional
class Target(BaseModel): class Target(BaseModel):
theta: float theta: float
value: float value: float
result: Optional[float] result: Optional[float]
drift: float = 0.0
@classmethod
def max_drift(cls) -> int:
return 15
@classmethod
def max_drift_increment(cls) -> int:
return 1 # 10%
def minimum(self) -> float:
return self.value - (self.value * self.drift)
def maximum(self) -> float:
return self.value + (self.value * self.drift)

View File

@ -1,4 +1,5 @@
class Base: class Base:
def __init__(self, source, ingest_type='message'):
self.ingest_type = ingest_type def __init__(self, source, ingest_type='message'):
self.source = source self.ingest_type = ingest_type
self.source = source

View File

@ -1,118 +1,259 @@
import os, json, random, io, logging import os, json, random, io, logging
from pulp import LpProblem, LpVariable, LpMinimize, LpStatus, lpSum from pulp import LpProblem, LpVariable, LpMinimize, LpMaximize, LpAffineExpression, LpConstraint, LpStatus, lpSum
from lib.application_configs import ApplicationConfigs
from helpers import aws_helper, tar_helper, csv_helper, service_helper, solver_helper from helpers import aws_helper, tar_helper, csv_helper, service_helper, solver_helper
from lib.errors.item_generation_error import ItemGenerationError from lib.errors.item_generation_error import ItemGenerationError
from models.solver_run import SolverRun from models.solver_run import SolverRun
from models.solution import Solution from models.solution import Solution
from models.form import Form from models.form import Form
from models.item import Item
from models.target import Target
from services.base import Base from services.base import Base
class LoftService(Base): class LoftService(Base):
def process(self):
try:
self.solver_run = SolverRun.parse_obj(self.retreive_attributes_from_message())
self.solver_run.generate_bundles()
self.solution = self.generate_solution()
self.result = self.stream_to_s3_bucket()
except ItemGenerationError as error:
self.result = self.stream_to_s3_bucket(error)
except TypeError as error:
logging.error(error)
self.result = self.stream_to_s3_bucket(ItemGenerationError("Provided params causing error in calculation results"))
def retreive_attributes_from_message(self): def process(self):
logging.info('Retrieving attributes from message...') try:
# get s3 object self.solver_run = self.create_solver_run_from_attributes()
self.key = aws_helper.get_key_from_message(self.source) self.solver_run.generate_bundles()
s3_object = aws_helper.get_object(self.key, aws_helper.get_bucket_from_message(self.source)) self.solution = self.generate_solution()
self.result = self.stream_to_s3_bucket()
except ItemGenerationError as error:
self.result = self.stream_to_s3_bucket(error)
except TypeError as error:
logging.error(error)
self.result = self.stream_to_s3_bucket(
ItemGenerationError(
"Provided params causing error in calculation results"))
# convert to tar def create_solver_run_from_attributes(self) -> SolverRun:
self.tar = tar_helper.raw_to_tar(s3_object) logging.info('Retrieving attributes from message...')
# get s3 object
self.key = aws_helper.get_key_from_message(self.source)
s3_object = aws_helper.get_object(
self.key, aws_helper.get_bucket_from_message(self.source))
# get attributes file and convert to dict # convert to tar
attributes = json.loads(tar_helper.extract_file_from_tar(self.tar , 'solver_run_attributes.json').read()) self.tar = tar_helper.raw_to_tar(s3_object)
# get items file and convert to dict # get attributes file and convert to dict
items_csv = tar_helper.extract_file_from_tar(self.tar , 'items.csv') attributes = json.loads(
items_csv_reader = csv_helper.file_stream_reader(items_csv) tar_helper.extract_file_from_tar(
self.tar, 'solver_run_attributes.json').read())
# add items to attributes dict # create solver run
attributes['items'] = service_helper.items_csv_to_dict(items_csv_reader) solver_run = SolverRun.parse_obj(attributes)
logging.info('Processed Attributes...')
return attributes # get items file and convert to dict
items_csv = tar_helper.extract_file_from_tar(self.tar, 'items.csv')
items_csv_reader = csv_helper.file_stream_reader(items_csv)
def generate_solution(self): # add items to solver run
# unsolved solution solver_run.items = service_helper.csv_to_item(items_csv_reader,
solution = Solution( solver_run)
response_id=random.randint(100, 5000),
forms=[]
)
# counter for number of forms logging.info('Processed Attributes...')
f = 0
# iterate for number of forms that require creation return solver_run
# currently creates distinc forms with no item overlap
while f < self.solver_run.total_forms:
# setup vars
items = LpVariable.dicts(
"Item", [item.id for item in self.solver_run.items], lowBound=1, upBound=1, cat='Binary')
problem_objection_functions = []
# create problem def generate_solution(self) -> Solution:
problem = LpProblem("ata-form-generate", LpMinimize) logging.info('Generating Solution...')
# dummy objective function, because it just makes things easier™ solution = Solution(response_id=random.randint(100, 5000),
problem += lpSum([items[item.id] forms=[]) # unsolved solution
for item in self.solver_run.items])
# constraints # iterate for number of forms that require creation
problem += lpSum([items[item.id] for form_count in range(self.solver_run.total_forms):
for item in self.solver_run.items]) == self.solver_run.total_form_items, 'Total form items' form_number = form_count + 1
current_drift = 0 # FF Tokyo Drift
# dynamic constraints # adding an element of randomness to the items and bundles used
problem = solver_helper.build_constraints(self.solver_run, problem, items) selected_items = self.solver_run.select_items_by_percent(30)
selected_bundles = self.solver_run.select_bundles_by_percent(
30)
# multi-objective constraints # setup common Solver variables
for target in self.solver_run.objective_function.tif_targets: items = LpVariable.dicts("Item",
problem += lpSum([item.iif(self.solver_run, target.theta)*items[item.id] [item.id for item in selected_items],
for item in self.solver_run.items]) <= target.value, f'min tif theta ({target.theta}) target value {target.value}' lowBound=0,
upBound=1,
cat='Binary')
bundles = LpVariable.dicts(
"Bundle", [bundle.id for bundle in selected_bundles],
lowBound=0,
upBound=1,
cat='Binary')
for target in self.solver_run.objective_function.tcc_targets: logging.info(f'Generating Solution for Form {form_number}')
problem += lpSum([item.irf(self.solver_run, target.theta)*items[item.id]
for item in self.solver_run.items]) <= target.value, f'min tcc theta ({target.theta}) target value {target.value}'
# solve problem while current_drift <= Target.max_drift():
problem.solve() drift_percent = current_drift / 100
self.solver_run.objective_function.update_targets_drift(
drift_percent)
# add return items and create as a form # create problem
form_items = service_helper.solution_items(problem.variables(), self.solver_run) problem = LpProblem('ata-form-generate', LpMinimize)
# add form to solution # objective function
solution.forms.append(Form.create(form_items, self.solver_run, LpStatus[problem.status])) problem += lpSum([
bundle.count * bundles[bundle.id]
for bundle in selected_bundles
] + [
items[item.id]
for item in selected_items
])
# successfull form, increment # Form Constraints
f += 1 problem += lpSum(
[
bundle.count * bundles[bundle.id]
for bundle in selected_bundles
] + [
1 * items[item.id]
for item in selected_items
]
) == self.solver_run.total_form_items, f'Total bundle form items for form {form_number}'
return solution # Dynamic constraints.. currently we only support Metadata and Bundles(Cases/Passages)
problem = solver_helper.build_constraints(
self.solver_run, problem, items, bundles, selected_items, selected_bundles)
def stream_to_s3_bucket(self, error = None): # form uniqueness constraints
self.file_name = f'{service_helper.key_to_uuid(self.key)}.csv' # for form in solution.forms:
solution_file = None # form_item_options = [
# setup writer buffer and write processed forms to file # bundles[bundle.id]
buffer = io.StringIO() # for bundle in selected_bundles
# ] + [
# items[item.id]
# for item in selected_items
# ]
# problem += len(
# set(form.solver_variables)
# & set(form_item_options)) / float(
# len(
# set(form.solver_variables)
# | set(form_item_options))) * 100 >= 10
if error: logging.info('Creating TIF and TCC Elastic constraints')
logging.info('Streaming %s error response to s3 bucket - %s', self.file_name, os.environ['S3_PROCESSED_BUCKET'])
solution_file = service_helper.error_to_file(buffer, error)
else:
logging.info('Streaming %s to s3 bucket - %s', self.file_name, os.environ['S3_PROCESSED_BUCKET'])
solution_file = service_helper.solution_to_file(buffer, self.solver_run.total_form_items, self.solution.forms)
# upload generated file to s3 and return result # Behold our very own Elastic constraints!
return aws_helper.file_stream_upload(solution_file, self.file_name, os.environ['S3_PROCESSED_BUCKET']) for tif_target in self.solver_run.objective_function.tif_targets:
problem += lpSum([
bundle.tif(self.solver_run.irt_model, tif_target.theta)
* bundles[bundle.id]
for bundle in selected_bundles
] + [
item.iif(self.solver_run, tif_target.theta) *
items[item.id]
for item in selected_items
]) >= tif_target.minimum(
), f'Min TIF theta({tif_target.theta}) at target {tif_target.value} drift at {current_drift}%'
problem += lpSum([
bundle.tif(self.solver_run.irt_model, tif_target.theta)
* bundles[bundle.id]
for bundle in selected_bundles
] + [
item.iif(self.solver_run, tif_target.theta) *
items[item.id]
for item in selected_items
]) <= tif_target.maximum(
), f'Max TIF theta({tif_target.theta}) at target {tif_target.value} drift at {current_drift}%'
for tcc_target in self.solver_run.objective_function.tcc_targets:
problem += lpSum([
bundle.trf(self.solver_run.irt_model, tcc_target.theta)
* bundles[bundle.id]
for bundle in selected_bundles
] + [
item.irf(self.solver_run, tcc_target.theta) *
items[item.id]
for item in selected_items
]) >= tcc_target.minimum(
), f'Min TCC theta({tcc_target.theta}) at target {tcc_target.value} drift at {current_drift}%'
problem += lpSum([
bundle.trf(self.solver_run.irt_model, tcc_target.theta)
* bundles[bundle.id]
for bundle in selected_bundles
] + [
item.irf(self.solver_run, tcc_target.theta) *
items[item.id]
for item in selected_items
]) <= tcc_target.maximum(
), f'Max TCC theta({tcc_target.theta}) at target {tcc_target.value} drift at {current_drift}%'
logging.info(
f'Solving for Form {form_number} with a drift of {current_drift}%'
)
problem.solve()
if LpStatus[problem.status] == 'Infeasible':
logging.info(
f'attempt infeasible for drift of {current_drift}%')
if current_drift >= Target.max_drift(
): # this is the last attempt, so lets finalize the solution
if ApplicationConfigs.local_dev_env:
service_helper.print_problem_variables(problem)
logging.info(
f'No feasible solution found for Form {form_number}!'
)
self.add_form_to_solution(problem, solution)
break
current_drift += Target.max_drift_increment()
else:
if ApplicationConfigs.local_dev_env:
service_helper.print_problem_variables(problem)
logging.info(
f'Optimal solution found with drift of {current_drift}%!'
)
self.add_form_to_solution(problem, solution)
break
logging.info('Solution Generated.')
return solution
def add_form_to_solution(self, problem: LpProblem, solution: Solution):
# add return items and create as a form
form_items, solver_variables = service_helper.solution_items(
problem.variables(), self.solver_run)
form = Form.create(form_items, self.solver_run,
LpStatus[problem.status], solver_variables)
solution.forms.append(form)
logging.info('Form generated and added to solution...')
def stream_to_s3_bucket(self, error=None):
self.file_name = f'{service_helper.key_to_uuid(self.key)}.csv'
solution_file = None
# setup writer buffer and write processed forms to file
buffer = io.StringIO()
if error:
logging.info('Streaming %s error response to s3 bucket - %s',
self.file_name,
ApplicationConfigs.s3_processed_bucket)
solution_file = service_helper.error_to_file(buffer, error)
else:
logging.info('Streaming %s to s3 bucket - %s', self.file_name,
ApplicationConfigs.s3_processed_bucket)
solution_file = service_helper.solution_to_file(
buffer, self.solver_run.total_form_items, self.solution.forms)
# upload generated file to s3 and return result
return aws_helper.file_stream_upload(
solution_file, self.file_name,
ApplicationConfigs.s3_processed_bucket)

View File

@ -0,0 +1,252 @@
# Local Dev Sandbox for the solver
# Useful for testing some concepts and functionality
# and offers a much faster feedback loop than the usual end-to-end process in Local Dev
#
# How to use:
# 1. run `compose exec meazure-solver bash`
# 2. run `python`
# 3. import this file in the python repl by `from services.solver_sandbox import SolverSandbox`
# 4. run any of the methds below e.g. `SolverSandbox.yas_elastic()`
import logging
from pulp import LpProblem, LpVariable, LpInteger, LpMinimize, LpMaximize, LpAffineExpression, LpConstraint, LpStatus, lpSum
from services.loft_service import LoftService
from lib.application_configs import ApplicationConfigs
class SolverSandbox:
def loft_service(body = {}):
if ApplicationConfigs.local_dev_env:
body = {'Records': [{'eventVersion': '2.1', 'eventSource': 'aws:s3', 'awsRegion': 'us-east-1', 'eventTime': '2022-03-17T13:51:22.708Z', 'eventName': 'ObjectCreated:Put', 'userIdentity': {'principalId': 'AIDAJDPLRKLG7UEXAMPLE'}, 'requestParameters': {'sourceIPAddress': '127.0.0.1'}, 'responseElements': {'x-amz-request-id': '25ecd478', 'x-amz-id-2': 'eftixk72aD6Ap51TnqcoF8eFidJG9Z/2'}, 's3': {'s3SchemaVersion': '1.0', 'configurationId': 'testConfigRule', 'bucket': {'name': 'measure-local-solver-ingest', 'ownerIdentity': {'principalId': 'A3NL1KOZZKExample'}, 'arn': 'arn:aws:s3:::measure-local-solver-ingest'}, 'object': {'key': '40f23de0-8827-013a-a353-0242ac120010_solver_run.tar.gz', 'size': 491, 'eTag': '"2b423d91e80d931302192e781b6bd47c"', 'versionId': None, 'sequencer': '0055AED6DCD90281E5'}}}]}
# CPNRE item bank with metadata and cases
# body = {'Records': [{'eventVersion': '2.1', 'eventSource': 'aws:s3', 'awsRegion': 'us-east-1', 'eventTime': '2022-03-23T18:49:42.979Z', 'eventName': 'ObjectCreated:Put', 'userIdentity': {'principalId': 'AIDAJDPLRKLG7UEXAMPLE'}, 'requestParameters': {'sourceIPAddress': '127.0.0.1'}, 'responseElements': {'x-amz-request-id': 'c4efd257', 'x-amz-id-2': 'eftixk72aD6Ap51TnqcoF8eFidJG9Z/2'}, 's3': {'s3SchemaVersion': '1.0', 'configurationId': 'testConfigRule', 'bucket': {'name': 'measure-local-solver-ingest', 'ownerIdentity': {'principalId': 'A3NL1KOZZKExample'}, 'arn': 'arn:aws:s3:::measure-local-solver-ingest'}, 'object': {'key': 'e8f38480-8d07-013a-5ee6-0242ac120010_solver_run.tar.gz', 'size': 12716, 'eTag': '"94189c36aef04dde3babb462442c3af3"', 'versionId': None, 'sequencer': '0055AED6DCD90281E5'}}}]}
# LOFT item bank with metadata and cases
body = {'Records': [{'eventVersion': '2.1', 'eventSource': 'aws:s3', 'awsRegion': 'us-east-1', 'eventTime': '2022-03-22T19:36:53.568Z', 'eventName': 'ObjectCreated:Put', 'userIdentity': {'principalId': 'AIDAJDPLRKLG7UEXAMPLE'}, 'requestParameters': {'sourceIPAddress': '127.0.0.1'}, 'responseElements': {'x-amz-request-id': '61f320d0', 'x-amz-id-2': 'eftixk72aD6Ap51TnqcoF8eFidJG9Z/2'}, 's3': {'s3SchemaVersion': '1.0', 'configurationId': 'testConfigRule', 'bucket': {'name': 'measure-local-solver-ingest', 'ownerIdentity': {'principalId': 'A3NL1KOZZKExample'}, 'arn': 'arn:aws:s3:::measure-local-solver-ingest'}, 'object': {'key': '5971f500-8c45-013a-5d13-0242ac120010_solver_run.tar.gz', 'size': 619, 'eTag': '"a3cbba098e9f6a445cba6014e47ccaf9"', 'versionId': None, 'sequencer': '0055AED6DCD90281E5'}}}]}
# Latest CPNRE Item Bank with metadata and cases
body = {'Records': [{'eventVersion': '2.1', 'eventSource': 'aws:s3', 'awsRegion': 'us-east-1', 'eventTime': '2022-03-24T15:47:54.652Z', 'eventName': 'ObjectCreated:Put', 'userIdentity': {'principalId': 'AIDAJDPLRKLG7UEXAMPLE'}, 'requestParameters': {'sourceIPAddress': '127.0.0.1'}, 'responseElements': {'x-amz-request-id': '1969b1ed', 'x-amz-id-2': 'eftixk72aD6Ap51TnqcoF8eFidJG9Z/2'}, 's3': {'s3SchemaVersion': '1.0', 'configurationId': 'testConfigRule', 'bucket': {'name': 'measure-local-solver-ingest', 'ownerIdentity': {'principalId': 'A3NL1KOZZKExample'}, 'arn': 'arn:aws:s3:::measure-local-solver-ingest'}, 'object': {'key': 'ab40ca20-8db7-013a-a88f-0242ac120013_solver_run.tar.gz', 'size': 24111, 'eTag': '"718a1a17b5dd5219b8e179bfd1ddf1ca"', 'versionId': None, 'sequencer': '0055AED6DCD90281E5'}}}]}
# Latest LOFT Item Bank with metadata and cases with target variance
body = {'Records': [{'eventVersion': '2.1', 'eventSource': 'aws:s3', 'awsRegion': 'us-east-1', 'eventTime': '2022-03-25T18:03:18.829Z', 'eventName': 'ObjectCreated:Put', 'userIdentity': {'principalId': 'AIDAJDPLRKLG7UEXAMPLE'}, 'requestParameters': {'sourceIPAddress': '127.0.0.1'}, 'responseElements': {'x-amz-request-id': '204c718f', 'x-amz-id-2': 'eftixk72aD6Ap51TnqcoF8eFidJG9Z/2'}, 's3': {'s3SchemaVersion': '1.0', 'configurationId': 'testConfigRule', 'bucket': {'name': 'measure-local-solver-ingest', 'ownerIdentity': {'principalId': 'A3NL1KOZZKExample'}, 'arn': 'arn:aws:s3:::measure-local-solver-ingest'}, 'object': {'key': 'beb35dc0-8e93-013a-5807-0242ac120013_solver_run.tar.gz', 'size': 24112, 'eTag': '"a5a4aad0eb8c9d9af2aad9684437022a"', 'versionId': None, 'sequencer': '0055AED6DCD90281E5'}}}]}
LoftService(body).process()
def yosh_loop():
Items = [1,2,3,4,5]
tif = {
1: 0.2,
2: 0.5,
3: 0.3,
4: 0.8,
5: 0.1
}
iif = {
1: 0.09,
2: 0.2,
3: 0.113,
4: 0.3,
5: 0.1
}
drift = 0.0
drift_limit = 0.2
iif_target = 0.5
tif_target = 0.9
item_vars = LpVariable.dicts("Item", Items, cat="Binary")
while drift <= drift_limit:
prob = LpProblem("tif_tcc_test", LpMinimize)
prob += lpSum([(tif[i] + iif[i]) * item_vars[i] for i in Items]), "TifTccSum"
prob += lpSum([item_vars[i] for i in Items]) == 3, "TotalItems"
prob += lpSum([tif[i] * item_vars[i] for i in Items]) >= tif_target - (tif_target * drift), 'TifMin'
prob += lpSum([tif[i] * item_vars[i] for i in Items]) <= tif_target + (tif_target * drift), 'TifMax'
prob += lpSum([iif[i] * item_vars[i] for i in Items]) >= iif_target - (iif_target * drift), 'TccMin'
prob += lpSum([iif[i] * item_vars[i] for i in Items]) <= iif_target + (iif_target * drift), 'TccMax'
prob.solve()
print(prob)
if LpStatus[prob.status] == "Infeasible":
print('attempt infeasible')
for v in prob.variables():
print(v.name, "=", v.varValue)
drift += 0.02
else:
print(f"solution found with drift of {drift}!")
for v in prob.variables():
print(v.name, "=", v.varValue)
break
def yas_elastic(tif_targets, tcc_targets): # [50, 55, 46], [60, 40, 50]
Items = [1,2,3,4,5,6,7,8,9,10]
iif = {
1: 5,
2: 5,
3: 5,
4: 10,
5: 10,
6: 10,
7: 15,
8: 20,
9: 20,
10: 20
}
# ---
irf = {
1: 5,
2: 5,
3: 5,
4: 10,
5: 10,
6: 10,
7: 15,
8: 20,
9: 20,
10: 20
}
items = LpVariable.dicts('Item', Items, cat='Binary')
drift = 0
max_drift = 25# 25% elasticity
while drift <= max_drift:
drift_percent = drift / 100
problem = LpProblem('TIF_TCC', LpMinimize)
# objective function
problem += lpSum([items[i] for i in Items])
# Constraint 1
problem += lpSum([items[i] for i in Items]) == 5, 'TotalItems'
# Our own "Elastic Constraints"
for tif_target in tif_targets:
print(f"Calculating TIF target of {tif_target} with drift of {drift}%")
problem += lpSum(
[iif[i] * items[i] for i in Items]
) >= tif_target - (tif_target * drift_percent)
problem += lpSum(
[iif[i] * items[i] for i in Items]
) <= tif_target + (tif_target * drift_percent)
for tcc_target in tcc_targets:
print(f"Calculating TIF target of {tcc_target} with drift of {drift}%")
problem += lpSum(
[irf[i] * items[i] for i in Items]
) >= tcc_target - (tcc_target * drift_percent)
problem += lpSum(
[irf[i] * items[i] for i in Items]
) <= tcc_target + (tcc_target * drift_percent)
problem.solve()
if LpStatus[problem.status] == 'Infeasible':
print(f"attempt infeasible for drift of {drift}")
for v in problem.variables(): print(v.name, "=", v.varValue)
# if drift == max_drift: breakpoint();
print(problem.objective.value())
print(problem.constraints)
print(problem.objective)
drift += 1
else:
print(f"solution found with drift of {drift}!")
for v in problem.variables(): print(v.name, "=", v.varValue);
print(problem.objective.value())
print(problem.constraints)
print(problem.objective)
break
# Implementation of the Whiskas Cat problem, with elastic constraints
# https://www.coin-or.org/PuLP/CaseStudies/a_blending_problem.html
# https://stackoverflow.com/questions/27278691/how-can-an-elastic-subproblem-in-pulp-be-used-as-a-constraint?noredirect=1&lq=1
def whiskas():
# Creates a list of the Ingredients
Ingredients = ['CHICKEN', 'BEEF', 'MUTTON', 'RICE', 'WHEAT', 'GEL']
# A dictionary of the costs of each of the Ingredients is created
costs = {'CHICKEN': 0.013,
'BEEF': 0.008,
'MUTTON': 0.010,
'RICE': 0.002,
'WHEAT': 0.005,
'GEL': 0.001}
# A dictionary of the protein percent in each of the Ingredients is created
proteinPercent = {'CHICKEN': 0.100,
'BEEF': 0.200,
'MUTTON': 0.150,
'RICE': 0.000,
'WHEAT': 0.040,
'GEL': 0.000}
# A dictionary of the fat percent in each of the Ingredients is created
fatPercent = {'CHICKEN': 0.080,
'BEEF': 0.100,
'MUTTON': 0.110,
'RICE': 0.010,
'WHEAT': 0.010,
'GEL': 0.000}
# A dictionary of the fibre percent in each of the Ingredients is created
fibrePercent = {'CHICKEN': 0.001,
'BEEF': 0.005,
'MUTTON': 0.003,
'RICE': 0.100,
'WHEAT': 0.150,
'GEL': 0.000}
# A dictionary of the salt percent in each of the Ingredients is created
saltPercent = {'CHICKEN': 0.002,
'BEEF': 0.005,
'MUTTON': 0.007,
'RICE': 0.002,
'WHEAT': 0.008,
'GEL': 0.000}
logging.info('Running Test...')
# create problem
problem = LpProblem("The Whiskas Problem", LpMinimize)
# A dictionary called 'ingredient_vars' is created to contain the referenced Variables
ingredient_vars = LpVariable.dicts("Ingr", Ingredients, 0)
# set objective
problem += lpSum([costs[i]*ingredient_vars[i] for i in Ingredients]), "Total Cost of Ingredients per can"
# The five constraints are added to 'prob'
problem += lpSum([ingredient_vars[i] for i in Ingredients]) == 100, "PercentagesSum"
problem += lpSum([proteinPercent[i] * ingredient_vars[i] for i in Ingredients]) >= 8.0, "ProteinRequirement"
problem += lpSum([fatPercent[i] * ingredient_vars[i] for i in Ingredients]) >= 6.0, "FatRequirement"
problem += lpSum([fibrePercent[i] * ingredient_vars[i] for i in Ingredients]) <= 2.0, "FibreRequirement"
problem += lpSum([saltPercent[i] * ingredient_vars[i] for i in Ingredients]) <= 0.4, "SaltRequirement"
# ELASTICIZE
# c6_LHS_A = LpAffineExpression([ingredient_vars])
c6_LHS = LpAffineExpression([(ingredient_vars['GEL'],1), (ingredient_vars['BEEF'],1)])
c6= LpConstraint(e=c6_LHS, sense=-1, name='GelBeefTotal', rhs=30)
c6_elastic = c6.makeElasticSubProblem(penalty = 100, proportionFreeBound = .10)
problem.extend(c6_elastic)
print(problem)
# solve problem
problem.solve()
# The status of the solution is printed to the screen
print("Status:", LpStatus[problem.status])
# Each of the variables is printed with it's resolved optimum value
for v in problem.variables():
print(v.name, "=", v.varValue)
# The optimised objective function value is printed to the screen
print("Total Cost of Ingredients per can = ", problem.objective.value())