Indeed, but I’m trying to test within a deterministic environment to reduce the complexity I must account for. Once deterministic environments can be mastered, I move on to non-deterministic ones.
Do you have a specific question about it?
Are you using neural networks in this model? If so, what type of NN?
The encoder is an autoencoder; the predictor is the same, except its decoder’s error metric is the next state. The last model is the hardest, but I’m also envisioning a few-steps-forward encoder with some specialized decoder.
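Roughly, the predictor I have in mind is shaped like the numpy sketch below: an autoencoder whose decoder target is the next state rather than the current one. The layer sizes, the tanh nonlinearity, and the plain squared-error update are placeholders for illustration, not taken from my actual model.

import numpy as np

rng = np.random.default_rng(0)

# toy "predictor": an autoencoder-shaped network whose decoder target is
# the NEXT state rather than the current one (sizes are illustrative)
n_state, n_hidden = 16, 8
W_enc = rng.normal(0.0, 0.1, (n_state, n_hidden))
W_dec = rng.normal(0.0, 0.1, (n_hidden, n_state))
lr = 0.01

def train_step(state_t, state_t1):
    # one squared-error gradient step: encode s_t, decode a guess of s_{t+1}
    h = np.tanh(state_t @ W_enc)               # latent code of the current state
    pred = h @ W_dec                           # predicted next state
    err = pred - state_t1                      # the decoder's error metric is the next state
    grad_h = (err @ W_dec.T) * (1.0 - h ** 2)  # backprop through tanh
    W_dec[...] -= lr * np.outer(h, err)        # in-place updates on the shared arrays
    W_enc[...] -= lr * np.outer(state_t, grad_h)
    return pred

# usage: feed consecutive state pairs from any rollout
states = rng.random((100, n_state))
for t in range(len(states) - 1):
    train_step(states[t], states[t + 1])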
So very simple autoencoders, basically.
Oh ok ok. I got it
I managed to remake it. It’s not quite identical, but it is equally flawed; I can’t get this to work properly.
Still, if you squint, you might be able to see some semblance of “behavior”.
Here’s the code. Please excuse the bad code quality; this only needs ketchup to be complete spaghetti. It requires numpy and pygame to run.
import pygame
import numpy as np
from random import random, choice, seed
from math import sin, cos, pi
from numbers import Number

np.random.seed(0)
seed(0)

# some syntactic sugar for projecting and training synapses
# layer1 >> layer2                        ----> copy layer1 into layer2
# layer1 >> synapses >> layer2            ----> synapses.project(layer1, layer2)
# layer1 << synapses << layer2            ----> synapses.project(layer1, layer2, backwards=True)
# layer1 >> synapses * factor << layer2   ----> synapses.hebbian_update(layer1, layer2, factor)
# layer1 << synapses * factor >> layer2   ----> synapses.hebbian_update(layer1, layer2, -factor)

def lerp(a, b, t):
    return a + (b - a) * t
class Synapses:
    def __init__(self, inputs, outputs, initial_sparsity=0.1):
        self.weights = (np.random.sample((inputs, outputs)) < initial_sparsity).astype(np.int8)
        self.delegated_val = 1

    def __mul__(self, other):
        if not isinstance(other, Number):
            return NotImplemented
        self.delegated_val = other
        return self

    def _check_sizes(self, inputs, outputs):
        if not inputs.size <= self.weights.shape[0] or not outputs.size >= self.weights.shape[1]:
            raise ValueError(f'incompatible layer sizes {inputs.size, outputs.size}')

    def project(self, inputs, outputs, backwards=False):
        self._check_sizes(inputs, outputs)
        if backwards:
            inputs.values += self.weights[:, outputs.winners].sum(axis=1)
        else:
            outputs.values += self.weights[inputs.winners, :].sum(axis=0)

    def hebbian_update(self, inputs, outputs, factor=1):
        self._check_sizes(inputs, outputs)
        self.weights[inputs.winners[:, np.newaxis], outputs.winners] += factor
class SynapsesSyntaticSugar:
    def __init__(self, l_activation, synapses, previous_shift):
        self.l_activation = l_activation
        self.synapses = synapses
        self.previous_shift = previous_shift
        self.factor = 1

    def __rshift__(self, other):
        if not isinstance(other, Layer):
            return NotImplemented
        if self.previous_shift == 'rshift':
            self.synapses.project(self.l_activation, other)
            return other
        if self.previous_shift == 'lshift':
            self.synapses.hebbian_update(self.l_activation, other, -self.synapses.delegated_val)
            return self
        return NotImplemented

    def __lshift__(self, other):
        if not isinstance(other, Layer):
            return NotImplemented
        if self.previous_shift == 'rshift':
            self.synapses.hebbian_update(self.l_activation, other, self.synapses.delegated_val)
            return self
        if self.previous_shift == 'lshift':
            self.synapses.project(self.l_activation, other, backwards=True)
            return self.l_activation
        return NotImplemented
class Layer:
    def __init__(self, size):
        self.size = size
        self.values = np.zeros(size, dtype=np.float32)
        self.boosts = np.zeros(size, dtype=np.float32)
        self.winners = np.zeros(0, dtype=np.int64)
        self.wta = None
        self.set_wta('relative', 0.5)

    def set_wta(self, type, value):
        if type == 'relative':
            self.wta = lambda: self.relative_threshold(value)
        elif type == 'kwta':
            self.wta = lambda: self.kwta(value)
        else:
            raise ValueError('type must be either "relative" or "kwta"')
        return self

    def one_hot(self, x, width=1):
        self.winners = np.arange(width, dtype=np.int32)
        self.winners += x
        self.winners %= self.size
        return self

    def one_hot_decode(self):
        return self.winners.sum() / len(self.winners)

    def kwta(self, k):
        self.winners = np.argsort(self.values + self.boosts)[-k:]
        return self

    def threshold(self, v):
        self.winners = np.where(self.values >= v)[0]
        return self

    def relative_threshold(self, f=0.5):
        return self.threshold(self.values.max() * f)

    def noise(self, f):
        self.values += np.random.sample(self.values.shape) * f
        return self

    def boost_update(self, decrease=1, recover=0.01):
        self.boosts *= 1 - recover
        self.boosts[self.winners] -= decrease
        return self

    def clear(self):
        self.values[:] = 0
        self.winners = np.zeros(0, dtype=np.int64)
        return self

    def __rshift__(self, other):
        if isinstance(other, Layer):
            if other.size == self.size:
                other.values[:] = self.values
                other.winners = self.winners
                return other
            else:
                raise ValueError('other activation not the same size')
        elif isinstance(other, Synapses):
            return SynapsesSyntaticSugar(self, other, 'rshift')
        return NotImplemented

    def __lshift__(self, other):
        if isinstance(other, Synapses):
            return SynapsesSyntaticSugar(self, other, 'lshift')
        return NotImplemented

    def __str__(self):
        # 10240 = char code for the empty braille dot pattern
        characters = np.repeat(np.int64(10240), self.size // 8 + (self.size % 8 > 0))
        bits = np.zeros(self.size, dtype=np.bool_)
        bits[self.winners] = True
        characters |= np.packbits(bits)  # fill in braille dots with active units
        final = [f'Layer< {self.size} |']
        n = len(final[0])
        x = n
        for c in characters:
            ch = chr(c)
            final.append(ch)
            x += 1
            if x == 50:
                final.append('|\n')
                final.extend([' ' * (n - 1), '|'])
                x = n
        return ''.join(final)
# simplest environment I could think of
class GridBlocksEnviroment:
    def __init__(self, w, h, density=0.2):
        self.mask = np.random.sample((w, h)) < density
        self.blocks_locations = list(zip(*np.where(self.mask)))
        self.w, self.h = w, h

    def point_collision(self, x, y):
        return self.mask[int(x) % self.w, int(y) % self.h]

    def draw(self, surface, scale, color=(0, 0, 0), offset=(0, 0)):
        x_offset, y_offset = offset
        for x, y in self.blocks_locations:
            pygame.draw.rect(surface, color, pygame.Rect((x + x_offset) * scale,
                                                         (y + y_offset) * scale,
                                                         scale, scale))
class BindingEncoder:
    def __init__(self, *input_layers, size=1000, k=50, init_synapses=50):
        """
        Kinda like a Restricted Boltzmann Machine but sparse and with boosting.
        """
        self.input_layers = input_layers
        self.reconst_layers = tuple(Layer(x.size) for x in input_layers)
        self.synapses = tuple(Synapses(x.size, size) for x in input_layers)
        for syn in self.synapses:
            syn.weights = np.random.randint(
                size=syn.weights.shape,
                low=-init_synapses,
                high=init_synapses + 1,
                dtype=np.int8)
        self.hidden = Layer(size)
        self.k = k

    def _replace_inputs(self, original, replace):
        if len(replace) < len(original):
            replace = [*replace, *((None,) * len(original))]
        return list((r or o) for o, r in zip(original, replace))

    def bind(self, replace=(), mask=(), clear=True):
        mask = (*mask,) + (True,) * len(self.input_layers)
        if clear:
            self.hidden.clear()
        for layer, syns, enable in zip(self._replace_inputs(self.input_layers, replace), self.synapses, mask):
            if not enable:
                continue
            layer >> syns * 1 >> self.hidden
        self.hidden.kwta(self.k)
        return self

    def reconstruct(self):
        for layer, syns in zip(self.reconst_layers, self.synapses):
            layer.clear()
            layer << syns * 1 << self.hidden
            layer.wta()
        return self

    def train(self, targets=(), second_pass_mask=(), simple_mode=False, learning_rate=1, boost_decrease=0.5, boost_recover=0.001):
        """
        targets: layers to replace inputs with while training; use None if you don't want to replace
        """
        self.bind(replace=targets, mask=second_pass_mask)
        self.reconstruct()
        for reconst_layer, input_layer, syns in zip(self.reconst_layers, self.input_layers, self.synapses):
            if simple_mode:
                input_layer >> syns * learning_rate << self.hidden
            elif not second_pass_mask:
                input_layer >> syns * learning_rate << self.hidden
                reconst_layer >> syns * -learning_rate << self.hidden
            else:
                reconst_layer >> syns * -learning_rate << self.hidden
        if simple_mode:
            return
        self.hidden.boost_update(boost_decrease, boost_recover)
        if second_pass_mask:
            self.bind(replace=targets, mask=(not x for x in second_pass_mask), clear=False)
            for reconst_layer, input_layer, syns in zip(self.reconst_layers, self.input_layers, self.synapses):
                input_layer >> syns * learning_rate << self.hidden
        return self
class Brain():
    def __init__(self, input):
        self.input = input
        self.efference_copy = Layer(3)
        self.input_encoder = BindingEncoder(self.input, size=1000, k=50)
        # previous buffer state
        self.delayed_feedback = Layer(3000)
        self.temporal_compressor = BindingEncoder(self.delayed_feedback, self.input_encoder.hidden, size=3000, k=100)
        # self.temporal_compressor.reconst_layers[0].set_wta('kwta', 50)
        # self.temporal_compressor.reconst_layers[1].set_wta('kwta', 50)
        self.reward_memory = Synapses(3000, 110)
        self.reward_input = Layer(110)
        self.reward_reconst = Layer(110)
        self.reward_reconst.set_wta('kwta', 10)
        self.action_decoder = Synapses(3000, 3)
        self.decoded_action = Layer(3).set_wta('kwta', 1)
        self.prev_reward = 0
        self.prev_true_reward = 0

    def action(self, reward):
        self.prev_true_reward = reward
        reward = reward * 50 + 50  # map the reward into the range of the one-hot reward layer
        self.input_encoder.train()
        self.temporal_compressor.train(boost_decrease=5, boost_recover=0.001)
        # decode an action from the compressed temporal state
        (self.temporal_compressor.hidden >> self.action_decoder * 1 >> self.decoded_action.clear()).wta()
        self.decoded_action >> self.efference_copy
        action = self.decoded_action.winners[0]
        # read out the reward currently associated with this state
        (self.temporal_compressor.hidden >> self.reward_memory >> self.reward_reconst.clear()).wta()
        current_reward = self.reward_reconst.one_hot_decode()
        train_reward = lerp(current_reward * 0.8, reward, 0.2)  # blend remembered and observed reward
        self.reward_input.clear()
        self.reward_input.one_hot(round(train_reward), width=10)
        # retrain the reward memory of the previous state toward the blended target
        (self.delayed_feedback >> self.reward_memory >> self.reward_reconst.clear()).wta()
        self.delayed_feedback >> self.reward_memory * -1 << self.reward_reconst
        self.delayed_feedback >> self.reward_memory * 1 << self.reward_input
        if self.prev_reward < current_reward:
            self.delayed_feedback >> self.action_decoder * -1 << self.efference_copy
        elif self.prev_reward > current_reward:
            self.delayed_feedback >> self.action_decoder * 1 << self.efference_copy
            self.decoded_action.boost_update(decrease=10, recover=0.001)
        # elif self.prev_reward > current_reward:
        #     self.delayed_feedback >> self.desired_future_synapses * -1 << self.input_encoder.hidden
        self.temporal_compressor.hidden >> self.delayed_feedback
        self.prev_reward = current_reward
        return action

    def print_debug_viz(self):
        print('temporal compressed')
        print(self.temporal_compressor.hidden)
        print('input encoding')
        print(self.input_encoder.hidden)
        print('reward')
        print(int(self.prev_reward * 0.5) * '*-'[self.prev_true_reward < 0])
def circle_points(center, radius, resolution):
    for i in range(resolution):
        f = i / resolution * 2 * pi
        yield pygame.Vector2(sin(f), cos(f)) * radius + center
# the agent turned out more complex than the environment.
class Agent:
    def __init__(self, x, y, enviroment, radius=2, damping=0.8, sensor_resolution=(30, 5)):
        self.location = pygame.math.Vector2(x, y)
        self.momentum = pygame.math.Vector2(0.01, 0)
        self.input = Layer(np.prod(sensor_resolution))
        self.sensor_resolution = sensor_resolution
        self.damping = damping
        self.enviroment = enviroment
        self.radius = radius
        self.angle = 0
        self.max_speed = 0.1
        self.brain = Brain(self.input)

    def forward_vector(self):
        return pygame.Vector2(sin(self.angle), cos(self.angle))

    def timestep(self):
        new_location = self.location + self.momentum
        collided = self.enviroment.point_collision(*new_location)
        if not collided:
            self.location = new_location
        else:
            cell_center = pygame.Vector2(int(self.location.x) + 0.5, int(self.location.y) + 0.5)
            found_location = (float('inf'), None)
            for point in circle_points(self.location, self.momentum.length(), 32):
                if not self.enviroment.point_collision(*point):
                    dist = new_location.distance_squared_to(point)
                    found_location = min(found_location, (dist, point))
            if found_location[1]:
                self.location = found_location[1]
            # self.location += self.momentum
        self.momentum *= self.damping
        if self.momentum.length() > self.max_speed:
            self.momentum.normalize_ip()
            self.momentum *= self.max_speed
        self.location.x %= self.enviroment.w
        self.location.y %= self.enviroment.h
        self.sensory_input()
        # incentivize the agent not to stay still by rewarding momentum speed
        # punish staying on dark blocks
        reward = self.momentum.length() * 5 if not collided else -1
        action = self.brain.action(reward)
        # the brain should return a single integer that encodes all actions, it's easier that way
        rotation_direction = [-1, 0, 1][action]
        self.angle += rotation_direction * 0.1
        self.momentum += self.forward_vector() * 0.007

    def sensor_points(self, x, y, angle, radius, wraparound=True):
        # sample sensory input in a circular array shape
        for i in range(self.sensor_resolution[0]):
            x_offset = sin(i / self.sensor_resolution[0] * 2 * pi + angle)
            y_offset = cos(i / self.sensor_resolution[0] * 2 * pi + angle)
            for j in range(self.sensor_resolution[1]):
                distance = (j + 1) / (self.sensor_resolution[1] + 1) * radius
                v = pygame.math.Vector2(x_offset * distance + x, y_offset * distance + y)
                if wraparound:
                    v.x %= self.enviroment.w
                    v.y %= self.enviroment.h
                yield v

    def sensory_input(self):
        for index, location in enumerate(self.sensor_points(*self.location, self.angle, self.radius)):
            hit = self.enviroment.point_collision(*location)
            self.input.values[index] = hit
        self.input.threshold(0.5)

    def draw(self, surface, scale):
        pygame.draw.circle(surface, (0, 0, 255), self.location * scale, self.radius * 0.05 * scale)
        location1 = self.location * scale
        location2 = location1 + self.forward_vector() * scale * self.radius
        pygame.draw.line(surface, (255, 0, 0), location1, location2, 2)  # forward facing line
        for index, sensor in enumerate(self.sensor_points(*self.location, self.angle, self.radius)):
            hits = set(self.brain.input_encoder.reconst_layers[0].winners)
            hit = index in hits
            if hit > 0.5:
                color = (0, 255, 60)
            else:
                color = (255, 0, 0)
            pygame.draw.circle(surface, color, sensor * scale, 3)
pygame.init()
clock = pygame.time.Clock()
screen = pygame.display.set_mode([13 * 60, 13 * 60])
blocks = GridBlocksEnviroment(13, 13)
agent = Agent(3, 3, blocks)
agent.sensory_input()
running = True
SCALE = 60
click_pos = None
while running:
    clock.tick(60)
    for event in pygame.event.get():
        if event.type == pygame.QUIT:
            running = False
    if pygame.mouse.get_pressed(num_buttons=3)[0]:
        location = pygame.Vector2(pygame.mouse.get_pos()) / SCALE
        agent.location = location
    screen.fill((128, 128, 128))
    blocks.draw(screen, SCALE)
    agent.draw(screen, SCALE)
    agent.timestep()
    agent.brain.print_debug_viz()
    pygame.display.flip()
pygame.quit()
I like the simplicity of the idea and using states to describe it. Although to be honest, I was lost while looking at the circuit diagram.
Could you explain which ones below correspond to which component in your EPA circuit?
- Spatial Pooler
- Temporal Pooler
He is not using a spatial pooler but rather autoencoders; the temporal pooler is also implicit in the state predictor.
I like the idea of slapping autoencoders all the way down.
We should rename this thread as “Autoencoders is all you need”.
I know. I was trying to understand the correspondence with the NuPIC architecture because, as he mentioned, the idea was inspired by Numenta/NuPIC/HTM and Jeff’s books.
Through Jeff’s books and my research into what Numenta knows, I have noticed several operating principles of intelligence, or at least attributes of its makeup, and have for a long time wanted to embody those principles in something I call a Sensorimotor Inference Engine.
A large part of this is demonstrated in HTM already. Therefore it’s important to understand the difference here from a high-level point of view in EPA: which component roles are new, which ones already existed in HTM, and why use model X (e.g., an autoencoder) rather than Y, etc.
I only see HTM demonstrating one thing so far, which is how to overfit efficiently.
This circuit seems to be a good proposal for how to wire up overfitted predictors to get behavior.
OK, I think you are focusing on the micro part of the proposal. I was more interested in the macro part, such as:
- Encoder
- Encodes inputs while maintaining semantic meanings
- Spatial Pooler
- Preferentially encodes inputs into SDRs
- Most importantly, it dynamically builds a solution graph of agent states (column configuration).
- Temporal Pooler
- Takes agent states or SDRs and predicts the next one
- Basically predicting the next node in the graph generated so far by SP
So HTM is Input → Encode → Predict and thus also a hierarchy.
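In toy Python, that macro loop looks roughly like the sketch below. ToyEncoder, ToyPooler, and ToyPredictor are made-up stand-ins (not NuPIC classes), just to show the Input → Encode → Predict flow; the real SP/TP obviously do far more than this.

import numpy as np

class ToyEncoder:
    # Encoder stand-in: map a scalar in [0, 1) to a block of active bits so
    # that nearby values share bits (a crude version of semantic encoding)
    def encode(self, value, size=64, on_bits=8):
        bits = np.zeros(size, dtype=bool)
        start = int(value * (size - on_bits))
        bits[start:start + on_bits] = True
        return bits

class ToyPooler:
    # Spatial Pooler stand-in: keep only the k most strongly driven columns (an SDR)
    def __init__(self, in_size=64, out_size=128, k=8, seed=0):
        self.w = np.random.default_rng(seed).random((in_size, out_size))
        self.k = k
    def pool(self, bits):
        overlap = bits @ self.w
        sdr = np.zeros(overlap.size, dtype=bool)
        sdr[np.argsort(overlap)[-self.k:]] = True
        return sdr

class ToyPredictor:
    # Temporal Pooler stand-in: remember which SDR followed which
    def __init__(self):
        self.transitions = {}
        self.prev = None
    def predict(self, sdr):
        key = tuple(np.flatnonzero(sdr))
        if self.prev is not None:
            self.transitions[self.prev] = key   # learn "previous -> current"
        self.prev = key
        return self.transitions.get(key)        # recall what usually follows "current"

encoder, pooler, predictor = ToyEncoder(), ToyPooler(), ToyPredictor()
for value in [0.1, 0.5, 0.9, 0.1, 0.5]:
    prediction = predictor.predict(pooler.pool(encoder.encode(value)))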
Again, I was interested in how the EPA proposal relates to the high-level roles I mentioned above. If there is any overlap, as implied in the first post, I’d like to know and understand further why, for example, an autoencoder is used instead of an SP.
Well, autoencoders are just better than the spatial pooler in almost every way. They can be made sparse too and are able to disentangle latent spaces.
I kinda think of a spatial pooler as an incomplete autoencoder where someone forgot to implement the second half.
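To make that concrete, here is a minimal sparse-autoencoder sketch: the k-winners-take-all encoding is the SP-like half, and the reconstruction pass is the “second half” the SP leaves out. The sizes, the straight-through gradient at the winners, and the learning rate are illustrative choices, not pulled from my code above.

import numpy as np

rng = np.random.default_rng(0)
n_in, n_hidden, k, lr = 64, 256, 10, 0.01
W_enc = rng.normal(0.0, 0.1, (n_in, n_hidden))
W_dec = rng.normal(0.0, 0.1, (n_hidden, n_in))

def sparse_ae_step(x):
    drive = x @ W_enc
    winners = np.argsort(drive)[-k:]          # k-winners-take-all: the SP-like "first half"
    code = np.zeros(n_hidden)
    code[winners] = drive[winners]            # sparse latent code
    recon = code @ W_dec                      # the "second half" the SP leaves out
    err = recon - x
    active = np.zeros(n_hidden, dtype=bool)
    active[winners] = True
    grad_code = (err @ W_dec.T) * active      # only the winning units get a gradient
    W_dec[...] -= lr * np.outer(code, err)
    W_enc[...] -= lr * np.outer(x, grad_code)
    return code, recon

code, recon = sparse_ae_step(rng.random(n_in))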
This circuit doesn’t have much of a parallel in HTM because it handles the problem of branching future states: it doesn’t just mindlessly predict the most likely future state, it predicts the most desirable future state and then the action that would lead to that future.
The thing is that although it doesn’t have much of a parallel, it’s conceivable to use HTMs to implement it; just throw some minicolumns at the predictor part of the circuit. The autoencoder part could be done in biology with the plus and minus stages of the forward-forward algorithm. Although now I’m, like, 30% convinced that it’s unnecessary and the brain can actually backpropagate errors.
Thanks for explaining; now I somewhat get why the autoencoder is used here. I’ve always thought, and shared in the forum, that the HTM architecture doesn’t have to be linear; rather, it should be tried with a feature that spawns child SP/TMs as parallel models, so that their overfitted configurations can potentially be balanced out by consensus among the spawns.
This code you posted is an implementation of my idea? Wow, that’s a lot of work.
I’ve forgotten how those poolers work. So I can’t draw on a direct analogy.
But I can explain how the spatial dimension is managed and how the temporal dimension is managed.
First, the spatial dimension is managed explicitly in a model: that’s the predictor model in the middle of the layer. It predicts the next state.
The temporal dimension is a bit more hidden. In the bottom layer there basically is no temporal dimension, because the entire temporal dimension is taken up with just one action between two states.
In higher layers, the motor output behavior corresponds to describing the state between states.
So as you go up to the next layer, the input for that layer is two states at a time; it sees two states as one state. State A goes to state B, and the bottom layer learns that mapping. The top layer in the diagram learns the mapping from state AB to BC. That’s how the temporal dimension is modeled: the input is the compressed representation of two adjacent states.
And if you added another layer, the input to that would be the compressed representation of the layer below. So it would see ABCD as one state, and learn to map which other four-state states you could go to from it.
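A toy sketch of that stacking, just to pin down what “sees two states as one state” means: dictionaries stand in for the learned models, and the “compression” is literally just pairing adjacent states, which a real implementation obviously wouldn’t do.

from collections import defaultdict

bottom = defaultdict(set)   # state -> possible next states (one action per step)
top = defaultdict(set)      # state pair -> possible next state pairs (two steps per "state")

def observe(sequence):
    for a, b in zip(sequence, sequence[1:]):
        bottom[a].add(b)                        # bottom layer learns A -> B
    pairs = list(zip(sequence, sequence[1:]))   # "compress" two adjacent states into one
    for ab, bc in zip(pairs, pairs[1:]):
        top[ab].add(bc)                         # top layer learns AB -> BC

observe("ABCABD")
print(bottom["A"])        # {'B'}
print(top[("A", "B")])    # {('B', 'C'), ('B', 'D')}, i.e. branching futures one level up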
I hope that helps. I think this diagram is fundamentally flawed and probably can’t achieve any form of scale. But it does show the basic idea of all the components that are required to theoretically unfold behavior as it travels down a hierarchy, and it shows how those components might be interconnected.
Yeah I’m not very familiar with HTM architecture, just with the concepts from the books, not how it’s implemented.
But I will say my autoencoder attempts to do a version of 1 and 2 at the same time. The idea is basically that you take all the states you’ve ever seen and find a smaller representation of them. I guess that doesn’t do the SDR stuff. SDRs are powerful and I’d love to include them, but I just don’t know how.
I took some code I already had and adapted it into something that resembles your idea. It’s not a perfect match, as the state predictor is only used to train a reward function and not to drive behavior; it also only has 3 layers.
Instead of autoencoders I use sparse RBMs with boosting, which kind of look like the offspring of a spatial pooler and a Boltzmann machine.
I think this is the answer to my first question/comment. Thanks. So this smaller representation is probably the result of the autoencoder’s work (compression). Not that it’s very important for you, but in contrast, HTM does not intentionally compress states; rather, it just prefers them, both spatially and temporally, through real-time reconfiguration of cortical column synapses. However, from a macro level with respect to time, it can also be seen as compressing the “important states”, because as you may see with the TP in action, it prefers recent patterns and forgets the less relevant ones.
Firstly, for the DL warriors (obviously not the OP) I’m not trying to “fight for this better model” here. I’m as usual trying to respectfully understand people’s perspectives and what they are trying to convey in their ideas - unfortunately an online forum is not the best space for this. Secondly, I prefer not to assume what one knows and do not, same as for myself, that’s why I ask a lot of questions, sorry for this.
Going back to the topic, these paragraphs are the interesting ones. Conceptually, this is like a stacked Spatial Pooler and Temporal Pooler architecture. SP1 encodes the current state (C), TP1 predicts the next state (D). SP2 encodes the next N predicted states (BCD) of TP1, and TP2 predicts the next states (EFG). And so on…
So it would see ABCD as one state. And learn to map which other 4-state States you could go to from it.
If I understand this correctly, then theoretically, how do you ensure the errors from the lower nodes don’t blow up as the prediction goes up the hierarchy? Or am I missing a theoretical concept here?
I’m quite new to all of this, and I’m wondering if someone could explain to a newb what the tradeoffs are here. Are “sparse RBMs with boosting” and “autoencoders” functionally similar? Is one generally better than the other, or are there reasonable tradeoffs and if so what are they?