diff --git a/cmd/seitask/main.go b/cmd/seitask/main.go index bb61bbd5..6102c2c2 100644 --- a/cmd/seitask/main.go +++ b/cmd/seitask/main.go @@ -1,5 +1,6 @@ // Command seitask is the monolithic Workflow-Task primitive binary: one -// binary, multiple urfave/cli subcommands (keygen, provision-snd, …) that +// binary, multiple urfave/cli subcommands (keygen, provision-snd, +// provision-node, …) that // share the internal/taskruntime shared library. See // https://github.com/sei-protocol/bdchatham-designs/blob/main/designs/test-harness/test-harness-lld.md. package main @@ -44,6 +45,7 @@ func main() { Commands: []*cli.Command{ newKeygenCommand(), newProvisionSNDCommand(), + newProvisionNodeCommand(), newRunnerCommand(), newUploadReportCommand(), }, diff --git a/cmd/seitask/main_test.go b/cmd/seitask/main_test.go index ba7da775..9505a5c7 100644 --- a/cmd/seitask/main_test.go +++ b/cmd/seitask/main_test.go @@ -6,6 +6,8 @@ import ( seiv1alpha1 "github.com/sei-protocol/sei-k8s-controller/api/v1alpha1" ) +const apiGroup = "sei.io" + // TestTaskScheme_RoundTripsSeiNetwork would have caught the first manual fire's // `no kind is registered for the type v1alpha1.SeiNetwork in scheme` // regression at `go test`, not at first cluster fire. Asserts the @@ -19,17 +21,27 @@ func TestTaskScheme_RoundTripsSeiNetwork(t *testing.T) { if len(gvks) == 0 { t.Fatalf("no GVKs returned for SeiNetwork") } - if gvks[0].Group != "sei.io" || gvks[0].Version != "v1alpha1" { + if gvks[0].Group != apiGroup || gvks[0].Version != "v1alpha1" { t.Fatalf("SeiNetwork GVK: %+v; want sei.io/v1alpha1", gvks[0]) } } +func TestTaskScheme_RoundTripsSeiNode(t *testing.T) { + gvks, _, err := taskScheme.ObjectKinds(&seiv1alpha1.SeiNode{}) + if err != nil { + t.Fatalf("SeiNode not registered in taskScheme: %v", err) + } + if len(gvks) == 0 || gvks[0].Group != apiGroup || gvks[0].Version != "v1alpha1" { + t.Fatalf("SeiNode GVK wrong: %+v; want sei.io/v1alpha1", gvks) + } +} + func TestTaskScheme_RoundTripsSeiNodeTask(t *testing.T) { gvks, _, err := taskScheme.ObjectKinds(&seiv1alpha1.SeiNodeTask{}) if err != nil { t.Fatalf("SeiNodeTask not registered in taskScheme: %v", err) } - if len(gvks) == 0 || gvks[0].Group != "sei.io" { + if len(gvks) == 0 || gvks[0].Group != apiGroup { t.Fatalf("SeiNodeTask GVK wrong: %+v", gvks) } } diff --git a/cmd/seitask/provision_node.go b/cmd/seitask/provision_node.go new file mode 100644 index 00000000..7edc1463 --- /dev/null +++ b/cmd/seitask/provision_node.go @@ -0,0 +1,121 @@ +package main + +import ( + "context" + "log" + "strings" + "time" + + "github.com/urfave/cli/v3" + + "github.com/sei-protocol/sei-k8s-controller/internal/seitask/provisionnode" + "github.com/sei-protocol/sei-k8s-controller/internal/taskruntime" +) + +// Flag names shared across the template-rendering subcommands (provision-snd, +// provision-node, runner), declared once so goconst stays green. +const ( + flagTemplate = "template" + flagVar = "var" +) + +func newProvisionNodeCommand() *cli.Command { + return &cli.Command{ + Name: "provision-node", + Usage: "Fan out N standalone SeiNode followers from a template, wait for " + + "Running + per-node TM/EVM readiness, and publish role-scoped endpoints", + Flags: []cli.Flag{ + &cli.StringFlag{ + Name: "role", + Usage: "Role tag for workflow-vars keys (e.g. rpc); uppercased to RPC_*", + Sources: cli.EnvVars("ROLE"), + Required: true, + }, + &cli.StringFlag{ + Name: "name", + Usage: "Base name; the N followers are -0..-(N-1) (defaults to -)", + Sources: cli.EnvVars("NODE_NAME"), + }, + &cli.StringFlag{ + Name: flagTemplate, + Usage: "Path to the Go text/template producing one kind: SeiNode YAML", + Sources: cli.EnvVars("NODE_TEMPLATE"), + Required: true, + }, + &cli.StringSliceFlag{ + Name: flagVar, + Usage: "KEY=VALUE substitution as .KEY (repeatable); .ORDINAL and .NODE_NAME are runtime-injected", + }, + &cli.IntFlag{ + Name: "replicas", + Usage: "N: number of follower SeiNode CRs to fan out", + Sources: cli.EnvVars("NODE_REPLICAS"), + Value: 1, + }, + &cli.StringFlag{ + Name: "network", + Usage: "Genesis SeiNetwork to follow; drives peer auto-wiring + the sei.io/seinetwork object label", + Sources: cli.EnvVars("NETWORK"), + }, + &cli.StringFlag{ + Name: "network-namespace", + Usage: "Namespace of the genesis SeiNetwork for the synthesized peer selector (defaults to the workflow namespace)", + }, + &cli.DurationFlag{ + Name: "running-timeout", + Usage: "Max wait for all N SeiNodes to reach status.phase=Running", + Value: 15 * time.Minute, + }, + &cli.DurationFlag{ + Name: "first-block-timeout", + Usage: "Per-node post-Running readiness budget (TM /status height>0 and EVM eth_blockNumber 200)", + Value: 5 * time.Minute, + }, + &cli.DurationFlag{ + Name: "poll-interval", + Usage: "Status + RPC poll cadence", + Value: 5 * time.Second, + }, + }, + Action: runProvisionNode, + } +} + +func runProvisionNode(ctx context.Context, cmd *cli.Command) error { + c, err := kubeClientFromEnv() + if err != nil { + return err + } + wf, err := taskruntime.LoadWorkflowIdentity(ctx, c) + if err != nil { + return err + } + + vars, err := parseKVPairs(cmd.StringSlice(flagVar)) + if err != nil { + return err + } + + p := provisionnode.Params{ + Role: cmd.String("role"), + Name: cmd.String("name"), + TemplatePath: cmd.String(flagTemplate), + Vars: vars, + Replicas: cmd.Int("replicas"), + Network: cmd.String("network"), + NetworkNamespace: cmd.String("network-namespace"), + RunningTimeout: cmd.Duration("running-timeout"), + FirstBlockTimeout: cmd.Duration("first-block-timeout"), + PollInterval: cmd.Duration("poll-interval"), + Workflow: wf, + } + res, err := provisionnode.Run(ctx, c, p) + if err != nil { + taskruntime.WriteExitReason(ctx, c, wf, err) + return err + } + taskruntime.WriteExitReason(ctx, c, wf, nil) + log.Printf("provision-node: %d SeiNode(s) Running [%s], chainID=%s, EVM_RPC_LIST=%s", + len(res.Names), strings.Join(res.Names, ","), res.ChainID, res.EVMRPCList) + return nil +} diff --git a/cmd/seitask/provision_snd.go b/cmd/seitask/provision_snd.go index d13e4b9c..794f75be 100644 --- a/cmd/seitask/provision_snd.go +++ b/cmd/seitask/provision_snd.go @@ -31,13 +31,13 @@ func newProvisionSNDCommand() *cli.Command { Sources: cli.EnvVars("SND_NAME"), }, &cli.StringFlag{ - Name: "template", + Name: flagTemplate, Usage: "Path to the Go text/template producing a SeiNetwork YAML", Sources: cli.EnvVars("SND_TEMPLATE"), Required: true, }, &cli.StringSliceFlag{ - Name: "var", + Name: flagVar, Usage: "KEY=VALUE substitution exposed to the template as .KEY (repeatable)", }, &cli.DurationFlag{ @@ -65,7 +65,7 @@ func runProvisionSND(ctx context.Context, cmd *cli.Command) error { return err } - vars, err := parseKVPairs(cmd.StringSlice("var")) + vars, err := parseKVPairs(cmd.StringSlice(flagVar)) if err != nil { return err } @@ -73,7 +73,7 @@ func runProvisionSND(ctx context.Context, cmd *cli.Command) error { p := provisionsnd.Params{ Role: cmd.String("role"), Name: cmd.String("name"), - TemplatePath: cmd.String("template"), + TemplatePath: cmd.String(flagTemplate), Vars: vars, ReadyTimeout: cmd.Duration("ready-timeout"), FirstBlockTimeout: cmd.Duration("first-block-timeout"), diff --git a/cmd/seitask/runner.go b/cmd/seitask/runner.go index 70c87675..a550c436 100644 --- a/cmd/seitask/runner.go +++ b/cmd/seitask/runner.go @@ -26,12 +26,12 @@ func newRunnerCommand() *cli.Command { Usage: "Apply a SeiNodeTask CR from a template and poll until terminal", Flags: []cli.Flag{ &cli.StringFlag{ - Name: "template", + Name: flagTemplate, Usage: "Path to the Go text/template producing a SeiNodeTask manifest (required)", Required: true, }, &cli.StringSliceFlag{ - Name: "var", + Name: flagVar, Usage: "KEY=VALUE substitution exposed to the template as .KEY (repeatable)", }, &cli.StringSliceFlag{ @@ -80,7 +80,7 @@ func newRunnerCommand() *cli.Command { } func runRunner(ctx context.Context, cmd *cli.Command) error { - varMap, err := parseKVSlice(cmd.StringSlice("var")) + varMap, err := parseKVSlice(cmd.StringSlice(flagVar)) if err != nil { return err } @@ -117,7 +117,7 @@ func runRunner(ctx context.Context, cmd *cli.Command) error { r := &runner.Run{ Opts: runner.Options{ - TemplatePath: cmd.String("template"), + TemplatePath: cmd.String(flagTemplate), Vars: varMap, OutputJSONPaths: cmd.StringSlice("output-jsonpath"), OutputEnvFile: cmd.String("output-env-file"), diff --git a/internal/seitask/provisionnode/provision.go b/internal/seitask/provisionnode/provision.go new file mode 100644 index 00000000..0cd3c0a0 --- /dev/null +++ b/internal/seitask/provisionnode/provision.go @@ -0,0 +1,495 @@ +// Package provisionnode implements `seitask provision-node`: fan out N +// standalone SeiNode follower CRs from one Go template, stamp an ownerRef +// to the parent Workflow, Create them, await PhaseRunning, run a two-stage +// per-node readiness probe (Tendermint /status height>0, then EVM +// eth_blockNumber 200), then publish role-scoped endpoints to workflow-vars +// (_EVM_RPC_LIST, _EVM_RPC, _TM_RPC, _REST, CHAIN_ID). +// +// Unlike provision-snd (genesis SeiNetwork, waits Ready, reads the fleet +// aggregate), provision-node provisions followers that join an existing chain. +// It assembles every workflow-vars key from the N per-node .status.endpoint +// scalars because a standalone SeiNode has no fleet ClusterIP to aggregate. +// +// The N CRs are named -0..-(N-1); the controller stamps +// sei.io/node= on each pod, preserving the chaos suite's pod +// selectors. provision-node also stamps sei.io/role=node (always) and +// sei.io/seinetwork= (when --network is set) on each CR's +// metadata.labels — the shared object-label producer contract with +// `seictl node apply`, which the follower-discovery query +// (node list -l sei.io/seinetwork=,sei.io/role=node) matches on. +package provisionnode + +import ( + "bytes" + "context" + "encoding/json" + "fmt" + "maps" + "net/http" + "os" + "reflect" + "strconv" + "strings" + "text/template" + "time" + + apierrors "k8s.io/apimachinery/pkg/api/errors" + metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" + "k8s.io/apimachinery/pkg/types" + "k8s.io/apimachinery/pkg/util/wait" + "sigs.k8s.io/controller-runtime/pkg/client" + "sigs.k8s.io/yaml" + + seiv1alpha1 "github.com/sei-protocol/sei-k8s-controller/api/v1alpha1" + "github.com/sei-protocol/sei-k8s-controller/internal/taskruntime" +) + +const fieldOwner client.FieldOwner = "seitask-provision-node" + +// Object-label producer contract (§2.2a) — MUST stay byte-identical to +// `seictl node apply`. The keys/values mirror the controller's canonical +// constants (noderesource: sei.io/role / "node"; seinetwork: sei.io/seinetwork), +// which are unexported, so we re-declare them. The contract test pins these +// literals so an accidental edit here fails; the controller side is independently +// pinned by noderesource_test.go. +const ( + labelRole = "sei.io/role" + roleValueNode = "node" + labelSeiNetwork = "sei.io/seinetwork" +) + +// Params carries the typed inputs to Run. +type Params struct { + // Role tags the workflow-vars keys this Task writes (e.g. "rpc"). + // Uppercased to compose RPC_EVM_RPC_LIST etc. Required. + Role string + + // Name is the BASE name; the N followers are -0..-(N-1). + // Defaults to "-" (or "-" when no + // CHAIN_ID var) so chaos sei.io/node selectors stay valid. + Name string + + // TemplatePath is the on-disk path to the Go text/template producing + // ONE kind: SeiNode YAML. Rendered once per replica with .ORDINAL and + // .NODE_NAME injected. Required. + TemplatePath string + + // Vars are the template's substitution context (the .KEY map). Missing + // keys referenced by the template fail rendering. The runtime injects + // .ORDINAL and .NODE_NAME per replica; a --var collision on either is + // rejected (mirrors the runner's --var NODE= guard). + Vars map[string]string + + // Replicas is N: the number of follower SeiNode CRs to fan out. >=1. + Replicas int + + // Network is the genesis SeiNetwork to follow. When set, the runtime + // (a) synthesizes a LabelPeerSource selecting sei.io/seinetwork= + // and (b) stamps the sei.io/seinetwork= object label. + Network string + + // NetworkNamespace is the namespace of the genesis SeiNetwork for the + // synthesized peer selector. Defaults to the Workflow namespace. + NetworkNamespace string + + // RunningTimeout bounds the wait for all N SeiNodes to reach PhaseRunning. + RunningTimeout time.Duration + + // FirstBlockTimeout bounds the post-Running readiness probe (both the TM + // height>0 stage and the EVM eth_blockNumber stage), per node. + FirstBlockTimeout time.Duration + + // PollInterval is the interval between status reads and RPC reads. + PollInterval time.Duration + + // HTTPClient overrides the RPC client; nil means http.DefaultClient. + HTTPClient *http.Client + + // Workflow is the parent Chaos Mesh Workflow identity (downward-API). + Workflow taskruntime.WorkflowIdentity +} + +// Result is the post-Run summary, returned so main can log it before exit. +type Result struct { + // Names are the N created SeiNode names, ordinal-ordered. + Names []string + // ChainID is the resolved chain ID published as CHAIN_ID. + ChainID string + // EVMRPCList is the assembled _EVM_RPC_LIST CSV. + EVMRPCList string +} + +// Run renders the template N times, creates N SeiNode followers with an +// ownerRef to the parent Workflow, waits for all to reach PhaseRunning, runs +// the per-node two-stage readiness probe, then publishes role-scoped endpoints. +func Run(ctx context.Context, c client.Client, p Params) (Result, error) { + if err := validateParams(p); err != nil { + return Result{}, err + } + p = withDefaults(p) + + names := make([]string, 0, p.Replicas) + for ordinal := 0; ordinal < p.Replicas; ordinal++ { + node, err := renderNode(p, ordinal) + if err != nil { + return Result{}, taskruntime.Task(fmt.Errorf("rendering template %s (ordinal %d): %w", p.TemplatePath, ordinal, err)) + } + stampMetadata(node, p, ordinal) + + if err := c.Create(ctx, node, fieldOwner); err != nil { + if !apierrors.IsAlreadyExists(err) { + return Result{}, taskruntime.Infra(fmt.Errorf("creating SeiNode %s/%s: %w", node.Namespace, node.Name, err)) + } + // Re-runs land here. Surface drift loudly so an operator who + // edited the template since the original Create knows the cluster + // is still at the original spec — we don't force-apply. + warnIfDrift(ctx, c, node) + } + names = append(names, node.Name) + } + + // Wait for all N to reach Running under one shared deadline. + if err := waitForRunning(ctx, c, p.Workflow.Namespace, names, p.RunningTimeout, p.PollInterval); err != nil { + return Result{}, err + } + + // Re-read each node post-Running for its .status.endpoint, then run the + // two-stage readiness probe before publishing. + nodes := make([]*seiv1alpha1.SeiNode, 0, len(names)) + httpClient := p.HTTPClient + if httpClient == nil { + httpClient = http.DefaultClient + } + for _, name := range names { + node := &seiv1alpha1.SeiNode{} + if err := c.Get(ctx, types.NamespacedName{Namespace: p.Workflow.Namespace, Name: name}, node); err != nil { + return Result{}, taskruntime.Infra(fmt.Errorf("re-reading SeiNode %s post-Running: %w", name, err)) + } + ep := node.Status.Endpoint + if ep == nil || ep.TendermintRpc == "" { + return Result{}, taskruntime.Infra(fmt.Errorf("SeiNode %s Running but .status.endpoint.tendermintRpc empty", name)) + } + // Stage 2 — TM liveness: the node has joined the chain and is syncing. + if err := waitForFirstBlock(ctx, httpClient, ep.TendermintRpc, p.FirstBlockTimeout, p.PollInterval); err != nil { + return Result{}, err + } + // Stage 3 — EVM liveness: the JSON-RPC listener is bound before its + // URL enters RPC_EVM_RPC_LIST. height>0 on TM does NOT prove this. + if ep.EvmJsonRpc == "" { + return Result{}, taskruntime.Infra(fmt.Errorf("SeiNode %s Running but .status.endpoint.evmJsonRpc empty", name)) + } + if err := waitForEVMReady(ctx, httpClient, ep.EvmJsonRpc, p.FirstBlockTimeout, p.PollInterval); err != nil { + return Result{}, err + } + nodes = append(nodes, node) + } + + chainID := p.Vars[string(taskruntime.KeyChainID)] + if chainID == "" && len(nodes) > 0 { + chainID = nodes[0].Spec.ChainID + } + + evmList, err := publishEndpoints(ctx, c, p.Workflow, p.Role, chainID, nodes) + if err != nil { + return Result{}, err + } + return Result{Names: names, ChainID: chainID, EVMRPCList: evmList}, nil +} + +func validateParams(p Params) error { + switch { + case p.Role == "": + return fmt.Errorf("provision-node: --role is required") + case p.TemplatePath == "": + return fmt.Errorf("provision-node: --template is required") + case p.Replicas < 1: + return fmt.Errorf("provision-node: --replicas must be >= 1, got %d", p.Replicas) + case p.Workflow.Name == "" || p.Workflow.Namespace == "": + return fmt.Errorf("provision-node: workflow identity not loaded") + } + // The runtime injects .ORDINAL and .NODE_NAME per replica; a --var on + // either would silently shadow them. Reject, mirroring the runner's + // --var NODE= guard under --per-node-selector. + if _, ok := p.Vars["ORDINAL"]; ok { + return fmt.Errorf("provision-node: --var ORDINAL=... collides with the runtime-injected .ORDINAL") + } + if _, ok := p.Vars["NODE_NAME"]; ok { + return fmt.Errorf("provision-node: --var NODE_NAME=... collides with the runtime-injected .NODE_NAME") + } + return nil +} + +func withDefaults(p Params) Params { + if p.Name == "" { + base := p.Workflow.Name + if cid := p.Vars[string(taskruntime.KeyChainID)]; cid != "" { + base = cid + } + p.Name = base + "-" + p.Role + } + if p.NetworkNamespace == "" { + p.NetworkNamespace = p.Workflow.Namespace + } + if p.RunningTimeout == 0 { + p.RunningTimeout = 15 * time.Minute + } + if p.FirstBlockTimeout == 0 { + p.FirstBlockTimeout = 5 * time.Minute + } + if p.PollInterval == 0 { + p.PollInterval = 5 * time.Second + } + return p +} + +// renderNode parses the template, executes it against the caller's vars plus +// the runtime-injected .ORDINAL and .NODE_NAME, then strict-unmarshals the +// rendered bytes into a SeiNode so field typos fail here, not at Create time. +func renderNode(p Params, ordinal int) (*seiv1alpha1.SeiNode, error) { + raw, err := os.ReadFile(p.TemplatePath) + if err != nil { + return nil, fmt.Errorf("read: %w", err) + } + tmpl, err := template.New(p.TemplatePath).Option("missingkey=error").Parse(string(raw)) + if err != nil { + return nil, fmt.Errorf("parse: %w", err) + } + ctxVars := make(map[string]string, len(p.Vars)+2) + maps.Copy(ctxVars, p.Vars) + ctxVars["ORDINAL"] = strconv.Itoa(ordinal) + ctxVars["NODE_NAME"] = nodeName(p.Name, ordinal) + + var buf bytes.Buffer + if err := tmpl.Execute(&buf, ctxVars); err != nil { + return nil, fmt.Errorf("execute: %w", err) + } + out := &seiv1alpha1.SeiNode{} + if err := yaml.UnmarshalStrict(buf.Bytes(), out); err != nil { + return nil, fmt.Errorf("unmarshal rendered yaml: %w", err) + } + return out, nil +} + +func nodeName(base string, ordinal int) string { + return base + "-" + strconv.Itoa(ordinal) +} + +// stampMetadata overwrites metadata fields the template MUST NOT control, +// stamps the shared object-label producer contract (§2.2a), and appends the +// synthesized peer source (§3). OwnerReferences are assigned (not appended) +// so a template that smuggles a bogus ref can't leak through. +func stampMetadata(node *seiv1alpha1.SeiNode, p Params, ordinal int) { + node.APIVersion = seiv1alpha1.GroupVersion.String() + node.Kind = "SeiNode" + node.Name = nodeName(p.Name, ordinal) + node.Namespace = p.Workflow.Namespace + node.OwnerReferences = []metav1.OwnerReference{p.Workflow.OwnerRef()} + + // Object-label producer contract — identical to `seictl node apply`. + if node.Labels == nil { + node.Labels = map[string]string{} + } + node.Labels[labelRole] = roleValueNode + if p.Network != "" { + node.Labels[labelSeiNetwork] = p.Network + } + + // Peer auto-wiring: synthesize the genesis-pool label source. Appended + // (not assigned) so a template's own static seed peers compose naturally. + if p.Network != "" { + node.Spec.Peers = append(node.Spec.Peers, seiv1alpha1.PeerSource{ + Label: &seiv1alpha1.LabelPeerSource{ + Selector: map[string]string{labelSeiNetwork: p.Network}, + Namespace: p.NetworkNamespace, + }, + }) + } +} + +// warnIfDrift logs when a re-run finds the on-cluster SeiNode.Spec different +// from the freshly-rendered one. Operators who edited the template since the +// original Create need to know the cluster still has the old spec. +func warnIfDrift(ctx context.Context, c client.Client, fresh *seiv1alpha1.SeiNode) { + existing := &seiv1alpha1.SeiNode{} + if err := c.Get(ctx, types.NamespacedName{Namespace: fresh.Namespace, Name: fresh.Name}, existing); err != nil { + return + } + if reflect.DeepEqual(existing.Spec, fresh.Spec) { + return + } + fmt.Fprintf(os.Stderr, "WARN: SeiNode %s/%s exists with spec different from rendered template; reusing on-cluster spec\n", fresh.Namespace, fresh.Name) +} + +// waitForRunning polls each of the named SeiNodes until .status.phase == +// PhaseRunning, failing fast on PhaseFailed. All N share one deadline. +func waitForRunning(ctx context.Context, c client.Client, ns string, names []string, timeout, interval time.Duration) error { + return wait.PollUntilContextTimeout(ctx, interval, timeout, true, func(ctx context.Context) (bool, error) { + for _, name := range names { + node := &seiv1alpha1.SeiNode{} + if err := c.Get(ctx, types.NamespacedName{Namespace: ns, Name: name}, node); err != nil { + if apierrors.IsNotFound(err) { + return false, nil + } + return false, taskruntime.Infra(fmt.Errorf("reading SeiNode %s: %w", name, err)) + } + switch node.Status.Phase { + case seiv1alpha1.PhaseRunning: + // this node done; check the rest + case seiv1alpha1.PhaseFailed: + return false, taskruntime.Task(fmt.Errorf("SeiNode %s reached Failed phase", name)) + default: + return false, nil + } + } + return true, nil + }) +} + +// tendermintStatusResponse models the subset of Tendermint /status we need. +// Sei's CometBFT fork sometimes returns the body unwrapped (no JSON-RPC +// envelope), so we accept both shapes and fall back via Result/SyncInfo. +type tendermintStatusResponse struct { + Result *struct { + SyncInfo struct { + LatestBlockHeight string `json:"latest_block_height"` + } `json:"sync_info"` + } `json:"result,omitempty"` + SyncInfo struct { + LatestBlockHeight string `json:"latest_block_height"` + } `json:"sync_info"` +} + +func (r *tendermintStatusResponse) latestHeight() string { + if r.Result != nil && r.Result.SyncInfo.LatestBlockHeight != "" { + return r.Result.SyncInfo.LatestBlockHeight + } + return r.SyncInfo.LatestBlockHeight +} + +// waitForFirstBlock polls a Tendermint /status until latest_block_height > 0, +// gating a follower's TM liveness (the node has joined and is syncing). +func waitForFirstBlock(ctx context.Context, hc *http.Client, tmRPC string, timeout, interval time.Duration) error { + return wait.PollUntilContextTimeout(ctx, interval, timeout, true, func(ctx context.Context) (bool, error) { + req, err := http.NewRequestWithContext(ctx, http.MethodGet, tmRPC+"/status", nil) + if err != nil { + return false, taskruntime.Infra(fmt.Errorf("status req: %w", err)) + } + resp, err := hc.Do(req) + if err != nil { + return false, nil + } + defer func() { _ = resp.Body.Close() }() + if resp.StatusCode != http.StatusOK { + return false, nil + } + var parsed tendermintStatusResponse + if err := json.NewDecoder(resp.Body).Decode(&parsed); err != nil { + return false, nil + } + h := parsed.latestHeight() + if h == "" || h == "0" { + return false, nil + } + return true, nil + }) +} + +// evmRPCResponse models a JSON-RPC envelope with a string result, used to +// confirm eth_blockNumber returned a well-formed reply. +type evmRPCResponse struct { + Result string `json:"result"` + Error *struct { + Message string `json:"message"` + } `json:"error,omitempty"` +} + +// waitForEVMReady POSTs eth_blockNumber to the node's EVM JSON-RPC URL and +// requires HTTP 200 plus a non-empty, error-free result. This is the gate +// (§5.2 stage 3) that proves the EVM listener is BOUND before its URL enters +// RPC_EVM_RPC_LIST — TM height>0 does not prove the EVM listener accepts +// connections (WS-A0: .status.endpoint is discoverability, not serve-readiness). +// Non-200 / connection-refused (listener not yet up) keeps polling until the +// timeout, then infra-fails. +func waitForEVMReady(ctx context.Context, hc *http.Client, evmRPC string, timeout, interval time.Duration) error { + const body = `{"jsonrpc":"2.0","id":1,"method":"eth_blockNumber","params":[]}` + if err := wait.PollUntilContextTimeout(ctx, interval, timeout, true, func(ctx context.Context) (bool, error) { + req, err := http.NewRequestWithContext(ctx, http.MethodPost, evmRPC, strings.NewReader(body)) + if err != nil { + return false, taskruntime.Infra(fmt.Errorf("eth_blockNumber req: %w", err)) + } + req.Header.Set("Content-Type", "application/json") + resp, err := hc.Do(req) + if err != nil { + return false, nil + } + defer func() { _ = resp.Body.Close() }() + if resp.StatusCode != http.StatusOK { + return false, nil + } + var parsed evmRPCResponse + if err := json.NewDecoder(resp.Body).Decode(&parsed); err != nil { + return false, nil + } + if parsed.Error != nil || parsed.Result == "" { + return false, nil + } + return true, nil + }); err != nil { + return taskruntime.Infra(fmt.Errorf("EVM JSON-RPC %s not ready within %s: %w", evmRPC, timeout, err)) + } + return nil +} + +// publishEndpoints assembles all five workflow-vars keys from the N per-node +// .status.endpoint scalars (a standalone SeiNode has no fleet aggregate) and +// writes them. Returns the assembled EVM CSV for the Result summary. +// +// Empty-guard (§6.4): every node's evmJsonRpc must be non-empty (a missing +// follower endpoint is a provisioning fault, not a filterable condition), and +// node-0's tendermintRpc must be non-empty before it feeds _TM_RPC / +// _REST (guards a future non-EVM role from emitting a garbage URL the +// chaos wait-for-caught-up probe would curl). +func publishEndpoints(ctx context.Context, c client.Client, w taskruntime.WorkflowIdentity, role, chainID string, nodes []*seiv1alpha1.SeiNode) (string, error) { + if len(nodes) == 0 { + return "", taskruntime.Infra(fmt.Errorf("provision-node: no SeiNodes to publish")) + } + + urls := make([]string, 0, len(nodes)) + for _, n := range nodes { // nodes ordered 0..N-1 + ep := n.Status.Endpoint + if ep == nil || ep.EvmJsonRpc == "" { + return "", taskruntime.Infra(fmt.Errorf("SeiNode %s Running but .status.endpoint.evmJsonRpc empty", n.Name)) + } + urls = append(urls, ep.EvmJsonRpc) + } + + node0 := nodes[0].Status.Endpoint + if node0.TendermintRpc == "" { + return "", taskruntime.Infra(fmt.Errorf("SeiNode %s Running but .status.endpoint.tendermintRpc empty", nodes[0].Name)) + } + if node0.TendermintRest == "" { + return "", taskruntime.Infra(fmt.Errorf("SeiNode %s Running but .status.endpoint.tendermintRest empty", nodes[0].Name)) + } + + evmList := strings.Join(urls, ",") + + if err := taskruntime.EnsureWorkflowVarsCM(ctx, c, w, map[taskruntime.VarKey]string{ + taskruntime.KeyRunID: w.Name, + }); err != nil { + return "", err + } + vars := map[taskruntime.VarKey]string{ + // CHAIN_ID lives in SetVars (merge), not the EnsureWorkflowVarsCM seed + // (no-op on AlreadyExists): the genesis provision step creates the CM + // first, so a CHAIN_ID seed here would be silently dropped. + taskruntime.KeyChainID: chainID, + taskruntime.RoleScoped(role, taskruntime.KeyEVMJSONRPCList): evmList, + taskruntime.RoleScoped(role, taskruntime.KeyEVMJSONRPC): node0.EvmJsonRpc, + taskruntime.RoleScoped(role, taskruntime.KeyTendermintRPC): node0.TendermintRpc, + taskruntime.RoleScoped(role, taskruntime.KeyTendermintREST): node0.TendermintRest, + } + if err := taskruntime.SetVars(ctx, c, w, vars); err != nil { + return "", err + } + return evmList, nil +} diff --git a/internal/seitask/provisionnode/provision_test.go b/internal/seitask/provisionnode/provision_test.go new file mode 100644 index 00000000..9594a4d6 --- /dev/null +++ b/internal/seitask/provisionnode/provision_test.go @@ -0,0 +1,646 @@ +package provisionnode + +import ( + "context" + "encoding/json" + "net/http" + "net/http/httptest" + "os" + "path/filepath" + "sync" + "sync/atomic" + "testing" + "time" + + corev1 "k8s.io/api/core/v1" + metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" + "k8s.io/apimachinery/pkg/runtime" + "k8s.io/apimachinery/pkg/types" + "sigs.k8s.io/controller-runtime/pkg/client/fake" + + seiv1alpha1 "github.com/sei-protocol/sei-k8s-controller/api/v1alpha1" + "github.com/sei-protocol/sei-k8s-controller/internal/taskruntime" +) + +const ( + testNamespace = "nightly" + testWorkflowName = "wf-test" + testWorkflowVarsCM = "workflow-vars-wf-test" + testRole = "rpc" + testChainID = "bench-1" + testImage = "ghcr.io/sei/sei-chain:abc123" + testNetwork = "bench-1" + varKeyChainID = "CHAIN_ID" + varKeyImage = "IMAGE" + + testBase = testChainID + "-" + testRole // "bench-1-rpc" + testNode0 = testBase + "-0" // "bench-1-rpc-0" + testNode1 = testBase + "-1" // "bench-1-rpc-1" + + tmSyncInfoField = "sync_info" + tmHeightField = "latest_block_height" +) + +const fullNodeTmpl = `apiVersion: sei.io/v1alpha1 +kind: SeiNode +metadata: + name: PLACEHOLDER +spec: + chainId: {{ .CHAIN_ID }} + image: {{ .IMAGE }} + fullNode: {} +` + +func newScheme(t *testing.T) *runtime.Scheme { + t.Helper() + s := runtime.NewScheme() + for _, add := range []func(*runtime.Scheme) error{ + corev1.AddToScheme, + seiv1alpha1.AddToScheme, + } { + if err := add(s); err != nil { + t.Fatal(err) + } + } + return s +} + +func writeTmpl(t *testing.T, body string) string { + t.Helper() + dir := t.TempDir() + p := filepath.Join(dir, "node.yaml.tmpl") + if err := os.WriteFile(p, []byte(body), 0o600); err != nil { + t.Fatal(err) + } + return p +} + +func testWorkflow() taskruntime.WorkflowIdentity { + return taskruntime.WorkflowIdentity{Name: testWorkflowName, UID: "uid-test", Namespace: testNamespace} +} + +func baseParams() Params { + return Params{ + Role: testRole, + Name: testChainID + "-" + testRole, + TemplatePath: "x.yaml.tmpl", + Replicas: 2, + Workflow: testWorkflow(), + } +} + +// --- renderNode ----------------------------------------------------------- + +func TestRenderNode_SubstitutesVarsAndInjectsOrdinal(t *testing.T) { + path := writeTmpl(t, `apiVersion: sei.io/v1alpha1 +kind: SeiNode +metadata: + name: {{ .NODE_NAME }} +spec: + chainId: {{ .CHAIN_ID }} + image: {{ .IMAGE }} + fullNode: {} + overrides: + ordinal: "{{ .ORDINAL }}" +`) + p := baseParams() + p.Name = testBase + p.TemplatePath = path + p.Vars = map[string]string{varKeyChainID: testChainID, varKeyImage: testImage} + + node, err := renderNode(p, 1) + if err != nil { + t.Fatalf("renderNode: %v", err) + } + if node.Spec.ChainID != testChainID { + t.Errorf("ChainID = %q", node.Spec.ChainID) + } + if node.Spec.Image != testImage { + t.Errorf("Image = %q", node.Spec.Image) + } + // .NODE_NAME and .ORDINAL injected for ordinal 1. + if node.Name != testNode1 { + t.Errorf("NODE_NAME injection: name = %q, want bench-1-rpc-1", node.Name) + } + if got := node.Spec.Overrides["ordinal"]; got != "1" { + t.Errorf("ORDINAL injection: overrides[ordinal] = %q, want 1", got) + } +} + +func TestRenderNode_MissingVarFailsRender(t *testing.T) { + path := writeTmpl(t, fullNodeTmpl) + p := baseParams() + p.TemplatePath = path + p.Vars = map[string]string{varKeyChainID: testChainID} // IMAGE missing + if _, err := renderNode(p, 0); err == nil { + t.Fatalf("expected error: IMAGE not provided") + } +} + +func TestRenderNode_StrictUnmarshalCatchesTypos(t *testing.T) { + path := writeTmpl(t, `apiVersion: sei.io/v1alpha1 +kind: SeiNode +metadata: + name: PLACEHOLDER +spec: + chainId: {{ .CHAIN_ID }} + imagge: {{ .IMAGE }} + fullNode: {} +`) + p := baseParams() + p.TemplatePath = path + p.Vars = map[string]string{varKeyChainID: testChainID, varKeyImage: testImage} + if _, err := renderNode(p, 0); err == nil { + t.Fatalf("expected strict-unmarshal error on `imagge` typo") + } +} + +// --- validateParams (collision guards) ------------------------------------ + +func TestValidateParams(t *testing.T) { + full := Params{ + Role: testRole, + TemplatePath: "x.yaml.tmpl", + Replicas: 2, + Workflow: testWorkflow(), + } + cases := []struct { + name string + mut func(*Params) + want bool + }{ + {"complete", func(*Params) {}, false}, + {"missing role", func(p *Params) { p.Role = "" }, true}, + {"missing template", func(p *Params) { p.TemplatePath = "" }, true}, + {"replicas zero", func(p *Params) { p.Replicas = 0 }, true}, + {"missing workflow.Name", func(p *Params) { p.Workflow.Name = "" }, true}, + {"ORDINAL collision", func(p *Params) { p.Vars = map[string]string{"ORDINAL": "x"} }, true}, + {"NODE_NAME collision", func(p *Params) { p.Vars = map[string]string{"NODE_NAME": "x"} }, true}, + } + for _, tc := range cases { + t.Run(tc.name, func(t *testing.T) { + p := full + tc.mut(&p) + err := validateParams(p) + if (err != nil) != tc.want { + t.Fatalf("validateParams err=%v wantErr=%v", err, tc.want) + } + }) + } +} + +// --- stampMetadata (object labels + peer wiring) -------------------------- + +func TestStampMetadata_NamingAndOwnerRef(t *testing.T) { + node := &seiv1alpha1.SeiNode{ + ObjectMeta: metav1.ObjectMeta{ + // Template author smuggling a bogus ownerRef: must be overwritten. + OwnerReferences: []metav1.OwnerReference{{ + APIVersion: "evil/v1", Kind: "Bogus", Name: "smuggled", UID: "bad", + }}, + }, + } + p := baseParams() + p.Name = testBase + stampMetadata(node, p, 1) + + if node.Name != testNode1 { + t.Fatalf("name = %q, want bench-1-rpc-1", node.Name) + } + if node.Namespace != testNamespace { + t.Fatalf("namespace = %q", node.Namespace) + } + if len(node.OwnerReferences) != 1 || node.OwnerReferences[0].Kind != "Workflow" { + t.Fatalf("ownerRef not replaced: %+v", node.OwnerReferences) + } +} + +func TestStampMetadata_ObjectLabels_WithNetwork(t *testing.T) { + node := &seiv1alpha1.SeiNode{} + p := baseParams() + p.Network = testNetwork + stampMetadata(node, p, 0) + + if got := node.Labels[labelRole]; got != roleValueNode { + t.Errorf("label %s = %q, want %q", labelRole, got, roleValueNode) + } + if got := node.Labels[labelSeiNetwork]; got != testNetwork { + t.Errorf("label %s = %q, want %q", labelSeiNetwork, got, testNetwork) + } + // Producer-contract literals — must match WS-A's seictl node apply. + if labelRole != "sei.io/role" || roleValueNode != "node" || labelSeiNetwork != "sei.io/seinetwork" { + t.Fatalf("object-label producer contract drifted: %s=%s, %s", labelRole, roleValueNode, labelSeiNetwork) + } +} + +func TestStampMetadata_ObjectLabels_NoNetwork_OmitsNetworkLabel(t *testing.T) { + node := &seiv1alpha1.SeiNode{} + p := baseParams() + p.Network = "" // no --network + stampMetadata(node, p, 0) + + if got := node.Labels[labelRole]; got != roleValueNode { + t.Errorf("label %s = %q, want %q (unconditional)", labelRole, got, roleValueNode) + } + if _, ok := node.Labels[labelSeiNetwork]; ok { + t.Errorf("label %s present without --network; must be OMITTED, not stamped empty", labelSeiNetwork) + } +} + +func TestStampMetadata_PeerWiring_WithNetwork(t *testing.T) { + node := &seiv1alpha1.SeiNode{} + p := baseParams() + p.Network = testNetwork + p.NetworkNamespace = "genesis-ns" + stampMetadata(node, p, 0) + + if len(node.Spec.Peers) != 1 { + t.Fatalf("peers = %d, want 1 synthesized", len(node.Spec.Peers)) + } + lbl := node.Spec.Peers[0].Label + if lbl == nil { + t.Fatalf("synthesized peer is not a LabelPeerSource: %+v", node.Spec.Peers[0]) + } + if got := lbl.Selector[labelSeiNetwork]; got != testNetwork { + t.Errorf("peer selector %s = %q, want %q", labelSeiNetwork, got, testNetwork) + } + if lbl.Namespace != "genesis-ns" { + t.Errorf("peer namespace = %q, want genesis-ns", lbl.Namespace) + } +} + +func TestStampMetadata_NoNetwork_NoSynthesizedPeer(t *testing.T) { + node := &seiv1alpha1.SeiNode{} + p := baseParams() + p.Network = "" + stampMetadata(node, p, 0) + if len(node.Spec.Peers) != 0 { + t.Fatalf("peers = %d, want 0 (no --network)", len(node.Spec.Peers)) + } +} + +func TestStampMetadata_PreservesTemplatePeer_Appends(t *testing.T) { + node := &seiv1alpha1.SeiNode{ + Spec: seiv1alpha1.SeiNodeSpec{ + Peers: []seiv1alpha1.PeerSource{{ + Static: &seiv1alpha1.StaticPeerSource{Addresses: []string{"id@1.2.3.4:26656"}}, + }}, + }, + } + p := baseParams() + p.Network = testNetwork + stampMetadata(node, p, 0) + + if len(node.Spec.Peers) != 2 { + t.Fatalf("peers = %d, want 2 (template static + synthesized label)", len(node.Spec.Peers)) + } + if node.Spec.Peers[0].Static == nil { + t.Errorf("template static peer not preserved as first element: %+v", node.Spec.Peers[0]) + } + if node.Spec.Peers[1].Label == nil { + t.Errorf("synthesized label peer not appended as second element: %+v", node.Spec.Peers[1]) + } +} + +// --- waitForRunning ------------------------------------------------------- + +func TestWaitForRunning(t *testing.T) { + cases := []struct { + name string + phase seiv1alpha1.SeiNodePhase + wantErr bool + taskErr bool // expect a task-class (terminal) error + }{ + {"running", seiv1alpha1.PhaseRunning, false, false}, + {"failed", seiv1alpha1.PhaseFailed, true, true}, + {"pending times out", seiv1alpha1.PhasePending, true, false}, + } + for _, tc := range cases { + t.Run(tc.name, func(t *testing.T) { + node := &seiv1alpha1.SeiNode{ + ObjectMeta: metav1.ObjectMeta{Name: testNode0, Namespace: testNamespace}, + Status: seiv1alpha1.SeiNodeStatus{Phase: tc.phase}, + } + c := fake.NewClientBuilder(). + WithScheme(newScheme(t)). + WithObjects(node). + WithStatusSubresource(&seiv1alpha1.SeiNode{}). + Build() + + err := waitForRunning(context.Background(), c, testNamespace, + []string{testNode0}, 200*time.Millisecond, 20*time.Millisecond) + if (err != nil) != tc.wantErr { + t.Fatalf("err=%v wantErr=%v", err, tc.wantErr) + } + if tc.taskErr && taskruntime.ExitCodeFor(err) != taskruntime.ExitTaskFailure { + t.Fatalf("Failed phase should yield a task-class error, got exit code %d", taskruntime.ExitCodeFor(err)) + } + }) + } +} + +// --- readiness probe (stage 2 TM, stage 3 EVM) ---------------------------- + +func TestTendermintStatusResponse_LatestHeight(t *testing.T) { + cases := []struct { + name string + body string + want string + }{ + {"jsonrpc envelope", `{"result":{"sync_info":{"latest_block_height":"42"}}}`, "42"}, + {"bare", `{"sync_info":{"latest_block_height":"7"}}`, "7"}, + {"empty", `{}`, ""}, + } + for _, tc := range cases { + t.Run(tc.name, func(t *testing.T) { + var r tendermintStatusResponse + if err := json.Unmarshal([]byte(tc.body), &r); err != nil { + t.Fatal(err) + } + if got := r.latestHeight(); got != tc.want { + t.Fatalf("got %q, want %q", got, tc.want) + } + }) + } +} + +func TestWaitForFirstBlock_HeightGtZero(t *testing.T) { + srv := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, _ *http.Request) { + _ = json.NewEncoder(w).Encode(map[string]any{ + tmSyncInfoField: map[string]any{tmHeightField: "5"}, + }) + })) + defer srv.Close() + if err := waitForFirstBlock(context.Background(), srv.Client(), srv.URL, time.Second, 10*time.Millisecond); err != nil { + t.Fatalf("waitForFirstBlock: %v", err) + } +} + +func TestWaitForEVMReady_BoundListener(t *testing.T) { + srv := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, _ *http.Request) { + _ = json.NewEncoder(w).Encode(map[string]any{"jsonrpc": "2.0", "id": 1, "result": "0x10"}) + })) + defer srv.Close() + if err := waitForEVMReady(context.Background(), srv.Client(), srv.URL, time.Second, 10*time.Millisecond); err != nil { + t.Fatalf("waitForEVMReady: %v", err) + } +} + +func TestWaitForEVMReady_NotBound_InfraFailsAfterTimeout(t *testing.T) { + // 503 == listener not yet up; must keep polling, then infra-fail. + srv := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, _ *http.Request) { + w.WriteHeader(http.StatusServiceUnavailable) + })) + defer srv.Close() + err := waitForEVMReady(context.Background(), srv.Client(), srv.URL, 150*time.Millisecond, 20*time.Millisecond) + if err == nil { + t.Fatalf("expected infra-fail on never-bound EVM listener") + } + if taskruntime.ExitCodeFor(err) != taskruntime.ExitInfraError { + t.Fatalf("EVM-not-ready should be infra-class, got exit code %d", taskruntime.ExitCodeFor(err)) + } +} + +// TestRun_PublishBlockedWhileEVMDialFails is the finding-2 gate: TM reports +// height>0 but the EVM listener never binds, so publish must NOT proceed and +// no workflow-vars are written. +func TestRun_PublishBlockedWhileEVMDialFails(t *testing.T) { + w := testWorkflow() + tmplPath := writeTmpl(t, fullNodeTmpl) + vars := map[string]string{varKeyChainID: testChainID, varKeyImage: testImage} + + // TM /status answers height>0; EVM POST always 503 (never bound). + var tmHits atomic.Int32 + srv := httptest.NewServer(http.HandlerFunc(func(rw http.ResponseWriter, r *http.Request) { + if r.Method == http.MethodGet { // TM /status + tmHits.Add(1) + _ = json.NewEncoder(rw).Encode(map[string]any{ + tmSyncInfoField: map[string]any{tmHeightField: "9"}, + }) + return + } + rw.WriteHeader(http.StatusServiceUnavailable) // EVM eth_blockNumber + })) + defer srv.Close() + + node := &seiv1alpha1.SeiNode{ + ObjectMeta: metav1.ObjectMeta{Name: testChainID + "-" + testRole + "-0", Namespace: testNamespace}, + Spec: seiv1alpha1.SeiNodeSpec{ChainID: testChainID, FullNode: &seiv1alpha1.FullNodeSpec{}}, + Status: seiv1alpha1.SeiNodeStatus{ + Phase: seiv1alpha1.PhaseRunning, + Endpoint: &seiv1alpha1.NodeEndpointStatus{ + TendermintRpc: srv.URL, + TendermintRest: "http://rest.svc:1317", + EvmJsonRpc: srv.URL, // 503 path + }, + }, + } + c := fake.NewClientBuilder(). + WithScheme(newScheme(t)). + WithObjects(node). + WithStatusSubresource(&seiv1alpha1.SeiNode{}). + Build() + + _, err := Run(context.Background(), c, Params{ + Role: testRole, + TemplatePath: tmplPath, + Vars: vars, + Replicas: 1, + RunningTimeout: time.Second, + FirstBlockTimeout: 150 * time.Millisecond, + PollInterval: 20 * time.Millisecond, + HTTPClient: srv.Client(), + Workflow: w, + }) + if err == nil { + t.Fatalf("Run should fail when EVM never binds even at TM height>0") + } + if tmHits.Load() == 0 { + t.Fatalf("TM stage was never reached") + } + // No workflow-vars CM must be written — publish was blocked. + cm := &corev1.ConfigMap{} + if err := c.Get(context.Background(), types.NamespacedName{Namespace: testNamespace, Name: testWorkflowVarsCM}, cm); err == nil { + t.Fatalf("workflow-vars CM was written despite blocked publish: %+v", cm.Data) + } +} + +// --- publish assembly (the contract test) --------------------------------- + +func fakeNode(name, evm, tmRPC, tmREST string) *seiv1alpha1.SeiNode { + return &seiv1alpha1.SeiNode{ + ObjectMeta: metav1.ObjectMeta{Name: name, Namespace: testNamespace}, + Status: seiv1alpha1.SeiNodeStatus{ + Phase: seiv1alpha1.PhaseRunning, + Endpoint: &seiv1alpha1.NodeEndpointStatus{ + EvmJsonRpc: evm, + TendermintRpc: tmRPC, + TendermintRest: tmREST, + }, + }, + } +} + +func TestPublishEndpoints_AssemblesAllFiveKeys(t *testing.T) { + w := testWorkflow() + nodes := []*seiv1alpha1.SeiNode{ + fakeNode("bench-1-rpc-0", "http://bench-1-rpc-0.nightly.svc:8545", "http://bench-1-rpc-0.nightly.svc:26657", "http://bench-1-rpc-0.nightly.svc:1317"), + fakeNode("bench-1-rpc-1", "http://bench-1-rpc-1.nightly.svc:8545", "http://bench-1-rpc-1.nightly.svc:26657", "http://bench-1-rpc-1.nightly.svc:1317"), + } + c := fake.NewClientBuilder().WithScheme(newScheme(t)).Build() + + evmList, err := publishEndpoints(context.Background(), c, w, testRole, testChainID, nodes) + if err != nil { + t.Fatalf("publishEndpoints: %v", err) + } + wantList := "http://bench-1-rpc-0.nightly.svc:8545,http://bench-1-rpc-1.nightly.svc:8545" + if evmList != wantList { + t.Fatalf("returned EVM list = %q, want %q", evmList, wantList) + } + + cm := &corev1.ConfigMap{} + if err := c.Get(context.Background(), types.NamespacedName{Namespace: testNamespace, Name: testWorkflowVarsCM}, cm); err != nil { + t.Fatalf("get CM: %v", err) + } + want := map[string]string{ + "CHAIN_ID": testChainID, + "RPC_EVM_RPC_LIST": wantList, + "RPC_EVM_RPC": "http://bench-1-rpc-0.nightly.svc:8545", // node-0 scalar + "RPC_TM_RPC": "http://bench-1-rpc-0.nightly.svc:26657", + "RPC_REST": "http://bench-1-rpc-0.nightly.svc:1317", + } + for k, v := range want { + if cm.Data[k] != v { + t.Errorf("CM[%s] = %q, want %q", k, cm.Data[k], v) + } + } +} + +func TestPublishEndpoints_EmptyGuards(t *testing.T) { + w := testWorkflow() + cases := []struct { + name string + nodes []*seiv1alpha1.SeiNode + }{ + { + "nil endpoint", + []*seiv1alpha1.SeiNode{{ObjectMeta: metav1.ObjectMeta{Name: "n0"}, Status: seiv1alpha1.SeiNodeStatus{Phase: seiv1alpha1.PhaseRunning}}}, + }, + { + "empty evmJsonRpc on node-1", + []*seiv1alpha1.SeiNode{ + fakeNode("n0", "http://n0:8545", "http://n0:26657", "http://n0:1317"), + fakeNode("n1", "", "http://n1:26657", "http://n1:1317"), + }, + }, + { + "empty tendermintRpc on node-0 (finding 6c)", + []*seiv1alpha1.SeiNode{fakeNode("n0", "http://n0:8545", "", "http://n0:1317")}, + }, + { + "empty tendermintRest on node-0", + []*seiv1alpha1.SeiNode{fakeNode("n0", "http://n0:8545", "http://n0:26657", "")}, + }, + } + for _, tc := range cases { + t.Run(tc.name, func(t *testing.T) { + c := fake.NewClientBuilder().WithScheme(newScheme(t)).Build() + _, err := publishEndpoints(context.Background(), c, w, testRole, testChainID, tc.nodes) + if err == nil { + t.Fatalf("expected infra-fail empty-guard error") + } + if taskruntime.ExitCodeFor(err) != taskruntime.ExitInfraError { + t.Fatalf("empty-guard should be infra-class, got exit code %d", taskruntime.ExitCodeFor(err)) + } + }) + } +} + +// --- Run end-to-end fan-out (naming + happy publish) ---------------------- + +func TestRun_FanOutNamingAndPublish(t *testing.T) { + w := testWorkflow() + tmplPath := writeTmpl(t, fullNodeTmpl) + vars := map[string]string{varKeyChainID: testChainID, varKeyImage: testImage} + + // Healthy TM + EVM for every node. + srv := healthyRPCServer(t) + defer srv.Close() + + // Pre-stage N=2 SeiNodes already Running with endpoints (the controller's + // job; we test seitask's wait+probe+publish, not reconcile). + objs := make([]*seiv1alpha1.SeiNode, 0, 2) + for i := range 2 { + n := &seiv1alpha1.SeiNode{ + ObjectMeta: metav1.ObjectMeta{Name: nodeName(testChainID+"-"+testRole, i), Namespace: testNamespace}, + Spec: seiv1alpha1.SeiNodeSpec{ChainID: testChainID, FullNode: &seiv1alpha1.FullNodeSpec{}}, + Status: seiv1alpha1.SeiNodeStatus{ + Phase: seiv1alpha1.PhaseRunning, + Endpoint: &seiv1alpha1.NodeEndpointStatus{ + EvmJsonRpc: srv.URL, + TendermintRpc: srv.URL, + TendermintRest: "http://rest.svc:1317", + }, + }, + } + objs = append(objs, n) + } + c := fake.NewClientBuilder(). + WithScheme(newScheme(t)). + WithObjects(objs[0], objs[1]). + WithStatusSubresource(&seiv1alpha1.SeiNode{}). + Build() + + res, err := Run(context.Background(), c, Params{ + Role: testRole, + Name: testChainID + "-" + testRole, + TemplatePath: tmplPath, + Vars: vars, + Replicas: 2, + Network: testNetwork, + RunningTimeout: time.Second, + FirstBlockTimeout: time.Second, + PollInterval: 10 * time.Millisecond, + HTTPClient: srv.Client(), + Workflow: w, + }) + if err != nil { + t.Fatalf("Run: %v", err) + } + wantNames := []string{testNode0, testNode1} + if len(res.Names) != 2 || res.Names[0] != wantNames[0] || res.Names[1] != wantNames[1] { + t.Fatalf("fan-out names = %v, want %v", res.Names, wantNames) + } + + // Pre-staged objects already exist (the AlreadyExists path), so the + // object-label producer contract is exercised by the stampMetadata unit + // tests; here we assert fan-out naming (above) and the publish CM (below). + cm := &corev1.ConfigMap{} + if err := c.Get(context.Background(), types.NamespacedName{Namespace: testNamespace, Name: testWorkflowVarsCM}, cm); err != nil { + t.Fatalf("get CM: %v", err) + } + if cm.Data["CHAIN_ID"] != testChainID { + t.Errorf("CHAIN_ID = %q", cm.Data["CHAIN_ID"]) + } + if cm.Data["RPC_EVM_RPC_LIST"] == "" { + t.Errorf("RPC_EVM_RPC_LIST empty") + } +} + +func healthyRPCServer(t *testing.T) *httptest.Server { + t.Helper() + var mu sync.Mutex + srv := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) { + mu.Lock() + defer mu.Unlock() + if r.Method == http.MethodGet { // TM /status + _ = json.NewEncoder(w).Encode(map[string]any{ + tmSyncInfoField: map[string]any{tmHeightField: "12"}, + }) + return + } + _ = json.NewEncoder(w).Encode(map[string]any{"jsonrpc": "2.0", "id": 1, "result": "0x10"}) + })) + t.Cleanup(srv.Close) + return srv +}