1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139
import codecs
import csv
import functools
import io
import time

import pandas as pd
from faker import Faker
from psycopg2 import extras as ex
from sqlalchemy import create_engine
def coast_time(func):
    """Decorator that prints the wall-clock duration of each call to ``func``.

    The wrapped callable forwards all positional and keyword arguments to
    ``func`` and returns its result unchanged.  The elapsed time is measured
    with ``time.perf_counter`` and printed after ``func`` returns; nothing is
    printed if ``func`` raises.

    Args:
        func: The callable to time.

    Returns:
        A wrapper that carries the metadata of ``func`` (name, docstring, ...).
    """
    # functools.wraps keeps __name__/__doc__ of the decorated function so that
    # introspection and tracebacks do not show every function as "fun".
    @functools.wraps(func)
    def fun(*args, **kwargs):
        t = time.perf_counter()
        result = func(*args, **kwargs)
        print(f'func {func.__name__} need time:{time.perf_counter() - t:.2f} s')
        return result

    return fun
def psql_insert_copy(table, conn, keys, data_iter):
    """Insert rows with a ``COPY ... FROM STDIN WITH CSV`` statement.

    The signature matches what ``DataFrame.to_sql`` passes to a callable
    ``method`` (this function is used that way by ``to_sql_insert_copy``).

    Args:
        table: Object exposing ``schema`` and ``name`` attributes.
        conn: Object whose ``connection`` attribute provides ``cursor()``.
        keys: Column names; each one is wrapped in double quotes.
        data_iter: Iterable of rows, written out as CSV records.
    """
    raw_conn = conn.connection
    with raw_conn.cursor() as cursor:
        # Serialise every row into an in-memory CSV and rewind it for reading.
        csv_buffer = io.StringIO()
        csv.writer(csv_buffer).writerows(data_iter)
        csv_buffer.seek(0)

        quoted_columns = ', '.join(f'"{key}"' for key in keys)
        # Prefix the schema only when the table declares one.
        qualified_name = (
            f'{table.schema}.{table.name}' if table.schema else table.name
        )

        statement = f'COPY {qualified_name} ({quoted_columns}) FROM STDIN WITH CSV'
        cursor.copy_expert(sql=statement, file=csv_buffer)
@coast_time
def copy_insert(engine, data, table_name):
    """Bulk-load ``data`` into ``table_name`` using the cursor's ``copy_from``.

    The frame is serialised to tab-separated text (no header, no index) in
    memory and streamed to the database; empty fields are loaded as NULL
    (``null=""``).

    Args:
        engine: Object providing ``raw_connection()`` (the SQLAlchemy engine
            created at module level is what the script passes in).
        data: DataFrame to load.
        table_name: Name of the destination table.
    """
    conn = engine.raw_connection()
    try:
        # The cursor is closed on leaving the block, even if the COPY fails.
        with conn.cursor() as cur:
            output = io.StringIO()
            data.to_csv(output, sep='\t', header=False, index=False)
            output.seek(0)
            cur.copy_from(output, table_name, null="")
        conn.commit()
    finally:
        # Always release the connection; the original never closed it.
        conn.close()
@coast_time
def copy_insert_sql(engine, data, table_name):
    """Bulk-load ``data`` into ``table_name`` with an explicit COPY statement.

    Rows are written as UTF-8 encoded CSV into an in-memory bytes buffer and
    streamed through ``copy_expert`` with ``copy ... from stdin WITH (FORMAT CSV)``.

    Args:
        engine: Object providing ``raw_connection()``.
        data: DataFrame to load; its column names are joined with commas to
            form the COPY column list.
        table_name: Name of the destination table.
    """
    str_cols = ",".join(data.columns)
    sql = "copy {}({}) from stdin WITH (FORMAT CSV)".format(table_name, str_cols)

    conn = engine.raw_connection()
    try:
        # The cursor is closed on leaving the block, even if the COPY fails.
        with conn.cursor() as cur:
            f = io.BytesIO()
            # csv.writer emits text; the codecs stream writer encodes it to
            # UTF-8 bytes as it lands in the buffer.
            StreamWriter = codecs.getwriter("utf-8")
            csv_writer = csv.writer(StreamWriter(f))
            csv_writer.writerows(data.values)
            f.seek(0)
            cur.copy_expert(sql=sql, file=f)
        conn.commit()
    finally:
        # Always release the connection; the original never closed it.
        conn.close()
@coast_time
def insert_many(engine, data, table_name):
    """Insert every row of ``data`` into ``table_name`` via ``cursor.executemany``.

    Builds one parameterised ``INSERT INTO ... VALUES (%s, ...)`` statement
    with a ``%s`` placeholder per column and executes it with all rows.

    Args:
        engine: Object providing ``raw_connection()``.
        data: DataFrame to insert; its column names form the column list.
        table_name: Name of the destination table.
    """
    columns = data.columns
    str_cols = ",".join(columns)
    str_sss = ','.join(len(columns) * ['%s'])
    sql_query = """INSERT INTO {}({}) VALUES ({});""".format(table_name, str_cols, str_sss)
    data_list = [tuple(x) for x in data.values]

    conn = engine.raw_connection()
    try:
        # The cursor is closed on leaving the block, even if an insert fails.
        with conn.cursor() as cur:
            cur.executemany(sql_query, data_list)
        conn.commit()
    finally:
        # Always release the connection; the original never closed it.
        conn.close()
@coast_time
def insert_values(engine, data, table_name):
    """Insert ``data`` into ``table_name`` with psycopg2's ``execute_values``.

    Builds an ``INSERT INTO ... VALUES %s`` statement and lets
    ``execute_values`` expand the single ``%s`` with the rows, in pages of
    10000 rows per statement.

    Args:
        engine: Object providing ``raw_connection()``.
        data: DataFrame to insert; its column names form the column list.
        table_name: Name of the destination table.
    """
    columns = data.columns
    str_cols = ",".join(columns)
    sql_query = """INSERT INTO {}({}) VALUES %s;""".format(table_name, str_cols)
    data_list = [tuple(x) for x in data.values]

    conn = engine.raw_connection()
    try:
        # The cursor is closed on leaving the block, even if an insert fails.
        with conn.cursor() as cur:
            ex.execute_values(cur, sql_query, data_list, page_size=10000)
        conn.commit()
    finally:
        # Always release the connection; the original never closed it.
        conn.close()
@coast_time
def to_sql_insert_normal(engine, data, table_name):
    """Write ``data`` to ``table_name`` through ``DataFrame.to_sql``.

    No ``method`` is supplied, so pandas uses its default insertion strategy.
    The frame index is not written and ``if_exists='replace'`` is passed.
    """
    data.to_sql(
        name=table_name,
        con=engine,
        if_exists='replace',
        index=False,
    )
@coast_time
def to_sql_insert_copy(engine, data, table_name):
    """Write ``data`` to ``table_name`` through ``DataFrame.to_sql`` using COPY.

    Rows are inserted by ``psql_insert_copy``, passed as the ``method``
    callable.  The frame index is not written and ``if_exists='replace'`` is
    passed.
    """
    data.to_sql(
        name=table_name,
        con=engine,
        if_exists='replace',
        index=False,
        method=psql_insert_copy,
    )
# Connection settings for the PostgreSQL server on 127.0.0.1 used by this script.
# NOTE(review): the password is hard-coded in source; consider loading it from
# the environment or a config file instead.
user = 'postgres'
password = '121457'
host = '127.0.0.1'
port = '5432'
database_name = 'test_data_base'

# SQLAlchemy URL for the psycopg2 driver, forcing a UTF-8 client encoding.
database_url = (
    f'postgresql+psycopg2://{user}:{password}@{host}:{port}/{database_name}'
    '?client_encoding=utf8'
)

# Shared engine and fake-data generator (zh_CN locale) for the rest of the script.
engine = create_engine(database_url)
fake = Faker(locale='zh_CN')
def fake_row(i):
    """Return one list of nine fake values from the module-level ``fake``.

    The values are, in order: name, address, city, BBAN, company, credit card
    number, credit card provider, a ``%Y-%m-%d`` date string and a phone
    number.  The argument ``i`` is accepted but not used.
    """
    return [
        fake.name(),
        fake.address(),
        fake.city(),
        fake.bban(),
        fake.company(),
        fake.credit_card_number(card_type=None),
        fake.credit_card_provider(card_type=None),
        fake.date(pattern="%Y-%m-%d", end_datetime=None),
        fake.phone_number(),
    ]
# Generate 1,000,000 fake rows, load them into a frame with columns
# col1..col9, and report how long the generation took.
start = time.time()
fake_data = list(map(fake_row, range(1000000)))
data = pd.DataFrame(fake_data, columns=[f"col{n}" for n in range(1, 10)])
print("func generate data need:{:.2f}秒".format(time.time() - start))

# Run each insertion strategy against its own table, in the original order;
# every call prints its own timing through the coast_time decorator.
for loader, target_table in (
    (insert_many, "tb_faker1"),
    (insert_values, "tb_faker2"),
    (copy_insert, "tb_faker3"),
    (copy_insert_sql, "tb_faker4"),
    (to_sql_insert_normal, "tb_faker5"),
    (to_sql_insert_copy, "tb_faker6"),
):
    loader(engine, data, target_table)
|