AssertionError: Using nupic with Spark

Spark Application:

from pyspark import SparkContext
from pyspark.streaming import StreamingContext
from pyspark.streaming.kafka import KafkaUtils

import json
import datetime

from pkg_resources import resource_filename

from nupic.frameworks.opf.model_factory import ModelFactory

import model_params

_ANOMALY_THRESHOLD = 0.9

model = ModelFactory.create(model_params.MODEL_PARAMS)
model.enableInference({'predictedField': 'consumption'})

def anomaly_detector(x):
    y = x
    y["timestamp"] = datetime.datetime.strptime(x["timestamp"], "%m/%d/%y %H:%M")
    # Debugging with a hard-coded record instead of the parsed Kafka record `y`:
    headers = ["timestamp", "consumption"]
    record = ["7/2/10 0:00", "21.2"]
    modelInput = dict(zip(headers, record))
    modelInput["consumption"] = float(modelInput["consumption"])
    modelInput["timestamp"] = datetime.datetime.strptime(modelInput["timestamp"], "%m/%d/%y %H:%M")
    result = model.run(modelInput)
    # result = model.run(y)  # running on the real Kafka record (commented out while debugging)
    anomalyScore = result.inferences['anomalyScore']
    return anomalyScore

sc = SparkContext(appName="PythonSparkStreamingKafka")
sc.setLogLevel("WARN")

ssc = StreamingContext(sc,60)

kafkaStream = KafkaUtils.createStream(ssc, '<ZKQuorum>', 'pyspark-testing-group-1', {'pyspark-testing':6})

parsed = kafkaStream.map( lambda x: json.loads(x[1]) )

anomalies = parsed.map( anomaly_detector )

anomalies.pprint()

ssc.start()

ssc.awaitTermination()

It fails at model.run() with the following error:

Driver stacktrace:
	at org.apache.spark.scheduler.DAGScheduler.org$apache$spark$scheduler$DAGScheduler$$failJobAndIndependentStages(DAGScheduler.scala:1599)
	at org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:1587)
	at org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:1586)
	at scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:59)
	at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:48)
	at org.apache.spark.scheduler.DAGScheduler.abortStage(DAGScheduler.scala:1586)
	at org.apache.spark.scheduler.DAGScheduler$$anonfun$handleTaskSetFailed$1.apply(DAGScheduler.scala:831)
	at org.apache.spark.scheduler.DAGScheduler$$anonfun$handleTaskSetFailed$1.apply(DAGScheduler.scala:831)
	at scala.Option.foreach(Option.scala:257)
	at org.apache.spark.scheduler.DAGScheduler.handleTaskSetFailed(DAGScheduler.scala:831)
	at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.doOnReceive(DAGScheduler.scala:1820)
	at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:1769)
	at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:1758)
	at org.apache.spark.util.EventLoop$$anon$1.run(EventLoop.scala:48)
	at org.apache.spark.scheduler.DAGScheduler.runJob(DAGScheduler.scala:642)
	at org.apache.spark.SparkContext.runJob(SparkContext.scala:2027)
	at org.apache.spark.SparkContext.runJob(SparkContext.scala:2048)
	at org.apache.spark.SparkContext.runJob(SparkContext.scala:2067)
	at org.apache.spark.api.python.PythonRDD$.runJob(PythonRDD.scala:141)
	at org.apache.spark.api.python.PythonRDD.runJob(PythonRDD.scala)
	at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
	at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
	at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
	at java.lang.reflect.Method.invoke(Method.java:498)
	at py4j.reflection.MethodInvoker.invoke(MethodInvoker.java:244)
	at py4j.reflection.ReflectionEngine.invoke(ReflectionEngine.java:357)
	at py4j.Gateway.invoke(Gateway.java:282)
	at py4j.commands.AbstractCommand.invokeMethod(AbstractCommand.java:132)
	at py4j.commands.CallCommand.execute(CallCommand.java:79)
	at py4j.GatewayConnection.run(GatewayConnection.java:214)
	at java.lang.Thread.run(Thread.java:748)
Caused by: org.apache.spark.api.python.PythonException: Traceback (most recent call last):
  File "/usr/local/lib/python2.7/dist-packages/pyspark/python/lib/pyspark.zip/pyspark/worker.py", line 229, in main
    process()
  File "/usr/local/lib/python2.7/dist-packages/pyspark/python/lib/pyspark.zip/pyspark/worker.py", line 224, in process
    serializer.dump_stream(func(split_index, iterator), outfile)
  File "/usr/local/lib/python2.7/dist-packages/pyspark/python/lib/pyspark.zip/pyspark/serializers.py", line 372, in dump_stream
    vs = list(itertools.islice(iterator, batch))
  File "/home/docker/nupic/examples/opf/clients/hotgym/anomaly/hotgym_anomaly_with_spark.py", line 28, in anomaly_detector
    result = model.run(modelInput)
  File "/usr/local/lib/python2.7/dist-packages/nupic-1.0.6.dev0-py2.7.egg/nupic/frameworks/opf/htm_prediction_model.py", line 411, in run
    assert not self.__restoringFromState
AssertionError

	at org.apache.spark.api.python.BasePythonRunner$ReaderIterator.handlePythonException(PythonRunner.scala:298)
	at org.apache.spark.api.python.PythonRunner$$anon$1.read(PythonRunner.scala:438)
	at org.apache.spark.api.python.PythonRunner$$anon$1.read(PythonRunner.scala:421)
	at org.apache.spark.api.python.BasePythonRunner$ReaderIterator.hasNext(PythonRunner.scala:252)
	at org.apache.spark.InterruptibleIterator.hasNext(InterruptibleIterator.scala:37)
	at scala.collection.Iterator$class.foreach(Iterator.scala:893)
	at org.apache.spark.InterruptibleIterator.foreach(InterruptibleIterator.scala:28)
	at org.apache.spark.api.python.PythonRDD$.writeIteratorToStream(PythonRDD.scala:204)
	at org.apache.spark.api.python.PythonRunner$$anon$2.writeIteratorToStream(PythonRunner.scala:407)
	at org.apache.spark.api.python.BasePythonRunner$WriterThread$$anonfun$run$1.apply(PythonRunner.scala:215)
	at org.apache.spark.util.Utils$.logUncaughtExceptions(Utils.scala:1988)
	at org.apache.spark.api.python.BasePythonRunner$WriterThread.run(PythonRunner.scala:170)


	at org.apache.spark.streaming.api.python.TransformFunction.callPythonTransformFunction(PythonDStream.scala:95)
	at org.apache.spark.streaming.api.python.TransformFunction.apply(PythonDStream.scala:78)
	at org.apache.spark.streaming.api.python.PythonDStream$$anonfun$callForeachRDD$1.apply(PythonDStream.scala:179)
	at org.apache.spark.streaming.api.python.PythonDStream$$anonfun$callForeachRDD$1.apply(PythonDStream.scala:179)
	at org.apache.spark.streaming.dstream.ForEachDStream$$anonfun$1$$anonfun$apply$mcV$sp$1.apply$mcV$sp(ForEachDStream.scala:51)
	at org.apache.spark.streaming.dstream.ForEachDStream$$anonfun$1$$anonfun$apply$mcV$sp$1.apply(ForEachDStream.scala:51)
	at org.apache.spark.streaming.dstream.ForEachDStream$$anonfun$1$$anonfun$apply$mcV$sp$1.apply(ForEachDStream.scala:51)
	at org.apache.spark.streaming.dstream.DStream.createRDDWithLocalProperties(DStream.scala:416)
	at org.apache.spark.streaming.dstream.ForEachDStream$$anonfun$1.apply$mcV$sp(ForEachDStream.scala:50)
	at org.apache.spark.streaming.dstream.ForEachDStream$$anonfun$1.apply(ForEachDStream.scala:50)
	at org.apache.spark.streaming.dstream.ForEachDStream$$anonfun$1.apply(ForEachDStream.scala:50)
	at scala.util.Try$.apply(Try.scala:192)
	at org.apache.spark.streaming.scheduler.Job.run(Job.scala:39)
	at org.apache.spark.streaming.scheduler.JobScheduler$JobHandler$$anonfun$run$1.apply$mcV$sp(JobScheduler.scala:257)
	at org.apache.spark.streaming.scheduler.JobScheduler$JobHandler$$anonfun$run$1.apply(JobScheduler.scala:257)
	at org.apache.spark.streaming.scheduler.JobScheduler$JobHandler$$anonfun$run$1.apply(JobScheduler.scala:257)
	at scala.util.DynamicVariable.withValue(DynamicVariable.scala:58)
	at org.apache.spark.streaming.scheduler.JobScheduler$JobHandler.run(JobScheduler.scala:256)
	at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
	at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
	at java.lang.Thread.run(Thread.java:748)

Can someone please help me solve this issue?


This seems to be related to the model being pickled. Are you pickling the model instance at any point? That is the only way I can see this error happening.
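
For reference, here is a minimal sketch of what I mean (assuming a plain pickle round trip behaves the same way as whatever is serializing your model):

# Hypothetical repro sketch, not taken from the Spark app above. A plain pickle
# round trip leaves the model in its "restoring from state" mode, and run()
# then trips the assertion, because (if I read the OPF code right) only the
# Model.load() checkpoint path finishes the restoration and clears that flag.
import datetime
import pickle

from nupic.frameworks.opf.model_factory import ModelFactory

import model_params

model = ModelFactory.create(model_params.MODEL_PARAMS)
model.enableInference({'predictedField': 'consumption'})

clone = pickle.loads(pickle.dumps(model))  # what any pickler would do to the model
clone.run({
    'timestamp': datetime.datetime(2010, 7, 2, 0, 0),
    'consumption': 21.2,
})  # expected to raise AssertionError (assert not self.__restoringFromState)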


No, I am not pickling the model anywhere. For reference, here are my model parameters:

MODEL_PARAMS = {
    # Type of model that the rest of these parameters apply to.
    'model': "HTMPrediction",

    # Version that specifies the format of the config.
    'version': 1,

    # Intermediate variables used to compute fields in modelParams and also
    # referenced from the control section.
    'aggregationInfo': {
        'days': 0,
        'fields': [(u'c1', 'sum'), (u'c0', 'first')],
        'hours': 1,
        'microseconds': 0,
        'milliseconds': 0,
        'minutes': 0,
        'months': 0,
        'seconds': 0,
        'weeks': 0,
        'years': 0},

    'predictAheadTime': None,

    # Model parameter dictionary.
    'modelParams': {
        # The type of inference that this model will perform
        'inferenceType': 'TemporalAnomaly',

        'sensorParams': {
            # Sensor diagnostic output verbosity control;
            # if > 0: sensor region will print out on screen what it's sensing
            # at each step 0: silent; >=1: some info; >=2: more info;
            # >=3: even more info (see compute() in py/regions/RecordSensor.py)
            'verbosity' : 0,

            # Example:
            #     dsEncoderSchema = [
            #       DeferredDictLookup('__field_name_encoder'),
            #     ],
            #
            # (value generated from DS_ENCODER_SCHEMA)
            'encoders': {
                u'timestamp_timeOfDay': {
                        'fieldname': u'timestamp',
                        'name': u'timestamp_timeOfDay',
                        'timeOfDay': (21, 9.5),
                        'type': 'DateEncoder'
                },
                u'timestamp_dayOfWeek': None,
                u'timestamp_weekend': None,
                u'consumption':    {
                    'clipInput': True,
                    'fieldname': u'consumption',
                    'maxval': 100.0,
                    'minval': 0.0,
                    'n': 50,
                    'name': u'consumption',
                    'type': 'ScalarEncoder',
                    'w': 21
                },
            },

            # A dictionary specifying the period for automatically-generated
            # resets from a RecordSensor;
            #
            # None = disable automatically-generated resets (also disabled if
            # all of the specified values evaluate to 0).
            # Valid keys is the desired combination of the following:
            #  days, hours, minutes, seconds, milliseconds, microseconds, weeks
            #
            # Example for 1.5 days: sensorAutoReset = dict(days=1,hours=12),
            'sensorAutoReset' : None,
        },

        'spEnable': True,

        'spParams': {
            # SP diagnostic output verbosity control;
            # 0: silent; >=1: some info; >=2: more info;
            'spVerbosity' : 0,

            # Spatial Pooler implementation selector.
            # Options: 'py', 'cpp' (speed optimized, new)
            'spatialImp' : 'cpp',

            'globalInhibition': 1,

            # Number of columns in the SP (must be same as in TM)
            'columnCount': 2048,

            'inputWidth': 0,

            # SP inhibition control (absolute value);
            # Maximum number of active columns in the SP region's output (when
            # there are more, the weaker ones are suppressed)
            'numActiveColumnsPerInhArea': 40,

            'seed': 1956,

            # potentialPct
            # What percent of the columns's receptive field is available
            # for potential synapses.
            'potentialPct': 0.8,

            # The default connected threshold. Any synapse whose
            # permanence value is above the connected threshold is
            # a "connected synapse", meaning it can contribute to the
            # cell's firing. Typical value is 0.10.
            'synPermConnected': 0.1,

            'synPermActiveInc': 0.0001,

            'synPermInactiveDec': 0.0005,

            # boostStrength controls the strength of boosting. It should be a
            # a number greater or equal than 0.0. No boosting is applied if
            # boostStrength=0.0. Boosting encourages efficient usage of columns.
            'boostStrength': 0.0,
        },

        # Controls whether TM is enabled or disabled;
        # TM is necessary for making temporal predictions, such as predicting
        # the next inputs.  Without TM, the model is only capable of
        # reconstructing missing sensor inputs (via SP).
        'tmEnable' : True,

        'tmParams': {
            # TM diagnostic output verbosity control;
            # 0: silent; [1..6]: increasing levels of verbosity
            # (see verbosity in nupic/trunk/py/nupic/research/backtracking_tm.py and backtracking_tm_cpp.py)
            'verbosity': 0,

            # Number of cell columns in the cortical region (same number for
            # SP and TM)
            # (see also tpNCellsPerCol)
            'columnCount': 2048,

            # The number of cells (i.e., states), allocated per column.
            'cellsPerColumn': 32,

            'inputWidth': 2048,

            'seed': 1960,

            # Temporal Pooler implementation selector (see _getTPClass in
            # CLARegion.py).
            'temporalImp': 'cpp',

            # New Synapse formation count
            # NOTE: If None, use spNumActivePerInhArea
            #
            # TODO: need better explanation
            'newSynapseCount': 20,

            # Maximum number of synapses per segment
            #  > 0 for fixed-size CLA
            # -1 for non-fixed-size CLA
            #
            # TODO: for Ron: once the appropriate value is placed in TM
            # constructor, see if we should eliminate this parameter from
            # description.py.
            'maxSynapsesPerSegment': 32,

            # Maximum number of segments per cell
            #  > 0 for fixed-size CLA
            # -1 for non-fixed-size CLA
            #
            # TODO: for Ron: once the appropriate value is placed in TM
            # constructor, see if we should eliminate this parameter from
            # description.py.
            'maxSegmentsPerCell': 128,

            # Initial Permanence
            # TODO: need better explanation
            'initialPerm': 0.21,

            # Permanence Increment
            'permanenceInc': 0.1,

            # Permanence Decrement
            # If set to None, will automatically default to tpPermanenceInc
            # value.
            'permanenceDec' : 0.1,

            'globalDecay': 0.0,

            'maxAge': 0,

            # Minimum number of active synapses for a segment to be considered
            # during search for the best-matching segments.
            # None=use default
            # Replaces: tpMinThreshold
            'minThreshold': 9,

            # Segment activation threshold.
            # A segment is active if it has >= tpSegmentActivationThreshold
            # connected synapses that are active due to infActiveState
            # None=use default
            # Replaces: tpActivationThreshold
            'activationThreshold': 12,

            'outputType': 'normal',

            # "Pay Attention Mode" length. This tells the TM how many new
            # elements to append to the end of a learned sequence at a time.
            # Smaller values are better for datasets with short sequences,
            # higher values are better for datasets with long sequences.
            'pamLength': 3,
        },

        # Don't create the classifier since we don't need predictions.
        'clEnable': False,
        'clParams': None,

        'anomalyParams': {
            u'anomalyCacheRecords': None,
            u'autoDetectThreshold': None,
            u'autoDetectWaitRecords': 2184},

        'trainSPNetOnlyIfRequested': False,
    },
}

From what I can tell from the code, the only way this error can occur is through pickling, or some other serialization path that calls into the model's __setstate__.

Do you think that Spark may be pickling the model as part of its process?

This issue is solved. Thanks for the hint @rhyolight.

Spark serializes objects in order to ship them over the network from the driver to the executors, so I was getting this error because I was using the model object directly inside the mapped function (which runs on the executors). Now I save the model at the driver level and load it at the executor level; the save/load round trip takes care of serializing and deserializing the model.
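
To make the mechanism concrete, here is roughly what was happening to my original mapped function (an illustration, not the exact PySpark code path): the function gets serialized with PySpark's bundled cloudpickle, and because it is defined in the driver script, the module-level model it references gets pickled by value along with it.

# Illustration only: the function is cloudpickled together with the globals it
# references, so the NuPIC model takes the same pickle round trip that produced
# the AssertionError in the stack trace above.
import pickle

from pyspark import cloudpickle  # PySpark's bundled pickler for closures

payload = cloudpickle.dumps(anomaly_detector)  # drags the global `model` in by value
restored_fn = pickle.loads(payload)            # roughly what each executor does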

Updated code:

from pyspark import SparkContext
from pyspark.streaming import StreamingContext
from pyspark.streaming.kafka import KafkaUtils

import json
import datetime

from pkg_resources import resource_filename

from nupic.frameworks.opf.model_factory import ModelFactory
from nupic.frameworks.opf.model import Model
import model_params

_ANOMALY_THRESHOLD = 0.9

model = ModelFactory.create( model_params.MODEL_PARAMS )
model.enableInference( {'predictedField': 'consumption'} )
# save the model to local directory
model.save('/home/docker/saved_models/hotgym/anomaly/spark')

def anomaly_detector(x):
    y = x
    y["timestamp"] = datetime.datetime.strptime(x["timestamp"], "%m/%d/%y %H:%M")

    # load the saved model from the local directory
    saved_model = Model.load('/home/docker/saved_models/hotgym/anomaly/spark')
    # run the model
    result = saved_model.run(y)
    # get the anomaly score from the result
    anomalyScore = result.inferences['anomalyScore']
    # save updated model back to local directory
    saved_model.save('/home/docker/saved_models/hotgym/anomaly/spark')
    return anomalyScore

sc = SparkContext( appName = "PythonSparkStreamingKafka" )
sc.setLogLevel( "WARN" )

ssc = StreamingContext( sc, 60 )

kafkaStream = KafkaUtils.createStream( ssc, '<ZKQuorum>', 'pyspark-testing-group-1', { 'pyspark-testing': 6 } )

parsed = kafkaStream.map( lambda x: json.loads(x[1]) )

anomalies = parsed.map( anomaly_detector )

anomalies.pprint()

ssc.start()

ssc.awaitTermination()

I am curious whether the load and save methods defined in nupic.frameworks.opf.model.Model are able to read from and write to a remote HDFS directory. Any comments @rhyolight?


It should work if it obeys file system rules, I guess.


Will definitely try it and post the findings here. Thanks for the help. Appreciate it.


It is not able to save or load the model to an HDFS directory, because the save and load methods in the Model class require an absolute local path. If the path is not absolute, they throw an AssertionError.

See the following output:

Traceback (most recent call last):
  File "/home/docker/nupic/examples/opf/clients/hotgym/anomaly/hotgym_anomaly_with_spark.py", line 20, in <module>
    model.save('hdfs://dfw-ch-098-cappblx-128-005.prod.walmart.com:8020/tmp/testing')
  File "/usr/local/lib/python2.7/dist-packages/nupic-1.0.6.dev0-py2.7.egg/nupic/frameworks/opf/model.py", line 349, in save
    self.__makeDirectoryFromAbsolutePath(saveModelDir)
  File "/usr/local/lib/python2.7/dist-packages/nupic-1.0.6.dev0-py2.7.egg/nupic/frameworks/opf/model.py", line 451, in __makeDirectoryFromAbsolutePath
    assert os.path.isabs(absDirPath)
AssertionError
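
In hindsight the assertion makes sense: os.path.isabs() only checks for a leading '/', so an hdfs:// URI never qualifies. A possible workaround I may try (untested, and the HDFS path below is a placeholder) is to keep saving to an absolute local path and shuttle the checkpoint directory to and from HDFS with the Hadoop CLI:

# Untested sketch: save locally, then copy the checkpoint directory to HDFS.
import subprocess

from nupic.frameworks.opf.model import Model
from nupic.frameworks.opf.model_factory import ModelFactory

import model_params

LOCAL_DIR = '/home/docker/saved_models/hotgym/anomaly/spark'  # absolute local path
HDFS_DIR = 'hdfs:///tmp/hotgym_anomaly_model'                 # placeholder HDFS location

# Driver side: Model.save() insists on an absolute local path, so save locally
# first and then push the checkpoint directory up to HDFS.
model = ModelFactory.create(model_params.MODEL_PARAMS)
model.enableInference({'predictedField': 'consumption'})
model.save(LOCAL_DIR)
subprocess.check_call(['hadoop', 'fs', '-put', '-f', LOCAL_DIR, HDFS_DIR])

# Executor side: pull the checkpoint down to a local scratch path, then load it.
local_copy = '/tmp/hotgym_anomaly_model'
subprocess.check_call(['hadoop', 'fs', '-get', HDFS_DIR, local_copy])
saved_model = Model.load(local_copy)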

That is unfortunate. We did not design NuPIC models to be serialized to streams, and we are no longer working on new features. I may need to defer you to htm.core, the community fork; their models might have more flexibility for serialization. You can ask here: #engineering:community-fork
