For the last few weeks I have been working with the htm.core package to build a good HTM model.
I didn’t quite get the results I wanted on my dataset, so I am now trying to build a good model on an almost perfect sine function (with a flat line inserted as the anomaly). However, the model does not detect the anomaly: the anomaly score does not go up, and the likelihood does not change in the pattern. In the meantime I’ve tried a few things, but I can’t get it to work properly.
Does anyone have an idea what I should change about the approach to get it working properly?
This is the code I used; most of it is taken from the hotgym example.
import numpy as np
import pandas as pd
from sklearn.metrics import mean_squared_error
import plotly.graph_objects as go
from plotly.subplots import make_subplots
from htm.bindings.sdr import SDR, Metrics
from htm.encoders.scalar_encoder import ScalarEncoder, ScalarEncoderParameters
from htm.bindings.algorithms import SpatialPooler
from htm.bindings.algorithms import TemporalMemory
from htm.algorithms.anomaly_likelihood import \
AnomalyLikelihood # FIXME use TM.anomaly instead, but it gives worse results than the py.AnomalyLikelihood now
from htm.bindings.algorithms import Predictor
# Build the test signal: 8000 samples of a sine wave over [0, 80*pi],
# shifted up by 1 so that every value lies in [0, 2].
x = np.linspace(0, np.pi * 80, 8000)
y = np.sin(x) + 1
df = pd.DataFrame({'x': x, 'y': y})

# Inject the anomaly: flatten the signal to 0 for samples 6000-6099.
# Only the 'y' column is overwritten so the 'x' axis values stay intact.
# NOTE: this edits `df` only; the stand-alone array `y` above still holds the
# clean sine, so anything that should see the anomaly must read df['y'].
df.iloc[6000:6100, df.columns.get_loc('y')] = 0.0
print(df.shape)
# Settings for the anomaly-likelihood estimator.
_likelihood_params = {
    # If None, the learning period is derived later from 'probationaryPct';
    # otherwise this value is used as-is
    # (indication is 500 for 5-minute-interval data).
    'learningPeriod': 288,
    # Default of 8640 is a month's worth of history at 5-minute intervals.
    'historicWindowSize': 4000,
    # Only consulted when 'learningPeriod' is None.
    'probationaryPct': 0.1,
    # Original note: "how often we re-estimate the Gaussian distribution"
    # (it trailed the next entry, but presumably describes this one).
    'reestimationPeriod': 200,
    'estimationSamples': 100,
}

# Model hyper-parameters, grouped by the component that reads them.
parameters = {
    'predictor': {'sdrc_alpha': 0.05},
    'sp': {'columnDimensions': (2048,)},
    'tm': {'columnDimensions': (2048,)},
    'anomaly': {'likelihood': _likelihood_params},
}
# Second argument passed to every Metrics tracker below (the "period"
# argument in htm.core's Metrics API).
_METRICS_PERIOD = 999999999

# Scalar encoder: adjust these values when changing how the input is encoded.
par = ScalarEncoderParameters()
par.size = 2048
par.activeBits = 41
par.minimum, par.maximum = -1, 3
scalarEncoder = ScalarEncoder(par)
encodingWidth = scalarEncoder.size
enc_info = Metrics([encodingWidth], _METRICS_PERIOD)

# Spatial Pooler over the encoder output.
# NOTE(review): only the dimensions are passed; every other SP setting is
# left at the library default.
spParams = parameters["sp"]
sp = SpatialPooler(
    inputDimensions=(encodingWidth,),
    columnDimensions=spParams["columnDimensions"],
)
sp_info = Metrics(sp.getColumnDimensions(), _METRICS_PERIOD)

# Temporal Memory over the SP mini-columns (likewise library defaults apart
# from the dimensions).
tmParams = parameters["tm"]
tm = TemporalMemory(columnDimensions=tmParams["columnDimensions"])
tm_info = Metrics([tm.numberOfCells()], _METRICS_PERIOD)
# Second prediction horizon (in samples); predictions are also made 1 step ahead.
step = 5

# Set up the anomaly likelihood; these settings are used in NAB.
anParams = parameters["anomaly"]["likelihood"]
learningPeriod = anParams.get("learningPeriod")
if learningPeriod is None:
    # No explicit learning period configured: take 'probationaryPct' of the
    # number of rows as the probationary period and learn for half of it.
    # Plain integer arithmetic is used so that no extra import is required
    # (int() truncation equals floor here because the values are non-negative).
    probationaryPeriod = int(float(anParams["probationaryPct"]) * df.shape[0])
    learningPeriod = probationaryPeriod // 2

anomaly_history = AnomalyLikelihood(
    learningPeriod=learningPeriod,
    estimationSamples=anParams["estimationSamples"],
    reestimationPeriod=anParams["reestimationPeriod"],
    historicWindowSize=anParams["historicWindowSize"],
)

# The predictor works on integer bucket indices: a value v is stored as
# int(v / predictor_resolution) and read back as index * predictor_resolution.
predictor = Predictor(steps=[1, step], alpha=parameters["predictor"]['sdrc_alpha'])
predictor_resolution = 0.1

# Per-sample results collected by the main loop.
inputs = []
anomaly = []
anomalyLikelihood = []
log_anomalyLikelihood = []
predictions = {1: [], step: []}
# Feed the signal through encoder -> Spatial Pooler -> Temporal Memory, one
# sample at a time.  The samples are read from df['y'] rather than from the
# stand-alone array `y`: the anomaly was written into the DataFrame only, so
# iterating `y` would show the model a perfectly clean sine wave.
for count, record in enumerate(df["y"].to_numpy()):
    # Encode the scalar value as an SDR.
    encoding = scalarEncoder.encode(record)
    enc_info.addData(encoding)

    # SDR that sp.compute() fills with the active mini-columns; it must have
    # the same dimensions as the Spatial Pooler.
    activeColumns = SDR(sp.getColumnDimensions())
    sp.compute(encoding, True, activeColumns)
    sp_info.addData(activeColumns)

    # Run the Temporal Memory over the active mini-columns (learning enabled).
    tm.compute(activeColumns, learn=True)
    tm_info.addData(tm.getActiveCells().flatten())

    # Predict what will happen next; an empty distribution is recorded as NaN.
    pdf = predictor.infer(tm.getActiveCells())
    for n in (1, step):
        if pdf[n]:
            predictions[n].append(np.argmax(pdf[n]) * predictor_resolution)
        else:
            predictions[n].append(float('nan'))

    # Raw TM anomaly score and the likelihood derived from its history.
    anomaly_Likelihood = anomaly_history.anomalyProbability(record, tm.anomaly)
    anomaly.append(tm.anomaly)
    # Log-scaled likelihood.  The divisor equals log(1.0000000001 - 1.0), so a
    # likelihood of 1.0 maps to 1.0 and a likelihood of 0.0 maps to ~0.
    logAnomalyLikelihood = np.log(1.0000000001 - anomaly_Likelihood) / -23.02585084720009
    anomalyLikelihood.append(anomaly_Likelihood)
    log_anomalyLikelihood.append(logAnomalyLikelihood)

    # Train the predictor on what just happened (value as a bucket index).
    predictor.learn(count, tm.getActiveCells(), int(record / predictor_resolution))
# Report the collected statistics for each stage of the HTM.  The encoder
# has no component summary of its own, so only its metrics are printed.
for title, stats, component in (
    ("Encoded Input", enc_info, None),
    ("Spatial Pooler Mini-Columns", sp_info, sp),
    ("Temporal Memory Cells", tm_info, tm),
):
    print(title, stats)
    if component is not None:
        print(str(component))
    print("")
# Log-likelihood level above which a sample is flagged as anomalous.
THRESHOLD = 0.2
df['anomaly'] = log_anomalyLikelihood

# Shift the predictions so that they are aligned with the input they predict:
# an n-step-ahead prediction made at sample i belongs to sample i + n.  Each
# list is shifted in place (its length is preserved): the last `shift` entries
# are dropped and the same number of NaNs is prepended.  No loop variable
# named `x` is used, so the module-level sample array `x` is not overwritten.
for n_steps, pred_list in predictions.items():
    shift = min(n_steps, len(pred_list))
    if shift:
        del pred_list[len(pred_list) - shift:]
        pred_list[:0] = [float('nan')] * shift
# Root-mean-squared prediction error per horizon.  Slots holding NaN (no
# prediction available for that sample) are left out of the average.
accuracy = {1: 0, step: 0}
accuracy_samples = {1: 0, step: 0}
for n, aligned in predictions.items():  # n = number of steps predicted ahead
    for idx, inp in enumerate(df.y):
        val = aligned[idx]
        if np.isnan(val):
            continue
        accuracy[n] += (inp - val) ** 2
        accuracy_samples[n] += 1

for n in sorted(predictions):
    mean_squared = accuracy[n] / accuracy_samples[n]
    accuracy[n] = mean_squared ** .5
    print("Predictive Error (RMS)", n, "steps ahead:", accuracy[n])

# Baseline for comparison: always predicting the mean of the signal.
print("Random guess, mean temperature:")
constant_guess = [np.mean(df.y)] * df.y.shape[0]
print(mean_squared_error(df.y, constant_guess) ** 0.5)
# Rows whose log-likelihood exceeds the threshold are drawn as anomaly markers.
df_t = df[df.anomaly > THRESHOLD]

fig = make_subplots(specs=[[{"secondary_y": True}]])

# Traces on the primary (signal) axis.
primary_traces = [
    go.Scatter(x=df.index, y=df.y, name='Sinus'),
    go.Scatter(x=df_t.index, y=df_t.y, mode='markers', name='Anomaly'),
    go.Scatter(x=df.index, y=predictions[1], name='prediction one step ahead'),
    go.Scatter(x=df.index, y=predictions[step], name=f'prediction {step} step ahead'),
]
# Traces on the secondary (anomaly score) axis.
secondary_traces = [
    go.Scatter(x=df.index, y=anomaly, name='Anomaly score TM'),
    go.Scatter(x=df.index, y=anomalyLikelihood, name='Anomaly Likelihood'),
    go.Scatter(x=df.index, y=log_anomalyLikelihood, name='Log Likelihood', line_color='#ffe476'),
    go.Scatter(x=df.index, y=np.array([THRESHOLD] * df.shape[0]), name='Threshold'),
]
for on_secondary, traces in ((False, primary_traces), (True, secondary_traces)):
    for trace in traces:
        fig.add_trace(trace, secondary_y=on_secondary)

fig.update_layout(autosize=False, width=1000, height=500)
for axis_title, on_secondary in (("Sinus", False), ("Anomaly score", True)):
    fig.update_yaxes(
        title_text=axis_title,
        title_standoff=25,
        secondary_y=on_secondary,
    )
fig.show()