设计五种算法精确的身份证号匹配
问题定义与数据准备
我们有两个Excel文件:
small.xlsx: 包含约5,000条记录。large.xlsx: 包含约140,000条记录。
目标:快速、高效地从large.xlsx中找出所有其“身份证号”字段存在于small.xlsx“身份证号”字段中的记录,并将这些匹配的记录保存到一个新的Excel文件result.xlsx中。
假设:身份证号字段名在两个表中都是id_card。
首先,我们进行准备工作,安装必要的库并模拟一些数据用于测试和性能估算。
pip install pandas openpyxl import pandas as pd import time import random # 为演示和测试,我们可以创建一些模拟数据(实际中使用pd.read_excel读取你的文件)defgenerate_id_card():"""生成一个模拟的18位身份证号""" region_code = random.choice(['110101','310104','440301'])# 随机地区码 birth_date =f"19{random.randint(50,99):02d}{random.randint(1,12):02d}{random.randint(1,28):02d}"# 随机生日 sequence_code =f"{random.randint(0,999):03d}"# 顺序码 check_code = random.choice(['X','0','1','2','3','4','5','6','7','8','9'])# 校验码return region_code + birth_date + sequence_code + check_code # 生成小表数据 (5000条) small_data ={'id_card':[generate_id_card()for _ inrange(5000)]} small_df = pd.DataFrame(small_data) small_df.to_excel('small.xlsx', index=False)# 生成大表数据 (140000条),并确保其中包含一部分小表中的ID large_list =[]# 首先,从small_df中随机取10000个ID(有重复,但身份证号唯一,所以用sample(n, replace=False))# 但为了确保有重复,我们这样做:大部分是新的,一部分是小的里面的。 ids_from_small = small_df['id_card'].tolist()# 假设大表中约有 3000 条记录与小表重合 overlap_ids = random.sample(ids_from_small,3000)for _ inrange(140000):# 约2%的概率使用一个重叠的ID,98%的概率生成一个新的IDif random.random()<0.02and overlap_ids: id_to_use = random.choice(overlap_ids)else: id_to_use = generate_id_card() large_list.append(id_to_use) large_data ={'id_card': large_list,'other_data':['Some other info']*140000} large_df = pd.DataFrame(large_data) large_df.to_excel('large.xlsx', index=False)print("模拟数据生成完成!")print(f"小表尺寸: {small_df.shape}")print(f"大表尺寸: {large_df.shape}")现在,我们开始设计算法。
算法一:暴力双重循环 (Brute Force Double Loop)
原理与步骤
这是最直观、最基础的方法。
- 加载数据:将两个Excel表分别读入Pandas DataFrame,
small_df和large_df。 - 嵌套循环:
- 外层循环遍历
large_df的每一行(140,000次迭代)。 - 内层循环遍历
small_df的每一行(5,000次迭代)。 - 对于每一对
(large_id, small_id),比较它们是否相等。
- 外层循环遍历
- 收集结果:如果相等,则将
large_df的当前行标记为匹配。 - 保存结果:将所有标记的行提取出来,保存到新的Excel文件。
Python实现
defalgorithm_1_brute_force(small_file, large_file, output_file):""" 算法1:暴力双重循环 """print("算法1:暴力双重循环 - 开始执行") start_time = time.time()# 1. 加载数据 small_df = pd.read_excel(small_file) large_df = pd.read_excel(large_file)# 确保id_card列是字符串类型,避免因数据类型导致的匹配失败 small_df['id_card']= small_df['id_card'].astype(str) large_df['id_card']= large_df['id_card'].astype(str)# 获取小表的身份证号列表 small_ids = small_df['id_card'].tolist()# 2. 嵌套循环比对 matched_rows =[]# 存储匹配的行索引或行数据 large_ids = large_df['id_card'].tolist()# 使用enumerate同时获取索引和值for i, large_id inenumerate(large_ids):for small_id in small_ids:if large_id == small_id: matched_rows.append(i)# 记录匹配的行索引break# 找到一个匹配就可以跳出内层循环,进入下一个大表ID# 3. 收集结果 result_df = large_df.iloc[matched_rows]# 4. 保存结果 result_df.to_excel(output_file, index=False) end_time = time.time() execution_time = end_time - start_time print(f"算法1完成。找到 {len(result_df)} 条匹配记录。耗时: {execution_time:.4f} 秒")return execution_time # 预计运行时间极长,在实际测试中可能跳过# time_1 = algorithm_1_brute_force('small.xlsx', 'large.xlsx', 'result_1.xlsx')优劣对比
- 优点:
- 实现极其简单,逻辑清晰,无需任何复杂的数据结构知识。
- 无需额外内存(除了存储结果)。
- 缺点:
- 时间复杂度极高:O(n * m),其中n是large表大小(140,000),m是small表大小(5,000)。总计 140,000 * 5,000 = 700,000,000 次比较。这是一个巨大的数字。
- 预计运行时间:在普通的Python环境中,每秒大约能进行10万到100万次简单比较(取决于硬件)。按乐观的100万次/秒计算,也需要 700秒(约12分钟)。在实际的Pandas操作中,由于循环开销,速度会更慢,预计需要 30分钟到数小时。绝对不推荐用于生产环境。
算法二:利用Pandas的isin()方法
原理与步骤
这是对暴力算法的极大优化,利用了Pandas内置的高效向量化操作。
- 加载数据:同算法一。
- 创建查询集合:将
small_df的“身份证号”列转换为一个Python集合(Set)。集合是基于哈希表实现的,其in操作的查询时间复杂度是平均O(1)。 - 向量化筛选:使用Pandas的
isin()方法。该方法会接收一个集合或列表,并返回一个布尔序列(Series),指示large_df的“身份证号”列中的每个元素是否存在于给定的集合中。 - 布尔索引:使用这个布尔序列对
large_df进行索引,快速筛选出所有匹配的行。 - 保存结果:同算法一。
Python实现
defalgorithm_2_pandas_isin(small_file, large_file, output_file):""" 算法2:利用Pandas的isin和集合 """print("算法2:Pandas isin() - 开始执行") start_time = time.time()# 1. 加载数据 small_df = pd.read_excel(small_file) large_df = pd.read_excel(large_file) small_df['id_card']= small_df['id_card'].astype(str) large_df['id_card']= large_df['id_card'].astype(str)# 2. 创建查询集合# 首先将small的id_card列转换为一个集合。# 集合的查找速度是O(1),非常快。 target_set =set(small_df['id_card'])# 3. 向量化筛选# isin()函数会检查large_df['id_card']的每个元素是否在target_set中 mask = large_df['id_card'].isin(target_set)# 4. 使用布尔索引获取结果 result_df = large_df[mask]# 5. 保存结果 result_df.to_excel(output_file, index=False) end_time = time.time() execution_time = end_time - start_time print(f"算法2完成。找到 {len(result_df)} 条匹配记录。耗时: {execution_time:.4f} 秒")return execution_time # 这是最推荐的方法之一,预计运行时间很短 time_2 = algorithm_2_pandas_isin('small.xlsx','large.xlsx','result_2.xlsx')优劣对比
- 优点:
- 实现非常简单,代码非常简洁。
- 速度极快。
isin()是Pandas内部用C优化过的向量化操作,背后通常也使用了哈希表机制。它的时间复杂度可以近似看作是O(n),n是large表的大小。创建集合是O(m)。 - 内存使用可控。集合占用的内存远小于DataFrame本身。
- 缺点:
- 本质上还是需要将小表的数据完全加载到内存中构建集合。
- 对于极端海量数据(例如小表有上亿条),构建集合可能成为瓶颈,但在本场景(5000条)中完全不是问题。
- 预计运行时间:1秒以内。这是处理此类问题的首选标准方法。
算法三:Pandas Merge(合并)
原理与步骤
利用数据库的INNER JOIN思想,使用Pandas的合并功能。
- 加载数据:同算法一。
- 执行内连接:使用
pd.merge()函数,以“身份证号”作为连接键,对两个DataFrame进行内连接。内连接的特性是只会保留两个表中键值匹配的行。 - 保存结果:连接的结果就是我们需要的数据,直接保存即可。需要注意的是,如果两个表有其他同名列,会产生后缀,但我们的目标只是身份证号匹配,所以可以直接保存。
Python实现
defalgorithm_3_pandas_merge(small_file, large_file, output_file):""" 算法3:Pandas Merge (Inner Join) """print("算法3:Pandas Merge - 开始执行") start_time = time.time()# 1. 加载数据 small_df = pd.read_excel(small_file) large_df = pd.read_excel(large_file) small_df['id_card']= small_df['id_card'].astype(str) large_df['id_card']= large_df['id_card'].astype(str)# 2. 执行内连接# on参数指定连接的列名。how='inner'表示内连接。 result_df = pd.merge(large_df, small_df[['id_card']], on='id_card', how='inner')# 注意:这里使用small_df[['id_card']]而不是small_df,是为了避免两个表有其他同名列导致合并后产生重复列(_x, _y)。# 我们只需要用small表的id_card列作为匹配键,不需要它的其他数据。# 3. 保存结果 result_df.to_excel(output_file, index=False) end_time = time.time() execution_time = end_time - start_time print(f"算法3完成。找到 {len(result_df)} 条匹配记录。耗时: {execution_time:.4f} 秒")return execution_time # 这也是非常高效且推荐的方法 time_3 = algorithm_3_pandas_merge('small.xlsx','large.xlsx','result_3.xlsx')优劣对比
- 优点:
- 实现极其简洁,语义清晰,一看就懂是做连接操作。
- 速度非常快。Pandas的
merge函数底层也经过了高度优化,通常基于哈希或排序-合并算法,效率很高。其性能与isin()方法在同一数量级。 - 可以轻松处理更复杂的连接条件(多列作为键)。
- 缺点:
- 会比
isin()方法产生一些额外的中间开销,因为需要协调两个表的列。但在本场景下差异微乎其微。 - 如果连接键不唯一,可能会产生笛卡尔积,导致结果行数爆炸(但身份证号是唯一的,所以不存在这个问题)。
- 会比
- 预计运行时间:1秒左右。与算法二同为首选标准方法。
算法四:使用数据库(SQLite)
原理与步骤
将数据加载到内存数据库(如SQLite)中,使用SQL语言的IN或JOIN语句来让数据库引擎完成高效的查找工作。
- 创建内存数据库:使用
sqlite3模块在内存中创建一个临时数据库。 - 导入数据:将两个Pandas DataFrame分别写入数据库中的两个表(例如
small_table和large_table)。 - 执行SQL查询:编写SQL查询语句(例如
SELECT large_table.* FROM large_table WHERE large_table.id_card IN (SELECT id_card FROM small_table))。 - 获取结果:将SQL查询的结果读回一个Pandas DataFrame。
- 保存结果:同算法一。
Python实现
import sqlite3 defalgorithm_4_sqlite(small_file, large_file, output_file):""" 算法4:使用SQLite内存数据库 """print("算法4:SQLite内存数据库 - 开始执行") start_time = time.time()# 1. 加载数据 small_df = pd.read_excel(small_file) large_df = pd.read_excel(large_file) small_df['id_card']= small_df['id_card'].astype(str) large_df['id_card']= large_df['id_card'].astype(str)# 2. 创建内存数据库连接 conn = sqlite3.connect(':memory:')# 3. 导入数据到数据库 small_df.to_sql('small_table', conn, index=False) large_df.to_sql('large_table', conn, index=False)# 4. 编写并执行SQL查询# 方法A: 使用IN子句 query =""" SELECT large_table.* FROM large_table WHERE large_table.id_card IN (SELECT id_card FROM small_table) """# 方法B: 使用INNER JOIN (通常数据库会对JOIN优化得更好)#"# SELECT l.* # FROM large_table l# INNER JOIN small_table s ON l.id_card = s.id_card# """ result_df = pd.read_sql_query(query, conn)# 5. 关闭连接 conn.close()# 6. 保存结果 result_df.to_excel(output_file, index=False) end_time = time.time() execution_time = end_time - start_time print(f"算法4完成。找到 {len(result_df)} 条匹配记录。耗时: {execution_time:.4f} 秒")return execution_time time_4 = algorithm_4_sqlite('small.xlsx','large.xlsx','result_4.xlsx')优劣对比
- 优点:
- 非常高效。SQLite等数据库引擎是专门为快速数据查询而设计的,会自动为连接键创建索引以加速查询(虽然在这个一次性操作中我们没显式创建,但它的查询优化器仍然很强大)。
- 处理海量数据时优势更明显。如果数据量大到Pandas操作困难,可以改用磁盘数据库,并显式创建索引。
- 可以利用强大的SQL语法处理极其复杂的匹配逻辑。
- 缺点:
- 实现步骤稍多,需要将数据导入导出数据库,增加了额外的I/O开销(尽管是在内存中)。
- 对于这种规模的问题,其性能通常略慢于纯Pandas的向量化操作(算法二和三),因为多了数据迁移的步骤。
- 预计运行时间:1-3秒。在数据量极大(数千万以上)或匹配逻辑复杂时,此方法优势会显现。
算法五:分块处理 (Chunking)
原理与步骤
这个算法并非用于提升速度,而是用于解决内存不足的问题。当large.xlsx文件巨大(例如几个GB),无法一次性读入内存时,就需要使用此方法。它结合了算法二的思想。
- 加载小数据:将
small.xlsx全部读入内存,并创建集合S。 - 分块读取大数据:使用Pandas的
read_excel()的chunksize参数,分批读取large.xlsx。每次只读取一个块(例如10000行)到内存中。 - 逐块处理:对于每个数据块,使用
isin(S)方法筛选出匹配的行。 - 累积结果:将每个块的处理结果追加到一个列表中,或者直接写入结果文件。
- 合并保存结果:处理完所有块后,将累积的结果合并并保存,或者由于是逐块写入文件,最后只需要确保文件关闭即可。
Python实现
defalgorithm_5_chunking(small_file, large_file, output_file, chunksize=10000):""" 算法5:分块处理(用于内存不足的大文件场景) """print("算法5:分块处理 - 开始执行") start_time = time.time()# 1. 加载小数据并创建集合 small_df = pd.read_excel(small_file) small_df['id_card']= small_df['id_card'].astype(str) target_set =set(small_df['id_card'])# 2. 初始化一个列表来存储每个块的结果,或者直接写入文件# 我们采用直接写入文件的方式,避免在内存中累积巨大的DataFrame first_chunk =True# 使用一个csv writer或者每次追加DataFrame到Excel比较麻烦。# 更简单的方法是:使用一个列表收集结果DF,最后再concat。但如果结果也很大,内存会爆。# 因此,我们使用‘a’(append)模式写入Excel,但注意OpenPyXL需要每次创建一个新的writer对象。# 这里为简单起见,我们收集块结果,最后统一写入。假设结果集本身不会太大(最多140000行中的一小部分)。 chunks_result_list =[]# 3. 分块读取大数据 chunk_reader = pd.read_excel(large_file, chunksize=chunksize)for chunk in chunk_reader: chunk['id_card']= chunk['id_card'].astype(str)# 4. 处理当前块 mask = chunk['id_card'].isin(target_set) filtered_chunk = chunk[mask] chunks_result_list.append(filtered_chunk)print(f"已处理一个数据块,该块找到 {len(filtered_chunk)} 条匹配记录。")# 5. 合并结果并保存if chunks_result_list: final_result_df = pd.concat(chunks_result_list, ignore_index=True)else: final_result_df = pd.DataFrame()# 创建一个空DataFrame final_result_df.to_excel(output_file, index=False) end_time = time.time() execution_time = end_time - start_time print(f"算法5完成。找到 {len(final_result_df)} 条匹配记录。耗时: {execution_time:.4f} 秒")return execution_time # 这个算法主要用于演示大文件处理,对于140000行数据,没必要分块。# time_5 = algorithm_5_chunking('small.xlsx', 'large.xlsx', 'result_5.xlsx', chunksize=50000)优劣对比
- 优点:
- 内存友好。核心优势是能够处理大于内存的数据文件,因为它永远不会同时将整个大表加载到内存中。
- 结合了Pandas向量化操作的效率。
- 缺点:
- 速度可能稍慢:由于需要多次读取文件、循环处理每个块,会产生额外的I/O开销和循环开销。比算法二、三要慢。
- 实现稍复杂。
- 预计运行时间:2-5秒(主要开销在分块读取Excel上)。只有在处理超大文件、内存不足时才需要使用。对于140000条记录,完全不需要分块。
总结与最终对比
我们将五种算法的优缺点和适用场景总结如下:
| 算法 | 优点 | 缺点 | 预计时间 | 推荐度 |
|---|---|---|---|---|
| 1. 暴力循环 | 实现简单 | 速度极慢,无法忍受 | ~30分钟以上 | ⭐(绝不推荐) |
| 2. Pandas isin() | 实现简单,速度最快 | 需要内存容纳小表集合 | <1秒 | ⭐⭐⭐⭐⭐(首选) |
| 3. Pandas Merge | 实现简单,速度最快 | 略有额外开销 | ~1秒 | ⭐⭐⭐⭐⭐(首选) |
| 4. SQLite | 高效,支持复杂查询,海量数据优势 | 步骤稍多,数据迁移开销 | 1-3秒 | ⭐⭐⭐⭐(备用方案) |
| 5. 分块处理 | 内存友好,可处理超大文件 | 速度较慢,实现稍复杂 | 2-5秒 | ⭐⭐⭐(特殊场景) |
最终结论与建议:
- 对于你的具体问题(5000条 vs 140000条):
- 毫不犹豫地选择算法二(
isin())或算法三(merge())。它们是专门为这种表格数据操作设计的,代码简洁、效率最高。两者性能差异可以忽略不计,按个人编码喜好选择即可。
- 毫不犹豫地选择算法二(
- 如果你的小表也变得非常大(例如上百万条):
- 算法二和三仍然有效,但创建集合或进行合并的内存消耗会变大。此时需要确保你的机器有足够RAM。
- 如果你的大表巨大(例如几十GB,无法读入内存):
- 选择算法五(分块处理),它是解决此类问题的标准范式。
- 如果你的匹配逻辑变得非常复杂(不仅仅是相等匹配):
- 考虑使用算法四(SQLite),利用SQL强大的表达能力来编写复杂查询,或者使用专门的记录链接库(如
recordlinkage)。
- 考虑使用算法四(SQLite),利用SQL强大的表达能力来编写复杂查询,或者使用专门的记录链接库(如
- 算法一(暴力循环):永远只存在于教学示例中,用于提醒大家时间复杂度的重要性。
代码执行:你可以创建一个主函数来运行和比较这些算法(除了算法一)。
if __name__ =='__main__': files =('small.xlsx','large.xlsx') times ={} times['alg_2']= algorithm_2_pandas_isin(*files,'result_2.xlsx') times['alg_3']= algorithm_3_pandas_merge(*files,'result_3.xlsx') times['alg_4']= algorithm_4_sqlite(*files,'result_4.xlsx')# 对于这个数据量,算法5可能比2和3慢,但可以测试 times['alg_5']= algorithm_5_chunking(*files,'result_5.xlsx', chunksize=50000)print("\n=== 所有算法耗时对比 ===")for alg, t in times.items():print(f"{alg}: {t:.4f} 秒")在实际运行中,你会看到算法二和三以绝对优势胜出。
