Making a Synchronous call to the HTM Engine



What is the best architecture for building a synchronous anomaly engine?
I would like to have a solution where the HTM engine resides in a container which receives messages via two pipes.

I do not have control over the container except for the callbacks from which I retrieve messages.
I can initialize the engine in the container's initialization step.

The container receives messages, which I handle via a callback method:

void ProcessMessage(String message, context ctx, session session) {

    // process message here
    //   - retrieve timestamp
    //   - retrieve value

    // invoke engine here
    //   - call ???

    // retrieve Inference
    //   - Inference inf
    //   - retrieve anomaly from inference
    //   - retrieve prediction from inference
    //   - prepare output object

    session.write(output);
}

Help appreciated



Hi Madhan,

Your call to the engine does not necessarily have to be synchronous; it can be done both ways.

Asynchronous (through callback):

//Create the PublisherSupplier - we'll need this to create our Sensor (instead of the Publisher)
PublisherSupplier supplier = PublisherSupplier.builder()
    .addHeader("timestamp, value")
    .addHeader("datetime, float")
    .addHeader("T, B") //see for more info
    .build();

//Create the ObservableSensor; note the use of the PublisherSupplier instead of the Publisher
// Notice the input type is the type specified on the ObservableSensor == String[]
Sensor<ObservableSensor<String[]>> sensor = Sensor.create(ObservableSensor::create, SensorParams.create(
    Keys::obs, new Object[] {"", supplier}));

// Get the parameters to use... 
Parameters p = ...

// Create your Network
Network network = Network.create("Network API Demo", p)         // Name the Network whatever you wish...
    .add(Network.createRegion("Region 1")                       // Name the Region whatever you wish...
        .add(Network.createLayer("Layer 2/3", p)                // Name the Layer whatever you wish...
            .add(new TemporalMemory())                         
            .add(new SpatialPooler())
            .add(sensor)));                                     // created above

// Create the handler to process callback calls
network.observe().subscribe(new Observer<Inference>() {
    @Override public void onCompleted() {}
    @Override public void onError(Throwable e) { e.printStackTrace(); }
    @Override public void onNext(Inference output) {
        // Retrieve Inference
        //   - Inference inf
        //   - retrieve anomaly from inference
        //   - retrieve prediction from inference
        //   - prepare output object
        //   - session.write(output)
    }
});

Then inside of your processMessage() method:

// Use the Publisher to input data into the Network...
//Get the Publisher from the PublisherSupplier - we'll use this to feed in data just as before
Publisher publisher = supplier.get();

void ProcessMessage(String message, context ctx, session session) {

    // process message here
    publisher.onNext("7/2/10 0:00,21.2");        // The date pattern here is set via the Parameters
    publisher.onNext("7/2/10 1:00,34.0");
    publisher.onNext("7/2/10 2:00,40.4");
    publisher.onNext("7/2/10 3:00,123.4");
}

You will also need parameters. There is a set regarded as the best defaults for anomaly detection, which has worked well for almost all anomaly use cases so far… (almost all).

I will gather the parameters and put them in code form to make them easier to use - in a bit… I’m still waking up…

Try the asynchronous approach first… I will continue the “synchronous” approach in my next message in a few hours… (I’m on vacation right now :-P)



Thanks David.
You really need to take that vacation seriously :grinning:
Reminds me of my vacations, when I used to work on work stuff that I wanted to work on rather than needed to work on :slight_smile:

This was my first approach.
The downside is that the “session” object is available only in the “ProcessMessage” callback and not during initialization.
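
One possible way around this, as a rough sketch only: remember each session in a queue before publishing, and let the observer reply on the oldest waiting session when an inference arrives. This assumes the network emits exactly one inference per input, in input order, which I have not verified for HTM.Java. `Session`, `SessionCorrelator`, and the `Consumer<String>` standing in for `publisher.onNext` are hypothetical names, not part of the library or the container.

```java
import java.util.Queue;
import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.function.Consumer;

// Hypothetical stand-in for the container's session type.
interface Session {
    void write(String output);
}

// Pairs each published input with the session that delivered it.
class SessionCorrelator {
    private final Queue<Session> pending = new ConcurrentLinkedQueue<>();
    private final Consumer<String> publish; // stand-in for publisher::onNext

    SessionCorrelator(Consumer<String> publish) {
        this.publish = publish;
    }

    // Call from ProcessMessage. Synchronized so that the queue order
    // always matches the order in which inputs are published.
    synchronized void processMessage(String message, Session session) {
        pending.add(session);
        publish.accept(message);
    }

    // Call from the observer's onNext: reply on the oldest waiting session.
    // An inference with no waiting session is silently dropped.
    void onInference(String output) {
        Session session = pending.poll();
        if (session != null) {
            session.write(output);
        }
    }
}
```

If the one-in, one-out ordering assumption does not hold for the engine, this pairing breaks, which is one reason a synchronous call is simpler here.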

Please enjoy your time off. I can wait for your Synchronous approach.
Meanwhile … I’ll continue to look :slight_smile:

Thanks so much!!!



Thanks for understanding :slight_smile:

Take a look at the synchronous test while you wait for me to put together an example…

… it uses the computeImmediate() method on Network.




Fixing a build error I found… then I’ll put together an example. How’s it going? Were you able to get anything together yourself, using the test as an example?


Hi David,

Appreciate your help. computeImmediate() worked. I had tried using it in the past and it threw some exceptions.
I’m afraid I tried so many things that I don’t have a history of that now :frowning:

I tried it again now with a closer look at the Test
And … I made some good progress. I’m seeing the anomaly scores :smiley:
Simplified code below…
In the Initialize() method I create a network:

public  static Network createNetwork(Object key) {
    Parameters p = SensorParameters.getParameters();
    p = p.union(getEncoderParams());

    // This is how easy it is to create a full running Network!
    return Network.create("Network API Demo", p)
            .add(Network.createRegion("Region 1")
                    .add(Network.createLayer("Layer 2/3", p)
                            .alterParameter(Parameters.KEY.AUTO_CLASSIFY, Boolean.TRUE)
                            .add(new TemporalMemory())
                            .add(new SpatialPooler())));
}


In ProcessMessage, this worked:

  ProcessMessage(message, yada, yada){
            // Message format: "<epoch millis>,<value>"
            String[] array = message.split(",");
            // Pattern letters are case-sensitive: MM = month, yy = year (mm = minutes, YY = week year)
            SimpleDateFormat dt = new SimpleDateFormat("MM/dd/yy HH:mm");
            Date arrDate = new Date(Long.parseLong(array[0]));
            String arrDateStr = dt.format(arrDate);

            String messageStr = arrDateStr+","+array[1];
            log.debug("Input into Network --> " + messageStr);

            // Feed the input directly and get the Inference back synchronously
            Map<String, Object> multiInput = new HashMap<>();
            multiInput.put("timestamp",new DateTime(Long.parseLong(array[0])));
            Inference inference = network.computeImmediate(multiInput);

            System.out.println("AnomalyScore: "+ inference.getAnomalyScore());
            System.out.println("ProbableValue: "+ inference.getClassification("consumption").getMostProbableValue(1));
            System.out.println("timestamp: "+ arrDateStr);
  }

Thanks so much.
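
One detail worth checking when formatting the timestamp: `SimpleDateFormat` pattern letters are case-sensitive. `MM` is the month and `mm` is the minute; `yy` is the calendar year and `YY` is the week year. A small standalone sketch of the difference follows; the class name `TimestampFormatDemo` is made up for illustration, and UTC and `Locale.US` are fixed only to make the output reproducible.

```java
import java.text.SimpleDateFormat;
import java.util.Date;
import java.util.Locale;
import java.util.TimeZone;

class TimestampFormatDemo {
    // Formats epoch milliseconds in UTC with the given pattern.
    static String format(String pattern, long epochMillis) {
        SimpleDateFormat fmt = new SimpleDateFormat(pattern, Locale.US);
        fmt.setTimeZone(TimeZone.getTimeZone("UTC"));
        return fmt.format(new Date(epochMillis));
    }

    public static void main(String[] args) {
        long julySecond2010 = 1278028800000L; // 2010-07-02T00:00:00Z

        // "MM" is month-in-year: prints 07/02/10 00:00
        System.out.println(format("MM/dd/yy HH:mm", julySecond2010));

        // "mm" is minute-in-hour, so the month is lost: prints 00/02/10 00:00
        System.out.println(format("mm/dd/yy HH:mm", julySecond2010));
    }
}
```

With the lowercase pattern the month slot silently shows the minutes, so the string can look plausible while being wrong.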




You’re very welcome! Congratulations on your progress!

I found a small problem with the last release (build file had old version in one place). So I’m going to have to do another release. You might want to update your repo, once this is done…

Also I will get back to you with the “optimum” parameters for anomaly detection… Just give me a few…



No worries … take your time. At this rate you’ll need a vacation from your vacation :wink:



Actually, that was very funny! :smile: I really needed some time though to put all my nagging issues behind me, so this has been a good vacation. This way I’ll be able to start work without thinking about this.



Here’s a DropBox link to a parameters file that should get you most of the way there. Parameters are a try-as-you-go kind of deal, so I can’t tell you exactly what parameters to use, but these are retrieved from NuPIC’s best anomaly parameters. Numenta’s engineers settled on these because they started to see that they worked very well across a modest set of input variations. Of course, I don’t work for Numenta so I can’t tell you exactly what the story is/was - but that’s what I remember from observed conversations. They have a Python method which is used to retrieve them, and you can take a look for yourself, here.

Anyway, usage of the linked Parameters source file is done via the entry method, NetworkParameterHarness.getNetworkParams(). If you know the range of your data values, you could opt to use a ScalarEncoder, otherwise use the RandomDistributedScalarEncoder - the choices are illustrated in the file.
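
To illustrate why knowing the range matters for that choice, here is a toy sketch in plain Java. It is not HTM.Java's ScalarEncoder and does not reproduce its algorithm; `RangeBucketDemo` and `bucket` are made-up names. It only shows that a fixed-range scheme has to clip anything outside [min, max], so very different out-of-range values end up indistinguishable.

```java
class RangeBucketDemo {
    // Maps a value to one of `buckets` equal-width bins over [min, max],
    // clipping anything outside the range to the nearest edge bin.
    static int bucket(double value, double min, double max, int buckets) {
        double clipped = Math.max(min, Math.min(max, value));
        int index = (int) ((clipped - min) / (max - min) * buckets);
        return Math.min(index, buckets - 1); // value == max lands in the last bin
    }

    public static void main(String[] args) {
        // In range: distinct values get distinct bins.
        System.out.println(bucket(5.0, 0.0, 100.0, 10));   // 0
        System.out.println(bucket(55.0, 0.0, 100.0, 10));  // 5

        // Out of range: 150 and 500 both collapse into the last bin.
        System.out.println(bucket(150.0, 0.0, 100.0, 10)); // 9
        System.out.println(bucket(500.0, 0.0, 100.0, 10)); // 9
    }
}
```

If the data can drift outside any range you could set up front, that collapse is the argument for an encoder that does not need fixed bounds.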

I can’t really help you beyond this as I’ve spent 99% of my time building the Java version of NuPIC and not using it… which could seem kind of odd…

Good luck!

P.S. Don’t forget to fork the latest HTM.Java version (v0.6.11-alpha)

Kirk out…


Thank you … Thank you.