import boto3 import io from typing import Union from lib.application_configs import ApplicationConfigs session = boto3.Session( aws_access_key_id=ApplicationConfigs.aws_access_key_id, aws_secret_access_key=ApplicationConfigs.aws_secret_key) # ToDo: Figure out a much better way of doing this. # LocalStack wants endpoint_url, while prod doesnt :( if ApplicationConfigs.local_dev_env: s3 = session.client('s3', region_name=ApplicationConfigs.region_name, endpoint_url=ApplicationConfigs.endpoint_url) sqs = session.client('sqs', region_name=ApplicationConfigs.region_name, endpoint_url=ApplicationConfigs.endpoint_url) else: s3 = session.client('s3', region_name=ApplicationConfigs.region_name) sqs = session.client('sqs', region_name=ApplicationConfigs.region_name) def get_key_from_message(body: dict) -> str: return body['Records'][0]['s3']['object']['key'] def get_bucket_from_message(body: dict) -> str: return body['Records'][0]['s3']['bucket']['name'] def get_object(key: str, bucket: str) -> bytes: return s3.get_object( Bucket=bucket, Key=key, )['Body'].read() def get_object_tag(key: str, bucket: str, tag_key: str) -> Union[str, None]: tags = get_object_tags(key, bucket) return next((tag for tag in tags if tag['Key'] == tag_key), None) def get_object_tags(key: str, bucket: str) -> list: tags = s3.get_object_tagging(Bucket=bucket, Key=key)['TagSet'] return tags def file_stream_upload(buffer: io.BytesIO, name: str, bucket: str, action: str = None): return s3.upload_fileobj(buffer, bucket, name, ExtraArgs={'Tagging': f'action={action}'})