diff --git a/.docker-compose/Dockerfile b/.docker-compose/Dockerfile index 81dcdbc..1c70967 100644 --- a/.docker-compose/Dockerfile +++ b/.docker-compose/Dockerfile @@ -9,12 +9,13 @@ RUN cd Cbc-2.9.8 && \ ./configure && \ make && \ make install -RUN python -m pip install fastapi[all] +RUN python -m pip install pydantic +RUN python -m pip install pySqsListener # Bundle app source COPY . /app WORKDIR /app/app -# CMD tail -f /dev/null -CMD ["uvicorn", "main:app", "--host", "0.0.0.0", "--port", "80"] +# CMD [ "python", "main.py" ] +CMD tail -f /dev/null diff --git a/Dockerfile b/Dockerfile index b681afb..1c70967 100644 --- a/Dockerfile +++ b/Dockerfile @@ -9,13 +9,13 @@ RUN cd Cbc-2.9.8 && \ ./configure && \ make && \ make install -RUN python -m pip install fastapi[all] - +RUN python -m pip install pydantic +RUN python -m pip install pySqsListener # Bundle app source COPY . /app WORKDIR /app/app -# CMD tail -f /dev/null -CMD ["uvicorn", "main:app", "--host", "0.0.0.0", "--port", "80"] +# CMD [ "python", "main.py" ] +CMD tail -f /dev/null diff --git a/app/helpers/aws_helper.py b/app/helpers/aws_helper.py new file mode 100644 index 0000000..ecffc37 --- /dev/null +++ b/app/helpers/aws_helper.py @@ -0,0 +1,37 @@ +import boto3 +import os +import json + +session = boto3.Session( + aws_access_key_id=os.environ['SOLVER_AWS_ACCESS_KEY_ID'], + aws_secret_access_key=os.environ['SOLVER_AWS_SECRET_ACCESS_KEY'] +) + +s3 = session.resource('s3', region_name=os.environ['SOLVER_AWS_REGION']) +sqs = session.client('sqs', region_name=os.environ['SOLVER_AWS_REGION']) + +def get_key_from_message(message): + body = json.loads(message['Body']) + return body['Records'][0]['s3']['object']['key'] + +def get_object(key, bucket): + return s3.Object( + bucket_name=bucket, + key=key + ).get()['Body'].read() + +def file_stream_upload(buffer, name, bucket): + return s3.Bucket(bucket).upload_fileobj(buffer, name) + +def receive_message(queue, message_num=1, wait_time=1): + return sqs.receive_message( + QueueUrl=queue, + MaxNumberOfMessages=message_num, + WaitTimeSeconds=wait_time + ) + +def delete_message(queue, receipt): + return sqs.delete_message( + QueueUrl=queue, + ReceiptHandle=receipt + ) diff --git a/app/helpers/csv_helper.py b/app/helpers/csv_helper.py new file mode 100644 index 0000000..1f6699c --- /dev/null +++ b/app/helpers/csv_helper.py @@ -0,0 +1,5 @@ +import csv +import io + +def file_stream_reader(f): + return csv.reader(io.StringIO(f.read().decode('ascii'))) diff --git a/app/helpers/service_helper.py b/app/helpers/service_helper.py new file mode 100644 index 0000000..07b6900 --- /dev/null +++ b/app/helpers/service_helper.py @@ -0,0 +1,51 @@ +import csv +import io +import re + +def items_csv_to_dict(items_csv_reader): + items = [] + headers = [] + + # get headers and items + for key, row in enumerate(items_csv_reader): + if key == 0: + headers = row + else: + item = { 'attributes': [] } + + for key, col in enumerate(headers): + if key == 0: + item[col] = row[key] + elif col == 'b_param': + item[col] = row[key] + elif key > 1: + item['attributes'].append({ + 'id': col, + 'value': row[key], + 'type': 'metadata' + }) + + items.append(item) + + return items + +def solution_to_file(buffer, total_form_items, forms): + wr = csv.writer(buffer, dialect='excel', delimiter=',') + + # write header row for first row utilizing the total items all forms will have + # and the cut score as the last item + header = [x + 1 for x in range(total_form_items)] + ['cut score'] + wr.writerow(header) + + # add each form as row to processed csv + for form in forms: + # provide generated items and cut score + row = form.items + [form.cut_score] + wr.writerow(row) + + buff2 = io.BytesIO(buffer.getvalue().encode()) + + return buff2 + +def key_to_uuid(key): + return re.split("_", key)[0] diff --git a/app/helpers/tar_helper.py b/app/helpers/tar_helper.py new file mode 100644 index 0000000..1bfc0f5 --- /dev/null +++ b/app/helpers/tar_helper.py @@ -0,0 +1,9 @@ +import io +import tarfile + +def raw_to_tar(raw_object): + tarball = io.BytesIO(raw_object) + return tarfile.open(fileobj=tarball, mode='r:gz') + +def extract_file_from_tar(tar, file_name): + return tar.extractfile(tar.getmember(file_name)) diff --git a/app/main.py b/app/main.py index 8ec950c..1308bda 100644 --- a/app/main.py +++ b/app/main.py @@ -1,39 +1,27 @@ -from fastapi import FastAPI, __version__ -from pydantic import BaseModel -from typing import Set, List, Optional, Dict -from random import randint +import os +import sys -from models.solver_run import SolverRun -from models.solution import Solution -from models.form import Form +from services.loft_service import LoftService +from helpers import aws_helper +from sqs_listener import SqsListener +from sqs_listener.daemon import Daemon -app = FastAPI() +print("Starting Solver Service (v0.3.1)...") -@app.get("/") -async def root(): - return {"message": "Welcome to Measures LOFT solver service. v0.1"} +# # listen to the solver queue +while True: + msg = aws_helper.receive_message(os.environ['SOLVER_SQS_INGEST_QUEUE']) -@app.get("/healthcheck") -async def health(): - content = { - "maintainer": "Meazure Horizon Team", - "git_repo": "https://github.com/yardstick/measure-solver", - "server": "OK", - "fastapi version": __version__, - "app version": "0.1.1" - } - return content + # get the item from the queue + for message in msg.get("Messages", []): + # for now the solver service only supports Loft types + # this is here to allow us to create an extensible way to + # gather/manage/process data based on the particular needs + service = LoftService(message) + service.process() -@app.get('/readycheck') -async def ready(): - return 'OK' # just means we're on air + # delete message once process is complete + response = aws_helper.delete_message(os.environ['SOLVER_SQS_INGEST_QUEUE'], message['ReceiptHandle']) -@app.post('/solve/') -async def solve(solver_run: SolverRun): - response = Solution( - response_id=randint(100,5000), - forms=[Form( - items=[item.id for item in solver_run.items] - )] - ) - return response + # probably convert to logging, but maybe keep this to some extent + print("MESSAGE PROCESSED: ", message['MessageId']) diff --git a/app/models/attribute.py b/app/models/attribute.py index 0219ea6..1bdc837 100644 --- a/app/models/attribute.py +++ b/app/models/attribute.py @@ -2,6 +2,6 @@ from pydantic import BaseModel from typing import Optional class Attribute(BaseModel): - value: str + value: Optional[str] type: Optional[str] id: str diff --git a/app/models/form.py b/app/models/form.py index 5ff5414..ea7844f 100644 --- a/app/models/form.py +++ b/app/models/form.py @@ -5,3 +5,4 @@ from models.item import Item class Form(BaseModel): items: List[int] + cut_score: float diff --git a/app/models/item.py b/app/models/item.py index 975593e..8ae3b70 100644 --- a/app/models/item.py +++ b/app/models/item.py @@ -6,3 +6,4 @@ from models.attribute import Attribute class Item(BaseModel): id: int attributes: List[Attribute] + b_param: int diff --git a/app/models/objective_function.py b/app/models/objective_function.py index 0c7835d..62f96cb 100644 --- a/app/models/objective_function.py +++ b/app/models/objective_function.py @@ -1,5 +1,5 @@ from pydantic import BaseModel -from typing import Dict, List +from typing import Dict, List, AnyStr from models.target import Target @@ -9,5 +9,5 @@ class ObjectiveFunction(BaseModel): # likely with models representing each objective function type tif_targets: List[Target] tcc_targets: List[Target] - objective: "minimize" + objective: AnyStr = "minimize" weight: Dict = {'tif': 1, 'tcc': 1} diff --git a/app/models/solver_run.py b/app/models/solver_run.py index 5dea7f0..1c0727b 100644 --- a/app/models/solver_run.py +++ b/app/models/solver_run.py @@ -11,7 +11,8 @@ class SolverRun(BaseModel): items: List[Item] constraints: List[Constraint] irt_model: IRTModel - objective_fuction: ObjectiveFunction + objective_function: ObjectiveFunction total_form_items: int + theta_cut_score: float = 0.00 advanced_options: Optional[AdvancedOptions] engine: str diff --git a/app/services/base.py b/app/services/base.py new file mode 100644 index 0000000..77f72f7 --- /dev/null +++ b/app/services/base.py @@ -0,0 +1,4 @@ +class Base: + def __init__(self, source, ingest_type='message'): + self.ingest_type = ingest_type + self.source = source diff --git a/app/services/loft_service.py b/app/services/loft_service.py new file mode 100644 index 0000000..d571a61 --- /dev/null +++ b/app/services/loft_service.py @@ -0,0 +1,62 @@ +import os +import json +import random +import io + +from helpers import aws_helper, tar_helper, csv_helper, service_helper + +from models.solver_run import SolverRun +from models.solution import Solution +from models.form import Form + +from services.base import Base + +class LoftService(Base): + def process(self): + self.solver_run = SolverRun.parse_obj(self.retreive_attributes_from_message()) + self.solution = self.generate_solution() + self.result = self.stream_to_s3_bucket() + + def retreive_attributes_from_message(self): + # get s3 object + self.key = aws_helper.get_key_from_message(self.source) + s3_object = aws_helper.get_object(self.key, os.environ['SOLVER_INGEST_BUCKET']) + + # convert to tar + self.tar = tar_helper.raw_to_tar(s3_object) + + # get attributes file and convert to dict + attributes = json.loads(tar_helper.extract_file_from_tar(self.tar , 'solver_run_attributes.json').read()) + + # get items file and convert to dict + items_csv = tar_helper.extract_file_from_tar(self.tar , 'items.csv') + items_csv_reader = csv_helper.file_stream_reader(items_csv) + + # add items to attributes dict + attributes['items'] = service_helper.items_csv_to_dict(items_csv_reader) + + return attributes + + def generate_solution(self): + # temporary data for mocks + form_count = 10 + + # items will be generated from real solver process, this is for mock purposes + # real solver will return N forms and process a cut score, this is for mock purposes + return Solution( + response_id=random.randint(100,5000), + forms=[ + Form( + items=[item.id for item in random.sample(self.solver_run.items, self.solver_run.total_form_items)], + cut_score=120 + ) for x in range(form_count) + ] + ) + + def stream_to_s3_bucket(self): + # setup writer buffer and write processed forms to file + buffer = io.StringIO() + solution_file = service_helper.solution_to_file(buffer, self.solver_run.total_form_items, self.solution.forms) + + # upload generated file to s3 and return result + return aws_helper.file_stream_upload(solution_file, f'{service_helper.key_to_uuid(self.key)}.csv', os.environ['MEASURE_PROCESSED_BUCKET']) diff --git a/docker-compose.override.yml b/docker-compose.override.yml deleted file mode 100644 index b25559e..0000000 --- a/docker-compose.override.yml +++ /dev/null @@ -1,2 +0,0 @@ -override_file_placeholder: - image: busybox diff --git a/docker-compose.yml b/docker-compose.yml deleted file mode 100644 index 9f5ea03..0000000 --- a/docker-compose.yml +++ /dev/null @@ -1,9 +0,0 @@ -measure-solver: - build: /apps/measure-solver/.docker-compose - environment: - VIRTUAL_HOST: yssolver.localhost - MEASURE_BASE_URL: http://admin.localhost - dns: - - 172.17.0.1 - volumes: - - /apps/measure-solver:/app diff --git a/docker_tag.txt b/docker_tag.txt deleted file mode 100644 index 8a6ce5f..0000000 --- a/docker_tag.txt +++ /dev/null @@ -1 +0,0 @@ -QUANT-987