Merge branch 'develop' into feature/QUANT-1196-solve-all-the-things

Josh Burman 2021-10-29 05:44:58 +00:00
commit 8ee3b0c7d2
17 changed files with 204 additions and 56 deletions

View File

@@ -9,12 +9,13 @@ RUN cd Cbc-2.9.8 && \
    ./configure && \
    make && \
    make install
RUN python -m pip install fastapi[all]
RUN python -m pip install pydantic
RUN python -m pip install pySqsListener
# Bundle app source
COPY . /app
WORKDIR /app/app
# CMD tail -f /dev/null
CMD ["uvicorn", "main:app", "--host", "0.0.0.0", "--port", "80"]
# CMD [ "python", "main.py" ]
CMD tail -f /dev/null

View File

@@ -9,13 +9,13 @@ RUN cd Cbc-2.9.8 && \
    ./configure && \
    make && \
    make install
RUN python -m pip install fastapi[all]
RUN python -m pip install pydantic
RUN python -m pip install pySqsListener
# Bundle app source
COPY . /app
WORKDIR /app/app
# CMD tail -f /dev/null
CMD ["uvicorn", "main:app", "--host", "0.0.0.0", "--port", "80"]
# CMD [ "python", "main.py" ]
CMD tail -f /dev/null

app/helpers/aws_helper.py (new file, 37 lines)
View File

@@ -0,0 +1,37 @@
import boto3
import os
import json

# session and clients are scoped to the solver's own credentials and region
session = boto3.Session(
    aws_access_key_id=os.environ['SOLVER_AWS_ACCESS_KEY_ID'],
    aws_secret_access_key=os.environ['SOLVER_AWS_SECRET_ACCESS_KEY']
)

s3 = session.resource('s3', region_name=os.environ['SOLVER_AWS_REGION'])
sqs = session.client('sqs', region_name=os.environ['SOLVER_AWS_REGION'])

def get_key_from_message(message):
    # pull the S3 object key out of an S3 event notification delivered via SQS
    body = json.loads(message['Body'])
    return body['Records'][0]['s3']['object']['key']

def get_object(key, bucket):
    return s3.Object(
        bucket_name=bucket,
        key=key
    ).get()['Body'].read()

def file_stream_upload(buffer, name, bucket):
    return s3.Bucket(bucket).upload_fileobj(buffer, name)

def receive_message(queue, message_num=1, wait_time=1):
    return sqs.receive_message(
        QueueUrl=queue,
        MaxNumberOfMessages=message_num,
        WaitTimeSeconds=wait_time
    )

def delete_message(queue, receipt):
    return sqs.delete_message(
        QueueUrl=queue,
        ReceiptHandle=receipt
    )
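
For reference, a minimal sketch of how these helpers chain together in a poll, fetch, and acknowledge loop, assuming the same environment variables used elsewhere in this change set:

import os
from helpers import aws_helper

queue = os.environ['SOLVER_SQS_INGEST_QUEUE']

# poll once, fetch each referenced S3 object, then acknowledge the message
msg = aws_helper.receive_message(queue)
for message in msg.get('Messages', []):
    key = aws_helper.get_key_from_message(message)
    raw = aws_helper.get_object(key, os.environ['SOLVER_INGEST_BUCKET'])
    # ... hand `raw` off for processing ...
    aws_helper.delete_message(queue, message['ReceiptHandle'])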

View File

@@ -0,0 +1,5 @@
import csv
import io

def file_stream_reader(f):
    return csv.reader(io.StringIO(f.read().decode('ascii')))
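
A quick illustration of the expected input: any binary file-like object containing ASCII CSV, such as the member handle returned by tarfile.extractfile. The sample rows are invented:

import io
from helpers import csv_helper

# io.BytesIO stands in for a tar member stream; the contents are illustrative
fake_member = io.BytesIO(b'id,b_param,topic\n101,2,algebra\n')
for row in csv_helper.file_stream_reader(fake_member):
    print(row)  # ['id', 'b_param', 'topic'] then ['101', '2', 'algebra']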

View File

@@ -0,0 +1,51 @@
import csv
import io
import re

def items_csv_to_dict(items_csv_reader):
    items = []
    headers = []

    # get headers and items
    for key, row in enumerate(items_csv_reader):
        if key == 0:
            headers = row
        else:
            item = { 'attributes': [] }
            for key, col in enumerate(headers):
                if key == 0:
                    item[col] = row[key]
                elif col == 'b_param':
                    item[col] = row[key]
                elif key > 1:
                    item['attributes'].append({
                        'id': col,
                        'value': row[key],
                        'type': 'metadata'
                    })
            items.append(item)

    return items

def solution_to_file(buffer, total_form_items, forms):
    wr = csv.writer(buffer, dialect='excel', delimiter=',')

    # write the header row: one column per item each form will contain,
    # with the cut score as the final column
    header = [x + 1 for x in range(total_form_items)] + ['cut score']
    wr.writerow(header)

    # add each form as a row to the processed csv
    for form in forms:
        # generated items plus the cut score
        row = form.items + [form.cut_score]
        wr.writerow(row)

    buff2 = io.BytesIO(buffer.getvalue().encode())
    return buff2

def key_to_uuid(key):
    return re.split("_", key)[0]
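
A worked example of the shape items_csv_to_dict produces for a header row of id, b_param and one metadata column (values invented): the first column becomes the item's keyed field, b_param is copied by name, and remaining columns land in attributes.

from helpers import service_helper

# invented rows, mirroring the items.csv layout this helper expects
rows = [['id', 'b_param', 'topic'],
        ['101', '2', 'algebra']]
service_helper.items_csv_to_dict(rows)
# => [{'attributes': [{'id': 'topic', 'value': 'algebra', 'type': 'metadata'}],
#      'id': '101', 'b_param': '2'}]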

View File

@@ -0,0 +1,9 @@
import io
import tarfile

def raw_to_tar(raw_object):
    tarball = io.BytesIO(raw_object)
    return tarfile.open(fileobj=tarball, mode='r:gz')

def extract_file_from_tar(tar, file_name):
    return tar.extractfile(tar.getmember(file_name))
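
A minimal sketch of the intended flow: raw .tar.gz bytes in (as returned by aws_helper.get_object), individual member file handles out. The in-memory tarball and the "cbc" engine value are illustrative; the member name matches what loft_service.py reads.

import io
import tarfile
from helpers import tar_helper

# build a tiny in-memory .tar.gz standing in for the bytes aws_helper.get_object returns
buf = io.BytesIO()
with tarfile.open(fileobj=buf, mode='w:gz') as tf:
    data = b'{"engine": "cbc"}'
    info = tarfile.TarInfo('solver_run_attributes.json')
    info.size = len(data)
    tf.addfile(info, io.BytesIO(data))

tar = tar_helper.raw_to_tar(buf.getvalue())
print(tar_helper.extract_file_from_tar(tar, 'solver_run_attributes.json').read())
# => b'{"engine": "cbc"}'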

View File

@@ -1,39 +1,27 @@
from fastapi import FastAPI, __version__
from pydantic import BaseModel
from typing import Set, List, Optional, Dict
from random import randint
import os
import sys

from models.solver_run import SolverRun
from models.solution import Solution
from models.form import Form
from services.loft_service import LoftService
from helpers import aws_helper
from sqs_listener import SqsListener
from sqs_listener.daemon import Daemon

app = FastAPI()

print("Starting Solver Service (v0.3.1)...")

@app.get("/")
async def root():
    return {"message": "Welcome to Measures LOFT solver service. v0.1"}

# listen to the solver queue
while True:
    msg = aws_helper.receive_message(os.environ['SOLVER_SQS_INGEST_QUEUE'])

@app.get("/healthcheck")
async def health():
    content = {
        "maintainer": "Meazure Horizon Team",
        "git_repo": "https://github.com/yardstick/measure-solver",
        "server": "OK",
        "fastapi version": __version__,
        "app version": "0.1.1"
    }
    return content

    # get the item from the queue
    for message in msg.get("Messages", []):
        # for now the solver service only supports Loft types
        # this is here to allow us to create an extensible way to
        # gather/manage/process data based on the particular needs
        service = LoftService(message)
        service.process()

@app.get('/readycheck')
async def ready():
    return 'OK'  # just means we're on air

        # delete message once processing is complete
        response = aws_helper.delete_message(os.environ['SOLVER_SQS_INGEST_QUEUE'], message['ReceiptHandle'])

@app.post('/solve/')
async def solve(solver_run: SolverRun):
    response = Solution(
        response_id=randint(100, 5000),
        forms=[Form(
            items=[item.id for item in solver_run.items]
        )]
    )
    return response

        # probably convert to logging, but maybe keep this to some extent
        print("MESSAGE PROCESSED: ", message['MessageId'])

View File

@@ -2,6 +2,6 @@ from pydantic import BaseModel
from typing import Optional

class Attribute(BaseModel):
    value: str
    value: Optional[str]
    type: Optional[str]
    id: str

View File

@@ -5,3 +5,4 @@ from models.item import Item

class Form(BaseModel):
    items: List[int]
    cut_score: float

View File

@@ -6,3 +6,4 @@ from models.attribute import Attribute

class Item(BaseModel):
    id: int
    attributes: List[Attribute]
    b_param: int
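
A small sketch of how the updated models compose now that b_param is required on Item and value is optional on Attribute; field values are invented:

from models.attribute import Attribute
from models.item import Item

# value may be omitted on Attribute; b_param must now be supplied on Item
item = Item(
    id=101,
    b_param=2,
    attributes=[Attribute(id='topic', value='algebra', type='metadata'),
                Attribute(id='flagged')]
)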

View File

@@ -1,5 +1,5 @@
from pydantic import BaseModel
from typing import Dict, List
from typing import Dict, List, AnyStr
from models.target import Target

@@ -9,5 +9,5 @@ class ObjectiveFunction(BaseModel):
    # likely with models representing each objective function type
    tif_targets: List[Target]
    tcc_targets: List[Target]
    objective: "minimize"
    objective: AnyStr = "minimize"
    weight: Dict = {'tif': 1, 'tcc': 1}

View File

@@ -11,7 +11,8 @@ class SolverRun(BaseModel):
    items: List[Item]
    constraints: List[Constraint]
    irt_model: IRTModel
    objective_fuction: ObjectiveFunction
    objective_function: ObjectiveFunction
    total_form_items: int
    theta_cut_score: float = 0.00
    advanced_options: Optional[AdvancedOptions]
    engine: str

app/services/base.py (new file, 4 lines)
View File

@@ -0,0 +1,4 @@
class Base:
    def __init__(self, source, ingest_type='message'):
        self.ingest_type = ingest_type
        self.source = source

View File

@@ -0,0 +1,62 @@
import os
import json
import random
import io

from helpers import aws_helper, tar_helper, csv_helper, service_helper
from models.solver_run import SolverRun
from models.solution import Solution
from models.form import Form
from services.base import Base

class LoftService(Base):
    def process(self):
        self.solver_run = SolverRun.parse_obj(self.retrieve_attributes_from_message())
        self.solution = self.generate_solution()
        self.result = self.stream_to_s3_bucket()

    def retrieve_attributes_from_message(self):
        # get s3 object
        self.key = aws_helper.get_key_from_message(self.source)
        s3_object = aws_helper.get_object(self.key, os.environ['SOLVER_INGEST_BUCKET'])

        # convert to tar
        self.tar = tar_helper.raw_to_tar(s3_object)

        # get attributes file and convert to dict
        attributes = json.loads(tar_helper.extract_file_from_tar(self.tar, 'solver_run_attributes.json').read())

        # get items file and convert to dict
        items_csv = tar_helper.extract_file_from_tar(self.tar, 'items.csv')
        items_csv_reader = csv_helper.file_stream_reader(items_csv)

        # add items to attributes dict
        attributes['items'] = service_helper.items_csv_to_dict(items_csv_reader)

        return attributes

    def generate_solution(self):
        # temporary data for mocks
        form_count = 10

        # items here are mocked; the real solver will return N forms
        # and compute a cut score for each
        return Solution(
            response_id=random.randint(100, 5000),
            forms=[
                Form(
                    items=[item.id for item in random.sample(self.solver_run.items, self.solver_run.total_form_items)],
                    cut_score=120
                ) for x in range(form_count)
            ]
        )

    def stream_to_s3_bucket(self):
        # set up the writer buffer and write processed forms to the file
        buffer = io.StringIO()
        solution_file = service_helper.solution_to_file(buffer, self.solver_run.total_form_items, self.solution.forms)

        # upload generated file to s3 and return result
        return aws_helper.file_stream_upload(solution_file, f'{service_helper.key_to_uuid(self.key)}.csv', os.environ['MEASURE_PROCESSED_BUCKET'])
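
For context, the SQS loop in app/main.py drives this class roughly as follows; after process() the generated CSV lands in MEASURE_PROCESSED_BUCKET under <uuid>.csv:

from services.loft_service import LoftService

# `message` is a single entry from aws_helper.receive_message(...)['Messages']
service = LoftService(message)
service.process()

# after process(): service.solver_run holds the parsed run, service.solution
# the (mock) solution, and service.result the return value of the S3 upload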

View File

@@ -1,2 +0,0 @@
override_file_placeholder:
  image: busybox

View File

@@ -1,9 +0,0 @@
measure-solver:
  build: /apps/measure-solver/.docker-compose
  environment:
    VIRTUAL_HOST: yssolver.localhost
    MEASURE_BASE_URL: http://admin.localhost
  dns:
    - 172.17.0.1
  volumes:
    - /apps/measure-solver:/app

View File

@@ -1 +0,0 @@
QUANT-987