Below is a (somewhat) minimal working example.
The key is on line 115: if you change True to False there, it’ll run the multiprocessing version.
Essentially I have a list of lists of models (in the example case all the same model, but not in my real code). The lists of models are independent of each other and can be run in parallel.
Note that I am on Windows 10 with no option to move to Linux, and I am using NuPIC v1.0.0.
# Code to investigate NuPIC and multiprocessing
import multiprocessing as mp
import random
from nupic.frameworks.opf.metrics import MetricSpec
from nupic.frameworks.opf.model_factory import ModelFactory
from nupic.frameworks.opf.prediction_metrics_manager import MetricsManager
# ---------------------------------------------------------------------
# Model parameters, assembled from one named dict per section so each
# section can be read (and diffed) on its own.
# ---------------------------------------------------------------------

# Every aggregation interval is zero and no fields are listed.
_AGGREGATION_INFO = {
    'days': 0,
    'fields': [],
    'hours': 0,
    'microseconds': 0,
    'milliseconds': 0,
    'minutes': 0,
    'months': 0,
    'seconds': 0,
    'weeks': 0,
    'years': 0,
}

_ANOMALY_PARAMS = {
    u'anomalyCacheRecords': None,
    u'autoDetectThreshold': None,
    u'autoDetectWaitRecords': None,
}

# Classifier settings; 'steps' is given as the string '1'.
_CL_PARAMS = {
    'alpha': 0.0001,
    'verbosity': 0,
    'regionName': 'SDRClassifierRegion',
    'steps': '1',
}

# Scalar encoder for the 'input1' field, covering the range [0.0, 1.0].
_INPUT1_ENCODER = {
    'fieldname': 'input1',
    'clipInput': True,
    'forced': True,
    'maxval': 1.0,
    'minval': 0.0,
    'n': 100,
    'w': 23,
    'name': 'input1',
    'type': 'ScalarEncoder',
}

# Second encoder over the same 'input1' field, flagged classifierOnly.
_CLASSIFIER_INPUT_ENCODER = {
    'classifierOnly': True,
    'clipInput': True,
    'fieldname': 'input1',
    'maxval': 1.0,
    'minval': 0.0,
    'n': 100,
    'w': 23,
    'forced': True,
    'name': '_classifierInput',
    'type': 'ScalarEncoder',
}

_SENSOR_PARAMS = {
    'encoders': {
        u'input1': _INPUT1_ENCODER,
        u'_classifierInput': _CLASSIFIER_INPUT_ENCODER,
    },
    'sensorAutoReset': None,
    'verbosity': 0,
}

# Spatial pooler section ('cpp' implementation selected).
_SP_PARAMS = {
    'columnCount': 2048,
    'globalInhibition': 1,
    'inputWidth': 0,
    'boostStrength': 1.0,
    'numActiveColumnsPerInhArea': 40,
    'potentialPct': 0.8,
    'seed': 1956,
    'spVerbosity': 0,
    'spatialImp': 'cpp',
    'synPermActiveInc': 0.05,
    'synPermConnected': 0.1,
    'synPermInactiveDec': 0.1,
}

# Temporal memory section ('cpp' implementation selected).
_TM_PARAMS = {
    'activationThreshold': 12,
    'cellsPerColumn': 32,
    'columnCount': 2048,
    'globalDecay': 0.0,
    'initialPerm': 0.21,
    'inputWidth': 2048,
    'maxAge': 0,
    'maxSegmentsPerCell': 16,
    'maxSynapsesPerSegment': 32,
    'minThreshold': 9,
    'newSynapseCount': 20,
    'outputType': 'normal',
    'pamLength': 1,
    'permanenceDec': 0.1,
    'permanenceInc': 0.1,
    'seed': 1960,
    'temporalImp': 'cpp',
    'verbosity': 0,
}

# Complete parameter set handed to ModelFactory.create() by loadModels().
MODEL_PARAMS_DICT = {
    'aggregationInfo': _AGGREGATION_INFO,
    'model': 'HTMPrediction',
    'modelParams': {
        'anomalyParams': _ANOMALY_PARAMS,
        'clParams': _CL_PARAMS,
        'inferenceType': 'TemporalMultiStep',
        'sensorParams': _SENSOR_PARAMS,
        'spEnable': True,
        'spParams': _SP_PARAMS,
        'tmEnable': True,
        'tmParams': _TM_PARAMS,
        'trainSPNetOnlyIfRequested': False,
    },
    'predictAheadTime': None,
    'version': 1,
}

# Size of BOTH dimensions of the model grid: loadModels() builds
# N_MODELS groups of N_MODELS models each.
N_MODELS = 2
# Number of random input records the entry point feeds through the models.
N_STEPS = 20
# Key used to pick the prediction out of "multiStepBestPredictions".
# NOTE(review): presumably has to agree with clParams 'steps' ('1') - confirm.
PREDICTION_HORIZON = 1
#---------------------------------------------------------------------
# load Models - list of list of models
#---------------------------------------------------------------------
def loadModels():
    """Build the N_MODELS x N_MODELS grid of models.

    Every model is created from MODEL_PARAMS_DICT and has inference
    enabled with 'input1' as the predicted field.

    :returns: list of N_MODELS groups, each group a list of N_MODELS models.
    """
    def _newModel():
        # One independent model instance, ready to produce predictions.
        freshModel = ModelFactory.create(MODEL_PARAMS_DICT)
        freshModel.enableInference({"predictedField": 'input1'})
        return freshModel

    # Outer list = groups, inner list = the models inside one group.
    return [[_newModel() for _ in range(0, N_MODELS)]
            for _ in range(0, N_MODELS)]
#---------------------------------------------------------------------
# runModels
#---------------------------------------------------------------------
def runModels(modelList, modelInputsList, useMultiprocessing=False):
    """Run every group of models on one input record and collect the results.

    :param modelList: list of model groups. Each group (itself a list of
        models) is handed as a unit to runModels_mpfcn.
    :param modelInputsList: the single input record passed unchanged to every
        group (the entry point passes a dict such as {'input1': 0.42}).
    :param useMultiprocessing: False (default) runs the groups one after the
        other in this process; True runs each group in its own worker process.
    :returns: list with one entry per group, in the same order as modelList.
        Each entry is what runModels_mpfcn returned for that group. In serial
        mode a group whose run raised keeps the placeholder [-1].
    :raises Exception: in multiprocessing mode, whatever a worker raised is
        re-raised here (after the pool has been shut down) instead of being
        hidden, since such failures point at the process transport itself.

    NOTE: with useMultiprocessing=True the models are shipped to the workers
    by pickling, so the workers operate on copies. Any state a model updates
    inside run() (presumably learning - confirm) is not carried back into the
    caller's modelList.
    NOTE: a new pool is started on every call; starting worker processes is
    costly on Windows, so a caller looping over many time steps pays that
    cost each step.
    """
    # One distinct placeholder per group. `[[-1]] * n` would make every slot
    # refer to the very same list object.
    predictionsList = [[-1] for _ in modelList]

    # Nothing to run; also avoids mp.Pool(processes=0), which raises ValueError.
    if not modelList:
        return predictionsList

    if not useMultiprocessing:
        for idx, group in enumerate(modelList):
            try:
                predictionsList[idx] = runModels_mpfcn(group, modelInputsList)
            except Exception:
                # Best effort: keep the [-1] placeholder and move on.
                # (Not a bare except, so Ctrl-C / SystemExit still work.)
                continue
        return predictionsList

    pool = mp.Pool(processes=len(modelList))
    try:
        # Submit every group first, then collect: pool.apply() would block on
        # each group in turn and the groups would never overlap.
        pending = [pool.apply_async(runModels_mpfcn,
                                    args=(group, modelInputsList))
                   for group in modelList]
        for idx, asyncResult in enumerate(pending):
            predictionsList[idx] = asyncResult.get()
    finally:
        # Always release the worker processes, even when a worker failed.
        pool.close()
        pool.join()
    return predictionsList
#---------------------------------------------------------------------
# runModels_mpfcn multiprocessing function
#---------------------------------------------------------------------
def runModels_mpfcn(modelList, modelInputsDict):
    """Run one group of models on a single input record.

    This is the unit of work that runModels hands to a worker process.

    :param modelList: the models of one group.
    :param modelInputsDict: input record passed to every model in the group.
    :returns: list with one entry per model, in order: the model's
        prediction, or - if running that model raised an Exception - the
        model's class (a debugging aid that shows what the worker received).
    """
    outcomes = [-1] * len(modelList)
    for position, currentModel in enumerate(modelList):
        try:
            outcomes[position] = runIndividualModel(currentModel,
                                                    modelInputsDict)
        except Exception:
            # Record the type of the offending model instead of a prediction
            # and carry on with the rest of the group.
            outcomes[position] = type(currentModel)
    return outcomes
#---------------------------------------------------------------------
# runIndividualModel
#---------------------------------------------------------------------
def runIndividualModel(model, modelInputDict):
    """Feed one input record to one model and return its best prediction.

    :param model: object exposing run(record); the result must carry an
        `inferences` mapping.
    :param modelInputDict: the input record handed to model.run().
    :returns: the "multiStepBestPredictions" entry stored under
        PREDICTION_HORIZON.
    """
    bestPredictions = model.run(modelInputDict).inferences[
        "multiStepBestPredictions"]
    return bestPredictions[PREDICTION_HORIZON]
#---------------------------------------------------------------------
# Command line entry point
#---------------------------------------------------------------------
if __name__ == "__main__":
    # The __main__ guard matters on Windows: worker processes re-import this
    # module, and without the guard they would re-run the script body.

    # Build the grid of models once, before the time-step loop.
    modelList = loadModels()

    for idx in range(N_STEPS):
        # One random value in [0.0, 1.0) per step, shared by every model.
        inputDict = {'input1': random.random()}
        predictions = runModels(modelList, inputDict)
        # Prints exactly what the statement `print idx, predictions` prints.
        print("%s %s" % (idx, predictions))