海量数据排序思路
核心方案:外排序(分治+多路归并)MapReduce
外排序是指数据量太大,无法全部加载到内存中,需要将数据分成多个小块进行排序,然后将排序后的小块合并成一个大的有序块
1.分块排序(Map阶段)
- 分块策略
按1G内存容量限制,将100G文件拆分为 200个500MB分块(保留内存用于排序计算和系统开销)
- 内存排序
每个分块加载至内存后:
① 使用快速排序(时间复杂度O(n log n))
② 去重优化:若存在重复ID,排序时合并相同项(节省存储空间),根据要求是否去重
- 临时文件写入
排序后分块写入磁盘,命名规则:chunk_0001.sorted ~ chunk_0200.sorted
2. 多路归并(Reduce阶段)
使用K路归并算法合并这些排序好的小文件
具体实现方法:
- 从每个小文件中读取一小部分数据到内存(例如每个文件读取几MB)
- 建立最小堆(优先队列)来追踪当前每个文件的最小值
- 每次从堆中取出最小值,写入输出文件
- 从该最小值所在的文件再读入一个ID到堆中
- 重复上述过程直到所有文件都处理完
具体示例:
- ① 分批归并:每轮合并50个分块(50路归并),共需4轮(200/50)
- ② 堆排序优化:使用 最小堆(PriorityQueue) 管理各分块当前读取值
内存管理:
区域分配比例说明输入缓冲区800MB每个分块预读16MB(50分块×16MB)输出缓冲区200MB写满后批量刷新至磁盘3.核心代码(Java实现)
阶段1:分割大文件,并排序小文件,输出多个小文件排序后的结果- private static List<File> splitAndSort(String inputFile) throws IOException {
- List<File> chunks = new ArrayList<>();
- try (BufferedReader reader = new BufferedReader(new FileReader(inputFile))) {
- List<Long> buffer = new ArrayList<>(MAX_CHUNK_SIZE / 8);
- String line;
-
- while ((line = reader.readLine()) != null) {
- buffer.add(Long.parseLong(line));
- // 内存达到阈值时执行排序和写入
- if (buffer.size() >= MAX_CHUNK_SIZE / 8) {
- chunks.add(sortAndWrite(buffer));
- buffer.clear();
- }
- }
- // 处理剩余数据
- if (!buffer.isEmpty()) {
- chunks.add(sortAndWrite(buffer));
- }
- }
- return chunks;
- }
复制代码 阶段2:多路归并,使用最小堆优化- private static void mergeFiles(List<File> files, String output) throws IOException {
- PriorityQueue<FileEntry> minHeap = new PriorityQueue<>(
- files.size(),
- Comparator.comparingLong(FileEntry::current)
- );
- // 初始化堆(每个文件读取一个元素)
- for (File file : files) {
- BufferedReader reader = new BufferedReader(new FileReader(file));
- String line = reader.readLine();
- if (line != null) {
- minHeap.offer(new FileEntry(Long.parseLong(line), reader));
- }
- }
- try (BufferedWriter writer = new BufferedWriter(new FileWriter(output))) {
- while (!minHeap.isEmpty()) {
- FileEntry entry = minHeap.poll();
- writer.write(entry.current.toString());
- writer.newLine();
-
- // 从当前文件读取下一个元素
- String line = entry.reader.readLine();
- if (line != null) {
- entry.current = Long.parseLong(line);
- minHeap.offer(entry);
- } else {
- entry.reader.close(); // 关闭已读完的文件
- }
- }
- }
- }
复制代码 来源:程序园用户自行投稿发布,如果侵权,请联系站长删除
免责声明:如果侵犯了您的权益,请联系站长,我们会及时删除侵权内容,谢谢合作! |