irt-service/app/main.py
2021-10-19 18:59:46 +00:00

94 lines
2.5 KiB
Python

# from fastapi import FastAPI, __version__
# app = FastAPI()
# @app.get("/")
# async def root():
# return {"message": "Welcome to Measures LOFT solver service. v0.1"}
# @app.get("/healthcheck")
# async def health():
# content = {
# "maintainer": "Meazure Horizon Team",
# "git_repo": "https://github.com/yardstick/measure-solver",
# "server": "OK",
# "fastapi version": __version__,
# "app version": "0.1.0"
# }
# return content
# @app.get('/readycheck')
# async def ready():
# return 'OK!' # just means we're on air
# @app.post('/solve/')
# async def solve(solver_run: SolverRun):
# response = Solution(
# response_id=randint(100,5000),
# forms=[Form(
# items=[item.id for item in solver_run.items]
# )]
# )
# return response
# import os
# from sqs_listener import SqsListener
# class MyListener(SqsListener):
# def handle_message(self, body, attributes, messages_attributes):
# print(body)
# listener = MyListener(os.environ['SOLVER_SQS_INGEST_QUEUE'], error_queue='my-error-queue', region_name='ca-central-1')
# listener.listen()
import boto3
import json
import time
import os
from pydantic import BaseModel
from typing import Set, List, Optional, Dict
from random import randint
from models.solver_run import SolverRun
from models.solution import Solution
from models.form import Form
session = boto3.Session(
aws_access_key_id=os.environ['SOLVER_AWS_ACCESS_KEY_ID'],
aws_secret_access_key=os.environ['SOLVER_AWS_SECRET_ACCESS_KEY']
)
s3 = session.resource('s3', region_name=os.environ['SOLVER_AWS_REGION'])
sqs = session.client('sqs', region_name=os.environ['SOLVER_AWS_REGION'])
print("STARTING SOLVER APP")
# listen to the solver queue
while True:
msg = sqs.receive_message(
QueueUrl=os.environ['SOLVER_SQS_INGEST_QUEUE'],
MaxNumberOfMessages=1,
WaitTimeSeconds=1
)
# get the item from the queue
for message in msg.get("Messages", []):
print("NEW MESSAGE: ", message)
# upload the item to the processed bucket
# print("INGEST OBJ: ", s3.Object(bucket_name=os.environ['SOLVER_INGEST_BUCKET'],
# key=body['Records'][0]['s3']['object']['key']).get()['Body'].read())
s3.Bucket(os.environ['MEASURE_PROCESSED_BUCKET']).put_object(Key=str(time.time())+'.json', Body=json.dumps({
"example": "PROCESSED",
"time": time.time(),
"parent": message['MessageId']
}))
response = sqs.delete_message(
QueueUrl=os.environ['SOLVER_SQS_INGEST_QUEUE'],
ReceiptHandle=message['ReceiptHandle'],
)
print("MESSAGE PROCESSED: ", message['MessageId'])