吾爱破解 - 52pojie.cn

 找回密码
 注册[Register]

QQ登录

只需一步,快速开始

查看: 4548|回复: 29
收起左侧

[Python 转载] 地震内容的推送

  [复制链接]
笔墨纸砚 发表于 2021-5-22 20:00
本帖最后由 笔墨纸砚 于 2021-5-22 23:07 编辑

“  一条大河波浪宽,风吹稻花香两岸 …  ”

布置到腾讯SCF :https://dongharry.lanzoui.com/ixsjApd2c7e
最近地震频发,做了一个小爬虫,进行企业推送。

[Python] 纯文本查看 复制代码
import requests
import pandas as pd
import pytz
import time
import datetime
import json

# 文件配置
corpid = ''
agentid = ''
corpsecret = ''
pushusr = '@all'  # 企业微信推送用户,默认'@all'为应用全体用户
wxpusher_type = 1
img_url = 'https://s3.ax1x.com/2021/01/23/s7GOTP.png'  # 微信图文消息提醒图片地址

# 时间戳的问题
tz = pytz.timezone('Asia/Shanghai')
c_time = datetime.datetime.now(tz)
nowtime = str(datetime.datetime.now(tz).strftime("%Y-%m-%d %H:%M:%S")).replace('-', '/')
new_time = str(time.mktime(time.strptime(nowtime, '%Y/%m/%d %H:%M:%S')) + int(2) * 60)[:10]


# @print(nowtime, new_time)
class WXPusher:
    def __init__(self, usr=None, desp=None):
        self.base_url = 'https://qyapi.weixin.qq.com/cgi-bin/gettoken?'
        self.req_url = 'https://qyapi.weixin.qq.com/cgi-bin/message/send?access_token='
        self.corpid = corpid  # 填写企业ID
        self.corpsecret = corpsecret  # 应用Secret
        self.agentid = int(agentid)  # 填写应用ID,是个整型常数,就是应用AgentId
        if usr is None:
            usr = '@all'
        self.usr = usr
        self.msg = desp

    def get_access_token(self):
        urls = self.base_url + 'corpid=' + self.corpid + '&corpsecret=' + self.corpsecret
        resp = requests.get(urls).json()
        access_token = resp['access_token']
        return access_token

    def send_message(self):
        data = self.get_message()
        req_urls = self.req_url + self.get_access_token()
        res = requests.post(url=req_urls, data=data)
        print(res.text)

    def get_message(self):
        data = {
            "touser": self.usr,
            "toparty": "@all",
            "totag": "@all",
            "msgtype": "text",
            "agentid": self.agentid,
            "text": {
                "content": self.msg
            },
            "safe": 0,
            "enable_id_trans": 0,
            "enable_duplicate_check": 0,
            "duplicate_check_interval": 1800
        }
        data = json.dumps(data)
        return data


def obtain_msg():
    url = "http://news.ceic.ac.cn/"

    df = pd.read_html(url)[0]
    # print(df)
    dict_earthquake = df.to_dict('records')
    # print(dict_earthquake)
    msg = ''
    for i in dict_earthquake:
        start_time = i['发震时刻(UTC+8)']
        timeArray = time.strptime(start_time, "%Y-%m-%d %H:%M:%S")
        # 转换为时间戳
        start_timeStamp = int(time.mktime(timeArray))
        if start_timeStamp > int(int(new_time) - 100):
            # print('刚刚:', i)
            msg += '发震时刻(UTC+8)' + str(i['发震时刻(UTC+8)']) + '\n震级(M):' + str(i['震级(M)']) + '\n纬度(°):' + str(
                i['纬度(°)']) + '\n经度(°):' + str(
                i['经度(°)']) + '\n深度(千米):' + \
                  str(i['深度(千米)']) + '\n参考位置' + str(i['参考位置']) + '\n==========\n'
            
    if msg:
        desp = msg
        push = WXPusher(pushusr, desp)
        push.send_message()
    else:
        print(msg)



def main_handler(event, context):
    return obtain_msg()


if __name__ == '__main__':
    obtain_msg()



有求就要有应
[Python] 纯文本查看 复制代码
import requests
import pandas as pd
import pytz
import time
import datetime
import json

# 文件配置
corpid = ''
agentid = ''
corpsecret = ''
pushusr = '@all'  # 企业微信推送用户,默认'@all'为应用全体用户  pip install -v pandas==1.0.5 --target=D:\Users\HarryDong\Desktop\1
wxpusher_type = 1
img_url = 'https://s3.ax1x.com/2021/01/23/s7GOTP.png'  # 微信图文消息提醒图片地址
location = '云南'


# 时间戳的问题
tz = pytz.timezone('Asia/Shanghai')
c_time = datetime.datetime.now(tz)
nowtime = str(datetime.datetime.now(tz).strftime("%Y-%m-%d %H:%M:%S")).replace('-', '/')
new_time = str(time.mktime(time.strptime(nowtime, '%Y/%m/%d %H:%M:%S')) + int(2) * 60)[:10]


# @print(nowtime, new_time)
class WXPusher:
    def __init__(self, usr=None, desp=None):
        self.base_url = 'https://qyapi.weixin.qq.com/cgi-bin/gettoken?'
        self.req_url = 'https://qyapi.weixin.qq.com/cgi-bin/message/send?access_token='
        self.corpid = corpid  # 填写企业ID
        self.corpsecret = corpsecret  # 应用Secret
        self.agentid = int(agentid)  # 填写应用ID,是个整型常数,就是应用AgentId
        if usr is None:
            usr = '@all'
        self.usr = usr
        self.msg = desp

    def get_access_token(self):
        urls = self.base_url + 'corpid=' + self.corpid + '&corpsecret=' + self.corpsecret
        resp = requests.get(urls).json()
        access_token = resp['access_token']
        return access_token

    def send_message(self):
        data = self.get_message()
        req_urls = self.req_url + self.get_access_token()
        res = requests.post(url=req_urls, data=data)
        print(res.text)

    def get_message(self):
        data = {
            "touser": self.usr,
            "toparty": "@all",
            "totag": "@all",
            "msgtype": "text",
            "agentid": self.agentid,
            "text": {
                "content": self.msg
            },
            "safe": 0,
            "enable_id_trans": 0,
            "enable_duplicate_check": 0,
            "duplicate_check_interval": 1800
        }
        data = json.dumps(data)
        return data


def obtain_msg():
    url = "http://news.ceic.ac.cn/"
    df = pd.read_html(url)[0]
    # print(df)
    # print(df[df['参考位置'].str.contains('云南')])
    df =  df[df['参考位置'].str.contains(location) & ((df['震级(M)'] >= 4) | (df['深度(千米)'] <= 60))]
    dict_earthquake = df.to_dict('records')
    # print(dict_earthquake)
    msg = ''
    for i in dict_earthquake:
        start_time = i['发震时刻(UTC+8)']
        timeArray = time.strptime(start_time, "%Y-%m-%d %H:%M:%S")
        # 转换为时间戳
        start_timeStamp = int(time.mktime(timeArray))
        if start_timeStamp > int(int(new_time) - 100):
            # print('刚刚:', i)
            msg += '发震时刻(UTC+8)' + str(i['发震时刻(UTC+8)']) + '\n震级(M):' + str(i['震级(M)']) + '\n纬度(°):' + str(
                i['纬度(°)']) + '\n经度(°):' + str(
                i['经度(°)']) + '\n深度(千米):' + \
                  str(i['深度(千米)']) + '\n参考位置' + str(i['参考位置']) + '\n==========\n'
            
    if msg:
        desp = msg
        push = WXPusher(pushusr, desp)
        push.send_message()
    else:
        print(msg)


def main_handler(event, context):
    return obtain_msg()


if __name__ == '__main__':
    obtain_msg()


免费评分

参与人数 11吾爱币 +14 热心值 +9 收起 理由
ccchpig + 1 + 1 鼓励转贴优秀软件安全工具和文档!
g8254684 + 1 我很赞同!
0821fzh + 1 + 1 我很赞同!
liuqm + 1 + 1 热心回复!
sdaza + 1 热心回复!
Βigbang + 1 + 1 我很赞同!
secjia + 1 + 1 我很赞同!
a76yyyy + 2 + 1 用心讨论,共获提升!
BetterChinese + 1 谢谢@Thanks!
苏紫方璇 + 5 + 1 欢迎分析讨论交流,吾爱破解论坛有你更精彩!
等待穿山乙 + 1 能加上筛选省份吗?要不一天推送几十次..

查看全部评分

发帖前要善用论坛搜索功能,那里可能会有你要找的答案或者已经有人发布过相同内容了,请勿重复发帖。

a76yyyy 发表于 2021-5-22 21:43
多个地址可以按照以下格式修改location
[Python] 纯文本查看 复制代码
location = '重庆|河南|山西|湖北|安徽|江西|广西|陕西|湖南'

免费评分

参与人数 1吾爱币 +1 热心值 +1 收起 理由
笔墨纸砚 + 1 + 1 感谢您的宝贵建议,我们会努力争取做得更好!

查看全部评分

加奈绘 发表于 2021-5-25 20:39
笔墨纸砚 发表于 2021-5-22 20:12
可以啊   没有问题 但是我觉得还是中国本土的更准确把?

因为做地震分析时下载数据用的是jweed(因为是英文的)的,想有没有更方便的程序
加奈绘 发表于 2021-5-22 20:05
 楼主| 笔墨纸砚 发表于 2021-5-22 20:06
加奈绘 发表于 2021-5-22 20:05
可以做爬取iris地震台站数据的吗

啥 你发我网站 我看看
 楼主| 笔墨纸砚 发表于 2021-5-22 20:12
加奈绘 发表于 2021-5-22 20:05
可以做爬取iris地震台站数据的吗

可以啊   没有问题 但是我觉得还是中国本土的更准确把?
lg560852 发表于 2021-5-22 20:36
能通过企业微信给微信推送吗?
类似wxpusher和Server酱,想搭一个自己的
 楼主| 笔墨纸砚 发表于 2021-5-22 20:43
lg560852 发表于 2021-5-22 20:36
能通过企业微信给微信推送吗?
类似wxpusher和Server酱,想搭一个自己的

这 就是企业微信啊0.0
heykuz 发表于 2021-5-22 21:01
应该推送,感觉不错的,
8204118 发表于 2021-5-22 21:13
你这个比专家预测的准吗
lg560852 发表于 2021-5-22 21:56
笔墨纸砚 发表于 2021-5-22 20:43
这 就是企业微信啊0.0

企业微信好像只能发给企业微信群
想弄个直接通过企业微信账号推送给普通微信的
您需要登录后才可以回帖 登录 | 注册[Register]

本版积分规则

返回列表

RSS订阅|小黑屋|处罚记录|联系我们|吾爱破解 - LCG - LSG ( 京ICP备16042023号 | 京公网安备 11010502030087号 )

GMT+8, 2024-11-25 15:30

Powered by Discuz!

Copyright © 2001-2020, Tencent Cloud.

快速回复 返回顶部 返回列表