Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

SISGraph for_all_nodes more efficient single-threaded #87

Merged
merged 1 commit into from
Jun 21, 2022
Merged
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
110 changes: 59 additions & 51 deletions sisyphus/graph.py
Original file line number Diff line number Diff line change
Expand Up @@ -489,69 +489,77 @@ def for_all_nodes(self, f, nodes=None, bottom_up=False):
if path.creator:
nodes.append(path.creator)

visited = {}
finished = 0

if gs.GRAPH_WORKER == 1:
# Run in main thread if only one graph worker is given anyway
def runner(job):
"""
:param Job job:
"""
# make sure all inputs are updated
visited_set = set()
visited_list = []
queue = list(reversed(nodes))
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Why do you reverse the nodes first?

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Because I pop the latest entry (queue.pop(-1)) and I wanted to keep a similar order as before (although it might not matter; the order of job._sis_inputs is anyway random).

while queue:
job = queue.pop(-1)
if id(job) in visited_set:
continue
visited_set.add(id(job))
job._sis_runnable()

if bottom_up:
for path in job._sis_inputs:
if path.creator:
runner(path.creator)
f(job)
# execute in reverse order at the end
visited_list.append(job)
else:
res = f(job)
# Stop if function has a not None but false return value
if res is None or res:
for path in job._sis_inputs:
if path.creator:
runner(path.creator)
if res is not None and not res:
continue

else:
pool_lock = threading.Lock()
finished_lock = threading.Lock()
pool = self.pool

# recursive function to run through tree
def runner(job):
"""
:param Job job:
"""
sis_id = job._sis_id()
with pool_lock:
if sis_id not in visited:
visited[sis_id] = pool.apply_async(
tools.default_handle_exception_interrupt_main_thread(runner_helper), (job,))

def runner_helper(job):
"""
:param Job job:
"""
# make sure all inputs are updated
job._sis_runnable()
nonlocal finished
for path in job._sis_inputs:
if path.creator:
if id(path.creator) not in visited_set:
queue.append(path.creator)

if bottom_up:
if bottom_up:
for job in reversed(visited_list):
f(job)

return visited_set

visited = {}
finished = 0

pool_lock = threading.Lock()
finished_lock = threading.Lock()
pool = self.pool

# recursive function to run through tree
def runner(job):
"""
:param Job job:
"""
sis_id = job._sis_id()
with pool_lock:
if sis_id not in visited:
visited[sis_id] = pool.apply_async(
tools.default_handle_exception_interrupt_main_thread(runner_helper), (job,))

def runner_helper(job):
"""
:param Job job:
"""
# make sure all inputs are updated
job._sis_runnable()
nonlocal finished

if bottom_up:
for path in job._sis_inputs:
if path.creator:
runner(path.creator)
f(job)
else:
res = f(job)
# Stop if function has a not None but false return value
if res is None or res:
for path in job._sis_inputs:
if path.creator:
runner(path.creator)
f(job)
else:
res = f(job)
# Stop if function has a not None but false return value
if res is None or res:
for path in job._sis_inputs:
if path.creator:
runner(path.creator)
with finished_lock:
finished += 1
with finished_lock:
finished += 1

for node in nodes:
runner(node)
Expand Down