HTM anomaly values differ between two projects

I’m trying to integrate HTM into my Flink instance.
Initially I implemented HTM in a simple Java project, just to learn it and to work with my dataset, and I obtained great results.

Later I used the Flink-HTM library from Eron Wright, but I get no output (I get no errors either, so I don’t know how to solve this “invisible” error). I know that the only expert in Flink-HTM is Eron, but he isn’t answering my emails, so today I tried to implement HTM in Flink in plain Java style, using the program I had already implemented as a template.
Note that I used the same network configuration and exactly the same dataset, but with the simple Java project I obtain good results (initially a series of 1.0 values, then the values gradually go down to about 0), while in my “Flink port” I ALWAYS obtain 1.0. I’m absolutely sure that successive, different values are being processed in Flink.

Here is the code used in my Flink port:

// "inputStream" is a placeholder for the upstream DataStream, whose name is not shown in this post
DataStream<Tuple2<Double,Double>> mappaStream = inputStream.map(
	new MapFunction<Tuple6<String, String, Date, String, String, Double>, Tuple2<Double,Double>>() {
		@Override
		public Tuple2<Double,Double> map(Tuple6<String, String, Date, String, String, Double> value) throws Exception {
			Map<String, Object> multiInput = new HashMap<>();
			multiInput.put("timestamp", new DateTime(value.f2));
			multiInput.put("dayOfWeek", value.f5);
			Network network = Harness.AnomalyNetwork.createNetwork();

			Inference inference = network.computeImmediate(multiInput);
			return new Tuple2<Double,Double>(value.f5, inference.getAnomalyScore());
		}
	});

The network is derived from the HotGym example described within HTM examples.
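
A possible explanation for the constant 1.0, offered as a hypothesis rather than a confirmed diagnosis: the map() above calls Harness.AnomalyNetwork.createNetwork() for every record, so each inference runs on a network that has not learned anything yet, and a first-ever input would be expected to score 1.0. The sketch below illustrates the effect with a toy model. ToyModel, StatefulVsFresh and their methods are hypothetical stand-ins, not part of HTM.Java or Flink.

```java
import java.util.ArrayList;
import java.util.HashSet;
import java.util.List;
import java.util.Set;

// Toy stand-in for a stateful anomaly model; NOT HTM.Java.
// It scores 1.0 for a value it has never seen and 0.0 otherwise.
class ToyModel {
    private final Set<Double> seen = new HashSet<>();

    double score(double value) {
        // Set.add returns true only when the value was not present yet.
        return seen.add(value) ? 1.0 : 0.0;
    }
}

class StatefulVsFresh {
    // Mirrors the posted map(): a new model is built for every record.
    static List<Double> freshModelPerRecord(double[] values) {
        List<Double> scores = new ArrayList<>();
        for (double v : values) {
            ToyModel model = new ToyModel(); // learned state is discarded each time
            scores.add(model.score(v));
        }
        return scores;
    }

    // One model is created up front and reused for every record.
    static List<Double> sharedModel(double[] values) {
        ToyModel model = new ToyModel();
        List<Double> scores = new ArrayList<>();
        for (double v : values) {
            scores.add(model.score(v));
        }
        return scores;
    }

    public static void main(String[] args) {
        double[] values = {1.0, 2.0, 1.0, 2.0};
        System.out.println(freshModelPerRecord(values)); // [1.0, 1.0, 1.0, 1.0]
        System.out.println(sharedModel(values));         // [1.0, 1.0, 0.0, 0.0]
    }
}
```

In a Flink job, one place to create the network once per operator instance would be the open() method of a RichMapFunction, keeping the Network in a transient field. Whether that fits this particular setup is an assumption to verify.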

Hi @Andrea_Giordano,

There was a bug in anomaly detection that existed before Flink-HTM was developed and has since been fixed. You probably only need to update the build files within Flink-HTM that specify the version of HTM.Java, and make sure they point to the latest version.

Hi cogmission, are you referring to the problem I encountered with Flink-HTM or to the one with my HTM port in Flink?

Hi @Andrea_Giordano,

I’m just talking about Flink-HTM I believe.

I found that the Maven dependencies use jar version 0.6.8, so I downloaded the updated 0.6.13 jar and loaded it. No good news, though:
I had to change some method names (I suppose they have been renamed since), and now (using Flink-HTM) I get:

java.lang.IllegalStateException: KEY.AUTO_CLASSIFY has been set to "true", but KEY.INFERRED_FIELDS is null or
empty. Must specify desired Classifier for at least one input field in
KEY.INFERRED_FIELDS or set KEY.AUTO_CLASSIFY to "false" (which is its default
value in Parameters).

but KEY.AUTO_CLASSIFY is set to true in the network provided in the example, and I don’t know how to specify the inferred fields…

I tried going back to jar 0.6.8, and indeed the previous error is gone.

Hi @Andrea_Giordano,

This isn’t a problem. There was an additional parameter added to accommodate the ability to infer more than one field at a time (a new feature not included in Flink-HTM’s configuration).

Take a look at this test to see how to specify the field(s) you’d like to infer… Including that entry in the Parameters should do the trick.

Thank you.
Do you have an example that could serve as a template for the method “getInferredFieldsMap”? Especially for the CLAClassifier.


Question: If a reference is not located within the class file itself, then what mechanism is used to refer to “external resources”?

Do you mean a library?

Anyway, I found an implementation of getInferredFieldsMap in the examples.

public static Map<String, Class<? extends Classifier>> getInferredFieldsMap(String field, Class<? extends Classifier> classifier) {
	Map<String, Class<? extends Classifier>> inferredFieldsMap = new HashMap<>();
	inferredFieldsMap.put(field, classifier);
	return inferredFieldsMap;
}
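
As a rough illustration of how this helper might be used, the sketch below builds the map for one field. The Classifier and CLAClassifier types here are empty stand-ins so the example compiles on its own; in a real project they would be the HTM.Java types. The field name "dayOfWeek" is taken from the map() posted earlier, and the final Parameters call mentioned in the comment is an assumption about the HTM.Java API, not something confirmed in this thread.

```java
import java.util.HashMap;
import java.util.Map;

// Stand-ins so the sketch compiles without HTM.Java on the classpath.
// In a real project these would be the library's own classifier types.
interface Classifier {}
class CLAClassifier implements Classifier {}

class InferredFieldsExample {
    // Same helper as posted above: maps one field name to one classifier class.
    static Map<String, Class<? extends Classifier>> getInferredFieldsMap(
            String field, Class<? extends Classifier> classifier) {
        Map<String, Class<? extends Classifier>> inferredFieldsMap = new HashMap<>();
        inferredFieldsMap.put(field, classifier);
        return inferredFieldsMap;
    }

    public static void main(String[] args) {
        Map<String, Class<? extends Classifier>> inferred =
                getInferredFieldsMap("dayOfWeek", CLAClassifier.class);
        System.out.println(inferred.keySet());
        // Assumption: with HTM.Java, this map would then be stored in the
        // network's Parameters under KEY.INFERRED_FIELDS, for example
        // p.set(KEY.INFERRED_FIELDS, inferred).
    }
}
```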

Unfortunately still no output is generated.

P.S.: Could the lack of output be caused by a wrong network configuration? Honestly, I’m exploring every class of the library, but I don’t know where the “error” could be…

Hi @Andrea_Giordano,

Yes, I wanted you to find the implementation via the “import” statements.

Without more to go on, I can’t really even attempt to help you. Can you print out or peek inside Flink to see whether anomalies are being emitted by HTM.Java inside the Flink implementation? Where do things inside Flink fail? Try to locate where things don’t behave as they do outside Flink.

OK, I think I found the problem. I’m writing here hoping that someone experienced with Flink can help me solve it.
Essentially, the overridden select() function, which is meant to return the anomaly score for printing, is never called in the flink-htm library.

DataStream<Double> result = HTM.learn(kafkaStream, new Harness.AnomalyNetwork())
	.select(new InferenceSelectFunction<Harness.KafkaRecord, Double>() {
		@Override
		public Double select(Tuple2<Harness.KafkaRecord, NetworkInference> inference) throws Exception {
			return inference.f1.getAnomalyScore();
		}
	});

The code above is the template used to perform anomaly detection.
I verified that the anomaly score is computed correctly internally. Following the call hierarchy, I found that in the HTMStream class the map() method of the InferenceSelectMapper class, which calls the select() method shown above, is never called, so I suppose the resulting DataStream in the main class is empty and no output is printed.
Here are the InferenceSelectMapper class and its map function:

private static class InferenceSelectMapper<T, R> implements MapFunction<Tuple2<T, NetworkInference>, R> {

	private static final long serialVersionUID = 7519531173989055436L;
	private final InferenceSelectFunction<T, R> inferenceSelectFunction;

	public InferenceSelectMapper(InferenceSelectFunction<T, R> inferenceSelectFunction) {
		this.inferenceSelectFunction = inferenceSelectFunction;
	}

	@Override
	public R map(Tuple2<T, NetworkInference> value) throws Exception {
		// Body not shown in the post; per the description, it delegates to select()
		return inferenceSelectFunction.select(value);
	}
}

I have tried for days to fix the issue, but I have found no solution. This is the only thing I’m missing to finish my work :frowning: