diff --git a/.docker-compose/Dockerfile b/.docker-compose/Dockerfile index 703424a..58839d7 100644 --- a/.docker-compose/Dockerfile +++ b/.docker-compose/Dockerfile @@ -4,8 +4,8 @@ RUN apt-get update RUN apt-get -y install coinor-cbc RUN python -m pip install pulp RUN python -m pip install pydantic -RUN python -m pip install pySqsListener RUN python -m pip install daemonize +RUN python -m pip install sqspy RUN mkdir /app WORKDIR /app diff --git a/Dockerfile b/Dockerfile index 703424a..58839d7 100644 --- a/Dockerfile +++ b/Dockerfile @@ -4,8 +4,8 @@ RUN apt-get update RUN apt-get -y install coinor-cbc RUN python -m pip install pulp RUN python -m pip install pydantic -RUN python -m pip install pySqsListener RUN python -m pip install daemonize +RUN python -m pip install sqspy RUN mkdir /app WORKDIR /app diff --git a/app/helpers/aws_helper.py b/app/helpers/aws_helper.py index 05c5b2c..55a4694 100644 --- a/app/helpers/aws_helper.py +++ b/app/helpers/aws_helper.py @@ -6,8 +6,12 @@ session = boto3.Session( aws_access_key_id=os.environ['AWS_ACCESS_KEY_ID'], aws_secret_access_key=os.environ['AWS_SECRET_ACCESS_KEY']) -s3 = session.resource('s3', region_name=os.environ['AWS_REGION']) -sqs = session.client('sqs', region_name=os.environ['AWS_REGION']) +s3 = session.resource('s3', + region_name=os.environ['AWS_REGION'], + endpoint_url=os.environ['ENDPOINT_URL']) +sqs = session.client('sqs', + region_name=os.environ['AWS_REGION'], + endpoint_url=os.environ['ENDPOINT_URL']) def get_key_from_message(body): diff --git a/app/main.py b/app/main.py index 13626b5..0c7d05e 100644 --- a/app/main.py +++ b/app/main.py @@ -4,14 +4,14 @@ from services.loft_service import LoftService from helpers import aws_helper from daemonize import Daemonize -from sqs_listener import SqsListener +from sqspy import Consumer logging.basicConfig(stream=sys.stdout, level=logging.INFO, format="%(levelname)s %(asctime)s - %(message)s") -class ServiceListener(SqsListener): +class ServiceListener(Consumer): def handle_message(self, body, attributes, messages_attributes): # gather/manage/process data based on the particular needs @@ -24,13 +24,15 @@ class ServiceListener(SqsListener): def main(): - logging.info('Starting Solver Service (v1.1.3)...') + logging.info('Starting Solver Service (v1.1.4)...') listener = ServiceListener( + None, os.environ['SQS_QUEUE'], + create_queue=False, region_name=os.environ['AWS_REGION'], aws_access_key=os.environ['AWS_ACCESS_KEY_ID'], aws_secret_key=os.environ['AWS_SECRET_ACCESS_KEY'], - queue_url=os.environ['SQS_QUEUE']) + endpoint_url=os.environ['ENDPOINT_URL']) listener.listen() diff --git a/app/services/loft_service.py b/app/services/loft_service.py index 77b4f4a..bd16816 100644 --- a/app/services/loft_service.py +++ b/app/services/loft_service.py @@ -1,6 +1,6 @@ import os, json, random, io, logging -from pulp import LpProblem, LpVariable, LpMinimize, LpMaximize, LpStatus, lpSum +from pulp import LpProblem, LpVariable, LpMinimize, LpMaximize, LpAffineExpression, LpConstraint, LpStatus, lpSum from helpers import aws_helper, tar_helper, csv_helper, service_helper, solver_helper from lib.errors.item_generation_error import ItemGenerationError @@ -20,7 +20,6 @@ class LoftService(Base): self.solver_run = self.create_solver_run_from_attributes() self.solver_run.generate_bundles() self.solution = self.generate_solution() - # self.solution = self.generate_test_solution() self.result = self.stream_to_s3_bucket() except ItemGenerationError as error: self.result = self.stream_to_s3_bucket(error) @@ -30,55 +29,6 @@ class LoftService(Base): ItemGenerationError( "Provided params causing error in calculation results")) - def generate_test_solution(self) -> Solution: - solution = Solution(response_id=random.randint(100, 5000), forms=[]) - - problem = LpProblem("ata-form-generate-with-bundles", LpMinimize) - - bundles = LpVariable.dicts( - "Bundle", [bundle.id for bundle in self.solver_run.bundles], - lowBound=1, - upBound=1, - cat='Binary') - items = LpVariable.dicts("Item", - [item.id for item in self.solver_run.items], - lowBound=1, - upBound=1, - cat='Binary') - - problem += lpSum( - [bundles[bundle.id] for bundle in self.solver_run.bundles]) - # problem += lpSum([items[item.id] for item in self.solver_run.items]) - - # problem += lpSum([bundles[bundle.id] for bundle in self.solver_run.bundles]) <= 3, 'max total bundles used' - # problem += lpSum([bundles[bundle.id] for bundle in self.solver_run.bundles]) >= 1, 'min total bundles used' - problem += lpSum( - [bundles[bundle.id] for bundle in self.solver_run.bundles]) == 3 - - problem += lpSum( - [ - bundle.count * bundles[bundle.id] - for bundle in self.solver_run.bundles - ] + - [1 * items[item.id] for item in self.solver_run.unbundled_items()] - ) == self.solver_run.total_form_items, 'Total bundle form items for form' - - problem.solve() - - # for v in problem.variables(): - # print(f'{v.name}: {v.varValue}') - - # add return items and create as a form - form_items = service_helper.solution_items(problem.variables(), - self.solver_run) - - # add form to solution - solution.forms.append( - Form.create(form_items, self.solver_run, LpStatus[problem.status])) - logging.info('Form generated and added to solution...') - - return solution - def create_solver_run_from_attributes(self) -> SolverRun: logging.info('Retrieving attributes from message...') # get s3 object @@ -122,132 +72,96 @@ class LoftService(Base): # iterate for number of forms that require creation # currently creates distinc forms with no item overlap while f < self.solver_run.total_forms: - # currently constant drift is supported - # plan to support variable drift - problem = None + # setup vars + items = LpVariable.dicts( + "Item", [item.id for item in self.solver_run.items], + cat='Binary') + bundles = LpVariable.dicts( + "Bundle", [bundle.id for bundle in self.solver_run.bundles], + cat='Binary') - # allow target drift to increment to keep trying - # once limit has been reached loop will stop - # at this point the latest solve attempt will be used - # though likely to be infeasible - while self.solver_run.objective_function.minimum_drift() <= 2.0: - # setup vars - items = LpVariable.dicts( - "Item", [item.id for item in self.solver_run.items], - lowBound=0, - upBound=1, - cat='Binary') - bundles = LpVariable.dicts( - "Bundle", - [bundle.id for bundle in self.solver_run.bundles], - lowBound=0, - upBound=1, - cat='Binary') + # create problem + problem = LpProblem("ata-form-generate", LpMinimize) + problem_objective_functions = [] - # create problem - problem = LpProblem("ata-form-generate", LpMinimize) - problem_objective_functions = [] + # dummy objective function, because it just makes things easierâ„¢ + problem += lpSum( + [items[item.id] for item in self.solver_run.items]) - # dummy objective function, because it just makes things easierâ„¢ - # problem += lpSum( - # [items[item.id] for item in self.solver_run.items]) + # constraints + # problem += lpSum([items[item.id] + # for item in self.solver_run.items]) == self.solver_run.total_form_items, 'Total form items' + problem += lpSum( + [ + bundle.count * bundles[bundle.id] + for bundle in self.solver_run.bundles + ] + [ + 1 * items[item.id] + for item in self.solver_run.unbundled_items() + ] + ) == self.solver_run.total_form_items, 'Total bundle form items for form' - # constraints - # problem += lpSum([items[item.id] - # for item in self.solver_run.items]) == self.solver_run.total_form_items, 'Total form items' - problem += lpSum( - [ - bundle.count * bundles[bundle.id] - for bundle in self.solver_run.bundles - ] + [ - 1 * items[item.id] - for item in self.solver_run.unbundled_items() - ] - ) == self.solver_run.total_form_items, 'Total bundle form items for form' + # dynamic constraints + problem = solver_helper.build_constraints(self.solver_run, problem, + items, bundles) - # dynamic constraints - problem = solver_helper.build_constraints( - self.solver_run, problem, items, bundles) + # multi-objective constraints + logging.info('Creating TIF and TCC constraints') + for target in self.solver_run.objective_function.tif_targets: + tif = lpSum([ + bundle.tif(self.solver_run.irt_model, target.theta) * + bundles[bundle.id] for bundle in self.solver_run.bundles + ] + [ + item.iif(self.solver_run, target.theta) * items[item.id] + for item in self.solver_run.items + ]) - # multi-objective constraints - logging.info('Creating TIF and TCC constraints') - for target in self.solver_run.objective_function.tif_targets: + problem_objective_functions.append(tif) + e = LpAffineExpression( + [(bundles[bundle.id], + bundle.tif(self.solver_run.irt_model, target.theta)) + for bundle in self.solver_run.bundles] + + [(items[item.id], item.iif(self.solver_run, target.theta)) + for item in self.solver_run.items]) + constraint = LpConstraint( + e=e, + sense=0, + name=f'tif theta ({target.theta}) @{target.value}', + rhs=target.value) + elastized_constraint = constraint.makeElasticSubProblem( + penalty=1, proportionFreeBound=0.25) + if int(target.value) == 20: print(elastized_constraint) + problem.extend(elastized_constraint) + for target in self.solver_run.objective_function.tcc_targets: + tcc = lpSum([ + bundle.trf(self.solver_run.irt_model, target.theta) * + bundles[bundle.id] for bundle in self.solver_run.bundles + ] + [ + item.irf(self.solver_run, target.theta) * items[item.id] + for item in self.solver_run.items + ]) + problem_objective_functions.append(tcc) + e = LpAffineExpression( + [(bundles[bundle.id], + bundle.trf(self.solver_run.irt_model, target.theta)) + for bundle in self.solver_run.bundles] + + [(items[item.id], item.irf(self.solver_run, target.theta)) + for item in self.solver_run.items]) + constraint = LpConstraint( + e=e, + sense=0, + name=f'tcc theta ({target.theta}) @{target.value}', + rhs=target.value) + elastized_constraint = constraint.makeElasticSubProblem( + penalty=1, proportionFreeBound=0.25) + problem.extend(elastized_constraint) - tif = lpSum([ - bundle.tif(self.solver_run.irt_model, target.theta) * - bundles[bundle.id] - for bundle in self.solver_run.bundles - ] + [ - item.iif(self.solver_run, target.theta) * - items[item.id] for item in self.solver_run.items - ]) - problem_objective_functions.append(tif) - problem += lpSum([ - bundle.tif(self.solver_run.irt_model, target.theta) * - bundles[bundle.id] - for bundle in self.solver_run.bundles - ] + [ - item.iif(self.solver_run, target.theta) * - items[item.id] for item in self.solver_run.items - ]) >= target.value - target.value * target.drift, f'max tif theta ({target.theta}) target value {target.value}' - problem += lpSum([ - bundle.tif(self.solver_run.irt_model, target.theta) * - bundles[bundle.id] - for bundle in self.solver_run.bundles - ] + [ - item.iif(self.solver_run, target.theta) * - items[item.id] for item in self.solver_run.items - ]) <= target.value + target.value * target.drift, f'min tif theta ({target.theta}) target value {target.value}' - - for target in self.solver_run.objective_function.tcc_targets: - tcc = lpSum([ - bundle.trf(self.solver_run.irt_model, target.theta) * - bundles[bundle.id] - for bundle in self.solver_run.bundles - ] + [ - item.irf(self.solver_run, target.theta) * - items[item.id] for item in self.solver_run.items - ]) - problem_objective_functions.append(tcc) - problem += lpSum([ - bundle.trf(self.solver_run.irt_model, target.theta) * - bundles[bundle.id] - for bundle in self.solver_run.bundles - ] + [ - item.irf(self.solver_run, target.theta) * - items[item.id] for item in self.solver_run.items - ]) >= target.value - target.value * target.drift, f'max tcc theta ({target.theta}) target value {target.value}' - problem += lpSum([ - bundle.trf(self.solver_run.irt_model, target.theta) * - bundles[bundle.id] - for bundle in self.solver_run.bundles - ] + [ - item.irf(self.solver_run, target.theta) * - items[item.id] for item in self.solver_run.items - ]) <= target.value + target.value * target.drift, f'min tcc theta ({target.theta}) target value {target.value}' - - # solve problem - logging.info('Solving...') - # problem.solve() - problem.sequentialSolve(problem_objective_functions) - - # optimal solution found! - if LpStatus[problem.status] == 'Optimal': - logging.info( - f'Problem solved...generating Form and adding to Solution with {self.solver_run.drift_style} drift \n\ - tif target drift: \n\ - {["theta @" + str(target.theta) + " - " + str(target.drift) for target in self.solver_run.objective_function.tif_targets]} \n\ - tcc target drift: \n\ - {["theta @" + str(target.theta) + " - " + str(target.drift) for target in self.solver_run.objective_function.tcc_targets]}' - ) - break - else: - # increment drift to attempt to find optimal solution - increment = self.solver_run.objective_function.increment_targets_drift( - 2.0, True) - logging.info( - f'Non Optimal Solution...widening Target ranges to {increment} using {self.solver_run.drift_style} drift' - ) + # solve problem + logging.info('Solving...') + # print(problem) + problem.solve() + # problem.sequentialSolve(problem_objective_functions) + logging.info('Solved...generating form and adding to solution') # add return items and create as a form form_items = service_helper.solution_items(problem.variables(), @@ -257,12 +171,13 @@ class LoftService(Base): solution.forms.append( Form.create(form_items, self.solver_run, LpStatus[problem.status])) - logging.info('Form generated and added to Solution...') + logging.info('Form generated and added to solution...') # successfull form, increment f += 1 logging.info('Solution Generated.') + # print(problem) return solution def stream_to_s3_bucket(self, error=None):