From 449c33915fca8b2d7c0b0eede68f11964e1a0270 Mon Sep 17 00:00:00 2001 From: Josh Burman Date: Sun, 31 Oct 2021 03:50:07 +0000 Subject: [PATCH 1/4] deamonize and cursory logging --- app/helpers/aws_helper.py | 3 +-- app/main.py | 50 ++++++++++++++++++++++++++---------- app/services/base.py | 1 + app/services/loft_service.py | 2 ++ 4 files changed, 41 insertions(+), 15 deletions(-) diff --git a/app/helpers/aws_helper.py b/app/helpers/aws_helper.py index ecffc37..89cc893 100644 --- a/app/helpers/aws_helper.py +++ b/app/helpers/aws_helper.py @@ -10,8 +10,7 @@ session = boto3.Session( s3 = session.resource('s3', region_name=os.environ['SOLVER_AWS_REGION']) sqs = session.client('sqs', region_name=os.environ['SOLVER_AWS_REGION']) -def get_key_from_message(message): - body = json.loads(message['Body']) +def get_key_from_message(body): return body['Records'][0]['s3']['object']['key'] def get_object(key, bucket): diff --git a/app/main.py b/app/main.py index 1308bda..8d381e2 100644 --- a/app/main.py +++ b/app/main.py @@ -1,27 +1,51 @@ import os import sys +import json +import time from services.loft_service import LoftService from helpers import aws_helper from sqs_listener import SqsListener from sqs_listener.daemon import Daemon -print("Starting Solver Service (v0.3.1)...") -# # listen to the solver queue -while True: - msg = aws_helper.receive_message(os.environ['SOLVER_SQS_INGEST_QUEUE']) - # get the item from the queue - for message in msg.get("Messages", []): - # for now the solver service only supports Loft types - # this is here to allow us to create an extensible way to +class ServiceListener(SqsListener): + def handle_message(self, body, attributes, messages_attributes): # gather/manage/process data based on the particular needs - service = LoftService(message) + print(f'{time.localtime()} - message received...') + service = LoftService(body) service.process() + # log the things - # delete message once process is complete - response = aws_helper.delete_message(os.environ['SOLVER_SQS_INGEST_QUEUE'], message['ReceiptHandle']) +class ServiceDaemon(Daemon): + def run(self): + print("Starting Solver Service (v0.3.2)...") + print("Initializing listener") + listener = ServiceListener( + 'measure-development-solver-ingest', + region_name=os.environ['SOLVER_AWS_REGION'], + aws_access_key=os.environ['SOLVER_AWS_ACCESS_KEY_ID'], + aws_secret_key=os.environ['SOLVER_AWS_SECRET_ACCESS_KEY'], + queue_url=os.environ['SOLVER_SQS_INGEST_QUEUE'] + ) + listener.listen() - # probably convert to logging, but maybe keep this to some extent - print("MESSAGE PROCESSED: ", message['MessageId']) +if __name__ == "__main__": + daemon = ServiceDaemon('/var/run/sqs_daemon.pid',stdout='/app/logs/stdout', stderr='/app/logs/stderr', stdin='/app/logs/stdin') + if len(sys.argv) == 2: + if 'start' == sys.argv[1]: + print("Starting listener daemon") + daemon.start() + elif 'stop' == sys.argv[1]: + print("Attempting to stop the daemon") + daemon.stop() + elif 'restart' == sys.argv[1]: + daemon.restart() + else: + print("Unknown command") + sys.exit(2) + sys.exit(0) + else: + print("usage: %s start|stop|restart" % sys.argv[0]) + sys.exit(2) diff --git a/app/services/base.py b/app/services/base.py index 77f72f7..10c5951 100644 --- a/app/services/base.py +++ b/app/services/base.py @@ -1,4 +1,5 @@ class Base: def __init__(self, source, ingest_type='message'): + print(f'creating instance of service of type LOFT...') self.ingest_type = ingest_type self.source = source diff --git a/app/services/loft_service.py b/app/services/loft_service.py index d571a61..492bd48 100644 --- a/app/services/loft_service.py +++ b/app/services/loft_service.py @@ -38,6 +38,7 @@ class LoftService(Base): return attributes def generate_solution(self): + print("generating solution...") # temporary data for mocks form_count = 10 @@ -54,6 +55,7 @@ class LoftService(Base): ) def stream_to_s3_bucket(self): + print("streaming solution to s3...") # setup writer buffer and write processed forms to file buffer = io.StringIO() solution_file = service_helper.solution_to_file(buffer, self.solver_run.total_form_items, self.solution.forms) From 3a9500349363d9bdda5deb4026e5c56f0076092d Mon Sep 17 00:00:00 2001 From: Josh Burman Date: Mon, 1 Nov 2021 20:57:20 +0000 Subject: [PATCH 2/4] use daemonize and attach to foreground --- .docker-compose/Dockerfile | 4 +-- Dockerfile | 4 +-- app/main.py | 56 +++++++++++++------------------------- 3 files changed, 23 insertions(+), 41 deletions(-) diff --git a/.docker-compose/Dockerfile b/.docker-compose/Dockerfile index 1c70967..319a6ea 100644 --- a/.docker-compose/Dockerfile +++ b/.docker-compose/Dockerfile @@ -11,11 +11,11 @@ RUN cd Cbc-2.9.8 && \ make install RUN python -m pip install pydantic RUN python -m pip install pySqsListener +RUN python -m pip install daemonize # Bundle app source COPY . /app WORKDIR /app/app -# CMD [ "python", "main.py" ] -CMD tail -f /dev/null +CMD [ "python", "main.py" ] diff --git a/Dockerfile b/Dockerfile index 1c70967..319a6ea 100644 --- a/Dockerfile +++ b/Dockerfile @@ -11,11 +11,11 @@ RUN cd Cbc-2.9.8 && \ make install RUN python -m pip install pydantic RUN python -m pip install pySqsListener +RUN python -m pip install daemonize # Bundle app source COPY . /app WORKDIR /app/app -# CMD [ "python", "main.py" ] -CMD tail -f /dev/null +CMD [ "python", "main.py" ] diff --git a/app/main.py b/app/main.py index 8d381e2..d595404 100644 --- a/app/main.py +++ b/app/main.py @@ -1,14 +1,10 @@ -import os -import sys -import json -import time +import os, sys, json, time from services.loft_service import LoftService from helpers import aws_helper + +from daemonize import Daemonize from sqs_listener import SqsListener -from sqs_listener.daemon import Daemon - - class ServiceListener(SqsListener): def handle_message(self, body, attributes, messages_attributes): @@ -18,34 +14,20 @@ class ServiceListener(SqsListener): service.process() # log the things -class ServiceDaemon(Daemon): - def run(self): - print("Starting Solver Service (v0.3.2)...") - print("Initializing listener") - listener = ServiceListener( - 'measure-development-solver-ingest', - region_name=os.environ['SOLVER_AWS_REGION'], - aws_access_key=os.environ['SOLVER_AWS_ACCESS_KEY_ID'], - aws_secret_key=os.environ['SOLVER_AWS_SECRET_ACCESS_KEY'], - queue_url=os.environ['SOLVER_SQS_INGEST_QUEUE'] - ) - listener.listen() +def main(): + print("Starting Solver Service (v0.3.2)...") + print("Initializing listener") + listener = ServiceListener( + 'measure-development-solver-ingest', + region_name=os.environ['SOLVER_AWS_REGION'], + aws_access_key=os.environ['SOLVER_AWS_ACCESS_KEY_ID'], + aws_secret_key=os.environ['SOLVER_AWS_SECRET_ACCESS_KEY'], + queue_url=os.environ['SOLVER_SQS_INGEST_QUEUE'] + ) + listener.listen() -if __name__ == "__main__": - daemon = ServiceDaemon('/var/run/sqs_daemon.pid',stdout='/app/logs/stdout', stderr='/app/logs/stderr', stdin='/app/logs/stdin') - if len(sys.argv) == 2: - if 'start' == sys.argv[1]: - print("Starting listener daemon") - daemon.start() - elif 'stop' == sys.argv[1]: - print("Attempting to stop the daemon") - daemon.stop() - elif 'restart' == sys.argv[1]: - daemon.restart() - else: - print("Unknown command") - sys.exit(2) - sys.exit(0) - else: - print("usage: %s start|stop|restart" % sys.argv[0]) - sys.exit(2) +if __name__ == '__main__': + myname=os.path.basename(sys.argv[0]) + pidfile='/tmp/%s' % myname # any name + daemon = Daemonize(app=myname,pid=pidfile, action=main, foreground=True) + daemon.start() From 5a2871b1024f1c699f6071ce4cb777308eeb1f2a Mon Sep 17 00:00:00 2001 From: Josh Burman Date: Tue, 2 Nov 2021 03:32:35 +0000 Subject: [PATCH 3/4] remove print statements --- app/main.py | 1 - app/services/base.py | 1 - app/services/loft_service.py | 2 -- 3 files changed, 4 deletions(-) diff --git a/app/main.py b/app/main.py index d595404..9d9f7d1 100644 --- a/app/main.py +++ b/app/main.py @@ -9,7 +9,6 @@ from sqs_listener import SqsListener class ServiceListener(SqsListener): def handle_message(self, body, attributes, messages_attributes): # gather/manage/process data based on the particular needs - print(f'{time.localtime()} - message received...') service = LoftService(body) service.process() # log the things diff --git a/app/services/base.py b/app/services/base.py index 10c5951..77f72f7 100644 --- a/app/services/base.py +++ b/app/services/base.py @@ -1,5 +1,4 @@ class Base: def __init__(self, source, ingest_type='message'): - print(f'creating instance of service of type LOFT...') self.ingest_type = ingest_type self.source = source diff --git a/app/services/loft_service.py b/app/services/loft_service.py index 492bd48..d571a61 100644 --- a/app/services/loft_service.py +++ b/app/services/loft_service.py @@ -38,7 +38,6 @@ class LoftService(Base): return attributes def generate_solution(self): - print("generating solution...") # temporary data for mocks form_count = 10 @@ -55,7 +54,6 @@ class LoftService(Base): ) def stream_to_s3_bucket(self): - print("streaming solution to s3...") # setup writer buffer and write processed forms to file buffer = io.StringIO() solution_file = service_helper.solution_to_file(buffer, self.solver_run.total_form_items, self.solution.forms) From 1c50ad5642f3dcc3b4db76f31b228f9729349c70 Mon Sep 17 00:00:00 2001 From: Josh Burman Date: Tue, 2 Nov 2021 04:07:15 +0000 Subject: [PATCH 4/4] logging all the things... --- app/main.py | 12 ++++++++---- app/services/loft_service.py | 12 +++++++----- 2 files changed, 15 insertions(+), 9 deletions(-) diff --git a/app/main.py b/app/main.py index 9d9f7d1..7636ecc 100644 --- a/app/main.py +++ b/app/main.py @@ -1,4 +1,4 @@ -import os, sys, json, time +import os, sys, logging from services.loft_service import LoftService from helpers import aws_helper @@ -6,16 +6,20 @@ from helpers import aws_helper from daemonize import Daemonize from sqs_listener import SqsListener +logging.basicConfig(stream=sys.stdout, level=logging.INFO, format="%(levelname)s %(asctime)s - %(message)s") + class ServiceListener(SqsListener): def handle_message(self, body, attributes, messages_attributes): # gather/manage/process data based on the particular needs + logging.info('Incoming message: %s', body) + service = LoftService(body) service.process() - # log the things + + logging.info('Process complete for %s', service.file_name) def main(): - print("Starting Solver Service (v0.3.2)...") - print("Initializing listener") + logging.info('Starting Solver Service (v0.3.2)...') listener = ServiceListener( 'measure-development-solver-ingest', region_name=os.environ['SOLVER_AWS_REGION'], diff --git a/app/services/loft_service.py b/app/services/loft_service.py index d571a61..aae2ad5 100644 --- a/app/services/loft_service.py +++ b/app/services/loft_service.py @@ -1,7 +1,4 @@ -import os -import json -import random -import io +import os, json, random, io, logging from helpers import aws_helper, tar_helper, csv_helper, service_helper @@ -18,6 +15,7 @@ class LoftService(Base): self.result = self.stream_to_s3_bucket() def retreive_attributes_from_message(self): + logging.info('Retrieving attributes from message...') # get s3 object self.key = aws_helper.get_key_from_message(self.source) s3_object = aws_helper.get_object(self.key, os.environ['SOLVER_INGEST_BUCKET']) @@ -34,10 +32,12 @@ class LoftService(Base): # add items to attributes dict attributes['items'] = service_helper.items_csv_to_dict(items_csv_reader) + logging.info('Processed Attributes...') return attributes def generate_solution(self): + logging.info('Processing Solution...') # temporary data for mocks form_count = 10 @@ -54,9 +54,11 @@ class LoftService(Base): ) def stream_to_s3_bucket(self): + self.file_name = f'{service_helper.key_to_uuid(self.key)}.csv' + logging.info('Streaming to %s s3 bucket %s', self.file_name, os.environ['MEASURE_PROCESSED_BUCKET']) # setup writer buffer and write processed forms to file buffer = io.StringIO() solution_file = service_helper.solution_to_file(buffer, self.solver_run.total_form_items, self.solution.forms) # upload generated file to s3 and return result - return aws_helper.file_stream_upload(solution_file, f'{service_helper.key_to_uuid(self.key)}.csv', os.environ['MEASURE_PROCESSED_BUCKET']) + return aws_helper.file_stream_upload(solution_file, self.file_name, os.environ['MEASURE_PROCESSED_BUCKET'])