Is this a good idea?

Indeed, but I’m trying to test within a deterministic environment to reduce the complexity I must account for. Once deterministic environments can be mastered, I’ll move on to non-deterministic ones.

1 Like

Do you have a specific question about it?

2 Likes

Are you using neural networks in this model? If so, what type of NN?

2 Likes

The encoder is an autoencoder; the predictor is the same except its decoder’s error metric is the next state. The last model is the hardest, but I’m also envisioning a feed-forward encoder with some specialized decoder.
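
For concreteness, here’s a rough sketch of what I mean, assuming plain dense layers and squared error (the class and variable names are just illustrative, not actual code from the project):

import numpy as np

rng = np.random.default_rng(0)

class Autoencoder:
    """Minimal dense autoencoder trained with plain gradient descent."""
    def __init__(self, n_in, n_hidden, lr=0.01):
        self.W_enc = rng.normal(0, 0.1, (n_in, n_hidden))
        self.W_dec = rng.normal(0, 0.1, (n_hidden, n_in))
        self.lr = lr

    def encode(self, x):
        return np.tanh(x @ self.W_enc)

    def decode(self, h):
        return h @ self.W_dec

    def train_step(self, x, target=None):
        # plain encoder: the target is the input itself
        # predictor variant: the target is the *next* state
        if target is None:
            target = x
        h = self.encode(x)
        y = self.decode(h)
        err = y - target                         # dL/dy for 0.5 * ||y - target||^2
        grad_dec = np.outer(h, err)
        grad_h = self.W_dec @ err * (1 - h**2)   # backprop through tanh
        grad_enc = np.outer(x, grad_h)
        self.W_dec -= self.lr * grad_dec
        self.W_enc -= self.lr * grad_enc
        return float(0.5 * np.sum(err**2))

# the predictor is the same module, only trained so that
# decode(encode(state_t)) approximates state_{t+1}
enc = Autoencoder(n_in=8, n_hidden=4)
pred = Autoencoder(n_in=8, n_hidden=4)
s_t, s_next = rng.random(8), rng.random(8)
enc.train_step(s_t)              # reconstruction target = current state
pred.train_step(s_t, s_next)     # reconstruction target = next state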

So very simple autoencoders, basically.

3 Likes

Oh ok ok. I got it

1 Like

I managed to remake it; it’s not quite identical, but it is equally flawed. I can’t get this to work properly.

Still, if you squint, you might be able to see some semblance of “behavior”.

Here’s the code. Please excuse the bad code quality; it only needs ketchup to be complete spaghetti. It requires numpy and pygame to run.

import pygame
import numpy as np
from random import random, choice, seed
from math import sin, cos, pi
from numbers import Number
np.random.seed(0)
seed(0)


# some syntactic sugar for projecting and training synapses
# layer1 >> layer2                        ---->   copy layer1 into layer2
# layer1 >> synapses >> layer2            ---->   synapses.project(layer1, layer2)
# layer1 << synapses << layer2            ---->   synapses.project(layer1, layer2, backwards=True)
# layer1 >> synapses * factor << layer2   ---->   synapses.hebbian_update(layer1, layer2, factor)
# layer1 << synapses * factor >> layer2   ---->   synapses.hebbian_update(layer1, layer2, -factor)

def lerp(a, b, t):
    return a + (b - a) * t


class Synapses:
    def __init__(self, inputs, outputs, initial_sparsity=0.1):
        self.weights = (np.random.sample((inputs, outputs)) < initial_sparsity).astype(np.int8)
        self.delegated_val = 1

    def __mul__(self, other):
        if not isinstance(other, Number):
            return NotImplemented
        self.delegated_val = other
        return self

    def _check_sizes(self, inputs, outputs):
        if not inputs.size <= self.weights.shape[0] or not outputs.size >= self.weights.shape[1]:
            raise ValueError(f'incompatible layer sizes {inputs.size, outputs.size}')

    def project(self, inputs, outputs, backwards=False):
        self._check_sizes(inputs, outputs)
        if backwards:
            inputs.values += self.weights[:, outputs.winners].sum(axis=1)
        else:
            outputs.values += self.weights[inputs.winners, :].sum(axis=0)

    def hebbian_update(self, inputs, outputs, factor=1):
        self._check_sizes(inputs, outputs)
        self.weights[inputs.winners[:, np.newaxis], outputs.winners] += factor


class SynapsesSyntaticSugar:
    def __init__(self, l_activation, synapses, previous_shift):
        self.l_activation = l_activation
        self.synapses = synapses
        self.previous_shift = previous_shift
        self.factor = 1

    def __rshift__(self, other):
        if not isinstance(other, Layer):
            return NotImplemented

        if self.previous_shift == 'rshift':
            self.synapses.project(self.l_activation, other)
            return other

        if self.previous_shift == 'lshift':
            self.synapses.hebbian_update(self.l_activation, other, -self.synapses.delegated_val)
            return self

        return NotImplemented

    def __lshift__(self, other):
        if not isinstance(other, Layer):
            return NotImplemented

        if self.previous_shift == 'rshift':
            self.synapses.hebbian_update(self.l_activation, other, self.synapses.delegated_val)
            return self

        if self.previous_shift == 'lshift':
            self.synapses.project(self.l_activation, other, backwards=True)
            return self.l_activation

        return NotImplemented


class Layer:
    def __init__(self, size):
        self.size = size
        self.values = np.zeros(size, dtype=np.float32)
        self.boosts = np.zeros(size, dtype=np.float32)
        self.winners = np.zeros(0, dtype=np.int64)
        self.wta = None
        self.set_wta('relative', 0.5)

    def set_wta(self, type, value):
        if type == 'relative':
            self.wta = lambda: self.relative_threshold(value)
        elif type == 'kwta':
            self.wta = lambda: self.kwta(value)
        else:
            raise ValueError('type must be either "relative" or "kwta"')
        return self

    def one_hot(self, x, width=1):
        self.winners = np.arange(width, dtype=np.int32)
        self.winners += x
        self.winners %= self.size
        return self

    def one_hot_decode(self):
        return self.winners.sum() / len(self.winners)

    def kwta(self, k):
        self.winners = np.argsort(self.values + self.boosts)[-k:]
        return self

    def threshold(self, v):
        self.winners = np.where(self.values >= v)[0]
        return self

    def relative_threshold(self, f=0.5):
        return self.threshold(self.values.max() * f)

    def noise(self, f):
        self.values += np.random.sample(self.values.shape) * f
        return self

    def boost_update(self, decrease=1, recover=0.01):
        self.boosts *= 1 - recover
        self.boosts[self.winners] -= decrease
        return self

    def clear(self):
        self.values[:] = 0
        self.winners = np.zeros(0, dtype=np.int64)
        return self

    def __rshift__(self, other):
        if isinstance(other, Layer):
            if other.size == self.size:
                other.values[:] = self.values
                other.winners = self.winners
                return other
            else:
                raise ValueError('other activation not the same size')
        elif isinstance(other, Synapses):
            return SynapsesSyntaticSugar(self, other, 'rshift')

        return NotImplemented

    def __lshift__(self, other):
        if isinstance(other, Synapses):
            return SynapsesSyntaticSugar(self, other, 'lshift')
        return NotImplemented

    def __str__(self):
        # 10240 = char code for empty braille dot pattern
        characters = np.repeat(np.int64(10240), self.size // 8 + (self.size % 8 > 0))
        bits = np.zeros(self.size, dtype=np.bool_)
        bits[self.winners] = True
        characters |= np.packbits(bits)  # fill in braille dots with active units
        final = [f'Layer< {self.size} |']
        n = len(final[0])
        x = n
        for c in characters:
            ch = chr(c)
            final.append(ch)
            x += 1
            if x == 50:
                final.append('|\n')
                final.extend([' ' * (n - 1), '|'])
                x = n

        return ''.join(final)


# simplest environment I could think of
class GridBlocksEnviroment:
    def __init__(self, w, h, density=0.2):
        self.mask = np.random.sample((w, h)) < density
        self.blocks_locations = list(zip(*np.where(self.mask)))
        self.w, self.h = w, h

    def point_collision(self, x, y):
        return self.mask[int(x) % self.w, int(y) % self.h]

    def draw(self, surface, scale, color=(0, 0, 0), offset=(0, 0)):
        x_offset, y_offset = offset
        for x, y in self.blocks_locations:
            pygame.draw.rect(surface, color, pygame.Rect((x + x_offset) * scale,
                                                         (y + y_offset) * scale,
                                                         scale, scale))


class BindingEncoder:
    def __init__(self, *input_layers, size=1000, k=50, init_synapses=50):
        """
        Kinda like a Restricted Boltzmann Machine but sparse and with boosting.
        """
        self.input_layers = input_layers
        self.reconst_layers = tuple(Layer(x.size) for x in input_layers)
        self.synapses = tuple(Synapses(x.size, size) for x in input_layers)

        for syn in self.synapses:
            syn.weights = np.random.randint(
                size=syn.weights.shape,
                low=-init_synapses,
                high=init_synapses + 1,
                dtype=np.int8)
        self.hidden = Layer(size)
        self.k = k

    def _replace_inputs(self, original, replace):
        if len(replace) < len(original):
            replace = [*replace, *((None,) * len(original))]

        return list((r or o) for o, r in zip(replace, original))

    def bind(self, replace=(), mask=(), clear=True):
        mask = (*mask,) + (True,) * len(self.input_layers)

        if clear:
            self.hidden.clear()

        for layer, syns, enable in zip(self._replace_inputs(self.input_layers, replace), self.synapses, mask):
            if not enable:
                continue
            layer >> syns * 1 >> self.hidden

        self.hidden.kwta(self.k)
        return self

    def reconstruct(self):
        for layer, syns in zip(self.reconst_layers, self.synapses):
            layer.clear()
            layer << syns * 1 << self.hidden
            layer.wta()
        return self

    def train(self, targets=(), second_pass_mask=(), simple_mode=False, learning_rate=1, boost_decrease=0.5, boost_recover=0.001):
        """
        targets: layers to replace inputs while training; use None if you don't want to replace
        """
        self.bind(replace=targets, mask=second_pass_mask)
        self.reconstruct()
        for reconst_layer, input_layer, syns in zip(self.reconst_layers, self.input_layers, self.synapses):
            if simple_mode:
                input_layer >> syns * learning_rate << self.hidden
            elif not second_pass_mask:
                input_layer >> syns * learning_rate << self.hidden
                reconst_layer >> syns * -learning_rate << self.hidden
            else:
                reconst_layer >> syns * -learning_rate << self.hidden

        if simple_mode:
            return

        self.hidden.boost_update(boost_decrease, boost_recover)

        if second_pass_mask:
            self.bind(replace=targets, mask=(not x for x in second_pass_mask), clear=False)

            for reconst_layer, input_layer, syns in zip(self.reconst_layers, self.input_layers, self.synapses):
                input_layer >> syns * learning_rate << self.hidden

        return self


class Brain():
    def __init__(self, input):
        self.input = input
        self.efference_copy = Layer(3)
        
        self.input_encoder = BindingEncoder(self.input, size=1000, k=50)

        # previous buffer state
        self.delayed_feedback = Layer(3000)
        self.temporal_compressor = BindingEncoder(self.delayed_feedback, self.input_encoder.hidden, size=3000, k=100)
        # self.temporal_compressor.reconst_layers[0].set_wta('kwta', 50)
        # self.temporal_compressor.reconst_layers[1].set_wta('kwta', 50)

        self.reward_memory = Synapses(3000, 110)
        self.reward_input = Layer(110)
        self.reward_reconst = Layer(110)
        self.reward_reconst.set_wta('kwta', 10)

        self.action_decoder = Synapses(3000, 3)
        self.decoded_action = Layer(3).set_wta('kwta', 1)

        self.prev_reward = 0
        self.prev_true_reward = 0

    def action(self, reward):
        self.prev_true_reward = reward
        reward = reward * 50 + 50

        self.input_encoder.train()
        self.temporal_compressor.train(boost_decrease=5, boost_recover=0.001)

        (self.temporal_compressor.hidden >> self.action_decoder * 1 >> self.decoded_action.clear()).wta()
        self.decoded_action >> self.efference_copy
        action = self.decoded_action.winners[0]

        (self.temporal_compressor.hidden >> self.reward_memory >> self.reward_reconst.clear()).wta()
        current_reward = self.reward_reconst.one_hot_decode()

        train_reward = lerp(current_reward * 0.8, reward, 0.2)

        self.reward_input.clear()
        self.reward_input.one_hot(round(train_reward), width=10)
        (self.delayed_feedback >> self.reward_memory >> self.reward_reconst.clear()).wta()
        self.delayed_feedback >> self.reward_memory * -1 << self.reward_reconst
        self.delayed_feedback >> self.reward_memory * 1 << self.reward_input


        if self.prev_reward < current_reward:
            self.delayed_feedback >> self.action_decoder * -1 << self.efference_copy
        elif self.prev_reward > current_reward:
            self.delayed_feedback >> self.action_decoder * 1 << self.efference_copy
        
        self.decoded_action.boost_update(decrease=10, recover=0.001)
        
        # elif self.prev_reward > current_reward:
        #     self.delayed_feedback >> self.desired_future_synapses * -1 << self.input_encoder.hidden



        self.temporal_compressor.hidden >> self.delayed_feedback
        self.prev_reward = current_reward

        return action

    def print_debug_viz(self):
        print('temporal compressed')
        print(self.temporal_compressor.hidden)
        print('input encoding')
        print(self.input_encoder.hidden)
        print('reward')
        print(int(self.prev_reward * 0.5) * '*-'[self.prev_true_reward < 0])


def circle_points(center, radius, resolution):
    for i in range(resolution):
        f = i / resolution * 2 * pi
        yield pygame.Vector2(sin(f), cos(f)) * radius + center


# the agent turned out more complex than the environment.
class Agent:
    def __init__(self, x, y, enviroment, radius=2, damping=0.8, sensor_resolution=(30, 5)):
        self.location = pygame.math.Vector2(x, y)
        self.momentum = pygame.math.Vector2(0.01, 0)
        self.input = Layer(np.prod(sensor_resolution))
        self.sensor_resolution = sensor_resolution
        self.damping = damping
        self.enviroment = enviroment
        self.radius = radius
        self.angle = 0
        self.max_speed = 0.1

        self.brain = Brain(self.input)

    def forward_vector(self):
        return pygame.Vector2(sin(self.angle), cos(self.angle))

    def timestep(self):
        new_location = self.location + self.momentum
        collided = self.enviroment.point_collision(*new_location)
        if not collided:
            self.location = new_location
        else:
            cell_center = pygame.Vector2(int(self.location.x) + 0.5, int(self.location.y) + 0.5)

            found_location = (float('inf'), None)

            for point in circle_points(self.location, self.momentum.length(), 32):
                if not self.enviroment.point_collision(*point):
                    dist = new_location.distance_squared_to(point)
                    found_location = min(found_location, (dist, point))
            
            if found_location[1]:
                self.location = found_location[1]

        # self.location += self.momentum
        self.momentum *= self.damping
        if self.momentum.length() > self.max_speed:
            self.momentum.normalize_ip()
            self.momentum *= self.max_speed
        self.location.x %= self.enviroment.w
        self.location.y %= self.enviroment.h
        self.sensory_input()

        # incentivize the agent not to stay still by rewarding momentum speed;
        # punish staying on dark blocks

        reward = self.momentum.length() * 5 if not collided else -1
        action = self.brain.action(reward)
        # the brain should return a single integer that encodes all actions, it's easier that way
        rotation_direction = [-1, 0, 1][action]

        self.angle += rotation_direction * 0.1

        self.momentum += self.forward_vector() * 0.007

    def sensor_points(self, x, y, angle, radius, wraparound=True):
        # sample sensory input in a circular array shape
        for i in range(self.sensor_resolution[0]):
            x_offset = sin(i / self.sensor_resolution[0] * 2 * pi + angle)
            y_offset = cos(i / self.sensor_resolution[0] * 2 * pi + angle)

            for j in range(self.sensor_resolution[1]):
                distance = (j + 1) / (self.sensor_resolution[1] + 1) * radius
                v = pygame.math.Vector2(x_offset * distance + x, y_offset * distance + y)
                if wraparound:
                    v.x %= self.enviroment.w
                    v.y %= self.enviroment.h
                yield v

    def sensory_input(self):
        for index, location in enumerate(self.sensor_points(*self.location, self.angle, self.radius)):
            hit = self.enviroment.point_collision(*location)
            self.input.values[index] = hit

        self.input.threshold(0.5)

    def draw(self, surface, scale):
        pygame.draw.circle(surface, (0, 0, 255), self.location * scale, self.radius * 0.05 * scale)
        location1 = self.location * scale
        location2 = location1 + self.forward_vector() * scale * self.radius
        pygame.draw.line(surface, (255, 0, 0), location1, location2, 2)  # forward facing line

        for index, sensor in enumerate(self.sensor_points(*self.location, self.angle, self.radius)):
            hits = set(self.brain.input_encoder.reconst_layers[0].winners)
            hit = index in hits
            if hit:
                color = (0, 255, 60)
            else:
                color = (255, 0, 0)
            pygame.draw.circle(surface, color, (sensor) * scale, 3)


pygame.init()
clock = pygame.time.Clock()
screen = pygame.display.set_mode([13 * 60, 13 * 60])
blocks = GridBlocksEnviroment(13, 13)
agent = Agent(3, 3, blocks)
agent.sensory_input()
running = True

SCALE = 60

click_pos = None

while running:
    clock.tick(60)
    for event in pygame.event.get():
        if event.type == pygame.QUIT:
            running = False

        if pygame.mouse.get_pressed(num_buttons=3)[0]:
            location = pygame.Vector2(pygame.mouse.get_pos()) / SCALE
            agent.location = location


    screen.fill((128, 128, 128))

    blocks.draw(screen, SCALE)
    agent.draw(screen, SCALE)
    agent.timestep()
    agent.brain.print_debug_viz()
    pygame.display.flip()

pygame.quit()

3 Likes

I like the simplicity of the idea and the use of states to describe it, although to be honest I was lost while looking at the circuit diagram.

Could you explain which ones below correspond to which component in your EPA circuit?

  1. Spatial Pooler
  2. Temporal Pooler
2 Likes

He is not using a spatial pooler but rather autoencoders; the temporal pooler is also implicit in the state predictor.

I like the idea of slapping autoencoders all the way down.

We should rename this thread as “Autoencoders is all you need”.

2 Likes

I know. I was trying to understand the correspondence to the nupic architecture because, as he mentioned, the idea was inspired by numenta/nupic/htm and books by Jeff.

Through Jeff’s books and my research into what Numenta knows, I have noticed several operating principles of intelligence, or even just attributes of its makeup, and have, for a long time, wanted to embody those principles in something I call a Sensorimotor Inference Engine.

A large part of this is demonstrated in HTM already. Therefore it’s important to understand the difference here from a high-level point of view in EPA: which component roles are new, which ones already existed in HTM, and why model X (e.g. an autoencoder) is used rather than Y, etc.

2 Likes

I only see HTM demonstrating one thing so far, which is how to overfit efficiently.

This circuit seems to be a good proposal for how to wire up overfitted predictors to get behavior.

2 Likes

OK, I think you are focusing on the micro part of the proposal. I was more interested in the macro part, such as:

  1. Encoder
  • Encodes inputs while maintaining semantic meanings
  2. Spatial Pooler
  • Preferentially encodes inputs into SDRs
  • Most importantly, it dynamically builds a solution graph of agent states (column configuration).
  3. Temporal Pooler
  • Takes agent states or SDRs and predicts the next one
  • Basically predicting the next node in the graph generated so far by the SP

So HTM is Input → Encode → Predict and thus also a hierarchy.
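
To restate that pipeline with toy stand-ins (these are illustrative placeholders, not the real SP/TM algorithms or any nupic API):

import numpy as np

rng = np.random.default_rng(0)

# --- Encoder: scalar -> binary code where nearby values share bits ---
def encode(value, n_bits=64, width=8):
    code = np.zeros(n_bits, dtype=bool)
    start = int(value * (n_bits - width))       # assumes value in [0, 1)
    code[start:start + width] = True
    return code

# --- Spatial-pooler-ish step: project + k-winners-take-all -> sparse state ---
W = rng.random((64, 128))
def spatial_pool(code, k=8):
    overlap = code @ W
    state = np.zeros(128, dtype=bool)
    state[np.argsort(overlap)[-k:]] = True
    return state

# --- Temporal-pooler-ish step: remember transitions, predict the next state ---
transitions = {}
def temporal_pool(prev_state, state):
    if prev_state is not None:
        transitions[prev_state.tobytes()] = state   # learn A -> B
    return transitions.get(state.tobytes())         # predict from B

prev = None
for v in (0.1, 0.2, 0.3, 0.1, 0.2):
    s = spatial_pool(encode(v))
    predicted_next = temporal_pool(prev, s)
    prev = s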

Again, I was interested in how the EPA proposal relates to the high-level roles I mentioned above. If there is in any way an overlap, as implied in the first post, I’d like to know and understand further why an autoencoder is used instead of an SP (just an example).

3 Likes

Well, autoencoders are just better than the spatial pooler in almost every way. They can be made sparse too and are able to disentangle latent spaces.

I kinda think of a spatial pooler as an incomplete autoencoder where someone forgot to implement the second half.
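
To make that picture concrete, here’s a minimal sketch of a tied-weight, k-winners-take-all autoencoder: the encoder half plays roughly the role of a spatial pooler, and the decoder half is the “missing” part that gives you a reconstruction error to learn from (names and numbers are illustrative):

import numpy as np

rng = np.random.default_rng(0)

class SparseAutoencoder:
    def __init__(self, n_in, n_hidden, k, lr=0.05):
        self.W = rng.normal(0, 0.1, (n_in, n_hidden))
        self.k, self.lr = k, lr

    def encode(self, x):
        overlap = x @ self.W
        h = np.zeros_like(overlap)
        winners = np.argsort(overlap)[-self.k:]
        h[winners] = overlap[winners]          # keep only the k strongest units
        return h

    def decode(self, h):
        return h @ self.W.T                    # tied weights

    def train_step(self, x):
        h = self.encode(x)
        err = x - self.decode(h)
        # only the winning columns get updated, somewhat like SP permanence updates
        self.W += self.lr * np.outer(err, h)
        return float(np.mean(err**2))

ae = SparseAutoencoder(n_in=100, n_hidden=256, k=10)
for _ in range(100):
    ae.train_step((rng.random(100) < 0.2).astype(float))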

This circuit doesn’t have much of a parallel in HTM because it handles the problem of branching future states: it doesn’t just mindlessly predict the most likely future state, it predicts the most desirable future state and then the action that would lead to that future.

The thing is that although it doesn’t have much of a parallel, it’s conceivable to use HTMs to implement it; just throw some minicolumns on the predictor part of the circuit. The autoencoder thing could be done in biology with the plus and minus stages of the forward-forward algorithm. Although now I’m, like, 30% convinced that it’s unnecessary and the brain can actually backpropagate errors.
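
For reference, here is a rough sketch of the plus/minus idea from forward-forward, assuming a single ReLU layer and a logistic "goodness" objective (my paraphrase, not Hinton’s exact recipe):

import numpy as np

rng = np.random.default_rng(0)

# One forward-forward style layer: raise "goodness" (sum of squared activity)
# on positive data, lower it on negative data. The update is local to the layer;
# no backward pass through other layers is needed.
W = rng.normal(0, 0.1, (100, 64))
theta, lr = 2.0, 0.01

def ff_update(x, positive):
    h = np.maximum(x @ W, 0)                   # ReLU activity
    g = np.sum(h**2)                           # "goodness"
    sign = 1.0 if positive else -1.0
    p = 1.0 / (1.0 + np.exp(-sign * (g - theta)))
    grad_h = sign * (1.0 - p) * 2.0 * h        # gradient of log-sigmoid objective
    grad_h[h <= 0] = 0.0                       # ReLU gate
    return lr * np.outer(x, grad_h)

x_pos = (rng.random(100) < 0.2).astype(float)  # "plus" phase sample
x_neg = rng.random(100)                        # "minus" phase sample
W += ff_update(x_pos, positive=True)
W += ff_update(x_neg, positive=False)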

4 Likes

Thanks for explaining; now I somewhat get why the autoencoder is used here. I’ve always thought, and shared in the forum, that the HTM architecture doesn’t have to be linear; rather, it should be tried with a feature that spawns child SP/TMs as parallel models so that their overfitted configurations can then potentially be balanced out by multiples of them (spawns) by consensus.

3 Likes

This code you posted is an implementation of my idea? Wow, that’s a lot of work.

1 Like

I’ve forgotten how those poolers work, so I can’t draw a direct analogy.

But I can explain how the spatial dimension and the temporal dimension are managed in my model.

First, the spatial dimension is managed explicitly in a model. That’s the predictor model in the middle of the layer. It predicts the next state.

The temporal dimension is a bit more hidden. In the bottom layer there basically is no temporal dimension, because the entire temporal dimension is taken up with just one action between two states.

In higher layers, the motor output behavior corresponds to describing the state between states.

So as you go up to the next layer, the input for that layer is two states at a time. It sees two states as one state. State A goes to state B; the bottom layer learns that mapping. The top layer in the diagram learns the mapping state AB goes to state BC. That’s how the temporal dimension is modeled: the input is the compressed representation of two adjacent states.

And if you added another layer, the input to it would be the compressed representation of the layer below. So it would see ABCD as one state, and learn to map which other 4-state states you could go to from it.
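
A toy sketch of that data flow, just to pin it down (the helper and variable names are hypothetical, not taken from the diagram):

import numpy as np

rng = np.random.default_rng(0)

def compress(a, b, proj):
    """Compress two adjacent states into one fixed-size code (random projection)."""
    return np.tanh(np.concatenate([a, b]) @ proj)

n = 16
proj = rng.normal(0, 1 / np.sqrt(2 * n), (2 * n, n))

states = [rng.random(n) for _ in range(5)]                  # A, B, C, D, E

# bottom layer: learns single-step transitions (state_t -> state_{t+1})
bottom_pairs = list(zip(states[:-1], states[1:]))           # A->B, B->C, ...

# next layer up: each input is a compressed pair, target is the next compressed pair
codes = [compress(a, b, proj) for a, b in bottom_pairs]     # AB, BC, CD, DE
top_pairs = list(zip(codes[:-1], codes[1:]))                # AB->BC, BC->CD, ...

# a third layer would compress the codes again and see ABCD as one state, etc.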

I hope that helps. I think this diagram is fundamentally flawed and probably can’t achieve any form of scale. But it does show a basic idea of all the components that are required to theoretically unfold behavior as it travels down a hierarchy, and it shows how those components might be interconnected.

1 Like

Yeah, I’m not very familiar with the HTM architecture, just with the concepts from the books, not how it’s implemented.

But I will say my autoencoder attempts to do a version of 1 and 2 at the same time. The idea is basically that you take all the states you’ve ever seen and find a smaller representation of them. I guess that doesn’t do the SDR stuff. SDRs are powerful and I’d love to include them, but I just don’t know how.

2 Likes

I took some code I already had and adapted it into something that resembles your idea. It’s not a perfect match, as the state predictor is only used to train a reward function and not to drive behavior; it also only has 3 layers.

Instead of autoencoders I use sparse RBMs with boosting, which kind of look like the offspring of a spatial pooler and a Boltzmann machine.
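
In case the long code dump above is hard to follow, the core training step it does boils down to something like this (sizes here are illustrative):

import numpy as np

rng = np.random.default_rng(0)

# project the input, pick k winners (helped by boosts), reconstruct, then do a
# +/- Hebbian update: strengthen input->winner links, weaken reconstruction->winner.
n_in, n_hidden, k = 150, 1000, 50
W = rng.integers(-50, 51, (n_in, n_hidden)).astype(np.int32)
boosts = np.zeros(n_hidden)

def train_step(active_inputs):
    hidden_vals = W[active_inputs, :].sum(axis=0) + boosts
    winners = np.argsort(hidden_vals)[-k:]                   # kWTA hidden code
    reconst_vals = W[:, winners].sum(axis=1)
    reconst = np.where(reconst_vals >= reconst_vals.max() * 0.5)[0]
    W[np.ix_(active_inputs, winners)] += 1                   # Hebbian "+" phase
    W[np.ix_(reconst, winners)] -= 1                         # Hebbian "-" phase
    boosts[winners] -= 0.5                                   # boosting: tire out winners
    boosts[:] *= 0.999                                       # and let them slowly recover
    return winners

winners = train_step(np.where(rng.random(n_in) < 0.1)[0])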

2 Likes

I think this is the answer to my first question/comment. Thanks. So this smaller representation is probably a result of the work of the autoencoder (compression). Not that it’s very important for you, but in contrast HTM does not intentionally compress states; rather, it just prefers them, both spatially and temporally, through real-time reconfiguration of cortical column synapses. However, from a macro level with respect to time, it can also be seen as compressing the “important states”, because as you may see with the TP in action, it prefers the recent patterns and forgets the less relevant ones.

2 Likes

Firstly, for the DL warriors (obviously not the OP): I’m not trying to “fight for this better model” here. I’m, as usual, trying to respectfully understand people’s perspectives and what they are trying to convey in their ideas - unfortunately an online forum is not the best space for this. Secondly, I prefer not to assume what someone knows and does not, same as for myself; that’s why I ask a lot of questions, sorry for this.

Going back to the topic, these paragraphs are the interesting ones. Conceptually, this is like a stacked Spatial Pooler and Temporal Pooler architecture: SP1 encodes the current state (C) and TP1 predicts the next state (D); SP2 encodes the next N predicted states (BCD) of TP1 and TP2 predicts the next states (EFG). And so on…

So it would see ABCD as one state, and learn to map which other 4-state states you could go to from it.

If I understand this correctly, then theoretically, how do you ensure the errors from the lower nodes don’t blow up as the prediction goes up the hierarchy? Or am I missing a theoretical concept here?

2 Likes

I’m quite new to all of this, and I’m wondering if someone could explain to a newb what the tradeoffs are here. Are “sparse RBMs with boosting” and “autoencoders” functionally similar? Is one generally better than the other, or are there reasonable tradeoffs, and if so, what are they?

2 Likes