From 85503ea4e252791f0a5dc2d994331f0eedac2da5 Mon Sep 17 00:00:00 2001 From: Shilong Liu Date: Mon, 25 Nov 2024 16:39:04 +0800 Subject: [PATCH] revert recent telemetry pipeline change to 299ef3d2f5db9c4ec6c68e3c7ccb1253eec889d8 --- .../azure-pipelines-build-telemetry.yml | 179 ++++- .../azure-pipelines-github-telemetry.yml | 184 ++++- azure-pipelines/scripts/publish-github-prs.py | 166 ---- .../scripts/publish-mssonic-logs.py | 182 ----- patch | 758 ++++++++++++++++++ 5 files changed, 1096 insertions(+), 373 deletions(-) delete mode 100644 azure-pipelines/scripts/publish-github-prs.py delete mode 100644 azure-pipelines/scripts/publish-mssonic-logs.py create mode 100644 patch diff --git a/azure-pipelines/azure-pipelines-build-telemetry.yml b/azure-pipelines/azure-pipelines-build-telemetry.yml index d79e359..b4b33f3 100644 --- a/azure-pipelines/azure-pipelines-build-telemetry.yml +++ b/azure-pipelines/azure-pipelines-build-telemetry.yml @@ -19,26 +19,177 @@ name: $(TeamProject)_$(Build.DefinitionName)_$(SourceBranchName)_$(Date:yyyyMMdd stages: - stage: Build pool: sonicbld-1es - variables: - - group: sonicbld jobs: - job: Build - timeoutInMinutes: 240 + timeoutInMinutes: 120 steps: - script: | sudo apt-get update sudo apt-get install -y python3-pip - sudo pip3 install azure.core azure.kusto.data azure.kusto.ingest azure.storage.blob azure.storage.queue + sudo pip3 install azure-storage-queue azure-storage-blob sudo update-alternatives --install /usr/bin/python python /usr/bin/python3 10 - # Install Azure cli - curl -sL https://aka.ms/InstallAzureCLIDeb | sudo bash - az login --service-principal --use-cert-sn-issuer -u 08fd13c1-63ab-4b08-9007-f4ff86b61248 -p $CERTPATH --tenant 72f988bf-86f1-41af-91ab-2d7cd011db47 - env: - CERTPATH: $(CERTPATH) displayName: Install build tools - - bash: | - python3 azure-pipelines/scripts/publish-mssonic-logs.py + - task: PythonScript@0 + displayName: Publish SONiC telemetry + inputs: + scriptSource: 'inline' + script: | + import datetime, base64, json, time, os, re + from urllib import request + from azure.storage.queue import QueueClient + from azure.storage.blob import BlobServiceClient + + QUEUE_NAME="builds" + CONTAINER="build" + if os.getenv('AZURE_STORAGE_QUEUE_NAME'): + QUEUE_NAME = os.getenv('AZURE_STORAGE_QUEUE_NAME') + if os.getenv('AZURE_STORAGE_CONTAINER'): + CONTAINER = os.getenv('AZURE_STORAGE_CONTAINER') + print("QUEUE_NAME={} AZURE_STORAGE_CONTAINER={}".format(QUEUE_NAME, CONTAINER)) + AZURE_STORAGE_CONNECTION_STRING='$(AZURE_STORAGE_CONNECTION_STRING)' + BUILD_MESSAGES = 'buildmessages' + BUILD_INFOS = 'builds' + BUILD_LOGS = 'buildlogs' + BUILD_COVERAGES = 'buildcoverages' + MESSAGE_PER_PAGE = 10 + MAX_PAGE_COUNT = 30 + HEADERS = {"Authorization": "Bearer " + "$(System.AccessToken)"} + blob_service_client = BlobServiceClient.from_connection_string(AZURE_STORAGE_CONNECTION_STRING) + + # Upload a list of lines to blob + def upload_to_blob(lines, blob_prefix, file_prefix=""): + now = datetime.datetime.now() + if len(lines) == 0: + return + local_file_name = file_prefix + now.strftime("_%Y%m%d-%H%M%S-%f") + '.json' + with open(local_file_name, "w") as file: + count = file.write('\n'.join(lines)) + blob_file_name = blob_prefix + now.strftime("/%Y/%m/%d/") + local_file_name + blob_client = blob_service_client.get_blob_client(container=CONTAINER, blob=blob_file_name) + with open(local_file_name, "rb") as data: + blob_client.upload_blob(data) + os.remove(local_file_name) + + # Download the web content from the url + def get_response(url): + for i 
in range(0, 3): + try: + print(url) + req = request.Request(url, headers=HEADERS) + response = request.urlopen(req, timeout=300) + data=response.read() + encoding = response.info().get_content_charset('utf-8') + return data.decode(encoding) + except Exception as e: + print(e) + time.sleep(10) + raise Exception('failed to get response from {0}'.format(url)) + + def get_coverage(build_info): + base_url = re.sub('/_apis/.*', '/_apis', build_info['url']) + url = '{0}/test/codecoverage?buildId={1}&api-version=6.0-preview.1'.format(base_url, build_info['id']) + coverage_content = get_response(url) + info = json.loads(json.dumps(build_info)) + coverage = json.loads(coverage_content) + results = [] + if 'coverageData' in coverage and len(coverage['coverageData']) > 0: + info['coverage'] = coverage_content + results.append(json.dumps(info)) + return results + + # Get the build logs + def get_build_logs(timeline_url, build_info): + timeline_content = get_response(timeline_url) + if not timeline_content: + return [] + records = json.loads(timeline_content)['records'] + results = [] + #max_column_size = 104855000 + max_column_size = 40*1024*1024 #40M + for record in records: + record['content'] = "" + record['buildId'] = build_info['id'] + record['definitionId'] = build_info['definitionId'] + record['definitionName'] = build_info['definitionName'] + record['sourceBranch'] = build_info['sourceBranch'] + record['sourceVersion'] = build_info['sourceVersion'] + record['triggerInfo'] = build_info['triggerInfo'] + record['reason'] = build_info['reason'] + if record['log']: + log_url = record['log']['url'] + log = get_response(log_url) + content = log[:max_column_size] + lines = [] + for line in content.split('\n'): + if '&sp=' in line and '&sig=' in line: + continue + lines.append(line) + record['content'] = '\n'.join(lines) + if 'parameters' in build_info: + record['parameters'] = build_info['parameters'] + if 'status' in build_info: + record['status'] = build_info['status'] + if 'uri' in build_info: + record['uri'] = build_info['uri'] + results.append(json.dumps(record)) + return results + + queue_client = QueueClient.from_connection_string(AZURE_STORAGE_CONNECTION_STRING, QUEUE_NAME) + messages = queue_client.receive_messages(messages_per_page=MESSAGE_PER_PAGE, visibility_timeout=3600) + page = 0 + for msg_batch in messages.by_page(): + page = page + 1 + if page > MAX_PAGE_COUNT: + break + local_file_name = datetime.datetime.now().strftime("_%Y%m%d-%H%M%S-%f") + '.json' + build_messages = [] + msgs = [] + build_infos = [] + build_logs = [] + build_coverages = [] + msg_count = 0 + for msg in msg_batch: + msg_count = msg_count + 1 + print("process message {} on page {}, current log count {}".format(msg_count, page, len(build_logs))) + msgs.append(msg) + msg_content = base64.b64decode(msg.content) + build = json.loads(msg_content) + content = json.dumps(build, separators=(',', ':')) + build_messages.append(content) + build_url = build['resource']['url'] + if 'dev.azure.com' not in build_url: + print("Skipped the the url {}".format(build_url)) + continue + build_content = get_response(build_url) + if not build_content: + print("Skipped the message for no build content, the message: {}".format(msg_content)) + continue + build_info = json.loads(build_content) + build_info['definitionId'] = build_info['definition']['id'] + build_info['definitionName'] = build_info['definition']['name'] + build_infos.append(json.dumps(build_info)) + timeline_url = build_info['_links']['timeline']['href'] + logs = 
get_build_logs(timeline_url, build_info) + build_logs += logs + build_coverages += get_coverage(build_info) + upload_to_blob(build_messages, BUILD_MESSAGES) + upload_to_blob(build_infos, BUILD_INFOS) + upload_to_blob(build_coverages, BUILD_COVERAGES) + split_build_logs = [] + log_size = 0 + max_upload_size = 80 * 1024 * 1024 # 80M + for build_log in build_logs: + if log_size >= max_upload_size: + print("Split the logs to upload, log_size {}".format(log_size)) + upload_to_blob(split_build_logs, BUILD_LOGS) + split_build_logs = [] + log_size = 0 + split_build_logs.append(build_log) + log_size += len(build_log) + print("Upload log, log_size {}".format(log_size)) + upload_to_blob(split_build_logs, BUILD_LOGS) + for msg in msgs: + queue_client.delete_message(msg) + exit(0) env: - SYSTEM_ACCESSTOKEN: $(System.AccessToken) - TOKEN: $(MSAZURE-TOKEN) - displayName: Ingest data into kusto + AZURE_STORAGE_CONNECTION_STRING: '$(AZURE_STORAGE_CONNECTION_STRING)' diff --git a/azure-pipelines/azure-pipelines-github-telemetry.yml b/azure-pipelines/azure-pipelines-github-telemetry.yml index 88e4720..dcb2c94 100644 --- a/azure-pipelines/azure-pipelines-github-telemetry.yml +++ b/azure-pipelines/azure-pipelines-github-telemetry.yml @@ -18,23 +18,185 @@ name: $(TeamProject)_$(Build.DefinitionName)_$(SourceBranchName)_$(Date:yyyyMMdd stages: - stage: Build - pool: sonicbld-1es + pool: + vmImage: 'ubuntu-latest' jobs: - job: Build timeoutInMinutes: 120 steps: + - task: UsePythonVersion@0 + inputs: + versionSpec: '3.x' + addToPath: true + architecture: 'x64' - script: | - set -ex - # Install Azure cli - curl -sL https://aka.ms/InstallAzureCLIDeb | sudo bash - az login --service-principal --use-cert-sn-issuer -u 08fd13c1-63ab-4b08-9007-f4ff86b61248 -p $CERTPATH --tenant 72f988bf-86f1-41af-91ab-2d7cd011db47 - pip3 install azure-storage-queue azure-storage-blob pytz python-dateutil azure.core azure.kusto.data azure.kusto.ingest - env: - CERTPATH: $(CERTPATH) + pip install azure-storage-queue azure-storage-blob pytz python-dateutil displayName: Install build tools - - script: | - python3 azure-pipelines/scripts/publish-github-prs.py $GITHUB_TOKEN $AZURE_STORAGE_CONNECTION_STRING + - task: PythonScript@0 + displayName: Publish SONiC telemetry env: AZURE_STORAGE_CONNECTION_STRING: '$(AZURE_STORAGE_CONNECTION_STRING)' GITHUB_TOKEN: '$(GITHUB_TOKEN)' - displayName: Upload PR info to kusto \ No newline at end of file + inputs: + scriptSource: 'inline' + script: | + import datetime, base64, json, time, os, re, pytz, math + from urllib import request + from urllib.error import HTTPError + from http.client import IncompleteRead + from azure.core.exceptions import ResourceNotFoundError + from dateutil import parser + import http.client + from azure.storage.blob import BlobServiceClient + + CONTAINER = 'build' + INFO_PULLREQUESTS_FILE = "info/pullrequests.json" + GITHUB_TOKEN = '$(GITHUB_TOKEN)' + AZURE_STORAGE_CONNECTION_STRING = '$(AZURE_STORAGE_CONNECTION_STRING)' + blob_service_client = BlobServiceClient.from_connection_string(AZURE_STORAGE_CONNECTION_STRING) + + url="https://api.github.com/graphql" + timestamp = datetime.datetime.utcnow() + timeoffset = datetime.timedelta(minutes=5) + until = (timestamp - timeoffset).replace(tzinfo=pytz.UTC) + if 'END_TIMESTAMP' in os.environ and os.environ['END_TIMESTAMP']: + until = parser.isoparse(os.environ['END_TIMESTAMP']).replace(tzinfo=pytz.UTC) + delta = datetime.timedelta(minutes=60) + if 'TIMEDELTA_IN_MINUTES' in os.environ and os.environ['TIMEDELTA_IN_MINUTES']: + 
timedelta_in_minutes = max(int(os.environ['TIMEDELTA_IN_MINUTES']), 30) + delta = datetime.timedelta(minutes=timedelta_in_minutes) + max_timedelta_in_days = 35 + + # Upload a list of lines to blob + def upload_to_blob(lines, blob_prefix, file_prefix=""): + now = datetime.datetime.now() + if not lines: + print("no lines to upload, skipped") + return + local_file_name = file_prefix + now.strftime("_%Y%m%d-%H%M%S-%f") + '.json' + with open(local_file_name, "w") as file: + count = file.write('\n'.join(lines)) + blob_file_name = blob_prefix + now.strftime("/%Y/%m/%d/") + local_file_name + blob_client = blob_service_client.get_blob_client(container=CONTAINER, blob=blob_file_name) + with open(local_file_name, "rb") as data: + blob_client.upload_blob(data) + os.remove(local_file_name) + + def get_start_timestamp(force=False): + if not force and 'START_TIMESTAMP' in os.environ and os.environ['START_TIMESTAMP']: + return parser.isoparse(os.environ['START_TIMESTAMP']).replace(tzinfo=pytz.UTC) + blob_client = blob_service_client.get_blob_client(container=CONTAINER, blob=INFO_PULLREQUESTS_FILE) + try: + download_stream = blob_client.download_blob() + info = json.loads(download_stream.readall()) + return parser.isoparse(info['timestamp']).replace(tzinfo=pytz.UTC) + except ResourceNotFoundError: + pass + start_timestamp = datetime.datetime.utcnow() - datetime.timedelta(days=max_timedelta_in_days) + return start_timestamp.replace(tzinfo=pytz.UTC) + + def update_start_timestamp(): + if 'END_TIMESTAMP' in os.environ and os.environ['END_TIMESTAMP']: + last = get_start_timestamp(True) + if last > until: + print('skipped update the start timestamp, until:%s < last:%s'.format(until.isoformat(), last.isoformat())) + return + blob_file_name="info/pullrequests.json" + blob_client = blob_service_client.get_blob_client(container=CONTAINER, blob=INFO_PULLREQUESTS_FILE) + info = {} + info['timestamp'] = until.isoformat() + data = json.dumps(info) + blob_client.upload_blob(data, overwrite=True) + + # The GitHub Graphql supports to query 100 items per page, and 10 page in max. + # To workaround it, split the query into several time range "delta", in a time range, need to make sure less than 1000 items. + def get_pullrequests(): + results = [] + start_timestamp = get_start_timestamp() + print('start: {0}, until: {1}'.format(start_timestamp.isoformat(), until.isoformat()), flush=True) + query_pattern = ''' + { + search(query: "org:azure org:sonic-net is:pr updated:%s..%s sort:updated", %s type: ISSUE, first: 100) { + issueCount + pageInfo { + hasNextPage + endCursor + } + edges { + cursor + node { + ... 
on PullRequest { + url + number + assignees (first: 10) { + nodes { + login + } + } + title + createdAt + closedAt + merged + mergedAt + updatedAt + mergedBy {login} + author {login} + baseRefName + baseRepository {name, url, owner{login}} + repository {name, url, owner{login}} + mergeCommit {id, oid, committedDate} + commits (first: 3) {nodes{commit{oid, message}}} + state + } + } + } + } + } + ''' + start = start_timestamp + count = math.ceil((until - start) / delta) + for index in range(count): + end = min(start+delta, until) + condition = "" + while True: # pagination, support 1000 total, support 100 per page + print("Query: index:%s, count:%s, start:%s, end:%s, page:%s" % (index, count, start.isoformat(), end.isoformat(), condition), flush=True) + query = query_pattern %(start.isoformat(), end.isoformat(), condition) + req = request.Request(url, method="POST") + req.add_header('Content-Type', 'application/json') + req.add_header('Authorization', "Bearer {0}".format(GITHUB_TOKEN)) + body = {} + body['query'] = query + data = bytes(json.dumps(body), encoding="utf-8") + content = {} + for i in range(10): + try: + r = request.urlopen(req, data=data) + content = json.loads(r.read()) + break + except HTTPError as e: + print('Try count: {0}, error code: {1}, reason: {2}'.format(i, e.code, e.reason)) + time.sleep(3) + except IncompleteRead as e: + print("IncompleteRead", e) + time.sleep(3) + if 'data' not in content: + print(content) + break + edges = content['data']['search']['edges'] + for edge in edges: + node = edge['node'] + node['dumpedAt'] = timestamp.isoformat() + results.append(json.dumps(node)) + print("Read edge count: {0}, total count: {1}".format(len(results), content['data']['search']['issueCount']), flush=True) + hasNextPage = content['data']['search']['pageInfo']['hasNextPage'] + print(content['data']['search']['pageInfo']) + if not hasNextPage: + break + condition = 'after: "{0}",'.format(edges[-1]['cursor']) + print(condition) + start = end + return results + + results = get_pullrequests() + upload_to_blob(results, 'pullrequests') + update_start_timestamp() diff --git a/azure-pipelines/scripts/publish-github-prs.py b/azure-pipelines/scripts/publish-github-prs.py deleted file mode 100644 index 7c35373..0000000 --- a/azure-pipelines/scripts/publish-github-prs.py +++ /dev/null @@ -1,166 +0,0 @@ -import datetime, base64, json, time, os, re, pytz, math, sys -from urllib import request -from urllib.error import HTTPError -from http.client import IncompleteRead -from azure.core.exceptions import ResourceNotFoundError -from dateutil import parser -import http.client -from azure.storage.blob import BlobServiceClient -from azure.identity import AzureCliCredential - -from azure.kusto.data import DataFormat -from azure.kusto.ingest import QueuedIngestClient, IngestionProperties, FileDescriptor, ReportLevel, ReportMethod -from azure.kusto.data import KustoClient, KustoConnectionStringBuilder - -CONTAINER = 'build' -INFO_PULLREQUESTS_FILE = "info/pullrequests.json" -GITHUB_TOKEN = sys.argv[1] -AZURE_STORAGE_CONNECTION_STRING = sys.argv[2] -blob_service_client = BlobServiceClient.from_connection_string(AZURE_STORAGE_CONNECTION_STRING) - -ingest_cluster = "https://ingest-sonic.westus2.kusto.windows.net" -ingest_kcsb = KustoConnectionStringBuilder.with_az_cli_authentication(ingest_cluster) -ingest_client = QueuedIngestClient(ingest_kcsb) - -url="https://api.github.com/graphql" -timestamp = datetime.datetime.utcnow() -timeoffset = datetime.timedelta(minutes=5) -until = (timestamp - 
timeoffset).replace(tzinfo=pytz.UTC) -if 'END_TIMESTAMP' in os.environ and os.environ['END_TIMESTAMP']: - until = parser.isoparse(os.environ['END_TIMESTAMP']).replace(tzinfo=pytz.UTC) -delta = datetime.timedelta(minutes=60) -if 'TIMEDELTA_IN_MINUTES' in os.environ and os.environ['TIMEDELTA_IN_MINUTES']: - timedelta_in_minutes = max(int(os.environ['TIMEDELTA_IN_MINUTES']), 30) - delta = datetime.timedelta(minutes=timedelta_in_minutes) -max_timedelta_in_days = 35 - -def kusto_ingest(database='build', table='', mapping='', lines=[]): - now = datetime.datetime.utcnow().isoformat().replace(':','_') - if lines: - tmpfile = f"{database}_{table}_{now}.json" - with open(tmpfile, "w") as file: - file.write('\n'.join(lines)) - properties = IngestionProperties(database=database, table=table, data_format=DataFormat.JSON, ingestion_mapping_reference=mapping) - response = ingest_client.ingest_from_file(tmpfile, properties) - print(response) - else: - print('No lines', database, table, buildid) - -def get_start_timestamp(force=False): - if not force and 'START_TIMESTAMP' in os.environ and os.environ['START_TIMESTAMP']: - return parser.isoparse(os.environ['START_TIMESTAMP']).replace(tzinfo=pytz.UTC) - blob_client = blob_service_client.get_blob_client(container=CONTAINER, blob=INFO_PULLREQUESTS_FILE) - try: - download_stream = blob_client.download_blob() - info = json.loads(download_stream.readall()) - return parser.isoparse(info['timestamp']).replace(tzinfo=pytz.UTC) - except ResourceNotFoundError: - pass - start_timestamp = datetime.datetime.utcnow() - datetime.timedelta(days=max_timedelta_in_days) - return start_timestamp.replace(tzinfo=pytz.UTC) - -def update_start_timestamp(): - if 'END_TIMESTAMP' in os.environ and os.environ['END_TIMESTAMP']: - last = get_start_timestamp(True) - if last > until: - print('skipped update the start timestamp, until:%s < last:%s'.format(until.isoformat(), last.isoformat())) - return - blob_file_name="info/pullrequests.json" - blob_client = blob_service_client.get_blob_client(container=CONTAINER, blob=INFO_PULLREQUESTS_FILE) - info = {} - info['timestamp'] = until.isoformat() - data = json.dumps(info) - blob_client.upload_blob(data, overwrite=True) - -# The GitHub Graphql supports to query 100 items per page, and 10 page in max. -# To workaround it, split the query into several time range "delta", in a time range, need to make sure less than 1000 items. -def get_pullrequests(): - results = [] - start_timestamp = get_start_timestamp() - print('start: {0}, until: {1}'.format(start_timestamp.isoformat(), until.isoformat()), flush=True) - query_pattern = ''' - { - search(query: "org:azure org:sonic-net is:pr updated:%s..%s sort:updated", %s type: ISSUE, first: 100) { - issueCount - pageInfo { - hasNextPage - endCursor - } - edges { - cursor - node { - ... 
on PullRequest { - url - number - assignees (first: 10) { - nodes { - login - } - } - title - createdAt - closedAt - merged - mergedAt - updatedAt - mergedBy {login} - author {login} - baseRefName - baseRepository {name, url, owner{login}} - repository {name, url, owner{login}} - mergeCommit {id, oid, committedDate} - commits (first: 3) {nodes{commit{oid, message}}} - state - } - } - } - } - } - ''' - start = start_timestamp - count = math.ceil((until - start) / delta) - for index in range(count): - end = min(start+delta, until) - condition = "" - while True: # pagination, support 1000 total, support 100 per page - print("Query: index:%s, count:%s, start:%s, end:%s, page:%s" % (index, count, start.isoformat(), end.isoformat(), condition), flush=True) - query = query_pattern %(start.isoformat(), end.isoformat(), condition) - req = request.Request(url, method="POST") - req.add_header('Content-Type', 'application/json') - req.add_header('Authorization', "Bearer {0}".format(GITHUB_TOKEN)) - body = {} - body['query'] = query - data = bytes(json.dumps(body), encoding="utf-8") - content = {} - for i in range(10): - try: - r = request.urlopen(req, data=data) - content = json.loads(r.read()) - break - except HTTPError as e: - print('Try count: {0}, error code: {1}, reason: {2}'.format(i, e.code, e.reason)) - time.sleep(3) - except IncompleteRead as e: - print("IncompleteRead", e) - time.sleep(3) - if 'data' not in content: - print(content) - break - edges = content['data']['search']['edges'] - for edge in edges: - node = edge['node'] - node['dumpedAt'] = timestamp.isoformat() - results.append(json.dumps(node)) - print("Read edge count: {0}, total count: {1}".format(len(results), content['data']['search']['issueCount']), flush=True) - hasNextPage = content['data']['search']['pageInfo']['hasNextPage'] - print(content['data']['search']['pageInfo']) - if not hasNextPage: - break - condition = 'after: "{0}",'.format(edges[-1]['cursor']) - print(condition) - start = end - return results - -results = get_pullrequests() -kusto_ingest(database='build', table='PullRequests', mapping='PullRequests-json', lines=results) -update_start_timestamp() diff --git a/azure-pipelines/scripts/publish-mssonic-logs.py b/azure-pipelines/scripts/publish-mssonic-logs.py deleted file mode 100644 index d84dafc..0000000 --- a/azure-pipelines/scripts/publish-mssonic-logs.py +++ /dev/null @@ -1,182 +0,0 @@ -import base64, json, time, os, re, requests - -from azure.identity import AzureCliCredential -from azure.storage.queue import QueueClient - -from azure.kusto.data import DataFormat -from azure.kusto.ingest import QueuedIngestClient, IngestionProperties, FileDescriptor, ReportLevel, ReportMethod -from azure.kusto.data import KustoClient, KustoConnectionStringBuilder - -# kusto query client. 
-def get_kusto_client(): - cluster = "https://sonic.westus2.kusto.windows.net" - kcsb = KustoConnectionStringBuilder.with_az_cli_authentication(cluster) - client = KustoClient(kcsb) - return client - -# kusto ingest client -def get_kusto_ingest_client(): - ingest_cluster = "https://ingest-sonic.westus2.kusto.windows.net" - ingest_kcsb = KustoConnectionStringBuilder.with_az_cli_authentication(ingest_cluster) - ingest_client = QueuedIngestClient(ingest_kcsb) - return ingest_client - -# Azure Storage Queue Client -def get_queue_client(queue_name='builds', storageaccount_name='sonicazurepipelines'): - if os.getenv('AZURE_STORAGE_QUEUE_NAME'): - queue_name = os.getenv('AZURE_STORAGE_QUEUE_NAME') - url=f"https://{storageaccount_name}.queue.core.windows.net" - print(url, queue_name) - default_credential = AzureCliCredential() - queue_client = QueueClient(url, queue_name=queue_name ,credential=default_credential) - return queue_client - -# Download the web content from the url -def get_response(url): - headers = soheaders - if 'mssonic' not in url: - headers = msheaders - for i in range(0, 3): - try: - res = requests.get(url, timeout=300, headers=headers) - if not res.ok: - raise Exception(f'http code: {res.status_code},reason: {res.reason}') - return res.text - except Exception as e: - print(e) - time.sleep(10) - raise Exception(f'failed to get response from {url}, retry 3 times.') - -def get_coverage(build_info): - base_url = re.sub('/_apis/.*', '/_apis', build_info['url']) - url = '{0}/test/codecoverage?buildId={1}&api-version=6.0-preview.1'.format(base_url, build_info['id']) - coverage_content = get_response(url) - info = json.loads(json.dumps(build_info)) - coverage = json.loads(coverage_content) - results = [] - if 'coverageData' in coverage and len(coverage['coverageData']) > 0: - info['coverage'] = coverage_content - results.append(json.dumps(info)) - return results - -# Get the build logs -def get_build_logs(timeline_url, build_info): - timeline_content = get_response(timeline_url) - if not timeline_content: - return [] - records = json.loads(timeline_content)['records'] - results = [] - for record in records: - record['content'] = "" - record['buildId'] = build_info['id'] - record['definitionId'] = build_info['definitionId'] - record['definitionName'] = build_info['definitionName'] - record['sourceBranch'] = build_info['sourceBranch'] - record['sourceVersion'] = build_info['sourceVersion'] - record['triggerInfo'] = build_info['triggerInfo'] - record['reason'] = build_info['reason'] - if record['log']: - log_url = record['log']['url'] - log = get_response(log_url) - lines = [] - for line in log.split('\n'): - if '&sp=' in line and '&sig=' in line: - continue - lines.append(line) - record['content'] = '\n'.join(lines) - if 'parameters' in build_info: - record['parameters'] = build_info['parameters'] - if 'status' in build_info: - record['status'] = build_info['status'] - if 'uri' in build_info: - record['uri'] = build_info['uri'] - print(record['id']) - results.append(json.dumps(record)) - return results - -def kusto_ingest(database='build', table='', mapping='', buildid='', lines=[]): - if lines: - tmpfile = f"{database}_{table}_{buildid}.json" - with open(tmpfile, "w") as file: - file.write('\n'.join(lines)) - properties = IngestionProperties(database=database, table=table, data_format=DataFormat.JSON, ingestion_mapping_reference=mapping) - # properties.report_level = ReportLevel.FailuresAndSuccesses - response = ingest_client.ingest_from_file(tmpfile, properties) - print(response) - 
else: - print('No lines', database, table, buildid) - -headers,msheaders = {},{} -if os.getenv('SYSTEM_ACCESSTOKEN'): - token = os.getenv('SYSTEM_ACCESSTOKEN') - soheaders = {"Authorization": "Bearer " + token} -if os.getenv('TOKEN'): - token = os.getenv('TOKEN') - msheaders = {"Authorization": "Bearer " + token} - -queue_client = get_queue_client() -ingest_client = get_kusto_ingest_client() - -def main(): - max_messages = 30 - - count = queue_client.get_queue_properties().approximate_message_count - for page in range(0,int(count/max_messages)+1): - messages = queue_client.receive_messages(messages_per_page=1, visibility_timeout=3600, max_messages=max_messages) - build_messages = [] - build_infos = [] - build_logs = [] - build_coverages = [] - msgs = [] - for msg in messages: - msgs.append(msg) - msg_content = base64.b64decode(msg.content) - build = json.loads(msg_content) - content = json.dumps(build, separators=(',', ':')) - build_messages.append(content) - build_url = build['resource']['url'] - if 'dev.azure.com' not in build_url and 'msazure.visualstudio.com' not in build_url: - print(f"Skipped the the url {build_url}") - continue - build_content = get_response(build_url) - if not build_content: - print("Skipped the message for no build content, the build_url: {}".format(build_url)) - continue - build_info = json.loads(build_content) - build_info['definitionId'] = build_info['definition']['id'] - build_info['definitionName'] = build_info['definition']['name'] - build_infos.append(json.dumps(build_info)) - - timeline_url = build_info['_links']['timeline']['href'] - logs = get_build_logs(timeline_url, build_info) - build_logs += logs - build_coverages += get_coverage(build_info) - - database = 'build' - if os.getenv('AZURE_STORAGE_DATABASE'): - database = os.getenv('AZURE_STORAGE_DATABASE') - kusto_ingest(database=database, table='AzurePipelineBuildCoverages', mapping="AzurePipelineBuildCoverages-json", buildid=build['resource']['id'], lines=build_coverages) - kusto_ingest(database=database, table='AzurePipelineBuildLogs', mapping="AzurePipelineBuildLogs-json", buildid=build['resource']['id'], lines=build_logs) - kusto_ingest(database=database, table='AzurePipelineBuildMessages', mapping="AzurePipelineBuildMessages-json", buildid=build['resource']['id'], lines=build_messages) - kusto_ingest(database=database, table='AzurePipelineBuilds', mapping="AzurePipelineBuilds-json", buildid=build['resource']['id'], lines=build_infos) - for msg in msgs: - print(f'deleting message: {msg.id}') - queue_client.delete_message(msg) - -if __name__ == '__main__': - main() - -# Upload a list of lines to blob -def upload_to_blob(lines, blob_prefix, file_prefix=""): - now = datetime.datetime.now() - if len(lines) == 0: - return - local_file_name = file_prefix + now.strftime("_%Y%m%d-%H%M%S-%f") + '.json' - with open(local_file_name, "w") as file: - count = file.write('\n'.join(lines)) - blob_file_name = blob_prefix + now.strftime("/%Y/%m/%d/") + local_file_name - blob_client = blob_service_client.get_blob_client(container=CONTAINER, blob=blob_file_name) - with open(local_file_name, "rb") as data: - blob_client.upload_blob(data) - os.remove(local_file_name) - diff --git a/patch b/patch new file mode 100644 index 0000000..11a7d8c --- /dev/null +++ b/patch @@ -0,0 +1,758 @@ +diff --git a/azure-pipelines/azure-pipelines-build-telemetry.yml b/azure-pipelines/azure-pipelines-build-telemetry.yml +index d79e359..b4b33f3 100644 +--- a/azure-pipelines/azure-pipelines-build-telemetry.yml ++++ 
b/azure-pipelines/azure-pipelines-build-telemetry.yml +@@ -19,26 +19,177 @@ name: $(TeamProject)_$(Build.DefinitionName)_$(SourceBranchName)_$(Date:yyyyMMdd + stages: + - stage: Build + pool: sonicbld-1es +- variables: +- - group: sonicbld + jobs: + - job: Build +- timeoutInMinutes: 240 ++ timeoutInMinutes: 120 + steps: + - script: | + sudo apt-get update + sudo apt-get install -y python3-pip +- sudo pip3 install azure.core azure.kusto.data azure.kusto.ingest azure.storage.blob azure.storage.queue ++ sudo pip3 install azure-storage-queue azure-storage-blob + sudo update-alternatives --install /usr/bin/python python /usr/bin/python3 10 +- # Install Azure cli +- curl -sL https://aka.ms/InstallAzureCLIDeb | sudo bash +- az login --service-principal --use-cert-sn-issuer -u 08fd13c1-63ab-4b08-9007-f4ff86b61248 -p $CERTPATH --tenant 72f988bf-86f1-41af-91ab-2d7cd011db47 +- env: +- CERTPATH: $(CERTPATH) + displayName: Install build tools +- - bash: | +- python3 azure-pipelines/scripts/publish-mssonic-logs.py ++ - task: PythonScript@0 ++ displayName: Publish SONiC telemetry ++ inputs: ++ scriptSource: 'inline' ++ script: | ++ import datetime, base64, json, time, os, re ++ from urllib import request ++ from azure.storage.queue import QueueClient ++ from azure.storage.blob import BlobServiceClient ++ ++ QUEUE_NAME="builds" ++ CONTAINER="build" ++ if os.getenv('AZURE_STORAGE_QUEUE_NAME'): ++ QUEUE_NAME = os.getenv('AZURE_STORAGE_QUEUE_NAME') ++ if os.getenv('AZURE_STORAGE_CONTAINER'): ++ CONTAINER = os.getenv('AZURE_STORAGE_CONTAINER') ++ print("QUEUE_NAME={} AZURE_STORAGE_CONTAINER={}".format(QUEUE_NAME, CONTAINER)) ++ AZURE_STORAGE_CONNECTION_STRING='$(AZURE_STORAGE_CONNECTION_STRING)' ++ BUILD_MESSAGES = 'buildmessages' ++ BUILD_INFOS = 'builds' ++ BUILD_LOGS = 'buildlogs' ++ BUILD_COVERAGES = 'buildcoverages' ++ MESSAGE_PER_PAGE = 10 ++ MAX_PAGE_COUNT = 30 ++ HEADERS = {"Authorization": "Bearer " + "$(System.AccessToken)"} ++ blob_service_client = BlobServiceClient.from_connection_string(AZURE_STORAGE_CONNECTION_STRING) ++ ++ # Upload a list of lines to blob ++ def upload_to_blob(lines, blob_prefix, file_prefix=""): ++ now = datetime.datetime.now() ++ if len(lines) == 0: ++ return ++ local_file_name = file_prefix + now.strftime("_%Y%m%d-%H%M%S-%f") + '.json' ++ with open(local_file_name, "w") as file: ++ count = file.write('\n'.join(lines)) ++ blob_file_name = blob_prefix + now.strftime("/%Y/%m/%d/") + local_file_name ++ blob_client = blob_service_client.get_blob_client(container=CONTAINER, blob=blob_file_name) ++ with open(local_file_name, "rb") as data: ++ blob_client.upload_blob(data) ++ os.remove(local_file_name) ++ ++ # Download the web content from the url ++ def get_response(url): ++ for i in range(0, 3): ++ try: ++ print(url) ++ req = request.Request(url, headers=HEADERS) ++ response = request.urlopen(req, timeout=300) ++ data=response.read() ++ encoding = response.info().get_content_charset('utf-8') ++ return data.decode(encoding) ++ except Exception as e: ++ print(e) ++ time.sleep(10) ++ raise Exception('failed to get response from {0}'.format(url)) ++ ++ def get_coverage(build_info): ++ base_url = re.sub('/_apis/.*', '/_apis', build_info['url']) ++ url = '{0}/test/codecoverage?buildId={1}&api-version=6.0-preview.1'.format(base_url, build_info['id']) ++ coverage_content = get_response(url) ++ info = json.loads(json.dumps(build_info)) ++ coverage = json.loads(coverage_content) ++ results = [] ++ if 'coverageData' in coverage and len(coverage['coverageData']) > 0: ++ info['coverage'] = 
coverage_content ++ results.append(json.dumps(info)) ++ return results ++ ++ # Get the build logs ++ def get_build_logs(timeline_url, build_info): ++ timeline_content = get_response(timeline_url) ++ if not timeline_content: ++ return [] ++ records = json.loads(timeline_content)['records'] ++ results = [] ++ #max_column_size = 104855000 ++ max_column_size = 40*1024*1024 #40M ++ for record in records: ++ record['content'] = "" ++ record['buildId'] = build_info['id'] ++ record['definitionId'] = build_info['definitionId'] ++ record['definitionName'] = build_info['definitionName'] ++ record['sourceBranch'] = build_info['sourceBranch'] ++ record['sourceVersion'] = build_info['sourceVersion'] ++ record['triggerInfo'] = build_info['triggerInfo'] ++ record['reason'] = build_info['reason'] ++ if record['log']: ++ log_url = record['log']['url'] ++ log = get_response(log_url) ++ content = log[:max_column_size] ++ lines = [] ++ for line in content.split('\n'): ++ if '&sp=' in line and '&sig=' in line: ++ continue ++ lines.append(line) ++ record['content'] = '\n'.join(lines) ++ if 'parameters' in build_info: ++ record['parameters'] = build_info['parameters'] ++ if 'status' in build_info: ++ record['status'] = build_info['status'] ++ if 'uri' in build_info: ++ record['uri'] = build_info['uri'] ++ results.append(json.dumps(record)) ++ return results ++ ++ queue_client = QueueClient.from_connection_string(AZURE_STORAGE_CONNECTION_STRING, QUEUE_NAME) ++ messages = queue_client.receive_messages(messages_per_page=MESSAGE_PER_PAGE, visibility_timeout=3600) ++ page = 0 ++ for msg_batch in messages.by_page(): ++ page = page + 1 ++ if page > MAX_PAGE_COUNT: ++ break ++ local_file_name = datetime.datetime.now().strftime("_%Y%m%d-%H%M%S-%f") + '.json' ++ build_messages = [] ++ msgs = [] ++ build_infos = [] ++ build_logs = [] ++ build_coverages = [] ++ msg_count = 0 ++ for msg in msg_batch: ++ msg_count = msg_count + 1 ++ print("process message {} on page {}, current log count {}".format(msg_count, page, len(build_logs))) ++ msgs.append(msg) ++ msg_content = base64.b64decode(msg.content) ++ build = json.loads(msg_content) ++ content = json.dumps(build, separators=(',', ':')) ++ build_messages.append(content) ++ build_url = build['resource']['url'] ++ if 'dev.azure.com' not in build_url: ++ print("Skipped the the url {}".format(build_url)) ++ continue ++ build_content = get_response(build_url) ++ if not build_content: ++ print("Skipped the message for no build content, the message: {}".format(msg_content)) ++ continue ++ build_info = json.loads(build_content) ++ build_info['definitionId'] = build_info['definition']['id'] ++ build_info['definitionName'] = build_info['definition']['name'] ++ build_infos.append(json.dumps(build_info)) ++ timeline_url = build_info['_links']['timeline']['href'] ++ logs = get_build_logs(timeline_url, build_info) ++ build_logs += logs ++ build_coverages += get_coverage(build_info) ++ upload_to_blob(build_messages, BUILD_MESSAGES) ++ upload_to_blob(build_infos, BUILD_INFOS) ++ upload_to_blob(build_coverages, BUILD_COVERAGES) ++ split_build_logs = [] ++ log_size = 0 ++ max_upload_size = 80 * 1024 * 1024 # 80M ++ for build_log in build_logs: ++ if log_size >= max_upload_size: ++ print("Split the logs to upload, log_size {}".format(log_size)) ++ upload_to_blob(split_build_logs, BUILD_LOGS) ++ split_build_logs = [] ++ log_size = 0 ++ split_build_logs.append(build_log) ++ log_size += len(build_log) ++ print("Upload log, log_size {}".format(log_size)) ++ upload_to_blob(split_build_logs, 
BUILD_LOGS) ++ for msg in msgs: ++ queue_client.delete_message(msg) ++ exit(0) + env: +- SYSTEM_ACCESSTOKEN: $(System.AccessToken) +- TOKEN: $(MSAZURE-TOKEN) +- displayName: Ingest data into kusto ++ AZURE_STORAGE_CONNECTION_STRING: '$(AZURE_STORAGE_CONNECTION_STRING)' +diff --git a/azure-pipelines/azure-pipelines-github-telemetry.yml b/azure-pipelines/azure-pipelines-github-telemetry.yml +index 88e4720..dcb2c94 100644 +--- a/azure-pipelines/azure-pipelines-github-telemetry.yml ++++ b/azure-pipelines/azure-pipelines-github-telemetry.yml +@@ -18,23 +18,185 @@ name: $(TeamProject)_$(Build.DefinitionName)_$(SourceBranchName)_$(Date:yyyyMMdd + + stages: + - stage: Build +- pool: sonicbld-1es ++ pool: ++ vmImage: 'ubuntu-latest' + jobs: + - job: Build + timeoutInMinutes: 120 + steps: ++ - task: UsePythonVersion@0 ++ inputs: ++ versionSpec: '3.x' ++ addToPath: true ++ architecture: 'x64' + - script: | +- set -ex +- # Install Azure cli +- curl -sL https://aka.ms/InstallAzureCLIDeb | sudo bash +- az login --service-principal --use-cert-sn-issuer -u 08fd13c1-63ab-4b08-9007-f4ff86b61248 -p $CERTPATH --tenant 72f988bf-86f1-41af-91ab-2d7cd011db47 +- pip3 install azure-storage-queue azure-storage-blob pytz python-dateutil azure.core azure.kusto.data azure.kusto.ingest +- env: +- CERTPATH: $(CERTPATH) ++ pip install azure-storage-queue azure-storage-blob pytz python-dateutil + displayName: Install build tools +- - script: | +- python3 azure-pipelines/scripts/publish-github-prs.py $GITHUB_TOKEN $AZURE_STORAGE_CONNECTION_STRING ++ - task: PythonScript@0 ++ displayName: Publish SONiC telemetry + env: + AZURE_STORAGE_CONNECTION_STRING: '$(AZURE_STORAGE_CONNECTION_STRING)' + GITHUB_TOKEN: '$(GITHUB_TOKEN)' +- displayName: Upload PR info to kusto +\ No newline at end of file ++ inputs: ++ scriptSource: 'inline' ++ script: | ++ import datetime, base64, json, time, os, re, pytz, math ++ from urllib import request ++ from urllib.error import HTTPError ++ from http.client import IncompleteRead ++ from azure.core.exceptions import ResourceNotFoundError ++ from dateutil import parser ++ import http.client ++ from azure.storage.blob import BlobServiceClient ++ ++ CONTAINER = 'build' ++ INFO_PULLREQUESTS_FILE = "info/pullrequests.json" ++ GITHUB_TOKEN = '$(GITHUB_TOKEN)' ++ AZURE_STORAGE_CONNECTION_STRING = '$(AZURE_STORAGE_CONNECTION_STRING)' ++ blob_service_client = BlobServiceClient.from_connection_string(AZURE_STORAGE_CONNECTION_STRING) ++ ++ url="https://api.github.com/graphql" ++ timestamp = datetime.datetime.utcnow() ++ timeoffset = datetime.timedelta(minutes=5) ++ until = (timestamp - timeoffset).replace(tzinfo=pytz.UTC) ++ if 'END_TIMESTAMP' in os.environ and os.environ['END_TIMESTAMP']: ++ until = parser.isoparse(os.environ['END_TIMESTAMP']).replace(tzinfo=pytz.UTC) ++ delta = datetime.timedelta(minutes=60) ++ if 'TIMEDELTA_IN_MINUTES' in os.environ and os.environ['TIMEDELTA_IN_MINUTES']: ++ timedelta_in_minutes = max(int(os.environ['TIMEDELTA_IN_MINUTES']), 30) ++ delta = datetime.timedelta(minutes=timedelta_in_minutes) ++ max_timedelta_in_days = 35 ++ ++ # Upload a list of lines to blob ++ def upload_to_blob(lines, blob_prefix, file_prefix=""): ++ now = datetime.datetime.now() ++ if not lines: ++ print("no lines to upload, skipped") ++ return ++ local_file_name = file_prefix + now.strftime("_%Y%m%d-%H%M%S-%f") + '.json' ++ with open(local_file_name, "w") as file: ++ count = file.write('\n'.join(lines)) ++ blob_file_name = blob_prefix + now.strftime("/%Y/%m/%d/") + local_file_name ++ blob_client = 
blob_service_client.get_blob_client(container=CONTAINER, blob=blob_file_name) ++ with open(local_file_name, "rb") as data: ++ blob_client.upload_blob(data) ++ os.remove(local_file_name) ++ ++ def get_start_timestamp(force=False): ++ if not force and 'START_TIMESTAMP' in os.environ and os.environ['START_TIMESTAMP']: ++ return parser.isoparse(os.environ['START_TIMESTAMP']).replace(tzinfo=pytz.UTC) ++ blob_client = blob_service_client.get_blob_client(container=CONTAINER, blob=INFO_PULLREQUESTS_FILE) ++ try: ++ download_stream = blob_client.download_blob() ++ info = json.loads(download_stream.readall()) ++ return parser.isoparse(info['timestamp']).replace(tzinfo=pytz.UTC) ++ except ResourceNotFoundError: ++ pass ++ start_timestamp = datetime.datetime.utcnow() - datetime.timedelta(days=max_timedelta_in_days) ++ return start_timestamp.replace(tzinfo=pytz.UTC) ++ ++ def update_start_timestamp(): ++ if 'END_TIMESTAMP' in os.environ and os.environ['END_TIMESTAMP']: ++ last = get_start_timestamp(True) ++ if last > until: ++ print('skipped update the start timestamp, until:%s < last:%s'.format(until.isoformat(), last.isoformat())) ++ return ++ blob_file_name="info/pullrequests.json" ++ blob_client = blob_service_client.get_blob_client(container=CONTAINER, blob=INFO_PULLREQUESTS_FILE) ++ info = {} ++ info['timestamp'] = until.isoformat() ++ data = json.dumps(info) ++ blob_client.upload_blob(data, overwrite=True) ++ ++ # The GitHub Graphql supports to query 100 items per page, and 10 page in max. ++ # To workaround it, split the query into several time range "delta", in a time range, need to make sure less than 1000 items. ++ def get_pullrequests(): ++ results = [] ++ start_timestamp = get_start_timestamp() ++ print('start: {0}, until: {1}'.format(start_timestamp.isoformat(), until.isoformat()), flush=True) ++ query_pattern = ''' ++ { ++ search(query: "org:azure org:sonic-net is:pr updated:%s..%s sort:updated", %s type: ISSUE, first: 100) { ++ issueCount ++ pageInfo { ++ hasNextPage ++ endCursor ++ } ++ edges { ++ cursor ++ node { ++ ... 
on PullRequest { ++ url ++ number ++ assignees (first: 10) { ++ nodes { ++ login ++ } ++ } ++ title ++ createdAt ++ closedAt ++ merged ++ mergedAt ++ updatedAt ++ mergedBy {login} ++ author {login} ++ baseRefName ++ baseRepository {name, url, owner{login}} ++ repository {name, url, owner{login}} ++ mergeCommit {id, oid, committedDate} ++ commits (first: 3) {nodes{commit{oid, message}}} ++ state ++ } ++ } ++ } ++ } ++ } ++ ''' ++ start = start_timestamp ++ count = math.ceil((until - start) / delta) ++ for index in range(count): ++ end = min(start+delta, until) ++ condition = "" ++ while True: # pagination, support 1000 total, support 100 per page ++ print("Query: index:%s, count:%s, start:%s, end:%s, page:%s" % (index, count, start.isoformat(), end.isoformat(), condition), flush=True) ++ query = query_pattern %(start.isoformat(), end.isoformat(), condition) ++ req = request.Request(url, method="POST") ++ req.add_header('Content-Type', 'application/json') ++ req.add_header('Authorization', "Bearer {0}".format(GITHUB_TOKEN)) ++ body = {} ++ body['query'] = query ++ data = bytes(json.dumps(body), encoding="utf-8") ++ content = {} ++ for i in range(10): ++ try: ++ r = request.urlopen(req, data=data) ++ content = json.loads(r.read()) ++ break ++ except HTTPError as e: ++ print('Try count: {0}, error code: {1}, reason: {2}'.format(i, e.code, e.reason)) ++ time.sleep(3) ++ except IncompleteRead as e: ++ print("IncompleteRead", e) ++ time.sleep(3) ++ if 'data' not in content: ++ print(content) ++ break ++ edges = content['data']['search']['edges'] ++ for edge in edges: ++ node = edge['node'] ++ node['dumpedAt'] = timestamp.isoformat() ++ results.append(json.dumps(node)) ++ print("Read edge count: {0}, total count: {1}".format(len(results), content['data']['search']['issueCount']), flush=True) ++ hasNextPage = content['data']['search']['pageInfo']['hasNextPage'] ++ print(content['data']['search']['pageInfo']) ++ if not hasNextPage: ++ break ++ condition = 'after: "{0}",'.format(edges[-1]['cursor']) ++ print(condition) ++ start = end ++ return results ++ ++ results = get_pullrequests() ++ upload_to_blob(results, 'pullrequests') ++ update_start_timestamp() +diff --git a/azure-pipelines/scripts/publish-github-prs.py b/azure-pipelines/scripts/publish-github-prs.py +deleted file mode 100644 +index 7c35373..0000000 +--- a/azure-pipelines/scripts/publish-github-prs.py ++++ /dev/null +@@ -1,166 +0,0 @@ +-import datetime, base64, json, time, os, re, pytz, math, sys +-from urllib import request +-from urllib.error import HTTPError +-from http.client import IncompleteRead +-from azure.core.exceptions import ResourceNotFoundError +-from dateutil import parser +-import http.client +-from azure.storage.blob import BlobServiceClient +-from azure.identity import AzureCliCredential +- +-from azure.kusto.data import DataFormat +-from azure.kusto.ingest import QueuedIngestClient, IngestionProperties, FileDescriptor, ReportLevel, ReportMethod +-from azure.kusto.data import KustoClient, KustoConnectionStringBuilder +- +-CONTAINER = 'build' +-INFO_PULLREQUESTS_FILE = "info/pullrequests.json" +-GITHUB_TOKEN = sys.argv[1] +-AZURE_STORAGE_CONNECTION_STRING = sys.argv[2] +-blob_service_client = BlobServiceClient.from_connection_string(AZURE_STORAGE_CONNECTION_STRING) +- +-ingest_cluster = "https://ingest-sonic.westus2.kusto.windows.net" +-ingest_kcsb = KustoConnectionStringBuilder.with_az_cli_authentication(ingest_cluster) +-ingest_client = QueuedIngestClient(ingest_kcsb) +- +-url="https://api.github.com/graphql" +-timestamp 
= datetime.datetime.utcnow() +-timeoffset = datetime.timedelta(minutes=5) +-until = (timestamp - timeoffset).replace(tzinfo=pytz.UTC) +-if 'END_TIMESTAMP' in os.environ and os.environ['END_TIMESTAMP']: +- until = parser.isoparse(os.environ['END_TIMESTAMP']).replace(tzinfo=pytz.UTC) +-delta = datetime.timedelta(minutes=60) +-if 'TIMEDELTA_IN_MINUTES' in os.environ and os.environ['TIMEDELTA_IN_MINUTES']: +- timedelta_in_minutes = max(int(os.environ['TIMEDELTA_IN_MINUTES']), 30) +- delta = datetime.timedelta(minutes=timedelta_in_minutes) +-max_timedelta_in_days = 35 +- +-def kusto_ingest(database='build', table='', mapping='', lines=[]): +- now = datetime.datetime.utcnow().isoformat().replace(':','_') +- if lines: +- tmpfile = f"{database}_{table}_{now}.json" +- with open(tmpfile, "w") as file: +- file.write('\n'.join(lines)) +- properties = IngestionProperties(database=database, table=table, data_format=DataFormat.JSON, ingestion_mapping_reference=mapping) +- response = ingest_client.ingest_from_file(tmpfile, properties) +- print(response) +- else: +- print('No lines', database, table, buildid) +- +-def get_start_timestamp(force=False): +- if not force and 'START_TIMESTAMP' in os.environ and os.environ['START_TIMESTAMP']: +- return parser.isoparse(os.environ['START_TIMESTAMP']).replace(tzinfo=pytz.UTC) +- blob_client = blob_service_client.get_blob_client(container=CONTAINER, blob=INFO_PULLREQUESTS_FILE) +- try: +- download_stream = blob_client.download_blob() +- info = json.loads(download_stream.readall()) +- return parser.isoparse(info['timestamp']).replace(tzinfo=pytz.UTC) +- except ResourceNotFoundError: +- pass +- start_timestamp = datetime.datetime.utcnow() - datetime.timedelta(days=max_timedelta_in_days) +- return start_timestamp.replace(tzinfo=pytz.UTC) +- +-def update_start_timestamp(): +- if 'END_TIMESTAMP' in os.environ and os.environ['END_TIMESTAMP']: +- last = get_start_timestamp(True) +- if last > until: +- print('skipped update the start timestamp, until:%s < last:%s'.format(until.isoformat(), last.isoformat())) +- return +- blob_file_name="info/pullrequests.json" +- blob_client = blob_service_client.get_blob_client(container=CONTAINER, blob=INFO_PULLREQUESTS_FILE) +- info = {} +- info['timestamp'] = until.isoformat() +- data = json.dumps(info) +- blob_client.upload_blob(data, overwrite=True) +- +-# The GitHub Graphql supports to query 100 items per page, and 10 page in max. +-# To workaround it, split the query into several time range "delta", in a time range, need to make sure less than 1000 items. +-def get_pullrequests(): +- results = [] +- start_timestamp = get_start_timestamp() +- print('start: {0}, until: {1}'.format(start_timestamp.isoformat(), until.isoformat()), flush=True) +- query_pattern = ''' +- { +- search(query: "org:azure org:sonic-net is:pr updated:%s..%s sort:updated", %s type: ISSUE, first: 100) { +- issueCount +- pageInfo { +- hasNextPage +- endCursor +- } +- edges { +- cursor +- node { +- ... 
on PullRequest { +- url +- number +- assignees (first: 10) { +- nodes { +- login +- } +- } +- title +- createdAt +- closedAt +- merged +- mergedAt +- updatedAt +- mergedBy {login} +- author {login} +- baseRefName +- baseRepository {name, url, owner{login}} +- repository {name, url, owner{login}} +- mergeCommit {id, oid, committedDate} +- commits (first: 3) {nodes{commit{oid, message}}} +- state +- } +- } +- } +- } +- } +- ''' +- start = start_timestamp +- count = math.ceil((until - start) / delta) +- for index in range(count): +- end = min(start+delta, until) +- condition = "" +- while True: # pagination, support 1000 total, support 100 per page +- print("Query: index:%s, count:%s, start:%s, end:%s, page:%s" % (index, count, start.isoformat(), end.isoformat(), condition), flush=True) +- query = query_pattern %(start.isoformat(), end.isoformat(), condition) +- req = request.Request(url, method="POST") +- req.add_header('Content-Type', 'application/json') +- req.add_header('Authorization', "Bearer {0}".format(GITHUB_TOKEN)) +- body = {} +- body['query'] = query +- data = bytes(json.dumps(body), encoding="utf-8") +- content = {} +- for i in range(10): +- try: +- r = request.urlopen(req, data=data) +- content = json.loads(r.read()) +- break +- except HTTPError as e: +- print('Try count: {0}, error code: {1}, reason: {2}'.format(i, e.code, e.reason)) +- time.sleep(3) +- except IncompleteRead as e: +- print("IncompleteRead", e) +- time.sleep(3) +- if 'data' not in content: +- print(content) +- break +- edges = content['data']['search']['edges'] +- for edge in edges: +- node = edge['node'] +- node['dumpedAt'] = timestamp.isoformat() +- results.append(json.dumps(node)) +- print("Read edge count: {0}, total count: {1}".format(len(results), content['data']['search']['issueCount']), flush=True) +- hasNextPage = content['data']['search']['pageInfo']['hasNextPage'] +- print(content['data']['search']['pageInfo']) +- if not hasNextPage: +- break +- condition = 'after: "{0}",'.format(edges[-1]['cursor']) +- print(condition) +- start = end +- return results +- +-results = get_pullrequests() +-kusto_ingest(database='build', table='PullRequests', mapping='PullRequests-json', lines=results) +-update_start_timestamp() +diff --git a/azure-pipelines/scripts/publish-mssonic-logs.py b/azure-pipelines/scripts/publish-mssonic-logs.py +deleted file mode 100644 +index d84dafc..0000000 +--- a/azure-pipelines/scripts/publish-mssonic-logs.py ++++ /dev/null +@@ -1,182 +0,0 @@ +-import base64, json, time, os, re, requests +- +-from azure.identity import AzureCliCredential +-from azure.storage.queue import QueueClient +- +-from azure.kusto.data import DataFormat +-from azure.kusto.ingest import QueuedIngestClient, IngestionProperties, FileDescriptor, ReportLevel, ReportMethod +-from azure.kusto.data import KustoClient, KustoConnectionStringBuilder +- +-# kusto query client. 
+-def get_kusto_client(): +- cluster = "https://sonic.westus2.kusto.windows.net" +- kcsb = KustoConnectionStringBuilder.with_az_cli_authentication(cluster) +- client = KustoClient(kcsb) +- return client +- +-# kusto ingest client +-def get_kusto_ingest_client(): +- ingest_cluster = "https://ingest-sonic.westus2.kusto.windows.net" +- ingest_kcsb = KustoConnectionStringBuilder.with_az_cli_authentication(ingest_cluster) +- ingest_client = QueuedIngestClient(ingest_kcsb) +- return ingest_client +- +-# Azure Storage Queue Client +-def get_queue_client(queue_name='builds', storageaccount_name='sonicazurepipelines'): +- if os.getenv('AZURE_STORAGE_QUEUE_NAME'): +- queue_name = os.getenv('AZURE_STORAGE_QUEUE_NAME') +- url=f"https://{storageaccount_name}.queue.core.windows.net" +- print(url, queue_name) +- default_credential = AzureCliCredential() +- queue_client = QueueClient(url, queue_name=queue_name ,credential=default_credential) +- return queue_client +- +-# Download the web content from the url +-def get_response(url): +- headers = soheaders +- if 'mssonic' not in url: +- headers = msheaders +- for i in range(0, 3): +- try: +- res = requests.get(url, timeout=300, headers=headers) +- if not res.ok: +- raise Exception(f'http code: {res.status_code},reason: {res.reason}') +- return res.text +- except Exception as e: +- print(e) +- time.sleep(10) +- raise Exception(f'failed to get response from {url}, retry 3 times.') +- +-def get_coverage(build_info): +- base_url = re.sub('/_apis/.*', '/_apis', build_info['url']) +- url = '{0}/test/codecoverage?buildId={1}&api-version=6.0-preview.1'.format(base_url, build_info['id']) +- coverage_content = get_response(url) +- info = json.loads(json.dumps(build_info)) +- coverage = json.loads(coverage_content) +- results = [] +- if 'coverageData' in coverage and len(coverage['coverageData']) > 0: +- info['coverage'] = coverage_content +- results.append(json.dumps(info)) +- return results +- +-# Get the build logs +-def get_build_logs(timeline_url, build_info): +- timeline_content = get_response(timeline_url) +- if not timeline_content: +- return [] +- records = json.loads(timeline_content)['records'] +- results = [] +- for record in records: +- record['content'] = "" +- record['buildId'] = build_info['id'] +- record['definitionId'] = build_info['definitionId'] +- record['definitionName'] = build_info['definitionName'] +- record['sourceBranch'] = build_info['sourceBranch'] +- record['sourceVersion'] = build_info['sourceVersion'] +- record['triggerInfo'] = build_info['triggerInfo'] +- record['reason'] = build_info['reason'] +- if record['log']: +- log_url = record['log']['url'] +- log = get_response(log_url) +- lines = [] +- for line in log.split('\n'): +- if '&sp=' in line and '&sig=' in line: +- continue +- lines.append(line) +- record['content'] = '\n'.join(lines) +- if 'parameters' in build_info: +- record['parameters'] = build_info['parameters'] +- if 'status' in build_info: +- record['status'] = build_info['status'] +- if 'uri' in build_info: +- record['uri'] = build_info['uri'] +- print(record['id']) +- results.append(json.dumps(record)) +- return results +- +-def kusto_ingest(database='build', table='', mapping='', buildid='', lines=[]): +- if lines: +- tmpfile = f"{database}_{table}_{buildid}.json" +- with open(tmpfile, "w") as file: +- file.write('\n'.join(lines)) +- properties = IngestionProperties(database=database, table=table, data_format=DataFormat.JSON, ingestion_mapping_reference=mapping) +- # properties.report_level = 
ReportLevel.FailuresAndSuccesses +- response = ingest_client.ingest_from_file(tmpfile, properties) +- print(response) +- else: +- print('No lines', database, table, buildid) +- +-headers,msheaders = {},{} +-if os.getenv('SYSTEM_ACCESSTOKEN'): +- token = os.getenv('SYSTEM_ACCESSTOKEN') +- soheaders = {"Authorization": "Bearer " + token} +-if os.getenv('TOKEN'): +- token = os.getenv('TOKEN') +- msheaders = {"Authorization": "Bearer " + token} +- +-queue_client = get_queue_client() +-ingest_client = get_kusto_ingest_client() +- +-def main(): +- max_messages = 30 +- +- count = queue_client.get_queue_properties().approximate_message_count +- for page in range(0,int(count/max_messages)+1): +- messages = queue_client.receive_messages(messages_per_page=1, visibility_timeout=3600, max_messages=max_messages) +- build_messages = [] +- build_infos = [] +- build_logs = [] +- build_coverages = [] +- msgs = [] +- for msg in messages: +- msgs.append(msg) +- msg_content = base64.b64decode(msg.content) +- build = json.loads(msg_content) +- content = json.dumps(build, separators=(',', ':')) +- build_messages.append(content) +- build_url = build['resource']['url'] +- if 'dev.azure.com' not in build_url and 'msazure.visualstudio.com' not in build_url: +- print(f"Skipped the the url {build_url}") +- continue +- build_content = get_response(build_url) +- if not build_content: +- print("Skipped the message for no build content, the build_url: {}".format(build_url)) +- continue +- build_info = json.loads(build_content) +- build_info['definitionId'] = build_info['definition']['id'] +- build_info['definitionName'] = build_info['definition']['name'] +- build_infos.append(json.dumps(build_info)) +- +- timeline_url = build_info['_links']['timeline']['href'] +- logs = get_build_logs(timeline_url, build_info) +- build_logs += logs +- build_coverages += get_coverage(build_info) +- +- database = 'build' +- if os.getenv('AZURE_STORAGE_DATABASE'): +- database = os.getenv('AZURE_STORAGE_DATABASE') +- kusto_ingest(database=database, table='AzurePipelineBuildCoverages', mapping="AzurePipelineBuildCoverages-json", buildid=build['resource']['id'], lines=build_coverages) +- kusto_ingest(database=database, table='AzurePipelineBuildLogs', mapping="AzurePipelineBuildLogs-json", buildid=build['resource']['id'], lines=build_logs) +- kusto_ingest(database=database, table='AzurePipelineBuildMessages', mapping="AzurePipelineBuildMessages-json", buildid=build['resource']['id'], lines=build_messages) +- kusto_ingest(database=database, table='AzurePipelineBuilds', mapping="AzurePipelineBuilds-json", buildid=build['resource']['id'], lines=build_infos) +- for msg in msgs: +- print(f'deleting message: {msg.id}') +- queue_client.delete_message(msg) +- +-if __name__ == '__main__': +- main() +- +-# Upload a list of lines to blob +-def upload_to_blob(lines, blob_prefix, file_prefix=""): +- now = datetime.datetime.now() +- if len(lines) == 0: +- return +- local_file_name = file_prefix + now.strftime("_%Y%m%d-%H%M%S-%f") + '.json' +- with open(local_file_name, "w") as file: +- count = file.write('\n'.join(lines)) +- blob_file_name = blob_prefix + now.strftime("/%Y/%m/%d/") + local_file_name +- blob_client = blob_service_client.get_blob_client(container=CONTAINER, blob=blob_file_name) +- with open(local_file_name, "rb") as data: +- blob_client.upload_blob(data) +- os.remove(local_file_name) +-
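
Note: as a reference for reviewers, below is a minimal sketch of the queue-to-blob flow that this revert restores in azure-pipelines-build-telemetry.yml. It is not the pipeline script itself; it assumes only the azure-storage-queue and azure-storage-blob packages and an AZURE_STORAGE_CONNECTION_STRING value, as in the inline task above. The helper name upload_lines is illustrative, and the build-log, coverage, and size-splitting handling from the task is omitted.

# Minimal sketch (not the pipeline script itself): drain build-completed events
# from the Azure Storage queue and archive them as newline-delimited JSON blobs,
# mirroring the restored inline PythonScript task. Helper names are illustrative.
import base64, datetime, json, os

from azure.storage.blob import BlobServiceClient
from azure.storage.queue import QueueClient

conn_str = os.environ["AZURE_STORAGE_CONNECTION_STRING"]
QUEUE_NAME = os.getenv("AZURE_STORAGE_QUEUE_NAME", "builds")
CONTAINER = os.getenv("AZURE_STORAGE_CONTAINER", "build")

blob_service_client = BlobServiceClient.from_connection_string(conn_str)
queue_client = QueueClient.from_connection_string(conn_str, QUEUE_NAME)

def upload_lines(lines, blob_prefix):
    # Write the collected JSON lines to <container>/<prefix>/YYYY/MM/DD/<timestamp>.json
    if not lines:
        return
    now = datetime.datetime.utcnow()
    blob_name = blob_prefix + now.strftime("/%Y/%m/%d/") + now.strftime("_%Y%m%d-%H%M%S-%f") + ".json"
    blob_client = blob_service_client.get_blob_client(container=CONTAINER, blob=blob_name)
    blob_client.upload_blob("\n".join(lines).encode("utf-8"))

messages = queue_client.receive_messages(messages_per_page=10, visibility_timeout=3600)
for page in messages.by_page():
    batch, build_messages = [], []
    for msg in page:
        batch.append(msg)
        # Queue messages carry base64-encoded Azure DevOps build events.
        build = json.loads(base64.b64decode(msg.content))
        build_messages.append(json.dumps(build, separators=(",", ":")))
    upload_lines(build_messages, "buildmessages")
    # Delete only after upload so a failed run leaves the messages on the queue.
    for msg in batch:
        queue_client.delete_message(msg)

Messages are deleted only after their batch has been uploaded, so a failed run leaves them on the queue and they become visible again once the 3600-second visibility timeout expires.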