- Classic Transformer learning
- Article reposted from the WeChat official account [Machine Learning Alchemy]
- Author: Yixin Chen (authorized)
- Contact: WeChat CYX645016617
- Exchanges and mutual progress are welcome
[TOC]
Detailed code walkthrough
transformer
```python
class transformer(nn.Sequential):
    def __init__(self, encoding, **config):
        super(transformer, self).__init__()
        if encoding == 'drug':
            self.emb = Embeddings(config['input_dim_drug'],
                                  config['transformer_emb_size_drug'],
                                  50,
                                  config['transformer_dropout_rate'])
            self.encoder = Encoder_MultipleLayers(config['transformer_n_layer_drug'],
                                                  config['transformer_emb_size_drug'],
                                                  config['transformer_intermediate_size_drug'],
                                                  config['transformer_num_attention_heads_drug'],
                                                  config['transformer_attention_probs_dropout'],
                                                  config['transformer_hidden_dropout_rate'])
        elif encoding == 'protein':
            self.emb = Embeddings(config['input_dim_protein'],
                                  config['transformer_emb_size_target'],
                                  545,
                                  config['transformer_dropout_rate'])
            self.encoder = Encoder_MultipleLayers(config['transformer_n_layer_target'],
                                                  config['transformer_emb_size_target'],
                                                  config['transformer_intermediate_size_target'],
                                                  config['transformer_num_attention_heads_target'],
                                                  config['transformer_attention_probs_dropout'],
                                                  config['transformer_hidden_dropout_rate'])

    ### parameter v (tuple of length 2) is from utils.drug2emb_encoder
    def forward(self, v):
        e = v[0].long().to(device)        # token indices
        e_mask = v[1].long().to(device)   # 1 for valid positions, 0 for padding
        print(e.shape, e_mask.shape)
        # expand the padding mask to [batch, 1, 1, seq_len] and turn it into an
        # additive mask: 0 for valid positions, -10000 for padded positions
        ex_e_mask = e_mask.unsqueeze(1).unsqueeze(2)
        ex_e_mask = (1.0 - ex_e_mask) * -10000.0

        emb = self.emb(e)
        encoded_layers = self.encoder(emb.float(), ex_e_mask.float())
        return encoded_layers[:, 0]  # representation of the first token
```
- There are only two components: the Embeddings layer and the Encoder_MultipleLayers module.
- The input v to forward is a tuple of two elements: the first is the token-index data and the second is the mask, which marks the positions that hold valid (non-padding) tokens (see the sketch below).
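To see what the mask does, here is a minimal, self-contained sketch (toy values, not from the original library code) of how the 0/1 padding mask is expanded into the additive bias that forward adds to the attention scores:

```python
import torch

# toy batch: 2 sequences of length 5, the second one padded after 3 tokens
e_mask = torch.tensor([[1, 1, 1, 1, 1],
                       [1, 1, 1, 0, 0]])

# expand to [batch, 1, 1, seq_len] so it broadcasts over heads and query positions
ex_e_mask = e_mask.unsqueeze(1).unsqueeze(2)

# valid positions -> 0, padded positions -> -10000; adding this to the raw
# attention scores drives padded keys to ~0 probability after the softmax
ex_e_mask = (1.0 - ex_e_mask) * -10000.0
print(ex_e_mask.shape)      # torch.Size([2, 1, 1, 5])
print(ex_e_mask[1, 0, 0])   # tensor([-0., -0., -0., -10000., -10000.])
```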
Embedding
```python
class Embeddings(nn.Module):
    """Construct the embeddings from protein/target, position embeddings."""
    def __init__(self, vocab_size, hidden_size, max_position_size, dropout_rate):
        super(Embeddings, self).__init__()
        self.word_embeddings = nn.Embedding(vocab_size, hidden_size)
        self.position_embeddings = nn.Embedding(max_position_size, hidden_size)
        self.LayerNorm = nn.LayerNorm(hidden_size)
        self.dropout = nn.Dropout(dropout_rate)

    def forward(self, input_ids):
        seq_length = input_ids.size(1)
        position_ids = torch.arange(seq_length, dtype=torch.long, device=input_ids.device)
        position_ids = position_ids.unsqueeze(0).expand_as(input_ids)

        words_embeddings = self.word_embeddings(input_ids)
        position_embeddings = self.position_embeddings(position_ids)

        embeddings = words_embeddings + position_embeddings
        embeddings = self.LayerNorm(embeddings)
        embeddings = self.dropout(embeddings)
        return embeddings
```
- It has three kinds of components: nn.Embedding layers (one for tokens and one for positions), nn.LayerNorm and nn.Dropout.
```python
torch.nn.Embedding(num_embeddings, embedding_dim, padding_idx=None,
                   max_norm=None, norm_type=2.0, scale_grad_by_freq=False,
                   sparse=False, _weight=None)
```
nn.Embedding is a simple lookup table that stores the embedding vectors of a fixed-size dictionary. Given an index, the embedding layer returns the embedding vector for that index, and after training these vectors reflect the semantic relationships between the symbols the indices stand for.
The input is a list of indices and the output is the list of corresponding embedding vectors.
- num_embeddings (python:int) – the dictionary size; for a vocabulary of 5000 words, pass 5000, and the valid indices are 0–4999.
- embedding_dim (python:int) – the dimension of the embedding vector, i.e. how many dimensions are used to represent one symbol.
- padding_idx (python:int, optional) – the index used for padding. Sentences in a batch have different lengths, so shorter ones are padded to a common length with this index; its embedding is initialized to zeros and is not updated, so it does not carry correlations with other symbols.
- max_norm (python:float, optional) – maximum norm. If the norm of an embedding vector exceeds this threshold, it is renormalized.
- norm_type (python:float, optional) – which norm to use when comparing against max_norm. The default is the 2-norm.
- scale_grad_by_freq (bool, optional) – scale gradients by the inverse frequency of the words in the mini-batch. The default is False.
- sparse (bool, optional) – if True, the gradient with respect to the weight matrix is a sparse tensor.
Here is an example (see the sketch below). Note that if an input index is greater than or equal to num_embeddings, i.e. falls outside the dictionary, an index-out-of-range error is raised.
- nn.Embedding has learnable parameters: a weight matrix of shape num_embeddings × embedding_dim.
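A minimal runnable sketch of these points (lookup, the learnable weight matrix, padding_idx, and the out-of-range error), with made-up toy sizes:

```python
import torch
import torch.nn as nn

# a toy dictionary of 10 symbols, each represented by a 3-dimensional vector
emb = nn.Embedding(num_embeddings=10, embedding_dim=3)

# the learnable parameter is a 10 x 3 weight matrix
print(emb.weight.shape)           # torch.Size([10, 3])

# input: a batch of index sequences; output: the corresponding vectors
ids = torch.tensor([[0, 2, 9], [1, 1, 4]])
print(emb(ids).shape)             # torch.Size([2, 3, 3])

# with padding_idx, the padding symbol keeps a zero vector that is never updated
emb_pad = nn.Embedding(10, 3, padding_idx=0)
print(emb_pad(torch.tensor([0]))) # a row of zeros

# an index >= num_embeddings is outside the dictionary and raises an error
try:
    emb(torch.tensor([10]))
except IndexError as err:
    print(err)                    # index out of range in self
```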
Encoder_MultipleLayers
```python
class Encoder_MultipleLayers(nn.Module):
    def __init__(self, n_layer, hidden_size, intermediate_size,
                 num_attention_heads, attention_probs_dropout_prob, hidden_dropout_prob):
        super(Encoder_MultipleLayers, self).__init__()
        layer = Encoder(hidden_size, intermediate_size, num_attention_heads,
                        attention_probs_dropout_prob, hidden_dropout_prob)
        self.layer = nn.ModuleList([copy.deepcopy(layer) for _ in range(n_layer)])

    def forward(self, hidden_states, attention_mask, output_all_encoded_layers=True):
        all_encoder_layers = []
        for layer_module in self.layer:
            hidden_states = layer_module(hidden_states, attention_mask)
        return hidden_states
```
- The Embeddings module only turns the input indices into vectors; Encoder_MultipleLayers is where the actual feature extraction happens.
- Its structure is very simple: a stack of ==n_layer== identical Encoder blocks (copy.deepcopy gives every layer its own, independent weights). A quick shape check is sketched below.
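The sketch below assumes the classes from the complete code at the end of the post are already defined and uses made-up toy dimensions; it simply checks that the stack maps [batch, seq_len, hidden_size] to the same shape:

```python
import torch

# toy dimensions: 4 layers, hidden size 128, FFN size 512, 8 heads
enc = Encoder_MultipleLayers(n_layer=4, hidden_size=128, intermediate_size=512,
                             num_attention_heads=8,
                             attention_probs_dropout_prob=0.1,
                             hidden_dropout_prob=0.1)

hidden = torch.randn(2, 50, 128)   # [batch, seq_len, hidden_size]
mask = torch.zeros(2, 1, 1, 50)    # additive mask: all zeros = nothing masked
out = enc(hidden, mask)
print(out.shape)                   # torch.Size([2, 50, 128]) -- shape is preserved
```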
Encoder
```python
class Encoder(nn.Module):
    def __init__(self, hidden_size, intermediate_size, num_attention_heads,
                 attention_probs_dropout_prob, hidden_dropout_prob):
        super(Encoder, self).__init__()
        self.attention = Attention(hidden_size, num_attention_heads,
                                   attention_probs_dropout_prob, hidden_dropout_prob)
        self.intermediate = Intermediate(hidden_size, intermediate_size)
        self.output = Output(intermediate_size, hidden_size, hidden_dropout_prob)

    def forward(self, hidden_states, attention_mask):
        attention_output = self.attention(hidden_states, attention_mask)
        intermediate_output = self.intermediate(attention_output)
        layer_output = self.output(intermediate_output, attention_output)
        return layer_output
```
- An Encoder block contains three parts: the Attention module, Intermediate and Output (the last two make up the feed-forward part); the data flow is sketched below.
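A small sketch of the data flow through one block, assuming the sub-module classes from the complete code at the end of the post are defined and using toy dimensions (hidden 128, FFN 512, 8 heads):

```python
import torch

blk = Encoder(hidden_size=128, intermediate_size=512, num_attention_heads=8,
              attention_probs_dropout_prob=0.1, hidden_dropout_prob=0.1)

x = torch.randn(2, 50, 128)
mask = torch.zeros(2, 1, 1, 50)

a = blk.attention(x, mask)        # multi-head self-attention + residual + LayerNorm
i = blk.intermediate(a)           # FC 128 -> 512 with ReLU
y = blk.output(i, a)              # FC 512 -> 128 + residual + LayerNorm
print(a.shape, i.shape, y.shape)  # [2, 50, 128] [2, 50, 512] [2, 50, 128]
```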
```python
class Attention(nn.Module):
    def __init__(self, hidden_size, num_attention_heads,
                 attention_probs_dropout_prob, hidden_dropout_prob):
        super(Attention, self).__init__()
        self.self = SelfAttention(hidden_size, num_attention_heads, attention_probs_dropout_prob)
        self.output = SelfOutput(hidden_size, hidden_dropout_prob)

    def forward(self, input_tensor, attention_mask):
        self_output = self.self(input_tensor, attention_mask)
        attention_output = self.output(self_output, input_tensor)
        return attention_output
```
```python
class SelfAttention(nn.Module):
    def __init__(self, hidden_size, num_attention_heads, attention_probs_dropout_prob):
        super(SelfAttention, self).__init__()
        if hidden_size % num_attention_heads != 0:
            raise ValueError(
                "The hidden size (%d) is not a multiple of the number of attention "
                "heads (%d)" % (hidden_size, num_attention_heads))
        self.num_attention_heads = num_attention_heads
        self.attention_head_size = int(hidden_size / num_attention_heads)
        self.all_head_size = self.num_attention_heads * self.attention_head_size

        self.query = nn.Linear(hidden_size, self.all_head_size)
        self.key = nn.Linear(hidden_size, self.all_head_size)
        self.value = nn.Linear(hidden_size, self.all_head_size)

        self.dropout = nn.Dropout(attention_probs_dropout_prob)

    def transpose_for_scores(self, x):
        # e.g. num_attention_heads = 8, attention_head_size = 128 / 8 = 16
        new_x_shape = x.size()[:-1] + (self.num_attention_heads, self.attention_head_size)
        x = x.view(*new_x_shape)
        return x.permute(0, 2, 1, 3)

    def forward(self, hidden_states, attention_mask):
        # hidden_states.shape = [batch, 50, 128]
        mixed_query_layer = self.query(hidden_states)
        mixed_key_layer = self.key(hidden_states)
        mixed_value_layer = self.value(hidden_states)

        query_layer = self.transpose_for_scores(mixed_query_layer)
        key_layer = self.transpose_for_scores(mixed_key_layer)
        value_layer = self.transpose_for_scores(mixed_value_layer)
        # query_layer.shape = [batch, 8, 50, 16]

        # Take the dot product between "query" and "key" to get the raw attention scores.
        attention_scores = torch.matmul(query_layer, key_layer.transpose(-1, -2))
        # attention_scores.shape = [batch, 8, 50, 50]
        attention_scores = attention_scores / math.sqrt(self.attention_head_size)
        attention_scores = attention_scores + attention_mask

        # Normalize the attention scores to probabilities.
        attention_probs = nn.Softmax(dim=-1)(attention_scores)

        # This is actually dropping out entire tokens to attend to, which might
        # seem a bit unusual, but is taken from the original Transformer paper.
        attention_probs = self.dropout(attention_probs)

        context_layer = torch.matmul(attention_probs, value_layer)
        context_layer = context_layer.permute(0, 2, 1, 3).contiguous()
        new_context_layer_shape = context_layer.size()[:-2] + (self.all_head_size,)
        context_layer = context_layer.view(*new_context_layer_shape)
        return context_layer
```
This part is very similar to the usual ViT pipeline. The Transformer travelled from NLP to CV, but it is still fun to look back at the NLP Transformer from the viewpoint of CV's ViT. The one thing to note here is the multi-head mechanism: the hidden size is 128, and with the number of heads set to 8, the 128-dimensional features are split into 8 groups of 16 dimensions, and self-attention is computed within each group separately. The heads feel a lot like channels in a convolution, and also like grouped convolution (see the sketch after this list):
- Like convolution channels: if the size of each head is fixed at 16, then the head count plays the role of the number of channels, and increasing the number of heads feels like increasing the number of channels of a convolution kernel.
- Like grouped convolution: if the hidden size is fixed at 128, then the number of heads is the number of groups, and increasing the number of heads is like increasing the number of groups, which reduces the amount of computation.
- The rest of the code is just FC + LayerNorm + Dropout (the SelfOutput, Intermediate and Output modules).
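To make the head splitting concrete, here is a small sketch (toy tensors, not from the original post) of the reshape performed by transpose_for_scores and the per-head score computation:

```python
import torch

batch, seq_len, hidden_size = 2, 50, 128
num_heads, head_size = 8, 128 // 8   # 8 heads of 16 dimensions each

x = torch.randn(batch, seq_len, hidden_size)

# split the last dimension into (heads, head_size), then move heads next to batch
x = x.view(batch, seq_len, num_heads, head_size).permute(0, 2, 1, 3)
print(x.shape)       # torch.Size([2, 8, 50, 16])

# attention scores are then computed independently per head
scores = torch.matmul(x, x.transpose(-1, -2)) / head_size ** 0.5
print(scores.shape)  # torch.Size([2, 8, 50, 50])
```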
The complete code
```python
import torch
import torch.nn as nn
import torch.nn.functional as F
import copy, math

# the forward pass of `transformer` below expects a global `device`
device = torch.device('cuda' if torch.cuda.is_available() else 'cpu')


class Embeddings(nn.Module):
    """Construct the embeddings from protein/target, position embeddings."""
    def __init__(self, vocab_size, hidden_size, max_position_size, dropout_rate):
        super(Embeddings, self).__init__()
        self.word_embeddings = nn.Embedding(vocab_size, hidden_size)
        self.position_embeddings = nn.Embedding(max_position_size, hidden_size)
        self.LayerNorm = nn.LayerNorm(hidden_size)
        self.dropout = nn.Dropout(dropout_rate)

    def forward(self, input_ids):
        seq_length = input_ids.size(1)
        position_ids = torch.arange(seq_length, dtype=torch.long, device=input_ids.device)
        position_ids = position_ids.unsqueeze(0).expand_as(input_ids)

        words_embeddings = self.word_embeddings(input_ids)
        position_embeddings = self.position_embeddings(position_ids)

        embeddings = words_embeddings + position_embeddings
        embeddings = self.LayerNorm(embeddings)
        embeddings = self.dropout(embeddings)
        return embeddings


class Encoder_MultipleLayers(nn.Module):
    def __init__(self, n_layer, hidden_size, intermediate_size,
                 num_attention_heads, attention_probs_dropout_prob, hidden_dropout_prob):
        super(Encoder_MultipleLayers, self).__init__()
        layer = Encoder(hidden_size, intermediate_size, num_attention_heads,
                        attention_probs_dropout_prob, hidden_dropout_prob)
        self.layer = nn.ModuleList([copy.deepcopy(layer) for _ in range(n_layer)])

    def forward(self, hidden_states, attention_mask, output_all_encoded_layers=True):
        all_encoder_layers = []
        for layer_module in self.layer:
            hidden_states = layer_module(hidden_states, attention_mask)
            # if output_all_encoded_layers:
            #     all_encoder_layers.append(hidden_states)
        # if not output_all_encoded_layers:
        #     all_encoder_layers.append(hidden_states)
        return hidden_states


class Encoder(nn.Module):
    def __init__(self, hidden_size, intermediate_size, num_attention_heads,
                 attention_probs_dropout_prob, hidden_dropout_prob):
        super(Encoder, self).__init__()
        self.attention = Attention(hidden_size, num_attention_heads,
                                   attention_probs_dropout_prob, hidden_dropout_prob)
        self.intermediate = Intermediate(hidden_size, intermediate_size)
        self.output = Output(intermediate_size, hidden_size, hidden_dropout_prob)

    def forward(self, hidden_states, attention_mask):
        attention_output = self.attention(hidden_states, attention_mask)
        intermediate_output = self.intermediate(attention_output)
        layer_output = self.output(intermediate_output, attention_output)
        return layer_output


class Attention(nn.Module):
    def __init__(self, hidden_size, num_attention_heads,
                 attention_probs_dropout_prob, hidden_dropout_prob):
        super(Attention, self).__init__()
        self.self = SelfAttention(hidden_size, num_attention_heads, attention_probs_dropout_prob)
        self.output = SelfOutput(hidden_size, hidden_dropout_prob)

    def forward(self, input_tensor, attention_mask):
        self_output = self.self(input_tensor, attention_mask)
        attention_output = self.output(self_output, input_tensor)
        return attention_output


class SelfAttention(nn.Module):
    def __init__(self, hidden_size, num_attention_heads, attention_probs_dropout_prob):
        super(SelfAttention, self).__init__()
        if hidden_size % num_attention_heads != 0:
            raise ValueError(
                "The hidden size (%d) is not a multiple of the number of attention "
                "heads (%d)" % (hidden_size, num_attention_heads))
        self.num_attention_heads = num_attention_heads
        self.attention_head_size = int(hidden_size / num_attention_heads)
        self.all_head_size = self.num_attention_heads * self.attention_head_size

        self.query = nn.Linear(hidden_size, self.all_head_size)
        self.key = nn.Linear(hidden_size, self.all_head_size)
        self.value = nn.Linear(hidden_size, self.all_head_size)

        self.dropout = nn.Dropout(attention_probs_dropout_prob)

    def transpose_for_scores(self, x):
        new_x_shape = x.size()[:-1] + (self.num_attention_heads, self.attention_head_size)
        x = x.view(*new_x_shape)
        return x.permute(0, 2, 1, 3)

    def forward(self, hidden_states, attention_mask):
        mixed_query_layer = self.query(hidden_states)
        mixed_key_layer = self.key(hidden_states)
        mixed_value_layer = self.value(hidden_states)

        query_layer = self.transpose_for_scores(mixed_query_layer)
        key_layer = self.transpose_for_scores(mixed_key_layer)
        value_layer = self.transpose_for_scores(mixed_value_layer)

        # Take the dot product between "query" and "key" to get the raw attention scores.
        attention_scores = torch.matmul(query_layer, key_layer.transpose(-1, -2))
        attention_scores = attention_scores / math.sqrt(self.attention_head_size)
        attention_scores = attention_scores + attention_mask

        # Normalize the attention scores to probabilities.
        attention_probs = nn.Softmax(dim=-1)(attention_scores)

        # This is actually dropping out entire tokens to attend to, which might
        # seem a bit unusual, but is taken from the original Transformer paper.
        attention_probs = self.dropout(attention_probs)

        context_layer = torch.matmul(attention_probs, value_layer)
        context_layer = context_layer.permute(0, 2, 1, 3).contiguous()
        new_context_layer_shape = context_layer.size()[:-2] + (self.all_head_size,)
        context_layer = context_layer.view(*new_context_layer_shape)
        return context_layer


class SelfOutput(nn.Module):
    def __init__(self, hidden_size, hidden_dropout_prob):
        super(SelfOutput, self).__init__()
        self.dense = nn.Linear(hidden_size, hidden_size)
        self.LayerNorm = nn.LayerNorm(hidden_size)
        self.dropout = nn.Dropout(hidden_dropout_prob)

    def forward(self, hidden_states, input_tensor):
        hidden_states = self.dense(hidden_states)
        hidden_states = self.dropout(hidden_states)
        hidden_states = self.LayerNorm(hidden_states + input_tensor)
        return hidden_states


class Intermediate(nn.Module):
    def __init__(self, hidden_size, intermediate_size):
        super(Intermediate, self).__init__()
        self.dense = nn.Linear(hidden_size, intermediate_size)

    def forward(self, hidden_states):
        hidden_states = self.dense(hidden_states)
        hidden_states = F.relu(hidden_states)
        return hidden_states


class Output(nn.Module):
    def __init__(self, intermediate_size, hidden_size, hidden_dropout_prob):
        super(Output, self).__init__()
        self.dense = nn.Linear(intermediate_size, hidden_size)
        self.LayerNorm = nn.LayerNorm(hidden_size)
        self.dropout = nn.Dropout(hidden_dropout_prob)

    def forward(self, hidden_states, input_tensor):
        hidden_states = self.dense(hidden_states)
        hidden_states = self.dropout(hidden_states)
        hidden_states = self.LayerNorm(hidden_states + input_tensor)
        return hidden_states


class transformer(nn.Sequential):
    def __init__(self, encoding, **config):
        super(transformer, self).__init__()
        if encoding == 'drug':
            self.emb = Embeddings(config['input_dim_drug'],
                                  config['transformer_emb_size_drug'],
                                  50,
                                  config['transformer_dropout_rate'])
            self.encoder = Encoder_MultipleLayers(config['transformer_n_layer_drug'],
                                                  config['transformer_emb_size_drug'],
                                                  config['transformer_intermediate_size_drug'],
                                                  config['transformer_num_attention_heads_drug'],
                                                  config['transformer_attention_probs_dropout'],
                                                  config['transformer_hidden_dropout_rate'])
        elif encoding == 'protein':
            self.emb = Embeddings(config['input_dim_protein'],
                                  config['transformer_emb_size_target'],
                                  545,
                                  config['transformer_dropout_rate'])
            self.encoder = Encoder_MultipleLayers(config['transformer_n_layer_target'],
                                                  config['transformer_emb_size_target'],
                                                  config['transformer_intermediate_size_target'],
                                                  config['transformer_num_attention_heads_target'],
                                                  config['transformer_attention_probs_dropout'],
                                                  config['transformer_hidden_dropout_rate'])

    ### parameter v (tuple of length 2) is from utils.drug2emb_encoder
    def forward(self, v):
        e = v[0].long().to(device)
        e_mask = v[1].long().to(device)
        print(e.shape, e_mask.shape)
        ex_e_mask = e_mask.unsqueeze(1).unsqueeze(2)
        ex_e_mask = (1.0 - ex_e_mask) * -10000.0

        emb = self.emb(e)
        encoded_layers = self.encoder(emb.float(), ex_e_mask.float())
        return encoded_layers[:, 0]
```
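To close, here is a minimal smoke test of the full model. The config keys follow the code above, but the concrete values (embedding size 128, 8 layers, 8 heads, etc.) are made-up toy numbers rather than the library's defaults, so treat this as a sketch:

```python
import torch

config = {
    'input_dim_drug': 2586,                      # toy vocabulary size
    'transformer_emb_size_drug': 128,
    'transformer_n_layer_drug': 8,
    'transformer_intermediate_size_drug': 512,
    'transformer_num_attention_heads_drug': 8,
    'transformer_attention_probs_dropout': 0.1,
    'transformer_hidden_dropout_rate': 0.1,
    'transformer_dropout_rate': 0.1,
}

model = transformer('drug', **config).to(device)

# fake batch of 4 drugs: 50 token indices plus a 0/1 padding mask each,
# mimicking what utils.drug2emb_encoder would produce
ids = torch.randint(0, config['input_dim_drug'], (4, 50))
mask = torch.ones(4, 50)

out = model((ids, mask))
print(out.shape)   # torch.Size([4, 128]) -- one 128-d vector per drug (first token)
```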