Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions .gitignore
Original file line number Diff line number Diff line change
Expand Up @@ -38,3 +38,4 @@ janus-dashboard/.vscode/
debug_*.py
tests/reproduction_test.rs
tests/user_query_repro.rs
/logs/
3 changes: 3 additions & 0 deletions Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,9 @@ tower-http = { version = "0.5", features = ["cors", "trace"] }
tokio-tungstenite = "0.21"
reqwest = { version = "0.11", features = ["json"] }
futures-util = "0.3"
sha2 = "0.10"
plotters = "0.3.7"
sysinfo = "0.31.4"

[lib]
name = "janus"
Expand Down
53 changes: 35 additions & 18 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@ The name comes from the Roman deity Janus, associated with transitions and with
- Hybrid queries that mix historical and live windows
- Extension functions for anomaly-style predicates such as thresholds, relative change, z-score, outlier checks, and trend divergence
- Optional baseline bootstrapping for hybrid anomaly queries with `USING BASELINE <window> LAST|AGGREGATE`
- Query-defined baselines with `DEFINE BASELINE ... ON WINDOW ... AS SELECT ...`, `USING BASELINE :name`, and `GRAPH :name { ... }` materialization templates
- HTTP endpoints for registering, starting, stopping, listing, and deleting queries
- WebSocket result streaming for running queries

Expand All @@ -28,30 +29,45 @@ Janus uses Janus-QL, a hybrid query language for querying historical and live RD
Example:

```sparql
PREFIX ex: <http://example.org/>
PREFIX janus: <https://janus.rs/fn#>
PREFIX baseline: <https://janus.rs/baseline#>

REGISTER RStream ex:out AS
SELECT ?sensor ?reading
FROM NAMED WINDOW ex:hist ON LOG ex:store [START 1700000000000 END 1700003600000]
FROM NAMED WINDOW ex:live ON STREAM ex:stream1 [RANGE 5000 STEP 1000]
USING BASELINE ex:hist AGGREGATE
PREFIX : <http://example.org/>

FROM NAMED WINDOW :liveMinute ON STREAM :stream [RANGE 60000 STEP 1000]
FROM NAMED WINDOW :historyDay ON LOG :stream [START 0 END 86400000]

DEFINE BASELINE :dayBaseline ON WINDOW :historyDay AS
SELECT ?sensor
(AVG(?value) AS ?dayAvgValue)
WHERE {
?sensor :hasValue ?value .
}
GROUP BY ?sensor

REGISTER RStream :output AS
USING BASELINE :dayBaseline
SELECT ?sensor
(AVG(?value) AS ?minuteAvgValue)
?dayAvgValue
((AVG(?value) - ?dayAvgValue) AS ?difference)
WHERE {
WINDOW ex:hist {
?sensor ex:mean ?mean .
?sensor ex:sigma ?sigma .
WINDOW :liveMinute {
?sensor :hasValue ?value .
}
WINDOW ex:live {
?sensor ex:hasReading ?reading .
GRAPH :dayBaseline {
?sensor :dayAvgValue ?dayAvgValue .
}
?sensor baseline:mean ?mean .
?sensor baseline:sigma ?sigma .
FILTER(janus:is_outlier(?reading, ?mean, ?sigma, 3))
}
GROUP BY ?sensor ?dayAvgValue
HAVING(AVG(?value) > ?dayAvgValue)
```

`USING BASELINE` is optional. If present, Janus bootstraps baseline values from the named historical window before or during live execution:
For query-defined baselines:

- `DEFINE BASELINE` evaluates the historical baseline query before live startup over the source `LOG` window
- `USING BASELINE :dayBaseline` tells Janus to prepare that baseline and inject the resulting quads into the live engine
- `GRAPH :dayBaseline { ... }` is the materialization template; its concrete predicates and projected variables define the quads that are inserted
- the live query can then use baseline variables in `SELECT`, `GROUP BY`, `HAVING`, and arithmetic expressions

Legacy `USING BASELINE <window> LAST|AGGREGATE` remains available:

- `LAST`: use the final historical window snapshot as baseline
- `AGGREGATE`: merge the historical window outputs into one compact baseline
Expand Down Expand Up @@ -80,6 +96,7 @@ Janus uses dictionary encoding and segmented storage for high-throughput ingesti
- Space efficiency: about 40% smaller encoded events

Detailed benchmark data is in [docs/BENCHMARK_RESULTS.md](./docs/BENCHMARK_RESULTS.md).
Current benchmark commands and scope are in [docs/BENCHMARKING.md](./docs/BENCHMARKING.md).

## Quick Start

Expand Down
52 changes: 17 additions & 35 deletions benches/historical_fixed.rs
Original file line number Diff line number Diff line change
@@ -1,50 +1,24 @@
mod support;

use criterion::{black_box, criterion_group, criterion_main, BenchmarkId, Criterion};
use janus::{
execution::historical_executor::HistoricalExecutor,
parsing::janusql_parser::{SourceKind, WindowDefinition, WindowType},
querying::oxigraph_adapter::OxigraphAdapter,
storage::{segmented_storage::StreamingSegmentedStorage, util::StreamingConfig},
};
use std::sync::{
atomic::{AtomicU64, Ordering},
Arc,
storage::segmented_storage::StreamingSegmentedStorage,
};
use std::time::{SystemTime, UNIX_EPOCH};

static COUNTER: AtomicU64 = AtomicU64::new(0);

fn unique_config() -> StreamingConfig {
let id = COUNTER.fetch_add(1, Ordering::Relaxed);
let ts = SystemTime::now().duration_since(UNIX_EPOCH).unwrap().as_nanos();
StreamingConfig {
segment_base_path: format!("/tmp/janus_bench_fixed_{}_{}", ts, id),
max_batch_events: 1_000_000,
max_batch_age_seconds: 3600,
max_batch_bytes: 1_000_000_000,
sparse_interval: 64,
entries_per_index_block: 256,
}
}
use std::sync::Arc;
use support::{populate_storage, unique_config, GRAPH_URI};

/// Write N events at timestamps [1000, 1000+N) into a fresh storage.
/// These land in the in-memory batch buffer — no flush needed before querying.
fn setup(n: usize) -> (Arc<StreamingSegmentedStorage>, WindowDefinition) {
let storage = StreamingSegmentedStorage::new(unique_config()).unwrap();
for i in 0..n as u64 {
storage
.write_rdf(
1_000 + i,
&format!("http://example.org/sensor{}", i % 5),
"http://saref.etsi.org/core/hasValue",
&format!("{}", 20 + (i % 10)),
"http://example.org/graph",
)
.unwrap();
}
let storage = StreamingSegmentedStorage::new(unique_config("historical_fixed")).unwrap();
populate_storage(&storage, n, 1_000, 1, GRAPH_URI);
let window = WindowDefinition {
window_name: "w".to_string(),
source_kind: SourceKind::Stream,
stream_name: "http://example.org/stream".to_string(),
stream_name: GRAPH_URI.to_string(),
width: n as u64,
slide: n as u64,
offset: None,
Expand All @@ -55,7 +29,15 @@ fn setup(n: usize) -> (Arc<StreamingSegmentedStorage>, WindowDefinition) {
(Arc::new(storage), window)
}

const SPARQL: &str = "SELECT ?s ?p ?o WHERE { ?s ?p ?o }";
const SPARQL: &str = r#"
PREFIX ex: <http://example.org/>
SELECT ?sensor ?temp
WHERE {
GRAPH ex:graph1 {
?sensor ex:temperature ?temp .
}
}
"#;

fn historical_fixed(c: &mut Criterion) {
let mut group = c.benchmark_group("historical/fixed_window");
Expand Down
57 changes: 19 additions & 38 deletions benches/historical_sliding.rs
Original file line number Diff line number Diff line change
@@ -1,30 +1,14 @@
mod support;

use criterion::{black_box, criterion_group, criterion_main, BenchmarkId, Criterion};
use janus::{
execution::historical_executor::HistoricalExecutor,
parsing::janusql_parser::{SourceKind, WindowDefinition, WindowType},
querying::oxigraph_adapter::OxigraphAdapter,
storage::{segmented_storage::StreamingSegmentedStorage, util::StreamingConfig},
};
use std::sync::{
atomic::{AtomicU64, Ordering},
Arc,
storage::segmented_storage::StreamingSegmentedStorage,
};
use std::time::{SystemTime, UNIX_EPOCH};

static COUNTER: AtomicU64 = AtomicU64::new(0);

fn unique_config() -> StreamingConfig {
let id = COUNTER.fetch_add(1, Ordering::Relaxed);
let ts = SystemTime::now().duration_since(UNIX_EPOCH).unwrap().as_nanos();
StreamingConfig {
segment_base_path: format!("/tmp/janus_bench_sliding_{}_{}", ts, id),
max_batch_events: 1_000_000,
max_batch_age_seconds: 3600,
max_batch_bytes: 1_000_000_000,
sparse_interval: 64,
entries_per_index_block: 256,
}
}
use std::sync::Arc;
use support::{populate_storage, recent_base_timestamp, unique_config, GRAPH_URI};

// Window config: OFFSET=10_000ms, RANGE=2_000ms, SLIDE=1_000ms
// SlidingWindowIterator scans [now-10000, now] with 8 overlapping windows.
Expand All @@ -36,25 +20,14 @@ const DATA_START_BEFORE_NOW_MS: u64 = 8_000;
const DATA_SPAN_MS: u64 = 6_000;

fn setup(n: usize) -> (Arc<StreamingSegmentedStorage>, WindowDefinition) {
let now = SystemTime::now().duration_since(UNIX_EPOCH).unwrap().as_millis() as u64;
let storage = StreamingSegmentedStorage::new(unique_config()).unwrap();
let n64 = n as u64;
for i in 0..n64 {
let ts = now - DATA_START_BEFORE_NOW_MS + i * DATA_SPAN_MS / n64.max(1);
storage
.write_rdf(
ts,
&format!("http://example.org/sensor{}", i % 5),
"http://saref.etsi.org/core/hasValue",
&format!("{}", 20 + (i % 10)),
"http://example.org/graph",
)
.unwrap();
}
let start_ts = recent_base_timestamp(DATA_START_BEFORE_NOW_MS);
let storage = StreamingSegmentedStorage::new(unique_config("historical_sliding")).unwrap();
let step_ms = (DATA_SPAN_MS / n.max(1) as u64).max(1);
populate_storage(&storage, n, start_ts, step_ms, GRAPH_URI);
let window = WindowDefinition {
window_name: "w".to_string(),
source_kind: SourceKind::Stream,
stream_name: "http://example.org/stream".to_string(),
stream_name: GRAPH_URI.to_string(),
width: RANGE_MS,
slide: SLIDE_MS,
offset: Some(OFFSET_MS),
Expand All @@ -65,7 +38,15 @@ fn setup(n: usize) -> (Arc<StreamingSegmentedStorage>, WindowDefinition) {
(Arc::new(storage), window)
}

const SPARQL: &str = "SELECT ?s ?p ?o WHERE { ?s ?p ?o }";
const SPARQL: &str = r#"
PREFIX ex: <http://example.org/>
SELECT ?sensor ?temp
WHERE {
GRAPH ex:graph1 {
?sensor ex:temperature ?temp .
}
}
"#;

fn historical_sliding(c: &mut Criterion) {
let mut group = c.benchmark_group("historical/sliding_window");
Expand Down
68 changes: 68 additions & 0 deletions benches/hybrid_baseline.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,68 @@
mod support;

use criterion::{black_box, criterion_group, criterion_main, BenchmarkId, Criterion};
use janus::{core::RDFEvent, stream::live_stream_processing::LiveStreamProcessing};
use std::time::Duration;
use support::{make_sensor_event, wait_for_live_result, BASELINE_PREDICATE, STREAM_URI};

// NOTE(review): `STREAM_URI` and `BASELINE_PREDICATE` come from `support`; presumably they
// expand to `ex:stream1` and `ex:baselineTemperature` used below — confirm.
/// RSP-QL query exercised by the `hybrid/baseline_join` benchmark.
///
/// Registers an RStream named `<output>` that projects `?sensor`, `?liveTemp` and
/// `?baselineTemp`. `?liveTemp` is matched inside the `ex:live` window, which is declared
/// on stream `ex:stream1` with `RANGE 10000 STEP 1000`. `?baselineTemp` is matched by a
/// triple pattern placed outside any `WINDOW` clause, on predicate `ex:baselineTemperature`.
const HYBRID_RSPQL: &str = r#"
PREFIX ex: <http://example.org/>
REGISTER RStream <output> AS
SELECT ?sensor ?liveTemp ?baselineTemp
FROM NAMED WINDOW ex:live ON STREAM ex:stream1 [RANGE 10000 STEP 1000]
WHERE {
WINDOW ex:live {
?sensor ex:temperature ?liveTemp .
}
?sensor ex:baselineTemperature ?baselineTemp .
}
"#;

/// Builds a started [`LiveStreamProcessing`] instance for one benchmark batch.
///
/// Steps, in order:
/// 1. create the processor from `HYBRID_RSPQL`;
/// 2. register `STREAM_URI`;
/// 3. add five static events, one per `http://example.org/sensor{0..4}`, each carrying
///    `BASELINE_PREDICATE` with the values `"20"` through `"24"`;
/// 4. start processing.
///
/// # Panics
///
/// Panics if any of the processor calls fails, since every call is `unwrap`ped.
fn setup_processor() -> LiveStreamProcessing {
    let mut processor = LiveStreamProcessing::new(String::from(HYBRID_RSPQL)).unwrap();
    processor.register_stream(STREAM_URI).unwrap();

    for sensor_idx in 0..5u64 {
        let sensor_iri = format!("http://example.org/sensor{sensor_idx}");
        let baseline_value = (20 + sensor_idx).to_string();
        // NOTE(review): argument order is presumably (timestamp, subject, predicate,
        // object, graph) — confirm against `RDFEvent::new`.
        let static_event =
            RDFEvent::new(0, &sensor_iri, BASELINE_PREDICATE, &baseline_value, "");
        processor.add_static_data(static_event).unwrap();
    }

    processor.start_processing().unwrap();
    processor
}

/// Criterion benchmark group `hybrid/baseline_join`.
///
/// For each batch size (1, 10 and 100 events) a fresh processor is produced by
/// `setup_processor` as the `iter_batched` setup step. The routine then pushes the events
/// onto `STREAM_URI`, closes the stream with `20_000`, and waits up to ten seconds for a
/// live result via `wait_for_live_result`.
fn hybrid_baseline_join(c: &mut Criterion) {
    const EVENT_COUNTS: [usize; 3] = [1, 10, 100];

    let mut group = c.benchmark_group("hybrid/baseline_join");
    group.sample_size(20);

    for event_count in EVENT_COUNTS.iter().copied() {
        let bench_id = BenchmarkId::new("events_per_window", event_count);
        group.bench_with_input(bench_id, &event_count, |bencher, &count| {
            let total = count as u64;
            bencher.iter_batched(
                setup_processor,
                |processor| {
                    for idx in 0..total {
                        // Spread timestamps evenly over [0, 9_000]; a lone event sits at 0,
                        // which also avoids dividing by zero.
                        let timestamp = match total {
                            0 | 1 => 0,
                            _ => idx * 9_000 / (total - 1),
                        };
                        let event = make_sensor_event(timestamp, idx, "");
                        processor.add_event(STREAM_URI, event).unwrap();
                    }

                    processor.close_stream(STREAM_URI, 20_000).unwrap();
                    black_box(wait_for_live_result(&processor, Duration::from_secs(10)))
                },
                criterion::BatchSize::SmallInput,
            );
        });
    }

    group.finish();
}

criterion_group!(benches, hybrid_baseline_join);
criterion_main!(benches);
Loading
Loading