refactored app to be more well factored
This commit is contained in:
parent
a7528876f4
commit
78b115800f
97
app/main.py
97
app/main.py
@ -1,93 +1,24 @@
|
||||
# from fastapi import FastAPI, __version__
|
||||
|
||||
# app = FastAPI()
|
||||
|
||||
# @app.get("/")
|
||||
# async def root():
|
||||
# return {"message": "Welcome to Measures LOFT solver service. v0.1"}
|
||||
|
||||
# @app.get("/healthcheck")
|
||||
# async def health():
|
||||
# content = {
|
||||
# "maintainer": "Meazure Horizon Team",
|
||||
# "git_repo": "https://github.com/yardstick/measure-solver",
|
||||
# "server": "OK",
|
||||
# "fastapi version": __version__,
|
||||
# "app version": "0.1.0"
|
||||
# }
|
||||
# return content
|
||||
|
||||
# @app.get('/readycheck')
|
||||
# async def ready():
|
||||
# return 'OK!' # just means we're on air
|
||||
|
||||
# @app.post('/solve/')
|
||||
# async def solve(solver_run: SolverRun):
|
||||
# response = Solution(
|
||||
# response_id=randint(100,5000),
|
||||
# forms=[Form(
|
||||
# items=[item.id for item in solver_run.items]
|
||||
# )]
|
||||
# )
|
||||
# return response
|
||||
|
||||
# import os
|
||||
|
||||
# from sqs_listener import SqsListener
|
||||
|
||||
|
||||
# class MyListener(SqsListener):
|
||||
# def handle_message(self, body, attributes, messages_attributes):
|
||||
# print(body)
|
||||
|
||||
# listener = MyListener(os.environ['SOLVER_SQS_INGEST_QUEUE'], error_queue='my-error-queue', region_name='ca-central-1')
|
||||
# listener.listen()
|
||||
import boto3
|
||||
import json
|
||||
import time
|
||||
import os
|
||||
|
||||
from pydantic import BaseModel
|
||||
from typing import Set, List, Optional, Dict
|
||||
from random import randint
|
||||
from services.loft import Loft
|
||||
from helpers import aws_helper
|
||||
|
||||
from models.solver_run import SolverRun
|
||||
from models.solution import Solution
|
||||
from models.form import Form
|
||||
print("Starting Solver Service (v0.3.0)...")
|
||||
|
||||
session = boto3.Session(
|
||||
aws_access_key_id=os.environ['SOLVER_AWS_ACCESS_KEY_ID'],
|
||||
aws_secret_access_key=os.environ['SOLVER_AWS_SECRET_ACCESS_KEY']
|
||||
)
|
||||
|
||||
s3 = session.resource('s3', region_name=os.environ['SOLVER_AWS_REGION'])
|
||||
sqs = session.client('sqs', region_name=os.environ['SOLVER_AWS_REGION'])
|
||||
|
||||
print("STARTING SOLVER APP")
|
||||
# listen to the solver queue
|
||||
while True:
|
||||
msg = sqs.receive_message(
|
||||
QueueUrl=os.environ['SOLVER_SQS_INGEST_QUEUE'],
|
||||
MaxNumberOfMessages=1,
|
||||
WaitTimeSeconds=1
|
||||
)
|
||||
msg = aws_helper.receive_message(os.environ['SOLVER_SQS_INGEST_QUEUE'])
|
||||
|
||||
# get the item from the queue
|
||||
for message in msg.get("Messages", []):
|
||||
print("NEW MESSAGE: ", message)
|
||||
for message in msg.get("Messages", []):
|
||||
# for now the solver service only supports Loft types
|
||||
# this is here to allow us to create an extensible way to
|
||||
# gather/manage/process data based on the particular needs
|
||||
service = Loft(message)
|
||||
service.process()
|
||||
|
||||
# upload the item to the processed bucket
|
||||
# print("INGEST OBJ: ", s3.Object(bucket_name=os.environ['SOLVER_INGEST_BUCKET'],
|
||||
# key=body['Records'][0]['s3']['object']['key']).get()['Body'].read())
|
||||
# delete message once process is complete
|
||||
response = aws_helper.delete_message(os.environ['SOLVER_SQS_INGEST_QUEUE'], message['ReceiptHandle'])
|
||||
|
||||
s3.Bucket(os.environ['MEASURE_PROCESSED_BUCKET']).put_object(Key=str(time.time())+'.json', Body=json.dumps({
|
||||
"example": "PROCESSED",
|
||||
"time": time.time(),
|
||||
"parent": message['MessageId']
|
||||
}))
|
||||
|
||||
response = sqs.delete_message(
|
||||
QueueUrl=os.environ['SOLVER_SQS_INGEST_QUEUE'],
|
||||
ReceiptHandle=message['ReceiptHandle'],
|
||||
)
|
||||
print("MESSAGE PROCESSED: ", message['MessageId'])
|
||||
# probably convert to logging, but maybe keep this to some extent
|
||||
print("MESSAGE PROCESSED: ", message['MessageId'])
|
||||
|
Loading…
x
Reference in New Issue
Block a user