Discussion:
[Numpy-discussion] numpy in python callback from threaded c++
Burlen Loring
2016-03-28 20:44:29 UTC
Permalink
Hi All,

in my c++ code I've added Python binding via swig. one scenario is to
pass a python function to do some computational work. the Python program
runs in serial in the main thread but work is handled by a thread pool,
the callback is invoked from another thread on unique data. Before a
thread invokes the Python callback it acquires Python's GIL. Also I
PyEval_InitThreads during module initialization, and have swig'd with
-threads flag. However, I'm experiencing frequent crashes when thread
pool size is greater than 1, and valgrind is reporting errors from numpy
even in case where thread pool size is 1.

Here's the essence of the error reported by valgrind:

==10316== Invalid read of size 4
==10316== at 0x4ED7D73: PyObject_Free (obmalloc.c:1013)
==10316== by 0x10D540B0: NpyIter_Deallocate (nditer_constr.c:699)
....
==10316== Address 0x20034020 is 3,856 bytes inside a block of size
4,097 free'd
==10316== at 0x4C29E00: free (vg_replace_malloc.c:530)
==10316== by 0x4F57B22: import_module_level (import.c:2278)
==10316== by 0x4F57B22: PyImport_ImportModuleLevel (import.c:2292)
==10316== by 0x4F36597: builtin___import__ (bltinmodule.c:49)
==10316== by 0x4E89AC2: PyObject_Call (abstract.c:2546)
==10316== by 0x4E89C1A: call_function_tail (abstract.c:2578)
==10316== by 0x4E89C1A: PyObject_CallFunction (abstract.c:2602)
==10316== by 0x4F58735: PyImport_Import (import.c:2890)
==10316== by 0x4F588B9: PyImport_ImportModule (import.c:2133)
==10316== by 0x10D334C2: get_forwarding_ndarray_method (methods.c:57)
==10316== by 0x10D372C0: array_mean (methods.c:1932)
==10316== by 0x4F40AC7: call_function (ceval.c:4350)

There are a few of these reported. I'll attach the full output. This is
from the simplest scenario, where the thread pool has a size of 1.
Although there are 2 threads, the program is serial as the main thread
passes work tasks to the thread pool and waits for work to finish.

Here is the work function where above occurs:

def execute(port, data_in, req):
sys.stderr.write('descriptive_stats::execute MPI %d\n'%(rank))

mesh = as_teca_cartesian_mesh(data_in[0])

table = teca_table.New()
table.declare_columns(['step','time'], ['ul','d'])
table << mesh.get_time_step() << mesh.get_time()

for var_name in var_names:

table.declare_columns(['min '+var_name, 'avg '+var_name, \
'max '+var_name, 'std '+var_name, 'low_q '+var_name, \
'med '+var_name, 'up_q '+var_name], ['d']*7)

var = mesh.get_point_arrays().get(var_name).as_array()

table << float(np.min(var)) << float(np.average(var)) \
<< float(np.max(var)) << float(np.std(var)) \
<< map(float, np.percentile(var, [25.,50.,75.]))

return table

Again, I'm acquiring the GIL so this should be executed in serial. What
am I doing wrong? Have I missed some key aspect of using numpy in this
scenario? Any documentation on using numpy in a scenario like this? Any
help is greatly appreciated!

Thanks
Burlen

Loading...