Commit 85503ea

revert recent telemetry pipeline change to 299ef3d
1 parent 2eaacb5 · commit 85503ea

5 files changed: +1096 -373 lines

azure-pipelines/azure-pipelines-build-telemetry.yml

+165 -14
@@ -19,26 +19,177 @@ name: $(TeamProject)_$(Build.DefinitionName)_$(SourceBranchName)_$(Date:yyyyMMdd
 stages:
 - stage: Build
   pool: sonicbld-1es
-  variables:
-  - group: sonicbld
   jobs:
   - job: Build
-    timeoutInMinutes: 240
+    timeoutInMinutes: 120
     steps:
     - script: |
         sudo apt-get update
         sudo apt-get install -y python3-pip
-        sudo pip3 install azure.core azure.kusto.data azure.kusto.ingest azure.storage.blob azure.storage.queue
+        sudo pip3 install azure-storage-queue azure-storage-blob
         sudo update-alternatives --install /usr/bin/python python /usr/bin/python3 10
-        # Install Azure cli
-        curl -sL https://aka.ms/InstallAzureCLIDeb | sudo bash
-        az login --service-principal --use-cert-sn-issuer -u 08fd13c1-63ab-4b08-9007-f4ff86b61248 -p $CERTPATH --tenant 72f988bf-86f1-41af-91ab-2d7cd011db47
-      env:
-        CERTPATH: $(CERTPATH)
       displayName: Install build tools
-    - bash: |
-        python3 azure-pipelines/scripts/publish-mssonic-logs.py
+    - task: PythonScript@0
+      displayName: Publish SONiC telemetry
+      inputs:
+        scriptSource: 'inline'
+        script: |
+          import datetime, base64, json, time, os, re
+          from urllib import request
+          from azure.storage.queue import QueueClient
+          from azure.storage.blob import BlobServiceClient
+
+          QUEUE_NAME="builds"
+          CONTAINER="build"
+          if os.getenv('AZURE_STORAGE_QUEUE_NAME'):
+              QUEUE_NAME = os.getenv('AZURE_STORAGE_QUEUE_NAME')
+          if os.getenv('AZURE_STORAGE_CONTAINER'):
+              CONTAINER = os.getenv('AZURE_STORAGE_CONTAINER')
+          print("QUEUE_NAME={} AZURE_STORAGE_CONTAINER={}".format(QUEUE_NAME, CONTAINER))
+          AZURE_STORAGE_CONNECTION_STRING='$(AZURE_STORAGE_CONNECTION_STRING)'
+          BUILD_MESSAGES = 'buildmessages'
+          BUILD_INFOS = 'builds'
+          BUILD_LOGS = 'buildlogs'
+          BUILD_COVERAGES = 'buildcoverages'
+          MESSAGE_PER_PAGE = 10
+          MAX_PAGE_COUNT = 30
+          HEADERS = {"Authorization": "Bearer " + "$(System.AccessToken)"}
+          blob_service_client = BlobServiceClient.from_connection_string(AZURE_STORAGE_CONNECTION_STRING)
+
+          # Upload a list of lines to blob
+          def upload_to_blob(lines, blob_prefix, file_prefix=""):
+              now = datetime.datetime.now()
+              if len(lines) == 0:
+                  return
+              local_file_name = file_prefix + now.strftime("_%Y%m%d-%H%M%S-%f") + '.json'
+              with open(local_file_name, "w") as file:
+                  count = file.write('\n'.join(lines))
+              blob_file_name = blob_prefix + now.strftime("/%Y/%m/%d/") + local_file_name
+              blob_client = blob_service_client.get_blob_client(container=CONTAINER, blob=blob_file_name)
+              with open(local_file_name, "rb") as data:
+                  blob_client.upload_blob(data)
+              os.remove(local_file_name)
+
+          # Download the web content from the url
+          def get_response(url):
+              for i in range(0, 3):
+                  try:
+                      print(url)
+                      req = request.Request(url, headers=HEADERS)
+                      response = request.urlopen(req, timeout=300)
+                      data = response.read()
+                      encoding = response.info().get_content_charset('utf-8')
+                      return data.decode(encoding)
+                  except Exception as e:
+                      print(e)
+                      time.sleep(10)
+              raise Exception('failed to get response from {0}'.format(url))
+
+          def get_coverage(build_info):
+              base_url = re.sub('/_apis/.*', '/_apis', build_info['url'])
+              url = '{0}/test/codecoverage?buildId={1}&api-version=6.0-preview.1'.format(base_url, build_info['id'])
+              coverage_content = get_response(url)
+              info = json.loads(json.dumps(build_info))
+              coverage = json.loads(coverage_content)
+              results = []
+              if 'coverageData' in coverage and len(coverage['coverageData']) > 0:
+                  info['coverage'] = coverage_content
+                  results.append(json.dumps(info))
+              return results
+
+          # Get the build logs
+          def get_build_logs(timeline_url, build_info):
+              timeline_content = get_response(timeline_url)
+              if not timeline_content:
+                  return []
+              records = json.loads(timeline_content)['records']
+              results = []
+              #max_column_size = 104855000
+              max_column_size = 40*1024*1024 #40M
+              for record in records:
+                  record['content'] = ""
+                  record['buildId'] = build_info['id']
+                  record['definitionId'] = build_info['definitionId']
+                  record['definitionName'] = build_info['definitionName']
+                  record['sourceBranch'] = build_info['sourceBranch']
+                  record['sourceVersion'] = build_info['sourceVersion']
+                  record['triggerInfo'] = build_info['triggerInfo']
+                  record['reason'] = build_info['reason']
+                  if record['log']:
+                      log_url = record['log']['url']
+                      log = get_response(log_url)
+                      content = log[:max_column_size]
+                      lines = []
+                      for line in content.split('\n'):
+                          if '&sp=' in line and '&sig=' in line:
+                              continue
+                          lines.append(line)
+                      record['content'] = '\n'.join(lines)
+                  if 'parameters' in build_info:
+                      record['parameters'] = build_info['parameters']
+                  if 'status' in build_info:
+                      record['status'] = build_info['status']
+                  if 'uri' in build_info:
+                      record['uri'] = build_info['uri']
+                  results.append(json.dumps(record))
+              return results
+
+          queue_client = QueueClient.from_connection_string(AZURE_STORAGE_CONNECTION_STRING, QUEUE_NAME)
+          messages = queue_client.receive_messages(messages_per_page=MESSAGE_PER_PAGE, visibility_timeout=3600)
+          page = 0
+          for msg_batch in messages.by_page():
+              page = page + 1
+              if page > MAX_PAGE_COUNT:
+                  break
+              local_file_name = datetime.datetime.now().strftime("_%Y%m%d-%H%M%S-%f") + '.json'
+              build_messages = []
+              msgs = []
+              build_infos = []
+              build_logs = []
+              build_coverages = []
+              msg_count = 0
+              for msg in msg_batch:
+                  msg_count = msg_count + 1
+                  print("process message {} on page {}, current log count {}".format(msg_count, page, len(build_logs)))
+                  msgs.append(msg)
+                  msg_content = base64.b64decode(msg.content)
+                  build = json.loads(msg_content)
+                  content = json.dumps(build, separators=(',', ':'))
+                  build_messages.append(content)
+                  build_url = build['resource']['url']
+                  if 'dev.azure.com' not in build_url:
+                      print("Skipped the url {}".format(build_url))
+                      continue
+                  build_content = get_response(build_url)
+                  if not build_content:
+                      print("Skipped the message for no build content, the message: {}".format(msg_content))
+                      continue
+                  build_info = json.loads(build_content)
+                  build_info['definitionId'] = build_info['definition']['id']
+                  build_info['definitionName'] = build_info['definition']['name']
+                  build_infos.append(json.dumps(build_info))
+                  timeline_url = build_info['_links']['timeline']['href']
+                  logs = get_build_logs(timeline_url, build_info)
+                  build_logs += logs
+                  build_coverages += get_coverage(build_info)
+              upload_to_blob(build_messages, BUILD_MESSAGES)
+              upload_to_blob(build_infos, BUILD_INFOS)
+              upload_to_blob(build_coverages, BUILD_COVERAGES)
+              split_build_logs = []
+              log_size = 0
+              max_upload_size = 80 * 1024 * 1024 # 80M
+              for build_log in build_logs:
+                  if log_size >= max_upload_size:
+                      print("Split the logs to upload, log_size {}".format(log_size))
+                      upload_to_blob(split_build_logs, BUILD_LOGS)
+                      split_build_logs = []
+                      log_size = 0
+                  split_build_logs.append(build_log)
+                  log_size += len(build_log)
+              print("Upload log, log_size {}".format(log_size))
+              upload_to_blob(split_build_logs, BUILD_LOGS)
+              for msg in msgs:
+                  queue_client.delete_message(msg)
+          exit(0)
       env:
-        SYSTEM_ACCESSTOKEN: $(System.AccessToken)
-        TOKEN: $(MSAZURE-TOKEN)
-      displayName: Ingest data into kusto
+        AZURE_STORAGE_CONNECTION_STRING: '$(AZURE_STORAGE_CONNECTION_STRING)'
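
For reference, the restored inline task drains Azure DevOps build-completion events from a storage queue (base64-encoded JSON messages) and republishes them to blob storage. A minimal local sketch for inspecting that queue, assuming the same AZURE_STORAGE_CONNECTION_STRING secret is exported as an environment variable and the default queue name "builds" (both assumptions, not part of the commit):

    # Local inspection sketch; peek_messages does not change message visibility,
    # so it does not interfere with the pipeline's receive/delete loop.
    import base64, json, os
    from azure.storage.queue import QueueClient

    queue_client = QueueClient.from_connection_string(
        os.environ["AZURE_STORAGE_CONNECTION_STRING"], "builds")
    for msg in queue_client.peek_messages(max_messages=5):
        build = json.loads(base64.b64decode(msg.content))
        print(build["resource"]["url"])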

azure-pipelines/azure-pipelines-github-telemetry.yml

+173 -11
@@ -18,23 +18,185 @@ name: $(TeamProject)_$(Build.DefinitionName)_$(SourceBranchName)_$(Date:yyyyMMdd

 stages:
 - stage: Build
-  pool: sonicbld-1es
+  pool:
+    vmImage: 'ubuntu-latest'
   jobs:
   - job: Build
     timeoutInMinutes: 120
     steps:
+    - task: UsePythonVersion@0
+      inputs:
+        versionSpec: '3.x'
+        addToPath: true
+        architecture: 'x64'
     - script: |
-        set -ex
-        # Install Azure cli
-        curl -sL https://aka.ms/InstallAzureCLIDeb | sudo bash
-        az login --service-principal --use-cert-sn-issuer -u 08fd13c1-63ab-4b08-9007-f4ff86b61248 -p $CERTPATH --tenant 72f988bf-86f1-41af-91ab-2d7cd011db47
-        pip3 install azure-storage-queue azure-storage-blob pytz python-dateutil azure.core azure.kusto.data azure.kusto.ingest
-      env:
-        CERTPATH: $(CERTPATH)
+        pip install azure-storage-queue azure-storage-blob pytz python-dateutil
       displayName: Install build tools
-    - script: |
-        python3 azure-pipelines/scripts/publish-github-prs.py $GITHUB_TOKEN $AZURE_STORAGE_CONNECTION_STRING
+    - task: PythonScript@0
+      displayName: Publish SONiC telemetry
       env:
         AZURE_STORAGE_CONNECTION_STRING: '$(AZURE_STORAGE_CONNECTION_STRING)'
         GITHUB_TOKEN: '$(GITHUB_TOKEN)'
-      displayName: Upload PR info to kusto
+      inputs:
+        scriptSource: 'inline'
+        script: |
+          import datetime, base64, json, time, os, re, pytz, math
+          from urllib import request
+          from urllib.error import HTTPError
+          from http.client import IncompleteRead
+          from azure.core.exceptions import ResourceNotFoundError
+          from dateutil import parser
+          import http.client
+          from azure.storage.blob import BlobServiceClient
+
+          CONTAINER = 'build'
+          INFO_PULLREQUESTS_FILE = "info/pullrequests.json"
+          GITHUB_TOKEN = '$(GITHUB_TOKEN)'
+          AZURE_STORAGE_CONNECTION_STRING = '$(AZURE_STORAGE_CONNECTION_STRING)'
+          blob_service_client = BlobServiceClient.from_connection_string(AZURE_STORAGE_CONNECTION_STRING)
+
+          url = "https://api.github.com/graphql"
+          timestamp = datetime.datetime.utcnow()
+          timeoffset = datetime.timedelta(minutes=5)
+          until = (timestamp - timeoffset).replace(tzinfo=pytz.UTC)
+          if 'END_TIMESTAMP' in os.environ and os.environ['END_TIMESTAMP']:
+              until = parser.isoparse(os.environ['END_TIMESTAMP']).replace(tzinfo=pytz.UTC)
+          delta = datetime.timedelta(minutes=60)
+          if 'TIMEDELTA_IN_MINUTES' in os.environ and os.environ['TIMEDELTA_IN_MINUTES']:
+              timedelta_in_minutes = max(int(os.environ['TIMEDELTA_IN_MINUTES']), 30)
+              delta = datetime.timedelta(minutes=timedelta_in_minutes)
+          max_timedelta_in_days = 35
+
+          # Upload a list of lines to blob
+          def upload_to_blob(lines, blob_prefix, file_prefix=""):
+              now = datetime.datetime.now()
+              if not lines:
+                  print("no lines to upload, skipped")
+                  return
+              local_file_name = file_prefix + now.strftime("_%Y%m%d-%H%M%S-%f") + '.json'
+              with open(local_file_name, "w") as file:
+                  count = file.write('\n'.join(lines))
+              blob_file_name = blob_prefix + now.strftime("/%Y/%m/%d/") + local_file_name
+              blob_client = blob_service_client.get_blob_client(container=CONTAINER, blob=blob_file_name)
+              with open(local_file_name, "rb") as data:
+                  blob_client.upload_blob(data)
+              os.remove(local_file_name)
+
+          def get_start_timestamp(force=False):
+              if not force and 'START_TIMESTAMP' in os.environ and os.environ['START_TIMESTAMP']:
+                  return parser.isoparse(os.environ['START_TIMESTAMP']).replace(tzinfo=pytz.UTC)
+              blob_client = blob_service_client.get_blob_client(container=CONTAINER, blob=INFO_PULLREQUESTS_FILE)
+              try:
+                  download_stream = blob_client.download_blob()
+                  info = json.loads(download_stream.readall())
+                  return parser.isoparse(info['timestamp']).replace(tzinfo=pytz.UTC)
+              except ResourceNotFoundError:
+                  pass
+              start_timestamp = datetime.datetime.utcnow() - datetime.timedelta(days=max_timedelta_in_days)
+              return start_timestamp.replace(tzinfo=pytz.UTC)
+
+          def update_start_timestamp():
+              if 'END_TIMESTAMP' in os.environ and os.environ['END_TIMESTAMP']:
+                  last = get_start_timestamp(True)
+                  if last > until:
+                      print('skipped update the start timestamp, until:{0} < last:{1}'.format(until.isoformat(), last.isoformat()))
+                      return
+              blob_file_name = "info/pullrequests.json"
+              blob_client = blob_service_client.get_blob_client(container=CONTAINER, blob=INFO_PULLREQUESTS_FILE)
+              info = {}
+              info['timestamp'] = until.isoformat()
+              data = json.dumps(info)
+              blob_client.upload_blob(data, overwrite=True)
+
+          # The GitHub GraphQL search API returns at most 100 items per page and at most 10 pages per query.
+          # To work around this, split the query into "delta"-sized time ranges, each of which must contain fewer than 1000 items.
+          def get_pullrequests():
+              results = []
+              start_timestamp = get_start_timestamp()
+              print('start: {0}, until: {1}'.format(start_timestamp.isoformat(), until.isoformat()), flush=True)
+              query_pattern = '''
+              {
+                search(query: "org:azure org:sonic-net is:pr updated:%s..%s sort:updated", %s type: ISSUE, first: 100) {
+                  issueCount
+                  pageInfo {
+                    hasNextPage
+                    endCursor
+                  }
+                  edges {
+                    cursor
+                    node {
+                      ... on PullRequest {
+                        url
+                        number
+                        assignees (first: 10) {
+                          nodes {
+                            login
+                          }
+                        }
+                        title
+                        createdAt
+                        closedAt
+                        merged
+                        mergedAt
+                        updatedAt
+                        mergedBy {login}
+                        author {login}
+                        baseRefName
+                        baseRepository {name, url, owner{login}}
+                        repository {name, url, owner{login}}
+                        mergeCommit {id, oid, committedDate}
+                        commits (first: 3) {nodes{commit{oid, message}}}
+                        state
+                      }
+                    }
+                  }
+                }
+              }
+              '''
+              start = start_timestamp
+              count = math.ceil((until - start) / delta)
+              for index in range(count):
+                  end = min(start+delta, until)
+                  condition = ""
+                  while True: # pagination, support 1000 total, support 100 per page
+                      print("Query: index:%s, count:%s, start:%s, end:%s, page:%s" % (index, count, start.isoformat(), end.isoformat(), condition), flush=True)
+                      query = query_pattern %(start.isoformat(), end.isoformat(), condition)
+                      req = request.Request(url, method="POST")
+                      req.add_header('Content-Type', 'application/json')
+                      req.add_header('Authorization', "Bearer {0}".format(GITHUB_TOKEN))
+                      body = {}
+                      body['query'] = query
+                      data = bytes(json.dumps(body), encoding="utf-8")
+                      content = {}
+                      for i in range(10):
+                          try:
+                              r = request.urlopen(req, data=data)
+                              content = json.loads(r.read())
+                              break
+                          except HTTPError as e:
+                              print('Try count: {0}, error code: {1}, reason: {2}'.format(i, e.code, e.reason))
+                              time.sleep(3)
+                          except IncompleteRead as e:
+                              print("IncompleteRead", e)
+                              time.sleep(3)
+                      if 'data' not in content:
+                          print(content)
+                          break
+                      edges = content['data']['search']['edges']
+                      for edge in edges:
+                          node = edge['node']
+                          node['dumpedAt'] = timestamp.isoformat()
+                          results.append(json.dumps(node))
+                      print("Read edge count: {0}, total count: {1}".format(len(results), content['data']['search']['issueCount']), flush=True)
+                      hasNextPage = content['data']['search']['pageInfo']['hasNextPage']
+                      print(content['data']['search']['pageInfo'])
+                      if not hasNextPage:
+                          break
+                      condition = 'after: "{0}",'.format(edges[-1]['cursor'])
+                      print(condition)
+                  start = end
+              return results
+
+          results = get_pullrequests()
+          upload_to_blob(results, 'pullrequests')
+          update_start_timestamp()
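
For reference, the restored inline script works around the GitHub GraphQL search limits (100 items per page, at most 10 pages per query) by slicing the updated: range into fixed "delta" windows and paging with an after: cursor inside each window. A minimal sketch of just that windowing step, using hypothetical dates for illustration:

    # Windowing sketch only; the start/until dates below are hypothetical placeholders.
    import datetime, math

    def time_windows(start, until, delta):
        # Yield (window_start, window_end) pairs covering [start, until).
        count = math.ceil((until - start) / delta)
        for _ in range(count):
            end = min(start + delta, until)
            yield start, end
            start = end

    start = datetime.datetime(2023, 1, 1, tzinfo=datetime.timezone.utc)
    until = datetime.datetime(2023, 1, 2, tzinfo=datetime.timezone.utc)
    for s, e in time_windows(start, until, datetime.timedelta(minutes=60)):
        print("updated:{0}..{1}".format(s.isoformat(), e.isoformat()))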
