This is the 13th day of my participation in the November Challenge.

Convolutional networks

The neural networks used so far have been fully connected networks: every neuron is connected to every neuron in the adjacent layers. This architecture ignores the spatial structure of the image and treats input pixels that are far apart exactly the same as pixels that are close together. A convolutional neural network is different: it tries to exploit the spatial structure of images, using three basic ideas: local receptive fields, shared weights, and pooling.

Local receptive fields

Instead of connecting every input pixel to every hidden neuron, we make small, localized connections to the input image. More precisely, each neuron in the first hidden layer is connected to a small region of the input neurons, as shown below:

That input region is called the local receptive field of the hidden neuron.

Shared weights and biases

Each hidden neuron has a bias and a set of weights connecting it to its local receptive field, and the same weights and bias are used by every hidden neuron. Assuming a local receptive field of $5 \times 5$ pixels, the output of the $j, k$-th hidden neuron is:


$$\sigma\left(b+\sum^4_{l=0}\sum^4_{m=0}w_{l,m}\,a_{j+l,k+m}\right)$$

Here $\sigma$ is the neural activation function, $b$ is the shared bias, $w_{l,m}$ is a $5 \times 5$ array of shared weights, and $a_{x,y}$ is the input activation at position $x, y$.

In this way, the first hidden layer effectively detects exactly the same feature at every location in the input image (think of the shared weights and bias as defining a feature the hidden neuron can detect, for example the presence of a vertical edge in a particular local receptive field). As a result, convolutional networks adapt well to the translation invariance of images: shift a picture of a cat slightly, and it is still a picture of a cat.
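
As a minimal illustration of the formula above, here is a NumPy sketch (not part of the Theano implementation later in this post; all names are made up for illustration) that slides one shared $5 \times 5$ kernel and bias over a $28 \times 28$ input to produce a $24 \times 24$ feature map:

import numpy as np

def sigmoid(z):
    return 1.0 / (1.0 + np.exp(-z))

def feature_map(a, w, b):
    # a: 28x28 input activations; w: shared 5x5 weights; b: shared bias
    rows = a.shape[0] - w.shape[0] + 1   # 24
    cols = a.shape[1] - w.shape[1] + 1   # 24
    out = np.zeros((rows, cols))
    for j in range(rows):
        for k in range(cols):
            # the same weights and bias are applied at every position (j, k)
            out[j, k] = sigmoid(b + np.sum(w * a[j:j + 5, k:k + 5]))
    return out

a0 = np.random.rand(28, 28)          # toy input image
w = np.random.randn(5, 5) * 0.1      # shared weights
b = 0.0                              # shared bias
print(feature_map(a0, w, b).shape)   # (24, 24)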

The mapping from the input layer to the hidden layer is called a feature map, the weights defining the feature map are called the shared weights, and the bias defining the feature map is called the shared bias; together, the shared weights and bias are often referred to as a convolution kernel or filter. The terminology varies somewhat between papers.

To do image recognition we need more than one feature map, so a complete convolutional layer consists of several different feature maps:

The following 20 images correspond to 20 different feature maps (also called filters or convolution kernels). Each is represented by a $5 \times 5$ image corresponding to the $5 \times 5$ weights of the local receptive field. Whiter blocks mean a smaller (typically, more negative) weight, and darker blocks mean a larger weight. Many of these features have clear light and dark sub-regions, which indicates that the network really is learning something related to spatial structure.
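
Sharing weights and biases also greatly reduces the number of parameters. A quick back-of-the-envelope comparison (the numbers here are illustrative: 20 feature maps with $5 \times 5$ receptive fields versus a fully connected layer of 30 hidden neurons on a $28 \times 28$ input):

conv_params = 20 * (5 * 5 + 1)   # 20 feature maps, each with 5x5 shared weights and one shared bias: 520
fc_params = 28 * 28 * 30 + 30    # fully connected: 784 inputs x 30 hidden neurons, plus 30 biases: 23550
print(conv_params, fc_params)    # 520 23550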

The name convolutional comes from the operation in the equation $a^1 = \sigma(b + w * a^0)$, where $a^1$ denotes the set of output activations from one feature map, $a^0$ is the set of input activations, and $*$ is called the convolution operation.

Pooling

In addition to the convolutional layers just described, convolutional neural networks also contain pooling layers. A pooling layer is used immediately after a convolutional layer to simplify the information in its output: it takes each feature map output by the convolutional layer and produces a condensed feature map.

Max-pooling

Each unit in the pooling layer summarizes a region of (for example) $2 \times 2$ neurons in the previous layer. The most common procedure is max-pooling, in which a pooling unit simply outputs the maximum activation of its $2 \times 2$ input region:

In this way, the $24 \times 24$ output of the convolutional layer is compressed into a $12 \times 12$ output from the pooling layer:
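
Here is a minimal NumPy sketch of $2 \times 2$ max-pooling (for illustration only; the Theano code later in this post uses pool_2d for this):

import numpy as np

def max_pool_2x2(fm):
    # fm: a 24x24 feature map; split it into 12x12 blocks of 2x2 and keep the maximum of each block
    h, w = fm.shape
    return fm.reshape(h // 2, 2, w // 2, 2).max(axis=(1, 3))

fm = np.random.rand(24, 24)
print(max_pool_2x2(fm).shape)   # (12, 12)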

L2 pooling

Instead of taking the maximum, this method takes the square root of the sum of the squares of the activations in the $2 \times 2$ region.
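
The same reshape trick gives a sketch of L2 pooling (again just an illustration, not part of the network3.py code below):

import numpy as np

def l2_pool_2x2(fm):
    # square root of the sum of squares over each 2x2 block
    h, w = fm.shape
    blocks = fm.reshape(h // 2, 2, w // 2, 2)
    return np.sqrt((blocks ** 2).sum(axis=(1, 3)))

fm = np.random.rand(24, 24)
print(l2_pool_2x2(fm).shape)    # (12, 12)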

Code implementation

import pickle
import gzip

import numpy as np
import theano
import theano.tensor as T
from theano.tensor.nnet import conv
from theano.tensor.nnet import softmax
from theano.tensor import shared_randomstreams
from theano.tensor.signal.pool import pool_2d

# Activation functions for neurons
def linear(z): return z
def ReLU(z): return T.maximum(0.0, z)

from theano.tensor.nnet import sigmoid
from theano.tensor import tanh

# Constants
GPU = True
if GPU:
    print ("Trying to run under a GPU. If this is not desired, then modify "+\
"network3.py\nto set the GPU flag to False.")
    try: theano.config.device = 'gpu'
    except: pass # it's already set
    theano.config.floatX = 'float32'
else:
    print ("Running with a CPU. If this is not desired, then the modify "+\
"network3.py to set\nthe GPU flag to True.")


# Load the MNIST data
def load_data_shared(filename="mnist.pkl.gz"):
    f = gzip.open(filename, 'rb')
    # encoding="latin1" is needed under Python 3 to read the Python 2 pickle
    training_data, validation_data, test_data = pickle.load(f, encoding="latin1")
    f.close()
    def shared(data):
        # Place the data into shared variables, allowing Theano to copy the data to the GPU, if one is available
        shared_x = theano.shared(
            np.asarray(data[0], dtype=theano.config.floatX), borrow=True)
        shared_y = theano.shared(
            np.asarray(data[1], dtype=theano.config.floatX), borrow=True)
        return shared_x, T.cast(shared_y, "int32")
    return [shared(training_data), shared(validation_data), shared(test_data)]

# Main class used to construct and train networks

class Network:

    # Takes a list of layers describing the network architecture, and a value for mini_batch_size
    # to be used during training by stochastic gradient descent
    def __init__(self, layers, mini_batch_size):
        self.layers = layers
        self.mini_batch_size = mini_batch_size
        # Collect the parameters of every layer into a single list
        # Network.SGD uses self.params to determine which variables in the Network need to be learned
        self.params = [param for layer in self.layers for param in layer.params]
        # Define the Theano symbolic variables x and y, representing the input and the
        # desired output of the network (T.matrix is a 2-D array, T.ivector a vector of int32)
        self.x = T.matrix("x")
        self.y = T.ivector("y")
        init_layer = self.layers[0]
        # Set the input to the initial layer
        # self.x is passed twice because the network may be used in two ways: with and without dropout
        init_layer.set_inpt(self.x, self.x, self.mini_batch_size)
        for j in range(1, len(self.layers)):
            prev_layer, layer = self.layers[j - 1], self.layers[j]
            layer.set_inpt(prev_layer.output, prev_layer.output_dropout, self.mini_batch_size)
        self.output = self.layers[-1].output
        self.output_dropout = self.layers[-1].output_dropout

    def SGD(self, training_data, epochs, mini_batch_size, eta,
            validation_data, test_data, lmbda=0.0):

        # Train the network using mini-batch stochastic gradient descent
        # Split the datasets into their x and y parts
        training_x, training_y = training_data
        validation_x, validation_y = validation_data
        test_x, test_y = test_data

        # Compute the number of mini-batches for training, validation, and testing
        # (integer division, so the counts can be used with range())
        num_training_batches = size(training_data) // mini_batch_size
        num_validation_batches = size(validation_data) // mini_batch_size
        num_test_batches = size(test_data) // mini_batch_size

        # Define the (regularized) cost function, the symbolic gradients, and the parameter updates
        # The regularized log-likelihood cost is written symbolically; T.grad computes the corresponding
        # derivatives, and updates describes how each parameter is changed
        l2_norm_squared = sum([(layer.w ** 2).sum() for layer in self.layers])
        cost = self.layers[-1].cost(self) + \
               0.5 * lmbda * l2_norm_squared / num_training_batches
        grads = T.grad(cost, self.params)
        updates = [(param, param - eta * grad)
                   for param, grad in zip(self.params, grads)]

        # Define functions to train a mini-batch, and to compute the accuracy on validation and test mini-batches
        i = T.lscalar()  # mini-batch index
        # Symbolic Theano function that, given a mini-batch index, uses updates to adjust the Network's parameters
        train_mb = theano.function(
            [i], cost, updates=updates,
            givens={
                self.x:
                    training_x[i * self.mini_batch_size: (i + 1) * self.mini_batch_size],
                self.y:
                    training_y[i * self.mini_batch_size: (i + 1) * self.mini_batch_size]
            })
        validate_mb_accuracy = theano.function(
            [i], self.layers[-1].accuracy(self.y),
            givens={
                self.x:
                    validation_x[i * self.mini_batch_size: (i + 1) * self.mini_batch_size],
                self.y:
                    validation_y[i * self.mini_batch_size: (i + 1) * self.mini_batch_size]
            })
        test_mb_accuracy = theano.function(
            [i], self.layers[-1].accuracy(self.y),
            givens={
                self.x:
                    test_x[i * self.mini_batch_size: (i + 1) * self.mini_batch_size],
                self.y:
                    test_y[i * self.mini_batch_size: (i + 1) * self.mini_batch_size]
            })
        self.test_mb_predictions = theano.function(
            [i], self.layers[-1].y_out,
            givens={
                self.x:
                    test_x[i * self.mini_batch_size: (i + 1) * self.mini_batch_size]
            })

        # Do the actual training
        best_validation_accuracy = 0.0
        for epoch in range(epochs):
            for minibatch_index in range(num_training_batches):
                iteration = num_training_batches * epoch + minibatch_index
                if iteration % 1000 == 0:
                    print("Training mini-batch number {0}".format(iteration))
                cost_ij = train_mb(minibatch_index)
                if (iteration + 1) % num_training_batches == 0:
                    validation_accuracy = np.mean(
                        [validate_mb_accuracy(j) for j in range(num_validation_batches)])
                    print("Epoch {0}: validation accuracy {1:.2%}".format(
                        epoch, validation_accuracy))
                    if validation_accuracy >= best_validation_accuracy:
                        print("This is the best validation accuracy to date.")
                        best_validation_accuracy = validation_accuracy
                        best_iteration = iteration
                        if test_data:
                            test_accuracy = np.mean(
                                [test_mb_accuracy(j) for j in range(num_test_batches)])
                            print('The corresponding test accuracy is {0:.2%}'.format(
                                test_accuracy))
        print("Finished training network.")
        print("Best validation accuracy of {0:.2%} obtained at iteration {1}".format(best_validation_accuracy, best_iteration))
        print("Corresponding test accuracy of {0:.2%}".format(test_accuracy))

# Define the layer types

# Used to create a combination of a convolutional and a max-pooling layer
# A more sophisticated implementation would separate the two, but they are combined here to keep things simple

class ConvPoolLayer:
    def __init__(self, filter_shape, image_shape, poolsize=(2, 2),activation_fn=sigmoid):

        # filter_shape is a tuple of length 4, whose entries are the number of filters,
        # the number of input feature maps, the filter height, and the filter width
        self.filter_shape = filter_shape

        # image_shape is a tuple of length 4, whose entries are the mini-batch size,
        # the number of input feature maps, the image height, and the image width
        self.image_shape = image_shape

        # poolsize is a tuple of length 2, whose entries are the y and x pooling sizes
        self.poolsize = poolsize
        self.activation_fn = activation_fn

        # Initialize the weights and biases
        # theano.shared loads them into Theano shared variables, ensuring they can be processed on the GPU
        n_out = (filter_shape[0] * np.prod(filter_shape[2:]) / np.prod(poolsize))
        self.w = theano.shared(
            np.asarray(
                np.random.normal(loc=0, scale=np.sqrt(1.0 / n_out), size=filter_shape),
                dtype=theano.config.floatX),
            borrow=True)
        self.b = theano.shared(
            np.asarray(
                np.random.normal(loc=0, scale=1.0, size=(filter_shape[0],)),
                dtype=theano.config.floatX),
            borrow=True)

        # self.params stores the learnable parameters of the layer
        self.params = [self.w, self.b]


    # set_inpt sets the input to the layer and computes the corresponding output
    def set_inpt(self, inpt, inpt_dropout, mini_batch_size):
        self.inpt = inpt.reshape(self.image_shape)
        conv_out = conv.conv2d(
            input=self.inpt, filters=self.w, filter_shape=self.filter_shape,
            image_shape=self.image_shape)
        pooled_out = pool_2d(
            input=conv_out, ds=self.poolsize, ignore_border=True)
        self.output = self.activation_fn(
            pooled_out + self.b.dimshuffle('x', 0, 'x', 'x'))
        self.output_dropout = self.output  # no dropout in the convolutional layers

# Fully connected layer
class FullyConnectedLayer:
    def __init__(self, n_in, n_out, activation_fn=sigmoid, p_dropout=0.0):
        self.n_in = n_in
        self.n_out = n_out
        self.activation_fn = activation_fn
        self.p_dropout = p_dropout

        # Initialize the weights and biases
        self.w = theano.shared(
            np.asarray(
                np.random.normal(
                    loc=0.0, scale=np.sqrt(1.0 / n_out), size=(n_in, n_out)),
                dtype=theano.config.floatX),
            name='w', borrow=True)
        self.b = theano.shared(
            np.asarray(np.random.normal(loc=0.0, scale=1.0, size=(n_out,)),
                       dtype=theano.config.floatX),
            name='b', borrow=True)
        self.params = [self.w, self.b]

    def set_inpt(self, inpt, inpt_dropout, mini_batch_size):
        self.inpt = inpt.reshape((mini_batch_size, self.n_in))
        self.output = self.activation_fn(
            (1 - self.p_dropout) * T.dot(self.inpt, self.w) + self.b)
        self.y_out = T.argmax(self.output, axis=1)
        # Dropout may be used during training; if so, inputs are dropped with probability p_dropout
        self.inpt_dropout = dropout_layer(
            inpt_dropout.reshape((mini_batch_size, self.n_in)), self.p_dropout)
        self.output_dropout = self.activation_fn(
            T.dot(self.inpt_dropout, self.w) + self.b)

    # Return the accuracy for the mini-batch
    def accuracy(self, y):
        return T.mean(T.eq(y, self.y_out))

# Softmax layer
class SoftmaxLayer:
    def __init__(self, n_in, n_out, p_dropout=0.0):
        self.n_in = n_in
        self.n_out = n_out
        self.p_dropout = p_dropout

        # Initialize the weights and biases
        self.w = theano.shared(
            np.zeros((n_in, n_out), dtype=theano.config.floatX),
            name='w', borrow=True)
        self.b = theano.shared(
            np.zeros((n_out,), dtype=theano.config.floatX),
            name='b', borrow=True)
        self.params = [self.w, self.b]

    def set_inpt(self, inpt, inpt_dropout, mini_batch_size):
        self.inpt = inpt.reshape((mini_batch_size, self.n_in))
        self.output = softmax((1 - self.p_dropout) * T.dot(self.inpt, self.w) + self.b)
        self.y_out = T.argmax(self.output, axis=1)
        self.inpt_dropout = dropout_layer(
            inpt_dropout.reshape((mini_batch_size, self.n_in)), self.p_dropout)
        self.output_dropout = softmax(T.dot(self.inpt_dropout, self.w) + self.b)

    def cost(self, net):
        # Return the log-likelihood cost
        return -T.mean(T.log(self.output_dropout)[T.arange(net.y.shape[0]), net.y])

    def accuracy(self, y):
        # Return the accuracy for the mini-batch
        return T.mean(T.eq(y, self.y_out))

#### Miscellanea
def size(data):
    # Return the size of the dataset data
    return data[0].get_value(borrow=True).shape[0]

def dropout_layer(layer, p_dropout):
    srng = shared_randomstreams.RandomStreams(
        np.random.RandomState(0).randint(999999))
    mask = srng.binomial(n=1, p=1 - p_dropout, size=layer.shape)
    return layer * T.cast(mask, theano.config.floatX)
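
To put the classes together, here is a minimal usage sketch in the style of the examples from Nielsen's book: one convolution-plus-pooling layer, a fully connected layer, and a softmax output, trained on MNIST (the hyperparameters are illustrative):

training_data, validation_data, test_data = load_data_shared()
mini_batch_size = 10
net = Network([
    ConvPoolLayer(image_shape=(mini_batch_size, 1, 28, 28),
                  filter_shape=(20, 1, 5, 5),
                  poolsize=(2, 2)),
    FullyConnectedLayer(n_in=20 * 12 * 12, n_out=100),
    SoftmaxLayer(n_in=100, n_out=10)], mini_batch_size)
net.SGD(training_data, 60, mini_batch_size, 0.1,
        validation_data, test_data)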