Qbot构史纪实

7.10

前情提要——前段时间无聊学着白哥手搓了个Qbot,只实现了关键词回复和违禁词自动禁言两个功能,词库热更新倒是也写了,但是有bug没修,也就没更新,但无疑写的都是史,所以在它堆成史山之前,我要把它重构一哈

简称——构史

赤石传送门:

1
https://github.com/Qingjiu233/Qbot-based-on-Python

首先把Z哥emoji-bot的mian.py给copy下来,然后变成我的形状:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
import time
import json
import threading

import websocket

import messagecheck
import word_split


api_url = 'ws://127.0.0.1:3001'
bot_id = '1940975548'
auth_token = ''


def on_message(ws, message):
message = json.loads(message)
is_heart_beat = messagecheck.filter_heart_beat(message)
if not is_heart_beat:

messagecheck.check(message)

else:
pass


def on_error(ws, error):
print(f'WebSocket 错误: {error}')


def on_close(ws, close_status):
print(f'WebSocket 连接已关闭: {close_status}')


def on_open(ws):
def run():
while True:
time.sleep(30) # 保持连接

thread = threading.Thread(target=run)
thread.start()


if __name__ == '__main__':
header = {
'Authorization': f'Bearer {auth_token}',
'bot_id': bot_id
}
ws = websocket.WebSocketApp(api_url,
header=header,
on_message=on_message,
on_error=on_error,
on_close=on_close)
ws.on_open = on_open
ws.run_forever()

然后首当其冲的是我那个不遵守命名规则的messagecheck——最早写的,没加_,后面也懒得管

1
messagecheck.py --> message_check.py

然后是data_test.py——更新词库用的,当时只是一个测试:

1
data_test.py --> data.py

再就是把已经没用的word_split.py删掉

然后新建一个common文件夹把这些文件全丢进去

之后就是修改main文件中的import和函数名:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
import time
import json
import threading

import websocket

import common.message_check


api_url = 'ws://127.0.0.1:3001'
bot_id = '1940975548'
auth_token = ''


def on_message(ws, message):
message = json.loads(message)
is_heart_beat = message_check.filter_heart_beat(message)
if not is_heart_beat:

message_check.check(message)

else:
pass


def on_error(ws, error):
print(f'WebSocket 错误: {error}')


def on_close(ws, close_status):
print(f'WebSocket 连接已关闭: {close_status}')


def on_open(ws):
def run():
while True:
time.sleep(30) # 保持连接

thread = threading.Thread(target=run)
thread.start()


if __name__ == '__main__':
header = {
'Authorization': f'Bearer {auth_token}',
'bot_id': bot_id
}
ws = websocket.WebSocketApp(api_url,
header=header,
on_message=on_message,
on_error=on_error,
on_close=on_close)
ws.on_open = on_open
ws.run_forever()

搞完main再来message_check赤石:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
import requests
import pickle

# 发送消息的接口
send="http://127.0.0.1:3000/send_msg?"

# 撤回消息接口
delete = "http://127.0.0.1:3000/delete_msg?"

# 群组禁言接口

ban = "http://127.0.0.1:3000/set_group_ban?"

# 读取词库
with open('wordlist.pickle', 'rb') as f:
wordlist = pickle.load(f)

# 违禁词名单
blacklist = ["老子他妈"]


def filter_heart_beat(message):
"""
过滤心跳检测
:param message:
:return: bool 是否为心跳检查
"""
if "meta_event_type" in message.keys() and message['meta_event_type'] in ["heartbeat", "lifecycle"]:
return True
return False


def private_message(word_list):
message = "[私聊]" + word_list["sender"]["nickname"] + ":" + word_list["raw_message"] + "\n"
print(message)
f = open("message.txt", "a", encoding="utf-8")
f.write(message)


def group_message(word_list):
message = "[群聊" + str(word_list["group_id"]) + "]" + word_list["sender"]["nickname"] + ":" + word_list[
"raw_message"] + "\n"
print(message)
f = open("message.txt", "a", encoding="utf-8")
f.write(message)


def check(word_list):
# 检查是否存在发送消息以外的事件
if word_list != None:
if "raw_message" in word_list:
if word_list["message_type"] != None:
# 违禁词检查
for i in blacklist:
if i in word_list["raw_message"]:
delete_url = delete + "message_id=" + str(word_list["message_id"])
requests.get(delete_url)
send_url = send + "group_id=" + str(word_list["group_id"]) + "&message=触发违禁词"
requests.get(send_url)
ban_url = ban + "group_id=" + str(word_list["group_id"]) + "&user_id=" + str(word_list["user_id"]) + "&duration=3600"
requests.get(ban_url)
group_message(word_list)
return None


if word_list["raw_message"][0] == "/" and word_list["message_type"] == "group":
if word_list["user_id"] == 2066045898:
if word_list["raw_message"][0:5] == "/help":
send_url = send + "group_id=" + str(word_list["group_id"]) + "&message=/help:查看帮助\n/list:查看词库\n/add:添加词条\n/delete:删除词条"
elif word_list["raw_message"][0:4] == "/add":
if word_list["raw_message"] == "/add":
send_url = send + "group_id=" + str(word_list["group_id"]) + "&message=请输入要添加的词条\n格式为:/add 触发词:触发消息"
requests.get(send_url)
# 检查格式是否合法,避免出现“/add ”,“/add:”,“/add : ”等情况
elif len(word_list["raw_message"][4:]) > 4 and word_list["raw_message"][6:].find(":") == 1:
if word_list["raw_message"][5:] in wordlist:
send_url = send + "group_id=" + str(word_list["group_id"]) + "&message=该词条已存在"
else:
wordlist += word_list["raw_message"][5:]
with open('wordlist.pickle', 'wb') as f:
pickle.dump(wordlist, f)
send_url = send + "group_id=" + str(word_list["group_id"]) + "&message=词条删除成功"
requests.get(send_url)
else:
send_url = send + "group_id=" + str(word_list["group_id"]) + "&message=格式错误,请输入/add 触发词:触发消息"
requests.get(send_url)
elif word_list["raw_message"][0:6] == "/delete":
if word_list["raw_message"] == "/delete":
send_url = send + "group_id=" + str(word_list["group_id"]) + "&message=请输入要删除的词条\n格式为:/delete 触发词"
elif word_list["raw_message"][8:] in wordlist:
wordlist.remove(word_list["raw_message"][8:])
with open('wordlist.pickle', 'wb') as f:
pickle.dump(wordlist, f)
send_url = send + "group_id=" + str(word_list["group_id"]) + "&message=词条删除成功"
requests.get(send_url)
else:
send_url = send + "group_id=" + str(word_list["group_id"]) + "&message=格式错误或该词条不存在"
requests.get(send_url)
elif word_list["raw_message"] == "/list":
send_url = send + "group_id=" + str(word_list["group_id"]) + "&message=" + str(wordlist)
else:
send_url = send + "group_id=" + str(word_list["group_id"]) + "&message=格式错误,请输入/help查看帮助"
requests.get(send_url)
else:
if word_list["raw_message"][0:5] == "/help":
send_url = send + "group_id=" + str(word_list["group_id"]) + "&message=/help:查看帮助\n/list:查看词库\n/add:添加词库\n/delete:删除词库"
else:
send_url = send + "group_id=" + str(word_list["group_id"]) + "&message=语法错误或权限不足"
requests.get(send_url)
# 检查消息是否为关键词
if word_list["raw_message"] in wordlist:
# 判断是私聊还是群聊
if word_list["message_type"] == "private":
send_url = send + "user_id=" + str(word_list["user_id"]) + "&message=" + wordlist[word_list["raw_message"]]
requests.get(send_url)
private_message(word_list)
if word_list["message_type"] == "group":
send_url = send + "group_id=" + str(word_list["group_id"]) + "&message=" + wordlist[word_list["raw_message"]]
requests.get(send_url)
group_message(word_list)


else:
# 不是关键词就直接在终端输出不做其余处理
if word_list["message_type"] == "private":
private_message(word_list)
if word_list["message_type"] == "group":
group_message(word_list)

先把撤回消息的接口给删了,因为它善(实现不了)

然后把违禁词检查封装成函数

再把词库热更新的代码单独拎出来,新建个plugins文件夹,新建一个word_update.py文件,扔进去,晚点再修

想了想把data.py也新建一个data文件夹扔进去,再稍微修改一下:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
import pickle


# 用来生成词库用的
data = {
"你好":"你好你好大家好",
"我不好":"不好就爬",
"你们这是什么群啊":"真是害人不浅呐",
"不好":"不好就爬",
}

# 将字典保存到本地文件
def save(data):
with open('wordlist.pickle', 'wb') as f:
pickle.dump(data, f)

# 从本地文件加载字典
def load(data):
with open('wordlist.pickle', 'rb') as f:
loaded_data = pickle.load(f)


return loaded_data


save(data)
print(load(data))

今天的史就先构到这里

7.15

重构了一整天,差不多算是重构完了

接着上回的进度把messsage_check里边的一部分东西拿出来,然后新建了一api.py

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
import requests
import pickle

# 发送消息的接口
send="http://127.0.0.1:3000/send_msg?"

# 群组禁言接口
ban = "http://127.0.0.1:3000/set_group_ban?"

# 违禁词名单
blacklist = ["老子他妈"]

# 读取词库、图库
with open('data/wordlist.pickle', 'rb') as f:
wordlist = pickle.load(f)
with open('data/wordlist.pickle', 'rb') as i:
imglist = pickle.load(i)


def message(word_list,c,id1,id2,name):
if c:
send_url = send + id1 + id2 + "&message=" + wordlist[word_list["raw_message"]]
requests.get(send_url)
message = name + id2 + word_list["sender"]["nickname"] + ":" + word_list["raw_message"] + "\n"
print(message)
f = open("data/message/"+name+id2+".txt", "a", encoding="utf-8")
f.write(message)


def black_check(word_list):
for i in blacklist:
if i in word_list["raw_message"]:
send_url = send + "group_id=" + str(word_list["group_id"]) + "&message=触发违禁词"
requests.get(send_url)
ban_url = ban + "group_id=" + str(word_list["group_id"]) + "&user_id=" + str(
word_list["user_id"]) + "&duration=3600"
requests.get(ban_url)
message(word_list)
return None


def send_img(word_list,imglist,id1,id2):
img_url = imglist[word_list["raw_message"]]
img_cq_code = f"[CQ:image,file={img_url}]"
send_url = send + id1 + id2 + "&message=" + img_cq_code
requests.get(send_url)

再尽量减少代码的重复,就比如message函数从群聊和私聊两个消息分开处理的双倍的代码变成了一个,然后还新增了个发送图片的功能,相应的,message_check的代码也发生了大改:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
import pickle
import requests

from common.api import message,black_check,send_img

# 读取词库、图库
with open('data/wordlist.pickle', 'rb') as f:
wordlist = pickle.load(f)
with open('data/img.pickle', 'rb') as i:
imglist = pickle.load(i)




def filter_heart_beat(message):
"""
过滤心跳检测
:param message:
:return: bool 是否为心跳检查
"""
if "meta_event_type" in message.keys() and message['meta_event_type'] in ["heartbeat", "lifecycle"]:
return True
return False


def check(word_list):
c = 0
# 检查是否存在发送消息以外的事件
if word_list != None:
if "notice_type" in word_list:
if word_list["notice_type"] == "group_increase":
user_id = word_list["user_id"]
send_url="http://127.0.0.1:3000/send_msg?group_id="+str(word_list["group_id"])+"&message="+f"欢迎新成员 [CQ:at,qq={user_id}] 加入群聊!\n新人入群请看群公告,如有问题可在群内发送关键词“常见问题”进行查询"
requests.get(send_url)

if "raw_message" in word_list:
if word_list["message_type"] != None:

# 违禁词检查
black_check(word_list)

# 判断是私聊还是群聊
global id1,id2,name
if word_list["message_type"] == "private":
id1 = "user_id="
id2 = str(word_list["user_id"])
name ="[私聊]"
if word_list["message_type"] == "group":
id1 = "group_id="
id2 = str(word_list["group_id"])
name = "[群聊]"
# 检查消息是否为关键词
if word_list["raw_message"] in wordlist:
c = 1
message(word_list,c,id1,id2,name)
else:
# 不是关键词就直接在终端输出
message(word_list,c,id1,id2,name)

# 检查是否为图库关键词
if word_list["raw_message"] in imglist:
send_img(word_list,imglist,id1,id2)

减少了很多代码的复用,同时还增加了个入群提醒功能

最后是data.py,发送图片的功能写了,肯定也要存储,这里采用了和词库同样的方式:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
import pickle


# 词库
data = {
"你好":"你好你好大家好",
"我不好":"不好就爬",
"你们这是什么群啊":"真是害人不浅呐",
"不好":"不好就爬",
"笑死我了":"别死"
}

# 图库
img = {
"难吗":"file:///D:/CTF2/Tools/NapCat/EarthV2.0/data/img/nanma.jpg"
}

# 将字典保存到本地文件
def save(data,img):
with open('wordlist.pickle', 'wb') as f:
pickle.dump(data, f)
with open('img.pickle', 'wb') as f:
pickle.dump(img, f)

# 从本地文件加载字典
def load(data,img):
with open('wordlist.pickle', 'rb') as f:
loaded_data = pickle.load(f)
with open('img.pickle', 'rb') as i:
loaded_img = pickle.load(i)
load22 = str(loaded_img)+str(loaded_data)
return load22


save(data,img)
print(load(data,img))

剩下一个热更新的命令下次再改嘿嘿