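# NOTE: pasted from a forum code box ("[Python] plain-text view / copy code"); this header line is not part of the program.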
# coding: utf-8
import requests
import json
import datetime
import logging
import argparse
import os
import urllib.request
import pathlib
import shutil
from minio import Minio
import mimetypes
from minio.error import ResponseError
import time
# Path of the log file; any copy left over from a previous run is deleted
# below, so each run starts with a fresh log.
log_file = '360.log'
# Query string appended to every API URL and media URL built in this file
# (the qid/sid values are masked in this copy of the script).
request_params = 'qid=*****&sid=*****&from=mpc_ipcam_wechatmp'

# Remove the old log directly instead of testing for it first: the
# exists()-then-remove() pair can fail if the file disappears in between.
try:
    os.remove(log_file)
except FileNotFoundError:
    pass  # nothing to clean up on the first run

# Root logger: everything at DEBUG and above goes to the log file.
logging.basicConfig(
    filename=log_file,
    level=logging.DEBUG,
    format='%(asctime)s %(message)s',
    datefmt='%m/%d/%Y %I:%M:%S %p',
)

# Script logger: additionally echoes records to the console through its own
# stream handler; records still propagate to the root file handler.
logger = logging.getLogger("360可视门铃")
logger.setLevel(logging.DEBUG)
ch = logging.StreamHandler()
ch.setLevel(logging.DEBUG)
formatter = logging.Formatter('%(asctime)s - %(name)s - %(levelname)s - %(message)s')
ch.setFormatter(formatter)
logger.addHandler(ch)
class IPC360:
    """Small client for the HTTP API served at ``https://q3.jia.360.cn/``.

    Every call is a form-encoded POST whose JSON response body is returned
    as a decoded Python object.  The query string appended to each URL is
    the module-level ``request_params`` unless the instance was created
    with its own ``params``.
    """

    # Headers sent with every request; defined on the class, so shared by
    # all instances.
    headers = {
        'User-Agent': 'Mozilla/5.0 (iPhone; CPU iPhone OS 13_3_1 like Mac OS X) AppleWebKit/605.1.15 (KHTML, like Gecko) Mobile/15E148 MicroMessenger/7.0.11(0x17000b21) NetType/WIFI Language/zh_CN',
        'Content-Type': 'application/x-www-form-urlencoded',
        "Referer": 'https://servicewechat.com/wx5f78be8ca091436c/11/page-frame.html'
    }

    # Class-level defaults so instances created without __init__ still work.
    params = None
    timeout = 30

    def __init__(self, params=None, timeout=30):
        """Create a client.

        Args:
            params: Query string to append to every URL.  ``None`` (the
                default) means "use the module-level ``request_params``",
                looked up each time a URL is built.
            timeout: Seconds to wait for the server before giving up.
                Without a timeout ``requests`` waits indefinitely.
        """
        self.params = params
        self.timeout = timeout

    def build_url(self, action):
        """Return the full URL for API path ``action`` including the query string."""
        query = request_params if self.params is None else self.params
        return "https://q3.jia.360.cn/" + action + "?" + query

    def post_request(self, url, data):
        """POST ``data`` as a form to ``url`` and return the decoded JSON body.

        Raises:
            requests.exceptions.RequestException: on connection problems or
                when the timeout expires.
            ValueError: when the response body is not valid JSON.
        """
        # NOTE(review): the logged URL contains the credential query string.
        logger.info('请求地址 %s ...', url)
        res = requests.post(url=url, data=data, headers=self.headers,
                            timeout=self.timeout)
        # Response.json() detects the JSON UTF encoding itself instead of
        # relying on the charset guessed for ``res.text``.
        return res.json()

    def ipc_list(self):
        """Call ``app/getIpcList`` (with ``version=3``) and return its response."""
        data = {
            'version': 3
        }
        return self.post_request(self.build_url('app/getIpcList'), data)

    def index(self):
        """Call ``event/getIndex`` (with ``days=1``) and return its response."""
        data = {
            'days': 1
        }
        return self.post_request(self.build_url('event/getIndex'), data)

    def list_by_sn(self, sn, ipc_type=5):
        """Call ``event/getListBySn`` for device ``sn`` using today's local date.

        Args:
            sn: Device serial number, sent as the ``sn`` form field.
            ipc_type: Value sent as ``ipcType``.

        The request asks for at most 100 events of types 304-307; the
        meaning of those type codes is not defined in this file.
        """
        today = datetime.date.today()
        data = {
            'sn': sn,
            'ipcType': ipc_type,
            'humanCount': 0,
            'date': today.strftime("%Y%m%d"),
            'count': 100,
            'eventTypes': '[304,305,306,307]'
        }
        return self.post_request(self.build_url('event/getListBySn'), data)
def _str_to_bool(value):
    """Convert a command-line token into a real boolean.

    ``type=bool`` makes argparse call ``bool("False")``, which is ``True``
    for every non-empty string, so the flag could not be switched off with
    a readable value.  The empty string stays falsy, as it was before.
    """
    token = str(value).strip().lower()
    if token in ('1', 'true', 't', 'yes', 'y', 'on'):
        return True
    if token in ('', '0', 'false', 'f', 'no', 'n', 'off'):
        return False
    raise argparse.ArgumentTypeError('invalid boolean value: %r' % (value,))


def _fetch(url, target):
    """Download ``url`` into ``target``; return True only if the file exists afterwards.

    ``urlretrieve`` raises on failure (URLError, HTTPError and
    ContentTooShortError are all OSError subclasses), so the error is caught
    and logged here to let the caller skip the item instead of aborting.
    """
    try:
        urllib.request.urlretrieve(url, target)
    except OSError:
        logger.exception("文件 %s 下载失败 %s", url, target)
        return False
    if not os.path.exists(target):
        logger.info("文件 %s 下载失败 %s", url, target)
        return False
    return True


def _sync_device(device, base_path):
    """Download the event clips of one device that have no marker file yet.

    Files are written to ``<base_path><sn>/``:
      * ``<eventTime>.mpeg`` / ``<eventTime>.jpg`` - clip and thumbnail,
      * ``<eventTime>`` (no extension) - empty marker created once both
        files were downloaded; events with a marker are skipped,
      * ``latest.mpeg`` / ``latest.jpg`` - copies of the first event that
        was downloaded successfully during this call.
    """
    logger.info('正在检查设备 SN:%s(%s) ...', device['sn'], device['title'])
    sn_path = base_path + device["sn"] + "/"
    os.makedirs(sn_path, exist_ok=True)
    latest_video = sn_path + 'latest.mpeg'
    index = 0  # number of events fully downloaded so far in this call
    # A device entry without events (missing key or null) is treated as empty.
    for event in device.get("eventList") or []:
        video_file = sn_path + str(event["eventTime"])
        # The media URLs get the credential query string appended with '&'.
        video_url = event["videoUrl"] + '&' + request_params
        thumb_url = event["thumbUrl"] + '&' + request_params
        thumb_file = video_file + ".jpg"
        tmp_file = video_file + '.mpeg'
        if not os.path.exists(latest_video):
            # Seed latest.mpeg so it exists even if nothing new is fetched.
            logger.info('最新视频不存在,下载中... %s', latest_video)
            _fetch(video_url, latest_video)
        if os.path.exists(video_file):
            logger.info('视频记录 %s 已上传,跳过...', tmp_file)
            continue
        if not _fetch(video_url, tmp_file):
            continue
        logger.info('视频记录 %s 已保存到:%s .', str(event["eventTime"]), tmp_file)
        if os.path.getsize(tmp_file) == 0:
            # No marker is written, so the event is retried on the next run.
            logger.info('视频文件尺寸不正确,暂时跳过该文件: %s', video_url)
            continue
        if not _fetch(thumb_url, thumb_file):
            continue
        if index == 0:
            shutil.copy(tmp_file, latest_video)
            shutil.copy(thumb_file, sn_path + 'latest.jpg')
        pathlib.Path(video_file).touch()
        index += 1


parser = argparse.ArgumentParser(description='Start Args')
parser.add_argument('--download', type=_str_to_bool, default=True)
args = parser.parse_args()

# Base download directory; kept as a string with a trailing slash because
# the per-device paths are built by plain concatenation.
path = os.getcwd() + "/download/"
os.makedirs(path, exist_ok=True)

# NOTE(review): the pasted source lost its indentation, so the extent of this
# `if` body is reconstructed; it is assumed to gate the whole job. With the
# default (--download True) every plausible nesting behaves the same. Confirm
# against the original script.
if args.download:
    # NOTE(review): upload() is not defined in the visible part of this file;
    # it is expected to be provided elsewhere in the module.
    upload()
    logger.info('延时3秒')
    time.sleep(3)
    ipc = IPC360()
    ipc_list = ipc.ipc_list()
    errorCode = ipc_list["errorCode"]
    logger.info('获取摄像头信息,返回代码:%d', errorCode)
    if errorCode == 0:
        for device in ipc_list["devices"]:
            _sync_device(device, path)
logger.info('完成...')