deamonize and cursory logging
This commit is contained in:
@ -10,8 +10,7 @@ session = boto3.Session(
|
|||||||
s3 = session.resource('s3', region_name=os.environ['SOLVER_AWS_REGION'])
|
s3 = session.resource('s3', region_name=os.environ['SOLVER_AWS_REGION'])
|
||||||
sqs = session.client('sqs', region_name=os.environ['SOLVER_AWS_REGION'])
|
sqs = session.client('sqs', region_name=os.environ['SOLVER_AWS_REGION'])
|
||||||
|
|
||||||
def get_key_from_message(message):
|
def get_key_from_message(body):
|
||||||
body = json.loads(message['Body'])
|
|
||||||
return body['Records'][0]['s3']['object']['key']
|
return body['Records'][0]['s3']['object']['key']
|
||||||
|
|
||||||
def get_object(key, bucket):
|
def get_object(key, bucket):
|
||||||
|
50
app/main.py
50
app/main.py
@ -1,27 +1,51 @@
|
|||||||
import os
|
import os
|
||||||
import sys
|
import sys
|
||||||
|
import json
|
||||||
|
import time
|
||||||
|
|
||||||
from services.loft_service import LoftService
|
from services.loft_service import LoftService
|
||||||
from helpers import aws_helper
|
from helpers import aws_helper
|
||||||
from sqs_listener import SqsListener
|
from sqs_listener import SqsListener
|
||||||
from sqs_listener.daemon import Daemon
|
from sqs_listener.daemon import Daemon
|
||||||
|
|
||||||
print("Starting Solver Service (v0.3.1)...")
|
|
||||||
|
|
||||||
# # listen to the solver queue
|
|
||||||
while True:
|
|
||||||
msg = aws_helper.receive_message(os.environ['SOLVER_SQS_INGEST_QUEUE'])
|
|
||||||
|
|
||||||
# get the item from the queue
|
class ServiceListener(SqsListener):
|
||||||
for message in msg.get("Messages", []):
|
def handle_message(self, body, attributes, messages_attributes):
|
||||||
# for now the solver service only supports Loft types
|
|
||||||
# this is here to allow us to create an extensible way to
|
|
||||||
# gather/manage/process data based on the particular needs
|
# gather/manage/process data based on the particular needs
|
||||||
service = LoftService(message)
|
print(f'{time.localtime()} - message received...')
|
||||||
|
service = LoftService(body)
|
||||||
service.process()
|
service.process()
|
||||||
|
# log the things
|
||||||
|
|
||||||
# delete message once process is complete
|
class ServiceDaemon(Daemon):
|
||||||
response = aws_helper.delete_message(os.environ['SOLVER_SQS_INGEST_QUEUE'], message['ReceiptHandle'])
|
def run(self):
|
||||||
|
print("Starting Solver Service (v0.3.2)...")
|
||||||
|
print("Initializing listener")
|
||||||
|
listener = ServiceListener(
|
||||||
|
'measure-development-solver-ingest',
|
||||||
|
region_name=os.environ['SOLVER_AWS_REGION'],
|
||||||
|
aws_access_key=os.environ['SOLVER_AWS_ACCESS_KEY_ID'],
|
||||||
|
aws_secret_key=os.environ['SOLVER_AWS_SECRET_ACCESS_KEY'],
|
||||||
|
queue_url=os.environ['SOLVER_SQS_INGEST_QUEUE']
|
||||||
|
)
|
||||||
|
listener.listen()
|
||||||
|
|
||||||
# probably convert to logging, but maybe keep this to some extent
|
if __name__ == "__main__":
|
||||||
print("MESSAGE PROCESSED: ", message['MessageId'])
|
daemon = ServiceDaemon('/var/run/sqs_daemon.pid',stdout='/app/logs/stdout', stderr='/app/logs/stderr', stdin='/app/logs/stdin')
|
||||||
|
if len(sys.argv) == 2:
|
||||||
|
if 'start' == sys.argv[1]:
|
||||||
|
print("Starting listener daemon")
|
||||||
|
daemon.start()
|
||||||
|
elif 'stop' == sys.argv[1]:
|
||||||
|
print("Attempting to stop the daemon")
|
||||||
|
daemon.stop()
|
||||||
|
elif 'restart' == sys.argv[1]:
|
||||||
|
daemon.restart()
|
||||||
|
else:
|
||||||
|
print("Unknown command")
|
||||||
|
sys.exit(2)
|
||||||
|
sys.exit(0)
|
||||||
|
else:
|
||||||
|
print("usage: %s start|stop|restart" % sys.argv[0])
|
||||||
|
sys.exit(2)
|
||||||
|
@ -1,4 +1,5 @@
|
|||||||
class Base:
|
class Base:
|
||||||
def __init__(self, source, ingest_type='message'):
|
def __init__(self, source, ingest_type='message'):
|
||||||
|
print(f'creating instance of service of type LOFT...')
|
||||||
self.ingest_type = ingest_type
|
self.ingest_type = ingest_type
|
||||||
self.source = source
|
self.source = source
|
||||||
|
@ -38,6 +38,7 @@ class LoftService(Base):
|
|||||||
return attributes
|
return attributes
|
||||||
|
|
||||||
def generate_solution(self):
|
def generate_solution(self):
|
||||||
|
print("generating solution...")
|
||||||
# temporary data for mocks
|
# temporary data for mocks
|
||||||
form_count = 10
|
form_count = 10
|
||||||
|
|
||||||
@ -54,6 +55,7 @@ class LoftService(Base):
|
|||||||
)
|
)
|
||||||
|
|
||||||
def stream_to_s3_bucket(self):
|
def stream_to_s3_bucket(self):
|
||||||
|
print("streaming solution to s3...")
|
||||||
# setup writer buffer and write processed forms to file
|
# setup writer buffer and write processed forms to file
|
||||||
buffer = io.StringIO()
|
buffer = io.StringIO()
|
||||||
solution_file = service_helper.solution_to_file(buffer, self.solver_run.total_form_items, self.solution.forms)
|
solution_file = service_helper.solution_to_file(buffer, self.solver_run.total_form_items, self.solution.forms)
|
||||||
|
Reference in New Issue
Block a user