Drain Pipeline
In production, sending one HTTP request per log event is wasteful. The drain pipeline buffers events and sends them in batches, retries on transient failures, and drops the oldest events when the buffer overflows.
Quick Start
import type { DrainContext } from 'evlog'
import { createDrainPipeline } from 'evlog/pipeline'
import { createAxiomDrain } from 'evlog/axiom'
export default defineNitroPlugin((nitroApp) => {
const pipeline = createDrainPipeline<DrainContext>()
const drain = pipeline(createAxiomDrain())
nitroApp.hooks.hook('evlog:drain', drain)
nitroApp.hooks.hook('close', () => drain.flush())
})
drain.flush() on server shutdown to ensure buffered events are sent before the process exits.How It Works
- Events are buffered in memory as they arrive via the
evlog:drainhook - A batch is flushed when either the batch size is reached or the interval expires (whichever comes first)
- If the drain function fails, the batch is retried with the configured backoff strategy
- If all retries are exhausted,
onDroppedis called with the lost events - If the buffer exceeds
maxBufferSize, the oldest events are dropped to prevent memory leaks
Configuration
import type { DrainContext } from 'evlog'
import { createDrainPipeline } from 'evlog/pipeline'
import { createAxiomDrain } from 'evlog/axiom'
export default defineNitroPlugin((nitroApp) => {
const pipeline = createDrainPipeline<DrainContext>({
batch: {
size: 50, // Flush every 50 events
intervalMs: 5000, // Or every 5 seconds, whichever comes first
},
retry: {
maxAttempts: 3,
backoff: 'exponential',
initialDelayMs: 1000,
maxDelayMs: 30000,
},
maxBufferSize: 1000,
onDropped: (events, error) => {
console.error(`[evlog] Dropped ${events.length} events:`, error?.message)
},
})
const drain = pipeline(createAxiomDrain())
nitroApp.hooks.hook('evlog:drain', drain)
nitroApp.hooks.hook('close', () => drain.flush())
})
Options Reference
| Option | Default | Description |
|---|---|---|
batch.size | 50 | Maximum events per batch |
batch.intervalMs | 5000 | Max time (ms) before flushing a partial batch |
retry.maxAttempts | 3 | Total attempts including the initial one |
retry.backoff | 'exponential' | 'exponential' | 'linear' | 'fixed' |
retry.initialDelayMs | 1000 | Base delay for the first retry |
retry.maxDelayMs | 30000 | Upper bound for any retry delay |
maxBufferSize | 1000 | Max buffered events before dropping oldest |
onDropped | — | Callback when events are dropped (overflow or retry exhaustion) |
Backoff Strategies
| Strategy | Delay Pattern | Use Case |
|---|---|---|
exponential | 1s, 2s, 4s, 8s... | Default. Best for transient failures that may need time to recover |
linear | 1s, 2s, 3s, 4s... | Predictable delay growth |
fixed | 1s, 1s, 1s, 1s... | Same delay every time. Useful for rate-limited APIs |
Returned Drain Function
The function returned by pipeline(drain) is hook-compatible and exposes:
| Property | Type | Description |
|---|---|---|
drain(ctx) | (ctx: T) => void | Push a single event into the buffer |
drain.flush() | () => Promise<void> | Force-flush all buffered events |
drain.pending | number | Number of events currently buffered |
Multiple Destinations
Wrap multiple adapters with a single pipeline:
import type { DrainContext } from 'evlog'
import { createDrainPipeline } from 'evlog/pipeline'
import { createAxiomDrain } from 'evlog/axiom'
import { createOTLPDrain } from 'evlog/otlp'
export default defineNitroPlugin((nitroApp) => {
const axiom = createAxiomDrain()
const otlp = createOTLPDrain()
const pipeline = createDrainPipeline<DrainContext>()
const drain = pipeline(async (batch) => {
await Promise.allSettled([axiom(batch), otlp(batch)])
})
nitroApp.hooks.hook('evlog:drain', drain)
nitroApp.hooks.hook('close', () => drain.flush())
})
Custom Drain Function
You don't need an adapter — pass any async function that accepts a batch:
import type { DrainContext } from 'evlog'
import { createDrainPipeline } from 'evlog/pipeline'
export default defineNitroPlugin((nitroApp) => {
const pipeline = createDrainPipeline<DrainContext>({
batch: { size: 100 },
})
const drain = pipeline(async (batch) => {
await fetch('https://your-service.com/logs', {
method: 'POST',
headers: { 'Content-Type': 'application/json' },
body: JSON.stringify(batch.map(ctx => ctx.event)),
})
})
nitroApp.hooks.hook('evlog:drain', drain)
nitroApp.hooks.hook('close', () => drain.flush())
})
Standalone Usage (Without Nitro)
The pipeline works outside of Nitro. Use the drain option in initLogger to wire it up:
import type { DrainContext } from 'evlog'
import { initLogger, log } from 'evlog'
import { createDrainPipeline } from 'evlog/pipeline'
import { createAxiomDrain } from 'evlog/axiom'
const pipeline = createDrainPipeline<DrainContext>({ batch: { size: 25 } })
const drain = pipeline(createAxiomDrain())
initLogger({ drain })
log.info({ action: 'started' }) // batched and drained
// Flush before exit
await drain.flush()
Next Steps
- Adapters Overview - Available built-in adapters
- Custom Adapters - Build your own drain function
- Best Practices - Security and production tips
Custom Adapters
Build your own adapter to send logs to any destination. Factory patterns, batching, filtering, and error handling best practices.
Overview
Enrich your wide events with derived context like user agent, geo data, request size, and trace context. Built-in enrichers and custom enricher support.