tcodercn blog

a lazy coder

概述

公司开发平台使用pydb2来连接db2数据库,之前一直都是提交代码到服务器通过log来调试,操作繁琐并且效率低下。为了支持本地调试,于是有了本次折腾。

一开始是直接用pip安装ibm_db,但是后来发现有些场景本地调试通过,放到服务器运行就报错。毕竟是两套完全不同的库,为了减少本地调试跟服务器之间的差异,还是决定自己在windows重新编译pydb2。

安装

db2服务端

为了测试,需要先安装db2服务端。我选择在wsl下通过docker安装运行。

在本地提前创建好数据目录

# mkdir -p /db2/database

拉取镜像

# docker pull ibmcom/db2

查看镜像

# docker images
REPOSITORY TAG IMAGE ID CREATED SIZE
ibmcom/db2 latest 2ec8bf76e622 2 years ago 2.79GB

创建容器

# docker run -itd --name testdb --privileged=true -p 50000:50000 -e LICENSE=accept -e DB2INST1_PASSWORD=testdb -e DBNAME=testdb -v /db2/database:/database ibmcom/db2

查看容器状态

# docker ps -a
CONTAINER ID IMAGE COMMAND CREATED STATUS PORTS
NAMES
9268aef03fb9 ibmcom/db2 "/var/db2_setup/lib/…" 21 hours ago Up 22 seconds 22/tcp, 55000/tcp, 60006-60007/tcp, 0.0.0.0:50000->50000/tcp, :::50000->50000/tcp testdb

启停容器

# docker start testdb
# docker stop testdb

进入容器

# docker exec -it testdb /bin/bash
# su - db2inst1

查看实例版本及状态

$ db2level
DB21085I This instance or install (instance name, where applicable:
"db2inst1") uses "64" bits and DB2 code release "SQL11058" with level
identifier "0609010F".
Informational tokens are "DB2 v11.5.8.0", "s2209201700", "DYN2209201700AMD64",
and Fix Pack "0".
Product is installed at "/opt/ibm/db2/V11.5".

$ db2pd -

Database Member 0 -- Active -- Up 0 days 00:05:29 -- Date 2024-12-24-15.26.25.728524

启停实例

$ db2start
12/24/2024 15:27:23 0 0 SQL1063N DB2START processing was successful.
SQL1063N DB2START processing was successful.

$ db2stop
12/24/2024 15:27:18 0 0 SQL1064N DB2STOP processing was successful.
SQL1064N DB2STOP processing was successful.

测试数据库

$ db2 list db directory

System Database Directory

Number of entries in the directory = 1

Database 1 entry:

Database alias = TESTDB
Database name = TESTDB
Local database directory = /database/data
Database release level = 15.00
Comment =
Directory entry type = Indirect
Catalog database partition number = 0
Alternate server hostname =
Alternate server port number =

$ db2 connect to testdb

Database Connection Information

Database server = DB2/LINUXX8664 11.5.8.0
SQL authorization ID = DB2INST1
Local database alias = TESTDB

$ db2 "select count(*) from syscat.tables"

1
-----------
437

1 record(s) selected.

db2客户端驱动

官网下载:https://www.ibm.com/support/pages/db2-odbc-cli-driver-download-and-installation-information

选择

IBM Data Server Client Packages (11.5.*, All platforms)
DSClients-ntx64-dsdriver-11.5.9000.352-FP000
v11.5.9_ntx64_dsdriver_EN.exe

下载安装后,需要将安装目录C:\Program Files\IBM\IBM DATA SERVER DRIVER\bin配置到Path环境变量,验证安装是否成功:

> db2level
DB21085I This instance or install (instance name, where applicable: "*") uses
"64" bits and DB2 code release "SQL11059" with level identifier "060A010F".
Informational tokens are "DB2 v11.5.9000.352", "s2310270807",
"DYN2310270807WIN64", and Fix Pack "0".
Product is installed at "C:\PROGRA~1\IBM\IBMDAT~1" with DB2 Copy Name
"IBMDBCL1".

测试数据库

> db2cli execsql -connstring "database=testdb;hostname=localhost;port=50000;uid=db2inst1;pwd=testdb"
IBM DATABASE 2 Interactive CLI Sample Program
(C) COPYRIGHT International Business Machines Corp. 1993,1996
All Rights Reserved
Licensed Materials - Property of IBM
US Government Users Restricted Rights - Use, duplication or
disclosure restricted by GSA ADP Schedule Contract with IBM Corp.

> select count(*) from syscat.tables
select count(*) from syscat.tables
FetchAll: Columns: 1
1
437
FetchAll: 1 rows fetched.

python

官网下载2.7的64位版本:https://www.python.org/download/releases/2.7/

pydb2

官网下载:https://sourceforge.net/projects/pydb2/

这个库官方其实已经完全停止维护,最新代码变更是2008年,官方编译的windows版本是python 2.5

我们选择下载源码包PyDB2_1.1.1.tar.gz

编译

初步编译

直接编译会报错

> python setup.py install
Traceback (most recent call last):
File "setup.py", line 94, in <module>
db2rootdir = find_db2rootdir()
File "setup.py", line 75, in find_db2rootdir
raise Exception('Unable to locate DB2 installation directory.\nTry editing DB2_ROOT in setup.py')
Exception: Unable to locate DB2 installation directory.
Try editing DB2_ROOT in setup.py

翻了下源码,会检测一些环境变量,以及会检测db2level输出的安装位置,这些都已经过时。直接修改源码定义的db2安装路径为本机路径

DB2_ROOT = r"C:\Program Files\IBM\IBM DATA SERVER DRIVER"

重新编译会继续报另外一个错误

> python setup.py install
Found DB2 with user-specified variable
DB2 install path: "C:\Program Files\IBM\IBM DATA SERVER DRIVER"
DB2 include path: "C:\Program Files\IBM\IBM DATA SERVER DRIVER\include"
DB2 lib path: "C:\Program Files\IBM\IBM DATA SERVER DRIVER\lib"
DB2 library: "db2cli"
WARNING:
It seems that you did not install the 'Application Development Kit'.
Compilation may fail.
running install
running build
running build_py
creating build
creating build\lib.win-amd64-2.7
copying DB2.py -> build\lib.win-amd64-2.7
running build_ext
building '_db2' extension
error: Unable to find vcvarsall.bat

这里提示未安装vc编译器

安装vc编译器

一开始我直接安装微软最新版本的vc,编译报各种奇奇怪怪的错误,通过搜索stackoverflow的问答,发现有几个要求:

  1. 编译python扩展模块的vc版本必须与编译python主程序的vc版本一致:python 2.7官方版本是使用Microsoft Visual C++ 2008编译的。

  2. express版本不支持编译64位程序。

可以在 msdn itellyou找到了对应的安装包,下载安装的时候注意勾选x64支持(默认是不选的)

编译前需要执行vc环境变量初始化脚本(注意需要传入参x64

> "E:\Program Files (x86)\Microsoft Visual Studio 9.0\VC\vcvarsall.bat" x64
Setting environment for using Microsoft Visual Studio 2008 Beta2 x64 tools.

再次编译

重新编译通过,链接时报错

> python setup.py install
Found DB2 with user-specified variable
DB2 install path: "C:\Program Files\IBM\IBM DATA SERVER DRIVER"
DB2 include path: "C:\Program Files\IBM\IBM DATA SERVER DRIVER\include"
DB2 lib path: "C:\Program Files\IBM\IBM DATA SERVER DRIVER\lib"
DB2 library: "db2cli"
running install
running build
running build_py
running build_ext
building '_db2' extension

E:\Program Files (x86)\Microsoft Visual Studio 9.0\VC\BIN\amd64\cl.exe /c /nologo /Ox /MD /W3 /GS- /DNDEBUG "-IC:\Program Files\IBM\IBM DATA SERVER DRIVER\include" -IC:\Python27\include -IC:\Python27\PC /Tc_db2_module.c /Fobuild\temp.win-amd64-2.7\Release\_db2_module.obj
_db2_module.c

E:\Program Files (x86)\Microsoft Visual Studio 9.0\VC\BIN\amd64\link.exe /DLL /nologo /INCREMENTAL:NO "/LIBPATH:C:\Program Files\IBM\IBM DATA SERVER DRIVER\lib" /LIBPATH:C:\Python27\libs /LIBPATH:C:\Python27\PCbuild\amd64 /LIBPATH:C:\Python27\PC\VS9.0\amd64 db2cli.lib /EXPORT:init_db2 build\temp.win-amd64-2.7\Release\_db2_module.obj /OUT:build\lib.win-amd64-2.7\_db2.pyd /IMPLIB:build\temp.win-amd64-2.7\Release\_db2.lib /MANIFESTFILE:build\temp.win-amd64-2.7\Release\_db2.pyd.manifest
_db2_module.obj : warning LNK4197: 多次指定导出“init_db2”;使用第一个规范
正在创建库 build\temp.win-amd64-2.7\Release\_db2.lib 和对象 build\temp.win-amd64-2.7\Release\_db2.exp
_db2_module.obj : error LNK2019: 无法解析的外部符号 SQLGetDiagRec,该符号在函数 _DB2_GetDiagRec 中被引用
...
_db2_module.obj : error LNK2019: 无法解析的外部符号 SQLDisconnect,该符号在函数 DB2ConnObj_close 中被引用
build\lib.win-amd64-2.7\_db2.pyd : fatal error LNK1120: 28 个无法解析的外部命令
error: command 'E:\\Program Files (x86)\\Microsoft Visual Studio 9.0\\VC\\BIN\\amd64\\link.exe' failed with exit status 1120

这里是因为python 2.7本身是64位的,但是链接的时候选择的db2cli.lib是32位的。修改setup.pydb2cli修改为db2cli64,重新编译成功。

E:\Program Files (x86)\Microsoft Visual Studio 9.0\VC\BIN\amd64\link.exe /DLL /nologo /INCREMENTAL:NO "/LIBPATH:C:\Program Files\IBM\IBM DATA SERVER DRIVER\lib" /LIBPATH:C:\Python27\libs /LIBPATH:C:\Python27\PCbuild\amd64 /LIBPATH:C:\Python27\PC\VS9.0\amd64 db2cli64.lib /EXPORT:init_db2 build\temp.win-amd64-2.7\Release\_db2_module.obj /OUT:build\lib.win-amd64-2.7\_db2.pyd /IMPLIB:build\temp.win-amd64-2.7\Release\_db2.lib /MANIFESTFILE:build\temp.win-amd64-2.7\Release\_db2.pyd.manifest
_db2_module.obj : warning LNK4197: 多次指定导出“init_db2”;使用第一个规范
正在创建库 build\temp.win-amd64-2.7\Release\_db2.lib 和对象 build\temp.win-amd64-2.7\Release\_db2.exp
running install_lib
copying build\lib.win-amd64-2.7\DB2.py -> C:\Python27\Lib\site-packages
copying build\lib.win-amd64-2.7\_db2.pyd -> C:\Python27\Lib\site-packages
byte-compiling C:\Python27\Lib\site-packages\DB2.py to DB2.pyc
running install_egg_info
Writing C:\Python27\Lib\site-packages\PyDB2-1.1.1-py2.7.egg-info

增加数据源

> db2cli writecfg add -dsn testdb -database testdb -host localhost -port 50000

===============================================================================
db2cli writecfg completed successfully.
===============================================================================

验证数据源

> db2cli validate -dsn testdb -connect -user db2inst1 -passwd testdb

===============================================================================
Client information for the current copy (copy name: IBMDBCL1):
===============================================================================

...

===============================================================================
The validation is completed.
===============================================================================

修改测试配置test/Config.py

ConnDict = {
'dsn': 'testdb',
'uid': 'db2inst1',
'pwd': 'testdb',
}

运行测试案例,顺利通过

> python test/test_basic.py

Ran 46 tests in 6.610s

FAILED (failures=1)

优化

旧版本db2驱动连接数据库之前是需要本地手工配置好数据源,而新版本支持直接传入连接字符串来连接数据库,省去本地手工步骤。

> db2cli execsql -connstring "database=testdb;hostname=localhost;port=50000;uid=db2inst1;pwd=testdb"

我们修改下源码_db2_module.c,让pydb2也支持这个特征。

代码很简单,判断到dsn包含字符;则使用SQLDriverConnect 连接数据库

char *sep;
SQLCHAR buffer[255];
SQLSMALLINT outlen;

sep = strchr(dsn, ';');

Py_BEGIN_ALLOW_THREADS ;

if (sep == NULL) {
rc = SQLConnect(c->hdbc,
(SQLCHAR *)dsn, SQL_NTS,
(SQLCHAR *)uid, SQL_NTS,
(SQLCHAR *)pwd, SQL_NTS
);
} else {
rc = SQLDriverConnect(c->hdbc,
(SQLPOINTER) NULL,
dsn,
SQL_NTS,
buffer, 255, &outlen,
SQL_DRIVER_NOPROMPT);
}

Py_END_ALLOW_THREADS ;

测试通过

>>> import DB2
>>> conn = DB2.connect("database=testdb;hostname=localhost;port=50000;uid=db2inst1;pwd=testdb", "", "")
>>> cursor = conn.cursor()
>>> cursor.execute("select count(*) from syscat.tables")
>>> cursor.fetchall()
[(437,)]

概述

一次投产后,监控告警,虚拟机物理内存使用率一直在上升。通过 nmon 数据分析,发现业务进程占用的内存比投产前有明显的增加。

热更新机制

开始之前,先介绍下运行平台的架构。平台使用 c 语言编写,通过 python解释器执行业务代码。为了支持热更新,平台会在出口卸载 py 模块,这样下次请求进来就会重新加载 py 模块,代码变更就可以及时生效。

简化版本的平台代码(plat.py)

#!/usr/bin/env python
# -*- coding: utf-8 -*-

import sys

def callOne():
# 备份
origList = set(sys.modules)
try:
# 调用业务代码
import busi
busi.run()
finally:
# 还原
for modName in list(sys.modules.keys()):
# 卸载业务代码载入的模块
if modName not in origList:
del sys.modules[modName]


def main():
callOne()
raw_input("press <ENTER> to exit")


if __name__ == "__main__":
main()

业务代码(busi.py)没有复杂逻辑,简单的一个 import+print

#!/usr/bin/env python
# -*- coding: utf-8 -*-

from logging.handlers import RotatingFileHandler

def run():
print("wow, i am ok")

问题重现

检查进程内存:

cat /proc/$(ps aux | grep plat | grep -v grep | awk '{print $2}')/status | grep VmRSS

只占用了 7M 内存:

VmRSS:      7608 kB

试试改成调用 1000 次:

def main():
for i in range(1000):
callOne()
raw_input("press <ENTER> to exit")

可以观察到此时内存占用飙升到了 558M:

VmRSS:    558148 kB

比较 LOW 的排查办法

当时线上发现问题之后,用的是最LOW的排查方法:把版本差异代码逐段注释,最终定位到问题所在:

# from logging.handlers import RotatingFileHandler

def run():
print("wow, i am ok")

那么为什么简单的一行 import 会导致内存泄漏呢?通过分析 logging.__init__.py,发现问题所在

import atexit
atexit.register(shutdown)

logging 模块被 import 过程中会调用atexit.register(shutdown) ,后者会将这个函数保存在atexit._exithandlers

平台热更新机制导致logging模块循环构造->注册->GC 无法回收,引起物理内存占用飙升。

通过增加清理代码可以发现内存泄漏问题得到解决,再次验证这个结论:

from logging.handlers import RotatingFileHandler

def run():
print("wow, i am ok")

# 清理
import atexit
atexit._exithandlers = []

比较不 LOW 的排查办法

分析大对象

python 内置了 API 获取内存对象信息:

# 获取所有GC对象
gc.get_objects()

# 获取对象大小(不包含间接引用)
sys.getsizeof(obj)

我们先根据对象大小排序,看看是否有大对象:

def showMemoryBySize():
objList = []
for obj in gc.get_objects():
size = sys.getsizeof(obj)
objList.append([obj, size])
topList = sorted(objList, key=lambda e: e[1], reverse=True)[:10]
for (obj, size) in topList:
print("OBJ: %s, TYPE: %s, SIZE=%sM, REPR=%s" % (
id(obj),
type(obj),
size / 1024 / 1024,
repr(obj)[:50],
))
print("object count: %s" % len(objList))

由于笔记本性能太拉胯,循环 1000 次要跑很久,这里改成了循环 100 次。可以看到没有明显的大对象:

OBJ: 119037464, TYPE: <type 'dict'>, SIZE=0M, REPR={'SocketType': <class 'socket._socketobject'>, 'ge
OBJ: 117955848, TYPE: <type 'dict'>, SIZE=0M, REPR={'SocketType': <type '_socket.socket'>, 'getaddrin
OBJ: 117039448, TYPE: <type 'dict'>, SIZE=0M, REPR={'SocketType': <class 'socket._socketobject'>, 'ge
OBJ: 117087224, TYPE: <type 'dict'>, SIZE=0M, REPR={'SocketType': <type '_socket.socket'>, 'getaddrin
OBJ: 117040536, TYPE: <type 'dict'>, SIZE=0M, REPR={'__path__': ['C:\\Python27\\lib\\logging'], 'LogR
OBJ: 117929912, TYPE: <type 'dict'>, SIZE=0M, REPR={'SocketType': <class 'socket._socketobject'>, 'ge
OBJ: 114833608, TYPE: <type 'dict'>, SIZE=0M, REPR={'SocketType': <type '_socket.socket'>, 'getaddrin
OBJ: 117931272, TYPE: <type 'dict'>, SIZE=0M, REPR={'__path__': ['C:\\Python27\\lib\\logging'], 'LogR
OBJ: 42392296, TYPE: <type 'dict'>, SIZE=0M, REPR={'functools': <module 'functools' from 'C:\Python2
OBJ: 42426712, TYPE: <type 'dict'>, SIZE=0M, REPR={'bytearray': <type 'bytearray'>, 'IndexError': <t
object count: 78383

输出末尾可以看到有 7.8W 个对象未被回收,考虑是不是有大量小对象,我们按类型合并排序:

def showMemoryByType():
objDict = defaultdict(lambda: [0, 0])
for obj in gc.get_objects():
size = sys.getsizeof(obj)
objType = type(obj)
objSum = objDict[objType]
objSum[0] += 1 # 个数
objSum[1] += size # 大小
topList = sorted(objDict.items(), key=lambda e: e[1][1], reverse=True)[:10]
for (objType, (count, size)) in topList:
print("TYPE: %-30s, COUNT=%5s, SIZE=%sM" % (
objType,
count,
size / 1024 / 1024,
))

大头在dict/function/type

TYPE: <type 'dict'>                 , COUNT= 6444, SIZE=10M
TYPE: <type 'function'> , COUNT=31701, SIZE=3M
TYPE: <type 'type'> , COUNT= 4041, SIZE=3M
TYPE: <type 'tuple'> , COUNT= 7836, SIZE=0M
TYPE: <type 'list'> , COUNT= 2927, SIZE=0M
TYPE: <type 'weakref'> , COUNT= 4385, SIZE=0M
TYPE: <type 'getset_descriptor'> , COUNT= 2018, SIZE=0M
TYPE: <type 'member_descriptor'> , COUNT= 1987, SIZE=0M
TYPE: <type 'functools.partial'> , COUNT= 1400, SIZE=0M
TYPE: <type 'instancemethod'> , COUNT= 1501, SIZE=0M

我们按类型分别统计:

def showMemoryByType2():
def _showTop(_type, _d):
print("TYPE: %s" % _type)
# 按大小排序
_topList = sorted(_d.items(), key=lambda e: e[1][1], reverse=True)[:10]
for (_key, (_count, _size)) in _topList:
print("KEY: %-20s, COUNT=%s, SIZE=%sK" % (
_key,
_count,
_size / 1024,
))

dictInfo = {}
funcInfo = {}
typeInfo = {}
for obj in gc.get_objects():
size = sys.getsizeof(obj)
objType = type(obj)
if objType is dict:
# dict:按长度合并
d = dictInfo
key = str(len(obj))
elif objType is FunctionType:
# function:按名字合并
d = funcInfo
key = "%s.%s" % (obj.__module__, obj.__name__)
elif objType is type:
d = typeInfo
# type:按名字合并
key = "%s.%s" % (obj.__module__, obj.__name__)
else:
continue
l = d.setdefault(key, [0, 0])
l[0] += 1 # 个数
l[1] += size # 大小
_showTop("dict", dictInfo)
_showTop("func", funcInfo)
_showTop("type", typeInfo)

输出如下:

TYPE: dict
KEY: 144 , COUNT=102, SIZE=1251K
KEY: 88 , COUNT=100, SIZE=1226K
KEY: 185 , COUNT=100, SIZE=1226K
KEY: 95 , COUNT=101, SIZE=638K
KEY: 7 , COUNT=711, SIZE=571K
KEY: 31 , COUNT=203, SIZE=512K
KEY: 27 , COUNT=201, SIZE=506K
KEY: 6 , COUNT=511, SIZE=481K
KEY: 19 , COUNT=208, SIZE=362K
KEY: 5 , COUNT=1334, SIZE=354K

TYPE: func
KEY: logging.__init__ , COUNT=1300, SIZE=142K
KEY: logging.handlers.__init__, COUNT=1200, SIZE=131K
KEY: threading.__init__ , COUNT=1000, SIZE=109K
KEY: logging.handlers.emit, COUNT=800, SIZE=87K
KEY: logging.handlers.close, COUNT=500, SIZE=54K
KEY: logging.emit , COUNT=400, SIZE=43K
KEY: logging.critical , COUNT=300, SIZE=32K
KEY: logging.info , COUNT=300, SIZE=32K
KEY: logging.exception , COUNT=300, SIZE=32K
KEY: threading.__repr__ , COUNT=300, SIZE=32K

TYPE: type
KEY: socket._fileobject , COUNT=100, SIZE=126K
KEY: socket._socketobject, COUNT=100, SIZE=114K
KEY: weakref.KeyedRef , COUNT=100, SIZE=91K
KEY: logging.LogRecord , COUNT=100, SIZE=87K
KEY: logging.handlers.SMTPHandler, COUNT=100, SIZE=87K
KEY: logging.handlers.BaseRotatingHandler, COUNT=100, SIZE=87K
KEY: logging.handlers.HTTPHandler, COUNT=100, SIZE=87K
KEY: threading._Timer , COUNT=100, SIZE=87K
KEY: logging.Formatter , COUNT=100, SIZE=87K
KEY: logging.LoggerAdapter, COUNT=100, SIZE=87K

func/type大部分都跟logging有关,那么dict里面的又是什么呢?

dictList = []
if key == "144" and len(dictList) < 10:
dictList.append(obj)

下断点查看dictList,发现都是ModuleType,我们分析下都有哪些模块:

def showMemoryByModule():
modInfo = {}
for obj in gc.get_objects():
size = sys.getsizeof(obj)
objType = type(obj)
if objType is dict:
modName = obj.get("__name__", "")
if not modName:
modName = obj.get("__module__", "")
l = modInfo.setdefault(modName, [0, 0])
l[0] += 1 # 个数
l[1] += size # 大小
# 按大小排序
_topList = sorted(modInfo.items(), key=lambda e: e[1][1], reverse=True)[:10]
for (_key, (_count, _size)) in _topList:
print("KEY: %-20s, COUNT=%s, SIZE=%sK" % (
_key,
_count,
_size / 1024,
))

不出所料,logging赫然在列:

KEY: logging             , COUNT=1500, SIZE=2423K
KEY: , COUNT=1186, SIZE=1451K
KEY: logging.handlers , COUNT=1300, SIZE=1207K
KEY: xmlrpclib , COUNT=18, SIZE=25K
KEY: __builtin__ , COUNT=2, SIZE=24K
KEY: _socket , COUNT=2, SIZE=24K

查找引用关系

到了这里,我们已经知道logging存在泄露,那么具体是哪行代码导致的呢?

python 内置了 API 获取内存对象

# 获取入参对象引用的其他对象
gc.get_referents(obj)

# 获取引用入参对象的其他对象
gc.get_referrers(obj)

因为对象引用关系是一张图,手撸代码来排查会比较繁琐,我们可以直接使用现成的库objgraph,只需要传入对象,就会直接生成一张可视化的引用关系图

pip install objgraph

需要额外安装graphvizxdot,官网指引是使用pip安装,我试过之后报错。这里直接使用apt安装

sudo apt install graphviz xdot

要调用的核心方法

def show_backrefs(objs, # 对象列表
max_depth=3, # 最大深度
extra_ignore=(), # 忽略id集
filter=None, # 过滤器
too_many=10, # 最大广度
highlight=None, # 高亮
filename=None, # 输出文件名
extra_info=None, # 额外信息
refcounts=False, # 引用计数
shortnames=True, # 简短名称
output=None, # 输出对象
extra_node_attrs=None, # 额外属性
):

默认情况下,生成的引用关系图会很庞大,我们需要编写一个过滤器,把无关对象过滤

def _filter(target):
name = getattr(target, "__module__", "")
if not name:
name = getattr(target, "__name__", "")
# 跳过graphviz
if name.startswith("graphviz"):
return False
# 跳过logging.handlers
if name.startswith("logging.handlers"):
return False
# 跳过未被外部引用的函数
if name.startswith("logging") and sys.getrefcount(target) <= 6:
return False
return True

logging模块下面的function非常多,但是绝大部分都是function与module之间的互相引用,所以引用计数为1的function不是我们关注点,需要排除。但是我们不能直接判断sys.getrefcount(target) <= 1,因为对象遍历过程objgraph和我们自己的filter也会引用这个对象。通过调试,只要把1改成6即可。

def showRefRel():
# 搜索logging对象
objs = []
for obj in gc.get_objects():
if type(obj) is dict:
modName = obj.get("__name__", "")
if modName == "logging":
objs.append(obj)

# 输出引用关系图
objgraph.show_backrefs(objs, # 对象列表
max_depth=100, # 最大深度
filter=_filter, # 过滤器
too_many=100, # 最大广度
filename="ref.svg", # 输出文件名
refcounts=True, # 引用计数
shortnames=False, # 简短名称
extra_node_attrs=None, # 额外属性
)

使用浏览器打开生成的ref.svg,可以直观看到问题的引用关系链路

atexit._exithandlers
logging.shutdown
logging.shutdown.__globals__
logging.__dict__

引用关系图

总结

  1. 不完善的模块热更新机制很容易引发内存泄露问题,对应的业务代码躺枪(内心无辜独白:我就是老老实实地按照规范写着代码,为什么也会中枪呢?)。
  2. 未来容器化之后,每次都是全量版本,就用不到这个热更新机制了。
  3. 临时解决,只要在业务代码执行前import好对应的模块,后续不要反复加载/卸载。
  4. 这个模块热更新机制其实还引发了另外一个问题,后文继续讨论。

读完本文,你会知道:

  1. 缓存的基本概念
  2. spring 缓存抽象模型
  3. 如何使用 spring 缓存
  4. 如何扩展 spring 缓存

目录
@[toc]

概述

传统模式下,很多并发不大的系统都是直接将查询请求发到 DB:

直接访问-1

随着业务发展,业务逻辑会变得越来越复杂,系统并发数也会逐渐上涨,导致传递到 DB 的查询请求以几何级数上涨,DB 慢慢变得不堪重负。

为了应对这个问题,我们需要将常用的数据缓存起来,避免业务量*N的查询请求穿透到 DB。

根据刷新方式的不同,大体有两种方案。

方案 1-外部定时刷新

外部定时刷新-1

  1. 缓存刷新程序读取 DB,然后写入缓存;
  2. 联机交易直接读取缓存,不再访问数据库;

这个方案存在几个缺点:

  1. 需要特定缓存程序定期刷新。如果这个刷新动作出现问题,会产生大面积的参数变更不生效。
  2. 缓存数据格式死板。为了通用,格式必须跟 DB 表保持一致,应用层获取到之后还需要自行加工处理。
  3. 刷新频率无法精细控制。比如一些数据一天刷新一次即可,一些数据需要 10 秒刷新一次。
  4. 无法区分冷热数据,空间利用率差。比如一张表 10000 条数据,常被访问的也就 100 条,另外 9900 条数据可能一年都不会用到一次,还是一样被加载到缓存里面,冷数据不能根据 LRU 淘汰。

方案 2-访问自动刷新

访问自动刷新-1

  1. 联机交易访问缓存,如果有值且未过期,直接返回调用者;
  2. 访问数据库获取最新值;
  3. 写入缓存,返回调用者;

因为方案1的种种问题,所以现在主流都是采样方案2

demo 准备

接下来我们做个 demo,看看应该如何逐步实现并改进缓存方案。

需求:通过用户 id 查询用户 name

UserService-用户服务接口

接口只有一个方法getNameFromId,入参为用户 id,返回值为用户名:

public interface UserService {
public String getNameFromId(String userId);
}

AbstractUserService-用户服务抽象类

提供一个方法模拟实现从 DB 查询,让子类可以直接调用:

public abstract class AbstractUserService implements UserService {
private Logger log = LoggerFactory.getLogger(getClass());

protected String getNameFromDb(String userId) {
log.info("db query: {}", userId);
// 暂停
try {
Thread.sleep(100);
} catch (InterruptedException e) {
throw new RuntimeException(e);
}
// 返回
return "Name_" + userId;
}
}

testDirect - 测试方法

通过传入不同的实现来测试对应的缓存效果:

private void testDirect(UserService userSvc, String userId) {
String name;
// 1
name = userSvc.getNameFromId(userId);
log.info("result: {} -> {}", userId, name);
// 2
name = userSvc.getNameFromId(userId);
log.info("result: {} -> {}", userId, name);
// 3
name = userSvc.getNameFromId(userId);
log.info("result: {} -> {}", userId, name);
}

无缓存

无缓存版本直接调用父类方法访问 DB:

@Component
public class NoCacheUserService extends AbstractUserService {
@Override
public String getNameFromId(String userId) {
return getNameFromDb(userId);
}
}

执行后从输出可以观察到每次调用都是访问 DB:

db query: I0001
result: I0001 -> Name_I0001
db query: I0001
result: I0001 -> Name_I0001
db query: I0001
result: I0001 -> Name_I0001

原始缓存

程序员的第一想法肯定是不用搞那么多杂七杂八的,自己动手用 Map 实现一个缓存:

@Component
public class SimpleCacheUserService extends AbstractUserService {
private Map<String, String> cacheMap = new ConcurrentHashMap<>();

public String getNameFromId(String userId) {
String name = cacheMap.get(userId);
if (name == null) {
name = getNameFromDb(userId);
cacheMap.put(userId, name);
}
return name;
}
}

执行后从输出可以观察到只有第一次调用访问 DB,后面都是直接从缓存获取

db query: I0001
result: I0001 -> Name_I0001
result: I0001 -> Name_I0001
result: I0001 -> Name_I0001

那么,这个方案存在什么不足呢?

  1. 侵入性高。业务代码与缓存逻辑耦合在一起,不利于后续维护。
  2. 不能灵活扩展,比如某类热点用户 id 才缓存,其他不缓存。
  3. 绑死 ConcurrentHashMap,无法随意切换其他更优秀的缓存实现,比如 ehcache/redis 等。
  4. 缺乏自动刷新、过期淘汰等现代缓存特征。

那么,spring 是怎么做的呢?

spring 缓存

相比之前侵入式的方案,spring 采用的是声明式缓存,缓存逻辑完全脱离业务代码。我们要做的只是在方法上面增加一个注解@Cacheable

@Component
public class SpringCacheUserService extends AbstractUserService {
@Override
@Cacheable("SpringCache")
public String getNameFromId(String userId) {
return getNameFromDb(userId);
}
}

运行后好像没效果,那是因为我们还没开启缓存。在程序入口添加一个注解@EnableCaching

@SpringBootApplication
@EnableCaching
public class TestCacheApp {
}

再次运行就可以观察到缓存生效了

如果要实现 K 开头的用户 id 才缓存,怎么做呢?很简单,修改下注解,使用SPEL声明条件即可:

@Cacheable(cacheNames="SpringCache", condition="#userId.startsWith('K')")

我们简单分析下原理。

原始调用链:

原始调用链-1

spring 通过 AOP,在调用者和目标类中间插入代理类,拦截方法调用,实现缓存逻辑:

AOP-1

在这个设计下,应用层统一使用@Cacheable,而后端的缓存实现就可以灵活扩展,还能自由切换、组合各种优秀的缓存方案,比如 ehcache/guava/caffeine/redis。

逐步扩展

支持过期时间

spring 默认使用ConcurrentHashMap实现缓存,因此是不支持过期时间的,我们将其换成Caffeine

添加依赖:

implementation 'org.springframework.boot:spring-boot-starter-cache'
implementation 'com.github.ben-manes.caffeine:caffeine:2.7.0'

在 application.yml 添加配置,设置缓存 2 秒过期:

spring.cache.caffeine.spec: expireAfterWrite=2s

添加测试方法,中间插入一个 sleep 休眠 2.2 秒:

private void testExpire(UserService userSvc, String userId) {
String name;
// 1
name = userSvc.getNameFromId(userId);
log.info("result: {} -> {}", userId, name);
// 2
name = userSvc.getNameFromId(userId);
log.info("result: {} -> {}", userId, name);
// sleep
try {
Thread.sleep(2200);
} catch (InterruptedException e) {
throw new RuntimeException(e);
}
// 3
name = userSvc.getNameFromId(userId);
log.info("result: {} -> {}", userId, name);
// 4
name = userSvc.getNameFromId(userId);
log.info("result: {} -> {}", userId, name);
}

执行结果可以观察到 sleep 之后,缓存过期失效,重新查询 DB:

db query: I0001
result: I0001 -> Name_I0001
result: I0001 -> Name_I0001
db query: I0001
result: I0001 -> Name_I0001
result: I0001 -> Name_I0001

这个方案下缓存过期时间是全局性的,不支持不同类型的缓存单独配置不同的缓存过期时间。比如普通参数表可以 1 小时后过期,但是关键参数表却必须控制在 1 分钟内过期,如何实现呢?

精细控制过期时间

我们可以从注解着手,在缓存名称后面追加过期时间,变成:

@Cacheable(cacheNames="SpringCache,2")

新增一个CacheManager,重写父类方法createCaffeineCache,在里面处理缓存名称:

@Component
public class ExtCacheManager extends CaffeineCacheManager {
@Override
protected Cache createCaffeineCache(String name) {
// 解析缓存名称
String[] items = name.split(",");
String cacheName = items[0];
long cacheTime = 60;
if (items.length >= 2) {
cacheTime = Long.parseLong(items[1]);
}
// 创建缓存
com.github.benmanes.caffeine.cache.Cache<Object, Object> nativeCache =
Caffeine.newBuilder()
.expireAfterWrite(cacheTime, TimeUnit.SECONDS)
.build();
return new CaffeineCache(cacheName, nativeCache);
}
}

执行后从输出可以观察到此时缓存过期时间可以精细控制了。

过期处理策略

到了这里,我们需要暂时停下来,讨论下缓存过期的处理策略。

当缓存过期后,如果不加以处理,直接在当前请求更新缓存,就会导致多个并发请求瞬间穿透到 DB:

缓存过期-穿透-1

一种做法是判断到过期的时候加锁,抢占成功的就去 DB 刷新缓存,其他请求则等待:

缓存过期-等待-1

从上图可以看到,这样会造成请求瞬间卡顿。

我们改进下,未争抢到锁的请求不等待,而是直接使用旧值:

缓存过期-旧值-1

乍看之下没问题,但是仔细想想,如果一直没请求进来,在缓存过期很久之后再出现这个场景,此时取到的旧值已经严重过期,再直接使用可能会引发问题:

缓存过期-旧值-2

先总结下前面的需求,我们要尽可能做到

  1. 不并行更新缓存,否则会冲击到 DB;
  2. 不产生锁等待,否则会导致瞬间卡顿;
  3. 不使用过期缓存值,否则会影响到业务处理;

这几点按重要性排序应该是3 > 1 > 2,综合几个策略,我们可以:

  1. 缓存即将过期,1 个请求负责刷新缓存,其他请求则使用缓存值
  2. 缓存已经过期,1 个请求负责刷新缓存,其他请求则锁等待

缓存过期-提前-1

缓存过期-提前-1

此时的缓存有效时间会不断的向前滚动,只需要 1 个请求负责更新缓存,其他请求直接使用缓存值:

缓存过期-提前-1

实现提前刷新

当 caffeine 判断到需要刷新的时候(预设的刷新时间过期时间到达),就会主动调用我们实现的 CacheLoader:

public interface CacheLoader<K, V> {
V load (K key) throws Exception;
}

但是这个接口方法只有一个key参数,我们怎么实现刷新呢?

AOP-1

从前面的这个图可以看到,缓存模块并不知道值来源于 DB 还是哪里,刷新的唯一途径就是调用目标方法。但是目标方法上面只有一个@Cacheable注解而已,我们怎么获取到相关信息呢?

spring 给我们提供的方案是注解上面的keyGenerator参数,每次缓存操作的时候,spring 都会调用这个接口获取到 key:

@FunctionalInterface
public interface KeyGenerator {
Object generate(Object target, Method method, Object... params);
}

我们可以在这上面做文章,使用自定义 key,将目标方法保存起来,然后提供一个 invoke 方法给 CacheLoader 调用。另外,我们还需要实现equals/hashCode/toString等 key 比较时要用到的基础方法。

public class ExtKey {
private final Object target;
private final Method method;
private final Object[] params;
private final int hashCode;

public ExtKey(Object target, Method method, Object... params) {
this.target = target;
this.method = method;
// 复制参数
this.params = new Object[params.length];
System.arraycopy(params, 0, this.params, 0, params.length);
// 计算hash
this.hashCode = Arrays.deepHashCode(this.params);
}

public Object invoke() throws Exception {
return this.method.invoke(target, params);
}

@Override
public boolean equals(Object other) {
return (this == other ||
(other instanceof ExtKey && Arrays.deepEquals(this.params, ((ExtKey) other).params)));
}

@Override
public final int hashCode() {
return this.hashCode;
}

@Override
public String toString() {
return getClass().getSimpleName() + " [" + StringUtils.arrayToCommaDelimitedString(this.params) + "]";
}
}

KeyGenerator 直接返回 ExtKey 即可:

@Component("ExtKeyGenerator")
public class ExtKeyGenerator implements KeyGenerator {
@Override
public Object generate(Object target, Method method, Object... params) {
return new ExtKey(target, method, params);
}
}

在注解里面添加刷新时间(-0.5 即提前 0.5 秒刷新)及 KeyGenerator 信息:

@Cacheable(cacheNames="SpringCache,1,-0.5",
keyGenerator="ExtKeyGenerator")

然后在缓存初始化里面添加 CacheLoader:

@Component
public class ExtCacheManager extends CaffeineCacheManager {
//
private Logger log = LoggerFactory.getLogger(TestCacheApp.class);

@Override
protected Cache createCaffeineCache(String name) {
// 解析缓存名称
String[] items = name.split(",");
String cacheName = items[0];
long cacheTime = 60;
long refreshTime = 0;
// 缓存过期时间
if (items.length >= 2) {
cacheTime = (long) (1000 * Float.parseFloat(items[1]));
}
// 缓存刷新时间 = 缓存过期时间 + 时间差(负数)
if (items.length >= 3) {
refreshTime = cacheTime + (long) (1000 * Float.parseFloat(items[2]));
}
// 缓存加载器
CacheLoader<Object, Object> loader = new CacheLoader<Object, Object>() {
@Override
public @Nullable Object load(@NonNull Object key) throws Exception {
log.info("refresh cache: {}", key);
ExtKey extKey = (ExtKey) key;
return extKey.invoke();
}
};
// 创建缓存
Caffeine<Object, Object> builder = Caffeine.newBuilder()
.expireAfterWrite(cacheTime, TimeUnit.MILLISECONDS);
if (refreshTime > 0) {
// 提前刷新
builder.refreshAfterWrite(refreshTime, TimeUnit.MILLISECONDS)
.executor(Runnable::run);
}
return new CaffeineCache(cacheName, builder.build(loader));
}
}

对于一些系统,如果加载方法不能运行在公共线程池ForkJoinPool.commonPool(),而必须在缓存请求线程执行,则需要显式指定 executor(Runnable::run)

接下来测试下效果,首先新建线程类:

private static class VirtualUser extends Thread {
// 入参
private final UserService userSvc;
private final String userId;
private final CountDownLatch latch;

// 停止标志
public volatile boolean stopFlag = false;
// 计数
public int counter = 0;

public VirtualUser(UserService userSvc, String userId, CountDownLatch latch) {
this.userSvc = userSvc;
this.userId = userId;
this.latch = latch;
}

@Override
public void run() {
try {
// 等待
latch.await();
// 循环执行
while (true) {
// 测试
String name = userSvc.getNameFromId(userId);
// 累加
counter++;
// 暂停
Thread.sleep(0);
// 是否停止
if (stopFlag) {
break;
}
}
}
catch (InterruptedException e) {
throw new RuntimeException(e);
}
}
}

然后新建一个多线程测试方法:

private void testMultiThread(UserService userSvc, String userId) throws Exception {
// 虚拟用户数
final int VUSER_COUNT = 200;

log.info("create thread ...");
CountDownLatch latch = new CountDownLatch(1);
List<VirtualUser> threadList = new ArrayList<>();
for (int i = 0; i < VUSER_COUNT; i++) {
VirtualUser thread = new VirtualUser(userSvc, userId, latch);
thread.start();
threadList.add(thread);
}

log.info("go ...");
latch.countDown();

Thread.sleep(10 * 1000);

log.info("stop thread ...");
for (VirtualUser thread : threadList) {
thread.stopFlag = true;
}

int totalCount = 0;
for (VirtualUser thread : threadList) {
thread.join();
totalCount += thread.counter;
}

log.info("all done, counter: {}", totalCount);
}

通过比较参数cacheNames="SpringCache,1,-0.5"cacheNames="SpringCache,1"对应的执行效果,可以观察到提前刷新是比过期刷新效率高的,并且随着并发数刷新耗时的上升,两者的差距会越来越明显。

集中配置

缓存名称、大小、过期时间、刷新时间等参数目前都是直接放在注解上面,不利于管理维护,我们可以将其集中起来。

在 application.yml 添加配置,key 为类名,value 为容量,过期时间,刷新时间

ext.cache:
cacheItemMap:
SpringCacheUserService: 100,1,-0.5

配置对应的 bean:

@Component
@ConfigurationProperties("ext.cache")
public class CacheConfig {
private Map<String, String> cacheItemMap;

public void setCacheItemMap(Map<String, String> cacheItemMap) {
this.cacheItemMap = cacheItemMap;
}

public CacheItem getCacheItem(String cacheName) {
// 配置值
String cfgStr = cacheItemMap.get(cacheName);
String[] items = cfgStr.split(",");
long maxSize = Long.parseLong(items[0]);
long cacheTime = 1000 * (long) Float.parseFloat(items[1]);
long refreshTime = 1000 * (long) Float.parseFloat(items[2]);
// 计算刷新时间:-1 + 5 = 4
if (refreshTime != 0) {
refreshTime += cacheTime;
}
// 返回
return new CacheItem(maxSize, cacheTime, refreshTime);
}
}
public class CacheItem {
private final long maxSize;
private final long cacheTime;
private final long refreshTime;

public CacheItem(long maxSize, long cacheTime, long refreshTime) {
this.maxSize = maxSize;
this.cacheTime = cacheTime;
this.refreshTime = refreshTime;
}

public long getMaxSize() {
return maxSize;
}

public long getCacheTime() {
return cacheTime;
}

public long getRefreshTime() {
return refreshTime;
}
}

修改下CacheManager读取配置:

@Component
public class ExtCacheManager extends CaffeineCacheManager {
//
private Logger log = LoggerFactory.getLogger(TestCacheApp.class);

@Autowired
private CacheConfig cacheConfig;

@Override
protected Cache createCaffeineCache(String name) {
// 获取配置
CacheItem cacheItem = cacheConfig.getCacheItem(name);
long maxSize = cacheItem.getMaxSize();
long cacheTime = cacheItem.getCacheTime();
long refreshTime = cacheItem.getRefreshTime();
// 缓存加载器
CacheLoader<Object, Object> loader = new CacheLoader<Object, Object>() {
@Override
public @Nullable Object load(@NonNull Object key) throws Exception {
log.info("refresh cache: {}", key);
ExtKey extKey = (ExtKey) key;
return extKey.invoke();
}
};
// 创建缓存
Caffeine<Object, Object> builder = Caffeine.newBuilder()
.expireAfterWrite(cacheTime, TimeUnit.MILLISECONDS);
if (maxSize > 0) {
builder.maximumSize(maxSize);
}
if (refreshTime > 0) {
// 提前刷新
builder.refreshAfterWrite(refreshTime, TimeUnit.MILLISECONDS)
.executor(Runnable::run);
}
return new CaffeineCache(name, builder.build(loader));
}
}

原来的缓存名称都是直接写在注解上面的,如果要动态生成,需要拓展CacheResolver

@FunctionalInterface
public interface CacheResolver {
Collection<? extends Cache> resolveCaches(CacheOperationInvocationContext<?> context);
}

我们只需要实现AbstractCacheResolvergetCacheNames即可:

@Component("ExtCacheResolver")
public class ExtCacheResolver extends AbstractCacheResolver {
public ExtCacheResolver(CacheManager cacheManager) {
super(cacheManager);
}

@Override
protected Collection<String> getCacheNames(CacheOperationInvocationContext<?> context) {
// 使用类名作为缓存名称
String cacheName = context.getTarget().getClass().getSimpleName();
return Arrays.asList(cacheName);
}
}

在注解里面引用CacheResolver

@Cacheable(keyGenerator="ExtKeyGenerator",
cacheResolver="ExtCacheResolver")

每个使用缓存的地方都要这样注解,对用户太不友好了。我们可以利用 spring 的注解组合功能:

@Target({
ElementType.METHOD,
})
@Retention(RetentionPolicy.RUNTIME)
@Inherited
@Documented
@Cacheable(keyGenerator="ExtKeyGenerator",
cacheResolver="ExtCacheResolver")
public @interface ExtCacheable {

}

直接引用这个注解:

@ExtCacheable
public String getNameFromId(String userId) {
return getNameFromDb(userId);
}

相比原来冗长的注解,简洁不少:

@Cacheable(cacheNames="SpringCache,1,-0.5", keyGenerator="ExtKeyGenerator")
->
@ExtCacheable

模块关系

看到这里,头脑是不是有点蒙圈了?

我们整理一下各个模块之间的关系:

关系-1

发散思考

全程下来,很深的感受是 AOP 不愧为 spring 的两大特征之一(另外一个是 DI),而 SCS 作为协调者,做到了 caller/target/cache 三者之间的平滑处理。

那么,我们还可以用 SCS 做什么?

  1. 切换本地缓存 caffeine 为远程缓存 redis;
  2. 组合 caffeine 和 redis,少量、时效性不高的参数放在 caffeine(分布式、速度快),大量、时效性高的数据放在 redis(节省内存、更新方便);

demo 下载链接:[ testcache.7z]

概述

读完本文,你会知道:

  1. 如何衡量性能,要看哪些指标,这些指标之间的关系是怎么样的。
  2. 面对一个性能不足的系统,如何按照一个普遍适用的策略,按部就班地排查问题并优化。
  3. 为什么火焰图可以快速定位问题,如何解读火焰图。

性能指标

如果我们要做性能优化,首先要明白如何衡量性能。

性能主要看三大指标:并发数、响应时间和TPS

并发数

即系统同时并发处理的请求数量。由两个因素决定:

  1. 客户端并发请求数量,对应LoadRunner里面的用户数;
  2. 系统处理器数量,对应系统处理线程/进程池数量;

当[1]<[2],系统处于空闲状态,此时并发数为[1],差异部分为系统处理器空闲数量
并发数-1

当[1]>[2],系统处于过载状态,此时并发数为[2],差异部分为客户端请求排队数量
并发数-1

当[1]=[2](或稍微大于[why?]),系统处于全速状态,此时既不会产生大量的客户端请求排队,也不会出现系统处理资源空闲的情况,因此吞吐量会达到(某种条件下的)最优状态。

响应时间

客户端发出请求到接收到响应,两者的时间差即为响应时间。例如12:01:01秒发出请求,12:01:05秒接收到响应,则该笔交易响应时间为4秒。

响应时间按照所处阶段可以划分为三大部分:

  1. 通讯链路,包括客户端与业务系统之间请求链路、应答链路的耗时。
  2. 业务系统内部,包括队列堆积,计算判断,数据库,文件IO等各种操作导致的耗时。
  3. 上游系统,即业务系统依赖的上游系统的处理时间。

响应时间-1

TPS

每秒处理事务数,如果系统平均一秒可以处理100笔交易,则TPS为100。

TPS是衡量系统吞吐量最直观,也是最重要的指标。可以简单分析得出,TPS与并发数成正比,与平均响应时间成反比,即

TPS = 并发数 / 平均响应时间

例如一个系统并发数为100,平均响应时间0.1秒,则其TPS=(100/0.1)=1000,也就是一秒钟可以处理1000笔交易。

响应时间不同产生的TPS差异对比:

并发数-1

并发数不同产生的TPS差异对比:

并发数-1

优化策略

那么,面对一个性能不足的系统,我们该如何下手?

还记得前面的公式吗?

TPS = 并发数 / 平均响应时间

从这个公式可以推导出,为了提高系统吞吐量,可以从两方面着手:

  1. 提高并发数
  2. 降低平均响应时间

提高并发数

提高并发数一个很简单的方法就是增加处理器数量,让客户端请求不需要排队就可以马上被并发处理。但是单台机器CPU处理能力是有上限的,并不能无限制地增加处理线程/进程。

当CPU占用率达到了70%(为什么不是100%?),我们就不能再简单粗暴地增加处理器数量。此时应该想办法降低CPU占用率

降低CPU占用率,本质就是减少每个请求对应的CPU计算量:

  1. 减少计算次数。例如超大List转换为HashMap,使查询时间复杂度由O(n)变成O(1)
  2. 缓存计算结果。例如String.replaceAll(xxx, yyy)改为类初始化的时候,预编译正则表达式Pattern.compile(xxx),后面多个请求则直接复用这个对象,调用replaceAll(yyy)

降低平均响应时间

通常,通过各种手段优化程序,降低了CPU占用率后,平均响应时间也会相应缩短。但是很多时候你会发现,CPU不高的前提下,增加客户端并发请求数和系统处理器数,TPS并没有相应的增加,甚至出现下降的场景。此时,你要考虑问题是不是出在某个CPU以外的瓶颈点:

  1. 内存不足导致频繁PAGE IN/OUT(交换区)
  2. JAVA堆上限过小导致频繁FULL GC(STW)
  3. 数据库行热点导致请求排队堵塞
  4. 日志输出过多导致IO等待
  5. 反复创建线程或数据库连接,没有利用池技术复用资源
  6. 代码使用了全局锁导致所有线程都在锁等待

以上各种问题,每一点都可以独立成章,一大堆各式各样的工具分析排查。那么,有没有一种通用的(不管什么原因导致的瓶颈),语言无关的(不管java/python/c/lua)工具,能帮我们快速定位问题?

没错,火焰图就可以做到这些。

火焰图解读

传统分析方法

想想,我们过去通常是如何分析性能问题的?

  1. 随机打开一笔交易日志,人肉查看分析各个节点的耗时。
  2. 使用jstack等工具抓取线程堆栈,人肉查看分析里面是否存在锁,或者某个栈是否出现多次。
  3. 使用五花八门的工具查看分析各个怀疑的点,拿着不同的锤子这里敲敲,那里打打。

那么,以上的方法存在什么问题呢?

  1. 侵入式分析。需要确保关键节点有对应的日志,如果没有还得修改代码加上日志输出,而日志输出过多又反过来影响性能。如果是JDK内部代码,我们怎么改?
  2. 无法处理数据抖动。例如同样一条sql在这笔请求耗时100ms,那笔请求耗时150ms,如何衡量?人肉再抽样看多几个日志?
  3. 干扰信息过多。例如日志里面一大堆变量输出,或者堆栈里面一大堆非工作线程信息。
  4. 无法量化数据。看到的是一条条孤立的、不直观的数据,无法从整体来总览俯瞰各个节点的数据。无法准确判定这个方法消耗了10%的CPU,那个sql占用了20%的耗时。
  5. 过于依赖开发人员的经验、以及其对系统架构、代码细节的熟悉程度,换个人就完全无从下手。
  6. 无法排查到应用层以外的问题,例如发起方压力不够导致系统不够繁忙,或者平台设计有问题导致交易在平台层耗时过长。

而火焰图则可以完美解决上面的这些问题。

原理

火焰图一般为svg格式,使用浏览器可以直接打开。

它的生成一般可以分成三个步骤:采样、统计、生成。部分工具可以一次性生成,但是基本原理还是一样的。

采样

所谓采样,就是通过各种工具,不断地抓取正在运行的进程的堆栈信息。

我们自己可以写个简单的shell脚本,采集java进程的堆栈信息:

$PID=12345
for i in {1..100}
do
jstack $PID >> time.raw
sleep 0.01
done

最终采集到的数据:

"main" #1 prio=5 os_prio=0 tid=0x00007f2a0000f800 nid=0xaee waiting on condition [0x00007f2a09544000]
java.lang.Thread.State: WAITING (parking)
at sun.misc.Unsafe.park(Native Method)
at java.util.concurrent.locks.LockSupport.park(LockSupport.java:175)
at java.util.concurrent.locks.AbstractQueuedSynchronizer.parkAndCheckInterrupt(AbstractQueuedSynchronizer.java:836)

"main" #1 prio=5 os_prio=0 tid=0x00007f2a0000f800 nid=0xaee waiting on condition [0x00007f2a09544000]
java.lang.Thread.State: WAITING (parking)
at sun.misc.Unsafe.park(Native Method)
at java.util.concurrent.locks.LockSupport.park(LockSupport.java:175)
at java.util.concurrent.locks.AbstractQueuedSynchronizer.parkAndCheckInterrupt(AbstractQueuedSynchronizer.java:836)

...

统计

统计就是读取采样数据,然后按出现次数进行归集:

cat time.raw | ./stackcollapse-jstack.pl > time.coll

归集后数据变成这样的形式(调用栈+采样次数):

sun.misc.Unsafe.park;java.util.concurrent.locks.LockSupport.park;java.util.concurrent.locks.AbstractQueuedSynchronizer.parkAndCheckInterrupt 10
xxx1;yyy1;zzz1 20
xxx2;yyy2 30
...

生成

最后一步就是根据统计信息生成火焰图:

cat time.coll | ./flamegraph.pl –colors=java > time.svg

图形解读

火焰图-CPU:

火焰图-CPU-1

火焰图-耗时:

火焰图-耗时-1

最终展现出来的图形很像一个个在抖动的火焰,因而得名火焰图

火焰图主要有两种,分别对应两个维度的数据:CPU、耗时

那么,我们该怎么看火焰图?

  1. x轴表示某个方法片段的比例,越宽则说明占比越高,优化后对应的效果也越好。
  2. y轴表示程序堆栈,从下往上表示完整的代码调用链,我们可以根据y轴快速准确定位代码位置。
  3. 颜色只是为了区分层次,无特定含义。
  4. 点击某个区块,会以该区块为基础,自动将其上层调用链展开,下层则自动置灰。当我们需要集中精力分析某个代码片段的时候会经常用到。
  5. 移到某个区块,最底下会自动显示对应的函数名、采样次数、整体百分比
  6. Ctrl+F,在弹出的文本框输入关键字,会自动高亮包含该关键字的区块,并在右下角显示相关区块占用的相对百分比

以上面的火焰图-CPU为例,我们分析下里面的信息:

  1. 最底层为all区块(100%),次底层左侧和右侧的小区块是java的native、gc、vm栈(2%),中间一大块则是用户层(98%)。
  2. 中间可以分为两部分:netty(7%),ThreadPoolExecutor(90%)。
  3. 一直往上到PayTrade/PAYS11/TPAYS11.execute(86%)则是应用层(业务层)代码,通常也是我们的优化重点。
  4. 点击PayTrade/PAYS11/TPAYS11.execute自动展开,然后按Ctrl+F,在弹出的文本框输入mysql,相关的区块自动高亮,同时右下角显示Matched: 29%,即mysql相关调用的相对百分比29%。如果此时系统平均CPU占用为70%,则其绝对百分比70% * 86% * 29% = 17%,如果我们把这部分代码干掉,可以节省17%的物理CPU资源。

火焰图-耗时也完全可以用上面的思路分析,只是此时的维度由CPU变成了耗时

下一步

本文简单介绍了性能指标优化策略以及火焰图解读,后面会另文补充各种语言下生成火焰图、定位问题以及解决的介绍。

各种语言对应的火焰图工具:

语言 名称
通用 FlameGraph
java async-profiler perf-map-agent
python pyflame py-spy
0%