0%

基于Python从InfluxDB中读取大量数据实践

背景

由于项目的需求,我们要在Python中查询InfluxDB的数据且查询的数据量较大,因此需要调研一种能够有效防止内存使用过多的查询方式。

方式一

在Python中查询InfluxDB数据时,一般会这样做:

1
2
3
4
5
from influxdb import InfluxDBClient
client = InfluxDBClient('xx.xx.xx.xx', 8086, database='monitor')
result = client.query("SELECT * FROM xx")
for point in result.get_points():
pass

在InfluxDB库的文档中对get_points()方法是这样描述的:

Return a generator for all the points that match the given filters.

get_points()方法会返回一个生成器,那么是不是意味着这种方式已经考虑到了查询大数据集的情况呢。

那下面就来测试一下内存占用情况。

1
2
3
4
5
6
7
Line #    Mem usage    Increment  Occurences   Line Contents
============================================================
7 51.5547 MiB 51.5547 MiB 1 @profile(precision=4)
8 def test1():
9 53.0391 MiB 1.4844 MiB 1 result = client.query('SELECT * FROM sysio LIMIT 1000')
10 53.0391 MiB 0.0000 MiB 1001 for point in result.get_points():
11 53.0391 MiB 0.0000 MiB 1000 pass
1
2
3
4
5
6
7
Line #    Mem usage    Increment  Occurences   Line Contents
============================================================
7 51.5508 MiB 51.5508 MiB 1 @profile(precision=4)
8 def test1():
9 54.1836 MiB 2.6328 MiB 1 result = client.query('SELECT * FROM sysio LIMIT 2000')
10 54.1836 MiB 0.0000 MiB 2001 for point in result.get_points():
11 54.1836 MiB 0.0000 MiB 2000 pass

从测试结果来看,随着数据量的增加,使用的内存增加较多,说明这种方式并不能防止内存使用过多。

方式二(分块)

从文档中得知,InfluxDBClient对象的query方法有以下两个参数:

  • chunked (bool) – Enable to use chunked responses from InfluxDB. With chunked enabled, one ResultSet is returned per chunk containing all results within that chunk
  • chunk_size (int) – Size of each chunk to tell InfluxDB to use.

从以上描述中可以看出,使用这两个参数可以进行分块查询。

测试内存占用情况如下:

1
2
3
4
5
6
7
Line #    Mem usage    Increment  Occurences   Line Contents
============================================================
14 51.5508 MiB 51.5508 MiB 1 @profile(precision=4)
15 def test2():
16 54.3047 MiB 2.7539 MiB 1 result = client.query('SELECT * FROM sysio LIMIT 2000', chunked=True, chunk_size=100)
17 54.3047 MiB 0.0000 MiB 2001 for point in result.get_points():
18 54.3047 MiB 0.0000 MiB 2000 pass

根据测试结果得出分块方式并不会减少内存占用,甚至会使用更多内存。

为什么分块了却没有减少内存的使用呢,这一点在influxdb的源码中可以看出。

1
2
3
4
5
6
7
8
9
10
11
12
def _read_chunked_response(response, raise_errors=True):
result_set = {}
for line in response.iter_lines():
if isinstance(line, bytes):
line = line.decode('utf-8')
data = json.loads(line)
for result in data.get('results', []):
for _key in result:
if isinstance(result[_key], list):
result_set.setdefault(
_key, []).extend(result[_key])
return ResultSet(result_set, raise_errors=raise_errors)

_read_chunked_response方法使用一个字典来存放分块后的所有内容,而没有使用生成器的方式,这就解释了为什么使用的内存没有减少反而增加了。

那是不是改写成生成器就可以了呢,其实InfluxDB的后续版本已经实现了这一功能。但是在我的机器上并没有执行成功,是因为这个功能还存在bug且目前还未解决。所以这一部分就不展开描述了。

方式三(分页)

既然InfluxDB库中提供的方法暂时都没有办法达到我们的目的,那就只能自己来实现了。

这里简单实现了一个分页查询(仅用于测试),代码如下:

1
2
3
4
5
6
def query(count):
limit = 100
for offset in range(0, count, limit):
limit = min(count-offset, limit)
result = client.query('SELECT * FROM sysio LIMIT {} OFFSET {}'.format(limit, offset))
yield from result.get_points()

测试内存占用情况如下:

1
2
3
4
5
6
Line #    Mem usage    Increment  Occurences   Line Contents
============================================================
29 51.5508 MiB 51.5508 MiB 1 @profile(precision=4)
30 def test3():
31 52.2422 MiB 0.6914 MiB 1001 for point in query(1000):
32 52.2422 MiB 0.0000 MiB 1000 pass
1
2
3
4
5
6
Line #    Mem usage    Increment  Occurences   Line Contents
============================================================
29 51.5508 MiB 51.5508 MiB 1 @profile(precision=4)
30 def test3():
31 52.2734 MiB 0.7227 MiB 2001 for point in query(2000):
32 52.2734 MiB 0.0000 MiB 2000 pass
1
2
3
4
5
6
Line #    Mem usage    Increment  Occurences   Line Contents
============================================================
29 51.5508 MiB 51.5508 MiB 1 @profile(precision=4)
30 def test3():
31 52.2891 MiB 0.7383 MiB 3001 for point in query(3000):
32 52.2891 MiB 0.0000 MiB 3000 pass

从测试结果可以看出,随着数据量的增加,使用的内存基本保持不变,说明该方式符合我们的需求。

结论

以上三种方式中,只有分页方式可以有效防止内存使用过多的问题。