記事
AWS CDKでS3に保存されたメールをLambdaで解析してDynamoDBに保存する
代表の中野です。今回は弊社のインフラのコード化の一環でaws cdkを採用したので、その解説記事です。 この記事では、すでに **SES → S3 保存** ができている状態を前提にして、 1. S3の保存イベントでLambdaを起動 2. Lambdaがメール内容を解析 3. DynamoDB にFrom・To・Subjectを保存 という仕組みを **AWS CDK (Python)** で構築します。

代表の中野です。今回は弊社のインフラのコード化の一環でaws cdkを採用したので、その解説記事です。
この記事では、すでに SES → S3 保存 ができている状態を前提にして、
- S3の保存イベントでLambdaを起動
- Lambdaがメール内容を解析
- DynamoDBにFrom・To・Subjectを保存
という仕組みを AWS CDK (Python) で構築します。
(すでに設定済み)
SES → S3 (受信メール保存)
(今回CDKで作る部分) S3:ObjectCreated
↓
Lambda (メール解析)
↓
DynamoDB (保存)
プロジェクト構成
mail_cdk/
├── app.py
├── stacks/
│ └── myapp_stack.py.py
└── lambda_functions/
└── s3_to_dynamodb.py
Lambda関数のコード (lambda_functions/s3_to_dynamodb.py)
S3からメールを取得し、From/To/Subjectを解析してDynamoDBに保存します。
# lambda_functions/s3_to_dynamodb.py
import boto3
import os
import json
import time
import email
from email.utils import parseaddr
from email.header import decode_header
from datetime import datetime, timezone, timedelta
class MailParser:
"""Minimal MailParser fallback when mailparser package is unavailable."""
def __init__(self):
self._msg = None
def parse_from_bytes(self, data: bytes):
self._msg = email.message_from_bytes(data)
def _decode_header(self, value: str) -> str:
"""=?UTF-8?B?...?= のようなエンコード文字列をデコード"""
if not value:
return value
decoded_parts = []
for part, enc in decode_header(value):
if isinstance(part, bytes):
decoded_parts.append(part.decode(enc or "utf-8", errors="replace"))
else:
decoded_parts.append(part)
return ''.join(decoded_parts)
@property
def subject(self):
return self._msg.get('subject') if self._msg else None
@property
def from_(self):
if not self._msg:
return None
addr_raw = self._msg.get('from')
# 表示名とアドレスを分離
_, addr = parseaddr(addr_raw)
return [(None, addr)] if addr else []
@property
def to(self):
if not self._msg:
return None
addr_raw = self._msg.get('to')
_, addr = parseaddr(addr_raw)
return [(None, addr)] if addr else []
@property
def body(self):
"""プレーンテキストの本文を取得"""
if not self._msg:
return None
if self._msg.is_multipart():
# multipartの場合、最初の text/plain パートを探す
for part in self._msg.walk():
content_type = part.get_content_type()
content_disposition = str(part.get("Content-Disposition"))
if content_type == "text/plain" and "attachment" not in content_disposition:
charset = part.get_content_charset() or "utf-8"
return part.get_payload(decode=True).decode(charset, errors="replace")
return None
else:
# シングルパートなら直接本文を返す
charset = self._msg.get_content_charset() or "utf-8"
return self._msg.get_payload(decode=True).decode(charset, errors="replace")
# .env をロード
def handler(event, context):
dynamodb = boto3.resource('dynamodb')
s3 = boto3.client('s3')
table = dynamodb.Table("MailDeliveryTable")
jst = timezone(timedelta(hours=9))
for record in event['Records']:
bucket = record['s3']['bucket']['name']
key = record['s3']['object']['key']
event_time = record.get('eventTime')
if event_time:
utc_dt = datetime.fromisoformat(event_time.replace('Z', '+00:00'))
jst_dt = utc_dt.astimezone(jst)
received_at = jst_dt.isoformat()
else:
received_at = datetime.now(jst).isoformat()
# S3から.emlファイルを取得
response = s3.get_object(Bucket=bucket, Key=key)
raw_email = response['Body'].read()
# emailパーサーで解析 (mailparser or fallback)
parser = MailParser()
parser.parse_from_bytes(raw_email)
subject = parser.subject
from_addr = parser.from_[0][1] if parser.from_ else None
to_addr = parser.to[0][1] if parser.to else None
body = parser.body
# 必要な情報をDynamoDBに保存
table.put_item(
Item={
"messageId": key, # S3のキーを一意IDに
"received_at": received_at,
"subject": subject,
"from": from_addr,
"to": to_addr,
"body": body,
"s3_key": key,
"ttl": int(time.time()) + 60 * 60 * 24 * 30 # 30 days
}
)
return {"status": "done"}
CDKスタック (stacks/myapp_stack.py)
- 既存のS3バケット名を指定
- DynamoDBテーブルを作成
- Lambdaをデプロイし、S3イベントをトリガーに登録
# stacks/myapp_stack.py
from aws_cdk import (
Stack,
aws_lambda as _lambda,
aws_s3 as s3,
aws_s3_notifications as s3n,
aws_dynamodb as dynamodb,
Duration,
RemovalPolicy
)
from constructs import Construct
import os
from dotenv import load_dotenv
load_dotenv()
class MyAppStack(Stack):
def __init__(self, scope: Construct, construct_id: str, **kwargs) -> None:
super().__init__(scope, construct_id, **kwargs)
# 既存のS3バケットを参照(SESで保存済みのバケット)
bucket = s3.Bucket.from_bucket_name(
self,
"MailBucket",
"nano-maildelivery"
)
# DynamoDBテーブル
table_name = "MailDeliveryTable"
table = dynamodb.Table(
self, "MailTable",
partition_key=dynamodb.Attribute(
name="messageId", type=dynamodb.AttributeType.STRING
),
sort_key=dynamodb.Attribute(
name="received_at", type=dynamodb.AttributeType.STRING
),
billing_mode=dynamodb.BillingMode.PAY_PER_REQUEST,
time_to_live_attribute="ttl",
removal_policy=RemovalPolicy.DESTROY,
table_name=table_name
)
# Lambda関数
lambda_fn = _lambda.Function(
self,
"S3ToDynamoLambda",
runtime=_lambda.Runtime.PYTHON_3_11,
handler="s3_to_dynamodb.handler",
code=_lambda.Code.from_asset(os.path.join(os.getcwd(), "lambda_functions")),
timeout=Duration.seconds(30),
function_name="S3ToDynamoLambda",
environment={
"TABLE_NAME": table.table_name
}
)
# LambdaにDynamoDBへの書き込み権限とS3読取権限を付与
table.grant_write_data(lambda_fn)
bucket.grant_read(lambda_fn)
# LambdaにS3イベントを通知する設定
# S3のバケットに追加された時にlambdaが動く
notification = s3n.LambdaDestination(lambda_fn)
bucket.add_event_notification(s3.EventType.OBJECT_CREATED, notification)
app.py
#!/usr/bin/env python3
# app.py
import aws_cdk as cdk
from stacks.myapp_stack import MyAppStack
app = cdk.App()
MyAppStack(app, "MyAppStack")
app.synth()
デプロイ
cdk bootstrap
cdk deploy
動作確認
- SES経由でメールを送信
- メールがS3バケットに保存される(既に設定済み)
- そのS3イベントでLambdaが実行される
- DynamoDBのテーブルにメール情報が保存される