new listener

This commit is contained in:
Joshua Burman 2022-02-17 11:25:29 -05:00
parent f18e048081
commit 50bb02ee0f
5 changed files with 100 additions and 179 deletions

View File

@ -4,8 +4,8 @@ RUN apt-get update
RUN apt-get -y install coinor-cbc RUN apt-get -y install coinor-cbc
RUN python -m pip install pulp RUN python -m pip install pulp
RUN python -m pip install pydantic RUN python -m pip install pydantic
RUN python -m pip install pySqsListener
RUN python -m pip install daemonize RUN python -m pip install daemonize
RUN python -m pip install sqspy
RUN mkdir /app RUN mkdir /app
WORKDIR /app WORKDIR /app

View File

@ -4,8 +4,8 @@ RUN apt-get update
RUN apt-get -y install coinor-cbc RUN apt-get -y install coinor-cbc
RUN python -m pip install pulp RUN python -m pip install pulp
RUN python -m pip install pydantic RUN python -m pip install pydantic
RUN python -m pip install pySqsListener
RUN python -m pip install daemonize RUN python -m pip install daemonize
RUN python -m pip install sqspy
RUN mkdir /app RUN mkdir /app
WORKDIR /app WORKDIR /app

View File

@ -6,8 +6,12 @@ session = boto3.Session(
aws_access_key_id=os.environ['AWS_ACCESS_KEY_ID'], aws_access_key_id=os.environ['AWS_ACCESS_KEY_ID'],
aws_secret_access_key=os.environ['AWS_SECRET_ACCESS_KEY']) aws_secret_access_key=os.environ['AWS_SECRET_ACCESS_KEY'])
s3 = session.resource('s3', region_name=os.environ['AWS_REGION']) s3 = session.resource('s3',
sqs = session.client('sqs', region_name=os.environ['AWS_REGION']) region_name=os.environ['AWS_REGION'],
endpoint_url=os.environ['ENDPOINT_URL'])
sqs = session.client('sqs',
region_name=os.environ['AWS_REGION'],
endpoint_url=os.environ['ENDPOINT_URL'])
def get_key_from_message(body): def get_key_from_message(body):

View File

@ -4,14 +4,14 @@ from services.loft_service import LoftService
from helpers import aws_helper from helpers import aws_helper
from daemonize import Daemonize from daemonize import Daemonize
from sqs_listener import SqsListener from sqspy import Consumer
logging.basicConfig(stream=sys.stdout, logging.basicConfig(stream=sys.stdout,
level=logging.INFO, level=logging.INFO,
format="%(levelname)s %(asctime)s - %(message)s") format="%(levelname)s %(asctime)s - %(message)s")
class ServiceListener(SqsListener): class ServiceListener(Consumer):
def handle_message(self, body, attributes, messages_attributes): def handle_message(self, body, attributes, messages_attributes):
# gather/manage/process data based on the particular needs # gather/manage/process data based on the particular needs
@ -24,13 +24,15 @@ class ServiceListener(SqsListener):
def main(): def main():
logging.info('Starting Solver Service (v1.1.3)...') logging.info('Starting Solver Service (v1.1.4)...')
listener = ServiceListener( listener = ServiceListener(
None,
os.environ['SQS_QUEUE'], os.environ['SQS_QUEUE'],
create_queue=False,
region_name=os.environ['AWS_REGION'], region_name=os.environ['AWS_REGION'],
aws_access_key=os.environ['AWS_ACCESS_KEY_ID'], aws_access_key=os.environ['AWS_ACCESS_KEY_ID'],
aws_secret_key=os.environ['AWS_SECRET_ACCESS_KEY'], aws_secret_key=os.environ['AWS_SECRET_ACCESS_KEY'],
queue_url=os.environ['SQS_QUEUE']) endpoint_url=os.environ['ENDPOINT_URL'])
listener.listen() listener.listen()

View File

@ -1,6 +1,6 @@
import os, json, random, io, logging import os, json, random, io, logging
from pulp import LpProblem, LpVariable, LpMinimize, LpMaximize, LpStatus, lpSum from pulp import LpProblem, LpVariable, LpMinimize, LpMaximize, LpAffineExpression, LpConstraint, LpStatus, lpSum
from helpers import aws_helper, tar_helper, csv_helper, service_helper, solver_helper from helpers import aws_helper, tar_helper, csv_helper, service_helper, solver_helper
from lib.errors.item_generation_error import ItemGenerationError from lib.errors.item_generation_error import ItemGenerationError
@ -20,7 +20,6 @@ class LoftService(Base):
self.solver_run = self.create_solver_run_from_attributes() self.solver_run = self.create_solver_run_from_attributes()
self.solver_run.generate_bundles() self.solver_run.generate_bundles()
self.solution = self.generate_solution() self.solution = self.generate_solution()
# self.solution = self.generate_test_solution()
self.result = self.stream_to_s3_bucket() self.result = self.stream_to_s3_bucket()
except ItemGenerationError as error: except ItemGenerationError as error:
self.result = self.stream_to_s3_bucket(error) self.result = self.stream_to_s3_bucket(error)
@ -30,55 +29,6 @@ class LoftService(Base):
ItemGenerationError( ItemGenerationError(
"Provided params causing error in calculation results")) "Provided params causing error in calculation results"))
def generate_test_solution(self) -> Solution:
solution = Solution(response_id=random.randint(100, 5000), forms=[])
problem = LpProblem("ata-form-generate-with-bundles", LpMinimize)
bundles = LpVariable.dicts(
"Bundle", [bundle.id for bundle in self.solver_run.bundles],
lowBound=1,
upBound=1,
cat='Binary')
items = LpVariable.dicts("Item",
[item.id for item in self.solver_run.items],
lowBound=1,
upBound=1,
cat='Binary')
problem += lpSum(
[bundles[bundle.id] for bundle in self.solver_run.bundles])
# problem += lpSum([items[item.id] for item in self.solver_run.items])
# problem += lpSum([bundles[bundle.id] for bundle in self.solver_run.bundles]) <= 3, 'max total bundles used'
# problem += lpSum([bundles[bundle.id] for bundle in self.solver_run.bundles]) >= 1, 'min total bundles used'
problem += lpSum(
[bundles[bundle.id] for bundle in self.solver_run.bundles]) == 3
problem += lpSum(
[
bundle.count * bundles[bundle.id]
for bundle in self.solver_run.bundles
] +
[1 * items[item.id] for item in self.solver_run.unbundled_items()]
) == self.solver_run.total_form_items, 'Total bundle form items for form'
problem.solve()
# for v in problem.variables():
# print(f'{v.name}: {v.varValue}')
# add return items and create as a form
form_items = service_helper.solution_items(problem.variables(),
self.solver_run)
# add form to solution
solution.forms.append(
Form.create(form_items, self.solver_run, LpStatus[problem.status]))
logging.info('Form generated and added to solution...')
return solution
def create_solver_run_from_attributes(self) -> SolverRun: def create_solver_run_from_attributes(self) -> SolverRun:
logging.info('Retrieving attributes from message...') logging.info('Retrieving attributes from message...')
# get s3 object # get s3 object
@ -122,132 +72,96 @@ class LoftService(Base):
# iterate for number of forms that require creation # iterate for number of forms that require creation
# currently creates distinc forms with no item overlap # currently creates distinc forms with no item overlap
while f < self.solver_run.total_forms: while f < self.solver_run.total_forms:
# currently constant drift is supported # setup vars
# plan to support variable drift items = LpVariable.dicts(
problem = None "Item", [item.id for item in self.solver_run.items],
cat='Binary')
bundles = LpVariable.dicts(
"Bundle", [bundle.id for bundle in self.solver_run.bundles],
cat='Binary')
# allow target drift to increment to keep trying # create problem
# once limit has been reached loop will stop problem = LpProblem("ata-form-generate", LpMinimize)
# at this point the latest solve attempt will be used problem_objective_functions = []
# though likely to be infeasible
while self.solver_run.objective_function.minimum_drift() <= 2.0:
# setup vars
items = LpVariable.dicts(
"Item", [item.id for item in self.solver_run.items],
lowBound=0,
upBound=1,
cat='Binary')
bundles = LpVariable.dicts(
"Bundle",
[bundle.id for bundle in self.solver_run.bundles],
lowBound=0,
upBound=1,
cat='Binary')
# create problem # dummy objective function, because it just makes things easier™
problem = LpProblem("ata-form-generate", LpMinimize) problem += lpSum(
problem_objective_functions = [] [items[item.id] for item in self.solver_run.items])
# dummy objective function, because it just makes things easier™ # constraints
# problem += lpSum( # problem += lpSum([items[item.id]
# [items[item.id] for item in self.solver_run.items]) # for item in self.solver_run.items]) == self.solver_run.total_form_items, 'Total form items'
problem += lpSum(
[
bundle.count * bundles[bundle.id]
for bundle in self.solver_run.bundles
] + [
1 * items[item.id]
for item in self.solver_run.unbundled_items()
]
) == self.solver_run.total_form_items, 'Total bundle form items for form'
# constraints # dynamic constraints
# problem += lpSum([items[item.id] problem = solver_helper.build_constraints(self.solver_run, problem,
# for item in self.solver_run.items]) == self.solver_run.total_form_items, 'Total form items' items, bundles)
problem += lpSum(
[
bundle.count * bundles[bundle.id]
for bundle in self.solver_run.bundles
] + [
1 * items[item.id]
for item in self.solver_run.unbundled_items()
]
) == self.solver_run.total_form_items, 'Total bundle form items for form'
# dynamic constraints # multi-objective constraints
problem = solver_helper.build_constraints( logging.info('Creating TIF and TCC constraints')
self.solver_run, problem, items, bundles) for target in self.solver_run.objective_function.tif_targets:
tif = lpSum([
bundle.tif(self.solver_run.irt_model, target.theta) *
bundles[bundle.id] for bundle in self.solver_run.bundles
] + [
item.iif(self.solver_run, target.theta) * items[item.id]
for item in self.solver_run.items
])
# multi-objective constraints problem_objective_functions.append(tif)
logging.info('Creating TIF and TCC constraints') e = LpAffineExpression(
for target in self.solver_run.objective_function.tif_targets: [(bundles[bundle.id],
bundle.tif(self.solver_run.irt_model, target.theta))
for bundle in self.solver_run.bundles] +
[(items[item.id], item.iif(self.solver_run, target.theta))
for item in self.solver_run.items])
constraint = LpConstraint(
e=e,
sense=0,
name=f'tif theta ({target.theta}) @{target.value}',
rhs=target.value)
elastized_constraint = constraint.makeElasticSubProblem(
penalty=1, proportionFreeBound=0.25)
if int(target.value) == 20: print(elastized_constraint)
problem.extend(elastized_constraint)
for target in self.solver_run.objective_function.tcc_targets:
tcc = lpSum([
bundle.trf(self.solver_run.irt_model, target.theta) *
bundles[bundle.id] for bundle in self.solver_run.bundles
] + [
item.irf(self.solver_run, target.theta) * items[item.id]
for item in self.solver_run.items
])
problem_objective_functions.append(tcc)
e = LpAffineExpression(
[(bundles[bundle.id],
bundle.trf(self.solver_run.irt_model, target.theta))
for bundle in self.solver_run.bundles] +
[(items[item.id], item.irf(self.solver_run, target.theta))
for item in self.solver_run.items])
constraint = LpConstraint(
e=e,
sense=0,
name=f'tcc theta ({target.theta}) @{target.value}',
rhs=target.value)
elastized_constraint = constraint.makeElasticSubProblem(
penalty=1, proportionFreeBound=0.25)
problem.extend(elastized_constraint)
tif = lpSum([ # solve problem
bundle.tif(self.solver_run.irt_model, target.theta) * logging.info('Solving...')
bundles[bundle.id] # print(problem)
for bundle in self.solver_run.bundles problem.solve()
] + [ # problem.sequentialSolve(problem_objective_functions)
item.iif(self.solver_run, target.theta) * logging.info('Solved...generating form and adding to solution')
items[item.id] for item in self.solver_run.items
])
problem_objective_functions.append(tif)
problem += lpSum([
bundle.tif(self.solver_run.irt_model, target.theta) *
bundles[bundle.id]
for bundle in self.solver_run.bundles
] + [
item.iif(self.solver_run, target.theta) *
items[item.id] for item in self.solver_run.items
]) >= target.value - target.value * target.drift, f'max tif theta ({target.theta}) target value {target.value}'
problem += lpSum([
bundle.tif(self.solver_run.irt_model, target.theta) *
bundles[bundle.id]
for bundle in self.solver_run.bundles
] + [
item.iif(self.solver_run, target.theta) *
items[item.id] for item in self.solver_run.items
]) <= target.value + target.value * target.drift, f'min tif theta ({target.theta}) target value {target.value}'
for target in self.solver_run.objective_function.tcc_targets:
tcc = lpSum([
bundle.trf(self.solver_run.irt_model, target.theta) *
bundles[bundle.id]
for bundle in self.solver_run.bundles
] + [
item.irf(self.solver_run, target.theta) *
items[item.id] for item in self.solver_run.items
])
problem_objective_functions.append(tcc)
problem += lpSum([
bundle.trf(self.solver_run.irt_model, target.theta) *
bundles[bundle.id]
for bundle in self.solver_run.bundles
] + [
item.irf(self.solver_run, target.theta) *
items[item.id] for item in self.solver_run.items
]) >= target.value - target.value * target.drift, f'max tcc theta ({target.theta}) target value {target.value}'
problem += lpSum([
bundle.trf(self.solver_run.irt_model, target.theta) *
bundles[bundle.id]
for bundle in self.solver_run.bundles
] + [
item.irf(self.solver_run, target.theta) *
items[item.id] for item in self.solver_run.items
]) <= target.value + target.value * target.drift, f'min tcc theta ({target.theta}) target value {target.value}'
# solve problem
logging.info('Solving...')
# problem.solve()
problem.sequentialSolve(problem_objective_functions)
# optimal solution found!
if LpStatus[problem.status] == 'Optimal':
logging.info(
f'Problem solved...generating Form and adding to Solution with {self.solver_run.drift_style} drift \n\
tif target drift: \n\
{["theta @" + str(target.theta) + " - " + str(target.drift) for target in self.solver_run.objective_function.tif_targets]} \n\
tcc target drift: \n\
{["theta @" + str(target.theta) + " - " + str(target.drift) for target in self.solver_run.objective_function.tcc_targets]}'
)
break
else:
# increment drift to attempt to find optimal solution
increment = self.solver_run.objective_function.increment_targets_drift(
2.0, True)
logging.info(
f'Non Optimal Solution...widening Target ranges to {increment} using {self.solver_run.drift_style} drift'
)
# add return items and create as a form # add return items and create as a form
form_items = service_helper.solution_items(problem.variables(), form_items = service_helper.solution_items(problem.variables(),
@ -257,12 +171,13 @@ class LoftService(Base):
solution.forms.append( solution.forms.append(
Form.create(form_items, self.solver_run, Form.create(form_items, self.solver_run,
LpStatus[problem.status])) LpStatus[problem.status]))
logging.info('Form generated and added to Solution...') logging.info('Form generated and added to solution...')
# successfull form, increment # successfull form, increment
f += 1 f += 1
logging.info('Solution Generated.') logging.info('Solution Generated.')
# print(problem)
return solution return solution
def stream_to_s3_bucket(self, error=None): def stream_to_s3_bucket(self, error=None):