Python performance tuning: key query

I recently ran into a performance-tuning problem: given a roughly 100 GB CSV file of keys, check whether each key exists in MongoDB.

Baseline
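
The snippets below assume a client, a target collection and the CSV path are already in place. A minimal sketch of that setup, with an illustrative collection name and file path (the original post does not show them):

import pymongo
import urllib.parse

# Illustrative setup: database/collection names and the CSV path are placeholders
myclient = pymongo.MongoClient('mongodb://{}:{}@localhost/db?authSource=admin'.format(
    urllib.parse.quote('root'), urllib.parse.quote('example')))
mycol = myclient['db']['mycol']
file = 'keys.csv'   # one key per line, no header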

First, a benchmark with no optimizations at all:

from datetime import datetime
import pandas as pd

batch_size = 10**4*3     # 30,000 keys per query
find_count = 0
total = 0
exception_count = 0
begin = datetime.now()
max_size = 10**6*5       # stop the benchmark after 5,000,000 keys

for chunk in pd.read_csv(file, header=None, chunksize=batch_size, names=['key']):
    total += batch_size
    try:
        find_count += mycol.count_documents({'Key': {'$in': chunk.key.tolist()}})
        print('size: {:,}, find: {}, time: {}'.format(total, find_count, datetime.now() - begin))
    except Exception:
        exception_count += 1

    if total > max_size:
        break

print('size: {:,}, batch_size: {:,}, time: {}'.format(total, batch_size, datetime.now() - begin))

size: 5,000,000, batch_size: 30,000, time: 0:00:44

5 million keys take 44 seconds.

Optimization 1: create an index

Create a hashed index on the Key field:

{
    "Key": "hashed"
}
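
In pymongo this can be created with create_index; a minimal sketch (assuming mycol is the same collection queried above):

import pymongo

# Build a hashed index on Key; this is a no-op if the index already exists
mycol.create_index([('Key', pymongo.HASHED)])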

size: 5,000,000, batch_size: 30,000, time: 0:00:34

34 seconds.

Optimization 2: enable network compression

pip install zstandard

import pymongo
import urllib.parse

# Reconnect with zstd wire compression enabled
myclient = pymongo.MongoClient(
    'mongodb://{}:{}@localhost/db?authSource=admin'.format(
        urllib.parse.quote('root'), urllib.parse.quote('example')),
    compressors='zstd')
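
To confirm that compression is actually being negotiated, one option is to look at the network.compression counters in serverStatus; a hedged sketch (the exact field names depend on the server version, so treat them as an assumption):

# Inspect server-side compression counters; field layout may differ by MongoDB version
status = myclient.admin.command('serverStatus')
print(status.get('network', {}).get('compression', {}).get('zstd'))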

size: 5,000,000, batch_size: 30,000, time: 0:00:20

20 seconds. Each batch ships 30,000 keys over the wire, so compressing the driver traffic noticeably cuts transfer time.

Optimization 3: index plus network compression

With both the hashed index and zstd compression enabled:

size: 5,000,000, batch_size: 30,000, time: 0:00:09

Optimization 4: use multiple threads

Use multiple threads to separate CSV reading from MongoDB querying so the two can overlap.

import pandas as pd
from datetime import datetime
from queue import Queue
from threading import Thread, Lock

# Counters reset for this run; file, mycol, batch_size and max_size as before
find_count = 0
total = 0
exception_count = 0
begin = datetime.now()

my_queue = Queue(maxsize=10)   # bounded queue keeps the CSV reader from racing ahead
num_threads = 2
lock = Lock()

def do_stuff(worker_id, q, mycol):
    global find_count
    global exception_count
    while True:
        keys = q.get()
        try:
            count = mycol.count_documents({'Key': {'$in': keys}})
            with lock:
                find_count += count
                print('id: {}, size: {:,}, find: {}, now: {}'.format(worker_id, total, find_count, datetime.now() - begin))
        except Exception:
            exception_count += 1
            print('size exception: {:,}, find: {}, now: {}'.format(batch_size, find_count, datetime.now() - begin))

        q.task_done()

# Worker threads run as daemons so they exit with the main thread
for i in range(num_threads):
    worker = Thread(target=do_stuff, args=(i, my_queue, mycol))
    worker.daemon = True
    worker.start()

# Main thread streams the CSV and feeds key lists to the workers
for chunk in pd.read_csv(file, header=None, chunksize=batch_size, names=['key']):
    total += batch_size
    my_queue.put(chunk.key.tolist())

    if total > max_size:
        break

my_queue.join()

print('size: {:,}, batch_size: {:,}, find: {}, exception: {}, now: {}'.format(total, batch_size, find_count, exception_count, datetime.now() - begin))

size: 5,000,000, batch_size: 30,000, time: 0:00:05

5 seconds.
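
For comparison, roughly the same producer/consumer idea can be written with concurrent.futures; this is only a sketch (count_chunk is an illustrative name, and map() submits every chunk up front, so the bounded Queue version above is the better fit for the full 100 GB file):

from concurrent.futures import ThreadPoolExecutor
from datetime import datetime
import pandas as pd

def count_chunk(keys):
    # One $in query per chunk, same as each worker thread above
    return mycol.count_documents({'Key': {'$in': keys}})

find_count = 0
begin = datetime.now()
chunks = (chunk.key.tolist()
          for chunk in pd.read_csv(file, header=None, chunksize=batch_size, names=['key']))

with ThreadPoolExecutor(max_workers=2) as pool:
    for count in pool.map(count_chunk, chunks):
        find_count += count

print('find: {}, time: {}'.format(find_count, datetime.now() - begin))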

Summary

no optimization | index | network compression | multiple threads | time    | speedup
       v        |       |                     |                  | 0:00:45 |
                |   v   |                     |                  | 0:00:34 | 1.32x
                |       |          v          |                  | 0:00:20 | 2.25x
                |   v   |          v          |                  | 0:00:09 | 5x
                |   v   |          v          |        v         | 0:00:06 | 7.5x

Speedups are relative to the 0:00:45 baseline (45/34 ≈ 1.32, 45/20 = 2.25, 45/9 = 5, 45/6 = 7.5).

References

  • Mongo index
  • MongoDB Network Compression: A Win-Win