归并排序,外排序,10G文件500M内存的排序

2,201 阅读3分钟
原文链接: www.cnblogs.com

老是被我家宝贝问这种类似的问题, 然后干脆写一篇相关文章吧

归并排序可以是一种外排序, 外排序是指利用外存也就是磁盘进行排序的一种简称。

典型的应用是hadoop 的 mapreduce 的merge 阶段

归并排序的: 假设有n 个元素, 将n 个元素分程x 组, 然后对每一组的元素进行排序, 然后将这 x 组已经排好序的序列合并起来。

说一下分成x 组的方式, 大概有两种:

  第一种: 递归的方式,  这种情况你可能都不会去关心 x 大小。

  第二种: 每组 m 个元素, x =  n/m。这种方法常用在内存不够的情况下。

 

所以归并排序的步骤分两步:

  第一步: 分x 组, 对每组的元素进行排序

  第二部: 合并, 将已经排序好的 x 组元素合并起来。

 

先说第一种递归的方式吧, 就是你可能会做到的归并排序的题目:

  我们先写第二步的代码:合并已经排序好的数组

def merge_sorted_list(l1, l2):
    i, j = 0, 0
    result = []
    while i < len(l1) and j < len(l2):
        if l1[i] <= l2[j]:
            result.append(l1[i])
            i += 1
        else:
            result.append(l2[j])
            j += 1
    result += l1[i:]
    result += l2[j:]
    return result

  再说第一步:

  

def merge_sort(l):
    if len(l) <= 1:
        return l
    num = len(l) / 2
    l1 = merge_sort(l[:num])
    l2 = merge_sort(l[num:])
    return merge_sorted_list(l1, l2)

全部代码:

def merge_sort(l):
    if len(l) <= 1:
        return l
    num = len(l) / 2 
    l1 = merge_sort(l[:num])
    l2 = merge_sort(l[num:])
    return merge_sorted_list(l1, l2) 
  
def merge_sorted_list(l1, l2):
    i, j = 0, 0
    result = []
    while i < len(l1) and j < len(l2):
        if l1[i] <= l2[j]:
            result.append(l1[i])
            i += 1
        else:
            result.append(l2[j])
            j += 1
    result += l1[i:]
    result += l2[j:]
    return result
  
a = [219, 9527, 211, 9218]
print a
b = merge_sort(a)
print b

总的来说这种归并排序理解起来以及代码写起来都是很简单的。

 

接下来说一下第二种吧,纯粹的外排序。用栗子的方式理解起来比较简单。

问题: 假设有2G的文件,里面是数字,一行一个数字,内存只有500M。让你排序,咋整?

其实思路很简单,一次只读一点点,排序好,然后再将将文件合并起来。然后最终保存为一个大文件。

先说下python open(filepath).readline(),一次只会返回一行,不必担心大文件,相信各个语言都有对应的方式。

首先第一步,大文件拆分成 x 个 block_size 大的小文件,每个小文件排好序

def split_file(file_path, block_size):
    f = open(file_path, 'r')
    fileno = 1 
    files = []
    while True:
        lines = f.readlines(block_size)
        if not lines:
            break
        lines = [int(i.strip()) for i in lines]
        lines.sort()
        files.append(save_file(lines, fileno))
        fileno += 1
    return files

第二部:将拆分成的小文件合并起来,然后将归并的东西写到大文件里面去, 这里用到的是多路归并的方法。多路归并在我的这篇文章里面有写到:www.cnblogs.com/dream-of-ca…

def nw_merge(files):
    fs = [open(file_) for file_ in files]
    min_map = {}
    out = open("/home/hrs/demo/files/out", "a")
    for f in fs:
        read = f.readline()
        if read:
            min_map[f] = int(read.strip())
    
    while min_map:
        min_ = min(min_map.items(), key = lambda x: x[1])
        min_f, min_v = min_
        out.write("{}".format(min_v))
        out.write("\n")
        nextline = min_f.readline()
        if nextline:
            min_map[min_f] = int(nextline.strip())
        else:
            del min_map[min_f]

全部代码:

 

import os

def save_file(l, fileno):
    filepath = "/home/hrs/demo/files/{}" .format(fileno)

    f = open(filepath, 'a')
    for i in l:
        f.write("{}".format(i))
        f.write("\n")
    f.close()
    return filepath


def nw_merge(files):
    fs = [open(file_) for file_ in files]
    min_map = {} # 用来记录每一路当前最小值。
    out = open("/home/hrs/demo/files/out", "a")
    for f in fs:
        read = f.readline()
        if read:
            min_map[f] = int(read.strip())

    while min_map:
     # 将最小值取出, 并将该最小值所在的那一路做对应的更新 min_ = min(min_map.items(), key = lambda x: x[1]) min_f, min_v = min_ out.write("{}".format(min_v)) out.write("\n") nextline = min_f.readline() if nextline: min_map[min_f] = int(nextline.strip()) else: del min_map[min_f] def split_file(file_path, block_size): f = open(file_path, 'r') fileno = 1 files = [] while True: lines = f.readlines(block_size) if not lines: break lines = [int(i.strip()) for i in lines] lines.sort() files.append(save_file(lines, fileno)) fileno += 1 return files if __name__ == "__main__": file_path = "/home/hrs/demo/ta" block_size = 500*1024*1024 #500M num_blocks = os.stat(file_path).st_size/block_size files = split_file(file_path, block_size) nw_merge(files)