Tutorial: Expanding EdgeMQ Segment Files in S3 to NDJSON (Event-Driven, AWS)
EdgeMQ writes compressed segment files to S3. This tutorial walks through automatically expanding those files into Newline-Delimited JSON (NDJSON) and writing them back to S3 under the /expanded/ prefix.
What you will build
- S3 (input): receives EdgeMQ segment objects (e.g., .../segments/seg-00000009.wal.zst)
- S3 Event → Lambda: triggers on ObjectCreated for those paths
- Lambda (Node.js): streams S3 object → zstd decompress → parses frames → emits NDJSON
- S3 (output): writes .../expanded/seg-00000009.ndjson
Segment format recap (what you’ll parse): each .wal.zst is a zstd stream of back‑to‑back frames:
```
[LEN u32 BE][CRC32C u32][FMT u8][TENANT u32 BE][TS u64 BE(ms)][PAYLOAD]

LEN = 1 (FMT) + 4 (TENANT) + 8 (TS) + payload_len
HDR = 21 bytes
CRC32C covers PAYLOAD only
```
Notes:
- All integers are big‑endian.
- Frames repeat until EOF.
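To make the layout concrete, here is a minimal encode/decode round trip for a single frame in Node.js — my own illustration, not EdgeMQ code; the CRC32C field is written as 0 for brevity:

```javascript
// Sketch: one frame in the segment layout described above.
// LEN counts FMT + TENANT + TS + payload, so a frame spans 8 + LEN bytes.
const MIN_LEN = 13; // 1(FMT) + 4(TENANT) + 8(TS)

function encodeFrame({ fmt, tenant, tsMs, payload }) {
  const len = MIN_LEN + payload.length;
  const buf = Buffer.alloc(8 + len); // 4(LEN) + 4(CRC32C) + len
  buf.writeUInt32BE(len, 0);
  buf.writeUInt32BE(0, 4); // CRC32C placeholder (real writers fill this in)
  buf.writeUInt8(fmt, 8);
  buf.writeUInt32BE(tenant, 9);
  buf.writeBigUInt64BE(BigInt(tsMs), 13);
  payload.copy(buf, 21);
  return buf;
}

function decodeFrame(buf) {
  const len = buf.readUInt32BE(0);
  return {
    crc: buf.readUInt32BE(4),
    fmt: buf.readUInt8(8),
    tenant: buf.readUInt32BE(9),
    tsMs: Number(buf.readBigUInt64BE(13)),
    payload: buf.subarray(21, 8 + len),
  };
}

const frame = encodeFrame({
  fmt: 0,
  tenant: 42,
  tsMs: 1700000000000,
  payload: Buffer.from('{"k":1}'),
});
const decoded = decodeFrame(frame);
// decoded.tenant === 42, decoded.payload.toString() === '{"k":1}'
```

Note that all multi-byte integers are written big-endian, matching the recap.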
1) Architecture
```
S3 (segments) ──ObjectCreated──▶ Lambda (expand-segment)
                                   └─ stream: S3 → zstd → frame parser
                                        │
                                        ▼
                                 S3 (expanded NDJSON)
```

Steps at a glance:
- S3 (segments) emits ObjectCreated for new .wal.zst objects
- Lambda streams S3 → zstd → frame parser (incremental, constant memory)
- NDJSON written to S3 under expanded/
- Event-driven (no polling), streaming (constant memory), and simple (one Lambda function).
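For orientation, the notification payload the Lambda receives looks roughly like this (abbreviated to the fields this tutorial uses; the bucket name and size are placeholders):

```json
{
  "Records": [
    {
      "eventName": "ObjectCreated:Put",
      "s3": {
        "bucket": { "name": "my-edge-input-bucket" },
        "object": { "key": "segments/seg-00000009.wal.zst", "size": 1048576 }
      }
    }
  ]
}
```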
2) Choose a deployment style
You have two easy options:
Option A — Lambda container image (recommended): Bundle the zstd binary for fast, stable decompression, plus Node.js for parsing.
Option B — Lambda ZIP + WASM zstd (no container): Use a pure-JS/WASM zstd library (slower start, but no native binary).
This tutorial shows Option A (container) in full; an Option B snippet appears later if you prefer to avoid containers.
3) IAM & S3 event setup (minimal)
- Buckets:
  - EDGE_INPUT_BUCKET (your existing bucket receiving segments)
  - EDGE_OUTPUT_BUCKET (can be the same or a different bucket)
- S3 Event Notification on EDGE_INPUT_BUCKET:
  - Prefix: …/segments/
  - Suffix: .wal.zst
  - Target: Lambda function expand-segment
- Lambda role permissions:
  - s3:GetObject on the input prefix
  - s3:PutObject on the output prefix
- Environment variables (on the Lambda):
  - OUTPUT_BUCKET (if different from input)
  - OUTPUT_PREFIX (e.g., expanded/)
  - VALIDATE_CRC = true|false (CRC32C payload check)
  - CONCURRENCY (optional; default 1 stream per invocation)
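As a sketch, the variables above can be parsed once into a config object with defaults; the loadConfig helper and its default values are my own convention, not part of the handler as written:

```javascript
// Read Lambda configuration from the environment with safe defaults.
function loadConfig(env = process.env) {
  return {
    outputBucket: env.OUTPUT_BUCKET || null, // null → write back to the input bucket
    outputPrefix: env.OUTPUT_PREFIX || "expanded/",
    validateCrc: (env.VALIDATE_CRC || "false").toLowerCase() === "true",
    concurrency: Math.max(1, parseInt(env.CONCURRENCY || "1", 10) || 1),
  };
}

const cfg = loadConfig({ VALIDATE_CRC: "TRUE", OUTPUT_PREFIX: "expanded/" });
// cfg.validateCrc === true, cfg.concurrency === 1
```

Parsing once at module load keeps the per-invocation hot path free of string handling.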
4) Containerized Lambda: Dockerfile + app code
4.1 Dockerfile (Node 20 + zstd)
```dockerfile
# Dockerfile
FROM public.ecr.aws/lambda/nodejs:20

# Install the zstd CLI (the Node 20 base image is Amazon Linux 2023, which uses dnf)
RUN dnf install -y zstd && dnf clean all

# Copy app
COPY package.json package-lock.json* ./
RUN npm ci --only=production
COPY src ./src

# Lambda entrypoint
CMD [ "src/handler.handler" ]
```
4.2 package.json
```json
{
  "name": "expand-segment",
  "version": "1.0.0",
  "type": "module",
  "dependencies": {
    "@aws-sdk/client-s3": "^3.600.0",
    "fast-crc32c": "^2.0.0"
  }
}
```
4.3 src/handler.js — Lambda entry
Streams S3 object → zstd -d -c → incremental frame parsing → uploads NDJSON.
```js
// src/handler.js
import {
  S3Client,
  GetObjectCommand,
  PutObjectCommand,
} from "@aws-sdk/client-s3";
import { spawn } from "child_process";
import { pipeline, Writable } from "stream";
import { promisify } from "util";
import { createFrameParser } from "./parser.js";

const pipe = promisify(pipeline);
const s3 = new S3Client({});

const OUTPUT_BUCKET = process.env.OUTPUT_BUCKET || null;
const OUTPUT_PREFIX = process.env.OUTPUT_PREFIX || "expanded/";
const VALIDATE_CRC =
  (process.env.VALIDATE_CRC || "false").toLowerCase() === "true";

export const handler = async (event) => {
  // S3 Put event(s)
  for (const record of event.Records ?? []) {
    const bucket = record.s3.bucket.name;
    const key = decodeURIComponent(record.s3.object.key.replace(/\+/g, " "));
    const outBucket = OUTPUT_BUCKET || bucket;

    if (!key.endsWith(".wal.zst")) continue;

    const base = key.split("/").pop().replace(/\.wal\.zst$/, "");
    const outKey = `${OUTPUT_PREFIX}${base}.ndjson`;

    // Get S3 stream
    const obj = await s3.send(
      new GetObjectCommand({ Bucket: bucket, Key: key })
    );

    // Spawn zstd to decompress as a stream (stdin -> stdout decompressed)
    const z = spawn("zstd", ["-d", "-c"]);
    z.on("error", (e) => obj.Body.destroy(e));

    // Pump S3 body into zstd
    obj.Body.pipe(z.stdin);

    // Parse frames from zstd stdout, emit NDJSON bytes, upload at the end
    const writable = createS3Uploader(outBucket, outKey);
    const parser = createFrameParser({ validateCrc: VALIDATE_CRC });

    // pipeline: zstd.stdout -> parser (ndjson bytes) -> s3 uploader
    await pipe(z.stdout, parser, writable);
  }
  return { ok: true };
};

// Minimal writable that buffers NDJSON and uploads once at end.
// If you need very large outputs, switch to multipart upload.
function createS3Uploader(bucket, key) {
  const chunks = [];
  return new Writable({
    write(chunk, _enc, cb) {
      chunks.push(Buffer.from(chunk));
      cb();
    },
    final(cb) {
      s3.send(
        new PutObjectCommand({
          Bucket: bucket,
          Key: key,
          Body: Buffer.concat(chunks),
          ContentType: "application/x-ndjson",
        })
      ).then(() => cb(), cb);
    },
  });
}
```
4.4 src/parser.js — WAL frame parser → NDJSON
```js
// src/parser.js
import { Transform } from "stream";
import crc32c from "fast-crc32c";

const HDR = 21;     // 4(LEN) + 4(CRC32C) + 1(FMT) + 4(TENANT) + 8(TS)
const MIN_LEN = 13; // 1(FMT) + 4(TENANT) + 8(TS)

export function createFrameParser({ validateCrc = false } = {}) {
  let buf = Buffer.alloc(0);
  return new Transform({
    readableHighWaterMark: 1 << 20,
    writableHighWaterMark: 1 << 20,
    transform(chunk, _enc, cb) {
      buf = Buffer.concat([buf, chunk]);
      try {
        while (buf.length >= HDR) {
          const len = buf.readUInt32BE(0);
          const crc = buf.readUInt32BE(4);
          if (len < MIN_LEN) {
            // truncated/invalid tail → stop parsing
            buf = Buffer.alloc(0);
            break;
          }
          // LEN counts FMT+TENANT+TS+payload, so the whole frame is 8 + LEN bytes
          const frameBytes = 8 + len;
          if (buf.length < frameBytes) break;

          // Header fields (parsed for visibility; only the payload is emitted)
          const fmt = buf.readUInt8(8); // currently 0
          const tenant = buf.readUInt32BE(9);
          const ts = Number(buf.readBigUInt64BE(13));

          const payload = buf.subarray(HDR, frameBytes);

          if (validateCrc) {
            const c = crc32c.calculate(payload) >>> 0;
            if (c !== crc >>> 0) {
              throw new Error(`CRC32C mismatch: expected=${crc} got=${c}`);
            }
          }

          // Emit payload as an NDJSON line (append newline if missing)
          if (payload.length) {
            if (payload[payload.length - 1] === 0x0a) {
              this.push(payload);
            } else {
              this.push(Buffer.concat([payload, Buffer.from("\n")]));
            }
          }

          // advance past this frame
          buf = buf.subarray(frameBytes);
        }
        cb();
      } catch (e) {
        cb(e);
      }
    },
    flush(cb) {
      // leftover < HDR is acceptable (a sealed segment shouldn't end mid-frame)
      cb();
    },
  });
}
```
The parsing logic follows the frame layout rules exactly: big-endian integers, a 21-byte header, LEN counting FMT + TENANT + TS + payload (so each frame spans 8 + LEN bytes), and CRC32C over the payload only.
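If you prefer to avoid the native fast-crc32c addon, a small table-driven CRC32C (Castagnoli polynomial, reflected) in plain JavaScript can stand in for crc32c.calculate — a dependency-free sketch, noticeably slower on large payloads:

```javascript
// Pure-JS CRC32C (Castagnoli, reflected polynomial 0x82F63B78).
// Drop-in replacement for fast-crc32c's calculate() on Buffers/Uint8Arrays.
const CRC32C_TABLE = new Uint32Array(256);
for (let n = 0; n < 256; n++) {
  let c = n;
  for (let k = 0; k < 8; k++) {
    c = c & 1 ? 0x82f63b78 ^ (c >>> 1) : c >>> 1;
  }
  CRC32C_TABLE[n] = c >>> 0;
}

function crc32c(buf) {
  let c = 0xffffffff; // initial value
  for (let i = 0; i < buf.length; i++) {
    c = CRC32C_TABLE[(c ^ buf[i]) & 0xff] ^ (c >>> 8);
  }
  return (c ^ 0xffffffff) >>> 0; // final XOR, as unsigned 32-bit
}

// Standard check value: crc32c(Buffer.from("123456789")) === 0xe3069283
```

The check value for the ASCII string "123456789" (0xE3069283) is the standard CRC-32C test vector, which makes this easy to verify.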
5) Deploy the container Lambda (quick path)
Build & push the image
```bash
aws ecr create-repository --repository-name expand-segment || true

ACCOUNT_ID=$(aws sts get-caller-identity --query Account --output text)
REGION=$(aws configure get region)
REPO="$ACCOUNT_ID.dkr.ecr.$REGION.amazonaws.com/expand-segment"

aws ecr get-login-password | docker login --username AWS --password-stdin \
  "$ACCOUNT_ID.dkr.ecr.$REGION.amazonaws.com"

docker build -t expand-segment .
docker tag expand-segment:latest "$REPO:latest"
docker push "$REPO:latest"
```
Create the Lambda
```bash
aws lambda create-function \
  --function-name expand-segment \
  --package-type Image \
  --code ImageUri="$REPO:latest" \
  --role arn:aws:iam::<ACCOUNT_ID>:role/<YOUR_LAMBDA_ROLE> \
  --timeout 900 --memory-size 1024 \
  --environment "Variables={OUTPUT_BUCKET=<YOUR_OUTPUT_BUCKET>,OUTPUT_PREFIX=expanded/,VALIDATE_CRC=true}"
```
Wire S3 → Lambda
```bash
aws lambda add-permission \
  --function-name expand-segment \
  --statement-id s3invoke \
  --action lambda:InvokeFunction \
  --principal s3.amazonaws.com \
  --source-arn arn:aws:s3:::<EDGE_INPUT_BUCKET>

aws s3api put-bucket-notification-configuration \
  --bucket <EDGE_INPUT_BUCKET> \
  --notification-configuration '{
    "LambdaFunctionConfigurations": [{
      "Id": "ExpandOnObjectCreated",
      "LambdaFunctionArn": "arn:aws:lambda:'"$REGION"':'"$ACCOUNT_ID"':function:expand-segment",
      "Events": ["s3:ObjectCreated:*"],
      "Filter": {"Key": {"FilterRules": [
        {"Name": "prefix", "Value": "segments/"},
        {"Name": "suffix", "Value": ".wal.zst"}
      ]}}
    }]
  }'
```
That’s it. When a new segments/*.wal.zst object arrives, Lambda writes expanded/*.ndjson.
6) Verifying output
- The output object key matches the input filename with .ndjson under your OUTPUT_PREFIX.
- Set VALIDATE_CRC=true to verify payload integrity; Lambda will fail on a CRC mismatch (visible in CloudWatch Logs).
- NDJSON lines are the original payloads; if a payload didn't end with \n, we append one for convenience.
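If the payloads are expected to be JSON, a quick line-by-line check catches malformed output. The validateNdjson helper below is a hypothetical example for local verification, not part of the Lambda:

```javascript
// Validate that text is well-formed NDJSON: every non-empty line must
// parse as JSON. Returns the number of records checked, throws otherwise.
function validateNdjson(text) {
  const lines = text.toString().split("\n");
  let count = 0;
  for (const [i, line] of lines.entries()) {
    if (line === "") continue; // tolerate a trailing newline / blank lines
    try {
      JSON.parse(line);
    } catch {
      throw new Error(`invalid JSON on line ${i + 1}`);
    }
    count++;
  }
  return count;
}

const n = validateNdjson('{"a":1}\n{"b":2}\n');
// n === 2
```

Run it against a downloaded expanded/*.ndjson object to confirm every frame payload survived the round trip.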
7) Option B: No container (WASM zstd)
If you don’t want a container image, replace the zstd CLI with a WASM library (slower cold start, but ZIP deployable). Example swap:
```bash
npm i zstddec
```

```js
// replace the zstd child_process with WASM (zstddec exports ZSTDDecoder)
import { ZSTDDecoder } from "zstddec";
import { Readable } from "stream";

const dec = new ZSTDDecoder();
await dec.init();

// In handler: read the S3 Body as a single Buffer (trade memory for simplicity)
const inBuf = await streamToBuffer(obj.Body);
const outBuf = Buffer.from(dec.decode(inBuf)); // decompressed .wal bytes

// Then feed outBuf into the same frame parser as a one-shot Readable.
const oneShot = Readable.from([outBuf]);
await pipe(oneShot, parser, writable);

async function streamToBuffer(stream) {
  const chunks = [];
  for await (const c of stream) chunks.push(c);
  return Buffer.concat(chunks);
}
```
For very large segments, prefer the container + zstd CLI streaming path.
8) Operational tips
- Throughput: The parser is streaming; memory stays bounded. For huge NDJSON outputs, switch the uploader to S3 Multi-Part Upload.
- Failure handling: S3 events are retried by Lambda. If a failure occurs after output was written, writes are idempotent by key (choose a deterministic expanded/ key so retries overwrite).
- Security: Use bucket- and prefix-scoped IAM. If using SSE-KMS, grant Lambda's role kms:Decrypt (to read input) and kms:GenerateDataKey (to write output) on the key.
- Cost: Expanding on arrival adds a PUT for the NDJSON file. Consider lifecycle rules on segments/ vs expanded/.
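The idempotency tip hinges on the output key being a pure function of the input key. Factored out of the handler, that mapping is easy to unit-test; deriveOutputKey is a hypothetical refactor of the handler's inline logic:

```javascript
// Derive the deterministic NDJSON output key for a segment object key.
// Mirrors the handler: keep the basename, swap .wal.zst for .ndjson.
function deriveOutputKey(inputKey, outputPrefix = "expanded/") {
  if (!inputKey.endsWith(".wal.zst")) return null; // not a segment object
  const base = inputKey.split("/").pop().replace(/\.wal\.zst$/, "");
  return `${outputPrefix}${base}.ndjson`;
}

deriveOutputKey("tenant-a/segments/seg-00000009.wal.zst");
// → "expanded/seg-00000009.ndjson"
```

Because the same input key always yields the same output key, a Lambda retry simply overwrites the previous object instead of duplicating it.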
9) Local test
Download a sample segment and run the parser locally with Node + zstd:
```bash
# parser.js is an ES module ("type": "module"), hence --input-type=module
aws s3 cp s3://<EDGE_INPUT_BUCKET>/segments/seg-00000009.wal.zst - \
  | zstd -d \
  | node --input-type=module -e '
      import { createFrameParser } from "./src/parser.js";
      process.stdin.pipe(createFrameParser({ validateCrc: true })).pipe(process.stdout);
    ' > out.ndjson
```
You should see one NDJSON record per frame payload.
10) Troubleshooting
- “CRC32C mismatch”: the payload was corrupted or truncated; check CloudWatch Logs and any sha256 metadata on the compressed object.
- “Unexpected end of input”: the .wal.zst may be incomplete; retry, or adjust the parser to tolerate partial tails (the spec allows stopping at EOF).
- Large outputs time out: raise the Lambda timeout/memory, or switch the uploader to multipart upload.
That’s it
You now have a simple, event-driven path from EdgeMQ segments in S3 → NDJSON using native AWS building blocks, with code you can productionize immediately.