7.10
前情提要——前段时间无聊学着白哥手搓了个Qbot,只实现了关键词回复和违禁词自动禁言两个功能,词库热更新倒是也写了,但是有bug没修,也就没更新,但无疑写的都是史,所以在它堆成史山之前,我要把它重构一哈
简称——构史
赤石传送门:
1
| https://github.com/Qingjiu233/Qbot-based-on-Python
|
首先把Z哥emoji-bot的mian.py给copy下来,然后变成我的形状:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56
| import time import json import threading
import websocket
import messagecheck import word_split
api_url = 'ws://127.0.0.1:3001' bot_id = '1940975548' auth_token = ''
def on_message(ws, message): message = json.loads(message) is_heart_beat = messagecheck.filter_heart_beat(message) if not is_heart_beat:
messagecheck.check(message)
else: pass
def on_error(ws, error): print(f'WebSocket 错误: {error}')
def on_close(ws, close_status): print(f'WebSocket 连接已关闭: {close_status}')
def on_open(ws): def run(): while True: time.sleep(30) # 保持连接
thread = threading.Thread(target=run) thread.start()
if __name__ == '__main__': header = { 'Authorization': f'Bearer {auth_token}', 'bot_id': bot_id } ws = websocket.WebSocketApp(api_url, header=header, on_message=on_message, on_error=on_error, on_close=on_close) ws.on_open = on_open ws.run_forever()
|
然后首当其冲的是我那个不遵守命名规则的messagecheck——最早写的,没加_,后面也懒得管
1
| messagecheck.py --> message_check.py
|
然后是data_test.py——更新词库用的,当时只是一个测试:
1
| data_test.py --> data.py
|
再就是把已经没用的word_split.py删掉
然后新建一个common文件夹把这些文件全丢进去
之后就是修改main文件中的import和函数名:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55
| import time import json import threading
import websocket
import common.message_check
api_url = 'ws://127.0.0.1:3001' bot_id = '1940975548' auth_token = ''
def on_message(ws, message): message = json.loads(message) is_heart_beat = message_check.filter_heart_beat(message) if not is_heart_beat:
message_check.check(message)
else: pass
def on_error(ws, error): print(f'WebSocket 错误: {error}')
def on_close(ws, close_status): print(f'WebSocket 连接已关闭: {close_status}')
def on_open(ws): def run(): while True: time.sleep(30) # 保持连接
thread = threading.Thread(target=run) thread.start()
if __name__ == '__main__': header = { 'Authorization': f'Bearer {auth_token}', 'bot_id': bot_id } ws = websocket.WebSocketApp(api_url, header=header, on_message=on_message, on_error=on_error, on_close=on_close) ws.on_open = on_open ws.run_forever()
|
搞完main再来message_check赤石:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128
| import requests import pickle
# 发送消息的接口 send="http://127.0.0.1:3000/send_msg?"
# 撤回消息接口 delete = "http://127.0.0.1:3000/delete_msg?"
# 群组禁言接口
ban = "http://127.0.0.1:3000/set_group_ban?"
# 读取词库 with open('wordlist.pickle', 'rb') as f: wordlist = pickle.load(f)
# 违禁词名单 blacklist = ["老子他妈"]
def filter_heart_beat(message): """ 过滤心跳检测 :param message: :return: bool 是否为心跳检查 """ if "meta_event_type" in message.keys() and message['meta_event_type'] in ["heartbeat", "lifecycle"]: return True return False
def private_message(word_list): message = "[私聊]" + word_list["sender"]["nickname"] + ":" + word_list["raw_message"] + "\n" print(message) f = open("message.txt", "a", encoding="utf-8") f.write(message)
def group_message(word_list): message = "[群聊" + str(word_list["group_id"]) + "]" + word_list["sender"]["nickname"] + ":" + word_list[ "raw_message"] + "\n" print(message) f = open("message.txt", "a", encoding="utf-8") f.write(message)
def check(word_list): # 检查是否存在发送消息以外的事件 if word_list != None: if "raw_message" in word_list: if word_list["message_type"] != None: # 违禁词检查 for i in blacklist: if i in word_list["raw_message"]: delete_url = delete + "message_id=" + str(word_list["message_id"]) requests.get(delete_url) send_url = send + "group_id=" + str(word_list["group_id"]) + "&message=触发违禁词" requests.get(send_url) ban_url = ban + "group_id=" + str(word_list["group_id"]) + "&user_id=" + str(word_list["user_id"]) + "&duration=3600" requests.get(ban_url) group_message(word_list) return None
if word_list["raw_message"][0] == "/" and word_list["message_type"] == "group": if word_list["user_id"] == 2066045898: if word_list["raw_message"][0:5] == "/help": send_url = send + "group_id=" + str(word_list["group_id"]) + "&message=/help:查看帮助\n/list:查看词库\n/add:添加词条\n/delete:删除词条" elif word_list["raw_message"][0:4] == "/add": if word_list["raw_message"] == "/add": send_url = send + "group_id=" + str(word_list["group_id"]) + "&message=请输入要添加的词条\n格式为:/add 触发词:触发消息" requests.get(send_url) # 检查格式是否合法,避免出现“/add ”,“/add:”,“/add : ”等情况 elif len(word_list["raw_message"][4:]) > 4 and word_list["raw_message"][6:].find(":") == 1: if word_list["raw_message"][5:] in wordlist: send_url = send + "group_id=" + str(word_list["group_id"]) + "&message=该词条已存在" else: wordlist += word_list["raw_message"][5:] with open('wordlist.pickle', 'wb') as f: pickle.dump(wordlist, f) send_url = send + "group_id=" + str(word_list["group_id"]) + "&message=词条删除成功" requests.get(send_url) else: send_url = send + "group_id=" + str(word_list["group_id"]) + "&message=格式错误,请输入/add 触发词:触发消息" requests.get(send_url) elif word_list["raw_message"][0:6] == "/delete": if word_list["raw_message"] == "/delete": send_url = send + "group_id=" + str(word_list["group_id"]) + "&message=请输入要删除的词条\n格式为:/delete 触发词" elif word_list["raw_message"][8:] in wordlist: wordlist.remove(word_list["raw_message"][8:]) with open('wordlist.pickle', 'wb') as f: pickle.dump(wordlist, f) send_url = send + "group_id=" + str(word_list["group_id"]) + "&message=词条删除成功" requests.get(send_url) else: send_url = send + "group_id=" + str(word_list["group_id"]) + "&message=格式错误或该词条不存在" requests.get(send_url) elif word_list["raw_message"] == "/list": send_url = send + "group_id=" + str(word_list["group_id"]) + "&message=" + str(wordlist) else: send_url = send + "group_id=" + str(word_list["group_id"]) + "&message=格式错误,请输入/help查看帮助" requests.get(send_url) else: if word_list["raw_message"][0:5] == "/help": send_url = send + "group_id=" + str(word_list["group_id"]) + "&message=/help:查看帮助\n/list:查看词库\n/add:添加词库\n/delete:删除词库" else: send_url = send + "group_id=" + str(word_list["group_id"]) + "&message=语法错误或权限不足" requests.get(send_url) # 检查消息是否为关键词 if word_list["raw_message"] in wordlist: # 判断是私聊还是群聊 if word_list["message_type"] == "private": send_url = send + "user_id=" + str(word_list["user_id"]) + "&message=" + wordlist[word_list["raw_message"]] requests.get(send_url) private_message(word_list) if word_list["message_type"] == "group": send_url = send + "group_id=" + str(word_list["group_id"]) + "&message=" + wordlist[word_list["raw_message"]] requests.get(send_url) group_message(word_list)
else: # 不是关键词就直接在终端输出不做其余处理 if word_list["message_type"] == "private": private_message(word_list) if word_list["message_type"] == "group": group_message(word_list)
|
先把撤回消息的接口给删了,因为它善(实现不了)
然后把违禁词检查封装成函数
再把词库热更新的代码单独拎出来,新建个plugins文件夹,新建一个word_update.py文件,扔进去,晚点再修
想了想把data.py也新建一个data文件夹扔进去,再稍微修改一下:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27
| import pickle
# 用来生成词库用的 data = { "你好":"你好你好大家好", "我不好":"不好就爬", "你们这是什么群啊":"真是害人不浅呐", "不好":"不好就爬", } # 将字典保存到本地文件 def save(data): with open('wordlist.pickle', 'wb') as f: pickle.dump(data, f) # 从本地文件加载字典 def load(data): with open('wordlist.pickle', 'rb') as f: loaded_data = pickle.load(f)
return loaded_data
save(data) print(load(data))
|
今天的史就先构到这里
7.15
重构了一整天,差不多算是重构完了
接着上回的进度把messsage_check里边的一部分东西拿出来,然后新建了一api.py
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46
| import requests import pickle
send="http://127.0.0.1:3000/send_msg?"
ban = "http://127.0.0.1:3000/set_group_ban?"
blacklist = ["老子他妈"]
with open('data/wordlist.pickle', 'rb') as f: wordlist = pickle.load(f) with open('data/wordlist.pickle', 'rb') as i: imglist = pickle.load(i)
def message(word_list,c,id1,id2,name): if c: send_url = send + id1 + id2 + "&message=" + wordlist[word_list["raw_message"]] requests.get(send_url) message = name + id2 + word_list["sender"]["nickname"] + ":" + word_list["raw_message"] + "\n" print(message) f = open("data/message/"+name+id2+".txt", "a", encoding="utf-8") f.write(message)
def black_check(word_list): for i in blacklist: if i in word_list["raw_message"]: send_url = send + "group_id=" + str(word_list["group_id"]) + "&message=触发违禁词" requests.get(send_url) ban_url = ban + "group_id=" + str(word_list["group_id"]) + "&user_id=" + str( word_list["user_id"]) + "&duration=3600" requests.get(ban_url) message(word_list) return None
def send_img(word_list,imglist,id1,id2): img_url = imglist[word_list["raw_message"]] img_cq_code = f"[CQ:image,file={img_url}]" send_url = send + id1 + id2 + "&message=" + img_cq_code requests.get(send_url)
|
再尽量减少代码的重复,就比如message函数从群聊和私聊两个消息分开处理的双倍的代码变成了一个,然后还新增了个发送图片的功能,相应的,message_check的代码也发生了大改:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63
| import pickle import requests
from common.api import message,black_check,send_img
with open('data/wordlist.pickle', 'rb') as f: wordlist = pickle.load(f) with open('data/img.pickle', 'rb') as i: imglist = pickle.load(i)
def filter_heart_beat(message): """ 过滤心跳检测 :param message: :return: bool 是否为心跳检查 """ if "meta_event_type" in message.keys() and message['meta_event_type'] in ["heartbeat", "lifecycle"]: return True return False
def check(word_list): c = 0 if word_list != None: if "notice_type" in word_list: if word_list["notice_type"] == "group_increase": user_id = word_list["user_id"] send_url="http://127.0.0.1:3000/send_msg?group_id="+str(word_list["group_id"])+"&message="+f"欢迎新成员 [CQ:at,qq={user_id}] 加入群聊!\n新人入群请看群公告,如有问题可在群内发送关键词“常见问题”进行查询" requests.get(send_url)
if "raw_message" in word_list: if word_list["message_type"] != None:
black_check(word_list)
global id1,id2,name if word_list["message_type"] == "private": id1 = "user_id=" id2 = str(word_list["user_id"]) name ="[私聊]" if word_list["message_type"] == "group": id1 = "group_id=" id2 = str(word_list["group_id"]) name = "[群聊]" if word_list["raw_message"] in wordlist: c = 1 message(word_list,c,id1,id2,name) else: message(word_list,c,id1,id2,name)
if word_list["raw_message"] in imglist: send_img(word_list,imglist,id1,id2)
|
减少了很多代码的复用,同时还增加了个入群提醒功能
最后是data.py,发送图片的功能写了,肯定也要存储,这里采用了和词库同样的方式:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37
| import pickle
data = { "你好":"你好你好大家好", "我不好":"不好就爬", "你们这是什么群啊":"真是害人不浅呐", "不好":"不好就爬", "笑死我了":"别死" }
img = { "难吗":"file:///D:/CTF2/Tools/NapCat/EarthV2.0/data/img/nanma.jpg" }
def save(data,img): with open('wordlist.pickle', 'wb') as f: pickle.dump(data, f) with open('img.pickle', 'wb') as f: pickle.dump(img, f)
def load(data,img): with open('wordlist.pickle', 'rb') as f: loaded_data = pickle.load(f) with open('img.pickle', 'rb') as i: loaded_img = pickle.load(i) load22 = str(loaded_img)+str(loaded_data) return load22
save(data,img) print(load(data,img))
|
剩下一个热更新的命令下次再改嘿嘿