# BUILD_A_PROCESSOR
Ship a binary against the nebu contract. Your module, your repo, your release cadence.
The contract is intentionally tiny — six interfaces, committed-stable, enforced by CI. When you build against it, you get a CLI, a JSON-schema-emitting --describe-json contract, and runtime hooks for free.
## 01_THE_CONTRACT
The stable surface lives in pkg/processor and pkg/source. Anything outside those packages is free to evolve.
Full API snapshots live in .api/ and are diffed on every PR. See docs/STABILITY.md for the versioning policy.
## 02_PROCESSOR_TYPES
Origin: Consumes ledger XDR and emits typed events as NDJSON. The "producer" end of a pipe.

```go
type Origin interface {
	ProcessLedger(ctx context.Context, ledger xdr.LedgerCloseMeta) error
}
```

Transform: Consumes events and emits events. Filters, enrichers, deduplicators, windowers.

```go
type Transform interface {
	ProcessEvent(ctx context.Context, event proto.Message) error
}
```

Sink: Consumes events and produces side effects such as DB writes, files, message queues, and metrics.

```go
type Sink interface {
	WriteEvent(ctx context.Context, event proto.Message) error
}
```
## 03_MINIMAL_ORIGIN
A complete origin processor that counts transactions per ledger and emits one event per ledger close. Real code — starting point for anything you'd build.
```go
package main

import (
	"context"
	"encoding/json"
	"log"
	"os"

	"github.com/stellar/go/xdr"
	"github.com/withObsrvr/nebu/pkg/processor"
	"github.com/withObsrvr/nebu/pkg/runtime"
	"github.com/withObsrvr/nebu/pkg/source/rpc"
)

// TxCount is the emitted event schema: one NDJSON line per ledger close.
type TxCount struct {
	Schema  string `json:"_schema"`
	NebuVer string `json:"_nebu_version"`
	Ledger  uint32 `json:"ledger"`
	TxCount int    `json:"tx_count"`
}

// TxCountOrigin writes one TxCount event per ledger to out.
type TxCountOrigin struct {
	out *json.Encoder
}

// Compile-time check that TxCountOrigin satisfies processor.Origin (see § 02).
var _ processor.Origin = (*TxCountOrigin)(nil)

// ProcessLedger counts the transactions in a closed ledger and emits one event.
func (o *TxCountOrigin) ProcessLedger(ctx context.Context, lcm xdr.LedgerCloseMeta) error {
	return o.out.Encode(TxCount{
		Schema:  "example.tx_count.v1",
		NebuVer: "v0.6.7",
		Ledger:  uint32(lcm.LedgerSequence()),
		TxCount: len(lcm.TransactionEnvelopes()),
	})
}

func main() {
	src, err := rpc.NewLedgerSource("https://archive-rpc.lightsail.network")
	if err != nil {
		log.Fatalf("create ledger source: %v", err)
	}
	defer src.Close()

	p := &TxCountOrigin{out: json.NewEncoder(os.Stdout)}
	rt := runtime.NewRuntime()
	// Stream ledgers 60200000 to 60200010 through the processor.
	rt.RunOrigin(context.Background(), src, p, 60200000, 60200010)
}
```
That's the whole processor. No framework, no DI container, no config file — the CLI flags, the --describe-json manifest, the observability hooks are all wired up by the runtime.
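Downstream stages only ever see the NDJSON lines that an origin writes. The standard-library-only sketch below copies the `TxCount` struct from the example above to show exactly what one emitted line looks like. `encodeLine` is a hypothetical helper, and the transaction count of 3 is a made-up value for illustration.

```go
package main

import (
	"encoding/json"
	"fmt"
	"strings"
)

// TxCount copies the event struct from the origin example above.
type TxCount struct {
	Schema  string `json:"_schema"`
	NebuVer string `json:"_nebu_version"`
	Ledger  uint32 `json:"ledger"`
	TxCount int    `json:"tx_count"`
}

// encodeLine renders one event the way json.Encoder writes it:
// compact JSON followed by a single trailing newline.
func encodeLine(ev TxCount) (string, error) {
	var sb strings.Builder
	if err := json.NewEncoder(&sb).Encode(ev); err != nil {
		return "", err
	}
	return sb.String(), nil
}

func main() {
	// TxCount: 3 is an illustrative value, not real ledger data.
	line, err := encodeLine(TxCount{Schema: "example.tx_count.v1", NebuVer: "v0.6.7", Ledger: 60200000, TxCount: 3})
	if err != nil {
		panic(err)
	}
	fmt.Print(line)
	// {"_schema":"example.tx_count.v1","_nebu_version":"v0.6.7","ledger":60200000,"tx_count":3}
}
```

`json.Encoder.Encode` appends a newline after each value. That is why the origin's output is already valid NDJSON without any extra framing.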
## 04_RUNTIME
runtime.NewRuntime() wires the source, the processor, and the CLI together. It owns flag parsing, signal handling, metrics emission, and graceful shutdown.
```go
// Origin: ledger source → processor
rt.RunOrigin(ctx, source, proc, startLedger, endLedger)

// Transform: stdin (NDJSON) → processor → stdout
rt.RunTransform(ctx, proc)

// Sink: stdin (NDJSON) → processor (side effects)
rt.RunSink(ctx, proc)
```
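`RunTransform` hides the stdin-to-stdout loop from you. The sketch below is a rough, standard-library-only approximation of what such a loop has to do. It is not nebu's implementation. It decodes one NDJSON event at a time, passes the event to a callback, and re-encodes whatever survives. `runTransform`, `eventFunc`, `dropEmpty`, and `transformString` are hypothetical names. Events are decoded into `map[string]any` rather than typed protobuf messages.

```go
package main

import (
	"context"
	"encoding/json"
	"fmt"
	"io"
	"os"
	"strings"
)

// eventFunc is a hypothetical per-event callback: return the event to
// forward it, or nil to drop it.
type eventFunc func(ctx context.Context, ev map[string]any) (map[string]any, error)

// runTransform reads NDJSON from r, applies fn to each event, and writes
// kept events to w, one JSON object per line.
func runTransform(ctx context.Context, r io.Reader, w io.Writer, fn eventFunc) error {
	dec := json.NewDecoder(r)
	enc := json.NewEncoder(w) // Encode appends '\n', so output stays NDJSON
	for {
		if err := ctx.Err(); err != nil {
			return err
		}
		var ev map[string]any
		if err := dec.Decode(&ev); err == io.EOF {
			return nil // clean end of input
		} else if err != nil {
			return fmt.Errorf("decode event: %w", err)
		}
		out, err := fn(ctx, ev)
		if err != nil {
			return err
		}
		if out != nil {
			if err := enc.Encode(out); err != nil {
				return err
			}
		}
	}
}

// dropEmpty filters out ledgers whose tx_count is zero.
func dropEmpty(_ context.Context, ev map[string]any) (map[string]any, error) {
	if n, ok := ev["tx_count"].(float64); ok && n == 0 {
		return nil, nil
	}
	return ev, nil
}

// transformString runs the loop over an in-memory string (handy for tests).
func transformString(input string, fn eventFunc) (string, error) {
	var sb strings.Builder
	err := runTransform(context.Background(), strings.NewReader(input), &sb, fn)
	return sb.String(), err
}

func main() {
	if err := runTransform(context.Background(), os.Stdin, os.Stdout, dropEmpty); err != nil {
		fmt.Fprintln(os.Stderr, err)
		os.Exit(1)
	}
}
```

When the target is `map[string]any`, `encoding/json` decodes numbers as `float64`. That is why `dropEmpty` asserts `tx_count` to `float64`.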
## 05_DESCRIBE_JSON
Every processor binary must respond to --describe-json with a JSON-schema-valid manifest. This is the agent-legible contract — tooling and LLMs can discover your processor's flags, inputs, outputs, and emitted schema IDs without reading source.
```json
{
  "name": "tx-count",
  "type": "origin",
  "description": "Emit one event per ledger close, with tx count",
  "flags": [
    { "name": "--start-ledger", "type": "uint32", "required": true },
    { "name": "--end-ledger", "type": "uint32" }
  ],
  "inputs": [{ "kind": "ledger_stream" }],
  "outputs": [{ "schema": "example.tx_count.v1" }]
}
```
The runtime generates this manifest from struct tags plus a small Describe() hook you implement, so you never hand-write the JSON.
## 06_SHIP_BINARY
Processors are standalone binaries, not plugins. Publish each one as a go install-able module, and optionally package releases with GoReleaser, GitHub Releases, or a Nix flake.
```
my-processor/
├── go.mod              # depends on nebu/pkg/processor only
├── cmd/my-processor/
│   └── main.go         # runtime.NewRuntime().RunOrigin(...)
├── description.yml     # registry manifest (see § 07)
├── SCHEMA.md           # document your emitted _schema
└── README.md
```
```bash
# Tag a release
git tag v0.1.0 && git push --tags

# Anyone can now install it
go install github.com/you/my-processor/cmd/my-processor@v0.1.0
```
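Once users install a tagged release, it helps if the binary can report which tag it came from. This is general Go practice, not something nebu prescribes. The sketch uses a package-level `version` variable that can be overridden at build time with `-ldflags "-X main.version=..."`; GoReleaser's default ldflags set `main.version`. If no override is present, it falls back to the module version that `go install <module>@v0.1.0` records in the build info. The names `version` and `resolveVersion` are illustrative.

```go
package main

import (
	"fmt"
	"runtime/debug"
)

// version stays "dev" unless overridden at build time, e.g.
//   go build -ldflags "-X main.version=v0.1.0" ./cmd/my-processor
var version = "dev"

// resolveVersion prefers an ldflags-injected version, then the module
// version embedded by `go install module@version`, then "dev".
func resolveVersion() string {
	if version != "dev" {
		return version
	}
	if info, ok := debug.ReadBuildInfo(); ok {
		// Local builds report "(devel)"; only trust real tags.
		if v := info.Main.Version; v != "" && v != "(devel)" {
			return v
		}
	}
	return version
}

func main() {
	fmt.Println(resolveVersion())
}
```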
## 07_REGISTER
Drop a description.yml at the root of your repo and submit a PR to the community registry. Once merged, nebu list surfaces it for everyone.
```yaml
name: my-processor
type: origin  # origin | transform | sink
description: One-line summary for `nebu list`
location:
  type: git
  repo: github.com/you/my-processor
  module: github.com/you/my-processor/cmd/my-processor
schemas:
  - example.tx_count.v1
maintainer:
  name: Your Name
  url: https://github.com/you
```
Schema IDs are namespaced — use <org>.<processor>.v<N>. Bump the version on breaking changes; never reuse a version for a different payload.
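A registry or CI check could enforce this naming rule mechanically. The following is a minimal sketch. It assumes each segment uses only lowercase letters, digits, and underscores; the docs fix only the `<org>.<processor>.v<N>` shape. `parseSchemaID` and `SchemaID` are hypothetical names.

```go
package main

import (
	"fmt"
	"regexp"
	"strconv"
)

// schemaIDPattern encodes <org>.<processor>.v<N>. The per-segment character
// set is an assumption, not a documented nebu rule.
var schemaIDPattern = regexp.MustCompile(`^([a-z0-9_]+)\.([a-z0-9_]+)\.v([0-9]+)$`)

// SchemaID is a parsed schema identifier.
type SchemaID struct {
	Org, Processor string
	Version        int
}

func parseSchemaID(s string) (SchemaID, error) {
	m := schemaIDPattern.FindStringSubmatch(s)
	if m == nil {
		return SchemaID{}, fmt.Errorf("schema id %q: want <org>.<processor>.v<N>", s)
	}
	v, err := strconv.Atoi(m[3]) // fails on absurdly long version numbers
	if err != nil {
		return SchemaID{}, fmt.Errorf("schema id %q: %w", s, err)
	}
	return SchemaID{Org: m[1], Processor: m[2], Version: v}, nil
}

func main() {
	for _, s := range []string{"example.tx_count.v1", "example.tx_count", "tx_count.v2"} {
		id, err := parseSchemaID(s)
		fmt.Println(s, id, err)
	}
}
```

A parsed `Version` makes the "bump on breaking changes" rule checkable, for instance by refusing a payload change that keeps the same `Org`, `Processor`, and `Version`.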
## 08_HOOKS
The runtime exposes six lifecycle callbacks — OnStart, BeforeLedger, AfterLedger, OnWarning, OnFatal, OnEnd. Register any subset via Runtime.Use(Hooks{...}) or OriginConfig.Hooks; nil fields are no-ops. See docs/HOOKS.md for lifecycle, firing order, and abort semantics.
Each file is self-contained. Copy into your processor's cmd/<name>/ directory, rename package hooks to your package, and wire it into your Hooks slice. No framework, no abstract factory.
Metrics: nebu_ledgers_processed_total, nebu_ledger_duration_seconds, nebu_warnings_total, nebu_fatals_total, nebu_pipeline_duration_seconds.
Tracing: a nebu.pipeline span with one nebu.ledger child span per ledger. Warnings are recorded as span events and fatals as span error records.
Smoke test: hooks-demo wires all three into a real RPC run:

```bash
go run ./examples/hooks/cmd/hooks-demo
```
Common patterns the six callbacks enable:
## 09_STABILITY
STABLE: pkg/processor and pkg/source. Breaking changes require a new major version and a deprecation cycle.
UNSTABLE: pkg/source/rpc, pkg/source/storage, pkg/runtime. Can change without notice — pin minor versions if you depend on them.
PRIVATE: internal/*. Do not import.
A CI check diffs every PR against API snapshots in .api/. If you accidentally break the stable surface, CI blocks the merge.