Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add a global variable to set number of parallel threads #336

Merged
merged 10 commits into from
Jan 19, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
6 changes: 4 additions & 2 deletions docs/source/index.rst
Original file line number Diff line number Diff line change
Expand Up @@ -98,6 +98,8 @@ For the development version, you can do almost the same:
export GSTOOLS_BUILD_PARALLEL=1
pip install git+git://github.com/GeoStat-Framework/GSTools.git@main

The number of parallel threads can be set with the global variable `config.NUM_THREADS`.

**Using experimental GSTools-Core for even more speed**

You can install the optional dependency `GSTools-Core <https://github.com/GeoStat-Framework/GSTools-Core>`_,
Expand All @@ -122,8 +124,8 @@ or by manually installing the package

GSTools-Core will automatically use all your cores in parallel, without having
to use OpenMP or a local C compiler.
In case you want to restrict the number of threads used, you can set the
environment variable ``RAYON_NUM_THREADS`` to the desired amount.
In case you want to restrict the number of threads used, you can use the
global variable `config.NUM_THREADS` to the desired number.


Citation
Expand Down
10 changes: 8 additions & 2 deletions setup.py
Original file line number Diff line number Diff line change
Expand Up @@ -17,11 +17,17 @@
for ext in ["field.summator", "variogram.estimator", "krige.krigesum"]
]
# you can set GSTOOLS_BUILD_PARALLEL=0 or GSTOOLS_BUILD_PARALLEL=1
open_mp = False
if int(os.getenv("GSTOOLS_BUILD_PARALLEL", "0")):
added = [add_openmp_flags_if_available(mod) for mod in CY_MODULES]
print(f"## GSTools setup: OpenMP used: {any(added)}")
if any(added):
open_mp = True
print(f"## GSTools setup: OpenMP used: {open_mp}")
else:
print("## GSTools setup: OpenMP not wanted by the user.")

# setup - do not include package data to ignore .pyx files in wheels
setup(ext_modules=cythonize(CY_MODULES), include_package_data=False)
setup(
ext_modules=cythonize(CY_MODULES, compile_time_env={"OPENMP": open_mp}),
include_package_data=False,
)
2 changes: 2 additions & 0 deletions src/gstools/config.py
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,8 @@
.. currentmodule:: gstools.config

"""
NUM_THREADS = None

# pylint: disable=W0611
try: # pragma: no cover
import gstools_core
Expand Down
10 changes: 8 additions & 2 deletions src/gstools/field/generator.py
Original file line number Diff line number Diff line change
Expand Up @@ -209,7 +209,9 @@ def __call__(self, pos, add_nugget=True):
the random modes
"""
pos = np.asarray(pos, dtype=np.double)
summed_modes = summate(self._cov_sample, self._z_1, self._z_2, pos)
summed_modes = summate(
self._cov_sample, self._z_1, self._z_2, pos, config.NUM_THREADS
)
nugget = self.get_nugget(summed_modes.shape) if add_nugget else 0.0
return np.sqrt(self.model.var / self._mode_no) * summed_modes + nugget

Expand Down Expand Up @@ -489,7 +491,11 @@ def __call__(self, pos, add_nugget=True):
"""
pos = np.asarray(pos, dtype=np.double)
summed_modes = summate_incompr(
self._cov_sample, self._z_1, self._z_2, pos
self._cov_sample,
self._z_1,
self._z_2,
pos,
config.NUM_THREADS,
)
nugget = self.get_nugget(summed_modes.shape) if add_nugget else 0.0
e1 = self._create_unit_vector(summed_modes.shape)
Expand Down
26 changes: 23 additions & 3 deletions src/gstools/field/summator.pyx
Original file line number Diff line number Diff line change
Expand Up @@ -6,15 +6,32 @@ This is the randomization method summator, implemented in cython.
import numpy as np
from cython.parallel import prange

IF OPENMP:
cimport openmp

cimport numpy as np
from libc.math cimport cos, sin


def set_num_threads(num_threads):
cdef int num_threads_c = 1
if num_threads is None:
# OPENMP set during setup
IF OPENMP:
num_threads_c = openmp.omp_get_num_procs()
ELSE:
...
else:
num_threads_c = num_threads
return num_threads_c


def summate(
const double[:, :] cov_samples,
const double[:] z_1,
const double[:] z_2,
const double[:, :] pos
const double[:, :] pos,
num_threads=None,
):
cdef int i, j, d
cdef double phase
Expand All @@ -25,7 +42,9 @@ def summate(

cdef double[:] summed_modes = np.zeros(X_len, dtype=float)

for i in prange(X_len, nogil=True):
cdef int num_threads_c = set_num_threads(num_threads)

for i in prange(X_len, nogil=True, num_threads=num_threads_c):
for j in range(N):
phase = 0.
for d in range(dim):
Expand All @@ -49,7 +68,8 @@ def summate_incompr(
const double[:, :] cov_samples,
const double[:] z_1,
const double[:] z_2,
const double[:, :] pos
const double[:, :] pos,
num_threads=None,
):
cdef int i, j, d
cdef double phase
Expand Down
30 changes: 26 additions & 4 deletions src/gstools/krige/krigesum.pyx
Original file line number Diff line number Diff line change
Expand Up @@ -6,13 +6,30 @@ This is a summator for the kriging routines
import numpy as np
from cython.parallel import prange

IF OPENMP:
cimport openmp

cimport numpy as np


def set_num_threads(num_threads):
cdef int num_threads_c = 1
if num_threads is None:
# OPENMP set during setup
IF OPENMP:
num_threads_c = openmp.omp_get_num_procs()
ELSE:
...
else:
num_threads_c = num_threads
return num_threads_c


def calc_field_krige_and_variance(
const double[:, :] krig_mat,
const double[:, :] krig_vecs,
const double[:] cond
const double[:] cond,
num_threads=None,
):

cdef int mat_i = krig_mat.shape[0]
Expand All @@ -24,9 +41,11 @@ def calc_field_krige_and_variance(

cdef int i, j, k

cdef int num_threads_c = set_num_threads(num_threads)

# error = krig_vecs * krig_mat * krig_vecs
# field = cond * krig_mat * krig_vecs
for k in prange(res_i, nogil=True):
for k in prange(res_i, nogil=True, num_threads=num_threads_c):
for i in range(mat_i):
krig_fac = 0.0
for j in range(mat_i):
Expand All @@ -40,7 +59,8 @@ def calc_field_krige_and_variance(
def calc_field_krige(
const double[:, :] krig_mat,
const double[:, :] krig_vecs,
const double[:] cond
const double[:] cond,
const int num_threads=1,
):

cdef int mat_i = krig_mat.shape[0]
Expand All @@ -51,8 +71,10 @@ def calc_field_krige(

cdef int i, j, k

cdef int num_threads_c = set_num_threads(num_threads)

# field = cond * krig_mat * krig_vecs
for k in prange(res_i, nogil=True):
for k in prange(res_i, nogil=True, num_threads=num_threads_c):
for i in range(mat_i):
krig_fac = 0.0
for j in range(mat_i):
Expand Down
41 changes: 36 additions & 5 deletions src/gstools/variogram/estimator.pyx
Original file line number Diff line number Diff line change
Expand Up @@ -7,10 +7,26 @@ This is the variogram estimater, implemented in cython.
import numpy as np
from cython.parallel import parallel, prange

IF OPENMP:
cimport openmp

cimport numpy as np
from libc.math cimport M_PI, acos, atan2, cos, fabs, isnan, pow, sin, sqrt


def set_num_threads(num_threads):
cdef int num_threads_c = 1
if num_threads is None:
# OPENMP set during setup
IF OPENMP:
num_threads_c = openmp.omp_get_num_procs()
ELSE:
...
else:
num_threads_c = num_threads
return num_threads_c


cdef inline double dist_euclid(
const int dim,
const double[:, :] pos,
Expand Down Expand Up @@ -181,6 +197,7 @@ def directional(
const double bandwidth=-1.0, # negative values to turn of bandwidth search
const bint separate_dirs=False, # whether the direction bands don't overlap
str estimator_type='m',
num_threads=None,
):
if pos.shape[1] != f.shape[1]:
raise ValueError(f'len(pos) = {pos.shape[1]} != len(f) = {f.shape[1])}')
Expand Down Expand Up @@ -208,7 +225,9 @@ def directional(
cdef int i, j, k, m, d
cdef double dist

for i in prange(i_max, nogil=True):
cdef int num_threads_c = set_num_threads(num_threads)

for i in prange(i_max, nogil=True, num_threads=num_threads_c):
for j in range(j_max):
for k in range(j+1, k_max):
dist = dist_euclid(dim, pos, j, k)
Expand Down Expand Up @@ -239,6 +258,7 @@ def unstructured(
const double[:, :] pos,
str estimator_type='m',
str distance_type='e',
num_threads=None,
):
cdef int dim = pos.shape[0]
cdef _dist_func distance
Expand Down Expand Up @@ -271,7 +291,9 @@ def unstructured(
cdef int i, j, k, m
cdef double dist

for i in prange(i_max, nogil=True):
cdef int num_threads_c = set_num_threads(num_threads)

for i in prange(i_max, nogil=True, num_threads=num_threads_c):
for j in range(j_max):
for k in range(j+1, k_max):
dist = distance(dim, pos, j, k)
Expand All @@ -287,7 +309,11 @@ def unstructured(
return np.asarray(variogram), np.asarray(counts)


def structured(const double[:, :] f, str estimator_type='m'):
def structured(
const double[:, :] f,
str estimator_type='m',
num_threads=None,
):
cdef _estimator_func estimator_func = choose_estimator_func(estimator_type)
cdef _normalization_func normalization_func = (
choose_estimator_normalization(estimator_type)
Expand All @@ -301,7 +327,9 @@ def structured(const double[:, :] f, str estimator_type='m'):
cdef long[:] counts = np.zeros(k_max, dtype=long)
cdef int i, j, k

with nogil, parallel():
cdef int num_threads_c = set_num_threads(num_threads)

with nogil, parallel(num_threads=num_threads_c):
for i in range(i_max):
for j in range(j_max):
for k in prange(1, k_max-i):
Expand All @@ -316,6 +344,7 @@ def ma_structured(
const double[:, :] f,
const bint[:, :] mask,
str estimator_type='m',
num_threads=None,
):
cdef _estimator_func estimator_func = choose_estimator_func(estimator_type)
cdef _normalization_func normalization_func = (
Expand All @@ -330,7 +359,9 @@ def ma_structured(
cdef long[:] counts = np.zeros(k_max, dtype=long)
cdef int i, j, k

with nogil, parallel():
cdef int num_threads_c = set_num_threads(num_threads)

with nogil, parallel(num_threads=num_threads_c):
for i in range(i_max):
for j in range(j_max):
for k in prange(1, k_max-i):
Expand Down
8 changes: 6 additions & 2 deletions src/gstools/variogram/variogram.py
Original file line number Diff line number Diff line change
Expand Up @@ -374,6 +374,7 @@ def vario_estimate(
pos,
estimator_type=cython_estimator,
distance_type=distance_type,
num_threads=config.NUM_THREADS,
)
else:
estimates, counts = directional(
Expand All @@ -385,6 +386,7 @@ def vario_estimate(
bandwidth,
separate_dirs=_separate_dirs_test(direction, angles_tol),
estimator_type=cython_estimator,
num_threads=config.NUM_THREADS,
)
if dir_no == 1:
estimates, counts = estimates[0], counts[0]
Expand Down Expand Up @@ -485,8 +487,10 @@ def vario_estimate_axis(
cython_estimator = _set_estimator(estimator)

if masked:
return ma_structured(field, mask, cython_estimator)
return structured(field, cython_estimator)
return ma_structured(
field, mask, cython_estimator, num_threads=config.NUM_THREADS
)
return structured(field, cython_estimator, num_threads=config.NUM_THREADS)


# for backward compatibility
Expand Down