GeoTransformer Code Walkthrough

Overall pipeline: the network flow is fairly simple and consists of four parts.

(1) Extract features from the source and target point clouds with a KPConv-FPN backbone; the points kept at the coarsest downsampling level are called superpoints.

(2) Run the GeoTransformer on the superpoints, then use dual normalization to pick the top-k pairs as the coarse superpoint correspondences.

(3) Refine these correspondences together with the upsampled points; in practice this is a confidence computation that keeps only the more reliable point pairs.

(4) Use a local-to-global scheme: run a weighted SVD on each local group of correspondences to obtain candidate R and t, and keep the candidate with the most inliers as the final R and t (a sketch of the weighted SVD follows this list).
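Since step (4) is the mathematical core of the whole pipeline, here is a minimal sketch of a weighted SVD (weighted Procrustes / Kabsch) solve. The function and variable names here are mine, not the repo's; the repo wraps the equivalent logic in the procrustes module invoked later as self.procrustes:

import torch

def weighted_svd_rt(src_points, ref_points, weights, eps=1e-8):
    # src_points, ref_points: (N, 3); weights: (N,) correspondence scores
    w = weights / (weights.sum() + eps)                      # normalize the weights
    src_centroid = (w.unsqueeze(1) * src_points).sum(dim=0)  # weighted centroid of the source points
    ref_centroid = (w.unsqueeze(1) * ref_points).sum(dim=0)  # weighted centroid of the reference points
    src_c = src_points - src_centroid
    ref_c = ref_points - ref_centroid
    H = src_c.t() @ (w.unsqueeze(1) * ref_c)                 # 3x3 weighted covariance matrix
    U, _, Vt = torch.linalg.svd(H)
    d = torch.sign(torch.det(Vt.t() @ U.t()))                # enforce det(R) = +1 to avoid reflections
    D = torch.eye(3)
    D[2, 2] = d
    R = Vt.t() @ D @ U.t()                                   # rotation aligning src onto ref
    t = ref_centroid - R @ src_centroid                      # translation
    return R, t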
Code Walkthrough

Feature Extraction

The backbone is a KPConv-based FPN, so we start with the feature extraction part.

It has three main pieces.

The first piece is the basic convolution block: it extracts features with KPConv, then applies normalization and an activation. The residual blocks that build on it come next and prepare for the downsampling.
class ConvBlock(nn.Module):
    def __init__(
        self,
        in_channels,
        out_channels,
        kernel_size,  # kernel size of the KPConv
        radius,  # radius used by KPConv
        sigma,  # influence distance (sigma) used by KPConv
        group_norm,  # number of groups for group normalization
        negative_slope=0.1,  # negative slope of the LeakyReLU
        bias=True,
        layer_norm=False,
    ):
        super(ConvBlock, self).__init__()
        self.in_channels = in_channels  # input feature dimension
        self.out_channels = out_channels  # output feature dimension
        self.KPConv = KPConv(in_channels, out_channels, kernel_size, radius, sigma, bias=bias)  # KPConv layer
        if layer_norm:  # choose the normalization according to layer_norm
            self.norm = nn.LayerNorm(out_channels)
        else:  # otherwise use group normalization
            self.norm = GroupNorm(group_norm, out_channels)
        self.leaky_relu = nn.LeakyReLU(negative_slope=negative_slope)  # LeakyReLU activation

    def forward(self, s_feats, q_points, s_points, neighbor_indices):  # forward pass
        x = self.KPConv(s_feats, q_points, s_points, neighbor_indices)  # KPConv convolution
        x = self.norm(x)  # normalization
        x = self.leaky_relu(x)  # activation
        return x

The second piece is the residual block. Besides the residual connection, this is also the block that performs downsampling: it uses linear (unary) layers to change the feature dimension, with a KPConv convolution in between to extract features.
class ResidualBlock(nn.Module):
    def __init__(
        self,
        in_channels,
        out_channels,
        kernel_size,
        radius,  # radius used by KPConv
        sigma,  # influence distance (sigma) used by KPConv
        group_norm,  # number of groups for group normalization
        strided=False,  # whether this block downsamples
        bias=True,
        layer_norm=False,  # whether to use layer norm instead of group norm
    ):
        super(ResidualBlock, self).__init__()
        self.in_channels = in_channels  # input feature dimension
        self.out_channels = out_channels  # output feature dimension
        self.strided = strided  # whether this block downsamples
        mid_channels = out_channels // 4  # bottleneck dimension, 1/4 of the output dimension
        if in_channels != mid_channels:  # if the input and bottleneck dims differ, map the input to the bottleneck dim with a linear layer
            self.unary1 = UnaryBlock(in_channels, mid_channels, group_norm, bias=bias, layer_norm=layer_norm)
        else:  # otherwise use an identity mapping
            self.unary1 = nn.Identity()
        self.KPConv = KPConv(mid_channels, mid_channels, kernel_size, radius, sigma, bias=bias)  # KPConv layer operating at the bottleneck dimension
        if layer_norm:  # choose the normalization according to layer_norm
            self.norm_conv = nn.LayerNorm(mid_channels)
        else:
            self.norm_conv = GroupNorm(group_norm, mid_channels)
        self.unary2 = UnaryBlock(
            mid_channels, out_channels, group_norm, has_relu=False, bias=bias, layer_norm=layer_norm
        )  # second linear layer, mapping the convolved features to the output dimension, without ReLU
        if in_channels != out_channels:  # if the input and output dims differ, map the shortcut to the output dim so the residual connection works
            self.unary_shortcut = UnaryBlock(
                in_channels, out_channels, group_norm, has_relu=False, bias=bias, layer_norm=layer_norm
            )
        else:  # otherwise use an identity mapping
            self.unary_shortcut = nn.Identity()
        self.leaky_relu = nn.LeakyReLU(0.1)  # LeakyReLU activation

    def forward(self, s_feats, q_points, s_points, neighbor_indices):
        x = self.unary1(s_feats)  # first linear layer
        x = self.KPConv(x, q_points, s_points, neighbor_indices)  # KPConv convolution
        x = self.norm_conv(x)  # normalization
        x = self.leaky_relu(x)  # activation
        x = self.unary2(x)  # second linear layer
        if self.strided:  # when downsampling, max-pool the input features onto the coarser points for the shortcut
            shortcut = maxpool(s_feats, neighbor_indices)
        else:  # otherwise the shortcut is the input itself
            shortcut = s_feats
        shortcut = self.unary_shortcut(shortcut)  # map the shortcut to the output dimension
        x = x + shortcut  # residual connection
        x = self.leaky_relu(x)  # activation
        return x

The third piece is the upsampling part of the decoder, which restores the coarse-level features back up toward the original number of points:
First, torch.zeros_like(x[:1, :]) builds one row of zeros with the same width as x, and the trailing 0 in the cat call concatenates it row-wise, so x gains one extra all-zero "shadow" row at the end.

Next, upsample_indices[:, 0] takes the first column of the upsampling table, i.e. the index of the nearest coarse-level point for every fine-level point, and index_select uses those values as row indices into x. For example, if upsample_indices[:, 0] contained 0 followed by the index of the appended row, the result would be the first row of x followed by the all-zero shadow row.
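To make the indexing concrete, here is a tiny hypothetical demo (plain tensor indexing stands in for the repo's index_select helper):

import torch

x = torch.tensor([[1., 1.], [2., 2.], [3., 3.]])  # three coarse-level feature rows
x = torch.cat((x, torch.zeros_like(x[:1, :])), 0)  # row 3 is now the all-zero shadow row
upsample_indices = torch.tensor([[0], [0], [2], [3], [1]])  # nearest coarse point per fine point
print(x[upsample_indices[:, 0]])
# rows 0, 0, 2 are copied; index 3 hits the shadow row and yields zeros; then row 1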
def nearest_upsample(x, upsample_indices):
# Add a last row with minimum features for shadow pools
x = torch.cat((x, torch.zeros_like(x[:1, :])), 0)
# Get features for each pooling location
x = index_select(x, upsample_indices[:, 0], dim=0)
    return x

With these three pieces in hand, the overall KPConv backbone is much easier to read: the encoder deepens the features by repeatedly widening the channel dimension and downsampling twice, and the decoder then restores resolution with our upsampling, fusing scales with UnaryBlock and LastUnaryBlock. Both are simple MLPs; the former additionally applies normalization and an activation.
class KPConvFPN(nn.Module):
    def __init__(self, input_dim, output_dim, init_dim, kernel_size, init_radius, init_sigma, group_norm):
        super(KPConvFPN, self).__init__()
        self.encoder1_1 = ConvBlock(input_dim, init_dim, kernel_size, init_radius, init_sigma, group_norm)  # first conv block, mapping the input features to the initial dimension
        self.encoder1_2 = ResidualBlock(init_dim, init_dim * 2, kernel_size, init_radius, init_sigma, group_norm)  # second block, mapping to 2x the initial dimension
        self.encoder2_1 = ResidualBlock(
            init_dim * 2, init_dim * 2, kernel_size, init_radius, init_sigma, group_norm, strided=True
        )  # third block, strided: downsamples while keeping the feature dimension
        self.encoder2_2 = ResidualBlock(
            init_dim * 2, init_dim * 4, kernel_size, init_radius * 2, init_sigma * 2, group_norm
        )  # fourth block, mapping to 4x the initial dimension
        self.encoder2_3 = ResidualBlock(
            init_dim * 4, init_dim * 4, kernel_size, init_radius * 2, init_sigma * 2, group_norm
        )  # fifth block, keeping the feature dimension
        self.encoder3_1 = ResidualBlock(
            init_dim * 4, init_dim * 4, kernel_size, init_radius * 2, init_sigma * 2, group_norm, strided=True
        )  # sixth block, strided: the second downsampling
        self.encoder3_2 = ResidualBlock(
            init_dim * 4, init_dim * 8, kernel_size, init_radius * 4, init_sigma * 4, group_norm
        )  # seventh block, mapping to 8x the initial dimension
        self.encoder3_3 = ResidualBlock(
            init_dim * 8, init_dim * 8, kernel_size, init_radius * 4, init_sigma * 4, group_norm
        )  # eighth block, keeping the feature dimension
        self.decoder2 = UnaryBlock(init_dim * 12, init_dim * 4, group_norm)  # scale-fusion block: maps the concatenation of upsampled stage-3 features and stage-2 features to 4x the initial dimension
        self.decoder1 = LastUnaryBlock(init_dim * 6, output_dim)  # maps the concatenation of upsampled stage-2 features and stage-1 features to the output dimension
    def forward(self, feats, data_dict):
        feats_list = []
        points_list = data_dict['points']
        neighbors_list = data_dict['neighbors']
        subsampling_list = data_dict['subsampling']
        upsampling_list = data_dict['upsampling']

        feats_s1 = feats
        feats_s1 = self.encoder1_1(feats_s1, points_list[0], points_list[0], neighbors_list[0])
        feats_s1 = self.encoder1_2(feats_s1, points_list[0], points_list[0], neighbors_list[0])

        feats_s2 = self.encoder2_1(feats_s1, points_list[1], points_list[0], subsampling_list[0])  # strided: the query points are the coarser level
        feats_s2 = self.encoder2_2(feats_s2, points_list[1], points_list[1], neighbors_list[1])
        feats_s2 = self.encoder2_3(feats_s2, points_list[1], points_list[1], neighbors_list[1])

        feats_s3 = self.encoder3_1(feats_s2, points_list[2], points_list[1], subsampling_list[1])  # strided: the second downsampling
        feats_s3 = self.encoder3_2(feats_s3, points_list[2], points_list[2], neighbors_list[2])
        feats_s3 = self.encoder3_3(feats_s3, points_list[2], points_list[2], neighbors_list[2])

        latent_s3 = feats_s3
        feats_list.append(feats_s3)

        latent_s2 = nearest_upsample(latent_s3, upsampling_list[1])
        latent_s2 = torch.cat([latent_s2, feats_s2], dim=1)  # concatenate with the skip features from stage 2
        latent_s2 = self.decoder2(latent_s2)
        feats_list.append(latent_s2)

        latent_s1 = nearest_upsample(latent_s2, upsampling_list[0])
        latent_s1 = torch.cat([latent_s1, feats_s1], dim=1)  # concatenate with the skip features from stage 1
        latent_s1 = self.decoder1(latent_s1)
        feats_list.append(latent_s1)

        feats_list.reverse()
        return feats_list
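As a quick sanity check on the decoder dimensions above (init_dim is illustrative; the assertions mirror the two torch.cat calls in forward):

init_dim = 64  # illustrative value
s1, s2, s3 = init_dim * 2, init_dim * 4, init_dim * 8  # encoder output widths of the three stages
assert s3 + s2 == init_dim * 12  # decoder2 input: upsampled stage-3 features plus stage-2 skip features
assert s2 + s1 == init_dim * 6   # decoder1 input: upsampled stage-2 features plus stage-1 skip features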
Having finished the initial feature extraction, we arrive at the superpoint GeoTransformer stage.

Superpoint Matching
GeoTransformer

Let's look at how it is implemented.

Before the GeoTransformer itself, the code first defines the embedding functions. Recall that the extra geometric term R added to the attention has two parts, a distance embedding and an angle embedding, and both are computed with a sinusoidal encoding, as in the formulas below.
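Reconstructed from the GeoTransformer paper ($\mathrm{PE}$ is the sinusoidal positional encoding; $\mathbf{W}^D$ and $\mathbf{W}^A$ correspond to the proj_d and proj_a linear layers below):

$$d_{i,j} = \lVert \mathbf{p}_i - \mathbf{p}_j \rVert_2, \qquad \mathbf{r}^{D}_{i,j} = \mathrm{PE}\!\left(d_{i,j}/\sigma_d\right)\mathbf{W}^D$$

$$\alpha^{x}_{i,j} = \angle\big(\mathbf{p}_x - \mathbf{p}_i,\; \mathbf{p}_j - \mathbf{p}_i\big),\ \ \mathbf{p}_x \in \mathrm{kNN}(\mathbf{p}_i), \qquad \mathbf{r}^{A}_{i,j,x} = \mathrm{PE}\!\left(\alpha^{x}_{i,j}/\sigma_a\right)\mathbf{W}^A$$

$$\mathbf{r}_{i,j} = \mathbf{r}^{D}_{i,j} + \max_{x}\, \mathbf{r}^{A}_{i,j,x}$$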
So we first define the GeometricStructureEmbedding module, which implements these encodings.
class GeometricStructureEmbedding(nn.Module):
    def __init__(self, hidden_dim, sigma_d, sigma_a, angle_k, reduction_a='max'):
        super(GeometricStructureEmbedding, self).__init__()
        self.sigma_d = sigma_d  # temperature for the distance embedding
        self.sigma_a = sigma_a  # temperature for the angle embedding, in degrees
        self.factor_a = 180.0 / (self.sigma_a * np.pi)  # converts radians to degrees and divides by sigma_a
        self.angle_k = angle_k  # number of nearest neighbors used for the angle embedding
        self.embedding = SinusoidalPositionalEmbedding(hidden_dim)  # sinusoidal positional embedding, i.e. the PE part
        self.proj_d = nn.Linear(hidden_dim, hidden_dim)
        self.proj_a = nn.Linear(hidden_dim, hidden_dim)
        self.reduction_a = reduction_a
        if self.reduction_a not in ['max', 'mean']:  # validate the reduction mode for the angle embedding
            raise ValueError(f'Unsupported reduction mode: {self.reduction_a}.')

    @torch.no_grad()  # no gradients needed: this is pure index computation
    def get_embedding_indices(self, points):  # compute the inputs of the distance and angle embeddings
        batch_size, num_point, _ = points.shape  # batch size and number of points
        dist_map = torch.sqrt(pairwise_distance(points, points))  # (B, N, N), pairwise distances between points
        d_indices = dist_map / self.sigma_d  # (B, N, N), distance indices fed to the sinusoidal encoding
        k = self.angle_k  # number of nearest neighbors for the angle embedding
        knn_indices = dist_map.topk(k=k + 1, dim=2, largest=False)[1][:, :, 1:]  # (B, N, k), topk returns (values, indices); take the neighbor indices and drop self
        knn_indices = knn_indices.unsqueeze(3).expand(batch_size, num_point, k, 3)  # (B, N, k, 3), add a trailing dim and expand to match the coordinates
        expanded_points = points.unsqueeze(1).expand(batch_size, num_point, num_point, 3)  # (B, N, N, 3), expand the points for gathering
        knn_points = torch.gather(expanded_points, dim=2, index=knn_indices)  # (B, N, k, 3), coordinates of each point's k nearest neighbors
        ref_vectors = knn_points - points.unsqueeze(2)  # (B, N, k, 3), reference vectors from each point to its k nearest neighbors
        anc_vectors = points.unsqueeze(1) - points.unsqueeze(2)  # (B, N, N, 3), anchor vectors from each point to every other point
        ref_vectors = ref_vectors.unsqueeze(2).expand(batch_size, num_point, num_point, k, 3)  # (B, N, N, k, 3), expand to match the anchor vectors
        anc_vectors = anc_vectors.unsqueeze(3).expand(batch_size, num_point, num_point, k, 3)  # (B, N, N, k, 3), expand to match the reference vectors
        sin_values = torch.linalg.norm(torch.cross(ref_vectors, anc_vectors, dim=-1), dim=-1)  # (B, N, N, k), norm of the cross product, proportional to the sine
        cos_values = torch.sum(ref_vectors * anc_vectors, dim=-1)  # (B, N, N, k), dot product, proportional to the cosine
        angles = torch.atan2(sin_values, cos_values)  # (B, N, N, k), angle recovered with atan2 (the common scale cancels)
        a_indices = angles * self.factor_a  # (B, N, N, k), angle indices fed to the sinusoidal encoding
        return d_indices, a_indices  # return the distance and angle indices

    def forward(self, points):
        d_indices, a_indices = self.get_embedding_indices(points)  # distance and angle indices
        d_embeddings = self.embedding(d_indices)  # distance embedding via the sinusoidal encoding
        d_embeddings = self.proj_d(d_embeddings)  # linear projection to the hidden dimension
        a_embeddings = self.embedding(a_indices)  # angle embedding via the sinusoidal encoding
        a_embeddings = self.proj_a(a_embeddings)  # linear projection to the hidden dimension
        if self.reduction_a == 'max':  # reduce the angle embedding over the k neighbors
            a_embeddings = a_embeddings.max(dim=3)[0]  # take the maximum (max returns values and indices)
        else:
            a_embeddings = a_embeddings.mean(dim=3)  # take the mean
        embeddings = d_embeddings + a_embeddings  # sum the distance and angle embeddings into the final geometric structure embedding
        return embeddings

It looks involved, but it simply computes the relative distance and relative angles between every point and all other points, thereby encoding the internal geometric structure of the point cloud.
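For completeness, the SinusoidalPositionalEmbedding used above is just the standard sinusoidal encoding evaluated at continuous indices. A minimal sketch (the repo's exact channel interleaving may differ):

import math
import torch
import torch.nn as nn

class SinusoidalPositionalEmbedding(nn.Module):
    # sketch: standard sinusoidal encoding applied to continuous indices such as d_ij / sigma_d
    def __init__(self, d_model):
        super().__init__()
        div_indices = torch.arange(0, d_model, 2).float()
        div_term = torch.exp(div_indices * (-math.log(10000.0) / d_model))
        self.register_buffer('div_term', div_term)

    def forward(self, emb_indices):
        # emb_indices: any shape (...); output: (..., d_model)
        omegas = emb_indices.unsqueeze(-1) * self.div_term  # broadcast to (..., d_model / 2)
        return torch.cat([torch.sin(omegas), torch.cos(omegas)], dim=-1)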
Next comes the GeometricTransformer itself. It simply uses the embedding module we just defined on the reference and source point clouds, and then runs the transformer stack. The code is as follows:
class GeometricTransformer(nn.Module):
    def __init__(
        self,
        input_dim,
        output_dim,
        hidden_dim,
        num_heads,
        blocks,
        sigma_d,
        sigma_a,
        angle_k,
        dropout=None,
        activation_fn='ReLU',
        reduction_a='max',
    ):
        super(GeometricTransformer, self).__init__()
        self.embedding = GeometricStructureEmbedding(hidden_dim, sigma_d, sigma_a, angle_k, reduction_a=reduction_a)  # geometric structure embedding module
        self.in_proj = nn.Linear(input_dim, hidden_dim)  # project the input features to the hidden dimension
        self.transformer = RPEConditionalTransformer(
            blocks, hidden_dim, num_heads, dropout=dropout, activation_fn=activation_fn
        )  # transformer made of stacked self-attention and cross-attention blocks
        self.out_proj = nn.Linear(hidden_dim, output_dim)

    def forward(
        self,
        ref_points,
        src_points,
        ref_feats,
        src_feats,
        ref_masks=None,
        src_masks=None,
    ):
        ref_embeddings = self.embedding(ref_points)  # geometric structure embedding of the reference point cloud
        src_embeddings = self.embedding(src_points)  # geometric structure embedding of the source point cloud
        ref_feats = self.in_proj(ref_feats)  # linear projection of the reference features
        src_feats = self.in_proj(src_feats)  # linear projection of the source features
        ref_feats, src_feats = self.transformer(
            ref_feats,
            src_feats,
            ref_embeddings,
            src_embeddings,
            masks0=ref_masks,
            masks1=src_masks,
        )  # feature interaction through the transformer
        ref_feats = self.out_proj(ref_feats)  # output projection of the reference features
        src_feats = self.out_proj(src_feats)  # output projection of the source features
        return ref_feats, src_feats
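As a usage illustration, a hypothetical instantiation (all dimension values and the self/cross pattern here are illustrative, not the repo's exact config):

# hypothetical values, loosely in the style of the repo's configs
geo_transformer = GeometricTransformer(
    input_dim=1024, output_dim=256, hidden_dim=256, num_heads=4,
    blocks=['self', 'cross', 'self', 'cross', 'self', 'cross'],
    sigma_d=0.2, sigma_a=15, angle_k=3, reduction_a='max',
)
# ref_points: (1, N, 3), src_points: (1, M, 3); ref_feats: (1, N, 1024), src_feats: (1, M, 1024)
ref_feats, src_feats = geo_transformer(ref_points, src_points, ref_feats, src_feats)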
Here we meet the RPEConditionalTransformer module, and it corresponds exactly to the theory we covered earlier: in self-attention, Q, K and V all come from the same point cloud, whereas in cross-attention Q comes from one point cloud while K and V come from the other. The code is as follows:

class RPEConditionalTransformer(nn.Module):
    def __init__(
        self,
        blocks,
        d_model,
        num_heads,
        dropout=None,
        activation_fn='ReLU',
        return_attention_scores=False,
        parallel=False,
    ):
        super(RPEConditionalTransformer, self).__init__()
        self.blocks = blocks
        layers = []
        for block in self.blocks:  # iterate over the block types
            _check_block_type(block)
            if block == 'self':  # self-attention block
                layers.append(RPETransformerLayer(d_model, num_heads, dropout=dropout, activation_fn=activation_fn))
            else:  # cross-attention block
                layers.append(TransformerLayer(d_model, num_heads, dropout=dropout, activation_fn=activation_fn))
        self.layers = nn.ModuleList(layers)
        self.return_attention_scores = return_attention_scores
        self.parallel = parallel

    def forward(self, feats0, feats1, embeddings0, embeddings1, masks0=None, masks1=None):
        attention_scores = []
        for i, block in enumerate(self.blocks):
            if block == 'self':  # self-attention block: Q, K, V from the same cloud, with geometric embeddings
                feats0, scores0 = self.layers[i](feats0, feats0, embeddings0, memory_masks=masks0)
                feats1, scores1 = self.layers[i](feats1, feats1, embeddings1, memory_masks=masks1)
            else:  # cross-attention block: K and V come from the other cloud
                if self.parallel:  # parallel update: both directions computed from the old features
                    new_feats0, scores0 = self.layers[i](feats0, feats1, memory_masks=masks1)  # new reference features and attention scores
                    new_feats1, scores1 = self.layers[i](feats1, feats0, memory_masks=masks0)  # new source features and attention scores
                    feats0 = new_feats0  # update the reference features
                    feats1 = new_feats1  # update the source features
                else:
                    feats0, scores0 = self.layers[i](feats0, feats1, memory_masks=masks1)
                    feats1, scores1 = self.layers[i](feats1, feats0, memory_masks=masks0)
            if self.return_attention_scores:
                attention_scores.append([scores0, scores1])
        if self.return_attention_scores:
            return feats0, feats1, attention_scores
        else:
            return feats0, feats1

The self-attention and cross-attention blocks are implemented by RPETransformerLayer and TransformerLayer respectively. Let's look at each in turn.
class RPETransformerLayer(nn.Module):
    def __init__(self, d_model, num_heads, dropout=None, activation_fn='ReLU'):
        super(RPETransformerLayer, self).__init__()
        self.attention = RPEAttentionLayer(d_model, num_heads, dropout=dropout)  # attention layer
        self.output = AttentionOutput(d_model, dropout=dropout, activation_fn=activation_fn)  # output layer

    def forward(
        self,
        input_states,
        memory_states,
        position_states,
        memory_weights=None,
        memory_masks=None,
        attention_factors=None,
    ):
        hidden_states, attention_scores = self.attention(
            input_states,
            memory_states,
            position_states,
            memory_weights=memory_weights,
            memory_masks=memory_masks,
            attention_factors=attention_factors,
        )  # the attention layer produces hidden states and attention scores
        output_states = self.output(hidden_states)  # the output layer produces the final states
        return output_states, attention_scores  # return the output states and attention scores

So the layer first runs RPEAttentionLayer; since attended features still need to pass through a feed-forward projection with dropout before being returned, an output module follows.
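AttentionOutput is not shown in the post; judging from the call sites it is a standard position-wise feed-forward block with a residual connection. A sketch consistent with that usage (the expansion factor is an assumption):

import torch.nn as nn

class AttentionOutput(nn.Module):
    # sketch: feed-forward block with residual connection and LayerNorm
    def __init__(self, d_model, dropout=None, activation_fn='ReLU'):
        super().__init__()
        self.expand = nn.Linear(d_model, d_model * 2)  # expansion factor 2 is an assumption
        self.activation = getattr(nn, activation_fn)()
        self.squeeze = nn.Linear(d_model * 2, d_model)
        self.dropout = nn.Dropout(dropout) if dropout is not None else nn.Identity()
        self.norm = nn.LayerNorm(d_model)

    def forward(self, input_states):
        hidden_states = self.squeeze(self.activation(self.expand(input_states)))
        return self.norm(input_states + self.dropout(hidden_states))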
Now let's follow the attention layer itself:

class RPEAttentionLayer(nn.Module):
    def __init__(self, d_model, num_heads, dropout=None):
        super(RPEAttentionLayer, self).__init__()
        self.attention = RPEMultiHeadAttention(d_model, num_heads, dropout=dropout)
        self.linear = nn.Linear(d_model, d_model)
        self.dropout = build_dropout_layer(dropout)
        self.norm = nn.LayerNorm(d_model)

    def forward(
        self,
        input_states,
        memory_states,
        position_states,
        memory_weights=None,
        memory_masks=None,
        attention_factors=None,
    ):
        hidden_states, attention_scores = self.attention(
            input_states,
            memory_states,
            memory_states,
            position_states,
            key_weights=memory_weights,
            key_masks=memory_masks,
            attention_factors=attention_factors,
        )
        hidden_states = self.linear(hidden_states)
        hidden_states = self.dropout(hidden_states)
        output_states = self.norm(hidden_states + input_states)
        return output_states, attention_scores

So the layer is built on the RPEMultiHeadAttention module, followed by a linear layer, dropout and a residual LayerNorm, which together yield the output states and attention scores. Let's step into that multi-head attention module to see the implementation.
class RPEMultiHeadAttention(nn.Module):
    def __init__(self, d_model, num_heads, dropout=None):
        super(RPEMultiHeadAttention, self).__init__()
        if d_model % num_heads != 0:
            raise ValueError('`d_model` ({}) must be a multiple of `num_heads` ({}).'.format(d_model, num_heads))
        self.d_model = d_model  # total feature dimension
        self.num_heads = num_heads  # number of heads
        self.d_model_per_head = d_model // num_heads  # feature dimension per head
        self.proj_q = nn.Linear(self.d_model, self.d_model)  # query projection, i.e. q = W_q x + b_q
        self.proj_k = nn.Linear(self.d_model, self.d_model)  # key projection, i.e. k = W_k x + b_k
        self.proj_v = nn.Linear(self.d_model, self.d_model)  # value projection
        self.proj_p = nn.Linear(self.d_model, self.d_model)  # projection of the relative geometric embedding
        self.dropout = build_dropout_layer(dropout)  # dropout layer

    def forward(self, input_q, input_k, input_v, embed_qk, key_weights=None, key_masks=None, attention_factors=None):
        q = rearrange(self.proj_q(input_q), 'b n (h c) -> b h n c', h=self.num_heads)  # multi-head queries, reshaped from b n (h c) to b h n c
        k = rearrange(self.proj_k(input_k), 'b m (h c) -> b h m c', h=self.num_heads)
        v = rearrange(self.proj_v(input_v), 'b m (h c) -> b h m c', h=self.num_heads)
        p = rearrange(self.proj_p(embed_qk), 'b n m (h c) -> b h n m c', h=self.num_heads)
        attention_scores_p = torch.einsum('bhnc,bhnmc->bhnm', q, p)  # attention term between the queries and the relative geometric embeddings
        attention_scores_e = torch.einsum('bhnc,bhmc->bhnm', q, k)  # attention term between the queries and the keys
        attention_scores = (attention_scores_e + attention_scores_p) / self.d_model_per_head ** 0.5  # scale the attention scores
        if attention_factors is not None:  # optional attention factors
            attention_scores = attention_factors.unsqueeze(1) * attention_scores  # modulate the attention scores
        if key_weights is not None:  # optional key weights
            attention_scores = attention_scores * key_weights.unsqueeze(1).unsqueeze(1)
        if key_masks is not None:  # optional key masks
            attention_scores = attention_scores.masked_fill(key_masks.unsqueeze(1).unsqueeze(1), float('-inf'))
        attention_scores = F.softmax(attention_scores, dim=-1)  # softmax over the keys
        attention_scores = self.dropout(attention_scores)  # dropout
        hidden_states = torch.matmul(attention_scores, v)  # weighted sum of the values
        hidden_states = rearrange(hidden_states, 'b h n c -> b n (h c)')  # merge the heads back
        return hidden_states, attention_scores  # return the hidden states and attention scores

That is the whole implementation. The cross-attention code is very similar; the difference is that self-attention here is the RPE variant, i.e. a transformer carrying the geometric positional term p, while cross-attention is the plain variant with only Q, K and V.
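As a quick shape check of the two einsum contractions above (sizes are illustrative):

import torch

B, H, N, M, C = 2, 4, 5, 6, 16
q = torch.randn(B, H, N, C)
k = torch.randn(B, H, M, C)
p = torch.randn(B, H, N, M, C)  # one geometric embedding per (query, key) pair

scores_e = torch.einsum('bhnc,bhmc->bhnm', q, k)   # standard QK^T term
scores_p = torch.einsum('bhnc,bhnmc->bhnm', q, p)  # query-times-pairwise-embedding term
print(scores_e.shape, scores_p.shape)  # both torch.Size([2, 4, 5, 6])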
After the transformer stage comes superpoint screening.

SuperPointTargetGenerator && SuperPointMatching
There are two modules here. SuperPointTargetGenerator filters high-quality point correspondences out of the ground truth to provide supervision targets for training; SuperPointMatching predicts the correspondences between the two point clouds from the learned feature similarities.

The implementation is below. It keeps the correspondences whose overlap exceeds a threshold and, if there are too many, randomly samples them down to the target number.
class SuperPointTargetGenerator(nn.Module):
    def __init__(self, num_targets, overlap_threshold):
        super(SuperPointTargetGenerator, self).__init__()
        self.num_targets = num_targets  # target number of correspondences
        self.overlap_threshold = overlap_threshold  # overlap threshold

    @torch.no_grad()  # no gradients needed
    def forward(self, gt_corr_indices, gt_corr_overlaps):
        gt_corr_masks = torch.gt(gt_corr_overlaps, self.overlap_threshold)  # mask of correspondences whose overlap exceeds the threshold
        gt_corr_overlaps = gt_corr_overlaps[gt_corr_masks]  # keep the overlaps that pass the mask
        gt_corr_indices = gt_corr_indices[gt_corr_masks]  # keep the indices that pass the mask
        if gt_corr_indices.shape[0] > self.num_targets:  # if more correspondences remain than the target number
            indices = np.arange(gt_corr_indices.shape[0])  # index array over the remaining correspondences
            sel_indices = np.random.choice(indices, self.num_targets, replace=False)  # randomly sample the target number of indices
            sel_indices = torch.from_numpy(sel_indices).cuda()  # move to GPU
            gt_corr_indices = gt_corr_indices[sel_indices]  # select the sampled correspondences
            gt_corr_overlaps = gt_corr_overlaps[sel_indices]  # select the sampled overlaps
        gt_ref_corr_indices = gt_corr_indices[:, 0]  # superpoint indices in the reference point cloud
        gt_src_corr_indices = gt_corr_indices[:, 1]  # superpoint indices in the source point cloud
        return gt_ref_corr_indices, gt_src_corr_indices, gt_corr_overlaps  # return the selected superpoint indices and their overlaps

SuperPointMatching, by contrast, applies dual normalization to the feature similarities to obtain a matching score matrix, then selects the top-k pairs as the matched superpoints.
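In formulas (following the paper's dual-normalization; $\mathbf{f}$ are the normalized superpoint features):

$$s_{ij} = \exp\!\big(-\lVert \mathbf{f}^{\mathrm{ref}}_i - \mathbf{f}^{\mathrm{src}}_j \rVert_2^2\big), \qquad \bar{s}_{ij} = \frac{s_{ij}}{\sum_k s_{ik}} \cdot \frac{s_{ij}}{\sum_k s_{kj}}$$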
class SuperPointMatching(nn.Module):
    def __init__(self, num_correspondences, dual_normalization=True):
        super(SuperPointMatching, self).__init__()
        self.num_correspondences = num_correspondences  # maximum number of correspondences
        self.dual_normalization = dual_normalization  # whether to use dual normalization

    def forward(self, ref_feats, src_feats, ref_masks=None, src_masks=None):
        if ref_masks is None:  # if the reference mask is missing, treat all superpoints as valid
            ref_masks = torch.ones(size=(ref_feats.shape[0],), dtype=torch.bool).cuda()
        if src_masks is None:  # same for the source mask
            src_masks = torch.ones(size=(src_feats.shape[0],), dtype=torch.bool).cuda()
        # remove empty patches
        ref_indices = torch.nonzero(ref_masks, as_tuple=True)[0]  # indices of the non-empty reference superpoints
        src_indices = torch.nonzero(src_masks, as_tuple=True)[0]  # indices of the non-empty source superpoints
        ref_feats = ref_feats[ref_indices]  # features of the non-empty reference superpoints
        src_feats = src_feats[src_indices]  # features of the non-empty source superpoints
        # select top-k proposals
        matching_scores = torch.exp(-pairwise_distance(ref_feats, src_feats, normalized=True))  # matching scores between reference and source superpoint features
        if self.dual_normalization:  # dual normalization
            ref_matching_scores = matching_scores / matching_scores.sum(dim=1, keepdim=True)  # row-normalized scores (reference side)
            src_matching_scores = matching_scores / matching_scores.sum(dim=0, keepdim=True)  # column-normalized scores (source side)
            matching_scores = ref_matching_scores * src_matching_scores  # combine the two normalizations
        num_correspondences = min(self.num_correspondences, matching_scores.numel())  # actual number of correspondences to keep
        corr_scores, corr_indices = matching_scores.view(-1).topk(k=num_correspondences, largest=True)  # highest-scoring pairs over the flattened matrix
        ref_sel_indices = corr_indices // matching_scores.shape[1]  # recover the row, i.e. the selected reference superpoint
        src_sel_indices = corr_indices % matching_scores.shape[1]  # recover the column, i.e. the selected source superpoint
        # recover original indices
        ref_corr_indices = ref_indices[ref_sel_indices]  # map back to the original reference superpoint indices
        src_corr_indices = src_indices[src_sel_indices]  # map back to the original source superpoint indices
        return ref_corr_indices, src_corr_indices, corr_scores

With the top-k superpoint pairs in hand, we reach part three: these pairs, together with the upsampled point features, are fed into the Point Matching Module.
Point Matching Module
This stage performs fine matching. For each superpoint correspondence it computes a confidence matrix between the two local point sets and thresholds it from both the reference side and the source side; with mutual matching the two resulting matrices are combined with a logical AND, otherwise with a logical OR. Invalid matches are then removed, and the final indices and coordinates are returned.
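Before the full class, a toy illustration of the mutual-selection logic (hypothetical 3x3 score matrix, k=1):

import torch

scores = torch.tensor([[0.70, 0.10, 0.20],
                       [0.05, 0.60, 0.35],
                       [0.25, 0.15, 0.60]])
k, thr = 1, 0.5
ref_top = torch.zeros_like(scores).scatter(1, scores.topk(k, dim=1).indices, 1.0)
src_top = torch.zeros_like(scores).scatter(0, scores.topk(k, dim=0).indices, 1.0)
ref_corr = (scores * ref_top) > thr  # best match per reference point, above the threshold
src_corr = (scores * src_top) > thr  # best match per source point, above the threshold
mutual = ref_corr & src_corr         # mutual matching: keep pairs selected from both sides
print(torch.nonzero(mutual))         # -> the diagonal pairs (0,0), (1,1), (2,2)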
class PointMatching(nn.Module):
    def __init__(
        self,
        k: int,
        mutual: bool = True,
        confidence_threshold: float = 0.05,
        use_dustbin: bool = False,
        use_global_score: bool = False,
        remove_duplicate: bool = False,
    ):
        r"""Point Matching with Local-to-Global Registration.

        Args:
            k (int): top-k selection for matching.
            mutual (bool=True): mutual or non-mutual matching.
            confidence_threshold (float=0.05): ignore matches whose scores are below this threshold.
            use_dustbin (bool=False): whether dustbin row/column is used in the score matrix.
            use_global_score (bool=False): whether use patch correspondence scores.
        """
        super(PointMatching, self).__init__()
        self.k = k  # the given top-k value
        self.mutual = mutual  # whether to use mutual matching
        self.confidence_threshold = confidence_threshold  # confidence threshold
        self.use_dustbin = use_dustbin  # whether the score matrix has a dustbin row/column
        self.use_global_score = use_global_score  # whether to use the global (patch) scores
        self.remove_duplicate = remove_duplicate  # whether to remove duplicate matches

    def compute_correspondence_matrix(self, score_mat, ref_knn_masks, src_knn_masks):  # compute the correspondence matrix
        r"""Compute matching matrix and score matrix for each patch correspondence."""
        mask_mat = torch.logical_and(ref_knn_masks.unsqueeze(2), src_knn_masks.unsqueeze(1))  # validity mask matrix
        batch_size, ref_length, src_length = score_mat.shape  # batch size, reference length and source length
        batch_indices = torch.arange(batch_size).cuda()  # batch indices
        # correspondences from the reference side
        ref_topk_scores, ref_topk_indices = score_mat.topk(k=self.k, dim=2)  # (B, N, K), top-k scores and indices for every reference point
        ref_batch_indices = batch_indices.view(batch_size, 1, 1).expand(-1, ref_length, self.k)  # (B, N, K), batch indices expanded to match
        ref_indices = torch.arange(ref_length).cuda().view(1, ref_length, 1).expand(batch_size, -1, self.k)  # (B, N, K), reference point indices expanded to match
        ref_score_mat = torch.zeros_like(score_mat)  # initialize the reference-side score matrix
        ref_score_mat[ref_batch_indices, ref_indices, ref_topk_indices] = ref_topk_scores  # scatter the top-k scores into the reference-side score matrix
        ref_corr_mat = torch.gt(ref_score_mat, self.confidence_threshold)  # reference-side correspondence matrix from the confidence threshold
        # correspondences from the source side
        src_topk_scores, src_topk_indices = score_mat.topk(k=self.k, dim=1)  # (B, K, N)
        src_batch_indices = batch_indices.view(batch_size, 1, 1).expand(-1, self.k, src_length)  # (B, K, N)
        src_indices = torch.arange(src_length).cuda().view(1, 1, src_length).expand(batch_size, self.k, -1)  # (B, K, N)
        src_score_mat = torch.zeros_like(score_mat)
        src_score_mat[src_batch_indices, src_topk_indices, src_indices] = src_topk_scores
        src_corr_mat = torch.gt(src_score_mat, self.confidence_threshold)
        # merge results from the two sides
        if self.mutual:  # mutual matching
            corr_mat = torch.logical_and(ref_corr_mat, src_corr_mat)  # logical AND of the two correspondence matrices
        else:  # non-mutual matching
            corr_mat = torch.logical_or(ref_corr_mat, src_corr_mat)  # logical OR of the two correspondence matrices
        if self.use_dustbin:  # if a dustbin is used
            corr_mat = corr_mat[:, :-1, :-1]  # drop the dustbin row and column
        corr_mat = torch.logical_and(corr_mat, mask_mat)  # apply the validity mask to remove invalid matches
        return corr_mat
    def forward(
        self,
        ref_knn_points,
        src_knn_points,
        ref_knn_masks,
        src_knn_masks,
        ref_knn_indices,
        src_knn_indices,
        score_mat,
        global_scores,
    ):
        score_mat = torch.exp(score_mat)  # turn log-likelihoods into probabilities
        corr_mat = self.compute_correspondence_matrix(score_mat, ref_knn_masks, src_knn_masks)  # (B, K, K), compute the correspondence matrix
        if self.use_dustbin:  # if a dustbin is used, drop its row and column from the scores as well
            score_mat = score_mat[:, :-1, :-1]
        if self.use_global_score:  # if global scores are used
            score_mat = score_mat * global_scores.view(-1, 1, 1)  # modulate the scores with the patch-level scores
        score_mat = score_mat * corr_mat.float()  # zero out the scores of non-correspondences
        batch_indices, ref_indices, src_indices = torch.nonzero(corr_mat, as_tuple=True)  # batch, reference and source indices of the non-zero entries
        ref_corr_indices = ref_knn_indices[batch_indices, ref_indices]  # indices of the corresponding reference points
        src_corr_indices = src_knn_indices[batch_indices, src_indices]  # indices of the corresponding source points
        ref_corr_points = ref_knn_points[batch_indices, ref_indices]  # coordinates of the corresponding reference points
        src_corr_points = src_knn_points[batch_indices, src_indices]  # coordinates of the corresponding source points
        corr_scores = score_mat[batch_indices, ref_indices, src_indices]  # matching scores of the correspondences
        return ref_corr_points, src_corr_points, ref_corr_indices, src_corr_indices, corr_scores  # return the coordinates, indices and scores of the correspondences

Local to Global Registration
Here is the main function. It splits the dense correspondences into patch-wise chunks, estimates an R and t from each chunk of fine matches, picks the candidate with the most inliers, and then refines it globally.
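The selection criterion, as stated in the paper (C is the verification set of correspondences and $\tau_a$ the acceptance radius):

$$R, t \;=\; \operatorname*{arg\,max}_{R_i,\, t_i} \sum_{(\mathbf{p}_{x_j},\, \mathbf{q}_{y_j}) \in \mathcal{C}} \big[\, \lVert R_i\, \mathbf{p}_{x_j} + t_i - \mathbf{q}_{y_j} \rVert_2 < \tau_a \,\big]$$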
def local_to_global_registration(self, ref_knn_points, src_knn_points, score_mat, corr_mat):  # local-to-global registration
    # extract dense correspondences
    batch_indices, ref_indices, src_indices = torch.nonzero(corr_mat, as_tuple=True)  # batch, reference and source indices of the non-zero entries
    global_ref_corr_points = ref_knn_points[batch_indices, ref_indices]  # corresponding points in the reference point cloud
    global_src_corr_points = src_knn_points[batch_indices, src_indices]  # corresponding points in the source point cloud
    global_corr_scores = score_mat[batch_indices, ref_indices, src_indices]  # scores of the correspondences
    # build the verification set
    if self.correspondence_limit is not None and global_corr_scores.shape[0] > self.correspondence_limit:  # cap the number of correspondences
        corr_scores, sel_indices = global_corr_scores.topk(k=self.correspondence_limit, largest=True)  # keep the highest-scoring correspondences
        ref_corr_points = global_ref_corr_points[sel_indices]  # selected reference points
        src_corr_points = global_src_corr_points[sel_indices]  # selected source points
    else:  # no cap
        ref_corr_points = global_ref_corr_points
        src_corr_points = global_src_corr_points
        corr_scores = global_corr_scores
    # compute starting and ending index of each patch correspondence.
    # torch.nonzero is row-major, so the correspondences from the same patch correspondence are consecutive.
    # find the first occurrence of each batch index, then the chunk of this batch can be obtained.
    unique_masks = torch.ne(batch_indices[1:], batch_indices[:-1])  # marks where a new batch index first appears
    unique_indices = torch.nonzero(unique_masks, as_tuple=True)[0] + 1  # shift by one to index into the original tensor
    unique_indices = unique_indices.detach().cpu().numpy().tolist()  # convert to a Python list
    unique_indices = [0] + unique_indices + [batch_indices.shape[0]]  # prepend the start index and append the end index
    chunks = [
        (x, y) for x, y in zip(unique_indices[:-1], unique_indices[1:]) if y - x >= self.correspondence_threshold
    ]  # one chunk per patch, keeping only chunks with at least the minimum number of correspondences
    batch_size = len(chunks)  # number of valid chunks
    if batch_size > 0:  # at least one valid chunk
        # local registration
        batch_ref_corr_points, batch_src_corr_points, batch_corr_scores = self.convert_to_batch(
            global_ref_corr_points, global_src_corr_points, global_corr_scores, chunks
        )  # pack the chunks into a batch
        batch_transforms = self.procrustes(batch_src_corr_points, batch_ref_corr_points, batch_corr_scores)  # one weighted-SVD transform per chunk
        batch_aligned_src_corr_points = apply_transform(src_corr_points.unsqueeze(0), batch_transforms)  # apply every candidate transform to the verification set
        batch_corr_residuals = torch.linalg.norm(
            ref_corr_points.unsqueeze(0) - batch_aligned_src_corr_points, dim=2
        )  # residuals of every candidate on the verification set
        batch_inlier_masks = torch.lt(batch_corr_residuals, self.acceptance_radius)  # (P, N), inlier masks
        best_index = batch_inlier_masks.sum(dim=1).argmax()  # candidate with the most inliers
        cur_corr_scores = corr_scores * batch_inlier_masks[best_index].float()  # re-weight the scores with the best inlier mask
    else:  # no valid chunk
        # degenerate: initialize the transformation with all correspondences
        estimated_transform = self.procrustes(src_corr_points, ref_corr_points, corr_scores)  # initial transform
        cur_corr_scores = self.recompute_correspondence_scores(
            ref_corr_points, src_corr_points, corr_scores, estimated_transform
        )  # update the correspondence scores
    # global refinement
    estimated_transform = self.procrustes(src_corr_points, ref_corr_points, cur_corr_scores)  # re-estimate the transform
    for _ in range(self.num_refinement_steps - 1):
        cur_corr_scores = self.recompute_correspondence_scores(
            ref_corr_points, src_corr_points, corr_scores, estimated_transform
        )  # recompute the inlier scores under the current transform
        estimated_transform = self.procrustes(src_corr_points, ref_corr_points, cur_corr_scores)  # re-estimate the transform
    return global_ref_corr_points, global_src_corr_points, global_corr_scores, estimated_transform  # return the reference points, source points, scores and the estimated transform