NEBU /ˈnɛ.buː/

# BUILD_A_PROCESSOR

Ship a binary against the nebu contract. Your module, your repo, your release cadence.

The contract is intentionally tiny: seven interfaces, committed-stable, enforced by CI. Build against it and you get a CLI, a JSON-schema-emitting --describe-json contract, and runtime hooks for free.

## 01_THE_CONTRACT

The stable surface lives in pkg/processor and pkg/source. Anything outside those packages is free to evolve.

- Processor · root interface
- Origin · reads ledgers, emits events
- Transform · events → events
- Sink · events → side effects
- Emitter[T] · typed output channel
- Reporter · progress / metrics hook
- LedgerSource · ledger stream (RPC / archive)

Full API snapshots live in .api/ and are diffed on every PR. See docs/STABILITY.md for the versioning policy.

## 02_PROCESSOR_TYPES

ORIGIN

Consumes ledger XDR, emits typed events as NDJSON. The "producer" end of a pipe.

```go
// pkg/processor
type Origin interface {
    ProcessLedger(ctx context.Context, ledger xdr.LedgerCloseMeta) error
}
```

TRANSFORM

Consumes events, emits events. Filters, enrichers, deduplicators, windowers.

```go
// pkg/processor
type Transform interface {
    ProcessEvent(ctx context.Context, event proto.Message) error
}
```

SINK

Consumes events, produces side effects — DB writes, files, message queues, metrics.

```go
// pkg/processor
type Sink interface {
    WriteEvent(ctx context.Context, event proto.Message) error
}
```

## 03_MINIMAL_ORIGIN

A complete origin processor that counts transactions per ledger and emits one event per ledger close. This is real code, and a sound starting point for anything you build.

```go
// main.go
package main

import (
    "context"
    "encoding/json"
    "log"
    "os"

    "github.com/stellar/go/xdr"
    "github.com/withObsrvr/nebu/pkg/processor"
    "github.com/withObsrvr/nebu/pkg/runtime"
    "github.com/withObsrvr/nebu/pkg/source/rpc"
)

// TxCount is the emitted event schema: one NDJSON line per ledger close.
type TxCount struct {
    Schema  string `json:"_schema"`
    NebuVer string `json:"_nebu_version"`
    Ledger  uint32 `json:"ledger"`
    TxCount int    `json:"tx_count"`
}

// TxCountOrigin writes one TxCount event per ledger to its encoder.
type TxCountOrigin struct {
    out *json.Encoder
}

// Compile-time check that TxCountOrigin satisfies the Origin contract.
var _ processor.Origin = (*TxCountOrigin)(nil)

func (o *TxCountOrigin) ProcessLedger(ctx context.Context, lcm xdr.LedgerCloseMeta) error {
    // json.Encoder.Encode appends a newline, so stdout is valid NDJSON.
    return o.out.Encode(TxCount{
        Schema:  "example.tx_count.v1",
        NebuVer: "v0.6.7",
        Ledger:  uint32(lcm.LedgerSequence()),
        TxCount: len(lcm.TransactionEnvelopes()),
    })
}

func main() {
    src, err := rpc.NewLedgerSource("https://archive-rpc.lightsail.network")
    if err != nil {
        log.Fatalf("create ledger source: %v", err)
    }
    defer src.Close()

    p := &TxCountOrigin{out: json.NewEncoder(os.Stdout)}

    rt := runtime.NewRuntime()
    // Fixed ledger range for the example.
    rt.RunOrigin(context.Background(), src, p, 60200000, 60200010)
}
```

That's the whole processor. No framework, no DI container, no config file — the CLI flags, the --describe-json manifest, the observability hooks are all wired up by the runtime.
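ProcessLedger takes an xdr.LedgerCloseMeta, which makes it awkward to unit-test directly. One option is to factor the encoding into a plain function. The sketch below assumes you mirror the TxCount struct locally. The helper `marshalTxCount` is hypothetical and not part of nebu. It also shows the exact NDJSON line a downstream transform receives.

```go
package main

import (
    "encoding/json"
    "fmt"
    "os"
)

// TxCount matches the event schema in main.go.
type TxCount struct {
    Schema  string `json:"_schema"`
    NebuVer string `json:"_nebu_version"`
    Ledger  uint32 `json:"ledger"`
    TxCount int    `json:"tx_count"`
}

// marshalTxCount is a hypothetical helper holding the emit logic from
// ProcessLedger without the XDR input. It returns one NDJSON line,
// byte-for-byte what a default json.Encoder.Encode would write.
func marshalTxCount(ledger uint32, txCount int) ([]byte, error) {
    b, err := json.Marshal(TxCount{
        Schema:  "example.tx_count.v1",
        NebuVer: "v0.6.7",
        Ledger:  ledger,
        TxCount: txCount,
    })
    if err != nil {
        return nil, err
    }
    return append(b, '\n'), nil
}

func main() {
    // Illustrative values; a real run takes them from the LedgerCloseMeta.
    line, err := marshalTxCount(60200000, 42)
    if err != nil {
        fmt.Fprintln(os.Stderr, err)
        os.Exit(1)
    }
    os.Stdout.Write(line)
}
```

ProcessLedger can then call the helper with `lcm.LedgerSequence()` and `len(lcm.TransactionEnvelopes())`. The XDR handling stays in the one place that needs it.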

## 04_RUNTIME

runtime.NewRuntime() wires the source, the processor, and the CLI together. It owns flag parsing, signal handling, metrics emission, and graceful shutdown.

```go
// Origin: ledger source → processor
rt.RunOrigin(ctx, source, proc, startLedger, endLedger)

// Transform: stdin (NDJSON) → processor → stdout
rt.RunTransform(ctx, proc)

// Sink: stdin (NDJSON) → processor (side effects)
rt.RunSink(ctx, proc)
```

## 05_DESCRIBE_JSON

Every processor binary must respond to --describe-json with a JSON-schema-valid manifest. This is the agent-legible contract — tooling and LLMs can discover your processor's flags, inputs, outputs, and emitted schema IDs without reading source.

```json
{
  "name":        "tx-count",
  "type":        "origin",
  "description": "Emit one event per ledger close, with tx count",
  "flags": [
    { "name": "--start-ledger", "type": "uint32", "required": true },
    { "name": "--end-ledger",   "type": "uint32" }
  ],
  "inputs":  [ { "kind": "ledger_stream" } ],
  "outputs": [ { "schema": "example.tx_count.v1" } ]
}
```

The runtime generates this manifest from struct tags + a small Describe() hook you implement — you don't hand-write the JSON.

## 06_SHIP_BINARY

Processors are standalone binaries, not plugins. Publish them as go install-able modules, and optionally package releases with GoReleaser, GitHub Releases, or a Nix flake.

```text
my-processor/
├── go.mod                   # requires github.com/withObsrvr/nebu
├── cmd/my-processor/
│   └── main.go              # runtime.NewRuntime().RunOrigin(...)
├── description.yml          # registry manifest (see § 07)
├── SCHEMA.md                # document your emitted _schema
└── README.md
```

```bash
# Tag a release
git tag v0.1.0 && git push --tags

# Anyone can now install it
go install github.com/you/my-processor/cmd/my-processor@v0.1.0
```

## 07_REGISTER

Drop a description.yml at the root of your repo and submit a PR to the community registry. Once merged, nebu list surfaces it for everyone.

```yaml
# description.yml
name: my-processor
type: origin                  # origin | transform | sink
description: One-line summary for `nebu list`
location:
  type: git
  repo: github.com/you/my-processor
  module: github.com/you/my-processor/cmd/my-processor
schemas:
  - example.tx_count.v1
maintainer:
  name: Your Name
  url:  https://github.com/you
```

Schema IDs are namespaced: use `<org>.<processor>.v<N>`. Bump the version on breaking changes, and never reuse a version for a different payload.
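A registry or CI step could check IDs against that convention. Below is a minimal sketch using Go's regexp package. The allowed characters per segment (lowercase letters, digits, `_` and `-`) and the requirement that N ≥ 1 with no leading zeros are assumptions, not a published nebu rule.

```go
package main

import (
    "fmt"
    "regexp"
)

// schemaIDPattern encodes <org>.<processor>.v<N>. The per-segment character
// set and the "N >= 1, no leading zeros" rule are assumptions.
var schemaIDPattern = regexp.MustCompile(`^[a-z][a-z0-9_-]*\.[a-z][a-z0-9_-]*\.v[1-9][0-9]*$`)

// validSchemaID reports whether id follows the namespaced convention.
func validSchemaID(id string) bool {
    return schemaIDPattern.MatchString(id)
}

func main() {
    ids := []string{"example.tx_count.v1", "example.tx_count", "tx_count.v1", "example.tx_count.v01"}
    for _, id := range ids {
        fmt.Printf("%-24s %v\n", id, validSchemaID(id))
    }
}
```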

## 08_HOOKS

The runtime exposes six lifecycle callbacks — OnStart, BeforeLedger, AfterLedger, OnWarning, OnFatal, OnEnd. Register any subset via Runtime.Use(Hooks{...}) or OriginConfig.Hooks; nil fields are no-ops. See docs/HOOKS.md for lifecycle, firing order, and abort semantics.

[ REFERENCE IMPLEMENTATIONS · 4 FILES, DROP-IN ]

Each file is self-contained. Copy it into your processor's `cmd/<name>/` directory, change its `package hooks` clause to your own package name, and wire it into your Hooks slice. No framework, no abstract factory.

- progress.go: live stderr progress bar, TTY-aware, 108 lines, stdlib only
- rate_limit.go: token-bucket throttle via BeforeLedger, stdlib only
- metrics.go: Prometheus counters and histograms (nebu_ledgers_processed_total, nebu_ledger_duration_seconds, nebu_warnings_total, nebu_fatals_total, nebu_pipeline_duration_seconds)
- tracing.go: OpenTelemetry parent nebu.pipeline span with one nebu.ledger child per ledger; warnings as span events, fatals as span error records

Smoke test: hooks-demo wires the reference hooks into a real RPC run. Run it with `go run ./examples/hooks/cmd/hooks-demo`.

Common patterns the six callbacks enable:

- Progress (OnStart + AfterLedger + OnEnd): live stderr output, health-check endpoints, flowctl control-plane reporting.
- Metrics (AfterLedger + OnWarning + OnFatal + OnEnd): Prometheus / OTel / StatsD counters and histograms.
- Tracing (OnStart + BeforeLedger + AfterLedger + OnEnd): OTel parent and per-ledger child spans, span events on warnings, span errors on fatals.
- Rate limiting (BeforeLedger): token bucket or explicit sleep between ledgers for back-pressure against downstream sinks.
- Agent gates (BeforeLedger, returning an error to abort): human-in-the-loop approval on destructive sinks. Pattern only; use rate_limit.go as a starting point.

## 09_STABILITY

STABLE: pkg/processor and pkg/source. Breaking changes require a new major version and a deprecation cycle.

UNSTABLE: pkg/source/rpc, pkg/source/storage, pkg/runtime. Can change without notice — pin minor versions if you depend on them.

PRIVATE: internal/*. Do not import.

A CI check diffs every PR against API snapshots in .api/. If you accidentally break the stable surface, CI blocks the merge.

## 10_NEXT

OBSRVR NEBU(7)