sneaky update: add multi-objective functions

This commit is contained in:
Joshua Burman 2023-11-17 19:02:42 -05:00
parent 4001200e9b
commit 82b6cd25ed
4 changed files with 54 additions and 20 deletions

View File

@ -1,6 +1,5 @@
import logging
from pydantic import BaseModel, validator from pydantic import BaseModel, validator
from typing import List, Optional, Tuple from typing import List, Optional
from models.attribute import Attribute from models.attribute import Attribute

View File

@ -1,13 +1,17 @@
from __future__ import annotations from __future__ import annotations
from typing import TYPE_CHECKING, Any, Literal, Union
from pydantic import BaseModel from pydantic import BaseModel, validator
from typing import Dict, List, AnyStr from typing import Dict, List
from pulp import lpSum from pulp import lpSum, LpMinimize, LpMaximize
from models.targets.tif_target import TifTarget from models.targets.tif_target import TifTarget
from models.targets.tcc_target import TccTarget from models.targets.tcc_target import TccTarget
from models.problem import Problem from models.problem import Problem
if TYPE_CHECKING:
from models.solver_run import SolverRun
class ObjectiveFunction(BaseModel): class ObjectiveFunction(BaseModel):
# minimizing tif/tcc target value is only option currently # minimizing tif/tcc target value is only option currently
# as we add more we can build this out to be more dynamic # as we add more we can build this out to be more dynamic
@ -15,18 +19,46 @@ class ObjectiveFunction(BaseModel):
tif_targets: List[TifTarget] tif_targets: List[TifTarget]
tcc_targets: List[TccTarget] tcc_targets: List[TccTarget]
target_variance_percentage: int = 10 target_variance_percentage: int = 10
objective: AnyStr = "minimize" objective: Literal[1,-1] = 1
functions: List[Literal['tcc', 'tif']] = ['tcc', 'tif']
weight: Dict = {'tif': 1, 'tcc': 1} weight: Dict = {'tif': 1, 'tcc': 1}
def for_problem(self, problem_handler: Problem) -> None: @validator("objective", pre=True)
problem_handler.problem += lpSum([ def set_objective(cls, v) -> List[int]:
bundle.count * problem_handler.solver_bundles_var[bundle.id] if v == 'minimize':
return 1
elif v == 'maximize':
return -1
else:
return None
def for_problem(self, problem_handler: Problem, solver_run: SolverRun) -> List[lpSum]:
functions = []
for function in self.functions:
if function == 'tcc':
functions.append(lpSum([
bundle.trf(solver_run.irt_model, solver_run.theta_cut_score)
* problem_handler.solver_bundles_var[bundle.id]
for bundle in problem_handler.bundles for bundle in problem_handler.bundles
] + [ ] + [
item.irf(solver_run.irt_model, solver_run.theta_cut_score) *
problem_handler.solver_items_var[item.id]
for item in problem_handler.items
]))
elif function == 'tif':
problem_handler.problem += lpSum([
bundle.tif(solver_run.irt_model, solver_run.theta_cut_score)
* problem_handler.solver_bundles_var[bundle.id]
for bundle in problem_handler.bundles
] + [
item.iif(solver_run.irt_model, solver_run.theta_cut_score) *
problem_handler.solver_items_var[item.id] problem_handler.solver_items_var[item.id]
for item in problem_handler.items for item in problem_handler.items
]) ])
return functions
def increment_targets_drift(self, def increment_targets_drift(self,
limit: float or bool, limit: float or bool,
all: bool = False, all: bool = False,

View File

@ -43,7 +43,12 @@ class Problem(BaseModel):
def solve(self, solver_run: SolverRun, enemy_ids: List[int] = []) -> LpProblem: def solve(self, solver_run: SolverRun, enemy_ids: List[int] = []) -> LpProblem:
logging.info('solving problem...') logging.info('solving problem...')
self.problem.solve()
# creating problem multi-objective functions
objective_functions = solver_run.objective_function.for_problem(self, solver_run)
self.problem.sequentialSolve(objective_functions)
return self.problem
# NOTICE: Legacy enemies implementation # NOTICE: Legacy enemies implementation
# leaving this in, just in case the current impl fails to function # leaving this in, just in case the current impl fails to function
@ -106,9 +111,6 @@ class Problem(BaseModel):
def generate(self, solution: Solution, solver_run: SolverRun) -> None: def generate(self, solution: Solution, solver_run: SolverRun) -> None:
try: try:
# creating problem objective function
solver_run.objective_function.for_problem(self)
logging.info('Creating Constraints...') logging.info('Creating Constraints...')
# generic constraints # generic constraints
for constraint in solver_run.constraints: for constraint in solver_run.constraints:

View File

@ -1,6 +1,6 @@
import json, random, io, logging import json, random, io, logging
from pulp import LpProblem, LpMinimize, LpStatus from pulp import LpProblem, LpMinimize, LpMaximize, LpStatus
from lib.application_configs import ApplicationConfigs from lib.application_configs import ApplicationConfigs
from helpers import aws_helper, tar_helper, csv_helper, service_helper from helpers import aws_helper, tar_helper, csv_helper, service_helper
@ -81,7 +81,8 @@ class FormGenerationService(Base):
drift_percent) drift_percent)
# create problem # create problem
problem_handler = Problem(items = self.solver_run.unbundled_items(), bundles = self.solver_run.bundles, problem = LpProblem('ata-form-generate', LpMinimize)) problem = LpProblem('ata-form-generate', self.solver_run.objective_function.objective)
problem_handler = Problem(items = self.solver_run.unbundled_items(), bundles = self.solver_run.bundles, problem = problem)
problem_handler.generate(solution, self.solver_run) problem_handler.generate(solution, self.solver_run)
problem = problem_handler.solve(self.solver_run) problem = problem_handler.solve(self.solver_run)