本帖最后由 笔墨纸砚 于 2021-5-22 23:07 编辑
“ 一条大河波浪宽,风吹稻花香两岸 … ”
部署到腾讯云 SCF(云函数):https://dongharry.lanzoui.com/ixsjApd2c7e
最近地震频发,做了一个小爬虫,抓取中国地震台网(news.ceic.ac.cn)的最新地震信息,并通过企业微信应用消息进行推送。
[Python] 纯文本查看 复制代码
import requests
import pandas as pd
import pytz
import time
import datetime
import json
# ---- Configuration: fill these in before running ----
corpid = ''       # WeCom corp ID
agentid = ''      # WeCom application AgentId (must be a numeric string)
corpsecret = ''   # WeCom application Secret
# WeCom recipient(s); the default '@all' targets every user of the application.
pushusr = '@all'
wxpusher_type = 1
# Image URL intended for WeChat rich-message notifications.
img_url = 'https://s3.ax1x.com/2021/01/23/s7GOTP.png'

# ---- Timestamp handling ----
tz = pytz.timezone('Asia/Shanghai')
c_time = datetime.datetime.now(tz)
# Asia/Shanghai wall-clock time rendered as 'YYYY/MM/DD HH:MM:SS'.
nowtime = datetime.datetime.now(tz).strftime("%Y/%m/%d %H:%M:%S")
# Epoch-seconds string (first 10 characters) for two minutes AFTER `nowtime`.
# time.mktime() interprets the parsed struct in the host's local time zone.
# NOTE(review): evaluated once at import, so it goes stale in a long-lived
# process.
new_time = str(
    time.mktime(time.strptime(nowtime, '%Y/%m/%d %H:%M:%S')) + 2 * 60
)[:10]
class WXPusher:
    """Minimal WeCom (企业微信) client that pushes one plain-text message.

    Credentials are read from the module-level ``corpid``, ``corpsecret``
    and ``agentid`` settings when an instance is created.

    Args:
        usr: Recipient user id(s) for the ``touser`` field; ``None`` means
            ``'@all'``.
        desp: Text content of the message.
    """

    # Seconds to wait on each HTTP call. Without a timeout, requests can
    # block indefinitely on a stalled connection.
    REQUEST_TIMEOUT = 10

    def __init__(self, usr=None, desp=None):
        self.base_url = 'https://qyapi.weixin.qq.com/cgi-bin/gettoken?'
        self.req_url = 'https://qyapi.weixin.qq.com/cgi-bin/message/send?access_token='
        self.corpid = corpid  # corp ID
        self.corpsecret = corpsecret  # application Secret
        # AgentId is sent as an integer; int() raises ValueError while the
        # setting is still an empty placeholder.
        self.agentid = int(agentid)
        if usr is None:
            usr = '@all'
        self.usr = usr
        self.msg = desp

    def get_access_token(self):
        """Request a fresh access token and return it.

        Raises:
            RuntimeError: The response carried no ``access_token``; the
                message includes the API's ``errcode`` and ``errmsg`` so the
                cause (e.g. wrong corp ID or secret) is visible.
        """
        urls = self.base_url + 'corpid=' + self.corpid + '&corpsecret=' + self.corpsecret
        resp = requests.get(urls, timeout=self.REQUEST_TIMEOUT).json()
        token = resp.get('access_token')
        if not token:
            raise RuntimeError(
                'WeCom gettoken failed: errcode=%s errmsg=%s'
                % (resp.get('errcode'), resp.get('errmsg'))
            )
        return token

    def send_message(self):
        """Send the text message and print the raw API response body."""
        data = self.get_message()
        req_urls = self.req_url + self.get_access_token()
        res = requests.post(url=req_urls, data=data, timeout=self.REQUEST_TIMEOUT)
        print(res.text)

    def get_message(self):
        """Return the JSON request body (a ``str``) for the text message."""
        # NOTE(review): WeCom documents toparty/totag as ignored when touser
        # is '@all'; with a specific touser these '@all' values may be
        # reported as invalid -- confirm against the API before relying on it.
        data = {
            "touser": self.usr,
            "toparty": "@all",
            "totag": "@all",
            "msgtype": "text",
            "agentid": self.agentid,
            "text": {
                "content": self.msg
            },
            "safe": 0,
            "enable_id_trans": 0,
            "enable_duplicate_check": 0,
            "duplicate_check_interval": 1800
        }
        # Keep the default ensure_ascii=True: the body is handed to requests
        # as a str, so it has to stay pure ASCII (non-ASCII text is sent as
        # \uXXXX escapes).
        return json.dumps(data)
def obtain_msg(lookback_seconds=220):
    """Fetch the CEIC bulletin and push the most recent earthquakes.

    Reads the first HTML table at http://news.ceic.ac.cn/, keeps the rows
    whose origin time (column '发震时刻(UTC+8)') is newer than
    ``now - lookback_seconds``, and sends them as a single text message
    through ``WXPusher``. When nothing qualifies, only an empty line is
    printed.

    The cutoff is computed on every call. A threshold fixed at import time
    lies in the future on a fresh start (so nothing can match) and goes stale
    in a reused process (so the same events are sent again and again).

    Args:
        lookback_seconds: Size of the look-back window in seconds. The
            default, 220, is the 2 min + 100 s of the original threshold
            constants applied as a look-back.

    Returns:
        None.
    """
    url = "http://news.ceic.ac.cn/"
    records = pd.read_html(url)[0].to_dict('records')

    # The bulletin states its times in UTC+8, so compare against naive UTC+8
    # wall-clock time rather than host-local time.
    utc8 = datetime.timezone(datetime.timedelta(hours=8))
    now_utc8 = datetime.datetime.now(utc8).replace(tzinfo=None)
    cutoff = now_utc8 - datetime.timedelta(seconds=lookback_seconds)

    # (label, column) pairs in output order; labels are emitted verbatim.
    fields = (
        ('发震时刻(UTC+8)', '发震时刻(UTC+8)'),
        ('\n震级(M):', '震级(M)'),
        ('\n纬度(°):', '纬度(°)'),
        ('\n经度(°):', '经度(°)'),
        ('\n深度(千米):', '深度(千米)'),
        ('\n参考位置', '参考位置'),
    )

    entries = []
    for quake in records:
        raw_time = str(quake['发震时刻(UTC+8)'])
        try:
            quake_time = datetime.datetime.strptime(raw_time, "%Y-%m-%d %H:%M:%S")
        except ValueError:
            # One malformed row must not block alerts for the others.
            print('skipping row with unparsable origin time: %r' % (raw_time,))
            continue
        if quake_time <= cutoff:
            continue
        entries.append(
            ''.join(label + str(quake[column]) for label, column in fields)
            + '\n==========\n'
        )

    msg = ''.join(entries)
    if msg:
        push = WXPusher(pushusr, msg)
        push.send_message()
    else:
        print(msg)
def main_handler(event, context):
    """Cloud-function style entry point: run one fetch-and-push cycle.

    ``event`` and ``context`` are accepted but not used. The return value is
    whatever ``obtain_msg`` returns.
    """
    outcome = obtain_msg()
    return outcome
# Run a single fetch-and-push cycle when executed directly as a script.
if __name__ == "__main__":
    obtain_msg()
有求就要有应:下面是增加了按地区(location)以及震级/深度筛选的版本。
[Python] 纯文本查看 复制代码
import requests
import pandas as pd
import pytz
import time
import datetime
import json
# ---- Configuration: fill these in before running ----
corpid = ''       # WeCom corp ID
agentid = ''      # WeCom application AgentId (must be a numeric string)
corpsecret = ''   # WeCom application Secret
# WeCom recipient(s); the default '@all' targets every user of the application.
# Author's packaging note:
#   pip install -v pandas==1.0.5 --target=D:\Users\HarryDong\Desktop\1
pushusr = '@all'
wxpusher_type = 1
# Image URL intended for WeChat rich-message notifications.
img_url = 'https://s3.ax1x.com/2021/01/23/s7GOTP.png'
# Region keyword matched against the '参考位置' (reference location) column.
location = '云南'

# ---- Timestamp handling ----
tz = pytz.timezone('Asia/Shanghai')
c_time = datetime.datetime.now(tz)
# Asia/Shanghai wall-clock time rendered as 'YYYY/MM/DD HH:MM:SS'.
nowtime = datetime.datetime.now(tz).strftime("%Y/%m/%d %H:%M:%S")
# Epoch-seconds string (first 10 characters) for two minutes AFTER `nowtime`.
# time.mktime() interprets the parsed struct in the host's local time zone.
# NOTE(review): evaluated once at import, so it goes stale in a long-lived
# process.
new_time = str(
    time.mktime(time.strptime(nowtime, '%Y/%m/%d %H:%M:%S')) + 2 * 60
)[:10]
class WXPusher:
    """Minimal WeCom (企业微信) client that pushes one plain-text message.

    Credentials are read from the module-level ``corpid``, ``corpsecret``
    and ``agentid`` settings when an instance is created.

    Args:
        usr: Recipient user id(s) for the ``touser`` field; ``None`` means
            ``'@all'``.
        desp: Text content of the message.
    """

    # Seconds to wait on each HTTP call. Without a timeout, requests can
    # block indefinitely on a stalled connection.
    REQUEST_TIMEOUT = 10

    def __init__(self, usr=None, desp=None):
        self.base_url = 'https://qyapi.weixin.qq.com/cgi-bin/gettoken?'
        self.req_url = 'https://qyapi.weixin.qq.com/cgi-bin/message/send?access_token='
        self.corpid = corpid  # corp ID
        self.corpsecret = corpsecret  # application Secret
        # AgentId is sent as an integer; int() raises ValueError while the
        # setting is still an empty placeholder.
        self.agentid = int(agentid)
        if usr is None:
            usr = '@all'
        self.usr = usr
        self.msg = desp

    def get_access_token(self):
        """Request a fresh access token and return it.

        Raises:
            RuntimeError: The response carried no ``access_token``; the
                message includes the API's ``errcode`` and ``errmsg`` so the
                cause (e.g. wrong corp ID or secret) is visible.
        """
        urls = self.base_url + 'corpid=' + self.corpid + '&corpsecret=' + self.corpsecret
        resp = requests.get(urls, timeout=self.REQUEST_TIMEOUT).json()
        token = resp.get('access_token')
        if not token:
            raise RuntimeError(
                'WeCom gettoken failed: errcode=%s errmsg=%s'
                % (resp.get('errcode'), resp.get('errmsg'))
            )
        return token

    def send_message(self):
        """Send the text message and print the raw API response body."""
        data = self.get_message()
        req_urls = self.req_url + self.get_access_token()
        res = requests.post(url=req_urls, data=data, timeout=self.REQUEST_TIMEOUT)
        print(res.text)

    def get_message(self):
        """Return the JSON request body (a ``str``) for the text message."""
        # NOTE(review): WeCom documents toparty/totag as ignored when touser
        # is '@all'; with a specific touser these '@all' values may be
        # reported as invalid -- confirm against the API before relying on it.
        data = {
            "touser": self.usr,
            "toparty": "@all",
            "totag": "@all",
            "msgtype": "text",
            "agentid": self.agentid,
            "text": {
                "content": self.msg
            },
            "safe": 0,
            "enable_id_trans": 0,
            "enable_duplicate_check": 0,
            "duplicate_check_interval": 1800
        }
        # Keep the default ensure_ascii=True: the body is handed to requests
        # as a str, so it has to stay pure ASCII (non-ASCII text is sent as
        # \uXXXX escapes).
        return json.dumps(data)
def obtain_msg(lookback_seconds=220):
    """Fetch the CEIC bulletin and push recent earthquakes for one region.

    Reads the first HTML table at http://news.ceic.ac.cn/ and keeps rows
    whose '参考位置' matches the module-level ``location`` and whose
    magnitude is >= 4 or depth is <= 60 km. Of those, the rows whose origin
    time (column '发震时刻(UTC+8)') is newer than ``now - lookback_seconds``
    are sent as a single text message through ``WXPusher``. When nothing
    qualifies, only an empty line is printed.

    The cutoff is computed on every call. A threshold fixed at import time
    lies in the future on a fresh start (so nothing can match) and goes stale
    in a reused process (so the same events are sent again and again).

    Args:
        lookback_seconds: Size of the look-back window in seconds. The
            default, 220, is the 2 min + 100 s of the original threshold
            constants applied as a look-back.

    Returns:
        None.
    """
    url = "http://news.ceic.ac.cn/"
    table = pd.read_html(url)[0]

    # na=False: a row with a missing location counts as "no match" instead of
    # putting NaN into the mask, which would make boolean indexing raise.
    in_region = table['参考位置'].str.contains(location, na=False)
    # NOTE(review): the two conditions are OR-ed, so any shallow event passes
    # regardless of magnitude -- confirm this is the intended filter.
    notable = (table['震级(M)'] >= 4) | (table['深度(千米)'] <= 60)
    records = table[in_region & notable].to_dict('records')

    # The bulletin states its times in UTC+8, so compare against naive UTC+8
    # wall-clock time rather than host-local time.
    utc8 = datetime.timezone(datetime.timedelta(hours=8))
    now_utc8 = datetime.datetime.now(utc8).replace(tzinfo=None)
    cutoff = now_utc8 - datetime.timedelta(seconds=lookback_seconds)

    # (label, column) pairs in output order; labels are emitted verbatim.
    fields = (
        ('发震时刻(UTC+8)', '发震时刻(UTC+8)'),
        ('\n震级(M):', '震级(M)'),
        ('\n纬度(°):', '纬度(°)'),
        ('\n经度(°):', '经度(°)'),
        ('\n深度(千米):', '深度(千米)'),
        ('\n参考位置', '参考位置'),
    )

    entries = []
    for quake in records:
        raw_time = str(quake['发震时刻(UTC+8)'])
        try:
            quake_time = datetime.datetime.strptime(raw_time, "%Y-%m-%d %H:%M:%S")
        except ValueError:
            # One malformed row must not block alerts for the others.
            print('skipping row with unparsable origin time: %r' % (raw_time,))
            continue
        if quake_time <= cutoff:
            continue
        entries.append(
            ''.join(label + str(quake[column]) for label, column in fields)
            + '\n==========\n'
        )

    msg = ''.join(entries)
    if msg:
        push = WXPusher(pushusr, msg)
        push.send_message()
    else:
        print(msg)
def main_handler(event, context):
    """Cloud-function style entry point: run one fetch-and-push cycle.

    ``event`` and ``context`` are accepted but not used. The return value is
    whatever ``obtain_msg`` returns.
    """
    outcome = obtain_msg()
    return outcome
# Run a single fetch-and-push cycle when executed directly as a script.
if __name__ == "__main__":
    obtain_msg()
|