from __future__ import annotations from typing import TYPE_CHECKING from pydantic import BaseModel from typing import Any, List from pulp import LpProblem, LpVariable, LpStatus, lpSum import logging from helpers.problem_helper import * from helpers import service_helper from models.solution import Solution from models.item import Item from models.bundle import Bundle from lib.errors.item_generation_error import ItemGenerationError if TYPE_CHECKING: from models.solver_run import SolverRun class Problem(BaseModel): items: List[Item] bundles: List[Bundle] problem: Any solver_items_var: Any = None solver_bundles_var: Any = None def __init__(self, **data) -> None: super().__init__(**data) # setup common Solver variables self.solver_items_var = LpVariable.dicts("Item", [item.id for item in self.items], lowBound=0, upBound=1, cat='Binary') self.solver_bundles_var = LpVariable.dicts("Bundle", [bundle.id for bundle in self.bundles], lowBound=0, upBound=1, cat='Binary') def solve(self, solver_run: SolverRun, enemy_ids: List[int] = []) -> LpProblem: logging.info('solving problem...') self.problem.solve() # NOTICE: Legacy enemies implementation # leaving this in, just in case the current impl fails to function # and we need an immediate solution # if we allow enemies, go through the normal solving process # if solver_run.allow_enemies: # logging.info('enemes allowed, so just solving') # self.problem.solve() # # otherwise begin the process of filtering enemies # else: # self.problem.solve() # # however, if the solve was infeasible, kick it back # # to the normal process # if LpStatus[self.problem.status] == 'Infeasible': # return self.problem # # otherwise continue # else: # # get items from solution # solved_items, _ = service_helper.solution_items(self.problem.variables(), solver_run) # # sacred items will remain the same (with new items added each run) between solve attempts # # but new enemies will be appended # sacred_ids, new_enemy_ids = sanctify(solved_items) # # the current solve run found new enemies # if new_enemy_ids: # logging.info('enemies found, adding constraints...') # # append the new enemies to the enemies_id list # enemy_ids = list(set(enemy_ids+new_enemy_ids)) # # remove old enemy/sacred constraints # if 'Exclude_enemy_items' in self.problem.constraints.keys(): self.problem.constraints.pop('Exclude_enemy_items') # if 'Include_sacred_items' in self.problem.constraints.keys(): self.problem.constraints.pop('Include_sacred_items') # # add constraint to not allow enemy items # self.problem += lpSum([ # len(bundle.find_items(enemy_ids)) * self.solver_bundles_var[bundle.id] # for bundle in self.bundles # ] + [ # (item.id in enemy_ids) * self.solver_items_var[item.id] # for item in self.items # ]) == 0, 'Exclude enemy items' # # add constraint to use sacred items # self.problem += lpSum([ # len(bundle.find_items(sacred_ids)) * self.solver_bundles_var[bundle.id] # for bundle in self.bundles # ] + [ # (item.id in sacred_ids) * self.solver_items_var[item.id] # for item in self.items # ]) == len(sacred_ids), 'Include sacred items' # # recursively solve until no enemies exist or infeasible # logging.info('recursively solving...') # self.solve(solver_run) return self.problem def generate(self, solution: Solution, solver_run: SolverRun) -> None: try: # creating problem objective function solver_run.objective_function.for_problem(self) logging.info('Creating Constraints...') # generic constraints for constraint in solver_run.constraints: constraint.build(self, solver_run=solver_run, solution=solution) # irt target constraints for target in solver_run.objective_function.all_targets(): target.constraints(self, solver_run) logging.info('Constraints Created...') except ValueError as error: logging.error(error) raise ItemGenerationError( error.msg, error.args[0])