搜索
您的当前位置:首页正文

tensorflow keras 搭建相机位姿估计网络--例

来源:易榕旅网

主要展示keras 的使用方法,搭建网络模型的步骤。

概要

这个网络是利用两帧图像,预测相机的位姿变换矩阵,总共6个自由度
网络的输入: [image_ref , image_cur]
网络的输出: [tx , ty , tz , roll , pitch , yaw]
 

一、数据集的处理

数据集的格式:

# image_ref  image_cur  tx  ty  tz  roll(x) pitch(y) yaw(z)
0 images/0.jpg images/1.jpg 0.000999509 -0.00102794 0.00987293 0.00473228 -0.0160252 -0.0222079 
1 images/1.jpg images/2.jpg -0.00544488 -0.00282174 0.00828871 -0.00271557 -0.00770117 -0.0195182 
2 images/2.jpg images/3.jpg -0.0074375 -0.00368121 0.0114751 -0.00721246 -0.0103843 -0.0171883 
3 images/3.jpg images/4.jpg -0.00238111 -0.00371362 0.0120466 -0.0081171 -0.0149111 -0.0198595 
4 images/4.jpg images/5.jpg 0.000965841 -0.00520437 0.0135452 -0.0141721 -0.0126401 -0.0182697 
5 images/5.jpg images/6.jpg -0.00295753 -0.00340146 0.0144557 -0.013633 -0.00463747 -0.0143332 

通过load_image映射数据集到 TF tensor

class datasets:
    def __init__(self, datasetsPath:str):
        self.dataPath = datasetsPath
        self.dim = 512
        self.epochs = 40
        self.batch_size = 8
        self.train_percent = 0.92
        self.learning_rate = 2e-4
        self.model_path = 'kerasTempModel/'
        self.posetxt = os.path.join(self.dataPath,'pose.txt') 

        self.GetTheImagesAndPose()
        self.buildTrainData()

    def GetTheImagesAndPose(self):
        self.poselist = []
        with open(self.posetxt,'r') as f:
            for line in f.readlines():
                line = line.strip()
                line = line.split(' ')
                line.remove(line[0])
                self.poselist.append(line)
                # im1 im2 tx,ty,tz,roll,pitch,yaw
        #打乱数据集
        length = np.shape(self.poselist)[0]
        train_num =int(length * self.train_percent) 
        test_num = length - train_num
        randomPoses = np.array(random.sample(self.poselist,length)) #取出所有数据集
        self.train_pose_list = randomPoses[0:train_num,:]
        self.test_pose_list = randomPoses[train_num:length+1,:]
        print(f"The size of train pose list is : {np.shape(self.train_pose_list)[0]}")
        print(f"The size of test pose list is : {np.shape(self.test_pose_list)[0]}")

    def load_image(self,index:tf.Tensor):

        img_ref= img_ref = tf.io.read_file(index[0])
        img_ref = tf.image.decode_jpeg(img_ref) #此处为jpeg格式
        img_ref = tf.image.resize(img_ref,(self.dim,self.dim))/255.0
        #img = tf.reshape(img,[self.dim,self.dim,3])
        img_ref = tf.cast(img_ref,tf.float32)

        img_cur = img_cur = tf.io.read_file(index[1])
        img_cur = tf.image.decode_jpeg(img_cur) #此处为jpeg格式
        img_cur = tf.image.resize(img_cur,(self.dim,self.dim))/255.0
        #img = tf.reshape(img,[self.dim,self.dim,3])
        img_cur = tf.cast(img_cur,tf.float32)
        
        pose = tf.strings.to_number(index[2:8],tf.float32)
        
        return (img_ref,img_cur),(pose)
        
    def buildTrainData(self):
        '''
        for example:\\
        >>> poses = dataset.y_train.take(20)\\
        >>> imgs = dataset.x1_train.take(40)\\
        >>> print(np.array(list(imgs.as_numpy_iterator()))[39]) \\
        >>> imgs = dataset.x2_train.take(40)\\
        >>> print(np.array(list(imgs.as_numpy_iterator()))[39]) \\
        >>> print(np.array(list(poses.as_numpy_iterator()))[19]) \\
        '''
        self.traindata = tf.data.Dataset.from_tensor_slices(self.train_pose_list) \
           .map(self.load_image, num_parallel_calls=tf.data.experimental.AUTOTUNE) \
           .shuffle(500)\
           .repeat(10)\
           .batch(self.batch_size) \
           .prefetch(tf.data.experimental.AUTOTUNE)#.cache() 
           
        self.testdata = tf.data.Dataset.from_tensor_slices(self.test_pose_list) \
           .map(self.load_image, num_parallel_calls=tf.data.experimental.AUTOTUNE) \
           .shuffle(500)\
           .repeat(10)\
           .batch(self.batch_size) \
           .prefetch(tf.data.experimental.AUTOTUNE)

二、网络模型的搭建

简单的模型

# 模型
def model(dim):
    First = K.layers.Input(shape=(dim,dim,3),name="input1")
    Second = K.layers.Input(shape=(dim,dim,3),name="input2")

    x1 = K.layers.MaxPool2D(pool_size=(2,2),strides=2)(First)
    x1 = K.layers.Conv2D(512,kernel_size=(3,3), strides=2,padding='same')(x1)
    x1 = K.layers.BatchNormalization()(x1)
    x1 = K.layers.ReLU()(x1)
    x1 = K.layers.Conv2D(256,kernel_size=(3,3), strides=2,padding='same')(x1)
    x1 = K.layers.BatchNormalization()(x1)
    x1 = K.layers.ReLU()(x1)
    x1 = K.layers.MaxPool2D(pool_size=(2,2),strides=2)(x1)

    x2 = K.layers.MaxPool2D(pool_size=(2,2),strides=2)(Second)
    x2 = K.layers.Conv2D(512,kernel_size=(3,3), strides=2,padding='same')(x2)
    x2 = K.layers.BatchNormalization()(x2)
    x2 = K.layers.ReLU()(x2)
    x2 = K.layers.Conv2D(256,kernel_size=(3,3), strides=2,padding='same')(x2)
    x2 = K.layers.BatchNormalization()(x2)
    x2 = K.layers.ReLU()(x2)
    x2 = K.layers.MaxPool2D(pool_size=(2,2),strides=2)(x2)

    x = K.layers.concatenate([x1,x2])
    x = K.layers.Conv2D(256,kernel_size=(3,3), strides=1,padding='same',
                        activation='relu')(x)
    x = K.layers.BatchNormalization()(x)
    x = K.layers.ReLU()(x)
    x = K.layers.MaxPool2D(pool_size=(2,2),strides=2)(x)
    x = K.layers.Conv2D(128,kernel_size=(3,3), strides=1,padding='same',
                        activation='relu')(x)
    x = K.layers.BatchNormalization()(x)
    x = K.layers.ReLU()(x)
    x = K.layers.Flatten()(x)
    x = K.layers.Dense(1024)(x)
    x = K.layers.Dense(6,name='Output')(x)
    poseModel = K.Model([First,Second],x)

    return poseModel
#损失函数
def loss_fn(y_true,y_pre):
    loss_value = K.backend.mean(K.backend.square(y_true-y_pre))
    return loss_value

# 学习率下降回调函数
class learningDecay(K.callbacks.Callback):
    def __init__(self,schedule=None,alpha=1,verbose=0):
        super().__init__()
        self.schedule = schedule
        self.verbose = verbose
        self.alpha = alpha
    def on_epoch_begin(self, epoch, logs=None):
        lr = float(K.backend.get_value(self.model.optimizer.lr))
        if self.schedule != None:
            lr = self.schedule(epoch,lr)
        else:
            if epoch != 0:
                lr = lr*self.alpha
        K.backend.set_value(self.model.optimizer.lr,K.backend.get_value(lr))
        if self.verbose > 0:
            print(f"Current learning rate is {lr}")
# 学习率计划
def scheduler(epoch, lr):
    if epoch < 10:
        return lr
    else:
        return lr * tf.math.exp(-0.1) 

resnet34为主骨

#-------resnet 34-------------
def conv_block(inputs, 
        neuron_num, 
        kernel_size,  
        use_bias, 
        padding= 'same',
        strides= (1, 1),
        with_conv_short_cut = False):
    conv1 = K.layers.Conv2D(
        neuron_num,
        kernel_size = kernel_size,
        activation= 'relu',
        strides= strides,
        use_bias= use_bias,
        padding= padding
    )(inputs)
    conv1 = K.layers.BatchNormalization(axis = 1)(conv1)

    conv2 = K.layers.Conv2D(
        neuron_num,
        kernel_size= kernel_size,
        activation= 'relu',
        use_bias= use_bias,
        padding= padding)(conv1)
    conv2 = K.layers.BatchNormalization(axis = 1)(conv2)

    if with_conv_short_cut:
        inputs = K.layers.Conv2D(
            neuron_num, 
            kernel_size= kernel_size,
            strides= strides,
            use_bias= use_bias,
            padding= padding
            )(inputs)
        return K.layers.add([inputs, conv2])

    else:
        return K.layers.add([inputs, conv2])

def ResNet34(inputs,namescope = ""):
    x = K.layers.ZeroPadding2D((3, 3))(inputs)

    # Define the converlutional block 1
    x = K.layers.Conv2D(64, kernel_size= (7, 7), strides= (2, 2), padding= 'valid')(x)
    x = K.layers.BatchNormalization(axis= 1)(x)
    x = K.layers.MaxPooling2D(pool_size= (3, 3), strides= (2, 2), padding= 'same')(x)

    # Define the converlutional block 2
    x = conv_block(x, neuron_num= 64, kernel_size= (3, 3), use_bias= True)
    x = conv_block(x, neuron_num= 64, kernel_size= (3, 3), use_bias= True)
    x = conv_block(x, neuron_num= 64, kernel_size= (3, 3), use_bias= True)

    # Define the converlutional block 3
    x = conv_block(x, neuron_num= 128, kernel_size= (3, 3), use_bias= True, strides= (2, 2), with_conv_short_cut= True)
    x = conv_block(x, neuron_num= 128, kernel_size= (3, 3), use_bias= True)
    x = conv_block(x, neuron_num= 128, kernel_size= (3, 3), use_bias= True)

    # Define the converlutional block 4
    x = conv_block(x, neuron_num= 256, kernel_size= (3, 3), use_bias= True, strides= (2, 2), with_conv_short_cut= True)
    x = conv_block(x, neuron_num= 256, kernel_size= (3, 3), use_bias= True)
    x = conv_block(x, neuron_num= 256, kernel_size= (3, 3), use_bias= True)
    x = conv_block(x, neuron_num= 256, kernel_size= (3, 3), use_bias= True)
    x = conv_block(x, neuron_num= 256, kernel_size= (3, 3), use_bias= True)
    x = conv_block(x, neuron_num= 256, kernel_size= (3, 3), use_bias= True)

    # Define the converltional block 5
    x = conv_block(x, neuron_num= 512, kernel_size= (3, 3), use_bias= True, strides= (2, 2), with_conv_short_cut= True)
    x = conv_block(x, neuron_num= 512, kernel_size= (3, 3), use_bias= True)
    x = conv_block(x, neuron_num= 512, kernel_size= (3, 3), use_bias= True)
    x = K.layers.AveragePooling2D(pool_size=(7, 7))(x)
    return x


def model(dim_w,dim_h):
    First = K.layers.Input(shape=(dim_w,dim_h,3),name="input1")
    Second = K.layers.Input(shape=(dim_w,dim_h,3),name="input2")

    # x1 = K.layers.MaxPool2D(pool_size=(2,2),strides=2)(First)
    x1 = K.layers.Conv2D(128,kernel_size=(3,3), strides=2,padding='same')(First)
    x1 = K.layers.BatchNormalization()(x1)
    x1 = K.layers.LeakyReLU()(x1)
    # x1 = K.layers.Conv2D(256,kernel_size=(3,3), strides=2,padding='same')(x1)
    # x1 = K.layers.BatchNormalization()(x1)
    # x1 = K.layers.ReLU()(x1)
    x1 = ResNet34(x1,"x1")

    # x2 = K.layers.MaxPool2D(pool_size=(2,2),strides=2)(Second)
    x2 = K.layers.Conv2D(128,kernel_size=(3,3), strides=2,padding='same')(Second)
    x2 = K.layers.BatchNormalization()(x2)
    x2 = K.layers.LeakyReLU()(x2)
    # x2 = K.layers.Conv2D(256,kernel_size=(3,3), strides=2,padding='same')(x2)
    # x2 = K.layers.BatchNormalization()(x2)
    # x2 = K.layers.ReLU()(x2)
    x2 = ResNet34(x2,"x2")

    x = K.layers.concatenate([x1,x2])

    x = K.layers.Flatten()(x)
    x = K.layers.Dense(6,name='Output')(x)
    poseModel = K.Model([First,Second],x)

    return poseModel

def loss_fn(y_true,y_pre):
    loss_value_translation = K.backend.square(y_true[-1,0:3]-y_pre[-1,0:3])
    loss_value_rotation = 1/5.7*K.backend.square(y_true[-1,3:6]-y_pre[-1,3:6])
    loss_value = K.backend.mean(loss_value_translation + loss_value_rotation)

    # loss_value = K.backend.mean(K.backend.square(y_true-y_pre))
    # tf.print(y_pre)
    return loss_value

三、模型的训练

build()函数用来编译模型,建立回调函数;
train_fit() 函数是利用keras 的 fit函数来训练;
train_gradient() 函数是利用 apply_gradients函数训练,可以实时对loss和梯度进行监控,灵活性强;
save_model() 函数是保存模型的函数,可以有多种保存的方式,利用model.save() 可以保存为h5文件和tf格式的模型。

class Posenet:
    def __init__(self,dataset:datasets):
        self.dataset = dataset
        self.build()

    def build(self):
        self.poseModel = model(self.dataset.dim)
        self.poseModel.summary()
        self.optm = K.optimizers.RMSprop(1e-4,momentum=0.9) #,decay=1e-5/self.dataset.epochs
        self.decayCallback = learningDecay(schedule = None,alpha = 0.99,verbose = 1)
        decayCallbackScheduler = K.callbacks.LearningRateScheduler(scheduler)
        self.callbacks = [decayCallbackScheduler]

        try:
            print("************************loading the model weights***********************************")
            self.poseModel.load_weights("model.h5")
        except:
            pass

    def train_fit(self):
        self.poseModel.compile(optimizer=self.optm,loss=loss_fn,metrics=['accuracy'])
        self.poseModel.fit(self.dataset.traindata,
                            validation_data=self.dataset.testdata,
                            epochs=self.dataset.epochs,
                            callbacks=[self.decayCallback],
                            verbose=1)
    
    def train_gradient(self):
        for step in range(self.dataset.epochs):
            loss = 0
            val_loss = 0
            lr = float(self.optm.lr)
            tf.print(">>> [Epoch is %s/%s]"%(step,self.dataset.epochs))
            for (x1,x2),y in self.dataset.traindata:
                with tf.GradientTape() as tape:
                    prediction = self.poseModel([x1,x2])
                    # y = tf.cast(y,dtype=prediction.dtype)
                    loss = loss_fn(y,prediction)
                gradients = tape.gradient(loss,self.poseModel.trainable_variables)
                self.optm.apply_gradients(zip(gradients,self.poseModel.trainable_variables))
            # 测试
            for (x1,x2),y in self.dataset.testdata:
                prediction = self.poseModel([x1,x2])
                val_loss = loss_fn(y,prediction)
            tf.print("The loss is %s,the learning rate is : %s, test loss is %s]"%(np.array(loss),lr,val_loss))
            K.backend.set_value(self.optm.lr,K.backend.get_value(lr*0.99))

    def save_model(self):
        '''
        利用 save 函数来保存,可以保存为h5文件,也可以保存为文件夹的形式,推荐保存第二种,再使用tf2onnx转onnx
        >>> python -m tf2onnx.convert --saved-model kerasTempModel --output "model.onnx" --opset 14
        '''
        self.poseModel.save("model.h5")
        self.poseModel.save(self.dataset.model_path)
        # self.poseModel.save_weights("model.h5") #只保存权重,没有保存结构
        # tf.saved_model.save(self.poseModel,'tf2TempModel') #这种保存方式不再使用了

主函数:

if __name__ == "__main__":
    dataset = datasets("images")
    posenet = Posenet(dataset)
    posenet.train_fit()
    # posenet.train_gradient() #利用 apply_gradient的方式训练
    posenet.save_model()

四、加载保存的模型

保存的模型文件夹为: kerasTempModel\

model = K.models.load_model(dataset.model_path,compile=False)

测试模型:

output = model([img_ref,img_cur])

五、完整代码

import argparse
import tensorflow as tf
import tensorflow.keras as K
import numpy as np
import cv2 as cv
import os
import time
import random
from tensorflow.keras import optimizers
from tensorflow.keras import callbacks

class datasets:
    def __init__(self, datasetsPath:str):
        self.dataPath = datasetsPath
        self.dim = 512
        self.epochs = 40
        self.batch_size = 8
        self.train_percent = 0.92
        self.learning_rate = 2e-4
        self.model_path = 'kerasTempModel/'
        self.posetxt = os.path.join(self.dataPath,'pose.txt') 

        self.GetTheImagesAndPose()
        self.buildTrainData()

    def GetTheImagesAndPose(self):
        self.poselist = []
        with open(self.posetxt,'r') as f:
            for line in f.readlines():
                line = line.strip()
                line = line.split(' ')
                line.remove(line[0])
                self.poselist.append(line)
                # im1 im2 tx,ty,tz,roll,pitch,yaw
        #打乱数据集
        length = np.shape(self.poselist)[0]
        train_num =int(length * self.train_percent) 
        test_num = length - train_num
        randomPoses = np.array(random.sample(self.poselist,length)) #取出所有数据集
        self.train_pose_list = randomPoses[0:train_num,:]
        self.test_pose_list = randomPoses[train_num:length+1,:]
        print(f"The size of train pose list is : {np.shape(self.train_pose_list)[0]}")
        print(f"The size of test pose list is : {np.shape(self.test_pose_list)[0]}")

    def load_image(self,index:tf.Tensor):

        img_ref= img_ref = tf.io.read_file(index[0])
        img_ref = tf.image.decode_jpeg(img_ref) #此处为jpeg格式
        img_ref = tf.image.resize(img_ref,(self.dim,self.dim))/255.0
        #img = tf.reshape(img,[self.dim,self.dim,3])
        img_ref = tf.cast(img_ref,tf.float32)

        img_cur = img_cur = tf.io.read_file(index[1])
        img_cur = tf.image.decode_jpeg(img_cur) #此处为jpeg格式
        img_cur = tf.image.resize(img_cur,(self.dim,self.dim))/255.0
        #img = tf.reshape(img,[self.dim,self.dim,3])
        img_cur = tf.cast(img_cur,tf.float32)
        pose = tf.strings.to_number(index[2:8],tf.float32)
        return (img_ref,img_cur),(pose)

    def buildTrainData(self):
        '''
        for example:\\
        >>> poses = dataset.y_train.take(20)\\
        >>> imgs = dataset.x1_train.take(40)\\
        >>> print(np.array(list(imgs.as_numpy_iterator()))[39]) \\
        >>> imgs = dataset.x2_train.take(40)\\
        >>> print(np.array(list(imgs.as_numpy_iterator()))[39]) \\
        >>> print(np.array(list(poses.as_numpy_iterator()))[19]) \\
        '''
        self.traindata = tf.data.Dataset.from_tensor_slices(self.train_pose_list) \
           .map(self.load_image, num_parallel_calls=tf.data.experimental.AUTOTUNE) \
           .shuffle(500)\
           .repeat(10)\
           .batch(self.batch_size) \
           .prefetch(tf.data.experimental.AUTOTUNE)#.cache() 
        self.testdata = tf.data.Dataset.from_tensor_slices(self.test_pose_list) \
           .map(self.load_image, num_parallel_calls=tf.data.experimental.AUTOTUNE) \
           .shuffle(500)\
           .repeat(10)\
           .batch(self.batch_size) \
           .prefetch(tf.data.experimental.AUTOTUNE)

def model(dim):
    First = K.layers.Input(shape=(dim,dim,3),name="input1")
    Second = K.layers.Input(shape=(dim,dim,3),name="input2")

    x1 = K.layers.MaxPool2D(pool_size=(2,2),strides=2)(First)
    x1 = K.layers.Conv2D(512,kernel_size=(3,3), strides=2,padding='same')(x1)
    x1 = K.layers.BatchNormalization()(x1)
    x1 = K.layers.ReLU()(x1)
    x1 = K.layers.Conv2D(256,kernel_size=(3,3), strides=2,padding='same')(x1)
    x1 = K.layers.BatchNormalization()(x1)
    x1 = K.layers.ReLU()(x1)
    x1 = K.layers.MaxPool2D(pool_size=(2,2),strides=2)(x1)

    x2 = K.layers.MaxPool2D(pool_size=(2,2),strides=2)(Second)
    x2 = K.layers.Conv2D(512,kernel_size=(3,3), strides=2,padding='same')(x2)
    x2 = K.layers.BatchNormalization()(x2)
    x2 = K.layers.ReLU()(x2)
    x2 = K.layers.Conv2D(256,kernel_size=(3,3), strides=2,padding='same')(x2)
    x2 = K.layers.BatchNormalization()(x2)
    x2 = K.layers.ReLU()(x2)
    x2 = K.layers.MaxPool2D(pool_size=(2,2),strides=2)(x2)

    x = K.layers.concatenate([x1,x2])
    x = K.layers.Conv2D(256,kernel_size=(3,3), strides=1,padding='same',
                        activation='relu')(x)
    x = K.layers.BatchNormalization()(x)
    x = K.layers.ReLU()(x)
    x = K.layers.MaxPool2D(pool_size=(2,2),strides=2)(x)
    x = K.layers.Conv2D(128,kernel_size=(3,3), strides=1,padding='same',
                        activation='relu')(x)
    x = K.layers.BatchNormalization()(x)
    x = K.layers.ReLU()(x)
    x = K.layers.Flatten()(x)
    x = K.layers.Dense(1024)(x)
    x = K.layers.Dense(6,name='Output')(x)
    poseModel = K.Model([First,Second],x)

    return poseModel

def loss_fn(y_true,y_pre):
    loss_value = K.backend.mean(K.backend.square(y_true-y_pre))
    return loss_value


class learningDecay(K.callbacks.Callback):
    def __init__(self,schedule=None,alpha=1,verbose=0):
        super().__init__()
        self.schedule = schedule
        self.verbose = verbose
        self.alpha = alpha
    def on_epoch_begin(self, epoch, logs=None):
        lr = float(K.backend.get_value(self.model.optimizer.lr))
        if self.schedule != None:
            lr = self.schedule(epoch,lr)
        else:
            if epoch != 0:
                lr = lr*self.alpha
        K.backend.set_value(self.model.optimizer.lr,K.backend.get_value(lr))
        if self.verbose > 0:
            print(f"Current learning rate is {lr}")

def scheduler(epoch, lr):
    if epoch < 10:
        return lr
    else:
        return lr * tf.math.exp(-0.1) 

class Posenet:
    def __init__(self,dataset:datasets):
        self.dataset = dataset
        self.build()

    def build(self):
        self.poseModel = model(self.dataset.dim)
        self.poseModel.summary()
        self.optm = K.optimizers.RMSprop(1e-4,momentum=0.9) #,decay=1e-5/self.dataset.epochs
        self.decayCallback = learningDecay(schedule = None,alpha = 0.99,verbose = 1)
        decayCallbackScheduler = K.callbacks.LearningRateScheduler(scheduler)
        self.callbacks = [decayCallbackScheduler]

        try:
            print("************************loading the model weights***********************************")
            self.poseModel.load_weights("model.h5")
        except:
            pass

    def train_fit(self):
        self.poseModel.compile(optimizer=self.optm,loss=loss_fn,metrics=['accuracy'])
        self.poseModel.fit(self.dataset.traindata,
                            validation_data=self.dataset.testdata,
                            epochs=self.dataset.epochs,
                            callbacks=[self.decayCallback],
                            verbose=1)
    
    def train_gradient(self):
        for step in range(self.dataset.epochs):
            loss = 0
            val_loss = 0
            lr = float(self.optm.lr)
            tf.print(">>> [Epoch is %s/%s]"%(step,self.dataset.epochs))
            for (x1,x2),y in self.dataset.traindata:
                with tf.GradientTape() as tape:
                    prediction = self.poseModel([x1,x2])
                    # y = tf.cast(y,dtype=prediction.dtype)
                    loss = loss_fn(y,prediction)
                gradients = tape.gradient(loss,self.poseModel.trainable_variables)
                self.optm.apply_gradients(zip(gradients,self.poseModel.trainable_variables))
            # 测试
            for (x1,x2),y in self.dataset.testdata:
                prediction = self.poseModel([x1,x2])
                val_loss = loss_fn(y,prediction)
            tf.print("The loss is %s,the learning rate is : %s, test loss is %s]"%(np.array(loss),lr,val_loss))
            K.backend.set_value(self.optm.lr,K.backend.get_value(lr*0.99))

    def save_model(self):
        '''
        利用 save 函数来保存,可以保存为h5文件,也可以保存为文件夹的形式,推荐保存第二种,再使用tf2onnx转onnx
        >>> python -m tf2onnx.convert --saved-model kerasTempModel --output "model.onnx" --opset 14
        '''
        self.poseModel.save("model.h5")
        self.poseModel.save(self.dataset.model_path)
        # self.poseModel.save_weights("model.h5") #只保存权重,没有保存结构
        # tf.saved_model.save(self.poseModel,'tf2TempModel') #这种保存方式不再使用了

if __name__ == "__main__":
    dataset = datasets("images")
    posenet = Posenet(dataset)
    posenet.train_fit()
    # posenet.train_gradient() #利用 apply_gradient的方式训练
    posenet.save_model()
    

改进:

import argparse
import tensorflow as tf
import tensorflow.keras as K
import numpy as np
import cv2 as cv
import os
import time
import sys
import random
from tensorflow.keras import optimizers
from tensorflow.keras import callbacks
from tensorflow.python.keras.saving.save import save_model

class datasets:
    def __init__(self, datasetsPath:str):
        self.dataPath = datasetsPath
        self.dim_w = 512
        self.dim_h = 512
        self.epochs = 200
        self.batch_size = 8
        self.train_percent = 0.92
        self.learning_rate = 2e-4
        self.model_path = 'kerasTempModel/'
        self.posetxt = os.path.join(self.dataPath,'pose.txt') 

        self.GetTheImagesAndPose()
        self.buildTrainData()

    def GetTheImagesAndPose(self):
        self.poselist = []
        with open(self.posetxt,'r') as f:
            for line in f.readlines():
                line = line.strip()
                line = line.split(' ')
                line.remove(line[0])
                self.poselist.append(line)
                # im1 im2 tx,ty,tz,roll,pitch,yaw
        #打乱数据集
        length = np.shape(self.poselist)[0]
        train_num =int(length * self.train_percent) 
        test_num = length - train_num
        randomPoses = np.array(random.sample(self.poselist,length)) #取出所有数据集
        self.train_pose_list = randomPoses[0:train_num,:]
        self.test_pose_list = randomPoses[train_num:length+1,:]
        print(f"The size of train pose list is : {np.shape(self.train_pose_list)[0]}")
        print(f"The size of test pose list is : {np.shape(self.test_pose_list)[0]}")

    def load_image(self,index:tf.Tensor):

        img_ref= img_ref = tf.io.read_file(index[0])
        img_ref = tf.image.decode_jpeg(img_ref) #此处为jpeg格式
        #img = tf.reshape(img,[self.dim,self.dim,3])
        img_ref = tf.image.resize(img_ref,(self.dim_w,self.dim_h))/255.0
        img_ref = tf.cast(img_ref,tf.float32)

        img_cur = img_cur = tf.io.read_file(index[1])
        img_cur = tf.image.decode_jpeg(img_cur) #此处为jpeg格式
        img_cur = tf.image.resize(img_cur,(self.dim_w,self.dim_h))/255.0
        #img = tf.reshape(img,[self.dim,self.dim,3])
        img_cur = tf.cast(img_cur,tf.float32)
        pose = tf.strings.to_number(index[2:8],tf.float32)
        return (img_ref,img_cur),(pose)

    def buildTrainData(self):
        '''
        for example:\\
        >>> poses = dataset.y_train.take(20)\\
        >>> imgs = dataset.x1_train.take(40)\\
        >>> print(np.array(list(imgs.as_numpy_iterator()))[39]) \\
        >>> imgs = dataset.x2_train.take(40)\\
        >>> print(np.array(list(imgs.as_numpy_iterator()))[39]) \\
        >>> print(np.array(list(poses.as_numpy_iterator()))[19]) \\
        '''
        self.traindata = tf.data.Dataset.from_tensor_slices(self.train_pose_list) \
           .map(self.load_image, num_parallel_calls=tf.data.experimental.AUTOTUNE) \
           .shuffle(500)\
           .repeat(10)\
           .batch(self.batch_size) \
           .prefetch(tf.data.experimental.AUTOTUNE)#.cache() 
        self.testdata = tf.data.Dataset.from_tensor_slices(self.test_pose_list) \
           .map(self.load_image, num_parallel_calls=tf.data.experimental.AUTOTUNE) \
           .shuffle(500)\
           .repeat(10)\
           .batch(self.batch_size) \
           .prefetch(tf.data.experimental.AUTOTUNE)
#-------resnet 34-------------
def conv_block(inputs, 
        neuron_num, 
        kernel_size,  
        use_bias, 
        padding= 'same',
        strides= (1, 1),
        with_conv_short_cut = False):
    conv1 = K.layers.Conv2D(
        neuron_num,
        kernel_size = kernel_size,
        activation= 'relu',
        strides= strides,
        use_bias= use_bias,
        padding= padding
    )(inputs)
    conv1 = K.layers.BatchNormalization(axis = 1)(conv1)

    conv2 = K.layers.Conv2D(
        neuron_num,
        kernel_size= kernel_size,
        activation= 'relu',
        use_bias= use_bias,
        padding= padding)(conv1)
    conv2 = K.layers.BatchNormalization(axis = 1)(conv2)

    if with_conv_short_cut:
        inputs = K.layers.Conv2D(
            neuron_num, 
            kernel_size= kernel_size,
            strides= strides,
            use_bias= use_bias,
            padding= padding
            )(inputs)
        return K.layers.add([inputs, conv2])

    else:
        return K.layers.add([inputs, conv2])

def ResNet34(inputs,namescope = ""):
    x = K.layers.ZeroPadding2D((3, 3))(inputs)

    # Define the converlutional block 1
    x = K.layers.Conv2D(64, kernel_size= (7, 7), strides= (2, 2), padding= 'valid')(x)
    x = K.layers.BatchNormalization(axis= 1)(x)
    x = K.layers.MaxPooling2D(pool_size= (3, 3), strides= (2, 2), padding= 'same')(x)

    # Define the converlutional block 2
    x = conv_block(x, neuron_num= 64, kernel_size= (3, 3), use_bias= True)
    x = conv_block(x, neuron_num= 64, kernel_size= (3, 3), use_bias= True)
    x = conv_block(x, neuron_num= 64, kernel_size= (3, 3), use_bias= True)

    # Define the converlutional block 3
    x = conv_block(x, neuron_num= 128, kernel_size= (3, 3), use_bias= True, strides= (2, 2), with_conv_short_cut= True)
    x = conv_block(x, neuron_num= 128, kernel_size= (3, 3), use_bias= True)
    x = conv_block(x, neuron_num= 128, kernel_size= (3, 3), use_bias= True)

    # Define the converlutional block 4
    x = conv_block(x, neuron_num= 256, kernel_size= (3, 3), use_bias= True, strides= (2, 2), with_conv_short_cut= True)
    x = conv_block(x, neuron_num= 256, kernel_size= (3, 3), use_bias= True)
    x = conv_block(x, neuron_num= 256, kernel_size= (3, 3), use_bias= True)
    x = conv_block(x, neuron_num= 256, kernel_size= (3, 3), use_bias= True)
    x = conv_block(x, neuron_num= 256, kernel_size= (3, 3), use_bias= True)
    x = conv_block(x, neuron_num= 256, kernel_size= (3, 3), use_bias= True)

    # Define the converltional block 5
    x = conv_block(x, neuron_num= 512, kernel_size= (3, 3), use_bias= True, strides= (2, 2), with_conv_short_cut= True)
    x = conv_block(x, neuron_num= 512, kernel_size= (3, 3), use_bias= True)
    x = conv_block(x, neuron_num= 512, kernel_size= (3, 3), use_bias= True)
    x = K.layers.AveragePooling2D(pool_size=(7, 7))(x)
    return x


def model(dim_w,dim_h):
    First = K.layers.Input(shape=(dim_w,dim_h,3),name="input1")
    Second = K.layers.Input(shape=(dim_w,dim_h,3),name="input2")

    # x1 = K.layers.MaxPool2D(pool_size=(2,2),strides=2)(First)
    x1 = K.layers.Conv2D(128,kernel_size=(3,3), strides=2,padding='same')(First)
    x1 = K.layers.BatchNormalization()(x1)
    x1 = K.layers.LeakyReLU()(x1)
    # x1 = K.layers.Conv2D(256,kernel_size=(3,3), strides=2,padding='same')(x1)
    # x1 = K.layers.BatchNormalization()(x1)
    # x1 = K.layers.ReLU()(x1)
    x1 = ResNet34(x1,"x1")

    # x2 = K.layers.MaxPool2D(pool_size=(2,2),strides=2)(Second)
    x2 = K.layers.Conv2D(128,kernel_size=(3,3), strides=2,padding='same')(Second)
    x2 = K.layers.BatchNormalization()(x2)
    x2 = K.layers.LeakyReLU()(x2)
    # x2 = K.layers.Conv2D(256,kernel_size=(3,3), strides=2,padding='same')(x2)
    # x2 = K.layers.BatchNormalization()(x2)
    # x2 = K.layers.ReLU()(x2)
    x2 = ResNet34(x2,"x2")

    x = K.layers.concatenate([x1,x2])

    x = K.layers.Flatten()(x)
    x = K.layers.Dense(6,name='Output')(x)
    poseModel = K.Model([First,Second],x)

    return poseModel

def loss_fn(y_true,y_pre):
    loss_value_translation = K.backend.square(y_true[-1,0:3]-y_pre[-1,0:3])
    loss_value_rotation = 1/5.7*K.backend.square(y_true[-1,3:6]-y_pre[-1,3:6])
    loss_value = K.backend.mean(loss_value_translation + loss_value_rotation)

    # loss_value = K.backend.mean(K.backend.square(y_true-y_pre))
    # tf.print(y_pre)
    return loss_value


class learningDecay(K.callbacks.Callback):
    def __init__(self,schedule=None,alpha=1,verbose=0):
        super().__init__()
        self.schedule = schedule
        self.verbose = verbose
        self.alpha = alpha
    def on_epoch_begin(self, epoch, logs=None):
        lr = float(K.backend.get_value(self.model.optimizer.lr))
        if self.schedule != None:
            lr = self.schedule(epoch,lr)
        else:
            if epoch >= 30:
                lr = lr*self.alpha
        K.backend.set_value(self.model.optimizer.lr,K.backend.get_value(lr))
        if self.verbose > 0:
            print(f"Current learning rate is {lr}")
        #save the model
        if epoch % 20 == 0 and epoch != 0:
            self.model.save("model.h5")

def scheduler(epoch, lr):
    if epoch < 10:
        return lr
    else:
        return lr * tf.math.exp(-0.1) 

class Posenet:
    def __init__(self,dataset:datasets):
        self.dataset = dataset
        self.build()

    def build(self):
        self.poseModel = model(self.dataset.dim_w,self.dataset.dim_h)
        self.poseModel.summary()
        self.optm = K.optimizers.RMSprop(1e-4,momentum=0.9) #,decay=1e-5/self.dataset.epochs
        # self.optm = K.optimizers.Adam(1e-4)
        self.decayCallback = learningDecay(schedule = None,alpha = 0.99,verbose = 1)
        decayCallbackScheduler = K.callbacks.LearningRateScheduler(scheduler)
        self.callbacks = [decayCallbackScheduler]

        try:
            print("************************loading the model weights***********************************")
            self.poseModel.load_weights("model.h5")
        except:
            pass

    def train_fit(self):
        self.poseModel.compile(optimizer=self.optm,loss=loss_fn,metrics=['accuracy'])
        self.poseModel.fit(self.dataset.traindata,
                            validation_data=self.dataset.testdata,
                            epochs=self.dataset.epochs,
                            callbacks=[self.decayCallback],
                            verbose=1)
    
    def train_gradient(self):
        for step in range(self.dataset.epochs):
            loss = 0
            val_loss = 0
            index = 0
            lr = float(self.optm.lr)
            tf.print(">>> [Epoch is %s/%s]"%(step,self.dataset.epochs))
            for (x1,x2),y in self.dataset.traindata:
                with tf.GradientTape() as tape:
                    prediction = self.poseModel([x1,x2])
                    # y = tf.cast(y,dtype=prediction.dtype)
                    loss = loss + loss_fn(y,prediction)
                gradients = tape.gradient(loss,self.poseModel.trainable_variables)
                self.optm.apply_gradients(zip(gradients,self.poseModel.trainable_variables))

                index = index + 1
                sys.stdout.write('--------train loss is %.5f-----'%(loss/float(index)))
                sys.stdout.write('\r')
                sys.stdout.flush()

            index_val = 0
            # 测试
            for (x1,x2),y in self.dataset.testdata:
                prediction = self.poseModel([x1,x2])
                val_loss = val_loss + loss_fn(y,prediction)
                index_val = index_val + 1
            tf.print("The loss is %s,the learning rate is : %s, test loss is %s]"%(np.array(loss/float(index)),lr,val_loss/float(index_val)))
            K.backend.set_value(self.optm.lr,K.backend.get_value(lr*0.99))
            if step%40==0:
                self.save_model()

    def save_model(self):
        '''
        利用 save 函数来保存,可以保存为h5文件,也可以保存为文件夹的形式,推荐保存第二种,再使用tf2onnx转onnx
        >>> python -m tf2onnx.convert --saved-model kerasTempModel --output "model.onnx" --opset 14
        '''
        self.poseModel.save("model.h5")
        self.poseModel.save(self.dataset.model_path)
        # self.poseModel.save_weights("model.h5") #只保存权重,没有保存结构
        # tf.saved_model.save(self.poseModel,'tf2TempModel') #这种保存方式不再使用了

        
def test(dataset):
    im1 = cv.imread("imagesNDI/0.jpg")
    im1 = cv.resize(im1,(512,512))
    im1 = np.array(im1,np.float).reshape((1,512,512,3))/255.0
    im2 = cv.imread("imagesNDI/1.jpg")
    im2 = cv.resize(im2,(512,512))
    im2 = np.array(im2,np.float).reshape((1,512,512,3))/255.0
    posemodel = K.models.load_model(dataset.model_path,compile=False)
    pose = posemodel([im1,im2])
    print(np.array(pose))

if __name__ == "__main__":
    dataset = datasets("images")
    posenet = Posenet(dataset)
    posenet.train_fit()
    # posenet.train_gradient() #利用 apply_gradient的方式训练
    posenet.save_model()
    test(dataset)
    

六、运行结果

将训练的模型配置到C++中运行:

利用pnp的方式运行的结果

处理速度:43ms

利用深度学习位姿估计的结果

处理速度:11ms

因篇幅问题不能全部显示,请点此查看更多更全内容

Top