Revert telemetry changes #49

Open
wants to merge 1 commit into base: main
179 changes: 165 additions & 14 deletions azure-pipelines/azure-pipelines-build-telemetry.yml
@@ -19,26 +19,177 @@ name: $(TeamProject)_$(Build.DefinitionName)_$(SourceBranchName)_$(Date:yyyyMMdd
stages:
- stage: Build
pool: sonicbld-1es
variables:
- group: sonicbld
jobs:
- job: Build
timeoutInMinutes: 240
timeoutInMinutes: 120
steps:
- script: |
sudo apt-get update
sudo apt-get install -y python3-pip
sudo pip3 install azure.core azure.kusto.data azure.kusto.ingest azure.storage.blob azure.storage.queue
sudo pip3 install azure-storage-queue azure-storage-blob
sudo update-alternatives --install /usr/bin/python python /usr/bin/python3 10
# Install Azure cli
curl -sL https://aka.ms/InstallAzureCLIDeb | sudo bash
az login --service-principal --use-cert-sn-issuer -u 08fd13c1-63ab-4b08-9007-f4ff86b61248 -p $CERTPATH --tenant 72f988bf-86f1-41af-91ab-2d7cd011db47
env:
CERTPATH: $(CERTPATH)
displayName: Install build tools
- bash: |
python3 azure-pipelines/scripts/publish-mssonic-logs.py
- task: PythonScript@0
displayName: Publish SONiC telemetry
inputs:
scriptSource: 'inline'
script: |
import datetime, base64, json, time, os, re
from urllib import request
from azure.storage.queue import QueueClient
from azure.storage.blob import BlobServiceClient

QUEUE_NAME="builds"
CONTAINER="build"
if os.getenv('AZURE_STORAGE_QUEUE_NAME'):
QUEUE_NAME = os.getenv('AZURE_STORAGE_QUEUE_NAME')
if os.getenv('AZURE_STORAGE_CONTAINER'):
CONTAINER = os.getenv('AZURE_STORAGE_CONTAINER')
print("QUEUE_NAME={} AZURE_STORAGE_CONTAINER={}".format(QUEUE_NAME, CONTAINER))
AZURE_STORAGE_CONNECTION_STRING='$(AZURE_STORAGE_CONNECTION_STRING)'
BUILD_MESSAGES = 'buildmessages'
BUILD_INFOS = 'builds'
BUILD_LOGS = 'buildlogs'
BUILD_COVERAGES = 'buildcoverages'
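# Blob folder prefixes, one per record type, passed to upload_to_blob below.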
MESSAGE_PER_PAGE = 10
MAX_PAGE_COUNT = 30
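# At 10 messages per page and up to 30 pages, one run processes at most 300 queued build events; any remaining backlog is presumably picked up by later runs.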
HEADERS = {"Authorization": "Bearer " + "$(System.AccessToken)"}
blob_service_client = BlobServiceClient.from_connection_string(AZURE_STORAGE_CONNECTION_STRING)

# Upload a list of lines to blob
def upload_to_blob(lines, blob_prefix, file_prefix=""):
now = datetime.datetime.now()
if len(lines) == 0:
return
local_file_name = file_prefix + now.strftime("_%Y%m%d-%H%M%S-%f") + '.json'
with open(local_file_name, "w") as file:
count = file.write('\n'.join(lines))
blob_file_name = blob_prefix + now.strftime("/%Y/%m/%d/") + local_file_name
blob_client = blob_service_client.get_blob_client(container=CONTAINER, blob=blob_file_name)
with open(local_file_name, "rb") as data:
blob_client.upload_blob(data)
os.remove(local_file_name)

# Download the web content from the url
def get_response(url):
for i in range(0, 3):
try:
print(url)
req = request.Request(url, headers=HEADERS)
response = request.urlopen(req, timeout=300)
data=response.read()
encoding = response.info().get_content_charset('utf-8')
return data.decode(encoding)
except Exception as e:
print(e)
time.sleep(10)
raise Exception('failed to get response from {0}'.format(url))

def get_coverage(build_info):
base_url = re.sub('/_apis/.*', '/_apis', build_info['url'])
url = '{0}/test/codecoverage?buildId={1}&api-version=6.0-preview.1'.format(base_url, build_info['id'])
coverage_content = get_response(url)
info = json.loads(json.dumps(build_info))
coverage = json.loads(coverage_content)
results = []
if 'coverageData' in coverage and len(coverage['coverageData']) > 0:
info['coverage'] = coverage_content
results.append(json.dumps(info))
return results

# Get the build logs
def get_build_logs(timeline_url, build_info):
timeline_content = get_response(timeline_url)
if not timeline_content:
return []
records = json.loads(timeline_content)['records']
results = []
#max_column_size = 104855000
max_column_size = 40*1024*1024 #40M
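# Each log record's content is capped at 40 MiB (the commented-out value is roughly 100 MB), presumably to stay within the downstream ingestion column size limit.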
for record in records:
record['content'] = ""
record['buildId'] = build_info['id']
record['definitionId'] = build_info['definitionId']
record['definitionName'] = build_info['definitionName']
record['sourceBranch'] = build_info['sourceBranch']
record['sourceVersion'] = build_info['sourceVersion']
record['triggerInfo'] = build_info['triggerInfo']
record['reason'] = build_info['reason']
if record['log']:
log_url = record['log']['url']
log = get_response(log_url)
content = log[:max_column_size]
lines = []
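# Lines containing '&sp=' and '&sig=' look like SAS-signed URLs and are skipped so the tokens are not written into the stored logs.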
for line in content.split('\n'):
if '&sp=' in line and '&sig=' in line:
continue
lines.append(line)
record['content'] = '\n'.join(lines)
if 'parameters' in build_info:
record['parameters'] = build_info['parameters']
if 'status' in build_info:
record['status'] = build_info['status']
if 'uri' in build_info:
record['uri'] = build_info['uri']
results.append(json.dumps(record))
return results

queue_client = QueueClient.from_connection_string(AZURE_STORAGE_CONNECTION_STRING, QUEUE_NAME)
messages = queue_client.receive_messages(messages_per_page=MESSAGE_PER_PAGE, visibility_timeout=3600)
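# visibility_timeout=3600 keeps the received messages hidden from other consumers for an hour; they are only deleted at the end of each page after the blobs have been uploaded.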
page = 0
for msg_batch in messages.by_page():
page = page + 1
if page > MAX_PAGE_COUNT:
break
local_file_name = datetime.datetime.now().strftime("_%Y%m%d-%H%M%S-%f") + '.json'
build_messages = []
msgs = []
build_infos = []
build_logs = []
build_coverages = []
msg_count = 0
for msg in msg_batch:
msg_count = msg_count + 1
print("process message {} on page {}, current log count {}".format(msg_count, page, len(build_logs)))
msgs.append(msg)
msg_content = base64.b64decode(msg.content)
build = json.loads(msg_content)
content = json.dumps(build, separators=(',', ':'))
build_messages.append(content)
build_url = build['resource']['url']
if 'dev.azure.com' not in build_url:
print("Skipped the the url {}".format(build_url))
continue
build_content = get_response(build_url)
if not build_content:
print("Skipped the message for no build content, the message: {}".format(msg_content))
continue
build_info = json.loads(build_content)
build_info['definitionId'] = build_info['definition']['id']
build_info['definitionName'] = build_info['definition']['name']
build_infos.append(json.dumps(build_info))
timeline_url = build_info['_links']['timeline']['href']
logs = get_build_logs(timeline_url, build_info)
build_logs += logs
build_coverages += get_coverage(build_info)
upload_to_blob(build_messages, BUILD_MESSAGES)
upload_to_blob(build_infos, BUILD_INFOS)
upload_to_blob(build_coverages, BUILD_COVERAGES)
split_build_logs = []
log_size = 0
max_upload_size = 80 * 1024 * 1024 # 80M
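# Flush the accumulated logs whenever roughly 80 MiB has been buffered so that no single blob upload grows unbounded.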
for build_log in build_logs:
if log_size >= max_upload_size:
print("Split the logs to upload, log_size {}".format(log_size))
upload_to_blob(split_build_logs, BUILD_LOGS)
split_build_logs = []
log_size = 0
split_build_logs.append(build_log)
log_size += len(build_log)
print("Upload log, log_size {}".format(log_size))
upload_to_blob(split_build_logs, BUILD_LOGS)
for msg in msgs:
queue_client.delete_message(msg)
exit(0)
env:
SYSTEM_ACCESSTOKEN: $(System.AccessToken)
TOKEN: $(MSAZURE-TOKEN)
AZURE_STORAGE_CONNECTION_STRING: '$(AZURE_STORAGE_CONNECTION_STRING)'
displayName: Ingest data into kusto
184 changes: 173 additions & 11 deletions azure-pipelines/azure-pipelines-github-telemetry.yml
@@ -18,23 +18,185 @@ name: $(TeamProject)_$(Build.DefinitionName)_$(SourceBranchName)_$(Date:yyyyMMdd

stages:
- stage: Build
pool: sonicbld-1es
pool:
vmImage: 'ubuntu-latest'
jobs:
- job: Build
timeoutInMinutes: 120
steps:
- task: UsePythonVersion@0
inputs:
versionSpec: '3.x'
addToPath: true
architecture: 'x64'
- script: |
set -ex
# Install Azure cli
curl -sL https://aka.ms/InstallAzureCLIDeb | sudo bash
az login --service-principal --use-cert-sn-issuer -u 08fd13c1-63ab-4b08-9007-f4ff86b61248 -p $CERTPATH --tenant 72f988bf-86f1-41af-91ab-2d7cd011db47
pip3 install azure-storage-queue azure-storage-blob pytz python-dateutil azure.core azure.kusto.data azure.kusto.ingest
env:
CERTPATH: $(CERTPATH)
pip install azure-storage-queue azure-storage-blob pytz python-dateutil
displayName: Install build tools
- script: |
python3 azure-pipelines/scripts/publish-github-prs.py $GITHUB_TOKEN $AZURE_STORAGE_CONNECTION_STRING
- task: PythonScript@0
displayName: Publish SONiC telemetry
env:
AZURE_STORAGE_CONNECTION_STRING: '$(AZURE_STORAGE_CONNECTION_STRING)'
GITHUB_TOKEN: '$(GITHUB_TOKEN)'
displayName: Upload PR info to kusto
inputs:
scriptSource: 'inline'
script: |
import datetime, base64, json, time, os, re, pytz, math
from urllib import request
from urllib.error import HTTPError
from http.client import IncompleteRead
from azure.core.exceptions import ResourceNotFoundError
from dateutil import parser
import http.client
from azure.storage.blob import BlobServiceClient

CONTAINER = 'build'
INFO_PULLREQUESTS_FILE = "info/pullrequests.json"
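# Checkpoint blob that records the timestamp up to which pull requests have already been ingested.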
GITHUB_TOKEN = '$(GITHUB_TOKEN)'
AZURE_STORAGE_CONNECTION_STRING = '$(AZURE_STORAGE_CONNECTION_STRING)'
blob_service_client = BlobServiceClient.from_connection_string(AZURE_STORAGE_CONNECTION_STRING)

url="https://api.github.com/graphql"
timestamp = datetime.datetime.utcnow()
timeoffset = datetime.timedelta(minutes=5)
until = (timestamp - timeoffset).replace(tzinfo=pytz.UTC)
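# Ingest events only up to five minutes before now; the offset presumably avoids racing with PRs that are still being updated.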
if 'END_TIMESTAMP' in os.environ and os.environ['END_TIMESTAMP']:
until = parser.isoparse(os.environ['END_TIMESTAMP']).replace(tzinfo=pytz.UTC)
delta = datetime.timedelta(minutes=60)
if 'TIMEDELTA_IN_MINUTES' in os.environ and os.environ['TIMEDELTA_IN_MINUTES']:
timedelta_in_minutes = max(int(os.environ['TIMEDELTA_IN_MINUTES']), 30)
delta = datetime.timedelta(minutes=timedelta_in_minutes)
max_timedelta_in_days = 35
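# With no explicit START_TIMESTAMP and no checkpoint blob, backfill at most 35 days of history.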

# Upload a list of lines to blob
def upload_to_blob(lines, blob_prefix, file_prefix=""):
now = datetime.datetime.now()
if not lines:
print("no lines to upload, skipped")
return
local_file_name = file_prefix + now.strftime("_%Y%m%d-%H%M%S-%f") + '.json'
with open(local_file_name, "w") as file:
count = file.write('\n'.join(lines))
blob_file_name = blob_prefix + now.strftime("/%Y/%m/%d/") + local_file_name
blob_client = blob_service_client.get_blob_client(container=CONTAINER, blob=blob_file_name)
with open(local_file_name, "rb") as data:
blob_client.upload_blob(data)
os.remove(local_file_name)

def get_start_timestamp(force=False):
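# Window start precedence: explicit START_TIMESTAMP, then the stored checkpoint, then a 35-day backfill.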
if not force and 'START_TIMESTAMP' in os.environ and os.environ['START_TIMESTAMP']:
return parser.isoparse(os.environ['START_TIMESTAMP']).replace(tzinfo=pytz.UTC)
blob_client = blob_service_client.get_blob_client(container=CONTAINER, blob=INFO_PULLREQUESTS_FILE)
try:
download_stream = blob_client.download_blob()
info = json.loads(download_stream.readall())
return parser.isoparse(info['timestamp']).replace(tzinfo=pytz.UTC)
except ResourceNotFoundError:
pass
start_timestamp = datetime.datetime.utcnow() - datetime.timedelta(days=max_timedelta_in_days)
return start_timestamp.replace(tzinfo=pytz.UTC)

def update_start_timestamp():
if 'END_TIMESTAMP' in os.environ and os.environ['END_TIMESTAMP']:
last = get_start_timestamp(True)
if last > until:
print('skipped updating the start timestamp, until:{0} < last:{1}'.format(until.isoformat(), last.isoformat()))
return
blob_file_name="info/pullrequests.json"
blob_client = blob_service_client.get_blob_client(container=CONTAINER, blob=INFO_PULLREQUESTS_FILE)
info = {}
info['timestamp'] = until.isoformat()
data = json.dumps(info)
blob_client.upload_blob(data, overwrite=True)

# The GitHub GraphQL search API returns at most 100 items per page and at most 10 pages (1000 items) per query.
# To work around this, the query is split into time windows of size "delta"; each window must stay under 1000 updated PRs.
def get_pullrequests():
results = []
start_timestamp = get_start_timestamp()
print('start: {0}, until: {1}'.format(start_timestamp.isoformat(), until.isoformat()), flush=True)
query_pattern = '''
{
search(query: "org:azure org:sonic-net is:pr updated:%s..%s sort:updated", %s type: ISSUE, first: 100) {
issueCount
pageInfo {
hasNextPage
endCursor
}
edges {
cursor
node {
... on PullRequest {
url
number
assignees (first: 10) {
nodes {
login
}
}
title
createdAt
closedAt
merged
mergedAt
updatedAt
mergedBy {login}
author {login}
baseRefName
baseRepository {name, url, owner{login}}
repository {name, url, owner{login}}
mergeCommit {id, oid, committedDate}
commits (first: 3) {nodes{commit{oid, message}}}
state
}
}
}
}
}
'''
start = start_timestamp
count = math.ceil((until - start) / delta)
for index in range(count):
end = min(start+delta, until)
condition = ""
while True: # pagination, support 1000 total, support 100 per page
print("Query: index:%s, count:%s, start:%s, end:%s, page:%s" % (index, count, start.isoformat(), end.isoformat(), condition), flush=True)
query = query_pattern %(start.isoformat(), end.isoformat(), condition)
req = request.Request(url, method="POST")
req.add_header('Content-Type', 'application/json')
req.add_header('Authorization', "Bearer {0}".format(GITHUB_TOKEN))
body = {}
body['query'] = query
data = bytes(json.dumps(body), encoding="utf-8")
content = {}
for i in range(10):
try:
r = request.urlopen(req, data=data)
content = json.loads(r.read())
break
except HTTPError as e:
print('Try count: {0}, error code: {1}, reason: {2}'.format(i, e.code, e.reason))
time.sleep(3)
except IncompleteRead as e:
print("IncompleteRead", e)
time.sleep(3)
if 'data' not in content:
print(content)
break
edges = content['data']['search']['edges']
for edge in edges:
node = edge['node']
node['dumpedAt'] = timestamp.isoformat()
results.append(json.dumps(node))
print("Read edge count: {0}, total count: {1}".format(len(results), content['data']['search']['issueCount']), flush=True)
hasNextPage = content['data']['search']['pageInfo']['hasNextPage']
print(content['data']['search']['pageInfo'])
if not hasNextPage:
break
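# Resume the next page of the current window from the last returned cursor.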
condition = 'after: "{0}",'.format(edges[-1]['cursor'])
print(condition)
start = end
return results

results = get_pullrequests()
upload_to_blob(results, 'pullrequests')
update_start_timestamp()