Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
13 changes: 12 additions & 1 deletion internal/temporalcli/commands.gen.go
Original file line number Diff line number Diff line change
Expand Up @@ -360,8 +360,12 @@ type NexusOperationStartOptions struct {
Operation string
OperationId string
ScheduleToCloseTimeout cliext.FlagDuration
ScheduleToStartTimeout cliext.FlagDuration
StartToCloseTimeout cliext.FlagDuration
IdConflictPolicy cliext.FlagStringEnum
IdReusePolicy cliext.FlagStringEnum
SearchAttribute []string
StaticSummary string
FlagSet *pflag.FlagSet
}

Expand All @@ -373,13 +377,20 @@ func (v *NexusOperationStartOptions) BuildFlags(f *pflag.FlagSet) {
_ = cobra.MarkFlagRequired(f, "service")
f.StringVar(&v.Operation, "operation", "", "Nexus Operation name. Required.")
_ = cobra.MarkFlagRequired(f, "operation")
f.StringVar(&v.OperationId, "operation-id", "", "Nexus Operation ID. If not supplied, a unique ID is generated.")
f.StringVar(&v.OperationId, "operation-id", "", "Nexus Operation ID. Required.")
_ = cobra.MarkFlagRequired(f, "operation-id")
v.ScheduleToCloseTimeout = 0
f.Var(&v.ScheduleToCloseTimeout, "schedule-to-close-timeout", "Total time the operation is allowed to run.")
v.ScheduleToStartTimeout = 0
f.Var(&v.ScheduleToStartTimeout, "schedule-to-start-timeout", "Maximum time to wait for an operation to be started (or completed synchronously) by a handler.")
v.StartToCloseTimeout = 0
f.Var(&v.StartToCloseTimeout, "start-to-close-timeout", "Maximum time to wait for an asynchronous operation to complete after it has been started.")
v.IdConflictPolicy = cliext.NewFlagStringEnum([]string{"Fail", "UseExisting", "TerminateExisting"}, "")
f.Var(&v.IdConflictPolicy, "id-conflict-policy", "Policy for handling an Operation ID conflict with a running operation. Accepted values: Fail, UseExisting, TerminateExisting.")
v.IdReusePolicy = cliext.NewFlagStringEnum([]string{"AllowDuplicate", "RejectDuplicate"}, "")
f.Var(&v.IdReusePolicy, "id-reuse-policy", "Policy for re-using an Operation ID from a previously closed operation. Accepted values: AllowDuplicate, RejectDuplicate.")
f.StringArrayVar(&v.SearchAttribute, "search-attribute", nil, "Search Attribute in `KEY=VALUE` format. Keys must be identifiers, and values must be JSON values. For example: 'YourKey={\"your\": \"value\"}'. Can be passed multiple times.")
f.StringVar(&v.StaticSummary, "static-summary", "", "Static summary for the Nexus Operation for human consumption in UIs. Uses Temporal Markdown formatting, should be a single line. EXPERIMENTAL.")
}

type QueryModifiersOptions struct {
Expand Down
21 changes: 14 additions & 7 deletions internal/temporalcli/commands.nexus_operation.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,6 @@ import (
"time"

"github.com/fatih/color"
"github.com/google/uuid"
"github.com/temporalio/cli/internal/printer"
"go.temporal.io/api/common/v1"
enumspb "go.temporal.io/api/enums/v1"
Expand Down Expand Up @@ -448,14 +447,22 @@ func buildNexusStartOptions(s *NexusOperationStartOptions, p *PayloadInputOption
Service: s.Service,
}

operationID := s.OperationId
if operationID == "" {
operationID = uuid.NewString()
}

opts := client.StartNexusOperationOptions{
ID: operationID,
ID: s.OperationId,
ScheduleToCloseTimeout: s.ScheduleToCloseTimeout.Duration(),
ScheduleToStartTimeout: s.ScheduleToStartTimeout.Duration(),
StartToCloseTimeout: s.StartToCloseTimeout.Duration(),
Summary: s.StaticSummary,
}

if len(s.SearchAttribute) > 0 {
saMap, err := stringKeysJSONValues(s.SearchAttribute, false)
if err != nil {
return nexusCl, nil, fmt.Errorf("invalid search attribute values: %w", err)
}
if opts.SearchAttributes, err = mapToSearchAttributes(saMap); err != nil {
return nexusCl, nil, err
}
}

if s.IdConflictPolicy.Value != "" {
Expand Down
122 changes: 106 additions & 16 deletions internal/temporalcli/commands.nexus_operation_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -426,27 +426,15 @@ func (s *SharedServerSuite) TestNexusOperationStart_JSON() {
s.Equal(opID, result.OperationId)
s.NotEmpty(result.RunId)
}

func (s *SharedServerSuite) TestNexusOperationStart_ServerGeneratedID() {
endpointName, w := s.setupNexusEndpointAndWorker(s.T())
defer w.Stop()

func (s *SharedServerSuite) TestNexusOperationStart_OperationIDRequired() {
res := s.Execute(
"nexus", "operation", "start",
"--address", s.Address(),
"--endpoint", endpointName,
"--endpoint", "test-ep",
"--service", "test-service",
"--operation", "test-op",
"--input", `"hello"`,
"--output", "json",
)
s.NoError(res.Err)

var result struct {
OperationId string `json:"operationId"`
}
s.NoError(json.Unmarshal(res.Stdout.Bytes(), &result))
s.NotEmpty(result.OperationId, "server should generate an operation ID")
s.Error(res.Err)
s.ErrorContains(res.Err, "operation-id")
}

func (s *SharedServerSuite) TestNexusOperationStart_ScheduleToCloseTimeout() {
Expand Down Expand Up @@ -725,4 +713,106 @@ func (s *SharedServerSuite) TestNexusOperationStart_MissingRequiredFlags() {
"--operation-id", "some-id",
)
s.Error(res.Err)

// Missing --operation-id
res = s.Execute(
"nexus", "operation", "start",
"--endpoint", "test-ep",
"--service", "test-service",
"--operation", "test-op",
)
s.Error(res.Err)
}

func (s *SharedServerSuite) TestNexusOperationExecute_MissingOperationID() {
res := s.Execute(
"nexus", "operation", "execute",
"--endpoint", "test-ep",
"--service", "test-service",
"--operation", "test-op",
)
s.Error(res.Err)
}

func (s *SharedServerSuite) TestNexusOperationStart_StaticSummary() {
endpointName, w := s.setupNexusEndpointAndWorker(s.T())
defer w.Stop()

opID := "summary-op-" + uuid.NewString()[:8]
summary := "this is the operation summary"

res := s.Execute(
"nexus", "operation", "start",
"--address", s.Address(),
"--endpoint", endpointName,
"--service", "test-service",
"--operation", "test-op",
"--operation-id", opID,
"--input", `"hello"`,
"--static-summary", summary,
)
s.NoError(res.Err)

// Describe and verify the summary is reported back.
s.Eventually(func() bool {
res = s.Execute(
"nexus", "operation", "describe",
"--address", s.Address(),
"--operation-id", opID,
)
return res.Err == nil && strings.Contains(res.Stdout.String(), summary)
}, 30*time.Second, 500*time.Millisecond)
}

func (s *SharedServerSuite) TestNexusOperationStart_SearchAttribute() {
endpointName, w := s.setupNexusEndpointAndWorker(s.T())
defer w.Stop()

opID := "sa-op-" + uuid.NewString()[:8]
uniqueKW := "nexus-sa-" + uuid.NewString()[:8]

res := s.Execute(
"nexus", "operation", "start",
"--address", s.Address(),
"--endpoint", endpointName,
"--service", "test-service",
"--operation", "test-op",
"--operation-id", opID,
"--input", `"hello"`,
"--search-attribute", fmt.Sprintf(`CustomKeywordField="%s"`, uniqueKW),
)
s.NoError(res.Err)

// List with a query filter on the search attribute — confirms the SA was
// attached and indexed.
s.Eventually(func() bool {
res = s.Execute(
"nexus", "operation", "list",
"--address", s.Address(),
"--query", fmt.Sprintf(`CustomKeywordField = "%s"`, uniqueKW),
)
return res.Err == nil && strings.Contains(res.Stdout.String(), opID)
}, 30*time.Second, 500*time.Millisecond)
}

func (s *SharedServerSuite) TestNexusOperationStart_Timeouts() {
endpointName, w := s.setupNexusEndpointAndWorker(s.T())
defer w.Stop()

opID := "timeouts-op-" + uuid.NewString()[:8]

res := s.Execute(
"nexus", "operation", "start",
"--address", s.Address(),
"--endpoint", endpointName,
"--service", "test-service",
"--operation", "test-op",
"--operation-id", opID,
"--input", `"hello"`,
"--schedule-to-close-timeout", "1m",
"--schedule-to-start-timeout", "30s",
"--start-to-close-timeout", "30s",
)
s.NoError(res.Err)
s.Contains(res.Stdout.String(), opID)
}
28 changes: 25 additions & 3 deletions internal/temporalcli/commands.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -5222,13 +5222,22 @@ option-sets:
required: true
- name: operation-id
type: string
description: |
Nexus Operation ID.
If not supplied, a unique ID is generated.
description: Nexus Operation ID.
required: true
- name: schedule-to-close-timeout
type: duration
description: |
Total time the operation is allowed to run.
- name: schedule-to-start-timeout
type: duration
description: |
Maximum time to wait for an operation to be started (or completed
synchronously) by a handler.
- name: start-to-close-timeout
type: duration
description: |
Maximum time to wait for an asynchronous operation to complete
after it has been started.
- name: id-conflict-policy
type: string-enum
description: |
Expand All @@ -5246,6 +5255,19 @@ option-sets:
enum-values:
- AllowDuplicate
- RejectDuplicate
- name: search-attribute
type: string[]
description: |
Search Attribute in `KEY=VALUE` format.
Keys must be identifiers, and values must be JSON values.
For example: 'YourKey={"your": "value"}'.
Can be passed multiple times.
- name: static-summary
type: string
experimental: true
description: |
Static summary for the Nexus Operation for human consumption in UIs.
Uses Temporal Markdown formatting, should be a single line.

- name: query-modifiers
options:
Expand Down
Loading