贪心算法
贪心算法(Greedy Algorithm)是算法设计的重要策略之一。它在算法执行的每一步都做出当前看来最优的选择,期望通过局部最优选择达到全局最优。
贪心算法是一种在每一步选择中都采取当前最优解的算法策略,期望通过局部最优达到全局最优。文章阐述了贪心算法的核心思想,包括贪心选择性质和最优子结构,并通过活动安排、最优装载、哈夫曼编码、Dijkstra 最短路径及最小生成树等经典案例,详细讲解贪心策略的设计与正确性证明。内容涵盖贪心算法与动态规划的对比,提供 Python 实现代码及复杂度分析,帮助理解该算法的应用场景与局限性。

贪心算法(Greedy Algorithm)是算法设计的重要策略之一。它在算法执行的每一步都做出当前看来最优的选择,期望通过局部最优选择达到全局最优。
与动态规划不同,贪心算法不需要求解所有子问题,而是通过贪心选择性质直接构造最优解,通常具有更低的时间和空间复杂度。
核心思想对比:
| 特征 | 贪心算法 | 动态规划 |
|---|---|---|
| 决策方式 | 每步做局部最优选择 | 自底向上求解所有子问题 |
| 是否回溯 | 不回溯 | 可能回溯(在构造解时) |
| 子问题求解 | 只求解必要的子问题 | 求解所有子问题 |
| 适用条件 | 需要贪心选择性质 | 需要最优子结构和重叠子问题 |
| 时间复杂度 | 通常更低 | 可能更高 |
| 空间复杂度 | 通常更低 | 可能需要存储 DP 表 |
有 n 个活动需要使用同一个资源(如会议室),每个活动 i 有开始时间 s[i] 和结束时间 f[i]。如果两个活动的时间不重叠,则称它们是相容的。目标是选择最大的相容活动集合。
示例:设有 11 个活动
活动 i: 1 2 3 4 5 6 7 8 9 10 11
s[i]: 1 3 0 5 3 5 6 8 8 2 12
f[i]: 4 5 6 7 9 9 10 11 12 14 16
问题:如何选择活动,使得能安排的活动数最多?
暴力枚举所有可能的活动组合:O(2ⁿ) 种可能,不可行!
观察问题特点:
可能的策略:
按持续时间最短优先选择 ✗
活动:[1, 5], [4, 6], [5, 7], [6, 8]
按持续时间:先选 [5, 7], [6, 8](各 2 小时)
最优解:选择 [1, 5], [5, 7] 或 [4, 6], [6, 8]
按开始时间最早优先选择 ✗
活动:[1, 10], [2, 3], [4, 5]
按开始时间:选择 [1, 10],只能选 1 个
最优解:选择 [2, 3], [4, 5],能选 2 个
贪心策略:将活动按结束时间从小到大排序,依次选择与已选活动相容的活动。
直观理解:
严格证明(贪心选择性质):
定理:按结束时间最早优先选择的贪心策略能得到最优解。
证明(归纳法):
设活动按结束时间排序为 f₁ ≤ f₂ ≤ … ≤ fₙ。
归纳基础:选择活动 1(结束最早的)是最优的。
归纳假设:假设前 k 个贪心选择是正确的。
归纳步骤:第 k+1 个贪心选择选择的是与之前所有活动相容中结束最早的活动。
结论:通过归纳,贪心策略能得到最优解。
以 6 个活动为例:
活动:A1 A2 A3 A4 A5 A6
时间:[1,4] [3,5] [0,6] [5,7] [3,9] [5,9]
排序后(按结束时间):A1[1,4] → A2[3,5] → A3[0,6] → A4[5,7] → A5[3,9] → A6[5,9]
4 5 6 7 9 9
贪心选择过程:
━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━
步骤 1: 选择 A1[1,4]
剩余时间从 4 开始
步骤 2: 检查 A2[3,5]
开始 3 < 结束 4,冲突,跳过
步骤 3: 检查 A3[0,6]
开始 0 < 结束 4,冲突,跳过
步骤 4: 检查 A4[5,7]
开始 5 ≥ 结束 4,选择!
剩余时间从 7 开始
步骤 5: 检查 A5[3,9]
开始 3 < 结束 7,冲突,跳过
步骤 6: 检查 A6[5,9]
开始 5 < 结束 7,冲突,跳过
最终选择:{A1, A4}
最大活动数:2
━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━
from typing import List, Tuple
class Activity:
"""活动类"""
def __init__(self, start: int, finish: int, name: str = None):
self.start = start
self.finish = finish
self.name = name if name else f"[{start},{finish}]"
def __lt__(self, other):
"""按结束时间排序"""
return self.finish < other.finish
def __repr__(self):
return f"{self.name}"
def activity_selection(activities: List[Activity]) -> List[Activity]:
"""
活动安排问题的贪心算法
参数:
activities: 活动列表
返回:
选择的相容活动列表
"""
if not activities:
return []
# 按结束时间排序
sorted_activities = sorted(activities)
n = len(sorted_activities)
selected = [sorted_activities[0]] # 选择第一个活动
last_finish = sorted_activities[0].finish
print("贪心选择过程:")
print("=" * 50)
print(f"初始选择:{sorted_activities[0]}")
for i in range(1, n):
current = sorted_activities[i]
print(f"检查 {current}: ", end="")
if current.start >= last_finish:
selected.append(current)
last_finish = current.finish
print(f"选择 ✓ (当前时间:{last_finish})")
else:
print(f"跳过 ✗ (冲突)")
print("=" * 50)
return selected
# 递归解法(对比)
def activity_selection_recursive(s: List[int], f: List[int], k: int, n: int) -> List[int]:
"""
递归求解活动安排问题
参数:
s: 开始时间数组(1-indexed)
f: 结束时间数组(1-indexed)
k: 上一个选择的活动索引
n: 活动总数
返回:
选择的活动索引列表
"""
m = k + 1 # 找到第一个与活动 k 相容的活动
while m <= n and s[m] < f[k]:
m += 1
if m <= n:
return [m] + activity_selection_recursive(s, f, m, n)
return []
# 示例用法
if __name__ == "__main__":
# 示例 1:基本示例
print("示例 1:基本活动安排")
print("-" * 50)
activities = [
Activity(1, 4, "会议 1"),
Activity(3, 5, "会议 2"),
Activity(0, 6, "会议 3"),
Activity(5, 7, "会议 4"),
Activity(3, 9, "会议 5"),
Activity(5, 9, "会议 6"),
]
selected = activity_selection(activities)
print(f"\n最终选择:{selected}")
print(f"最大活动数:{len(selected)}")
# 示例 2:递归解法
print("\n\n示例 2:递归解法")
print("-" * 50)
s = [0, 1, 3, 0, 5, 3, 5] # 开始时间(1-indexed)
f = [0, 4, 5, 6, 7, 9, 9] # 结束时间
result = activity_selection_recursive(s, f, 0, 6)
print(f"递归解法选择的活动索引:{result}")
print(f"选择的活动数:{len(result)}")
输出:
示例 1:基本活动安排 -------------------------------------------------- 贪心选择过程: ================================================== 初始选择:会议 1 检查 会议 2: 跳过 ✗ (冲突) 检查 会议 3: 跳过 ✗ (冲突) 检查 会议 4: 选择 ✓ (当前时间:7) 检查 会议 5: 跳过 ✗ (冲突) 检查 会议 6: 跳过 ✗ (冲突) ================================================== 最终选择:[会议 1, 会议 4] 最大活动数:2 示例 2:递归解法 -------------------------------------------------- 递归解法选择的活动索引:[1, 4] 选择的活动数:2
定义:可以通过做出局部最优选择来达到全局最优。
关键特征:
判断方法:
定义:问题的最优解包含其子问题的最优解。
与动态规划的对比:
共同点:都要求最优子结构性质
区别:
| 特征 | 贪心算法 | 动态规划 |
|---|---|---|
| 选择方式 | 局部最优 | 全局最优 |
| 子问题求解 | 只求解必要的 | 求解所有子问题 |
| 是否回溯 | 不回溯 | 可能回溯 |
证明方法:
# 证明模板
"""
定理:[贪心策略描述] 能得到最优解。
证明:
1. 基础情况:证明第一步选择是正确的
2. 归纳假设:假设前 k 步选择是正确的
3. 归纳步骤:证明第 k+1 步选择也是正确的
4. 结论:通过归纳,整个策略正确
"""
# 证明模板
"""
定理:[贪心策略描述] 能得到最优解。
证明(反证法):
假设贪心策略不能得到最优解。
推导出矛盾,因此假设错误,贪心策略正确。
"""
# 证明模板
"""
定理:[贪心策略描述] 能得到最优解。
证明(交换论证):
1. 设贪心解为 G,最优解为 O
2. 证明可以通过逐步交换,将 O 转换为 G
3. 每次交换不会使解变差
4. 因此 G 也是最优解
"""
硬币找零问题:
给定硬币面值和目标金额,求最少硬币数。
from typing import List, Tuple
# 贪心算法
def coin_change_greedy(coins: List[int], amount: int) -> Tuple[int, List[int]]:
"""
贪心算法:每次选择面值最大的硬币
注意:贪心算法并非总是最优!
"""
coins.sort(reverse=True)
count = 0
remaining = amount
used = []
for coin in coins:
while remaining >= coin:
remaining -= coin
count += 1
used.append(coin)
return count, used
# 动态规划
def coin_change_dp(coins: List[int], amount: int) -> Tuple[int, List[int]]:
"""
动态规划:始终能得到最优解
时间复杂度:O(amount × len(coins))
"""
dp = [float('inf')] * (amount + 1)
dp[0] = 0
for i in range(1, amount + 1):
for coin in coins:
if coin <= i:
dp[i] = min(dp[i], dp[i - coin] + 1)
# 重构解
if dp[amount] == float('inf'):
return 0, []
used = []
remaining = amount
for coin in sorted(coins, reverse=True):
while remaining >= coin and dp[remaining] == dp[remaining - coin] + 1:
used.append(coin)
remaining -= coin
return dp[amount], used
# 对比
print("硬币找零:贪心 vs 动态规划")
print("=" * 50)
# 案例 1:贪心有效
coins1 = [1, 5, 10, 25]
amount1 = 67
greedy1, used1 = coin_change_greedy(coins1, amount1)
dp1, used_dp1 = coin_change_dp(coins1, amount1)
print(f"\n案例 1:面值{coins1},目标{amount1}")
print(f"贪心算法:{greedy1}个硬币 {used1}")
print(f"动态规划:{dp1}个硬币 {used_dp1}")
print(f"结果:{'相同 ✓' if greedy1 == dp1 else '不同 ✗'}")
# 案例 2:贪心失效
coins2 = [1, 3, 4]
amount2 = 6
greedy2, used2 = coin_change_greedy(coins2, amount2)
dp2, used_dp2 = coin_change_dp(coins2, amount2)
print(f"\n案例 2:面值{coins2},目标{amount2}")
print(f"贪心算法:{greedy2}个硬币 {used2}")
print(f"动态规划:{dp2}个硬币 {used_dp2}")
print(f"结果:{'相同 ✓' if greedy2 == dp2 else '不同 ✗ (贪心非最优!)'}")
输出:
硬币找零:贪心 vs 动态规划 ================================================== 案例 1:面值 [1, 5, 10, 25],目标 67 贪心算法:6 个硬币 [25, 25, 10, 5, 1, 1] 动态规划:6 个硬币 [25, 25, 10, 5, 1, 1] 结果:相同 ✓ 案例 2:面值 [1, 3, 4],目标 6 贪心算法:3 个硬币 [4, 1, 1] 动态规划:2 个硬币 [3, 3] 结果:不同 ✗ (贪心非最优!)
关键洞察:
有一批集装箱需要装船,每个集装箱有重量 w[i],船的载重量为 C。目标是装载尽可能多的集装箱。
注意:与 0-1 背包问题不同,这里的目标是最大化数量而不是总重量。
示例:
集装箱重量:[5, 3, 7, 2, 8, 4, 1, 6]
船的载重量:20
策略:按重量最轻优先装载
直观理解:
定理:按重量最轻优先装载的贪心策略能得到最优解。
证明(交换论证):
设贪心算法得到的解为 G = {g₁, g₂, …, gₖ},按重量排序(w₁ ≤ w₂ ≤ … ≤ wₖ)。 设最优解为 O = {o₁, o₂, …, oₘ},按重量排序。
目标:证明 k = m(贪心解也是最优解)
证明:
| 特征 | 最优装载问题 | 0-1 背包问题 |
|---|---|---|
| 目标 | 最大化数量 | 最大化总价值 |
| 物品属性 | 只有重量 | 重量 + 价值 |
| 贪心算法 | 适用 ✓ | 不适用 ✗ |
| 算法选择 | 贪心 O(n log n) | 动态规划 O(nW) |
关键区别:
from typing import List, Tuple
def optimal_loading(weights: List[int], capacity: int) -> Tuple[int, List[int]]:
"""
最优装载问题的贪心算法
参数:
weights: 集装箱重量列表
capacity: 船的载重量
返回:
(装箱数量,选择的集装箱重量)
"""
# 按重量从小到大排序
sorted_weights = sorted(weights)
count = 0
total_weight = 0
selected = []
print("贪心选择过程:")
print("=" * 60)
print(f"{'序号':<8}{'重量':<10}{'累计重量':<12}{'剩余容量':<10}{'决策':<10}")
print("-" * 60)
remaining = capacity
for i, weight in enumerate(sorted_weights, 1):
print(f"{i:<8}{weight:<10}{total_weight:<12}{remaining:<10}", end=" ")
if remaining >= weight:
selected.append(weight)
total_weight += weight
remaining -= weight
count += 1
print("装入 ✓")
else:
print("跳过 ✗(超出容量)")
print("-" * 60)
print(f"最终结果:装入 {count} 个集装箱,总重量 {total_weight}")
return count, selected
# 示例用法
if __name__ == "__main__":
# 示例 1
print("示例 1:基本最优装载")
print("-" * 50)
weights = [5, 3, 7, 2, 8, 4, 1, 6]
capacity = 20
print(f"集装箱重量:{weights}")
print(f"船的载重量:{capacity}\n")
count, selected = optimal_loading(weights, capacity)
print(f"\n装载方案:")
print(f" 装箱数量:{count}")
print(f" 集装箱重量:{selected}")
print(f" 总重量:{sum(selected)}/{capacity}")
输出:
示例 1:基本最优装载 -------------------------------------------------- 集装箱重量:[5, 3, 7, 2, 8, 4, 1, 6] 船的载重量:20 贪心选择过程: ---------------------------------------------------- 序号 重量 累计重量 剩余容量 决策 ---------------------------------------------------- 1 1 0 20 装入 ✓ 2 2 1 19 装入 ✓ 3 3 3 17 装入 ✓ 4 4 6 14 装入 ✓ 5 5 10 10 装入 ✓ 6 6 15 5 跳过 ✗(超出容量) 7 7 15 5 跳过 ✗(超出容量) 8 8 15 5 跳过 ✗(超出容量) ---------------------------------------------------- 最终结果:装入 5 个集装箱,总重量 15 装载方案: 装箱数量:5 集装箱重量:[1, 2, 3, 4, 5] 总重量:15/20
哈夫曼编码是一种前缀编码,用于数据压缩。目标是为字符集中的每个字符分配二进制编码,使得:
示例:
文本:"hello world"
字符频率:h=1, e=1, l=3, o=2, ' '=1, w=1, r=1, d=1
目标:构造最优前缀编码
前缀编码:没有任何编码是另一个编码的前缀。
优点:无需分隔符就能唯一解码
示例:
定长编码:A=00, B=01, C=10, D=11(前缀编码)
变长编码:A=0, B=10, C=110, D=111(前缀编码)
非前缀编码:A=0, B=01(B 的前缀是 A,有歧义)
策略:频率最小的两个节点优先合并
算法步骤:
直观理解:
严格证明:
定理:哈夫曼编码生成的编码是所有前缀编码中平均长度最短的。
证明(归纳法和交换论证):
引理 1:在最优前缀编码中,频率最低的两个字符必定是兄弟节点
引理 2:合并两个最小频率节点后,子问题也是最优的
归纳基础:n=2 时,显然哈夫曼编码是最优的
归纳步骤:对于 n 个字符,哈夫曼算法合并频率最低的 x 和 y
以文本 'hello world' 为例:
字符频率统计:h: 1, e: 1, l: 3, o: 2, ' ': 1, w: 1, r: 1, d: 1
按频率排序:d:1, h:1, e:1, ' ':1, w:1, r:1, o:2, l:3
构建过程:
步骤 1: 合并 d(1) 和 h(1) → dh(2)
步骤 2: 合并 e(1) 和 ' '(1) → e' '(2)
步骤 3: 合并 w(1) 和 r(1) → wr(2)
步骤 4: 合并 dh(2) 和 e' '(2) → dhe' '(4)
步骤 5: 合并 wr(2) 和 o(2) → wro(4)
步骤 6: 合并 dhe' '(4) 和 wro(4) → all(8)
步骤 7: 合并 all(8) 和 l(3) → root(11)
最终哈夫曼树:
root(11)
/ \
all(8) l(3)
/ \
dhe'(4) wro(4)
/ \ / \
dh(2) e'(2) wr(2) o(2)
/ \ / \ / \
d h e ' ' w r
编码结果:
l: 0
o: 100
r: 1010
w: 1011
d: 11000
h: 11001
e: 11010
' ': 11011
import heapq
from typing import Dict, List
from collections import Counter
class HuffmanNode:
"""哈夫曼树节点"""
def __init__(self, char=None, freq=0, left=None, right=None):
self.char = char
self.freq = freq
self.left = left
self.right = right
def __lt__(self, other):
"""用于堆比较"""
return self.freq < other.freq
def is_leaf(self):
"""判断是否为叶子节点"""
return self.left is None and self.right is None
def build_huffman_tree(frequency: Dict[str, int]):
"""构建哈夫曼树"""
# 创建优先队列
heap = []
for char, freq in frequency.items():
node = HuffmanNode(char=char, freq=freq)
heapq.heappush(heap, node)
print("\n构建哈夫曼树的过程:")
print("=" * 60)
step = 1
# 合并节点
while len(heap) > 1:
left = heapq.heappop(heap)
right = heapq.heappop(heap)
print(f"步骤{step}: 合并 {left.char or'内'}({left.freq}) + "
f"{right.char or'内'}({right.freq}) = {left.freq + right.freq}")
merged = HuffmanNode(
freq=left.freq + right.freq,
left=left,
right=right
)
heapq.heappush(heap, merged)
step += 1
print("=" * 60)
return heap[0]
def generate_codes(root: HuffmanNode, current_code: str = "", codes: Dict[str, str] = None) -> Dict[str, str]:
"""从哈夫曼树生成编码"""
if codes is None:
codes = {}
if root is None:
return codes
if root.is_leaf():
codes[root.char] = current_code if current_code else "0"
return codes
generate_codes(root.left, current_code + "0", codes)
generate_codes(root.right, current_code + "1", codes)
return codes
def huffman_encode(text: str, codes: Dict[str, str]) -> str:
"""使用哈夫曼编码对文本编码"""
return ''.join(codes[char] for char in text)
def huffman_decode(encoded: str, root: HuffmanNode) -> str:
"""使用哈夫曼树对编码解码"""
result = []
current = root
for bit in encoded:
if bit == '0':
current = current.left
else:
current = current.right
if current.is_leaf():
result.append(current.char)
current = root
return ''.join(result)
# 示例用法
if __name__ == "__main__":
text = "hello world"
frequency = Counter(text)
print(f"原始文本:'{text}'")
print(f"字符频率:{dict(frequency)}")
# 构建哈夫曼树
root = build_huffman_tree(frequency)
# 生成编码
codes = generate_codes(root)
print("\n哈夫曼编码表:")
print("=" * 50)
for char, code in sorted(codes.items(), key=lambda x: len(x[1])):
print(f"'{char}': {code} (长度:{len(code)})")
print("=" * 50)
# 编码和解码
encoded = huffman_encode(text, codes)
print(f"\n编码结果:{encoded}")
decoded = huffman_decode(encoded, root)
print(f"解码结果:'{decoded}'")
print(f"正确性:{'✓ 正确' if decoded == text else '✗ 错误'}")
# 压缩率
original_bits = len(text) * 8
compressed_bits = len(encoded)
print(f"\n压缩统计:")
print(f" 原始比特数:{original_bits}")
print(f" 压缩比特数:{compressed_bits}")
print(f" 压缩比:{compressed_bits / original_bits:.2%}")
print(f" 节省空间:{(1 - compressed_bits / original_bits) * 100:.1f}%")
输出:
原始文本:'hello world' 字符频率:{'h': 1, 'e': 1, 'l': 3, 'o': 2, ' ': 1, 'w': 1, 'r': 1, 'd': 1} 构建哈夫曼树的过程: ============================================================ 步骤 1: 合并 d(1) + h(1) = 2 步骤 2: 合并 e(1) + (1) = 2 步骤 3: 合并 w(1) + r(1) = 2 步骤 4: 合并 内 (2) + 内 (2) = 4 步骤 5: 合并 内 (2) + o(2) = 4 步骤 6: 合并 内 (4) + 内 (4) = 8 步骤 7: 合并 内 (8) + l(3) = 11 ============================================================ 哈夫曼编码表: ================================================== 'l': 0 (长度:1) 'o': 100 (长度:3) 'w': 1011 (长度:4) 'r': 1010 (长度:4) 'h': 11001 (长度:5) 'e': 11010 (长度:5) ' ': 11011 (长度:5) 'd': 11000 (长度:5) ================================================== 编码结果:11001110101101001001101001111110100 解码结果:'hello world' 正确性:✓ 正确 压缩统计:原始比特数:88 压缩比特数:37 压缩比:42.05% 节省空间:58.0%
给定带权有向图 G = (V, E) 和源点 s,求从 s 到图中所有其他顶点的最短路径。
约束:所有边权必须非负
示例:
图:A(0)
/ | \
4 2 1
/ | \
B C D
| | |
3 3 2
| | |
E F G
求:从 A 到所有其他顶点的最短路径
策略:选择距离估计最小的未访问顶点
算法步骤:
为什么这个策略正确?
定理:Dijkstra 算法能正确计算单源最短路径。
证明(归纳法):
不变量:
归纳基础:初始时 S = {s},d[s] = 0 显然正确
归纳步骤:设 u 是选择的顶点(d[u] 最小)。
import heapq
from typing import Dict, List, Tuple, Set
from collections import defaultdict
class Graph:
"""图类(邻接表表示)"""
def __init__(self):
self.adj = defaultdict(list)
def add_edge(self, u: str, v: str, weight: int, directed: bool = False):
"""添加边"""
self.adj[u].append((v, weight))
if not directed:
self.adj[v].append((u, weight))
def vertices(self) -> Set[str]:
"""获取所有顶点"""
vertices = set(self.adj.keys())
for u in self.adj:
for v, _ in self.adj[u]:
vertices.add(v)
return vertices
def dijkstra(graph: Graph, start: str) -> Tuple[Dict[str, float], Dict[str, str]]:
"""
Dijkstra 算法实现
参数:
graph: 图对象
start: 源点
返回:
(距离字典,前驱字典)
"""
vertices = graph.vertices()
distances = {v: float('infinity') for v in vertices}
predecessors = {v: None for v in vertices}
distances[start] = 0
# 优先队列:(距离,顶点)
pq = [(0, start)]
visited = set()
print("Dijkstra 算法执行过程:")
print("=" * 70)
print(f"{'步骤':<6}{'当前顶点':<10}{'距离':<8}{'操作':<40}")
print("-" * 70)
step = 1
while pq:
current_dist, current = heapq.heappop(pq)
if current in visited:
continue
visited.add(current)
print(f"{step:<6}{current:<10}{current_dist:<8}", end=" ")
# 松弛操作
relaxed = False
for neighbor, weight in graph.adj[current]:
if neighbor in visited:
continue
new_dist = current_dist + weight
if new_dist < distances[neighbor]:
distances[neighbor] = new_dist
predecessors[neighbor] = current
heapq.heappush(pq, (new_dist, neighbor))
relaxed = True
if not relaxed:
print("无可松弛边")
step += 1
print("-" * 70)
return distances, predecessors
def reconstruct_path(predecessors: Dict[str, str], start: str, end: str) -> List[str]:
"""重构最短路径"""
path = []
current = end
while current is not None:
path.append(current)
current = predecessors[current]
path.reverse()
if path[0] == start:
return path
return None
# 示例用法
if __name__ == "__main__":
graph = Graph()
graph.add_edge('A', 'B', 4)
graph.add_edge('A', 'C', 2)
graph.add_edge('B', 'C', 1)
graph.add_edge('B', 'D', 5)
graph.add_edge('C', 'D', 8)
graph.add_edge('C', 'E', 10)
graph.add_edge('D', 'E', 2)
print("图的邻接表:")
for u in sorted(graph.adj.keys()):
edges = ', '.join([f"{v}({w})" for v, w in graph.adj[u]])
print(f" {u} → [{edges}]")
distances, predecessors = dijkstra(graph, 'A')
print("\n算法结果:")
print("=" * 70)
print(f"{'顶点':<10}{'最短距离':<12}{'路径'}")
print("-" * 70)
for v in sorted(distances.keys()):
if distances[v] == float('infinity'):
path = "不可达"
else:
path = reconstruct_path(predecessors, 'A', v)
path_str = ' → '.join(path)
print(f"{v:<10}{distances[v]:<12}{path_str}")
print("=" * 70)
输出:
图的邻接表:A → [B(4), C(2)] B → [C(1), D(5)] C → [B(1), D(8), E(10)] D → [B(5), C(8), E(2)] E → [C(10), D(2)] Dijkstra 算法执行过程: ====================================================================== 步骤 当前顶点 距离 操作 ---------------------------------------------------------------------- 1 A 0 松弛 A→B(4) 2 C 2 松弛 C→B(1) 3 B 3 松弛 B→D(5) 4 E 5 无可松弛边 5 D 8 无可松弛边 ---------------------------------------------------------------------- 算法结果: ====================================================================== 顶点 最短距离 路径 ---------------------------------------------------------------------- A 0 A B 3 A → C → B C 2 A → C D 8 A → C → B → D E 5 A → C → D → E ======================================================================
| 算法 | 时间复杂度 | 边权限制 | 特点 |
|---|---|---|---|
| Dijkstra | O((V+E)log V) | 非负 | 最常用,贪心 |
| Bellman-Ford | O(VE) | 可负 | 可检测负环 |
| Floyd-Warshall | O(V³) | 可负 | 全源最短路径 |
给定带权无向连通图 G = (V, E),求一棵生成树 T,使得 T 中所有边的权值之和最小。
示例:
图:A---4---B
| \ |
2 1 3
| \ |
C---5---D
求:最小生成树
割性质(Cut Property):对于图的任意割,横跨该割的最小权边必属于某个 MST
环性质(Cycle Property):对于图中的任意环,该环中权值最大的边必不属于任何 MST
贪心策略:从已选顶点集选择最小权值边
算法步骤:
复杂度:O((V + E) log V)
贪心策略:全局选择最小权值边
算法步骤:
复杂度:O(E log E)
| 特性 | Prim 算法 | Kruskal 算法 |
|---|---|---|
| 策略 | 顶点驱动 | 边驱动 |
| 数据结构 | 优先队列 | 并查集 |
| 适用图 | 稠密图 | 稀疏图 |
| 中间结果 | 始终连通 | 可能不连通 |
import heapq
from typing import Dict, List, Tuple, Set
from collections import defaultdict
class Graph:
"""图类"""
def __init__(self):
self.adj = defaultdict(list)
def add_edge(self, u: str, v: str, weight: int):
"""添加无向边"""
self.adj[u].append((v, weight))
self.adj[v].append((u, weight))
def edges(self) -> List[Tuple[int, str, str]]:
"""获取所有边"""
edges = set()
for u in self.adj:
for v, w in self.adj[u]:
if u < v: # 避免重复
edges.add((w, u, v))
return sorted(edges)
class UnionFind:
"""并查集(带路径压缩和按秩合并)"""
def __init__(self, vertices: Set[str]):
self.parent = {v: v for v in vertices}
self.rank = {v: 0 for v in vertices}
def find(self, x: str) -> str:
"""查找(带路径压缩)"""
if self.parent[x] != x:
self.parent[x] = self.find(self.parent[x])
return self.parent[x]
def union(self, x: str, y: str) -> bool:
"""合并(返回是否成功)"""
root_x = self.find(x)
root_y = self.find(y)
if root_x == root_y:
return False
# 按秩合并
if self.rank[root_x] < self.rank[root_y]:
self.parent[root_x] = root_y
elif self.rank[root_x] > self.rank[root_y]:
self.parent[root_y] = root_x
else:
self.parent[root_y] = root_x
self.rank[root_x] += 1
return True
def prim_mst(graph: Graph, start: str) -> Tuple[List[Tuple[str, str, int]], int]:
"""Prim 算法实现"""
pq = [(0, start, None)]
visited = set()
mst_edges = []
total_weight = 0
print("\nPrim 算法执行过程:")
print("-" * 60)
while pq and len(visited) < len(graph.adj):
weight, current, from_vertex = heapq.heappop(pq)
if current in visited:
continue
visited.add(current)
if from_vertex is not None:
mst_edges.append((from_vertex, current, weight))
total_weight += weight
print(f"添加边:{from_vertex}-{current} (权值:{weight}), 累计:{total_weight}")
for neighbor, edge_weight in graph.adj[current]:
if neighbor not in visited:
heapq.heappush(pq, (edge_weight, neighbor, current))
return mst_edges, total_weight
def kruskal_mst(graph: Graph) -> Tuple[List[Tuple[str, str, int]], int]:
"""Kruskal 算法实现"""
edges = graph.edges()
vertices = set(graph.adj.keys())
for u in graph.adj:
for v, _ in graph.adj[u]:
vertices.add(v)
uf = UnionFind(vertices)
mst_edges = []
total_weight = 0
print("\nKruskal 算法执行过程:")
print("-" * 60)
for weight, u, v in edges:
if uf.union(u, v):
mst_edges.append((u, v, weight))
total_weight += weight
print(f"选择边:{u}-{v} (权值:{weight}), 累计:{total_weight}")
else:
print(f"跳过边:{u}-{v} (形成环)")
if len(mst_edges) == len(vertices) - 1:
break
return mst_edges, total_weight
# 示例用法
if __name__ == "__main__":
graph = Graph()
graph.add_edge('A', 'B', 4)
graph.add_edge('A', 'C', 4)
graph.add_edge('B', 'C', 2)
graph.add_edge('B', 'D', 3)
graph.add_edge('C', 'D', 5)
graph.add_edge('C', 'E', 2)
graph.add_edge('D', 'E', 1)
print("图的邻接表:")
for u in sorted(graph.adj.keys()):
edges = ', '.join([f"{v}({w})" for v, w in graph.adj[u]])
print(f" {u} → [{edges}]")
# Prim 算法
prim_edges, prim_weight = prim_mst(graph, 'A')
print(f"\nPrim 算法结果:")
print(f" MST 边数:{len(prim_edges)}")
print(f" 总权值:{prim_weight}")
# Kruskal 算法
kruskal_edges, kruskal_weight = kruskal_mst(graph)
print(f"\nKruskal 算法结果:")
print(f" MST 边数:{len(kruskal_edges)}")
print(f" 总权值:{kruskal_weight}")
Prim 算法:
Kruskal 算法:
有 n 个作业需要分配给 m 台相同的机器并行处理,每个作业 i 有处理时间 t[i]。目标是最小化所有作业的最大完成时间(makespan)。
注意:这是一个 NP 难问题,因此讨论近似算法。
示例:
作业时间:[8, 7, 6, 5, 4, 3, 2, 1]
机器数量:3
求:最小化最大完成时间
贪心策略:Longest Processing Time first(长作业优先)
算法步骤:
近似比:4/3 - 1/(3m)
LPT 算法不能保证得到最优解,但可以保证近似解不超过最优解的 4/3 倍。
定理:LPT 算法的近似比为 4/3 - 1/(3m)。
import heapq
from typing import List, Tuple
def lpt_schedule(jobs: List[int], m: int) -> Tuple[List[List[int]], int]:
"""
LPT 算法求解多机调度问题
参数:
jobs: 作业处理时间列表
m: 机器数量
返回:
(调度方案,makespan)
"""
# 按处理时间从大到小排序
sorted_jobs = sorted(jobs, reverse=True)
# 最小堆:(机器负载,机器编号)
heap = [(0, i) for i in range(m)]
heapq.heapify(heap)
# 调度结果
schedules = [[] for _ in range(m)]
print(f"\nLPT 算法执行过程({m}台机器):")
print("-" * 60)
for job_time in sorted_jobs:
load, machine = heapq.heappop(heap)
schedules[machine].append(job_time)
new_load = load + job_time
heapq.heappush(heap, (new_load, machine))
print(f"作业{job_time} → 机器{machine+1} (负载:{load}→{new_load})")
# 计算 makespan
makespan = max(load for load, _ in heap)
print("-" * 60)
return schedules, makespan
# 示例用法
if __name__ == "__main__":
jobs = [8, 7, 6, 5, 4, 3, 2, 1]
m = 3
print(f"作业时间:{jobs}")
print(f"机器数量:{m}")
schedules, makespan = lpt_schedule(jobs, m)
print(f"\n最终调度:")
for i, schedule in enumerate(schedules, 1):
total = sum(schedule)
jobs_str = ' + '.join(map(str, schedule))
print(f" 机器{i}: {jobs_str} = {total}")
print(f"\nMakespan: {makespan}")
| 特征 | 贪心算法 | 动态规划 |
|---|---|---|
| 决策方式 | 局部最优选择 | 自底向上求解 |
| 是否回溯 | 不回溯 | 可能回溯 |
| 子问题求解 | 只求解必要的 | 求解所有子问题 |
| 适用条件 | 贪心选择性质 | 最优子结构 + 重叠 |
| 时间复杂度 | 通常较低 | 可能较高 |
| 空间复杂度 | 通常较低 | 可能需要 DP 表 |
| 适用范围 | 较窄 | 较广 |
| 算法 | 问题类型 | 贪心策略 | 时间复杂度 |
|---|---|---|---|
| 活动安排 | 调度问题 | 结束时间最早优先 | O(n log n) |
| 最优装载 | 装载问题 | 重量最轻优先 | O(n log n) |
| 哈夫曼编码 | 数据压缩 | 频率最小节点优先合并 | O(n log n) |
| Dijkstra | 最短路径 | 距离最小顶点优先 | O((V+E)log V) |
| Prim | 最小生成树 | 最小权值边优先 | O((V+E)log V) |
| Kruskal | 最小生成树 | 全局最小边优先 | O(E log E) |
| LPT | 多机调度 | 长作业优先 | O(n log n) |

微信公众号「极客日志」,在微信中扫描左侧二维码关注。展示文案:极客日志 zeeklog
使用加密算法(如AES、TripleDES、Rabbit或RC4)加密和解密文本明文。 在线工具,加密/解密文本在线工具,online
解析常见 curl 参数并生成 fetch、axios、PHP curl 或 Python requests 示例代码。 在线工具,curl 转代码在线工具,online
将字符串编码和解码为其 Base64 格式表示形式即可。 在线工具,Base64 字符串编码/解码在线工具,online
将字符串、文件或图像转换为其 Base64 表示形式。 在线工具,Base64 文件转换器在线工具,online
将 Markdown(GFM)转为 HTML 片段,浏览器内 marked 解析;与 HTML转Markdown 互为补充。 在线工具,Markdown转HTML在线工具,online
将 HTML 片段转为 GitHub Flavored Markdown,支持标题、列表、链接、代码块与表格等;浏览器内处理,可链接预填。 在线工具,HTML转Markdown在线工具,online