irt-service/app/models/objective_function.py
2023-11-17 19:02:42 -05:00

94 lines
3.5 KiB
Python

from __future__ import annotations
from typing import TYPE_CHECKING, Any, Literal, Union
from pydantic import BaseModel, validator
from typing import Dict, List
from pulp import lpSum, LpMinimize, LpMaximize
from models.targets.tif_target import TifTarget
from models.targets.tcc_target import TccTarget
from models.problem import Problem
if TYPE_CHECKING:
from models.solver_run import SolverRun
class ObjectiveFunction(BaseModel):
# minimizing tif/tcc target value is only option currently
# as we add more we can build this out to be more dynamic
# likely with models representing each objective function type
tif_targets: List[TifTarget]
tcc_targets: List[TccTarget]
target_variance_percentage: int = 10
objective: Literal[1,-1] = 1
functions: List[Literal['tcc', 'tif']] = ['tcc', 'tif']
weight: Dict = {'tif': 1, 'tcc': 1}
@validator("objective", pre=True)
def set_objective(cls, v) -> List[int]:
if v == 'minimize':
return 1
elif v == 'maximize':
return -1
else:
return None
def for_problem(self, problem_handler: Problem, solver_run: SolverRun) -> List[lpSum]:
functions = []
for function in self.functions:
if function == 'tcc':
functions.append(lpSum([
bundle.trf(solver_run.irt_model, solver_run.theta_cut_score)
* problem_handler.solver_bundles_var[bundle.id]
for bundle in problem_handler.bundles
] + [
item.irf(solver_run.irt_model, solver_run.theta_cut_score) *
problem_handler.solver_items_var[item.id]
for item in problem_handler.items
]))
elif function == 'tif':
problem_handler.problem += lpSum([
bundle.tif(solver_run.irt_model, solver_run.theta_cut_score)
* problem_handler.solver_bundles_var[bundle.id]
for bundle in problem_handler.bundles
] + [
item.iif(solver_run.irt_model, solver_run.theta_cut_score) *
problem_handler.solver_items_var[item.id]
for item in problem_handler.items
])
return functions
def increment_targets_drift(self,
limit: float or bool,
all: bool = False,
amount: float = 0.1,
targets: list[TifTarget|TccTarget] = []) -> bool:
if all:
for target in self.tif_targets:
target.drift = round(target.drift + amount, 2)
for target in self.tcc_targets:
target.drift = round(target.drift + amount, 2)
else:
for target in targets:
target.drift = round(target.drift + amount, 2)
return amount
def update_targets_drift(self, amount: float = 0.0):
for target in self.tif_targets:
target.drift = round(amount, 2)
for target in self.tcc_targets:
target.drift = round(amount, 2)
def minimum_drift(self) -> float:
minimum_drift = 0.0
for target in self.all_targets():
if target.drift < minimum_drift:
minimum_drift = target.drift
return minimum_drift
def all_targets(self) -> list[TifTarget|TccTarget]:
return self.tif_targets + self.tcc_targets