irt-service/app/models/problem.py
2023-11-10 15:21:16 -05:00

139 lines
5.4 KiB
Python

from __future__ import annotations
from typing import TYPE_CHECKING
from pydantic import BaseModel
from typing import Any, List
from pulp import LpProblem, LpVariable, lpSum
import logging
from models.solution import Solution
from models.item import Item
from models.bundle import Bundle
from lib.errors.item_generation_error import ItemGenerationError
if TYPE_CHECKING:
from models.solver_run import SolverRun
class Problem(BaseModel):
items: List[Item]
bundles: List[Bundle]
problem: Any
solver_items_var: Any = None
solver_bundles_var: Any = None
def __init__(self, **data) -> None:
super().__init__(**data)
# setup common Solver variables
self.solver_items_var = LpVariable.dicts("Item",
[item.id for item in self.items],
lowBound=0,
upBound=1,
cat='Binary')
self.solver_bundles_var = LpVariable.dicts("Bundle",
[bundle.id for bundle in self.bundles],
lowBound=0,
upBound=1,
cat='Binary')
# objective function
self.problem += lpSum([
bundle.count * self.solver_bundles_var[bundle.id]
for bundle in self.bundles
] + [
self.solver_items_var[item.id]
for item in self.items
])
def solve(self) -> LpProblem:
self.problem.solve()
return self.problem
def generate(self, solution: Solution, solver_run: SolverRun):
# Form Constraints
self.problem += lpSum(
[
bundle.count * self.solver_bundles_var[bundle.id]
for bundle in self.bundles
] + [
1 * self.solver_items_var[item.id]
for item in self.items
]
) == solver_run.total_form_items, f'Total bundle form items for form'
# each time a form is generated, we want to ensure
# that it is unique to all other forms generated before it
self.problem += lpSum(
[
solution.items_exist_in_forms(bundle.items) * self.solver_bundles_var[bundle.id]
for bundle in self.bundles
] + [
solution.items_exist_in_forms([item]) * self.solver_items_var[item.id]
for item in self.items
]
) <= solver_run.total_form_items - 1, f'Ensuring uniqueness for form'
def generate_constraints(self, solver_run: SolverRun, current_drift: int):
logging.info('Creating Constraints...')
try:
for constraint in solver_run.constraints:
constraint.build(self, solver_run)
for tif_target in solver_run.objective_function.tif_targets:
self.problem += lpSum([
bundle.tif(solver_run.irt_model, tif_target.theta)
* self.solver_bundles_var[bundle.id]
for bundle in self.bundles
] + [
item.iif(solver_run.irt_model, tif_target.theta) *
self.solver_items_var[item.id]
for item in self.items
]) >= tif_target.minimum(
), f'Min TIF theta({tif_target.theta}) at target {tif_target.value} drift at {current_drift}%'
self.problem += lpSum([
bundle.tif(solver_run.irt_model, tif_target.theta)
* self.solver_bundles_var[bundle.id]
for bundle in self.bundles
] + [
item.iif(solver_run.irt_model, tif_target.theta) *
self.solver_items_var[item.id]
for item in self.items
]) <= tif_target.maximum(
), f'Max TIF theta({tif_target.theta}) at target {tif_target.value} drift at {current_drift}%'
for tcc_target in solver_run.objective_function.tcc_targets:
self.problem += lpSum([
bundle.trf(solver_run.irt_model, tcc_target.theta)
* self.solver_bundles_var[bundle.id]
for bundle in self.bundles
] + [
item.irf(solver_run.irt_model, tcc_target.theta) *
self.solver_items_var[item.id]
for item in self.items
]) >= tcc_target.minimum(
), f'Min TCC theta({tcc_target.theta}) at target {tcc_target.value} drift at {current_drift}%'
self.problem += lpSum([
bundle.trf(solver_run.irt_model, tcc_target.theta)
* self.solver_bundles_var[bundle.id]
for bundle in self.bundles
] + [
item.irf(solver_run.irt_model, tcc_target.theta) *
self.solver_items_var[item.id]
for item in self.items
]) <= tcc_target.maximum(
), f'Max TCC theta({tcc_target.theta}) at target {tcc_target.value} drift at {current_drift}%'
logging.info('Constraints Created...')
except ValueError as error:
logging.error(error)
raise ItemGenerationError(
"Bundle min and/or max larger than bundle amount provided",
error.args[0])