Giới thiệu
Trong thế giới số ngày nay, các tổ chức đang chìm ngập trong dữ liệu phi cấu trúc. Tài liệu, hình ảnh và PDF chứa thông tin có giá trị thường không được khai thác do nỗ lực thủ công cần thiết để trích xuất và phân tích chúng. Điều gì sẽ xảy ra nếu chúng ta có thể tự động xử lý những tài liệu này, trích xuất những hiểu biết có ý nghĩa và lưu trữ dữ liệu có cấu trúc để phân tích thêm?
Bài viết blog này sẽ hướng dẫn bạn xây dựng một pipeline xử lý tài liệu thông minh hoàn chỉnh bằng cách sử dụng các dịch vụ AWS. Pipeline của chúng ta sẽ tự động:
- Trích xuất văn bản từ hình ảnh và PDF bằng Amazon Textract
- Phân tích nội dung để tìm thực thể, cụm từ khóa và cảm xúc bằng Amazon Comprehend
- Lưu trữ kết quả có cấu trúc trong DynamoDB để truy vấn và phân tích dễ dàng
- Xử lý tài liệu tự động khi được tải lên S3
Những gì chúng ta đang xây dựng
Pipeline của chúng ta tạo ra một luồng liền mạch trong đó:
- Tài liệu được tải lên vào bucket S3 (hình ảnh, PDF, v.v.)
- Hàm Lambda kích hoạt tự động khi có file mới
- Textract trích xuất văn bản và xác định bố cục tài liệu
- Comprehend phân tích văn bản đã trích xuất để tìm hiểu biết
- Kết quả được lưu trữ trong DynamoDB với metadata có cấu trúc
Tổng quan Kiến trúc
Yêu cầu Hệ thống
- Lambda Runtime: Python 3.10
- Memory: 1024 MB (khuyến nghị)
- Timeout: 120 giây
- Environment Variables:
DDB_TABLE
: SmartDocResults (mặc định)LANG
: en (mặc định)
Triển khai Từng bước
Bước 1: Tạo Bảng DynamoDB
- Điều hướng đến AWS Console → DynamoDB
- Nhấp "Create table"
- Cấu hình:
- Tên bảng:
SmartDocResults
- Partition key:
doc_id
(String) - Sort key:
paragraph_id
(String)
- Tên bảng:
- Nhấp "Create table"
- Đợi trạng thái bảng = "Active"
Bước 2: Tạo Bucket S3
- AWS Console → S3
- Nhấp "Create bucket"
- Cấu hình:
- Tên bucket:
your-smart-doc-bucket
(thay đổi thành tên duy nhất) - Region: Chọn region ưa thích của bạn
- Tên bucket:
- Nhấp "Create bucket"
- Ghi nhớ tên bucket để sử dụng trong IAM policy
Bước 3: Tạo IAM Policy
- AWS Console → IAM → Policies
- Nhấp "Create policy"
- Chuyển sang tab "JSON"
- Sao chép nội dung từ
iam_policy.json
và thay thế placeholders:ACCOUNT_ID
: ID tài khoản AWS của bạnREGION
: Region của bạn (ví dụ: us-east-1)BUCKET_NAME
: Tên bucket S3 từ bước 2
- Nhấp "Next: Tags" → "Next: Review"
- Đặt tên policy:
SmartDocLambdaPolicy
- Nhấp "Create policy"
Least-privilege IAM Policy
{ "Version": "2012-10-17", "Statement": [ { "Sid": "S3Access", "Effect": "Allow", "Action": [ "s3:GetObject" ], "Resource": "arn:aws:s3:::BUCKET_NAME/*" }, { "Sid": "TextractAccess", "Effect": "Allow", "Action": [ "textract:AnalyzeDocument" ], "Resource": "*" }, { "Sid": "ComprehendAccess", "Effect": "Allow", "Action": [ "comprehend:DetectEntities", "comprehend:DetectKeyPhrases", "comprehend:DetectSentiment" ], "Resource": "*" }, { "Sid": "DynamoDBAccess", "Effect": "Allow", "Action": [ "dynamodb:PutItem", "dynamodb:GetItem", "dynamodb:Query", "dynamodb:Scan" ], "Resource": "arn:aws:dynamodb:REGION:ACCOUNT_ID:table/SmartDocResults" }, { "Sid": "CloudWatchLogs", "Effect": "Allow", "Action": [ "logs:CreateLogGroup", "logs:CreateLogStream", "logs:PutLogEvents" ], "Resource": "arn:aws:logs:REGION:ACCOUNT_ID:*" } ]
}
Bước 4: Tạo IAM Role cho Lambda
- AWS Console → IAM → Roles
- Nhấp "Create role"
- Chọn "AWS service" → "Lambda"
- Nhấp "Next"
- Trong tab "Permissions":
- Tìm và chọn
SmartDocLambdaPolicy
vừa tạo - Check vào policy
- Tìm và chọn
- Nhấp "Next: Tags" → "Next: Review"
- Đặt tên role:
SmartDocLambdaRole
- Nhấp "Create role"
Bước 5: Tạo Hàm Lambda
- AWS Console → Lambda → Functions
- Nhấp "Create function"
- Chọn "Author from scratch"
- Cấu hình:
- Tên hàm:
SmartDocProcessor
- Runtime: Python 3.10
- Architecture: x86_64
- Tên hàm:
- Change default execution role:: Chọn "Use an existing role" →
SmartDocLambdaRole
- Nhấp "Create function"
Bước 6: Cấu hình Hàm Lambda
- Trong hàm Lambda, cuộn xuống phần "Code source"
- Xóa code mặc định và dán nội dung từ
lambda_function.py
- Nhấp "Deploy"
- Cấu hình "Configuration":
- General:
- Memory: 1024 MB
- Timeout: 2 phút
- General:
- Environment variables:
DDB_TABLE
:SmartDocResults
LANG
:en
import json
import boto3
import os
from datetime import datetime
from urllib.parse import unquote_plus
from typing import List, Dict, Any, Optional
import logging
from decimal import Decimal # Configure logging
logger = logging.getLogger()
logger.setLevel(logging.INFO) # Initialize AWS clients
textract = boto3.client('textract')
comprehend = boto3.client('comprehend')
dynamodb = boto3.resource('dynamodb') # Environment variables
DDB_TABLE = os.environ.get('DDB_TABLE', 'SmartDocResults')
LANG = os.environ.get('LANG', 'en') def lambda_handler(event, context): """ Main Lambda handler for S3 → Textract → Comprehend → DynamoDB pipeline """ logger.info(f"Processing event: {json.dumps(event)}") # Get DynamoDB table table = dynamodb.Table(DDB_TABLE) # Process each S3 record for record in event.get('Records', []): try: # Extract S3 information bucket = record['s3']['bucket']['name'] key = unquote_plus(record['s3']['object']['key']) doc_id = os.path.basename(key) logger.info(f"Processing document: {doc_id} from bucket: {bucket}") # Step 1: Extract text using Textract text_lines = extract_text_from_s3(bucket, key) if not text_lines: logger.warning(f"No text extracted from {doc_id}") continue # Step 2: Split into paragraphs paragraphs = split_paragraphs(text_lines) logger.info(f"Found {len(paragraphs)} paragraphs in {doc_id}") # Step 3: Process each paragraph with Comprehend for paragraph_id, paragraph in enumerate(paragraphs, 1): if len(paragraph) >= 20: # Only process paragraphs with >= 20 characters logger.info(f"Processing paragraph {paragraph_id} (length: {len(paragraph)})") # Analyze with Comprehend entities = detect_entities_safe(paragraph, LANG) key_phrases = detect_key_phrases_safe(paragraph, LANG) sentiment = safe_detect_sentiment(paragraph, LANG) # Convert float values to Decimal for DynamoDB entities = convert_floats_to_decimal(entities) key_phrases = convert_floats_to_decimal(key_phrases) # Save to DynamoDB item = { 'doc_id': doc_id, 'paragraph_id': str(paragraph_id), # Convert to string 'content': paragraph, 'entities': entities, 'key_phrases': key_phrases, 'sentiment': sentiment, 'created_at': datetime.utcnow().isoformat() + 'Z' } table.put_item(Item=item) logger.info(f"Saved paragraph {paragraph_id} to DynamoDB") else: logger.info(f"Skipping paragraph {paragraph_id} (too short: {len(paragraph)} chars)") logger.info(f"Successfully processed document: {doc_id}") except Exception as e: logger.error(f"Error processing record {record}: {str(e)}") # Continue processing other records continue return { 'statusCode': 200, 'body': json.dumps('Processing completed') } def extract_text_from_s3(bucket: str, key: str) -> List[str]: """ Extract text from S3 object using Textract synchronous API """ try: response = textract.analyze_document( Document={ 'S3Object': { 'Bucket': bucket, 'Name': key } }, FeatureTypes=['LAYOUT'] ) # Extract LINE blocks and sort by reading order lines = [] line_blocks = [block for block in response['Blocks'