TOP 100 freq words 问题 问题:有一个 1G 大小的文件,里面每一行是一个词,每个词的大小不超过 16 字节,内存限制大小是 1M。返回出现频率最高的 100 个单词
思路 问题分析
有一个 1GB 的文件,每行是一个词,每个词不超过 16 字节。
内存限制为 1MB,因此无法一次性加载整个文件到内存中处理。
需要返回出现频率最高的 100 个词,要求精确结果(非近似)。
核心挑战:内存小,文件大,需高效处理大数据,避免内存溢出。
解决思路:分治法 + 外部排序 + 堆维护 Top 100 使用分治法将大文件分解为小文件块,每个块在内存中处理,然后多级合并部分结果。在合并过程中,维护一个最小堆来跟踪全局频率最高的 100 个词。思路简单直接,步骤如下:
分块(Divide) :
将大文件分割成多个小文件块,每个块大小约 500KB(约 32,000 行),确保块内不同词的数量不超过内存限制(1MB)。
为什么 500KB?内存 1MB,每个词最大 16 字节,哈希表条目(词 + 频率)约 20-32 字节(包括开销),32,000 个不同词需约 640KB,小于 1MB。
分割方法:顺序读取大文件,逐行写入块文件,直到块大小达到约 500KB。重复直到处理完整个文件。
块数量:文件 1GB ≈ 1,000,000,000 字节,每行平均约 10 字节(词 + 换行符),总行数约 100,000,000 行。每个块 32,000 行,块数约 3,125(100,000,000 / 32,000 ≈ 3,125)。
块内处理(Process Chunks) :
对每个块文件:
读入内存(约 500KB)。
使用哈希表统计词频:逐行读取,以词为键,累加频率。
块处理完后,对哈希表中的词按键(词)排序。
输出到临时文件:每行格式为 “词 频率”,并按词排序(便于后续合并)。
内存使用:哈希表存储词(最大 16 字节)和频率(4 字节整数),条目上限 32,000,总内存约 640KB,满足 1MB 限制。
输出:每个块生成一个排序后的部分频率文件。
第一级合并(Merge Partially) :
部分文件数量多(约 3,125 个),需分组合并,避免同时打开太多文件(文件描述符限制,假设一次最多打开 256 个文件)。
分组:将约 3,125 个部分文件分成 13 组(每组 240 文件)和 1 组(剩余文件),确保每组文件数 ≤ 256。
对每组执行多路合并:
使用优先队列(最小堆)按键(词)合并:堆中每个条目存储(词、频率、文件索引),大小约 30 字节。
合并逻辑:弹出最小词,检查所有输入文件是否有相同词,累加频率,输出全局(词,聚合频率)到新临时文件(按词排序)。
内存使用:优先队列(256 条目 × 30 字节 ≈ 7.7KB),输入缓冲区(每个文件 512 字节缓冲区 × 256 文件 ≈ 128KB),输出缓冲区(4KB),总内存约 140KB < 1MB。
输出:每组生成一个更大的排序频率文件(约 13 个文件)。
第二级合并与 Top 100 提取(Final Merge and Heap Update) :
第一级合并后,文件数少(约 13 个),可一次性合并。
执行多路合并(13 路):
使用优先队列按键(词)合并:堆中每个条目约 30 字节。
合并逻辑:弹出最小词,累加所有输入文件中的频率,得到全局(词,全局频率)。
同时维护一个最小堆(大小 100)用于 Top 100:以频率为键,存储(词,频率)。
对于每个全局(词,频率),插入堆中。
如果堆大小 > 100,弹出最小频率条目,确保堆始终保留最大频率的 100 个词。
内存使用:优先队列(13 条目 × 30 字节 ≈ 0.4KB),输入缓冲区(13 文件 × 4KB = 52KB),堆(100 条目 × 30 字节 ≈ 3KB),总内存约 55KB < 1MB。
注意:无需输出完整全局文件,只更新堆,节省内存和磁盘。
处理完所有词后,堆中即为频率最高的 100 个词。
输出结果 :
堆中元素按频率最小堆组织,但 Top 100 已确定。
将堆中元素按频率降序排序(例如,通过逐个弹出并反转),返回结果。
为什么满足内存限制?
分块大小(500KB)确保块内处理在 1MB 内。
合并阶段使用小缓冲区(每个文件 512 字节-4KB)和紧凑数据结构(优先队列条目 30 字节),总内存始终 < 1MB。
堆维护 Top 100 仅需约 3KB,不影响内存。
优点
简单直接:分治减少内存压力,外部排序处理大数据,堆高效维护 Top K。
精确结果:通过全局频率累加,确保正确性。
可扩展:适用于更大文件或更小内存。
此思路可在标准编程语言(如 Python、Java)中实现,注意文件 I/O 和缓冲区管理以优化性能。
解法一 第一步:将 1 G 拆分成小文件,每个小文件的大小不超过 1 MB ,其实应该留空间用于统计一个小文件中单词出现的频率,所以为了保险起见,每个文件的大小不超过 512 kb,那么得到 1 GB / 512 KB = 2048 个小文件
这里不用 hash 的方法来切分小文件,直接顺序读取然后写到小文件,小文件大小多到了 512kb 就关闭,写下一个小文件
第二步:分别对每个文件做下面的事情: ① 将小文件中的单词加载进内存 ② 使用 HashMap 进行单词统计 ③ 将 HashMap 中词频数据写到另一个新的小文件中,我们称为词频小文件 这里再写的时候,对单词进行 hash (word) % 2048 写到对应的文件中 这样做的目的是为了相同的单词放到同一个文件中。
第三步:初始化一个 100 个节点的小顶堆,用于保存 100 个出现频率最多的单词 分别对每个词频小文件做下面的事情: ① 将小文件中的单词及其频率加载进内存 ② 使用 HashMap 进行单词统计 ③ 遍历 HashMap 中单词词频,如果词频大于小顶堆中堆顶词频的话,则放入小顶堆,否则不放
最终,小顶堆中的 100 个单词就是出现频率最多的单词了
该方案在第二步的第 ③ 点的时候有点问题:就是当词频小文件大于 1 M 了,该怎么处理呢? 或者说极端情况下,每个单词都只出现一次,并且每个单词的 hash (word) % 2048 值都是相同的话,那词频小文件的大小都会超过 1 G 了
解法二 第一步:使用多路归并排序对大文件进行排序,这样相同的单词肯定是挨着的
第二步: ① 初始化一个 100 个节点的小顶堆,用于保存 100 个出现频率最多的单词 ② 遍历整个文件,一个单词一个单词的从文件中取出来,并计数 ③ 等到遍历的单词和上一个单词不同的话,那么上一个单词及其频率如果大于堆顶的词的频率,那么放在堆中,否则不放
最终,小顶堆中就是出现频率前 100 的单词了
多路归并排序对大文件进行排序的步骤如下: ① 将文件按照顺序切分成大小不超过 512KB 的小文件,总共 2048 个小文件 ② 使用 1MB 内存分别对 2048 个小文件中的单词进行排序 ③ 使用一个大小为 2048 大小的堆,对 2048 个小文件进行多路排序,结果写到一个大文件中
代码实现 数据准备和工具类 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 public class FileIOUtils { public static BufferedReader getReader (String name) { try { FileInputStream inputStream = new FileInputStream (name); BufferedReader br = new BufferedReader (new InputStreamReader (inputStream)); return br; } catch (IOException e) { throw new RuntimeException ("IOException" , e); } } public static BufferedWriter getWriter (String name) { try { FileOutputStream outputStream = new FileOutputStream (name); BufferedWriter bw = new BufferedWriter (new OutputStreamWriter (outputStream)); return bw; } catch (IOException e) { throw new RuntimeException ("IOException" , e); } } public static void closeReader (Reader reader) { try { if (reader != null ) { reader.close(); } } catch (IOException e) { throw new RuntimeException ("IOException" , e); } } public static void closeWriter (Writer writer) { try { if (writer != null ) { writer.close(); } } catch (IOException e) { throw new RuntimeException ("IOException" , e); } } }
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 public class _0_WordsGenerator { private static Random r = new Random (); public static void main (String[] args) throws IOException { BufferedWriter writer = FileIOUtils.getWriter("data\\top100\\words.txt" ); char [] chars = {'a' , 'b' , 'c' , 'd' , 'e' , 'f' , 'g' }; int m = chars.length; for (int i = 0 ; i < 10000 ; i++) { StringBuilder line = new StringBuilder (); for (int j = 0 ; j < r.nextInt(16 ); j++) { line.append(chars[r.nextInt(m)]); } if (line.length() == 0 ) continue ; writer.write(line.toString()); writer.newLine(); } FileIOUtils.closeWriter(writer); } }
将一个大文件切割成多个小文件 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 public class _1_FileSplit { public void splitFile (String fileName) throws IOException { int fileNum = 0 ; String fileSuffix = "data\\top100\\raw_data\\" ; String littleFileName = fileSuffix + fileNum; long totalSize = 0 ; BufferedWriter bw = FileIOUtils.getWriter(littleFileName); BufferedReader br = FileIOUtils.getReader(fileName); String line = null ; while ((line = br.readLine()) != null ) { if (totalSize >= 512 * 1024 ) { FileIOUtils.closeWriter(bw); fileNum++; littleFileName = fileSuffix + fileNum; bw = FileIOUtils.getWriter(littleFileName); totalSize = 0 ; } totalSize += line.length(); bw.write(line); bw.newLine(); } FileIOUtils.closeReader(br); } public static void main (String[] args) throws IOException { String fileName = "data\\top100\\words.txt" ; new _1_FileSplit ().splitFile(fileName); } }
每个小文件中的单词进行排序 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 public class _2_LittleFileSorter { public void sortEachFile (String dirName) throws IOException { File dir = new File (dirName); File[] littleFiles = dir.listFiles(); for (int i = 0 ; i < littleFiles.length; i++) { BufferedReader br = FileIOUtils.getReader(littleFiles[i].getName()); List<String> words = new ArrayList <>(); String line = null ; while ((line = br.readLine()) != null ) { words.add(line); } FileIOUtils.closeReader(br); Collections.sort(words); BufferedWriter bw = FileIOUtils.getWriter("data\\top100\\sorted_data\\" + i); for (String word : words) { bw.write(word); bw.newLine(); } FileIOUtils.closeWriter(bw); } } public static void main (String[] args) throws IOException { String dir = "data\\top100\\raw_data" ; new _2_LittleFileSorter ().sortEachFile(dir); } }
对有序的小文件进行外部排序 我们先写一个 BufferedIterator 类,如下:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 public class BufferedIterator { private BufferedReader reader; private String head; BufferedIterator(BufferedReader reader) { this .reader = reader; } public boolean hasNext () { try { head = this .reader.readLine(); } catch (IOException e) { e.printStackTrace(); head = null ; } return head != null ; } public String next () { return head; } public void close () throws Exception { this .reader.close(); } }
然后使用多路归并排序来实现文件的外部排序,如下代码:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 public class _3_ExternalSorter { public void mergeSort (String dirName) throws Exception { File dir = new File (dirName); File[] children = dir.listFiles(); PriorityQueue<BufferedIterator> minHeap = new PriorityQueue <>(children.length, new Comparator <BufferedIterator>() { @Override public int compare (BufferedIterator o1, BufferedIterator o2) { return o1.next().compareTo(o2.next()); } }); for (File file : children) { BufferedReader br = FileIOUtils.getReader(file.getName()); BufferedIterator buf = new BufferedIterator (br); if (buf.hasNext()) { minHeap.add(buf); } else { buf.close(); } } BufferedWriter bw = FileIOUtils.getWriter("data\\top100\\sorted_words.txt" ); while (!minHeap.isEmpty()) { BufferedIterator firstBuf = minHeap.poll(); bw.write(firstBuf.next()); bw.newLine(); if (firstBuf.hasNext()) { minHeap.add(firstBuf); } else { firstBuf.close(); } } FileIOUtils.closeWriter(bw); } public static void main (String[] args) throws Exception { String dirName = "data\\top100\\sorted_data\\" ; new _3_ExternalSorter ().mergeSort(dirName); } }
利用 100 大小的小顶堆得到出现频率 top 100 的单词 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 public class _4_Top_100_Words { class Pair { String word; int cnt; Pair(String word, int cnt) { this .word = word; this .cnt = cnt; } } public String[] top_100 (String fileName) throws Exception { PriorityQueue<Pair> minHeap = new PriorityQueue <>(100 , new Comparator <Pair>() { @Override public int compare (Pair o1, Pair o2) { return o1.cnt - o2.cnt; } }); String prevWord = null ; int prevCnt = 0 ; BufferedReader br = FileIOUtils.getReader(fileName); String currWord = null ; while ((currWord = br.readLine()) != null ) { if (!currWord.equals(prevWord)) { if (minHeap.size() < 100 ) { minHeap.add(new Pair (prevWord, prevCnt)); } else if (prevCnt > minHeap.peek().cnt) { minHeap.remove(); minHeap.add(new Pair (prevWord, prevCnt)); } prevWord = currWord; prevCnt = 0 ; } prevCnt++; } String[] res = new String [100 ]; int index = 0 ; while (!minHeap.isEmpty()) { res[index++] = minHeap.poll().word; } return res; } public static void main (String[] args) throws Exception { String fileName = "data\\top100\\sorted_words.txt" ; String[] res = new _4_Top_100_Words ().top_100(fileName); System.out.println(Arrays.toString(res)); } }
MapReduce 框架 Hadoop Spark MapReduce 思路解决大文件词频统计与 Top 100 问题 核心思想 MapReduce 是分布式计算模型,天然适合处理海量数据。我们将问题分解为两个 MapReduce 作业:
词频统计作业 :精确计算每个词的全局频率
Top 100 提取作业 :从全局词频中筛选最高频词
系统架构 1 2 3 4 5 6 7 8 9 输入文件 (1GB) │ ├─ MapReduce Job 1 (词频统计) │ ├─ Mapper 局部词频统计 │ └─ Reducer 全局词频合并 │ └─ MapReduce Job 2 (Top 100 提取) ├─ Mapper 局部 Top 100 筛选 └─ Reducer 全局 Top 100 合并
Job 1: 全局词频统计 目标 :精确计算每个词的出现频率
Map 阶段
1 2 3 4 5 6 7 8 def mapper (block ): word_count = {} for word in block: word_count[word] = word_count.get(word, 0 ) + 1 for word, count in word_count.items(): yield (word, count)
内存控制 :每个 Mapper 处理单块(500KB),哈希表内存 ≈700KB < 1MB
输出示例 :(apple, 15)
, (banana, 23)
, …
Shuffle 阶段
按单词哈希分区(如 hash(word) % R
)
相同单词的计数发送到同一 Reducer
Reduce 阶段
输入 :相同单词的所有局部计数 [ (word, [count1, count2,...]) ]
1 2 3 def reducer (word, counts ): total = sum (counts) yield (word, total)
内存控制:
设置 Reducer 数量 R = 4000
每个 Reducer 处理约 1亿/R ≈ 25,000
个单词
内存占用:25,000 词 × 30 字节 ≈ 750KB < 1MB
输出 :全局词频文件(R
个分区文件)
Job 2: Top 100 提取 目标 :从全局词频中筛选频率最高的 100 个词
Map 阶段
输入 :Job 1 的输出文件(每个文件 ≈250KB)
1 2 3 4 5 6 7 8 9 10 11 12 def mapper (file ): heap = MinHeap(100 ) for (word, count) in file: if heap.size < 100 or count > heap.min (): heap.push((count, word)) if heap.size > 100 : heap.pop() for count, word in heap.items: yield (None , (word, count))
内存控制 :100 个词 × 30 字节 ≈ 3KB,远低于 1MB
输出示例 :(None, ("apple", 1580))
, (None, ("banana", 1423))
…
Reduce 阶段(单 Reducer)
输入 :所有 Mapper 的 Top 100 结果(共 4000 × 100 = 400,000
条)
1 2 3 4 5 6 7 8 9 10 11 def reducer (_, values ): heap = MinHeap(100 ) for (word, count) in values: if heap.size < 100 or count > heap.min (): heap.push((count, word)) if heap.size > 100 : heap.pop() return sorted (heap.items, reverse=True )
内存控制:
流式处理:逐条读取记录,不一次性加载
堆大小固定 100,内存 ≈3KB
文件缓冲区:4KB
总计 < 10KB << 1MB
关键优化与内存保障
动态 Reducer 数量 :
Job 1 中 R = 4000
确保每个 Reducer 处理 ≤25,000 词
公式:R > (总词数 × 30 字节) / 1MB
流式处理 :
Job 2 的 Reducer 逐条处理记录,避免加载全部 400,000 条数据
堆排序优势 :
时间复杂度 O(n log k)
(k=100
为常数级)
空间复杂度 O(k)
严格受限
分布式扩展性 :
文件更大时:增加 Mapper 数量
词量激增时:线性增加 Job 1 的 Reducer 数量
执行示例 1 2 3 4 5 6 7 8 9 flowchart TB A[1GB 输入文件] --> B{Job1 Mappers} B -->|局部词频| C[Shuffle] C --> D{Job1 Reducers} D -->|全局词频| E[4000 个分区文件] E --> F{Job2 Mappers} F -->|局部 Top100| G[Shuffle] G --> H{Job2 Reducer} H --> I[全局 Top100]
此方案严格满足 1MB 内存限制,充分利用 MapReduce 的分布式能力,同时保证结果精确性。在 Hadoop/Spark 等框架可直接实现。