このサイトを参考にしながら、bidirectional GRU seq2seqモデルを使って、1ヶ月先のAMDの株価の予測をしてみる。
import tensorflow as tf
import numpy as np
import matplotlib.pyplot as plt
import seaborn as sns
import pandas as pd
from sklearn.preprocessing import MinMaxScaler
from datetime import datetime
from datetime import timedelta
from tqdm import tqdm
from pandas_datareader import data as pdr
import yfinance as yf
sns.set()
tf.compat.v1.random.set_random_seed(1234)
tickers = (['AMD'])
import datetime
stocks_start = datetime.datetime(2018, 1, 1)
stocks_end = datetime.datetime(2019, 8, 20)
def get(tickers, startdate, enddate):
def data(ticker):
return (pdr.get_data_yahoo(ticker, start=startdate, end=enddate))
datas = map(data, tickers)
return(pd.concat(datas, keys=tickers, names=['Ticker', 'Date']))
all_data = get(tickers, stocks_start, stocks_end)
df = all_data[['Open','High','Low','Close','Adj Close','Volume']]
df.reset_index(level='Ticker',drop=True,inplace=True)
df.reset_index(inplace=True)
df.tail()
minmax = MinMaxScaler().fit(df.iloc[:, 4:5].astype('float32')) # Close index
df_log = minmax.transform(df.iloc[:, 4:5].astype('float32')) # Close index
df_log = pd.DataFrame(df_log)
df_log.head()
スポンサーリンク
データを学習用とテスト用に分割¶
用意した株価データを、開始から終了30日前までの学習データセットと終了30日前から終了日までのテストデータセットに分割する。こうすることで、最後の30日間をモデルに予測させることができる。
test_size = 30
simulation_size = 10
df_train = df_log.iloc[:-test_size]
df_test = df_log.iloc[-test_size:]
df.shape, df_train.shape, df_test.shape
class Model:
def __init__(
self,
learning_rate,
num_layers,
size,
size_layer,
output_size,
forget_bias = 0.1,
):
def lstm_cell(size_layer):
return tf.nn.rnn_cell.GRUCell(size_layer)
backward_rnn_cells = tf.nn.rnn_cell.MultiRNNCell(
[lstm_cell(size_layer) for _ in range(num_layers)],
state_is_tuple = False,
)
forward_rnn_cells = tf.nn.rnn_cell.MultiRNNCell(
[lstm_cell(size_layer) for _ in range(num_layers)],
state_is_tuple = False,
)
self.X = tf.placeholder(tf.float32, (None, None, size))
self.Y = tf.placeholder(tf.float32, (None, output_size))
drop_backward = tf.contrib.rnn.DropoutWrapper(
backward_rnn_cells, output_keep_prob = forget_bias
)
forward_backward = tf.contrib.rnn.DropoutWrapper(
forward_rnn_cells, output_keep_prob = forget_bias
)
self.backward_hidden_layer = tf.placeholder(
tf.float32, shape = (None, num_layers * size_layer)
)
self.forward_hidden_layer = tf.placeholder(
tf.float32, shape = (None, num_layers * size_layer)
)
_, last_state = tf.nn.bidirectional_dynamic_rnn(
forward_backward,
drop_backward,
self.X,
initial_state_fw = self.forward_hidden_layer,
initial_state_bw = self.backward_hidden_layer,
dtype = tf.float32,
)
with tf.variable_scope('decoder', reuse = False):
backward_rnn_cells_decoder = tf.nn.rnn_cell.MultiRNNCell(
[lstm_cell(size_layer) for _ in range(num_layers)],
state_is_tuple = False,
)
forward_rnn_cells_decoder = tf.nn.rnn_cell.MultiRNNCell(
[lstm_cell(size_layer) for _ in range(num_layers)],
state_is_tuple = False,
)
drop_backward_decoder = tf.contrib.rnn.DropoutWrapper(
backward_rnn_cells_decoder, output_keep_prob = forget_bias
)
forward_backward_decoder = tf.contrib.rnn.DropoutWrapper(
forward_rnn_cells_decoder, output_keep_prob = forget_bias
)
self.outputs, self.last_state = tf.nn.bidirectional_dynamic_rnn(
forward_backward_decoder, drop_backward_decoder, self.X,
initial_state_fw = last_state[0],
initial_state_bw = last_state[1],
dtype = tf.float32
)
self.outputs = tf.concat(self.outputs, 2)
self.logits = tf.layers.dense(self.outputs[-1], output_size)
self.cost = tf.reduce_mean(tf.square(self.Y - self.logits))
self.optimizer = tf.train.AdamOptimizer(learning_rate).minimize(
self.cost
)
def calculate_accuracy(real, predict):
real = np.array(real) + 1
predict = np.array(predict) + 1
percentage = 1 - np.sqrt(np.mean(np.square((real - predict) / real)))
return percentage * 100
def anchor(signal, weight):
buffer = []
last = signal[0]
for i in signal:
smoothed_val = last * weight + (1 - weight) * i
buffer.append(smoothed_val)
last = smoothed_val
return buffer
num_layers = 1
size_layer = 128
timestamp = 5
epoch = 300
dropout_rate = 0.8
future_day = test_size
learning_rate = 0.01
def forecast():
tf.reset_default_graph()
modelnn = Model(
learning_rate, num_layers, df_log.shape[1], size_layer, df_log.shape[1], dropout_rate
)
sess = tf.InteractiveSession()
sess.run(tf.global_variables_initializer())
date_ori = pd.to_datetime(df.iloc[:, 0]).tolist()
pbar = tqdm(range(epoch), desc = 'train loop')
for i in pbar:
init_value_forward = np.zeros((1, num_layers * size_layer))
init_value_backward = np.zeros((1, num_layers * size_layer))
total_loss, total_acc = [], []
for k in range(0, df_train.shape[0] - 1, timestamp):
index = min(k + timestamp, df_train.shape[0] - 1)
batch_x = np.expand_dims(
df_train.iloc[k : index, :].values, axis = 0
)
batch_y = df_train.iloc[k + 1 : index + 1, :].values
logits, last_state, _, loss = sess.run(
[modelnn.logits, modelnn.last_state, modelnn.optimizer, modelnn.cost],
feed_dict = {
modelnn.X: batch_x,
modelnn.Y: batch_y,
modelnn.backward_hidden_layer: init_value_backward,
modelnn.forward_hidden_layer: init_value_forward,
},
)
init_value_forward = last_state[0]
init_value_backward = last_state[1]
total_loss.append(loss)
total_acc.append(calculate_accuracy(batch_y[:, 0], logits[:, 0]))
pbar.set_postfix(cost = np.mean(total_loss), acc = np.mean(total_acc))
future_day = test_size
output_predict = np.zeros((df_train.shape[0] + future_day, df_train.shape[1]))
output_predict[0] = df_train.iloc[0]
upper_b = (df_train.shape[0] // timestamp) * timestamp
init_value_forward = np.zeros((1, num_layers * size_layer))
init_value_backward = np.zeros((1, num_layers * size_layer))
for k in range(0, (df_train.shape[0] // timestamp) * timestamp, timestamp):
out_logits, last_state = sess.run(
[modelnn.logits, modelnn.last_state],
feed_dict = {
modelnn.X: np.expand_dims(
df_train.iloc[k : k + timestamp], axis = 0
),
modelnn.backward_hidden_layer: init_value_backward,
modelnn.forward_hidden_layer: init_value_forward,
},
)
init_value_forward = last_state[0]
init_value_backward = last_state[1]
output_predict[k + 1 : k + timestamp + 1] = out_logits
if upper_b != df_train.shape[0]:
out_logits, last_state = sess.run(
[modelnn.logits, modelnn.last_state],
feed_dict = {
modelnn.X: np.expand_dims(df_train.iloc[upper_b:], axis = 0),
modelnn.backward_hidden_layer: init_value_backward,
modelnn.forward_hidden_layer: init_value_forward,
},
)
output_predict[upper_b + 1 : df_train.shape[0] + 1] = out_logits
future_day -= 1
date_ori.append(date_ori[-1] + timedelta(days = 1))
init_value_forward = last_state[0]
init_value_backward = last_state[1]
for i in range(future_day):
o = output_predict[-future_day - timestamp + i:-future_day + i]
out_logits, last_state = sess.run(
[modelnn.logits, modelnn.last_state],
feed_dict = {
modelnn.X: np.expand_dims(o, axis = 0),
modelnn.backward_hidden_layer: init_value_backward,
modelnn.forward_hidden_layer: init_value_forward,
},
)
init_value_forward = last_state[0]
init_value_backward = last_state[1]
output_predict[-future_day + i] = out_logits[-1]
date_ori.append(date_ori[-1] + timedelta(days = 1))
output_predict = minmax.inverse_transform(output_predict)
deep_future = anchor(output_predict[:, 0], 0.3)
return deep_future[-test_size:]
スポンサーリンク
予測シミュレーション¶
results = []
for i in range(simulation_size):
print('simulation %d'%(i + 1))
results.append(forecast())
上下幅の大きい予測結果を省く。
accepted_results = []
for r in results:
if (np.array(r[-test_size:]) < np.min(df['Close'])).sum() == 0 and \
(np.array(r[-test_size:]) > np.max(df['Close']) * 2).sum() == 0:
accepted_results.append(r)
len(accepted_results)
正確度が88以下の予測結果を省く。
accepted_results2 = []
for r in accepted_results:
if calculate_accuracy(df['Close'].iloc[-test_size:].values, r)>88:
accepted_results2.append(r)
len(accepted_results2)
accuracies = [calculate_accuracy(df['Close'].iloc[-test_size:].values, r) for r in accepted_results2]
plt.figure(figsize = (25, 15))
for no, r in enumerate(accepted_results2):
plt.plot(r, label = 'forecast %d'%(no + 1))
plt.plot(df['Close'].iloc[-test_size:].values, label = 'true trend', c = 'black',lw=2)
plt.legend(prop={'size': 25})
plt.rc('xtick', labelsize=30)
plt.rc('ytick', labelsize=25)
plt.title('average accuracy: %.4f'%(np.mean(accuracies)),fontsize=25)
plt.show()
予測精度はそんなに良くないことが見て取れる。ただ、パラメーターをいじることで多少精度が上がるようである。
スポンサーリンク
スポンサーリンク