Is multiprocessing model execution possible?

I’m wondering if anyone has used OPF models in conjunction with the python multiprocessing package?

I have an application that boils down to the following pseudo-code:

# Create a list of models
modelList = [createModels() for i in range(0,5)]

# Create some model input data
in1 = ...
in2 = ...

# Execute the models
# This approach works
predictions = [executeModels(modelList[idx],in1,in2) for idx in range(0,5)]

# This approach does not work
pool = mp.pool(processes=5)
predictions = [pool.apply(executeModels,args=(modelList[idx],in1,in2,)) for idx in range(0,5)]

With the parallel approach the run method of the models throws a (very unhelpful) AssertionError.
(As far as I can tell the correct data is getting to the run method.)

I’m wondering if there’s a serialization issue with using the models in this way, or whether there’s something obvious that I’m doing wrong.

1 Like

What exactly is the unhelpful error?

It’s empty - i.e. there is no message saying what the problem is.

More specifically, within the executeModels function, the except part of a try-except construct looks like :

except Exception as e:
return e

which returns AssertionError()

or I’ve also tried,
except Exception as e:
return str(e) # or e.message

which returns ‘’ (i.e. an empty string).

The only thing in the try part is a single line calling the models run method.

Can you debug this and set a breakpoint to stop on that error and trace it back into NuPIC to see where it is generated?

Unfortunately my debugger doesn’t recognize breakpoints inside the functions that are being multiprocessed. A stacktrace is no good either as it just tells me that the error is occurring at the run method.

I’m not sure what to tell you. Asking @lscheinkman @scott @mrcslws have you ever used NuPIC with multiprocessing? I have only used multiprocessing to serialize models, not actually run them.

I’m wondering if anyone has used OPF models in conjunction with the python multiprocessing package?

I vaguely remember someone running into an issue running models in multiple threads or processes (don’t remember which) so very possible there is a problem. The Python stack trace or a way to reproduce would be helpful.

:+1:

This isn’t a data problem, so you can just fake the input. Try to write as little code as possible to exhibit the error. Once you have a python script, post it here. Then I will race @scott to see who can find the problem first.

Below is a (somewhat) minimal working example.

They key is on line 115, where if you change True to False it’ll run the multiprocess version.
Essentially I have a list of lists of models (in the example case all the same model, but not in my real code). The list of models are independent of each other and can be run in parallel.

Note that I am on Windows 10, and have no option to move to linux, and using nupic v1.0.0


# Code to investigate NuPIC and multiprocessing

import multiprocessing as mp
import random

from nupic.frameworks.opf.metrics import MetricSpec
from nupic.frameworks.opf.model_factory import ModelFactory
from nupic.frameworks.opf.prediction_metrics_manager import MetricsManager

MODEL_PARAMS_DICT = \
{ 'aggregationInfo': { 'days': 0,
                       'fields': [],
                       'hours': 0,
                       'microseconds': 0,
                       'milliseconds': 0,
                       'minutes': 0,
                       'months': 0,
                       'seconds': 0,
                       'weeks': 0,
                       'years': 0},
  'model': 'HTMPrediction',
  'modelParams': { 'anomalyParams': { u'anomalyCacheRecords': None,
                                      u'autoDetectThreshold': None,
                                      u'autoDetectWaitRecords': None},
                   'clParams': { 'alpha': 0.0001,
                                 'verbosity': 0,
                                 'regionName': 'SDRClassifierRegion',
                                 'steps': '1'},
                   'inferenceType': 'TemporalMultiStep',
                   'sensorParams': { 'encoders': { u'input1': {'fieldname': 'input1',
                                                            'clipInput': True,
                                                            'forced': True,
                                                            'maxval': 1.0,
                                                            'minval': 0.0,
                                                            'n': 100,
                                                            'w': 23,
                                                            'name': 'input1',
                                                            'type': 'ScalarEncoder'},
                                                   u'_classifierInput': {'classifierOnly': True,
                                                                          'clipInput': True,
                                                                          'fieldname': 'input1',
                                                                          'maxval': 1.0,
                                                                          'minval': 0.0,
                                                                          'n': 100,
                                                                          'w': 23,
                                                                          'forced': True,
                                                                          'name': '_classifierInput',
                                                                          'type': 'ScalarEncoder'}},
                                     'sensorAutoReset': None,
                                     'verbosity': 0},
                   'spEnable': True,
                   'spParams': { 'columnCount': 2048,
                                 'globalInhibition': 1,
                                 'inputWidth': 0,
                                 'boostStrength': 1.0,
                                 'numActiveColumnsPerInhArea': 40,
                                 'potentialPct': 0.8,
                                 'seed': 1956,
                                 'spVerbosity': 0,
                                 'spatialImp': 'cpp',
                                 'synPermActiveInc': 0.05,
                                 'synPermConnected': 0.1,
                                 'synPermInactiveDec': 0.1},
                   'tmEnable': True,
                   'tmParams': { 'activationThreshold': 12,
                                 'cellsPerColumn': 32,
                                 'columnCount': 2048,
                                 'globalDecay': 0.0,
                                 'initialPerm': 0.21,
                                 'inputWidth': 2048,
                                 'maxAge': 0,
                                 'maxSegmentsPerCell': 16,
                                 'maxSynapsesPerSegment': 32,
                                 'minThreshold': 9,
                                 'newSynapseCount': 20,
                                 'outputType': 'normal',
                                 'pamLength': 1,
                                 'permanenceDec': 0.1,
                                 'permanenceInc': 0.1,
                                 'seed': 1960,
                                 'temporalImp': 'cpp',
                                 'verbosity': 0},
                   'trainSPNetOnlyIfRequested': False},
  'predictAheadTime': None,
  'version': 1}


N_MODELS = 2
N_STEPS = 20
PREDICTION_HORIZON = 1

#---------------------------------------------------------------------
# load Models - list of list of models
#---------------------------------------------------------------------
def loadModels():

    modelList = [None]*N_MODELS
    
    # create models
    for idx in range(0,N_MODELS):
        lowerModelList = [None]*N_MODELS
        for jdx in range(0,N_MODELS):
            model = ModelFactory.create(MODEL_PARAMS_DICT)
            model.enableInference({"predictedField": 'input1'})
            lowerModelList[jdx] = model
        modelList[idx] = lowerModelList
            
    return modelList

#---------------------------------------------------------------------
# runModels
#---------------------------------------------------------------------
def runModels(modelList, modelInputsList):

    if True:  # True => NOT multiprocessing, False => multiprocessing
        # Define default values
        predictionsList = [[-1]]*len(modelList)
        
        # Run each list of models
        for idx in range(0,len(modelList)):
            try:
                prediction = runModels_mpfcn(modelList[idx],modelInputsList)
                predictionsList[idx] = prediction
            except:
                continue # do next idx
    else:
        # Create parallel pool
        pool = mp.Pool(processes=len(modelList))
        
        # Run parallel pool
        predictionsList = [pool.apply(runModels_mpfcn, args=(modelList[idx], modelInputsList,)) for idx in range(0,len(modelList))]
    
    return predictionsList

#---------------------------------------------------------------------
# runModels_mpfcn multiprocessing function
#---------------------------------------------------------------------
def runModels_mpfcn(modelList, modelInputsDict):

    predictionsList = [-1]*len(modelList)
    for jdx in range(0,len(modelList)):
        thisModel = modelList[jdx]
        try:    
            prediction = runIndividualModel(thisModel,modelInputsDict)
            predictionsList[jdx] = prediction
        except Exception as e:
            predictionsList[jdx] = type(thisModel) #e #str(e)
            continue # do next jdx
            
    return predictionsList

#---------------------------------------------------------------------
# runIndividualModel
#---------------------------------------------------------------------
def runIndividualModel(model, modelInputDict):

    result = model.run(modelInputDict)
    prediction = result.inferences["multiStepBestPredictions"][PREDICTION_HORIZON]
    
    return prediction


#---------------------------------------------------------------------
# Command line entry point
#---------------------------------------------------------------------
if __name__ == "__main__":

    # load models
    modelList = loadModels()

    # process each time step
    for idx in range(0,N_STEPS):

        # Create a random model input
        inputDict = {'input1': random.random()}

        # Create a prediction for each model
        predictions = runModels(modelList,inputDict)

        print idx, predictions

The code executes without error for me. This is likely a Windows-specific issue but still fixable by making sure all objects are pickle-able. I’m not sure that that is a priority but if it makes multiprocessing fail in all cases on Windows that might suggest some priority.

I also noticed that the actual HTMPredictionModel classes are being returned by the processes. You might try changing it so that only Python standard library types are returned to the original process which might avoid pickle issues. You can see from the output when I switched to use multiprocessing that the return types are NuPIC classes:

$ python tmp.py
0 [[<class 'nupic.frameworks.opf.htm_prediction_model.HTMPredictionModel'>, <class 'nupic.frameworks.opf.htm_prediction_model.HTMPredictionModel'>], [<class 'nupic.frameworks.opf.htm_prediction_model.HTMPredictionModel'>, <class 'nupic.frameworks.opf.htm_prediction_model.HTMPredictionModel'>]]
1 [[<class 'nupic.frameworks.opf.htm_prediction_model.HTMPredictionModel'>, <class 'nupic.frameworks.opf.htm_prediction_model.HTMPredictionModel'>], [<class 'nupic.frameworks.opf.htm_prediction_model.HTMPredictionModel'>, <class 'nupic.frameworks.opf.htm_prediction_model.HTMPredictionModel'>]]
2 [[<class 'nupic.frameworks.opf.htm_prediction_model.HTMPredictionModel'>, <class 'nupic.frameworks.opf.htm_prediction_model.HTMPredictionModel'>], [<class 'nupic.frameworks.opf.htm_prediction_model.HTMPredictionModel'>, <class 'nupic.frameworks.opf.htm_prediction_model.HTMPredictionModel'>]]
3 [[<class 'nupic.frameworks.opf.htm_prediction_model.HTMPredictionModel'>, <class 'nupic.frameworks.opf.htm_prediction_model.HTMPredictionModel'>], [<class 'nupic.frameworks.opf.htm_prediction_model.HTMPredictionModel'>, <class 'nupic.frameworks.opf.htm_prediction_model.HTMPredictionModel'>]]
4 [[<class 'nupic.frameworks.opf.htm_prediction_model.HTMPredictionModel'>, <class 'nupic.frameworks.opf.htm_prediction_model.HTMPredictionModel'>], [<class 'nupic.frameworks.opf.htm_prediction_model.HTMPredictionModel'>, <class 'nupic.frameworks.opf.htm_prediction_model.HTMPredictionModel'>]]
5 [[<class 'nupic.frameworks.opf.htm_prediction_model.HTMPredictionModel'>, <class 'nupic.frameworks.opf.htm_prediction_model.HTMPredictionModel'>], [<class 'nupic.frameworks.opf.htm_prediction_model.HTMPredictionModel'>, <class 'nupic.frameworks.opf.htm_prediction_model.HTMPredictionModel'>]]
6 [[<class 'nupic.frameworks.opf.htm_prediction_model.HTMPredictionModel'>, <class 'nupic.frameworks.opf.htm_prediction_model.HTMPredictionModel'>], [<class 'nupic.frameworks.opf.htm_prediction_model.HTMPredictionModel'>, <class 'nupic.frameworks.opf.htm_prediction_model.HTMPredictionModel'>]]
7 [[<class 'nupic.frameworks.opf.htm_prediction_model.HTMPredictionModel'>, <class 'nupic.frameworks.opf.htm_prediction_model.HTMPredictionModel'>], [<class 'nupic.frameworks.opf.htm_prediction_model.HTMPredictionModel'>, <class 'nupic.frameworks.opf.htm_prediction_model.HTMPredictionModel'>]]
8 [[<class 'nupic.frameworks.opf.htm_prediction_model.HTMPredictionModel'>, <class 'nupic.frameworks.opf.htm_prediction_model.HTMPredictionModel'>], [<class 'nupic.frameworks.opf.htm_prediction_model.HTMPredictionModel'>, <class 'nupic.frameworks.opf.htm_prediction_model.HTMPredictionModel'>]]
9 [[<class 'nupic.frameworks.opf.htm_prediction_model.HTMPredictionModel'>, <class 'nupic.frameworks.opf.htm_prediction_model.HTMPredictionModel'>], [<class 'nupic.frameworks.opf.htm_prediction_model.HTMPredictionModel'>, <class 'nupic.frameworks.opf.htm_prediction_model.HTMPredictionModel'>]]
10 [[<class 'nupic.frameworks.opf.htm_prediction_model.HTMPredictionModel'>, <class 'nupic.frameworks.opf.htm_prediction_model.HTMPredictionModel'>], [<class 'nupic.frameworks.opf.htm_prediction_model.HTMPredictionModel'>, <class 'nupic.frameworks.opf.htm_prediction_model.HTMPredictionModel'>]]
11 [[<class 'nupic.frameworks.opf.htm_prediction_model.HTMPredictionModel'>, <class 'nupic.frameworks.opf.htm_prediction_model.HTMPredictionModel'>], [<class 'nupic.frameworks.opf.htm_prediction_model.HTMPredictionModel'>, <class 'nupic.frameworks.opf.htm_prediction_model.HTMPredictionModel'>]]
12 [[<class 'nupic.frameworks.opf.htm_prediction_model.HTMPredictionModel'>, <class 'nupic.frameworks.opf.htm_prediction_model.HTMPredictionModel'>], [<class 'nupic.frameworks.opf.htm_prediction_model.HTMPredictionModel'>, <class 'nupic.frameworks.opf.htm_prediction_model.HTMPredictionModel'>]]
13 [[<class 'nupic.frameworks.opf.htm_prediction_model.HTMPredictionModel'>, <class 'nupic.frameworks.opf.htm_prediction_model.HTMPredictionModel'>], [<class 'nupic.frameworks.opf.htm_prediction_model.HTMPredictionModel'>, <class 'nupic.frameworks.opf.htm_prediction_model.HTMPredictionModel'>]]
14 [[<class 'nupic.frameworks.opf.htm_prediction_model.HTMPredictionModel'>, <class 'nupic.frameworks.opf.htm_prediction_model.HTMPredictionModel'>], [<class 'nupic.frameworks.opf.htm_prediction_model.HTMPredictionModel'>, <class 'nupic.frameworks.opf.htm_prediction_model.HTMPredictionModel'>]]
15 [[<class 'nupic.frameworks.opf.htm_prediction_model.HTMPredictionModel'>, <class 'nupic.frameworks.opf.htm_prediction_model.HTMPredictionModel'>], [<class 'nupic.frameworks.opf.htm_prediction_model.HTMPredictionModel'>, <class 'nupic.frameworks.opf.htm_prediction_model.HTMPredictionModel'>]]
16 [[<class 'nupic.frameworks.opf.htm_prediction_model.HTMPredictionModel'>, <class 'nupic.frameworks.opf.htm_prediction_model.HTMPredictionModel'>], [<class 'nupic.frameworks.opf.htm_prediction_model.HTMPredictionModel'>, <class 'nupic.frameworks.opf.htm_prediction_model.HTMPredictionModel'>]]
17 [[<class 'nupic.frameworks.opf.htm_prediction_model.HTMPredictionModel'>, <class 'nupic.frameworks.opf.htm_prediction_model.HTMPredictionModel'>], [<class 'nupic.frameworks.opf.htm_prediction_model.HTMPredictionModel'>, <class 'nupic.frameworks.opf.htm_prediction_model.HTMPredictionModel'>]]
18 [[<class 'nupic.frameworks.opf.htm_prediction_model.HTMPredictionModel'>, <class 'nupic.frameworks.opf.htm_prediction_model.HTMPredictionModel'>], [<class 'nupic.frameworks.opf.htm_prediction_model.HTMPredictionModel'>, <class 'nupic.frameworks.opf.htm_prediction_model.HTMPredictionModel'>]]
19 [[<class 'nupic.frameworks.opf.htm_prediction_model.HTMPredictionModel'>, <class 'nupic.frameworks.opf.htm_prediction_model.HTMPredictionModel'>], [<class 'nupic.frameworks.opf.htm_prediction_model.HTMPredictionModel'>, <class 'nupic.frameworks.opf.htm_prediction_model.HTMPredictionModel'>]]

Can you change the pool.apply arguments and return value of runModels_mpfcn be Python primitive types to see if that avoids the error?

@scott - if you are seeing the model class returned (as per your print out) then it isn’t working correctly.
That is, the parallel pool is failing.

The class type is only returned when the parallel pool fails.
If it doesn’t fail then a numeric prediction is returned (as per the result when the parallel pool is not used).

I’m sure that the return type is not the issue - as I’ve tested it with returning many different data types, from integers to strings.
It’s just currently set up to return the model class as I was testing that the pool was passing the expected model class to the function.

If you comment out line 147 then you’ll get the number -1 if the pool fails.