阿里云盘自动签到-python
本帖最后由 XyD3 于 2023-12-11 11:58 编辑
参考 https://zhuanlan.zhihu.com/p/629476969 ,每月要更新一次 refresh_token。
把代码放到阿里/华为/腾讯云上定时运行函数流就行了
# -*- coding:utf-8 -*-
import datetime
import requests
import typing as t
from dateutil.relativedelta import relativedelta
def get_token(refresh_token: str) -> t.Tuple:
    """Exchange a refresh token for an access token and the user name.

    Posts a ``refresh_token`` grant to the AliyunDrive auth endpoint and
    decodes the JSON reply.

    Args:
        refresh_token: The refresh token to send in the request body.

    Returns:
        A ``(access_token, user_name)`` tuple. Either element is ``None``
        when the corresponding key is absent from the reply.
    """
    payload = {
        'grant_type': 'refresh_token',
        'refresh_token': refresh_token,
    }
    response = requests.post(
        'https://auth.aliyundrive.com/v2/account/token',
        json=payload,
    )
    data = response.json()
    return data.get('access_token'), data.get('user_name')
def get_sign_in_count(access_token: str) -> str:
    """Fetch the sign-in count from the sign-in list endpoint.

    Args:
        access_token: Bearer token placed in the ``Authorization`` header.

    Returns:
        The ``result.signInCount`` value of the JSON reply, returned as
        decoded without conversion.
        NOTE(review): the annotation says ``str`` but the value is likely a
        JSON number -- confirm against the live API.

    Raises:
        KeyError: If the reply has no ``result`` or ``signInCount`` key.
    """
    auth_headers = {'Authorization': f'Bearer {access_token}'}
    response = requests.post(
        'https://member.aliyundrive.com/v1/activity/sign_in_list',
        json={'_rx-s': 'mobile'},
        headers=auth_headers,
    )
    data = response.json()
    return data['result']['signInCount']
def get_sign_in_reward(access_token: str, sign_in_count: str) -> str:
    """Request the reward for a given sign-in day and describe it.

    Args:
        access_token: Bearer token placed in the ``Authorization`` header.
        sign_in_count: The day number sent to the API as ``signInDay``.

    Returns:
        The reward's ``name``, ``description`` and ``notice`` fields joined
        with ``' *** '``.

    Raises:
        KeyError: If the reply lacks ``result`` or one of the three fields.
    """
    resj = requests.post(
        'https://member.aliyundrive.com/v1/activity/sign_in_reward?_rx-s=mobile',
        json={'signInDay': sign_in_count},
        headers={
            'Authorization': f'Bearer {access_token}'
        }
    ).json()
    result = resj['result']
    # The original join expression had unbalanced brackets (a syntax error);
    # the three reward fields are joined in name/description/notice order.
    return ' *** '.join([result['name'], result['description'], result['notice']])
def alisignin():
    """Run one sign-in pass for the configured account and print the outcome.

    Obtains an access token from the hard-coded refresh token, reads the
    sign-in count, and requests the reward for that count. When today is the
    last day of the month, it additionally requests the reward for every day
    from 1 through the last day. Prints an error and stops if no access
    token could be obtained.
    """
    refresh_token = '你的refresh_token'
    access_token, user_name = get_token(refresh_token)
    if not access_token:
        print('refresh_token异常,获取access_token失败')
        return

    sign_in_count = get_sign_in_count(access_token)
    print(f'用户名:{user_name} 签到成功,本月累计签到{sign_in_count}次')
    reward = get_sign_in_reward(access_token, sign_in_count)
    print(f'用户名:{user_name} 领取奖励成功,获得 {reward}')

    now = datetime.datetime.today()
    # First day of next month minus one day is the last day of this month;
    # the time-of-day component of ``now`` is carried through unchanged, so
    # the comparison below is true exactly on the month's final day.
    first_of_next_month = (now + relativedelta(months=1)).replace(day=1)
    month_end = first_of_next_month - relativedelta(days=1)
    if now != month_end:
        return

    print('已到月底')
    for day in range(1, month_end.day + 1):
        reward = get_sign_in_reward(access_token, str(day))
        print(f'用户名:{user_name} 领取第{day}天奖励成功,获得 {reward}')
if __name__ == '__main__': alisignin() package main
import (
"encoding/json"
"fmt"
"io"
"log"
"net/http"
"os"
"strings"
)
// AliyunDrive HTTP endpoints used by the sign-in flow.
const (
	// GET_TOKEN exchanges a refresh token for an access token.
	GET_TOKEN = "https://auth.aliyundrive.com/v2/account/token"
	// GET_SINGINCOUNT returns the sign-in list, including the sign-in count.
	GET_SINGINCOUNT = "https://member.aliyundrive.com/v1/activity/sign_in_list"
	// GET_SINGINREWARD requests the reward for a given sign-in day.
	GET_SINGINREWARD = "https://member.aliyundrive.com/v1/activity/sign_in_reward?_rx-s=mobile"
)
// AliSign holds the credentials and sign-in state for one account.
type AliSign struct {
	// GrantType is not read or written by the methods visible in this file.
	GrantType string
	// RefreshToken is the token sent to the auth endpoint by getToken.
	RefreshToken string
	// AccessToken is filled in by getToken and sent as a Bearer token.
	AccessToken string
	// UserName is filled in by getToken and used in log messages.
	UserName string
	// SignInCount is the sign-in count reported by getSignCount.
	SignInCount int
	// ReadyRewardDays lists the days whose reward has not been claimed yet,
	// as collected by getSignCount.
	ReadyRewardDays []int
}
// NewAliSign returns a new AliSign whose RefreshToken is set to refreshToken;
// all other fields are left at their zero values.
func NewAliSign(refreshToken string) *AliSign {
	signer := new(AliSign)
	signer.RefreshToken = refreshToken
	return signer
}
func (a *AliSign) getToken() bool {
b, _ := json.Marshal(mapinterface{}{
"grant_type": "refresh_token",
"refresh_token": a.RefreshToken,
})
payload := strings.NewReader(string(b))
req, _ := http.NewRequest("POST", GET_TOKEN, payload)
req.Header.Add("content-type", "application/json")
res, _ := http.DefaultClient.Do(req)
defer res.Body.Close()
body, _ := io.ReadAll(res.Body)
type AutoGenerated struct {
UserName string `json:"user_name"`
AccessToken string `json:"access_token"`
}
result := &AutoGenerated{}
if err := json.Unmarshal(body, result); err != nil {
log.Printf("请求token失败,err:%v", err)
return false
}
a.AccessToken = result.AccessToken
a.UserName = result.UserName
return a.AccessToken != ""
}
// getSignCount posts to GET_SINGINCOUNT with a.AccessToken, stores the
// reported sign-in count in a.SignInCount, and appends every day whose
// reward has not been claimed to a.ReadyRewardDays.
//
// It returns true when the reported sign-in count is non-zero. Any failure
// to build, send, read or decode the request is logged and reported as false
// instead of dereferencing a nil response.
func (a *AliSign) getSignCount() bool {
	req, err := http.NewRequest(http.MethodPost, GET_SINGINCOUNT, strings.NewReader("{}"))
	if err != nil {
		log.Printf("请求signInCount失败,err:%v", err)
		return false
	}
	req.Header.Add("Authorization", fmt.Sprintf("Bearer %s", a.AccessToken))
	req.Header.Add("content-type", "application/json")

	res, err := http.DefaultClient.Do(req)
	if err != nil {
		// res is nil when Do fails, so it must not be touched here.
		log.Printf("请求signInCount失败,err:%v", err)
		return false
	}
	defer res.Body.Close()

	body, err := io.ReadAll(res.Body)
	if err != nil {
		log.Printf("请求signInCount失败,err:%v", err)
		return false
	}

	// signInList mirrors the fields of the reply that are used.
	type signInList struct {
		Result struct {
			SignInCount int `json:"signInCount"`
			SignInLogs  []struct {
				Day      int  `json:"day"`
				IsReward bool `json:"isReward"`
			} `json:"signInLogs"`
		} `json:"result"`
	}
	result := &signInList{}
	if err := json.Unmarshal(body, result); err != nil {
		log.Printf("请求signInCount失败,err:%v", err)
		return false
	}

	a.SignInCount = result.Result.SignInCount
	log.Printf("用户 %s 本月累计签到%d次", a.UserName, a.SignInCount)
	for _, v := range result.Result.SignInLogs {
		if !v.IsReward {
			a.ReadyRewardDays = append(a.ReadyRewardDays, v.Day)
		} else {
			log.Printf("用户 %s 已经获得第%d天的奖励", a.UserName, v.Day)
		}
	}
	return a.SignInCount != 0
}
func (a *AliSign) getSignInReward(day int) bool {
b, _ := json.Marshal(mapinterface{}{
"signInDay": day,
})
payload := strings.NewReader(string(b))
req, _ := http.NewRequest("POST", GET_SINGINREWARD, payload)
req.Header.Add("content-type", "application/json")
req.Header.Add("Authorization", fmt.Sprintf("Bearer %s", a.AccessToken))
res, _ := http.DefaultClient.Do(req)
defer res.Body.Close()
body, _ := io.ReadAll(res.Body)
type AutoGenerated struct {
Success bool `json:"success"`
Code interface{} `json:"code"`
Message interface{} `json:"message"`
Resultstruct {
Name string `json:"name"`
Description string `json:"description"`
Notice string `json:"notice"`
SubNotice string `json:"subNotice"`
} `json:"result"`
Arguments interface{} `json:"arguments"`
}
result := &AutoGenerated{}
if err := json.Unmarshal(body, result); err != nil {
log.Printf("请求领奖失败,err:%v", err)
return false
}
if result.Success {
log.Printf("用户 %s 获得第%d天的签到奖励 %s %s %s %s", a.UserName, day, result.Result.Name, result.Result.Description, result.Result.Notice, result.Result.SubNotice)
} else {
log.Printf("请求领奖失败,%v", string(body))
}
return result.Success
}
// Sign runs the full flow for one account: obtain a token, read the sign-in
// count, then request the reward for every unclaimed day that is not beyond
// the current sign-in count. A panic anywhere in the flow is recovered and
// logged so that it does not propagate to the caller.
func (a *AliSign) Sign() {
	defer func() {
		if r := recover(); r != nil {
			log.Printf("签到出现panic:%v", r)
		}
	}()

	if ok := a.getToken(); !ok {
		log.Printf("token 获取失败,请检查输入 %s", a.RefreshToken)
		return
	}
	if ok := a.getSignCount(); !ok {
		return
	}

	for _, day := range a.ReadyRewardDays {
		// Skip days past the current sign-in count.
		if day > a.SignInCount {
			continue
		}
		a.getSignInReward(day)
	}
}
// main treats every command-line argument as a refresh token and runs the
// sign-in flow once per token, in order.
func main() {
	// os.Args[0] is the program name; the tokens start at index 1.
	// The original passed the whole args slice to NewAliSign, which does not
	// type-check; each iteration must pass the single token for that index.
	for _, refreshToken := range os.Args[1:] {
		a := NewAliSign(refreshToken)
		a.Sign()
	}
}
本帖最后由 lorzl 于 2023-12-11 11:27 编辑
楼主有用阿里云FC么 这个收费么?
另外大佬我有一个app,也想实现类似的功能,能否帮忙看看,可以看下我的悬赏。
感谢大佬的分享。
感谢大佬分享,最近都是手动点的,顺便研究下代码。
大佬有没有考虑加一个月末自动领取的功能?
学习一下,感谢分享。
不知道有没有做限时任务的接口?
我看有的说现在阿里云盘的token是每天更新一次?
今早发现金山云也开始支持python了。
油猴里面有一个阿里云盘签到脚本,一直用的挺好的,不知道两个哪一个更方便。