Tutorial: Expanding EdgeMQ Segment Files in S3 to NDJSON (Event-Driven, AWS)

EdgeMQ writes compressed segment files to S3. This tutorial walks through automatically expanding those files into newline-delimited JSON (NDJSON) and writing them back to S3 under the expanded/ prefix.

What You Will Build

  • S3 (input): receives EdgeMQ segment objects (e.g., .../segments/seg-00000009.wal.zst)
  • S3 Event → Lambda: triggers on ObjectCreated for those paths
  • Lambda (Node.js): streams S3 object → zstd decompress → parses frames → emits NDJSON
  • S3 (output): writes .../expanded/seg-00000009.ndjson

Segment format recap (what you’ll parse): each .wal.zst is a zstd stream of back‑to‑back frames:

[LEN u32 BE][CRC32C u32][FMT u8][TENANT u32 BE][TS u64 BE(ms)][PAYLOAD]

LEN = 1 (FMT) + 4 (TENANT) + 8 (TS) + payload_len
HDR = 21 bytes
CRC32C covers PAYLOAD only

Notes:

  • All integers are big‑endian.
  • Frames repeat until EOF.
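
The layout above can be sketched with plain Buffer reads and writes. This is a minimal illustration that assumes the field order and sizes listed in this section. The helper names encodeFrame and decodeFrame are hypothetical (they are not part of EdgeMQ), and the CRC value is passed in rather than computed.

```javascript
// Sketch of the frame layout described above (helper names are illustrative).
// [LEN u32 BE][CRC32C u32 BE][FMT u8][TENANT u32 BE][TS u64 BE (ms)][PAYLOAD]
const HDR = 21;      // 4 (LEN) + 4 (CRC32C) + 1 (FMT) + 4 (TENANT) + 8 (TS)
const META_LEN = 13; // 1 (FMT) + 4 (TENANT) + 8 (TS): the part of LEN that is not payload

function encodeFrame({ crc, fmt, tenant, tsMs, payload }) {
  const frame = Buffer.alloc(HDR + payload.length);
  frame.writeUInt32BE(META_LEN + payload.length, 0); // LEN
  frame.writeUInt32BE(crc >>> 0, 4);                 // CRC32C of payload (supplied by caller)
  frame.writeUInt8(fmt, 8);                          // FMT
  frame.writeUInt32BE(tenant, 9);                    // TENANT
  frame.writeBigUInt64BE(BigInt(tsMs), 13);          // TS in milliseconds
  payload.copy(frame, HDR);                          // PAYLOAD
  return frame;
}

function decodeFrame(buf, offset = 0) {
  const len = buf.readUInt32BE(offset);
  const payloadLen = len - META_LEN;
  return {
    crc: buf.readUInt32BE(offset + 4),
    fmt: buf.readUInt8(offset + 8),
    tenant: buf.readUInt32BE(offset + 9),
    tsMs: Number(buf.readBigUInt64BE(offset + 13)),
    payload: buf.subarray(offset + HDR, offset + HDR + payloadLen),
    // Bytes occupied by this frame: LEN field (4) + CRC field (4) + LEN bytes.
    size: 8 + len,
  };
}

const payload = Buffer.from('{"hello":"world"}');
const frame = encodeFrame({ crc: 0, fmt: 0, tenant: 7, tsMs: 1700000000000, payload });
const decoded = decodeFrame(frame);
console.log(frame.length, decoded.size, decoded.tenant, decoded.payload.toString());
```

Note that a frame occupies 8 + LEN bytes in total, which equals HDR + payload length, because LEN already counts the FMT, TENANT, and TS fields.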

1) Architecture

S3 (segments) ──ObjectCreated──▶ Lambda (expand‑segment)
                                  └─ stream: S3 → zstd → frame parser ──▶ S3 (expanded NDJSON)

Steps at a glance:

  • S3 (segments) emits ObjectCreated for new .wal.zst objects
  • Lambda streams S3 → zstd → frame parser (incremental, constant memory)
  • NDJSON written to S3 under expanded/

The result is event-driven (no polling), streams the input through the parser, and needs only one Lambda function.
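
The first step of that flow, turning an ObjectCreated record into an output key, can be sketched as follows. This is an illustration only: outputKeyFor is a hypothetical helper name, and the sample record is a trimmed-down stand-in for a real S3 event with a made-up bucket name and key.

```javascript
// Sketch: derive the NDJSON output key from one S3 event record.
// S3 URL-encodes object keys in event notifications and encodes spaces as "+".
function outputKeyFor(record, outputPrefix = "expanded/") {
  const rawKey = record.s3.object.key;
  const key = decodeURIComponent(rawKey.replace(/\+/g, " "));
  if (!key.endsWith(".wal.zst")) return null; // not a segment object
  // Keep only the file name and swap the extension.
  const base = key.split("/").pop().replace(/\.wal\.zst$/, "");
  return `${outputPrefix}${base}.ndjson`;
}

// Hypothetical, trimmed-down event record for illustration.
const sampleRecord = {
  s3: {
    bucket: { name: "example-input-bucket" },
    object: { key: "tenant+a/segments/seg-00000009.wal.zst" },
  },
};

console.log(outputKeyFor(sampleRecord)); // expanded/seg-00000009.ndjson
```

Because only the file name is kept, two segments with the same name under different input prefixes would map to the same output key. If that can happen in your bucket layout, consider carrying part of the input path into the output key.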

2) Choose a deployment style

You have two easy options:

Option A — Lambda container image (recommended): Bundle the zstd binary for fast, stable decompression, plus Node.js for parsing.

Option B — Lambda ZIP + WASM zstd (no container): Use a pure-JS/WASM zstd library (slower start, but no native binary).

This tutorial shows Option A (container) in full. An Option B code snippet is included later if you prefer not to use a container.


3) IAM & S3 event setup (minimal)

  • Buckets

    - EDGE_INPUT_BUCKET (your existing bucket receiving segments)
    - EDGE_OUTPUT_BUCKET (can be the same or a different bucket)

  • S3 Event Notification on EDGE_INPUT_BUCKET:

    - Prefix: …/segments/
    - Suffix: .wal.zst
    - Target: Lambda function expand-segment

  • Lambda role permissions:

    - s3:GetObject on input prefix
    - s3:PutObject on output prefix

  • Environment variables (on the Lambda):

    - OUTPUT_BUCKET (if different from input)
    - OUTPUT_PREFIX (e.g., expanded/)
    - VALIDATE_CRC = true|false (CRC32C payload check)
    - CONCURRENCY (optional; default 1 stream per invocation; not read by the handler shown in this tutorial)
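
As a rough sketch of the role permissions listed above, the following builds a prefix-scoped IAM policy document as a plain object. The function name buildPolicy, the Sid values, and the bucket names are placeholders chosen for illustration. Adapt the prefixes to your own key layout.

```javascript
// Sketch: build a prefix-scoped IAM policy document for the Lambda role.
// All bucket names and prefixes passed in below are placeholders.
function buildPolicy({ inputBucket, inputPrefix, outputBucket, outputPrefix }) {
  return {
    Version: "2012-10-17",
    Statement: [
      {
        Sid: "ReadSegments",
        Effect: "Allow",
        Action: ["s3:GetObject"],
        Resource: [`arn:aws:s3:::${inputBucket}/${inputPrefix}*`],
      },
      {
        Sid: "WriteExpanded",
        Effect: "Allow",
        Action: ["s3:PutObject"],
        Resource: [`arn:aws:s3:::${outputBucket}/${outputPrefix}*`],
      },
    ],
  };
}

const policy = buildPolicy({
  inputBucket: "example-input-bucket",
  inputPrefix: "segments/",
  outputBucket: "example-output-bucket",
  outputPrefix: "expanded/",
});

// Print as JSON, ready to save to a file and attach to the role.
console.log(JSON.stringify(policy, null, 2));
```

The role also needs permission to write CloudWatch Logs (for example via the AWS managed policy AWSLambdaBasicExecutionRole), which this sketch does not include.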


4) Containerized Lambda: Dockerfile + app code

4.1 Dockerfile (Node 20 + zstd)

# Dockerfile
FROM public.ecr.aws/lambda/nodejs:20

# Install zstd CLI (the nodejs:20 base image is Amazon Linux 2023, which ships dnf rather than yum)
RUN dnf -y install zstd && \
    dnf clean all

# Copy app
COPY package.json package-lock.json* ./
RUN npm ci --omit=dev
COPY src ./src

# Lambda entrypoint
CMD [ "src/handler.handler" ]

4.2 package.json

{
  "name": "expand-segment",
  "version": "1.0.0",
  "type": "module",
  "dependencies": {
    "@aws-sdk/client-s3": "^3.600.0",
    "fast-crc32c": "^2.0.0"
  }
}

4.3 src/handler.js — Lambda entry

Streams S3 object → zstd -d -c → incremental frame parsing → uploads NDJSON.

// src/handler.js
import {
  S3Client,
  GetObjectCommand,
  PutObjectCommand,
} from "@aws-sdk/client-s3";
import { spawn } from "child_process";
import { pipeline } from "stream";
import { promisify } from "util";
import { createFrameParser } from "./parser.js";

const pipe = promisify(pipeline);
const s3 = new S3Client({});

const OUTPUT_BUCKET = process.env.OUTPUT_BUCKET || null;
const OUTPUT_PREFIX = process.env.OUTPUT_PREFIX || "expanded/";
const VALIDATE_CRC =
  (process.env.VALIDATE_CRC || "false").toLowerCase() === "true";

export const handler = async (event) => {
  // S3 Put event(s)
  for (const record of event.Records ?? []) {
    const bucket = record.s3.bucket.name;
    const key = decodeURIComponent(record.s3.object.key.replace(/\+/g, " "));
    const outBucket = OUTPUT_BUCKET || bucket;

    if (!key.endsWith(".wal.zst")) continue;

    const base = key
      .split("/")
      .pop()
      .replace(/\.wal\.zst$/, "");
    const outKey = `${OUTPUT_PREFIX}${base}.ndjson`;

    // Get S3 stream
    const obj = await s3.send(
      new GetObjectCommand({ Bucket: bucket, Key: key })
    );

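    // Spawn zstd to decompress as a stream (stdin -> stdout); stderr goes to the Lambda log
    const z = spawn("zstd", ["-d", "-c"], { stdio: ["pipe", "pipe", "inherit"] });
    // Resolve when zstd exits cleanly; reject on spawn failure or non-zero exit
    const zstdExit = new Promise((resolve, reject) => {
      z.once("error", reject);
      z.once("close", (code) =>
        code === 0 ? resolve() : reject(new Error(`zstd exited with code ${code}`))
      );
    });

    // Parser turns decompressed frames into NDJSON bytes; uploader writes them to S3
    const { writable } = new TransformToS3Uploader(s3, outBucket, outKey);
    const parser = createFrameParser({ validateCrc: VALIDATE_CRC });

    // Two pipelines so that errors on either side propagate:
    //   S3 body -> zstd.stdin, and zstd.stdout -> parser -> S3 uploader
    await Promise.all([
      pipe(obj.Body, z.stdin),
      pipe(z.stdout, parser, writable),
      zstdExit,
    ]);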
  }

  return { ok: true };
};

// Minimal writable that buffers the whole NDJSON output in memory and uploads it once at end.
// For very large outputs, switch to S3 multipart upload.
import { Writable, PassThrough } from "stream";
class TransformToS3Uploader {
  constructor(s3, bucket, key) {
    this.s3 = s3;
    this.bucket = bucket;
    this.key = key;
    this.chunks = [];
    this.writable = new Writable({
      write: (chunk, _enc, cb) => {
        this.chunks.push(Buffer.from(chunk));
        cb();
      },
      final: async (cb) => {
        try {
          const Body = Buffer.concat(this.chunks);
          await this.s3.send(
            new PutObjectCommand({
              Bucket: this.bucket,
              Key: this.key,
              Body,
              ContentType: "application/x-ndjson",
            })
          );
          cb();
        } catch (e) {
          cb(e);
        }
      },
    });
    this.readable = new PassThrough(); // not used; placeholder symmetry
  }
}

4.4 src/parser.js — WAL frame parser → NDJSON

// src/parser.js
import { Transform } from "stream";
import crc32c from "fast-crc32c";

const HDR = 21; // 4(LEN) + 4(CRC32C) + 1(FMT) + 4(TENANT) + 8(TS)
const MIN_LEN = 13; // 1(FMT) + 4(TENANT) + 8(TS)

export function createFrameParser({ validateCrc = false } = {}) {
  let buf = Buffer.alloc(0);

  return new Transform({
    readableHighWaterMark: 1 << 20,
    writableHighWaterMark: 1 << 20,
    transform(chunk, _enc, cb) {
      buf = Buffer.concat([buf, chunk]);
      try {
        while (buf.length >= HDR) {
          const len = buf.readUInt32BE(0);
          const crc = buf.readUInt32BE(4);
          if (len < MIN_LEN) {
            // truncated/invalid tail → stop parsing
            buf = Buffer.alloc(0);
            break;
          }
          // Total frame size: LEN(4) + CRC32C(4) + LEN bytes (LEN already counts FMT, TENANT, TS)
          const frameBytes = 8 + len;
          if (buf.length < frameBytes) break;

          // Header fields
          const fmt = buf.readUint8(8); // currently 0
          const tenant = buf.readUInt32BE(9);
          const ts = Number(buf.readBigUInt64BE(13));
          const payloadLen = len - MIN_LEN;
          const payloadStart = HDR;
          const payloadEnd = HDR + payloadLen;
          const payload = buf.subarray(payloadStart, payloadEnd);

          if (validateCrc) {
            const c = crc32c.calculate(payload) >>> 0;
            if (c !== crc >>> 0) {
              throw new Error(`CRC32C mismatch: expected=${crc} got=${c}`);
            }
          }

          // Emit payload as NDJSON line (append newline if missing)
          if (payload.length) {
            if (payload[payload.length - 1] === 0x0a) {
              this.push(payload);
            } else {
              this.push(Buffer.concat([payload, Buffer.from("\n")]));
            }
          }

          // advance
          buf = buf.subarray(frameBytes);
        }
        cb();
      } catch (e) {
        cb(e);
      }
    },
    flush(cb) {
      // leftover < HDR is acceptable (sealed segment shouldn’t be partial)
      cb();
    },
  });
}

Parsing logic exactly follows the frame layout rules: big-endian integers, header size 21, and CRC over payload only.
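
To see why incremental parsing copes with arbitrary chunk boundaries, here is a stream-free sketch of the framing loop. It assumes, per the format recap, that a frame occupies 8 + LEN bytes (the LEN field, the CRC field, then LEN bytes). The names createIncrementalParser, makeFrame, and parseInChunks are illustrative helpers, and the test frames carry a zero CRC field because no CRC check is done here.

```javascript
// Sketch: incremental framing without streams, to show chunk-boundary handling.
const HDR = 21;      // LEN + CRC32C + FMT + TENANT + TS
const META_LEN = 13; // FMT + TENANT + TS

function createIncrementalParser() {
  let buf = Buffer.alloc(0); // bytes received but not yet consumed
  return {
    // Feed any number of bytes; returns payloads of the frames completed by this call.
    push(chunk) {
      buf = Buffer.concat([buf, chunk]);
      const payloads = [];
      while (buf.length >= HDR) {
        const len = buf.readUInt32BE(0);
        if (len < META_LEN) throw new Error(`invalid LEN ${len}`);
        const frameBytes = 8 + len;          // LEN field + CRC field + LEN bytes
        if (buf.length < frameBytes) break;  // frame incomplete: wait for more data
        payloads.push(Buffer.from(buf.subarray(HDR, frameBytes)));
        buf = buf.subarray(frameBytes);      // advance to the next frame
      }
      return payloads;
    },
    pending: () => buf.length,
  };
}

// Illustrative helper: build a frame with zeroed CRC, FMT, TENANT, and TS fields.
function makeFrame(text) {
  const payload = Buffer.from(text);
  const frame = Buffer.alloc(HDR + payload.length);
  frame.writeUInt32BE(META_LEN + payload.length, 0);
  payload.copy(frame, HDR);
  return frame;
}

// Feed `data` to a fresh parser in slices of `chunkSize` bytes.
function parseInChunks(data, chunkSize) {
  const parser = createIncrementalParser();
  const out = [];
  for (let i = 0; i < data.length; i += chunkSize) {
    for (const p of parser.push(data.subarray(i, i + chunkSize))) out.push(p.toString());
  }
  return out;
}

const segment = Buffer.concat([makeFrame('{"a":1}'), makeFrame('{"b":2}'), makeFrame('{"c":3}')]);
console.log(parseInChunks(segment, 5)); // the same three payloads for any chunk size
```

A helper like this is also handy in unit tests: feeding the same segment one byte at a time and in one piece should produce identical output.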


5) Deploy the container Lambda (quick path)

Build & push the image

aws ecr create-repository --repository-name expand-segment || true
ACCOUNT_ID=$(aws sts get-caller-identity --query Account --output text)
REGION=$(aws configure get region)
REPO="$ACCOUNT_ID.dkr.ecr.$REGION.amazonaws.com/expand-segment"
aws ecr get-login-password | docker login --username AWS --password-stdin "$ACCOUNT_ID.dkr.ecr.$REGION.amazonaws.com"
docker build -t expand-segment .
docker tag expand-segment:latest "$REPO:latest"
docker push "$REPO:latest"

Create the Lambda

aws lambda create-function \
  --function-name expand-segment \
  --package-type Image \
  --code ImageUri="$REPO:latest" \
  --role arn:aws:iam::<ACCOUNT_ID>:role/<YOUR_LAMBDA_ROLE> \
  --timeout 900 --memory-size 1024 \
  --environment "Variables={OUTPUT_BUCKET=<YOUR_OUTPUT_BUCKET>,OUTPUT_PREFIX=expanded/,VALIDATE_CRC=true}"

Wire S3 → Lambda

aws lambda add-permission \
  --function-name expand-segment \
  --statement-id s3invoke \
  --action lambda:InvokeFunction \
  --principal s3.amazonaws.com \
  --source-arn arn:aws:s3:::<EDGE_INPUT_BUCKET>

aws s3api put-bucket-notification-configuration \
  --bucket <EDGE_INPUT_BUCKET> \
  --notification-configuration '{
    "LambdaFunctionConfigurations": [{
      "Id":"ExpandOnObjectCreated",
      "LambdaFunctionArn":"arn:aws:lambda:'"$REGION"':'"$ACCOUNT_ID"':function:expand-segment",
      "Events":["s3:ObjectCreated:*"],
      "Filter":{"Key":{"FilterRules":[
        {"Name":"prefix","Value":"segments/"},
        {"Name":"suffix","Value":".wal.zst"}
      ]}}
    }]
  }'

That’s it. When a new segments/*.wal.zst object arrives, Lambda writes expanded/*.ndjson.
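
One detail worth checking against your own key layout: the prefix rule is matched from the first character of the object key, and the suffix rule from its end. The sketch below mimics that matching. The function name matchesFilter and the tenant-a/ path are made up for illustration.

```javascript
// Sketch: how the prefix/suffix filter rules select keys.
// The prefix must match from the start of the key; the suffix must match its end.
function matchesFilter(key, { prefix = "", suffix = "" }) {
  return key.startsWith(prefix) && key.endsWith(suffix);
}

const rules = { prefix: "segments/", suffix: ".wal.zst" };

console.log(matchesFilter("segments/seg-00000009.wal.zst", rules));          // true
console.log(matchesFilter("tenant-a/segments/seg-00000009.wal.zst", rules)); // false
console.log(matchesFilter("segments/seg-00000009.ndjson", rules));           // false
```

If your segments live under a longer path, set the prefix rule to that full leading path, or drop the prefix rule and rely on the suffix plus the check in the handler.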


6) Verifying output

  • The output object key matches the input filename with .ndjson under your OUTPUT_PREFIX.
  • Set VALIDATE_CRC=true to verify payload integrity; Lambda will fail on CRC mismatch (visible in CloudWatch Logs).
  • NDJSON lines are the original payloads; if a payload didn’t end with \n, we append one for convenience.
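
For a quick sanity check of a downloaded output file, something like the following can confirm that every non-empty line parses as JSON. It assumes the payloads are JSON documents, which the tutorial implies but the parser does not enforce. checkNdjson is a hypothetical helper name.

```javascript
// Sketch: check that every non-empty line of an NDJSON document parses as JSON.
function checkNdjson(text) {
  const errors = [];
  let records = 0;
  text.split("\n").forEach((line, i) => {
    if (line === "") return; // trailing newline or blank line
    try {
      JSON.parse(line);
      records++;
    } catch (e) {
      errors.push({ line: i + 1, message: e.message });
    }
  });
  return { records, errors };
}

const good = checkNdjson('{"a":1}\n{"b":2}\n');
const bad = checkNdjson('{"a":1}\nnot json\n');
console.log(good.records, good.errors.length); // 2 0
console.log(bad.records, bad.errors[0].line);  // 1 2
```

This reads the whole file into one string, which is fine for spot checks but not for very large outputs.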

7) Option B: No container (WASM zstd)

If you don’t want a container image, replace the zstd CLI with a WASM library (slower cold start, but ZIP deployable). Example swap:

npm i zstddec

// replace the zstd child_process with WASM
import { ZSTDDecoder } from "zstddec"; // note the all-caps ZSTD in the export name
const dec = new ZSTDDecoder();
await dec.init();

// In handler: read S3 Body as a single Buffer (trade memory for simplicity)
const inBuf = await streamToBuffer(obj.Body);
const outBuf = dec.decode(inBuf); // decompressed .wal bytes
// Then feed outBuf into the same frame parser, but as a one-shot Readable.
import { Readable } from "stream";
const oneShot = Readable.from([outBuf]);

await pipe(oneShot, parser, writable);

async function streamToBuffer(stream) {
  const chunks = [];
  for await (const c of stream) chunks.push(c);
  return Buffer.concat(chunks);
}

For very large segments, prefer the container + zstd CLI streaming path.


8) Operational tips

  • Throughput: The parser is streaming, so its memory stays bounded, but the uploader shown here buffers the entire NDJSON output before the PUT. For huge outputs, switch the uploader to S3 multipart upload.
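  • Failure handling: S3 invokes Lambda asynchronously, and Lambda retries failed asynchronous invocations (twice by default). Because the output key is derived deterministically from the input filename, a retry overwrites the same expanded/ object, so writes are idempotent by key.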
  • Security: Use bucket-prefix-scoped IAM. If using SSE-KMS, grant Lambda’s role kms:Decrypt on the key for reading segments and kms:GenerateDataKey for writing the NDJSON output.
  • Cost: Expanding on arrival adds a PUT for the NDJSON file. Consider lifecycle rules on segments/ vs expanded/.

9) Local test

Download a sample segment and run the parser locally with Node + zstd. Run this from the project root after npm install; parser.js is an ES module, so it is imported rather than required:

aws s3 cp s3://<EDGE_INPUT_BUCKET>/segments/seg-00000009.wal.zst - | zstd -d | node --input-type=module -e '
import { createFrameParser } from "./src/parser.js";
process.stdin.pipe(createFrameParser({ validateCrc: true })).pipe(process.stdout);
' > out.ndjson

You should see one NDJSON record per frame payload.


10) Troubleshooting

  • “CRC32C mismatch”: the payload was corrupted or truncated; check CloudWatch logs and S3 sha256 metadata on the compressed object.
  • “Unexpected end of input”: the .wal.zst may be incomplete; retry, or adjust the parser to tolerate partial tails (the spec allows stopping at EOF).
  • Large outputs time out: raise Lambda timeout/memory or use multi-part upload in the uploader.
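
When chasing a CRC32C mismatch, it can help to recompute the checksum without native dependencies. The sketch below is a plain table-driven CRC32C (Castagnoli polynomial, reflected form 0x82F63B78), not the fast-crc32c implementation used by the parser. checkFrameCrc is a hypothetical helper, and it assumes the frame layout from the format recap with the CRC covering the payload only.

```javascript
// Sketch: table-driven CRC32C (Castagnoli) in plain JavaScript.
const TABLE = new Uint32Array(256);
for (let n = 0; n < 256; n++) {
  let c = n;
  for (let k = 0; k < 8; k++) c = c & 1 ? (c >>> 1) ^ 0x82f63b78 : c >>> 1;
  TABLE[n] = c >>> 0;
}

function crc32c(buf) {
  let crc = 0xffffffff;
  for (const b of buf) crc = TABLE[(crc ^ b) & 0xff] ^ (crc >>> 8);
  return (crc ^ 0xffffffff) >>> 0; // final XOR, forced to unsigned 32-bit
}

// Compare the stored CRC (bytes 4..7) with one computed over the payload only.
// Assumes `frame` starts at a frame boundary and holds the complete frame.
function checkFrameCrc(frame) {
  const len = frame.readUInt32BE(0);
  const stored = frame.readUInt32BE(4);
  const payload = frame.subarray(21, 8 + len);
  return { stored, computed: crc32c(payload), ok: stored === crc32c(payload) };
}

// Standard CRC32C check value for the ASCII string "123456789".
console.log(crc32c(Buffer.from("123456789")).toString(16)); // e3069283

// Build one frame with a correct CRC, then corrupt a payload byte.
const payload = Buffer.from('{"k":"v"}');
const frame = Buffer.alloc(21 + payload.length);
frame.writeUInt32BE(13 + payload.length, 0);
frame.writeUInt32BE(crc32c(payload), 4);
payload.copy(frame, 21);
console.log(checkFrameCrc(frame).ok); // true
frame[frame.length - 1] ^= 0xff;
console.log(checkFrameCrc(frame).ok); // false
```

If the stored and computed values differ for every frame rather than one, suspect a layout or byte-order misunderstanding before suspecting corruption.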

That’s it

You now have a simple, event-driven path from EdgeMQ segments in S3 → NDJSON using native AWS building blocks, with code you can use as a starting point for production.