import
re
import
socket
import
threading
import
time
from
datetime
import
datetime
import
winrm
import
requests
from
pythonping
import
ping
from
wakeonlan
import
send_magic_packet
class PCpower:
    """Control and monitor a PC's power state through the bemfa.com TCP service.

    The instance subscribes to ``topic`` on ``bemfa.com:8344`` and reacts to
    incoming ``msg=on`` / ``msg=off`` messages:

    * ``on``  -> a Wake-on-LAN magic packet is sent to ``pc_mac``.
    * ``off`` -> if ``use_shutdown`` is set, ``shutdown -s -t <seconds>`` is run
      on the PC through WinRM.

    When ``use_ping`` is set, the PC is pinged periodically and state changes
    are published to ``<topic>/up``. Optional file logging and HTTP push
    notifications are available through ``use_write_log`` and
    ``use_send_message``.
    """

    # Endpoint of the TCP service used for subscribing and publishing.
    _SERVER_HOST = 'bemfa.com'
    _SERVER_PORT = 8344
    # Seconds between heartbeat messages.
    _HEARTBEAT_INTERVAL = 60
    # Seconds between two ping-based power state checks.
    _CHECK_INTERVAL = 120
    # Seconds to wait before retrying a failed subscription / heartbeat.
    _RETRY_DELAY = 2
    # Upper bound (seconds) for the notification HTTP request.
    _MESSAGE_TIMEOUT = 10

    def __init__(self, uid: str, topic: str, pc_mac: str, local_ip: str,
                 use_ping: bool = False, use_shutdown: bool = False,
                 pc_ip: str = '', pc_account: str = '', pc_password: str = '',
                 shutdown_time: int = 0, use_write_log: bool = False,
                 log_path: str = '', use_send_message: bool = False,
                 url_send_message: str = '',
                 channel_send_message: str = '') -> None:
        """Store and validate the configuration.

        Args:
            uid: Account key sent with every subscribe/publish command.
            topic: Topic to subscribe to; state updates go to ``<topic>/up``.
            pc_mac: MAC address of the PC (``:`` or ``-`` separated).
            local_ip: IPv4 address of the local interface used for the magic
                packet, or ``'auto'`` to detect it with ``get_ip_address``.
            use_ping: Enable periodic ping-based state detection.
            use_shutdown: Enable remote shutdown through WinRM.
            pc_ip: IPv4 address of the PC (needed for ping or shutdown).
            pc_account: WinRM account (needed for shutdown).
            pc_password: WinRM password (needed for shutdown).
            shutdown_time: Delay in seconds passed to ``shutdown -t``.
            use_write_log: Enable file logging.
            log_path: Directory the daily log files are written to.
            use_send_message: Enable HTTP push notifications.
            url_send_message: URL the notification is POSTed to.
            channel_send_message: Value of the ``channel`` form field.

        Raises:
            ValueError: A string, MAC, IP or integer setting is invalid.
            TypeError: A flag that must be a bool is not a bool.
        """
        self.__uid = uid
        self.__topic = topic
        self.__pc_mac = pc_mac
        self.__local_ip = local_ip if local_ip != 'auto' else self.get_ip_address()
        self.__use_ping = use_ping
        self.__use_shutdown = use_shutdown
        self.__pc_ip = pc_ip
        self.__pc_account = pc_account
        self.__pc_password = pc_password
        self.__shutdown_time = shutdown_time
        self.__use_write_log = use_write_log
        self.__log_path = log_path
        self.__use_send_message = use_send_message
        self.__url_send_message = url_send_message
        self.__channel_send_message = channel_send_message
        self.__pc_state = None
        self.__t_check = None
        # Created by __connTCP; kept here so a reconnect can close the old one.
        self.__tcp_client_socket = None

        # Always-required settings first, then the flags, then the settings
        # that only matter when the corresponding feature is enabled.
        self.__check_correct(self.__uid, 'uid')
        self.__check_correct(self.__topic, 'topic')
        self.__check_correct(self.__pc_mac, 'pc_mac', is_mac=True)
        self.__check_correct(self.__local_ip, 'local_ip', is_ip=True)
        self.__check_correct(self.__use_ping, 'use_ping', is_bool=True)
        self.__check_correct(self.__use_shutdown, 'use_shutdown', is_bool=True)
        self.__check_correct(self.__use_write_log, 'use_write_log', is_bool=True)
        self.__check_correct(self.__use_send_message, 'use_send_message', is_bool=True)
        if self.__use_ping or self.__use_shutdown:
            self.__check_correct(self.__pc_ip, 'pc_ip', is_ip=True)
        if self.__use_shutdown:
            self.__check_correct(self.__pc_account, 'pc_account')
            self.__check_correct(self.__pc_password, 'pc_password')
            self.__check_correct(self.__shutdown_time, 'shutdown_time', is_int=True)
        if self.__use_write_log:
            self.__check_correct(self.__log_path, 'log_path')
        if self.__use_send_message:
            self.__check_correct(self.__url_send_message, 'url_send_message')
            self.__check_correct(self.__channel_send_message, 'channel_send_message')

        print("初始化成功")
        self.write_log("初始化成功")
        if local_ip == 'auto':
            print(f"自动获取本机局域网ip地址:{self.__local_ip}")
            self.write_log(f"自动获取本机局域网ip地址:{self.__local_ip}")

    def __check_correct(self, self_var, var_name, is_mac=False, is_ip=False,
                        is_bool=False, is_int=False):
        """Validate one configuration value.

        Exactly one kind of check is applied, chosen by the first flag that is
        set (MAC, IP, bool, int); with no flag the value must be a non-blank
        string.

        Raises:
            ValueError: Invalid MAC/IP/integer/string value.
            TypeError: ``is_bool`` is set and the value is not a bool.
        """
        if is_mac:
            if not isinstance(self_var, str) or not self.check_mac_address(self_var):
                raise ValueError(f"{var_name}必须是有效的MAC地址,当前为:{self_var}")
        elif is_ip:
            if not isinstance(self_var, str) or not self.check_ip_address(self_var):
                raise ValueError(f"{var_name}必须是有效的IP地址,当前为:{self_var}")
        elif is_bool:
            if not isinstance(self_var, bool):
                raise TypeError(f"{var_name}必须是True或False,当前为:{self_var},类型:{type(self_var)}")
        elif is_int:
            # bool is a subclass of int, so it has to be rejected explicitly.
            if isinstance(self_var, bool) or not isinstance(self_var, int) or self_var < 0:
                raise ValueError(f"{var_name}必须是大于等于0的整数,当前为:{self_var},类型:{type(self_var)}")
        else:
            if not isinstance(self_var, str) or not self_var.strip():
                raise ValueError(f"{var_name}必须是非空字符串")

    @staticmethod
    def check_mac_address(mac):
        """Return True if ``mac`` is six hex pairs joined by ``:`` or by ``-``."""
        pattern = r'^(([0-9A-Fa-f]{2}:){5}([0-9A-Fa-f]{2})|([0-9A-Fa-f]{2}-){5}([0-9A-Fa-f]{2}))$'
        # fullmatch: with match(), "$" would also accept a trailing newline.
        return bool(re.fullmatch(pattern, mac))

    @staticmethod
    def check_ip_address(ip):
        """Return True if ``ip`` is a dotted-quad IPv4 address (0-255 each)."""
        pattern = r'^((25[0-5]|2[0-4][0-9]|[01]?[0-9][0-9]?)\.){3}(25[0-5]|2[0-4][0-9]|[01]?[0-9][0-9]?)$'
        # fullmatch: with match(), "$" would also accept a trailing newline.
        return bool(re.fullmatch(pattern, ip))

    @classmethod
    def get_time(cls):
        """Return the current local time as ``YYYY-MM-DD HH:MM:SS``."""
        return datetime.now().strftime('%Y-%m-%d %H:%M:%S')

    @classmethod
    def get_ip_address(cls):
        """Return the local address chosen for reaching 192.168.255.255.

        A UDP socket is connected to that address and its own socket name is
        read back. On a socket error the string ``"无法获取IP地址"`` is
        returned instead of an address.
        """
        try:
            # The context manager closes the socket even when connect() fails.
            with socket.socket(socket.AF_INET, socket.SOCK_DGRAM) as s:
                s.connect(("192.168.255.255", 80))
                return s.getsockname()[0]
        except socket.error:
            return "无法获取IP地址"

    def __start_timer(self, interval, function, *args):
        """Create, start and return a daemon ``threading.Timer``.

        Daemon timers do not keep the process alive once the main thread
        (normally blocked in ``start``) has ended.
        """
        timer = threading.Timer(interval, function, args=args)
        timer.daemon = True
        timer.start()
        return timer

    def __close_socket(self):
        """Close the current TCP socket, if any, ignoring close errors."""
        sock = self.__tcp_client_socket
        if sock is not None:
            try:
                sock.close()
            except OSError:
                pass

    def __connTCP(self):
        """Connect to the server and subscribe, retrying until it succeeds.

        Every attempt uses a fresh socket; the previous one is closed first so
        reconnects do not leak file descriptors. Failures are logged and
        retried after ``_RETRY_DELAY`` seconds.
        """
        subscribe_cmd = f'cmd=1&uid={self.__uid}&topic={self.__topic}\r\n'.encode('utf-8')
        while True:
            self.__close_socket()
            self.__tcp_client_socket = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
            try:
                self.__tcp_client_socket.connect((self._SERVER_HOST, self._SERVER_PORT))
                self.__tcp_client_socket.sendall(subscribe_cmd)
            except OSError:
                self.write_log("订阅失败,正在重试")
                time.sleep(self._RETRY_DELAY)
                continue
            self.write_log("订阅成功")
            return

    def __Ping(self):
        """Send one heartbeat and schedule the next one.

        If sending fails the subscription is re-established. The method
        re-arms itself every ``_HEARTBEAT_INTERVAL`` seconds.
        """
        try:
            self.__tcp_client_socket.sendall('ping\r\n'.encode('utf-8'))
            self.write_log("已发送心跳")
        except OSError:
            time.sleep(self._RETRY_DELAY)
            self.write_log("发送心跳出现问题,重新订阅")
            self.__connTCP()
        self.__start_timer(self._HEARTBEAT_INTERVAL, self.__Ping)

    def __publish_state(self, new_state, message, update_state):
        """Publish a state change to ``<topic>/up`` if the state changed.

        The cached state is updated only after the message was sent, so a
        failed send is retried on the next check.
        """
        if self.__pc_state == new_state:
            return
        substr = f'cmd=2&uid={self.__uid}&topic={self.__topic}/up&msg={update_state}\r\n'
        self.__tcp_client_socket.sendall(substr.encode("utf-8"))
        self.__pc_state = new_state
        self.write_log(f"电脑状态更新为:{message}")
        self.send_message(f"电脑状态更新为:{message}")

    def __poll_pc_state(self, is_power_off):
        """Ping the PC, publish a state change and return the new flag.

        ``is_power_off`` tells whether a timeout may be trusted right away.
        When it is False, a first timeout triggers exactly one more ping
        before the PC is reported as off.

        Returns:
            The ``is_power_off`` value to hand to the next scheduled check.
        """
        for _ in range(2):
            ping_result = str(ping(self.__pc_ip, timeout=1, count=1))
            self.write_log('\n' + ping_result + '\n' + '---------------------', '_ping_result')
            if 'Reply' in ping_result:
                self.__publish_state("电脑已开机", "开机", "on")
                return False
            if 'timed out' not in ping_result:
                # Neither a reply nor a timeout: keep the current state rather
                # than guessing.
                self.write_log("无法识别的ping结果,保持当前状态")
                return is_power_off
            if is_power_off:
                self.__publish_state("电脑已关机", "关机", "off")
                return True
            # First timeout while the PC was expected to be on: confirm once.
            is_power_off = True
        return is_power_off

    def __check_pc_state(self, is_power_off):
        """Run one ping-based state check and schedule the next one.

        Does nothing unless ``use_ping`` is enabled. Errors during the check
        are logged and never prevent the next check from being scheduled.
        """
        if not self.__use_ping:
            return
        try:
            is_power_off = self.__poll_pc_state(is_power_off)
        except Exception as e:
            # Runs in a timer thread: an escaping exception would silently end
            # all future checks.
            self.write_log("检测电脑状态出错,error: " + str(e))
        self.__t_check = self.__start_timer(
            self._CHECK_INTERVAL, self.__check_pc_state, is_power_off)

    def __restart_pc_check(self, value):
        """Restart the check timer so the next check uses ``value``.

        Only active when ``use_ping`` is enabled. ``value`` becomes the
        ``is_power_off`` argument of the next check.
        """
        if not self.__use_ping:
            return
        try:
            # No timer exists yet if this runs before the first check.
            if self.__t_check is not None:
                self.__t_check.cancel()
            self.__t_check = self.__start_timer(
                self._CHECK_INTERVAL, self.__check_pc_state, value)
        except Exception as e:
            self.write_log(" error: " + str(e))

    def send_message(self, content):
        """POST a notification if ``use_send_message`` is enabled.

        The form fields are ``title`` (``content``), ``desp`` (current time)
        and ``channel``. Errors are logged, never raised.
        """
        if not self.__use_send_message:
            return
        data = {
            'title': content,
            'desp': self.get_time(),
            'channel': self.__channel_send_message,
        }
        try:
            # Bounded wait: a stalled endpoint must not block the caller forever.
            requests.post(self.__url_send_message, data=data,
                          timeout=self._MESSAGE_TIMEOUT)
        except Exception as e:
            self.write_log("发送消息出错:" + str(e))

    def write_log(self, content, nameadd=''):
        """Append a timestamped line to today's log file if logging is enabled.

        The file is ``<log_path>/<YYYY-MM-DD><nameadd>.log``. Errors are
        printed, never raised.
        """
        if not self.__use_write_log:
            return
        time_day = datetime.now().strftime('%Y-%m-%d')
        try:
            # Explicit UTF-8: the log text is not ASCII and the platform
            # default encoding may be unable to represent it.
            with open(rf'{self.__log_path}/{time_day}{nameadd}.log', 'a',
                      encoding='utf-8') as log_file:
                log_file.write(self.get_time() + ' ' + content + '\n')
        except Exception as e:
            print(self.get_time() + " 写入日志出错了:\n" + str(e))

    def wake_on_lan(self):
        """Send the magic packet and mark the PC as powered on."""
        send_magic_packet(self.__pc_mac, interface=self.__local_ip)
        self.__pc_state = "电脑已开机"
        self.__restart_pc_check(False)
        self.write_log(self.__pc_state)
        self.send_message(self.__pc_state)

    def pc_power_control(self, state):
        """Handle an ``'on'`` or ``'off'`` request.

        With ``use_ping`` enabled a request is ignored when the cached state
        already matches it; without ``use_ping`` every request is executed.
        Any other ``state`` value is ignored.
        """
        if state == 'on' and (not self.__use_ping or self.__pc_state != "电脑已开机"):
            self.wake_on_lan()
        elif state == 'off' and (not self.__use_ping or self.__pc_state != "电脑已关机"):
            self.__restart_pc_check(True)
            if self.__use_shutdown:
                try:
                    session = winrm.Session(
                        self.__pc_ip, auth=(self.__pc_account, self.__pc_password))
                    session.run_cmd(f'shutdown -s -t {self.__shutdown_time}')
                except Exception as e:
                    print(self.get_time() + " 电脑可能已经关机,error: " + str(e))
                    self.write_log(str(e) + "电脑可能已经关机")
            # NOTE(review): assumed to run whether or not use_shutdown is set,
            # mirroring wake_on_lan — confirm against the intended behavior.
            self.__pc_state = "电脑已关机"
            self.write_log(self.__pc_state)
            self.send_message(self.__pc_state)

    def start(self):
        """Subscribe, start heartbeat and state checks, then serve forever.

        Received data is logged. An empty read triggers a re-subscription;
        messages containing this instance's ``uid`` and ``msg=on`` /
        ``msg=off`` are dispatched to ``pc_power_control``. This method does
        not return.
        """
        self.__connTCP()
        self.__Ping()
        self.__check_pc_state(False)
        uid_marker = f'uid={self.__uid}'
        while True:
            try:
                recv_data = self.__tcp_client_socket.recv(1024)
                recv_str = str(recv_data.decode('utf-8'))
                self.write_log(recv_str)
            except Exception as e:
                self.write_log("接收消息出错,可能网络断开,error: " + str(e))
                time.sleep(5)
                continue
            if not recv_data:
                self.write_log("服务器未返回任何数据,重新订阅")
                self.__connTCP()
                continue
            if uid_marker not in recv_str:
                continue
            if 'msg=on' in recv_str:
                requested = 'on'
            elif 'msg=off' in recv_str:
                requested = 'off'
            else:
                continue
            try:
                self.pc_power_control(requested)
            except Exception as e:
                # A failed power action must not end the receive loop.
                self.write_log("执行电源控制出错,error: " + str(e))
if __name__ == '__main__':
    # Example configuration: every optional feature (ping, shutdown, file
    # logging, push messages) is switched off, so only the Wake-on-LAN path
    # is active. Replace the placeholder values before running.
    settings = {
        'uid': 'abcdefghijk',
        'topic': 'mypc001',
        'pc_mac': '11:22:33:44:55:66',
        'local_ip': 'auto',
        'use_ping': False,
        'use_shutdown': False,
        'pc_ip': '192.168.1.23',
        'pc_account': 'admin',
        'pc_password': '123456',
        'shutdown_time': 60,
        'use_write_log': False,
        'log_path': r'/opt/log',
        'use_send_message': False,
        'url_send_message': r'https://*******',
        'channel_send_message': '9',
    }
    controller = PCpower(**settings)
    # Blocks forever, serving incoming on/off requests.
    controller.start()