"""logging utilities"""
# Copyright (c) Jupyter Development Team.
# Distributed under the terms of the Modified BSD License.
import json
import logging
import traceback
from functools import partial
from http.cookies import SimpleCookie
from urllib.parse import urlparse, urlunparse

from tornado.log import LogFormatter, access_log
from tornado.web import HTTPError, StaticFileHandler

from .handlers.pages import HealthCheckHandler
from .metrics import prometheus_log_method


def coroutine_frames(all_frames):
    """Extract coroutine boilerplate frames from a frame list

    for better stack/traceback printing of coroutines
    """
    useful_frames = []
    for frame in all_frames:
        if frame[0] == '<string>' and frame[2] == 'raise_exc_info':
            continue
        # start out conservative with filename + function matching
        # maybe just filename matching would be sufficient
        elif frame[0].endswith('tornado/gen.py') and frame[2] in {
            'run',
            'wrapper',
            '__init__',
        }:
            continue
        elif frame[0].endswith('tornado/concurrent.py') and frame[2] == 'result':
            continue
        useful_frames.append(frame)
    return useful_frames
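
# Note: each ``frame`` handled above is a traceback.FrameSummary (or a
# (filename, lineno, name, line) tuple), so frame[0] is the filename and
# frame[2] is the function name. For example, an entry like
# ('.../tornado/gen.py', <lineno>, 'run', ...) would be dropped as boilerplate.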


def coroutine_traceback(typ, value, tb):
    """Scrub coroutine frames from a traceback

    Coroutine tracebacks have a bunch of identical uninformative frames at each yield point.
    This removes those extra frames, so tracebacks should be easier to read.
    This might be a horrible idea.

    Returns a list of strings (like traceback.format_tb)
    """
    all_frames = traceback.extract_tb(tb)
    useful_frames = coroutine_frames(all_frames)

    tb_list = ['Traceback (most recent call last):\n']
    tb_list.extend(traceback.format_list(useful_frames))
    tb_list.extend(traceback.format_exception_only(typ, value))
    return tb_list


class CoroutineLogFormatter(LogFormatter):
    """Log formatter that scrubs coroutine frames"""

    def formatException(self, exc_info):
        return ''.join(coroutine_traceback(*exc_info))
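
# Illustrative usage sketch (an assumption, not something this module does
# itself): the formatter plugs into stdlib logging like any other Formatter,
# e.g.
#
#     handler = logging.StreamHandler()
#     handler.setFormatter(CoroutineLogFormatter())
#     logging.getLogger('tornado.application').addHandler(handler)
#
# so exception tracebacks logged through that handler are rendered via
# coroutine_traceback above.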


# url params to be scrubbed if seen
# any url param that *contains* one of these
# will be scrubbed from logs
SCRUB_PARAM_KEYS = ('token', 'auth', 'key', 'code', 'state')


def _scrub_uri(uri):
    """scrub auth info from uri"""
    if '/api/authorizations/cookie/' in uri or '/api/authorizations/token/' in uri:
        uri = uri.rsplit('/', 1)[0] + '/[secret]'
    parsed = urlparse(uri)
    if parsed.query:
        # check for potentially sensitive url params
        # use manual list + split rather than parsing
        # to minimally perturb original
        parts = parsed.query.split('&')
        changed = False
        for i, s in enumerate(parts):
            if '=' in s:
                key, value = s.split('=', 1)
                for substring in SCRUB_PARAM_KEYS:
                    if substring in key:
                        parts[i] = key + '=[secret]'
                        changed = True

        if changed:
            parsed = parsed._replace(query='&'.join(parts))
            return urlunparse(parsed)
    return uri
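
# Illustrative example (values are made up): a request like
#     /hub/login?next=%2Fhub%2F&token=abc123
# would be logged as
#     /hub/login?next=%2Fhub%2F&token=[secret]
# because 'token' matches one of SCRUB_PARAM_KEYS, while 'next' is left alone.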


def _scrub_headers(headers):
    """scrub auth info from headers"""
    headers = dict(headers)
    if 'Authorization' in headers:
        auth = headers['Authorization']
        if ' ' in auth:
            auth_type = auth.split(' ', 1)[0]
        else:
            # no space, hide the whole thing in case there was a mistake
            auth_type = ''
        headers['Authorization'] = f'{auth_type} [secret]'
    if 'Cookie' in headers:
        c = SimpleCookie(headers['Cookie'])
        redacted = []
        for name in c.keys():
            redacted.append(f"{name}=[secret]")
        headers['Cookie'] = '; '.join(redacted)
    return headers
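
# Illustrative example (values are made up): headers such as
#     {'Authorization': 'token abc123', 'Cookie': 'jupyterhub-session-id=xyz'}
# would be logged as
#     {'Authorization': 'token [secret]', 'Cookie': 'jupyterhub-session-id=[secret]'}
# i.e. the auth scheme and cookie names survive, but the secrets do not.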


# log_request adapted from IPython (BSD)
def log_request(handler):
    """log a bit more information about each request than tornado's default

    - move static file get success to debug-level (reduces noise)
    - get proxied IP instead of proxy IP
    - log referer for redirect and failed requests
    - log user-agent for failed requests
    - record per-request metrics in prometheus
    """
    status = handler.get_status()
    request = handler.request
    if status == 304 or (
        status < 300 and isinstance(handler, (StaticFileHandler, HealthCheckHandler))
    ):
        # static-file success and 304 Not Modified are debug-level
        log_level = logging.DEBUG
    elif status < 400:
        log_level = logging.INFO
    elif status < 500:
        log_level = logging.WARNING
    else:
        log_level = logging.ERROR

    uri = _scrub_uri(request.uri)
    headers = _scrub_headers(request.headers)

    request_time = 1000.0 * handler.request.request_time()
    # always log slow responses (longer than 1s) at least info-level
    if request_time >= 1000 and log_level < logging.INFO:
        log_level = logging.INFO

    log_method = partial(access_log.log, log_level)

    try:
        user = handler.current_user
    except (HTTPError, RuntimeError):
        username = ''
    else:
        if user is None:
            username = ''
        elif isinstance(user, str):
            username = user
        elif isinstance(user, dict):
            username = user['name']
        else:
            username = user.name

    ns = dict(
        status=status,
        method=request.method,
        ip=request.remote_ip,
        uri=uri,
        request_time=request_time,
        user=username,
        location='',
    )
    msg = "{status} {method} {uri}{location} ({user}@{ip}) {request_time:.2f}ms"
    if status >= 500 and status not in {502, 503}:
        log_method(json.dumps(headers, indent=2))
    elif status in {301, 302}:
        # log redirect targets
        # FIXME: _headers is private, but there doesn't appear to be a public way
        # to get headers from tornado
        location = handler._headers.get('Location')
        if location:
            ns['location'] = f' -> {_scrub_uri(location)}'

    log_method(msg.format(**ns))
    prometheus_log_method(handler)
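
# Illustrative wiring (an assumption, not shown in this module): tornado calls
# the application's ``log_function`` setting once per finished request, so this
# hook is typically installed with something like
#
#     app = tornado.web.Application(handlers, log_function=log_request)
#
# after which log_request replaces tornado's default access-log line and also
# feeds prometheus_log_method for per-request metrics.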