IPython(jupyter)中的常用工具

ipython是一个python的交互式shell,比默认的python shell好用得多,支持变量自动补全,自动缩进,支持bash shell命令,内置了许多很有用的功能和函数。学习ipython将会让我们以一种更高的效率来使用python。同时它也是利用Python进行科学计算和交互可视化的一个最佳的平台。

IPython提供了两个主要的组件:

1.一个强大的python交互式shell
2.供Jupyter notebooks使用的一个Jupyter内核(IPython notebook)

IPython的主要功能如下:

1.运行ipython控制台
2.使用ipython作为系统shell
3.使用历史输入(history)
4.Tab补全
5.使用%run命令运行脚本
6.使用%timeit命令快速测量时间
7.使用%pdb命令快速debug
8.使用pylab进行交互计算
9.使用IPython Notebook

Tab键自动补全

在shell中输入表达式时,只要按下Tab键,当前命名空间中任何与输入的字符串相匹配的变量(对象或者函数等)就会被找出来

 内省

在变量的前面或者后面加上一个问号?,就可以将有关该对象的一些通用信息显示出来,这就叫做对象的内省

如果对象是一个函数或者实例方法,则它的docstring也会被显示出来

使用历史命令history

在IPython shell中,使用历史命令可以简单地使用上下翻页键即可,另外我们也可以使用hist命令(或者history命令)查看所有的历史输入。(正确的做法是使用%hist,在这里,%hist也是一个魔法命令)

使用%run命令运行脚本

在ipython会话环境中,所有文件都可以通过%run命令当做Python程序来运行,输入%run 路径+python文件名称即可

使用%timeit命令快速测量代码运行时间

在一个交互式会话中,我们可以使用%timeit魔法命令快速测量代码运行时间。相同的命令会在一个循环中多次执行,多次运行时长的平均值作为该命令的最终评估时长。-n 选项可以控制命令在单词循环中执行的次数,-r选项控制执行循环的次数。

使用%debug命令进行快速debug

ipython带有一个强大的调试器。无论何时控制台抛出了一个异常,我们都可以使用%debug魔法命令在异常点启动调试器。接着你就能调试模式下访问所有的本地变量和整个栈回溯。使用ud向上和向下访问栈,使用q退出调试器。在调试器中输入?可以查看所有的可用命令列表。

 在IPython中使用系统shell

我们可以在IPython中直接使用系统shell,并获取读取结果作为一个Python字符串列表。为了实现这种功能,我们需要使用感叹号!作为shell命令的前缀。比如现在在我的windows系统中,直接在IPython中ping百度

点:display 模块

官方教程 https://ipython.readthedocs.io/en/stable/api/generated/IPython.display.html

之前想要在jupyter中显示图像 、视频 or voice、html,可能不知道该怎么办,有了IP display模块,可以解决该问题。

1、audio

from IPython.display import Audio,display
sound_file = '../taobao427.mp3'
display(Audio(sound_file))

2、ipython.display.image



from IPython.display import display, Image

path = "1.jpg"

display( Image( filename = path) )

3、播放视频

from IPython.display import clear_output,  display, HTML
from PIL import Image
import matplotlib.pyplot as plt
import time
import cv2
import os

def show_video(video_path:str,small:int=2):
    if not os.path.exists(video_path):
        print("视频文件不存在")
    video = cv2.VideoCapture(video_path)
    current_time = 0
    while(True):
        try:
            clear_output(wait=True)
            ret, frame = video.read()
            if not ret:
                break
            lines, columns, _ = frame.shape
            #########do img preprocess##########
            
            # 画出一个框
            #     cv2.rectangle(img, (500, 300), (800, 400), (0, 0, 255), 5, 1, 0)
             # 上下翻转
             # img= cv2.flip(img, 0)
            
            ###################################
            
            if current_time == 0:
                current_time = time.time()
            else:
                last_time = current_time
                current_time = time.time()
                fps = 1. / (current_time - last_time)
                text = "FPS: %d" % int(fps)
                cv2.putText(frame, text , (0,100), cv2.FONT_HERSHEY_TRIPLEX, 3.65, (255, 0, 0), 2)
                
          #     img = cv2.resize(img,(1080,1080))
                frame = cv2.cvtColor(frame, cv2.COLOR_BGR2RGB)
            frame = cv2.resize(frame, (int(columns / small), int(lines / small)))

            img = Image.fromarray(frame)

            display(img)
            # 控制帧率
            time.sleep(0.02)
        except KeyboardInterrupt:
            video.release()

4、htlm(视频)

# ########## display
from IPython.display import display, HTML

html_str = '''
<video controls width=\"500\" height=\"500\" src=\"{}\">animation</video>
'''.format("./dataset/vid****8726.mp4")
print(html_str)
display(HTML(html_str))

5、插入参考的网页或者论文 iframe

from IPython.display import IFrame IFrame(src='https://www.baidu.com/',width=800,height=500)

from IPython.display import HTML
HTML("""

Example Domain

This domain is for use in illustrative examples in documents. You may use this domain in literature without prior coordination or asking for permission.

More information...

""")

6、插入iframe标签

from IPython.display import HTML
HTML('')

Pytorch Image Models –timm快速使用

原文:Getting Started with PyTorch Image Models (timm): A Practitioner’s Guide – 2022.02.02

中文教程: https://www.aiuai.cn/aifarm1967.html

Github: rwightman/pytorch-image-models

PyTorch Image Models(timm) 是一个优秀的图像分类 Python 库,其包含了大量的图像模型(Image Models)、Optimizers、Schedulers、Augmentations 等等.里面提供了许多计算机视觉的SOTA模型,可以当作是torchvision的扩充版本,并且里面的模型在准确度上也较高。

timm 提供了参考的 training 和 validation 脚本,用于复现在 ImageNet 上的训练结果;以及更多的 官方文档 和 timmdocs project.

timm的安装

关于timm的安装,我们可以选择以下两种方式进行:

  1. 通过pip安装
pip install timm
  1. 通过git与pip进行安装
git clone https://github.com/rwightman/pytorch-image-models
cd pytorch-image-models && pip install -e .

如何查看预训练模型种类

  1. 查看timm提供的预训练模型 截止到2022.3.27日为止,timm提供的预训练模型已经达到了592个,我们可以通过timm.list_models()方法查看timm提供的预训练模型(注:本章测试代码均是在jupyter notebook上进行)
import timm
avail_pretrained_models = timm.list_models(pretrained=True)
len(avail_pretrained_models)
  1. 查看特定模型的所有种类 每一种系列可能对应着不同方案的模型,比如Resnet系列就包括了ResNet18,50,101等模型,我们可以在timm.list_models()传入想查询的模型名称(模糊查询),比如我们想查询densenet系列的所有模型。
all_densnet_models = timm.list_models("*densenet*")
all_densnet_models

我们发现以列表的形式返回了所有densenet系列的所有模型。

['densenet121',
 'densenet121d',
 'densenet161',
 'densenet169',
 'densenet201',
 'densenet264',
 'densenet264d_iabn',
 'densenetblur121d',
 'tv_densenet121']
  1. 查看模型的具体参数 当我们想查看下模型的具体参数的时候,我们可以通过访问模型的default_cfg属性来进行查看,具体操作如下
model = timm.create_model('resnet34',num_classes=10,pretrained=True)
model.default_cfg
{'url': 'https://github.com/rwightman/pytorch-image-models/releases/download/v0.1-weights/resnet34-43635321.pth',
 'num_classes': 1000,
 'input_size': (3, 224, 224),
 'pool_size': (7, 7),
 'crop_pct': 0.875,
 'interpolation': 'bilinear',
 'mean': (0.485, 0.456, 0.406),
 'std': (0.229, 0.224, 0.225),
 'first_conv': 'conv1',
 'classifier': 'fc',
 'architecture': 'resnet34'}

除此之外,我们可以通过访问这个链接 查看提供的预训练模型的准确度等信息。

使用和修改预训练模型

在得到我们想要使用的预训练模型后,我们可以通过timm.create_model()的方法来进行模型的创建,我们可以通过传入参数pretrained=True,来使用预训练模型。同样的,我们也可以使用跟torchvision里面的模型一样的方法查看模型的参数,类型/

import timm
import torch

model = timm.create_model('resnet34',pretrained=True)
x = torch.randn(1,3,224,224)
output = model(x)
output.shape
torch.Size([1, 1000])
  • 查看某一层模型参数(以第一层卷积为例)
model = timm.create_model('resnet34',pretrained=True)
list(dict(model.named_children())['conv1'].parameters())
[Parameter containing:
 tensor([[[[-2.9398e-02, -3.6421e-02, -2.8832e-02,  ..., -1.8349e-02,
            -6.9210e-03,  1.2127e-02],
           [-3.6199e-02, -6.0810e-02, -5.3891e-02,  ..., -4.2744e-02,
            -7.3169e-03, -1.1834e-02],
            ...
           [ 8.4563e-03, -1.7099e-02, -1.2176e-03,  ...,  7.0081e-02,
             2.9756e-02, -4.1400e-03]]]], requires_grad=True)]
            
  • 修改模型(将1000类改为10类输出)
model = timm.create_model('resnet34',num_classes=10,pretrained=True)
x = torch.randn(1,3,224,224)
output = model(x)
output.shape
torch.Size([1, 10])
  • 改变输入通道数(比如我们传入的图片是单通道的,但是模型需要的是三通道图片) 我们可以通过添加in_chans=1来改变
model = timm.create_model('resnet34',num_classes=10,pretrained=True,in_chans=1)
x = torch.randn(1,1,224,224)
output = model(x)

模型的保存

timm库所创建的模型是torch.model的子类,我们可以直接使用torch库中内置的模型参数保存和加载的方法,具体操作如下方代码所示

torch.save(model.state_dict(),'./checkpoint/timm_model.pth')
model.load_state_dict(torch.load('./checkpoint/timm_model.pth'))

使用示例

# replace
# optimizer = torch.optim.Adam(model.parameters(), lr=0.01)

# with
optimizer = timm.optim.AdamP(model.parameters(), lr=0.01)

for epoch in num_epochs:
    for batch in training_dataloader:
        inputs, targets = batch
        outputs = model(inputs)
        loss = loss_function(outputs, targets)

        loss.backward()
        optimizer.step()
        optimizer.zero_grad()
        
        
#
optimizer = timm.optim.Adahessian(model.parameters(), lr=0.01)

is_second_order = (
    hasattr(optimizer, "is_second_order") and optimizer.is_second_order
)  # True

for epoch in num_epochs:
    for batch in training_dataloader:
        inputs, targets = batch
        outputs = model(inputs)
        loss = loss_function(outputs, targets)

        loss.backward(create_graph=second_order)
        optimizer.step()
        optimizer.zero_grad()

神经网络和相关算法的简单 PyTorch 实现

github地址:

https://github.com/labmlai/annotated_deep_learning_paper_implementations

这是神经网络和相关算法的简单 PyTorch 实现的集合。这些实现与解释一起记录,

该网站 将这些呈现为并排格式化的注释。我们相信这些将帮助您更好地理解这些算法。

截屏

我们几乎每周都在积极维护这个 repo 并添加新的实现。 更新。

模块:

✨ Transformers

✨ Recurrent Highway Networks

✨ LSTM

✨ HyperNetworks – HyperLSTM

✨ ResNet

✨ ConvMixer

✨ Capsule Networks

✨ Generative Adversarial Networks

✨ Diffusion models

✨ Sketch RNN

✨ Graph Neural Networks

✨ Counterfactual Regret Minimization (CFR)

Solving games with incomplete information such as poker with CFR.

✨ Reinforcement Learning

✨ Optimizers

✨ Normalization Layers

✨ Distillation

✨ Adaptive Computation

✨ Uncertainty

Installation

pip install labml-nn

强化学习:

代码学习网站:

教程:https://stable-baselines.readthedocs.io/en/master/guide/examples.html

gym使用

在做rl时候 ,如何利用gym将动画动起来,让每一步训练过程可视化:

例程:

import gym

from stable_baselines import DQN
from stable_baselines.common.evaluation import evaluate_policy


# Create environment
env = gym.make('LunarLander-v2')

# Instantiate the agent
model = DQN('MlpPolicy', env, learning_rate=1e-3, prioritized_replay=True, verbose=1)
# Train the agent
model.learn(total_timesteps=int(2e5))
# Save the agent
model.save("dqn_lunar")
del model  # delete trained model to demonstrate loading

# Load the trained agent
model = DQN.load("dqn_lunar")

# Evaluate the agent
mean_reward, std_reward = evaluate_policy(model, model.get_env(), n_eval_episodes=10)

# Enjoy trained agent
obs = env.reset()
for i in range(1000):
    action, _states = model.predict(obs)
    obs, rewards, dones, info = env.step(action)
    env.render()
https://cdn-images-1.medium.com/max/960/1*h4WTQNVIsvMXJTCpXm_TAw.gif
from stable_baselines.common.cmd_util import make_atari_env
from stable_baselines.common.vec_env import VecFrameStack
from stable_baselines import ACER

# There already exists an environment generator
# that will make and wrap atari environments correctly.
# Here we are also multiprocessing training (num_env=4 => 4 processes)
env = make_atari_env('PongNoFrameskip-v4', num_env=4, seed=0)
# Frame-stacking with 4 frames
env = VecFrameStack(env, n_stack=4)

model = ACER('CnnPolicy', env, verbose=1)
model.learn(total_timesteps=25000)

obs = env.reset()
while True:
    action, _states = model.predict(obs)
    obs, rewards, dones, info = env.step(action)
    env.render()
https://cdn-images-1.medium.com/max/960/1*UHYJE7lF8IDZS_U5SsAFUQ.gif

bug解决:

在执行时:

import gym
env = gym.make('ALE/Pong-v5')
env.reset()

for i in range(1000):
    env.step(env.action_space.sample())
    env.render()
env.close()

输出,无法对fream进行渲染。

ImportError: cannot import name ‘rendering’ from ‘gym.envs.classic_control’.

解决办法:

打开 包gym.envs.classic_control,发现没有 rendering .py文件,去github,发现,main分支确实已经没有这个文件了,应该是版本的问题,最新版本已经去掉了该文件,然而其他分支是有的,所以将该文件下载并放在包对应位置。

此外,还需要 在代码中加入

from gym.envs.classic_control import rendering

导入rendering.py

PyTorch 断点训练,模型的保存和加载

pytorch中与保存和加载模型有关函数有三个:
1.torch.save:将序列化的对象保存到磁盘。此函数使用Python的pickle实用程序进行序列化。使用此功能可以保存各种对象的模型,张量和字典。
2. torch.load:使用pickle的unpickle工具将pickle的对象文件反序列化到内存中。即加载save保存的东西。
3. torch.nn.Module.load_state_dict:使用反序列化的state_dict加载模型的参数字典。注意,这意味着它的传入的参数应该是一个state_dict类型,也就torch.load加载出来的。

模型搭建:

# Define model  
class TheModelClass(nn.Module):  
    def __init__(self):  
        super(TheModelClass, self).__init__()  
        self.conv1 = nn.Conv2d(3, 6, 5)  
        self.pool = nn.MaxPool2d(2, 2)  
        self.conv2 = nn.Conv2d(6, 16, 5)  
        self.fc1 = nn.Linear(16 * 5 * 5, 120)  
        self.fc2 = nn.Linear(120, 84)  
        self.fc3 = nn.Linear(84, 10)  
  
    def forward(self, x):  
        x = self.pool(F.relu(self.conv1(x)))  
        x = self.pool(F.relu(self.conv2(x)))  
        x = x.view(-1, 16 * 5 * 5)  
        x = F.relu(self.fc1(x))  
        x = F.relu(self.fc2(x))  
        x = self.fc3(x)  
        return x  
  
# Initialize model  
model = TheModelClass()  
  
# Initialize optimizer  
optimizer = optim.SGD(model.parameters(), lr=0.001, momentum=0.9)  
  
# Print model's state_dict  
print("Model's state_dict:")  
for param_tensor in model.state_dict():  
    print(param_tensor, "\t", model.state_dict()[param_tensor].size())  
  
# Print optimizer's state_dict  
print("Optimizer's state_dict:")  
for var_name in optimizer.state_dict():  
    print(var_name, "\t", optimizer.state_dict()[var_name])  

output:

Model's state_dict:
conv1.weight     torch.Size([6, 3, 5, 5])
conv1.bias   torch.Size([6])
conv2.weight     torch.Size([16, 6, 5, 5])
conv2.bias   torch.Size([16])
fc1.weight   torch.Size([120, 400])
fc1.bias     torch.Size([120])
fc2.weight   torch.Size([84, 120])
fc2.bias     torch.Size([84])
fc3.weight   torch.Size([10, 84])
fc3.bias     torch.Size([10])

Optimizer's state_dict:
state    {}
param_groups     [{'lr': 0.001, 'momentum': 0.9, 'dampening': 0, 'weight_decay': 0, 'nesterov': False, 'pa

恢复训练实例

保存模型和加载模型的函数如下:

def  save_checkpoint_state(dir,epoch,model,optimizer):
	#保存模型
    checkpoint = {
            'epoch': epoch,
            'model_state_dict': model.state_dict(),
            'optimizer_state_dict': optimizer.state_dict(),
                }   
    if not os.path.isdir(dir):
        os.mkdir(dir)

    torch.save(checkpoint, os.path.join(dir,'checkpoint-epoch%d.tar'%(epoch)))
    
def get_checkpoint_state(dir,ckp_name,device,model,optimizer):
     # 恢复上次的训练状态
    print("Resume from checkpoint...")
    checkpoint = torch.load(os.path.join(dir,ckp_name),map_location=device)
    model.load_state_dict(checkpoint['model_state_dict'])
    epoch=checkpoint['epoch']

    optimizer.load_state_dict(checkpoint['optimizer_state_dict'])
    #scheduler.load_state_dict(checkpoint['scheduler_state_dict'])

    print('sucessfully recover from the last state')
    return model,epoch,optimizer

如果加入了lr_scheduler,那么lr_scheduler的state_dict也要加进来。

使用时:

# 引用包省略
#保持模型函数
def save_checkpoint_state(epoch, model, optimizer, scheduler, running_loss):
    checkpoint = {
        "epoch": epoch,
        "model_state_dict": model.state_dict(),
        "optimizer_state_dict": optimizer.state_dict(),
        "scheduler_state_dict": scheduler.state_dict()
    }
    
    torch.save(checkpoint, "checkpoint-epoch%d-loss%d.tar" % (epoch, running_loss))
# 加载模型函数   
def load_checkpoint_state(path, device, model, optimizer, scheduler):
    checkpoint = torch.load(path, map_location=device)
    
    model.load_state_dict(checkpoint["model_state_dict"])
    
    epoch = checkpoint["epoch"]
    
    optimizer.load_state_dict(checkpoint["optimizer_state_dict"])
    
    scheduler.load_state_dict(checkpoint["scheduler_state_dict"])
    
    return model, epoch, optimizer, scheduler  


# 是否恢复训练(如果是恢复训练,那么需要设置为true)
resume = False # True

device = torch.device("cuda" if torch.cuda.is_available() else "cpu")

def train(): 
    trans = transforms.Compose([
        transforms.ToPILImage(),
        transforms.RandomResizedCrop(512),
        transforms.RandomHorizontalFlip(),
        transforms.RandomVerticalFlip(),
        transforms.RandomRotation(90),
        transforms.ToTensor(),
        transforms.Normalize(mean=[0.485, 0.456, 0.406], std=[0.229, 0.224, 0.225])
    ])
    
    # get training dataset
    leafDiseaseCLS = CustomDataSet(images_path, is_to_ls, trans)
    
    data_loader = DataLoader(leafDiseaseCLS,
                             batch_size=16,
                             num_workers=0,
                             shuffle=True,
                             pin_memory=False)
    
    # get model
    model = EfficientNet.from_pretrained("efficientnet-b3")
    
    # extract the parameter of fully connected layer
    fc_features = model._fc.in_features
    # modify the number of classes
    model._fc = nn.Linear(fc_features, 5)
    
    model.to(device)
        
    # optimizer
    optimizer = optim.SGD(model.parameters(), 
                          lr=0.001, 
                          momentum=0.9,
                          weight_decay=5e-4)
    
    scheduler = optim.lr_scheduler.MultiStepLR(optimizer, milestones=[6, 10], gamma=1/3.)
    
    # loss
    #loss_func = nn.CrossEntropyLoss()
    loss_func = FocalCosineLoss()
    
    start_epoch = -1
    
    if resume:
        model, start_epoch, optimizer,scheduler = load_checkpoint_state("../path/to/checkpoint.tar",
                                                                        device, 
                                                                        model,
                                                                        optimizer,
                                                                        scheduler)
    
    model.train()
    
    epochs = 3
    
    for epoch in range(start_epoch + 1, epochs):
        
        running_loss = 0.0
        
        print("Epoch {}/{}".format(epoch, epochs))
        
        for step, train_data in tqdm(enumerate(data_loader)):
            x_train, y_train = train_data
            
            x_train = Variable(x_train.to(device))
            y_train = Variable(y_train.to(device))
            
            # forward
            prediction = model(x_train)
            
            optimizer.zero_grad()
            
            loss = loss_func(prediction, y_train)
            
            running_loss += loss.item()
            
            # backward
            loss.backward()
            
            optimizer.step()            
            
            
        scheduler.step()
        
        # saving model
        torch.save(model.state_dict(), str(int(running_loss)) + "_" + str(epoch) + ".pth")
        
        save_checkpoint_state(epoch, model, optimizer, scheduler, running_loss)
        
        print("Loss:{}".format(running_loss))

if __name__ == "__main__":
    train()

加载部分预训练模型

大多数时候我们需要根据我们的任务调节我们的模型,所以很难保证模型和公开的模型完全一样,但是预训练模型的参数确实有助于提高训练的准确率,为了结合二者的优点,就需要我们加载部分预训练模型。

pretrained_dict = torch.load("model_data/yolo_weights.pth", map_location=device)

model_dict = model.state_dict()
# 将 pretrained_dict 里不属于 model_dict 的键剔除掉
pretrained_dict = {k: v for k, v in pretrained_dict.items() if k in model_dict}
#pretrained_dict = {k: v for k, v in pretrained_dict.items() if np.shape(model_dict[k]) ==  np.shape(v)}
# 更新现有的 model_dict
model_dict.update(pretrained_dict)
# 加载我们真正需要的 state_dict
model.load_state_dict(model_dict)

跨设备保存/加载模型(CPU与GPU)

模型保存在GPU上,加载到CPU

  • 保存
torch.save(model.state_dict(), PATH)
  • 加载:
device = torch.device('cpu')
model = TheModelClass(*args, **kwargs)
model.load_state_dict(torch.load(PATH, map_location=device))

模型保存在GPU上,加载到GPU

保存:

torch.save(model.state_dict(), PATH)
  • 加载:
device = torch.device("cuda")
model = TheModelClass(*args, **kwargs)
model.load_state_dict(torch.load(PATH))
model.to(device)
# Make sure to call input = input.to(device) on any input tensors that you feed to the model

重点:在于epoch的恢复

保存的时候需要将 epoch也保存

代码:实现每隔N个epoch,save模型:

optimizer = torch.optim.SGD(model.parameters(),lr=0.1)
lr_schedule = torch.optim.lr_scheduler.MultiStepLR(optimizer,milestones=[10,20,30,40,50],gamma=0.1)
start_epoch = 9
# print(schedule)


if RESUME:
    path_checkpoint = "./model_parameter/test/ckpt_best_50.pth"  # 断点路径
    checkpoint = torch.load(path_checkpoint)  # 加载断点

    model.load_state_dict(checkpoint['net'])  # 加载模型可学习参数

    optimizer.load_state_dict(checkpoint['optimizer'])  # 加载优化器参数
    start_epoch = checkpoint['epoch']  # 设置开始的epoch
    lr_schedule.load_state_dict(checkpoint['lr_schedule'])

for epoch in range(start_epoch+1,80):

    optimizer.zero_grad()

    optimizer.step()
    lr_schedule.step()


    if epoch %10 ==0:
        print('epoch:',epoch)
        print('learning rate:',optimizer.state_dict()['param_groups'][0]['lr'])
        checkpoint = {
            "net": model.state_dict(),
            'optimizer': optimizer.state_dict(),
            "epoch": epoch,
            'lr_schedule': lr_schedule.state_dict()
        }
        if not os.path.isdir("./model_parameter/test"):
            os.mkdir("./model_parameter/test")
        torch.save(checkpoint, './model_parameter/test/ckpt_best_%s.pth' % (str(epoch)))

设置随机数种子 ,使得训练过程结果可复现

PyTorch时,如果希望通过设置随机数种子,在gpu或cpu上固定每一次的训练结果,则需要在程序执行的开始处添加以下代码:

def setup_seed(seed):
     torch.manual_seed(seed)
     torch.cuda.manual_seed_all(seed)
     np.random.seed(seed)
     random.seed(seed)
     torch.backends.cudnn.deterministic = True
# 设置随机数种子
setup_seed(20)
# 预处理数据以及训练模型
# ...
# ...

随机数种子seed确定时,不改变程序参数情况下,两次模型的训练结果将始终保持一致。

torch.optim.lr_scheduler 学习率的动态调整

当我们在训练神经网络时候,学习率作为一个超参数,需要我们指定,如果lr过大,会导致模型不收敛,震荡,而如果lr很小,会导致收敛慢,训练时间长。

一个最普遍的想法,初始时lr设置大一些,随着epoch的增加,lr逐渐下降。

torch.optim.lr_scheduler模块提供了一些根据epoch训练次数来调整学习率(learning rate)的方法

源码:https://pytorch.org/docs/stable/_modules/torch/optim/lr_scheduler.html

torch.optim.lr_scheduler中大部分调整学习率的方法都是根据epoch训练次数,这里介绍常见的几种方法,其他方法以后用到再补充。
要了解每个类的更新策略,可直接查看官网doc中的源码,每类都有个get_lr方法,定义了更新策略

1、torch.optim.lr_scheduler.LambdaLR

  >>> # Assuming optimizer has two groups.
        >>> lambda1 = lambda epoch: epoch // 30
        >>> lambda2 = lambda epoch: 0.95 ** epoch
        >>> scheduler = LambdaLR(optimizer, lr_lambda=[lambda1, lambda2])
        >>> for epoch in range(100):
        >>>     train(...)
        >>>     validate(...)
        >>>     scheduler.step()

class torch.optim.lr_scheduler.LambdaLR(optimizer, lr_lambda, last_epoch=-1)

参数:

  1. optimizer (Optimizer):要更改学习率的优化器;
  2. lr_lambda(function or list):根据epoch计算λ的函数;或者是一个list的这样的function,分别计算各个parameter groups的学习率更新用到的λ
  3. last_epoch (int):最后一个epoch的index,如果是训练了很多个epoch后中断了,继续训练,这个值就等于加载的模型的epoch。默认为-1表示从头开始训练,即从epoch=1开始。

更新策略:
new _ l r = λ×initial_lr
其中new_lr是得到的新的学习率,initial_lr是初始的学习率,λ是通过参数lr_lambda和epoch得到的。

import torch
import torch.nn as nn
from torch.optim.lr_scheduler import LambdaLR

initial_lr = 0.1

class model(nn.Module):
    def __init__(self):
        super().__init__()
        self.conv1 = nn.Conv2d(in_channels=3, out_channels=3, kernel_size=3)

    def forward(self, x):
        pass

net_1 = model()

optimizer_1 = torch.optim.Adam(net_1.parameters(), lr = initial_lr)
scheduler_1 = LambdaLR(optimizer_1, lr_lambda=lambda epoch: 1/(epoch+1))

print("初始化的学习率:", optimizer_1.defaults['lr'])

for epoch in range(1, 11):
    # train

    optimizer_1.zero_grad()
    optimizer_1.step()
    print("第%d个epoch的学习率:%f" % (epoch, optimizer_1.param_groups[0]['lr']))
    scheduler_1.step()
初始化的学习率: 0.1
第1个epoch的学习率:0.100000
第2个epoch的学习率:0.050000
第3个epoch的学习率:0.033333
第4个epoch的学习率:0.025000
第5个epoch的学习率:0.020000
第6个epoch的学习率:0.016667
第7个epoch的学习率:0.014286
第8个epoch的学习率:0.012500
第9个epoch的学习率:0.011111
第10个epoch的学习率:0.010000

class torch.optim.lr_scheduler.StepLR(optimizer, step_size, gamma=0.1, last_epoch=-1)

参数:

  1. optimizer (Optimizer):要更改学习率的优化器;
  2. step_size(int):每训练step_size个epoch,更新一次参数;
  3. gamma(float):更新lr的乘法因子;
  4. last_epoch (int):最后一个epoch的index,如果是训练了很多个epoch后中断了,继续训练,这个值就等于加载的模型的epoch。默认为-1表示从头开始训练,即从epoch=1开始。
import torch
import torch.nn as nn
from torch.optim.lr_scheduler import StepLR
import itertools


initial_lr = 0.1

class model(nn.Module):
    def __init__(self):
        super().__init__()
        self.conv1 = nn.Conv2d(in_channels=3, out_channels=3, kernel_size=3)

    def forward(self, x):
        pass

net_1 = model()

optimizer_1 = torch.optim.Adam(net_1.parameters(), lr = initial_lr)
scheduler_1 = StepLR(optimizer_1, step_size=3, gamma=0.1)

print("初始化的学习率:", optimizer_1.defaults['lr'])

for epoch in range(1, 11):
    # train

    optimizer_1.zero_grad()
    optimizer_1.step()
    print("第%d个epoch的学习率:%f" % (epoch, optimizer_1.param_groups[0]['lr']))
    scheduler_1.step()
初始化的学习率: 0.1
第1个epoch的学习率:0.100000
第2个epoch的学习率:0.100000
第3个epoch的学习率:0.100000
第4个epoch的学习率:0.010000
第5个epoch的学习率:0.010000
第6个epoch的学习率:0.010000
第7个epoch的学习率:0.001000
第8个epoch的学习率:0.001000
第9个epoch的学习率:0.001000
第10个epoch的学习率:0.000100

class torch.optim.lr_scheduler.MultiStepLR(optimizer, milestones, gamma=0.1, last_epoch=-1)

每次遇到milestones中的epoch,做一次更新:

参数:

  1. optimizer (Optimizer):要更改学习率的优化器;
  2. milestones(list):递增的list,存放要更新lr的epoch;
  3. gamma(float):更新lr的乘法因子;
  4. last_epoch (int):最后一个epoch的index,如果是训练了很多个epoch后中断了,继续训练,这个值就等于加载的模型的epoch。默认为-1表示从头开始训练,即从epoch=1开始。
import torch
import torch.nn as nn
from torch.optim.lr_scheduler import MultiStepLR
import itertools


initial_lr = 0.1

class model(nn.Module):
    def __init__(self):
        super().__init__()
        self.conv1 = nn.Conv2d(in_channels=3, out_channels=3, kernel_size=3)

    def forward(self, x):
        pass

net_1 = model()

optimizer_1 = torch.optim.Adam(net_1.parameters(), lr = initial_lr)
scheduler_1 = MultiStepLR(optimizer_1, milestones=[3, 7], gamma=0.1)

print("初始化的学习率:", optimizer_1.defaults['lr'])

for epoch in range(1, 11):
    # train

    optimizer_1.zero_grad()
    optimizer_1.step()
    print("第%d个epoch的学习率:%f" % (epoch, optimizer_1.param_groups[0]['lr']))
    scheduler_1.step()
初始化的学习率: 0.1
第1个epoch的学习率:0.100000
第2个epoch的学习率:0.100000
第3个epoch的学习率:0.100000
第4个epoch的学习率:0.010000
第5个epoch的学习率:0.010000
第6个epoch的学习率:0.010000
第7个epoch的学习率:0.010000
第8个epoch的学习率:0.001000
第9个epoch的学习率:0.001000
第10个epoch的学习率:0.001000

上面的只是一部分,还有很多更新方法,可以在官方源码中查看

PyTorch代码调试利器–TorchSnooper & Captum-pytorch模型可解释性库

TorchSnooper

https://github.com/zasdfgbnm/TorchSnooper

大家可能遇到这样子的困扰:比如说运行自己编写的 PyTorch 代码的时候,PyTorch 提示你说数据类型不匹配,需要一个 double 的 tensor 但是你给的却是 float;再或者就是需要一个 CUDA tensor, 你给的却是个 CPU tensor。比如下面这种:

RuntimeError: Expected object of scalar type Double but got scalar type Float

这种问题调试起来很麻烦,因为你不知道从哪里开始出问题的。比如你可能在代码的第三行用 torch.zeros 新建了一个 CPU tensor, 然后这个 tensor 进行了若干运算,全是在 CPU 上进行的,一直没有报错,直到第十行需要跟你作为输入传进来的 CUDA tensor 进行运算的时候,才报错。要调试这种错误,有时候就不得不一行行地手写 print 语句,非常麻烦。

再或者,你可能脑子里想象着将一个 tensor 进行什么样子的操作,就会得到什么样子的结果,但是 PyTorch 中途报错说 tensor 的形状不匹配,或者压根没报错但是最终出来的形状不是我们想要的。这个时候,我们往往也不知道是什么地方开始跟我们「预期的发生偏离的」。我们有时候也得需要插入一大堆 print 语句才能找到原因。

TorchSnooper 就是一个设计了用来解决这个问题的工具。TorchSnooper 的安装非常简单,只需要执行标准的 Python 包安装指令就好:

pip install snoop
pip install torchsnooper

1、监测函数中的变量:

import torch
import torchsnooper

@torchsnooper.snoop()
def myfunc(mask, x):
    y = torch.zeros(6)
    y.masked_scatter_(mask, x)
    return y

mask = torch.tensor([0, 1, 0, 1, 1, 0], device='cuda')
source = torch.tensor([1.0, 2.0, 3.0], device='cuda')
y = myfunc(mask, source)

Run our script, and we will see:

Starting var:.. mask = tensor<(6,), int64, cuda:0>
Starting var:.. x = tensor<(3,), float32, cuda:0>
21:41:42.941668 call         5 def myfunc(mask, x):
21:41:42.941834 line         6     y = torch.zeros(6)
New var:....... y = tensor<(6,), float32, cpu>
21:41:42.943443 line         7     y.masked_scatter_(mask, x)
21:41:42.944404 exception    7     y.masked_scatter_(mask, x)

2、监测for循环中的变量:

with torchsnooper.snoop():
    for _ in range(100):
        optimizer.zero_grad()
        pred = model(x)
        squared_diff = (y - pred) ** 2
        loss = squared_diff.mean()
        print(loss.item())
        loss.backward()
        optimizer.step()

Part of the trace looks like:

New var:....... x = tensor<(4, 2), float32, cpu>
New var:....... y = tensor<(4,), float32, cpu>
New var:....... model = Model(  (layer): Linear(in_features=2, out_features=1, bias=True))
New var:....... optimizer = SGD (Parameter Group 0    dampening: 0    lr: 0....omentum: 0    nesterov: False    weight_decay: 0)
22:27:01.024233 line        21     for _ in range(100):
New var:....... _ = 0
22:27:01.024439 line        22         optimizer.zero_grad()
22:27:01.024574 line        23         pred = model(x)
New var:....... pred = tensor<(4, 1), float32, cpu, grad>
22:27:01.026442 line        24         squared_diff = (y - pred) ** 2
New var:....... squared_diff = tensor<(4, 4), float32, cpu, grad>
22:27:01.027369 line        25         loss = squared_diff.mean()
New var:....... loss = tensor<(), float32, cpu, grad>
22:27:01.027616 line        26         print(loss.item())
22:27:01.027793 line        27         loss.backward()
22:27:01.050189 line        28         optimizer.step()

Captum:PyTorch 的统一通用模型可解释性库

https://captum.ai/tutorials/CIFAR_TorchVision_Captum_Insights

可解释性,即理解人工智能模型为什么做出决定的能力,对于开发人员解释模型为什么做出某个决定是很重要的。它可以使人工智能符合监管法律,以应用于需要解释性的业务。

Captum旨在实现AI模型的最新版本,如集成梯度、深度弯曲和传导等等,它可以帮助研究人员和开发人员解释人工智能在多模态环境中做出的决策,并能帮助研究人员把结果与数据库中现有的模型进行比较。

Captum——新的人工智能可解释性工具

torch.nn.utils.rnn中的pack_padded_sequence和 pad_packed_sequence and pad_sequence

这两个是互逆的操作。

先来看为什么需要pad和pack操作:

先看一个例子,这个batch中有5个sample

如果不用pack和pad操作会有一个问题,什么问题呢?比如上图,句子“Yes”只有一个单词,但是padding了多余的pad符号,这样会导致LSTM对它的表示通过了非常多无用的字符,这样得到的句子表示就会有误差,更直观的如下图:

那么我们正确的做法应该是怎么样呢?

在上面这个例子,我们想要得到的表示仅仅是LSTM过完单词”Yes”之后的表示,而不是通过了多个无用的“Pad”得到的表示:如下图:

torch.nn.utils.rnn.pack_padded_sequence()

这里的pack,理解成压紧比较好。 将一个 填充过的变长序列 压紧。(填充时候,会有冗余,所以压紧一下)

其中pack的过程为:(注意pack的形式,不是按行压,而是按列压)

pack之后,原来填充的 PAD(一般初始化为0)占位符被删掉了。

输入的形状可以是(T×B×* )。T是最长序列长度,Bbatch size*代表任意维度(可以是0)。如果batch_first=True的话,那么相应的 input size 就是 (B×T×*)

Variable中保存的序列,应该按序列长度的长短排序,长的在前,短的在后。即input[:,0]代表的是最长的序列,input[:, B-1]保存的是最短的序列。

NOTE: 只要是维度大于等于2的input都可以作为这个函数的参数。你可以用它来打包labels,然后用RNN的输出和打包后的labels来计算loss。通过PackedSequence对象的.data属性可以获取 Variable

参数说明:

  • input (Variable) – 变长序列 被填充后的 batch
  • lengths (list[int]) – Variable 中 每个序列的长度。
  • batch_first (bool, optional) – 如果是True,input的形状应该是B*T*size
  • 参数 enforce_sorted ,如果是 True ,则输入应该是按长度降序排序的序列。如果是 False ,会在函数内部进行排序。默认值为 True 。也就是说在输入 pack_padded_sequence 前,我们也可以不对数据进行排序。

返回值:

一个PackedSequence 对象。

torch.nn.utils.rnn.pad_packed_sequence()

填充packed_sequence

上面提到的函数的功能是将一个填充后的变长序列压紧。 这个操作和pack_padded_sequence()是相反的。把压紧的序列再填充回来。填充时会初始化为0。

返回的Varaible的值的size是 T×B×*T 是最长序列的长度,B 是 batch_size,如果 batch_first=True,那么返回值是B×T×*

Batch中的元素将会以它们长度的逆序排列。

参数说明:

  • sequence (PackedSequence) – 将要被填充的 batch
  • batch_first (bool, optional) – 如果为True,返回的数据的格式为 B×T×*

返回值: 一个tuple,包含被填充后的序列,和batch中序列的长度列表

实例 代码:



>>> from torch.nn.utils.rnn import pack_padded_sequence, pad_packed_sequence
>>> seq = torch.tensor([[1,2,0], [3,0,0], [4,5,6]])
#seq的维度是3*3,第一个 维度表示T,is the length of the longest sequence 
#第二维表示B,批次大小,也就是说有3个长度为3的向量,其中每列表示一个序列,序列都是等长的,因为短的序列已经用0补齐了
>>> lens = [2, 1, 3]
>>> packed = pack_padded_sequence(seq, lens, batch_first=True, enforce_sorted=False)
#将补齐的序列压紧成一个序列,将0去掉
>>> packed
PackedSequence(data=tensor([4, 1, 3, 5, 2, 6]), batch_sizes=tensor([3, 2, 1]),
               sorted_indices=tensor([2, 0, 1]), unsorted_indices=tensor([1, 2, 0]))
>>> seq_unpacked, lens_unpacked = pad_packed_sequence(packed, batch_first=True)
>>> seq_unpacked
tensor([[1, 2, 0],
        [3, 0, 0],
        [4, 5, 6]])
>>> lens_unpacked
tensor([2, 1, 3])

pad_sequence

参数

sequences:表示输入样本序列,为 list 类型,list 中的元素为 tensor 类型。 tensor 的 size 为 L * F 。其中,L 为单个序列的长度,F 为序列中每个时间步(time step)特征的个数,根据任务的不同 F 的维度会有所不同。

batch_first:为 True 对应 [batch_size, seq_len, feature];False 对应[seq_len, batch_size, feature],从习惯上来讲一般设置为 True 比较符合我们的认知。

padding_value:填充值,默认值为 0 。

说明

主要用来对样本进行填充,填充值一般为 0 。我们在训练网络时,一般会采用一个一个 mini-batch 的方式,将训练样本数据喂给网络。在 PyTorch 里面数据都是以 tensor 的形式存在,一个 mini-batch 实际上就是一个高维的 tensor ,每个序列数据的长度必须相同才能组成一个 tensor 。为了使网络可以处理 mini-batch 形式的数据,就必须对序列样本进行填充,保证一个 mini-batch 里面的数据长度是相同的。

在 PyTorch 里面一般是使用 DataLoader 进行数据加载,返回 mini-batch 形式的数据,再将此数据喂给网络进行训练。我们一般会自定义一个 collate_fn 函数,完成对数据的填充。

import torch
from torch.utils.data import Dataset, DataLoader
from torch.nn.utils.rnn import pad_sequence,pack_padded_sequence,pack_sequence,pad_packed_sequence

class MyData(Dataset):
    def __init__(self, data):
        self.data = data

    def __len__(self):
        return len(self.data)

    def __getitem__(self, idx):
        return self.data[idx]

def collate_fn(data):
    data.sort(key=lambda x: len(x), reverse=True)
    data = pad_sequence(data, batch_first=True, padding_value=0)
    return data

a = torch.tensor([1,2,3,4])
b = torch.tensor([5,6,7])
c = torch.tensor([7,8])
d = torch.tensor([9])
train_x = [a, b, c, d]

data = MyData(train_x)
data_loader = DataLoader(data, batch_size=2, shuffle=True, collate_fn=collate_fn)
# 采用默认的 collate_fn 会报错
#data_loader = DataLoader(data, batch_size=2, shuffle=True) 
batch_x = iter(data_loader).next()

运行程序,得到 batch_x 的值:

# batch_x
tensor([[1, 2, 3, 4],
        [9, 0, 0, 0]])

从 batch_x 的值可以看出,第二行填充了三个 0 ,使其长度和第一行保持一致。

需要说明的是,对于长度不同的序列,使用默认的 collate_fn 函数,不自定义 collate_fn 函数完成对序列的填充,上面的程序就会报错。

Python3 迭代器与生成器

迭代器

迭代是Python最强大的功能之一,是访问集合元素的一种方式。

迭代器是一个可以记住遍历的位置的对象。

迭代器对象从集合的第一个元素开始访问,直到所有的元素被访问完结束。迭代器只能往前不会后退。

迭代器有两个基本的方法:iter() 用于生成迭代器 和 next() 求迭代器的值

字符串,列表或元组对象都可用于创建迭代器:

>>> list=[1,2,3,4]
>>> it = iter(list)    # 创建迭代器对象
>>> print (next(it))   # 输出迭代器的下一个元素
1
>>> print (next(it))
2
>>>

迭代器对象可以使用常规for语句进行遍历:

list=[1,2,3,4]
it = iter(list)    # 创建迭代器对象
for x in it:
    print (x, end=" ")

执行以上程序,输出结果如下:

1 2 3 4

也可以使用 next() 函数:

import sys         # 引入 sys 模块
 
list=[1,2,3,4]
it = iter(list)    # 创建迭代器对象
 
while True:
    try:
        print (next(it))
    except StopIteration:
        sys.exit()

执行以上程序,输出结果如下:

1
2
3
4

创建一个迭代器

把一个类作为一个迭代器使用需要在类中实现两个方法 __iter__() 与 __next__() 。

如果你已经了解的面向对象编程,就知道类都有一个构造函数,Python 的构造函数为 __init__(), 它会在对象初始化的时候执行。

更多内容查阅:Python3 面向对象

__iter__() 方法返回一个特殊的迭代器对象, 这个迭代器对象实现了 __next__() 方法并通过 StopIteration 异常标识迭代的完成。

__next__() 方法(Python 2 里是 next())会返回下一个迭代器对象。

创建一个返回数字的迭代器,初始值为 1,逐步递增 1:

class MyNumbers:
  def __iter__(self):
    self.a = 1
    return self
 
  def __next__(self):
    x = self.a
    self.a += 1
    return x
 
myclass = MyNumbers()
myiter = iter(myclass)
 
print(next(myiter))
print(next(myiter))
print(next(myiter))
print(next(myiter))
print(next(myiter))

执行输出结果为:

1
2
3
4
5

StopIteration

StopIteration 异常用于标识迭代的完成,防止出现无限循环的情况,在 __next__() 方法中我们可以设置在完成指定循环次数后触发 StopIteration 异常来结束迭代。

在 20 次迭代后停止执行:

class MyNumbers:
  def __iter__(self):
    self.a = 1
    return self
 
  def __next__(self):
    if self.a <= 20:
      x = self.a
      self.a += 1
      return x
    else:
      raise StopIteration
 
myclass = MyNumbers()
myiter = iter(myclass)
 
for x in myiter:
  print(x)

执行输出结果为:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20

生成器

yield是python的一个关键字,本质上是一个生成器generator。

生成器是一种特殊的函数,它会返回一个迭代器。定义一个生成器函数同定义一个普通函数没有什么区别,特殊之处在于生成器函数内部会包含yield表达式,专门用于生成一个序列。当一个生成器函数被调用时,它会返回一个迭代器。之后就由这个迭代器来控制生成器函数的执行。当生成器函数被调用后,首先会执行到第一个yield表达式处,然后会将生成器函数挂起,将yield生成的表达式的值返回给生成器函数的调用者。当生成器函数被挂起时,它的所有局部状态都会被保存起来,包括当前绑定的局部变量、指令指针、内部栈和异常处理的状态。当调用迭代器的方法时,一般都是调用next()方法,将会恢复生成器函数的执行,并且是从上次被挂起的地方继续执行,直到遇到另外一次yield调用,生成器函数将再次被挂起。

在一个生成器函数中,如果没有 return,则默认执行至函数完毕,如果在执行过程中 return,则直接抛出 StopIteration 终止迭代。

可以中断一个函数的执行,跳转到调用的地方

在 Python 中,使用了 yield 的函数被称为生成器(generator)。

跟普通函数不同的是,生成器是一个返回迭代器的函数,只能用于迭代操作,更简单点理解生成器就是一个迭代器。

在调用生成器运行的过程中,每次遇到 yield 时函数会暂停并保存当前所有的运行信息,返回 yield 的值, 并在下一次执行 next() 方法时从当前位置继续运行。

调用一个生成器函数,返回的是一个迭代器对象。

以下实例使用 yield 实现斐波那契数列:

import sys
 
def fibonacci(n): # 生成器函数 - 斐波那契
    a, b, counter = 0, 1, 0
    while True:
        if (counter > n): 
            return
        yield a
        a, b = b, a + b
        counter += 1
f = fibonacci(10) # f 是一个迭代器,由生成器返回生成
 
while True:
    try:
        print (next(f), end=" ")
    except StopIteration:
        sys.exit()

执行以上程序,输出结果如下:

0 1 1 2 3 5 8 13 21 34 55

python3中的heapq 堆

heapq实现了一个适合与Python的列表一起使用的最小堆排序算法。

满二叉树

树中除了叶子节点,每个节点都有两个子节点

完全二叉树

在满足满二叉树的性质后,最后一层的叶子节点均需在最左边

堆是一种数据结构,它是一颗完全二叉树。最小堆则是在堆的基础增加了新的规则,它的根结点的值是最小的,而且它的任意结点的父结点的值都小于或者等于其左右结点的值。因为二进制堆可以使用有组织的列表或数组来表示,所以元素N的子元素位于位置2 * N + 1和2 * N + 2。这种布局使重新安排堆成为可能,因此在添加或删除项时不需要重新分配那么多内存
区分堆(heap)与栈(stack):堆与二叉树有关,像一堆金字塔型泥沙;而栈像一个直立垃圾桶,一列下来。

最大堆

最大堆确保父堆大于或等于它的两个子堆。

最小堆

建堆:

 heapify()

最小堆要求父堆小于或等于其子堆。Python的heapq模块实现了一个最小堆。

要创建一个堆,可以使用list来初始化为 [] ,或者你可以通过一个函数 heapify() ,来把一个list转换成堆。

定义了以下函数:heapq.heappush(heapitem)

将 item 的值加入 heap 中,保持堆的不变性。

heapq.heappop(heap)

弹出并返回 heap 的最小的元素,保持堆的不变性。如果堆为空,抛出 IndexError 。使用 heap[0] ,可以只访问最小的元素而不弹出它。

heapq.heappushpop(heapitem)

将 item 放入堆中,然后弹出并返回 heap 的最小元素。该组合操作比先调用 heappush() 再调用 heappop() 运行起来更有效率。

heapq.heapify(x)

将list x 转换成堆,原地,线性时间内。heapq.heapreplace(heapitem)

弹出并返回 heap 中最小的一项,同时推入新的 item。 堆的大小不变。 如果堆为空则引发 IndexError

这个单步骤操作比 heappop() 加 heappush() 更高效,并且在使用固定大小的堆时更为适宜。 pop/push 组合总是会从堆中返回一个元素并将其替换为 item

返回的值可能会比添加的 item 更大。 如果不希望如此,可考虑改用 heappushpop()。 它的 push/pop 组合会返回两个值中较小的一个,将较大的值留在堆中。

heapq.nlargest(niterablekey=None)

从 iterable 所定义的数据集中返回前 n 个最大元素组成的列表。 如果提供了 key 则其应指定一个单参数的函数,用于从 iterable 的每个元素中提取比较键 (例如 key=str.lower)。 等价于: sorted(iterable, key=key, reverse=True)[:n]

heapq.nsmallest(niterablekey=None)

从 iterable 所定义的数据集中返回前 n 个最小元素组成的列表。 如果提供了 key 则其应指定一个单参数的函数,用于从 iterable 的每个元素中提取比较键 (例如 key=str.lower)。 等价于: sorted(iterable, key=key)[:n]