MStream API
MStream is a practical multi-aspect streaming anomaly detector for events
that contain separate numerical and categorical features, such as login traffic
with byte counts plus country or endpoint IDs.
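The examples on this page pass categorical features as integer IDs, for instance `7` or `99`. Below is a minimal sketch of turning raw strings such as country codes or endpoint paths into stable IDs before scoring. It assumes a simple first-seen dictionary encoding, and `CategoryEncoder` is a hypothetical helper, not part of `rcf3`:

```python
from typing import Dict


class CategoryEncoder:
    """Hypothetical helper: maps raw category values to stable integer IDs."""

    def __init__(self) -> None:
        self._ids: Dict[str, int] = {}

    def encode(self, value: str) -> int:
        # Assign the next unused ID the first time a value is seen,
        # then keep returning the same ID for that value.
        if value not in self._ids:
            self._ids[value] = len(self._ids)
        return self._ids[value]


countries = CategoryEncoder()
endpoints = CategoryEncoder()

# One login event: numerical features plus encoded categorical features.
numeric = [1.0, 5120.0]  # e.g. failed attempts, bytes transferred
categorical = [countries.encode("DE"), endpoints.encode("/login")]
print(numeric, categorical)
```

Keep one encoder per categorical feature so that IDs from different features never need to share a numbering scheme.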
Configuration
| Parameter | Type (Rust / Python) | Default | Description | Constraints |
|---|---|---|---|---|
| `numeric_dim` | `usize` / `int` | Required | Number of numerical features in each record | At least one of `numeric_dim` or `categorical_dim` must be > 0 |
| `categorical_dim` | `usize` / `int` | Required | Number of categorical features in each record | At least one of `numeric_dim` or `categorical_dim` must be > 0 |
| `num_rows` | `usize` / `int` | `2` | Number of hash rows | Must be > 0 |
| `num_buckets` | `usize` / `int` | `1024` | Number of buckets per hash row | Must be >= 2 |
| `alpha` | `f64` / `float` | `0.8` | Temporal decay factor | Must be finite and in (0.0, 1.0) |
| `seed` | `u64` / `int` | Random | Optional seed for deterministic hashing | None |
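The constraints in the table can be checked up front. Below is a minimal sketch of the same rules in plain Python, using the parameter names above. `validate_config` is a hypothetical helper that only mirrors the table; it is not the library's own validation:

```python
import math
from typing import Optional


def validate_config(
    numeric_dim: int,
    categorical_dim: int,
    num_rows: int = 2,
    num_buckets: int = 1024,
    alpha: float = 0.8,
    seed: Optional[int] = None,
) -> None:
    """Hypothetical helper: raise ValueError if a rule from the table is broken."""
    if numeric_dim < 0 or categorical_dim < 0:
        raise ValueError("dimensions cannot be negative")
    if numeric_dim == 0 and categorical_dim == 0:
        raise ValueError("numeric_dim or categorical_dim must be > 0")
    if num_rows <= 0:
        raise ValueError("num_rows must be > 0")
    if num_buckets < 2:
        raise ValueError("num_buckets must be >= 2")
    # NaN and infinities fail isfinite; the bounds are exclusive.
    if not (math.isfinite(alpha) and 0.0 < alpha < 1.0):
        raise ValueError("alpha must be finite and in (0.0, 1.0)")
    # seed is optional and has no constraint in the table.
```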
Implementation choices
MStream keeps the core sketch layout from the original MStream paper, while making a small number of practical implementation choices for library use:
- Real-valued features are normalized with `asinh(x)`, so all finite signed numerical inputs are supported, including values at or below `-1`. The paper applies `log(1+x)`, which is only defined for `x > -1`.
- Categorical hashing uses seeded `ahash` instances. The paper describes arithmetic linear hash functions; this library uses `ahash` because it was faster in benchmark runs, and the per-row hash layout remains deterministic given the detector seed.
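The difference between the two normalizations is easy to see on a few inputs. Below is a minimal sketch using Python's `math` module. The helper names are hypothetical and only illustrate the two formulas named above:

```python
import math


def paper_normalize(x: float) -> float:
    # log(1 + x): only defined for x > -1.
    return math.log1p(x)


def library_normalize(x: float) -> float:
    # asinh(x): defined for every finite real, odd-symmetric around 0,
    # and roughly log-shaped for large |x|.
    return math.asinh(x)


for x in [-5.0, -1.0, 0.0, 3.2, 1000.0]:
    try:
        paper = f"{paper_normalize(x):.4f}"
    except ValueError:  # math.log1p raises ValueError for x <= -1
        paper = "undefined"
    print(f"x={x:>8}  log1p={paper:>9}  asinh={library_normalize(x):.4f}")
```

For large positive inputs both curves grow logarithmically, so `asinh` keeps the compressive behavior of `log(1+x)` while also accepting negative values.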
Timestamp semantics
`timestamp` is a logical tick index, not wall-clock time. Only differences between timestamps matter:
- shifting all timestamps by a constant leaves scores unchanged
- a gap of `k` ticks applies the temporal decay factor `alpha` exactly `k` times
- timestamps must be positive and monotonically non-decreasing
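The tick rules can be illustrated with a single decayed counter. Below is a minimal sketch that assumes a stored count is multiplied by `alpha` once per elapsed tick, as the rules state. `decay` is a hypothetical helper, not the library's internal update:

```python
def decay(value: float, alpha: float, last_tick: int, new_tick: int) -> float:
    """Hypothetical helper: apply alpha once for each tick between two timestamps."""
    if new_tick < last_tick:
        raise ValueError("timestamps must be monotonically non-decreasing")
    return value * alpha ** (new_tick - last_tick)


# A gap of k = 3 ticks applies alpha three times: 10 * 0.8**3.
print(decay(10.0, 0.8, last_tick=2, new_tick=5))

# Only the gap matters, so shifting both timestamps changes nothing.
assert decay(10.0, 0.8, 2, 5) == decay(10.0, 0.8, 1002, 1005)

# Records in the same tick see no decay.
assert decay(10.0, 0.8, 5, 5) == 10.0
```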
For batched stream ticks, pass the same timestamp for all records in the same tick, then increment it when the stream advances.
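Assigning ticks to batches only needs a running counter. Below is a minimal sketch that assumes the stream arrives as a list of batches of `(numerical, categorical)` records. `with_ticks` is a hypothetical helper:

```python
from typing import Iterable, Iterator, List, Tuple

Record = Tuple[List[float], List[int]]  # (numerical features, categorical IDs)


def with_ticks(batches: Iterable[List[Record]]) -> Iterator[Tuple[int, Record]]:
    """Hypothetical helper: yield (timestamp, record) pairs, one tick per batch."""
    # Start at 1 so timestamps are positive, and advance once per batch
    # so they are monotonically non-decreasing.
    for tick, batch in enumerate(batches, start=1):
        for record in batch:
            yield tick, record


batches = [
    [([0.0, 3.2], [1, 10]), ([0.5, 2.9], [1, 11])],  # tick 1: two records
    [([12.0, 0.3], [99, 10])],  # tick 2: one record
]
for timestamp, (numeric, categorical) in with_ticks(batches):
    print(timestamp, numeric, categorical)
```

An empty batch still advances the counter, so a quiet period shows up as a gap of several ticks.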
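Rust API
Creating a detector
```rust
use rcf3::MStream;

// 2 numerical features, 1 categorical feature.
// `?` assumes this runs inside a function that returns a `Result`.
let mut detector = MStream::builder(2, 1)
    .alpha(0.8)
    .num_rows(2)
    .num_buckets(1024)
    .seed(7)
    .build()?;
```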
Update and preview scoring
Use `update_and_score` when you want to ingest the event and receive its anomaly score. Use `score` to preview what the next inserted record would score without mutating detector state:
```rust
let score = detector.update_and_score(&[1.5, 2.0], &[7], 1)?; // ingest at tick 1
let preview = detector.score(&[1.5, 2.0], &[7], 2)?; // no state change
let committed = detector.update_and_score(&[1.5, 2.0], &[7], 2)?;
assert_eq!(preview, committed);
```
Use `update` when you want to ingest without returning the score.
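Detailed scores
The total score is the sum of a record-level contribution and one contribution per numerical and categorical feature:
```rust
let detailed = detector.score_detailed(&[1.5, 2.0], &[7], 3)?;
assert_eq!(detailed.numeric_features.len(), 2); // one entry per numerical feature
assert_eq!(detailed.categorical_features.len(), 1); // one entry per categorical feature
```
`MStreamScore` contains:
- `total`: the final score
- `record`: the record-level contribution
- `numeric_features`: one contribution per numerical feature
- `categorical_features`: one contribution per categorical feature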
Use update_and_score_detailed when you want that decomposition for a committed insert.
Status accessors
```rust
assert!(detector.is_ready());
assert_eq!(detector.entries_seen(), 2);
assert_eq!(detector.current_time(), Some(2));
```
`is_ready()` becomes true after the first processed record.
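The three accessors can be modeled with two pieces of state. Below is a minimal Python sketch. It assumes that `entries_seen` counts committed records only and that `current_time` holds the latest committed timestamp, with `None` before any record (matching the `Option` returned in Rust). `StatusTracker` is hypothetical and only mirrors the documented behavior:

```python
from typing import Optional


class StatusTracker:
    """Hypothetical toy mirroring is_ready / entries_seen / current_time."""

    def __init__(self) -> None:
        self.entries_seen = 0
        self.current_time: Optional[int] = None

    def is_ready(self) -> bool:
        # Ready once at least one record has been processed.
        return self.entries_seen > 0

    def commit(self, timestamp: int) -> None:
        # Enforce the timestamp rules: positive and non-decreasing.
        if timestamp <= 0:
            raise ValueError("timestamp must be positive")
        if self.current_time is not None and timestamp < self.current_time:
            raise ValueError("timestamps must be non-decreasing")
        self.entries_seen += 1
        self.current_time = timestamp


status = StatusTracker()
assert not status.is_ready() and status.current_time is None
status.commit(1)
status.commit(2)
assert status.is_ready() and status.entries_seen == 2 and status.current_time == 2
```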
Serialization
With the `serde` feature enabled:
With both `serde` and `std` enabled, file helpers are also available:
Practical example
```rust
use rcf3::MStream;

// Two numerical and two categorical features per record.
let mut detector = MStream::builder(2, 2)
    .seed(2026)
    .num_buckets(512)
    .build()?;

// Commit a typical event at tick 1, then preview an unusual one at tick 2.
let normal = detector.update_and_score(&[0.0, 3.2], &[1, 10], 1)?;
let suspicious = detector.score_detailed(&[12.0, 0.3], &[99, 10], 2)?;

println!("normal={normal}, suspicious={}", suspicious.total);
println!("failed-attempt contribution={}", suspicious.numeric_features[0]);
println!("country contribution={}", suspicious.categorical_features[0]);
```
Python API
Creating a detector
```python
from rcf3 import MStream

detector = MStream(
    numeric_dim=2,
    categorical_dim=1,
    alpha=0.8,
    num_rows=2,
    num_buckets=1024,
    seed=7,
)
```
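`num_rows`, `num_buckets`, and `seed` size and seed the hash tables. Below is a count-min-style toy that shows how a categorical value maps to one bucket per row and how a fixed seed makes that layout reproducible. It uses `hashlib.blake2b` rather than `ahash`, and the class, its methods, and the min-over-rows estimate are assumptions for illustration, not this library's internals:

```python
import hashlib


class ToyCountSketch:
    """Hypothetical count-min-style table: num_rows rows of num_buckets counters."""

    def __init__(self, num_rows: int = 2, num_buckets: int = 1024, seed: int = 7) -> None:
        if num_rows <= 0 or num_buckets < 2:
            raise ValueError("need num_rows > 0 and num_buckets >= 2")
        self.num_buckets = num_buckets
        self.seed = seed
        self.table = [[0.0] * num_buckets for _ in range(num_rows)]

    def bucket(self, row: int, value: int) -> int:
        # Derive a per-row hash from (seed, row, value) so the layout is
        # deterministic for a given seed but differs between rows.
        data = f"{self.seed}:{row}:{value}".encode()
        digest = hashlib.blake2b(data, digest_size=8).digest()
        return int.from_bytes(digest, "little") % self.num_buckets

    def add(self, value: int, amount: float = 1.0) -> None:
        for row, counters in enumerate(self.table):
            counters[self.bucket(row, value)] += amount

    def estimate(self, value: int) -> float:
        # Collisions only inflate counters, so the minimum is the tightest bound.
        return min(
            counters[self.bucket(row, value)] for row, counters in enumerate(self.table)
        )


a = ToyCountSketch(seed=7)
b = ToyCountSketch(seed=7)
assert a.bucket(0, 99) == b.bucket(0, 99)  # same seed, same layout
a.add(99)
a.add(99)
print(a.estimate(99))
```

More rows reduce the chance that every row collides, and more buckets reduce collisions within a row, at the cost of memory proportional to `num_rows * num_buckets`.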
Update and preview scoring
```python
score = detector.update_and_score([1.5, 2.0], [7], 1)  # ingest at tick 1
preview = detector.score([1.5, 2.0], [7], 2)  # no state change
committed = detector.update_and_score([1.5, 2.0], [7], 2)
assert preview == committed
```
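The preview/commit contract can be shown with a toy that stores only counts. Below is a minimal sketch that uses a simple rarity score (`1 / count`) in place of MStream's actual scoring. `ToyDetector` and its score are hypothetical:

```python
from collections import Counter


class ToyDetector:
    """Hypothetical toy: score(...) previews, update_and_score(...) commits."""

    def __init__(self) -> None:
        self.counts: Counter = Counter()

    def score(self, key: int) -> float:
        # Score the record as if it were inserted, without touching state.
        return 1.0 / (self.counts[key] + 1)

    def update_and_score(self, key: int) -> float:
        # Commit the record, then score it against the updated state.
        self.counts[key] += 1
        return 1.0 / self.counts[key]


toy = ToyDetector()
toy.update_and_score(7)
preview = toy.score(7)
assert toy.score(7) == preview  # previewing twice changes nothing
committed = toy.update_and_score(7)
assert preview == committed == 0.5
```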
Detailed scores
Python returns the decomposition as a dict-like object:
```python
detailed = detector.score_detailed([1.5, 2.0], [7], 3)
assert len(detailed["numeric_features"]) == 2
assert len(detailed["categorical_features"]) == 1
```
The available keys are:
- `total`
- `record`
- `numeric_features`
- `categorical_features`
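Because the total is the sum of the record-level and per-feature contributions, the keys can be checked against each other. Below is a minimal sketch that uses a plain dict with illustrative values in place of a real `score_detailed` result. `check_decomposition` is a hypothetical helper, and a small tolerance absorbs floating-point rounding:

```python
import math


def check_decomposition(detailed, tol: float = 1e-9) -> bool:
    """Hypothetical helper: does total equal record + all feature contributions?"""
    parts = (
        detailed["record"]
        + sum(detailed["numeric_features"])
        + sum(detailed["categorical_features"])
    )
    return math.isclose(detailed["total"], parts, rel_tol=tol, abs_tol=tol)


# Illustrative values only, shaped like the keys listed above.
example = {
    "total": 10.0,
    "record": 4.0,
    "numeric_features": [2.5, 1.5],
    "categorical_features": [2.0],
}
print(check_decomposition(example))
```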
Status accessors
Serialization
```python
json_str = detector.to_json()
restored = MStream.from_json(json_str)

# File helpers
detector.save_json("mstream.json")
restored_from_file = MStream.load_json("mstream.json")
```
MStream also supports standard Python pickle round-trips through its JSON-backed state hooks.
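Pickle support through JSON-backed hooks follows a common Python pattern: `__getstate__` returns the JSON text and `__setstate__` restores from it. Below is a minimal sketch of that pattern on a hypothetical `ToyState` class. It is not the library's code:

```python
import json
import pickle


class ToyState:
    """Hypothetical class whose pickle state is its JSON representation."""

    def __init__(self, counts=None) -> None:
        self.counts = dict(counts or {})

    def to_json(self) -> str:
        return json.dumps({"counts": self.counts})

    @classmethod
    def from_json(cls, text: str) -> "ToyState":
        return cls(json.loads(text)["counts"])

    def __getstate__(self) -> str:
        # Pickle stores the JSON string instead of the raw __dict__.
        return self.to_json()

    def __setstate__(self, state: str) -> None:
        # Rebuild attributes from the same JSON used by from_json.
        self.counts = json.loads(state)["counts"]


original = ToyState({"7": 2})
restored = pickle.loads(pickle.dumps(original))
assert restored.counts == original.counts
assert ToyState.from_json(original.to_json()).counts == {"7": 2}
```

Routing pickle through the JSON form keeps a single serialization format to maintain, at the cost of JSON's limits (for example, dict keys always come back as strings).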
Practical example
```python
from rcf3 import MStream

# Two numerical and two categorical features per record.
detector = MStream(numeric_dim=2, categorical_dim=2, seed=2026, num_buckets=512)

# Commit a typical event at tick 1, then preview an unusual one at tick 2.
normal = detector.update_and_score([0.0, 3.2], [1, 10], 1)
suspicious = detector.score_detailed([12.0, 0.3], [99, 10], 2)

print(f"normal={normal}, suspicious={suspicious['total']}")
print(f"failed-attempt contribution={suspicious['numeric_features'][0]}")
print(f"country contribution={suspicious['categorical_features'][0]}")
```