Input source for other than CSV file



Hello. Just started working on HTM and exploring the library @cogmission

Is there a way to load data into the Network API other than a CSV file (or any other file for that matter). Can I pass entire records as a string value, and iteratively feed into the Network object?

Use case: I’m subscribing to a pub/sub message broker, and data comes as a single record at a time. I would like to keep pushing these values to the model for anomaly detection as they come (without pushing these records to a file first of course).

Any pointers/suggestions?
Thank you.


Hi @styagi,

Welcome to the HTM forums!

I’d suggest you take a look at flink-htm, where is used in conjunction with Apache Flink to process streaming data. There’s a link to a demo on youtube in its wiki page, and here’s the canonical “Hotgym” example as a flink job.

Depending on what you’re using as a queue, there might already be a flink sink available which would let you reuse a lot of this project.

This example is listed alongside others at Other HTM Projects if you’re curious about other similar community projects.


Thank you @jimmyw, I’ll take a look at it.

But just to keep things simple, I’ve been looking at the network.compute(T input) method. I’m testing the network API with the single inputs from data in rec-center-hourly.csv.

Can someone show me an example of how to use this method? I tried looking through the docs but that didn’t help.

If this compute method works, that will be sufficient given my use case compared to distributed stream processing done in flink-htm.


No worries, in I think you’ll need to use a Publisher to do this.

Try looking at this test as an example.


Thank you for the help! That worked just right for me.

I’ve been busy looking further into the API ever since.

Using the, I used the publish.onNext(String s) to manually input CSV strings into the network API. Over the course of my program execution, I called this onNext(String s) method roughly 23000 times (i.e, for unique 23000 CSV strings). As shown in the plot, I’m calculating the execution time to generate an inference for each of the record index (like time taken to generate anomaly score for the first record is index 1, the second record is index 2 and so on). I expected a line more or less parallel to the x-axis, but as seen, the execution time to calculate an inference increases with increasing record indexes. used reactiveX to implement Observable, so I tried increasing the buffer size to accommodate 32000 items (which I assumed would work given I have only 23000 data points) in rx.subjects.ReplaySubject’s createWithSize(int capacity) method, which is called in Publisher’s build() method (by default it starts with a capacity of 3). This did not work for me.

Does anyone have a clue as to why the execution time to generate inferences (specifically the anomaly score) increasing as more records are passed through the Publisher object into the HTM network @jimmyw @cogmission ?executiontime


Hi @styagi, looks like you’re covering some good ground so far.

If it’s occurring in the anomaly score, the only thing I can think of off the top of my head would be to check that you’re using a moving window (check useMovingAverage flag).

The quickest way to hone in on where the CPU cycles are being spent would be to attach a profiler.


@jimmyw, all right I’ll try to do that. I tried to look for this useMovingAverage flag you mentioned, but not able to find it. Could you please point me out in the direction where I should look for this? Thanks again!



Could you post a snippet from your code where you instantiate the network and the anomaly detector? The flag I referred to is here in the base Anomaly class so it would depend which implementation you picked. Also that was just wild speculation, but if you post your code could help.

Having once been a java developer I can definitely recommend being comfortable and familiar with using a profiler, so definitely continue down that path regardless!


Code with HTM n/w api and anomaly score logging

@jimmyw , I’ll definitely use a profiler. In the meantime, here’s the GitHub link to my repository that I’m running. Here’s the quick summary as well:
The Network API is started in the runHTMNetwork() method. The Publisher object manualpublish enters CSV strings with onNext(String s) in explicitFileRead() method. (Note: I’ve deliberately controlled the input message rate to 10 message/second in the explicitFileRead with sleep for 100 millseconds).

I keep a timestamp of entry for the publisher object + corresponding index number, and write this data to ‘executiontime.txt’. I also write the anomaly score to file ‘htmsample.txt’. Using the index number and input value in both these files, I calculate the time taken to execute for each record in the dataset.

Looking at the code, can you see what could be causing the increasing latency wrt the increasing indexes?


Thanks @styagi,

Where you’d try my suggestion would be line 80, where you’d pass in a params object like so), with KEY_USE_MOVING_AVG set to true.

Again, my best educated guess at the cause - profiler results will be much more helpful.


Hey @jimmyw @cogmission !

I made the corresponding changes to the Anomaly to accommodate moving average with a window size of 10. This did mess up my anomaly score but that’s not a concern for now. I’m basically interested in the performance.

As you can see from the plot, using moving average had no effect on the execution time as records keep streaming into the him network. The performance using Publish for manual inputs to calculate anomaly score did not change from previous case.

Like you suggested, I used a profiler to see what was going on. Since only time is my concern, I tried to see individual method statistics like the one attached. I see the data, but not sure what to do about it or where else to go from there.

It looks to me that I’ve hit a dead-end. Any suggestions where to go from here?

What I’m trying to achieve is to evaluate the latency to calculate anomaly score for inputs, which I initially assumed would ideally be a horizontal line (parallel to the x-axis as processing time to process each individual record should be a constant…again, this is the assumption). I’m trying to see if this can be fixed to obtain that near-horizontal line in HTM, or be able to explain as to why this execution behavior is observed.

I read that average time to compute a record in nupic is roughly 5ms. I start with that, but the processing time gradually keeps increasing as more records are being processed.movingAverage


If you can supply the file htmsample.txt, I’ll take a look. Might need to put it on dropbox, or zipped in your git repo if it’s not too large.


@jimmyw, I just uploaded the files on Github

The data corresponds to using the default Anomaly, and not the moving average (which is the same in either case).
The data in htmsample.txt is index #, input value, anomaly score, timestamp.
The data in executionTime.txt: index #, input value, timestamp.

Using the index # and input value in both, I calculate the time by subtracting timestamp (htmsample ts - executionTime ts).


Actually after a closer look, it seems to be erp.log that is being read in.


Yes. The input CSV strings that I’m feeding to the him come from this file. It contains a collection of multiple metrics but I’m only collecting two: date time and the input value I need to feed. It’s essential that I do not preprocess this log data into csv file and put that as argument in Network API.

I read one record at a time and push it to the publish object. Fyi, I’ve uploaded a sample of rep.log containing 1000 records if you need to see what it looks like. Each metric is separated by broken bar (¦). You’ll find it here

The log file is just a means to store, collect and feed data to HTM. This could very well be a database or a pub/sub broker mechanism.

I think it will be very useful if we could figure out the performance latency was attributed to either of the two: the regions, layers and anomaly algorithm in HTM, OR
the mechanism we are using to feed data into HTM, i.e., the Publish object.

What do you think @jimmyw @cogmission ?


Hi @styagi, Hi @jimmyw!

One very brief comment. There is an optimization I’ve been dying to do within the Sensor code, which is where an index is being added to each line and a String[] composed, which I think removing would boost the performance by a lot! This was done so that parallelization would work for processing sensor input. It really isn’t being run that way and that feature doesn’t really have a high priority at the time so I think it would be great to remove it and get the main input flow sped up.

My “day job” has been taking a lot of my time and energy, and I haven’t been giving HTM.Java much recent love (which I hope to eventually get back around to doing), but if anyone else wants to take on that project I think the performance would be greatly improved?

Anyway, thank you @jimmyw for attending to questions about HTM.Java in the forum, and thank you @styagi for your interest! Though my time at present is severely limited, let me know if I can help and I’ll try my best, though it won’t be prompt at the moment.


Hi @styagi,

I was hoping to check this out on the weekend but didn’t get around to it, hopefully tonight I’ll be able to take a look.

Because of the use of a lambda into the rx libraries, it’s not immediately obvious where exactly the end penalty is, so my plan is just to drill down a little further into the profile.



I seem to be getting very different results to you, my executionTime output looks like this:


The erp.log file only has 1000 lines, after your filter there are only about 800 so I don’t think I’m getting your full dataset.

Anyway, how are you measuring your latency from that data? If I use the third column, the difference between each value and the last is very close to the thread sleep time of 100ms, generally under 105ms and never more than 113ms:


(y axis is latency, x axis is record index)

My java profiler showed 97% wall clock time spent in Thread.sleep, and certainly my CPU was almost idle the whole time, even with the profiler running.

I’m on a Windows 10 laptop with an i7-6600U CPU, 16GB RAM and run-of-the-mill SSD.


Hey @jimmyw! Thank you for looking into this.

Sorry about the erp.log dataset, I included only a subset. I just uploaded the 23000 dataset that I’ve been using to generate my results on Github in the same directory. You’ll find the entire dataset there now.

The latency calculation in my code is as follows:
-The input string value is manually pushed to the HTM network with publish.onNext(String s) method call. At this point, I log down the index #, the input value and the current timestamp (ts1). This part is present in explicitFileRead() method in I write this data to executionTime.txt.(

-In the writeToFileAnomalyOnly(args…), called from getSubscriber(args…), I fetch the record index #, the input value, anomaly score and append the current timestamp (ts2). I write this data to htmsample.txt.

-Using the data points from above two files, corresponding to the index # and input value (which is present in both files), I calculate the latency by (ts2 - ts1).

Upto 10,000 data points, the latency lies in the ballpark of 100ms if not lower (you’ll find this processed data that I generated from the two files at The latency keeps increasing as we keep traversing to more data points.

But I’ve read from benchmarks on NAB that it usually stays at around 5 ms. Compared to this number, we’re way too high I think. And this is basically what has me so bothered.

Can you please do your tests with the updated erp.log and share you findings? Thanks again James.


OK, with the full dataset I’m seeing a bit more latency appear after the 10000 mark, not quite the curve you’re seeing though.


I’ll take a look at what the profiler is saying.