Implementing HTM using Kafka Streams

Looking through the samples I see all batch processes. Are there samples of using NuPIC/HTM with a continuous streaming datasource like Kafka? I have keyed / timestamped scalar data for which I'm hoping to deploy an anomaly detection process. The output / results of the “hot gym” sample seem like a good starting point, but it appears to be a pull process instead of a push process. Even the Flink samples are essentially batch jobs.

Looking through the docs I see examples like this which make anomaly detection seem almost trivial. Unfortunately, this sample appears to use a version of the API somewhat behind what's out there now.

In any event - looking for pointers, references, etc.

Lastly - perhaps Numenta has a professional services group that might be willing to contract to answer some questions?

Hi @phil_d_cat,

I’m not familiar with Kafka, but HTM.Java’s base NetworkAPI (NAPI) is implemented using RxJava for precisely that reason - I wanted a framework that data could be “pushed” into, and not the typical pull-based IO. Sensors are primarily “Observables” (Publishers) and the NAPI emits through an Observable whose return type is an Inference Object encapsulating the state and data at the time of that emission.



Also, HTMs don’t do “batch processing” in the more common sense of the term - they only process streaming data, which I believe means that the system “reacts” the same way to each input. In batch processing, a chunk of the dataset is fed in and calculations are performed after an “epoch” of some length. HTMs instead do their calculations on each data point they receive, and make no distinction between online performance and learning performance, because they don’t require training in a separate phase (one that treats the internal data manipulations differently). They are therefore online learners. Just an FYI… :wink:
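To make the "score and learn on every data point" idea concrete, here is a minimal sketch. It is not an HTM and does not use HTM.Java at all: `OnlineScorer` is a hypothetical stand-in that keeps a running mean and variance (Welford's update) and reports how many standard deviations each new value sits from what it has seen so far. The only point is that there is no separate training phase; each call both scores the value and learns from it.

```java
/** Hypothetical stand-in for an online learner. NOT an HTM; plain running statistics. */
final class OnlineScorer {
    private long count = 0;
    private double mean = 0.0;
    private double m2 = 0.0; // running sum of squared deviations from the mean

    /** Scores one value against everything seen so far, then learns from it. */
    double scoreAndLearn(double value) {
        // Score first, using only the data points that arrived earlier.
        double score = 0.0;
        if (count >= 2) {
            double std = Math.sqrt(m2 / (count - 1));
            if (std > 0.0) {
                score = Math.abs(value - mean) / std;
            }
        }
        // Then learn from the same value (Welford's incremental update).
        count++;
        double delta = value - mean;
        mean += delta / count;
        m2 += delta * (value - mean);
        return score;
    }

    public static void main(String[] args) {
        OnlineScorer scorer = new OnlineScorer();
        double[] stream = {10.0, 11.0, 9.0, 10.5, 9.5, 30.0};
        for (double v : stream) {
            // One call per data point: no epochs, no separate training pass.
            System.out.printf("value=%.1f score=%.2f%n", v, scorer.scoreAndLearn(v));
        }
    }
}
```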


That’s great to hear! Would you recommend following the “template” of the hot gym model or is there a better design pattern to use that would suit this project?

The reason I asked about batch processing is that it seems the hot gym reads a csv file and exits when the file is finished. The trigger seems to be the Subscriber#onNext function. How would I turn this around so my datasource trigger would call an equivalent function?

I guess I'm having issues following an A-Z of how to implement this. I've also been trying to convert the Flink demo(s) - perhaps this would make more sense.


Hi @phil_d_cat

I worked up this Gist for some example code of using the NAPI with a Publisher for pushing data into the Network. Please have a look at the tests for variations and other goodies like persisting the network (saving) etc.

The Network has two ends. The output and the input. The input is either connected to a Sensor that handles input automatically (such as a FileSensor or URLSensor), or a Publisher (ObservableSensor) which can be used to “push” data into the network.

When using a sensor, one “subscribes” to it in order to receive output. The handler’s onNext(Inference i) method is called after every compute cycle. It must be remembered that this is an asynchronous operation which means that the cycle the handler is currently working on may not be the cycle that was most recently entered.
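For readers who want to see the push-in / subscribe-out shape without pulling in any dependencies, here is a rough sketch using only the JDK's `java.util.concurrent.SubmissionPublisher`. This is an analogy, not the HTM.Java or RxJava API: `PushPipelineSketch` and the `"processed:"` prefix are made up for illustration, and the handler merely stands in for an `onNext(Inference i)` callback. In a Kafka setting, the loop that calls `submit` would presumably be the consumer's poll loop.

```java
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.SubmissionPublisher;

/** JDK-only analogy for pushing records in and subscribing to results. */
final class PushPipelineSketch {

    /** Pushes each record into a publisher and returns what the handler received. */
    static List<String> run(List<String> records) {
        List<String> received = new CopyOnWriteArrayList<>();
        SubmissionPublisher<String> publisher = new SubmissionPublisher<>();

        // Subscribe a handler; it is invoked once per item, asynchronously,
        // on a different thread from the one that pushes the data.
        CompletableFuture<Void> done =
                publisher.consume(record -> received.add("processed:" + record));

        // The producer decides when data arrives; nothing is pulled from a file.
        for (String record : records) {
            publisher.submit(record);
        }

        publisher.close(); // signals completion once pending items are delivered
        done.join();       // wait until the handler has seen every item
        return received;
    }

    public static void main(String[] args) {
        System.out.println(run(List.of("7/2/10 0:00,21.2", "7/2/10 1:00,16.4")));
    }
}
```

Because delivery happens on another thread, the handler may still be working on an earlier record while newer ones are being submitted, which is the same caveat as above.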

Notice that “subscribing” to the network in the Gist example involves two calls:

1.) Observable observable = Network.observe(); // This observable can be used in many wonderful ways, such as being combined with other Observables to do functional operations! See this Marble example of Observable usage for an example!

…second step

2.) Subscription sub = observable.subscribe(Subscriber or Observer or Action)

In the tests, these steps are collapsed, but you can actually do a lot of different things with the Observable, defining whatever functions you want, before subscribing to it!

The NAPI also has a “synchronous” method, Network.computeImmediate(), where the Network returns an Inference directly as the response to the call (not recommended for streaming inputs). Don’t use this. :slight_smile: …at least not for your purposes.


Hey Phil, I’ve worked with the HTM engine, which sets up several services that listen to data streams. HTM models are swapped into memory from disk when new data comes in (and out when the data stops). While there’s not a service explicitly written for Kafka processing that I know of, it wouldn’t be too hard to write one. For example, I’ve taken Austin’s skeleton HTM engine app and fleshed out this simple web service for a production app before. So each time you consume from Kafka, there would be some calls to the “API” that HTM engine provides. (Here’s a short video where I talk about some work I did with HTM engine.) Would be happy to discuss further.
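As a rough illustration of the swap-in / swap-out idea for keyed data, here is a small sketch. It is not the HTM engine's code or API: `ModelSwapSketch`, `Model`, `onRecord` and `swapOutIdle` are hypothetical names, the "disk" is just a second in-memory map, and timestamps are passed in explicitly to keep the example deterministic.

```java
import java.util.HashMap;
import java.util.Iterator;
import java.util.Map;

/** Hypothetical per-key model store: models live in memory only while data is flowing. */
final class ModelSwapSketch {

    /** Placeholder for a real model; a real one would wrap an HTM network. */
    static final class Model {
        int recordsSeen;
        long lastSeenMillis;
    }

    private final Map<String, Model> inMemory = new HashMap<>();
    private final Map<String, Model> onDisk = new HashMap<>(); // stand-in for persisted checkpoints

    /** Called once per consumed record; loads or creates the model for this key. */
    int onRecord(String key, long nowMillis) {
        Model model = inMemory.get(key);
        if (model == null) {
            model = onDisk.remove(key); // swap in if it was persisted earlier
            if (model == null) {
                model = new Model();    // first time this key has been seen
            }
            inMemory.put(key, model);
        }
        model.recordsSeen++;
        model.lastSeenMillis = nowMillis;
        return model.recordsSeen;
    }

    /** Moves models that have been idle longer than maxIdleMillis out of memory. */
    int swapOutIdle(long nowMillis, long maxIdleMillis) {
        int swapped = 0;
        Iterator<Map.Entry<String, Model>> it = inMemory.entrySet().iterator();
        while (it.hasNext()) {
            Map.Entry<String, Model> entry = it.next();
            if (nowMillis - entry.getValue().lastSeenMillis > maxIdleMillis) {
                onDisk.put(entry.getKey(), entry.getValue()); // "save" before dropping
                it.remove();
                swapped++;
            }
        }
        return swapped;
    }

    boolean isInMemory(String key) {
        return inMemory.containsKey(key);
    }
}
```

A Kafka consumer loop would call something like `onRecord(record key, timestamp)` for each message and run `swapOutIdle` periodically; the model state survives the round trip, so a key that goes quiet and comes back picks up where it left off.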


I’ve written up a test using the Akka/Akka Streams framework in Scala (along with 0.6.13). Akka stream nodes are push and pull based. I can give you snippets if you’re interested - you would have to be familiar with Scala, though. There is a Java API as well, though I haven’t used it.

The basic approach to the actor is to wrap computeImmediate(..) in the receive method of the actor, under the case when an envelope from our Kafka topic is sent. The result from this is then “asked” back to the sender. An actor is single threaded and guarantees order so each actor processes receive synchronously.

The actor constructor can then be wrapped up to create a Future object, the Future can then be wrapped into a Flow object, and then the flow can be wired into a Graph by setting up the Source and Sink. These can be built without Kafka (for test purposes) or wired up to Kafka for production.
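Since the Scala snippets aren't posted here, the following is only a loose Java sketch of the same idea using plain JDK classes rather than Akka. `SingleThreadedModelActor` is a hypothetical name, the single-threaded executor plays the role of the actor's mailbox, and the `compute` function stands in for a synchronous call such as computeImmediate(..). Each `ask` returns a future for the reply, and messages are processed one at a time in arrival order.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.function.Function;

/** Actor-like wrapper: one thread, ordered processing, replies delivered as futures. */
final class SingleThreadedModelActor<I, O> implements AutoCloseable {
    // A single worker thread with a FIFO queue acts as the mailbox.
    private final ExecutorService mailbox = Executors.newSingleThreadExecutor();
    private final Function<I, O> compute; // stand-in for a synchronous model call

    SingleThreadedModelActor(Function<I, O> compute) {
        this.compute = compute;
    }

    /** "Ask": enqueue one message and get a future that completes with the reply. */
    CompletableFuture<O> ask(I message) {
        return CompletableFuture.supplyAsync(() -> compute.apply(message), mailbox);
    }

    @Override
    public void close() {
        mailbox.shutdown(); // queued messages still run; no new ones are accepted
    }

    public static void main(String[] args) {
        int[] seen = {0}; // state touched only by the mailbox thread
        try (SingleThreadedModelActor<Double, String> actor =
                     new SingleThreadedModelActor<>(v -> (++seen[0]) + ":" + v)) {
            CompletableFuture<String> first = actor.ask(21.2);
            CompletableFuture<String> second = actor.ask(16.4);
            System.out.println(first.join() + " " + second.join());
        }
    }
}
```

In a real pipeline the futures would be chained into whatever writes the results out (the Sink side), and the messages would come from the Kafka consumer (the Source side).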


If you use the Network API you have the ObservableSensor that’s suited to that task.

I’ve used that sensor in this experiment:
