Merge pull request #8 from yardstick/feature/QUANT-1169-se-sqs-listener
QUANT-1169: SQS listener and S3 download upload
This commit is contained in:
commit
e2099509ae
@ -9,12 +9,13 @@ RUN cd Cbc-2.9.8 && \
|
||||
./configure && \
|
||||
make && \
|
||||
make install
|
||||
RUN python -m pip install fastapi[all]
|
||||
RUN python -m pip install pydantic
|
||||
RUN python -m pip install pySqsListener
|
||||
|
||||
# Bundle app source
|
||||
COPY . /app
|
||||
|
||||
WORKDIR /app/app
|
||||
|
||||
# CMD tail -f /dev/null
|
||||
CMD ["uvicorn", "main:app", "--host", "0.0.0.0", "--port", "80"]
|
||||
# CMD [ "python", "main.py" ]
|
||||
CMD tail -f /dev/null
|
||||
|
@ -9,13 +9,13 @@ RUN cd Cbc-2.9.8 && \
|
||||
./configure && \
|
||||
make && \
|
||||
make install
|
||||
RUN python -m pip install fastapi[all]
|
||||
|
||||
RUN python -m pip install pydantic
|
||||
RUN python -m pip install pySqsListener
|
||||
|
||||
# Bundle app source
|
||||
COPY . /app
|
||||
|
||||
WORKDIR /app/app
|
||||
|
||||
# CMD tail -f /dev/null
|
||||
CMD ["uvicorn", "main:app", "--host", "0.0.0.0", "--port", "80"]
|
||||
# CMD [ "python", "main.py" ]
|
||||
CMD tail -f /dev/null
|
||||
|
37
app/helpers/aws_helper.py
Normal file
37
app/helpers/aws_helper.py
Normal file
@ -0,0 +1,37 @@
|
||||
import boto3
|
||||
import os
|
||||
import json
|
||||
|
||||
session = boto3.Session(
|
||||
aws_access_key_id=os.environ['SOLVER_AWS_ACCESS_KEY_ID'],
|
||||
aws_secret_access_key=os.environ['SOLVER_AWS_SECRET_ACCESS_KEY']
|
||||
)
|
||||
|
||||
s3 = session.resource('s3', region_name=os.environ['SOLVER_AWS_REGION'])
|
||||
sqs = session.client('sqs', region_name=os.environ['SOLVER_AWS_REGION'])
|
||||
|
||||
def get_key_from_message(message):
|
||||
body = json.loads(message['Body'])
|
||||
return body['Records'][0]['s3']['object']['key']
|
||||
|
||||
def get_object(key, bucket):
|
||||
return s3.Object(
|
||||
bucket_name=bucket,
|
||||
key=key
|
||||
).get()['Body'].read()
|
||||
|
||||
def file_stream_upload(buffer, name, bucket):
|
||||
return s3.Bucket(bucket).upload_fileobj(buffer, name)
|
||||
|
||||
def receive_message(queue, message_num=1, wait_time=1):
|
||||
return sqs.receive_message(
|
||||
QueueUrl=queue,
|
||||
MaxNumberOfMessages=message_num,
|
||||
WaitTimeSeconds=wait_time
|
||||
)
|
||||
|
||||
def delete_message(queue, receipt):
|
||||
return sqs.delete_message(
|
||||
QueueUrl=queue,
|
||||
ReceiptHandle=receipt
|
||||
)
|
5
app/helpers/csv_helper.py
Normal file
5
app/helpers/csv_helper.py
Normal file
@ -0,0 +1,5 @@
|
||||
import csv
|
||||
import io
|
||||
|
||||
def file_stream_reader(f):
|
||||
return csv.reader(io.StringIO(f.read().decode('ascii')))
|
51
app/helpers/service_helper.py
Normal file
51
app/helpers/service_helper.py
Normal file
@ -0,0 +1,51 @@
|
||||
import csv
|
||||
import io
|
||||
import re
|
||||
|
||||
def items_csv_to_dict(items_csv_reader):
|
||||
items = []
|
||||
headers = []
|
||||
|
||||
# get headers and items
|
||||
for key, row in enumerate(items_csv_reader):
|
||||
if key == 0:
|
||||
headers = row
|
||||
else:
|
||||
item = { 'attributes': [] }
|
||||
|
||||
for key, col in enumerate(headers):
|
||||
if key == 0:
|
||||
item[col] = row[key]
|
||||
elif col == 'b_param':
|
||||
item[col] = row[key]
|
||||
elif key > 1:
|
||||
item['attributes'].append({
|
||||
'id': col,
|
||||
'value': row[key],
|
||||
'type': 'metadata'
|
||||
})
|
||||
|
||||
items.append(item)
|
||||
|
||||
return items
|
||||
|
||||
def solution_to_file(buffer, total_form_items, forms):
|
||||
wr = csv.writer(buffer, dialect='excel', delimiter=',')
|
||||
|
||||
# write header row for first row utilizing the total items all forms will have
|
||||
# and the cut score as the last item
|
||||
header = [x + 1 for x in range(total_form_items)] + ['cut score']
|
||||
wr.writerow(header)
|
||||
|
||||
# add each form as row to processed csv
|
||||
for form in forms:
|
||||
# provide generated items and cut score
|
||||
row = form.items + [form.cut_score]
|
||||
wr.writerow(row)
|
||||
|
||||
buff2 = io.BytesIO(buffer.getvalue().encode())
|
||||
|
||||
return buff2
|
||||
|
||||
def key_to_uuid(key):
|
||||
return re.split("_", key)[0]
|
9
app/helpers/tar_helper.py
Normal file
9
app/helpers/tar_helper.py
Normal file
@ -0,0 +1,9 @@
|
||||
import io
|
||||
import tarfile
|
||||
|
||||
def raw_to_tar(raw_object):
|
||||
tarball = io.BytesIO(raw_object)
|
||||
return tarfile.open(fileobj=tarball, mode='r:gz')
|
||||
|
||||
def extract_file_from_tar(tar, file_name):
|
||||
return tar.extractfile(tar.getmember(file_name))
|
54
app/main.py
54
app/main.py
@ -1,39 +1,27 @@
|
||||
from fastapi import FastAPI, __version__
|
||||
from pydantic import BaseModel
|
||||
from typing import Set, List, Optional, Dict
|
||||
from random import randint
|
||||
import os
|
||||
import sys
|
||||
|
||||
from models.solver_run import SolverRun
|
||||
from models.solution import Solution
|
||||
from models.form import Form
|
||||
from services.loft_service import LoftService
|
||||
from helpers import aws_helper
|
||||
from sqs_listener import SqsListener
|
||||
from sqs_listener.daemon import Daemon
|
||||
|
||||
app = FastAPI()
|
||||
print("Starting Solver Service (v0.3.1)...")
|
||||
|
||||
@app.get("/")
|
||||
async def root():
|
||||
return {"message": "Welcome to Measures LOFT solver service. v0.1"}
|
||||
# # listen to the solver queue
|
||||
while True:
|
||||
msg = aws_helper.receive_message(os.environ['SOLVER_SQS_INGEST_QUEUE'])
|
||||
|
||||
@app.get("/healthcheck")
|
||||
async def health():
|
||||
content = {
|
||||
"maintainer": "Meazure Horizon Team",
|
||||
"git_repo": "https://github.com/yardstick/measure-solver",
|
||||
"server": "OK",
|
||||
"fastapi version": __version__,
|
||||
"app version": "0.1.1"
|
||||
}
|
||||
return content
|
||||
# get the item from the queue
|
||||
for message in msg.get("Messages", []):
|
||||
# for now the solver service only supports Loft types
|
||||
# this is here to allow us to create an extensible way to
|
||||
# gather/manage/process data based on the particular needs
|
||||
service = LoftService(message)
|
||||
service.process()
|
||||
|
||||
@app.get('/readycheck')
|
||||
async def ready():
|
||||
return 'OK' # just means we're on air
|
||||
# delete message once process is complete
|
||||
response = aws_helper.delete_message(os.environ['SOLVER_SQS_INGEST_QUEUE'], message['ReceiptHandle'])
|
||||
|
||||
@app.post('/solve/')
|
||||
async def solve(solver_run: SolverRun):
|
||||
response = Solution(
|
||||
response_id=randint(100,5000),
|
||||
forms=[Form(
|
||||
items=[item.id for item in solver_run.items]
|
||||
)]
|
||||
)
|
||||
return response
|
||||
# probably convert to logging, but maybe keep this to some extent
|
||||
print("MESSAGE PROCESSED: ", message['MessageId'])
|
||||
|
@ -2,6 +2,6 @@ from pydantic import BaseModel
|
||||
from typing import Optional
|
||||
|
||||
class Attribute(BaseModel):
|
||||
value: str
|
||||
value: Optional[str]
|
||||
type: Optional[str]
|
||||
id: str
|
||||
|
@ -5,3 +5,4 @@ from models.item import Item
|
||||
|
||||
class Form(BaseModel):
|
||||
items: List[int]
|
||||
cut_score: float
|
||||
|
@ -6,3 +6,4 @@ from models.attribute import Attribute
|
||||
class Item(BaseModel):
|
||||
id: int
|
||||
attributes: List[Attribute]
|
||||
b_param: int
|
||||
|
@ -1,5 +1,5 @@
|
||||
from pydantic import BaseModel
|
||||
from typing import Dict, List
|
||||
from typing import Dict, List, AnyStr
|
||||
|
||||
from models.target import Target
|
||||
|
||||
@ -9,5 +9,5 @@ class ObjectiveFunction(BaseModel):
|
||||
# likely with models representing each objective function type
|
||||
tif_targets: List[Target]
|
||||
tcc_targets: List[Target]
|
||||
objective: "minimize"
|
||||
objective: AnyStr = "minimize"
|
||||
weight: Dict = {'tif': 1, 'tcc': 1}
|
||||
|
@ -11,7 +11,8 @@ class SolverRun(BaseModel):
|
||||
items: List[Item]
|
||||
constraints: List[Constraint]
|
||||
irt_model: IRTModel
|
||||
objective_fuction: ObjectiveFunction
|
||||
objective_function: ObjectiveFunction
|
||||
total_form_items: int
|
||||
theta_cut_score: float = 0.00
|
||||
advanced_options: Optional[AdvancedOptions]
|
||||
engine: str
|
||||
|
4
app/services/base.py
Normal file
4
app/services/base.py
Normal file
@ -0,0 +1,4 @@
|
||||
class Base:
|
||||
def __init__(self, source, ingest_type='message'):
|
||||
self.ingest_type = ingest_type
|
||||
self.source = source
|
62
app/services/loft_service.py
Normal file
62
app/services/loft_service.py
Normal file
@ -0,0 +1,62 @@
|
||||
import os
|
||||
import json
|
||||
import random
|
||||
import io
|
||||
|
||||
from helpers import aws_helper, tar_helper, csv_helper, service_helper
|
||||
|
||||
from models.solver_run import SolverRun
|
||||
from models.solution import Solution
|
||||
from models.form import Form
|
||||
|
||||
from services.base import Base
|
||||
|
||||
class LoftService(Base):
|
||||
def process(self):
|
||||
self.solver_run = SolverRun.parse_obj(self.retreive_attributes_from_message())
|
||||
self.solution = self.generate_solution()
|
||||
self.result = self.stream_to_s3_bucket()
|
||||
|
||||
def retreive_attributes_from_message(self):
|
||||
# get s3 object
|
||||
self.key = aws_helper.get_key_from_message(self.source)
|
||||
s3_object = aws_helper.get_object(self.key, os.environ['SOLVER_INGEST_BUCKET'])
|
||||
|
||||
# convert to tar
|
||||
self.tar = tar_helper.raw_to_tar(s3_object)
|
||||
|
||||
# get attributes file and convert to dict
|
||||
attributes = json.loads(tar_helper.extract_file_from_tar(self.tar , 'solver_run_attributes.json').read())
|
||||
|
||||
# get items file and convert to dict
|
||||
items_csv = tar_helper.extract_file_from_tar(self.tar , 'items.csv')
|
||||
items_csv_reader = csv_helper.file_stream_reader(items_csv)
|
||||
|
||||
# add items to attributes dict
|
||||
attributes['items'] = service_helper.items_csv_to_dict(items_csv_reader)
|
||||
|
||||
return attributes
|
||||
|
||||
def generate_solution(self):
|
||||
# temporary data for mocks
|
||||
form_count = 10
|
||||
|
||||
# items will be generated from real solver process, this is for mock purposes
|
||||
# real solver will return N forms and process a cut score, this is for mock purposes
|
||||
return Solution(
|
||||
response_id=random.randint(100,5000),
|
||||
forms=[
|
||||
Form(
|
||||
items=[item.id for item in random.sample(self.solver_run.items, self.solver_run.total_form_items)],
|
||||
cut_score=120
|
||||
) for x in range(form_count)
|
||||
]
|
||||
)
|
||||
|
||||
def stream_to_s3_bucket(self):
|
||||
# setup writer buffer and write processed forms to file
|
||||
buffer = io.StringIO()
|
||||
solution_file = service_helper.solution_to_file(buffer, self.solver_run.total_form_items, self.solution.forms)
|
||||
|
||||
# upload generated file to s3 and return result
|
||||
return aws_helper.file_stream_upload(solution_file, f'{service_helper.key_to_uuid(self.key)}.csv', os.environ['MEASURE_PROCESSED_BUCKET'])
|
@ -1,2 +0,0 @@
|
||||
override_file_placeholder:
|
||||
image: busybox
|
@ -1,9 +0,0 @@
|
||||
measure-solver:
|
||||
build: /apps/measure-solver/.docker-compose
|
||||
environment:
|
||||
VIRTUAL_HOST: yssolver.localhost
|
||||
MEASURE_BASE_URL: http://admin.localhost
|
||||
dns:
|
||||
- 172.17.0.1
|
||||
volumes:
|
||||
- /apps/measure-solver:/app
|
@ -1 +0,0 @@
|
||||
QUANT-987
|
Loading…
x
Reference in New Issue
Block a user