my experience is that its usually much harder to visualize synapses than activations. they are too many and too random.
anyway.
I spent this afternoon working on this clustering algorithm that analizes synapses and sorts neurons into a square grid, clustered by similarity.
that way we can watch an activity bump moving around.
I cant tell if its useful but looks cool anyways.
heres the algorithm:
class ClusterPreview2D:
def __init__(self, matrix, iterations=500, neighbors=10, recursive_neighbors=3):
w = self.w = matrix.weights
self.size = round(w.shape[1] ** 0.5 + 0.5)
self.grid = {}
self.grid_locations = []
self.neighbors = [[] for _ in range(w.shape[1])]
self.remap_indexes = list(range(w.shape[1]))
self.preview_image = np.zeros((self.size, self.size), dtype=np.float32)
self.preview_image_flat = self.preview_image.reshape(self.size ** 2)
self.im = plt.imshow(self.preview_image, vmin=0, vmax=1)
plt.pause(0.001)
i = 0
for i in range(w.shape[1]):
x = i % self.size
y = i // self.size
self.grid_locations.append((x, y))
self.grid[(x, y)] = i
for i in range(w.shape[1]):
col = np.argsort(w[:, i])
if neighbors:
dot = w[col[-neighbors:], :].sum(axis=0)
self.neighbors[i] = [x for x in np.argsort(dot)[-neighbors:] if not i == x]
if recursive_neighbors:
self.neighbors[i].extend(col[-recursive_neighbors:])
for _ in range(iterations):
self.iteration()
self.update_remap_indexes()
def iteration(self):
for i in range(self.w.shape[1]):
neighbors = self.neighbors[i]
n = len(neighbors)
location = self.grid_locations[i]
avg_x = 0
avg_y = 0
for buddy_i in neighbors:
bx, by = self.grid_locations[buddy_i]
avg_x += bx
avg_y += by
target_location = (round((avg_x / n + location[0] * 2) / 3),
round((avg_y / n + location[1] * 2) / 3))
if target_location not in self.grid:
continue
self._swap(location, target_location)
def update_remap_indexes(self):
for i, (x, y) in enumerate(self.grid_locations):
self.remap_indexes[i] = x + y * self.size
def _swap(self, p1, p2):
i1 = self.grid[p1]
i2 = self.grid[p2]
self.grid[p1] = i2
self.grid[p2] = i1
self.grid_locations[i1] = p2
self.grid_locations[i2] = p1
def preview_activity(self, activity):
self.preview_image_flat[:] = 0
self.preview_image_flat[self.remap_indexes] = activity.values
self.preview_image_flat /= self.preview_image_flat.max()
self.im.set_array(self.preview_image)
plt.pause(0.01)
full code:
import numpy as np
import time
import matplotlib.pyplot as plt
np.random.seed(0)
class Synapses:
def __init__(self, inputs, outputs, initial_sparsity=0.1):
self.weights = (np.random.sample((inputs, outputs)) < initial_sparsity).astype(np.int8)
def project(self, inputs, outputs, backwards=False):
if backwards:
inputs.values += self.weights[:, outputs.winners].sum(axis=1)
else:
outputs.values += self.weights[inputs.winners, :].sum(axis=0)
def hebbian_update(self, inputs, outputs, factor=1):
self.weights[inputs.winners[:, np.newaxis], outputs.winners] += factor
class Activation:
def __init__(self, size):
self.values = np.zeros(size, dtype=np.float32)
self.boosts = np.zeros(size, dtype=np.float32)
self.winners = np.zeros(0, dtype=np.int64)
def one_hot(self, x):
self.winners = np.array([x], dtype=np.int32)
def kwta(self, k):
self.winners = np.argsort(self.values + self.boosts)[-k:]
def noise(self, f):
self.values += np.random.sample(self.values.shape) * f
def boost_update(self, decrease=1, recover=0.01):
self.boosts *= recover
self.boosts[self.winners] -= decrease
def clear(self):
self.values[:] = 0
self.winners = np.zeros(0, dtype=np.int64)
class ClusterPreview2D:
def __init__(self, matrix, iterations=500, neighbors=10, recursive_neighbors=3):
w = self.w = matrix.weights
self.size = round(w.shape[1] ** 0.5 + 0.5)
self.grid = {}
self.grid_locations = []
self.neighbors = [[] for _ in range(w.shape[1])]
self.remap_indexes = list(range(w.shape[1]))
self.preview_image = np.zeros((self.size, self.size), dtype=np.float32)
self.preview_image_flat = self.preview_image.reshape(self.size ** 2)
self.im = plt.imshow(self.preview_image, vmin=0, vmax=1)
plt.pause(0.001)
i = 0
for i in range(w.shape[1]):
x = i % self.size
y = i // self.size
self.grid_locations.append((x, y))
self.grid[(x, y)] = i
for i in range(w.shape[1]):
col = np.argsort(w[:, i])
if neighbors:
dot = w[col[-neighbors:], :].sum(axis=0)
self.neighbors[i] = [x for x in np.argsort(dot)[-neighbors:] if not i == x]
if recursive_neighbors:
self.neighbors[i].extend(col[-recursive_neighbors:])
for _ in range(iterations):
self.iteration()
self.update_remap_indexes()
def iteration(self):
for i in range(self.w.shape[1]):
neighbors = self.neighbors[i]
n = len(neighbors)
location = self.grid_locations[i]
avg_x = 0
avg_y = 0
for buddy_i in neighbors:
bx, by = self.grid_locations[buddy_i]
avg_x += bx
avg_y += by
target_location = (round((avg_x / n + location[0] * 2) / 3),
round((avg_y / n + location[1] * 2) / 3))
if target_location not in self.grid:
continue
self._swap(location, target_location)
def update_remap_indexes(self):
for i, (x, y) in enumerate(self.grid_locations):
self.remap_indexes[i] = x + y * self.size
def _swap(self, p1, p2):
i1 = self.grid[p1]
i2 = self.grid[p2]
self.grid[p1] = i2
self.grid[p2] = i1
self.grid_locations[i1] = p2
self.grid_locations[i2] = p1
def preview_activity(self, activity):
self.preview_image_flat[:] = 0
self.preview_image_flat[self.remap_indexes] = activity.values
self.preview_image_flat /= self.preview_image_flat.max()
self.im.set_array(self.preview_image)
plt.pause(0.01)
class SequencePreddictor:
def __init__(self, n_state, n_input, k):
self.n_state = n_state
self.n_input = n_input
self.k = k
self.encoding_matrix = Synapses(n_input, n_state, initial_sparsity=n_state / n_input)
self.state_matrix = Synapses(n_state, n_state, initial_sparsity=0.5)
self.new_state = Activation(n_state)
self.previous_state = Activation(n_state)
self.previous_state_reconst = Activation(n_state)
self.input = Activation(n_input)
self.input_reconst = Activation(n_input)
def step(self, input_index, train=False):
self.previous_state, self.new_state = self.new_state, self.previous_state
self.new_state.clear()
self.state_matrix.project(self.previous_state, self.new_state,)
if input_index is None:
self.input.one_hot(self.decode())
else:
self.input.one_hot(input_index)
self.encoding_matrix.project(self.input, self.new_state)
self.new_state.kwta(self.k)
self.new_state.boost_update(10, 0.0001)
if train:
self.previous_state_reconst.clear()
self.input_reconst.clear()
self.state_matrix.project(self.previous_state_reconst, self.new_state, backwards=True)
self.encoding_matrix.project(self.input_reconst, self.new_state, backwards=True)
self.previous_state_reconst.kwta(self.k)
self.input_reconst.kwta(1)
# plus phase
self.state_matrix.hebbian_update(self.previous_state, self.new_state, 1)
self.encoding_matrix.hebbian_update(self.input, self.new_state, 1)
# minus phase
self.state_matrix.hebbian_update(self.previous_state_reconst, self.new_state, -1)
self.encoding_matrix.hebbian_update(self.input_reconst, self.new_state, -1)
def decode(self):
self.input_reconst.clear()
self.encoding_matrix.project(self.input_reconst, self.new_state, backwards=True)
self.input_reconst.kwta(1)
return self.input_reconst.winners[0]
input_data = '''Lorem ipsum dolor sit amet, consectetur adipiscing elit,
sed do eiusmod tempor incididunt ut labore et dolore magna aliqua. Ut enim
ad minim veniam, quis nostrud exercitation ullamco laboris nisi ut aliquip
ex ea commodo consequat. Duis aute irure dolor in reprehenderit in
voluptate velit esse cillum dolore eu fugiat nulla pariatur. Excepteur
sint occaecat cupidatat non proident, sunt in culpa qui officia deserunt
mollit anim id est laborum. 123456789abcdefghijk'''.replace('\n', '')
EPOCHS = 1
seq_pred = SequencePreddictor(1000, 256, k=10)
for i in range(EPOCHS):
print('epoch', i)
for ch in input_data:
seq_pred.step(ord(ch), train=True)
preview = ClusterPreview2D(seq_pred.state_matrix)
while True:
seq_pred.step(None)
print(chr(seq_pred.decode()), end='', flush=True)
preview.preview_activity(seq_pred.previous_state)
time.sleep(0.3)