电子邮件验证很棘手——只检查是否包含 “@” 是不够的,而过于复杂的正则表达式同样会带来问题。
基本电子邮件结构检查
让我们从一个简单但有用的验证方法开始:
def basic_email_check(email: str) -> bool:
    """
    Perform basic structural validation of an email address.

    Only the overall shape is checked: exactly one "@", non-empty local and
    domain parts, and the RFC 5321 length limits. The characters inside each
    part are not inspected.

    Args:
        email: The email address to validate. Leading/trailing whitespace
            is ignored.

    Returns:
        bool: True if email passes basic checks, False otherwise
    """
    # RFC 5321 size limits, in characters.
    max_total_length = 254   # longest address that fits in a 256-char path
    max_local_length = 64
    max_domain_length = 255

    # Non-string input (e.g. None) is simply "not a valid address".
    if not isinstance(email, str):
        return False

    email = email.strip()
    if not email or len(email) > max_total_length:
        return False

    # Exactly one "@" guarantees the split below yields two parts.
    if email.count('@') != 1:
        return False
    local, domain = email.split('@')

    if not local or not domain:
        return False
    return len(local) <= max_local_length and len(domain) <= max_domain_length
# Demo: run the basic checker over well-formed and malformed addresses.
test_emails = [
    'user@example.com',
    'invalid.email@',
    '@invalid.com',
    'no.at.sign',
    'multiple@@signs.com',
    'space in@email.com',
]

# map() is lazy, so each address is still checked right before it is printed.
for email, result in zip(test_emails, map(basic_email_check, test_emails)):
    print(f"{email}: {'Valid' if result else 'Invalid'}")
这个基本的验证器:
- 检查是否恰好存在一个 “@”
- 验证两个部分都不是空的
- 强制实施 RFC 长度限制
- 处理常见的边缘情况
添加模式匹配
让我们用模式匹配来增强我们的验证:
import re
def pattern_email_check(email: str) -> tuple[bool, str]:
    """
    Validate an email address with character-set and domain patterns.

    Runs ``basic_email_check`` first, then checks the local part against the
    unquoted character set and the domain against an ASCII hostname pattern.
    Quoted local parts, IP-literal domains and non-ASCII (internationalized)
    domains are rejected.

    Args:
        email: Email address to validate. Surrounding whitespace is ignored.

    Returns:
        tuple: (is_valid, reason)
    """
    if not basic_email_check(email):
        return False, "Failed basic format check"

    # basic_email_check strips whitespace internally, so strip here as well;
    # otherwise a padded but valid address fails the local-part pattern.
    email = email.strip()

    # Characters allowed in an unquoted local part.
    local_pattern = r"[a-zA-Z0-9.!#$%&'*+/=?^_`{|}~-]+"
    # One or more dot-terminated labels (letters, digits, inner hyphens,
    # at most 63 characters each) followed by an alphabetic TLD of 2+ letters.
    domain_pattern = (
        r"(?:[a-zA-Z0-9](?:[a-zA-Z0-9-]{0,61}[a-zA-Z0-9])?\.)+"
        r"[a-zA-Z]{2,}"
    )

    local, domain = email.split('@')

    # fullmatch anchors both ends (a "$" would also accept a trailing newline).
    if not re.fullmatch(local_pattern, local):
        return False, "Invalid characters in local part"

    # Dots may only separate non-empty atoms inside the local part.
    if '..' in local:
        return False, "Consecutive dots not allowed"
    if local.startswith('.') or local.endswith('.'):
        return False, "Local part cannot start or end with dot"

    if not re.fullmatch(domain_pattern, domain):
        return False, "Invalid domain format"

    return True, "Valid email address"
# Example usage with detailed feedback
test_emails = [
    'user.name@example.com',
    'user..name@example.com',
    # Trailing-dot case; the original literal was corrupted by HTML link
    # residue and never reached the dot check.
    'user.@example.com',
    'user@subdomain.example.co.uk',
    'user@invalid',
    'user@.com',
    'user.name@example.',
    'user+filter@example.com',
]

print("Detailed Email Validation Results:")
for email in test_emails:
    is_valid, reason = pattern_email_check(email)
    print(f"\nEmail: {email}")
    print(f"Valid: {is_valid}")
    print(f"Reason: {reason}")
此验证器添加:
- 使用正则表达式进行字符集验证
- 连续点检查
- 正确的域格式验证
- 详细的反馈消息
真实世界的电子邮件验证类
以下是您可以在生产环境中使用的更完整的解决方案:
from dataclasses import dataclass
from typing import List, Optional
import dns.resolver
import re
@dataclass
class ValidationResult:
    """Outcome of validating a single email address."""

    # Overall verdict for the address.
    is_valid: bool
    # Messages explaining why the address was rejected.
    errors: List[str]
    # Non-fatal notes that do not by themselves reject the address.
    warnings: List[str]
class EmailValidator:
    """Validate email addresses by length, format, disposable-domain list
    and, optionally, a DNS MX lookup.

    Messages for the address being validated are accumulated on the instance
    (``self.errors`` / ``self.warnings``) and reset at the start of every
    ``validate`` call, so a single instance should not run ``validate`` from
    several threads at the same time.
    """

    # RFC 5321 size limits, in characters.
    MAX_EMAIL_LENGTH = 254
    MAX_LOCAL_LENGTH = 64

    def __init__(self, check_dns: bool = False):
        """
        Args:
            check_dns: When True, ``validate`` also looks up MX records for
                the domain (a network call).
        """
        self.check_dns = check_dns
        self.errors = []
        self.warnings = []
        # Common disposable email domains (compared in lower case).
        self.disposable_domains = {
            'tempmail.com', 'throwaway.com', 'temporarymail.com'
        }

    def _check_format(self, email: str) -> bool:
        """Check the address shape; append an error and return False on failure."""
        # In VERBOSE mode whitespace outside character classes is ignored,
        # which lets the pattern be laid out one component per line.
        pattern = r"""
            [a-zA-Z0-9.!#$%&'*+/=?^_`{|}~-]+
            @
            [a-zA-Z0-9](?:[a-zA-Z0-9-]*[a-zA-Z0-9])?
            (?:\.[a-zA-Z0-9](?:[a-zA-Z0-9-]*[a-zA-Z0-9])?)*
        """
        # fullmatch anchors both ends (a "$" would also accept a trailing newline).
        if not re.fullmatch(pattern, email, re.VERBOSE):
            self.errors.append("Invalid email format")
            return False

        # The character class above admits ".", so dot placement in the
        # local part has to be checked separately.
        local = email.split('@', 1)[0]
        if '..' in local:
            self.errors.append("Consecutive dots not allowed")
            return False
        if local.startswith('.') or local.endswith('.'):
            self.errors.append("Local part cannot start or end with dot")
            return False
        return True

    def _check_dns(self, domain: str) -> bool:
        """Verify domain has MX records.

        Returns False (with an error) when the lookup definitively fails.
        Any other lookup problem is recorded as a warning and treated as a
        pass, so a resolver outage does not reject addresses.
        """
        try:
            dns.resolver.resolve(domain, 'MX')
            return True
        except (dns.resolver.NXDOMAIN,
                dns.resolver.NoAnswer,
                dns.resolver.NoNameservers):
            self.errors.append(f"Domain {domain} has no MX records")
            return False
        except Exception as e:
            # Deliberately best-effort: unknown failures only warn.
            self.warnings.append(f"DNS check failed: {str(e)}")
            return True

    def _check_disposable(self, domain: str) -> None:
        """Add a warning if domain is a known disposable email provider."""
        # Domain names are case-insensitive; the set holds lower-case names.
        if domain.lower() in self.disposable_domains:
            self.warnings.append(
                f"Domain {domain} appears to be a disposable email service"
            )

    def validate(self, email: str) -> "ValidationResult":
        """
        Validate an email address.

        Args:
            email: Email address to validate. Surrounding whitespace is
                ignored; empty or non-string input is rejected.

        Returns:
            ValidationResult with validation status and messages
        """
        # Start from a clean slate for every address.
        self.errors = []
        self.warnings = []

        if not email or not isinstance(email, str):
            self.errors.append("Invalid input")
            return ValidationResult(False, self.errors, self.warnings)

        email = email.strip()

        if len(email) > self.MAX_EMAIL_LENGTH:
            self.errors.append("Email too long")
            return ValidationResult(False, self.errors, self.warnings)

        if not self._check_format(email):
            return ValidationResult(False, self.errors, self.warnings)

        # The format check guarantees exactly one "@".
        local, domain = email.split('@')

        if len(local) > self.MAX_LOCAL_LENGTH:
            self.errors.append("Local part too long")

        self._check_disposable(domain)

        if self.check_dns and not self._check_dns(domain):
            return ValidationResult(False, self.errors, self.warnings)

        return ValidationResult(not self.errors, self.errors, self.warnings)
# Demo of EmailValidator with the DNS check switched on.
validator = EmailValidator(check_dns=True)

test_emails = [
    'user@example.com',
    'invalid.email@nonexistent.domain',
    'user@tempmail.com',
    'too..many.dots@example.com',
    'valid.email@gmail.com',
]

print("Validation Results:")
for email in test_emails:
    result = validator.validate(email)
    print(f"\nEmail: {email}")
    print(f"Valid: {result.is_valid}")
    # Only show the message lists that actually contain something.
    if result.errors:
        print("Errors:", result.errors)
    if result.warnings:
        print("Warnings:", result.warnings)
这个可用于生产环境的验证器包括:
- DNS MX 记录检查
- 一次性电子邮件检测
- 全面的格式验证
- 详细的错误和警告消息
- 清晰的关注点分离
与 Web 表单集成
以下是将电子邮件验证与 Flask Web 应用程序集成的方法:
from flask import Flask, request, jsonify
from typing import Dict, Any
import asyncio
import aiohttp
class WebEmailValidator:
    """Combine local ``EmailValidator`` checks with a remote deliverability lookup."""

    def __init__(self):
        self.validator = EmailValidator(check_dns=True)

    async def check_email_deliverability(self, email: str) -> Dict[str, Any]:
        """Check email deliverability using external API.

        Args:
            email: Address to look up.

        Returns:
            The service's decoded JSON body on HTTP 200; otherwise
            ``{'deliverable': None, 'error': 'API check failed'}``.
        """
        from urllib.parse import quote

        # Note: Replace with your preferred email verification service
        API_KEY = 'your_api_key'
        # The address is user input and may contain "/", "?", "#" or "%",
        # which would change the request URL if left unescaped.
        url = f"https://api.emailverifier.com/check/{quote(email, safe='@')}"
        failure = {'deliverable': None, 'error': 'API check failed'}

        try:
            async with aiohttp.ClientSession() as session:
                async with session.get(url, headers={'ApiKey': API_KEY}) as response:
                    if response.status == 200:
                        return await response.json()
        except (aiohttp.ClientError, asyncio.TimeoutError):
            # A transport failure is reported the same way as a non-200 reply.
            return failure
        return failure

    async def validate_email(self, email: str) -> Dict[str, Any]:
        """Complete email validation with deliverability check.

        Args:
            email: Address to validate.

        Returns:
            Dict with ``email``, ``is_valid``, ``errors`` and ``warnings``;
            ``deliverability`` is added only when local validation passed.
        """
        # Local validation runs synchronously inside this coroutine.
        result = self.validator.validate(email)
        response = {
            'email': email,
            'is_valid': result.is_valid,
            'errors': result.errors,
            'warnings': result.warnings
        }

        # Skip the remote call for addresses that already failed locally.
        if result.is_valid:
            response['deliverability'] = await self.check_email_deliverability(email)

        return response
# Flask application object that the route decorators register against.
app = Flask(__name__)
# Module-level validator instance used by the request handlers and form helper.
validator = WebEmailValidator()
@app.route('/validate-email', methods=['POST'])
async def validate_email():
    """Validate the ``email`` field of a JSON request body.

    Returns:
        The validation report as JSON, or ``{'error': 'Email required'}``
        with HTTP 400 when the body is not a JSON object or has no email.
    """
    # silent=True yields None instead of raising on a missing/invalid JSON body.
    payload = request.get_json(silent=True)
    # A JSON array or scalar has no .get(); treat it like a missing email.
    email = payload.get('email') if isinstance(payload, dict) else None
    if not email:
        return jsonify({'error': 'Email required'}), 400

    result = await validator.validate_email(email)
    return jsonify(result)
# Form validation helper
def validate_signup_form(form_data: Dict[str, str]) -> Dict[str, Any]:
    """Validate the email field of a signup form.

    Args:
        form_data: Submitted form fields; a missing ``email`` is treated as
            an empty string.

    Returns:
        Dict with ``is_valid`` plus ``errors`` and ``warnings`` dicts, each
        holding an ``email`` entry only when there is something to report.
    """
    address = form_data.get('email', '').strip()
    outcome = validator.validator.validate(address)

    field_errors = {}
    if not outcome.is_valid:
        field_errors['email'] = outcome.errors

    field_warnings = {}
    if outcome.warnings:
        field_warnings['email'] = outcome.warnings

    return {
        'is_valid': bool(outcome.is_valid),
        'errors': field_errors,
        'warnings': field_warnings,
    }
@app.route('/signup', methods=['POST'])
def signup():
    """Handle a signup form post; reject an invalid email with HTTP 400."""
    validation = validate_signup_form(request.form)
    if not validation['is_valid']:
        return jsonify(validation), 400

    # Continue with signup process...
    # Placeholder success response: a Flask view must not return None.
    # Replace with the real signup result once implemented.
    return jsonify(validation)
批量电子邮件验证
以下是高效验证多个电子邮件地址的方法:
from concurrent.futures import ThreadPoolExecutor
from typing import List, Dict
import csv
from pathlib import Path
class BatchEmailValidator:
    """Validate many email addresses concurrently, from a list or a CSV file."""

    def __init__(self, max_workers: int = 5):
        """
        Args:
            max_workers: Size of the thread pool used by ``validate_emails``.
        """
        self.validator = EmailValidator(check_dns=True)
        self.max_workers = max_workers

    def _validate_one(self, email: str):
        """Validate one address on a private copy of the configured validator."""
        import copy

        # EmailValidator keeps the current call's errors/warnings on the
        # instance, so worker threads must not share one object. A shallow
        # copy keeps the configuration and gives each call its own state.
        return copy.copy(self.validator).validate(email)

    def validate_emails(self, emails: List[str]) -> List[Dict[str, Any]]:
        """Validate multiple emails in parallel.

        Args:
            emails: Addresses to validate.

        Returns:
            One dict per address, in input order, with ``email``,
            ``is_valid``, ``errors`` and ``warnings``.
        """
        # Materialize once: the input is walked twice (map, then zip).
        emails = list(emails)
        with ThreadPoolExecutor(max_workers=self.max_workers) as executor:
            results = list(executor.map(self._validate_one, emails))

        return [
            {
                'email': email,
                'is_valid': result.is_valid,
                'errors': result.errors,
                'warnings': result.warnings
            }
            for email, result in zip(emails, results)
        ]

    def validate_csv(self,
                     input_path: str,
                     output_path: str,
                     email_column: str = 'email') -> Dict[str, int]:
        """Validate emails from CSV file and write a report CSV.

        Args:
            input_path: CSV file with a header row.
            output_path: Destination for the report (overwritten).
            email_column: Header name of the column holding the addresses.

        Returns:
            Counts under the keys ``total``, ``valid`` and ``invalid``.

        Raises:
            KeyError: If a row has no ``email_column`` field.
        """
        # newline='' lets the csv module handle embedded line breaks itself.
        with open(input_path, 'r', newline='') as infile:
            rows = list(csv.DictReader(infile))

        results = self.validate_emails([row[email_column] for row in rows])

        with open(output_path, 'w', newline='') as outfile:
            fieldnames = ['email', 'is_valid', 'errors', 'warnings']
            writer = csv.DictWriter(outfile, fieldnames=fieldnames)
            writer.writeheader()
            for result in results:
                writer.writerow({
                    'email': result['email'],
                    'is_valid': result['is_valid'],
                    'errors': '; '.join(result['errors']),
                    'warnings': '; '.join(result['warnings'])
                })

        valid_count = sum(1 for result in results if result['is_valid'])
        return {
            'total': len(results),
            'valid': valid_count,
            'invalid': len(results) - valid_count,
        }
# Demo of BatchEmailValidator: an in-memory list first, then a CSV file.
validator = BatchEmailValidator()

# Validate list of emails
emails = [
    'user@example.com',
    'invalid.email',
    'user@nonexistent.domain',
    'valid.user@gmail.com',
]
results = validator.validate_emails(emails)

print("\nBatch Validation Results:")
for result in results:
    print(f"\nEmail: {result['email']}")
    print(f"Valid: {result['is_valid']}")
    # Only show the message lists that actually contain something.
    if result['errors']:
        print("Errors:", result['errors'])
    if result['warnings']:
        print("Warnings:", result['warnings'])

# Validate CSV file
stats = validator.validate_csv('input_emails.csv', 'validation_results.csv')
print("\nCSV Validation Stats:", stats)
自定义验证规则
以下是使用您自己的规则扩展验证器的方法:
from typing import Callable, List
class CustomEmailValidator(EmailValidator):
    """EmailValidator extended with caller-supplied rule functions."""

    def __init__(self, check_dns: bool = False):
        super().__init__(check_dns)
        # Rules run in the order they were added.
        self.custom_rules: List[Callable] = []

    def add_rule(self, rule: Callable[[str], tuple[bool, str]]):
        """Add custom validation rule.

        Args:
            rule: Callable taking the email address and returning
                ``(passed, message)``; the message is recorded as an error
                when ``passed`` is false.
        """
        self.custom_rules.append(rule)

    def validate(self, email: str) -> ValidationResult:
        """Run basic validation plus custom rules.

        Custom rules run only when the basic validation passed. Every rule
        is evaluated, so all failing rules report their message.
        """
        result = super().validate(email)
        if not result.is_valid:
            return result

        # The base validator strips surrounding whitespace before checking;
        # hand the rules the same normalized address so padding cannot make
        # a valid address fail a domain or local-part rule.
        normalized = email.strip()
        for rule in self.custom_rules:
            passed, message = rule(normalized)
            if not passed:
                result.is_valid = False
                result.errors.append(message)

        return result
# Example custom rules
def no_role_accounts(email: str) -> tuple[bool, str]:
    """Reject addresses whose local part is a common role name.

    Args:
        email: Address to check; the comparison ignores case.

    Returns:
        ``(False, message)`` for a role address, otherwise ``(True, "")``.
    """
    role_names = {'admin', 'support', 'info', 'sales', 'contact'}
    # Everything before the first "@" (the whole string if there is none).
    local_part, _, _ = email.partition('@')
    if local_part.lower() not in role_names:
        return True, ""
    return False, "Role-based email addresses not allowed"
def required_domain(email: str) -> tuple[bool, str]:
    """Ensure email is from allowed domain.

    Args:
        email: Address to check; the domain comparison ignores case.

    Returns:
        ``(True, "")`` when the domain is approved, otherwise
        ``(False, message)``. An address without "@" is rejected.
    """
    allowed_domains = {'company.com', 'subsidiary.com'}
    # partition never raises: without "@" the domain is simply empty, and
    # with several "@" the remainder cannot match an approved domain.
    _, _, domain = email.partition('@')
    if domain.lower() not in allowed_domains:
        return False, "Email must be from an approved domain"
    return True, ""
# Demo of CustomEmailValidator with both example rules registered.
validator = CustomEmailValidator()
for rule in (no_role_accounts, required_domain):
    validator.add_rule(rule)

test_emails = [
    'user@company.com',
    'admin@company.com',
    'contact@example.com',
    'valid.user@subsidiary.com',
]

print("\nCustom Validation Results:")
for email in test_emails:
    result = validator.validate(email)
    print(f"\nEmail: {email}")
    print(f"Valid: {result.is_valid}")
    # Only show the message lists that actually contain something.
    if result.errors:
        print("Errors:", result.errors)
    if result.warnings:
        print("Warnings:", result.warnings)
这些示例演示如何:
- 将电子邮件验证与 Web 应用程序集成
- 高效处理多个电子邮件地址
- 添加自定义验证规则
- 处理实际验证场景
请记住:
- 电子邮件验证比看起来更复杂
- 平衡彻底性与实用性
- 在选择验证规则时考虑您的特定用例
- 优雅地处理错误和边缘情况
- 清楚地记录您的验证规则