from pprint import pformat as pf, pprint as pp
from datetime import datetime, timezone
import json
import logging
import os
log = logging.getLogger(__name__)
logging.basicConfig(level=logging.INFO)
import requests
from influxdb import DataFrameClient
import pandas as pd
import numpy as np
import statsmodels
from statsmodels.tsa import stattools
from statsmodels.tsa.statespace.sarimax import SARIMAX
from statsmodels.tsa.arima_process import arma_generate_sample
from statsmodels.graphics.tsaplots import plot_acf, plot_pacf
from statsmodels.tsa.seasonal import seasonal_decompose
np.random.seed(3)
import matplotlib.pyplot as plt
import itermplot
%matplotlib inline
print(os.environ.get('MPLBACKEND', ''))
print(statsmodels.__version__)
print(np.__version__)
print(pd.__version__)
client = DataFrameClient(host='192.168.0.156', port=8086, database='metrics')
data = client.query(
'''
SELECT
mean("value")
FROM "temperature"
WHERE
("sensor_id" = 'RPI4-DHT22')
AND time >= now() - 90d
GROUP BY time(1h) fill(null);
''',
bind_params={}, chunked=True, chunk_size=10000
)
data = data['temperature']
data.columns = ['value']
data.describe()
df = data.dropna()
df_temp = df[['value']].resample('H').mean()
df_temp.index
df_temp.plot()
df_diff = df_temp.diff().dropna()
df_diff.plot()
Null Hypothesis (H0): Time Series is NOT stationary.
adf: float
Test statistic
pvalue: float
MacKinnon’s approximate p-value based on MacKinnon (1994, 2010)
usedlag: int
Number of lags used
nobs: int
Number of observations used for the ADF regression and calculation of the critical values
critical values: dict
Critical values for the test statistic at the 1 %, 5 %, and 10 % levels. Based on MacKinnon (2010)
icbest: float
The maximized information criterion if autolag is not None.
print("TEMPERATURE")
results = stattools.adfuller(df_temp['value'].dropna())
adf, pvalue, usedlag, no_obs, critical_values, ic_best = results
print(f"p-value: {pvalue}")
if pvalue < 0.05:
print("Reject H0. Reject non-stationary. It could be stationary.")
else:
print("Could not reject H0. It is not likely stationary.")
print("DIFFERENCED")
results = stattools.adfuller(df_diff['value'].dropna())
adf, pvalue, usedlag, no_obs, critical_values, ic_best = results
print(f"p-value: {pvalue}")
if pvalue < 0.05:
print("Reject H0. Reject non-stationary. It could be stationary.")
else:
print("Could not reject H0. It is not likely stationary.")
Null Hypothesis (H0): Computes the Kwiatkowski-Phillips-Schmidt-Shin (KPSS) test for the null hypothesis that x is level or trend stationary.
kpss_stat: float
The KPSS test statistic
p_value: float
The p-value of the test. The p-value is interpolated from Table 1 in Kwiatkowski et al. (1992), and a boundary point is returned if the test statistic is outside the table of critical values, that is, if the p-value is outside the interval (0.01, 0.1).
lags: int
The truncation lag parameter
crit: dict
The critical values at 10%, 5%, 2.5% and 1%. Based on Kwiatkowski et al. (1992).
print("TEMPERATURE")
results = stattools.kpss(df_temp['value'].dropna(), regression='ct', lags='auto')
kpss_stat, pvalue, lags, critical_values = results
print(f"p-value: {pvalue}")
if pvalue < 0.05:
print("Reject H0. Reject stationary. It could be non-stationary.")
else:
print("Could not reject H0. It is likely stationary.")
print("DIFFERENCED")
results = stattools.kpss(df_diff['value'].dropna(), regression='ct', lags='auto')
kpss_stat, pvalue, lags, critical_values = results
print(f"p-value: {pvalue}")
if pvalue < 0.05:
print("Reject H0. Reject stationary. It could be non-stationary.")
else:
print("Could not reject H0. It is likely stationary.")
fig, (ax1, ax2) = plt.subplots(2,1, figsize=(22,8))
plot_acf(df_temp.dropna(), lags=48, zero=False, ax=ax1)
plot_pacf(df_temp.dropna(), lags=48, zero=False, ax=ax2)
plt.show()
fig, (ax1, ax2) = plt.subplots(2,1, figsize=(22,8))
plot_acf(df_diff.dropna(), lags=48, zero=True, ax=ax1)
plot_pacf(df_diff.dropna(), lags=48, zero=True, ax=ax2)
plt.show()
First we will look at a seasonal decomposition, followed by and exhaustive search of parameters minimizing for the AIC and BIC.
fig, axes = plt.subplots(ncols=1, nrows=4, sharex=True, figsize=(22,8))
decomp = seasonal_decompose(df_temp.interpolate().resample('H').mean())
decomp.observed.plot(ax=axes[0])
axes[0].set_ylabel('Observed')
decomp.trend.plot(ax=axes[1])
axes[1].set_ylabel('Trend')
decomp.seasonal.plot(ax=axes[2])
axes[2].set_ylabel('Seasonal')
decomp.resid.plot(ax=axes[3])
axes[3].set_ylabel('Residual')
plt.show()
# WARNING: THIS STEP CAN TAKE 12 HOURS
from concurrent.futures import ThreadPoolExecutor, ProcessPoolExecutor
from multiprocessing import Pool
from itertools import product
# Setup list of experiments
seasonal_list = list(product(range(0,5),[0,1],range(0,5), [12,24]))
non_seasonal_list = list(product(range(1,5),[1],range(1,5)))
pdqPDQS = list(product(seasonal_list, non_seasonal_list))
print(len(seasonal_list))
print(len(non_seasonal_list))
print(len(pdqPDQS))
start = datetime.now()
print(datetime.now().strftime('%Y-%m-%d %H:%M:%S.%f'))
# Define a fitting task
def fit_sarimax(pdqPDQS):
non_seasonal = pdqPDQS[1]
seasonal = pdqPDQS[0]
try:
model = SARIMAX(df_temp['value'], order=non_seasonal, seasonal_order=seasonal)
results = model.fit()
#print(datetime.now().strftime('%Y-%m-%d %H:%M:%S.%f'), non_seasonal, seasonal, results.aic, results.bic)
return (non_seasonal, seasonal, results.aic, results.bic)
except:
#print(datetime.now().strftime('%Y-%m-%d %H:%M:%S.%f'), non_seasonal, seasonal, None, None)
return (non_seasonal, seasonal, None, None)
order_aic_bic = []
with ThreadPoolExecutor() as pool:
for result in pool.map(fit_sarimax, pdqPDQS):
order_aic_bic.append(result)
print("EVALUATING")
order_aic_bic
order_df = pd.DataFrame(order_aic_bic, columns=['pdq','PDQS','aic', 'bic'])
order_df.sort_values('aic').head(10)
order_df.sort_values('bic').head(10)
pdq = (3,1,4)
PDQS = (0,1,1,24)
model = SARIMAX(df_temp['value'], order=pdq, seasonal_order=PDQS)
results = model.fit()
results.summary()
results.plot_diagnostics(figsize=(22,8))
lookback = -1 * 24 * 14
lookahead = 24 * 3
actual = df_temp[lookback:]
prediction = results.get_prediction(start=lookback, dynamic=False)
prediction_mean = prediction.predicted_mean
prediction_ci = prediction.conf_int()
forecast = results.get_forecast(steps=lookahead, dynamic=True)
forecast_mean = forecast.predicted_mean
forecast_ci = forecast.conf_int()
plt.figure(figsize=(22,8))
plt.plot(
prediction_mean.index,
prediction_mean.values,
color='blue',
label='prediction'
)
plt.fill_between(
prediction_ci.index,
prediction_ci['lower value'],
prediction_ci['upper value'],
color='cyan',
label='prediction confidence'
)
plt.plot(
forecast_mean.index,
forecast_mean.values,
color='red',
label='forecast'
)
plt.fill_between(
forecast_ci.index,
forecast_ci['lower value'],
forecast_ci['upper value'],
color='pink',
label='forecast confidence'
)
plt.plot(
actual.index,
actual['value'],
color='magenta',
label='actual'
)
plt.legend(loc="upper left")
plt.show()