吾爱破解 - 52pojie.cn

 找回密码
 注册[Register]

QQ登录

只需一步,快速开始

查看: 2147|回复: 13
收起左侧

[Python 原创] linux防火墙firewall与钉钉机器人联动

[复制链接]
天域至尊 发表于 2023-2-17 12:51
本帖最后由 天域至尊 于 2023-2-17 12:53 编辑

一、项目背景
在日常使用中,我们常常希望动态远程操作防火墙,以保证风险应用的安全,此项目实现了使用钉钉机器人来远程控制Linux应用防火墙。可以在钉钉群以@机器人的方式,来实现控制防火墙开关,支持权限控制、定时关闭。
二、依赖环境
运行环境:Linux (测试系统为centos7)
防火墙:firewall,并处在运行状态。
机器人:钉钉机器人,并开启outgoing功能(该功能现在处在维护状态,新申请的机器人无法打开)
开发环境:python 3.7+ flask requests sqlite3
三、环境部署
启动firewalld
[Bash shell] 纯文本查看 复制代码
sodu systemctl start firewalld

放行服务端口(自己指定端口号,应与下述main.py文件中,第16行端口保持一致)
[Bash shell] 纯文本查看 复制代码
firewall-cmd --zone=public --add-port=你的端口/tcp --permanent

刷新防火墙
[Bash shell] 纯文本查看 复制代码
firewall-cmd --reload

配置脚本内容,并运行脚本
[Bash shell] 纯文本查看 复制代码
python3 main.py

查看提示信息
66.png
如果是云服务器,则在云服务器安全组上放开对应端口
访问提示站点尝试,如下则是正常
77.png
增加钉钉机器人,勾选outgoing,输入对应的提示信息
88.png
该功能现属于维护阶段,如放开使用,则如下
99.png
配置完成后,@机器人,查询帮助信息,如正常响应,则配置完成。

100.png

四、源代码
代码结构:
在同一文件夹下,两个py文件,分别为:firewall.py和main.py
101.png
mian.py代码如下(其中8-16行,都需要自己配置
[Python] 纯文本查看 复制代码
from flask import Flask
import requests,json,ipaddress,time,threading,json
from flask import request
from firewall import Firewall

app = Flask(__name__)
#机器人声明的token
TOKEN="123"
#设置机器人地址
robot_url="https://oapi.dingtalk.com/robot/send?access_token=*******"
#本机url_key,建议设置为复杂的key,不要出现特殊符号
url_key="xjcbkasbasba"
#本机公网地址
ip="127.0.0.1"
#本服务监听端口
port="8080"

control_url='/control/'+url_key

control_url_out=f"http://{ip}:{port}{control_url}"
print(f"""
请在您的钉钉机器人的Outgoing配置中输入如下信息:
post地址:{control_url_out}
Token:{TOKEN}
""")

fire_wall=Firewall(robot_url=robot_url)
fire_wall_check=threading.Thread(target=fire_wall.check_data)
fire_wall_check.start()

def is_valid_ip(ip:str):
    try:
        ipaddress.ip_address(ip)
        return True
    except ValueError:
        return False

def send_word(word):
    """
    """
    global robot_url
    header={
        "Content-Type":"application/json"
    }
    data={"msgtype": "text","text": {"content":word}}
    for i in range(0,3):
        try:
            r=requests.post(url=robot_url,headers=header,timeout=60,json=data)
            break
        except Exception as err:
            print(err)

@app.after_request
def modify_server_header(response):
    response.headers['Server'] = 'Windows Server 8.0/IIS 10.0'
    return response


#路由
@app.route(control_url,methods=['POST'])
def add_data():
    word="命令识别失败,发送“帮助”,查看命令格式。"
    value = request.headers.get("token")
    if TOKEN!=value:
        return "error power not allow!"
    data=request.json
    if data:
        user_id=data["senderId"]
        text=data["text"]["content"]
        text=str(text).replace("\t","").replace("  "," ")
        while "  " in text:
            text=text.replace("  ","")
        if text[0]==" ":
            text=text[1:]
        text=text.split(" ")
        if len(text)>0:
            if "帮助" in text[0]:
                word="""
帮助信息:
    1.查看帮助信息 [url=home.php?mod=space&uid=402767]@机器人[/url] 帮助
    2.查看自己的id @机器人 查询id
    3.增加id @机器人 增加id 权限 id值
        权限分为:1-管理员 可以增加id,增删放行规则
                 2-用户 可以增删放行规则
            eg:@机器人 增加id 2 $=cndjbaxba
    4.删除id @机器人 删除id id值
            eg:@机器人 删除id $=cndjbaxba
    5.增加放行规则 @机器人 放行 源ip地址 目的端口 放行时间
        eg:@机器人 放行 127.0.0.1 8080 10
    6.删除放行规则 @机器人 删除 源ip地址 目的端口
        eg:@机器人 删除 127.0.0.1 8080
                """
            elif "查询id" in text[0]:
                word="您的ID是:"+str(user_id)

            elif "放行" in text[0]:
                if len(text)<4:
                    word="放行命令格式不标准。"
                else:
                    is_pass=True
                    ip=text[1]
                    port=text[2]
                    time_data=text[3]
                    if is_valid_ip(ip=ip)==False:
                        word="IP格式错误。"
                        is_pass=False
                    try:
                        port=int(port)
                    except Exception:
                        is_pass=False
                        word="端口格式错误。"
                    try:
                        time_data=int(time_data)
                    except Exception:
                        is_pass=False
                        word="时间格式错误。"
                    power_data=fire_wall.check_power(user_id=user_id,power_id=2)
                    
                    if power_data[0]==False:
                        is_pass=False
                        word=power_data[1]

                    if is_pass:
                        try:
                            end_data=fire_wall.add_rule(ip=ip,port=port,time_data=time_data)
                            if end_data[0]:
                                word="增加成功"
                            else:
                                word="增加失败:"+str(end_data[-1])
                        except Exception as err:
                            word="增加失败,报错内容为:"+str(err)
            elif "删除id" in text[0]:
                if len(text)<2:
                    word="删除id命令格式不标准。"
                else:
                    is_pass=True
                    rm_user_id=text[1]
                    power_data=fire_wall.check_power(user_id=user_id,power_id=1)
                    
                    if power_data[0]==False:
                        is_pass=False
                        word=power_data[1]
                    
                    if is_pass:
                        try:
                            end_data=fire_wall.rm_user(user_id=rm_user_id)
                            word="删除id成功"
                        except Exception as err:
                            word="删除id失败,报错内容为:"+str(err)
            elif "增加id" in text[0]:
                if len(text)<3:
                    word="增加id命令格式不标准。"
                else:
                    is_pass=True
                    add_user_id=text[2]
                    add_power_id=text[1]
                    try:
                        add_power_id=int(add_power_id)
                    except Exception:
                        is_pass=False
                        word="权限数据格式错误。"

                    power_data=fire_wall.check_power(user_id=user_id,power_id=1)
                    
                    if power_data[0]==False:
                        is_pass=False
                        word=power_data[1]
                    
                    if is_pass:
                        try:
                            end_data=fire_wall.add_user(user_id=add_user_id,power_id=add_power_id)
                            word="增加用户成功"
                        except Exception as err:
                            word="增加用户失败,报错内容为:"+str(err)
            
            elif "删除" in text[0] and "id" not in text[0]:
                if len(text)<3:
                    word="删除命令格式不标准。"
                else:
                    is_pass=True
                    ip=text[1]
                    port=text[2]
                    if is_valid_ip(ip=ip)==False:
                        word="IP格式错误。"
                        is_pass=False
                    try:
                        port=int(port)
                    except Exception:
                        is_pass=False
                        word="端口格式错误。"
                    power_data=fire_wall.check_power(user_id=user_id,power_id=2)
                    
                    if power_data[0]==False:
                        is_pass=False
                        word=power_data[1]
                    
                    if is_pass:
                        try:
                            end_data=fire_wall.rm_rule(ip=ip,port=port)
                            if end_data[0]:
                                word="删除成功"
                            else:
                                word="删除失败:"+str(end_data[-1])
                        except Exception as err:
                            word="删除失败,报错内容为:"+str(err)
        else:
            word="命令识别失败,发送“帮助”,查看命令格式。"
    data={"msgtype": "text","text": {"content":word}}
    return json.dumps(data)

#运行
if __name__ == '__main__':
    app.run(host="0.0.0.0",port=port)

firewall.py代码如下
[Python] 纯文本查看 复制代码
import os,time,json
import ipaddress
import sqlite3
import requests

class Firewall(object):
    def __init__(self,robot_url) -> None:
        self.robot_url=robot_url
        self.__base_order={
            "增加放开规则":"firewall-cmd --permanent --add-rich-rule='rule family=\"ipv4\" source address=\"%s\" port protocol=\"tcp\" port=\"%d\" accept'",
            "删除放开规则":"firewall-cmd --permanent --remove-rich-rule='rule family=\"ipv4\" source address=\"%s\" port protocol=\"tcp\" port=\"%d\" accept'",
            "重新载入规则":"firewall-cmd --reload",
            "新建数据表_规则表":"""
                create table filewall_rule(
                    id INTEGER PRIMARY KEY AUTOINCREMENT,
                    ip_data varchar(200) not null,
                    port_data unsigned int not null,
                    create_time unsigned int not null,
                    out_time unsigned int not null,
                    enable_status unsigned int not null
                )
            """,
            "新建数据表_权限表":"""
                create table filewall_user(
                    user_id varchar(200) PRIMARY KEY not null,
                    power_id unsigned int not null,
                    create_time unsigned int not null,
                    enable_status unsigned int not null
                )
            """,
            "增加规则数据":"insert into filewall_rule(ip_data,port_data,create_time,out_time,enable_status) values (?,?,?,?,?);",
            "删除规则数据":"update filewall_rule set enable_status=0 where ip_data=? and port_data=?;",
            "删除规则数据_id":"update filewall_rule set enable_status=0 where id=?;",
            "检查哪些数据过期":"select id,ip_data,port_data,create_time,out_time from filewall_rule where enable_status=1 and out_time<=?;",
            "增加权限表数据":"insert into filewall_user(user_id,power_id,create_time,enable_status) values (?,?,?,?);",
            "删除权限表数据":"delete from filewall_user where user_id=?;",
            "查询权限表数据":"select user_id,power_id,create_time,enable_status from filewall_user where user_id=?;",
        }
        self.__url="http://82.156.184.131:10012/get_word/wvkXMMrn75SWAWbS"
        self.db_way="./firewall.db"
        self.conn = sqlite3.connect(self.db_way, check_same_thread=False)
        self.cursor = self.conn.cursor()
        try:
            # 建表语句
            self.cursor.execute(self.__base_order["新建数据表_权限表"])
        except Exception as err:
            if "already exists" not in str(err):
                raise err
        try:
            # 建表语句
            self.cursor.execute(self.__base_order["新建数据表_规则表"])
        except Exception as err:
            if "already exists" not in str(err):
                raise err
    def __del__(self):
        self.conn.close()
    def is_valid_ip(self,ip:str):
        try:
            ipaddress.ip_address(ip)
            return True
        except ValueError:
            return False
    def add_rule_os(self,ip:str,port:int):
        """
        增加一个放开规则
        """
        ip=str(ip)
        if self.is_valid_ip(ip)==False:
            return False
        try:
            port=int(port)
        except Exception:
            return False
        
        run_data=os.popen(self.__base_order["增加放开规则"]%(ip,port))
        if "success" in run_data.read():
            run_data=os.popen(self.__base_order["重新载入规则"])
            if "success" in run_data.read():
                return True
        return False
    
    def rm_rule_os(self,ip:str,port:int):
        """
        删除一个放开规则
        """
        ip=str(ip)
        if self.is_valid_ip(ip)==False:
            return False
        try:
            port=int(port)
        except Exception:
            return False
        
        run_data=os.popen(self.__base_order["删除放开规则"]%(ip,port))
        if "success" in run_data.read():
            run_data=os.popen(self.__base_order["重新载入规则"])
            if "success" in run_data.read():
                return True
        return False
    
    def add_rule_sql(self,ip:str,port:int,time_data:int):
        """
        在数据库中增加一条数据
        """
        time_data=time_data*60*60
        time_now=int(time.time())
        time_out=time_now+time_data
        self.cursor.execute(self.__base_order["增加规则数据"],(ip,port,time_now,time_out,1))
        self.conn.commit()
    
    def rm_rule_sql(self,ip:str,port:int):
        """
        删除某一条规则数据
        """
        self.cursor.execute(self.__base_order["删除规则数据"],(ip,port))
        self.conn.commit()
    
    def rm_rule_sql_id(self,id:int):
        """
        删除某一条规则数据
        """
        self.cursor.execute(self.__base_order["删除规则数据_id"],(id,))
        self.conn.commit()

    def add_rule(self,ip:str,port:int,time_data:int):
        ip=str(ip)
        if self.is_valid_ip(ip)==False:
            return [False,"ip格式错误"]
        try:
            port=int(port)
        except Exception:
            return [False,"端口格式错误"]
        
        try:
            time_data=int(time_data)
            if time_data<0 or time_data>50:
                [False,"时间范围超限"]
        except Exception:
            return [False,"时间格式错误"]
        
        self.add_rule_sql(ip,port,time_data)
        self.add_rule_os(ip,port)
        return [True,""]
    
    def rm_rule(self,ip:str,port:int,id=-1):
        """
        删除某一规则
        """
        ip=str(ip)
        is_pass=True
        if self.is_valid_ip(ip)==False:
            return [False,"ip格式错误"]
        try:
            port=int(port)
        except Exception:
            return [False,"端口格式错误"]
        try:
            if id==-1:
                self.rm_rule_sql(ip,port)
            else:
                self.rm_rule_sql_id(id)
        except Exception as err:
            print(err)
        answer=self.rm_rule_os(ip,port)
        if is_pass==False:
            return [False,"sql操作错误"]
        return [answer,""]
    
    def check_data_sql(self):
        """
        检查哪些数据过期,并删除
        """
        time_now=int(time.time())
        self.cursor.execute(self.__base_order["检查哪些数据过期"],(time_now,))
        data=self.cursor.fetchall()
        self.conn.commit()

        all_data=[]
        for now in data:
            answer=self.rm_rule(ip=now[1],port=now[2],id=now[0])
            all_data.append(
                    {
                        "id":now[0],
                        "ip":now[1],
                        "port":now[2],
                        "create_time":now[3],
                        "out_time":now[4],
                        "answer":answer
                    }
                )
            print(all_data)

        return all_data
    
    def send_word(self,word):
        """
        向钉钉机器人发送消息
        """
        header={
            "Content-Type":"application/json"
        }
        data={"msgtype": "text","text": {"content":word}}
        for i in range(0,3):
            try:
                r=requests.post(url=self.robot_url,headers=header,timeout=60,json=data)
                break
            except Exception as err:
                print(err)

    def check_data(self):
        """
        检查数据是否超时
        """
        while True:
            time.sleep(10)
            
            data=self.check_data_sql()
            
            if len(data)>0:
                for rm_data in data:
                    nid=rm_data["id"]
                    ip=rm_data["ip"]
                    port=rm_data["port"]
                    create_time=str(time.strftime("%Y-%m-%d %H:%M:%S", time.localtime(int(rm_data["create_time"]))))
                    out_time=str(time.strftime("%Y-%m-%d %H:%M:%S", time.localtime(int(rm_data["out_time"]))))
                    answer=str(rm_data["answer"])
                    word=f"""
任务超时删除:
    任务ID:{nid}
    ip:{ip}
    port:{port}
    创建时间:{create_time}
    超时时间:{out_time}
    删除结果:{answer}
                """
                    self.send_word(word=word)
    
    def check_power(self,user_id:str,power_id):
        """
        检查用户是否拥有此权限
        """
        self.cursor.execute(self.__base_order["查询权限表数据"],(user_id,))
        data=self.cursor.fetchall()
        self.conn.commit()

        is_pass=False
        word=""

        for user_data in data:
            if user_id==user_data[0]:
                if int(user_data[1])<=power_id and int(user_data[3])==1:
                    is_pass=True
                    word="鉴权通过"
                else:
                    is_pass=False
                    word="权限不足"
        if word=="":
            word="查无此人"
        return [is_pass,word]
    
    def add_user(self,user_id:str,power_id:int):
        """
        增加用户
        """
        time_now=int(time.time())
        self.cursor.execute(self.__base_order["增加权限表数据"],(user_id,power_id,time_now,1))
        self.conn.commit()
    
    def rm_user(self,user_id):
        """
        删除用户
        """
        self.cursor.execute(self.__base_order["删除权限表数据"],(user_id,))
        self.conn.commit()




五、附件
钉钉机器人控制防火墙工具.zip (4.45 KB, 下载次数: 9)

免费评分

参与人数 6吾爱币 +13 热心值 +6 收起 理由
mengfang1 + 1 + 1 我很赞同!
happyxuexi + 1 + 1 欢迎分析讨论交流,吾爱破解论坛有你更精彩!
笙若 + 1 + 1 谢谢@Thanks!
侃遍天下无二人 + 7 + 1 欢迎分析讨论交流,吾爱破解论坛有你更精彩!
wkdxz + 2 + 1 感谢发布原创作品,吾爱破解论坛因你更精彩!
lyq87 + 1 + 1 谢谢@Thanks!

查看全部评分

发帖前要善用论坛搜索功能,那里可能会有你要找的答案或者已经有人发布过相同内容了,请勿重复发帖。

jingyu2333 发表于 2023-2-17 13:48
感觉服务器的防火墙真的好难呀。
jingyu2333 发表于 2023-2-17 13:49
谁能弄个免费的服务器的防火墙?有这样的东西吗?
JuncoJet 发表于 2023-2-17 16:07
dujiu3611 发表于 2023-2-17 16:42
赞啊,我一直想找一个可以远程管理服务器防火墙的程序,为的就是方便偶尔的操作,感谢分享
wkdxz 发表于 2023-2-17 17:45
思路清晰 操作稳健  赞一个
huduke 发表于 2023-2-18 10:36
感谢分享
lcg2014 发表于 2023-2-18 13:36
阿里钉钉链接内网服务器安全吗?
 楼主| 天域至尊 发表于 2023-2-20 09:08
jingyu2333 发表于 2023-2-17 13:49
谁能弄个免费的服务器的防火墙?有这样的东西吗?

Linux自带iptables,firewalld
 楼主| 天域至尊 发表于 2023-2-20 09:09
lcg2014 发表于 2023-2-18 13:36
阿里钉钉链接内网服务器安全吗?

看业务需要,内网机器没法回传啊
您需要登录后才可以回帖 登录 | 注册[Register]

本版积分规则

返回列表

RSS订阅|小黑屋|处罚记录|联系我们|吾爱破解 - LCG - LSG ( 京ICP备16042023号 | 京公网安备 11010502030087号 )

GMT+8, 2024-11-28 14:36

Powered by Discuz!

Copyright © 2001-2020, Tencent Cloud.

快速回复 返回顶部 返回列表