```python
import hashlib

def hash_deduplicate(leads, fields):
    """Deduplicate leads by hashing the selected key fields."""
    seen_hashes = set()
    unique_leads = []
    for lead in leads:
        # Build the hash key from the selected fields
        key_string = ''.join([str(lead.get(field, '')) for field in fields])
        hash_value = hashlib.md5(key_string.encode()).hexdigest()
        if hash_value not in seen_hashes:
            seen_hashes.add(hash_value)
            unique_leads.append(lead)
    return unique_leads

def clean_leads(leads):
    """Normalize phone, email, and name fields before deduplication."""
    cleaned_leads = []
    for lead in leads:
        # Normalize the phone number: keep digits only
        phone = lead.get('phone', '').strip()
        if phone:
            phone = ''.join(filter(str.isdigit, phone))
            if len(phone) == 10:
                lead['phone'] = phone
        # Normalize the email address
        email = lead.get('email', '').lower().strip()
        if email:
            lead['email'] = email
        # Clean up the name
        name = lead.get('name', '').strip()
        if name:
            lead['name'] = name
        cleaned_leads.append(lead)
    return cleaned_leads
```
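For reference, a minimal usage sketch of the two helpers above; the sample records and the choice of `['email', 'phone']` as key fields are illustrative assumptions, not part of the original workflow:

```python
# Illustrative sample data only
sample_leads = [
    {'name': ' Alice ', 'email': 'ALICE@EXAMPLE.COM ', 'phone': '(555) 010-1234'},
    {'name': 'Alice',   'email': 'alice@example.com',  'phone': '555-010-1234'},
    {'name': 'Bob',     'email': 'bob@example.com',    'phone': '555 010 5678'},
]

cleaned = clean_leads(sample_leads)
unique = hash_deduplicate(cleaned, fields=['email', 'phone'])
print(len(unique))  # 2: the two Alice records collapse to one after cleaning
```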
3.2 Complete Deduplication Workflow
```python
class LeadDeduplicator:
    def __init__(self):
        self.duplicate_count = 0

    def deduplicate(self, leads, strategy='strict'):
        # Note: method signature and the cleaning step are reconstructed;
        # the original excerpt shows only the strategy dispatch below.
        # 1. Clean the raw leads first
        cleaned_leads = clean_leads(leads)
        # 2. Choose the deduplication method based on the strategy
        if strategy == 'strict':
            result = self.strict_deduplicate(cleaned_leads)
        elif strategy == 'fuzzy':
            result = self.fuzzy_deduplicate(cleaned_leads)
        else:
            result = self.lenient_deduplicate(cleaned_leads)
        return result
```
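The strict_deduplicate / fuzzy_deduplicate / lenient_deduplicate methods are not shown in this excerpt. A minimal sketch of what the strict (exact-match) variant could look like, reusing the hash-based helper from above; the field choice and method body are assumptions:

```python
# Sketch only: one possible strict strategy, not the article's actual implementation.
def strict_deduplicate(self, leads):
    # Exact match on normalized email + phone (assumed key fields)
    unique = hash_deduplicate(leads, fields=['email', 'phone'])
    self.duplicate_count += len(leads) - len(unique)
    return unique
```

A fuzzy variant would typically compare fields such as names with a similarity measure (for example difflib.SequenceMatcher) instead of requiring exact equality.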
4.1 Database-Level Deduplication
```sql
-- Deduplication query: keep only the newest record per (email, phone) pair
WITH ranked_leads AS (
    SELECT *,
           ROW_NUMBER() OVER (
               PARTITION BY email, phone
               ORDER BY created_time DESC
           ) AS rn
    FROM leads
)
SELECT * FROM ranked_leads WHERE rn = 1;
```
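To try this query outside a production database, a hedged sketch below runs it against an in-memory SQLite database (the leads table layout and sample rows are assumptions; window functions need SQLite 3.25 or newer):

```python
import sqlite3

# Sketch: run the ranking query against an in-memory SQLite database.
conn = sqlite3.connect(':memory:')
conn.execute("CREATE TABLE leads (id INTEGER, email TEXT, phone TEXT, created_time TEXT)")
conn.executemany(
    "INSERT INTO leads VALUES (?, ?, ?, ?)",
    [
        (1, 'alice@example.com', '5550101234', '2024-01-01'),
        (2, 'alice@example.com', '5550101234', '2024-02-01'),  # newer duplicate
        (3, 'bob@example.com',   '5550105678', '2024-01-15'),
    ],
)

dedup_sql = """
WITH ranked_leads AS (
    SELECT *,
           ROW_NUMBER() OVER (
               PARTITION BY email, phone
               ORDER BY created_time DESC
           ) AS rn
    FROM leads
)
SELECT * FROM ranked_leads WHERE rn = 1;
"""
for row in conn.execute(dedup_sql):
    print(row)  # rows with id 2 and 3 survive; id 1 is the older duplicate
```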
4.2 Distributed Processing Approach
```python
from collections import defaultdict
import multiprocessing as mp

def parallel_deduplicate(leads_list, num_processes=4):
    """Deduplicate leads in parallel across multiple worker processes."""
    # Split the data into roughly equal chunks
    chunk_size = max(1, len(leads_list) // num_processes)
    chunks = [leads_list[i:i + chunk_size]
              for i in range(0, len(leads_list), chunk_size)]
    # Process the chunks in parallel
    with mp.Pool(num_processes) as pool:
        results = pool.map(process_chunk, chunks)
    # Merge the per-chunk results, removing cross-chunk duplicates
    all_unique_leads = []
    seen_keys = set()
    for chunk_result in results:
        for lead in chunk_result:
            key = create_lead_key(lead)
            if key not in seen_keys:
                seen_keys.add(key)
                all_unique_leads.append(lead)
    return all_unique_leads
```
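process_chunk and create_lead_key are used above but not defined in this excerpt. One possible pair of definitions, whose bodies are assumptions consistent with the exact-match merge logic:

```python
def create_lead_key(lead):
    # Assumed key: normalized email + phone, matching the merge step above
    return (lead.get('email', ''), lead.get('phone', ''))

def process_chunk(chunk):
    # Deduplicate one chunk locally; duplicates that span chunks are caught in the merge step
    seen = set()
    unique = []
    for lead in chunk:
        key = create_lead_key(lead)
        if key not in seen:
            seen.add(key)
            unique.append(lead)
    return unique
```

Both helpers need to live at module level so that mp.Pool can pickle them, and on platforms that spawn rather than fork worker processes (Windows, recent macOS) the call to parallel_deduplicate should sit under an `if __name__ == '__main__':` guard.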