天域至尊 发表于 2022-3-10 18:02

树莓派第五课之自制小米家庭真智能网关

本帖最后由 天域至尊 于 2022-3-10 18:04 编辑

第一课,温度传感器https://www.52pojie.cn/thread-902898-1-1.html
第二课,继电器https://www.52pojie.cn/thread-903925-1-1.html
第三课,舵机https://www.52pojie.cn/thread-911228-1-1.html
第四课,DHT11湿温度传感器https://www.52pojie.cn/thread-927626-1-1.html

这篇文章早就想写了,一直没时间,今天忙里偷闲,赶紧写了发出来。

作为资深米粉,我对小米的产品是这样描述的:可以用,但是也只是可以用。功能都有,但是好用吗,就没啥关系了。
列为看官可能要问了,有问题不去找客服反馈,为啥在网上抹黑小米。
我在小米论坛多次搜索,发现数年前就有我遇到问题,但是一直没解决。问题是啥呢?

在寒冷的冬季,我有一个最简单的需求:
1.温度低于18度,自动打开电暖器和加湿器
2.温度高于25度,自动关闭电暖器和加湿器。
为此,我购买了如下智能设备:
1.小米蓝牙湿温度传感计2
2.小米智能插座2 蓝牙网关版

如下:



然后我就发现我犯了个致命的错误!
这俩都是二代,加起来不就二中二了吗属于是

然而,我还是大意了,我知道它二,但是我没想到它那么二

我的设计方案是这样的:

温湿度传感器将数据传给智能插座,通过控制电源开关,来实现电暖器和加湿器的工作或停止。

在米家智能场景里配置好了以后,我发现它不是不智能的问题,而且癫痫末期的问题。

该开的时候不开,该关的时候不关
不是半夜把我冻醒,就是热的我一身大汗。
在网上多番求助,看到有些人说,买智能网关就能解决。
也看到有些人分享购买使用后,家里的智能家居终于不抽风了,直接抽搐了。

在肉体和灵魂的双重折磨下,我想起了我那纯真而而善良的树莓派
它还是那么的明媚善良,不像雷总套路深,半夜折磨人。
所以我们要解决的问题如下:
1.通过蓝牙读取温度传感器数据。
2.读取智能插座开关状态。
3.判断是否需要进行开关操作,如需操作,则操作。

首先,第一关:
读取蓝牙传感器数据。通过多次熬夜尝试,我发现了问题的根源,为了节约能耗,蓝牙传感器不是一直开启的状态,什么时候开启,这个规则,网上也是众说纷纭,就是因为这个原因,导致整个智能策略抽风。
路走死了,但是幸好,我们第一课讲解了树莓派直接连接外置温度传感器,稳定而准确。https://www.52pojie.cn/thread-902898-1-1.html

第二关:
读取智能插座的状态,这时候,我们就得提及一个通用协议了
miot协议
简而言之,言而简之,就是一个控制智能家居的协议,唯一的问题是怎么安装客户端和使用。
安装客户端:
pip3 install python_miio
简单高效
然后,测试使用,三个接口模板如下
直接cmd命令行输入如下命令

1.查询智能插座当前状态:
miiocli device --ip "智能插座IP" --token "智能插座token" raw_command get_properties "[{'did': 'MYDID', 'siid': 2, 'piid': 1 }]"
返回如下:
待补充
2.打开智能插座
miiocli device --ip "智能插座IP" --token "智能插座token" raw_command set_properties "[{'did': 'MYDID', 'siid': 2, 'piid': 1, 'value':True}]"
返回内容:
待补充
3.关闭智能插座
miiocli device --ip "智能插座IP" --token "智能插座token" raw_command set_properties "[{'did': 'MYDID', 'siid': 2, 'piid': 1, 'value':False}]"
返回如下:
待补充
大家可以看到,这三个命令就可以操控我们的智能插座。

列位看官可能要问了:
IP我好搞,家里局域网吗,从路由器查下就行,你token是个啥东东。
通过,几天的坚持寻找,终于找到一个大佬发的文章
https://github.com/PiotrMachowski/Xiaomi-cloud-tokens-extractor
其中的py脚本,输入米家账户密码,可以爬出家里所有智能家居的ip和token

特别注意:该脚本非被人开发维护,本人进行初步代审以后,未发现明显后门或窃取账密的行为,但是,因为工具不是自己开发的,而且该工具一直在更新迭代,无法保证绝对安全,请使用者做好自行甄别。

由此,我们获得了关键信息,就可以写程序了。
逻辑如下:


完整代码如下:
'''
token获取:https://github.com/PiotrMachowski/Xiaomi-cloud-tokens-extractor
依赖库安装:pip3 install python_miio
'''
import os,re,time,json,logging

#设置打开开关的最低温度
open_temperature_min=19.0
#设置打开开关的最高温度
open_temperature_max=22.0

#设置打开开关时间
start_time1="21:30"
#设置关闭开关时间
end_time1="23:59"
#设置第二组打开开关时间
start_time2="0:0"
#设置第二组,关闭开关时间
end_time2="7:0"

#指定温度文件路径
temperature_file_path="/sys/bus/w1/devices/28-011432931aaf/w1_slave"
#指定智能插座IP
socket_ip="192.168.0.100"
#指定智能插座token
socket_token="b000001ac900000000db40000000008"

#初始化温度存储变量
temperature_num=0.0
#设置日志记录相关参数
LOG_FORMAT = "%(asctime)s %(levelname)s %(pathname)s %(message)s "#配置输出日志格式
DATE_FORMAT = '%Y-%m-%d%H:%M:%S %a ' #配置输出时间的格式,注意月份和天数不要搞乱了
logging.basicConfig(level=logging.DEBUG,
                  format=LOG_FORMAT,
                  datefmt = DATE_FORMAT ,
                  filename="/var/log/xiaomi.log" #有了filename参数就不会直接输出显示到控制台,而是直接写入文件
                  )

def read_temperature(file_path):
    """
    读取温度文件,并转化为浮点型
    """
    global temperature_num
    try:
      logging.debug("开始读取温度文件")
      file=open(file_path,mode="r")
      val=file.read()
      crc=re.compile('crc=.*? (.*?)\n').findall(val)
      if crc=='YES':
            t=re.compile('t=(.*?)\n').findall(val)
            t=float(t)/1000.0
            temperature_num=t
            logging.debug("温度文件解析成功,温度为:"+str(t))
            return (True,t)
      else:
            #未能读取到温度文件
            logging.warning("温度文件读取失败")
            return False
    except Exception as err:
      logging.error("温度文件读取失败:"+str(err))
      raise err

def str_to_dict(data:str):
    """
    将智能插座返回的数据,解析成字典
    """
    try:
      data=str(data).split("\n")[-2].replace("'",'"').replace(" ","").replace("False","false").replace("True","true")
      data=json.loads(data)
      return data
    except Exception as err:
      logging.error("智能插座返回信息解析失败:"+str(err)+"源数据为:"+str(data))
      raise err
   
def get_socket_status(socket_ip:str,socket_token:str):
    """
    获得智能插座开关状态
    """
    base_order='miiocli device --ip "%s" --token "%s" raw_command get_properties "[{\'did\': \'MYDID\', \'siid\': 2, \'piid\': 1 }]"'
    now_order=base_order%(socket_ip,socket_token)
    try:
      logging.debug("智能插座开始查询状态")
      answer=os.popen(now_order).read()
      answer=str_to_dict(answer)
      return answer["value"]
    except Exception as err:
      logging.error("请求智能插座状态出现异常,报错信息为:"+str(err))
      raise err

def close_socket(socket_ip:str,socket_token:str):
    """
    关闭智能插座
    """
    base_order='miiocli device --ip "%s" --token "%s" raw_command set_properties "[{\'did\': \'MYDID\', \'siid\': 2, \'piid\': 1, \'value\':False}]"'
    now_order=base_order%(socket_ip,socket_token)
    try:
      logging.debug("开始关闭智能插座")
      answer=os.popen(now_order).read()
      answer=str_to_dict(answer)
      answer=get_socket_status(socket_ip=socket_ip,socket_token=socket_token)
      if answer==False:
            logging.debug("关闭智能插座成功")
            return True
      else:
            logging.warning("关闭智能插座失败")
            return False
    except Exception as err:
      logging.error("关闭智能插座发生报错,内容为:"+str(err))
      return False

def open_socket(socket_ip:str,socket_token:str):
    """
    打开智能插座
    """
    base_order='miiocli device --ip "%s" --token "%s" raw_command set_properties "[{\'did\': \'MYDID\', \'siid\': 2, \'piid\': 1, \'value\':True}]"'
    now_order=base_order%(socket_ip,socket_token)
    try:
      logging.debug("开始打开智能插座")
      answer=os.popen(now_order).read()
      answer=str_to_dict(answer)
      answer=get_socket_status(socket_ip=socket_ip,socket_token=socket_token)
      if answer==True:
            logging.debug("智能插座打开成功")
            return True
      else:
            logging.warning("智能插座打开失败")
            return False
    except Exception as err:
      logging.error("智能插座打开时发生异常,报错为:"+str(err))
      return False

def temperature_judge(open_min:float,open_max:float,temperature_file_path:str):
    """
    判断插座是否需要打开
    温度高于open_max,则返回False
    温度低于open_min,则返回True
    温度在open_min和open_max之间,则返回-1
    """
    now_temperature=read_temperature(file_path=temperature_file_path)
    if now_temperature==False:
      exit()
    else:
      now_temperature=now_temperature[-1]
    if now_temperature<open_min:
      logging.debug("当前温度为%s,最低温度为:%s,程序判定打开开关"%(str(now_temperature),str(open_min)))
      return True
    if now_temperature>open_max:
      logging.debug("当前温度为%s,最低温度为:%s,程序判定关闭开关"%(str(now_temperature),str(open_min)))
      return False
    if now_temperature>=open_min and now_temperature<=open_max:
      logging.debug("当前温度为%s,最低温度为:%s,程序判定无需操作"%(str(now_temperature),str(open_min)))
      return -1

def change_time_to_dict(time_data:str):
    """
    将时间解析为字典
    """
    time_data=str(time_data).replace(" ","").split(":")
    return {
      "hour":int(time_data),
      "min":int(time_data[-1])
    }.copy()

def time_judge(start_time:str,end_time:str):
    """
    判断时间是否在区间内,如否,返回false,如是,返回true
    """
    localtime = time.localtime(time.time())
    now_hour=localtime.tm_hour
    now_min=localtime.tm_min
    start_time=change_time_to_dict(start_time)
    end_time=change_time_to_dict(end_time)
    if now_hour > start_time["hour"]:
      if now_hour<end_time["hour"]:
            return True
      elif now_hour==end_time["hour"]:
            if now_min <= end_time["min"]:
                return True
    elif now_hour==start_time["hour"]:
      if now_min>= start_time["min"]:
            if now_hour<end_time["hour"]:
                return True
            elif now_hour==end_time["hour"]:
                if now_min <= end_time["min"]:
                  return True
    return False

def main():
    """
    运行的主程序
    """
    global temperature_num
    logging.debug("程序启动")
    try:
      check_time1=time_judge(start_time1,end_time1)
      check_time2=time_judge(start_time2,end_time2)
      if check_time1==True or check_time2==True:
            logging.debug("程序判断在任务执行时间段中")
            temperature=temperature_judge(open_min=open_temperature_min,open_max=open_temperature_max,temperature_file_path=temperature_file_path)
            
            if temperature==-1:
                pass
            else:
                socket_status=get_socket_status(socket_ip=socket_ip,socket_token=socket_token)
                if socket_status!=temperature:
                  if temperature:
                        open_Answer=open_socket(socket_ip=socket_ip,socket_token=socket_token)
                        if open_Answer:
                            print("开关打开操作成功")
                        else:
                            print("开关打开操作失败")
                  else:
                        close_Answer=close_socket(socket_ip=socket_ip,socket_token=socket_token)
                        if close_Answer:
                            print("开关关闭操作成功")
                        else:
                            print("开关关闭操作失败")
    except Exception as err:
      logging.error("主程序报错,报错内容为:"+str(err))
    finally:
      time.sleep(10)

main()

使用感受:
安然入睡----啪,开关闭合-----睡着了-----啪,开关关闭-----又睡着了-----以此类推………………
噪音如斯,唉,拿什么拯救你,我的雷军。

冥月影 发表于 2022-3-10 21:26

路过,看看大佬{:301_998:};
我也有个小米的智能插座,本想着用来做定时开关热水器的,可惜功率不够,现在被我拿来当作定时开关用了(每天睡觉前手动打开倒计时充电30分钟。。。)

dingqh 发表于 2022-3-10 18:15

挺有趣的。。。

就是那个开关开启和关闭的时候会 啪响一下。真的难受。。

感觉如果买了小米的电暖器和加湿器,会简单实现哦

天域至尊 发表于 2022-3-10 18:02

手滑,发表了,返回内容还没贴上去,不过无关大雅,今晚我补充上去

ken1994 发表于 2022-3-11 08:42

谢谢分享学习,树莓派不再吃灰

DEC.zh 发表于 2022-3-11 09:35

前排学习又想折腾了

renyuankun 发表于 2022-3-11 10:40

学习了,家里有同款插座,回家也研究下试试

CTHJ6990 发表于 2022-3-11 11:48

可以,学到了

lllcrhlll 发表于 2022-8-10 16:05

666666666

wkdxz 发表于 2022-8-10 21:00

收藏备用,谢谢分享!!
页: [1] 2
查看完整版本: 树莓派第五课之自制小米家庭真智能网关