the big format
This commit is contained in:
@ -1,11 +1,12 @@
|
||||
from pydantic import BaseModel
|
||||
from typing import List, Optional, Dict
|
||||
|
||||
|
||||
class AdvancedOptions(BaseModel):
|
||||
linearity_check: Optional[bool]
|
||||
show_progress: Optional[bool]
|
||||
max_solution_time: Optional[int]
|
||||
brand_bound_tolerance: Optional[float]
|
||||
max_forms: Optional[int]
|
||||
precision: Optional[float]
|
||||
extra_param_range: Optional[List[Dict]]
|
||||
linearity_check: Optional[bool]
|
||||
show_progress: Optional[bool]
|
||||
max_solution_time: Optional[int]
|
||||
brand_bound_tolerance: Optional[float]
|
||||
max_forms: Optional[int]
|
||||
precision: Optional[float]
|
||||
extra_param_range: Optional[List[Dict]]
|
||||
|
@ -1,7 +1,8 @@
|
||||
from pydantic import BaseModel
|
||||
from typing import Optional
|
||||
|
||||
|
||||
class Attribute(BaseModel):
|
||||
value: Optional[str]
|
||||
type: Optional[str]
|
||||
id: str
|
||||
value: Optional[str]
|
||||
type: Optional[str]
|
||||
id: str
|
||||
|
@ -1,6 +1,23 @@
|
||||
from pydantic import BaseModel
|
||||
from typing import List
|
||||
|
||||
from lib.irt.test_information_function import TestInformationFunction
|
||||
from lib.irt.test_response_function import TestResponseFunction
|
||||
|
||||
from models.item import Item
|
||||
from models.irt_model import IRTModel
|
||||
|
||||
|
||||
class Bundle(BaseModel):
|
||||
id: int
|
||||
count: int
|
||||
type: str
|
||||
id: int
|
||||
count: int
|
||||
items: List[Item]
|
||||
type: str
|
||||
|
||||
def tif(self, irt_model: IRTModel, theta: float) -> float:
|
||||
return TestInformationFunction(irt_model).calculate(self.items,
|
||||
theta=theta)
|
||||
|
||||
def trf(self, irt_model: IRTModel, theta: float) -> float:
|
||||
return TestResponseFunction(irt_model).calculate(self.items,
|
||||
theta=theta)
|
||||
|
@ -2,7 +2,8 @@ from pydantic import BaseModel
|
||||
|
||||
from models.attribute import Attribute
|
||||
|
||||
|
||||
class Constraint(BaseModel):
|
||||
reference_attribute: Attribute
|
||||
minimum: float
|
||||
maximum: float
|
||||
reference_attribute: Attribute
|
||||
minimum: float
|
||||
maximum: float
|
||||
|
@ -8,19 +8,20 @@ from models.target import Target
|
||||
|
||||
from lib.irt.test_response_function import TestResponseFunction
|
||||
|
||||
class Form(BaseModel):
|
||||
items: List[Item]
|
||||
cut_score: float
|
||||
tif_results: List[Target]
|
||||
tcc_results: List[Target]
|
||||
status: str = 'Not Optimized'
|
||||
|
||||
@classmethod
|
||||
def create(cls, items, solver_run, status):
|
||||
return cls(
|
||||
items=items,
|
||||
cut_score=TestResponseFunction(solver_run.irt_model).calculate(items, theta=solver_run.theta_cut_score),
|
||||
tif_results=irt_helper.generate_tif_results(items, solver_run),
|
||||
tcc_results=irt_helper.generate_tcc_results(items, solver_run),
|
||||
status=status
|
||||
)
|
||||
class Form(BaseModel):
|
||||
items: List[Item]
|
||||
cut_score: float
|
||||
tif_results: List[Target]
|
||||
tcc_results: List[Target]
|
||||
status: str = 'Not Optimized'
|
||||
|
||||
@classmethod
|
||||
def create(cls, items, solver_run, status):
|
||||
return cls(
|
||||
items=items,
|
||||
cut_score=TestResponseFunction(solver_run.irt_model).calculate(
|
||||
items, theta=solver_run.theta_cut_score),
|
||||
tif_results=irt_helper.generate_tif_results(items, solver_run),
|
||||
tcc_results=irt_helper.generate_tcc_results(items, solver_run),
|
||||
status=status)
|
||||
|
@ -1,12 +1,13 @@
|
||||
from pydantic import BaseModel
|
||||
from typing import Dict
|
||||
|
||||
|
||||
class IRTModel(BaseModel):
|
||||
a_param: float
|
||||
b_param: Dict = {"schema_bson_id": str, "field_bson_id": str}
|
||||
c_param: float
|
||||
model: str
|
||||
a_param: float
|
||||
b_param: Dict = {"schema_bson_id": str, "field_bson_id": str}
|
||||
c_param: float
|
||||
model: str
|
||||
|
||||
|
||||
def formatted_b_param(self):
|
||||
return self.b_param['schema_bson_id'] + '-' + self.b_param['field_bson_id']
|
||||
def formatted_b_param(self):
|
||||
return self.b_param['schema_bson_id'] + '-' + self.b_param[
|
||||
'field_bson_id']
|
||||
|
@ -6,27 +6,32 @@ from models.attribute import Attribute
|
||||
from lib.irt.item_response_function import ItemResponseFunction
|
||||
from lib.irt.item_information_function import ItemInformationFunction
|
||||
|
||||
|
||||
class Item(BaseModel):
|
||||
id: int
|
||||
passage_id: Optional[int]
|
||||
workflow_state: Optional[str]
|
||||
attributes: List[Attribute]
|
||||
b_param: float = 0.00
|
||||
id: int
|
||||
passage_id: Optional[int]
|
||||
workflow_state: Optional[str]
|
||||
attributes: List[Attribute]
|
||||
b_param: float = 0.00
|
||||
|
||||
def iif(self, solver_run, theta):
|
||||
return ItemInformationFunction(solver_run.irt_model).calculate(b_param=self.b_param,theta=theta)
|
||||
def iif(self, solver_run, theta):
|
||||
return ItemInformationFunction(solver_run.irt_model).calculate(
|
||||
b_param=self.b_param, theta=theta)
|
||||
|
||||
def irf(self, solver_run, theta):
|
||||
return ItemResponseFunction(solver_run.irt_model).calculate(b_param=self.b_param,theta=theta)
|
||||
def irf(self, solver_run, theta):
|
||||
return ItemResponseFunction(solver_run.irt_model).calculate(
|
||||
b_param=self.b_param, theta=theta)
|
||||
|
||||
def get_attribute(self, ref_attribute):
|
||||
for attribute in self.attributes:
|
||||
if attribute.id == ref_attribute.id and attribute.value.lower() == ref_attribute.value.lower():
|
||||
return attribute.value
|
||||
return False
|
||||
def get_attribute(self, ref_attribute):
|
||||
for attribute in self.attributes:
|
||||
if attribute.id == ref_attribute.id and attribute.value.lower(
|
||||
) == ref_attribute.value.lower():
|
||||
return attribute.value
|
||||
return False
|
||||
|
||||
def attribute_exists(self, ref_attribute):
|
||||
for attribute in self.attributes:
|
||||
if attribute.id == ref_attribute.id and attribute.value.lower() == ref_attribute.value.lower():
|
||||
return True
|
||||
return False
|
||||
def attribute_exists(self, ref_attribute):
|
||||
for attribute in self.attributes:
|
||||
if attribute.id == ref_attribute.id and attribute.value.lower(
|
||||
) == ref_attribute.value.lower():
|
||||
return True
|
||||
return False
|
||||
|
@ -3,11 +3,12 @@ from typing import Dict, List, AnyStr
|
||||
|
||||
from models.target import Target
|
||||
|
||||
|
||||
class ObjectiveFunction(BaseModel):
|
||||
# minimizing tif/tcc target value is only option currently
|
||||
# as we add more we can build this out to be more dynamic
|
||||
# likely with models representing each objective function type
|
||||
tif_targets: List[Target]
|
||||
tcc_targets: List[Target]
|
||||
objective: AnyStr = "minimize"
|
||||
weight: Dict = {'tif': 1, 'tcc': 1}
|
||||
# minimizing tif/tcc target value is only option currently
|
||||
# as we add more we can build this out to be more dynamic
|
||||
# likely with models representing each objective function type
|
||||
tif_targets: List[Target]
|
||||
tcc_targets: List[Target]
|
||||
objective: AnyStr = "minimize"
|
||||
weight: Dict = {'tif': 1, 'tcc': 1}
|
||||
|
@ -3,6 +3,7 @@ from typing import List
|
||||
|
||||
from models.form import Form
|
||||
|
||||
|
||||
class Solution(BaseModel):
|
||||
response_id: int
|
||||
forms: List[Form]
|
||||
response_id: int
|
||||
forms: List[Form]
|
||||
|
@ -10,64 +10,87 @@ from models.bundle import Bundle
|
||||
from models.objective_function import ObjectiveFunction
|
||||
from models.advanced_options import AdvancedOptions
|
||||
|
||||
|
||||
class SolverRun(BaseModel):
|
||||
items: List[Item] = []
|
||||
bundles: Optional[Bundle]
|
||||
constraints: List[Constraint]
|
||||
irt_model: IRTModel
|
||||
objective_function: ObjectiveFunction
|
||||
total_form_items: int
|
||||
total_forms: int = 1
|
||||
theta_cut_score: float = 0.00
|
||||
advanced_options: Optional[AdvancedOptions]
|
||||
engine: str
|
||||
items: List[Item] = []
|
||||
bundles: list[Bundle] = []
|
||||
constraints: List[Constraint]
|
||||
irt_model: IRTModel
|
||||
objective_function: ObjectiveFunction
|
||||
total_form_items: int
|
||||
total_forms: int = 1
|
||||
theta_cut_score: float = 0.00
|
||||
advanced_options: Optional[AdvancedOptions]
|
||||
engine: str
|
||||
|
||||
def get_item(self, item_id):
|
||||
for item in self.items:
|
||||
if str(item.id) == item_id:
|
||||
return item
|
||||
return False
|
||||
def get_item(self, item_id: int) -> Item or None:
|
||||
for item in self.items:
|
||||
if str(item.id) == item_id:
|
||||
return item
|
||||
|
||||
def remove_items(self, items):
|
||||
self.items = [item for item in self.items if item not in items]
|
||||
return True
|
||||
def get_bundle(self, bundle_id: int) -> Bundle or None:
|
||||
for bundle in self.bundles:
|
||||
if str(bundle.id) == bundle_id:
|
||||
return bundle
|
||||
|
||||
def generate_bundles(self):
|
||||
logging.info('Generating Bundles...')
|
||||
bundle_constraints = (constraint.reference_attribute for constraint in self.constraints if constraint.reference_attribute.type == 'bundle')
|
||||
def get_constraint_by_type(self, type: str) -> Constraint or None:
|
||||
for constraint in self.constraints:
|
||||
if type == constraint.reference_attribute.type:
|
||||
return constraint
|
||||
|
||||
for bundle_constraint in bundle_constraints:
|
||||
type_attribute = bundle_constraint.id
|
||||
def remove_items(self, items: list[Item]) -> bool:
|
||||
self.items = [item for item in self.items if item not in items]
|
||||
return True
|
||||
|
||||
for item in self.items:
|
||||
attribute_id = getattr(item, type_attribute, None)
|
||||
def generate_bundles(self):
|
||||
logging.info('Generating Bundles...')
|
||||
# confirms bundle constraints exists
|
||||
bundle_constraints = (
|
||||
constraint.reference_attribute for constraint in self.constraints
|
||||
if constraint.reference_attribute.type == 'bundle')
|
||||
|
||||
# make sure the item has said attribute
|
||||
if attribute_id != None:
|
||||
# if there are pre-existing bundles, add new or increment existing
|
||||
# else create array with new bundle
|
||||
if self.bundles != None:
|
||||
# get index of the bundle in the bundles list if exists or None if it doesn't
|
||||
bundle_index = next((index for (index, bundle) in enumerate(self.bundles) if bundle.id == attribute_id and bundle.type == type_attribute), None)
|
||||
for bundle_constraint in bundle_constraints:
|
||||
type_attribute = bundle_constraint.id
|
||||
|
||||
# if the index doesn't exist add the new bundle of whatever type
|
||||
# else increment the count of the current bundle
|
||||
if bundle_index == None:
|
||||
self.bundles.append(Bundle(
|
||||
id=attribute_id,
|
||||
count=1,
|
||||
type=type_attribute
|
||||
))
|
||||
else:
|
||||
self.bundles[bundle_index].count += 1
|
||||
else:
|
||||
self.bundles = [Bundle(
|
||||
id=attribute_id,
|
||||
count=1,
|
||||
type=type_attribute
|
||||
)]
|
||||
for item in self.items:
|
||||
attribute_id = getattr(item, type_attribute, None)
|
||||
|
||||
logging.info('Bundles Generated...')
|
||||
# make sure the item has said attribute
|
||||
if attribute_id != None:
|
||||
# if there are pre-existing bundles, add new or increment existing
|
||||
# else create array with new bundle
|
||||
if self.bundles != None:
|
||||
# get index of the bundle in the bundles list if exists or None if it doesn't
|
||||
bundle_index = next(
|
||||
(index
|
||||
for (index, bundle) in enumerate(self.bundles)
|
||||
if bundle.id == attribute_id
|
||||
and bundle.type == type_attribute), None)
|
||||
|
||||
def get_constraint(self, name):
|
||||
return next((constraint for constraint in self.constraints if constraint.reference_attribute.id == name), None)
|
||||
# if the index doesn't exist add the new bundle of whatever type
|
||||
# else increment the count of the current bundle
|
||||
if bundle_index == None:
|
||||
self.bundles.append(
|
||||
Bundle(id=attribute_id,
|
||||
count=1,
|
||||
items=[item],
|
||||
type=type_attribute))
|
||||
else:
|
||||
self.bundles[bundle_index].count += 1
|
||||
self.bundles[bundle_index].items.append(item)
|
||||
else:
|
||||
self.bundles = [
|
||||
Bundle(id=attribute_id,
|
||||
count=1,
|
||||
items=[item],
|
||||
type=type_attribute)
|
||||
]
|
||||
|
||||
logging.info('Bundles Generated...')
|
||||
|
||||
def get_constraint(self, name: str) -> Constraint:
|
||||
return next((constraint for constraint in self.constraints
|
||||
if constraint.reference_attribute.id == name), None)
|
||||
|
||||
def unbundled_items(self) -> list:
|
||||
return [item for item in self.items if item.passage_id == None]
|
||||
|
@ -1,7 +1,8 @@
|
||||
from pydantic import BaseModel
|
||||
from typing import Optional
|
||||
|
||||
|
||||
class Target(BaseModel):
|
||||
theta: float
|
||||
value: float
|
||||
result: Optional[float]
|
||||
theta: float
|
||||
value: float
|
||||
result: Optional[float]
|
||||
|
Reference in New Issue
Block a user