笔墨纸砚 发表于 2021-5-22 20:00

地震内容的推送

本帖最后由 笔墨纸砚 于 2021-5-22 23:07 编辑

“一条大河波浪宽,风吹稻花香两岸 …”

布置到腾讯SCF :https://dongharry.lanzoui.com/ixsjApd2c7e
最近地震频发,做了一个小爬虫,进行企业推送。
import requests
import pandas as pd
import pytz
import time
import datetime
import json

# 文件配置
corpid = ''
agentid = ''
corpsecret = ''
pushusr = '@all'# 企业微信推送用户,默认'@all'为应用全体用户
wxpusher_type = 1
img_url = 'https://s3.ax1x.com/2021/01/23/s7GOTP.png'# 微信图文消息提醒图片地址

# 时间戳的问题
tz = pytz.timezone('Asia/Shanghai')
c_time = datetime.datetime.now(tz)
nowtime = str(datetime.datetime.now(tz).strftime("%Y-%m-%d %H:%M:%S")).replace('-', '/')
new_time = str(time.mktime(time.strptime(nowtime, '%Y/%m/%d %H:%M:%S')) + int(2) * 60)[:10]


# @print(nowtime, new_time)
class WXPusher:
    def __init__(self, usr=None, desp=None):
      self.base_url = 'https://qyapi.weixin.qq.com/cgi-bin/gettoken?'
      self.req_url = 'https://qyapi.weixin.qq.com/cgi-bin/message/send?access_token='
      self.corpid = corpid# 填写企业ID
      self.corpsecret = corpsecret# 应用Secret
      self.agentid = int(agentid)# 填写应用ID,是个整型常数,就是应用AgentId
      if usr is None:
            usr = '@all'
      self.usr = usr
      self.msg = desp

    def get_access_token(self):
      urls = self.base_url + 'corpid=' + self.corpid + '&corpsecret=' + self.corpsecret
      resp = requests.get(urls).json()
      access_token = resp['access_token']
      return access_token

    def send_message(self):
      data = self.get_message()
      req_urls = self.req_url + self.get_access_token()
      res = requests.post(url=req_urls, data=data)
      print(res.text)

    def get_message(self):
      data = {
            "touser": self.usr,
            "toparty": "@all",
            "totag": "@all",
            "msgtype": "text",
            "agentid": self.agentid,
            "text": {
                "content": self.msg
            },
            "safe": 0,
            "enable_id_trans": 0,
            "enable_duplicate_check": 0,
            "duplicate_check_interval": 1800
      }
      data = json.dumps(data)
      return data


def obtain_msg():
    url = "http://news.ceic.ac.cn/"

    df = pd.read_html(url)
    # print(df)
    dict_earthquake = df.to_dict('records')
    # print(dict_earthquake)
    msg = ''
    for i in dict_earthquake:
      start_time = i['发震时刻(UTC+8)']
      timeArray = time.strptime(start_time, "%Y-%m-%d %H:%M:%S")
      # 转换为时间戳
      start_timeStamp = int(time.mktime(timeArray))
      if start_timeStamp > int(int(new_time) - 100):
            # print('刚刚:', i)
            msg += '发震时刻(UTC+8)' + str(i['发震时刻(UTC+8)']) + '\n震级(M):' + str(i['震级(M)']) + '\n纬度(°):' + str(
                i['纬度(°)']) + '\n经度(°):' + str(
                i['经度(°)']) + '\n深度(千米):' + \
                  str(i['深度(千米)']) + '\n参考位置' + str(i['参考位置']) + '\n==========\n'
            
    if msg:
      desp = msg
      push = WXPusher(pushusr, desp)
      push.send_message()
    else:
      print(msg)



def main_handler(event, context):
    return obtain_msg()


if __name__ == '__main__':
    obtain_msg()


有求就要有应
import requests
import pandas as pd
import pytz
import time
import datetime
import json

# 文件配置
corpid = ''
agentid = ''
corpsecret = ''
pushusr = '@all'# 企业微信推送用户,默认'@all'为应用全体用户pip install -v pandas==1.0.5 --target=D:\Users\HarryDong\Desktop\1
wxpusher_type = 1
img_url = 'https://s3.ax1x.com/2021/01/23/s7GOTP.png'# 微信图文消息提醒图片地址
location = '云南'


# 时间戳的问题
tz = pytz.timezone('Asia/Shanghai')
c_time = datetime.datetime.now(tz)
nowtime = str(datetime.datetime.now(tz).strftime("%Y-%m-%d %H:%M:%S")).replace('-', '/')
new_time = str(time.mktime(time.strptime(nowtime, '%Y/%m/%d %H:%M:%S')) + int(2) * 60)[:10]


# @print(nowtime, new_time)
class WXPusher:
    def __init__(self, usr=None, desp=None):
      self.base_url = 'https://qyapi.weixin.qq.com/cgi-bin/gettoken?'
      self.req_url = 'https://qyapi.weixin.qq.com/cgi-bin/message/send?access_token='
      self.corpid = corpid# 填写企业ID
      self.corpsecret = corpsecret# 应用Secret
      self.agentid = int(agentid)# 填写应用ID,是个整型常数,就是应用AgentId
      if usr is None:
            usr = '@all'
      self.usr = usr
      self.msg = desp

    def get_access_token(self):
      urls = self.base_url + 'corpid=' + self.corpid + '&corpsecret=' + self.corpsecret
      resp = requests.get(urls).json()
      access_token = resp['access_token']
      return access_token

    def send_message(self):
      data = self.get_message()
      req_urls = self.req_url + self.get_access_token()
      res = requests.post(url=req_urls, data=data)
      print(res.text)

    def get_message(self):
      data = {
            "touser": self.usr,
            "toparty": "@all",
            "totag": "@all",
            "msgtype": "text",
            "agentid": self.agentid,
            "text": {
                "content": self.msg
            },
            "safe": 0,
            "enable_id_trans": 0,
            "enable_duplicate_check": 0,
            "duplicate_check_interval": 1800
      }
      data = json.dumps(data)
      return data


def obtain_msg():
    url = "http://news.ceic.ac.cn/"
    df = pd.read_html(url)
    # print(df)
    # print(df.str.contains('云南')])
    df =df.str.contains(location) & ((df['震级(M)'] >= 4) | (df['深度(千米)'] <= 60))]
    dict_earthquake = df.to_dict('records')
    # print(dict_earthquake)
    msg = ''
    for i in dict_earthquake:
      start_time = i['发震时刻(UTC+8)']
      timeArray = time.strptime(start_time, "%Y-%m-%d %H:%M:%S")
      # 转换为时间戳
      start_timeStamp = int(time.mktime(timeArray))
      if start_timeStamp > int(int(new_time) - 100):
            # print('刚刚:', i)
            msg += '发震时刻(UTC+8)' + str(i['发震时刻(UTC+8)']) + '\n震级(M):' + str(i['震级(M)']) + '\n纬度(°):' + str(
                i['纬度(°)']) + '\n经度(°):' + str(
                i['经度(°)']) + '\n深度(千米):' + \
                  str(i['深度(千米)']) + '\n参考位置' + str(i['参考位置']) + '\n==========\n'
            
    if msg:
      desp = msg
      push = WXPusher(pushusr, desp)
      push.send_message()
    else:
      print(msg)


def main_handler(event, context):
    return obtain_msg()


if __name__ == '__main__':
    obtain_msg()


a76yyyy 发表于 2021-5-22 21:43

多个地址可以按照以下格式修改location
location = '重庆|河南|山西|湖北|安徽|江西|广西|陕西|湖南'

加奈绘 发表于 2021-5-25 20:39

笔墨纸砚 发表于 2021-5-22 20:12
可以啊   没有问题 但是我觉得还是中国本土的更准确把?

因为做地震分析时下载数据用的是jweed(因为是英文的)的,想有没有更方便的程序

加奈绘 发表于 2021-5-22 20:05

可以做爬取iris地震台站数据的吗

笔墨纸砚 发表于 2021-5-22 20:06

加奈绘 发表于 2021-5-22 20:05
可以做爬取iris地震台站数据的吗

啥 你发我网站 我看看

笔墨纸砚 发表于 2021-5-22 20:12

加奈绘 发表于 2021-5-22 20:05
可以做爬取iris地震台站数据的吗

可以啊   没有问题 但是我觉得还是中国本土的更准确把?

lg560852 发表于 2021-5-22 20:36

能通过企业微信给微信推送吗?
类似wxpusher和Server酱,想搭一个自己的

笔墨纸砚 发表于 2021-5-22 20:43

lg560852 发表于 2021-5-22 20:36
能通过企业微信给微信推送吗?
类似wxpusher和Server酱,想搭一个自己的

这 就是企业微信啊0.0

heykuz 发表于 2021-5-22 21:01

应该推送,感觉不错的,

8204118 发表于 2021-5-22 21:13

你这个比专家预测的准吗

lg560852 发表于 2021-5-22 21:56

笔墨纸砚 发表于 2021-5-22 20:43
这 就是企业微信啊0.0

企业微信好像只能发给企业微信群
想弄个直接通过企业微信账号推送给普通微信的
页: [1] 2 3 4
查看完整版本: 地震内容的推送