117 lines
4.5 KiB
Python
117 lines
4.5 KiB
Python
from __future__ import annotations
|
|
from typing import TYPE_CHECKING
|
|
|
|
from pydantic import BaseModel
|
|
from typing import Any, List
|
|
from pulp import LpProblem, LpVariable, LpStatus, lpSum
|
|
|
|
import logging
|
|
|
|
from helpers.problem_helper import *
|
|
from helpers import service_helper
|
|
|
|
from models.solution import Solution
|
|
from models.item import Item
|
|
from models.bundle import Bundle
|
|
|
|
from lib.errors.item_generation_error import ItemGenerationError
|
|
|
|
if TYPE_CHECKING:
|
|
from models.solver_run import SolverRun
|
|
|
|
class Problem(BaseModel):
|
|
items: List[Item]
|
|
bundles: List[Bundle]
|
|
problem: Any
|
|
solver_items_var: Any = None
|
|
solver_bundles_var: Any = None
|
|
|
|
def __init__(self, **data) -> None:
|
|
super().__init__(**data)
|
|
|
|
# setup common Solver variables
|
|
self.solver_items_var = LpVariable.dicts("Item",
|
|
[item.id for item in self.items],
|
|
lowBound=0,
|
|
upBound=1,
|
|
cat='Binary')
|
|
self.solver_bundles_var = LpVariable.dicts("Bundle",
|
|
[bundle.id for bundle in self.bundles],
|
|
lowBound=0,
|
|
upBound=1,
|
|
cat='Binary')
|
|
|
|
def solve(self, solver_run: SolverRun) -> LpProblem:
|
|
logging.info('solving problem...')
|
|
# if we allow enemies, go through the normal solving process
|
|
if solver_run.allow_enemies:
|
|
logging.info('enemes allowed, so just solving')
|
|
self.problem.solve()
|
|
# otherwise begin the process of filtering enemies
|
|
else:
|
|
self.problem.solve()
|
|
|
|
# however, if the solve was infeasible, kick it back
|
|
# to the normal process
|
|
if LpStatus[self.problem.status] == 'Infeasible':
|
|
return self.problem
|
|
# otherwise continue
|
|
else:
|
|
# get items from solution
|
|
solved_items, _ = service_helper.solution_items(self.problem.variables(), solver_run)
|
|
|
|
sacred_ids, enemy_ids = sanctify(solved_items)
|
|
|
|
if enemy_ids:
|
|
logging.info('enemies found, adding constraints...')
|
|
|
|
# remove old enemy/sacred constraints
|
|
if 'Exclude_enemy_items' in self.problem.constraints.keys(): self.problem.constraints.pop('Exclude_enemy_items')
|
|
if 'Include_sacred_items' in self.problem.constraints.keys(): self.problem.constraints.pop('Include_sacred_items')
|
|
|
|
# add constraint to not allow enemy items
|
|
self.problem += lpSum([
|
|
len(bundle.find_items(enemy_ids)) * self.solver_bundles_var[bundle.id]
|
|
for bundle in self.bundles
|
|
] + [
|
|
(item.id in enemy_ids) * self.solver_items_var[item.id]
|
|
for item in self.items
|
|
]) == 0, 'Exclude enemy items'
|
|
|
|
# add constraint to use sacred items
|
|
self.problem += lpSum([
|
|
len(bundle.find_items(sacred_ids)) * self.solver_bundles_var[bundle.id]
|
|
for bundle in self.bundles
|
|
] + [
|
|
(item.id in sacred_ids) * self.solver_items_var[item.id]
|
|
for item in self.items
|
|
]) == len(sacred_ids), 'Include sacred items'
|
|
|
|
# recursively solve until no enemies exist or infeasible
|
|
logging.info('recursively solving...')
|
|
self.solve(solver_run)
|
|
|
|
return self.problem
|
|
|
|
def generate(self, solution: Solution, solver_run: SolverRun) -> None:
|
|
try:
|
|
# creating problem objective function
|
|
solver_run.objective_function.for_problem(self)
|
|
|
|
logging.info('Creating Constraints...')
|
|
# generic constraints
|
|
for constraint in solver_run.constraints:
|
|
constraint.build(self, solver_run=solver_run, solution=solution)
|
|
|
|
# irt target constraints
|
|
for target in solver_run.objective_function.all_targets():
|
|
target.constraints(self, solver_run)
|
|
|
|
logging.info('Constraints Created...')
|
|
except ValueError as error:
|
|
logging.error(error)
|
|
raise ItemGenerationError(
|
|
error.msg,
|
|
error.args[0])
|
|
|