130 lines
5.3 KiB
Python
130 lines
5.3 KiB
Python
from __future__ import annotations
|
|
from typing import TYPE_CHECKING
|
|
|
|
from pydantic import BaseModel
|
|
from typing import Any, List
|
|
from pulp import LpProblem, LpVariable, LpStatus, lpSum, COIN
|
|
|
|
import logging
|
|
|
|
from helpers.problem_helper import *
|
|
from helpers import service_helper
|
|
|
|
from models.solution import Solution
|
|
from models.item import Item
|
|
from models.bundle import Bundle
|
|
|
|
from lib.errors.item_generation_error import ItemGenerationError
|
|
|
|
if TYPE_CHECKING:
|
|
from models.solver_run import SolverRun
|
|
|
|
class Problem(BaseModel):
|
|
items: List[Item]
|
|
bundles: List[Bundle]
|
|
problem: Any
|
|
solver_items_var: Any = None
|
|
solver_bundles_var: Any = None
|
|
|
|
def __init__(self, **data) -> None:
|
|
super().__init__(**data)
|
|
|
|
# setup common Solver variables
|
|
self.solver_items_var = LpVariable.dicts("Item",
|
|
[item.id for item in self.items],
|
|
lowBound=0,
|
|
upBound=1,
|
|
cat='Binary')
|
|
self.solver_bundles_var = LpVariable.dicts("Bundle",
|
|
[bundle.id for bundle in self.bundles],
|
|
lowBound=0,
|
|
upBound=1,
|
|
cat='Binary')
|
|
|
|
def solve(self, solver_run: SolverRun, enemy_ids: List[int] = []) -> LpProblem:
|
|
logging.info('solving problem...')
|
|
|
|
# creating problem multi-objective functions
|
|
objective_functions = solver_run.objective_function.for_problem(self, solver_run)
|
|
self.problem.sequentialSolve(objective_functions, None, None, COIN(gapRel=0.2, timeLimit=300))
|
|
|
|
return self.problem
|
|
|
|
# NOTICE: Legacy enemies implementation
|
|
# leaving this in, just in case the current impl fails to function
|
|
# and we need an immediate solution
|
|
# if we allow enemies, go through the normal solving process
|
|
# if solver_run.allow_enemies:
|
|
# logging.info('enemes allowed, so just solving')
|
|
# self.problem.solve()
|
|
# # otherwise begin the process of filtering enemies
|
|
# else:
|
|
# self.problem.solve()
|
|
|
|
# # however, if the solve was infeasible, kick it back
|
|
# # to the normal process
|
|
# if LpStatus[self.problem.status] == 'Infeasible':
|
|
# return self.problem
|
|
# # otherwise continue
|
|
# else:
|
|
# # get items from solution
|
|
# solved_items, _ = service_helper.solution_items(self.problem.variables(), solver_run)
|
|
|
|
# # sacred items will remain the same (with new items added each run) between solve attempts
|
|
# # but new enemies will be appended
|
|
# sacred_ids, new_enemy_ids = sanctify(solved_items)
|
|
|
|
# # the current solve run found new enemies
|
|
# if new_enemy_ids:
|
|
# logging.info('enemies found, adding constraints...')
|
|
|
|
# # append the new enemies to the enemies_id list
|
|
# enemy_ids = list(set(enemy_ids+new_enemy_ids))
|
|
|
|
# # remove old enemy/sacred constraints
|
|
# if 'Exclude_enemy_items' in self.problem.constraints.keys(): self.problem.constraints.pop('Exclude_enemy_items')
|
|
# if 'Include_sacred_items' in self.problem.constraints.keys(): self.problem.constraints.pop('Include_sacred_items')
|
|
|
|
# # add constraint to not allow enemy items
|
|
# self.problem += lpSum([
|
|
# len(bundle.find_items(enemy_ids)) * self.solver_bundles_var[bundle.id]
|
|
# for bundle in self.bundles
|
|
# ] + [
|
|
# (item.id in enemy_ids) * self.solver_items_var[item.id]
|
|
# for item in self.items
|
|
# ]) == 0, 'Exclude enemy items'
|
|
|
|
# # add constraint to use sacred items
|
|
# self.problem += lpSum([
|
|
# len(bundle.find_items(sacred_ids)) * self.solver_bundles_var[bundle.id]
|
|
# for bundle in self.bundles
|
|
# ] + [
|
|
# (item.id in sacred_ids) * self.solver_items_var[item.id]
|
|
# for item in self.items
|
|
# ]) == len(sacred_ids), 'Include sacred items'
|
|
|
|
# # recursively solve until no enemies exist or infeasible
|
|
# logging.info('recursively solving...')
|
|
# self.solve(solver_run)
|
|
|
|
return self.problem
|
|
|
|
def generate(self, solution: Solution, solver_run: SolverRun) -> None:
|
|
try:
|
|
logging.info('Creating Constraints...')
|
|
# generic constraints
|
|
for constraint in solver_run.constraints:
|
|
constraint.build(self, solver_run=solver_run, solution=solution)
|
|
|
|
# irt target constraints
|
|
for target in solver_run.objective_function.all_targets():
|
|
target.constraints(self, solver_run)
|
|
|
|
logging.info('Constraints Created...')
|
|
except ValueError as error:
|
|
logging.error(error)
|
|
raise ItemGenerationError(
|
|
error.msg,
|
|
error.args[0])
|
|
|