
Splitting a 1GB Excel File and Importing It into the Database with Multiple Processes

Overview: I needed to load the data from a 1GB Excel file into a database. After comparing the approaches found online, I settled on splitting the file into chunks and importing them concurrently with a process pool. (As the code below shows, the file is handled as a tab-separated text export; a binary .xlsx could not be split line by line.)
  2020.09.14  Bug Man

Reference article on splitting large files

Reference article on listing files in a directory

Reference article on process pools

# -*- coding: utf-8 -*-
import os
import time
import pymysql
import multiprocessing


def split_history(fullname, outfile):
    """ Split a large tab-separated export into 200,000-line chunks. """
    size = 0
    head = True          # the source file carries a header row
    line_count = 200000
    head_str = ''
    f = open(fullname, 'r')
    # Despite the .xlsx suffix the chunks are plain tab-separated text;
    # a real binary .xlsx could not be split line by line.
    outputf = open(outfile + 'test_history{}.xlsx'.format(size), 'w')
    if head:
        head_str = f.readline()
        outputf.write(head_str)
    lines = 0
    for line in f:
        outputf.write(line)
        lines += 1
        if lines == line_count:
            lines = 0
            outputf.close()
            size += 1
            print(size)
            outputf = open(outfile + 'test_history{}.xlsx'.format(size), 'w')
            if head:  # repeat the header row at the top of each new chunk
                outputf.write(head_str)
    if not outputf.closed:
        outputf.close()
    f.close()
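
# To exercise the splitter on its own, a call like the one below works
# (both paths are hypothetical placeholders):
#     split_history('/data/history_export.tsv', '/data/parts/')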


def result(fullname):
    """ Import one chunk into the zabbix history table. """
    config = {
        'host': '0.0.0.0',
        'user': "xxx",
        'password': "xxx",
        'database': "xxx",
        'charset': 'utf8mb4',  # supports 1-4 byte characters
        'cursorclass': pymysql.cursors.DictCursor
    }
    conn = pymysql.connect(**config)
    cursor = conn.cursor()
    step = 0
    rows = []
    f = open(fullname, 'r')
    f.readline()  # skip the header row that each chunk carries
    sql = """
            INSERT INTO xxx ( xxx, xxx, `xxx`, xxx )
            VALUES
                ( %s, %s, %s, %s )"""
    for line in f:
        step += 1
        item_id, clock, value, ns = line.rstrip('\n').split('\t')
        rows.append((item_id, clock, value, ns))
        if len(rows) == 1000:
            # one round trip per 1000 rows; parameterized values avoid
            # both escaping problems and SQL injection
            cursor.executemany(sql, rows)
            conn.commit()
            print(step)
            time.sleep(0.2)  # brief pause to ease load on the server
            rows = []
    if rows:
        cursor.executemany(sql, rows)
        conn.commit()
    f.close()
    conn.close()
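
# Alternative worth knowing: for one-off bulk loads, MySQL's
# LOAD DATA LOCAL INFILE is typically much faster than row INSERTs
# (pymysql enables it via connect(..., local_infile=True)); the
# executemany() batching above simply keeps the original
# commit-every-1000-rows flow.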


def process(target):
    """ Fan the chunk files out across a process pool. """
    pool = multiprocessing.Pool(processes=15)
    path_list = file_name(target)
    for p in path_list:
        print('execute: {}'.format(p))
        pool.apply_async(result, (p,))
    pool.close()
    pool.join()
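
# Note: pool.apply_async() discards worker exceptions unless the
# returned AsyncResult objects are kept and .get() is called on each,
# e.g. results = [pool.apply_async(result, (p,)) for p in path_list]
# followed by [r.get() for r in results] after pool.join().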


def file_name(file_dir):
    """ Collect the paths of every file under the target directory. """
    result_path = []
    for root, dirs, files in os.walk(file_dir):
        for f in files:
            # join against root, not file_dir, so files inside
            # subdirectories get correct paths
            result_path.append(os.path.join(root, f))
    return result_path


if __name__ == '__main__':
    process('/xxx/xxx')
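
The script above only runs the import stage; split_history has to be called separately before process. A minimal end-to-end driver, assuming the 1GB file has already been exported to tab-separated text and using hypothetical paths, could look like this:

# Hypothetical driver: split the export first, then import the chunks
# concurrently. Both paths are placeholders for your environment.
if __name__ == '__main__':
    split_history('/data/history_export.tsv', '/data/parts/')
    process('/data/parts/')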