refactor(predict): Enhance stability and performance in LSTM training with improved DataLoader settings and caching mechanisms
parent 03e1b37e
"""
OPTIMIZED VERSION
=================
FAST + STABLE (no "too many open files")
========================================
LSTM tuning (CV by CASE) + final retrain on 100% train_pool + predict PREDICT_CASE (PyTorch)
Key speed-ups vs baseline:
1) Vectorized window creation (sliding_window_view) ✅ much faster than Python loops
2) Window cache per window_size (for ALL relevant cases) ✅ avoid rebuilding windows per fold/trial
3) Faster DataLoaders (num_workers + persistent_workers + prefetch_factor)
4) Optional torch.compile(model) (PyTorch 2.x) when on CUDA
5) Avoid recreating large tensors when possible; keep arrays float32
6) Minimal GPU sync; keep eval/predict batched with AMP
CV rules:
- train_pool = cases <= N_TRAIN_CASES
- Use ONLY the first N_TRAIN_CASES cases as train_pool (cases <= N_TRAIN_CASES)
- If N_TRAIN_CASES < 20 -> Leave-One-Out CV by CASE
- Else -> 5-fold CV by CASE (KFold on case IDs)
After CV tuning:
- Train FINAL model on 100% train_pool (NO early stopping; keep best by TRAIN loss)
- Predict PREDICT_CASE (assumed not in train_pool) and save CSV with Force_RNN column
Speed-ups:
- Vectorized window creation via sliding_window_view
- Window cache per window_size (reused across folds & trials)
- Avoid Python-loop window building per fold/trial
- Batched validation & prediction with AMP on CUDA
Stability fix:
- DataLoader multiprocessing OFF during CV (num_workers=0) to avoid "Too many open files"
- persistent_workers=False everywhere
- Optional torch.compile (default: OFF for CV; ON only for final by default)
"""
from __future__ import annotations
from typing import cast, Dict, Tuple, List, Optional
from typing import Dict, Tuple, List, Optional, cast
import os
import gc
......@@ -96,15 +99,18 @@ MAX_EPOCHS_TUNE = 40
PATIENCE_TUNE = 5
MAX_EPOCHS_FINAL = 200
# DataLoader perf knobs
NUM_WORKERS = min(8, (os.cpu_count() or 4)) # tune if needed
PREFETCH_FACTOR = 2
# DataLoader knobs (kept safe)
PIN_MEMORY_ON_CUDA = True
# torch.compile knobs (stable choice)
# - OFF for CV (many recompiles, can be slower + more resources)
# - ON for FINAL (single compile)
COMPILE_IN_CV = False
COMPILE_IN_FINAL = True
# torch.compile knob
USE_TORCH_COMPILE = True # will auto-fallback if unsupported
# -------------------------
# Utilities
# -------------------------
def rmse(y_true, y_pred): # type: ignore
y_true = np.asarray(y_true).ravel()
......@@ -128,11 +134,10 @@ def make_windows_for_case_fast(
window_size: int
) -> Tuple[np.ndarray, np.ndarray, np.ndarray]:
"""
Vectorized windows using sliding_window_view for speed.
Returns:
xw: (n_windows, window_size, n_features) float32
yw: (n_windows,) float32
iw: (n_windows,) indices
Vectorized sliding windows.
xw: (n_windows, window_size, n_features) float32
yw: (n_windows,) float32
iw: (n_windows,) index values
"""
df_case = df_case.sort_index()
x = df_case[feature_cols].to_numpy(dtype=np.float32, copy=False)
......@@ -148,11 +153,11 @@ def make_windows_for_case_fast(
)
if _HAS_SWV:
# sliding_window_view over first axis
# window over axis 0; shape becomes (n-window+1, 1, window, nfeat)
xw = sliding_window_view(x, window_shape=(window_size, x.shape[1]))[:, 0, :, :]
else:
# fallback (slower): numpy stack
xw = np.stack([x[i - window_size + 1:i + 1] for i in range(window_size - 1, n)], axis=0)
# fallback
xw = np.stack([x[i - window_size + 1:i + 1] for i in range(window_size - 1, n)], axis=0).astype(np.float32)
yw = y[window_size - 1:]
iw = idx[window_size - 1:]
......@@ -163,7 +168,7 @@ def make_windows_for_case_fast(
# Window cache per window_size
# -------------------------
CaseWindows = Dict[int, Tuple[np.ndarray, np.ndarray, np.ndarray]] # case -> (x, y, idx)
WINDOW_CACHE: Dict[int, CaseWindows] = {} # window_size -> CaseWindows
WINDOW_CACHE: Dict[int, CaseWindows] = {} # window_size -> cache
def precompute_windows_for_cases(
......@@ -181,23 +186,6 @@ def precompute_windows_for_cases(
return cache
def build_from_cache(
    cache: CaseWindows,
    cases: List[int]
) -> Tuple[Optional[np.ndarray], Optional[np.ndarray]]:
    """Concatenate cached windows and targets for the given cases.

    Cases whose cache entry holds zero windows (series shorter than the
    window size) are skipped. If every requested case is empty, returns
    (None, None) so the caller can bail out of the fold.
    """
    x_parts: List[np.ndarray] = []
    y_parts: List[np.ndarray] = []
    for case_id in cases:
        x_c, y_c, _ = cache[int(case_id)]
        if x_c.shape[0] == 0:
            # no windows were produced for this case; contribute nothing
            continue
        x_parts.append(x_c)
        y_parts.append(y_c)
    if not x_parts:
        return None, None
    return np.concatenate(x_parts, axis=0), np.concatenate(y_parts, axis=0)
def get_cache_for_window_size(
df: pd.DataFrame,
cases_to_cache: List[int],
......@@ -216,12 +204,27 @@ def get_cache_for_window_size(
return WINDOW_CACHE[window_size]
def build_from_cache(
    cache_ws: CaseWindows,
    cases: List[int]
) -> Tuple[Optional[np.ndarray], Optional[np.ndarray]]:
    """Stack the windows (x) and targets (y) of the requested cases.

    Entries with zero windows are ignored; when no case contributes any
    windows at all, (None, None) is returned instead of empty arrays.
    """
    non_empty = [cache_ws[int(c)] for c in cases if cache_ws[int(c)][0].shape[0] > 0]
    if not non_empty:
        return None, None
    x_all = np.concatenate([entry[0] for entry in non_empty], axis=0)
    y_all = np.concatenate([entry[1] for entry in non_empty], axis=0)
    return x_all, y_all
# -------------------------
# Dataset: avoid extra copies
# Dataset (no extra copies)
# -------------------------
class WindowDataset(Dataset):
    """Dataset over precomputed sliding-window arrays.

    Tensors are created with torch.from_numpy, so they share memory with
    the numpy arrays (no copy); inputs are expected to be float32 already.
    """
    def __init__(self, x: np.ndarray, y: np.ndarray):
        self.x = torch.from_numpy(x)
        self.y = torch.from_numpy(y)
......@@ -258,11 +261,10 @@ class LSTMRegressor(nn.Module):
return out
def maybe_compile(model: nn.Module, device: str) -> nn.Module:
if device != "cuda" or not USE_TORCH_COMPILE:
def maybe_compile(model: nn.Module, device: str, enable: bool) -> nn.Module:
if not enable or device != "cuda":
return model
try:
# torch.compile exists in torch>=2.0
model = torch.compile(model) # type: ignore
except Exception:
pass
......@@ -295,23 +297,29 @@ def make_case_folds(train_pool: List[int], seed=123, n_splits=5):
return folds, f"{n_splits}-fold"
def make_loader(ds: Dataset, batch_size: int, shuffle: bool, device: str) -> DataLoader:
pin = (device == "cuda")
num_workers = NUM_WORKERS if pin else 0
# -------------------------
# DataLoader: STABLE (no multiprocessing in CV)
# -------------------------
def make_loader(ds: Dataset, batch_size: int, shuffle: bool, device: str, use_workers: bool) -> DataLoader:
    """Build a DataLoader with stability-first settings.

    Memory pinning is enabled only on CUDA (and only when the module-level
    PIN_MEMORY_ON_CUDA flag allows it).

    NOTE(review): `use_workers` is deliberately ignored — worker processes
    are kept at 0 everywhere because spawning workers for the many loaders
    created during CV caused "too many open files" (fd/pipe exhaustion).
    """
    pin_mem = PIN_MEMORY_ON_CUDA if device == "cuda" else False
    return DataLoader(
        ds,
        batch_size=batch_size,
        shuffle=shuffle,
        pin_memory=pin_mem,
        num_workers=0,  # multiprocessing OFF: single-process loading is the safe default
        persistent_workers=False,
        drop_last=False,
    )
# -------------------------
# Train one fold (cached windows) with early stopping
# Train one fold (cached windows) + early stopping
# -------------------------
def train_one_fold_cached( # type: ignore
cache_ws: CaseWindows,
......@@ -330,6 +338,7 @@ def train_one_fold_cached( # type: ignore
patience,
grad_clip,
device,
compile_model: bool,
):
x_train, y_train = build_from_cache(cache_ws, train_cases)
x_val, y_val = build_from_cache(cache_ws, val_cases)
......@@ -337,7 +346,7 @@ def train_one_fold_cached( # type: ignore
if x_train is None or x_val is None:
return np.inf
# --- Fit scalers on TRAIN only (per fold) ---
# --- scalers on TRAIN only ---
scaler_x = StandardScaler()
scaler_x.fit(x_train.reshape(-1, feature_dim))
x_train_scaled = scaler_x.transform(x_train.reshape(-1, feature_dim)).reshape(x_train.shape).astype(np.float32, copy=False)
......@@ -349,8 +358,9 @@ def train_one_fold_cached( # type: ignore
train_ds = WindowDataset(x_train_scaled, y_train_scaled)
val_ds = WindowDataset(x_val_scaled, np.zeros((x_val_scaled.shape[0],), dtype=np.float32))
train_loader = make_loader(train_ds, batch_size=batch_size, shuffle=True, device=device)
val_loader = make_loader(val_ds, batch_size=batch_size, shuffle=False, device=device)
# CV: use_workers=False (avoid too many open files)
train_loader = make_loader(train_ds, batch_size=batch_size, shuffle=True, device=device, use_workers=False)
val_loader = make_loader(val_ds, batch_size=batch_size, shuffle=False, device=device, use_workers=False)
model = LSTMRegressor(
input_dim=feature_dim,
......@@ -359,7 +369,7 @@ def train_one_fold_cached( # type: ignore
num_layers=num_layers,
dropout=dropout
).to(device)
model = maybe_compile(model, device=device)
model = maybe_compile(model, device=device, enable=compile_model)
optimizer = torch.optim.Adam(model.parameters(), lr=lr, weight_decay=weight_decay)
criterion = nn.SmoothL1Loss(beta=huber_beta)
......@@ -389,11 +399,9 @@ def train_one_fold_cached( # type: ignore
loss = criterion(pred, yb)
scaler_amp.scale(loss).backward()
if grad_clip is not None:
scaler_amp.unscale_(optimizer)
torch.nn.utils.clip_grad_norm_(model.parameters(), grad_clip)
scaler_amp.step(optimizer)
scaler_amp.update()
else:
......@@ -404,7 +412,7 @@ def train_one_fold_cached( # type: ignore
torch.nn.utils.clip_grad_norm_(model.parameters(), grad_clip)
optimizer.step()
# ---- Validation (batched) ----
# ---- validation ----
model.eval()
preds_scaled = []
with torch.no_grad():
......@@ -433,7 +441,6 @@ def train_one_fold_cached( # type: ignore
if wait >= patience:
break
# cleanup (important for many folds/trials)
del model, optimizer, criterion, train_loader, val_loader, train_ds, val_ds
gc.collect()
if torch.cuda.is_available():
......@@ -443,25 +450,23 @@ def train_one_fold_cached( # type: ignore
# -------------------------
# CV score for a hyperparam set (cached windows)
# CV score for a hyperparam set (cached)
# -------------------------
def cv_score_for_params_cached(
cache_ws: CaseWindows,
feature_dim: int,
folds,
params,
params_model, # must NOT include window_size
max_epochs,
patience,
grad_clip,
device
device,
compile_model: bool,
):
fold_rmses = []
for _, (train_cases, val_cases) in enumerate(folds, start=1):
try:
params_no_ws = dict(params)
params_no_ws.pop("window_size", None)
fold_rmse = train_one_fold_cached(
cache_ws=cache_ws,
feature_dim=feature_dim,
......@@ -471,7 +476,8 @@ def cv_score_for_params_cached(
patience=patience,
grad_clip=grad_clip,
device=device,
**params_no_ws
compile_model=compile_model,
**params_model
)
except torch.cuda.OutOfMemoryError:
if torch.cuda.is_available():
......@@ -491,7 +497,7 @@ def cv_score_for_params_cached(
# -------------------------
# Final training on 100% train_pool (cached windows) - no val
# Final training on 100% train_pool (cached) - no val
# Keep best by TRAIN loss
# -------------------------
def train_final_full_trainpool_cached( # type: ignore
......@@ -509,6 +515,7 @@ def train_final_full_trainpool_cached( # type: ignore
max_epochs,
grad_clip,
device,
compile_model: bool,
):
x_train, y_train = build_from_cache(cache_ws, train_cases)
if x_train is None:
......@@ -522,7 +529,9 @@ def train_final_full_trainpool_cached( # type: ignore
y_train_scaled = scaler_y.fit_transform(y_train.reshape(-1, 1)).ravel().astype(np.float32, copy=False)
train_ds = WindowDataset(x_train_scaled, y_train_scaled)
train_loader = make_loader(train_ds, batch_size=batch_size, shuffle=True, device=device)
# Final: still keep workers OFF for stability; you can try True later.
train_loader = make_loader(train_ds, batch_size=batch_size, shuffle=True, device=device, use_workers=False)
model = LSTMRegressor(
input_dim=feature_dim,
......@@ -531,7 +540,7 @@ def train_final_full_trainpool_cached( # type: ignore
num_layers=num_layers,
dropout=dropout
).to(device)
model = maybe_compile(model, device=device)
model = maybe_compile(model, device=device, enable=compile_model)
optimizer = torch.optim.Adam(model.parameters(), lr=lr, weight_decay=weight_decay)
criterion = nn.SmoothL1Loss(beta=huber_beta)
......@@ -567,7 +576,6 @@ def train_final_full_trainpool_cached( # type: ignore
if grad_clip is not None:
scaler_amp.unscale_(optimizer)
torch.nn.utils.clip_grad_norm_(model.parameters(), grad_clip)
scaler_amp.step(optimizer)
scaler_amp.update()
else:
......@@ -586,7 +594,6 @@ def train_final_full_trainpool_cached( # type: ignore
best_train_loss = epoch_loss
best_state = {k: v.detach().cpu().clone() for k, v in model.state_dict().items()}
# cleanup
del model, optimizer, criterion, train_loader, train_ds
gc.collect()
if torch.cuda.is_available():
......@@ -615,9 +622,9 @@ def sample_params(device: str = "cpu"):
cost = window_size * hidden_dim * num_layers
if device == "cuda":
if cost >= 80 * 256 * 2:
if cost >= 80 * 128 * 2:
batch_size = 16
elif cost >= 60 * 256 * 2 or cost >= 80 * 128 * 2:
elif cost >= 60 * 128 * 2 or cost >= 80 * 64 * 2:
batch_size = 32
else:
batch_size = int(np.random.choice([32, 64], p=[0.55, 0.45]))
......@@ -653,7 +660,7 @@ def main():
all_cases = sorted(df["Case"].unique())
train_pool = [int(c) for c in all_cases if c <= N_TRAIN_CASES]
if PREDICT_CASE in train_pool:
if int(PREDICT_CASE) in train_pool:
raise RuntimeError(f"PREDICT_CASE={PREDICT_CASE} is in train_pool. This must not happen.")
device = "cuda" if torch.cuda.is_available() else "cpu"
......@@ -664,15 +671,14 @@ def main():
print("Predict (test) case:", PREDICT_CASE)
print("Train pool cases (for CV):", train_pool)
print("n_cases total:", len(all_cases), "| train_pool:", len(train_pool))
print("NUM_WORKERS:", (NUM_WORKERS if device == "cuda" else 0))
print("torch.compile:", ("ON" if (device == "cuda" and USE_TORCH_COMPILE) else "OFF"))
print("torch.compile CV:", "ON" if (device == "cuda" and COMPILE_IN_CV) else "OFF")
print("torch.compile FINAL:", "ON" if (device == "cuda" and COMPILE_IN_FINAL) else "OFF")
print("============================================================")
folds, strategy = make_case_folds(train_pool, seed=SEED, n_splits=5)
print(f"CV strategy: {strategy} | n_folds={len(folds)}")
# Cases we need in cache during tuning:
# train_pool is enough for CV. We'll cache test case too for later prediction.
# Cache cases needed (train_pool for CV, plus test case for prediction)
cases_to_cache_all = sorted(set(train_pool + [int(PREDICT_CASE)]))
# ---- CV Tuning ----
......@@ -680,7 +686,9 @@ def main():
for t in range(1, N_TRIALS + 1):
params = sample_params(device=device)
ws = params["window_size"]
# Separate window_size from model params (cached funcs must NOT receive window_size)
ws = int(params["window_size"])
params_model = dict(params)
params_model.pop("window_size", None)
......@@ -693,19 +701,27 @@ def main():
window_size=ws
)
# feature_dim is stable once windows exist
any_case = cases_to_cache_all[0]
feature_dim = cache_ws[any_case][0].shape[-1]
# feature_dim derived from cached windows
# pick a case that has windows; if first has none, find another
feature_dim = None
for c in cases_to_cache_all:
x_c = cache_ws[int(c)][0]
if x_c.shape[0] > 0:
feature_dim = x_c.shape[-1]
break
if feature_dim is None:
raise RuntimeError("All cases have 0 windows for this window_size. Lower window_size.")
mean_rmse, std_rmse, fold_rmses = cv_score_for_params_cached(
cache_ws=cache_ws,
feature_dim=feature_dim,
folds=folds,
params=params,
params_model=params_model,
max_epochs=MAX_EPOCHS_TUNE,
patience=PATIENCE_TUNE,
grad_clip=1.0,
device=device
device=device,
compile_model=(COMPILE_IN_CV and device == "cuda"),
)
if fold_rmses is None:
......@@ -735,6 +751,8 @@ def main():
best_params = cast(dict, best["params"])
ws_best = int(best_params["window_size"])
best_params_model = dict(best_params)
best_params_model.pop("window_size", None)
# Ensure cache exists for best ws
cache_ws_best = get_cache_for_window_size(
......@@ -744,8 +762,16 @@ def main():
target_col=target_col,
window_size=ws_best
)
any_case = cases_to_cache_all[0]
feature_dim = cache_ws_best[any_case][0].shape[-1]
# feature_dim again
feature_dim = None
for c in cases_to_cache_all:
x_c = cache_ws_best[int(c)][0]
if x_c.shape[0] > 0:
feature_dim = x_c.shape[-1]
break
if feature_dim is None:
raise RuntimeError("All cases have 0 windows for best window_size. Lower window_size.")
# ---- Final train on 100% train_pool (NO VAL) ----
print("============================================================")
......@@ -754,9 +780,6 @@ def main():
print("Using best parameters:", best_params)
print("============================================================")
best_params_no_ws = dict(best_params)
best_params_no_ws.pop("window_size", None)
final_state, final_scalers = train_final_full_trainpool_cached(
cache_ws=cache_ws_best,
feature_dim=feature_dim,
......@@ -764,7 +787,8 @@ def main():
max_epochs=MAX_EPOCHS_FINAL,
grad_clip=1.0,
device=device,
**best_params_no_ws
compile_model=(COMPILE_IN_FINAL and device == "cuda"),
**best_params_model
)
scaler_x, scaler_y = final_scalers
......@@ -772,31 +796,28 @@ def main():
# ---- Predict PREDICT_CASE ----
df_test = df[df["Case"] == PREDICT_CASE].copy()
# Use cached windows for test case too (fast + consistent)
x_test, y_test, test_indices = cache_ws_best[int(PREDICT_CASE)]
if x_test.shape[0] == 0:
raise RuntimeError(
f"Case {PREDICT_CASE} has fewer points ({len(df_test)}) than "
f"window_size ({ws_best})."
f"Case {PREDICT_CASE} has fewer points ({len(df_test)}) than window_size ({ws_best})."
)
x_test_scaled = scaler_x.transform(x_test.reshape(-1, feature_dim)).reshape(x_test.shape).astype(np.float32, copy=False)
final_model = LSTMRegressor(
input_dim=feature_dim,
hidden_dim=best_params["hidden_dim"],
dense_dim=best_params["dense_dim"],
num_layers=best_params["num_layers"],
dropout=best_params["dropout"]
hidden_dim=best_params_model["hidden_dim"],
dense_dim=best_params_model["dense_dim"],
num_layers=best_params_model["num_layers"],
dropout=best_params_model["dropout"]
).to(device)
final_model = maybe_compile(final_model, device=device)
# compile not necessary for predict, but harmless if final was compiled; keep OFF to avoid overhead
final_model.load_state_dict(final_state)
final_model.eval()
test_ds = WindowDataset(x_test_scaled, np.zeros((x_test_scaled.shape[0],), dtype=np.float32))
test_loader = make_loader(test_ds, batch_size=best_params["batch_size"], shuffle=False, device=device)
test_loader = make_loader(test_ds, batch_size=best_params_model["batch_size"], shuffle=False, device=device, use_workers=False)
use_amp = (device == "cuda")
preds = []
......@@ -823,7 +844,6 @@ def main():
# ---- Save results ----
df_test_pred = df_test.copy()
df_test_pred["Force_RNN"] = np.nan
for idx, pred in zip(test_indices, y_pred):
df_test_pred.loc[idx, "Force_RNN"] = pred
......
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment