This repository was archived by the owner on Nov 30, 2022. It is now read-only.

Reduce # of clients connected to the application db [#810] #944

Merged
merged 10 commits into main from fidesops_810_reduce_connections
Jul 27, 2022

Conversation

pattisdr
Contributor

@pattisdr pattisdr commented Jul 25, 2022

Purpose

Since around the time of the move to Celery, light local load testing shows it's very easy to open too many connections against the fidesops application database (the current limit is 100). This PR investigates improvements and sets some reasonable defaults.

worker_1            | [2022-07-06 13:03:43,882: WARNING/ForkPoolWorker-1]   File "/usr/local/lib/python3.9/site-packages/sqlalchemy/engine/default.py", line 584, in connect
worker_1            |     return self.dbapi.connect(*cargs, **cparams)
worker_1            | [2022-07-06 13:03:43,883: WARNING/ForkPoolWorker-1]   File "/usr/local/lib/python3.9/site-packages/psycopg2/__init__.py", line 122, in connect
worker_1            |     conn = _connect(dsn, connection_factory=connection_factory, **kwasync)
worker_1            | [2022-07-06 13:03:43,885: WARNING/ForkPoolWorker-1] sqlalchemy.exc.OperationalError: (psycopg2.OperationalError) FATAL:  sorry, too many clients already
worker_1            | 

Changes

Big picture, we now do three things:

  1. Share a single Session within each Celery process
  2. Use that same Session to write execution logs for a privacy request, instead of creating a new one
  3. Limit task concurrency to two per worker

More details below:

  • Make run_privacy_request a bound celery task so we have access to the task itself within the function.
  • Create a custom celery Task class, DatabaseTask, which extends Task but caches a Session to be shared across a celery process. Each celery process gets an Engine that maintains its own connection pool, separate from the other processes, and the shared Session is created from that Engine. The Engine and Session are therefore created once per process (see the sketch just after this list).
    • Close the Session after each privacy request has finished executing. This just resets the session and returns its connections to the pool, so the Session can be reused (we were already doing this).
    • Remove unnecessary places where the Session is closed manually: the Session is used as a context manager, which already closes it.
  • Pass the same Session that run_privacy_request is using through to graph_task.run_access_request and graph_task.run_erasure, and on to TaskResources, so it can be reused to create ExecutionLogs instead of creating a new Session for each log, since those extra Sessions were opening their own additional connections.
    • Don't close the Session after creating ExecutionLogs; wait until the entire privacy request is complete/exited. Because we're now using the Session passed into the privacy request instead of creating a new one, we need to wait until the end to close it.
  • Limit task concurrency to two per worker. This could later become a configuration option; two was simply our previous limit. It reduces the number of privacy requests executing simultaneously, which also spreads out the queries to customers' owned databases and connected APIs.
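
As a rough sketch of how the bound task and the shared Session fit together (the import path and task body here are illustrative assumptions, not the PR's exact code; the real DatabaseTask is quoted verbatim in the review comments below):

```python
# A minimal sketch, assuming Celery + SQLAlchemy as used in fidesops.
from fidesops.tasks import celery_app, DatabaseTask  # hypothetical import path


@celery_app.task(base=DatabaseTask, bind=True)
def run_privacy_request(self: DatabaseTask, privacy_request_id: str) -> None:
    # self.session lazily creates one Engine (with its own connection pool)
    # and one Session the first time it's accessed in this worker process,
    # then returns the cached Session on every later task invocation.
    with self.session as session:
        ...  # execute the request, passing `session` down for ExecutionLogs
    # Exiting the `with` block closes the Session, returning its connections
    # to the process-local pool so the next privacy request can reuse them.
```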

Findings

Local experimentation consisted of monitoring the number of backends connected to the application database while submitting increasing numbers and rates of privacy requests against fidesops. The test setup used a simple graph of just the postgres collections from postgres_example_test_dataset, running access requests with the policy and other details laid out in the Postman collection, with two celery workers running. A single POST to api/v1/privacy-request can create anywhere from 1 to 50 privacy requests at a time, so testing started by creating one at a time, then 6 at a time, then 6 at a time repeated 6 times in rapid succession, then 10 at a time, then 50 at a time.
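
For reference, the connection counts in the tables below can be read from Postgres's pg_stat_activity view; a minimal monitoring sketch, assuming direct psycopg2 access to the application database (the DSN is a placeholder, not the PR's actual tooling):

```python
# A sketch of the kind of query used to watch backend counts.
import psycopg2

conn = psycopg2.connect("postgresql://postgres:postgres@localhost:5432/app")  # placeholder DSN
with conn.cursor() as cur:
    cur.execute(
        """
        SELECT count(*) AS total,
               count(*) FILTER (WHERE state = 'idle') AS idle
        FROM pg_stat_activity
        WHERE datname = current_database()
        """
    )
    total, idle = cur.fetchone()
print(f"total connections: {total}, idle: {idle}")
conn.close()
```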

Before

We were using the default SQLAlchemy connection pool (QueuePool), and connection pools were not shared between processes, which is good. Each pool size was 5 by default, with the default max_overflow of 10. But we were calling get_db_session for each privacy request without specifying an Engine (Engines manage the connection pools), so multiple connection pools were apparently being created per process and connections weren't being reused. Additionally, writing execution logs opened separate Sessions, which opened additional connections, compounding the problem.
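
For context, these are the pooling knobs involved; a sketch of standard SQLAlchemy defaults, not this PR's code (the DSN is a placeholder):

```python
# Every create_engine() call builds its own QueuePool, so creating an Engine
# per privacy request multiplies pools instead of reusing connections.
from sqlalchemy import create_engine
from sqlalchemy.orm import sessionmaker

engine = create_engine(
    "postgresql://postgres:postgres@localhost:5432/app",  # placeholder DSN
    pool_size=5,      # QueuePool default: 5 persistent connections
    max_overflow=10,  # default burst allowance beyond pool_size
)
SessionLocal = sessionmaker(bind=engine)  # Sessions share the engine's pool
```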

The setup below seemed to quickly max out the 100-connection default and not reuse connections.

| Num Privacy Requests | Total Connections | Idle Connections |
|----------------------|-------------------|------------------|
| 1x1                  | 15                | 12               |
| 1x1                  | 23                | 21               |
| 1x1                  | 23                | 22               |
| 1x6                  | 59                | 56               |
| 1x6                  | 67                | 65               |
| 1x6                  | 77                | 74               |
| 1x6                  | 92                | 89               |
| 1x6                  | Maxed out         | 99               |

After

Still using a QueuePool of size 5 with max_overflow of 10, but each celery process now gets one Engine and one Session. We use that Session for the entirety of the request, even to create the execution logs, then close it at the end, which just resets the Session and returns its connections to the connection pool. I also limit the number of privacy requests that can execute simultaneously per worker to 2, which further reduces the number of connections we need to open at once.

| Num Privacy Requests | Total Connections | Idle Connections |
|----------------------|-------------------|------------------|
| 1x1                  | 5                 | 3                |
| 1x1                  | 7                 | 5                |
| 1x1                  | 7                 | 5                |
| 1x1                  | 9                 | 7                |
| 1x6                  | 13                | 11               |
| 1x6                  | 10                | 8                |
| 6x6                  | 15                | 12               |
| 6x6                  | 15                | 13               |
| 1x10                 | 17                | 15               |
| 1x50                 | 18                | 16               |

Side Effects

If a datastore is disabled while a PrivacyRequest is in progress, its new state will be picked up mid-request and its remaining collections will be skipped.

ExecutionLog creation runs a Session.commit(), which expires the state of all instances in the Session, so the next time we access them they have the most recent attributes. Previously, we queried everything up front and used that initial state for the remainder of the privacy request. With this change, resource state can change mid-privacy-request.
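
A small illustration of that expiry behavior, assuming a configured SessionLocal and the ConnectionConfig model (not code from this PR):

```python
# Sessions default to expire_on_commit=True, so commit() marks every loaded
# instance stale; the next attribute access re-SELECTs its current state.
session = SessionLocal()
connection_config = session.query(ConnectionConfig).first()

session.commit()  # e.g. triggered by saving an ExecutionLog mid-request

# This attribute access hits the database again, so a `disabled` flag flipped
# elsewhere while the privacy request was running is now visible here.
print(connection_config.disabled)
```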

Future things to experiment with

  • Increasing or decreasing the size of the connection pools with pool_size and max_overflow: making the pool slightly smaller or slightly larger had only small effects on total connections, but I didn't experiment with larger changes.
  • Increasing the total number of allowed connections against the fidesops database.
  • Allowing the number of workers to be configured: multiple workers managing a queue can sometimes perform better than a single worker. Two workers seemed to perform better than one, with concurrency of 2 and the default pool size/overflow.
  • Allowing the number of concurrent privacy requests to be configured: I am currently restricting this to 2 per worker to match where we had settled before celery went in.
  • Looking into celery thread-based pools (eventlet/gevent) instead of process-based pools, because a lot of our privacy request execution is IO-bound.

Checklist

  • Update CHANGELOG.md file
    • Merge in main so the most recent CHANGELOG.md file is being appended to
    • Add description within the Unreleased section in an appropriate category. Add a new category from the list at the top of the file if the needed one isn't already there.
    • Add a link to this PR at the end of the description with the PR number as the text. example: #1
  • Applicable documentation updated (guides, quickstart, postman collections, tutorial, fidesdemo, database diagram).
  • If docs updated (select one):
    • documentation complete, or draft/outline provided (tag docs-team to complete/review on this branch)
    • documentation issue created (tag docs-team to complete issue separately)
  • Good unit test/integration test coverage
  • This PR contains a DB migration. If checked, the reviewer should confirm with the author that the down_revision correctly references the previous migration before merging
  • The Run Unsafe PR Checks label has been applied, and checks have passed, if this PR touches any external services

Ticket

Fixes #810

- Limit task concurrency to two per worker.
- Create one Engine per celery process, which opens up a connection pool. Create one Session per celery process and use that session across privacy requests.
- Close the session after the privacy request has finished executing. This just resets the session and returns connections to the pool so it can be reused.
- Remove unnecessary places where the session is closed manually, because the session is being used as a context manager and is already closed through that.
- Pass the same Session that the privacy request is using through to TaskResources, to be reused to create ExecutionLogs instead of opening a new Session.
- Don't close the session when passing it into the ExecutionLog; wait until the entire privacy request is complete/exited.
@pattisdr pattisdr changed the title [DRAFT] Reduce Number of Open Connections [#810] [DRAFT] Reduce # of clients connected to the application db [#810] Jul 25, 2022
@pattisdr
Contributor Author

One legitimate test failure here, representing a behavior change; I need to clear it with product.

@seanpreston
Contributor

Thanks @pattisdr — this is a huge improvement over the current state, and gets us back to where we were before!

@seanpreston
Contributor

One legitimate test failure here, representing a behavior change; I need to clear it with product.

For the failed test test_run_disabled_collections_in_progress, is it the case that because we have a live DB connection in the traversal, we're now able to update the execution plan to not run disabled collections for already in-progress requests?

pattisdr added 2 commits July 27, 2022 07:50
Update test to reflect new behavior that disabling a datasource while a request is in progress can cause related collections to be skipped once the current session is expired and the connection config has the most recent state.

Because the same Session that is being used to run the PrivacyRequest is now being used for ExecutionLogs, the process of saving an ExecutionLog runs a session.commit() which expires the Session and causes the ConnectionConfig to have the most recent state the next time it is accessed.
@pattisdr
Contributor Author

pattisdr commented Jul 27, 2022

@seanpreston yes, in part. The change is caused because the same Session that we use to run the PrivacyRequest is also now used to create Execution Logs, instead of creating a new Session for the Execution Logs. As part of saving the Execution Logs, we run a Session.commit(), which will cause the ConnectionConfig to have the most recent state the next time it is accessed:

https://docs.sqlalchemy.org/en/14/orm/session_basics.html#session-committing

Finally, all objects within the Session are expired as the transaction is closed out. This is so that when the instances are next accessed, either through attribute access or by them being present in the result of a SELECT, they receive the most recent state.

I cleared the behavior with @adriaaaa yesterday. She said we can go with it, and if we get feedback that it's causing issues we can revisit.

Note that this doesn't spread the Session everywhere through the Traversal yet, it just passes it through far enough for the ExecutionLog creation to be able to access it.

@pattisdr pattisdr changed the title [DRAFT] Reduce # of clients connected to the application db [#810] Reduce # of clients connected to the application db [#810] Jul 27, 2022
Comment on lines +154 to +164
class DatabaseTask(Task): # pylint: disable=W0223
_session = None

@property
def session(self) -> ContextManager[Session]:
"""Creates Session once per process"""
if self._session is None:
SessionLocal = get_db_session(config)
self._session = SessionLocal()

return self._session
Contributor Author

This allows us to share the Session across a Celery process.

This also limits the number of times get_db_session is called per process; previously it was creating new Engines, which opened new connection pools whose connections weren't being reused.

@@ -190,7 +204,6 @@ def run_privacy_request(
after_webhook_id=from_webhook_id,
)
if not proceed:
-        session.close()
Contributor Author

These session.close calls were removed because they weren't doing anything: the Session is used as a context manager above (see with self.session as session), so it is closed automatically.
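
A small sketch of that pattern, assuming SessionLocal from the earlier pooling sketch (SQLAlchemy 1.4+, where Session supports the context-manager protocol):

```python
# __exit__ calls session.close(), so an explicit close() inside is redundant.
from sqlalchemy import text


def do_work(proceed: bool) -> None:
    with SessionLocal() as session:
        if not proceed:
            return  # leaving the `with` block still closes the session
        session.execute(text("SELECT 1"))  # stand-in for real work
    # session.close() has already run here via the context manager
```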

Comment on lines +576 to +578
with TaskResources(
privacy_request, policy, connection_configs, session
) as resources:
Contributor Author

We're passing the Session into TaskResources here so we can use it to create the ExecutionLogs.

Comment on lines -160 to +161
- SessionLocal = get_db_session(config)
- db = SessionLocal()
+ db = self.session
Contributor Author

We're using the same Session that is being used for the PrivacyRequest rather than creating a new one, which reduces the number of connections we open. It also has a side effect: other resources bound to the PrivacyRequest's Session get the most recent state when the ExecutionLog is saved, because we commit the Session.

Contributor

I think the main implication here is with request cancellation? I.e., if .cancelled_at / .paused_at / .status were to change, we may be able to abort the traversal entirely?

Contributor Author

@pattisdr pattisdr Jul 27, 2022

Yes, I think this is a good example of where we can make better decisions mid-traversal in the future, based on a resource's current state.

@@ -48,7 +48,7 @@ def _create_celery(config_path: str = config.execution.celery_config_path) -> Ce

def start_worker() -> None:
logger.info("Running Celery worker...")
celery_app.worker_main(argv=["worker", "--loglevel=info"])
celery_app.worker_main(argv=["worker", "--loglevel=info", "--concurrency=2"])
Contributor Author

@pattisdr pattisdr Jul 27, 2022

This could be configurable in the future, but it matches our state before the move to Celery. The default is the number of cores on your machine, but limiting the number of PrivacyRequests that can run simultaneously per worker has a lot of positive effects: it reduces the number of simultaneous connections to the application database and to customers' owned databases, and it reduces simultaneous requests against their connected APIs.

Contributor

Thanks, I've added a ticket to be addressed as a follow-up.

@pattisdr pattisdr marked this pull request as ready for review July 27, 2022 12:07
@pattisdr pattisdr added the run unsafe ci checks Triggers running of unsafe CI checks label Jul 27, 2022
@seanpreston seanpreston self-assigned this Jul 27, 2022
@seanpreston seanpreston merged commit c8ba158 into main Jul 27, 2022
@seanpreston seanpreston deleted the fidesops_810_reduce_connections branch July 27, 2022 16:20
sanders41 pushed a commit that referenced this pull request Sep 22, 2022
* Reduce number of open connections:

- Limit task concurrency to two per worker.
- Create one Engine per celery process, which opens up a connection pool. Create one Session per celery process and use that session across privacy requests.
- Close the session after the privacy request has finished executing. This just resets the session and returns connections to the pool so it can be reused.
- Remove unnecessary places where the session is closed manually, because the session is being used as a context manager and is already closed through that.
- Pass the same Session that the privacy request is using through to TaskResources, to be reused to create ExecutionLogs instead of opening a new Session.
- Don't close the session when passing it into the ExecutionLog; wait until the entire privacy request is complete/exited.

* Define "self" for run_privacy_task - it's the task itself.

For mypy's benefit, define that the session is a context manager.

* Make the session non-optional for graph_task.run_access_request, graph_task.run_erasure, and for instantiating TaskResources

* Use missing db fixture.

* Add missing db resource.

* Update test to reflect new behavior that disabling a datasource while a request is in progress can cause related collections to be skipped once the current session is expired and the connection config has the most recent state.

Because the same Session that is being used to run the PrivacyRequest is now being used for ExecutionLogs, the process of saving an ExecutionLog runs a session.commit() which expires the Session and causes the ConnectionConfig to have the most recent state the next time it is accessed.

* Update CHANGELOG.