From d086ce73249991593fdbb6e71d1353f253941350 Mon Sep 17 00:00:00 2001 From: Yuu Ohmura Date: Wed, 10 Jul 2024 14:22:20 +0900 Subject: [PATCH 1/2] add ingesting parent segment configuration function to monitoring workflow --- .../cdp_monitoring/common/settings.yaml | 2 + .../cdp_monitoring/incremental_ingest.dig | 18 +++++++++ .../cdp_monitoring/initial_ingest.dig | 14 ++++++- .../scripts/ingest_ps_configuration.py | 37 +++++++++++++++++++ 4 files changed, 70 insertions(+), 1 deletion(-) create mode 100644 scenarios/monitoring/cdp_monitoring/scripts/ingest_ps_configuration.py diff --git a/scenarios/monitoring/cdp_monitoring/common/settings.yaml b/scenarios/monitoring/cdp_monitoring/common/settings.yaml index 299c8153..5a9c32df 100644 --- a/scenarios/monitoring/cdp_monitoring/common/settings.yaml +++ b/scenarios/monitoring/cdp_monitoring/common/settings.yaml @@ -13,6 +13,8 @@ td: journey_activation_history: journey_activation_history activations: activations activations_history: activations_history + parent_segments_configuration: parent_segments_configuration + parent_segments_configuration_history: parent_segments_configuration_history api_endpoint: api.treasuredata.com cdp_api_endpoint: api-cdp.treasuredata.com diff --git a/scenarios/monitoring/cdp_monitoring/incremental_ingest.dig b/scenarios/monitoring/cdp_monitoring/incremental_ingest.dig index bc8a314d..b38d64e0 100644 --- a/scenarios/monitoring/cdp_monitoring/incremental_ingest.dig +++ b/scenarios/monitoring/cdp_monitoring/incremental_ingest.dig @@ -23,6 +23,24 @@ schedule: _env: TD_API_KEY: ${secret:td.apikey} ++incremental_ingest_ps_configuration: + +append_ps_configuration_history: + td>: + query: select * from ${td.tables.parent_segments_configuration} + insert_into: ${td.tables.parent_segments_configuration_history} + database: ${td.database} + +ingest_ps_configuration: + py>: scripts.ingest_ps_configuration.run + session_unixtime: ${session_unixtime} + dest_db: ${td.database} + dest_table: ${td.tables.parent_segments_configuration} + api_endpoint: ${td.api_endpoint} + cdp_api_endpoint: ${td.cdp_api_endpoint} + docker: + image: "digdag/digdag-python:3.9" + _env: + TD_API_KEY: ${secret:td.apikey} + +incremental_ingest_entities: +append_entities_history: td>: diff --git a/scenarios/monitoring/cdp_monitoring/initial_ingest.dig b/scenarios/monitoring/cdp_monitoring/initial_ingest.dig index d4b644db..60e46c32 100644 --- a/scenarios/monitoring/cdp_monitoring/initial_ingest.dig +++ b/scenarios/monitoring/cdp_monitoring/initial_ingest.dig @@ -17,7 +17,19 @@ _export: api_endpoint: ${td.api_endpoint} cdp_api_endpoint: ${td.cdp_api_endpoint} docker: - image: "digdag/digdag-python:3.9" + image: "digdag/digdag-python:3.10.1" + _env: + TD_API_KEY: ${secret:td.apikey} + ++initial_ingest_ps_configuration: + py>: scripts.ingest_ps_configuration.run + session_unixtime: ${session_unixtime} + dest_db: ${td.database} + dest_table: ${td.tables.parent_segments_configuration} + api_endpoint: ${td.api_endpoint} + cdp_api_endpoint: ${td.cdp_api_endpoint} + docker: + image: "digdag/digdag-python:3.10.1" _env: TD_API_KEY: ${secret:td.apikey} diff --git a/scenarios/monitoring/cdp_monitoring/scripts/ingest_ps_configuration.py b/scenarios/monitoring/cdp_monitoring/scripts/ingest_ps_configuration.py new file mode 100644 index 00000000..04f434d6 --- /dev/null +++ b/scenarios/monitoring/cdp_monitoring/scripts/ingest_ps_configuration.py @@ -0,0 +1,37 @@ +import requests +import pandas as pd +import pytd +import os +import json + +def convert_to_json(s): + return json.dumps(s) + +def get_all_parent_segment_configuration(url, headers): + print(url) + res = requests.get(url=url, headers=headers) + if res.status_code != requests.codes.ok: + res.raise_for_status() + + data = res.json() + for d in data: + for k in d.keys(): + if type(d[k]) is dict: + d[k] = json.dumps(d[k]) + + return data + +def run(session_unixtime, dest_db, dest_table, api_endpoint='api.treasuredata.com', cdp_api_endpoint='api-cdp.treasuredata.com'): + url = 'https://%s/audiences' % cdp_api_endpoint + headers = {'Authorization': 'TD1 %s' % os.environ['TD_API_KEY']} + l = get_all_parent_segment_configuration(url, headers) + if len(l) == 0: + print('no import record') + return + df = pd.DataFrame(l) + df['time'] = int(session_unixtime) + df['attributes'] = df['attributes'].apply(convert_to_json) + df['behaviors'] = df['behaviors'].apply(convert_to_json) + + client = pytd.Client(apikey=os.environ['TD_API_KEY'], endpoint='https://%s' % api_endpoint, database=dest_db) + client.load_table_from_dataframe(df, dest_table, if_exists='overwrite', fmt='msgpack') From 1a1ace33ebe8c85a4994388998e93eb2b085fde5 Mon Sep 17 00:00:00 2001 From: Yuu Ohmura Date: Wed, 10 Jul 2024 23:13:47 +0900 Subject: [PATCH 2/2] fix bus --- .../cdp_monitoring/incremental_ingest.dig | 2 +- .../cdp_monitoring/initial_ingest.dig | 20 +++++++++---------- 2 files changed, 11 insertions(+), 11 deletions(-) diff --git a/scenarios/monitoring/cdp_monitoring/incremental_ingest.dig b/scenarios/monitoring/cdp_monitoring/incremental_ingest.dig index b38d64e0..0e94b751 100644 --- a/scenarios/monitoring/cdp_monitoring/incremental_ingest.dig +++ b/scenarios/monitoring/cdp_monitoring/incremental_ingest.dig @@ -37,7 +37,7 @@ schedule: api_endpoint: ${td.api_endpoint} cdp_api_endpoint: ${td.cdp_api_endpoint} docker: - image: "digdag/digdag-python:3.9" + image: "digdag/digdag-python:3.10.1" _env: TD_API_KEY: ${secret:td.apikey} diff --git a/scenarios/monitoring/cdp_monitoring/initial_ingest.dig b/scenarios/monitoring/cdp_monitoring/initial_ingest.dig index 60e46c32..fe62eabc 100644 --- a/scenarios/monitoring/cdp_monitoring/initial_ingest.dig +++ b/scenarios/monitoring/cdp_monitoring/initial_ingest.dig @@ -22,10 +22,10 @@ _export: TD_API_KEY: ${secret:td.apikey} +initial_ingest_ps_configuration: - py>: scripts.ingest_ps_configuration.run - session_unixtime: ${session_unixtime} - dest_db: ${td.database} - dest_table: ${td.tables.parent_segments_configuration} + py>: scripts.ingest_ps_configuration.run + session_unixtime: ${session_unixtime} + dest_db: ${td.database} + dest_table: ${td.tables.parent_segments_configuration} api_endpoint: ${td.api_endpoint} cdp_api_endpoint: ${td.cdp_api_endpoint} docker: @@ -48,7 +48,7 @@ _export: api_endpoint: ${td.api_endpoint} cdp_api_endpoint: ${td.cdp_api_endpoint} docker: - image: "digdag/digdag-python:3.9" + image: "digdag/digdag-python:3.10.1" _env: TD_API_KEY: ${secret:td.apikey} @@ -67,7 +67,7 @@ _export: api_endpoint: ${td.api_endpoint} cdp_api_endpoint: ${td.cdp_api_endpoint} docker: - image: "digdag/digdag-python:3.9" + image: "digdag/digdag-python:3.10.1" _env: TD_API_KEY: ${secret:td.apikey} @@ -88,7 +88,7 @@ _export: api_endpoint: ${td.api_endpoint} cdp_api_endpoint: ${td.cdp_api_endpoint} docker: - image: "digdag/digdag-python:3.9" + image: "digdag/digdag-python:3.10.1" _env: TD_API_KEY: ${secret:td.apikey} @@ -108,7 +108,7 @@ _export: api_endpoint: ${td.api_endpoint} cdp_api_endpoint: ${td.cdp_api_endpoint} docker: - image: "digdag/digdag-python:3.9" + image: "digdag/digdag-python:3.10.1" _env: TD_API_KEY: ${secret:td.apikey} @@ -128,6 +128,6 @@ _export: api_endpoint: ${td.api_endpoint} cdp_api_endpoint: ${td.cdp_api_endpoint} docker: - image: "digdag/digdag-python:3.9" + image: "digdag/digdag-python:3.10.1" _env: - TD_API_KEY: ${secret:td.apikey} \ No newline at end of file + TD_API_KEY: ${secret:td.apikey}