diff --git a/bigframes/ml/cluster.py b/bigframes/ml/cluster.py
index 360ab01453..c294d1f424 100644
--- a/bigframes/ml/cluster.py
+++ b/bigframes/ml/cluster.py
@@ -96,6 +96,34 @@ def predict(
 
         return self._bqml_model.predict(X)
 
+    def detect_anomalies(
+        self, X: Union[bpd.DataFrame, bpd.Series], *, contamination: float = 0.1
+    ) -> bpd.DataFrame:
+        """Detect the anomaly data points of the input.
+
+        Args:
+            X (bigframes.dataframe.DataFrame or bigframes.series.Series):
+                Series or DataFrame to detect anomalies in.
+            contamination (float, default 0.1):
+                Identifies the proportion of anomalies in the training dataset that is used to create the model.
+                The value must be in the range [0, 0.5].
+
+        Returns:
+            bigframes.dataframe.DataFrame: A DataFrame with the anomaly detection results."""
+        if contamination < 0.0 or contamination > 0.5:
+            raise ValueError(
+                f"contamination must be in [0.0, 0.5], but is {contamination}."
+            )
+
+        if not self._bqml_model:
+            raise RuntimeError("A model must be fitted before detect_anomalies")
+
+        (X,) = utils.convert_to_dataframe(X)
+
+        return self._bqml_model.detect_anomalies(
+            X, options={"contamination": contamination}
+        )
+
     def to_gbq(self, model_name: str, replace: bool = False) -> KMeans:
         """Save the model to BigQuery.
 
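For context (not part of the patch), a minimal usage sketch of the new KMeans.detect_anomalies API; the table and feature column names below are placeholders, not values taken from this change:

    import bigframes.pandas as bpd
    from bigframes.ml.cluster import KMeans

    # Any session-backed DataFrame of numeric features works here.
    df = bpd.read_gbq("my-project.my_dataset.penguins")  # placeholder table
    features = df[["culmen_length_mm", "culmen_depth_mm"]]

    model = KMeans(n_clusters=3)
    model.fit(features)

    # Higher contamination flags a larger share of rows; the output includes
    # the is_anomaly and normalized_distance columns asserted in the new tests.
    anomalies = model.detect_anomalies(features, contamination=0.2)
    anomalies[["is_anomaly", "normalized_distance"]].to_pandas()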
diff --git a/bigframes/ml/decomposition.py b/bigframes/ml/decomposition.py
index 2714664dce..9dc60be78f 100644
--- a/bigframes/ml/decomposition.py
+++ b/bigframes/ml/decomposition.py
@@ -111,7 +111,7 @@ def predict(self, X: Union[bpd.DataFrame, bpd.Series]) -> bpd.DataFrame:
         return self._bqml_model.predict(X)
 
     def detect_anomalies(
-        self, X: Union[bpd.DataFrame, bpd.Series], *, contamination=0.1
+        self, X: Union[bpd.DataFrame, bpd.Series], *, contamination: float = 0.1
     ) -> bpd.DataFrame:
         """Detect the anomaly data points of the input.
 
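For context (not part of the patch), a short sketch of PCA.detect_anomalies with the now explicitly typed contamination argument; table and column names are placeholders:

    import bigframes.pandas as bpd
    from bigframes.ml.decomposition import PCA

    df = bpd.read_gbq("my-project.my_dataset.penguins")  # placeholder table
    features = df[["culmen_length_mm", "culmen_depth_mm", "flipper_length_mm"]]

    model = PCA(n_components=2)
    model.fit(features)

    # Output includes is_anomaly and a per-row mean_squared_error
    # reconstruction score, as asserted in the new decomposition test.
    anomalies = model.detect_anomalies(features, contamination=0.2).to_pandas()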
diff --git a/bigframes/ml/forecasting.py b/bigframes/ml/forecasting.py
index 0c33660475..18380328c7 100644
--- a/bigframes/ml/forecasting.py
+++ b/bigframes/ml/forecasting.py
@@ -119,6 +119,36 @@ def predict(
             options={"horizon": horizon, "confidence_level": confidence_level}
         )
 
+    def detect_anomalies(
+        self,
+        X: Union[bpd.DataFrame, bpd.Series],
+        *,
+        anomaly_prob_threshold: float = 0.95,
+    ) -> bpd.DataFrame:
+        """Detect the anomaly data points of the input.
+
+        Args:
+            X (bigframes.dataframe.DataFrame or bigframes.series.Series):
+                Series or DataFrame to detect anomalies in.
+            anomaly_prob_threshold (float, default 0.95):
+                Identifies the custom threshold to use for anomaly detection. The value must be in the range [0, 1).
+
+        Returns:
+            bigframes.dataframe.DataFrame: A DataFrame with the anomaly detection results."""
+        if anomaly_prob_threshold < 0.0 or anomaly_prob_threshold >= 1.0:
+            raise ValueError(
+                f"anomaly_prob_threshold must be in [0.0, 1.0), but is {anomaly_prob_threshold}."
+            )
+
+        if not self._bqml_model:
+            raise RuntimeError("A model must be fitted before detect_anomalies")
+
+        (X,) = utils.convert_to_dataframe(X)
+
+        return self._bqml_model.detect_anomalies(
+            X, options={"anomaly_prob_threshold": anomaly_prob_threshold}
+        )
+
     def score(
         self,
         X: Union[bpd.DataFrame, bpd.Series],
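For context (not part of the patch), a minimal usage sketch of the new ARIMAPlus.detect_anomalies API. The table and column names are placeholders, and fit() is assumed to take the timestamp column as X and the target column as y, following the existing forecasting tests:

    import bigframes.pandas as bpd
    from bigframes.ml.forecasting import ARIMAPlus

    ts_df = bpd.read_gbq("my-project.my_dataset.daily_visits")  # placeholder table

    model = ARIMAPlus()
    model.fit(ts_df[["parsed_date"]], ts_df[["total_visits"]])

    # Rows whose anomaly_probability exceeds the threshold are marked is_anomaly;
    # lower_bound and upper_bound give the expected range, as in the new tests.
    anomalies = model.detect_anomalies(ts_df, anomaly_prob_threshold=0.8).to_pandas()
    anomalies[["is_anomaly", "lower_bound", "upper_bound", "anomaly_probability"]]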
diff --git a/tests/system/small/ml/test_cluster.py b/tests/system/small/ml/test_cluster.py
index a9fec0bbce..96066e5fbe 100644
--- a/tests/system/small/ml/test_cluster.py
+++ b/tests/system/small/ml/test_cluster.py
@@ -15,6 +15,7 @@
 import pandas as pd
 
 from bigframes.ml import cluster
+import bigframes.pandas as bpd
 from tests.system.utils import assert_pandas_df_equal
 
 _PD_NEW_PENGUINS = pd.DataFrame.from_dict(
@@ -73,6 +74,50 @@ def test_kmeans_predict(session, penguins_kmeans_model: cluster.KMeans):
     assert_pandas_df_equal(result, expected, ignore_order=True)
 
 
+def test_kmeans_detect_anomalies(
+    penguins_kmeans_model: cluster.KMeans, new_penguins_df: bpd.DataFrame
+):
+    anomalies = penguins_kmeans_model.detect_anomalies(new_penguins_df).to_pandas()
+    expected = pd.DataFrame(
+        {
+            "is_anomaly": [False, False, False],
+            "normalized_distance": [1.082937, 0.77139, 0.478304],
+        },
+        index=pd.Index([1633, 1672, 1690], name="tag_number", dtype="Int64"),
+    )
+
+    pd.testing.assert_frame_equal(
+        anomalies[["is_anomaly", "normalized_distance"]].sort_index(),
+        expected,
+        check_exact=False,
+        check_dtype=False,
+        rtol=0.1,
+    )
+
+
+def test_kmeans_detect_anomalies_params(
+    penguins_kmeans_model: cluster.KMeans, new_penguins_df: bpd.DataFrame
+):
+    anomalies = penguins_kmeans_model.detect_anomalies(
+        new_penguins_df, contamination=0.4
+    ).to_pandas()
+    expected = pd.DataFrame(
+        {
+            "is_anomaly": [True, False, False],
+            "normalized_distance": [1.082937, 0.77139, 0.478304],
+        },
+        index=pd.Index([1633, 1672, 1690], name="tag_number", dtype="Int64"),
+    )
+
+    pd.testing.assert_frame_equal(
+        anomalies[["is_anomaly", "normalized_distance"]].sort_index(),
+        expected,
+        check_exact=False,
+        check_dtype=False,
+        rtol=0.1,
+    )
+
+
 def test_kmeans_score(session, penguins_kmeans_model: cluster.KMeans):
     new_penguins = session.read_pandas(_PD_NEW_PENGUINS)
     result = penguins_kmeans_model.score(new_penguins).to_pandas()
diff --git a/tests/system/small/ml/test_decomposition.py b/tests/system/small/ml/test_decomposition.py
index 72fdc6d951..9eb9b25ea1 100644
--- a/tests/system/small/ml/test_decomposition.py
+++ b/tests/system/small/ml/test_decomposition.py
@@ -59,6 +59,29 @@ def test_pca_detect_anomalies(
     )
 
 
+def test_pca_detect_anomalies_params(
+    penguins_pca_model: decomposition.PCA, new_penguins_df: bpd.DataFrame
+):
+    anomalies = penguins_pca_model.detect_anomalies(
+        new_penguins_df, contamination=0.2
+    ).to_pandas()
+    expected = pd.DataFrame(
+        {
+            "is_anomaly": [False, True, True],
+            "mean_squared_error": [0.254188, 0.731243, 0.298889],
+        },
+        index=pd.Index([1633, 1672, 1690], name="tag_number", dtype="Int64"),
+    )
+
+    pd.testing.assert_frame_equal(
+        anomalies[["is_anomaly", "mean_squared_error"]].sort_index(),
+        expected,
+        check_exact=False,
+        check_dtype=False,
+        rtol=0.1,
+    )
+
+
 def test_pca_score(penguins_pca_model: decomposition.PCA):
     result = penguins_pca_model.score().to_pandas()
     expected = pd.DataFrame(
diff --git a/tests/system/small/ml/test_forecasting.py b/tests/system/small/ml/test_forecasting.py
index 4726d5ab21..7fef189550 100644
--- a/tests/system/small/ml/test_forecasting.py
+++ b/tests/system/small/ml/test_forecasting.py
@@ -35,7 +35,9 @@
 ]
 
 
-def test_model_predict_default(time_series_arima_plus_model: forecasting.ARIMAPlus):
+def test_arima_plus_predict_default(
+    time_series_arima_plus_model: forecasting.ARIMAPlus,
+):
     utc = pytz.utc
     predictions = time_series_arima_plus_model.predict().to_pandas()
     assert predictions.shape == (3, 8)
@@ -63,7 +65,7 @@ def test_model_predict_default(time_series_arima_plus_model: forecasting.ARIMAPl
     )
 
 
-def test_model_predict_params(time_series_arima_plus_model: forecasting.ARIMAPlus):
+def test_arima_plus_predict_params(time_series_arima_plus_model: forecasting.ARIMAPlus):
     utc = pytz.utc
     predictions = time_series_arima_plus_model.predict(
         horizon=4, confidence_level=0.9
@@ -94,7 +96,55 @@ def test_model_predict_params(time_series_arima_plus_model: forecasting.ARIMAPlu
     )
 
 
-def test_model_score(
+def test_arima_plus_detect_anomalies(
+    time_series_arima_plus_model: forecasting.ARIMAPlus, new_time_series_df
+):
+    anomalies = time_series_arima_plus_model.detect_anomalies(
+        new_time_series_df
+    ).to_pandas()
+
+    expected = pd.DataFrame(
+        {
+            "is_anomaly": [False, False, False],
+            "lower_bound": [2349.301736, 2153.614829, 1849.040192],
+            "upper_bound": [3099.642833, 3033.12195, 2858.185876],
+            "anomaly_probability": [0.757824, 0.322559, 0.43011],
+        },
+    )
+    pd.testing.assert_frame_equal(
+        anomalies[["is_anomaly", "lower_bound", "upper_bound", "anomaly_probability"]],
+        expected,
+        rtol=0.1,
+        check_index_type=False,
+        check_dtype=False,
+    )
+
+
+def test_arima_plus_detect_anomalies_params(
+    time_series_arima_plus_model: forecasting.ARIMAPlus, new_time_series_df
+):
+    anomalies = time_series_arima_plus_model.detect_anomalies(
+        new_time_series_df, anomaly_prob_threshold=0.7
+    ).to_pandas()
+
+    expected = pd.DataFrame(
+        {
+            "is_anomaly": [True, False, False],
+            "lower_bound": [2525.5363, 2360.1870, 2086.0609],
+            "upper_bound": [2923.408256, 2826.54981, 2621.165188],
+            "anomaly_probability": [0.757824, 0.322559, 0.43011],
+        },
+    )
+    pd.testing.assert_frame_equal(
+        anomalies[["is_anomaly", "lower_bound", "upper_bound", "anomaly_probability"]],
+        expected,
+        rtol=0.1,
+        check_index_type=False,
+        check_dtype=False,
+    )
+
+
+def test_arima_plus_score(
     time_series_arima_plus_model: forecasting.ARIMAPlus, new_time_series_df
 ):
     result = time_series_arima_plus_model.score(
@@ -118,16 +168,14 @@ def test_model_score(
     )
 
 
-def test_model_summary(
-    time_series_arima_plus_model: forecasting.ARIMAPlus, new_time_series_df
-):
+def test_arima_plus_summary(time_series_arima_plus_model: forecasting.ARIMAPlus):
     result = time_series_arima_plus_model.summary()
     assert result.shape == (1, 12)
     assert all(column in result.columns for column in ARIMA_EVALUATE_OUTPUT_COL)
 
 
-def test_model_summary_show_all_candidates(
-    time_series_arima_plus_model: forecasting.ARIMAPlus, new_time_series_df
+def test_arima_plus_summary_show_all_candidates(
+    time_series_arima_plus_model: forecasting.ARIMAPlus,
 ):
     result = time_series_arima_plus_model.summary(
         show_all_candidate_models=True,
@@ -136,7 +184,7 @@ def test_model_summary_show_all_candidates(
     assert all(column in result.columns for column in ARIMA_EVALUATE_OUTPUT_COL)
 
 
-def test_model_score_series(
+def test_arima_plus_score_series(
     time_series_arima_plus_model: forecasting.ARIMAPlus, new_time_series_df
 ):
     result = time_series_arima_plus_model.score(
@@ -160,9 +208,7 @@ def test_model_score_series(
     )
 
 
-def test_model_summary_series(
-    time_series_arima_plus_model: forecasting.ARIMAPlus, new_time_series_df
-):
+def test_arima_plus_summary_series(time_series_arima_plus_model: forecasting.ARIMAPlus):
     result = time_series_arima_plus_model.summary()
     assert result.shape == (1, 12)
     assert all(column in result.columns for column in ARIMA_EVALUATE_OUTPUT_COL)