通过企业微信推送消息到微信, 内置提醒打卡, 热搜推送
本帖最后由 Sen 于 2021-8-30 09:02 编辑很久之前写的了, 有年头了。
目录:
1. 定时推送打卡提醒
2. 定时提醒跑步等内容
3. 推送微博热搜消息, 点击可直接进入排行榜
4. 监听女友微博动态xxxx(一言难尽)
我个人比较喜欢用notepad开发。
上面是成果图, 稳定运行很多年了
启动文件runtime.py
import time
import requests
import schedule
import src.wbtop
from src.wxsms import WeChat_SMS
print('加载中..........')
wx = WeChat_SMS('XuPeiSen', '') # 给本尊自己发消息
wx_appoint = WeChat_SMS('', '4|3') # 给部门2发消息
wxAll = WeChat_SMS('@all', '') # 给所有人发消息
print('Wechat_SMS 消息组件加载完毕')
def get_juzi(c):
# a 动画
# b 漫画
# c 游戏
# d 文学
# e 原创
# f 来自网络
# g 其他
# h 影视
# i 诗词
# j 网易云
# k 哲学
# l 抖机灵
j_url = "https://v1.hitokoto.cn/?c=" + c + "&encode=text&charset=utf-8"
r = requests.get(j_url).text
return r;
def job(name):
if 'get_agent' == name :
# 抓取IP代{过}{滤}理数据
url = 'https://www.xupeisen.com/app/driver/V2/AutoRun/AutoRun.php'
r = requests.get(url)
# 执行刷新海外云盘的缓存
# url = 'https://www.xupeisen.com/yunpan/one.php'
# r = requests.get(url)
elif 'hour' == name :
print('hour')
elif 'good_morning' == name :
wx_appoint.send_data(msg=[ { "title" : "一日之计在于晨, 早上好~", "description" : get_juzi("d"), "url" : "URL","picurl" : "https://ss2.bdstatic.com/70cFvnSh_Q1YnxGkpoWK1HF6hhy/it/u=3563018171,3259126459&fm=26&gp=0.jpg" }], type="news")
print('wx_appoint good_morning succseful.')
elif 'wbtop' == name :
print('wx wbtop succseful.')
strlines = str(src.wbtop.start())
strsend = ''
lines = strlines.split('\n')
i = 0
for line in lines :
title = "热搜开始推送啦~点击看详情"
i = i + 1
strsend = strsend + line + '\n'
if i > 10 or i == len(lines):
wx_appoint.send_data(msg={'title': title, 'description': '<div class="gray"> '+ time.strftime("%Y-%m-%d %H:%M:%S", time.localtime()) + ' </div>' + strsend, 'url': 'https://s.weibo.com/top/summary?Refer=top_hot&topnav=1&wvr=6', 'btntxt': '详情'}, type='textcard')
strsend = ''
i = 0
elif 'Eight' == name :
wx_appoint.send_data(msg=[ { "title" : "准备好去跑步了吗~坚持运动才行哦~", "description" : get_juzi("k"), "url" : "URL","picurl" : "http://pic29.photophoto.cn/20131213/0037037536052368_b.jpg" }], type="news")
print('wx_appoint runner, wx dingding succseful.')
elif 'daka' == name :
wx_appoint.send_data(msg=[ { "title" : "嘿, 兄嘚,打卡了没~", "description" : get_juzi("k"), "url" : "https://www.xupeisen.com/blog","picurl" : "https://ss2.bdstatic.com/70cFvnSh_Q1YnxGkpoWK1HF6hhy/it/u=4254959696,2740629052&fm=26&gp=0.jpg" }], type="news")
else :
#send_msg = listenweibo.start('这个自己去网页抓。。。')
#if send_msg != '无内容':
# wx.send_data(msg=send_msg, type="news")
print("要不起~过牌~~~")
def run():
# 每30分钟向PHP服务器提交一次请求抓取信息
schedule.every(30).minutes.do(job, 'get_agent')
# 每小时发送一次微信消息
# schedule.every().hour.do(job, 'hour')
# 每天早上7点钟发送微信早安消息
schedule.every().day.at('07:20').do(job, 'good_morning')
# 每天提醒打卡
schedule.every().day.at("08:40").do(job, 'daka')
schedule.every().day.at("19:30").do(job, 'daka')
# 每天三个时间点推送热搜信息
schedule.every().day.at("10:00").do(job, 'wbtop')
schedule.every().day.at("14:00").do(job, 'wbtop')
# 每天晚上8点提醒去跑步
schedule.every().day.at("20:00").do(job, 'Eight')
# 每分钟调用一次util
schedule.every(1).minutes.do(job, 'util')
# schedule.every(5).to(10).days.do(job, '')
# schedule.every().monday.do(job, '')
# schedule.every().wednesday.at('13:15').do(job, '')
wx.send_data(msg=[ { "title" : "服务器启动完毕~用时1396ms~", "description" : get_juzi("k"), "url" : "https://www.xupeisen.com/blog","picurl" : "http://img0.imgtn.bdimg.com/it/u=3645308573,1197617423&fm=26&gp=0.jpg" }], type="news")
wx_appoint.send_data(msg=[ { "title" : "嘿, 兄嘚,你服务器挂了~", "description" : get_juzi("k"), "url" : "https://www.xupeisen.com/blog","picurl" : "https://ss2.bdstatic.com/70cFvnSh_Q1YnxGkpoWK1HF6hhy/it/u=4254959696,2740629052&fm=26&gp=0.jpg" }], type="news")
#wx.send_data(msg="服务器启动完毕~用时1396ms~", type="text")
#job("wbtop")
while True:
schedule.run_pending()
time.sleep(1)
run()
消息发送配置项: wxsms.py 里面换成自己的id, 别用我的。。。。死鬼
里面我封装了一些不同的消息格式, 时间比较久了新出的格式我没加进去
import time
import requests
import json,os
class WeChat_SMS:
def __init__(self, touser, topary):
self.CORPID = ''#企业ID, 登陆企业微信,在我的企业-->企业信息里查看
self.CORPSECRET = ''#自建应用,每个自建应用里都有单独的secret
self.AGENTID = ''# 应用代码
self.TOUSER = touser # 接收者用户名,@all 全体成员
self.TOPARY = topary # 部门ID,@all 全部部门
def _get_access_token(self):
url = 'https://qyapi.weixin.qq.com/cgi-bin/gettoken'
values = {'corpid': self.CORPID,'corpsecret': self.CORPSECRET,}
req = requests.post(url, params=values)
data = json.loads(req.text)
#print (data)
return data["access_token"]
def get_access_token(self):
try:
with open('access_token.conf', 'r') as f:
t, access_token = f.read().split()
except:
with open('access_token.conf', 'w') as f:
access_token = self._get_access_token()
cur_time = time.time()
f.write('\t'.join())
return access_token
else:
cur_time = time.time()
if 0 < cur_time - float(t) < 7200:#token的有效时间7200s
return access_token
else:
with open('access_token.conf', 'w') as f:
access_token = self._get_access_token()
f.write('\t'.join())
return access_token
def send_data(self, msg, type):
endtoken = self.get_access_token();
print (endtoken)
send_url = 'https://qyapi.weixin.qq.com/cgi-bin/message/send?access_token=' + endtoken
send_values = {
"touser": self.TOUSER,
"toparty": self.TOPARY, #设置给部门发送
"msgtype": "text",
"agentid": self.AGENTID,
"text": {
"content": msg
},
"safe": "0"
}
if type == 'markdown' :
send_values = {
"touser" : self.TOUSER,
"toparty" : self.TOPARY,
"agentid" : self.AGENTID,
"msgtype": "markdown",
"markdown": {
"content": msg
},
"enable_duplicate_check": 0
}
elif type == 'news' :
send_values = {
"touser" : self.TOUSER,
"toparty" : self.TOPARY,
"agentid" : self.AGENTID,
"msgtype" : "news",
"news" : {
"articles" : msg
},
"enable_id_trans": 0,
"enable_duplicate_check": 0
}
elif type == 'textcard' :
send_values = {
"touser" : self.TOUSER,
"toparty" : self.TOPARY,
"agentid" : self.AGENTID,
"msgtype" : "textcard",
"textcard" : msg,
"enable_id_trans": 0,
"enable_duplicate_check": 0
}
elif type == 'taskcard' :
send_values = {
"touser" : self.TOUSER,
"toparty" : self.TOPARY,
"agentid" : self.AGENTID,
"msgtype" : "taskcard",
"taskcard" : msg,
"enable_id_trans": 0,
"enable_duplicate_check": 0
}
print(send_values)
send_msges=(bytes(json.dumps(send_values), 'utf-8'))
respone = requests.post(send_url, send_msges)
respone = respone.json()#当返回的数据是json串的时候直接用.json即可将respone转换成字典
print (respone["errmsg"])
return respone["errmsg"]
微博热搜爬取代码 weibo.py之前没想过这种东西也能共享, 看到论坛一朋友分享了, 我分享下我的思路吧
#!/usr/bin/python
# coding=utf-8
###导入模块
import requests
from lxml import etree
###网址
url="https://s.weibo.com/top/summary?Refer=top_hot&topnav=1&wvr=6"
###模拟浏览器
header={'User-Agent':'Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/73.0.3683.103 Safari/537.36'}
###主函数
def start():
###获取html页面
html = etree.HTML(requests.get(url,headers=header).text)
rank = html.xpath('//td[@class="td-01 ranktop"]/text()')
affair = html.xpath('//td[@class="td-02"]/a/text()')
view = html.xpath('//td[@class="td-02"]/span/text()')
top = affair
affair = affair
str = "置顶榜单: " + top + "\n"
#print('{0:<10}\t{1:<40}'.format("top",top))
for i in range(0, len(affair)):
str = str + rank + "\t" + affair + "\t\n" # + "\t" + view
print(str)
#print("{0:<10}\t{1:{3}<30}\t{2:{3}>20}".format(rank,affair,view,chr(12288)))
return str
打包版下载, Linux启动脚本我也写好了, ./start/start.sh
如果对你有帮忙, 下面帮忙评下分谢谢, 不要C币
本帖最后由 Sen 于 2021-8-27 17:58 编辑
可以测试推送的情况如果不嫌弃我的推送内容有点多的话。
https://work.weixin.qq.com/wework_admin/wxplugin/getQRCode?size=224 666,果然是大神!! 本帖最后由 清风留音 于 2021-8-29 13:58 编辑
大佬,这个能在云函数里运行吗? 谢谢分享!! 感谢楼主无私分享,万分感谢 感谢楼主无偿分享 大佬,能发消息到企业微信的客户群吗 大佬,能讲一下企业微信小功能打卡的token怎么自动获取吗,想搞一个校园自动打卡。。 果然大神无处不在,什么都能玩 这个想法有意思!!支持原创