import numpy as np
import pandas as pd
# use the TensorFlow 1.x graph API through the compat layer
import tensorflow.compat.v1 as tf
tf.disable_v2_behavior()
import matplotlib.pyplot as plt
import seaborn as sns
from collections import deque
import random
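# Sketch, not part of the original post: if 300565.csv is not available,
# write a synthetic random-walk file with the same assumed column layout
# (timestamp, open, high, low, close) so the script can be smoke-tested.
import os
if not os.path.exists('300565.csv'):
    rng = np.random.default_rng(0)
    prices = 10 + np.cumsum(rng.normal(0, 0.1, 500))
    pd.DataFrame({'timestamp': np.arange(500), 'open': prices, 'high': prices,
                  'low': prices, 'close': prices}).to_csv('300565.csv', index=False)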
df = pd.read_csv('300565.csv')
# columns: timestamp, open, high, low, close
print(df.head())
class Agent:
    def __init__(self, state_size, window_size, trend, skip, batch_size):
        self.state_size = state_size    # state-space dimension
        self.window_size = window_size  # sliding-window length
        self.half_window = window_size // 2
        self.trend = trend              # price series (the data)
        self.skip = skip                # step between actions; 1 = act at every tick
        self.action_size = 3            # action space: hold, buy, sell
        self.batch_size = batch_size
        self.memory = deque(maxlen=1000)  # replay buffer (double-ended queue)
        self.inventory = []             # open positions
        self.gamma = 0.95               # reward discount factor
        self.epsilon = 0.5              # exploration rate (epsilon-greedy)
        self.epsilon_min = 0.01         # exploration floor
        self.epsilon_decay = 0.999      # multiplicative decay applied after each replay
        tf.reset_default_graph()
        self.sess = tf.InteractiveSession()  # interactive session
        self.X = tf.placeholder(tf.float32, [None, self.state_size])   # states
        self.Y = tf.placeholder(tf.float32, [None, self.action_size])  # target Q-values
        feed = tf.layers.dense(self.X, 256, activation=tf.nn.relu)
        self.logits = tf.layers.dense(feed, self.action_size)  # Q-value estimates for the 3 actions
        self.cost = tf.reduce_mean(tf.square(self.Y - self.logits))  # MSE loss
        self.optimizer = tf.train.GradientDescentOptimizer(1e-5).minimize(self.cost)
        self.sess.run(tf.global_variables_initializer())
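    # For reference only (not in the original post): the same one-hidden-layer
    # Q-network in plain TF2/Keras would look roughly like
    #   model = tf.keras.Sequential([
    #       tf.keras.layers.Dense(256, activation='relu'),
    #       tf.keras.layers.Dense(action_size)])
    #   model.compile(loss='mse', optimizer=tf.keras.optimizers.SGD(1e-5))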
    def act(self, state):  # choose an action
        # with probability epsilon, explore by acting randomly
        if random.random() <= self.epsilon:
            return random.randrange(self.action_size)
        # otherwise exploit: take the action with the highest predicted Q-value
        return np.argmax(
            self.sess.run(self.logits, feed_dict={self.X: state})[0]
        )
    def get_state(self, t):  # encode the state at time t
        window_size = self.window_size + 1
        d = t - window_size + 1
        # early on there are fewer points than the window size, so pad
        # with the time-0 value until the block has window_size entries
        block = self.trend[d: t + 1] if d >= 0 else -d * [self.trend[0]] + self.trend[0: t + 1]
        res = []
        for i in range(window_size - 1):
            res.append(block[i + 1] - block[i])  # one-step price change
        return np.array([res])  # the differences are the state encoding
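    # Worked example (illustrative): with trend = [1, 2, 4, 7] and
    # window_size = 3, get_state(3) takes block = [1, 2, 4, 7] and returns
    # the first differences [[1, 2, 3]]; get_state(1) pads with trend[0] to
    # get block = [1, 1, 1, 2] and returns [[0, 0, 1]].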
    def replay(self, batch_size):
        # take the most recent batch_size transitions from memory
        mini_batch = []
        l = len(self.memory)
        for i in range(l - batch_size, l):
            mini_batch.append(self.memory[i])
        replay_size = len(mini_batch)
        X = np.empty((replay_size, self.state_size))
        Y = np.empty((replay_size, self.action_size))
        # each transition is [state, action, reward, next_state, done],
        # so indices 0 and 3 give the old and new states
        states = np.array([a[0][0] for a in mini_batch])
        new_states = np.array([a[3][0] for a in mini_batch])
        Q = self.sess.run(self.logits, feed_dict={self.X: states})
        Q_new = self.sess.run(self.logits, feed_dict={self.X: new_states})
        # build the Q-learning regression targets
        for i in range(len(mini_batch)):
            state, action, reward, next_state, done = mini_batch[i]
            target = Q[i]
            target[action] = reward
            if not done:  # if the episode has not ended, bootstrap
                target[action] += self.gamma * np.amax(Q_new[i])
            # if done there is no follow-up action, so the target stays reward
            X[i] = state
            Y[i] = target
        cost, _ = self.sess.run(
            [self.cost, self.optimizer], feed_dict={self.X: X, self.Y: Y}
        )
        # anneal the exploration rate
        if self.epsilon > self.epsilon_min:
            self.epsilon *= self.epsilon_decay
        return cost
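    # The loop above implements the standard Q-learning target
    #     target(s, a) = r + gamma * max_a' Q(s', a')   (episode continues)
    #     target(s, a) = r                              (episode done)
    # regressed with MSE against the network output, i.e. a minimal DQN
    # without the separate target network used in the original DQN paper.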
    def buy(self, initial_money):
        starting_money = initial_money  # starting capital
        states_sell = []
        states_buy = []
        inventory = []  # open positions
        state = self.get_state(0)  # initial state
        for t in range(0, len(self.trend) - 1, self.skip):
            action = self.act(state)            # pick an action for this state
            next_state = self.get_state(t + 1)  # compute the next state
            # action 1 = buy, if there is enough cash and enough days remain
            if action == 1 and initial_money >= self.trend[t] and t < (len(self.trend) - self.half_window):
                inventory.append(self.trend[t])  # open a position
                initial_money -= self.trend[t]   # pay for it
                states_buy.append(t)             # record the buy day
                print('day %d: buy 1 unit at price %f, total balance %f' % (t, self.trend[t], initial_money))
            # action 2 = sell, if there is anything to sell
            elif action == 2 and len(inventory):
                bought_price = inventory.pop(0)  # close the oldest position
                initial_money += self.trend[t]   # collect the proceeds
                states_sell.append(t)            # record the sell day
                print('day %d: sell 1 unit at price %f, trade return %f%%, total balance %f' % (
                    t, self.trend[t], (self.trend[t] - bought_price) / bought_price * 100, initial_money))
            state = next_state  # advance to the next state (missing in the original)
        # overall return on the starting capital
        invest = ((initial_money - starting_money) / starting_money) * 100
        total_gains = initial_money - starting_money
        return states_buy, states_sell, total_gains, invest
    def train(self, iterations, checkpoint, initial_money):
        # run several training episodes over the same series
        for i in range(iterations):
            total_profit = 0  # cumulative realized profit this episode
            inventory = []
            state = self.get_state(0)
            starting_money = initial_money
            for t in range(0, len(self.trend) - 1, self.skip):
                action = self.act(state)
                next_state = self.get_state(t + 1)
                if action == 1 and starting_money >= self.trend[t] and t < (len(self.trend) - self.half_window):
                    inventory.append(self.trend[t])
                    starting_money -= self.trend[t]
                elif action == 2 and len(inventory) > 0:
                    bought_price = inventory.pop(0)
                    total_profit += self.trend[t] - bought_price
                    starting_money += self.trend[t]
                # reward = running return on capital
                invest = ((starting_money - initial_money) / initial_money)
                self.memory.append((state, action, invest,
                                    next_state, starting_money < initial_money))
                state = next_state
                batch_size = min(self.batch_size, len(self.memory))
                cost = self.replay(batch_size)
            if (i + 1) % checkpoint == 0:
                print('epoch: %d, total rewards: %.3f, cost: %f, total money: %f' % (
                    i + 1, total_profit, cost, starting_money))
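# Note on the reward design above (as written in the original code): the
# per-step reward is the running return on capital, and `done` is flagged
# whenever capital drops below the starting level, rather than at the end
# of the price series.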
close = df.close.values.tolist()  # use the close series for the experiment
initial_money = 10000
window_size = 30
skip = 1
batch_size = 32
agent = Agent(state_size=window_size,
              window_size=window_size,
              trend=close,
              skip=skip,
              batch_size=batch_size)
agent.train(iterations=200, checkpoint=10, initial_money=initial_money)
states_buy, states_sell, total_gains, invest = agent.buy(initial_money=initial_money)
fig = plt.figure(figsize=(15, 5))
plt.plot(close, color='r', lw=2.)
plt.plot(close, '^', markersize=10, color='m', label='buying signal', markevery=states_buy)
plt.plot(close, 'v', markersize=10, color='k', label='selling signal', markevery=states_sell)
plt.title('total gains %f, total investment %f%%' % (total_gains, invest))
plt.legend()
plt.savefig('tmp.png')
plt.show()
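# Sketch, not part of the original post: a buy-and-hold baseline on the same
# series for context. Assumes whole units bought at the first close price.
units = int(initial_money // close[0])
hold_gains = units * (close[-1] - close[0])
print('buy-and-hold baseline: %d units, total gains %f' % (units, hold_gains))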