AsyncConnectionPool in a spawned process gets stuck when acquiring a connection from the pool two or more times #457

Open
klymenkosergiy opened this issue Feb 25, 2025 · 4 comments
Labels
question Further information is requested

Comments

@klymenkosergiy

I have tested my web application with oracledb-3.0.0b1. The application uses ProcessPoolExecutor with mp_context=multiprocessing.get_context('spawn') to start background tasks. Each child process creates its own AsyncConnectionPool, which is used to process submitted tasks.

I observed that the child process freezes when acquiring a connection from the pool, usually on the second received task.

Additionally, when I restarted the application, I encountered a CancelledError in the child process:

    con = await pool.acquire()
  File "XXXX/.py-venv/lib64/python3.9/site-packages/oracledb/connection.py", line 1580, in _connect
    impl = await pool_impl.acquire(params_impl)
  File "src/oracledb/impl/thin/pool.pyx", line 835, in acquire
  File "/usr/lib64/python3.9/asyncio/tasks.py", line 442, in wait_for
    return await fut
  File "src/oracledb/impl/thin/pool.pyx", line 688, in _acquire_helper
  File "src/oracledb/impl/thin/pool.pyx", line 694, in oracledb.thin_impl.AsyncThinPoolImpl._acquire_helper
  File "src/oracledb/impl/thin/pool.pyx", line 690, in oracledb.thin_impl.AsyncThinPoolImpl._acquire_helper
  File "/usr/lib64/python3.9/asyncio/locks.py", line 317, in wait_for
    await self.wait()
  File "/usr/lib64/python3.9/asyncio/locks.py", line 290, in wait
    await fut

It's a very strange place for a CancelledError, because I create the pool as
oracledb.create_pool_async(getmode=oracledb.SPOOL_ATTRVAL_NOWAIT)

To release a connection, I use:
await pool.release(con)

But this explains why request processing in the child processes gets stuck every time.

I suspect that something goes wrong with oracledb initialization when a child process is spawned.
Could it be that additional initialization is required in the child process for oracledb (thin mode) before creating the AsyncConnectionPool?

I have observed this behavior only on Linux. On Windows, ProcessPoolExecutor spawns a new child process for each task and terminates it immediately after the task is completed.

@klymenkosergiy added the question (Further information is requested) label Feb 25, 2025
@anthony-tuininga
Member

Can you share code that demonstrates the problem? Creation of a pool and use of the connections in a pool should only occur in the same process. No mixing and matching allowed!

@klymenkosergiy
Author

klymenkosergiy commented Feb 25, 2025

Yes, each process creates its own pool in its init function. Here is a very simplified example to illustrate how the pool is initialized and used in the child process:

w = None

class Worker:
	def __init__(self, ctx, config):
		self.loop = asyncio.new_event_loop()
		self.ctx = ctx
		self.config = config
		asyncio.set_event_loop(self.loop)
		self.loop.run_until_complete(self.asyncInit())

	async def asyncInit(self):
		self.pool = oracledb.create_pool_async(
					dsn = self.config.dsn,
					user = self.config.user,
					password = self.config.password,
					min = self.config.pool_min,
					max = self.config.pool_max,
					increment = 1,
					timeout = self.config.exec_timeout,
					wait_timeout = self.config.wait_timeout,
					max_lifetime_session = self.config.maxLifetimeSession,
					host = self.config.host,
					port = self.config.port,
					service_name = self.config.service_name,
					sid = self.config.sid,
					getmode=oracledb.SPOOL_ATTRVAL_NOWAIT
					)

	def run(self, coro):
		self.loop.run_until_complete(coro)

	async def runTask(self, func, args):
		await func(*args)

	async def acquire(self):
		try:
			return await self.pool.acquire()
		except oracledb.DatabaseError as ex:
			# the logic when we have no ready connection
			pass

	async def release(self, con):
		await self.pool.release(con)


def initSubprocess(ctx, config):
	global w
	w = Worker(ctx, config)

def runTask(func, args):
	w.run(w.runTask(func, args))

async def taskWorker(params):
	con = None
	try:
		con = await w.acquire()
		# real payload
	finally:
		if con is not None:
			await w.release(con)

p = ProcessPoolExecutor(
		max_workers=4,
		mp_context=ctx,
		initializer=initSubprocess,
		initargs=(ctx, configs))
p.submit(runTask, taskWorker, params)

This logic works with cx_Oracle + cx_Oracle_async. If you need a real working example, I'll create one; the real logic is far too large to copy here as an example :)

@anthony-tuininga
Member

anthony-tuininga commented Feb 25, 2025

I had to tweak this a fair bit to make it runnable. Does this hang for you? It works for me.

import asyncio
from concurrent.futures import ProcessPoolExecutor

import oracledb


class Worker:
    def __init__(self, ctx, config):
        self.loop = asyncio.new_event_loop()
        self.ctx = ctx
        self.config = config
        asyncio.set_event_loop(self.loop)
        self.loop.run_until_complete(self.asyncInit())

    async def asyncInit(self):
        self.pool = oracledb.create_pool_async(
            dsn="host/service_name",
            user="user",
            password="password",
            min=2,
            max=10,
            increment=1,
            getmode=oracledb.SPOOL_ATTRVAL_NOWAIT,
        )

    def run(self, coro):
        self.loop.run_until_complete(coro)

    async def runTask(self, func, *args):
        await func(*args)

    async def acquire(self):
        try:
            return await self.pool.acquire()
        except oracledb.DatabaseError as ex:
            # the logic when we have no ready connection
            pass

    async def release(self, con):
        await self.pool.release(con)


def initSubprocess(ctx, config):
    global w
    w = Worker(ctx, config)


def runTask(func, args):
    w.run(w.runTask(func, args))


async def taskWorker(params):
    con = await w.acquire()
    async with con:
        print("version:", con.version)
        # real payload

ctx = {}
config = {}
params = {}

p = ProcessPoolExecutor(
    max_workers=4,
    initializer=initSubprocess,
    initargs=(ctx, config),
)

result = p.submit(runTask, taskWorker, params)
print("result 1:", result.result())
result = p.submit(runTask, taskWorker, params)
print("result 2:", result.result())
result = p.submit(runTask, taskWorker, params)
print("result 3:", result.result())
print("Done!")

Note that I simplified the taskWorker() by using an asynchronous context manager -- that ensures that the connection is returned to the pool, regardless of success or failure.
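(As an aside, the release-on-error guarantee of `async with` can be sketched with plain asyncio objects. `FakePool`/`FakeConn` below are hypothetical stand-ins for illustration only, not the oracledb API: `__aexit__` runs even when the body raises, so the connection always goes back to the pool.)

```python
import asyncio
import contextlib

# Hypothetical stand-ins for illustration only -- not the oracledb API.
class FakeConn:
    def __init__(self, pool):
        self.pool = pool

    async def __aenter__(self):
        return self

    async def __aexit__(self, exc_type, exc, tb):
        # Runs even when the body raised: the connection is released.
        self.pool.busy -= 1


class FakePool:
    def __init__(self):
        self.busy = 0

    async def acquire(self):
        self.busy += 1
        return FakeConn(self)


async def main():
    pool = FakePool()
    with contextlib.suppress(RuntimeError):
        con = await pool.acquire()
        async with con:
            raise RuntimeError("payload failed")
    return pool.busy


print(asyncio.run(main()))  # 0 -- the connection was released despite the error
```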

@klymenkosergiy
Author

I expanded the test script but couldn't reproduce the real issue.
My observations on the real application (a report server):
when I restart the application, all queued requests are processed in order without issues, and a child worker can process 3-4 tasks in a row.
I checked the logic in AsyncConnection.__aexit__ and switched the connection release to await con.close() in my application.
Where a connection is released, I added logging and saw no busy connections in the pool after the release ({pool.busy}/{pool.max}).

I also added logging before and after the connection-acquiring call, and I observed the same behavior three times after a restart:
right after launch the child workers process all tasks from the queue successfully, but if an idle period follows, the next await pool.acquire() call never returns. Hence, if I interrupt the application, I get a CancelledError with a stack trace inside oracledb...
Do you have any idea how to investigate this issue more deeply?
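One way to investigate a hang like this (a sketch, assuming a POSIX system and only the standard library): arm faulthandler in the child workers, so a stuck process can dump the stacks of every thread and show exactly where acquire() is blocked. The function name below is hypothetical.

```python
import faulthandler
import signal
import sys

# Sketch of a hang diagnostic (assumes POSIX; SIGUSR1 and
# faulthandler.register are unavailable on Windows). Call this from the
# child-process initializer.
def enable_hang_diagnostics(timeout=30):
    # Sending SIGUSR1 to a stuck child dumps every thread's stack to stderr.
    faulthandler.register(signal.SIGUSR1, file=sys.stderr, all_threads=True)
    # Watchdog variant: if the process is still inside the suspect call
    # after `timeout` seconds, the stacks are dumped automatically.
    faulthandler.dump_traceback_later(timeout, exit=False)

enable_hang_diagnostics()
# con = await pool.acquire()   # the call that never returns
faulthandler.cancel_dump_traceback_later()
```

The dumped stacks distinguish "waiting on the pool's internal condition variable" from "blocked in a network read", which narrows down whether the problem is pool bookkeeping or a dead idle connection.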

My current version of the script, which unfortunately does not reproduce the issue:

import asyncio
from concurrent.futures import ProcessPoolExecutor
import multiprocessing
import oracledb
import time
import os


class Worker:
	def __init__(self, ctx, config):
		self.loop = asyncio.new_event_loop()
		self.ctx = ctx
		self.config = config
		asyncio.set_event_loop(self.loop)
		self.loop.run_until_complete(self.asyncInit())

	async def asyncInit(self):
		self.pool = oracledb.create_pool_async(
			dsn="host/service_name",
			user="user",
			password="password",
			min=1,
			max=3,
			increment=1,
			getmode=oracledb.SPOOL_ATTRVAL_NOWAIT,
		)

	def run(self, coro):
		self.loop.run_until_complete(coro)

	async def runTask(self, func, *args):
		await func(*args)

	async def acquire(self):
		try:
			return await self.pool.acquire()
		except oracledb.DatabaseError as ex:
			# the logic when we have no ready connection
			pass

	async def release(self, con):
		await self.pool.release(con)


def initSubprocess(ctx, config):
	global w
	w = Worker(ctx, config)


def runTask(func, args):
	w.run(w.runTask(func, args))


async def taskWorker(params):
	print(f'[{os.getpid()}] task#{params}: acquiring connection...')
	con = await w.acquire()
	async with con:
		print(f'[{os.getpid()}] Task#{params}: connection version= {con.version}')
		# real payload
		await asyncio.sleep(3)
	print(f'[{os.getpid()}] Task#{params} is finished')
	return 0

async def taskDummy():
	await asyncio.sleep(0.1)

if __name__ == '__main__':
	ctx = multiprocessing.get_context('spawn')
	config = {}
	params = {}

	p = ProcessPoolExecutor(
		max_workers=2,
		mp_context=ctx,
		initializer=initSubprocess,
		initargs=(ctx, config),
	)
	#run dummy task to init all child processes
	d1 = p.submit(runTask, taskDummy)
	d2 = p.submit(runTask, taskDummy)
	#read results and finish future
	d1.result()
	d2.result()

	#push 2 task in parallel
	result1 = p.submit(runTask, taskWorker, 1)
	result2 = p.submit(runTask, taskWorker, 2)
	#print task result
	print(f'result 1: {result1.result()}')
	print(f'result 2: {result2.result()}')

	#idle before next call
	time.sleep(30)

	#push 2 task in parallel
	result1 = p.submit(runTask, taskWorker, 3)
	result2 = p.submit(runTask, taskWorker, 4)
	#print task result
	print(f'result 3: {result1.result()}')
	print(f'result 4: {result2.result()}')

	print("Done!")
	p.shutdown(wait=False, cancel_futures=True)

