diff --git a/.docker-compose/Dockerfile b/.docker-compose/Dockerfile
index 1c70967..319a6ea 100644
--- a/.docker-compose/Dockerfile
+++ b/.docker-compose/Dockerfile
@@ -11,11 +11,11 @@ RUN cd Cbc-2.9.8 && \
     make install
 RUN python -m pip install pydantic
 RUN python -m pip install pySqsListener
+RUN python -m pip install daemonize
 
 # Bundle app source
 COPY . /app
 WORKDIR /app/app
 
-# CMD [ "python", "main.py" ]
-CMD tail -f /dev/null
+CMD [ "python", "main.py" ]
 
diff --git a/Dockerfile b/Dockerfile
index 1c70967..319a6ea 100644
--- a/Dockerfile
+++ b/Dockerfile
@@ -11,11 +11,11 @@ RUN cd Cbc-2.9.8 && \
     make install
 RUN python -m pip install pydantic
 RUN python -m pip install pySqsListener
+RUN python -m pip install daemonize
 
 # Bundle app source
 COPY . /app
 WORKDIR /app/app
 
-# CMD [ "python", "main.py" ]
-CMD tail -f /dev/null
+CMD [ "python", "main.py" ]
 
diff --git a/app/helpers/aws_helper.py b/app/helpers/aws_helper.py
index ecffc37..89cc893 100644
--- a/app/helpers/aws_helper.py
+++ b/app/helpers/aws_helper.py
@@ -10,8 +10,7 @@ session = boto3.Session(
 s3 = session.resource('s3', region_name=os.environ['SOLVER_AWS_REGION'])
 sqs = session.client('sqs', region_name=os.environ['SOLVER_AWS_REGION'])
 
-def get_key_from_message(message):
-    body = json.loads(message['Body'])
+def get_key_from_message(body):
     return body['Records'][0]['s3']['object']['key']
 
 def get_object(key, bucket):
diff --git a/app/main.py b/app/main.py
index 1308bda..7636ecc 100644
--- a/app/main.py
+++ b/app/main.py
@@ -1,27 +1,36 @@
-import os
-import sys
+import os, sys, logging
 
 from services.loft_service import LoftService
 from helpers import aws_helper
+
+from daemonize import Daemonize
 from sqs_listener import SqsListener
-from sqs_listener.daemon import Daemon
 
-print("Starting Solver Service (v0.3.1)...")
+logging.basicConfig(stream=sys.stdout, level=logging.INFO, format="%(levelname)s %(asctime)s - %(message)s")
 
-# # listen to the solver queue
-while True:
-    msg = aws_helper.receive_message(os.environ['SOLVER_SQS_INGEST_QUEUE'])
-
-    # get the item from the queue
-    for message in msg.get("Messages", []):
-        # for now the solver service only supports Loft types
-        # this is here to allow us to create an extensible way to
+class ServiceListener(SqsListener):
+    def handle_message(self, body, attributes, messages_attributes):
         # gather/manage/process data based on the particular needs
-        service = LoftService(message)
+        logging.info('Incoming message: %s', body)
+
+        service = LoftService(body)
         service.process()
 
-        # delete message once process is complete
-        response = aws_helper.delete_message(os.environ['SOLVER_SQS_INGEST_QUEUE'], message['ReceiptHandle'])
+        logging.info('Process complete for %s', service.file_name)
 
-        # probably convert to logging, but maybe keep this to some extent
-        print("MESSAGE PROCESSED: ", message['MessageId'])
+def main():
+    logging.info('Starting Solver Service (v0.3.2)...')
+    listener = ServiceListener(
+        'measure-development-solver-ingest',
+        region_name=os.environ['SOLVER_AWS_REGION'],
+        aws_access_key=os.environ['SOLVER_AWS_ACCESS_KEY_ID'],
+        aws_secret_key=os.environ['SOLVER_AWS_SECRET_ACCESS_KEY'],
+        queue_url=os.environ['SOLVER_SQS_INGEST_QUEUE']
+    )
+    listener.listen()
+
+if __name__ == '__main__':
+    myname=os.path.basename(sys.argv[0])
+    pidfile='/tmp/%s' % myname # any name
+    daemon = Daemonize(app=myname,pid=pidfile, action=main, foreground=True)
+    daemon.start()
diff --git a/app/services/loft_service.py b/app/services/loft_service.py
index d571a61..aae2ad5 100644
--- a/app/services/loft_service.py
+++ b/app/services/loft_service.py
@@ -1,7 +1,4 @@
-import os
-import json
-import random
-import io
+import os, json, random, io, logging
 
 from helpers import aws_helper, tar_helper, csv_helper, service_helper
 
@@ -18,6 +15,7 @@ class LoftService(Base):
         self.result = self.stream_to_s3_bucket()
 
     def retreive_attributes_from_message(self):
+        logging.info('Retrieving attributes from message...')
         # get s3 object
         self.key = aws_helper.get_key_from_message(self.source)
         s3_object = aws_helper.get_object(self.key, os.environ['SOLVER_INGEST_BUCKET'])
@@ -34,10 +32,12 @@ class LoftService(Base):
 
         # add items to attributes dict
         attributes['items'] = service_helper.items_csv_to_dict(items_csv_reader)
+        logging.info('Processed Attributes...')
 
         return attributes
 
     def generate_solution(self):
+        logging.info('Processing Solution...')
         # temporary data for mocks
         form_count = 10
 
@@ -54,9 +54,11 @@ class LoftService(Base):
         )
 
     def stream_to_s3_bucket(self):
+        self.file_name = f'{service_helper.key_to_uuid(self.key)}.csv'
+        logging.info('Streaming to %s s3 bucket %s', self.file_name, os.environ['MEASURE_PROCESSED_BUCKET'])
         # setup writer buffer and write processed forms to file
         buffer = io.StringIO()
         solution_file = service_helper.solution_to_file(buffer, self.solver_run.total_form_items, self.solution.forms)
 
         # upload generated file to s3 and return result
-        return aws_helper.file_stream_upload(solution_file, f'{service_helper.key_to_uuid(self.key)}.csv', os.environ['MEASURE_PROCESSED_BUCKET'])
+        return aws_helper.file_stream_upload(solution_file, self.file_name, os.environ['MEASURE_PROCESSED_BUCKET'])