之前找到个获取同花顺问财数据的库很不错,打算用它来获取问财数据
先贴上代码
[Python] 纯文本查看 复制代码 import pywencai
import os
# 获取关键字列表
keywords = os.environ.get("KEYWORD", "").split(',')
# 遍历关键字列表进行查询和保存结果
for keyword in keywords:
filename = f'{keyword}.csv'
res = pywencai.get(question=keyword, loop=True)
res.to_csv(filename, index=False, encoding='utf-8-sig')
print(res)
代码放github方便自动运行,不用本地再布置运行环境
代码很简单,按所设置的关键字(在Settings-Actions中创建一个New repository secret,名称为“KEYWORD”。秘密为"人气排名,涨停,退市股票")从pywencai库中获取数据并保存为.csv文件。
接着在Actions中创建一个workflows,yml文件代码如下
[Asm] 纯文本查看 复制代码 name: Run Python Script and Commit CSV
on:
push:
branches:
- main # 触发动作的分支
schedule:
# 每小时运行,按需调整
- cron: '0 * * * *'
jobs:
build-and-commit-csv:
runs-on: ubuntu-latest
steps:
- uses: actions/checkout@v2
- name: Set up Python
uses: actions/setup-python@v2
with:
python-version: '3.8' # 你的 Python 版本
- name: Configure Git
run: |
git config --local user.email ""
git config --local user.name ""
- name: Install dependencies
run: |
python -m pip install --upgrade pip
pip install pywencai
- name: Run Python script
run: |
python wencai.py
env:
KEYWORD: ${{ secrets.KEYWORD }}
- name: Run Custom data script
run: |
python popularity.py
- name: Commit changes
run: |
git add .
git commit --allow-empty -m "spider"
git push
代码中触发动作设置为1小时运行一次
如果想要把获取的数据转换为通达信的自定义数据要用以下代码比如“人气排名”
[Python] 纯文本查看 复制代码 import os
import pywencai
try:
# 进行查询并获取结果
res = pywencai.get(question='人气排名', loop=True)
if res is None:
raise ValueError("Did not receive any data. The result is None.")
# 预处理数据以匹配需要的格式
processed_data = []
for index, row in res.iterrows():
code = str(row['股票代码'])
if code.startswith(('000', '001', '002', '003', '300', '301', '200')):
market = '0'
elif code.startswith(('600', '601', '603', '605', '688', '689', '900')):
market = '1'
elif code.startswith(('8', '4')):
market = '2'
else:
market = '未知'
stock_code = row['股票代码']
data_number = '1' # 假定这里你想写入任何数字,例如1
string_value = str(index + 1)
numeric_value = string_value
line = '|'.join([market, stock_code, data_number, string_value, numeric_value])
processed_data.append(line)
# 保存数据到 extern_user.txt 文件
txt_path = 'extern_user.txt'
with open(txt_path, 'w', encoding='utf-8') as f:
for line in processed_data:
f.write(line + '\n')
except Exception as e:
print(f"An error occurred: {e}")
好了,随便水水了,效果就自己去看吧
https://github.com/qqhsx/wencai
|