python

记一次小数据集,高复杂度的运算

  • 场景
    3000个基金各做10几次的指标运算,需要耗费大量数据库查询操作以及pandas的内存计算

准备工作

工具类

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
import configparser

import MySQLdb

from DBUtils import PooledDB
from MySQLdb.cursors import DictCursor


from util.sys_constants import CONF_PATH

cf = configparser.ConfigParser()
cf.read(CONF_PATH) # 读取配置文件

items = cf.items("mysql")
res = dict(items)

# mysqlConf = MysqlConf()
# [mysql]
MYSQL_HOST = res['mysql_host']
MYSQL_PORT = res['mysql_port']
MYSQL_USER = res['mysql_user']
MYSQL_PWD = res['mysql_pwd']
MYSQL_CHARSET = res['mysql_charset']

# 万德数据源
WIND_MYSQL_DB = res['wind_mysql_db']
WIND_MYSQL_POOL_NUM = res['wind_mysql_pool_num']
WIND_MIN_CACHED = res['wind_min_cached'] # 最小空闲连接
WIND_MAX_CACHED = res['wind_max_cached'] # 最大空闲连接
WIND_MAX_SHARED = res['wind_max_shared'] # 最大共享连接

# FOF数据源
FOF_MYSQL_DB = res['fof_mysql_db']
FOF_MYSQL_POOL_NUM = res['fof_mysql_pool_num']
FOF_MIN_CACHED = res['fof_min_cached'] # 最小空闲连接
FOF_MAX_CACHED = res['fof_max_cached'] # 最大空闲连接
FOF_MAX_SHARED = res['fof_max_shared'] # 最大共享连接


class MysqlConf:
from enum import Enum

DB = Enum('tp', ('fof', 'wind'))

__wind_pool = None
__fof_pool = None

def __init__(self):
self.host = MYSQL_HOST
self.port = MYSQL_PORT
self.user = MYSQL_USER
self.pwd = MYSQL_PWD
self.charset = MYSQL_CHARSET
self.wind_db = WIND_MYSQL_DB
self.wind_pool_num = WIND_MYSQL_POOL_NUM
self.wind_min_cached = WIND_MIN_CACHED
self.wind_max_cached = WIND_MAX_CACHED
self.wind_max_shared = WIND_MAX_SHARED
self.fof_db = FOF_MYSQL_DB
self.fof_pool_num = FOF_MYSQL_POOL_NUM
self.fof_min_cached = FOF_MIN_CACHED
self.fof_max_cached = FOF_MAX_CACHED
self.fof_max_shared = FOF_MAX_SHARED
MysqlConf.__wind_pool = self.create_connection_pool(MysqlConf.DB.wind)
MysqlConf.__fof_pool = self.create_connection_pool(MysqlConf.DB.fof)

def create_connection_pool(self, dbtype):
try:
if dbtype is None or dbtype not in MysqlConf.DB:
pass
if dbtype is MysqlConf.DB.wind:
pool = PooledDB.PooledDB(MySQLdb, user=self.user, passwd=self.pwd, host=self.host,
port=int(self.port), db=self.wind_db, charset=self.charset,
cursorclass=DictCursor,
mincached=int(self.wind_min_cached), maxcached=int(self.wind_max_cached),
maxshared=int(self.wind_max_shared),
maxconnections=int(self.wind_pool_num), use_unicode=False)
return pool
else:
pool = PooledDB.PooledDB(MySQLdb, user=self.user, passwd=self.pwd, host=self.host,
port=int(self.port), db=self.fof_db, charset=self.charset,
cursorclass=DictCursor,
mincached=int(self.fof_min_cached), maxcached=int(self.fof_max_cached),
maxshared=int(self.fof_max_shared),
maxconnections=int(self.fof_pool_num), use_unicode=False)
return pool

except Exception as e:
raise Exception('conn datasource Excepts,%s!!!(%s).' % (self, str(e)))

@staticmethod
def get_connection(dbname):
conn = None
if dbname is MysqlConf.DB.wind:
conn = MysqlConf.get_wind_conn()
elif dbname is MysqlConf.DB.fof:
conn = MysqlConf.get_fof_conn()
else:
pass
return conn

@staticmethod
def get_wind_conn():
global pl
if MysqlConf.__wind_pool is None:
pl = MysqlConf().create_connection_pool(MysqlConf.DB.wind)
else:
pl = MysqlConf.__wind_pool
return pl.connection()

@staticmethod
def get_fof_conn():
global pl
if MysqlConf.__fof_pool is None:
pl = MysqlConf().create_connection_pool(MysqlConf.DB.fof)
else:
pl = MysqlConf.__fof_pool
return pl.connection()

def __str__(self) -> str:
return "mysql info:{" + self.host + "," + self.user + "," + self.wind_db + "," \
+ self.wind_pool_num + "," + self.fof_db + "," + self.fof_pool_num + "}"

pandas操作
//todo