Materialized Views

Materialized views enable schema-aware data transformation directly at ingest time, producing typed Parquet outputs optimized for analytics and warehouse consumption.

Availability

Materialized views are available on Pro and Enterprise plans. A 14-day free trial is available for all new accounts.

What are Materialized Views?

Materialized views transform raw JSON events into strongly-typed columnar data (Parquet or CSV) as data is ingested. Instead of writing opaque payloads that require downstream parsing, EdgeMQ can extract, cast, filter, and partition your data automatically.

Key benefits:

  • Eliminate ETL jobs - Schema extraction happens at ingest time
  • Type safety - Automatic casting with error handling
  • Partitioning - Organize data by date, region, or other dimensions
  • Query-ready - Direct consumption by Snowflake, Databricks, Trino, Athena, etc.
  • Declarative - Define views as YAML, not SQL (no user SQL execution)

How It Works

JSON Events → EdgeMQ Ingest → WAL Segment → View Materializer → Typed Parquet → S3
                                    ↓
                            View Definition (YAML)
                            - Column extraction
                            - Type casting
                            - Filtering
                            - Partitioning

Architecture

  1. View Definition - You define a declarative YAML mapping (columns, types, filters, partitions)
  2. Validation - EdgeMQ validates and compiles the definition to safe SQL
  3. Per-segment execution - For each sealed WAL segment, the materializer:
       • Loads frames into a temporary frames table
       • Executes the compiled SELECT query
       • Writes typed Parquet/CSV to S3
  4. Commit markers - Output artifacts are listed in commit manifests for safe consumption

Security Model

  • No user SQL - View definitions compile to safe, validated SQL
  • Sandboxed execution - Each view runs in an isolated execution context
  • Resource limits - Memory and timeout constraints prevent runaway queries
  • Function allowlist - Only approved functions are permitted

Output Formats

Materialized views are powered by the Parquet (Schema-Aware) output format. To use materialized views, you must enable this format on your endpoint.

See Output Formats for comparison with Segments and Raw Parquet formats.


Quick Start

1. Enable Schema-Aware Parquet

In the EdgeMQ Console, navigate to your endpoint's Configuration tab and enable Parquet (Schema-Aware):

Output Formats
☑ Segments (optional)
☑ Parquet (Raw) (optional)
☑ Parquet (Schema-Aware) (required for views)

2. Create a View Definition

Create a YAML file defining your view:

apiVersion: edgemq.io/v1alpha1
kind: ViewDefinition
metadata:
  name: user-events
spec:
  columns:
    - name: user_id
      expr:
        type: json_path
        json_path: "$.userId"
      type: VARCHAR
      required: true

    - name: event_type
      expr:
        type: json_path
        json_path: "$.eventType"
      type: VARCHAR

    - name: event_time
      expr:
        type: fn
        fn: to_timestamp_ms
        args:
          - type: json_path
            json_path: "$.timestamp"
      type: TIMESTAMP

  output:
    partition_by:
      - name: dt
        expr:
          type: fn
          fn: date_trunc
          args:
            - type: lit
              lit: "day"
            - type: col
              col: event_time
    format: parquet
    compression: zstd

3. Attach View to Endpoint

In the Console, go to Configuration → Materialized Views and attach your view. You can specify:

  • Required - Ingest fails if view extraction fails (data quality enforcement)
  • Optional - Errors are logged but ingest continues (best-effort)
  • Canary mode - Apply view only to segments sealed after a specific timestamp (gradual rollout)

4. Query Your Data

Once segments are sealed, query the typed Parquet directly:

-- Example query
SELECT
  user_id,
  event_type,
  event_time
FROM read_parquet('s3://bucket/prefix/views/user-events/dt=2025-01-20/seg=*/part-*.parquet')
WHERE event_type = 'purchase'
ORDER BY event_time DESC
LIMIT 100;

Core Concepts

The frames Table

Your view definition queries a virtual frames table created by the materializer for each segment:

CREATE TEMP TABLE frames(
    ingest_ts_ms BIGINT,    -- Unix epoch milliseconds (ingestion time)
    payload VARCHAR         -- JSON string (your ingested event)
);

Each row represents one ingested event. Views extract and transform data from these frames.
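
As an illustration of how this fits together, the per-segment query compiled from the quick-start view would look roughly like the sketch below. This is an assumption built from the generated-SQL fragments shown under Error Handling; the exact SQL EdgeMQ emits, and how DSL functions such as to_timestamp_ms map to SQL, may differ.

-- Sketch only: approximate compiled SELECT for the "user-events" view
SELECT
  TRY_CAST(json_extract_string(payload, '$.userId') AS VARCHAR)    AS user_id,
  TRY_CAST(json_extract_string(payload, '$.eventType') AS VARCHAR) AS event_type,
  to_timestamp_ms(TRY_CAST(json_extract_string(payload, '$.timestamp') AS BIGINT)) AS event_time
FROM frames
WHERE user_id IS NOT NULL;   -- required: true on user_id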

Expression Types

View definitions use expressions to define column values:

Expression   Purpose                      Example
json_path    Extract field from payload   $.userId, $.data.amount
meta         Access metadata fields       timestamp_ms (ingest time)
col          Reference output column      Used in partitioning, computed columns
fn           Function call                to_timestamp_ms, upper, coalesce
lit          Literal value                Strings, numbers, booleans, null

Type System

All output columns must declare a supported type:

Category   Types                                     Notes
String     VARCHAR, TEXT, STRING                     Aliases for same type
Integer    BIGINT, INTEGER, INT, SMALLINT, TINYINT   Use BIGINT for IDs
Float      DOUBLE, REAL, FLOAT                       Use DOUBLE for precision
Boolean    BOOLEAN, BOOL                             Aliases for same type
Temporal   TIMESTAMP, TIMESTAMPTZ, DATE              For time-series data
JSON       JSON, JSONB                               Preserve nested structures

All type casts use TRY_CAST for safety - invalid casts return NULL instead of failing.
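
For example, a nested object can be carried through untouched by declaring a JSON column (the $.metadata field name below is illustrative):

columns:
  - name: metadata
    expr:
      type: json_path
      json_path: "$.metadata"   # illustrative field name
    type: JSON                  # preserves the nested structure instead of flattening it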

Required Fields and Defaults

Control data quality with required and default:

- name: status
  expr:
    type: json_path
    json_path: "$.status"
  type: VARCHAR
  required: true        # Drop rows if NULL after casting + default
  default:              # Apply default if extraction returns NULL
    type: lit
    lit: "pending"

Semantics:

  1. Extract field from JSON
  2. Cast to target type (using TRY_CAST)
  3. Apply default if still NULL
  4. If required=true, drop row if still NULL
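
As a sketch of the equivalent SQL logic (the compiled query may be shaped differently), the status column above behaves roughly like:

-- Sketch only: extract → TRY_CAST → default → required check
SELECT
  COALESCE(
    TRY_CAST(json_extract_string(payload, '$.status') AS VARCHAR),  -- steps 1-2
    'pending'                                                       -- step 3: default
  ) AS status
FROM frames
WHERE status IS NOT NULL;  -- step 4: with a non-NULL default this never drops rows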

Filtering

Filter rows with a where clause:

transform:
  where:
    type: fn
    fn: eq
    args:
      - type: json_path
        json_path: "$.eventType"
      - type: lit
        lit: "purchase"

Filtering happens before required field checks. Only rows matching the filter are considered for output.
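
The filter above is roughly equivalent to the following predicate over the frame payload (an illustrative sketch, not the exact generated SQL):

WHERE json_extract_string(payload, '$.eventType') = 'purchase'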

Partitioning

Organize output files by partition keys for efficient querying:

output:
  partition_by:
    - name: dt
      expr:
        type: fn
        fn: date_trunc
        args:
          - type: lit
            lit: "day"
          - type: col
            col: event_time
    - name: region
      expr:
        type: col
        col: region

Results in S3 layout:

s3://bucket/prefix/views/user-events/
  dt=2025-01-20/region=us-east-1/seg=00001/part-0.parquet
  dt=2025-01-20/region=us-west-2/seg=00001/part-0.parquet
  dt=2025-01-21/region=us-east-1/seg=00002/part-0.parquet

Best practices:

  • Use low-cardinality dimensions (date, region, status)
  • Avoid high-cardinality fields (user_id, session_id, order_id)
  • Limit to 2-3 partition columns max
  • High cardinality creates millions of small files → slow writes + S3 overhead
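
Because partition keys become path components, readers can prune by path. For example, this query (DuckDB-style, matching the other examples on this page) scans only one day and one region of the layout above:

-- Reads only the dt=2025-01-20 / region=us-east-1 files
SELECT COUNT(*) AS events
FROM read_parquet('s3://bucket/prefix/views/user-events/dt=2025-01-20/region=us-east-1/seg=*/part-*.parquet');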

Supported Functions

Comparison Functions

Use for filtering and conditions:

Function   Arguments   Description                  Example
eq         2           Equality (=)                 eq(col, lit)
ne         2           Not equal (<>)               ne(status, "deleted")
lt         2           Less than (<)                lt(amount, 100)
lte        2           Less than or equal (<=)      lte(age, 65)
gt         2           Greater than (>)             gt(price, 0)
gte        2           Greater than or equal (>=)   gte(score, 80)

Boolean Functions

Combine conditions:

Function   Arguments   Description
and        2+          Logical AND
or         2+          Logical OR
not        1           Logical NOT

String Functions

Transform text data:

Function         Arguments   Description
concat           1+          Concatenate strings
upper            1           Convert to uppercase
lower            1           Convert to lowercase
trim             1           Trim whitespace from both ends
ltrim            1           Trim whitespace from left
rtrim            1           Trim whitespace from right
substring        2-3         Extract substring
regexp_extract   2-3         Extract using regex
regexp_replace   3           Replace using regex
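
For example, regexp_extract can derive a column from part of a string field. The pattern below is illustrative, and it assumes the optional third argument selects a capture group:

- name: email_domain
  expr:
    type: fn
    fn: regexp_extract
    args:
      - type: json_path
        json_path: "$.email"
      - type: lit
        lit: "@(.+)$"       # capture everything after the @
      - type: lit
        lit: 1              # assumed: capture group index
  type: VARCHAR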

Numeric Functions

Mathematical operations:

Function   Arguments   Description
abs        1           Absolute value
round      1-2         Round to integer or decimal places
floor      1           Round down to nearest integer
ceil       1           Round up to nearest integer
trunc      1           Truncate to integer
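
For instance, round with a second argument can normalize a monetary amount to two decimal places (field and column names are illustrative):

- name: amount_rounded
  expr:
    type: fn
    fn: round
    args:
      - type: json_path
        json_path: "$.amount"
      - type: lit
        lit: 2              # decimal places
  type: DOUBLE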

Timestamp Functions

Handle time-series data:

Function          Arguments   Description
to_timestamp      1           Convert Unix epoch seconds to TIMESTAMP
to_timestamp_ms   1           Convert Unix epoch milliseconds to TIMESTAMP
parse_rfc3339     1           Parse ISO 8601/RFC 3339 timestamp string
parse_timestamp   2           Parse timestamp with custom format
date_trunc        2           Truncate to precision (day, hour, etc.)
extract           2           Extract field (year, month, day, etc.)
date              1           Convert to DATE
epoch_ms          1           Convert timestamp to Unix epoch milliseconds

JSON Functions

Navigate JSON structures:

Function              Arguments   Description
json_extract          2           Extract JSON value at path
json_extract_string   2           Extract JSON value at path as string

Type Casting Functions

Explicit type conversion:

Function   Arguments   Description
cast       2           Explicit type cast
try_cast   2           Type cast returning NULL on failure

Utility Functions

Handle NULL values:

Function   Arguments   Description
coalesce   1+          Return first non-NULL value
nullif     2           Return NULL if arguments are equal

Common Patterns

Pattern 1: Basic JSON Extraction

Extract simple fields from JSON:

columns:
  - name: user_id
    expr:
      type: json_path
      json_path: "$.userId"
    type: VARCHAR

  - name: amount
    expr:
      type: json_path
      json_path: "$.amount"
    type: DOUBLE

Pattern 2: Nested JSON Navigation

Access nested structures:

columns:
  - name: city
    expr:
      type: json_path
      json_path: "$.address.city"
    type: VARCHAR

  - name: country_code
    expr:
      type: json_path
      json_path: "$.address.country.code"
    type: VARCHAR

Pattern 3: Timestamp Conversion

Convert various timestamp formats:

columns:
  # From ISO 8601 string
  - name: created_at
    expr:
      type: fn
      fn: parse_rfc3339
      args:
        - type: json_path
          json_path: "$.createdAt"
    type: TIMESTAMP

  # From Unix epoch milliseconds
  - name: updated_at
    expr:
      type: fn
      fn: to_timestamp_ms
      args:
        - type: json_path
          json_path: "$.updatedAtMs"
    type: TIMESTAMP

  # From metadata (ingest time)
  - name: ingested_at
    expr:
      type: fn
      fn: to_timestamp_ms
      args:
        - type: meta
          meta: timestamp_ms
    type: TIMESTAMP

Pattern 4: Conditional Logic

Use defaults and coalesce:

columns:
  - name: status
    expr:
      type: fn
      fn: coalesce
      args:
        - type: json_path
          json_path: "$.status"
        - type: lit
          lit: "pending"
    type: VARCHAR

  - name: priority
    expr:
      type: json_path
      json_path: "$.priority"
    type: VARCHAR
    default:
      type: lit
      lit: "normal"

Pattern 5: String Transformations

Normalize text data:

columns:
  - name: email_normalized
    expr:
      type: fn
      fn: lower
      args:
        - type: fn
          fn: trim
          args:
            - type: json_path
              json_path: "$.email"
    type: VARCHAR

  - name: category_upper
    expr:
      type: fn
      fn: upper
      args:
        - type: json_path
          json_path: "$.category"
    type: VARCHAR

Pattern 6: Filtering by Event Type

Process only specific events:

transform:
  where:
    type: fn
    fn: and
    args:
      - type: fn
        fn: eq
        args:
          - type: json_path
            json_path: "$.eventType"
          - type: lit
            lit: "purchase"
      - type: fn
        fn: gt
        args:
          - type: json_path
            json_path: "$.amount"
          - type: lit
            lit: 0

Pattern 7: Date Partitioning

Partition by day for time-series queries:

output:
  partition_by:
    - name: dt
      expr:
        type: fn
        fn: date_trunc
        args:
          - type: lit
            lit: "day"
          - type: col
            col: event_time
  format: parquet
  compression: zstd

Pattern 8: Multi-Dimension Partitioning

Partition by date and region:

output:
  partition_by:
    - name: dt
      expr:
        type: fn
        fn: cast
        args:
          - type: col
            col: event_time
          - type: lit
            lit: "DATE"
    - name: region
      expr:
        type: col
        col: region
  format: parquet

Error Handling

Type Casting Errors

All type casts use TRY_CAST automatically:

- name: amount
  expr:
    type: json_path
    json_path: "$.amount"
  type: DOUBLE

Generated SQL:

TRY_CAST(json_extract_string(payload, '$.amount') AS DOUBLE) AS amount

Behavior:

  • Valid number: "123.45" → 123.45
  • Invalid number: "invalid" → NULL
  • Missing field: undefined → NULL
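
You can reproduce this behavior in any engine that supports TRY_CAST (DuckDB-style syntax shown):

SELECT TRY_CAST('123.45' AS DOUBLE);   -- 123.45
SELECT TRY_CAST('invalid' AS DOUBLE);  -- NULL
SELECT TRY_CAST(NULL AS DOUBLE);       -- NULL (missing field)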

Required Field Enforcement

Use required: true to drop rows with NULL values:

- name: user_id
  expr:
    type: json_path
    json_path: "$.userId"
  type: VARCHAR
  required: true

Generated WHERE clause:

WHERE user_id IS NOT NULL

Rows with NULL user_id (after casting and defaults) are silently dropped.

View Execution Failures

Views can be configured as:

  • Required - Ingest fails if view extraction fails (blocks commit)
  • Optional - Errors logged but ingest continues (view output may be missing)

Configure this when attaching the view to your endpoint.


Performance Considerations

Memory Usage

The materializer loads entire segments into memory:

memory_needed ≈ (segment_size_uncompressed × 2-3) + query_overhead

Example:

  • 100 MB uncompressed segment
  • 2x expansion during query → 200 MB
  • Intermediate operations → +100 MB
  • Total: ~300-400 MB

For large segments or complex views, ensure adequate memory allocation.

Query Complexity

  • Keep views simple (projection + filter)
  • Avoid expensive aggregations across large segments
  • Use required fields for filtering instead of complex WHERE clauses
  • Views cannot query across segments (each segment is independent)

Row Group Size

Control Parquet row group size for optimal query performance:

output:
  format: parquet
  row_group_size: 100000  # Default: 100,000 rows

Tuning:

  • Smaller (10k-50k) - Better for selective queries, more metadata overhead
  • Larger (100k-500k) - Better for full scans, less metadata overhead

Partitioning Strategy

Good partitions:

  • Date/hour partitions: dt=2025-01-20
  • Low-cardinality dimensions: region=us-east-1, status=active
  • Typically creates 10-100 folders per dimension

Bad partitions:

  • High-cardinality fields: user_id, session_id, order_id
  • Can create millions of small files
  • Slow S3 writes + list operations
  • High S3 request costs

Operational Model

View Lifecycle

  1. Draft - Create and edit view definition
  2. Publish - Validate and compile view
  3. Attach - Bind view to endpoint(s)
  4. Materialize - Execute per-segment as data arrives
  5. Query - Consume typed Parquet from S3

Versioning

Views are versioned independently:

view-name@v1  → Initial version
view-name@v2  → Updated schema
view-name@v3  → New columns added

Multiple versions can coexist. Update endpoint bindings to migrate to new versions.

Canary Deployments

Test views on new data before full rollout:

canary_start_ts: "2025-01-20T00:00:00Z"

The view only applies to segments sealed after this timestamp. Use this to:

  • Test new view versions on recent data
  • Gradually roll out schema changes
  • Validate output before full migration

Monitoring

Track view execution metrics:

  • Rows processed - Total frames in segment
  • Rows written - Rows passing filters and required checks
  • Execution time - Materialization duration per segment
  • Errors - View execution failures (if required=true, blocks commit)

View metrics are available in the EdgeMQ Console under Endpoints → Configuration → Materialized Views.


Limits and Quotas

View capabilities vary by plan:

Plan         Max Views per Endpoint   Max Columns per View   Max Partition Dimensions
Pro          3                        50                     3
Enterprise   10+                      100+                   5+

Other limits:

  • Function call nesting: 10 levels deep
  • WHERE clause complexity: Reasonable (no correlated subqueries)
  • Memory per view: Configured at deployment
  • Execution timeout: 5 minutes (configurable)

Examples

Example 1: E-commerce Purchases

Transform purchase events into analytics-ready table:

apiVersion: edgemq.io/v1alpha1
kind: ViewDefinition
metadata:
  name: purchases
spec:
  columns:
    - name: order_id
      expr:
        type: json_path
        json_path: "$.orderId"
      type: VARCHAR
      required: true

    - name: user_id
      expr:
        type: json_path
        json_path: "$.userId"
      type: VARCHAR
      required: true

    - name: amount
      expr:
        type: json_path
        json_path: "$.amount"
      type: DOUBLE
      required: true

    - name: currency
      expr:
        type: json_path
        json_path: "$.currency"
      type: VARCHAR
      default:
        type: lit
        lit: "USD"

    - name: purchased_at
      expr:
        type: fn
        fn: to_timestamp_ms
        args:
          - type: json_path
            json_path: "$.timestamp"
      type: TIMESTAMP
      required: true

  transform:
    where:
      type: fn
      fn: gt
      args:
        - type: json_path
          json_path: "$.amount"
        - type: lit
          lit: 0

  output:
    partition_by:
      - name: dt
        expr:
          type: fn
          fn: date_trunc
          args:
            - type: lit
              lit: "day"
            - type: col
              col: purchased_at
    format: parquet
    compression: zstd
    row_group_size: 100000

Query example:

SELECT
  DATE_TRUNC('day', purchased_at) as day,
  currency,
  COUNT(*) as orders,
  SUM(amount) as revenue
FROM read_parquet('s3://bucket/prefix/views/purchases/dt=*/seg=*/part-*.parquet')
WHERE dt >= '2025-01-01'
GROUP BY day, currency
ORDER BY day DESC, revenue DESC;

Example 2: IoT Sensor Readings

Process time-series sensor data:

apiVersion: edgemq.io/v1alpha1
kind: ViewDefinition
metadata:
  name: sensor-readings
spec:
  columns:
    - name: device_id
      expr:
        type: json_path
        json_path: "$.deviceId"
      type: VARCHAR
      required: true

    - name: sensor_type
      expr:
        type: json_path
        json_path: "$.sensorType"
      type: VARCHAR
      required: true

    - name: value
      expr:
        type: json_path
        json_path: "$.value"
      type: DOUBLE
      required: true

    - name: unit
      expr:
        type: json_path
        json_path: "$.unit"
      type: VARCHAR

    - name: reading_time
      expr:
        type: fn
        fn: parse_rfc3339
        args:
          - type: json_path
            json_path: "$.timestamp"
      type: TIMESTAMP
      required: true

    - name: location
      expr:
        type: json_path
        json_path: "$.location"
      type: VARCHAR

  output:
    partition_by:
      - name: dt
        expr:
          type: fn
          fn: cast
          args:
            - type: col
              col: reading_time
            - type: lit
              lit: "DATE"
      - name: sensor_type
        expr:
          type: col
          col: sensor_type
    format: parquet
    compression: zstd

Query example:

SELECT
  DATE_TRUNC('hour', reading_time) as hour,
  device_id,
  AVG(value) as avg_value,
  MIN(value) as min_value,
  MAX(value) as max_value,
  COUNT(*) as reading_count
FROM read_parquet('s3://bucket/prefix/views/sensor-readings/dt=2025-01-20/sensor_type=temperature/seg=*/part-*.parquet')
GROUP BY hour, device_id
ORDER BY hour DESC, device_id;

Example 3: Application Logs

Extract structured log events:

apiVersion: edgemq.io/v1alpha1
kind: ViewDefinition
metadata:
  name: app-logs
spec:
  columns:
    - name: level
      expr:
        type: fn
        fn: upper
        args:
          - type: json_path
            json_path: "$.level"
      type: VARCHAR
      required: true

    - name: message
      expr:
        type: json_path
        json_path: "$.message"
      type: VARCHAR
      required: true

    - name: service
      expr:
        type: json_path
        json_path: "$.service"
      type: VARCHAR
      default:
        type: lit
        lit: "unknown"

    - name: request_id
      expr:
        type: json_path
        json_path: "$.requestId"
      type: VARCHAR

    - name: user_id
      expr:
        type: json_path
        json_path: "$.userId"
      type: VARCHAR

    - name: logged_at
      expr:
        type: fn
        fn: to_timestamp_ms
        args:
          - type: meta
            meta: timestamp_ms
      type: TIMESTAMP

  transform:
    where:
      type: fn
      fn: or
      args:
        - type: fn
          fn: eq
          args:
            - type: col
              col: level
            - type: lit
              lit: "ERROR"
        - type: fn
          fn: eq
          args:
            - type: col
              col: level
            - type: lit
              lit: "WARN"

  output:
    partition_by:
      - name: dt
        expr:
          type: fn
          fn: cast
          args:
            - type: col
              col: logged_at
            - type: lit
              lit: "DATE"
      - name: level
        expr:
          type: col
          col: level
    format: parquet
    compression: zstd

Query example:

-- Find all errors for a specific user
SELECT
  logged_at,
  level,
  message,
  service,
  request_id
FROM read_parquet('s3://bucket/prefix/views/app-logs/dt=*/level=ERROR/seg=*/part-*.parquet')
WHERE user_id = 'user_12345'
ORDER BY logged_at DESC
LIMIT 100;

FAQ

Can I change a view's schema after deployment?

Yes, create a new version (e.g., view-name@v2) and update endpoint bindings. Old and new versions can coexist.

Do views query across segments?

No. Each segment is materialized independently. Views cannot reference data from previous segments or perform cross-segment aggregations.

What happens if JSON extraction fails?

TRY_CAST returns NULL for invalid data. Use required: true to drop rows with NULL values, or default to substitute a fallback value.

Can I use SQL directly?

No. View definitions compile to SQL internally, but users cannot submit arbitrary SQL for security reasons. Use the declarative YAML format.

How do I test a view before production?

  1. Create the view as a draft
  2. Attach with canary_start_ts to apply only to new data
  3. Monitor output in S3 and verify schema
  4. Remove canary restriction once validated

What's the difference between required views and optional views?

  • Required - View execution failure blocks the commit (data quality enforcement)
  • Optional - View execution failure is logged but ingest continues (best-effort)

Configure this when attaching the view to your endpoint.

How many views can I attach per endpoint?

  • Pro: 3 views per endpoint
  • Enterprise: 10+ views per endpoint

Can I output to formats other than Parquet?

Yes, CSV is also supported. Set output.format: csv in your view definition. CSV supports partitioning but not advanced compression options.
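
For example, a CSV output block that mirrors the date partition from Pattern 8 might look like this (a sketch; only output.format differs from the Parquet examples):

output:
  format: csv
  partition_by:
    - name: dt
      expr:
        type: fn
        fn: cast
        args:
          - type: col
            col: event_time
          - type: lit
            lit: "DATE"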

How do I handle high-cardinality partitions?

Avoid partitioning by high-cardinality fields (user_id, order_id, session_id). Instead:

  • Partition by date/hour + low-cardinality dimension (region, category)
  • Use query predicates to filter by high-cardinality fields
  • Consider composite keys in your view (not partitioning)
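
For example, keep a date partition and push the high-cardinality filter into the query predicate instead:

-- The dt= partition prunes files; user_id is filtered as a regular predicate
SELECT *
FROM read_parquet('s3://bucket/prefix/views/user-events/dt=2025-01-20/seg=*/part-*.parquet')
WHERE user_id = 'user_12345';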

What query engines can read the output?

Any engine supporting Parquet over S3:

  • Snowflake (external tables or COPY INTO)
  • Databricks (Auto Loader or direct read)
  • AWS Athena (Glue Catalog + partitions)
  • Trino/Presto
  • Apache Spark
  • ClickHouse (S3 table function)
  • BigQuery (external tables)
  • Any Parquet-compatible analytics tool

Next Steps


Questions? Contact support@edgemq.io or open an issue on GitHub.