AWS WorkMail, AWS SES, AWS S3 and AWS Lambda to automate email attachment extraction


Combine AWS WorkMail, AWS SES, AWS S3 and AWS Lambda


I built a project that uses AWS WorkMail, AWS SES, AWS S3 and an AWS Lambda function to automate a task that previously required manually downloading an attachment every time an email arrived. The workflow: emails are sent to an AWS WorkMail address; an AWS SES email-receiving rule writes each incoming email to a linked S3 bucket; an ObjectCreated event on that bucket triggers the Lambda function, which extracts the attachment and writes it to a separate S3 bucket under a sender-name/email-subject/year/month/day folder structure.
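To make the destination layout concrete, here is a minimal sketch of how such a key could be built (the helper name and sample values are mine, not from the project code):

```python
import datetime


def destination_key(sender, subject, attachment_filename, when=None):
    """Build the attachment's destination key: sender/subject/YYYY/MM/DD/filename."""
    when = when or datetime.datetime.now()
    date_part = f"{when.year}/{str(when.month).zfill(2)}/{str(when.day).zfill(2)}"
    return f"{sender}/{subject}/{date_part}/{attachment_filename}"


key = destination_key("jane-doe", "weekly-report",
                      "report.csv", datetime.datetime(2020, 5, 14))
print(key)  # jane-doe/weekly-report/2020/05/14/report.csv
```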


Set up AWS WorkMail


  1. Go to the WorkMail service in the AWS console and click Add organization as shown below. You can name it anything; my test organization is named nine-data-testing.

'add organization'


  2. Click into the organization and create a user. If you name the user data, the email address will be data@nine-data-testing.awsapps.com. My test user is cqiu as shown below.

'Create a user'


Set up AWS SES


  1. Go to the AWS SES service. Under the Email Receiving section in the left menu, click Rule Sets, then click Create Rule.

'AWS SES'


Set up AWS S3


  1. Create a bucket named “datascience-email-sources” with the bucket policy below (replace AWSACCOUNTID with your AWS account ID):

{
    "Version": "2012-10-17",
    "Statement": [
        {
            "Sid": "AllowSESPuts",
            "Effect": "Allow",
            "Principal": {
                "Service": "ses.amazonaws.com"
            },
            "Action": "s3:PutObject",
            "Resource": "arn:aws:s3:::datascience-email-sources/*",
            "Condition": {
                "StringEquals": {
                    "aws:Referer": "AWSACCOUNTID"
                }
            }
        }
    ]
}
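If you prefer to apply the policy programmatically instead of in the console, a boto3 sketch could look like the following (the function name and sample account ID are mine; substitute your own values):

```python
import json


def build_ses_bucket_policy(bucket_name, account_id):
    """Return the bucket policy that lets SES write received emails to the bucket."""
    return {
        "Version": "2012-10-17",
        "Statement": [{
            "Sid": "AllowSESPuts",
            "Effect": "Allow",
            "Principal": {"Service": "ses.amazonaws.com"},
            "Action": "s3:PutObject",
            "Resource": f"arn:aws:s3:::{bucket_name}/*",
            "Condition": {"StringEquals": {"aws:Referer": account_id}},
        }],
    }


policy = build_ses_bucket_policy("datascience-email-sources", "123456789012")
print(json.dumps(policy, indent=2))
# To apply it (requires credentials):
# boto3.client("s3").put_bucket_policy(
#     Bucket="datascience-email-sources", Policy=json.dumps(policy))
```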

Back to AWS WorkMail to add an S3 rule


  1. Add a Recipient using the AWS WorkMail address we just created and, under the Actions section, add an S3 action targeting the datascience-email-sources bucket we just created.

'AWS Workmail add s3 rule'


Set up the AWS Lambda function


  1. Create a Lambda function named workemail-attachment-extract with the triggers and resource permissions below:

'AWS Lambda function overview'


The S3 trigger settings are shown below, monitoring all ObjectCreated events.


'AWS Lambda function S3 trigger'
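For reference, the handler pulls the bucket and key out of the S3 event payload. A minimal sketch with a hand-made sample event (the object key here is made up) shows the shape it expects:

```python
import urllib.parse

# A trimmed-down ObjectCreated notification, shaped like what S3 delivers to Lambda.
sample_event = {
    "Records": [{
        "s3": {
            "bucket": {"name": "datascience-email-sources"},
            "object": {"key": "some+email+object%40example"},
        }
    }]
}

bucket = sample_event["Records"][0]["s3"]["bucket"]["name"]
# S3 URL-encodes object keys in event notifications, so decode before using them.
key = urllib.parse.unquote_plus(
    sample_event["Records"][0]["s3"]["object"]["key"], encoding="utf-8")
print(bucket, key)  # datascience-email-sources some email object@example
```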


The policies for the Lambda role I set up in IAM are below for your reference.


'AWS Lambda function Role Policy'


'AWS Lambda function Role Policy'


Below is the Lambda function code. I have also open-sourced this project on my GitHub: https://github.com/chrisqiqiu/aws-lambda-function-nine/tree/master/workmail-ses-s3-attachment


import threading
import json
import urllib.parse
import boto3
from boto3.s3.transfer import S3Transfer, TransferConfig
import os
import email
import re
import sys
import datetime
import requests

print('Loading function')

s3 = boto3.client('s3')
s3_wormhole = S3Transfer(s3,  config=TransferConfig(
    multipart_threshold=8 * 1024 * 1024,
    max_concurrency=10,
    num_download_attempts=10,
))

dest_bucket = "datascience-email-attachments"


def lambda_handler(event, context):
    print("Received event: " + json.dumps(event, indent=2))

    # print("IP address is : ")
    # print(requests.get('http://checkip.amazonaws.com').text.rstrip())

    try:
        # Get the object from the event and show its content type
        bucket = event['Records'][0]['s3']['bucket']['name']
        key = urllib.parse.unquote_plus(
            event['Records'][0]['s3']['object']['key'], encoding='utf-8')

        os.chdir("/tmp")

        get(bucket, key, "workmail_with_attachment")
        with open("workmail_with_attachment") as f:
            data = f.read()

        message = email.message_from_string(data)

        email_sender = message.get("from")
        # Strip the angle-bracketed address, keeping only the display name.
        match = re.search(r'(.*)<.*>', email_sender)
        if match and match.group(1):
            email_sender = match.group(1)
            print(email_sender)

        email_sender = format_string_for_s3_bucket_name(email_sender)

        email_subject = format_string_for_s3_bucket_name(
            message.get('Subject'))

        now = datetime.datetime.now()
        now = f"{str(now.year)}/{str(now.month).zfill(2)}/{str(now.day).zfill(2)}"

        dest_bucket_key = f"{email_sender}/{email_subject}/{now}"

        if isinstance(message.get_payload(), list) and len(message.get_payload()) == 2:
            attachment = message.get_payload()[1]

            if attachment.get_filename():
                attachment_filename = attachment.get_filename()

                with open(attachment_filename, 'wb') as f:
                    f.write(attachment.get_payload(decode=True))

                print(
                    f"Putting file {attachment_filename} to {dest_bucket}/{dest_bucket_key}")
                put(attachment_filename,
                    dest_bucket, f"{dest_bucket_key}/{attachment_filename}")

    except Exception as e:
        print(str(e))
        slack_notification(str(e))


def format_string_for_s3_bucket_name(bucket_name):
    found = re.sub(r"[^\w]",  ' ', bucket_name)
    found = re.sub(r"\s+", ' ', found).strip()
    print(found)
    print(re.sub(r"[^\w]",  '-', found))
    bucket_name = re.sub(r"[^\w]",  '-', found)
    print(bucket_name)
    return bucket_name


def get(bucket, key, local_path):
    """Download s3://bucket/key to local_path'."""
    try:
        s3_wormhole.download_file(
            bucket=bucket, key=key, filename=local_path)
    except Exception as e:
        print(
            "Could not get {} from S3 due to {}".format(key, e))
        return None
    else:
        print("Successfully get {} from S3".format(key))
    return key


def put(local_path, bucket, key):
    """Upload local_path to s3: // bucket/key and print upload progress."""
    try:

        s3_wormhole.upload_file(filename=local_path,
                                bucket=bucket,
                                key=key,
                                callback=ProgressPercentage(local_path))
    except Exception as e:
        print(
            "Could not upload {} to S3 due to {}".format(key, e))
    else:
        print("Successfully uploaded {} to S3".format(key))


def slack_notification(message):
    base_uri = "http://slack.datascience.ec2/postMessage"
    warningTemplateMessage = {
        "text": "<!here> Error: Workmail email trigger {}  "}

    headers = {'Content-Type': 'application/json'}

    warningMessage = {}
    warningMessage['text'] = warningTemplateMessage['text'].format(
        str(message))

    resp = requests.post(base_uri, headers=headers,
                         data=json.dumps(warningMessage))

    return resp


class ProgressPercentage(object):
    def __init__(self, filename):
        self._filename = filename
        self._size = float(os.path.getsize(filename))
        self._seen_so_far = 0
        self._lock = threading.Lock()

    def __call__(self, bytes_amount):
        # To simplify we'll assume this is hooked up
        # to a single filename.
        with self._lock:
            self._seen_so_far += bytes_amount
            percentage = (self._seen_so_far / self._size) * 100
            sys.stdout.write(
                "\r%s  %s / %s  (%.2f%%)" % (self._filename, self._seen_so_far,
                                             self._size, percentage))
            sys.stdout.flush()
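The handler above assumes the email has exactly two MIME parts (a body plus one attachment). A more general alternative, sketched here as a hedged illustration rather than the project's actual code, walks every part and collects anything carrying a filename:

```python
import email
from email.mime.multipart import MIMEMultipart
from email.mime.text import MIMEText
from email.mime.application import MIMEApplication


def extract_attachments(raw_email):
    """Return [(filename, bytes)] for every MIME part that carries a filename."""
    message = email.message_from_string(raw_email)
    attachments = []
    for part in message.walk():
        if part.get_filename():
            attachments.append((part.get_filename(),
                                part.get_payload(decode=True)))
    return attachments


# Build a small test message with one CSV attachment.
msg = MIMEMultipart()
msg["From"] = "Jane Doe <jane@example.com>"
msg["Subject"] = "weekly report"
msg.attach(MIMEText("see attached"))
part = MIMEApplication(b"a,b\n1,2\n", Name="report.csv")
part["Content-Disposition"] = 'attachment; filename="report.csv"'
msg.attach(part)

for name, payload in extract_attachments(msg.as_string()):
    print(name, len(payload))  # report.csv 8
```

This handles emails with zero or several attachments instead of silently skipping anything that is not the second of exactly two parts.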

Create the zip file for the Lambda function:

pip install -r requirements.txt -t ./

chmod 777 lambda_function.py

zip -r9 ../lambda_function.zip .


  2. Upload the zip file lambda_function.zip as shown below.

'Upload zip file'


  3. Set the Timeout to 5 minutes instead of the default 3 seconds:

'Upload zip file'

