Python日志模块-多进程日志记录

12,629 阅读19分钟

一、问题描述二、分析2.1 logging模块实现日志回滚2.2 多进程日志安全输出到同一文件方案三、解决方案3.1 使用ConcurrentRotatingFileHandler包3.2 concurrent-log-handler包3.3 对日志输出加锁3.4 重写FileHandler类3.5 单独进程负责日志事件3.6 logging.SocketHandler的方案四、参考文献

一、问题描述

项目中,使用RotatingFileHandler根据日志文件大小来切分日志。设置文件的MaxBytes1GBbackupCount大小为5。

经查看,发现日志文件的大小均小于10MB,且每个回滚日志文件的写入时间也都比较接近。

二、分析

日志文件过小,猜测是代码有问题,或者是文件内容有丢失;日志写入时间接近猜测是同时写入的问题。

经检查,代码没有问题,排除此原因。考虑当前使用gunicorn的多进程启动程序,多半是多个进程同时写入当个文件造成日志文件丢失。

logging模块是线程安全的,但并不是进程安全的。

如何解决此问题呢?首先先过一遍Pythonlogging模块在处理日志回滚的具体实现方法。

2.1 logging模块实现日志回滚

loggingRotatingFileHandler类和TimedRotatingFileHandler类分别实现按照日志文件大小和日志文件时间来切分文件,均继承自BaseRotatingHandler类。

BaseRotatingHandler类中实现了文件切分的触发和执行,具体过程如下:

 1def emit(self, record):
2    """
3        Emit a record.
4        Output the record to the file, catering for rollover as described
5        in doRollover().
6        """

7    try:
8        if self.shouldRollover(record):
9            self.doRollover()
10        logging.FileHandler.emit(self, record)
11    except Exception:
12        self.handleError(record)

具体的执行过程shouldRollover(record)doRollover()函数则在RotatingFileHandler类和TimedRotatingFileHandler类中实现。

RotatingFileHandler类为例,doRollover()函数流程如下:

 1def doRollover(self):
2    if self.stream:
3        self.stream.close()
4        self.stream = None
5    if self.backupCount > 0:
6        for i in range(self.backupCount - 10-1): # 从backupCount,依次到1
7            sfn = self.rotation_filename("%s.%d" % (self.baseFilename, i))
8            dfn = self.rotation_filename("%s.%d" % (self.baseFilename,
9                                                        i + 1))
10            if os.path.exists(sfn):
11                if os.path.exists(dfn):
12                    os.remove(dfn)
13                os.rename(sfn, dfn) # 实现将xx.log.i->xx.log.i+1
14        dfn = self.rotation_filename(self.baseFilename + ".1")
15        # ---------start-----------
16        if os.path.exists(dfn): # 判断如果xx.log.1存在,则删除xx.log.1
17            os.remove(dfn)
18        self.rotate(self.baseFilename, dfn) # 将xx.log->xx.log.1
19        # ----------end------------
20    if not self.delay:
21        self.stream = self._open() # 执行新的xx.log

分析如上过程,整个步骤是:

  1. 当前正在处理的日志文件名为self.baseFilename,该值self.baseFilename = os.path.abspath(filename)是设置的日志文件的绝对路径,假设baseFilenameerror.log
  2. 当进行文件回滚的时候,会依次将error.log.i重命名为error.log.i+1
  3. 判断error.log.1是否存在,若存在,则删除,将当前日志文件error.log重命名为error.log.1
  4. self.stream重新指向新建error.log文件。

当程序启动多进程时,每个进程都会执行doRollover过程,若有多个进程进入临界区,则会导致dfn被删除多次等多种混乱操作。

2.2 多进程日志安全输出到同一文件方案

相应的解决方法:

  1. 将日志发送到同一个进程中,由该进程负责输出到文件中(使用QueueQueueHandler将所有日志事件发送至一个进程中)
  2. 对日志输出加锁,每个进程在执行日志输出时先获得锁(用多处理模块中的Lock类来序列化对进程的文件访问)
  3. 让所有进程都将日志记录至一个SocketHandler,然后用一个实现了套接字服务器的单独进程一边从套接字中读取一边将日志记录至文件(Python手册中提供)

三、解决方案

3.1 使用ConcurrentRotatingFileHandler包

该方法就属于加锁方案。

ConcurrentLogHandler 可以在多进程环境下安全的将日志写入到同一个文件,并且可以在日志文件达到特定大小时,分割日志文件(支持按文件大小分割)。但ConcurrentLogHandler 不支持按时间分割日志文件的方式。

ConcurrentLogHandler 模块使用文件锁定,以便多个进程同时记录到单个文件,而不会破坏日志事件。该模块提供与RotatingFileHandler类似的文件循环方案。

该模块尝试不惜一切代价保存记录,这意味着日志文件将大于指定的最大大小(如果磁盘空间不足,则坚持使用RotatingFileHandler,因为它是严格遵守最大文件大小),如果有多个脚本的实例同时运行并写入同一个日志文件,那么所有脚本都应该使用ConcurrentLogHandler,不应该混合和匹配这这个类。

并发访问通过使用文件锁来处理,该文件锁应确保日志消息不会被丢弃或破坏。这意味着将为写入磁盘的每个日志消息获取并释放文件锁。(在Windows上,您可能还会遇到临时情况,必须为每个日志消息打开和关闭日志文件。)这可能会影响性能。在我的测试中,性能绰绰有余,但是如果您需要大容量或低延迟的解决方案,建议您将其放在其他地方。

ConcurrentRotatingFileLogHandler类是python标准日志处理程序RotatingFileHandler的直接替代。

这个包捆绑了portalocker来处理文件锁定。由于使用了portalocker模块,该模块当前仅支持“nt”“posix”平台。

安装:

1pip install ConcurrentLogHandler

该模块支持Python2.6及以后版本。当前最新版本是0.9.1

ConcurrentLogHandler的使用方法与其他handler类一致,如与RotatingFileHandler的使用方法一样。

初始化函数及参数:

1class ConcurrentRotatingFileHandler(BaseRotatingHandler):
2    """
3    Handler for logging to a set of files, which switches from one file to the
4    next when the current file reaches a certain size. Multiple processes can
5    write to the log file concurrently, but this may mean that the file will
6    exceed the given size.
7    """

8    def __init__(self, filename, mode='a', maxBytes=0, backupCount=0,
9                 encoding=None, debug=True, delay=0)
:

参数含义同Python内置RotatingFileHandler类相同,具体可参考上一篇博文。同样继承自BaseRotatingHandler类。

简单的示例:

1import logging
2from cloghandler import ConcurrentRotatingFileHandler
3
4logger = logging.getLogger()
5rotateHandler = ConcurrentRotatingFileHandler('./logs/my_logfile.log'"a"1024*10245)
6logger.addHandler(rotateHandler)
7logger.setLevel(logging.DEBUG)
8
9logger.info('This is a info message.')

为了适应没有ConcurrentRotatingFileHandler包的情况,增加回退使用RotatingFileHandler的代码:

1try:
2    from cloghandler import ConcurrentRotatingFileHandler as RFHandler
3except ImportError:
4    from warning import warn
5    warn('ConcurrentRotatingFileHandler package not installed, Using builtin log handler')
6    from logging.handlers import RotatingFileHandler as RFHandler

运行后可以发现,会自动创建一个.lock文件,通过锁的方式来安全的写日志文件。

备注: 该库自2013年以后就没有再更新,若有问题,可使用3.2小节中的concurrent-log-handler软件包。


在非单独使用python脚本的时候,注意使用方式:

 1# 不建议使用方式
2from cloghandler import ConcurrentRotatingFileHandler
3
4.......
5'handlers':{
6        "error_file": {
7            "class""ConcurrentRotatingFileHandler",
8            "maxBytes"100*1024*1024,# 日志的大小
9            "backupCount"3,
10# 建议写完整
11import cloghandler
12'handlers':{
13        "error_file": {
14            "class""cloghandler.ConcurrentRotatingFileHandler",
15            "maxBytes"100*1024*1024,# 日志的大小
16            "backupCount"3,

否则,会出现如下错误:

1Error: Unable to configure handler 'access_file': Cannot resolve 'ConcurrentRotatingFileHandler': No module named 'ConcurrentRotatingFileHandler'

3.2 concurrent-log-handler包

该模块同样也为python的标准日志记录软件提供了额外的日志处理程序。即回将日志事件写入日志文件,当文件达到一定大小时,该日志文件将轮流轮转,多个进程可以安全地写入同一日志文件,还可以将其进行压缩(开启)。WindowsPOSIX系统均受支持。

它可以看做是旧版本cloghandler的直接替代品,主需要将cloghandler更改为concurrent_log_handler

其特征及说明与cloghandler一致,具体可见3.1小节。

安装

1pip install concurrent-log-handler

若是从源码安装,则执行如下命令:

1python setup.py install

使用示例

1import logging
2from concurrent_log_handler import ConcurrentRotatingFileHandler
3
4logger = logging.getLogger()
5rotateHandler = ConcurrentRotatingFileHandler('./logs/mylogfile.log''a'512*10245)
6logger.addHandler(rotateHandler)
7logger.setLevel(logging.DEBUG)
8
9logger.info('This is a info message.')

同样的,若要分发代码,不确定是否都已安装concurrent_log_handler软件包时,使Python可以轻松的回退到内置的RotatingFileHandler。下面是示例:

 1import logging
2try:
3    from concurrent_log_handler import ConcurrentRotatingFileHandler as RFHandler
4except ImportError:
5    # 下面两行可选
6    from warnings import warn
7    warn('concurrent_log_handler package not installed. Using builtin log handler')
8    from logging.handlers import RotatingFileHandler as RFHandler
9
10logger = logging.getLogger()
11rotateHandler = RFHandler('./logs/mylogfile.log''a'1024*10245)
12logger.addHandler(rotateHandler)

同样的,建议直接导入concurrent_log_handler,使用concurrent_log_handler.ConcurrentRotatingFileHandler方式。

3.3 对日志输出加锁

TimedRotatingFileHandlerdoRollover函数的主要部分如下:

 1def doRollover(self):
2    ....
3    dfn = self.rotation_filename(self.baseFilename + "." +
4                                     time.strftime(self.suffix, timeTuple))
5    # -------begin-------
6    if os.path.exists(dfn): # 判断如果存在dfn,则删除
7            os.remove(dfn)
8    self.rotate(self.baseFilename, dfn) # 将当前日志文件重命名为dfn
9    # --------end--------
10    if self.backupCount > 0:
11        for s in self.getFilesToDelete():
12            os.remove(s)
13    if not self.delay:
14        self.stream = self._open()
15    ....

修改思路:

判断dfn文件是否已经存在,如果存在,表示已经被rename过了;如果不存在,则只允许一个进程去rename,其他进程需等待。

新建一个类继承自TimeRotatingFileHandler,修改doRollover函数,只需处理上面代码的注释部分即可。如下:

 1class MPTimeRotatingFileHandler(TimeRotatingFileHandler):
2    def doRollover(self):
3        ....
4        dfn = self.rotation_filename(self.baseFilename + "." +
5                                     time.strftime(self.suffix, timeTuple))
6        # ----modify start----
7        if not os.path.exists(dfn):
8            f = open(self.baseFilename, 'a')
9            fcntl.lockf(f.fileno(), fcntl.LOCK_EX)
10            if os.path.exists(self.baseFilename): # 判断baseFilename是否存在
11                self.rotate(self.baseFilename, dfn)
12        # ----modify end-----
13        if self.backupCount > 0:
14        for s in self.getFilesToDelete():
15            os.remove(s)
16        ....

3.4 重写FileHandler类

logging.handlers.py中各类的继承关系如下图所示:

image-20200106151225654
image-20200106151225654

TimeRotatingFileHandler类就是继承自该类,在FileHandler类中增加一些处理。

具体可参考以下博文:

  1. python logging日志模块以及多进程日志 | doudou0o blog

  2. python多进程解决日志错乱问题_qq_20690231的博客-CSDN博客


Python官方手册中,提供了多进程中日志记录至单个文件的方法。

logging是线程安全的,将单个进程中的多个线程日志记录至单个文件也是支持的。但将多个进程中的日志记录至单个文件中则不支持,因为在Python中并没有在多个进程中实现对单个文件访问的序列化的标准方案。

将多个进程中日志记录至单个文件中,有以下几个方案:

  1. 让所有进程都将日志记录至一个 SocketHandler,然后用一个实现了套接字服务器的单独进程一边从套接字中读取一边将日志记录至文件。
  2. 使用 QueueQueueHandler 将所有的日志事件发送至你的多进程应用的一个进程中。

3.5 单独进程负责日志事件

一个单独监听进程负责监听其他进程的日志事件,并根据自己的配置记录。

示例:

 1import logging
2import logging.handlers
3import multiprocessing
4
5from random import choice, random
6import time
7
8def listener_configurer():
9    root = logging.getLogger()
10    h = logging.handlers.RotatingFileHandler('test.log''a'300,10# rotate file设置的很小,以便于查看结果
11    f = logging.Formatter('%(asctime)s %(processName)-10s %(name)s %(levelname)-8s %(message)s')
12    h.setFormatter(f)
13    root.addHandler(h)
14
15def listenser_process(queue, configurer):
16    configurer()
17    while True:
18        try:
19            record = queue.get()
20            if record is None:
21                break
22            logger = logging.getLogger(record.name)
23            logger.handle(record)
24        except Exception:
25            import sys, traceback
26            print('Whoops! Problem:', file=sys.stderr)
27            trackback.print_exc(file=sys.stderr)
28
29LEVELS = [logging.DEBUG, logging.INFO, logging.WARNING,
30          logging.ERROR, logging.CRITICAL]
31
32LOGGERS = ['a.b.c''d.e.f']
33
34MESSAGES = [
35    'Random message #1',
36    'Random message #2',
37    'Random message #3',
38]
39
40def worker_configurer(queue):
41    h = logging.handlers.QueueHandler(queue)
42    root = logging.getLogger()
43    root.addHandler(h)
44    root.setLevel(logging.DEBUG)
45
46# 该循环仅记录10个事件,这些事件具有随机的介入延迟,然后终止
47def worker_process(queue, configurer):
48    configurer(queue)
49    name = multiprocessing.current_process().name
50    print('Worker started:%s'%name)
51    for i in range(10):
52        time.sleep(random())
53        logger = logging.getLogger(choice(LOGGERS))
54        level = choice(LEVELS)
55        message = choice(MESSAGES)
56        logger.log(level, message)
57# 创建队列,创建并启动监听器,创建十个工作进程并启动它们,等待它们完成,然后将None发送到队列以通知监听器完成
58def main():
59    queue = multiprocessing.Queue(-1)
60    listener = multiprocessing.Process(target=listener_process,
61                                      args=(queue, listener_configurer))
62    listener.start()
63    workers = []
64    for i in range(10):
65        worker = multiprocessing.Process(target=worker_process,
66                                        args=(queue, listener_configurer))
67        workers.append(worker)
68        worker.start()
69    for w in workers:
70        w.join()
71    queue.put_nowait(None)
72    listener.join()
73
74if __name__ == '__main__':
75    main()

使用主进程中一个单独的线程记录日志

下面这段代码展示了如何使用特定的日志记录配置,例如foo记录器使用了特殊的处理程序,将foo子系统中所有的事件记录至一个文件mplog-foo.log。在主进程(即使是在工作进程中产生的日志事件)的日志记录机制中将直接使用恰当的配置。

 1import logging
2import logging.config
3import logging.handlers
4from multiprocessing import Process, Queue
5import random
6import threading
7import time
8
9def logger_thread(q):
10    while True:
11        record = q.get()
12        if record is None:
13            break
14        logger = logging.getLogger(record.name)
15        logger.handle(record)
16
17def worker_process(q):
18    qh = logging.handlers.QueueHandler(q)
19    root = logging.getLogger()
20    root.setLevel(logging.DEBUG)
21    root.addHandler(qh)
22    levels = [logging.DEBUG, logging.INFO, logging.WARNING, logging.ERROR,
23              logging.CRITICAL]
24    loggers = ['foo''foo.bar''foo.bar.baz''spam''spam.ham''spam.ham.eggs']
25
26    for i in range(100):
27        lv1=l = random.choice(levles)
28        logger = logging.getLogger(random.choice(loggers))
29        logger.log(lvl, 'Message no. %d', i)
30
31for __name__ == '__main__':
32    q = Queue()
33    d = {
34        'version'1,
35        'formatters': {
36            'detailed': {
37                'class''logging.Formatter',
38                'format''%(asctime)s %(name)-15s %(levelname)-8s %(processName)-10s %(message)s'
39            }
40        },
41        'handlers': {
42            'console': {
43                'class''logging.StreamHandler',
44                'level''INFO',
45            },
46            'file': {
47                'class''logging.FileHandler',
48                'filename''mplog.log',
49                'mode''w',
50                'formatter''detailed',
51            },
52            'foofile': {
53                'class''logging.FileHandler',
54                'filename''mplog-foo.log',
55                'mode''w',
56                'formatter''detailed',
57            },
58            'errors': {
59                'class''logging.FileHandler',
60                'filename''mplog-errors.log',
61                'mode''w',
62                'level''ERROR',
63                'formatter''detailed',
64            },
65        },
66        'loggers': {
67            'foo': {
68                'handlers': ['foofile']
69            }
70        },
71        'root': {
72            'level''DEBUG',
73            'handlers': ['console''file''errors']
74        },
75    }
76    workers = []
77    for i in range(5):
78        wp = Process(target=worker_process, name='worker %d'%(i+1), args=(q,))
79        workers.append(wp)
80        wp.start()
81    logging.config.dictConfig(d)
82    lp = threading.Thread(target=logger_thread, args=(q,))
83    lp.start()
84
85    for wp in workers:
86        wp.join()
87    q.put(None)
88    lp.join()

3.6 logging.SocketHandler的方案

该方案具体的使用方法,下方博文给出了说明,具体实现参考如下博客:

Python中logging在多进程环境下打印日志 - VictoKu - 博客园

四、参考文献

  1. 用 Python 写一个多进程兼容的 TimedRotatingFileHandler - piperck - 博客园