天域至尊 发表于 2023-2-17 12:51

linux防火墙firewall与钉钉机器人联动

本帖最后由 天域至尊 于 2023-2-17 12:53 编辑

一、项目背景
在日常使用中,我们常常希望动态远程操作防火墙,以保证风险应用的安全,此项目实现了使用钉钉机器人来远程控制Linux应用防火墙。可以在钉钉群以@机器人的方式,来实现控制防火墙开关,支持权限控制、定时关闭。
二、依赖环境
运行环境:Linux (测试系统为centos7)
防火墙:firewall,并处在运行状态。
机器人:钉钉机器人,并开启outgoing功能(该功能现在处在维护状态,新申请的机器人无法打开)
开发环境:python 3.7+ flask requests sqlite3
三、环境部署
启动firewalld
sodu systemctl start firewalld
放行服务端口(自己指定端口号,应与下述main.py文件中,第16行端口保持一致)
firewall-cmd --zone=public --add-port=你的端口/tcp --permanent
刷新防火墙
firewall-cmd --reload
配置脚本内容,并运行脚本
python3 main.py
查看提示信息

如果是云服务器,则在云服务器安全组上放开对应端口
访问提示站点尝试,如下则是正常

增加钉钉机器人,勾选outgoing,输入对应的提示信息

该功能现属于维护阶段,如放开使用,则如下

配置完成后,@机器人,查询帮助信息,如正常响应,则配置完成。



四、源代码
代码结构:
在同一文件夹下,两个py文件,分别为:firewall.py和main.py

mian.py代码如下(其中8-16行,都需要自己配置)
from flask import Flask
import requests,json,ipaddress,time,threading,json
from flask import request
from firewall import Firewall

app = Flask(__name__)
#机器人声明的token
TOKEN="123"
#设置机器人地址
robot_url="https://oapi.dingtalk.com/robot/send?access_token=*******"
#本机url_key,建议设置为复杂的key,不要出现特殊符号
url_key="xjcbkasbasba"
#本机公网地址
ip="127.0.0.1"
#本服务监听端口
port="8080"

control_url='/control/'+url_key

control_url_out=f"http://{ip}:{port}{control_url}"
print(f"""
请在您的钉钉机器人的Outgoing配置中输入如下信息:
post地址:{control_url_out}
Token:{TOKEN}
""")

fire_wall=Firewall(robot_url=robot_url)
fire_wall_check=threading.Thread(target=fire_wall.check_data)
fire_wall_check.start()

def is_valid_ip(ip:str):
    try:
      ipaddress.ip_address(ip)
      return True
    except ValueError:
      return False

def send_word(word):
    """
    """
    global robot_url
    header={
      "Content-Type":"application/json"
    }
    data={"msgtype": "text","text": {"content":word}}
    for i in range(0,3):
      try:
            r=requests.post(url=robot_url,headers=header,timeout=60,json=data)
            break
      except Exception as err:
            print(err)

@app.after_request
def modify_server_header(response):
    response.headers['Server'] = 'Windows Server 8.0/IIS 10.0'
    return response


#路由
@app.route(control_url,methods=['POST'])
def add_data():
    word="命令识别失败,发送“帮助”,查看命令格式。"
    value = request.headers.get("token")
    if TOKEN!=value:
      return "error power not allow!"
    data=request.json
    if data:
      user_id=data["senderId"]
      text=data["text"]["content"]
      text=str(text).replace("\t","").replace(""," ")
      while "" in text:
            text=text.replace("","")
      if text==" ":
            text=text
      text=text.split(" ")
      if len(text)>0:
            if "帮助" in text:
                word="""
帮助信息:
    1.查看帮助信息 @机器人 帮助
    2.查看自己的id @机器人 查询id
    3.增加id @机器人 增加id 权限 id值
      权限分为:1-管理员 可以增加id,增删放行规则
               2-用户 可以增删放行规则
            eg:@机器人 增加id 2 $=cndjbaxba
    4.删除id @机器人 删除id id值
            eg:@机器人 删除id $=cndjbaxba
    5.增加放行规则 @机器人 放行 源ip地址 目的端口 放行时间
      eg:@机器人 放行 127.0.0.1 8080 10
    6.删除放行规则 @机器人 删除 源ip地址 目的端口
      eg:@机器人 删除 127.0.0.1 8080
                """
            elif "查询id" in text:
                word="您的ID是:"+str(user_id)

            elif "放行" in text:
                if len(text)<4:
                  word="放行命令格式不标准。"
                else:
                  is_pass=True
                  ip=text
                  port=text
                  time_data=text
                  if is_valid_ip(ip=ip)==False:
                        word="IP格式错误。"
                        is_pass=False
                  try:
                        port=int(port)
                  except Exception:
                        is_pass=False
                        word="端口格式错误。"
                  try:
                        time_data=int(time_data)
                  except Exception:
                        is_pass=False
                        word="时间格式错误。"
                  power_data=fire_wall.check_power(user_id=user_id,power_id=2)
                  
                  if power_data==False:
                        is_pass=False
                        word=power_data

                  if is_pass:
                        try:
                            end_data=fire_wall.add_rule(ip=ip,port=port,time_data=time_data)
                            if end_data:
                              word="增加成功"
                            else:
                              word="增加失败:"+str(end_data[-1])
                        except Exception as err:
                            word="增加失败,报错内容为:"+str(err)
            elif "删除id" in text:
                if len(text)<2:
                  word="删除id命令格式不标准。"
                else:
                  is_pass=True
                  rm_user_id=text
                  power_data=fire_wall.check_power(user_id=user_id,power_id=1)
                  
                  if power_data==False:
                        is_pass=False
                        word=power_data
                  
                  if is_pass:
                        try:
                            end_data=fire_wall.rm_user(user_id=rm_user_id)
                            word="删除id成功"
                        except Exception as err:
                            word="删除id失败,报错内容为:"+str(err)
            elif "增加id" in text:
                if len(text)<3:
                  word="增加id命令格式不标准。"
                else:
                  is_pass=True
                  add_user_id=text
                  add_power_id=text
                  try:
                        add_power_id=int(add_power_id)
                  except Exception:
                        is_pass=False
                        word="权限数据格式错误。"

                  power_data=fire_wall.check_power(user_id=user_id,power_id=1)
                  
                  if power_data==False:
                        is_pass=False
                        word=power_data
                  
                  if is_pass:
                        try:
                            end_data=fire_wall.add_user(user_id=add_user_id,power_id=add_power_id)
                            word="增加用户成功"
                        except Exception as err:
                            word="增加用户失败,报错内容为:"+str(err)
            
            elif "删除" in text and "id" not in text:
                if len(text)<3:
                  word="删除命令格式不标准。"
                else:
                  is_pass=True
                  ip=text
                  port=text
                  if is_valid_ip(ip=ip)==False:
                        word="IP格式错误。"
                        is_pass=False
                  try:
                        port=int(port)
                  except Exception:
                        is_pass=False
                        word="端口格式错误。"
                  power_data=fire_wall.check_power(user_id=user_id,power_id=2)
                  
                  if power_data==False:
                        is_pass=False
                        word=power_data
                  
                  if is_pass:
                        try:
                            end_data=fire_wall.rm_rule(ip=ip,port=port)
                            if end_data:
                              word="删除成功"
                            else:
                              word="删除失败:"+str(end_data[-1])
                        except Exception as err:
                            word="删除失败,报错内容为:"+str(err)
      else:
            word="命令识别失败,发送“帮助”,查看命令格式。"
    data={"msgtype": "text","text": {"content":word}}
    return json.dumps(data)

#运行
if __name__ == '__main__':
    app.run(host="0.0.0.0",port=port)
firewall.py代码如下
import os,time,json
import ipaddress
import sqlite3
import requests

class Firewall(object):
    def __init__(self,robot_url) -> None:
      self.robot_url=robot_url
      self.__base_order={
            "增加放开规则":"firewall-cmd --permanent --add-rich-rule='rule family=\"ipv4\" source address=\"%s\" port protocol=\"tcp\" port=\"%d\" accept'",
            "删除放开规则":"firewall-cmd --permanent --remove-rich-rule='rule family=\"ipv4\" source address=\"%s\" port protocol=\"tcp\" port=\"%d\" accept'",
            "重新载入规则":"firewall-cmd --reload",
            "新建数据表_规则表":"""
                create table filewall_rule(
                  id INTEGER PRIMARY KEY AUTOINCREMENT,
                  ip_data varchar(200) not null,
                  port_data unsigned int not null,
                  create_time unsigned int not null,
                  out_time unsigned int not null,
                  enable_status unsigned int not null
                )
            """,
            "新建数据表_权限表":"""
                create table filewall_user(
                  user_id varchar(200) PRIMARY KEY not null,
                  power_id unsigned int not null,
                  create_time unsigned int not null,
                  enable_status unsigned int not null
                )
            """,
            "增加规则数据":"insert into filewall_rule(ip_data,port_data,create_time,out_time,enable_status) values (?,?,?,?,?);",
            "删除规则数据":"update filewall_rule set enable_status=0 where ip_data=? and port_data=?;",
            "删除规则数据_id":"update filewall_rule set enable_status=0 where id=?;",
            "检查哪些数据过期":"select id,ip_data,port_data,create_time,out_time from filewall_rule where enable_status=1 and out_time<=?;",
            "增加权限表数据":"insert into filewall_user(user_id,power_id,create_time,enable_status) values (?,?,?,?);",
            "删除权限表数据":"delete from filewall_user where user_id=?;",
            "查询权限表数据":"select user_id,power_id,create_time,enable_status from filewall_user where user_id=?;",
      }
      self.__url="http://82.156.184.131:10012/get_word/wvkXMMrn75SWAWbS"
      self.db_way="./firewall.db"
      self.conn = sqlite3.connect(self.db_way, check_same_thread=False)
      self.cursor = self.conn.cursor()
      try:
            # 建表语句
            self.cursor.execute(self.__base_order["新建数据表_权限表"])
      except Exception as err:
            if "already exists" not in str(err):
                raise err
      try:
            # 建表语句
            self.cursor.execute(self.__base_order["新建数据表_规则表"])
      except Exception as err:
            if "already exists" not in str(err):
                raise err
    def __del__(self):
      self.conn.close()
    def is_valid_ip(self,ip:str):
      try:
            ipaddress.ip_address(ip)
            return True
      except ValueError:
            return False
    def add_rule_os(self,ip:str,port:int):
      """
      增加一个放开规则
      """
      ip=str(ip)
      if self.is_valid_ip(ip)==False:
            return False
      try:
            port=int(port)
      except Exception:
            return False
      
      run_data=os.popen(self.__base_order["增加放开规则"]%(ip,port))
      if "success" in run_data.read():
            run_data=os.popen(self.__base_order["重新载入规则"])
            if "success" in run_data.read():
                return True
      return False
   
    def rm_rule_os(self,ip:str,port:int):
      """
      删除一个放开规则
      """
      ip=str(ip)
      if self.is_valid_ip(ip)==False:
            return False
      try:
            port=int(port)
      except Exception:
            return False
      
      run_data=os.popen(self.__base_order["删除放开规则"]%(ip,port))
      if "success" in run_data.read():
            run_data=os.popen(self.__base_order["重新载入规则"])
            if "success" in run_data.read():
                return True
      return False
   
    def add_rule_sql(self,ip:str,port:int,time_data:int):
      """
      在数据库中增加一条数据
      """
      time_data=time_data*60*60
      time_now=int(time.time())
      time_out=time_now+time_data
      self.cursor.execute(self.__base_order["增加规则数据"],(ip,port,time_now,time_out,1))
      self.conn.commit()
   
    def rm_rule_sql(self,ip:str,port:int):
      """
      删除某一条规则数据
      """
      self.cursor.execute(self.__base_order["删除规则数据"],(ip,port))
      self.conn.commit()
   
    def rm_rule_sql_id(self,id:int):
      """
      删除某一条规则数据
      """
      self.cursor.execute(self.__base_order["删除规则数据_id"],(id,))
      self.conn.commit()

    def add_rule(self,ip:str,port:int,time_data:int):
      ip=str(ip)
      if self.is_valid_ip(ip)==False:
            return
      try:
            port=int(port)
      except Exception:
            return
      
      try:
            time_data=int(time_data)
            if time_data<0 or time_data>50:
               
      except Exception:
            return
      
      self.add_rule_sql(ip,port,time_data)
      self.add_rule_os(ip,port)
      return
   
    def rm_rule(self,ip:str,port:int,id=-1):
      """
      删除某一规则
      """
      ip=str(ip)
      is_pass=True
      if self.is_valid_ip(ip)==False:
            return
      try:
            port=int(port)
      except Exception:
            return
      try:
            if id==-1:
                self.rm_rule_sql(ip,port)
            else:
                self.rm_rule_sql_id(id)
      except Exception as err:
            print(err)
      answer=self.rm_rule_os(ip,port)
      if is_pass==False:
            return
      return
   
    def check_data_sql(self):
      """
      检查哪些数据过期,并删除
      """
      time_now=int(time.time())
      self.cursor.execute(self.__base_order["检查哪些数据过期"],(time_now,))
      data=self.cursor.fetchall()
      self.conn.commit()

      all_data=[]
      for now in data:
            answer=self.rm_rule(ip=now,port=now,id=now)
            all_data.append(
                  {
                        "id":now,
                        "ip":now,
                        "port":now,
                        "create_time":now,
                        "out_time":now,
                        "answer":answer
                  }
                )
            print(all_data)

      return all_data
   
    def send_word(self,word):
      """
      向钉钉机器人发送消息
      """
      header={
            "Content-Type":"application/json"
      }
      data={"msgtype": "text","text": {"content":word}}
      for i in range(0,3):
            try:
                r=requests.post(url=self.robot_url,headers=header,timeout=60,json=data)
                break
            except Exception as err:
                print(err)

    def check_data(self):
      """
      检查数据是否超时
      """
      while True:
            time.sleep(10)
            
            data=self.check_data_sql()
            
            if len(data)>0:
                for rm_data in data:
                  nid=rm_data["id"]
                  ip=rm_data["ip"]
                  port=rm_data["port"]
                  create_time=str(time.strftime("%Y-%m-%d %H:%M:%S", time.localtime(int(rm_data["create_time"]))))
                  out_time=str(time.strftime("%Y-%m-%d %H:%M:%S", time.localtime(int(rm_data["out_time"]))))
                  answer=str(rm_data["answer"])
                  word=f"""
任务超时删除:
    任务ID:{nid}
    ip:{ip}
    port:{port}
    创建时间:{create_time}
    超时时间:{out_time}
    删除结果:{answer}
                """
                  self.send_word(word=word)
   
    def check_power(self,user_id:str,power_id):
      """
      检查用户是否拥有此权限
      """
      self.cursor.execute(self.__base_order["查询权限表数据"],(user_id,))
      data=self.cursor.fetchall()
      self.conn.commit()

      is_pass=False
      word=""

      for user_data in data:
            if user_id==user_data:
                if int(user_data)<=power_id and int(user_data)==1:
                  is_pass=True
                  word="鉴权通过"
                else:
                  is_pass=False
                  word="权限不足"
      if word=="":
            word="查无此人"
      return
   
    def add_user(self,user_id:str,power_id:int):
      """
      增加用户
      """
      time_now=int(time.time())
      self.cursor.execute(self.__base_order["增加权限表数据"],(user_id,power_id,time_now,1))
      self.conn.commit()
   
    def rm_user(self,user_id):
      """
      删除用户
      """
      self.cursor.execute(self.__base_order["删除权限表数据"],(user_id,))
      self.conn.commit()




五、附件


jingyu2333 发表于 2023-2-17 13:48

感觉服务器的防火墙真的好难呀。

jingyu2333 发表于 2023-2-17 13:49

谁能弄个免费的服务器的防火墙?有这样的东西吗?

JuncoJet 发表于 2023-2-17 16:07

jingyu2333 发表于 2023-2-17 13:49
谁能弄个免费的服务器的防火墙?有这样的东西吗?

iptables

dujiu3611 发表于 2023-2-17 16:42

{:1_921:} 赞啊,我一直想找一个可以远程管理服务器防火墙的程序,为的就是方便偶尔的操作,感谢分享

wkdxz 发表于 2023-2-17 17:45

思路清晰 操作稳健赞一个

huduke 发表于 2023-2-18 10:36

感谢分享

lcg2014 发表于 2023-2-18 13:36

阿里钉钉链接内网服务器安全吗?

天域至尊 发表于 2023-2-20 09:08

jingyu2333 发表于 2023-2-17 13:49
谁能弄个免费的服务器的防火墙?有这样的东西吗?

Linux自带iptables,firewalld

天域至尊 发表于 2023-2-20 09:09

lcg2014 发表于 2023-2-18 13:36
阿里钉钉链接内网服务器安全吗?

看业务需要,内网机器没法回传啊
页: [1] 2
查看完整版本: linux防火墙firewall与钉钉机器人联动