このサイトを参考にしながら1ヶ月先の株価の予測をしてみる。
import sys
import warnings

# Suppress library warnings unless the user explicitly asked for them
# via the -W flag / PYTHONWARNINGS (then sys.warnoptions is non-empty).
if not sys.warnoptions:
    warnings.simplefilter('ignore')

import tensorflow as tf
import numpy as np
import matplotlib.pyplot as plt
import seaborn as sns
import pandas as pd
from sklearn.preprocessing import MinMaxScaler
from datetime import datetime
from datetime import timedelta
from tqdm import tqdm
from pandas_datareader import data as pdr
import yfinance as yf

sns.set()
# Fix the TF1 graph-level seed so repeated runs are comparable.
tf.compat.v1.random.set_random_seed(1234)
今回予測する銘柄はAMDにする。
# Symbols to forecast; a list so the download helper can handle several.
tickers = ['AMD']
2019年7月19日から1ヶ月先のAMDの株価を予測する。
# NOTE(review): this imports the *module* datetime, shadowing the `datetime`
# class imported above via `from datetime import datetime`. Later cells call
# `datetime.datetime(...)` and therefore rely on this module binding —
# do not remove it.
import datetime
# Training window: 2018-01-01 .. 2019-07-18; the forecast begins 2019-07-19.
stocks_start = datetime.datetime(2018, 1, 1)
stocks_end = datetime.datetime(2019, 7, 18)
def get(tickers, startdate, enddate):
    """Download daily OHLCV data for each ticker between the given dates.

    Parameters
    ----------
    tickers : iterable of str, ticker symbols to fetch.
    startdate, enddate : datetime, inclusive date range.

    Returns
    -------
    DataFrame concatenated over tickers, indexed by (Ticker, Date).
    """
    # Route pandas-datareader's Yahoo calls through yfinance: the file
    # imports yfinance but never activated it, and pandas-datareader's
    # native Yahoo backend no longer works without this override.
    yf.pdr_override()

    def data(ticker):
        return (pdr.get_data_yahoo(ticker, start=startdate, end=enddate))

    datas = map(data, tickers)
    return (pd.concat(datas, keys=tickers, names=['Ticker', 'Date']))
# Fetch the raw data and reduce it to a flat OHLCV frame indexed by row.
all_data = get(tickers, stocks_start, stocks_end)
all_data.tail()

df = all_data[['Open', 'High', 'Low', 'Close', 'Adj Close', 'Volume']]
df.reset_index(level='Ticker', drop=True, inplace=True)
df.reset_index(inplace=True)
df.tail()

# Scale the Close column (position 4 after the Date reset) into [0, 1]
# and keep the fitted scaler around to invert the forecasts later.
close_prices = df.iloc[:, 4:5].astype('float32')  # Close index
minmax = MinMaxScaler().fit(close_prices)
df_log = pd.DataFrame(minmax.transform(close_prices))
df_log.head()
This example is using model 1.lstm, if you want to use another model, need to tweak a little bit, but I believe it is not that hard.
今回の例ではmodel1.lstmを使用している。もし、別のモデルを使いたい場合、若干の変更が必要だが、そんなに難しくないはずだ。
I want to forecast 30 days ahead! So just change test_size to forecast t + N ahead.
30日先を予測したいので、test_sizeをt + N先を予測するように変更すればいい。
Also, I want to simulate 10 times, 10 variances of forecasted patterns. Just change simulation_size.
また、simulation_sizeを変更して、10種類の予測パターンを使ってシミュレーションを10回行うようにする。
simulation_size = 10   # number of independently trained models / forecasts
num_layers = 1         # stacked LSTM layers
size_layer = 128       # LSTM hidden-state width
timestamp = 5          # truncated-BPTT window length, in trading days
epoch = 300            # training epochs per simulation
dropout_rate = 0.8     # NOTE(review): used as the dropout *keep* probability
test_size = 30         # days to forecast beyond the last observed date
learning_rate = 0.01   # Adam step size
df_train = df_log      # train on the min-max-scaled Close series
df.shape, df_train.shape
class Model:
    # TF1-graph stacked-LSTM regressor: a MultiRNNCell with dropout on the
    # outputs, a dense readout per timestep, MSE loss, Adam optimizer.
    def __init__(
        self,
        learning_rate,
        num_layers,
        size,
        size_layer,
        output_size,
        forget_bias = 0.1,
    ):
        """Build the computation graph.

        Parameters
        ----------
        learning_rate : float, Adam step size.
        num_layers : int, number of stacked LSTM cells.
        size : int, input features per timestep.
        size_layer : int, LSTM hidden-state width.
        output_size : int, regression outputs per timestep.
        forget_bias : float — despite the name, this is wired in as the
            dropout keep probability (see DropoutWrapper below); the
            caller passes `dropout_rate` for this argument.
        """
        def lstm_cell(size_layer):
            return tf.nn.rnn_cell.LSTMCell(size_layer, state_is_tuple = False)

        rnn_cells = tf.nn.rnn_cell.MultiRNNCell(
            [lstm_cell(size_layer) for _ in range(num_layers)],
            state_is_tuple = False,
        )
        # X: (batch, time, size). Y: (time, output_size) — the training
        # loop always feeds batch size 1, so Y carries no batch axis.
        self.X = tf.placeholder(tf.float32, (None, None, size))
        self.Y = tf.placeholder(tf.float32, (None, output_size))
        # NOTE(review): `forget_bias` is used as output_keep_prob here,
        # i.e. dropout keep probability, not an LSTM forget-gate bias.
        drop = tf.contrib.rnn.DropoutWrapper(
            rnn_cells, output_keep_prob = forget_bias
        )
        # Flat (c, h) state for all layers, possible because
        # state_is_tuple=False above.
        self.hidden_layer = tf.placeholder(
            tf.float32, (None, num_layers * 2 * size_layer)
        )
        self.outputs, self.last_state = tf.nn.dynamic_rnn(
            drop, self.X, initial_state = self.hidden_layer, dtype = tf.float32
        )
        # outputs[-1] indexes the *batch* axis; valid only because the
        # batch is always 1, leaving a (time, size_layer) tensor for the
        # dense readout.
        self.logits = tf.layers.dense(self.outputs[-1], output_size)
        self.cost = tf.reduce_mean(tf.square(self.Y - self.logits))
        self.optimizer = tf.train.AdamOptimizer(learning_rate).minimize(
            self.cost
        )
def calculate_accuracy(real, predict):
    """Return 100 * (1 - RMSPE) between `real` and `predict`.

    Both series are shifted by +1 before the relative error is taken,
    which avoids division by zero for zero-valued targets.
    """
    real = np.array(real) + 1
    predict = np.array(predict) + 1
    relative_error = (real - predict) / real
    rmspe = np.sqrt(np.mean(np.square(relative_error)))
    return (1 - rmspe) * 100
def anchor(signal, weight):
    """Exponentially smooth `signal`; higher `weight` = heavier smoothing."""
    smoothed = []
    prev = signal[0]
    for value in signal:
        prev = prev * weight + (1 - weight) * value
        smoothed.append(prev)
    return smoothed
def forecast():
    """Train a fresh Model on `df_train` and return one smoothed forecast.

    Returns the anchored (exponentially smoothed, weight 0.4) predicted
    Close series in original price units, covering the whole training
    range plus `test_size` future days.
    Relies on module-level globals: df, df_log, df_train, minmax, and the
    hyperparameter cell (num_layers, size_layer, timestamp, epoch, ...).
    """
    tf.reset_default_graph()
    modelnn = Model(
        learning_rate, num_layers, df_log.shape[1], size_layer, df_log.shape[1], dropout_rate
    )
    sess = tf.InteractiveSession()
    sess.run(tf.global_variables_initializer())
    date_ori = pd.to_datetime(df.iloc[:, 0]).tolist()

    pbar = tqdm(range(epoch), desc = 'train loop')
    for i in pbar:
        # Truncated BPTT: walk the series in windows of `timestamp` steps,
        # carrying the LSTM state (init_value) across windows in one epoch.
        init_value = np.zeros((1, num_layers * 2 * size_layer))
        total_loss, total_acc = [], []
        for k in range(0, df_train.shape[0] - 1, timestamp):
            index = min(k + timestamp, df_train.shape[0] - 1)
            batch_x = np.expand_dims(
                df_train.iloc[k : index, :].values, axis = 0
            )
            # Target = input shifted one step ahead (next-day value).
            batch_y = df_train.iloc[k + 1 : index + 1, :].values
            logits, last_state, _, loss = sess.run(
                [modelnn.logits, modelnn.last_state, modelnn.optimizer, modelnn.cost],
                feed_dict = {
                    modelnn.X: batch_x,
                    modelnn.Y: batch_y,
                    modelnn.hidden_layer: init_value,
                },
            )
            init_value = last_state
            total_loss.append(loss)
            total_acc.append(calculate_accuracy(batch_y[:, 0], logits[:, 0]))
        pbar.set_postfix(cost = np.mean(total_loss), acc = np.mean(total_acc))

    # In-sample pass: re-run the trained net over the history to fill
    # one-step-ahead predictions into output_predict[1:len(df_train)].
    future_day = test_size
    output_predict = np.zeros((df_train.shape[0] + future_day, df_train.shape[1]))
    output_predict[0] = df_train.iloc[0]
    upper_b = (df_train.shape[0] // timestamp) * timestamp
    init_value = np.zeros((1, num_layers * 2 * size_layer))
    for k in range(0, (df_train.shape[0] // timestamp) * timestamp, timestamp):
        out_logits, last_state = sess.run(
            [modelnn.logits, modelnn.last_state],
            feed_dict = {
                modelnn.X: np.expand_dims(
                    df_train.iloc[k : k + timestamp], axis = 0
                ),
                modelnn.hidden_layer: init_value,
            },
        )
        init_value = last_state
        output_predict[k + 1 : k + timestamp + 1] = out_logits
    # Handle the leftover tail shorter than one full window; this also
    # produces the first out-of-sample day, hence future_day -= 1.
    if upper_b != df_train.shape[0]:
        out_logits, last_state = sess.run(
            [modelnn.logits, modelnn.last_state],
            feed_dict = {
                modelnn.X: np.expand_dims(df_train.iloc[upper_b:], axis = 0),
                modelnn.hidden_layer: init_value,
            },
        )
        output_predict[upper_b + 1 : df_train.shape[0] + 1] = out_logits
        future_day -= 1
        date_ori.append(date_ori[-1] + timedelta(days = 1))
        init_value = last_state
    # Autoregressive rollout: feed the last `timestamp` predictions back
    # in to produce each remaining future day, one at a time.
    for i in range(future_day):
        o = output_predict[-future_day - timestamp + i:-future_day + i]
        out_logits, last_state = sess.run(
            [modelnn.logits, modelnn.last_state],
            feed_dict = {
                modelnn.X: np.expand_dims(o, axis = 0),
                modelnn.hidden_layer: init_value,
            },
        )
        init_value = last_state
        output_predict[-future_day + i] = out_logits[-1]
        date_ori.append(date_ori[-1] + timedelta(days = 1))

    # Undo the min-max scaling, then smooth the curve before returning.
    output_predict = minmax.inverse_transform(output_predict)
    deep_future = anchor(output_predict[:, 0], 0.4)
    return deep_future
# Run `simulation_size` independent trainings; each yields one forecast.
results = []
for i in range(simulation_size):
    print('simulation %d'%(i + 1))
    results.append(forecast())

# Build x-axis labels: the historical dates plus `test_size` future days.
date_ori = pd.to_datetime(df.iloc[:, 0]).tolist()
for i in range(test_size):
    date_ori.append(date_ori[-1] + timedelta(days = 1))
date_ori = pd.Series(date_ori).dt.strftime(date_format = '%Y-%m-%d').tolist()
date_ori[-5:]
Sanity check
Some of our models might not have stable gradients, so the forecasted trend might be completely haywire. You can use many methods to filter out unstable models.
使用するモデルが全て正常に機能するとは限らないので、使い物にならないモデルを間引くための多くの方法を利用できる。
This method is very simple,(今回の方法は非常に単純)
If one of element in forecasted trend lower than min(original trend).
予測トレンドが実際のトレンドの最小値より小さい場合
If one of element in forecasted trend bigger than max(original trend) * 2.
予測トレンドが実際のトレンドの最大値の2倍より大きい場合
If either one is true, reject that trend.
どちらか一方でも真ならそのトレンドを拒否する。
# Sanity filter: keep a simulated trend only when no forecasted point
# falls below the historical minimum or above twice the historical maximum.
accepted_results = []
for r in results:
    forecast_part = np.array(r[-test_size:])
    below_min = (forecast_part < np.min(df['Close'])).sum()
    above_cap = (forecast_part > np.max(df['Close']) * 2).sum()
    if below_min == 0 and above_cap == 0:
        accepted_results.append(r)
len(accepted_results)
# In-sample accuracy of each accepted forecast vs the observed Close.
accuracies = [calculate_accuracy(df['Close'].values, r[:-test_size]) for r in accepted_results]

# Plot every accepted simulation against the true trend; x ticks every
# 15th label because each forecast extends test_size days past the data.
plt.figure(figsize = (22, 15))
for no, r in enumerate(accepted_results):
    plt.plot(r, label = 'forecast %d'%(no + 1))
plt.plot(df['Close'], label = 'true trend', c = 'black')
plt.title('average accuracy: %.4f'%(np.mean(accuracies)),fontsize=25)
plt.rc('xtick', labelsize=30)
plt.rc('ytick', labelsize=25)
x_range_future = np.arange(len(results[0]))
plt.xticks(x_range_future[::15], date_ori[::15],rotation=90)
plt.legend(prop={'size': 25})
plt.show()
実際のトレンドを8月16日まで延長する。
# Extend the observed trend through 2019-08-19 so the forecast window can
# be compared against reality. Reuses the `get` helper defined earlier —
# the original redefined it verbatim here, which was pure duplication.
stocks_start = datetime.datetime(2018, 1, 1)
stocks_end = datetime.datetime(2019, 8, 19)
recent_data = get(tickers, stocks_start, stocks_end)

df1 = recent_data[['Open','High','Low','Close','Adj Close','Volume']]
df1.reset_index(level='Ticker',drop=True,inplace=True)
df1.reset_index(inplace=True)
df1.tail()
# Same accuracy computation as before (still measured on the training df).
accuracies = [calculate_accuracy(df['Close'].values, r[:-test_size]) for r in accepted_results]

# Re-plot the accepted forecasts, this time against the *extended* true
# trend (df1) so the out-of-sample month is visible.
plt.figure(figsize = (22, 15))
for no, r in enumerate(accepted_results):
    plt.plot(r, label = 'forecast %d'%(no + 1))
plt.plot(df1['Close'], label = 'true trend', c = 'black')
plt.title('average accuracy: %.4f'%(np.mean(accuracies)),fontsize=25)
plt.rc('xtick', labelsize=30)
plt.rc('ytick', labelsize=25)
x_range_future = np.arange(len(results[0]))
plt.xticks(x_range_future[::15], date_ori[::15],rotation=90)
plt.legend(prop={'size': 25})
plt.show()
7月19日以降の予測はそんなに上手くできていないことに気付かされる。最後に正確度が低い(正確度90以下の)予測トレンドを省く。
# Second filter: drop any forecast whose in-sample accuracy is 90 or less.
accepted_results2 = [
    r for r in accepted_results
    if calculate_accuracy(df['Close'].values, r[:-test_size]) > 90
]
len(accepted_results2)
# Accuracy of the surviving (>90) forecasts only.
accuracies = [calculate_accuracy(df['Close'].values, r[:-test_size]) for r in accepted_results2]

# Final plot: high-accuracy forecasts against the extended true trend.
plt.figure(figsize = (22, 15))
for no, r in enumerate(accepted_results2):
    plt.plot(r, label = 'forecast %d'%(no + 1))
plt.plot(df1['Close'], label = 'true trend', c = 'navy',lw=2)
plt.title('average accuracy: %.4f'%(np.mean(accuracies)),fontsize=25)
plt.rc('xtick', labelsize=30)
plt.rc('ytick', labelsize=25)
x_range_future = np.arange(len(results[0]))
plt.xticks(x_range_future[::15], date_ori[::15],rotation=90)
plt.legend(prop={'size': 25})
plt.show()
予測3が一番まともなような気もするが、それでも30日先のトレンドを正確には捉えきれていない。