好友
阅读权限20
听众
最后登录1970-1-1
|
天域至尊
发表于 2023-2-17 12:51
本帖最后由 天域至尊 于 2023-2-17 12:53 编辑
一、项目背景
在日常使用中,我们常常希望动态远程操作防火墙,以保证风险应用的安全,此项目实现了使用钉钉机器人来远程控制Linux应用防火墙。可以在钉钉群以@机器人的方式,来实现控制防火墙开关,支持权限控制、定时关闭。
二、依赖环境
运行环境:Linux (测试系统为centos7)
防火墙:firewall,并处在运行状态。
机器人:钉钉机器人,并开启outgoing功能(该功能现在处在维护状态,新申请的机器人无法打开)
开发环境:python 3.7+ flask requests sqlite3
三、环境部署
启动firewalld
[Bash shell] 纯文本查看 复制代码 sodu systemctl start firewalld
放行服务端口(自己指定端口号,应与下述main.py文件中,第16行端口保持一致)
[Bash shell] 纯文本查看 复制代码 firewall-cmd --zone=public --add-port=你的端口/tcp --permanent
刷新防火墙
[Bash shell] 纯文本查看 复制代码 firewall-cmd --reload
配置脚本内容,并运行脚本
[Bash shell] 纯文本查看 复制代码 python3 main.py
查看提示信息
如果是云服务器,则在云服务器安全组上放开对应端口
访问提示站点尝试,如下则是正常
增加钉钉机器人,勾选outgoing,输入对应的提示信息
该功能现属于维护阶段,如放开使用,则如下
配置完成后,@机器人,查询帮助信息,如正常响应,则配置完成。
四、源代码
代码结构:
在同一文件夹下,两个py文件,分别为:firewall.py和main.py
mian.py代码如下(其中8-16行,都需要自己配置)
[Python] 纯文本查看 复制代码 from flask import Flask
import requests,json,ipaddress,time,threading,json
from flask import request
from firewall import Firewall
app = Flask(__name__)
#机器人声明的token
TOKEN="123"
#设置机器人地址
robot_url="https://oapi.dingtalk.com/robot/send?access_token=*******"
#本机url_key,建议设置为复杂的key,不要出现特殊符号
url_key="xjcbkasbasba"
#本机公网地址
ip="127.0.0.1"
#本服务监听端口
port="8080"
control_url='/control/'+url_key
control_url_out=f"http://{ip}:{port}{control_url}"
print(f"""
请在您的钉钉机器人的Outgoing配置中输入如下信息:
post地址:{control_url_out}
Token:{TOKEN}
""")
fire_wall=Firewall(robot_url=robot_url)
fire_wall_check=threading.Thread(target=fire_wall.check_data)
fire_wall_check.start()
def is_valid_ip(ip:str):
try:
ipaddress.ip_address(ip)
return True
except ValueError:
return False
def send_word(word):
"""
"""
global robot_url
header={
"Content-Type":"application/json"
}
data={"msgtype": "text","text": {"content":word}}
for i in range(0,3):
try:
r=requests.post(url=robot_url,headers=header,timeout=60,json=data)
break
except Exception as err:
print(err)
@app.after_request
def modify_server_header(response):
response.headers['Server'] = 'Windows Server 8.0/IIS 10.0'
return response
#路由
@app.route(control_url,methods=['POST'])
def add_data():
word="命令识别失败,发送“帮助”,查看命令格式。"
value = request.headers.get("token")
if TOKEN!=value:
return "error power not allow!"
data=request.json
if data:
user_id=data["senderId"]
text=data["text"]["content"]
text=str(text).replace("\t","").replace(" "," ")
while " " in text:
text=text.replace(" ","")
if text[0]==" ":
text=text[1:]
text=text.split(" ")
if len(text)>0:
if "帮助" in text[0]:
word="""
帮助信息:
1.查看帮助信息 [url=home.php?mod=space&uid=402767]@机器人[/url] 帮助
2.查看自己的id @机器人 查询id
3.增加id @机器人 增加id 权限 id值
权限分为:1-管理员 可以增加id,增删放行规则
2-用户 可以增删放行规则
eg:@机器人 增加id 2 $=cndjbaxba
4.删除id @机器人 删除id id值
eg:@机器人 删除id $=cndjbaxba
5.增加放行规则 @机器人 放行 源ip地址 目的端口 放行时间
eg:@机器人 放行 127.0.0.1 8080 10
6.删除放行规则 @机器人 删除 源ip地址 目的端口
eg:@机器人 删除 127.0.0.1 8080
"""
elif "查询id" in text[0]:
word="您的ID是:"+str(user_id)
elif "放行" in text[0]:
if len(text)<4:
word="放行命令格式不标准。"
else:
is_pass=True
ip=text[1]
port=text[2]
time_data=text[3]
if is_valid_ip(ip=ip)==False:
word="IP格式错误。"
is_pass=False
try:
port=int(port)
except Exception:
is_pass=False
word="端口格式错误。"
try:
time_data=int(time_data)
except Exception:
is_pass=False
word="时间格式错误。"
power_data=fire_wall.check_power(user_id=user_id,power_id=2)
if power_data[0]==False:
is_pass=False
word=power_data[1]
if is_pass:
try:
end_data=fire_wall.add_rule(ip=ip,port=port,time_data=time_data)
if end_data[0]:
word="增加成功"
else:
word="增加失败:"+str(end_data[-1])
except Exception as err:
word="增加失败,报错内容为:"+str(err)
elif "删除id" in text[0]:
if len(text)<2:
word="删除id命令格式不标准。"
else:
is_pass=True
rm_user_id=text[1]
power_data=fire_wall.check_power(user_id=user_id,power_id=1)
if power_data[0]==False:
is_pass=False
word=power_data[1]
if is_pass:
try:
end_data=fire_wall.rm_user(user_id=rm_user_id)
word="删除id成功"
except Exception as err:
word="删除id失败,报错内容为:"+str(err)
elif "增加id" in text[0]:
if len(text)<3:
word="增加id命令格式不标准。"
else:
is_pass=True
add_user_id=text[2]
add_power_id=text[1]
try:
add_power_id=int(add_power_id)
except Exception:
is_pass=False
word="权限数据格式错误。"
power_data=fire_wall.check_power(user_id=user_id,power_id=1)
if power_data[0]==False:
is_pass=False
word=power_data[1]
if is_pass:
try:
end_data=fire_wall.add_user(user_id=add_user_id,power_id=add_power_id)
word="增加用户成功"
except Exception as err:
word="增加用户失败,报错内容为:"+str(err)
elif "删除" in text[0] and "id" not in text[0]:
if len(text)<3:
word="删除命令格式不标准。"
else:
is_pass=True
ip=text[1]
port=text[2]
if is_valid_ip(ip=ip)==False:
word="IP格式错误。"
is_pass=False
try:
port=int(port)
except Exception:
is_pass=False
word="端口格式错误。"
power_data=fire_wall.check_power(user_id=user_id,power_id=2)
if power_data[0]==False:
is_pass=False
word=power_data[1]
if is_pass:
try:
end_data=fire_wall.rm_rule(ip=ip,port=port)
if end_data[0]:
word="删除成功"
else:
word="删除失败:"+str(end_data[-1])
except Exception as err:
word="删除失败,报错内容为:"+str(err)
else:
word="命令识别失败,发送“帮助”,查看命令格式。"
data={"msgtype": "text","text": {"content":word}}
return json.dumps(data)
#运行
if __name__ == '__main__':
app.run(host="0.0.0.0",port=port)
firewall.py代码如下
[Python] 纯文本查看 复制代码 import os,time,json
import ipaddress
import sqlite3
import requests
class Firewall(object):
def __init__(self,robot_url) -> None:
self.robot_url=robot_url
self.__base_order={
"增加放开规则":"firewall-cmd --permanent --add-rich-rule='rule family=\"ipv4\" source address=\"%s\" port protocol=\"tcp\" port=\"%d\" accept'",
"删除放开规则":"firewall-cmd --permanent --remove-rich-rule='rule family=\"ipv4\" source address=\"%s\" port protocol=\"tcp\" port=\"%d\" accept'",
"重新载入规则":"firewall-cmd --reload",
"新建数据表_规则表":"""
create table filewall_rule(
id INTEGER PRIMARY KEY AUTOINCREMENT,
ip_data varchar(200) not null,
port_data unsigned int not null,
create_time unsigned int not null,
out_time unsigned int not null,
enable_status unsigned int not null
)
""",
"新建数据表_权限表":"""
create table filewall_user(
user_id varchar(200) PRIMARY KEY not null,
power_id unsigned int not null,
create_time unsigned int not null,
enable_status unsigned int not null
)
""",
"增加规则数据":"insert into filewall_rule(ip_data,port_data,create_time,out_time,enable_status) values (?,?,?,?,?);",
"删除规则数据":"update filewall_rule set enable_status=0 where ip_data=? and port_data=?;",
"删除规则数据_id":"update filewall_rule set enable_status=0 where id=?;",
"检查哪些数据过期":"select id,ip_data,port_data,create_time,out_time from filewall_rule where enable_status=1 and out_time<=?;",
"增加权限表数据":"insert into filewall_user(user_id,power_id,create_time,enable_status) values (?,?,?,?);",
"删除权限表数据":"delete from filewall_user where user_id=?;",
"查询权限表数据":"select user_id,power_id,create_time,enable_status from filewall_user where user_id=?;",
}
self.__url="http://82.156.184.131:10012/get_word/wvkXMMrn75SWAWbS"
self.db_way="./firewall.db"
self.conn = sqlite3.connect(self.db_way, check_same_thread=False)
self.cursor = self.conn.cursor()
try:
# 建表语句
self.cursor.execute(self.__base_order["新建数据表_权限表"])
except Exception as err:
if "already exists" not in str(err):
raise err
try:
# 建表语句
self.cursor.execute(self.__base_order["新建数据表_规则表"])
except Exception as err:
if "already exists" not in str(err):
raise err
def __del__(self):
self.conn.close()
def is_valid_ip(self,ip:str):
try:
ipaddress.ip_address(ip)
return True
except ValueError:
return False
def add_rule_os(self,ip:str,port:int):
"""
增加一个放开规则
"""
ip=str(ip)
if self.is_valid_ip(ip)==False:
return False
try:
port=int(port)
except Exception:
return False
run_data=os.popen(self.__base_order["增加放开规则"]%(ip,port))
if "success" in run_data.read():
run_data=os.popen(self.__base_order["重新载入规则"])
if "success" in run_data.read():
return True
return False
def rm_rule_os(self,ip:str,port:int):
"""
删除一个放开规则
"""
ip=str(ip)
if self.is_valid_ip(ip)==False:
return False
try:
port=int(port)
except Exception:
return False
run_data=os.popen(self.__base_order["删除放开规则"]%(ip,port))
if "success" in run_data.read():
run_data=os.popen(self.__base_order["重新载入规则"])
if "success" in run_data.read():
return True
return False
def add_rule_sql(self,ip:str,port:int,time_data:int):
"""
在数据库中增加一条数据
"""
time_data=time_data*60*60
time_now=int(time.time())
time_out=time_now+time_data
self.cursor.execute(self.__base_order["增加规则数据"],(ip,port,time_now,time_out,1))
self.conn.commit()
def rm_rule_sql(self,ip:str,port:int):
"""
删除某一条规则数据
"""
self.cursor.execute(self.__base_order["删除规则数据"],(ip,port))
self.conn.commit()
def rm_rule_sql_id(self,id:int):
"""
删除某一条规则数据
"""
self.cursor.execute(self.__base_order["删除规则数据_id"],(id,))
self.conn.commit()
def add_rule(self,ip:str,port:int,time_data:int):
ip=str(ip)
if self.is_valid_ip(ip)==False:
return [False,"ip格式错误"]
try:
port=int(port)
except Exception:
return [False,"端口格式错误"]
try:
time_data=int(time_data)
if time_data<0 or time_data>50:
[False,"时间范围超限"]
except Exception:
return [False,"时间格式错误"]
self.add_rule_sql(ip,port,time_data)
self.add_rule_os(ip,port)
return [True,""]
def rm_rule(self,ip:str,port:int,id=-1):
"""
删除某一规则
"""
ip=str(ip)
is_pass=True
if self.is_valid_ip(ip)==False:
return [False,"ip格式错误"]
try:
port=int(port)
except Exception:
return [False,"端口格式错误"]
try:
if id==-1:
self.rm_rule_sql(ip,port)
else:
self.rm_rule_sql_id(id)
except Exception as err:
print(err)
answer=self.rm_rule_os(ip,port)
if is_pass==False:
return [False,"sql操作错误"]
return [answer,""]
def check_data_sql(self):
"""
检查哪些数据过期,并删除
"""
time_now=int(time.time())
self.cursor.execute(self.__base_order["检查哪些数据过期"],(time_now,))
data=self.cursor.fetchall()
self.conn.commit()
all_data=[]
for now in data:
answer=self.rm_rule(ip=now[1],port=now[2],id=now[0])
all_data.append(
{
"id":now[0],
"ip":now[1],
"port":now[2],
"create_time":now[3],
"out_time":now[4],
"answer":answer
}
)
print(all_data)
return all_data
def send_word(self,word):
"""
向钉钉机器人发送消息
"""
header={
"Content-Type":"application/json"
}
data={"msgtype": "text","text": {"content":word}}
for i in range(0,3):
try:
r=requests.post(url=self.robot_url,headers=header,timeout=60,json=data)
break
except Exception as err:
print(err)
def check_data(self):
"""
检查数据是否超时
"""
while True:
time.sleep(10)
data=self.check_data_sql()
if len(data)>0:
for rm_data in data:
nid=rm_data["id"]
ip=rm_data["ip"]
port=rm_data["port"]
create_time=str(time.strftime("%Y-%m-%d %H:%M:%S", time.localtime(int(rm_data["create_time"]))))
out_time=str(time.strftime("%Y-%m-%d %H:%M:%S", time.localtime(int(rm_data["out_time"]))))
answer=str(rm_data["answer"])
word=f"""
任务超时删除:
任务ID:{nid}
ip:{ip}
port:{port}
创建时间:{create_time}
超时时间:{out_time}
删除结果:{answer}
"""
self.send_word(word=word)
def check_power(self,user_id:str,power_id):
"""
检查用户是否拥有此权限
"""
self.cursor.execute(self.__base_order["查询权限表数据"],(user_id,))
data=self.cursor.fetchall()
self.conn.commit()
is_pass=False
word=""
for user_data in data:
if user_id==user_data[0]:
if int(user_data[1])<=power_id and int(user_data[3])==1:
is_pass=True
word="鉴权通过"
else:
is_pass=False
word="权限不足"
if word=="":
word="查无此人"
return [is_pass,word]
def add_user(self,user_id:str,power_id:int):
"""
增加用户
"""
time_now=int(time.time())
self.cursor.execute(self.__base_order["增加权限表数据"],(user_id,power_id,time_now,1))
self.conn.commit()
def rm_user(self,user_id):
"""
删除用户
"""
self.cursor.execute(self.__base_order["删除权限表数据"],(user_id,))
self.conn.commit()
五、附件
钉钉机器人控制防火墙工具.zip
(4.45 KB, 下载次数: 9)
|
免费评分
-
查看全部评分
|