Skip to content

Commit df4b082

Browse files
committed
Add support for Citus
We make all queries using pg_stat_activity view templated so that they can use citus_stat_activity view when the --citus option is passed.
1 parent f728f5b commit df4b082

26 files changed

+132
-100
lines changed

CHANGELOG.md

+2
Original file line numberDiff line numberDiff line change
@@ -11,6 +11,8 @@
1111
configuration. (Requires Python 3.9 or higher.)
1212
* Add a `y` command to copy focused query to the system clipboard, using
1313
OSC 52 escape sequence (#311).
14+
* Support Citus query activity (`citus_stat_activity`) views, through a new
15+
`--citus` command-line option.
1416

1517
### Fixed
1618

README.md

+1
Original file line numberDiff line numberDiff line change
@@ -98,6 +98,7 @@ ex:
9898
--rds Enable support for AWS RDS (implies --no-tempfiles and
9999
filters out the rdsadmin database from space
100100
calculation).
101+
--citus Enable support for Citus.
101102
--output FILEPATH Store running queries as CSV.
102103
--db-size, --no-db-size
103104
Enable/disable total size of DB.

docs/man/pg_activity.1

+5
Original file line numberDiff line numberDiff line change
@@ -366,6 +366,11 @@ required by another session. It shows following information:
366366
.Vb 1
367367
\& Enable support for AWS RDS (implies \-\-no\-tempfiles and filters out the rdsadmin database from space calculation).
368368
.Ve
369+
.IP "\fB\-\-citus\fR" 2
370+
.IX Item "--citus"
371+
.Vb 1
372+
\& Enable support for Citus.
373+
.Ve
369374
.IP "\fB\-\-output=FILEPATH\fR" 2
370375
.IX Item "--output=FILEPATH"
371376
.Vb 1

docs/man/pg_activity.pod

+4
Original file line numberDiff line numberDiff line change
@@ -252,6 +252,10 @@ required by another session. It shows following information:
252252

253253
Enable support for AWS RDS (implies --no-tempfiles and filters out the rdsadmin database from space calculation).
254254

255+
=item B<--citus>
256+
257+
Enable support for Citus.
258+
255259
=item B<--output=FILEPATH>
256260

257261
Store running queries as CSV.

pgactivity/cli.py

+7
Original file line numberDiff line numberDiff line change
@@ -104,6 +104,13 @@ def get_parser() -> argparse.ArgumentParser:
104104
help="Enable support for AWS RDS (implies --no-tempfiles and filters out the rdsadmin database from space calculation).",
105105
default=False,
106106
)
107+
group.add_argument(
108+
"--citus",
109+
dest="citus",
110+
action="store_true",
111+
help="Enable support for Citus.",
112+
default=False,
113+
)
107114
group.add_argument(
108115
"--output",
109116
dest="output",

pgactivity/data.py

+11-1
Original file line numberDiff line numberDiff line change
@@ -73,6 +73,7 @@ class Data:
7373
filters: Filters
7474
dsn_parameters: dict[str, str]
7575
failed_queries: FailedQueriesInfo
76+
pg_stat_activity: str
7677

7778
@classmethod
7879
def pg_connect(
@@ -85,6 +86,7 @@ def pg_connect(
8586
password: str | None = None,
8687
database: str = "postgres",
8788
rds_mode: bool = False,
89+
citus: bool = False,
8890
dsn: str = "",
8991
hide_queries_in_logs: bool = False,
9092
filters: Filters = NO_FILTER,
@@ -115,6 +117,7 @@ def pg_connect(
115117
failed_queries=FailedQueriesInfo(),
116118
filters=filters,
117119
dsn_parameters=pg.connection_parameters(pg_conn),
120+
pg_stat_activity="citus_stat_activity" if citus else "pg_stat_activity",
118121
)
119122

120123
def try_reconnect(self) -> Data | None:
@@ -320,7 +323,10 @@ def pg_get_server_information(
320323
else:
321324
query = queries.get("get_server_info_oldest")
322325

323-
qs = sql.SQL(query).format(dbname_filter=self.dbname_filter)
326+
qs = sql.SQL(query).format(
327+
pg_stat_activity=sql.Identifier(self.pg_stat_activity),
328+
dbname_filter=self.dbname_filter,
329+
)
324330
try:
325331
ret = pg.fetchone(
326332
self.pg_conn,
@@ -410,6 +416,7 @@ def pg_get_activities(self, duration_mode: int = 1) -> list[RunningProcess]:
410416

411417
duration_column = self.get_duration_column(duration_mode)
412418
query = sql.SQL(qs).format(
419+
pg_stat_activity=sql.Identifier(self.pg_stat_activity),
413420
dbname_filter=self.dbname_filter,
414421
duration_column=sql.Identifier(duration_column),
415422
min_duration=sql.Literal(self.min_duration),
@@ -437,6 +444,7 @@ def pg_get_waiting(self, duration_mode: int = 1) -> list[WaitingProcess]:
437444

438445
duration_column = self.get_duration_column(duration_mode)
439446
query = sql.SQL(qs).format(
447+
pg_stat_activity=sql.Identifier(self.pg_stat_activity),
440448
dbname_filter=self.dbname_filter,
441449
duration_column=sql.Identifier(duration_column),
442450
min_duration=sql.Literal(self.min_duration),
@@ -466,6 +474,7 @@ def pg_get_blocking(self, duration_mode: int = 1) -> list[BlockingProcess]:
466474

467475
duration_column = self.get_duration_column(duration_mode)
468476
query = sql.SQL(qs).format(
477+
pg_stat_activity=sql.Identifier(self.pg_stat_activity),
469478
dbname_filter=self.dbname_filter,
470479
duration_column=sql.Identifier(duration_column),
471480
min_duration=sql.Literal(self.min_duration),
@@ -527,6 +536,7 @@ def pg_connect(
527536
password=password,
528537
database=options.dbname,
529538
rds_mode=options.rds,
539+
citus=options.citus,
530540
min_duration=min_duration,
531541
filters=filters,
532542
hide_queries_in_logs=options.hide_queries_in_logs,

pgactivity/queries/get_blocking_oldest.sql

+24-24
Original file line numberDiff line numberDiff line change
@@ -27,21 +27,21 @@ SELECT
2727
SELECT
2828
blocking.pid,
2929
'<unknown>' AS application_name,
30-
pg_stat_activity.current_query AS query,
30+
{pg_stat_activity}.current_query AS query,
3131
blocking.mode,
32-
pg_stat_activity.datname,
33-
pg_stat_activity.datid,
34-
pg_stat_activity.usename,
35-
pg_stat_activity.client_addr AS client,
32+
{pg_stat_activity}.datname,
33+
{pg_stat_activity}.datid,
34+
{pg_stat_activity}.usename,
35+
{pg_stat_activity}.client_addr AS client,
3636
blocking.locktype,
37-
EXTRACT(epoch FROM (NOW() - pg_stat_activity.{duration_column})) AS duration,
37+
EXTRACT(epoch FROM (NOW() - {pg_stat_activity}.{duration_column})) AS duration,
3838
NULL AS state,
3939
blocking.relation::regclass AS relation,
40-
pg_stat_activity.waiting
40+
{pg_stat_activity}.waiting
4141
FROM
4242
pg_locks AS blocking
4343
JOIN pg_locks AS blocked ON (blocking.transactionid = blocked.transactionid AND blocking.locktype = blocked.locktype)
44-
JOIN pg_stat_activity ON (blocking.pid = pg_stat_activity.procpid)
44+
JOIN {pg_stat_activity} ON (blocking.pid = {pg_stat_activity}.procpid)
4545
WHERE
4646
blocking.granted
4747
AND NOT blocked.granted
@@ -57,21 +57,21 @@ SELECT
5757
SELECT
5858
blocking.pid,
5959
'<unknown>' AS application_name,
60-
pg_stat_activity.current_query AS query,
60+
{pg_stat_activity}.current_query AS query,
6161
blocking.mode,
62-
pg_stat_activity.datname,
63-
pg_stat_activity.datid,
64-
pg_stat_activity.usename,
65-
pg_stat_activity.client_addr AS client,
62+
{pg_stat_activity}.datname,
63+
{pg_stat_activity}.datid,
64+
{pg_stat_activity}.usename,
65+
{pg_stat_activity}.client_addr AS client,
6666
blocking.locktype,
67-
EXTRACT(epoch FROM (NOW() - pg_stat_activity.{duration_column})) AS duration,
67+
EXTRACT(epoch FROM (NOW() - {pg_stat_activity}.{duration_column})) AS duration,
6868
NULL AS state,
6969
blocking.relation::regclass AS relation,
70-
pg_stat_activity.waiting
70+
{pg_stat_activity}.waiting
7171
FROM
7272
pg_locks AS blocking
7373
JOIN pg_locks AS blocked ON (blocking.virtualxid = blocked.virtualxid AND blocking.locktype = blocked.locktype)
74-
JOIN pg_stat_activity ON (blocking.pid = pg_stat_activity.procpid)
74+
JOIN {pg_stat_activity} ON (blocking.pid = {pg_stat_activity}.procpid)
7575
WHERE
7676
blocking.granted
7777
AND NOT blocked.granted
@@ -87,21 +87,21 @@ SELECT
8787
SELECT
8888
blocking.pid,
8989
'<unknown>' AS application_name,
90-
pg_stat_activity.current_query AS query,
90+
{pg_stat_activity}.current_query AS query,
9191
blocking.mode,
92-
pg_stat_activity.datname,
93-
pg_stat_activity.datid,
94-
pg_stat_activity.usename,
95-
pg_stat_activity.client_addr AS client,
92+
{pg_stat_activity}.datname,
93+
{pg_stat_activity}.datid,
94+
{pg_stat_activity}.usename,
95+
{pg_stat_activity}.client_addr AS client,
9696
blocking.locktype,
97-
EXTRACT(epoch FROM (NOW() - pg_stat_activity.{duration_column})) AS duration,
97+
EXTRACT(epoch FROM (NOW() - {pg_stat_activity}.{duration_column})) AS duration,
9898
NULL AS state,
9999
blocking.relation::regclass AS relation,
100-
pg_stat_activity.waiting
100+
{pg_stat_activity}.waiting
101101
FROM
102102
pg_locks AS blocking
103103
JOIN pg_locks AS blocked ON (blocking.database = blocked.database AND blocking.relation = blocked.relation AND blocking.locktype = blocked.locktype)
104-
JOIN pg_stat_activity ON (blocking.pid = pg_stat_activity.procpid)
104+
JOIN {pg_stat_activity} ON (blocking.pid = {pg_stat_activity}.procpid)
105105
WHERE
106106
blocking.granted
107107
AND NOT blocked.granted

pgactivity/queries/get_blocking_post_090200.sql

+30-30
Original file line numberDiff line numberDiff line change
@@ -21,22 +21,22 @@ SELECT
2121
-- Transaction id lock
2222
SELECT
2323
blocking.pid,
24-
pg_stat_activity.application_name,
25-
pg_stat_activity.query,
24+
{pg_stat_activity}.application_name,
25+
{pg_stat_activity}.query,
2626
blocking.mode,
27-
pg_stat_activity.datname,
28-
pg_stat_activity.datid,
29-
pg_stat_activity.usename,
30-
pg_stat_activity.client_addr AS client,
27+
{pg_stat_activity}.datname,
28+
{pg_stat_activity}.datid,
29+
{pg_stat_activity}.usename,
30+
{pg_stat_activity}.client_addr AS client,
3131
blocking.locktype,
32-
EXTRACT(epoch FROM (NOW() - pg_stat_activity.{duration_column})) AS duration,
33-
pg_stat_activity.state as state,
32+
EXTRACT(epoch FROM (NOW() - {pg_stat_activity}.{duration_column})) AS duration,
33+
{pg_stat_activity}.state as state,
3434
blocking.relation::regclass AS relation,
35-
pg_stat_activity.waiting
35+
{pg_stat_activity}.waiting
3636
FROM
3737
pg_locks AS blocking
3838
JOIN pg_locks AS blocked ON (blocking.transactionid = blocked.transactionid AND blocking.locktype = blocked.locktype)
39-
JOIN pg_stat_activity ON (blocking.pid = pg_stat_activity.pid)
39+
JOIN {pg_stat_activity} ON (blocking.pid = {pg_stat_activity}.pid)
4040
WHERE
4141
blocking.granted
4242
AND NOT blocked.granted
@@ -51,22 +51,22 @@ SELECT
5151
-- VirtualXid Lock
5252
SELECT
5353
blocking.pid,
54-
pg_stat_activity.application_name,
55-
pg_stat_activity.query,
54+
{pg_stat_activity}.application_name,
55+
{pg_stat_activity}.query,
5656
blocking.mode,
57-
pg_stat_activity.datname,
58-
pg_stat_activity.datid,
59-
pg_stat_activity.usename,
60-
pg_stat_activity.client_addr AS client,
57+
{pg_stat_activity}.datname,
58+
{pg_stat_activity}.datid,
59+
{pg_stat_activity}.usename,
60+
{pg_stat_activity}.client_addr AS client,
6161
blocking.locktype,
62-
EXTRACT(epoch FROM (NOW() - pg_stat_activity.{duration_column})) AS duration,
63-
pg_stat_activity.state as state,
62+
EXTRACT(epoch FROM (NOW() - {pg_stat_activity}.{duration_column})) AS duration,
63+
{pg_stat_activity}.state as state,
6464
blocking.relation::regclass AS relation,
65-
pg_stat_activity.waiting
65+
{pg_stat_activity}.waiting
6666
FROM
6767
pg_locks AS blocking
6868
JOIN pg_locks AS blocked ON (blocking.virtualxid = blocked.virtualxid AND blocking.locktype = blocked.locktype)
69-
JOIN pg_stat_activity ON (blocking.pid = pg_stat_activity.pid)
69+
JOIN {pg_stat_activity} ON (blocking.pid = {pg_stat_activity}.pid)
7070
WHERE
7171
blocking.granted
7272
AND NOT blocked.granted
@@ -81,22 +81,22 @@ SELECT
8181
-- Relation or tuple Lock
8282
SELECT
8383
blocking.pid,
84-
pg_stat_activity.application_name,
85-
pg_stat_activity.query,
84+
{pg_stat_activity}.application_name,
85+
{pg_stat_activity}.query,
8686
blocking.mode,
87-
pg_stat_activity.datname,
88-
pg_stat_activity.datid,
89-
pg_stat_activity.usename,
90-
pg_stat_activity.client_addr AS client,
87+
{pg_stat_activity}.datname,
88+
{pg_stat_activity}.datid,
89+
{pg_stat_activity}.usename,
90+
{pg_stat_activity}.client_addr AS client,
9191
blocking.locktype,
92-
EXTRACT(epoch FROM (NOW() - pg_stat_activity.{duration_column})) AS duration,
93-
pg_stat_activity.state as state,
92+
EXTRACT(epoch FROM (NOW() - {pg_stat_activity}.{duration_column})) AS duration,
93+
{pg_stat_activity}.state as state,
9494
blocking.relation::regclass AS relation,
95-
pg_stat_activity.waiting
95+
{pg_stat_activity}.waiting
9696
FROM
9797
pg_locks AS blocking
9898
JOIN pg_locks AS blocked ON (blocking.database = blocked.database AND blocking.relation = blocked.relation AND blocking.locktype = blocked.locktype)
99-
JOIN pg_stat_activity ON (blocking.pid = pg_stat_activity.pid)
99+
JOIN {pg_stat_activity} ON (blocking.pid = {pg_stat_activity}.pid)
100100
WHERE
101101
blocking.granted
102102
AND NOT blocked.granted

0 commit comments

Comments
 (0)