這個暑假兩個月的時間跑了各種實驗真的快瘋了
尤其是當好幾個小時的訓練跑完後發現訓練數據沒儲存到時那真的是一個令人崩潰的瞬間…
所以就來寫這篇文章紀錄各種好用模板~~~

Python

Argparse 命令參數解析

在工讀那篇文章就有提到這個工具~ 這個套件可以幫助解析各種實驗參數

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
import argparse

def build_parser():
    """Return the argument parser holding every experiment option."""
    parser = argparse.ArgumentParser(add_help=True)
    add = parser.add_argument

    # Runtime / optimisation options
    add('--batch_size', type=int, default=2)
    add('--lr', type=float, default=0.001, help="initial learning rate")
    add('--device', type=str, choices=["cuda", "cpu"], default="cuda")
    add('--optim', type=str, choices=["Adam", "AdamW"], default="Adam")
    add('--gpu', type=int, default=1)
    add('--test', action='store_true')

    # Paths (both are mandatory)
    add('--DR', type=str, required=True, help="Your Dataset Path")
    add('--save_root', type=str, required=True, help="The path to save your data")

    # Training schedule and data shape
    add('--num_epoch', type=int, default=70, help="number of total epoch")
    add('--per_save', type=int, default=1, help="Save checkpoint every seted epoch")
    add('--partial', type=float, default=1.0, help="Part of the training dataset to be trained")
    add('--train_vi_len', type=int, default=16, help="Training video length")
    add('--val_vi_len', type=int, default=630, help="valdation video length")
    add('--frame_H', type=int, default=32, help="Height input image to be resize")
    add('--frame_W', type=int, default=64, help="Width input image to be resize")

    # Module parameters setting
    add('--F_dim', type=int, default=128, help="Dimension of feature human frame")
    add('--L_dim', type=int, default=32, help="Dimension of feature label frame")
    add('--N_dim', type=int, default=12, help="Dimension of the Noise")
    add('--D_out_dim', type=int, default=192, help="Dimension of the output in Decoder_Fusion")

    return parser


if __name__ == '__main__':
    parser = build_parser()
    args = parser.parse_args()

    main(args)

Increment_path 遞增實驗數據資料夾

指定資料夾並製造遞增路徑
例如指定 runs/exp 路徑,則每一次的實驗會依序儲存在 runs/exp1, runs/exp2, runs/exp3… 等依序遞增的資料夾中,就不用怕數據搞丟了!!

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
import glob, re
import numpy as np
import pandas as pd
from pathlib import Path

class Recorder():
    """Create a numbered experiment directory and log run details into it.

    Every instance claims the next free directory ``<path><sep><n>``
    (``runs/exp1``, ``runs/exp2``, ...), opens ``<recordname>.txt`` inside it
    for free-form records and collects per-epoch rows in ``train_detail``
    that ``save_csv`` dumps to ``<recordname>.csv``.

    Args:
        path: Base path of the experiment directories, e.g. ``runs/exp``.
        sep: Text placed between the base path and the run number.
        recordname: File stem used for both the ``.txt`` and ``.csv`` files.
    """

    # Column order expected for every row appended to ``train_detail``.
    CSV_COLUMNS = ['Epoch', 'Train loss', 'Valid loss', 'LR']

    def __init__(self, path='', sep='', recordname='train_details'):
        self.save_path = self.increment_path(path, sep)
        self.recordname = recordname
        self.train_detail = []
        # Build the file path with pathlib so it works on every OS
        # (a hard-coded '\\' separator is only valid on Windows).
        self.recorder = open(Path(self.save_path) / f"{recordname}.txt", 'a', encoding='utf-8')

    def save_record(self, key, value):
        """Append one ``key: value`` line to the text record."""
        self.recorder.write(f"{key}: {value}\n")
        # Flush immediately so the record survives a crash of the training run.
        self.recorder.flush()

    # modified from the YoloV7 source code
    def increment_path(self, path, sep=''):
        """Create and return the next unused ``<path><sep><n>`` directory.

        Args:
            path: Base path, e.g. ``runs/exp``.
            sep: Text placed between the base path and the run number.

        Returns:
            The newly created directory as a string.

        Raises:
            FileExistsError: If the chosen directory appears concurrently.
        """
        path = Path(path)  # os-agnostic
        # Match only names of the exact form "<name><sep><digits>".
        pattern = re.compile(rf"{re.escape(path.name)}{re.escape(sep)}(\d+)")
        used = []
        for candidate in glob.glob(glob.escape(f"{path}{sep}") + "*"):
            match = pattern.fullmatch(Path(candidate).name)
            if match:
                used.append(int(match.group(1)))
        # Start counting at 1 so the first run lands in e.g. runs/exp1.
        n = max(used) + 1 if used else 1
        new_dir = f"{path}{sep}{n}"
        Path(new_dir).mkdir(parents=True)
        return new_dir

    def save_csv(self):
        """Write ``train_detail`` to ``<recordname>.csv`` in the run directory."""
        rows = np.array(self.train_detail)
        if rows.size == 0:
            # An empty list becomes a 1-D array; give it the right width so
            # pandas still writes a header-only file instead of raising.
            rows = rows.reshape(0, len(self.CSV_COLUMNS))
        frame = pd.DataFrame(rows, columns=self.CSV_COLUMNS)
        frame.to_csv(Path(self.save_path) / f"{self.recordname}.csv", index=False)

    def close(self):
        """Close the text record file; safe to call more than once."""
        if not self.recorder.closed:
            self.recorder.close()

和 Argparse 合用就變成了一個無敵模板

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
import os
import argparse
import glob, re
import numpy as np
import pandas as pd
from pathlib import Path

class Recorder():
    """Create a numbered experiment directory and log run details into it.

    Every instance claims the next free directory ``<path><sep><n>``
    (``runs/exp1``, ``runs/exp2``, ...), opens ``<recordname>.txt`` inside it
    for free-form records and collects per-epoch rows in ``train_detail``
    that ``save_csv`` dumps to ``<recordname>.csv``.

    Args:
        path: Base path of the experiment directories, e.g. ``runs/exp``.
        sep: Text placed between the base path and the run number.
        recordname: File stem used for both the ``.txt`` and ``.csv`` files.
    """

    # Column order expected for every row appended to ``train_detail``.
    CSV_COLUMNS = ['Epoch', 'Train loss', 'Valid loss', 'LR']

    def __init__(self, path='', sep='', recordname='train_details'):
        self.save_path = self.increment_path(path, sep)
        self.recordname = recordname
        self.train_detail = []
        # Build the file path with pathlib so it works on every OS
        # (a hard-coded '\\' separator is only valid on Windows).
        self.recorder = open(Path(self.save_path) / f"{recordname}.txt", 'a', encoding='utf-8')

    def save_record(self, key, value):
        """Append one ``key: value`` line to the text record."""
        self.recorder.write(f"{key}: {value}\n")
        # Flush immediately so the record survives a crash of the training run.
        self.recorder.flush()

    # modified from the YoloV7 source code
    def increment_path(self, path, sep=''):
        """Create and return the next unused ``<path><sep><n>`` directory.

        Args:
            path: Base path, e.g. ``runs/exp``.
            sep: Text placed between the base path and the run number.

        Returns:
            The newly created directory as a string.

        Raises:
            FileExistsError: If the chosen directory appears concurrently.
        """
        path = Path(path)  # os-agnostic
        # Match only names of the exact form "<name><sep><digits>".
        pattern = re.compile(rf"{re.escape(path.name)}{re.escape(sep)}(\d+)")
        used = []
        for candidate in glob.glob(glob.escape(f"{path}{sep}") + "*"):
            match = pattern.fullmatch(Path(candidate).name)
            if match:
                used.append(int(match.group(1)))
        # Start counting at 1 so the first run lands in e.g. runs/exp1.
        n = max(used) + 1 if used else 1
        new_dir = f"{path}{sep}{n}"
        Path(new_dir).mkdir(parents=True)
        return new_dir

    def save_csv(self):
        """Write ``train_detail`` to ``<recordname>.csv`` in the run directory."""
        rows = np.array(self.train_detail)
        if rows.size == 0:
            # An empty list becomes a 1-D array; give it the right width so
            # pandas still writes a header-only file instead of raising.
            rows = rows.reshape(0, len(self.CSV_COLUMNS))
        frame = pd.DataFrame(rows, columns=self.CSV_COLUMNS)
        frame.to_csv(Path(self.save_path) / f"{self.recordname}.csv", index=False)

    def close(self):
        """Close the text record file; safe to call more than once."""
        if not self.recorder.closed:
            self.recorder.close()

def main(args):
    """Run one experiment and persist its records.

    Creates the module-global ``recorder`` (so other functions can append to
    ``recorder.train_detail``), redirects ``args.save_root`` to the freshly
    created run directory, and writes the parsed arguments to the text record.

    Args:
        args: Parsed command-line namespace; must provide ``save_root``.
    """
    # a global recorder thus can store data from another function
    global recorder
    recorder = Recorder(path=args.save_root)
    args.save_root = recorder.save_path

    # save all the args in txt file; flush so they survive a later crash
    recorder.recorder.write(str(args))
    recorder.recorder.flush()

    try:
        '''
        things you want to do
        '''
    finally:
        # Always persist what was recorded, even if the experiment raised,
        # so hours of training are not lost together with the traceback.
        try:
            if recorder.train_detail:
                # save the all the training details in csv file
                recorder.save_csv()
        finally:
            recorder.recorder.close()

def build_parser():
    """Return the argument parser holding every experiment option."""
    parser = argparse.ArgumentParser(add_help=True)
    add = parser.add_argument

    # Runtime / optimisation options
    add('--batch_size', type=int, default=2)
    add('--lr', type=float, default=0.001, help="initial learning rate")
    add('--device', type=str, choices=["cuda", "cpu"], default="cuda")
    add('--optim', type=str, choices=["Adam", "AdamW"], default="Adam")
    add('--gpu', type=int, default=1)
    add('--test', action='store_true')

    # Paths (both are mandatory)
    add('--DR', type=str, required=True, help="Your Dataset Path")
    add('--save_root', type=str, required=True, help="The path to save your data")

    # Training schedule and data shape
    add('--num_epoch', type=int, default=70, help="number of total epoch")
    add('--per_save', type=int, default=1, help="Save checkpoint every seted epoch")
    add('--partial', type=float, default=1.0, help="Part of the training dataset to be trained")
    add('--train_vi_len', type=int, default=16, help="Training video length")
    add('--val_vi_len', type=int, default=630, help="valdation video length")
    add('--frame_H', type=int, default=32, help="Height input image to be resize")
    add('--frame_W', type=int, default=64, help="Width input image to be resize")

    # Module parameters setting
    add('--F_dim', type=int, default=128, help="Dimension of feature human frame")
    add('--L_dim', type=int, default=32, help="Dimension of feature label frame")
    add('--N_dim', type=int, default=12, help="Dimension of the Noise")
    add('--D_out_dim', type=int, default=192, help="Dimension of the output in Decoder_Fusion")

    return parser


if __name__ == '__main__':
    parser = build_parser()
    args = parser.parse_args()

    main(args)

Visualization 視覺化各種結果

上一部分將資料都儲存在 csv 裡,因此這部分就是利用那些資料來視覺化

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
import numpy as np
import pandas as pd
import matplotlib.pyplot as plt

# Plot the curves stored in the CSV written by Recorder.save_csv.
# Column layout of that file: 0 = Epoch, 1 = Train loss, 2 = Valid loss, 3 = LR.
path = './runs/exp1' # your experiment path
# Read the CSV from the experiment folder itself so the figures are always
# drawn from (and saved next to) the run they describe.
df = pd.read_csv(f'{path}/train_details.csv').to_numpy() #your csv path

'''
CASE 1 : plot a figure with a line
'''
fig = plt.figure(figsize=(10, 10))
plt.plot(df[:,0], df[:,1], linewidth=1)
plt.legend(['Train Loss'])
plt.savefig(path+'/train_loss.png')
plt.show()


'''
CASE 2 : plot a figure with multiple lines
'''
fig = plt.figure(figsize=(10, 10))
plt.plot(df[:,0], df[:,1], linewidth=1) # blue line
plt.plot(df[:,0], df[:,2], linewidth=1) # orange line
plt.legend(['Train', 'Valid'])
plt.savefig(path+'/train_valid_loss.png')
plt.show()

'''
CASE 3 : plot multiple subplot in one figure
'''
fig = plt.figure(figsize=(10, 4))

# subplot 1
ax1 = fig.add_subplot(211)
ax1.title.set_text('Train Loss')
ax1.plot(df[:,0], df[:,1], label='Loss', linewidth=1)

# subplot 2
# The recorded CSV has only four columns, so the second panel shows the
# learning rate (column 3); swap in another column index if you log more.
ax2 = fig.add_subplot(212)
ax2.title.set_text('LR')
ax2.plot(df[:,0], df[:,3], label='LR', linewidth=1)

plt.tight_layout()
plt.savefig(path+'/loss_lr.png')
plt.show()

Pytorch

Check your environment

開始之前先確認環境狀況並設置

1
2
3
import torch

# Report the installed PyTorch version and whether a CUDA GPU is usable.
print(torch.__version__)
print(torch.cuda.is_available()) # True when using GPU
# Compute on the GPU when one is available, otherwise fall back to the CPU
# instead of failing later when tensors are moved to a missing device.
device = torch.device("cuda" if torch.cuda.is_available() else "cpu")

Build your own Dataloader

以下是一個Image Dataloader的模板,可以製造自己的dataloader

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
import sys
import pandas as pd
from PIL import Image
from torchvision import transforms
from torch.utils.data import Dataset
from torch.utils.data import DataLoader

def getData(mode, path):
    """Load the image paths (and labels) of one dataset split from its CSV.

    Args:
        mode: One of ``'train'``, ``'valid'`` or ``'test'``.
        path: Sequence of CSV file paths ordered as (train, valid, test).
            Each CSV needs a ``Path`` column; train/valid also need ``label``.

    Returns:
        A tuple ``(image, label)`` of lists. ``label`` is empty for the test
        split because the test set may not have labels.

    Exits the process with status 1 (after printing ``Error !``) when
    ``mode`` is not one of the supported splits.
    """
    # Position of each split's CSV inside ``path``.
    csv_index = {'train': 0, 'valid': 1, 'test': 2}
    if mode not in csv_index:
        print('Error !')
        sys.exit(1)

    df = pd.read_csv(path[csv_index[mode]])
    image = df['Path'].tolist()
    # perhaps test dataset doesn't have label
    label = [] if mode == 'test' else df['label'].tolist()
    return image, label

class Image_Dataloader(Dataset):
    """Image dataset backed by the CSV files understood by ``getData``.

    Args:
        path: Sequence of CSV paths forwarded to ``getData``.
        mode: Dataset split, e.g. ``'train'``, ``'valid'`` or ``'test'``.
        augment: When true, boost contrast, brightness and sharpness by 1.5x
            before preprocessing. Off by default.
        normalize: When true, normalize each channel with mean 0.5 / std 0.5
            after conversion to a tensor. Off by default.
            NOTE(review): the 3-channel statistics assume RGB images - confirm.
    """

    def __init__(self, path, mode, augment=False, normalize=False):

        '''
        get the image and label through the given path
        '''
        self.image, self.label = getData(mode, path)
        self.mode = mode # specific dataset mode, like train, valid, or test...
        self.augment = augment

        # Build the transforms once instead of on every __getitem__ call.
        # NOTE: Resize(224) with an int only fixes the shorter edge, so images
        # with different aspect ratios still end up with different sizes.
        self.preprocess = transforms.Compose([
            transforms.Resize(224),
            transforms.ToTensor(),
        ])
        # Maps [0, 1] tensors to [-1, 1]. For ImageNet statistics use
        # mean=(0.485, 0.456, 0.406), std=(0.229, 0.224, 0.225) instead.
        self.normalize = (
            transforms.Normalize(mean=(0.5, 0.5, 0.5), std=(0.5, 0.5, 0.5))
            if normalize else None
        )
        print("> {} dataset: found {} images and {} labels...".format(mode, (len(self.image)), (len(self.label))))

    def __len__(self):
        return len(self.image)

    def __getitem__(self, index):

        '''
        Do whatever data augmentation you like
        and then return a image and a label with the given index
        '''
        img = Image.open(self.image[index])

        if self.augment:
            img = transforms.functional.adjust_contrast(img, 1.5) # adjust contrast to 1.5 times
            img = transforms.functional.adjust_brightness(img, 1.5) # adjust brightness to 1.5 times
            img = transforms.functional.adjust_sharpness(img, 1.5) # adjust sharpness to 1.5 times

        # data preprocessing
        img = self.preprocess(img)
        if self.normalize is not None:
            img = self.normalize(img)

        if self.mode in ('train', 'valid'):
            return img, self.label[index]
        return img # perhaps test dataset doesn't have label

# Build the datasets for the two labelled splits.
train_data = Image_Dataloader(path, 'train')
valid_data = Image_Dataloader(path, 'valid')

# re-split the dataset if you want
# if False:
#     train_data = ConcatDataset([train_data, valid_data])
#     train_data, valid_data = random_split(train_data, [9094, 500])
#     print("\n> Reassign: train {}, valid {}".format(len(train_data), len(valid_data)))

train_loader = DataLoader(train_data, shuffle=True, batch_size=batch_size)
# Validation does not benefit from shuffling; a fixed order keeps the
# evaluation reproducible from epoch to epoch.
valid_loader = DataLoader(valid_data, shuffle=False, batch_size=batch_size)

for epoch in range(num_epoch):

    model.train()
    for x_train, y_train in train_loader:

        '''
        do forward and backward and weight updates
        '''
    model.eval()

    '''
    evaluate your model
    '''

Overview Framework

以下是整個大架構的範例,各個情況要視需求調整,只是一個架構參考~

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
import os
import argparse
import glob, re
from pathlib import Path
import pandas as pd

import numpy as np
import torch
import torch.nn as nn
from torchvision import transforms
from torch.utils.data import DataLoader

from modules import Generator, Gaussian_Predictor, Decoder_Fusion, Label_Encoder, RGB_Encoder

from dataloader import Dataset_Dance
from torchvision.utils import save_image
import random
import torch.optim as optim
from torch import stack

from tqdm import tqdm
import imageio

import matplotlib.pyplot as plt
from math import log10

class YOUR_MODEL(nn.Module):
    """Template model that bundles the network with its train/eval loops.

    The class owns its optimizer and LR scheduler, saves checkpoints into
    ``args.save_root`` and appends one row per epoch to the module-global
    ``recorder.train_detail`` (the global is created by ``main``).

    Args:
        args: Parsed command-line namespace. Reads ``lr``, ``device``,
            ``num_epoch``, ``per_save``, ``save_root`` and, optionally,
            ``ckpt_path``.
    """

    # Epochs at which the learning rate is multiplied by LR_GAMMA. Kept in one
    # place so a resumed run uses the same schedule as a fresh one.
    LR_MILESTONES = [2, 5]
    LR_GAMMA = 0.1

    def __init__(self, args):
        super().__init__()
        self.args = args

        self.current_epoch = 0
        self.mse_criterion = nn.MSELoss()

        '''
        Your model layer here
        '''

        # Must come AFTER the layers are defined: the optimizer captures
        # self.parameters() at construction time, and torch raises on an
        # empty parameter list.
        self._build_optimizer()

    def _build_optimizer(self):
        """(Re)create the Adam optimizer and its MultiStepLR scheduler."""
        self.optim = optim.Adam(self.parameters(), lr=self.args.lr)
        self.scheduler = optim.lr_scheduler.MultiStepLR(
            self.optim, milestones=self.LR_MILESTONES, gamma=self.LR_GAMMA)

    def forward(self, img, label, adapt_TeacherForcing=None):
        """Compute the reconstruction of ``img``; to be implemented.

        ``adapt_TeacherForcing`` is optional because the training and
        validation steps call ``forward(img, label)`` with two arguments.
        """
        # TODO
        raise NotImplementedError

    def training_stage(self):
        """Train for ``args.num_epoch`` epochs, validating after each one."""
        train_loader = self.train_dataloader()

        for i in range(self.args.num_epoch):

            self.train()
            for (img, label) in (pbar := tqdm(train_loader, ncols=120)):
                img = img.to(self.args.device)
                label = label.to(self.args.device)
                loss = self.training_one_step(img, label)

                self.tqdm_bar('train', pbar, loss.detach().cpu(), lr=self.scheduler.get_last_lr()[0])

            if self.current_epoch % self.args.per_save == 0:
                self.save(os.path.join(self.args.save_root, f"epoch={self.current_epoch}.ckpt"))

            self.eval()
            valid_loss = self.eval_stage()
            self.current_epoch += 1
            self.scheduler.step()

            # record training details (loss values are those of the last batch)
            recorder.train_detail.append([i, loss.item(), valid_loss.item(), self.scheduler.get_last_lr()[0]])

    def training_one_step(self, img, label):
        """Run one optimisation step and return the batch loss."""
        self.optim.zero_grad()

        reconstructed_frames = self.forward(img, label)
        loss = self.mse_criterion(reconstructed_frames, img)
        loss.backward()
        self.optimizer_step()

        return loss

    @torch.no_grad()
    def eval_stage(self):
        """Evaluate on the validation loader; returns the last batch loss."""
        val_loader = self.val_dataloader()
        for (img, label) in (pbar := tqdm(val_loader, ncols=120)):
            img = img.to(self.args.device)
            label = label.to(self.args.device)
            loss = self.val_one_step(img, label)
            self.tqdm_bar('val', pbar, loss.detach().cpu(), lr=self.scheduler.get_last_lr()[0])

        return loss

    def val_one_step(self, img, label):
        """Return the reconstruction loss of one validation batch."""
        reconstructed_frame = self.forward(img, label)
        loss = self.mse_criterion(reconstructed_frame, img)
        return loss

    def train_dataloader(self):
        """Return the training DataLoader; to be implemented."""
        # TODO
        raise NotImplementedError

    def val_dataloader(self):
        """Return the validation DataLoader; to be implemented."""
        # TODO
        raise NotImplementedError

    def tqdm_bar(self, mode, pbar, loss, lr):
        """Show the current mode, epoch, learning rate and loss on ``pbar``."""
        pbar.set_description(f"({mode}) Epoch {self.current_epoch}, lr:{lr}" , refresh=False)
        pbar.set_postfix(loss=float(loss), refresh=False)
        pbar.refresh()

    def save(self, path):
        """Write model weights, optimizer state, LR and epoch to ``path``."""
        torch.save({
            "state_dict": self.state_dict(),
            # The optimizer's own state, not a second copy of the weights.
            "optimizer": self.optim.state_dict(),
            "lr"        : self.scheduler.get_last_lr()[0],
            "last_epoch": self.current_epoch
        }, path)
        print(f"save ckpt to {path}")

    # incomplete function
    def load_checkpoint(self):
        """Restore weights, LR and epoch from ``args.ckpt_path`` if it is set.

        Does nothing when the namespace has no ``ckpt_path`` or it is None.
        The optimizer is re-created with the stored learning rate; the saved
        optimizer state itself is not restored.
        """
        ckpt_path = getattr(self.args, 'ckpt_path', None)
        if ckpt_path is None:
            return

        checkpoint = torch.load(ckpt_path, map_location=self.args.device)
        self.load_state_dict(checkpoint['state_dict'], strict=True)
        self.args.lr = checkpoint['lr']
        # ``save`` does not write 'tfr', so tolerate checkpoints without it.
        # NOTE(review): presumably a teacher-forcing ratio - confirm.
        self.tfr = checkpoint.get('tfr')

        self._build_optimizer()
        self.current_epoch = checkpoint['last_epoch']

    def optimizer_step(self):
        """Clip the gradient norm to 1.0, then apply the optimizer update."""
        nn.utils.clip_grad_norm_(self.parameters(), 1.)
        self.optim.step()

class Recorder():
    """Create a numbered experiment directory and log run details into it.

    Every instance claims the next free directory ``<path><sep><n>``
    (``runs/exp1``, ``runs/exp2``, ...), opens ``<recordname>.txt`` inside it
    for free-form records and collects per-epoch rows in ``train_detail``
    that ``save_csv`` dumps to ``<recordname>.csv``.

    Args:
        path: Base path of the experiment directories, e.g. ``runs/exp``.
        sep: Text placed between the base path and the run number.
        recordname: File stem used for both the ``.txt`` and ``.csv`` files.
    """

    # Column order expected for every row appended to ``train_detail``.
    CSV_COLUMNS = ['Epoch', 'Train loss', 'Valid loss', 'LR']

    def __init__(self, path='', sep='', recordname='train_details'):
        self.save_path = self.increment_path(path, sep)
        self.recordname = recordname
        self.train_detail = []
        # Build the file path with pathlib so it works on every OS
        # (a hard-coded '\\' separator is only valid on Windows).
        self.recorder = open(Path(self.save_path) / f"{recordname}.txt", 'a', encoding='utf-8')

    def save_record(self, key, value):
        """Append one ``key: value`` line to the text record."""
        self.recorder.write(f"{key}: {value}\n")
        # Flush immediately so the record survives a crash of the training run.
        self.recorder.flush()

    # modified from the YoloV7 source code
    def increment_path(self, path, sep=''):
        """Create and return the next unused ``<path><sep><n>`` directory.

        Args:
            path: Base path, e.g. ``runs/exp``.
            sep: Text placed between the base path and the run number.

        Returns:
            The newly created directory as a string.

        Raises:
            FileExistsError: If the chosen directory appears concurrently.
        """
        path = Path(path)  # os-agnostic
        # Match only names of the exact form "<name><sep><digits>".
        pattern = re.compile(rf"{re.escape(path.name)}{re.escape(sep)}(\d+)")
        used = []
        for candidate in glob.glob(glob.escape(f"{path}{sep}") + "*"):
            match = pattern.fullmatch(Path(candidate).name)
            if match:
                used.append(int(match.group(1)))
        # Start counting at 1 so the first run lands in e.g. runs/exp1.
        n = max(used) + 1 if used else 1
        new_dir = f"{path}{sep}{n}"
        Path(new_dir).mkdir(parents=True)
        return new_dir

    def save_csv(self):
        """Write ``train_detail`` to ``<recordname>.csv`` in the run directory."""
        rows = np.array(self.train_detail)
        if rows.size == 0:
            # An empty list becomes a 1-D array; give it the right width so
            # pandas still writes a header-only file instead of raising.
            rows = rows.reshape(0, len(self.CSV_COLUMNS))
        frame = pd.DataFrame(rows, columns=self.CSV_COLUMNS)
        frame.to_csv(Path(self.save_path) / f"{self.recordname}.csv", index=False)

    def close(self):
        """Close the text record file; safe to call more than once."""
        if not self.recorder.closed:
            self.recorder.close()

def main(args):
    """Build the model, then train it (or switch it to eval mode with --test).

    Creates the module-global ``recorder`` used by the model's training loop,
    redirects ``args.save_root`` to the new run directory and stores the
    parsed arguments in the text record.

    NOTE: a second ``def main`` follows this one in the same file and will
    replace this definition at import time; keep only one of them.

    Args:
        args: Parsed command-line namespace.
    """
    global recorder
    recorder = Recorder(path=args.save_root)
    args.save_root = recorder.save_path

    recorder.recorder.write(str(args))
    recorder.recorder.flush()

    try:
        # The model class defined in this file is YOUR_MODEL.
        model = YOUR_MODEL(args).to(args.device)
        model.load_checkpoint()
        if args.test:
            model.eval()
        else:
            model.training_stage()
    finally:
        # Persist whatever was recorded even when training is interrupted.
        try:
            if recorder.train_detail:
                recorder.save_csv()
        finally:
            recorder.recorder.close()

def main(args):
    """Run one experiment and persist its records.

    Creates the module-global ``recorder`` (so other functions can append to
    ``recorder.train_detail``), redirects ``args.save_root`` to the freshly
    created run directory, and writes the parsed arguments to the text record.

    Args:
        args: Parsed command-line namespace; must provide ``save_root`` and
            ``device``.
    """
    # a global recorder thus can store data from another function
    global recorder
    recorder = Recorder(path=args.save_root)
    args.save_root = recorder.save_path

    # save all the args in txt file; flush so they survive a later crash
    recorder.recorder.write(str(args))
    recorder.recorder.flush()

    try:
        '''
        things you want to do
        '''
        model = YOUR_MODEL(args).to(args.device)
    finally:
        # Always persist what was recorded, even if the experiment raised,
        # so hours of training are not lost together with the traceback.
        try:
            if recorder.train_detail:
                # save the all the training details in csv file
                recorder.save_csv()
        finally:
            recorder.recorder.close()

def build_parser():
    """Return the argument parser holding every experiment option."""
    parser = argparse.ArgumentParser(add_help=True)
    add = parser.add_argument

    # Runtime / optimisation options
    add('--batch_size', type=int, default=2)
    add('--lr', type=float, default=0.001, help="initial learning rate")
    add('--device', type=str, choices=["cuda", "cpu"], default="cuda")
    add('--optim', type=str, choices=["Adam", "AdamW"], default="Adam")
    add('--gpu', type=int, default=1)
    add('--test', action='store_true')

    # Paths
    add('--DR', type=str, required=True, help="Your Dataset Path")
    add('--save_root', type=str, required=True, help="The path to save your data")
    # Read by YOUR_MODEL.load_checkpoint; without it the attribute lookup fails.
    add('--ckpt_path', type=str, default=None, help="The path of your checkpoints")

    # Training schedule and data shape
    add('--num_epoch', type=int, default=70, help="number of total epoch")
    add('--per_save', type=int, default=1, help="Save checkpoint every seted epoch")
    add('--partial', type=float, default=1.0, help="Part of the training dataset to be trained")
    add('--train_vi_len', type=int, default=16, help="Training video length")
    add('--val_vi_len', type=int, default=630, help="valdation video length")
    add('--frame_H', type=int, default=32, help="Height input image to be resize")
    add('--frame_W', type=int, default=64, help="Width input image to be resize")

    # Module parameters setting
    add('--F_dim', type=int, default=128, help="Dimension of feature human frame")
    add('--L_dim', type=int, default=32, help="Dimension of feature label frame")
    add('--N_dim', type=int, default=12, help="Dimension of the Noise")
    add('--D_out_dim', type=int, default=192, help="Dimension of the output in Decoder_Fusion")

    return parser


if __name__ == '__main__':
    parser = build_parser()
    args = parser.parse_args()

    main(args)

Build network from scratch

拿 ResNet 18, 50, 152為例

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
import torch
import torch.nn as nn
import torch.nn.functional as F

# Conv2d, BN, ReLU
# final layer's feature map must be 7 x 7

class BasicBlock(nn.Module):
    """Residual block made of two 3x3 convolutions (used by ResNet-18).

    Args:
        in_channels: Number of channels of the input feature map.
        out_channels: Number of channels produced by both convolutions.
        stride: Stride of the first convolution; a value above 1 shrinks the
            spatial size of the output.
    """

    def __init__(self, in_channels, out_channels, stride=1):
        super().__init__()

        self.expansion = 1
        # An empty nn.Sequential returns its input, i.e. a plain identity.
        self.downsample = nn.Sequential()
        # The shortcut needs a projection whenever the main path changes the
        # tensor shape: either the channel count or (via stride) the size.
        if stride != 1 or in_channels != out_channels*self.expansion:
            self.downsample = nn.Sequential(
                nn.Conv2d(in_channels, out_channels, kernel_size=1, stride=stride, bias=False),
                nn.BatchNorm2d(out_channels),
            )

        self.basicblock = nn.Sequential(
            nn.Conv2d(in_channels, out_channels, kernel_size=3, stride=stride, padding=1, bias=False),
            nn.BatchNorm2d(out_channels),
            nn.ReLU(inplace=True),

            nn.Conv2d(out_channels, out_channels, kernel_size=3, stride=1, padding=1, bias=False),
            nn.BatchNorm2d(out_channels)
        )

    def forward(self, x):
        """Return ``relu(basicblock(x) + shortcut(x))``."""
        identity = self.downsample(x)

        out = self.basicblock(x)
        out += identity
        return F.relu(out)

class BottleneckBlock(nn.Module):
    """1x1 -> 3x3 -> 1x1 residual block (used by ResNet-50/152).

    The block outputs ``out_channels * 4`` channels.

    Args:
        in_channels: Number of channels of the input feature map.
        out_channels: Width of the inner 3x3 convolution.
        stride: Stride of the 3x3 convolution; a value above 1 shrinks the
            spatial size of the output.
    """

    def __init__(self, in_channels, out_channels, stride=1):
        super().__init__()

        self.expansion = 4
        expanded = out_channels*self.expansion
        # An empty nn.Sequential returns its input, i.e. a plain identity.
        self.downsample = nn.Sequential()
        # The shortcut needs a projection whenever the main path changes the
        # tensor shape: either the channel count or (via stride) the size.
        if stride != 1 or in_channels != expanded:
            self.downsample = nn.Sequential(
                nn.Conv2d(in_channels, expanded, kernel_size=1, stride=stride, bias=False),
                nn.BatchNorm2d(expanded),
            )

        self.bottleneckblock = nn.Sequential(
            nn.Conv2d(in_channels, out_channels, kernel_size=1, stride=1, bias=False),
            nn.BatchNorm2d(out_channels),
            nn.ReLU(inplace=True),

            nn.Conv2d(out_channels, out_channels, kernel_size=3, stride=stride, padding=1, bias=False),
            nn.BatchNorm2d(out_channels),
            nn.ReLU(inplace=True),

            nn.Conv2d(out_channels, expanded, kernel_size=1, stride=1, bias=False),
            nn.BatchNorm2d(expanded)
        )

    def forward(self, x):
        """Return ``relu(bottleneckblock(x) + shortcut(x))``."""
        identity = self.downsample(x)

        out = self.bottleneckblock(x)
        out += identity
        return F.relu(out)

class ResNet(nn.Module):
    """ResNet classifier assembled from BasicBlock / BottleneckBlock stages.

    Args:
        modeltype: Network depth; one of 18, 34, 50, 101 or 152.
        in_channels: Number of channels of the input image.
        n_class: Number of output classes.

    Raises:
        ValueError: If ``modeltype`` is not a supported depth.
    """

    def __init__(self, modeltype=18, in_channels=3, n_class=2):
        super().__init__()

        self.in_channels = 64
        # Validates modeltype early and fixes the channel expansion factor
        # (1 for BasicBlock networks, 4 for BottleneckBlock networks).
        _, _, self.expansion = self._config(modeltype)

        self.in_layer = nn.Sequential(
            nn.Conv2d(in_channels=in_channels, out_channels=self.in_channels, kernel_size=7, stride=2, padding=3, bias=False),
            nn.BatchNorm2d(self.in_channels),
            nn.ReLU(inplace=True),
            nn.MaxPool2d(kernel_size=3, stride=2, padding=1)
        )

        self.layers = self._network(modeltype)

        self.out_layer = nn.Sequential(
            # Adaptive pooling collapses any spatial size to 1x1.
            nn.AdaptiveAvgPool2d((1, 1)),
            nn.Flatten(),
            nn.Linear(512*self.expansion, n_class)
        )

    @staticmethod
    def _config(modeltype):
        """Return ``(BlockType, blocks_per_stage, expansion)`` for a depth."""
        # Looked up lazily so the block classes only need to exist at call time.
        configs = {
            18: (BasicBlock, (2, 2, 2, 2), 1),
            34: (BasicBlock, (3, 4, 6, 3), 1),
            50: (BottleneckBlock, (3, 4, 6, 3), 4),
            101: (BottleneckBlock, (3, 4, 23, 3), 4),
            152: (BottleneckBlock, (3, 8, 36, 3), 4),
        }
        if modeltype not in configs:
            raise ValueError(
                f"Unsupported modeltype {modeltype!r}; expected one of {sorted(configs)}")
        return configs[modeltype]

    def _layer(self, out_channels, n_blocks, BlockType, stride):
        """Build one stage of ``n_blocks`` blocks and update ``self.in_channels``."""
        # Only the first block of a stage changes stride / channel count.
        layers = [BlockType(self.in_channels, out_channels, stride=stride)]
        self.in_channels = out_channels*self.expansion

        for _ in range(1, n_blocks):
            layers.append(BlockType(self.in_channels, out_channels, stride=1))

        return nn.Sequential(*layers)

    def _network(self, modeltype):
        """Build the four residual stages for the requested depth."""
        BlockType, depths, _ = self._config(modeltype)
        widths = (64, 128, 256, 512)
        strides = (1, 2, 2, 2)
        layers = [self._layer(width, n_blocks, BlockType, stride=stride)
                  for width, n_blocks, stride in zip(widths, depths, strides)]

        return nn.Sequential(*layers)

    def forward(self, x):
        """Return the class logits for a batch of images."""
        x = self.in_layer(x)
        x = self.layers(x)
        x = self.out_layer(x)

        return x

# Smoke test: build a ResNet-18, print its structure, then push one random
# 3x224x224 image through it and print the resulting output tensor.
model = ResNet(modeltype=18)
x = torch.randn(1, 3, 224, 224)
print(model)
print(model(x))

Some model template from scratch

ResNet 18, 50, 152

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
class BasicBlock(nn.Module):
    """Residual block made of two 3x3 convolutions (used by ResNet-18).

    Args:
        in_channels: Number of channels of the input feature map.
        out_channels: Number of channels produced by both convolutions.
        stride: Stride of the first convolution; a value above 1 shrinks the
            spatial size of the output.
    """

    def __init__(self, in_channels, out_channels, stride=1):
        super().__init__()

        self.expansion = 1
        # An empty nn.Sequential returns its input, i.e. a plain identity.
        self.downsample = nn.Sequential()
        # The shortcut needs a projection whenever the main path changes the
        # tensor shape: either the channel count or (via stride) the size.
        if stride != 1 or in_channels != out_channels*self.expansion:
            self.downsample = nn.Sequential(
                nn.Conv2d(in_channels, out_channels, kernel_size=1, stride=stride, bias=False),
                nn.BatchNorm2d(out_channels),
            )

        self.basicblock = nn.Sequential(
            nn.Conv2d(in_channels, out_channels, kernel_size=3, stride=stride, padding=1, bias=False),
            nn.BatchNorm2d(out_channels),
            nn.ReLU(inplace=True),

            nn.Conv2d(out_channels, out_channels, kernel_size=3, stride=1, padding=1, bias=False),
            nn.BatchNorm2d(out_channels)
        )

    def forward(self, x):
        """Return ``relu(basicblock(x) + shortcut(x))``."""
        identity = self.downsample(x)

        out = self.basicblock(x)
        out += identity
        return F.relu(out)

class BottleneckBlock(nn.Module):
    """1x1 -> 3x3 -> 1x1 residual block (used by ResNet-50/152).

    The block outputs ``out_channels * 4`` channels.

    Args:
        in_channels: Number of channels of the input feature map.
        out_channels: Width of the inner 3x3 convolution.
        stride: Stride of the 3x3 convolution; a value above 1 shrinks the
            spatial size of the output.
    """

    def __init__(self, in_channels, out_channels, stride=1):
        super().__init__()

        self.expansion = 4
        expanded = out_channels*self.expansion
        # An empty nn.Sequential returns its input, i.e. a plain identity.
        self.downsample = nn.Sequential()
        # The shortcut needs a projection whenever the main path changes the
        # tensor shape: either the channel count or (via stride) the size.
        if stride != 1 or in_channels != expanded:
            self.downsample = nn.Sequential(
                nn.Conv2d(in_channels, expanded, kernel_size=1, stride=stride, bias=False),
                nn.BatchNorm2d(expanded),
            )

        self.bottleneckblock = nn.Sequential(
            nn.Conv2d(in_channels, out_channels, kernel_size=1, stride=1, bias=False),
            nn.BatchNorm2d(out_channels),
            nn.ReLU(inplace=True),

            nn.Conv2d(out_channels, out_channels, kernel_size=3, stride=stride, padding=1, bias=False),
            nn.BatchNorm2d(out_channels),
            nn.ReLU(inplace=True),

            nn.Conv2d(out_channels, expanded, kernel_size=1, stride=1, bias=False),
            nn.BatchNorm2d(expanded)
        )

    def forward(self, x):
        """Return ``relu(bottleneckblock(x) + shortcut(x))``."""
        identity = self.downsample(x)

        out = self.bottleneckblock(x)
        out += identity
        return F.relu(out)

class ResNet(nn.Module):
    """ResNet classifier assembled from BasicBlock / BottleneckBlock stages.

    Args:
        modeltype: Network depth; one of 18, 34, 50, 101 or 152.
        in_channels: Number of channels of the input image.
        n_class: Number of output classes.

    Raises:
        ValueError: If ``modeltype`` is not a supported depth.
    """

    def __init__(self, modeltype=18, in_channels=3, n_class=2):
        super().__init__()

        self.in_channels = 64
        # Validates modeltype early and fixes the channel expansion factor
        # (1 for BasicBlock networks, 4 for BottleneckBlock networks).
        _, _, self.expansion = self._config(modeltype)

        self.in_layer = nn.Sequential(
            nn.Conv2d(in_channels=in_channels, out_channels=self.in_channels, kernel_size=7, stride=2, padding=3, bias=False),
            nn.BatchNorm2d(self.in_channels),
            nn.ReLU(inplace=True),
            nn.MaxPool2d(kernel_size=3, stride=2, padding=1)
        )

        self.layers = self._network(modeltype)

        self.out_layer = nn.Sequential(
            # Adaptive pooling collapses any spatial size to 1x1.
            nn.AdaptiveAvgPool2d((1, 1)),
            nn.Flatten(),
            nn.Linear(512*self.expansion, n_class)
        )

    @staticmethod
    def _config(modeltype):
        """Return ``(BlockType, blocks_per_stage, expansion)`` for a depth."""
        # Looked up lazily so the block classes only need to exist at call time.
        configs = {
            18: (BasicBlock, (2, 2, 2, 2), 1),
            34: (BasicBlock, (3, 4, 6, 3), 1),
            50: (BottleneckBlock, (3, 4, 6, 3), 4),
            101: (BottleneckBlock, (3, 4, 23, 3), 4),
            152: (BottleneckBlock, (3, 8, 36, 3), 4),
        }
        if modeltype not in configs:
            raise ValueError(
                f"Unsupported modeltype {modeltype!r}; expected one of {sorted(configs)}")
        return configs[modeltype]

    def _layer(self, out_channels, n_blocks, BlockType, stride):
        """Build one stage of ``n_blocks`` blocks and update ``self.in_channels``."""
        # Only the first block of a stage changes stride / channel count.
        layers = [BlockType(self.in_channels, out_channels, stride=stride)]
        self.in_channels = out_channels*self.expansion

        for _ in range(1, n_blocks):
            layers.append(BlockType(self.in_channels, out_channels, stride=1))

        return nn.Sequential(*layers)

    def _network(self, modeltype):
        """Build the four residual stages for the requested depth."""
        BlockType, depths, _ = self._config(modeltype)
        widths = (64, 128, 256, 512)
        strides = (1, 2, 2, 2)
        layers = [self._layer(width, n_blocks, BlockType, stride=stride)
                  for width, n_blocks, stride in zip(widths, depths, strides)]

        return nn.Sequential(*layers)

    def forward(self, x):
        """Return the class logits for a batch of images."""
        x = self.in_layer(x)
        x = self.layers(x)
        x = self.out_layer(x)

        return x

EEGNet

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
class EEGNet_ReLU(nn.Module):
    """EEGNet-style binary classifier with ReLU activations.

    The classifier expects 32*1*23 = 736 flattened features, which is what
    an input of shape (batch, 1, 2, 750) produces after the three stages.
    """

    def __init__(self):
        super().__init__()

        # Stage 1: convolution along the last axis with a 51-wide kernel.
        self.firstconv = nn.Sequential(
            nn.Conv2d(1, 16, kernel_size=(1, 51), padding=(0, 25), bias=False),
            nn.BatchNorm2d(16),
        )

        # Stage 2: grouped (2, 1) convolution that collapses the two rows.
        self.depthwiseConv = nn.Sequential(
            nn.Conv2d(16, 32, kernel_size=(2, 1), stride=(1, 1), groups=16, bias=False),
            nn.BatchNorm2d(32),
            nn.ReLU(),
            nn.AvgPool2d(kernel_size=(1, 4), stride=(1, 4), padding=0),
            nn.Dropout(p=0.2),
        )

        # Stage 3: 15-wide convolution followed by 8x average pooling.
        self.separableConv = nn.Sequential(
            nn.Conv2d(32, 32, kernel_size=(1, 15), stride=(1, 1), padding=(0, 7), bias=False),
            nn.BatchNorm2d(32),
            nn.ReLU(),
            nn.AvgPool2d(kernel_size=(1, 8), stride=(1, 8), padding=0),
            nn.Dropout(p=0.2),
        )

        self.classify = nn.Sequential(
            nn.Linear(in_features=32*1*23, out_features=2, bias=True),
        )

    def forward(self, x):
        """Return ``(softmax probabilities, raw logits)`` for the batch."""
        features = self.separableConv(self.depthwiseConv(self.firstconv(x)))

        # Flatten everything except the batch dimension.
        logits = self.classify(features.flatten(1))

        return F.softmax(logits, dim=1), logits

DeepConvNet

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
class DeepConvNet_ReLU(nn.Module):
    """DeepConvNet-style binary classifier with ReLU activations.

    The classifier expects 200*1*43 = 8600 flattened features, which is what
    an input of shape (batch, 1, 2, 750) produces after the four stages.
    """

    def __init__(self):
        super().__init__()

        def tail(channels):
            # Shared ending of every stage: BN -> ReLU -> halve width -> dropout.
            return [
                nn.BatchNorm2d(channels, eps=1e-05, momentum=0.1),
                nn.ReLU(),
                nn.MaxPool2d(kernel_size=(1, 2)),
                nn.Dropout(p=0.35),
            ]

        # Stage 1 has an extra (2, 1) convolution that merges the two rows.
        self.Conv2d_1 = nn.Sequential(
            nn.Conv2d(1, 25, kernel_size=(1, 5), bias=False),
            nn.Conv2d(25, 25, kernel_size=(2, 1), bias=False),
            *tail(25),
        )

        # Stages 2-4 double the channel count each time.
        self.Conv2d_2 = nn.Sequential(
            nn.Conv2d(25, 50, kernel_size=(1, 5), bias=False),
            *tail(50),
        )
        self.Conv2d_3 = nn.Sequential(
            nn.Conv2d(50, 100, kernel_size=(1, 5), bias=False),
            *tail(100),
        )
        self.Conv2d_4 = nn.Sequential(
            nn.Conv2d(100, 200, kernel_size=(1, 5), bias=False),
            *tail(200),
        )

        self.classify = nn.Sequential(
            nn.Flatten(),
            nn.Linear(in_features=200*1*43, out_features=2, bias=True),
        )

    def forward(self, x):
        """Return ``(softmax probabilities, raw logits)`` for the batch."""
        for stage in (self.Conv2d_1, self.Conv2d_2, self.Conv2d_3, self.Conv2d_4):
            x = stage(x)

        # nn.Flatten inside ``classify`` handles the reshape.
        logits = self.classify(x)

        return F.softmax(logits, dim=1), logits