Overview:
The script analyzes each day's newly appended log entries and computes per-API statistics: total request count, maximum/minimum request time, success count, success rate, and API name. The results are stored in MongoDB, exported to Excel, and displayed as a chart.
Tutorial:
1. Add a log_format directive to your nginx configuration so the access log is written in the format below (without it the script cannot parse the log!). The script supports the following format; a sample log line is shown after this list.
log_format test '$time_iso8601^$remote_addr^$http_X_CMD^$request_time^$request^$status^$body_bytes_sent^$http_X_TOKEN'
2. Install the pymongo, pandas, and plotly packages; pandas also needs openpyxl to write .xlsx files (install command after this list).
3. Before running, update the FixMe parameters in the script (database name, log directory path, etc.).
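For reference, a raw log line produced by the format above would look roughly like this (all values are invented for illustration; fields are joined by '^'):

2023-05-10T08:12:31+08:00^10.0.0.5^user.login^0.042^POST /api/v1/login HTTP/1.1^200^356^abc123token

The dependencies can be installed in one step (openpyxl is the engine pandas uses for .xlsx output):

pip install pymongo pandas plotly openpyxl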
Use cases:
1. Deploy the script as a scheduled task on the project server, run in the early hours of each day to analyze the entries appended the previous day. The charts make it easy to spot slow APIs and APIs with low success rates; a sample crontab entry is shown after this list.
2. You can extend the nginx log format with extra fields and adjust the corresponding parsing code in the script.
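As a minimal sketch, a crontab entry like the following would run the script daily at 00:30 (the interpreter and paths are placeholders; adjust them to your deployment):

30 0 * * * /usr/bin/python3 /opt/scripts/log_analyse.py >> /var/log/log_analyse_cron.log 2>&1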
Script:
# -*- coding:utf-8 -*-
import os
import time
import pymongo
import datetime
import operator
import pandas as pd
import plotly.express as px


class LogAnalyse:
    """
    Analyze one nginx access-log file and report per-API statistics.
    """

    def __init__(self, f, log_path, tag):
        self.f = f                # open file object for the log
        self.log_path = log_path  # full path of the log file
        self.tag = tag            # short name derived from the file name

    def _get_data(self, pointer):
        """
        Reference implementation: collect every request time and status
        code per API, then compute the statistics in a second pass.
        Unused; main() calls the single-pass get_data() below.
        :param pointer: byte offset to start reading from
        :return: list of per-API statistics dicts
        """
        self.f.seek(pointer, 0)
        data = []
        result_map = {}
        for line in self.f:
            fields = line.strip().split('^')
            if len(fields) < 6:  # need at least the $status field (index 5)
                continue
            try:
                # Validate the $time_iso8601 field, e.g. 2023-05-10T08:12:31+08:00
                time.strptime(fields[0].replace('T', ' ').split('+')[0], "%Y-%m-%d %H:%M:%S")
            except ValueError:
                continue
            cmd = fields[2]
            if cmd in result_map:
                result_map[cmd]['count'] += 1
                result_map[cmd]['request_time'].append(fields[3])
                result_map[cmd]['response_code'].append(fields[5])
            else:
                result_map[cmd] = {
                    "start_at": ' '.join(fields[0][:-6].split('T')),  # drop the +08:00 offset
                    "ip": fields[1],
                    "cmd": cmd,
                    "request_time": [fields[3]],
                    "response_code": [fields[5]],
                    "count": 1,
                }
        for val in result_map.values():
            sort_val = sorted(float(x) for x in val['request_time'])
            val['max_request_time'] = sort_val[-1]
            val['min_request_time'] = sort_val[0]
            val['avg_request_time'] = round(sum(sort_val) / len(sort_val), 3)
            val['success_request'] = val['response_code'].count('200')
            val['failed_request'] = val['count'] - val['success_request']
            val['success_rate'] = round(float(val['success_request']) / float(val['count']) * 100, 3)
            del val['request_time']
            del val['response_code']
            del val['ip']
            data.append(val)
        return data

    def get_data(self, pointer):
        """
        Stream the new log lines and aggregate per-API statistics in a
        single pass, keeping only running totals in memory.
        :param pointer: byte offset to start reading from
        :return: list of per-API statistics dicts
        """
        self.f.seek(pointer, 0)
        result_map = {}
        for line in self.f:
            fields = line.strip().split('^')
            if len(fields) < 6:  # need at least the $status field (index 5)
                continue
            try:
                # Validate the $time_iso8601 field
                time.strptime(fields[0].replace('T', ' ').split('+')[0], "%Y-%m-%d %H:%M:%S")
            except ValueError:
                continue
            cmd = fields[2]
            request_time = float(fields[3])
            if cmd in result_map:
                val = result_map[cmd]
                val['count'] += 1
                val['max_request_time'] = max(request_time, val['max_request_time'])
                val['min_request_time'] = min(request_time, val['min_request_time'])
                val['request_time_sum'] += request_time
                val['avg_request_time'] = round(val['request_time_sum'] / val['count'], 3)
                if fields[5] == '200':
                    val['success_request'] += 1
                else:
                    val['failed_request'] += 1
                val['success_rate'] = round(float(val['success_request']) / float(val['count']) * 100, 3)
                val['request_time'] = request_time  # most recent sample
            else:
                success = 1 if fields[5] == '200' else 0
                result_map[cmd] = {
                    "cmd": cmd,
                    "request_time_sum": request_time,
                    "max_request_time": request_time,
                    "min_request_time": request_time,
                    "avg_request_time": request_time,
                    "success_request": success,
                    "failed_request": 1 - success,
                    "success_rate": float(success * 100),
                    "count": 1,
                    "start_at": ' '.join(fields[0][:-6].split('T')),  # drop the +08:00 offset
                    "request_time": request_time,
                }
        data = []
        for val in result_map.values():
            del val['request_time_sum']  # intermediate value, not worth storing
            data.append(val)
        return data

    def save_data(self, data):
        """
        Persist the day's statistics plus the current file offset to MongoDB.
        :param data: list of per-API statistics dicts
        :return:
        """
        doc = {
            'path': self.log_path,
            'tag': self.tag,
            'state': 100,
            'data': data,
            'created_at': self.time_to_day(),
            'pointer': self.f.tell(),  # end-of-file offset; tomorrow's run resumes here
        }
        db.log_analyse.insert_one(doc)

    def to_excel(self, data):
        """
        Export the statistics to an Excel workbook, sorted by request count.
        :param data: list of per-API statistics dicts
        :return: the resulting DataFrame
        """
        field = ["cmd", "max_request_time", "min_request_time", "avg_request_time", "success_request",
                 "failed_request", "count", "start_at", "request_time", "success_rate"]
        sorted_data = sorted(data, key=operator.itemgetter('count'), reverse=True)
        # Select columns by name rather than renaming positionally, so the
        # headers always match the underlying values.
        data_df = pd.DataFrame(sorted_data, columns=field)
        # One workbook per log tag, so runs over multiple logs do not overwrite each other.
        with pd.ExcelWriter('log_analyse_{}.xlsx'.format(self.tag)) as writer:
            data_df.to_excel(writer, float_format='%.3f')
        return data_df

    def get_pointer(self):
        """
        Return the byte offset where yesterday's run stopped, or None when
        there is nothing new to read. On the first run the current
        end-of-file offset is stored as a baseline and None is returned,
        so analysis starts with the next day's entries.
        :return: int offset or None
        """
        self.f.seek(0, 2)  # jump to end of file
        today_pointer = self.f.tell()
        yesterday = datetime.date.today() - datetime.timedelta(days=1)
        log_record = db.log_analyse.find_one({"tag": self.tag, "created_at": str(yesterday)})
        if log_record:
            yesterday_pointer = int(log_record['pointer'])
            if today_pointer > yesterday_pointer:
                # New bytes were appended since yesterday; resume from that offset.
                return yesterday_pointer
            return None
        db.log_analyse.insert_one({
            'path': self.log_path,
            'tag': self.tag,
            'state': 100,
            'data': '',
            'created_at': self.time_to_day(),
            'pointer': today_pointer,
        })
        return None

    def time_to_day(self):
        """
        Today's date as a YYYY-MM-DD string.
        :return: str
        """
        return time.strftime("%Y-%m-%d", time.localtime())

    def _to_html(self, df):
        """
        Render an interactive horizontal bar chart of request counts per API.
        :param df: DataFrame produced by to_excel()
        :return:
        """
        fig = px.bar(df, x="count", y="cmd", color='cmd', orientation='h',
                     hover_data=["max_request_time", "min_request_time", "avg_request_time",
                                 "success_request", "failed_request", "success_rate"],
                     height=3000,
                     title='{} API request statistics'.format(self.tag))
        fig.show()

    @staticmethod
    def get_all_file(path, paths, names):
        """
        Recursively collect all files under a directory.
        :param path: directory to walk
        :param paths: accumulator list for full file paths
        :param names: accumulator list for bare file names
        :return: (set of paths, set of names)
        """
        for file in os.listdir(path):
            file_path = os.path.join(path, file)
            if os.path.isdir(file_path):
                LogAnalyse.get_all_file(file_path, paths, names)
            elif os.path.isfile(file_path):
                paths.append(file_path)
                names.append(file)
        return set(paths), set(names)

    def main(self):
        """
        Run one analysis pass: read new entries, store, export, chart.
        :return:
        """
        pointer = self.get_pointer()
        if pointer is not None:
            data = self.get_data(pointer)
            self.save_data(data)
            df = self.to_excel(data)
            self._to_html(df)


if __name__ == '__main__':
    # FixMe: MongoDB connection
    client = pymongo.MongoClient("localhost", 27017)
    # FixMe: database name
    db = client['deploy']
    # FixMe: log directory
    path_list, names = LogAnalyse.get_all_file('/logs/test', paths=[], names=[])
    for log_path in path_list:
        if not log_path.endswith('api_web.log'):
            continue
        # e.g. 'api_web.log' -> tag 'api'
        tag = log_path.split('/')[-1].split('_')[0]
        with open(log_path) as file_data:
            print('~~~~~~Start Handle {} Log~~~~~~ Path: {}'.format(tag, log_path))
            LogAnalyse(file_data, log_path, tag).main()
            print('~~~~~~Handle {} Log Done ~~~~~~ Path: {}'.format(tag, log_path))
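For reference, a document stored by save_data() looks roughly like this (all values are invented for illustration):

{
    "path": "/logs/test/api_web.log",
    "tag": "api",
    "state": 100,
    "data": [
        {"cmd": "user.login", "count": 1024, "max_request_time": 1.873,
         "min_request_time": 0.012, "avg_request_time": 0.097,
         "success_request": 1019, "failed_request": 5, "success_rate": 99.512,
         "start_at": "2023-05-10 08:12:31", "request_time": 0.042}
    ],
    "created_at": "2023-05-10",
    "pointer": 48210
}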
Final result:
Each run writes a log_analyse_<tag>.xlsx workbook sorted by request count and opens an interactive Plotly bar chart in the browser, with max/min/average request time, success count, and success rate shown on hover.