专注于工业智能预警系统研发, 通过机理算法和数据驱动算法分析振动信号、音频、DCS、PLC信号、SCADA信号等设备运行状态数据对机器设备进行看病预诊,为机器设备健康运行保驾护航。 网站正在不断建设和完善过程中,欢迎大家给予建议和参与社区建设
52phm,专注于预测性维护知识学习和交流,欢迎广大从事预测性维护行业人员投稿,投稿请联系管理员(wx: www52phmcn),投稿内容可以是:
本文以CWRU轴承故障的振动信号数据库作为模型的训练集和测试集
并根据现有论文的思路和模型框架,用pytorch复现了论文的模型结构和性能,在二分类问题中准确率高达100%
本文在理论方面不再过多赘述,详细可看博主之前的博客或观看论文原文
数据连接:https://csegroups.case.edu/bearingdatacenter/pages/download-data-file
论文链接:https://www.sci-hub.ren/10.1109/tie.2017.2774777
代码链接:https://github.com/XD-onmyway/cnn_for_fault_diagnosis
博客链接:https://blog.csdn.net/weixin_42036144/article/details/110780890
原始数据是连续的一维时序数据,文章为了利用二维CNN的特征提取和降噪能力,将一维数据堆叠成二维数据
数据处理好过后,给数据打上标签,并转出成文件,方便模型读取
用pytorch构建模型,读取数据,训练模型,最后测试
generate.py
# normal
normal_0 = io.loadmat("./data/normal/normal_0")["X097_DE_time"].tolist()
normal_1 = io.loadmat("./data/normal/normal_1")["X098_DE_time"].tolist()
normal_2 = io.loadmat("./data/normal/normal_2")["X099_DE_time"].tolist()
normal_3 = io.loadmat("./data/normal/normal_3")["X100_DE_time"].tolist()
normal = [normal_0, normal_1, normal_2, normal_3]
# all_data
all_data = [
normal,
ball_18,
ball_36,
ball_54,
inner_18,
inner_36,
inner_54,
outer_18,
outer_36,
outer_54,
]
加载mat文件中特定的表,并转换成一维数组,存入数据库,此处用二分类为例
normal数据标签为0,其余数据标签均为1,打上标签
# 二类
if data_type == 0:
the_type = 0
else:
the_type = 1
每份数据都是64×64的图片,需要用到4096个一维数据点
load_data = data[load_type]
max_start = len(load_data) - 4096
starts = []
for i in range(500):
# 随机一个start,不在starts里,就加入
while True:
start = random.randint(0, max_start)
if start not in starts:
starts.append(start)
break
# 将4096个数据点转化成64×64的二维图
temp = load_data[start : start 4096]
temp = np.array(temp)
train_pics.append(temp.reshape(64, 64))
train_labels.append(the_type)
生成测试集:
用max_start存储最大起始取值点,starts保存用过的起始点,避免数据重复
获取一个起始点,从起始点往后取4096个数据点,把数据转化成64×64的二维图片
存入图片,存入标签
for i in range(100):
while True:
start = random.randint(0, max_start)
if start not in starts:
starts.append(start)
break
temp = load_data[start : start 4096]
temp = np.array(temp)
test_pics.append(temp.reshape(64, 64))
test_labels.append(the_type)
测试集生成原理类似
109cnn.py
用GPU跑代码的时候需要加上下面一行,并且在读取模型和数据的时候加上.cuda(),本文以CPU为例
os.environ["CUDA_VISIBLE_DEVICES"] = "2"
数字2代表,使用编号为2的GPU
根据需求构造模型,设置卷积层,池化层,激活函数和全连接层
pytorch具体使用可到官网学习:https://pytorch.org/tutorials/beginner/deep_learning_60min_blitz.html
class Net(nn.Module):
def __init__(self):
super(Net, self).__init__()
self.conv1 = nn.Conv2d(1, 32, 5, padding=2)
self.conv2 = nn.Conv2d(32, 64, 3, padding=1)
self.conv3 = nn.Conv2d(64, 128, 3, padding=1)
self.conv4 = nn.Conv2d(128, 256, 3, padding=1)
self.pool = nn.MaxPool2d(2)
self.fc1 = nn.Linear(4 * 4 * 256, 2560)
self.fc2 = nn.Linear(2560, 2)
def forward(self, x):
x = self.pool(F.relu(self.conv1(x)))
x = self.pool(F.relu(self.conv2(x)))
x = self.pool(F.relu(self.conv3(x)))
x = self.pool(F.relu(self.conv4(x)))
x = x.view(-1, 4 * 4 * 256)
x = F.relu(self.fc1(x))
x = self.fc2(x)
return x
读取模型和数据,设置损失函数和优化方法
第一次跑的时候,需要把net.load_stat_dict()注释掉,因为此时没有cnn_net.pth文件
PATH = "cnn_net.pth"
net = Net()
net.load_state_dict(torch.load(PATH, map_location="cpu"))
# net = Net().cuda()
# net.load_state_dict(torch.load(PATH))
print("load success")
criterion = nn.CrossEntropyLoss()
optimizer = optim.SGD(net.parameters(), lr=0.01, momentum=0.9)
train_pics_dict = np.load("train_pics.npz")
train_labels_dict = np.load("train_labels.npz")
test_pics_dict = np.load("test_pics.npz")
test_labels_dict = np.load("test_labels.npz")
转化成list
train_pics = []
train_labels = []
test_pics = []
test_labels = []
for i in train_pics_dict.files:
train_pics.append(train_pics_dict[i])
train_labels.append(int(train_labels_dict[i]))
for i in test_pics_dict.files:
test_pics.append(test_pics_dict[i])
test_labels.append(int(test_labels_dict[i]))
自定义dataset,制作数据库必不可少的一步
init:初始化数据集
getitem:返回编号index的数据
len:返回数据集总长度
class MyData(Dataset):
def __init__(self, pics, labels):
self.pics = pics
self.labels = labels
# print(len(self.pics.files))
# print(len(self.labels.files))
def __getitem__(self, index):
# print(index)
# print(len(self.pics))
assert index < len(self.pics)
return torch.Tensor([self.pics[index]]), self.labels[index]
def __len__(self):
return len(self.pics)
用loader装载数据集
trainset = MyData(train_pics, train_labels)
trainloader = torch.utils.data.DataLoader(
trainset, batch_size=4, shuffle=True, num_workers=2
)
testset = MyData(test_pics, test_labels)
testloader = torch.utils.data.DataLoader(
testset, batch_size=4, shuffle=True, num_workers=2
)
传入数据,得到输出,根据公式计算损失值并输出,模型根据梯度和策略学习,保存模型参数到文件
running_loss = 0
for i, data in enumerate(trainloader):
inputs, labels = data
# inputs = inputs.cuda()
# labels = labels.cuda()
outputs = net(inputs)
loss = criterion(outputs, labels)
optimizer.zero_grad()
loss.backward()
optimizer.step()
running_loss = loss
if i % 2000 == 1999:
print(
"epoch:",
epoch,
"[",
i - 1999,
":",
i,
"] loss:",
running_loss.item() / 2000,
)
running_loss = 0
PATH = "cnn_net.pth"
torch.save(net.state_dict(), PATH)
print("save success")
测试的时候,将模型输出和实际标签逐个比对,记录正确的总数,最后输出测试结果
# test
correct = 0
total = 0
with torch.no_grad():
for inputs, labels in testloader:
# inputs = inputs.cuda()
# labels = labels.cuda()
outputs = net(inputs)
_, predicts = torch.max(outputs, 1)
total = 4
correct = (predicts == labels).sum().item()
print(correct / total * 100)
多次训练模型,可以修改训练代码里的epoch,每次跑代码多跑几次数据
# train
for epoch in range(10):
经过反复训练,模型训练完成,二分类问题下准确率达到100%
load success
100.0
generate.py
# import os
import torch
import torchvision
import torchvision.transforms as transforms
import matplotlib.pyplot as plt
import numpy as np
import sys
import torch.nn as nn
import torch.nn.functional as F
import torch.optim as optim
from PIL import Image
from torch.utils.data import Dataset
import scipy.io as io
import random
from datetime import datetime
random.seed(datetime.now())
# ball_18
ball_18_0 = io.loadmat("./data/ball_18/ball_18_0")["X118_DE_time"].tolist()
ball_18_1 = io.loadmat("./data/ball_18/ball_18_1")["X119_DE_time"].tolist()
ball_18_2 = io.loadmat("./data/ball_18/ball_18_2")["X120_DE_time"].tolist()
ball_18_3 = io.loadmat("./data/ball_18/ball_18_3")["X121_DE_time"].tolist()
ball_18 = [ball_18_0, ball_18_1, ball_18_2, ball_18_3]
# ball_36
ball_36_0 = io.loadmat("./data/ball_36/ball_36_0")["X185_DE_time"].tolist()
ball_36_1 = io.loadmat("./data/ball_36/ball_36_1")["X186_DE_time"].tolist()
ball_36_2 = io.loadmat("./data/ball_36/ball_36_2")["X187_DE_time"].tolist()
ball_36_3 = io.loadmat("./data/ball_36/ball_36_3")["X188_DE_time"].tolist()
ball_36 = [ball_36_0, ball_36_1, ball_36_2, ball_36_3]
# ball_54
ball_54_0 = io.loadmat("./data/ball_54/ball_54_0")["X222_DE_time"].tolist()
ball_54_1 = io.loadmat("./data/ball_54/ball_54_1")["X223_DE_time"].tolist()
ball_54_2 = io.loadmat("./data/ball_54/ball_54_2")["X224_DE_time"].tolist()
ball_54_3 = io.loadmat("./data/ball_54/ball_54_3")["X225_DE_time"].tolist()
ball_54 = [ball_54_0, ball_54_1, ball_54_2, ball_54_3]
# inner_18
inner_18_0 = io.loadmat("./data/inner_18/inner_18_0")["X105_DE_time"].tolist()
inner_18_1 = io.loadmat("./data/inner_18/inner_18_1")["X106_DE_time"].tolist()
inner_18_2 = io.loadmat("./data/inner_18/inner_18_2")["X107_DE_time"].tolist()
inner_18_3 = io.loadmat("./data/inner_18/inner_18_3")["X108_DE_time"].tolist()
inner_18 = [inner_18_0, inner_18_1, inner_18_2, inner_18_3]
# inner_36
inner_36_0 = io.loadmat("./data/inner_36/inner_36_0")["X169_DE_time"].tolist()
inner_36_1 = io.loadmat("./data/inner_36/inner_36_1")["X170_DE_time"].tolist()
inner_36_2 = io.loadmat("./data/inner_36/inner_36_2")["X171_DE_time"].tolist()
inner_36_3 = io.loadmat("./data/inner_36/inner_36_3")["X172_DE_time"].tolist()
inner_36 = [inner_36_0, inner_36_1, inner_36_2, inner_36_3]
# inner_54
inner_54_0 = io.loadmat("./data/inner_54/inner_54_0")["X209_DE_time"].tolist()
inner_54_1 = io.loadmat("./data/inner_54/inner_54_1")["X210_DE_time"].tolist()
inner_54_2 = io.loadmat("./data/inner_54/inner_54_2")["X211_DE_time"].tolist()
inner_54_3 = io.loadmat("./data/inner_54/inner_54_3")["X212_DE_time"].tolist()
inner_54 = [inner_54_0, inner_54_1, inner_54_2, inner_54_3]
# outer_18
outer_18_0 = io.loadmat("./data/outer_18/outer_18_0")["X130_DE_time"].tolist()
outer_18_1 = io.loadmat("./data/outer_18/outer_18_1")["X131_DE_time"].tolist()
outer_18_2 = io.loadmat("./data/outer_18/outer_18_2")["X132_DE_time"].tolist()
outer_18_3 = io.loadmat("./data/outer_18/outer_18_3")["X133_DE_time"].tolist()
outer_18 = [outer_18_0, outer_18_1, outer_18_2, outer_18_3]
# outer_36
outer_36_0 = io.loadmat("./data/outer_36/outer_36_0")["X197_DE_time"].tolist()
outer_36_1 = io.loadmat("./data/outer_36/outer_36_1")["X198_DE_time"].tolist()
outer_36_2 = io.loadmat("./data/outer_36/outer_36_2")["X199_DE_time"].tolist()
outer_36_3 = io.loadmat("./data/outer_36/outer_36_3")["X200_DE_time"].tolist()
outer_36 = [outer_36_0, outer_36_1, outer_36_2, outer_36_3]
# outer_54
outer_54_0 = io.loadmat("./data/outer_54/outer_54_0")["X234_DE_time"].tolist()
outer_54_1 = io.loadmat("./data/outer_54/outer_54_1")["X235_DE_time"].tolist()
outer_54_2 = io.loadmat("./data/outer_54/outer_54_2")["X236_DE_time"].tolist()
outer_54_3 = io.loadmat("./data/outer_54/outer_54_3")["X237_DE_time"].tolist()
outer_54 = [outer_54_0, outer_54_1, outer_54_2, outer_54_3]
# normal
normal_0 = io.loadmat("./data/normal/normal_0")["X097_DE_time"].tolist()
normal_1 = io.loadmat("./data/normal/normal_1")["X098_DE_time"].tolist()
normal_2 = io.loadmat("./data/normal/normal_2")["X099_DE_time"].tolist()
normal_3 = io.loadmat("./data/normal/normal_3")["X100_DE_time"].tolist()
normal = [normal_0, normal_1, normal_2, normal_3]
# all_data
all_data = [
normal,
ball_18,
ball_36,
ball_54,
inner_18,
inner_36,
inner_54,
outer_18,
outer_36,
outer_54,
]
print(len(all_data))
def main(argv=None):
classes = (
"normal",
"ball_18",
"ball_36",
"ball_54",
"inner_18",
"inner_36",
"inner_54",
"outer_18",
"outer_36",
"outer_54",
)
# classes = ("normal", "error")
train_pics = []
train_labels = []
test_pics = []
test_labels = []
for data_type in range(10):
# 二类
if data_type == 0:
the_type = 0
else:
the_type = 1
# 四类
# the_type = (data_type 2) // 3
# 十类
the_type = data_type
data = all_data[data_type]
for load_type in range(4):
load_data = data[load_type]
max_start = len(load_data) - 4096
starts = []
for i in range(500):
# 随机一个start,不在starts里,就加入
while True:
start = random.randint(0, max_start)
if start not in starts:
starts.append(start)
break
# 将4096个数据点转化成64×64的二维图
temp = load_data[start : start 4096]
temp = np.array(temp)
train_pics.append(temp.reshape(64, 64))
train_labels.append(the_type)
for i in range(100):
while True:
start = random.randint(0, max_start)
if start not in starts:
starts.append(start)
break
temp = load_data[start : start 4096]
temp = np.array(temp)
test_pics.append(temp.reshape(64, 64))
test_labels.append(the_type)
print("train_pics", len(train_pics))
print("train_labels", len(train_labels))
print("test_pics", len(test_pics))
print("test_labels", len(test_labels))
np.savez("train_pics", *train_pics)
np.savez("train_labels", *train_labels)
np.savez("test_pics", *test_pics)
np.savez("test_labels", *test_labels)
print("save success")
if __name__ == "__main__":
sys.exit(main())
cnn.py
import os
import torch
import torchvision
import torchvision.transforms as transforms
import matplotlib.pyplot as plt
import numpy as np
import sys
import torch.nn as nn
import torch.nn.functional as F
import torch.optim as optim
from PIL import Image
from torch.utils.data import Dataset
import scipy.io as io
import random
from datetime import datetime
os.environ["CUDA_VISIBLE_DEVICES"] = "2"
device = torch.device("cuda" if torch.cuda.is_available() else "cpu")
class Net(nn.Module):
def __init__(self):
super(Net, self).__init__()
self.conv1 = nn.Conv2d(1, 32, 5, padding=2)
self.conv2 = nn.Conv2d(32, 64, 3, padding=1)
self.conv3 = nn.Conv2d(64, 128, 3, padding=1)
self.conv4 = nn.Conv2d(128, 256, 3, padding=1)
self.pool = nn.MaxPool2d(2)
self.fc1 = nn.Linear(4 * 4 * 256, 2560)
self.fc2 = nn.Linear(2560, 2)
# self.fc2 = nn.Linear(2560, 4)
def forward(self, x):
# print(x.size())
x = self.pool(F.relu(self.conv1(x)))
# print(x.size())
x = self.pool(F.relu(self.conv2(x)))
# print(x.size())
x = self.pool(F.relu(self.conv3(x)))
# print(x.size())
x = self.pool(F.relu(self.conv4(x)))
# print(x.size())
x = x.view(-1, 4 * 4 * 256)
# print(x.size())
x = F.relu(self.fc1(x))
# print(x.size())
x = self.fc2(x)
# print(x.size())
return x
PATH = "cnn_net.pth"
net = Net()
net.load_state_dict(torch.load(PATH, map_location="cpu"))
# net = Net().to(device)
# net.load_state_dict(torch.load(PATH))
print("load success")
criterion = nn.CrossEntropyLoss()
optimizer = optim.SGD(net.parameters(), lr=0.01, momentum=0.9)
train_pics_dict = np.load("train_pics.npz")
train_labels_dict = np.load("train_labels.npz")
test_pics_dict = np.load("test_pics.npz")
test_labels_dict = np.load("test_labels.npz")
# print(test_labels_dict["arr_" str(3000)])
train_pics = []
train_labels = []
test_pics = []
test_labels = []
# for i in train_pics_dict.files:
# train_pics.append(train_pics_dict[i])
# train_labels.append(int(train_labels_dict[i]))
for i in test_pics_dict.files:
test_pics.append(test_pics_dict[i])
test_labels.append(int(test_labels_dict[i]))
# print(test_labels)
class MyData(Dataset):
def __init__(self, pics, labels):
self.pics = pics
self.labels = labels
# print(len(self.pics.files))
# print(len(self.labels.files))
def __getitem__(self, index):
# print(index)
# print(len(self.pics))
assert index < len(self.pics)
return torch.Tensor([self.pics[index]]), self.labels[index]
def __len__(self):
return len(self.pics)
def get_tensors(self):
return torch.Tensor([self.pics]), torch.Tensor(self.labels)
def main(argv=None):
# classes = (
# "normal",
# "ball_18",
# "ball_36",
# "ball_54",
# "inner_18",
# "inner_36",
# "inner_54",
# "outer_18",
# "outer_36",
# "outer_54",
# )
classes = ["normal", "error"]
# classes = ["normal", "ball", "inner", "outer"]
# 加载训练数据库
# trainset = MyData(train_pics, train_labels)
# trainloader = torch.utils.data.DataLoader(
# trainset, batch_size=4, shuffle=True, num_workers=2
# )
testset = MyData(test_pics, test_labels)
testloader = torch.utils.data.DataLoader(
testset, batch_size=4, shuffle=True, num_workers=2
)
# train
# for epoch in range(10):
# running_loss = 0
# for i, data in enumerate(trainloader):
# inputs, labels = data
# inputs = inputs.cuda()
# labels = labels.cuda()
# outputs = net(inputs)
# loss = criterion(outputs, labels)
# optimizer.zero_grad()
# loss.backward()
# optimizer.step()
# running_loss = loss
# if i % 2000 == 1999:
# print(
# "epoch:",
# epoch,
# "[",
# i - 1999,
# ":",
# i,
# "] loss:",
# running_loss.item() / 2000,
# )
# running_loss = 0
# PATH = "cnn_net.pth"
# torch.save(net.state_dict(), PATH)
# print("save success")
# test
correct = 0
total = 0
with torch.no_grad():
for inputs, labels in testloader:
# inputs = inputs.cuda()
# labels = labels.cuda()
outputs = net(inputs)
_, predicts = torch.max(outputs, 1)
total = 4
correct = (predicts == labels).sum().item()
print(correct / total * 100)
if __name__ == "__main__":
sys.exit(main())
版权声明:遵循 CC 4.0 BY-SA 版权协议原文链接:https://blog.csdn.net/weixin_42036144/article/details/116720550
2021-12-09 14:46:03
互联网
1949
分类:算法开发
专栏:故障诊断
2021-12-25 19:48:44
互联网
842
分类:算法开发
专栏:时间序列预测
2022-02-09 18:19:44
博客笔记
4150
分类:边缘感知
专栏:未分组
关注公众号进群
让志同道合读者学习交流
电机状态预测数据集
2021-12-09 11:26:41
博客笔记
7199
分类:边缘感知
专栏:故障诊断
机电设备故障诊断
2021-12-09 11:27:13
博客笔记
5953
分类:边缘感知
专栏:故障诊断
1.故障诊断方法可分为三个步骤:信号处理、特征提取、模式分类。2.信号处理方法通常包括:时域处理(提取振动信号的相关指标);频域处理(包络谱分析,频谱分析);时频域分析(小波分析,傅里叶变换)3.故障诊断方法:专家系统故障诊断法,模糊故障诊断、灰色关联度故障诊断、神经网络故障诊断、数据融合故障诊断。...
2021-12-09 11:32:32
互联网
1893
分类:算法开发
专栏:故障诊断
本文主要介绍如何使用python搭建:一个基于深度学习的滚动轴承故障诊断系统项目中涉及使用了多种方法对比检测结果,包括:传统机器学习方法:随机森林深度学习方法:CNN增加残差模块后的深度学习方法:ResNet如各位童鞋需要更换训练数据,完全可以根据源码将图像和标注文件更换即可直接运行。
2021-12-10 18:15:00
互联网
1174
分类:算法开发
专栏:故障诊断
在设备的故障检测中,有约30%-40%的设备故障是由轴承故障引起的,因此本文将列举有关检测轴承故障使用到的相关数据集,模型和算法。数据集现有的数据集,普遍由固定在电机马达上的两个震动检测器获得,并根据需要,分离震动数据在时域和频域上的特征以供网络模型学习。不同的数据集,区别在于,检测的马达转速不同,环境不同,取样频率不同,一段样本的时长不同等等(1)Case Western Reserve University (CWRU) Dataset该数据集拥有多种数据,测量的时候,通过改变轴承.
2021-12-13 13:17:35
互联网
2935
分类:算法开发
专栏:故障诊断
1. 故障诊断概念故障诊断主要研究如何对系统中出现的故障进行检测、分离和辨识 , 即判断故障是否发生 , 定位故障发生的部位和种类 , 以及确定故障的大小和发生的时间等 。2. 故障诊断方法故障诊断防范可分为定性分析和定量分析两大类 , 如图 1 所示。 其中 , 定量分析方法又分为基于解析模型的方法和数据驱动的方法 , 后者又进一步包括机器学习类方法、多元统计分析类方法、信号处理类方...
2021-12-13 14:48:01
互联网
2029
分类:算法开发
专栏:故障诊断
时间序列预测8种常用方法简介,包括朴素预测法、简单平均法、移动平均法、简单指数平滑法、霍尔特(Holt)线性趋势法、Holt-Winter方法、AMRIA。
2021-12-25 19:46:21
互联网
862
分类:算法开发
专栏:时间序列预测
本数据集用于检测疟疾的细胞图像,细胞图像分为两类: 一类是感染的细胞,另一类是未感染的细胞目标需求1:要基于卷积神经网络CNN来识别哪些细胞已经感染、哪些细胞还未感染 目标需求2:可视化模型随着迭代次数的训练集与测试集损失值的变化情况 目标需求3:可视化模型随着迭代次数的训练集与测试集准确率的变化情况
2022-03-31 21:45:25
博客笔记
1104
分类:算法开发
专栏:未分组
故障诊断之基于振动信号的阶比谱分析
2022-05-31 11:08:40
互联网
1569
分类:算法开发
专栏:振动信号预处理
该试验台在不同小齿轮条件下进行测试,并通过加速度计进行齿轮故障数据振动信号采集,加速度计采样率为10KHz、采样时长为10s,采样数据共3包,每一包数据对应着不同故障类型,分别是健康状态、齿轮断齿、齿轮磨损状态下的数据集。该数据集被授权于用于任何学术和研究目的。...
2022-08-09 16:05:36
博客笔记
543
分类:边缘感知
专栏:齿轮箱数据集
从事设备故障预测与健康管理行业多年的PHM算法工程师(机器医生)、国际振动分析师, 实践、研发和交付的项目涉及“化工、工业机器人、风电机组、钢铁、核电、机床、机器视觉”等领域。专注于工业智能预警系统研发, 通过机理算法和数据驱动算法分析振动信号、音频、DCS、PLC信号、SCADA信号等设备运行状态数据对机器设备进行看病预诊,为机器设备健康运行保驾护航。