refactor(predict): Optimize LSTM training with caching and vectorized window creation

parent 2587971f
""" """
OPTIMIZED VERSION
=================
LSTM tuning (CV by CASE) + final retrain on 100% train_pool + predict PREDICT_CASE (PyTorch)
Key speed-ups vs baseline:
1) Vectorized window creation (sliding_window_view) ✅ much faster than Python loops
2) Window cache per window_size (for ALL relevant cases) ✅ avoid rebuilding windows per fold/trial
3) Faster DataLoaders (num_workers + persistent_workers + prefetch_factor)
4) Optional torch.compile(model) (PyTorch 2.x) when on CUDA
5) Avoid recreating large tensors when possible; keep arrays float32
6) Minimal GPU sync; keep eval/predict batched with AMP
CV rules:
- train_pool = cases <= N_TRAIN_CASES
- If N_TRAIN_CASES < 20 -> Leave-One-Out CV by CASE
- Else -> 5-fold CV by CASE
After CV tuning:
- Train FINAL model on 100% train_pool (NO early stopping; keep best by TRAIN loss)
- Predict PREDICT_CASE (assumed not in train_pool) and save CSV with Force_RNN column
VRAM PATCHES:
- Batched DataLoaders (no full x_val/x_test on GPU)
- AMP mixed precision on GPU (torch.amp.* when available)
- optimizer.zero_grad(set_to_none=True)
- explicit cleanup + empty_cache
""" """
from typing import cast from __future__ import annotations
from typing import cast, Dict, Tuple, List, Optional
import os
import gc import gc
import numpy as np import numpy as np
...@@ -29,6 +36,13 @@ import torch ...@@ -29,6 +36,13 @@ import torch
import torch.nn as nn import torch.nn as nn
from torch.utils.data import Dataset, DataLoader from torch.utils.data import Dataset, DataLoader
try:
from numpy.lib.stride_tricks import sliding_window_view
_HAS_SWV = True
except Exception:
_HAS_SWV = False
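# Illustrative note, not part of the pipeline: for a 2-D array x of shape
# (n, f), sliding_window_view(x, (w, f)) returns shape (n - w + 1, 1, w, f),
# so the [:, 0, :, :] squeeze used below recovers (n - w + 1, w, f) windows.
# A minimal sketch with toy shapes:
#   x = np.arange(12, dtype=np.float32).reshape(6, 2)    # n=6, f=2
#   xw = sliding_window_view(x, (3, 2))[:, 0, :, :]      # shape (4, 3, 2)
#   assert (xw[0] == x[0:3]).all() and (xw[-1] == x[3:6]).all()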
# -------------------------
# Configuration
# -------------------------
@@ -77,11 +91,17 @@ elif W == 5:
else:
    print("Warning: N_TRAIN_CASES and PREDICT_CASE not set for W != 2, 3, or 5")

N_TRIALS = 15
MAX_EPOCHS_TUNE = 40
PATIENCE_TUNE = 5
MAX_EPOCHS_FINAL = 200
# DataLoader perf knobs
NUM_WORKERS = min(8, (os.cpu_count() or 4)) # tune if needed
PREFETCH_FACTOR = 2
# torch.compile knob
USE_TORCH_COMPILE = True # will auto-fallback if unsupported
# -------------------------
# Utilities
# -------------------------
@@ -92,50 +112,129 @@
def rmse(y_true, y_pred):  # type: ignore
    return float(np.sqrt(np.mean((y_true - y_pred) ** 2)))


def _make_grad_scaler(use_amp: bool):
    if not use_amp:
        return None
    try:
        return torch.amp.GradScaler("cuda", enabled=True)
    except Exception:
        return torch.cuda.amp.GradScaler(enabled=True)
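# Hedged usage sketch (the surrounding train loops are partially elided in
# this diff; the names below mirror them): on CUDA the scaler wraps
# backward/step, on CPU it is None and the plain backward()/step() path applies.
#   scaler_amp = _make_grad_scaler(use_amp=torch.cuda.is_available())
#   if scaler_amp is not None:
#       scaler_amp.scale(loss).backward()
#       scaler_amp.step(optimizer)
#       scaler_amp.update()
#   else:
#       loss.backward()
#       optimizer.step()
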
def make_windows_for_case_fast(
    df_case: pd.DataFrame,
    feature_cols: List[str],
    target_col: str,
    window_size: int
) -> Tuple[np.ndarray, np.ndarray, np.ndarray]:
    """
    Vectorized windows using sliding_window_view for speed.
    Returns:
        xw: (n_windows, window_size, n_features) float32
        yw: (n_windows,) float32
        iw: (n_windows,) indices
    """
    df_case = df_case.sort_index()
    x = df_case[feature_cols].to_numpy(dtype=np.float32, copy=False)
    y = df_case[target_col].to_numpy(dtype=np.float32, copy=False)
    idx = df_case.index.to_numpy()
    n = len(df_case)

    if n < window_size:
        return (
            np.empty((0, window_size, x.shape[1]), dtype=np.float32),
            np.empty((0,), dtype=np.float32),
            np.array([], dtype=idx.dtype),
        )

    if _HAS_SWV:
        # sliding_window_view over the first axis
        xw = sliding_window_view(x, window_shape=(window_size, x.shape[1]))[:, 0, :, :]
    else:
        # fallback (slower): numpy stack
        xw = np.stack([x[i - window_size + 1:i + 1] for i in range(window_size - 1, n)], axis=0)

    yw = y[window_size - 1:]
    iw = idx[window_size - 1:]
    return xw.astype(np.float32, copy=False), yw.astype(np.float32, copy=False), iw
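# Hedged example (toy frame, hypothetical column "u"): one case with 5 rows
# and window_size=3 yields 5 - 3 + 1 = 3 windows; targets and indices align
# with each window's last row.
#   _df = pd.DataFrame({"u": np.arange(5.0), "Force": np.arange(5.0)})
#   _x, _y, _i = make_windows_for_case_fast(_df, ["u"], "Force", 3)
#   # _x.shape == (3, 3, 1); _y.tolist() == [2.0, 3.0, 4.0]; list(_i) == [2, 3, 4]
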
# -------------------------
# Window cache per window_size
# -------------------------
CaseWindows = Dict[int, Tuple[np.ndarray, np.ndarray, np.ndarray]]  # case -> (x, y, idx)
WINDOW_CACHE: Dict[int, CaseWindows] = {}  # window_size -> CaseWindows


def precompute_windows_for_cases(
    df: pd.DataFrame,
    cases: List[int],
    feature_cols: List[str],
    target_col: str,
    window_size: int,
) -> CaseWindows:
    cache: CaseWindows = {}
    for c in cases:
        dfc = df[df["Case"] == c]
        xw, yw, iw = make_windows_for_case_fast(dfc, feature_cols, target_col, window_size)
        cache[int(c)] = (xw, yw, iw)
    return cache


def build_from_cache(
    cache: CaseWindows,
    cases: List[int]
) -> Tuple[Optional[np.ndarray], Optional[np.ndarray]]:
    xs, ys = [], []
    for c in cases:
        x, y, _ = cache[int(c)]
        if x.shape[0] > 0:
            xs.append(x)
            ys.append(y)
    if not xs:
        return None, None
    x_all = np.concatenate(xs, axis=0)
    y_all = np.concatenate(ys, axis=0)
    return x_all, y_all
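# Round-trip sketch (illustrative toy data, hypothetical names): windows are
# built once per case, after which every fold is just a dict lookup plus a
# concatenate, which is the point of the cache.
#   _df = pd.DataFrame({"Case": [1] * 5 + [2] * 5,
#                       "u": np.arange(10.0), "Force": np.arange(10.0)})
#   _cache = precompute_windows_for_cases(_df, [1, 2], ["u"], "Force", 3)
#   _x, _y = build_from_cache(_cache, [1, 2])   # _x.shape == (6, 3, 1)
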
def get_cache_for_window_size(
    df: pd.DataFrame,
    cases_to_cache: List[int],
    feature_cols: List[str],
    target_col: str,
    window_size: int
) -> CaseWindows:
    if window_size not in WINDOW_CACHE:
        WINDOW_CACHE[window_size] = precompute_windows_for_cases(
            df=df,
            cases=cases_to_cache,
            feature_cols=feature_cols,
            target_col=target_col,
            window_size=window_size
        )
    return WINDOW_CACHE[window_size]
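# Memoization note (design reading, reusing the hypothetical _df from above):
# the key is window_size only, so trials that resample the same window_size
# never rebuild windows.
#   _c1 = get_cache_for_window_size(_df, [1, 2], ["u"], "Force", 3)
#   _c2 = get_cache_for_window_size(_df, [1, 2], ["u"], "Force", 3)
#   assert _c1 is _c2   # second call is a plain dict hit
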
# -------------------------
# Dataset: avoid extra copies
# -------------------------
class WindowDataset(Dataset):
    def __init__(self, x: np.ndarray, y: np.ndarray):
        # x, y expected float32 already
        self.x = torch.from_numpy(x)
        self.y = torch.from_numpy(y)

    def __len__(self):
        return self.x.shape[0]

    def __getitem__(self, idx):  # type: ignore
        return self.x[idx], self.y[idx]
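# Why torch.from_numpy (a design note, assumption about intent): unlike
# torch.tensor(x), it shares memory with the float32 numpy array instead of
# copying it, e.g.:
#   _a = np.zeros((4, 3, 1), dtype=np.float32)
#   _t = torch.from_numpy(_a)
#   _a[0, 0, 0] = 1.0
#   assert _t[0, 0, 0].item() == 1.0   # same buffer, no per-dataset copy
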
# -------------------------
# Model
# -------------------------
class LSTMRegressor(nn.Module):
    def __init__(self, input_dim, hidden_dim=64, dense_dim=32, num_layers=1, dropout=0.0):
        super().__init__()
@@ -159,52 +258,66 @@ class LSTMRegressor(nn.Module):
        return out

def maybe_compile(model: nn.Module, device: str) -> nn.Module:
    if device != "cuda" or not USE_TORCH_COMPILE:
        return model
    try:
        # torch.compile exists in torch>=2.0
        model = torch.compile(model)  # type: ignore
    except Exception:
        pass
    return model
# -------------------------
# CV folds by CASE
# -------------------------
def make_case_folds(train_pool: List[int], seed=123, n_splits=5):
    tp = np.array(sorted(train_pool), dtype=int)
    if len(tp) < 2:
        raise ValueError("Need at least 2 cases for CV.")
    if len(tp) < 20:
        # Leave-One-Out (by case)
        folds = []
        for i in range(len(tp)):
            val_cases = [int(tp[i])]
            train_cases = [int(c) for j, c in enumerate(tp) if j != i]
            folds.append((train_cases, val_cases))
        return folds, "LOO"
    else:
        kf = KFold(n_splits=n_splits, shuffle=True, random_state=seed)
        folds = []
        for tr_idx, va_idx in kf.split(tp):
            train_cases = [int(c) for c in tp[tr_idx]]
            val_cases = [int(c) for c in tp[va_idx]]
            folds.append((train_cases, val_cases))
        return folds, f"{n_splits}-fold"
def make_loader(ds: Dataset, batch_size: int, shuffle: bool, device: str) -> DataLoader:
    pin = (device == "cuda")
    num_workers = NUM_WORKERS if pin else 0
    return DataLoader(
        ds,
        batch_size=batch_size,
        shuffle=shuffle,
        pin_memory=pin,
        num_workers=num_workers,
        persistent_workers=(num_workers > 0),
        prefetch_factor=(PREFETCH_FACTOR if num_workers > 0 else None),
        drop_last=False,
    )
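# Usage sketch: on CPU this degrades to a plain single-process loader
# (num_workers=0, no pinning). prefetch_factor is gated because recent
# PyTorch only accepts a non-None value when num_workers > 0.
#   _loader = make_loader(WindowDataset(_x, _y), batch_size=64,
#                         shuffle=True, device="cpu")
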
# -------------------------
# Train one fold (cached windows) with early stopping
# -------------------------
def train_one_fold_cached(  # type: ignore
    cache_ws: CaseWindows,
    feature_dim: int,
    train_cases: List[int],
    val_cases: List[int],
    hidden_dim,
    dense_dim,
    num_layers,
@@ -218,43 +331,41 @@ def train_one_fold(  # type: ignore
    grad_clip,
    device,
):
    x_train, y_train = build_from_cache(cache_ws, train_cases)
    x_val, y_val = build_from_cache(cache_ws, val_cases)
    if x_train is None or x_val is None:
        return np.inf

    # --- Fit scalers on TRAIN only (per fold) ---
    scaler_x = StandardScaler()
    scaler_x.fit(x_train.reshape(-1, feature_dim))
    x_train_scaled = scaler_x.transform(x_train.reshape(-1, feature_dim)).reshape(x_train.shape).astype(np.float32, copy=False)
    x_val_scaled = scaler_x.transform(x_val.reshape(-1, feature_dim)).reshape(x_val.shape).astype(np.float32, copy=False)

    scaler_y = StandardScaler()
    y_train_scaled = scaler_y.fit_transform(y_train.reshape(-1, 1)).ravel().astype(np.float32, copy=False)

    train_ds = WindowDataset(x_train_scaled, y_train_scaled)
    val_ds = WindowDataset(x_val_scaled, np.zeros((x_val_scaled.shape[0],), dtype=np.float32))
    train_loader = make_loader(train_ds, batch_size=batch_size, shuffle=True, device=device)
    val_loader = make_loader(val_ds, batch_size=batch_size, shuffle=False, device=device)

    model = LSTMRegressor(
        input_dim=feature_dim,
        hidden_dim=hidden_dim,
        dense_dim=dense_dim,
        num_layers=num_layers,
        dropout=dropout
    ).to(device)
    model = maybe_compile(model, device=device)

    optimizer = torch.optim.Adam(model.parameters(), lr=lr, weight_decay=weight_decay)
    criterion = nn.SmoothL1Loss(beta=huber_beta)
    use_amp = (device == "cuda")
    scaler_amp = _make_grad_scaler(use_amp)

    best_val_rmse = np.inf
    wait = 0
@@ -278,6 +389,7 @@ def train_one_fold(  # type: ignore
            loss = criterion(pred, yb)
            scaler_amp.scale(loss).backward()
            if grad_clip is not None:
                scaler_amp.unscale_(optimizer)
                torch.nn.utils.clip_grad_norm_(model.parameters(), grad_clip)
@@ -321,7 +433,7 @@ def train_one_fold(  # type: ignore
        if wait >= patience:
            break

    # cleanup (important for many folds/trials)
    del model, optimizer, criterion, train_loader, val_loader, train_ds, val_ds
    gc.collect()
    if torch.cuda.is_available():
@@ -331,32 +443,39 @@
# -------------------------
# CV score for a hyperparam set (cached windows)
# -------------------------
def cv_score_for_params_cached(
    cache_ws: CaseWindows,
    feature_dim: int,
    folds,
    params,
    max_epochs,
    patience,
    grad_clip,
    device
):
    fold_rmses = []
    for _, (train_cases, val_cases) in enumerate(folds, start=1):
        try:
            params_no_ws = dict(params)
            params_no_ws.pop("window_size", None)

            fold_rmse = train_one_fold_cached(
                cache_ws=cache_ws,
                feature_dim=feature_dim,
                train_cases=train_cases,
                val_cases=val_cases,
                max_epochs=max_epochs,
                patience=patience,
                grad_clip=grad_clip,
                device=device,
                **params_no_ws
            )
        except torch.cuda.OutOfMemoryError:
            if torch.cuda.is_available():
                torch.cuda.empty_cache()
            # invalidate whole trial
            return np.inf, np.inf, None

        fold_rmses.append(fold_rmse)
@@ -365,22 +484,20 @@ def cv_score_for_params(
        if torch.cuda.is_available():
            torch.cuda.empty_cache()

    arr = np.array(fold_rmses, dtype=float)
    mean_rmse = float(np.mean(arr))
    std_rmse = float(np.std(arr, ddof=1)) if len(arr) > 1 else 0.0
    return mean_rmse, std_rmse, arr
# -------------------------
# Final training on 100% train_pool (cached windows) - no val
# Keep best by TRAIN loss
# -------------------------
def train_final_full_trainpool_cached(  # type: ignore
    cache_ws: CaseWindows,
    feature_dim: int,
    train_cases: List[int],
    hidden_dim,
    dense_dim,
    num_layers,
@@ -393,36 +510,34 @@ def train_final_full_trainpool(  # type: ignore
    grad_clip,
    device,
):
    x_train, y_train = build_from_cache(cache_ws, train_cases)
    if x_train is None:
        raise RuntimeError("No training windows created. Check window_size and data.")

    scaler_x = StandardScaler()
    scaler_x.fit(x_train.reshape(-1, feature_dim))
    x_train_scaled = scaler_x.transform(x_train.reshape(-1, feature_dim)).reshape(x_train.shape).astype(np.float32, copy=False)

    scaler_y = StandardScaler()
    y_train_scaled = scaler_y.fit_transform(y_train.reshape(-1, 1)).ravel().astype(np.float32, copy=False)

    train_ds = WindowDataset(x_train_scaled, y_train_scaled)
    train_loader = make_loader(train_ds, batch_size=batch_size, shuffle=True, device=device)

    model = LSTMRegressor(
        input_dim=feature_dim,
        hidden_dim=hidden_dim,
        dense_dim=dense_dim,
        num_layers=num_layers,
        dropout=dropout
    ).to(device)
    model = maybe_compile(model, device=device)

    optimizer = torch.optim.Adam(model.parameters(), lr=lr, weight_decay=weight_decay)
    criterion = nn.SmoothL1Loss(beta=huber_beta)
    use_amp = (device == "cuda")
    scaler_amp = _make_grad_scaler(use_amp)

    best_train_loss = np.inf
    best_state = None
@@ -487,11 +602,11 @@ def train_final_full_trainpool(  # type: ignore
# Hyperparameter sampler
# -------------------------
def sample_params(device: str = "cpu"):
    window_size = int(np.random.choice([40, 60, 80]))
    hidden_dim = int(np.random.choice([32, 64, 128, 256], p=[0.25, 0.35, 0.30, 0.10]))
    dense_dim = int(np.random.choice([16, 32, 64, 128], p=[0.20, 0.40, 0.30, 0.10]))
    num_layers = int(np.random.choice([1, 2, 3], p=[0.45, 0.40, 0.15]))
    dropout = float(np.random.choice([0.0, 0.1, 0.2, 0.3]))
    lr = float(10 ** np.random.uniform(-4.0, -2.6))
    weight_decay = float(10 ** np.random.uniform(-6.0, -3.5))
@@ -500,10 +615,15 @@ def sample_params(device: str = "cpu"):
    cost = window_size * hidden_dim * num_layers
    if device == "cuda":
        if cost >= 80 * 256 * 2:
            batch_size = 16
        elif cost >= 60 * 256 * 2 or cost >= 80 * 128 * 2:
            batch_size = 32
        else:
            batch_size = int(np.random.choice([32, 64], p=[0.55, 0.45]))
        if hidden_dim >= 256 and num_layers >= 2 and dense_dim > 64:
            dense_dim = 64
    else:
        batch_size = int(np.random.choice([32, 64, 128]))
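# Illustrative draw (one possible sample, not a fixed output; keys elided in
# this diff are kept as "..."):
#   sample_params("cuda") -> {"window_size": 60, "hidden_dim": 128,
#       "dense_dim": 32, "num_layers": 2, "dropout": 0.1, "lr": 3.2e-4,
#       "weight_decay": 1.4e-5, ..., "batch_size": 32}
# The cost guard above trades batch size (and dense_dim) against the heaviest
# window/hidden/layer combinations to keep VRAM in check.
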
@@ -531,34 +651,55 @@ def main():
    target_col = "Force"
    all_cases = sorted(df["Case"].unique())
    train_pool = [int(c) for c in all_cases if c <= N_TRAIN_CASES]
    if PREDICT_CASE in train_pool:
        raise RuntimeError(f"PREDICT_CASE={PREDICT_CASE} is in train_pool. This must not happen.")

    device = "cuda" if torch.cuda.is_available() else "cpu"

    print("============================================================")
    print("DATA")
    print("Device:", device)
    print("Predict (test) case:", PREDICT_CASE)
    print("Train pool cases (for CV):", train_pool)
    print("n_cases total:", len(all_cases), "| train_pool:", len(train_pool))
    print("NUM_WORKERS:", (NUM_WORKERS if device == "cuda" else 0))
    print("torch.compile:", ("ON" if (device == "cuda" and USE_TORCH_COMPILE) else "OFF"))
    print("============================================================")

    folds, strategy = make_case_folds(train_pool, seed=SEED, n_splits=5)
    print(f"CV strategy: {strategy} | n_folds={len(folds)}")

    # Cases we need in the cache during tuning:
    # train_pool is enough for CV. We'll cache the test case too for later prediction.
    cases_to_cache_all = sorted(set(train_pool + [int(PREDICT_CASE)]))
    # ---- CV Tuning ----
    best = {"mean": np.inf, "std": np.inf, "params": None}

    for t in range(1, N_TRIALS + 1):
        params = sample_params(device=device)
        ws = params["window_size"]

        # Window cache for this ws (build once, reuse across folds)
        cache_ws = get_cache_for_window_size(
            df=df,
            cases_to_cache=cases_to_cache_all,
            feature_cols=feature_cols,
            target_col=target_col,
            window_size=ws
        )
        # feature_dim is stable once windows exist
        any_case = cases_to_cache_all[0]
        feature_dim = cache_ws[any_case][0].shape[-1]

        mean_rmse, std_rmse, fold_rmses = cv_score_for_params_cached(
            cache_ws=cache_ws,
            feature_dim=feature_dim,
            folds=folds,
            params=params,
            max_epochs=MAX_EPOCHS_TUNE,
@@ -575,7 +716,6 @@ def main():
        print(f"[Trial {t:02d}] CV mean RMSE={mean_rmse:.4f} | std={std_rmse:.4f} | folds=[{fold_str}]")
        print(f"    params: {params}")

        if (mean_rmse < best["mean"]) or (np.isclose(mean_rmse, best["mean"]) and std_rmse < best["std"]):
            best.update(mean=mean_rmse, std=std_rmse, params=params)
@@ -594,6 +734,18 @@ def main():
        raise RuntimeError("No valid trial found. Try lowering window sizes or check your data.")

    best_params = cast(dict, best["params"])
    ws_best = int(best_params["window_size"])

    # Ensure cache exists for best ws
    cache_ws_best = get_cache_for_window_size(
        df=df,
        cases_to_cache=cases_to_cache_all,
        feature_cols=feature_cols,
        target_col=target_col,
        window_size=ws_best
    )
    any_case = cases_to_cache_all[0]
    feature_dim = cache_ws_best[any_case][0].shape[-1]
    # ---- Final train on 100% train_pool (NO VAL) ----
    print("============================================================")
@@ -602,15 +754,17 @@ def main():
    print("Using best parameters:", best_params)
    print("============================================================")

    best_params_no_ws = dict(best_params)
    best_params_no_ws.pop("window_size", None)

    final_state, final_scalers = train_final_full_trainpool_cached(
        cache_ws=cache_ws_best,
        feature_dim=feature_dim,
        train_cases=train_pool,
        max_epochs=MAX_EPOCHS_FINAL,
        grad_clip=1.0,
        device=device,
        **best_params_no_ws
    )
    scaler_x, scaler_y = final_scalers
@@ -618,36 +772,31 @@ def main():
    # ---- Predict PREDICT_CASE ----
    df_test = df[df["Case"] == PREDICT_CASE].copy()

    # Use cached windows for the test case too (fast + consistent)
    x_test, y_test, test_indices = cache_ws_best[int(PREDICT_CASE)]

    if x_test.shape[0] == 0:
        raise RuntimeError(
            f"Case {PREDICT_CASE} has fewer points ({len(df_test)}) than "
            f"window_size ({ws_best})."
        )

    x_test_scaled = scaler_x.transform(x_test.reshape(-1, feature_dim)).reshape(x_test.shape).astype(np.float32, copy=False)

    final_model = LSTMRegressor(
        input_dim=feature_dim,
        hidden_dim=best_params["hidden_dim"],
        dense_dim=best_params["dense_dim"],
        num_layers=best_params["num_layers"],
        dropout=best_params["dropout"]
    ).to(device)
    final_model = maybe_compile(final_model, device=device)
    final_model.load_state_dict(final_state)
    final_model.eval()

    test_ds = WindowDataset(x_test_scaled, np.zeros((x_test_scaled.shape[0],), dtype=np.float32))
    test_loader = make_loader(test_ds, batch_size=best_params["batch_size"], shuffle=False, device=device)

    use_amp = (device == "cuda")
    preds = []
@@ -674,6 +823,7 @@ def main():
    # ---- Save results ----
    df_test_pred = df_test.copy()
    df_test_pred["Force_RNN"] = np.nan

    for idx, pred in zip(test_indices, y_pred):
        df_test_pred.loc[idx, "Force_RNN"] = pred
...