arkDLtools
version 0.24.7.8
Introduction
A toolkit for use with PyTorch. Self-contained imperative programming, highly customizable, with its own instruction controller oriented toward complex workflows (partly multithreaded parallel, partly sequential serial, and nestable). Built-in features include live visualization of the training process (highly customizable), log generation, auto-saving, pausing and resuming training, automatic device selection, and more. Thanks to multithreading, it has virtually no impact on training speed.
Docs
select_device(device='')
select_device(device='')
Automatically selects a device. If none is specified, the device is chosen in the order CUDA -> DirectML -> CPU by default. After selection it returns device, device_name: the device object and the device name.
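A minimal usage sketch (the top-level import path is an assumption):
```python
from arkDLtools import select_device  # import path assumed

device, device_name = select_device()        # auto-select: CUDA -> DirectML -> CPU
# device, device_name = select_device('cpu') # or request a specific device
print(f"running on {device_name}")
```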
supported_devices()
supported_devices()
Returns a dictionary of all currently available devices; each key is a tuple such as ('DirectML', device_name) and each value is a device object.
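For example, to enumerate everything available (again assuming the top-level import):
```python
from arkDLtools import supported_devices  # import path assumed

for (backend, name), dev in supported_devices().items():
    print(f"{backend}: {name} -> {dev}")
```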
class Controller()
class Controller()
is an instruction controller for complex workflows (partly multithreaded parallel, partly sequential serial, and nestable). First, you need an ops function that returns an instruction, which consists of a series of tuples (func, (args)), where func is the function (operation) to be executed and args are its arguments. The instruction as a whole is wrapped in nestable {} (a dictionary whose keys are arbitrary and whose values are such tuples) or []: operations in a {} dictionary are processed in parallel, while operations in a [] list are processed serially, in order. The Marker class below ships with some ops functions that can serve as examples:
```python
def stdSTART(self,
             xlabel: str='epoch', maxmin_columns: list=[('val_loss', -1), ('val_accuracy', 1)],
             y_axes: list=[('loss', (0, 1.01, 0.05)), ('accuracy', (0, 1.01, 0.05))],
             plots: dict={'loss': [('train_loss', 'red'), ('val_loss', 'blue')], 'accuracy': (('val_accuracy', 'orange'), ('train_accuracy', 'green'))}):
    return [
        (self.add_log, ("Training has begun!\n",)),
        (self.render, (xlabel, y_axes, plots)),
        (self.print_log, ()),
        (self.print_svg, ()),
    ]
```
It has the following methods:
register(self, name: str, ops: types.FunctionType, **fixed_args)
Registers the ops function described above as an instruction. name is the name you give it; **fixed_args lets you fix some of the arguments at registration time, so they are filled in automatically on later calls and you do not have to retype redundant arguments every time.
oprs(self, name, **kwargs)
Calls the instruction, merges the temporary arguments of the current call with the arguments fixed at registration, and returns the complete oprs object.
run(self, ops)
Executes the oprs object according to the rules: operations in a {} dictionary are processed in parallel, and operations in a [] list are processed serially, in order.
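A self-contained sketch of the whole cycle: define an ops function mixing serial and parallel stages, register it, then run it. my_ops and its msg argument are hypothetical, and it is assumed a plain function works as an ops target just like the bound Marker methods:
```python
import time
from arkDLtools import Controller  # import path assumed

def my_ops(msg='hello'):
    return [                               # outer []: stages run serially, in order
        (print, (msg,)),
        {                                  # inner {}: these two sleeps run in parallel
            1: (time.sleep, (1,)),
            2: (time.sleep, (1,)),
        },
        (print, ('done',)),
    ]

controller = Controller()
controller.register('demo', my_ops, msg='hi')  # fix msg at registration time
controller.run(controller.oprs('demo'))        # prints 'hi', ~1s pause, then 'done'
```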
class Marker()
class Marker()
is a logger and exporter: highly customizable visualization of the training process, training history in JSON format, automatic checkpoint saving, checkpoint loading and resumed training, and other tools that greatly improve the efficiency of deep learning work. It has the following basic methods built in, as well as some built-in ops instruction functions.
All of these features come with default parameters and can be used directly, out of the box, if no customization is needed.
__init__(self, makedir: bool=True, workdir: str=os.getcwd()+'/', epochs: int=3, columns: list=None, **hyperpara)
Initializes Marker. makedir indicates whether to create a new directory to store the training logs; if makedir is true, the root directory is specified with the workdir parameter, which defaults to the current script directory. epochs is the maximum number of epochs to be logged in this session; columns are the column names of the self.df to be created, which usually also determine what data will be logged; **hyperpara holds the hyperparameters and any other notes to be appended to the logs.json file that will be generated.
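A minimal construction sketch (column names and the extra keyword arguments are illustrative; anything passed through **hyperpara simply ends up in logs.json):
```python
import os
from arkDLtools import Marker  # import path assumed

marker = Marker(workdir=os.getcwd()+'/trainlog/', epochs=20,
                columns=['train_loss', 'val_loss', 'val_accuracy'],
                model='MyNet', optim='AdamW', learning_rate=1e-3)  # -> logs.json
```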
clear(self)
clears the output areaadd_log(self, s="")
Defaults to adding the results of this epoch to the end of the log string, if customized, use the argument sprint_log(self)
Print out the log string.print_svg(self)
Show visual graphs in svg format.mark(self, mark_value: dict)
logs the data in marker_value
into self.df
maxmin(self, columns: list=[('val_loss', -1), ('val_accuracy', 1)])
Calculates the maximum and minimum values of a selected number of fields in self.df
, with 1 being max and -1 being min, and the return value being a dictionary with the key as a prefixed string, such as min_val_loss
.render
render(self, xlabel: str='epoch',
       y_axes: list=[('loss', (0, 1.01, 0.05)), ('accuracy', (0, 1.01, 0.05))],
       plots: dict={'loss': [('train_loss', 'red'), ('val_loss', 'blue')], 'accuracy': (('val_accuracy', 'orange'), ('train_accuracy', 'green'))})
Renders the visualization on the fly, as you describe it. xlabel is the label of the x-axis; y_axes is a list describing one or more y-axes, where each tuple represents one y-axis: the first position is the axis label, the second is the (start, end, interval) of the y-axis ticks; plots is a dictionary describing one or more curves, where each key names the y-axis to plot against and each value is a list of (curve name, curve color) tuples.
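For instance, a hedged sketch that puts a learning-rate curve on its own y-axis (the learning_rate column is an assumption):
```python
marker.render(
    xlabel='epoch',
    y_axes=[('loss', (0, 2.01, 0.1)),      # axis label, (start, end, interval)
            ('lr',   (0, 0.011, 0.001))],
    plots={'loss': [('train_loss', 'red'), ('val_loss', 'blue')],
           'lr':   [('learning_rate', 'green')]},  # key = target y-axis
)
```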
cret_json(self, device_name, env_info, dicts={})
Generates logs.json; dicts holds any extra JSON keys you want to attach.
cret_check(self, state_dict)
Generates a checkpoint; state_dict is a dictionary of the current training state, which may contain model parameters, optimizer state, and so on, as appropriate.
seek_check(self, epoch, from_dir="")
Searches from_dir for the given epoch. If the parameter is empty, it looks in the log directory of the current training session by default; if more than one match is found, the options are printed to the terminal and one is chosen via input. The return value is a tuple: the first position is the path to the corresponding logs.json, the second is the path to the checkpoint that was found and selected.
continu
continu(self, from_epoch=None, from_dir="", newdir=True, workdir="", epochs=0, columns: list=None, **hyperpara)
Loads a checkpoint and resumes training. It first searches from_dir for from_epoch (int), then proceeds much like __init__(): newdir indicates whether to create a new directory to store the training logs; if newdir is true, the root directory is specified with the workdir parameter, which defaults to the current script directory, otherwise logs continue to be saved in from_dir. epochs is the maximum number of epochs to be logged in this session; columns are the column names of the self.df to be created, which usually also determine what data will be logged; **hyperpara holds the hyperparameters and any other notes to be appended to the logs.json file that will be generated. The return value is a tuple: the first position is the corresponding logs.json loaded into a dictionary, the second is the checkpoint passed through torch.load.
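A resume sketch, assuming continu is called on an existing Marker instance; the directory path is illustrative, and the state-dict keys follow the Usage Example at the end of this document:
```python
# resume from epoch 10 of an earlier run
logs, checkpoint = marker.continu(from_epoch=10, from_dir='./trainlog/',
                                  epochs=30, columns=['train_loss', 'val_loss'])
model.load_state_dict(checkpoint['model_state_dict'])
optimizer.load_state_dict(checkpoint['optim_state_dict'])
```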
The following are the built-in ops instruction functions:
```python
def stdIO(self, mark_value: dict={}, state_dict: dict={},
          device_name: str='', env_info: str='',
          xlabel: str='epoch', maxmin_columns: list=[('val_loss', -1), ('val_accuracy', 1)],
          y_axes: list=[('loss', (0, 1.01, 0.05)), ('accuracy', (0, 1.01, 0.05))],
          plots: dict={'loss': [('train_loss', 'red'), ('val_loss', 'blue')], 'accuracy': (('val_accuracy', 'orange'), ('train_accuracy', 'green'))}):
    return {1: [
        (self.mark, (mark_value,)),
        (self.maxmin, (maxmin_columns,)),
        (self.add_log, ()),
        {
            1: (self.cret_check, (state_dict,)),
            2: (self.cret_json, (device_name, env_info, dict(xlabel=xlabel, y_axes=y_axes, plots=plots))),
            3: (self.render, (xlabel, y_axes, plots)),
            4: (self.clear, ())
        },
        (self.print_log, ()),
        (self.print_svg, ()),
    ]}
```
Called each time an iteration completes during training. The outermost {} indicates that this stdIO instruction does not block the main thread's training process. Within the instruction, self.mark first records mark_value, then self.maxmin computes the requested extrema, then self.add_log appends the marked data to the end of the log string in a fixed format. Next, self.cret_check, self.cret_json, self.render, and self.clear are processed in parallel, simultaneously saving the checkpoint, generating logs.json, rendering the visualization chart, and clearing the previous cell output. Finally, self.print_log and self.print_svg are executed serially, displaying the log string and the visualization chart.
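For reference, a dispatch sketch (one call per completed epoch; compare the full Usage Example at the end of this document):
```python
controller.register('stdIO', marker.stdIO,
                    device_name=device_name, env_info=env_info)
# inside the epoch loop:
controller.run(controller.oprs('stdIO', mark_value=mark_value, state_dict=state_dict))
```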
```python
def stdSTART(self,
             xlabel: str='epoch', maxmin_columns: list=[('val_loss', -1), ('val_accuracy', 1)],
             y_axes: list=[('loss', (0, 1.01, 0.05)), ('accuracy', (0, 1.01, 0.05))],
             plots: dict={'loss': [('train_loss', 'red'), ('val_loss', 'blue')], 'accuracy': (('val_accuracy', 'orange'), ('train_accuracy', 'green'))}):
    return [
        (self.add_log, ("Training has begun!\n",)),
        (self.render, (xlabel, y_axes, plots)),
        (self.print_log, ()),
        (self.print_svg, ()),
    ]
```
Fully serial execution: self.add_log appends "Training has begun!\n" to the end of the log string, self.render renders an empty chart, and self.print_log and self.print_svg display the log string and the visualization chart, respectively.
```python
def stdEND(self, device_name: str='', env_info: str='',
           xlabel: str='epoch', maxmin_columns: list=[('val_loss', -1), ('val_accuracy', 1)],
           y_axes: list=[('loss', (0, 1.01, 0.05)), ('accuracy', (0, 1.01, 0.05))],
           plots: dict={'loss': [('train_loss', 'red'), ('val_loss', 'blue')], 'accuracy': (('val_accuracy', 'orange'), ('train_accuracy', 'green'))}):
    return [
        (self.add_log, (f"Training has done using {self.df['pre_time'][self.from_epoch:].sum():.2f}s-{self.df['train_time'][self.from_epoch:].sum():.2f}s-{self.df['aft_time'][self.from_epoch:].sum():.2f}s.\nmin of validate_loss is [Epoch {self.maxmin_value['min_val_loss'][0]}] {self.maxmin_value['min_val_loss'][1]}; max of validate_accuracy is [Epoch {self.maxmin_value['max_val_accuracy'][0]}] {self.maxmin_value['max_val_accuracy'][1]}.\n",)),
        (self.clear, ()),
        (self.print_log, ()),
        (self.cret_json, (device_name, env_info, dict(xlabel=xlabel, y_axes=y_axes, plots=plots))),
        (self.print_svg, ()),
        (os.rename, (os.path.join(self.workdir, self.begin_timestr, 'logs.json'), os.path.join(self.workdir, self.begin_timestr, f"{self.df['time_str'].iloc[-1]}-logs.json"))),
        (os.rename, (os.path.join(self.workdir, self.begin_timestr, 'epochs.svg'), os.path.join(self.workdir, self.begin_timestr, f"{self.df['time_str'].iloc[-1]}-epochs.svg"))),
    ]
```
Cleanup at the end of training, processed serially: self.add_log appends "Training has done using ..." and other summary information to the end of the log string; then, in order, the cell output is cleared, the log string is printed, logs.json is generated (when resuming training, an existing historical logs.json is renamed rather than overwritten), and the visualization chart is written out (likewise, an existing historical epochs.svg is renamed when resuming training).
Usage Example
in main()
```python
if __name__ == "__main__":
    epochs = 20
    marker = Marker(workdir=os.getcwd()+"/trainlog/", epochs=epochs,
                    columns=['train_loss', 'val_loss', 'accuracy', 'precision', 'recall', 'f1_score', 'pre_time', 'train_time', 'aft_time'],
                    model='LiHuNet', optim='Lion', dataset='test', num_workers=num_workers, batch_size=batch_size,
                    embedding_model=embedding_model, embedding_size=embedding_size, learning_rate=lr, weight_decay=wd)
    controller = Controller()
    controller.register('stdSTART', marker.stdSTART, **figure_args)
    controller.register('stdIO', MyIO,
                        marker=marker, device_name=device_name, env_info=env_info, **figure_args)
    train(device=device, marker=marker, controller=controller, model=model, optimizer=optimizer, Loss=F.binary_cross_entropy,
          train_loader=train_loader, val_loader=val_loader, epochs=epochs)
    controller.register('stdEND', marker.stdEND,
                        device_name=device_name, env_info=env_info, **figure_args)
    controller.run(MyEND(marker=marker, **figure_args))
```
in train()
```python
import time

def train(device, marker, controller, model, optimizer, Loss=F.cross_entropy, Metrics=metrics,
          train_loader=None, val_loader=None, epochs=10):
    end = marker.begin_time
    model.train()
    controller.run(controller.oprs('stdSTART'))
    for epoch in range(epochs):
        train_loss = 0; train_metric = 0
        main_begin = time.time()
        pre_time = main_begin - end
        for inputs, labels in train_loader:
            inputs, labels = inputs.to(device), labels.to(device)
            optimizer.zero_grad()
            outputs = model(inputs)
            loss = Loss(outputs, labels)
            train_loss += loss.item()
            loss.backward()
            optimizer.step()
        main_end = time.time()
        val_loss, val_accuracy, val_precision, val_recall, val_f1 = validate(model, Loss, Metrics, val_loader)
        end = time.time()
        mark_value = dict(pre_time=pre_time, train_time=main_end - main_begin, aft_time=end - main_end,
                          train_loss=train_loss / len(train_loader), val_loss=val_loss, accuracy=val_accuracy,
                          precision=val_precision, recall=val_recall, f1_score=val_f1)
        state_dict = dict(model_state_dict=model.state_dict(), optim_state_dict=optimizer.state_dict())
        oprs = controller.oprs('stdIO', mark_value=mark_value, state_dict=state_dict)
        if epoch < epochs-2:
            controller.run(oprs)
        else:
            controller.run([oprs])
```
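Note the last branch: stdIO returns an instruction whose outermost container is a {} dictionary, so controller.run(oprs) returns without blocking the training loop. Wrapping it as [oprs] on the final epochs makes the outermost container a serial list, which presumably makes run block until the last logging and checkpointing have finished before the script exits.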
Update Logs
- 0.24.7.8
  - fixed bug in Marker.continu:
    self.df = self.df.loc[1:self.from_epoch+1, :] ->
    self.df = self.df.loc[1:self.from_epoch, :]
  - fixed bug in Marker.continu:
    self.df = self.df.loc[1:self.from_epoch, :] ->
    self.df = self.df.sort_index().loc[1:self.from_epoch, :]
- 0.24.5.2 update torch, ipython -> try import
- 0.24.5.0 update plots loc 2 -> 4
- 0.24.4.3 initial