Skip to content

Commit

Permalink
Enhance: Add warehouse_id to pg_stat_activity_extended
Browse files Browse the repository at this point in the history
We add new catalog view pg_stat_activity_extended to map
query/session to warehouse in cloud. We report warehouse_id
in pgstat_report_activity for each query.
  • Loading branch information
lss602726449 authored and my-ship-it committed May 29, 2024
1 parent 4649bc4 commit d73da65
Show file tree
Hide file tree
Showing 8 changed files with 89 additions and 5 deletions.
33 changes: 33 additions & 0 deletions src/backend/catalog/system_views.sql
Original file line number Diff line number Diff line change
Expand Up @@ -987,6 +987,39 @@ CREATE VIEW pg_stat_activity AS
LEFT JOIN pg_database AS D ON (S.datid = D.oid)
LEFT JOIN pg_authid AS U ON (S.usesysid = U.oid);

CREATE VIEW pg_stat_activity_extended AS
SELECT
S.warehouse_id,
S.datid AS datid,
D.datname AS datname,
S.pid,
S.sess_id,
S.leader_pid,
S.usesysid,
U.rolname AS usename,
S.application_name,
S.client_addr,
S.client_hostname,
S.client_port,
S.backend_start,
S.xact_start,
S.query_start,
S.state_change,
S.wait_event_type,
S.wait_event,
S.state,
S.backend_xid,
s.backend_xmin,
S.query_id,
S.query,
S.backend_type,

S.rsgid,
S.rsgname
FROM pg_stat_get_activity(NULL) AS S
LEFT JOIN pg_database AS D ON (S.datid = D.oid)
LEFT JOIN pg_authid AS U ON (S.usesysid = U.oid);

CREATE VIEW pg_stat_replication AS
SELECT
S.pid,
Expand Down
4 changes: 4 additions & 0 deletions src/backend/utils/activity/backend_status.c
Original file line number Diff line number Diff line change
Expand Up @@ -534,6 +534,7 @@ pgstat_report_activity(BackendState state, const char *cmd_str)
TimestampTz start_timestamp;
TimestampTz current_timestamp;
int len = 0;
Oid warehouse_id = InvalidOid;

TRACE_POSTGRESQL_STATEMENT_STATUS(cmd_str);

Expand Down Expand Up @@ -605,13 +606,16 @@ pgstat_report_activity(BackendState state, const char *cmd_str)
pgstat_count_conn_txn_idle_time((PgStat_Counter) secs * 1000000 + usecs);
}

warehouse_id = GetCurrentWarehouseId();

/*
* Now update the status entry
*/
PGSTAT_BEGIN_WRITE_ACTIVITY(beentry);

beentry->st_state = state;
beentry->st_state_start_timestamp = current_timestamp;
beentry->st_warehouse_id = warehouse_id;

/*
* If a new query is started, we reset the query identifier as it'll only
Expand Down
8 changes: 7 additions & 1 deletion src/backend/utils/adt/pgstatfuncs.c
Original file line number Diff line number Diff line change
Expand Up @@ -577,7 +577,7 @@ pg_stat_get_progress_info(PG_FUNCTION_ARGS)
Datum
pg_stat_get_activity(PG_FUNCTION_ARGS)
{
#define PG_STAT_GET_ACTIVITY_COLS 33
#define PG_STAT_GET_ACTIVITY_COLS 34
int num_backends = pgstat_fetch_stat_numbackends();
int curr_backend;
int pid = PG_ARGISNULL(0) ? -1 : PG_GETARG_INT32(0);
Expand Down Expand Up @@ -954,6 +954,11 @@ pg_stat_get_activity(PG_FUNCTION_ARGS)
else
nulls[32] = true;
}

if (beentry->st_warehouse_id != InvalidOid)
values[33] = ObjectIdGetDatum(beentry->st_warehouse_id);
else
nulls[33] = true;
}
else
{
Expand Down Expand Up @@ -987,6 +992,7 @@ pg_stat_get_activity(PG_FUNCTION_ARGS)
values[30] = Int32GetDatum(beentry->st_session_id);
nulls[31] = true;
nulls[32] = true;
nulls[33] = true;
}

tuplestore_putvalues(tupstore, tupdesc, values, nulls);
Expand Down
6 changes: 3 additions & 3 deletions src/include/catalog/pg_proc.dat
Original file line number Diff line number Diff line change
Expand Up @@ -5335,9 +5335,9 @@
proname => 'pg_stat_get_activity', prorows => '100', proisstrict => 'f',
proretset => 't', provolatile => 's', proparallel => 'r',
prorettype => 'record', proargtypes => 'int4',
proallargtypes => '{int4,oid,int4,oid,text,text,text,text,text,timestamptz,timestamptz,timestamptz,timestamptz,inet,text,int4,xid,xid,text,bool,text,text,int4,text,numeric,text,bool,text,bool,int4,int8,int4,int4,text}',
proargmodes => '{i,o,o,o,o,o,o,o,o,o,o,o,o,o,o,o,o,o,o,o,o,o,o,o,o,o,o,o,o,o,o,o,o,o}',
proargnames => '{pid,datid,pid,usesysid,application_name,state,query,wait_event_type,wait_event,xact_start,query_start,backend_start,state_change,client_addr,client_hostname,client_port,backend_xid,backend_xmin,backend_type,ssl,sslversion,sslcipher,sslbits,ssl_client_dn,ssl_client_serial,ssl_issuer_dn,gss_auth,gss_princ,gss_enc,leader_pid,query_id,sess_id,rsgid,rsgname}',
proallargtypes => '{int4,oid,int4,oid,text,text,text,text,text,timestamptz,timestamptz,timestamptz,timestamptz,inet,text,int4,xid,xid,text,bool,text,text,int4,text,numeric,text,bool,text,bool,int4,int8,int4,int4,text,oid}',
proargmodes => '{i,o,o,o,o,o,o,o,o,o,o,o,o,o,o,o,o,o,o,o,o,o,o,o,o,o,o,o,o,o,o,o,o,o,o}',
proargnames => '{pid,datid,pid,usesysid,application_name,state,query,wait_event_type,wait_event,xact_start,query_start,backend_start,state_change,client_addr,client_hostname,client_port,backend_xid,backend_xmin,backend_type,ssl,sslversion,sslcipher,sslbits,ssl_client_dn,ssl_client_serial,ssl_issuer_dn,gss_auth,gss_princ,gss_enc,leader_pid,query_id,sess_id,rsgid,rsgname,warehouse_id}',
prosrc => 'pg_stat_get_activity' },
{ oid => '3318',
descr => 'statistics: information about progress of backends running maintenance command',
Expand Down
1 change: 1 addition & 0 deletions src/include/utils/backend_status.h
Original file line number Diff line number Diff line change
Expand Up @@ -171,6 +171,7 @@ typedef struct PgBackendStatus

/* query identifier, optionally computed using post_parse_analyze_hook */
uint64 st_query_id;
Oid st_warehouse_id;
} PgBackendStatus;


Expand Down
25 changes: 25 additions & 0 deletions src/test/isolation2/expected/stat_activity_extended.out
Original file line number Diff line number Diff line change
@@ -0,0 +1,25 @@
-- pg_stat_activity_extended is only used in cloud with warehouse
-- In CBDB, pg_stat_activity_extended warehouse_id should always be NULL

create table test_activity(a int);
CREATE

0: insert into test_activity select * from generate_series(1,100);
INSERT 100

-- in CBDB QD/QE warehouse_id must be NULL

SELECT warehouse_id, query from pg_stat_activity_extended WHERE query LIKE 'insert into test_activity select * from generate_series(1,100);';
warehouse_id | query
--------------+-----------------------------------------------------------------
| insert into test_activity select * from generate_series(1,100);
(1 row)

SELECT warehouse_id, query from gp_dist_random('pg_stat_activity_extended') WHERE query LIKE 'insert into test_activity select * from generate_series(1,100);' limit 1;
warehouse_id | query
--------------+-----------------------------------------------------------------
| insert into test_activity select * from generate_series(1,100);
(1 row)

drop table test_activity;
DROP
3 changes: 2 additions & 1 deletion src/test/isolation2/isolation2_schedule
Original file line number Diff line number Diff line change
Expand Up @@ -311,4 +311,5 @@ test: aocs_unique_index
test: uao/ao_unique_index_vacuum_row
test: uao/ao_unique_index_vacuum_column

test: local_directory_table_mixed
test: local_directory_table_mixed
test: stat_activity_extended
14 changes: 14 additions & 0 deletions src/test/isolation2/sql/stat_activity_extended.sql
Original file line number Diff line number Diff line change
@@ -0,0 +1,14 @@
-- pg_stat_activity_extended is only used in cloud with warehouse
-- In CBDB, pg_stat_activity_extended warehouse_id should always be NULL

create table test_activity(a int);

0: insert into test_activity select * from generate_series(1,100);

-- in CBDB QD/QE warehouse_id must be NULL

SELECT warehouse_id, query from pg_stat_activity_extended WHERE query LIKE 'insert into test_activity select * from generate_series(1,100);';

SELECT warehouse_id, query from gp_dist_random('pg_stat_activity_extended') WHERE query LIKE 'insert into test_activity select * from generate_series(1,100);' limit 1;

drop table test_activity;

0 comments on commit d73da65

Please sign in to comment.