Materialized Views
Materialized views enable schema-aware data transformation directly at ingest time, producing typed Parquet outputs optimized for analytics and warehouse consumption.
Materialized views are available on Pro and Enterprise plans. A 14-day free trial is available for all new accounts.
What are Materialized Views?
Materialized views transform raw JSON events into strongly-typed columnar data (Parquet or CSV) as data is ingested. Instead of writing opaque payloads that require downstream parsing, EdgeMQ can extract, cast, filter, and partition your data automatically.
Key benefits:
- Eliminate ETL jobs - Schema extraction happens at ingest time
- Type safety - Automatic casting with error handling
- Partitioning - Organize data by date, region, or other dimensions
- Query-ready - Direct consumption by Snowflake, Databricks, Trino, Athena, etc.
- Declarative - Define views as YAML, not SQL (no user SQL execution)
How It Works
```
JSON Events → EdgeMQ Ingest → WAL Segment → View Materializer → Typed Parquet → S3
                                                    ↓
                                         View Definition (YAML)
                                         - Column extraction
                                         - Type casting
                                         - Filtering
                                         - Partitioning
```

Architecture
- View Definition - You define a declarative YAML mapping (columns, types, filters, partitions)
- Validation - EdgeMQ validates and compiles the definition to safe SQL
- Per-segment execution - For each sealed WAL segment, the materializer:
  - Loads frames into a temporary frames table
  - Executes the compiled SELECT query
  - Writes typed Parquet/CSV to S3
- Commit markers - Output artifacts are listed in commit manifests for safe consumption
Security Model
- No user SQL - View definitions compile to safe, validated SQL
- Sandboxed execution - Each view runs in an isolated execution context
- Resource limits - Memory and timeout constraints prevent runaway queries
- Function allowlist - Only approved functions are permitted
Output Formats
Materialized views are powered by the Parquet (Schema-Aware) output format. To use materialized views, you must enable this format on your endpoint.
See Output Formats for comparison with Segments and Raw Parquet formats.
Quick Start
1. Enable Schema-Aware Parquet
In the EdgeMQ Console, navigate to your endpoint's Configuration tab and enable Parquet (Schema-Aware):
```
Output Formats
☑ Segments               (optional)
☑ Parquet (Raw)          (optional)
☑ Parquet (Schema-Aware) (required for views)
```
2. Create a View Definition
Create a YAML file defining your view:
```yaml
apiVersion: edgemq.io/v1alpha1
kind: ViewDefinition
metadata:
  name: user-events
spec:
  columns:
    - name: user_id
      expr:
        type: json_path
        json_path: "$.userId"
      type: VARCHAR
      required: true
    - name: event_type
      expr:
        type: json_path
        json_path: "$.eventType"
      type: VARCHAR
    - name: event_time
      expr:
        type: fn
        fn: to_timestamp_ms
        args:
          - type: json_path
            json_path: "$.timestamp"
      type: TIMESTAMP
  output:
    partition_by:
      - name: dt
        expr:
          type: fn
          fn: date_trunc
          args:
            - type: lit
              lit: "day"
            - type: col
              col: event_time
    format: parquet
    compression: zstd
```
3. Attach View to Endpoint
In the Console, go to Configuration → Materialized Views and attach your view. You can specify:
- Required - Ingest fails if view extraction fails (data quality enforcement)
- Optional - Errors are logged but ingest continues (best-effort)
- Canary mode - Apply view only to segments sealed after a specific timestamp (gradual rollout)
4. Query Your Data
Once segments are sealed, query the typed Parquet directly:
```sql
-- Example query
SELECT user_id, event_type, event_time
FROM read_parquet('s3://bucket/prefix/views/user-events/dt=2025-01-20/seg=*/part-*.parquet')
WHERE event_type = 'purchase'
ORDER BY event_time DESC
LIMIT 100;
```
Core Concepts
The frames Table
Your view definition queries a virtual frames table created by the materializer for each segment:
```sql
CREATE TEMP TABLE frames(
  ingest_ts_ms BIGINT,   -- Unix epoch milliseconds (ingestion time)
  payload      VARCHAR   -- JSON string (your ingested event)
);
```
Each row represents one ingested event. Views extract and transform data from these frames.
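
To make the compilation step concrete, here is an illustrative sketch (not the exact SQL EdgeMQ generates) of what a compiled view query over frames might look like for a two-column view with one required field. It simply combines the TRY_CAST extraction and NULL filtering described under Error Handling; the column names are hypothetical:

```sql
-- Illustrative sketch only: a compiled view query executed against the
-- per-segment frames table.
SELECT user_id, event_type
FROM (
  SELECT
    TRY_CAST(json_extract_string(payload, '$.userId')    AS VARCHAR) AS user_id,
    TRY_CAST(json_extract_string(payload, '$.eventType') AS VARCHAR) AS event_type
  FROM frames
) AS extracted
WHERE user_id IS NOT NULL;   -- enforced because user_id is required: true
```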
Expression Types
View definitions use expressions to define column values:
| Expression | Purpose | Example |
|---|---|---|
| json_path | Extract field from payload | $.userId, $.data.amount |
| meta | Access metadata fields | timestamp_ms (ingest time) |
| col | Reference output column | Used in partitioning, computed columns |
| fn | Function call | to_timestamp_ms, upper, coalesce |
| lit | Literal value | Strings, numbers, booleans, null |
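
Expressions nest freely. As a quick reference, the sketch below (illustrative, mirroring the patterns later on this page) combines fn, lit, and meta in a single column definition; json_path and col appear throughout the examples that follow:

```yaml
# Illustrative column: truncate the ingest timestamp (metadata) to the day.
- name: ingest_day
  expr:
    type: fn                  # fn: allowlisted function call
    fn: date_trunc
    args:
      - type: lit             # lit: literal argument
        lit: "day"
      - type: fn
        fn: to_timestamp_ms
        args:
          - type: meta        # meta: ingest-time metadata field
            meta: timestamp_ms
  type: TIMESTAMP
```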
Type System
All output columns must declare a supported type:
| Category | Types | Notes |
|---|---|---|
| String | VARCHAR, TEXT, STRING | Aliases for same type |
| Integer | BIGINT, INTEGER, INT, SMALLINT, TINYINT | Use BIGINT for IDs |
| Float | DOUBLE, REAL, FLOAT | Use DOUBLE for precision |
| Boolean | BOOLEAN, BOOL | Aliases for same type |
| Temporal | TIMESTAMP, TIMESTAMPTZ, DATE | For time-series data |
| JSON | JSON, JSONB | Preserve nested structures |
All type casts use TRY_CAST for safety - invalid casts return NULL instead of failing.
Required Fields and Defaults
Control data quality with required and default:
```yaml
- name: status
  expr:
    type: json_path
    json_path: "$.status"
  type: VARCHAR
  required: true   # Drop rows if NULL after casting + default
  default:         # Apply default if extraction returns NULL
    type: lit
    lit: "pending"
```
Semantics:
- Extract field from JSON
- Cast to target type (using TRY_CAST)
- Apply default if still NULL
- If required=true, drop row if still NULL
Filtering
Filter rows with a where clause:
```yaml
transform:
  where:
    type: fn
    fn: eq
    args:
      - type: json_path
        json_path: "$.eventType"
      - type: lit
        lit: "purchase"
```
Filtering happens before required field checks. Only rows matching the filter are considered for output.
Partitioning
Organize output files by partition keys for efficient querying:
```yaml
output:
  partition_by:
    - name: dt
      expr:
        type: fn
        fn: date_trunc
        args:
          - type: lit
            lit: "day"
          - type: col
            col: event_time
    - name: region
      expr:
        type: col
        col: region
```
Results in S3 layout:
```
s3://bucket/prefix/views/user-events/
  dt=2025-01-20/region=us-east-1/seg=00001/part-0.parquet
  dt=2025-01-20/region=us-west-2/seg=00001/part-0.parquet
  dt=2025-01-21/region=us-east-1/seg=00002/part-0.parquet
```
Best practices:
- Use low-cardinality dimensions (date, region, status)
- Avoid high-cardinality fields (user_id, session_id, order_id)
- Limit to 2-3 partition columns max
- High cardinality creates millions of small files → slow writes + S3 overhead
Supported Functions
Comparison Functions
Use for filtering and conditions:
| Function | Arguments | Description | Example |
|---|---|---|---|
| eq | 2 | Equality (=) | eq(col, lit) |
| ne | 2 | Not equal (<>) | ne(status, "deleted") |
| lt | 2 | Less than (<) | lt(amount, 100) |
| lte | 2 | Less than or equal (<=) | lte(age, 65) |
| gt | 2 | Greater than (>) | gt(price, 0) |
| gte | 2 | Greater than or equal (>=) | gte(score, 80) |
Boolean Functions
Combine conditions:
| Function | Arguments | Description |
|---|---|---|
| and | 2+ | Logical AND |
| or | 2+ | Logical OR |
| not | 1 | Logical NOT |
String Functions
Transform text data:
| Function | Arguments | Description |
|---|---|---|
| concat | 1+ | Concatenate strings |
| upper | 1 | Convert to uppercase |
| lower | 1 | Convert to lowercase |
| trim | 1 | Trim whitespace from both ends |
| ltrim | 1 | Trim whitespace from left |
| rtrim | 1 | Trim whitespace from right |
| substring | 2-3 | Extract substring |
| regexp_extract | 2-3 | Extract using regex |
| regexp_replace | 3 | Replace using regex |
Numeric Functions
Mathematical operations:
| Function | Arguments | Description |
|---|---|---|
| abs | 1 | Absolute value |
| round | 1-2 | Round to integer or decimal places |
| floor | 1 | Round down to nearest integer |
| ceil | 1 | Round up to nearest integer |
| trunc | 1 | Truncate to integer |
Timestamp Functions
Handle time-series data:
| Function | Arguments | Description |
|---|---|---|
| to_timestamp | 1 | Convert Unix epoch seconds to TIMESTAMP |
| to_timestamp_ms | 1 | Convert Unix epoch milliseconds to TIMESTAMP |
| parse_rfc3339 | 1 | Parse ISO 8601/RFC 3339 timestamp string |
| parse_timestamp | 2 | Parse timestamp with custom format |
| date_trunc | 2 | Truncate to precision (day, hour, etc.) |
| extract | 2 | Extract field (year, month, day, etc.) |
| date | 1 | Convert to DATE |
| epoch_ms | 1 | Convert timestamp to Unix epoch milliseconds |
JSON Functions
Navigate JSON structures:
| Function | Arguments | Description |
|---|---|---|
| json_extract | 2 | Extract JSON value at path |
| json_extract_string | 2 | Extract JSON value at path as string |
Type Casting Functions
Explicit type conversion:
| Function | Arguments | Description |
|---|---|---|
| cast | 2 | Explicit type cast |
| try_cast | 2 | Type cast returning NULL on failure |
Utility Functions
Handle NULL values:
| Function | Arguments | Description |
|---|---|---|
| coalesce | 1+ | Return first non-NULL value |
| nullif | 2 | Return NULL if arguments are equal |
Common Patterns
Pattern 1: Basic JSON Extraction
Extract simple fields from JSON:
```yaml
columns:
  - name: user_id
    expr:
      type: json_path
      json_path: "$.userId"
    type: VARCHAR
  - name: amount
    expr:
      type: json_path
      json_path: "$.amount"
    type: DOUBLE
```
Pattern 2: Nested JSON Navigation
Access nested structures:
```yaml
columns:
  - name: city
    expr:
      type: json_path
      json_path: "$.address.city"
    type: VARCHAR
  - name: country_code
    expr:
      type: json_path
      json_path: "$.address.country.code"
    type: VARCHAR
```
Pattern 3: Timestamp Conversion
Convert various timestamp formats:
```yaml
columns:
  # From ISO 8601 string
  - name: created_at
    expr:
      type: fn
      fn: parse_rfc3339
      args:
        - type: json_path
          json_path: "$.createdAt"
    type: TIMESTAMP
  # From Unix epoch milliseconds
  - name: updated_at
    expr:
      type: fn
      fn: to_timestamp_ms
      args:
        - type: json_path
          json_path: "$.updatedAtMs"
    type: TIMESTAMP
  # From metadata (ingest time)
  - name: ingested_at
    expr:
      type: fn
      fn: to_timestamp_ms
      args:
        - type: meta
          meta: timestamp_ms
    type: TIMESTAMP
```
Pattern 4: Conditional Logic
Use defaults and coalesce:
```yaml
columns:
  - name: status
    expr:
      type: fn
      fn: coalesce
      args:
        - type: json_path
          json_path: "$.status"
        - type: lit
          lit: "pending"
    type: VARCHAR
  - name: priority
    expr:
      type: json_path
      json_path: "$.priority"
    type: VARCHAR
    default:
      type: lit
      lit: "normal"
```
Pattern 5: String Transformations
Normalize text data:
```yaml
columns:
  - name: email_normalized
    expr:
      type: fn
      fn: lower
      args:
        - type: fn
          fn: trim
          args:
            - type: json_path
              json_path: "$.email"
    type: VARCHAR
  - name: category_upper
    expr:
      type: fn
      fn: upper
      args:
        - type: json_path
          json_path: "$.category"
    type: VARCHAR
```
Pattern 6: Filtering by Event Type
Process only specific events:
```yaml
transform:
  where:
    type: fn
    fn: and
    args:
      - type: fn
        fn: eq
        args:
          - type: json_path
            json_path: "$.eventType"
          - type: lit
            lit: "purchase"
      - type: fn
        fn: gt
        args:
          - type: json_path
            json_path: "$.amount"
          - type: lit
            lit: 0
```
Pattern 7: Date Partitioning
Partition by day for time-series queries:
```yaml
output:
  partition_by:
    - name: dt
      expr:
        type: fn
        fn: date_trunc
        args:
          - type: lit
            lit: "day"
          - type: col
            col: event_time
  format: parquet
  compression: zstd
```
Pattern 8: Multi-Dimension Partitioning
Partition by date and region:
```yaml
output:
  partition_by:
    - name: dt
      expr:
        type: fn
        fn: cast
        args:
          - type: col
            col: event_time
          - type: lit
            lit: "DATE"
    - name: region
      expr:
        type: col
        col: region
  format: parquet
```
Error Handling
Type Casting Errors
All type casts use TRY_CAST automatically:
```yaml
- name: amount
  expr:
    type: json_path
    json_path: "$.amount"
  type: DOUBLE
```
Generated SQL:
```sql
TRY_CAST(json_extract_string(payload, '$.amount') AS DOUBLE) AS amount
```
Behavior:
- Valid number: "123.45" → 123.45
- Invalid number: "invalid" → NULL
- Missing field: undefined → NULL
Required Field Enforcement
Use required: true to drop rows with NULL values:
```yaml
- name: user_id
  expr:
    type: json_path
    json_path: "$.userId"
  type: VARCHAR
  required: true
```
Generated WHERE clause:
```sql
WHERE user_id IS NOT NULL
```
Rows with NULL user_id (after casting and defaults) are silently dropped.
View Execution Failures
Views can be configured as:
- Required - Ingest fails if view extraction fails (blocks commit)
- Optional - Errors logged but ingest continues (view output may be missing)
Configure this when attaching the view to your endpoint.
Performance Considerations
Memory Usage
The materializer loads entire segments into memory:
memory_needed ≈ (segment_size_uncompressed × 2-3x) + query_overhead
Example:
- 100 MB uncompressed segment
- 2x expansion during query → 200 MB
- Intermediate operations → +100 MB
- Total: ~300-400 MB
For large segments or complex views, ensure adequate memory allocation.
Query Complexity
- Keep views simple (projection + filter)
- Avoid expensive aggregations across large segments
- Use required fields for filtering instead of complex WHERE clauses
- Views cannot query across segments (each segment is independent)
Row Group Size
Control Parquet row group size for optimal query performance:
```yaml
output:
  format: parquet
  row_group_size: 100000   # Default: 100,000 rows
```
Tuning:
- Smaller (10k-50k) - Better for selective queries, more metadata overhead
- Larger (100k-500k) - Better for full scans, less metadata overhead
Partitioning Strategy
Good partitions:
- Date/hour partitions: dt=2025-01-20
- Low-cardinality dimensions: region=us-east-1, status=active
- Typically creates 10-100 folders per dimension

Bad partitions:
- High-cardinality fields: user_id, session_id, order_id
- Can create millions of small files
- Slow S3 writes + list operations
- High S3 request costs
Operational Model
View Lifecycle
- Draft - Create and edit view definition
- Publish - Validate and compile view
- Attach - Bind view to endpoint(s)
- Materialize - Execute per-segment as data arrives
- Query - Consume typed Parquet from S3
Versioning
Views are versioned independently:
```
view-name@v1 → Initial version
view-name@v2 → Updated schema
view-name@v3 → New columns added
```
Multiple versions can coexist. Update endpoint bindings to migrate to new versions.
Canary Deployments
Test views on new data before full rollout:
```yaml
canary_start_ts: "2025-01-20T00:00:00Z"
```
The view only applies to segments sealed after this timestamp. Use this to:
- Test new view versions on recent data
- Gradually roll out schema changes
- Validate output before full migration
Monitoring
Track view execution metrics:
- Rows processed - Total frames in segment
- Rows written - Rows passing filters and required checks
- Execution time - Materialization duration per segment
- Errors - View execution failures (if required=true, blocks commit)
View metrics are available in the EdgeMQ Console under Endpoints → Configuration → Materialized Views.
Limits and Quotas
View capabilities vary by plan:
| Plan | Max Views per Endpoint | Max Columns per View | Max Partition Dimensions |
|---|---|---|---|
| Pro | 3 | 50 | 3 |
| Enterprise | 10+ | 100+ | 5+ |
Other limits:
- Function call nesting: 10 levels deep
- WHERE clause complexity: Reasonable (no correlated subqueries)
- Memory per view: Configured at deployment
- Execution timeout: 5 minutes (configurable)
Examples
Example 1: E-commerce Purchases
Transform purchase events into analytics-ready table:
```yaml
apiVersion: edgemq.io/v1alpha1
kind: ViewDefinition
metadata:
  name: purchases
spec:
  columns:
    - name: order_id
      expr:
        type: json_path
        json_path: "$.orderId"
      type: VARCHAR
      required: true
    - name: user_id
      expr:
        type: json_path
        json_path: "$.userId"
      type: VARCHAR
      required: true
    - name: amount
      expr:
        type: json_path
        json_path: "$.amount"
      type: DOUBLE
      required: true
    - name: currency
      expr:
        type: json_path
        json_path: "$.currency"
      type: VARCHAR
      default:
        type: lit
        lit: "USD"
    - name: purchased_at
      expr:
        type: fn
        fn: to_timestamp_ms
        args:
          - type: json_path
            json_path: "$.timestamp"
      type: TIMESTAMP
      required: true
  transform:
    where:
      type: fn
      fn: gt
      args:
        - type: json_path
          json_path: "$.amount"
        - type: lit
          lit: 0
  output:
    partition_by:
      - name: dt
        expr:
          type: fn
          fn: date_trunc
          args:
            - type: lit
              lit: "day"
            - type: col
              col: purchased_at
    format: parquet
    compression: zstd
    row_group_size: 100000
```
Query example:
```sql
SELECT
  DATE_TRUNC('day', purchased_at) AS day,
  currency,
  COUNT(*) AS orders,
  SUM(amount) AS revenue
FROM read_parquet('s3://bucket/prefix/views/purchases/dt=*/seg=*/part-*.parquet')
WHERE dt >= '2025-01-01'
GROUP BY day, currency
ORDER BY day DESC, revenue DESC;
```
Example 2: IoT Sensor Readings
Process time-series sensor data:
```yaml
apiVersion: edgemq.io/v1alpha1
kind: ViewDefinition
metadata:
  name: sensor-readings
spec:
  columns:
    - name: device_id
      expr:
        type: json_path
        json_path: "$.deviceId"
      type: VARCHAR
      required: true
    - name: sensor_type
      expr:
        type: json_path
        json_path: "$.sensorType"
      type: VARCHAR
      required: true
    - name: value
      expr:
        type: json_path
        json_path: "$.value"
      type: DOUBLE
      required: true
    - name: unit
      expr:
        type: json_path
        json_path: "$.unit"
      type: VARCHAR
    - name: reading_time
      expr:
        type: fn
        fn: parse_rfc3339
        args:
          - type: json_path
            json_path: "$.timestamp"
      type: TIMESTAMP
      required: true
    - name: location
      expr:
        type: json_path
        json_path: "$.location"
      type: VARCHAR
  output:
    partition_by:
      - name: dt
        expr:
          type: fn
          fn: cast
          args:
            - type: col
              col: reading_time
            - type: lit
              lit: "DATE"
      - name: sensor_type
        expr:
          type: col
          col: sensor_type
    format: parquet
    compression: zstd
```
Query example:
```sql
SELECT
  DATE_TRUNC('hour', reading_time) AS hour,
  device_id,
  AVG(value) AS avg_value,
  MIN(value) AS min_value,
  MAX(value) AS max_value,
  COUNT(*) AS reading_count
FROM read_parquet('s3://bucket/prefix/views/sensor-readings/dt=2025-01-20/sensor_type=temperature/seg=*/part-*.parquet')
GROUP BY hour, device_id
ORDER BY hour DESC, device_id;
```
Example 3: Application Logs
Extract structured log events:
```yaml
apiVersion: edgemq.io/v1alpha1
kind: ViewDefinition
metadata:
  name: app-logs
spec:
  columns:
    - name: level
      expr:
        type: fn
        fn: upper
        args:
          - type: json_path
            json_path: "$.level"
      type: VARCHAR
      required: true
    - name: message
      expr:
        type: json_path
        json_path: "$.message"
      type: VARCHAR
      required: true
    - name: service
      expr:
        type: json_path
        json_path: "$.service"
      type: VARCHAR
      default:
        type: lit
        lit: "unknown"
    - name: request_id
      expr:
        type: json_path
        json_path: "$.requestId"
      type: VARCHAR
    - name: user_id
      expr:
        type: json_path
        json_path: "$.userId"
      type: VARCHAR
    - name: logged_at
      expr:
        type: fn
        fn: to_timestamp_ms
        args:
          - type: meta
            meta: timestamp_ms
      type: TIMESTAMP
  transform:
    where:
      type: fn
      fn: or
      args:
        - type: fn
          fn: eq
          args:
            - type: col
              col: level
            - type: lit
              lit: "ERROR"
        - type: fn
          fn: eq
          args:
            - type: col
              col: level
            - type: lit
              lit: "WARN"
  output:
    partition_by:
      - name: dt
        expr:
          type: fn
          fn: cast
          args:
            - type: col
              col: logged_at
            - type: lit
              lit: "DATE"
      - name: level
        expr:
          type: col
          col: level
    format: parquet
    compression: zstd
```
Query example:
```sql
-- Find all errors for a specific user
SELECT logged_at, level, message, service, request_id
FROM read_parquet('s3://bucket/prefix/views/app-logs/dt=*/level=ERROR/seg=*/part-*.parquet')
WHERE user_id = 'user_12345'
ORDER BY logged_at DESC
LIMIT 100;
```
FAQ
Can I change a view's schema after deployment?
Yes, create a new version (e.g., view-name@v2) and update endpoint bindings. Old and new versions can coexist.
Do views query across segments?
No. Each segment is materialized independently. Views cannot reference data from previous segments or perform cross-segment aggregations.
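
If you need cross-segment aggregates, compute them downstream at query time by globbing across partitions and segments. A sketch in the same DuckDB-style syntax used elsewhere on this page (bucket and prefix are placeholders):

```sql
-- Aggregate across all segments and days by globbing the partitioned output.
SELECT dt, COUNT(*) AS events
FROM read_parquet('s3://bucket/prefix/views/user-events/dt=*/seg=*/part-*.parquet')
GROUP BY dt
ORDER BY dt;
```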
What happens if JSON extraction fails?
TRY_CAST returns NULL for invalid data. Use required: true to drop rows with NULL values, or default to substitute a fallback value.
Can I use SQL directly?
No. View definitions compile to SQL internally, but users cannot submit arbitrary SQL for security reasons. Use the declarative YAML format.
How do I test a view before production?
- Create the view as a draft
- Attach with canary_start_ts to apply only to new data
- Monitor output in S3 and verify schema
- Remove canary restriction once validated
What's the difference between required views and optional views?
- Required - View execution failure blocks the commit (data quality enforcement)
- Optional - View execution failure is logged but ingest continues (best-effort)
Configure this when attaching the view to your endpoint.
How many views can I attach per endpoint?
- Pro: 3 views per endpoint
- Enterprise: 10+ views per endpoint
Can I output to formats other than Parquet?
Yes, CSV is also supported. Set output.format: csv in your view definition. CSV supports partitioning but not advanced compression options.
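
A minimal sketch of a CSV output block (the partitioning expression is illustrative and works the same way as in the Parquet examples above):

```yaml
output:
  format: csv              # csv instead of parquet
  partition_by:            # partitioning is supported for CSV as well
    - name: dt
      expr:
        type: fn
        fn: date_trunc
        args:
          - type: lit
            lit: "day"
          - type: col
            col: event_time
```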
How do I handle high-cardinality partitions?
Avoid partitioning by high-cardinality fields (user_id, order_id, session_id). Instead:
- Partition by date/hour + low-cardinality dimension (region, category)
- Use query predicates to filter by high-cardinality fields
- Consider composite keys in your view (not partitioning)
What query engines can read the output?
Any engine supporting Parquet over S3:
- Snowflake (external tables or COPY INTO)
- Databricks (Auto Loader or direct read)
- AWS Athena (Glue Catalog + partitions)
- Trino/Presto
- Apache Spark
- ClickHouse (S3 table function)
- BigQuery (external tables)
- Any Parquet-compatible analytics tool
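
As one example, a hedged sketch of exposing the user-events output (S3 layout shown earlier) as an AWS Athena external table. The database, table, and bucket names are placeholders, and the partition columns are declared as strings to match the dt=.../seg=... directory layout:

```sql
-- Placeholder names; column types mirror the user-events view defined above.
CREATE EXTERNAL TABLE analytics.user_events (
  user_id    string,
  event_type string,
  event_time timestamp
)
PARTITIONED BY (dt string, seg string)   -- Hive-style dt=.../seg=... folders
STORED AS PARQUET
LOCATION 's3://bucket/prefix/views/user-events/';

-- Register the existing partitions
MSCK REPAIR TABLE analytics.user_events;
```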
Next Steps
- Start your free 14-day trial at console.edgemq.io
- Review Output Formats for format comparison
- See Quickstart Guide for end-to-end setup
- Join the EdgeMQ community for support
Questions? Contact support@edgemq.io or open an issue on GitHub.