Analyzing nginx logs with Python

By 寒雨连江, posted in Python编程

    Overview:

            Runs a statistical analysis over each day's newly appended log entries: per API name it tracks the total request count, max/min request time, success count and success rate, then converts the stats to an Excel sheet and renders them as a chart.
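
            Per API name, the numbers boil down to simple aggregates over that API's request times and status codes, for example (the values below are illustrative):

    # Illustrative: the aggregates computed for one API name
    request_times = [0.120, 0.480, 0.095]   # $request_time of three sample requests
    statuses = ['200', '200', '502']        # $status of the same requests
    count = len(request_times)
    print(max(request_times), min(request_times))         # 0.48 0.095
    print(round(sum(request_times) / count, 3))           # average: 0.232
    print(round(statuses.count('200') / count * 100, 3))  # success rate: 66.667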

    Usage:

            1. Add a log_format directive to the nginx configuration so the log is formatted as below (otherwise the script will not be able to parse it!). The supported format is the following (a sample parsed line is shown after this list):

    log_format test '$time_iso8601^$remote_addr^$http_X_CMD^$request_time^$request^$status^$body_bytes_sent^$http_X_TOKEN'

            2. Install the third-party packages pymongo, pandas and plotly (pandas additionally needs an Excel engine such as openpyxl to write the .xlsx export).

            3. Before running, edit the parameters marked "FixMe" in the script (database name, log directory path, etc.).
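
            For reference, a line produced by that log_format looks like the following (the values are made up for illustration), and the script splits each line on '^' into indexed fields:

    # Hypothetical log line in the format above; the script splits each line on '^'
    line = ('2023-05-01T08:30:15+08:00^10.0.0.1^get_user_info^0.123^'
            'GET /api/user HTTP/1.1^200^512^abc123')
    fields = line.split('^')
    # fields[0] $time_iso8601     fields[1] $remote_addr    fields[2] $http_X_CMD (API name)
    # fields[3] $request_time     fields[4] $request        fields[5] $status
    # fields[6] $body_bytes_sent  fields[7] $http_X_TOKEN
    print(fields[2], fields[3], fields[5])  # API name, request time, HTTP status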

    Use cases:

            1. Deploy the script as a scheduled task on the project server and run it in the small hours each day to analyse the log entries added that day; the resulting chart makes it easy to spot slow APIs and APIs with low success rates.

            2. The nginx log format can be extended with extra fields, with the corresponding parsing code in the script updated to match (see the sketch below).
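
            As a minimal sketch of use case 2 (the extra field and sample values are hypothetical): suppose '^$http_user_agent' is appended to the log_format above, making nine fields per line.

    # Hypothetical: the log_format above extended with '^$http_user_agent'
    line = ('2023-05-01T08:30:15+08:00^10.0.0.1^get_user_info^0.123^'
            'GET /api/user HTTP/1.1^200^512^abc123^curl/7.68.0')
    i = line.strip().split('^')
    # the length guard in get_data would then become: if len(i) < 9: continue
    assert len(i) == 9
    # and each result_map entry can carry the new field:
    entry = {"cmd": i[2], "user_agent": i[8]}
    print(entry)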

    Script:

    # -*- coding:utf-8 -*-
    import os
    import time
    import pymongo
    import datetime
    import operator
    import pandas as pd
    import plotly.express as px
    
    
    
    class LogAnalyse:
        """
        Analyse an nginx log file: aggregate per-API request statistics.
        """
    
        def __init__(self, f, log_path, tag):
            self.f = f                # open file handle of the log file
            self.log_path = log_path  # path of the log file
            self.tag = tag            # short tag derived from the file name
    
        def _get_data(self, pointer):
            """
            Alternative implementation of get_data (currently unused): buffers
            every request time / status code per API and aggregates at the end.
            Kept for reference; get_data below does the same work in one pass.
            :param pointer: absolute byte offset to start reading from
            :return: list of per-API stat dicts
            """
            self.f.seek(pointer, 0)
    
            data = []
            result_map = {}
            for line in self.f:
                i = line.strip().split('^')
                # skip lines that do not match the 8-field log_format
                if len(i) < 8:
                    continue
                try:
                    time.strptime(i[0].replace('T', ' ').split('+')[0], "%Y-%m-%d %H:%M:%S")
                except ValueError:
                    continue
                if i[2] in result_map:
                    result_map[i[2]]['count'] += 1
                    result_map[i[2]]['request_time'].append(i[3])
                    result_map[i[2]]['response_code'].append(i[5])
                else:
                    result_map[i[2]] = {
                        "start_at": ' '.join(i[0][:-6].split('T')),
                        "ip": i[1],
                        "cmd": i[2],
                        "request_time": [i[3], ],
                        "response_code": [i[5], ],
                        "count": 1
                    }
    
            for val in result_map.values():
                sort_val = sorted(float(x) for x in val['request_time'])
                val['max_request_time'] = sort_val[-1]
                val['min_request_time'] = sort_val[0]
                val['avg_request_time'] = round(sum(sort_val) / len(sort_val), 3)
                val['success_request'] = val['response_code'].count('200')
                val['failed_request'] = val['count'] - val['success_request']
                val['success_rate'] = round(float(val['success_request']) / val['count'] * 100, 3)
                # drop the raw per-request lists before returning
                del val['request_time']
                del val['response_code']
                del val['ip']
                data.append(val)
    
            return data
    
        def get_data(self, pointer):
            """
            Read the new log lines starting at the given offset and aggregate
            per-API stats (count, max/min/avg request time, success rate) in a
            single pass.
            :param pointer: absolute byte offset to start reading from
            :return: list of per-API stat dicts
            """
            self.f.seek(pointer, 0)
    
            data = []
            result_map = {}
            for line in self.f:
                i = line.strip().split('^')
                # skip lines that do not match the 8-field log_format
                if len(i) < 8:
                    continue
                try:
                    time.strptime(i[0].replace('T', ' ').split('+')[0], "%Y-%m-%d %H:%M:%S")
                except ValueError:
                    continue
                request_time = float(i[3])
                if i[2] in result_map:
                    entry = result_map[i[2]]
                    entry['count'] += 1
                    entry['max_request_time'] = max(request_time, entry['max_request_time'])
                    entry['min_request_time'] = min(request_time, entry['min_request_time'])
                    entry['request_time_sum'] += request_time
                    entry['avg_request_time'] = round(entry['request_time_sum'] / entry['count'], 3)
                    if i[5] == '200':
                        entry['success_request'] += 1
                    else:
                        entry['failed_request'] += 1
                    entry['success_rate'] = round(float(entry['success_request']) / entry['count'] * 100, 3)
                    entry['request_time'] = request_time  # last seen request time
                else:
                    entry = {
                        "cmd": i[2],
                        "request_time_sum": request_time,
                        "max_request_time": request_time,
                        "min_request_time": request_time,
                        "avg_request_time": request_time,
                        "success_request": 1 if i[5] == '200' else 0,
                        "failed_request": 0 if i[5] == '200' else 1,
                        "count": 1,
                        # first timestamp, with the 'T' and timezone suffix stripped
                        "start_at": ' '.join(i[0][:-6].split('T')),
                        "request_time": request_time,
                    }
                    entry['success_rate'] = round(float(entry['success_request']) / entry['count'] * 100, 3)
                    result_map[i[2]] = entry
    
            for val in result_map.values():
                # the running sum is only an intermediate value; drop it
                del val['request_time_sum']
                data.append(val)
    
            return data
    
        def save_data(self, data):
            """
            Persist the day's stats plus the current file offset to MongoDB.
            :param data: list of per-API stat dicts
            :return:
            """
            doc = {
                'path': self.log_path,
                'tag': self.tag,
                'state': 100,
                'data': data,
                'created_at': self.time_to_day(),
                'pointer': self.f.tell(),  # EOF offset; tomorrow's run resumes from here
            }
            # `db` is the module-level MongoDB handle created in the __main__ block
            db.log_analyse.insert_one(doc)
    
        def to_excel(self, data):
            """
            Export the stats to log_analyse.xlsx, sorted by request count.
            :param data: list of per-API stat dicts
            :return: the resulting DataFrame
            """
            field = ["cmd", "max_request_time", "min_request_time", "avg_request_time", "success_request",
                     "failed_request", "count", "start_at", "request_time", "success_rate"]
    
            sorted_data = sorted(data, key=operator.itemgetter('count'), reverse=True)
            # select the columns by name so the sheet layout is stable
            data_df = pd.DataFrame(sorted_data, columns=field)
            with pd.ExcelWriter('log_analyse.xlsx') as writer:
                data_df.to_excel(writer, float_format='%.3f')
    
            return data_df
    
        def get_pointer(self):
            """
            Work out where today's run should start reading. If yesterday's run
            left a record, resume from the offset it saved; on the first run,
            just record today's EOF offset as the baseline and return None.
            :return: absolute byte offset to resume from, or None
            """
            self.f.seek(0, 2)
            today_pointer = self.f.tell()
    
            yesterday = datetime.date.today() - datetime.timedelta(days=1)
            log_record = db.log_analyse.find_one({"tag": self.tag, "created_at": str(yesterday)})
            if log_record:
                # everything between yesterday's saved offset and today's EOF is new
                return int(log_record['pointer'])
            else:
                db.log_analyse.insert_one({
                    'path': self.log_path,
                    'tag': self.tag,
                    'state': 100,
                    'data': '',
                    'created_at': self.time_to_day(),
                    'pointer': today_pointer
                })
    
        def time_to_day(self):
            """
            Today's date as a YYYY-MM-DD string.
            :return:
            """
            return time.strftime("%Y-%m-%d", time.localtime())
    
        def _to_html(self, df):
            """
            Render the stats as an interactive plotly bar chart.
            :param df: DataFrame returned by to_excel
            :return:
            """
            print(df)
            fig = px.bar(df, x="count", y="cmd", color='cmd', orientation='h',
                         hover_data=["max_request_time", "min_request_time", "avg_request_time", "success_request",
                                     "failed_request", "success_rate"],
                         height=3000,
                         title='{} API request count'.format(self.tag))
            fig.show()
    
        @staticmethod
        def get_all_file(path, paths, names):
            """
            Recursively collect all files under a directory.
            :param path: directory to walk
            :param paths: accumulator for full file paths
            :param names: accumulator for bare file names
            :return: (set of file paths, set of file names)
            """
            for file in os.listdir(path):
                file_path = os.path.join(path, file)
                if os.path.isdir(file_path):
                    LogAnalyse.get_all_file(file_path, paths, names)
                elif os.path.isfile(file_path):
                    paths.append(file_path)
                    names.append(file)
    
            return set(paths), set(names)
    
        def main(self):
            """
            Entry point: resume from yesterday's offset, aggregate the new
            lines, persist to MongoDB, export to Excel and plot.
            :return:
            """
            pointer = self.get_pointer()
            if pointer is not None:
                data = self.get_data(pointer)
                self.save_data(data)
                df = self.to_excel(data)
                self._to_html(df)
    
    
    if __name__ == '__main__':
        # FixMe: MongoDB connection
        client = pymongo.MongoClient("localhost", 27017)
        # FixMe: database name
        db = client['deploy']
        # FixMe: log directory path
        path_list, names = LogAnalyse.get_all_file('/logs/test', paths=[], names=[])
        for log_path in path_list:
            if not log_path.endswith('api_web.log'):
                continue
            # e.g. '/logs/test/api_web.log' -> tag 'api'
            tag = log_path.split('/')[-1].split('_')[0]
            file_data = open(log_path)
            print('~~~~~~Start Handle {} Log~~~~~~ Path: {}'.format(tag, log_path))
            LogAnalyse(file_data, log_path, tag).main()
            print('~~~~~~Handle {} Log Done ~~~~~~ Path: {}'.format(tag, log_path))
            file_data.close()
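
    Once the script has run, each day's stats live in the log_analyse collection. A small sketch for reading a day's results back out (it assumes the same localhost MongoDB and 'deploy' database as the script; the tag 'api' matches the api_web.log filter above, and the date is made up):

    # Sketch: query back one day's stats saved by save_data
    import pymongo

    client = pymongo.MongoClient("localhost", 27017)
    db = client['deploy']
    record = db.log_analyse.find_one({"tag": "api", "created_at": "2023-05-01"})
    if record and record['data']:
        for row in record['data']:
            print(row['cmd'], row['count'], row['success_rate'])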
    

    Final result:

            (screenshot of the generated chart omitted)

