MStream API
MStream implements the multi-aspect streaming anomaly detector described in the mStream paper. It is designed for events that contain separate numerical and categorical features, such as login traffic with byte counts plus country or endpoint IDs.
Configuration

| Parameter | Type (Rust / Python) | Default | Description |
|---|---|---|---|
| `numeric_dim` | `usize` / `int` | Required | Number of numerical features in each record |
| `categorical_dim` | `usize` / `int` | Required | Number of categorical features in each record |
| `num_rows` | `usize` / `int` | `2` | Number of hash rows |
| `num_buckets` | `usize` / `int` | `1024` | Number of buckets per hash row |
| `alpha` | `f64` / `float` | `0.8` | Temporal decay factor in (0, 1) |
| `seed` | `u64` / `int` | random | Optional seed for deterministic hashing |

At least one of numeric_dim or categorical_dim must be greater than zero.
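
The constraints above can be checked before constructing a detector. The sketch below is a hypothetical helper, `mstream_config_errors`, that is not part of rcf3; it collects every violation instead of stopping at the first one. The lower bounds on `num_rows` and `num_buckets` are assumptions, since the table does not state them.

```python
def mstream_config_errors(numeric_dim, categorical_dim, num_rows=2,
                          num_buckets=1024, alpha=0.8):
    """Return a list of violated configuration constraints (empty if valid).

    Hypothetical helper mirroring the configuration table; not an rcf3 API.
    """
    errors = []
    # usize dimensions cannot be negative.
    if numeric_dim < 0 or categorical_dim < 0:
        errors.append("dimensions must be non-negative")
    if numeric_dim == 0 and categorical_dim == 0:
        errors.append("at least one of numeric_dim or categorical_dim must be > 0")
    # Assumption: the table does not state these bounds explicitly.
    if num_rows < 1:
        errors.append("num_rows must be at least 1")
    if num_buckets < 1:
        errors.append("num_buckets must be at least 1")
    # alpha is a decay factor in the open interval (0, 1).
    if not 0.0 < alpha < 1.0:
        errors.append("alpha must lie in the open interval (0, 1)")
    return errors
```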
Implementation choices
MStream follows the paper's overall detector structure, while making a small number of practical implementation choices for library use:

- For real-valued features, the paper applies `log(1 + x)`. This library uses `asinh(x)` instead so that all finite signed numerical inputs are supported, including values at or below `-1`.
- For categorical hashing, the paper describes linear hash functions. This library uses seeded `ahash` instances instead of implementing the paper's arithmetic linear hashes directly, while still keeping the per-row hash layout deterministic from the detector seed.
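
The two substitutions can be sketched in a few lines of Python. `paper_transform`, `library_transform`, and `row_bucket` are hypothetical names. `row_bucket` uses a keyed BLAKE2b hash from `hashlib` as a stand-in for seeded `ahash` (a Rust crate), so it illustrates only the idea of a per-row hash layout that is reproducible from one seed, not the library's actual bucket assignments.

```python
import hashlib
import math


def paper_transform(x):
    # log(1 + x): raises ValueError for x <= -1.
    return math.log1p(x)


def library_transform(x):
    # asinh(x) is defined for every finite real and is odd: asinh(-x) == -asinh(x).
    return math.asinh(x)


def row_bucket(value, row, seed, num_buckets):
    """Map a categorical value to a bucket index for one hash row.

    Stand-in for a seeded per-row hasher: each row gets its own BLAKE2b key
    derived from (seed, row), so the whole layout is fixed by the seed.
    """
    key = seed.to_bytes(8, "little") + row.to_bytes(4, "little")
    digest = hashlib.blake2b(str(value).encode(), key=key, digest_size=8).digest()
    return int.from_bytes(digest, "little") % num_buckets
```

For large positive `x`, `asinh(x)` behaves like `ln(2x)`, so it tracks `log(1 + x)` up to a roughly constant offset of `ln 2`; negative inputs are compressed symmetrically instead of being rejected.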
Timestamp semantics
`timestamp` is a logical tick index, not wall-clock time. Only differences between timestamps matter:

- shifting all timestamps by a constant leaves scores unchanged
- a gap of `k` ticks applies the temporal decay factor `alpha` exactly `k` times
- timestamps must be positive and monotonically non-decreasing
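
The decay rule can be illustrated with a toy counter over logical ticks. `DecayedCounter` is a hypothetical class, not part of rcf3, and it models only the decay arithmetic, not MStream's scoring. It multiplies its value by `alpha ** k` when `k` ticks have elapsed, which is why shifting every timestamp by the same constant leaves the result unchanged.

```python
class DecayedCounter:
    """Toy counter that decays by alpha once per elapsed logical tick.

    Hypothetical illustration of the timestamp rules; not rcf3 internals.
    """

    def __init__(self, alpha):
        if not 0.0 < alpha < 1.0:
            raise ValueError("alpha must lie in (0, 1)")
        self.alpha = alpha
        self.value = 0.0
        self.last_tick = None

    def advance(self, tick):
        """Apply decay for the ticks elapsed since the previous record."""
        if tick <= 0:
            raise ValueError("timestamps must be positive")
        if self.last_tick is not None:
            gap = tick - self.last_tick
            if gap < 0:
                raise ValueError("timestamps must be non-decreasing")
            # A gap of k ticks multiplies by alpha exactly k times; k == 0 is a no-op.
            self.value *= self.alpha ** gap
        self.last_tick = tick

    def add(self, tick, amount=1.0):
        """Record `amount` at `tick` and return the decayed running total."""
        self.advance(tick)
        self.value += amount
        return self.value
```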
For paper-style usage, pass the same timestamp for all records in the same tick, then increment it when the stream advances.
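
When events arrive with wall-clock times, they need to be mapped to logical ticks first. The hypothetical helper below (not part of rcf3) buckets non-decreasing epoch seconds into fixed windows of `tick_seconds` and numbers them from 1, so every record in a window shares a timestamp.

```python
def assign_ticks(epoch_seconds, tick_seconds):
    """Map non-decreasing wall-clock times to logical ticks starting at 1.

    Hypothetical helper, not part of rcf3. Records inside the same
    tick_seconds window share a tick; empty windows still advance the tick.
    """
    if tick_seconds <= 0:
        raise ValueError("tick_seconds must be positive")
    ticks = []
    start = None
    previous = None
    for t in epoch_seconds:
        if previous is not None and t < previous:
            raise ValueError("wall-clock times must be non-decreasing")
        if start is None:
            start = t
        # Window index relative to the first event, shifted so ticks start at 1.
        ticks.append(int((t - start) // tick_seconds) + 1)
        previous = t
    return ticks
```

One design choice here: empty windows still advance the tick, so `alpha` is applied once per elapsed window even when no records arrived. If only occupied windows should count, renumber the distinct ticks densely instead. Either way, the resulting ticks can be passed as the `timestamp` argument of `update_and_score`.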
Rust API
Creating a detector

```rust
use rcf3::MStream;

// builder(numeric_dim, categorical_dim)
let mut detector = MStream::builder(2, 1)
    .alpha(0.8)
    .num_rows(2)
    .num_buckets(1024)
    .seed(7)
    .build()?;
```
Update and preview scoring
Use `update_and_score` when you want to ingest the event and receive its anomaly score. Use `score` to preview what the next inserted record would score without mutating detector state; committing the same record at the same timestamp afterward returns the same value:

```rust
let preview = detector.score(&[1.5, 2.0], &[7], 2)?;
let committed = detector.update_and_score(&[1.5, 2.0], &[7], 2)?;
assert_eq!(preview, committed);
```

Use `update` when you want to ingest without returning the score.
Detailed scores
The final score is the sum of a record-level contribution and one contribution per numerical and categorical feature. `score_detailed` returns that decomposition:

```rust
let detailed = detector.score_detailed(&[1.5, 2.0], &[7], 3)?;
assert_eq!(detailed.numeric_features.len(), 2);
assert_eq!(detailed.categorical_features.len(), 1);
```

`MStreamScore` contains:

- `total`: the final anomaly score
- `record`: the record-level contribution
- `numeric_features`: one contribution per numerical feature
- `categorical_features`: one contribution per categorical feature

Use `update_and_score_detailed` when you want that decomposition for a committed insert.
Status accessors
After two committed records, the latest at timestamp 2:

```rust
assert!(detector.is_ready());
assert_eq!(detector.entries_seen(), 2);
assert_eq!(detector.current_time(), Some(2));
```

`is_ready()` becomes true after the first processed record.
Serialization
With the `serde` feature enabled:
With both `serde` and `std` enabled, file helpers are also available:
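Practical example
This example commits one normal event, then previews the detailed score of a suspicious event without inserting it. As the printed labels indicate, the first numerical feature is a failed-attempt count and the first categorical feature is a country ID.

```rust
use rcf3::MStream;

let mut detector = MStream::builder(2, 2)
    .seed(2026)
    .num_buckets(512)
    .build()?;

let normal = detector.update_and_score(&[0.0, 3.2], &[1, 10], 1)?;
let suspicious = detector.score_detailed(&[12.0, 0.3], &[99, 10], 2)?;

println!("normal={normal}, suspicious={}", suspicious.total);
println!("failed-attempt contribution={}", suspicious.numeric_features[0]);
println!("country contribution={}", suspicious.categorical_features[0]);
```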
Python API
Creating a detector

```python
from rcf3 import MStream

detector = MStream(
    numeric_dim=2,
    categorical_dim=1,
    alpha=0.8,
    num_rows=2,
    num_buckets=1024,
    seed=7,
)
```
Update and preview scoring

```python
score = detector.update_and_score([1.5, 2.0], [7], 1)

preview = detector.score([1.5, 2.0], [7], 2)
committed = detector.update_and_score([1.5, 2.0], [7], 2)
assert preview == committed
```
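
The preview/commit contract can be illustrated with a toy detector. `ToyDetector` is hypothetical and its score (squared deviation from a running mean) is not MStream's scoring function; the point is only that `score` evaluates a throwaway copy, so it leaves state untouched and agrees with a subsequent `update_and_score` on the same input.

```python
import copy


class ToyDetector:
    """Hypothetical stand-in showing the score / update_and_score contract."""

    def __init__(self):
        self.total = 0.0
        self.count = 0

    def update_and_score(self, x):
        # Score against the state before this record, then commit it.
        mean = self.total / self.count if self.count else x
        self.total += x
        self.count += 1
        return (x - mean) ** 2

    def score(self, x):
        # Run the committing path on a deep copy so the real state is untouched.
        return copy.deepcopy(self).update_and_score(x)
```

Copying the whole state is the simplest way to guarantee that agreement, but it costs time proportional to the state size on every preview; this sketch makes no claim about how rcf3 computes previews.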
Detailed scores
Python returns the decomposition as a dict-like object:

```python
detailed = detector.score_detailed([1.5, 2.0], [7], 3)
assert len(detailed["numeric_features"]) == 2
assert len(detailed["categorical_features"]) == 1
```

The available keys are:

- `total`
- `record`
- `numeric_features`
- `categorical_features`
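
Because `total` is the sum of the other parts, the decomposition can be checked and the per-feature contributions ranked to see what drove a score. `explain_score` and the feature names are hypothetical, and the test values are made up for illustration; `math.isclose` is used because floating-point sums may differ in the last bits.

```python
import math


def explain_score(detailed, numeric_names, categorical_names):
    """Check the additive decomposition and rank features by contribution.

    `detailed` is the dict-like object described above; the name lists are
    hypothetical labels supplied by the caller, one per feature.
    """
    numeric = list(detailed["numeric_features"])
    categorical = list(detailed["categorical_features"])
    if len(numeric) != len(numeric_names) or len(categorical) != len(categorical_names):
        raise ValueError("one name is needed per feature")
    parts = detailed["record"] + sum(numeric) + sum(categorical)
    # Compare with a tolerance rather than exact equality.
    consistent = math.isclose(detailed["total"], parts, rel_tol=1e-9, abs_tol=1e-12)
    # Largest contribution first; the record-level term is not ranked.
    ranked = sorted(
        zip(list(numeric_names) + list(categorical_names), numeric + categorical),
        key=lambda item: item[1],
        reverse=True,
    )
    return consistent, ranked
```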
Status accessors
Serialization

```python
json_str = detector.to_json()
restored = MStream.from_json(json_str)

detector.save_json("mstream.json")
restored_from_file = MStream.load_json("mstream.json")
```

MStream also supports normal Python `pickle` round-trips through its JSON-backed state hooks.
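
JSON-backed pickle support commonly follows this Python pattern: `__getstate__` returns a JSON string and `__setstate__` rebuilds the object from it. `JsonBackedState` is a hypothetical class sketching that pattern, not rcf3's implementation.

```python
import json


class JsonBackedState:
    """Hypothetical class whose pickle state is its JSON serialization."""

    def __init__(self, alpha, counts=None):
        self.alpha = alpha
        # JSON object keys are strings, so only string keys round-trip unchanged.
        self.counts = dict(counts or {})

    def to_json(self):
        return json.dumps({"alpha": self.alpha, "counts": self.counts}, sort_keys=True)

    @classmethod
    def from_json(cls, text):
        data = json.loads(text)
        return cls(data["alpha"], data["counts"])

    # pickle stores whatever __getstate__ returns...
    def __getstate__(self):
        return self.to_json()

    # ...and passes it back to __setstate__ on a fresh, uninitialized instance.
    def __setstate__(self, state):
        restored = type(self).from_json(state)
        self.__dict__.update(restored.__dict__)
```

With these hooks, `pickle.loads(pickle.dumps(obj))` goes through the same JSON string as `to_json` and `from_json`.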
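Practical example
This example commits one normal event, then previews the detailed score of a suspicious event without inserting it. As the printed labels indicate, the first numerical feature is a failed-attempt count and the first categorical feature is a country ID.

```python
from rcf3 import MStream

detector = MStream(numeric_dim=2, categorical_dim=2, seed=2026, num_buckets=512)

normal = detector.update_and_score([0.0, 3.2], [1, 10], 1)
suspicious = detector.score_detailed([12.0, 0.3], [99, 10], 2)

print(f"normal={normal}, suspicious={suspicious['total']}")
print(f"failed-attempt contribution={suspicious['numeric_features'][0]}")
print(f"country contribution={suspicious['categorical_features'][0]}")
```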