Load Testing Performance Tuning Summary

Notes from a colleague's internal sharing session on performance tuning during load testing.

Background

The 完美中国 (Perfect China) project has entered the load-testing phase: all business features are complete, and what remains is to optimize performance under load until the system meets its design targets, pass customer acceptance, and deliver. Two constraints have to be balanced:

  • Resource allocation is optimized within a roughly fixed overall budget
  • Feature efficiency is optimized while keeping requirements largely unchanged

Under these two constraints, it is not acceptable to blindly add machines when performance falls short, nor to cut features and claim they cannot be implemented; the whole point of load testing is to break through these bottlenecks.

Optimization Scenarios

Data Synchronization Efficiency

Starting from the Problem

  • Why does MySQL data need to be synchronized to Elasticsearch (ES)?

Design

(Figure: 压测性能调优_01, the data synchronization design diagram)

Potential bottlenecks (marked in red in the figure above)

Optimization Approach

  • Split the dataset into shards and execute each shard on its own thread, taking full advantage of multi-core CPUs.

++Before: a full sync of one large table (20M+ rows) took 20+ hours. After: it completes within 1 hour.++

Sample code (not directly runnable):

// Standard-library imports only; the project-specific types referenced below
// (BaseMapper, TableToIndexMappingDto, DataSourceVo, PartitionFeatureMapper,
// ExectuorMapper, IndexDocumentVo, IndexDocContentVo, Assert, Lists, logger,
// context, and the base class that provides setApplicationContext/setMapping/
// setMapper and the createIndex*/addOrUpdate* helpers) live elsewhere in the project.
import java.lang.reflect.Field;
import java.sql.Connection;
import java.sql.DriverManager;
import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import java.util.stream.Collectors;

public class IndexExecutor<K, T> {
    private static final int MAX_SHARD_NUM = 32;
    /**
     * At most 32 worker threads.
     */
    private static ExecutorService executor = Executors.newFixedThreadPool(MAX_SHARD_NUM);

    private Params params;
    private BaseMapper mapper;
    private TableToIndexMappingDto mapping;

    public IndexExecutor() {
    }

    private IndexExecutor(Params params) {
        Assert.isTrue(params.shard < MAX_SHARD_NUM, "must be smaller than MAX_SHARD_NUM (32)");
        this.mapper = params.mapper;
        this.mapping = params.mapping;
        this.params = params;
        super.setApplicationContext(context);
        super.setMapping(this.mapping);
        super.setMapper(this.mapper);
    }

    public void build() {
        // Null checks
        Objects.requireNonNull(params.mapper);
        Objects.requireNonNull(params.mapping);
        List<String> tableNames = this.findTableNames();

        // Run in a new thread so the calling request is not blocked
        new Thread(() -> {
            String indexAliasesName = this.mapping.getIndicesAliasesName();
            String newIndexName = this.createIndexNewName(indexAliasesName);
            this.createIndexAndMapping(newIndexName);
            int shard = params.shard;
            long begin = 0;
            long[] counts = null;

            // Iterate over every table
            for (String tableName : tableNames) {
                Long count = 0L;
                begin = System.currentTimeMillis();
                if (tableNames.size() > 1) {
                    PartitionFeatureMapper partitionFeatureMapper = (PartitionFeatureMapper) mapper;
                    count = partitionFeatureMapper.countAllByTableName(tableName);
                } else {
                    count = mapper.countAll();
                }

                counts = splitShard(shard, count);
                // One countDown per shard task
                CountDownLatch cd = new CountDownLatch(counts.length);
                for (int i = 0; i < counts.length; i++) {
                    long segmentCount = counts[i];
                    long lastCount = i == 0 ? 0L : counts[i - 1];
                    logger.info("shard #{} range {}-{} started", i, lastCount, segmentCount);
                    executor.execute(() -> {
                        this.runShard(tableName, lastCount, segmentCount, newIndexName);
                        cd.countDown();
                        logger.info("{} waiting for the other shards, shards remaining: {}", tableName, cd.getCount());
                    });
                    // Stagger the database IO peaks
                    try {
                        TimeUnit.SECONDS.sleep(1);
                    } catch (InterruptedException e) {
                        logger.error("", e);
                    }
                }
                try {
                    cd.await();
                } catch (InterruptedException e) {
                    logger.error("full index rebuild failed for table {}, index {}", tableName, newIndexName, e);
                }
            }
            this.addOrUpdateIndexWithAliasesRef(indexAliasesName, newIndexName);
            logger.info("alias migrated to the new index, alias {} index {}, elapsed: {}ms",
                    indexAliasesName, newIndexName, System.currentTimeMillis() - begin);
        }).start();
    }

    /**
     * If the configured table name is a regex, resolve all matching table names;
     * otherwise return the single table name as-is.
     */
    public List<String> findTableNames() {
        List<String> tableNames = null;
        String tableName = this.mapping.getTableName();
        if (this.mapper instanceof PartitionFeatureMapper) {
            Connection connection = null;
            String schema = null;
            try {
                // This block extracts the schema (database) name from the
                // driver's Connection implementation via reflection
                String jdbcUrl = this.params.dataSourceVo.getJdbcUrl();
                String jdbcUserName = this.params.dataSourceVo.getJdbcUserName();
                String jdbcUserPassword = this.params.dataSourceVo.getJdbcUserPassword();
                connection = DriverManager.getConnection(jdbcUrl, jdbcUserName, jdbcUserPassword);
                Field dbName = connection.getClass().getDeclaredField("dbName");
                dbName.setAccessible(true);
                schema = String.valueOf(dbName.get(connection));
            } catch (Throwable e) {
                logger.error("", e);
            } finally {
                if (connection != null) {
                    try {
                        connection.close();
                    } catch (Throwable e) {
                        logger.error("", e);
                    }
                }
            }
            PartitionFeatureMapper partitionFeatureMapper = (PartitionFeatureMapper) this.mapper;
            // Query all table names of that schema
            String tableRegex = tableName;
            tableNames = partitionFeatureMapper.findTablesByRegex(schema, tableRegex);
        } else if (this.mapper instanceof ExectuorMapper) {
            tableNames = Lists.newArrayList(tableName);
        }
        return tableNames;
    }

    /**
     * The work of one shard: page through [beginPos, endPos) by id range and
     * bulk-index each page.
     *
     * @param beginPos
     * @param endPos
     * @param indexName
     */
    private void runShard(String tableName, long beginPos, long endPos, String indexName) {
        Map<String, String> map = null;
        long count = endPos - beginPos;
        if (this.mapper instanceof PartitionFeatureMapper) {
            PartitionFeatureMapper mapper = (PartitionFeatureMapper) this.getMapper();
            map = mapper.findMaxAndMinIdPartion(tableName, beginPos, count);
        } else if (this.mapper instanceof ExectuorMapper) {
            ExectuorMapper mapper = (ExectuorMapper) this.getMapper();
            map = mapper.findMaxAndMinIdLimit(beginPos, count);
        }
        if (map == null || map.isEmpty()) {
            logger.warn("beginPos {}, endPos {}, indexName {}: id range empty", beginPos, endPos, indexName);
            return;
        }
        String minId = String.valueOf(map.get("minId"));
        String actualMaxId = null;
        long scanedCount = 0L;
        // Page size: at most 1000 rows per round
        int batch = (int) (endPos > 1000 ? 1000 : endPos);
        int runtime = 0;
        List list = null;
        do {
            if (this.mapper instanceof PartitionFeatureMapper) {
                PartitionFeatureMapper mapper = (PartitionFeatureMapper) this.getMapper();
                actualMaxId = String.valueOf(mapper.findMaxIdPartition(tableName, minId, batch));

                list = mapper.findByMaxAndMinIdPartition(tableName, minId, actualMaxId);

            } else if (this.mapper instanceof ExectuorMapper) {
                actualMaxId = String.valueOf(this.getMapper().findMaxId(minId, batch));
                // The mapper returns DTOs here
                list = this.getMapper().findByMaxAndMinId(minId, actualMaxId);
            }
            if (logger.isDebugEnabled()) {
                logger.debug("segment[{}-{}] load from db success, minId: {} maxId: {}", beginPos, endPos, minId, actualMaxId);
            }
            if (list.isEmpty()) {
                break;
            }
            scanedCount += list.size();

            this.batchAddIndexByMapParams(indexName, list);

            if (logger.isDebugEnabled()) {
                logger.debug("segment[{}-{}] put in es success, minId: {} maxId: {}", beginPos, endPos, minId, actualMaxId);
            }
            minId = actualMaxId;
            ++runtime;
        } while (scanedCount < count);

        logger.info("db record count={}, fetched record count={}, rounds={}", count, scanedCount, runtime);
    }

    /**
     * Override of the batch-import-to-ES logic.
     *
     * @param newIndexName
     * @param list
     */
    protected void batchAddIndexByMapParams(String newIndexName, List<Map<String, Object>> list) {
        if (list != null && !list.isEmpty()) {
            // Conversion from the raw Map rows to IndexDocContentVo is elided in this sample
            List<IndexDocContentVo> docs = list.parallelStream().filter(Objects::nonNull).collect(Collectors.toList());
            IndexDocumentVo indexDocumentVo = new IndexDocumentVo(newIndexName, this.mapping.getIndicesTypeName(), docs);
            this.getSearchIndexService().addData(indexDocumentVo);
        }
    }

    /**
     * Compute each shard's cumulative boundary from the total row count and
     * the shard count.
     *
     * @param shardNums
     * @param total
     * @return
     */
    public static long[] splitShard(int shardNums, long total) {
        // Small datasets are handled by a single shard
        if (total <= shardNums || total <= 1000) {
            return new long[]{total};
        }
        long constTotal = total;
        int shardIndex = 1;
        long[] counts = new long[shardNums];
        long everShard = total / shardNums;
        while (constTotal - everShard * shardIndex > 0) {
            long newTotal = total - everShard * shardIndex;
            counts[shardIndex - 1] = total - newTotal;
            total = newTotal;
            shardIndex++;
        }
        // The last shard always ends at the full total, absorbing the remainder
        counts[shardNums - 1] = constTotal;
        return counts;
    }

    public static class Params {
        BaseMapper mapper;
        DataSourceVo dataSourceVo;
        TableToIndexMappingDto mapping;
        Wrapper wrapper;
        WrapperDto wrapperDto;
        ListWrapper listWrapper;
        ListWrapperDto listWrapperDto;

        /**
         * Default shard count: 4 * 2 (cores) = 8 slices; each thread loads its own slice.
         */
        int shard = 8;

        public void runIndex() {
            IndexExecutor indexExecutor = new IndexExecutor(this);
            indexExecutor.build();
        }

        public IndexExecutor build() {
            IndexExecutor indexExecutor = new IndexExecutor(this);
            return indexExecutor;
        }

        public Params withMapper(BaseMapper mapper) {
            this.mapper = mapper;
            return this;
        }

        public Params withMapping(TableToIndexMappingDto mapping) {
            this.mapping = mapping;
            return this;
        }

        public Params withShard(int shard) {
            this.shard = shard;
            return this;
        }

        public Params withWrapper(Wrapper wrapper) {
            this.wrapper = wrapper;
            return this;
        }

        public Params withWrapperDto(WrapperDto wrapperDto) {
            this.wrapperDto = wrapperDto;
            return this;
        }

        public Params withDataSourceVo(String beanName) {
            DataSourceVo bean = context.getBean(beanName, DataSourceVo.class);
            this.dataSourceVo = bean;
            return this;
        }

        public Params withListMapper(ListWrapper listMapper) {
            this.listWrapper = listMapper;
            return this;
        }

        public Params withListDtoMapper(ListWrapperDto listMapper) {
            this.listWrapperDto = listMapper;
            return this;
        }
    }

    public interface Wrapper {
        Map<String, Object> wrapDoc(Map<String, Object> item);
    }

    public interface ListWrapper {
        List<Map<String, Object>> wrapBatchDoc(List<Map<String, Object>> item);
    }

    public interface ListWrapperDto {
        List<Map<String, Object>> wrapBatchDoc(List item);
    }

    public interface WrapperDto<T> {
        T wrapDto(T item);

        Class getTargetClass();
    }
}

Key points of the code above

Shard boundary computation: long[] splitShard(int shardNums, long total) returns cumulative boundaries; e.g. splitShard(4, 4000) yields {1000, 2000, 3000, 4000}, so shard i covers the range (counts[i-1], counts[i]].

When fetching rows, page by a WHERE condition on the id range rather than by LIMIT/OFFSET, so the query can hit the split key (unverified).

Thread pool sizing: a rule-of-thumb optimum for the JVM thread count is CPU cores * 1.5 (unverified).
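
To make the WHERE-versus-LIMIT point concrete, here is a minimal, hypothetical JDBC sketch of keyset pagination; the class name, demo_table, the id column, and the connection settings are illustrative assumptions, not the project's real schema. Paging by an indexed id predicate lets a sharded database route the query via the split key, whereas LIMIT/OFFSET forces the server to scan and discard all skipped rows on every page.

import java.sql.Connection;
import java.sql.DriverManager;
import java.sql.PreparedStatement;
import java.sql.ResultSet;

public class KeysetPagingDemo {
    public static void main(String[] args) throws Exception {
        // The "optimal thread count ~= CPU cores * 1.5" rule of thumb from above would be:
        // int poolSize = (int) (Runtime.getRuntime().availableProcessors() * 1.5);

        // Hypothetical connection and table; replace with real settings.
        try (Connection conn = DriverManager.getConnection(
                "jdbc:mysql://localhost:3306/demo", "user", "pass")) {
            long lastId = 0L;       // exclusive lower bound of the current page
            final int batch = 1000; // page size, mirroring the 1000-row batch above
            while (true) {
                // Keyset pagination: the id predicate can hit the index / split key.
                // A LIMIT/OFFSET equivalent would rescan all skipped rows each page.
                try (PreparedStatement ps = conn.prepareStatement(
                        "SELECT id, payload FROM demo_table WHERE id > ? ORDER BY id LIMIT ?")) {
                    ps.setLong(1, lastId);
                    ps.setInt(2, batch);
                    int rows = 0;
                    try (ResultSet rs = ps.executeQuery()) {
                        while (rs.next()) {
                            lastId = rs.getLong("id"); // advance the keyset cursor
                            rows++;
                            // ... index the row into ES here ...
                        }
                    }
                    if (rows < batch) {
                        break; // last page reached
                    }
                }
            }
        }
    }
}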

SQL Tuning

  • Reduce the number of database round trips

++Before: assembling one batch of data took an unpredictable 100-500s. After: it is stable at around 1s.++

If a query path fetches data inside a for loop, check whether the per-iteration lookups can be rewritten as a single IN query to reduce the number of database round trips.

Before

for (UserDto docValue : docValues) {
    UserPersonalInfoDto personalInfo = userPersonalInfoMapper.findById(docValue.getPersonId());
    //...
}

After

// Collect all person ids once, fetch them with a single IN query,
// then build a lookup map for the assembly step
String personIds = docValues.stream()
        .filter(item -> item != null && item.getPersonId() != null)
        .map(item -> item.getPersonId().toString())
        .collect(Collectors.joining(","));
List<UserPersonalInfoDto> userPersonInfos = userPersonalInfoMapper.findByIds(personIds);
Map<Long, UserPersonalInfoDto> userPersonalInfoMap = userPersonInfos.stream()
        .collect(Collectors.toMap(UserPersonalInfoDto::getId, Function.identity()));

When the tables live in the same database, use a JOIN instead of multiple separate queries. Optimize the driving table with a subquery: the key is to filter first, then join, reducing the number of rows entering the join by orders of magnitude.

SELECT
    b.id,
    b.`level`,
    b.max_level,
    b.`status`,
    b.last_active_month,
    b.create_time,
    b.create_person,
    b.update_person,
    b.update_time,
    b.tenant_id,
    b.instance_id
FROM
    (
        SELECT member_id FROM mm_card_map_user WHERE dr = 0 AND user_id IN (${userIds})
    ) AS a
    LEFT JOIN mm_member b ON b.id = a.member_id
WHERE b.dr = 0
GROUP BY b.id;
  • Make the synchronous calls in the multi-datasource aggregation step asynchronous

Before

/**
 * Fill in the full user information.
 *
 * @param item
 * @author zhou.shilong
 */
public UserDto wrapFullItem(UserDto item) {
    if (item != null && null != item.getId()) {
        // personal
        UserPersonalInfoDto personalInfo = userPersonalInfoMapper.findById(item.getPersonId());
        if (null != personalInfo) {
            item.setPersonalInfo(personalInfo);
        }
        // member
        MemberDto memberDto = memberMapper.findByUserId(item.getId());
        if (null != memberDto) {
            item.setMember(memberDto);
        }

        // userRef
        List<UserRefDto> usernames = userMapper.findUsrNameByUserId(item.getId());
        List<UserRefDto> userphones = userMapper.findUserPhoneByUserId(item.getId());
        List<UserRefDto> userCardNums = userMapper.findUserCardNumByUserId(item.getId());
        item.setUserRName(usernames);
        item.setUserRPhone(userphones);
        item.setUserRCardnum(userCardNums);

        /// usercloud data (synced separately, aggregated at query time)
        /// identification: ID-card information
        UserIdentificationDto identification = identificationMapper.findUsrNameByUserId(item.getId());
        item.setIdentification(identification);
        /// qq / wechat third-party data
    }

    return item;
}

After

/**
 * Fill in the full user information.
 *
 * @param item
 * @author zhou.shilong
 */
public UserDto wrapFullItem(UserDto item) {
    if (item != null && null != item.getId()) {
        AsyncExecuter.init(4).execute(() -> {
            // personal
            UserPersonalInfoDto personalInfo = userPersonalInfoMapper.findById(item.getPersonId());
            if (null != personalInfo) {
                item.setPersonalInfo(personalInfo);
            }
        }).execute(() -> {
            // member
            MemberDto memberDto = memberMapper.findByUserId(item.getId());
            if (null != memberDto) {
                item.setMember(memberDto);
            }
        }).execute(() -> {
            // userRef
            List<UserRefDto> userCardNums = userMapper.findByUserId("us_user_r_cardnum", item.getId());
            List<UserRefDto> userphones = userMapper.findByUserId("us_user_r_phone", item.getId());
            List<UserRefDto> usernames = userMapper.findByUserId("us_user_r_username", item.getId());
            item.setUserRName(usernames);
            item.setUserRPhone(userphones);
            item.setUserRCardnum(userCardNums);
        }).execute(() -> {
            /// usercloud data (synced separately, aggregated at query time)
            /// identification: ID-card information
            UserIdentificationDto identification = identificationMapper.findUsrNameByUserId(item.getId());
            item.setIdentification(identification);
            /// qq / wechat third-party data
        }).await();
    }
    return item;
}

/***
 * Asynchronous executor.
 */
public class AsyncExecuter {
    /**
     * At most 8 threads running at the same time.
     */
    private static final int MAX_THREAD_NUM = 8;
    private static ExecutorService executor = Executors.newFixedThreadPool(MAX_THREAD_NUM);
    private static Logger logger = LoggerFactory.getLogger(AsyncExecuter.class);
    private CountDownLatch countDownLatch;
    private long timeout;
    private int taskCount = 0;
    private int currentTask = 0;

    private AsyncExecuter(int count, long timeout) {
        Assert.isTrue(count > 0, "count must be greater than 0");
        this.timeout = timeout;
        this.taskCount = count;
        this.countDownLatch = new CountDownLatch(count);
    }

    /**
     * @param count    number of tasks that will be executed
     * @param milltime wait timeout in milliseconds
     * @return
     */
    public static AsyncExecuter init(int count, long milltime) {
        return new AsyncExecuter(count, milltime);
    }

    public static AsyncExecuter init(int count) {
        return new AsyncExecuter(count, -1);
    }

    /**
     * Submit one task for asynchronous execution.
     *
     * @param runnable
     * @return
     */
    public AsyncExecuter execute(Runnable runnable) {
        this.currentTask++;
        if (this.currentTask > this.taskCount) {
            throw new IllegalStateException("executed task count cannot be greater than the declared count");
        }
        executor.execute(() -> {
            try {
                runnable.run();
            } catch (Throwable e) {
                logger.error("", e);
            } finally {
                countDownLatch.countDown();
            }
        });
        return this;
    }

    /**
     * Block until all subtasks finish (or the timeout expires).
     */
    public void await() {
        try {
            if (this.timeout == -1) {
                countDownLatch.await();
            } else {
                countDownLatch.await(timeout, TimeUnit.MILLISECONDS);
            }
        } catch (Throwable e) {
            logger.error("", e);
        }
    }
}
  • Avoid joining data across databases

++The database is DRDS with roughly 300M+ rows, expected to grow to 500M+ within 2 years. Before optimization the queries timed out and the job was essentially unrunnable; after optimization the data export runs normally.++

Strategy for importing massive DRDS data into ES: DRDS hides the complexity of sharding from the caller, which keeps method calls uniform, but precisely because it is transparent, developers can easily write full-database-scan SQL without noticing, which hurts efficiency. Our migration scenario has to scan all the data for a full migration, so here we first list all the physical shards and then scan each specific shard in parallel to raise throughput, as the listing and sketch below illustrate.

List all physical shards:
mysql> SHOW TOPOLOGY FROM LJLTEST;
+------+----------------+------------+
| ID   | GROUP_NAME     | TABLE_NAME |
+------+----------------+------------+
|    0 | TDDL5_00_GROUP | ljltest_00 |
|    1 | TDDL5_00_GROUP | ljltest_01 |
|    2 | TDDL5_00_GROUP | ljltest_02 |
|    3 | TDDL5_01_GROUP | ljltest_03 |
|    4 | TDDL5_01_GROUP | ljltest_04 |
|    5 | TDDL5_01_GROUP | ljltest_05 |
|    6 | TDDL5_02_GROUP | ljltest_06 |
|    7 | TDDL5_02_GROUP | ljltest_07 |
|    8 | TDDL5_02_GROUP | ljltest_08 |
|    9 | TDDL5_03_GROUP | ljltest_09 |
|   10 | TDDL5_03_GROUP | ljltest_10 |
|   11 | TDDL5_03_GROUP | ljltest_11 |
+------+----------------+------------+
12 rows in set (0.06 sec)

Query data from one specific shard:
/*!TDDL:node='TDDL5_00_GROUP'*/ select * from ljltest_00;
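
A minimal sketch of the parallel per-shard scan, under stated assumptions: the GROUP_NAME/TABLE_NAME pairs come from SHOW TOPOLOGY above, while the class name, JDBC URL, pool size, and row handling are illustrative, not the project's real code.

import java.sql.Connection;
import java.sql.DriverManager;
import java.sql.ResultSet;
import java.sql.Statement;
import java.util.List;
import java.util.Map;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;

public class DrdsShardScanner {
    public static void main(String[] args) throws Exception {
        // Shard list as returned by SHOW TOPOLOGY (group name -> physical table name)
        List<Map.Entry<String, String>> shards = List.of(
                Map.entry("TDDL5_00_GROUP", "ljltest_00"),
                Map.entry("TDDL5_00_GROUP", "ljltest_01"),
                Map.entry("TDDL5_01_GROUP", "ljltest_03"));
        ExecutorService pool = Executors.newFixedThreadPool(4);
        for (Map.Entry<String, String> shard : shards) {
            pool.execute(() -> scanShard(shard.getKey(), shard.getValue()));
        }
        pool.shutdown();                          // release the pool once all scans finish
        pool.awaitTermination(1, TimeUnit.HOURS);
    }

    private static void scanShard(String groupName, String tableName) {
        // The TDDL node hint pins the statement to one physical database,
        // so each worker scans exactly one shard instead of the whole DRDS instance
        String sql = "/*!TDDL:node='" + groupName + "'*/ SELECT * FROM " + tableName;
        try (Connection conn = DriverManager.getConnection(
                     "jdbc:mysql://drds-host:3306/LJLTEST", "user", "pass");
             Statement st = conn.createStatement();
             ResultSet rs = st.executeQuery(sql)) {
            while (rs.next()) {
                // ... batch rows and bulk-index them into ES here ...
            }
        } catch (Exception e) {
            e.printStackTrace();
        }
    }
}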

Other Work

  • Extended the component to support scanning ShardingJDBC split tables
  • Business features: built indexes for coupons, shipping addresses, etc.

Summary

Methodology

  • Trace the business logic end to end, lay out the flow as nodes, and apply quantifiable metrics to each node and the paths between them to locate and remove bottlenecks.
  • Make full use of multi-core CPUs; process in parallel wherever possible.
  • Replace single-item API calls with batch calls to cut per-call overhead.
  • Reduce the order of magnitude of the rows feeding SQL joins. (This is not an exhaustive list of optimization strategies, only what was used in this round of load testing.)

Suggestions from other senior engineers

  1. The thread pool resources are never released (no shutdown handling);
  2. Consider Callable with FutureTask instead of bare Runnables (see the sketch below);
  3. Locking for data synchronization;
  4. Detection of DB IO peaks;
  5. ES shard tuning.
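
A minimal, hypothetical sketch of how suggestions 1 and 2 could be applied to the AsyncExecuter pattern above: Callable tasks submitted via the standard invokeAll, with the pool explicitly shut down afterwards. The class name, pool sizing, timeouts, and placeholder tasks are illustrative assumptions.

import java.util.List;
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;

public class CallableAggregationDemo {
    public static void main(String[] args) throws Exception {
        ExecutorService pool = Executors.newFixedThreadPool(4);
        try {
            // Callable lets each lookup return its result (and throw checked
            // exceptions) instead of mutating a shared object from a Runnable
            List<Callable<Object>> tasks = List.of(
                    () -> "personalInfo",   // e.g. userPersonalInfoMapper.findById(...)
                    () -> "member",         // e.g. memberMapper.findByUserId(...)
                    () -> "identification"  // e.g. identificationMapper lookup
            );
            // invokeAll blocks until every task completes or the timeout expires,
            // replacing the hand-rolled CountDownLatch in AsyncExecuter
            List<Future<Object>> results = pool.invokeAll(tasks, 5, TimeUnit.SECONDS);
            for (Future<Object> f : results) {
                System.out.println(f.get()); // rethrows any task exception
            }
        } finally {
            // Suggestion 1: release the pool's threads when the work is done
            pool.shutdown();
            if (!pool.awaitTermination(10, TimeUnit.SECONDS)) {
                pool.shutdownNow();
            }
        }
    }
}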