Exercise 18.4
Catch the Flatline - Contextual vs. Magnitude Anomaly Detection
The isolation forest missed the stuck-tool flatline because each point looked normal. Write a simple contextual detector: flag any depth where a log's rolling standard deviation over a short window drops near zero. Inject a flatlined neutron interval, show your detector catches it, and explain why this contextual check and the isolation forest are complementary rather than redundant.
---
The isolation forest in this chapter is a magnitude detector: it flags readings of the wrong size. A stuck logging tool that flatlines a curve at a perfectly normal value slips straight through, since every individual point looks fine, and only the lack of variation betrays it. That is a contextual anomaly, and it needs a different check: a short rolling standard deviation that notices the curve has stopped moving.
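The distinction can be seen on a toy curve before touching the well. The following is a minimal sketch, not the chapter's method: the repeating three-value curve, the names `curve`, `magnitude_flags`, and `context_flags`, and the 3-sigma cut-off are all illustrative assumptions, and a plain z-score stands in for a magnitude detector (it is not the isolation forest).

```python
import numpy as np
import pandas as pd

# Toy curve (an assumption, not the chapter's well): cycles 0.27, 0.30, 0.33.
i = np.arange(60)
curve = pd.Series(0.30 + 0.03 * ((i % 3) - 1))

# Stuck tool: samples 20..35 repeat the perfectly ordinary value at index 20.
curve.iloc[20:36] = curve.iloc[20]

# Magnitude view: per-point z-score (a simple stand-in for a magnitude detector).
z = (curve - curve.mean()) / curve.std()
magnitude_flags = (z.abs() > 3).to_numpy()

# Contextual view: trailing 5-sample rolling std collapses once the window
# lies entirely inside the flatline.
rstd = curve.rolling(5, min_periods=5).std()
context_flags = (rstd < 0.004).to_numpy()

print("magnitude flags inside flatline: ", int(magnitude_flags[20:36].sum()))
print("contextual flags inside flatline:", int(context_flags[20:36].sum()))
```

On this toy curve the z-score flags none of the 16 stuck samples, while the rolling std flags the 12 whose window sits wholly inside the flatline.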
You are given a verified well builder (embedded, do not edit). The helper build_flatlined_well(seed=3) calls make_well_with_bad_data(seed), then overwrites the NPHI curve with its own value at index FLAT_START across a short interval, so the flatline sits at a normal value and is the only anomaly we score against. It returns (depth, df, flat_mask), where flat_mask is True over exactly the flatlined rows.
Embedded constants (do not change them):
FLAT_START = 120: index where the flatline begins
FLAT_LEN = 16: number of constant samples
WINDOW = 5: rolling-window length
STD_THRESH = 0.004: a rolling std below this counts as "stuck"
ISO_CONTAM = 0.08: IsolationForest contamination
ISO_RANDOM_STATE = 0: IsolationForest random_state
Write two small detector functions:
def flatline_recall_rolling(seed=3, window=WINDOW, thresh=STD_THRESH):
    """Contextual detector. Flag depths where NPHI's *trailing* rolling standard
    deviation drops below `thresh` (the curve has stopped moving). Return recall
    against flat_mask."""
    depth, df, flat_mask = build_flatlined_well(seed)
    rstd = df['NPHI'].rolling(window, min_periods=window).std()
    flagged = (rstd < thresh).fillna(False).values
    return float((flagged & flat_mask).sum() / flat_mask.sum())

def flatline_recall_isoforest(seed=3, contamination=ISO_CONTAM):
    """Magnitude detector. Standardise the well and run IsolationForest, then
    return its recall on the SAME flat_mask."""
    depth, df, flat_mask = build_flatlined_well(seed)
    Xa = df.copy(); Xa['RT'] = np.log10(Xa['RT'])
    Xa = StandardScaler().fit_transform(Xa.values)
    iso = IsolationForest(contamination=contamination, random_state=ISO_RANDOM_STATE).fit(Xa)
    flagged = iso.predict(Xa) == -1
    return float((flagged & flat_mask).sum() / flat_mask.sum())

Then assign these exact module-level output names (note the scalar for the forest is flatline_recall_iso, kept distinct from the function so the function itself stays callable):

flatline_recall_rollingstd = flatline_recall_rolling(3)
flatline_recall_iso = flatline_recall_isoforest(3)

> Think about it: at seed=3 the rolling-std detector catches the stuck-tool
> flatline (recall ~0.75; the trailing window of size 5 means the first
> WINDOW - 1 flatline samples still straddle pre-flatline points, so they keep
> moving), while IsolationForest misses it almost entirely (recall ~0). Every
> flatlined point sits at a perfectly normal magnitude, so the forest has nothing
> to isolate. The two detectors are complementary, not redundant: one catches
> the wrong size, the other catches the wrong behaviour. Why does no single
> detector catch every failure?
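The ~0.75 figure follows from window geometry alone: only windows lying entirely inside the flatline have zero spread, so at most FLAT_LEN - (WINDOW - 1) of the FLAT_LEN samples can be flagged. A small sketch of that arithmetic, checked on a toy curve. The constants are copied from the exercise; the three-value curve and the names `expected_recall`, `nphi`, and `recall` are hypothetical stand-ins, and the match is only guaranteed when every window that touches a pre-flatline sample stays above the threshold, which holds for this toy curve and is likely but not certain on a noisy log.

```python
import numpy as np
import pandas as pd

# Constants copied from the exercise statement.
FLAT_START, FLAT_LEN, WINDOW, STD_THRESH = 120, 16, 5, 0.004

# Only windows fully inside the flatline have zero spread.
expected_recall = (FLAT_LEN - (WINDOW - 1)) / FLAT_LEN

# Toy stand-in for NPHI (an assumption): cycles 0.27, 0.30, 0.33.
i = np.arange(240)
nphi = pd.Series(0.30 + 0.03 * ((i % 3) - 1))
nphi.iloc[FLAT_START:FLAT_START + FLAT_LEN] = nphi.iloc[FLAT_START]

flat_mask = np.zeros(len(nphi), bool)
flat_mask[FLAT_START:FLAT_START + FLAT_LEN] = True

# Same trailing rolling-std rule as the exercise's contextual detector.
rstd = nphi.rolling(WINDOW, min_periods=WINDOW).std()
flagged = (rstd < STD_THRESH).to_numpy()
recall = float((flagged & flat_mask).sum() / flat_mask.sum())

print("expected:", expected_recall, "measured on toy curve:", recall)
```

The first WINDOW - 1 = 4 flatline samples are the ones missed, because their windows still reach back into moving data.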
Reference solution
Try solving it yourself first. The solution below is one valid approach; yours may differ and still be correct.
import numpy as np
import pandas as pd
from sklearn.preprocessing import StandardScaler
from sklearn.ensemble import IsolationForest
# ── Verified log-QC well generator (do not edit) ─────────────────────────
# Copied VERBATIM from chapters/18-unsupervised-learning.qmd. A clean
# depth-ordered log run with realistic tool/sensor failures injected.
def make_well_with_bad_data(seed=3):
    """A clean depth-ordered log run (a stand-in for a real LAS) with realistic
    tool/sensor failures injected, so we know the ground truth to score against."""
    rng = np.random.default_rng(seed)
    depth = 9000 + 0.5 * np.arange(240)
    Vsh = np.clip(0.25 + 0.15 * np.sin(depth / 14) + rng.normal(0, 0.05, depth.size), 0, 1)
    phi = np.clip(0.27 * (1 - 0.8 * Vsh) + rng.normal(0, 0.02, depth.size), 0.02, 0.34)
    GR = 22 * (1 - Vsh) + 125 * Vsh + rng.normal(0, 6, depth.size)
    RHOB = (2.65 + 0.03 * Vsh) * (1 - phi) + 1.0 * phi + rng.normal(0, 0.03, depth.size)
    NPHI = phi + 0.30 * Vsh + rng.normal(0, 0.02, depth.size)
    RT = np.exp(rng.normal(0, 0.3, depth.size)) * (2 + 8 * np.clip(0.30 - phi, 0, 1)) * (1 - 0.5 * Vsh) + 0.5
    df = pd.DataFrame({"GR": GR, "RHOB": RHOB, "NPHI": NPHI, "RT": np.clip(RT, 0.3, 400)})
    bad = np.zeros(depth.size, bool)
    w = (depth >= 9030) & (depth < 9035); df.loc[w, "RHOB"] = 1.55 + rng.normal(0, 0.05, w.sum()); bad |= w
    df.loc[[60, 61, 150], "RT"] = [4500, 3800, 5200.0]; bad[[60, 61, 150]] = True
    h = (depth >= 9088) & (depth < 9092); df.loc[h, "GR"] = 330 + rng.normal(0, 10, h.sum()); bad |= h
    df.loc[[200, 201], "NPHI"] = [-0.08, 0.62]; bad[[200, 201]] = True
    return depth, df, bad
# ── Flatline injection constants + builder (do not edit) ─────────────────
FLAT_START = 120 # index where the flatline begins
FLAT_LEN = 16 # number of constant samples
WINDOW = 5 # rolling-window length
STD_THRESH = 0.004 # a rolling std below this counts as "stuck"
ISO_CONTAM = 0.08 # IsolationForest contamination
ISO_RANDOM_STATE = 0 # IsolationForest random_state
def build_flatlined_well(seed=3):
    """Build a clean-ish well, then overwrite NPHI with its own value at
    FLAT_START across [FLAT_START : FLAT_START+FLAT_LEN]. The flatline sits at a
    perfectly NORMAL value, so it is the ONLY anomaly to score against.
    Returns (depth, df, flat_mask)."""
    depth, df, _ = make_well_with_bad_data(seed)
    df = df.copy()
    const = df["NPHI"].iloc[FLAT_START]
    df.loc[FLAT_START:FLAT_START + FLAT_LEN - 1, "NPHI"] = const
    flat_mask = np.zeros(len(df), bool)
    flat_mask[FLAT_START:FLAT_START + FLAT_LEN] = True
    return depth, df, flat_mask
# ─────────────────────────────────────────────────────────────────────────
def flatline_recall_rolling(seed=3, window=WINDOW, thresh=STD_THRESH):
    """Rolling-std contextual detector: flag depths where NPHI's trailing
    rolling standard deviation drops below thresh (the curve has stopped
    moving). Return recall against flat_mask."""
    depth, df, flat_mask = build_flatlined_well(seed)
    rstd = df["NPHI"].rolling(window, min_periods=window).std()
    flagged = (rstd < thresh).fillna(False).values
    return float((flagged & flat_mask).sum() / flat_mask.sum())
def flatline_recall_isoforest(seed=3, contamination=ISO_CONTAM):
    """Magnitude detector: standardise the well and run IsolationForest;
    return its recall on the SAME flat_mask."""
    depth, df, flat_mask = build_flatlined_well(seed)
    Xa = df.copy(); Xa["RT"] = np.log10(Xa["RT"])
    Xa = StandardScaler().fit_transform(Xa.values)
    iso = IsolationForest(contamination=contamination, random_state=ISO_RANDOM_STATE).fit(Xa)
    flagged = iso.predict(Xa) == -1
    return float((flagged & flat_mask).sum() / flat_mask.sum())
flatline_recall_rollingstd = flatline_recall_rolling(3)
flatline_recall_iso = flatline_recall_isoforest(3)
print("rolling-std detector recall:", flatline_recall_rollingstd)
print("IsolationForest recall: ", flatline_recall_iso)
print("gap (contextual - magnitude):", flatline_recall_rollingstd - flatline_recall_iso)
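The gap printed by the solution is the argument for running both checks. One simple way to act on it is to take the union of the two flag sets. The sketch below is illustrative only: it uses a toy three-value curve, a plain z-score as a dependency-free stand-in for the magnitude detector (an assumption; it is not the chapter's IsolationForest), and hypothetical names `magnitude_flags`, `context_flags`, and `combined`.

```python
import numpy as np
import pandas as pd

# Toy curve (an assumption): cycles 0.27, 0.30, 0.33.
i = np.arange(80)
curve = pd.Series(0.30 + 0.03 * ((i % 3) - 1))
curve.iloc[20:36] = curve.iloc[20]  # stuck tool: normal value, no movement
curve.iloc[60] = 0.95               # spike: wrong size, plenty of movement

# Magnitude check: z-score stand-in, flags values of the wrong size.
z = (curve - curve.mean()) / curve.std()
magnitude_flags = (z.abs() > 3).to_numpy()

# Contextual check: trailing rolling std, flags a curve that stopped moving.
rstd = curve.rolling(5, min_periods=5).std()
context_flags = (rstd < 0.004).to_numpy()

# Union: a sample is suspect if either check fires.
combined = magnitude_flags | context_flags

print("magnitude only :", np.flatnonzero(magnitude_flags))
print("contextual only:", np.flatnonzero(context_flags))
print("combined count :", int(combined.sum()))
```

Here the z-score flags only the spike and the rolling std flags only the interior of the flatline, so neither check alone covers both failures.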