デジタルクリエイティブ株式会社
記事

AWS CDKでS3に保存されたメールをLambdaで解析してDynamoDBに保存する

代表の中野です。今回は弊社のインフラのコード化の一環でaws cdkを採用したので、その解説記事です。 この記事では、すでに **SES → S3 保存** ができている状態を前提にして、 1. S3の保存イベントでLambdaを起動 2. Lambdaがメール内容を解析 3. DynamoDBにFrom・To・Subjectを保存 という仕組みを **AWS CDK (Python)** で構築します。
ogp

代表の中野です。今回は弊社のインフラのコード化の一環でaws cdkを採用したので、その解説記事です。

この記事では、すでに SES → S3 保存 ができている状態を前提にして、

  1. S3の保存イベントでLambdaを起動
  2. Lambdaがメール内容を解析
  3. DynamoDBにFrom・To・Subjectを保存

という仕組みを AWS CDK (Python) で構築します。

 

(すでに設定済み) 
SES → S3 (受信メール保存) 

(今回CDKで作る部分) S3:ObjectCreated
  ↓ 
 Lambda (メール解析)
  ↓ 
 DynamoDB (保存)

 

 

プロジェクト構成

mail_cdk/
├── app.py
├── stacks/
│   └── myapp_stack.py.py
└── lambda_functions/
    └── s3_to_dynamodb.py

Lambda関数のコード (lambda_functions/s3_to_dynamodb.py)

S3からメールを取得し、From/To/Subjectを解析してDynamoDBに保存します。

 

# lambda_functions/s3_to_dynamodb.py

import boto3
import os
import json
import time
import email
from email.utils import parseaddr
from email.header import decode_header
from datetime import datetime, timezone, timedelta

class MailParser:
    """Minimal MailParser fallback when mailparser package is unavailable."""
    def __init__(self):
        self._msg = None
    def parse_from_bytes(self, data: bytes):
        self._msg = email.message_from_bytes(data)
    def _decode_header(self, value: str) -> str:
        """=?UTF-8?B?...?= のようなエンコード文字列をデコード"""
        if not value:
            return value
        decoded_parts = []
        for part, enc in decode_header(value):
            if isinstance(part, bytes):
                decoded_parts.append(part.decode(enc or "utf-8", errors="replace"))
            else:
                decoded_parts.append(part)
        return ''.join(decoded_parts)
    @property
    def subject(self):
        return self._msg.get('subject') if self._msg else None
    @property
    def from_(self):
        if not self._msg:
            return None
        addr_raw = self._msg.get('from')
        # 表示名とアドレスを分離
        _, addr = parseaddr(addr_raw)
        return [(None, addr)] if addr else []
    @property
    def to(self):
        if not self._msg:
            return None
        addr_raw = self._msg.get('to')
        _, addr = parseaddr(addr_raw)
        return [(None, addr)] if addr else []
    @property
    def body(self):
        """プレーンテキストの本文を取得"""
        if not self._msg:
            return None
        if self._msg.is_multipart():
            # multipartの場合、最初の text/plain パートを探す
            for part in self._msg.walk():
                content_type = part.get_content_type()
                content_disposition = str(part.get("Content-Disposition"))
                if content_type == "text/plain" and "attachment" not in content_disposition:
                    charset = part.get_content_charset() or "utf-8"
                    return part.get_payload(decode=True).decode(charset, errors="replace")
            return None
        else:
            # シングルパートなら直接本文を返す
            charset = self._msg.get_content_charset() or "utf-8"
            return self._msg.get_payload(decode=True).decode(charset, errors="replace")

# .env をロード

def handler(event, context):
    dynamodb = boto3.resource('dynamodb')
    s3 = boto3.client('s3')
    table = dynamodb.Table("MailDeliveryTable")

    jst = timezone(timedelta(hours=9))

    for record in event['Records']:
        bucket = record['s3']['bucket']['name']
        key = record['s3']['object']['key']
        event_time = record.get('eventTime')
        if event_time:
            utc_dt = datetime.fromisoformat(event_time.replace('Z', '+00:00'))
            jst_dt = utc_dt.astimezone(jst)
            received_at = jst_dt.isoformat()
        else:
            received_at = datetime.now(jst).isoformat()

        # S3から.emlファイルを取得
        response = s3.get_object(Bucket=bucket, Key=key)
        raw_email = response['Body'].read()

        # emailパーサーで解析 (mailparser or fallback)
        parser = MailParser()
        parser.parse_from_bytes(raw_email)
        subject = parser.subject
        from_addr = parser.from_[0][1] if parser.from_ else None
        to_addr = parser.to[0][1] if parser.to else None
        body = parser.body

        # 必要な情報をDynamoDBに保存
        table.put_item(
            Item={
                "messageId": key,  # S3のキーを一意IDに
                "received_at": received_at,
                "subject": subject,
                "from": from_addr,
                "to": to_addr,
                "body": body,
                "s3_key": key,
                "ttl": int(time.time()) + 60 * 60 * 24 * 30  # 30 days
            }
        )

    return {"status": "done"}

CDKスタック (stacks/myapp_stack.py)

  • 既存のS3バケット名を指定
  • DynamoDBテーブルを作成
  • Lambdaをデプロイし、S3イベントをトリガーに登録
  •  
# stacks/myapp_stack.py

from aws_cdk import (
    Stack,
    aws_lambda as _lambda,
    aws_s3 as s3,
    aws_s3_notifications as s3n,
    aws_dynamodb as dynamodb,
    Duration,
    RemovalPolicy
)
from constructs import Construct
import os
from dotenv import load_dotenv

load_dotenv()

class MyAppStack(Stack):

    def __init__(self, scope: Construct, construct_id: str, **kwargs) -> None:
        super().__init__(scope, construct_id, **kwargs)

        # 既存のS3バケットを参照(SESで保存済みのバケット)
        bucket = s3.Bucket.from_bucket_name(
            self,
            "MailBucket",
            "nano-maildelivery"
        )

        # DynamoDBテーブル
        table_name = "MailDeliveryTable"
        table = dynamodb.Table(
            self, "MailTable",
            partition_key=dynamodb.Attribute(
                name="messageId", type=dynamodb.AttributeType.STRING
            ),
            sort_key=dynamodb.Attribute(
                name="received_at", type=dynamodb.AttributeType.STRING
            ),
            billing_mode=dynamodb.BillingMode.PAY_PER_REQUEST,
            time_to_live_attribute="ttl",
            removal_policy=RemovalPolicy.DESTROY,
            table_name=table_name
        )

        # Lambda関数
        lambda_fn = _lambda.Function(
            self,
            "S3ToDynamoLambda",
            runtime=_lambda.Runtime.PYTHON_3_11,
            handler="s3_to_dynamodb.handler",
            code=_lambda.Code.from_asset(os.path.join(os.getcwd(), "lambda_functions")),
            timeout=Duration.seconds(30),
            function_name="S3ToDynamoLambda",
            environment={
                "TABLE_NAME": table.table_name
            }
        )

        # LambdaにDynamoDBへの書き込み権限とS3読取権限を付与
        table.grant_write_data(lambda_fn)
        bucket.grant_read(lambda_fn)

        # LambdaにS3イベントを通知する設定
        # S3のバケットに追加された時にlambdaが動く
        notification = s3n.LambdaDestination(lambda_fn)
        bucket.add_event_notification(s3.EventType.OBJECT_CREATED, notification)

app.py

 

#!/usr/bin/env python3
# app.py
import aws_cdk as cdk
from stacks.myapp_stack import MyAppStack

app = cdk.App()

MyAppStack(app, "MyAppStack")
app.synth()

 

デプロイ

 

cdk bootstrap
cdk deploy

動作確認

  1. SES経由でメールを送信
  2. メールがS3バケットに保存される(既に設定済み)
  3. そのS3イベントでLambdaが実行される
  4. DynamoDBのテーブルにメール情報が保存される