# -*- coding: utf-8 -*-
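"""Split large Zabbix history export files into chunks and bulk-load their rows into MySQL using a process pool."""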
import os
import time
import pymysql
import multiprocessing
def split_history(fullname, outfile):
    """Split a large history file into chunks of Linecount lines each."""
    size = 0            # index of the current output chunk
    Head = True         # repeat the header line at the top of every chunk
    Linecount = 200000  # lines per chunk
    HeadStr = ''
    f = open(fullname, 'r')
    outputf = open(outfile + 'test_history{}.xlsx'.format(size), 'w')
    if Head:
        HeadStr = f.readline()
        outputf.write(HeadStr)
    lines = 0
    for line in f:
        outputf.write(line)
        lines += 1
        if lines == Linecount:
            lines = 0
            outputf.close()
            size += 1
            print(size)
            outputf = open(outfile + 'test_history{}.xlsx'.format(size), 'w')
            if Head:  # the new chunk also needs the header line
                outputf.write(HeadStr)
    if not outputf.closed:
        outputf.close()
    f.close()
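# Example usage of split_history (hypothetical paths):
#   split_history('/xxx/history.tsv', '/xxx/out/')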
def result(fullname):
    """Bulk-insert Zabbix history rows into the database."""
    config = {
        'host': '0.0.0.0',
        'user': "xxx",
        'password': "xxx",
        'database': "xxx",
        'charset': 'utf8mb4',  # supports characters of 1-4 bytes
        'cursorclass': pymysql.cursors.DictCursor
    }
    conn = pymysql.connect(**config)
    cursor = conn.cursor()
    step = 0
    rows = []
    Head = True
    f = open(fullname, 'r')
    if Head:
        f.readline()  # skip the header line
    # parameterised statement; column names are placeholders from the original
    sql = "INSERT INTO xxx ( xxx, xxx, `xxx`, xxx ) VALUES ( %s, %s, %s, %s )"
    for line in f:
        step += 1
        item_id, clock, value, ns = line.rstrip('\n').split('\t')
        rows.append((item_id, clock, value, ns))
        if len(rows) == 1000:  # flush every 1000 rows
            cursor.executemany(sql, rows)
            conn.commit()
            print(step)
            time.sleep(0.2)    # brief pause to throttle write pressure
            rows = []
    if rows:                   # flush any remaining rows
        cursor.executemany(sql, rows)
        conn.commit()
    f.close()
    conn.close()
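# Design note: result() flushes rows in batches of 1000 with a commit and a short
# sleep between batches, keeping each transaction small and easing load on the server.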
def process(target):
    """Dispatch the files to result() through a process pool."""
    pool = multiprocessing.Pool(processes=15)
    path_list = file_name(target)
    for p in path_list:
        print('execute: {}'.format(p))
        pool.apply_async(result, (p,))
    pool.close()
    pool.join()
def file_name(file_dir):
    """Collect the paths of every file under the target directory."""
    result_path = []
    for root, dirs, files in os.walk(file_dir):
        for f in files:
            # join with root so files in subdirectories get their real paths
            result_path.append(os.path.join(root, f))
    return result_path
if __name__ == '__main__':
    process('/xxx/xxx')