Merge branch 'develop' into feature/QUANT-3102

This commit is contained in:
brmnjsh 2023-11-20 15:16:36 -05:00 committed by GitHub
commit 151f4ded7f
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
25 changed files with 549 additions and 254 deletions

View File

@ -37,6 +37,5 @@ def get_object_tags(key: str, bucket: str) -> list:
tags = s3.get_object_tagging(Bucket=bucket, Key=key)['TagSet']
return tags
def file_stream_upload(buffer: io.BytesIO, name: str, bucket: str, action: str = None):
return s3.upload_fileobj(buffer, bucket, name, ExtraArgs={'Tagging': f'action={action}'})

View File

@ -1,14 +1,18 @@
from typing import List
from lib.irt.test_response_function import TestResponseFunction
from lib.irt.test_information_function import TestInformationFunction
from models.target import Target
from models.targets.tif_target import TifTarget
from models.targets.tcc_target import TccTarget
from models.item import Item
def generate_tif_results(items, solver_run):
targets = []
for target in solver_run.objective_function.tif_targets:
tif = TestInformationFunction(solver_run.irt_model).calculate(items, theta=target.theta)
targets.append(Target(theta=target.theta, value=target.value, result=tif))
targets.append(TifTarget(theta=target.theta, value=target.value, result=tif))
return targets
@ -17,6 +21,6 @@ def generate_tcc_results(items, solver_run):
for target in solver_run.objective_function.tcc_targets:
tcc = TestResponseFunction(solver_run.irt_model).calculate(items, theta=target.theta)
targets.append(Target(theta=target.theta, value=target.value, result=tcc))
targets.append(TccTarget(theta=target.theta, value=target.value, result=tcc))
return targets

View File

@ -0,0 +1,21 @@
from typing import Tuple
from models.item import Item
def sanctify(solved_items: [Item]) -> Tuple[list]:
enemy_ids = []
# get all enemies
for item in solved_items:
# if the item is already an an enemy
# then it's enemy is sacred
if item.id not in enemy_ids:
# if it has enemies, check if it exists as part of the solved items
for enemy_id in item.enemies:
# if it does, it's a true enemy
if enemy_id in (i.id for i in solved_items):
enemy_ids.append(enemy_id)
sacred_ids = [i.id for i in solved_items if i.id not in enemy_ids]
return sacred_ids, enemy_ids

View File

@ -1,86 +0,0 @@
from pulp import lpSum, LpProblem
from random import randint, sample
import logging
from helpers.common_helper import *
from models.bundle import Bundle
from models.solver_run import SolverRun
from models.item import Item
from lib.errors.item_generation_error import ItemGenerationError
def build_constraints(solver_run: SolverRun, problem: LpProblem,
items: list[Item], bundles: list[Bundle], selected_items: list[Item], selected_bundles: list[Bundle]) -> LpProblem:
logging.info('Creating Constraints...')
try:
total_form_items = solver_run.total_form_items
constraints = solver_run.constraints
for constraint in constraints:
attribute = constraint.reference_attribute
min = constraint.minimum
max = constraint.maximum
if attribute.type == 'metadata':
logging.info('Metadata Constraint Generating...')
problem += lpSum(
[
len(bundle.items_with_attribute(attribute)) * bundles[bundle.id] for bundle in selected_bundles
] +
[
item.attribute_exists(attribute).real * items[item.id] for item in selected_items
]
) >= round(total_form_items * (min / 100)), f'{attribute.id} - {attribute.value} - min'
problem += lpSum(
[
len(bundle.items_with_attribute(attribute)) * bundles[bundle.id] for bundle in selected_bundles
] +
[
item.attribute_exists(attribute).real * items[item.id] for item in selected_items
]
) <= round(total_form_items * (max / 100)), f'{attribute.id} - {attribute.value} - max'
elif attribute.type == 'bundle':
logging.info('Bundles Constraint Generating...')
# TODO: account for many different bundle types, since the id condition in L33 could yield duplicates
if selected_bundles != None:
# make sure the total bundles used in generated form is limited between min-max set
problem += lpSum([
bundles[bundle.id] for bundle in selected_bundles
]) == randint(int(constraint.minimum),
int(constraint.maximum))
logging.info('Constraints Created...')
return problem
except ValueError as error:
logging.error(error)
raise ItemGenerationError(
"Bundle min and/or max larger than bundle amount provided",
error.args[0])
def get_random_bundles(total_form_items: int,
bundles: list[Bundle],
min: int,
max: int,
found_bundles=False) -> list[Bundle]:
selected_bundles = None
total_bundle_items = 0
total_bundles = randint(min, max)
logging.info(f'Selecting Bundles (total of {total_bundles})...')
while found_bundles == False:
selected_bundles = sample(bundles, total_bundles)
total_bundle_items = sum(bundle.count for bundle in selected_bundles)
if total_bundle_items <= total_form_items:
found_bundles = True
if found_bundles == True:
return selected_bundles
else:
return get_random_bundles(total_form_items, total_bundles - 1, bundles)

View File

@ -43,7 +43,7 @@ class ServiceListener(Consumer):
logging.error(f'action of type {action} does not exist.')
def main():
logging.info('Starting IRT Service: That Was Rasch (v1.5.0)...')
logging.info('Starting IRT Service: The Enemies Within (v1.7.0)...')
# ToDo: Figure out a much better way of doing this.
# LocalStack wants 'endpoint_url', while prod doesnt :(

View File

@ -9,4 +9,5 @@ class AdvancedOptions(BaseModel):
brand_bound_tolerance: Optional[float] = None
max_forms: Optional[int] = None
precision: Optional[float] = None
ensure_form_uniqueness: bool = True
extra_param_range: Optional[List[Dict]] = None

View File

@ -1,8 +1,7 @@
from pydantic import BaseModel
from typing import Optional
from typing import Optional, Union
class Attribute(BaseModel):
value: Optional[str]
value: Optional[Union[str,int,list]]
type: Optional[str]
id: str

View File

@ -14,6 +14,16 @@ class Bundle(BaseModel):
items: List[Item]
type: str
def find_items(self, requested_items_ids: [int]) -> [Item]:
found_items = []
for item in self.items:
if item.id in requested_items_ids:
found_items.append(item)
return found_items
def tif(self, irt_model: IRTModel, theta: float) -> float:
return TestInformationFunction(irt_model).calculate(self.items, theta=theta)
@ -49,3 +59,13 @@ class Bundle(BaseModel):
def ordered_items(self) -> List[Item]:
return sorted(self.items, key=lambda item: item.position)
# are there enemys in the bundle?
def enemy_pair_count(self, pair: List[Item]) -> int:
pair_count = 0
for item in self.items:
if pair in item.enemy_pairs():
pair_count += 1
return pair_count

View File

@ -1,9 +0,0 @@
from pydantic import BaseModel
from models.attribute import Attribute
class Constraint(BaseModel):
reference_attribute: Attribute
minimum: float
maximum: float

View File

@ -0,0 +1,15 @@
from random import randint
from models.constraints.generic_constraint import *
class BundleConstraint(GenericConstraint):
def build(self, problem_handler: Problem, **kwargs) -> None:
logging.info('Bundles Constraint Generating...')
# TODO: account for many different bundle types, since the id condition in L33 could yield duplicates
if problem_handler.bundles != None and len(problem_handler.bundles) > 0:
# make sure the total bundles used in generated form is limited between min-max set
problem_handler.problem += lpSum([
problem_handler.solver_bundles_var[bundle.id] for bundle in problem_handler.bundles
]) == randint(int(self.minimum),
int(self.maximum)), f'Allowing min - max bundles'

View File

@ -0,0 +1,31 @@
from __future__ import annotations
from typing import List
from models.constraints.generic_constraint import *
class EnemyPairConstraint(GenericConstraint):
@classmethod
def create(cls: Type[_T], pair: List[int]) -> _T:
return cls(
minimum=0,
maximum=0,
reference_attribute=Attribute(
value=pair,
type='enemy_pair',
id='enemy_pair'
)
)
def build(self, problem_handler: Problem, **_) -> None:
logging.info('Enemy Pair Constraint Generating...')
pair = self.reference_attribute.value
problem_handler.problem += lpSum(
[
bundle.enemy_pair_count(pair) * problem_handler.solver_bundles_var[bundle.id]
for bundle in problem_handler.bundles
] + [
(pair in item.enemy_pairs()).real * problem_handler.solver_items_var[item.id]
for item in problem_handler.items
]
) <= 1, f'Enemy Pair constraint for pair: {pair}'

View File

@ -0,0 +1,34 @@
from __future__ import annotations
from typing import TYPE_CHECKING
from models.constraints.generic_constraint import *
if TYPE_CHECKING:
from models.solver_run import SolverRun
class FormUniquenessConstraint(GenericConstraint):
@classmethod
def create(cls: Type[_T], total_items: int) -> _T:
return cls(
minimum=0,
maximum=0,
reference_attribute=Attribute(
value=total_items,
type='form_uniqueness',
id='form_uniqueness'
)
)
def build(self, problem_handler: Problem, **kwargs) -> None:
logging.info('Form Uniqueness Constraint Generating...')
# form uniqueness constraint
problem_handler.problem += lpSum(
[
kwargs['solution'].items_exist_in_forms(bundle.items) * problem_handler.solver_bundles_var[bundle.id]
for bundle in problem_handler.bundles
] + [
kwargs['solution'].items_exist_in_forms([item]) * problem_handler.solver_items_var[item.id]
for item in problem_handler.items
]
) <= self.reference_attribute.value, f'Ensuring uniqueness for form'

View File

@ -0,0 +1,17 @@
import logging
from pulp import lpSum
from pydantic import BaseModel
from typing import TypeVar, Type
from helpers.common_helper import *
from models.attribute import Attribute
from models.problem import Problem
_T = TypeVar("_T")
class GenericConstraint(BaseModel):
reference_attribute: Attribute
minimum: float
maximum: float

View File

@ -0,0 +1,29 @@
from __future__ import annotations
from typing import TYPE_CHECKING
from models.constraints.generic_constraint import *
if TYPE_CHECKING:
from models.solver_run import SolverRun
class MetadataConstraint(GenericConstraint):
def build(self, problem_handler: Problem, **kwargs) -> None:
logging.info('Metadata Constraint Generating...')
problem_handler.problem += lpSum(
[
len(bundle.items_with_attribute(self.reference_attribute)) * problem_handler.solver_bundles_var[bundle.id] for bundle in problem_handler.bundles
] +
[
item.attribute_exists(self.reference_attribute).real * problem_handler.solver_items_var[item.id] for item in problem_handler.items
]
) >= round(kwargs['solver_run'].total_form_items * (self.minimum / 100)), f'{self.reference_attribute.id} - {self.reference_attribute.value} - min'
problem_handler.problem += lpSum(
[
len(bundle.items_with_attribute(self.reference_attribute)) * problem_handler.solver_bundles_var[bundle.id] for bundle in problem_handler.bundles
] +
[
item.attribute_exists(self.reference_attribute).real * problem_handler.solver_items_var[item.id] for item in problem_handler.items
]
) <= round(kwargs['solver_run'].total_form_items * (self.maximum / 100)), f'{self.reference_attribute.id} - {self.reference_attribute.value} - max'

View File

@ -0,0 +1,33 @@
from __future__ import annotations
from typing import TYPE_CHECKING
from models.constraints.generic_constraint import *
if TYPE_CHECKING:
from models.solver_run import SolverRun
class TotalFormItemsConstraint(GenericConstraint):
@classmethod
def create(cls: Type[_T], total_items: int) -> _T:
return cls(
minimum=0,
maximum=0,
reference_attribute=Attribute(
value=total_items,
type='form_uniqueness',
id='form_uniqueness'
)
)
def build(self, problem_handler: Problem, **kwargs) -> None:
logging.info('Total Form Items Constraint Generating...')
problem_handler.problem += lpSum(
[
bundle.count * problem_handler.solver_bundles_var[bundle.id]
for bundle in problem_handler.bundles
] + [
1 * problem_handler.solver_items_var[item.id]
for item in problem_handler.items
]
) == self.reference_attribute.value, f'Total bundle form items for form'

View File

@ -1,21 +1,27 @@
from __future__ import annotations
from typing import TYPE_CHECKING
from pydantic import BaseModel
from typing import List, TypeVar, Type
from helpers import irt_helper
from models.solver_run import SolverRun
from models.item import Item
from models.target import Target
from models.targets.tif_target import TifTarget
from models.targets.tcc_target import TccTarget
from lib.irt.test_response_function import TestResponseFunction
if TYPE_CHECKING:
from models.solver_run import SolverRun
_T = TypeVar("_T")
class Form(BaseModel):
items: List[Item]
cut_score: float
tif_results: List[Target]
tcc_results: List[Target]
tif_results: List[TifTarget]
tcc_results: List[TccTarget]
status: str = 'Not Optimized'
solver_variables: List[str]
@ -29,3 +35,10 @@ class Form(BaseModel):
tcc_results=irt_helper.generate_tcc_results(items, solver_run),
status=status,
solver_variables=solver_variables)
def has_item(self, item: Item) -> bool:
for i in self.items:
if item == i:
return True
return False

View File

@ -1,5 +1,6 @@
from pydantic import BaseModel
from typing import List, Optional
import logging
from pydantic import BaseModel, validator
from typing import List, Optional, Tuple
from models.attribute import Attribute
@ -10,16 +11,24 @@ class Item(BaseModel):
id: int
position: Optional[int] = None
passage_id: Optional[int] = None
enemies: List[int] = []
workflow_state: Optional[str] = None
attributes: List[Attribute] = None
b_param: float = 0.00
response: Optional[int] = None
def iif(self, solver_run, theta):
return ItemInformationFunction(solver_run.irt_model).calculate(b_param=self.b_param, theta=theta)
@validator("enemies", pre=True)
def set_enemies(cls, v) -> List[id]:
if v == "":
return []
enemies = list(filter(None, [int(enemy) for enemy in v.split(",")]))
return enemies
def irf(self, solver_run, theta):
return ItemResponseFunction(solver_run.irt_model).calculate(b_param=self.b_param, theta=theta)
def iif(self, irt_model, theta):
return ItemInformationFunction(irt_model).calculate(b_param=self.b_param, theta=theta)
def irf(self, irt_model, theta):
return ItemResponseFunction(irt_model).calculate(b_param=self.b_param, theta=theta)
def get_attribute(self, ref_attribute: Attribute) -> Attribute or None:
for attribute in self.attributes:
@ -42,7 +51,7 @@ class Item(BaseModel):
total = 0
for target in solver_run.objective_function.tif_targets:
total += self.iif(solver_run, target.theta)
total += self.iif(solver_run.irt_model, target.theta)
return total
@ -50,6 +59,18 @@ class Item(BaseModel):
total = 0
for target in solver_run.objective_function.tif_targets:
total += self.irf(solver_run, target.theta)
total += self.irf(solver_run.irt_model, target.theta)
return total
def enemy_pairs(self, sort: bool = True) -> List[List[int]]:
pairs = []
for enemy_id in self.enemies:
pair = [self.id, enemy_id]
if sort: pair.sort()
pairs.append(pair)
return pairs

View File

@ -1,24 +1,37 @@
from __future__ import annotations
from pydantic import BaseModel
from typing import Dict, List, AnyStr
from pulp import lpSum
from models.target import Target
from models.targets.tif_target import TifTarget
from models.targets.tcc_target import TccTarget
from models.problem import Problem
class ObjectiveFunction(BaseModel):
# minimizing tif/tcc target value is only option currently
# as we add more we can build this out to be more dynamic
# likely with models representing each objective function type
tif_targets: List[Target]
tcc_targets: List[Target]
tif_targets: List[TifTarget]
tcc_targets: List[TccTarget]
target_variance_percentage: int = 10
objective: AnyStr = "minimize"
weight: Dict = {'tif': 1, 'tcc': 1}
def for_problem(self, problem_handler: Problem) -> None:
problem_handler.problem += lpSum([
bundle.count * problem_handler.solver_bundles_var[bundle.id]
for bundle in problem_handler.bundles
] + [
problem_handler.solver_items_var[item.id]
for item in problem_handler.items
])
def increment_targets_drift(self,
limit: float or bool,
all: bool = False,
amount: float = 0.1,
targets: list[Target] = []) -> bool:
targets: list[TifTarget|TccTarget] = []) -> bool:
if all:
for target in self.tif_targets:
target.drift = round(target.drift + amount, 2)
@ -44,5 +57,5 @@ class ObjectiveFunction(BaseModel):
return minimum_drift
def all_targets(self) -> list[Target]:
def all_targets(self) -> list[TifTarget|TccTarget]:
return self.tif_targets + self.tcc_targets

127
app/models/problem.py Normal file
View File

@ -0,0 +1,127 @@
from __future__ import annotations
from typing import TYPE_CHECKING
from pydantic import BaseModel
from typing import Any, List
from pulp import LpProblem, LpVariable, LpStatus, lpSum
import logging
from helpers.problem_helper import *
from helpers import service_helper
from models.solution import Solution
from models.item import Item
from models.bundle import Bundle
from lib.errors.item_generation_error import ItemGenerationError
if TYPE_CHECKING:
from models.solver_run import SolverRun
class Problem(BaseModel):
items: List[Item]
bundles: List[Bundle]
problem: Any
solver_items_var: Any = None
solver_bundles_var: Any = None
def __init__(self, **data) -> None:
super().__init__(**data)
# setup common Solver variables
self.solver_items_var = LpVariable.dicts("Item",
[item.id for item in self.items],
lowBound=0,
upBound=1,
cat='Binary')
self.solver_bundles_var = LpVariable.dicts("Bundle",
[bundle.id for bundle in self.bundles],
lowBound=0,
upBound=1,
cat='Binary')
def solve(self, solver_run: SolverRun, enemy_ids: List[int] = []) -> LpProblem:
logging.info('solving problem...')
self.problem.solve()
# NOTICE: Legacy enemies implementation
# leaving this in, just in case the current impl fails to function
# and we need an immediate solution
# if we allow enemies, go through the normal solving process
# if solver_run.allow_enemies:
# logging.info('enemes allowed, so just solving')
# self.problem.solve()
# # otherwise begin the process of filtering enemies
# else:
# self.problem.solve()
# # however, if the solve was infeasible, kick it back
# # to the normal process
# if LpStatus[self.problem.status] == 'Infeasible':
# return self.problem
# # otherwise continue
# else:
# # get items from solution
# solved_items, _ = service_helper.solution_items(self.problem.variables(), solver_run)
# # sacred items will remain the same (with new items added each run) between solve attempts
# # but new enemies will be appended
# sacred_ids, new_enemy_ids = sanctify(solved_items)
# # the current solve run found new enemies
# if new_enemy_ids:
# logging.info('enemies found, adding constraints...')
# # append the new enemies to the enemies_id list
# enemy_ids = list(set(enemy_ids+new_enemy_ids))
# # remove old enemy/sacred constraints
# if 'Exclude_enemy_items' in self.problem.constraints.keys(): self.problem.constraints.pop('Exclude_enemy_items')
# if 'Include_sacred_items' in self.problem.constraints.keys(): self.problem.constraints.pop('Include_sacred_items')
# # add constraint to not allow enemy items
# self.problem += lpSum([
# len(bundle.find_items(enemy_ids)) * self.solver_bundles_var[bundle.id]
# for bundle in self.bundles
# ] + [
# (item.id in enemy_ids) * self.solver_items_var[item.id]
# for item in self.items
# ]) == 0, 'Exclude enemy items'
# # add constraint to use sacred items
# self.problem += lpSum([
# len(bundle.find_items(sacred_ids)) * self.solver_bundles_var[bundle.id]
# for bundle in self.bundles
# ] + [
# (item.id in sacred_ids) * self.solver_items_var[item.id]
# for item in self.items
# ]) == len(sacred_ids), 'Include sacred items'
# # recursively solve until no enemies exist or infeasible
# logging.info('recursively solving...')
# self.solve(solver_run)
return self.problem
def generate(self, solution: Solution, solver_run: SolverRun) -> None:
try:
# creating problem objective function
solver_run.objective_function.for_problem(self)
logging.info('Creating Constraints...')
# generic constraints
for constraint in solver_run.constraints:
constraint.build(self, solver_run=solver_run, solution=solution)
# irt target constraints
for target in solver_run.objective_function.all_targets():
target.constraints(self, solver_run)
logging.info('Constraints Created...')
except ValueError as error:
logging.error(error)
raise ItemGenerationError(
error.msg,
error.args[0])

View File

@ -2,8 +2,18 @@ from pydantic import BaseModel
from typing import List
from models.form import Form
from models.item import Item
class Solution(BaseModel):
response_id: int
forms: List[Form]
def items_exist_in_forms(self, items: [Item]) -> bool:
items_found = 0
for item in items:
for form in self.forms:
if form.has_item(item):
items_found += 1
return items_found

View File

@ -1,31 +1,60 @@
from __future__ import annotations
from typing import TYPE_CHECKING, List, Literal, Optional, Tuple, Union, TypeVar
from pulp import lpSum
from pydantic import BaseModel
from typing import List, Literal, Optional
import itertools
import logging
import random
from models.item import Item
from models.constraint import Constraint
from models.constraints.generic_constraint import GenericConstraint
from models.constraints.metadata_constraint import MetadataConstraint
from models.constraints.bundle_constraint import BundleConstraint
from models.constraints.form_uniqueness_constraint import FormUniquenessConstraint
from models.constraints.total_form_items_constraint import TotalFormItemsConstraint
from models.constraints.enemy_pair_constraint import EnemyPairConstraint
from models.irt_model import IRTModel
from models.bundle import Bundle
from models.objective_function import ObjectiveFunction
from models.advanced_options import AdvancedOptions
if TYPE_CHECKING:
from models.solution import Solution
from models.problem import Problem
ConstraintType = TypeVar('ConstraintType', bound=GenericConstraint)
class SolverRun(BaseModel):
items: List[Item] = []
bundles: List[Bundle] = []
bundle_first_ordering: bool = True
constraints: List[Constraint]
constraints: List[ConstraintType]
irt_model: IRTModel
objective_function: ObjectiveFunction
total_form_items: int
total_forms: int = 1
theta_cut_score: float = 0.00
drift_style: Literal['constant', 'variable'] = 'constant'
allow_enemies: bool = False
advanced_options: Optional[AdvancedOptions]
engine: str
def __init__(self, **data) -> None:
super().__init__(**data)
# this is all a compensator for dynamically creating objects
# ideally we'd change the payload to determine what type it is
constraints: [ConstraintType] = []
# repackage to create appropriate constraint types
for constraint in self.constraints:
if constraint.reference_attribute.type == 'metadata':
constraints.append(MetadataConstraint(reference_attribute=constraint.reference_attribute, minimum=constraint.minimum, maximum=constraint.maximum))
elif constraint.reference_attribute.type == 'bundle':
constraints.append(BundleConstraint(reference_attribute=constraint.reference_attribute, minimum=constraint.minimum, maximum=constraint.maximum))
self.constraints = constraints
def get_item(self, item_id: int) -> Item or None:
for item in self.items:
if item.id == item_id:
@ -36,7 +65,7 @@ class SolverRun(BaseModel):
if bundle.id == bundle_id:
return bundle
def get_constraint_by_type(self, type: str) -> Constraint or None:
def get_constraint_by_type(self, type: str) -> ConstraintType or None:
for constraint in self.constraints:
if type == constraint.reference_attribute.type:
return constraint
@ -45,6 +74,18 @@ class SolverRun(BaseModel):
self.items = [item for item in self.items if item not in items]
return True
def generate_constraints(self) -> None:
# total form items
self.constraints.append(TotalFormItemsConstraint.create(self.total_form_items))
# ensure form uniqueness
if self.advanced_options.ensure_form_uniqueness:
self.constraints.append(FormUniquenessConstraint.create(self.total_form_items - 1))
# enemies constraints
for pair in self.enemy_pairs():
self.constraints.append(EnemyPairConstraint.create(pair))
def generate_bundles(self):
logging.info('Generating Bundles...')
# confirms bundle constraints exists
@ -97,7 +138,7 @@ class SolverRun(BaseModel):
logging.info('Bundles Generated...')
def get_constraint(self, name: str) -> Constraint:
def get_constraint(self, name: str) -> ConstraintType:
return next((constraint for constraint in self.constraints
if constraint.reference_attribute.id == name), None)
@ -114,17 +155,13 @@ class SolverRun(BaseModel):
else:
return self.items
def select_items_by_percent(self, percent: int) -> List[Item]:
items = self.unbundled_items()
total_items = len(items)
selected_items_amount = round(total_items - (total_items *
(percent / 100)))
def enemy_pairs(self) -> List[List[int]]:
pairs = []
return random.sample(items, selected_items_amount)
for item in self.items:
# add enemy pairs for item to pairs
pairs += item.enemy_pairs()
def select_bundles_by_percent(self, percent: int) -> List[Bundle]:
total_bundles = len(self.bundles)
selected_bundles_amount = round(total_bundles - (total_bundles *
(percent / 100)))
return random.sample(self.bundles, selected_bundles_amount)
# remove duplicates
pairs.sort()
return list(k for k,_ in itertools.groupby(pairs))

View File

@ -1,6 +1,15 @@
from __future__ import annotations
from typing import TYPE_CHECKING
from pydantic import BaseModel
from typing import Optional
from pulp import lpSum
if TYPE_CHECKING:
from models.solver_run import SolverRun
from models.problem import Problem
class Target(BaseModel):
theta: float
value: float

View File

@ -0,0 +1,31 @@
from __future__ import annotations
from typing import TYPE_CHECKING
from models.targets.target import *
if TYPE_CHECKING:
from models.problem import Problem
class TccTarget(Target):
def constraints(self, problem_handler: Problem, solver_run: SolverRun):
problem_handler.problem += lpSum([
bundle.trf(solver_run.irt_model, self.theta)
* problem_handler.solver_bundles_var[bundle.id]
for bundle in problem_handler.bundles
] + [
item.irf(solver_run.irt_model, self.theta) *
problem_handler.solver_items_var[item.id]
for item in problem_handler.items
]) >= self.minimum(
), f'Min TCC theta({self.theta}) at target {self.value} with a drift % of {self.drift}'
problem_handler.problem += lpSum([
bundle.trf(solver_run.irt_model, self.theta)
* problem_handler.solver_bundles_var[bundle.id]
for bundle in problem_handler.bundles
] + [
item.irf(solver_run.irt_model, self.theta) *
problem_handler.solver_items_var[item.id]
for item in problem_handler.items
]) <= self.maximum(
), f'Max TCC theta({self.theta}) at target {self.value} with a drift % of {self.drift}'

View File

@ -0,0 +1,31 @@
from __future__ import annotations
from typing import TYPE_CHECKING
from models.targets.target import *
if TYPE_CHECKING:
from models.problem import Problem
class TifTarget(Target):
def constraints(self, problem_handler: Problem, solver_run: SolverRun):
problem_handler.problem += lpSum([
bundle.tif(solver_run.irt_model, self.theta)
* problem_handler.solver_bundles_var[bundle.id]
for bundle in problem_handler.bundles
] + [
item.iif(solver_run.irt_model, self.theta) *
problem_handler.solver_items_var[item.id]
for item in problem_handler.items
]) >= self.minimum(
), f'Min TIF theta({self.theta}) at target {self.value} with a drift % of {self.drift}'
problem_handler.problem += lpSum([
bundle.tif(solver_run.irt_model, self.theta)
* problem_handler.solver_bundles_var[bundle.id]
for bundle in problem_handler.bundles
] + [
item.iif(solver_run.irt_model, self.theta) *
problem_handler.solver_items_var[item.id]
for item in problem_handler.items
]) <= self.maximum(
), f'Max TIF theta({self.theta}) at target {self.value} with a drift % of {self.drift}'

View File

@ -1,15 +1,16 @@
import json, random, io, logging
from pulp import LpProblem, LpVariable, LpMinimize, LpStatus, lpSum
from pulp import LpProblem, LpMinimize, LpStatus
from lib.application_configs import ApplicationConfigs
from helpers import aws_helper, tar_helper, csv_helper, service_helper, solver_helper
from helpers import aws_helper, tar_helper, csv_helper, service_helper
from lib.errors.item_generation_error import ItemGenerationError
from models.solver_run import SolverRun
from models.solution import Solution
from models.problem import Problem
from models.form import Form
from models.target import Target
from models.targets.target import Target
from services.base import Base
@ -20,6 +21,7 @@ class FormGenerationService(Base):
try:
self.solver_run = self.create_solver_run_from_attributes()
self.solver_run.generate_bundles()
self.solver_run.generate_constraints()
self.solution = self.generate_solution()
self.result = self.stream_to_s3_bucket()
except ItemGenerationError as error:
@ -71,24 +73,6 @@ class FormGenerationService(Base):
form_number = form_count + 1
current_drift = 0 # FF Tokyo Drift
# adding an element of randomness to the items and bundles used
# may need to change impl based on limit of items available
selected_items = self.solver_run.select_items_by_percent(30)
selected_bundles = self.solver_run.select_bundles_by_percent(
30)
# setup common Solver variables
items = LpVariable.dicts("Item",
[item.id for item in selected_items],
lowBound=0,
upBound=1,
cat='Binary')
bundles = LpVariable.dicts("Bundle",
[bundle.id for bundle in selected_bundles],
lowBound=0,
upBound=1,
cat='Binary')
logging.info(f'Generating Solution for Form {form_number} using the {self.solver_run.irt_model.model} IRT model')
while current_drift <= Target.max_drift():
@ -97,99 +81,9 @@ class FormGenerationService(Base):
drift_percent)
# create problem
problem = LpProblem('ata-form-generate', LpMinimize)
# objective function
problem += lpSum([
bundle.count * bundles[bundle.id]
for bundle in selected_bundles
] + [
items[item.id]
for item in selected_items
])
# Form Constraints
problem += lpSum(
[
bundle.count * bundles[bundle.id]
for bundle in selected_bundles
] + [
1 * items[item.id]
for item in selected_items
]
) == self.solver_run.total_form_items, f'Total bundle form items for form {form_number}'
# Dynamic constraints.. currently we only support Metadata and Bundles(Cases/Passages)
problem = solver_helper.build_constraints(
self.solver_run, problem, items, bundles, selected_items, selected_bundles)
# form uniqueness constraints
# for form in solution.forms:
# form_item_options = [
# bundles[bundle.id]
# for bundle in selected_bundles
# ] + [
# items[item.id]
# for item in selected_items
# ]
# problem += len(
# set(form.solver_variables)
# & set(form_item_options)) / float(
# len(
# set(form.solver_variables)
# | set(form_item_options))) * 100 >= 10
logging.info('Creating TIF and TCC Elastic constraints')
# Behold our very own Elastic constraints!
for tif_target in self.solver_run.objective_function.tif_targets:
problem += lpSum([
bundle.tif(self.solver_run.irt_model, tif_target.theta)
* bundles[bundle.id]
for bundle in selected_bundles
] + [
item.iif(self.solver_run, tif_target.theta) *
items[item.id]
for item in selected_items
]) >= tif_target.minimum(
), f'Min TIF theta({tif_target.theta}) at target {tif_target.value} drift at {current_drift}%'
problem += lpSum([
bundle.tif(self.solver_run.irt_model, tif_target.theta)
* bundles[bundle.id]
for bundle in selected_bundles
] + [
item.iif(self.solver_run, tif_target.theta) *
items[item.id]
for item in selected_items
]) <= tif_target.maximum(
), f'Max TIF theta({tif_target.theta}) at target {tif_target.value} drift at {current_drift}%'
for tcc_target in self.solver_run.objective_function.tcc_targets:
problem += lpSum([
bundle.trf(self.solver_run.irt_model, tcc_target.theta)
* bundles[bundle.id]
for bundle in selected_bundles
] + [
item.irf(self.solver_run, tcc_target.theta) *
items[item.id]
for item in selected_items
]) >= tcc_target.minimum(
), f'Min TCC theta({tcc_target.theta}) at target {tcc_target.value} drift at {current_drift}%'
problem += lpSum([
bundle.trf(self.solver_run.irt_model, tcc_target.theta)
* bundles[bundle.id]
for bundle in selected_bundles
] + [
item.irf(self.solver_run, tcc_target.theta) *
items[item.id]
for item in selected_items
]) <= tcc_target.maximum(
), f'Max TCC theta({tcc_target.theta}) at target {tcc_target.value} drift at {current_drift}%'
logging.info(
f'Solving for Form {form_number} with a drift of {current_drift}%'
)
problem.solve()
problem_handler = Problem(items = self.solver_run.unbundled_items(), bundles = self.solver_run.bundles, problem = LpProblem('ata-form-generate', LpMinimize))
problem_handler.generate(solution, self.solver_run)
problem = problem_handler.solve(self.solver_run)
if LpStatus[problem.status] == 'Infeasible':
logging.info(
@ -203,6 +97,7 @@ class FormGenerationService(Base):
logging.info(
f'No feasible solution found for Form {form_number}!'
)
logging.error(problem)
self.add_form_to_solution(problem, solution)