Skip to content

Commit db50911

Browse files
authored
Add source and target postgres connectivity checks (#902)
#### Description This PR adds a new preflight check named `connectivity`. The check command builds a list of `preflight.Check` instances from `*stream.Config` via a category registry (`Builders`), runs them through a `preflight.Run` engine that captures every error without stopping on individual failures, and hands the resulting `Report` to a `ReportPrinter` that renders human or JSON output. `Categories` (today only connectivity) are selected by per-category boolean flags on `pgstream check`. If no category is selected, `pgstream` runs all of them. Adding a new check is a mechanical edit: * one struct satisfying the two-method `Check` interface * one entry in Builders * one CLI flag There is a Claude file to aid adding a new check. ##### Related Issue(s) - Related to #897 #### Type of Change Please select the relevant option(s): - [ ] 🐛 Bug fix (non-breaking change that fixes an issue) - [x] ✨ New feature (non-breaking change that adds functionality) - [ ] 💥 Breaking change (fix or feature that would cause existing functionality to not work as expected) - [ ] 📚 Documentation update - [ ] 🔧 Refactoring (no functional changes) - [ ] ⚡ Performance improvement - [ ] 🧪 Test coverage improvement - [ ] 🔨 Build/CI changes - [ ] 🧹 Code cleanup #### Changes Made - Added connectivity check for pg2pg connections - `pkg/stream/preflight` package for storing building blocks - Added a new Claude file for adding new checks. #### Testing - [x] Unit tests added/updated - [ ] Integration tests added/updated - [x] Manual testing performed - [x] All existing tests pass #### Checklist - [x] Code follows project style guidelines - [x] Self-review completed - [x] Code is well-commented - [x] Documentation updated where necessary
1 parent 00f4897 commit db50911

10 files changed

Lines changed: 454 additions & 6 deletions

File tree

cli-definition.json

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -7,6 +7,11 @@
77
"use": "check",
88
"example": "\n\tpgstream check -c pg2pg.env\n\tpgstream check -c pg2pg.yaml --json\n\t",
99
"flags": [
10+
{
11+
"name": "connectivity",
12+
"description": "Run connectivity checks against the configured source and target",
13+
"default": "false"
14+
},
1015
{
1116
"name": "json",
1217
"description": "Output the check report in JSON format",

cmd/check_cmd.go

Lines changed: 40 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -3,15 +3,32 @@
33
package cmd
44

55
import (
6+
"context"
7+
"errors"
68
"fmt"
79

810
"github.com/pterm/pterm"
911
"github.com/spf13/cobra"
1012
"github.com/spf13/viper"
1113

1214
"github.com/xataio/pgstream/cmd/config"
15+
"github.com/xataio/pgstream/pkg/stream/preflight"
1316
)
1417

18+
var errCheckFailed = errors.New("checks reported errors")
19+
20+
// selectedCategories returns the categories whose CLI flag was set to true.
21+
// An empty result tells preflight.BuildChecks to run every registered category.
22+
func selectedCategories(cmd *cobra.Command) []preflight.Category {
23+
var selected []preflight.Category
24+
for _, b := range preflight.Builders {
25+
if on, _ := cmd.Flags().GetBool(b.Flag); on {
26+
selected = append(selected, b.Category)
27+
}
28+
}
29+
return selected
30+
}
31+
1532
var checkCmd = &cobra.Command{
1633
Use: "check",
1734
Short: "Runs pre-migration checks to catch blocking issues before snapshot/run",
@@ -24,15 +41,33 @@ var checkCmd = &cobra.Command{
2441
if err != nil {
2542
return fmt.Errorf("parsing stream config: %w", err)
2643
}
27-
_ = streamConfig
2844

29-
// TODO: run checks. See https://github.com/xataio/pgstream/issues/897 for the
30-
// list of checks to implement.
45+
checks := preflight.BuildChecks(streamConfig, selectedCategories(cmd))
46+
if len(checks) == 0 {
47+
sp.Success("no checks to run")
48+
return nil
49+
}
50+
51+
report := preflight.Run(context.Background(), checks, preflight.WithProgress(func(idx, total int, name string) {
52+
sp.UpdateText(fmt.Sprintf("running %d/%d checks: %s", idx, total, name))
53+
}))
3154

32-
sp.Success("no checks to run")
55+
if report.HasErrors() {
56+
sp.Stop()
57+
} else {
58+
sp.Success("pgstream checks passed")
59+
}
60+
61+
if err := print(cmd, preflight.ReportPrinter{Report: report}); err != nil {
62+
return fmt.Errorf("failed to format check report: %w", err)
63+
}
64+
65+
if report.HasErrors() {
66+
return errCheckFailed
67+
}
3368
return nil
3469
}()
35-
if err != nil {
70+
if err != nil && !errors.Is(err, errCheckFailed) {
3671
sp.Fail(err.Error())
3772
}
3873

cmd/config/config_yaml_fuzz_test.go

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -90,7 +90,8 @@ func FuzzYAMLConfigProperties(f *testing.F) {
9090
}()
9191

9292
cfg := generateYAMLConfigFromProperties(
93-
sourceMode, snapshotMode, targetMode, searchEngine, modifiersMode, batchSize, timeout)
93+
sourceMode, snapshotMode, targetMode, searchEngine, modifiersMode, batchSize, timeout,
94+
)
9495

9596
// Test marshaling/unmarshaling
9697
data, err := yaml.Marshal(cfg)

cmd/root_cmd.go

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -119,6 +119,7 @@ func Prepare() *cobra.Command {
119119
checkCmd.Flags().String("postgres-url", "", "Source postgres URL to run checks against")
120120
checkCmd.Flags().String("target-url", "", "Target URL to run checks against")
121121
checkCmd.Flags().Bool("json", false, "Output the check report in JSON format")
122+
checkCmd.Flags().Bool("connectivity", false, "Run connectivity checks against the configured source and target")
122123

123124
// Flag binding for root cmd
124125
rootFlagBinding(rootCmd)

pkg/stream/preflight/CLAUDE.md

Lines changed: 30 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,30 @@
1+
# CLAUDE.md
2+
3+
Guidance for Claude Code when working inside `pkg/stream/preflight`. The planned check set lives in `docs/migration_preflight_issue.md`; consult it before designing a new check.
4+
5+
## Package shape
6+
7+
- `preflight.go``Check` interface (`Name()` + `Run(ctx) ([]Finding, error)`), `Finding`, `CheckResult`, `Report`, `Run(ctx, []Check, ...RunOption)` engine.
8+
- `printer.go``ReportPrinter{Report}` is the only thing that formats reports. The `Report` struct itself stays pure data.
9+
- `builder.go``Builder` struct, `Builders` registry slice, per-category builder functions (`BuildConnectivityChecks`, …), `BuildChecks(cfg, selected)`.
10+
- One file per concrete check (`connectivity.go`, future `wal_level.go`, …).
11+
12+
## Adding a new check
13+
14+
Adding a check is meant to be a small, mechanical edit. Keep it that way.
15+
16+
1. **Pick a category.** Categories group checks of the same concern (`connectivity`, `replication`, `access`, `schema`, `resources`).
17+
- Joining an existing category: skip to step 2.
18+
- Creating a new one: add a `Category` constant in `preflight.go`, a builder func + `Builders` entry in `builder.go`, and a boolean flag on `checkCmd` in `cmd/root_cmd.go`. The flag string must match `Builder.Flag`.
19+
2. **Implement the check.** New struct in `<thing>.go`, satisfying the `Check` interface.
20+
- **Every `Finding` is blocking.** A check that finds nothing wrong returns a `nil` slice.
21+
- **Return `error` only when the check itself couldn't run** (timeout, internal bug, malformed input). A detected problem is a `Finding`, not an error.
22+
- **Put remediation in `Finding.Message`** — the user should be able to act on it without reading source.
23+
3. **Materialise instances in the category builder** (e.g. `BuildConnectivityChecks`). The builder is the applicability gate: it reads `*stream.Config` and decides which instances are relevant. Inapplicable checks are silently omitted today; an explicit "skipped: <reason>" mechanism is deferred (see `docs/migration_preflight_issue.md` "Architecture decisions" #6).
24+
4. **Tests.** Unit-test the check directly against mocked dependencies (`internal/postgres/mocks` has the postgres conn mock). For new categories, exercise the builder selection path through the cmd layer too.
25+
26+
## Do not
27+
28+
- Do not add `init()`-time registration, dependency injection frameworks, or other indirection — `Builders` is the registry, keep it a plain literal slice.
29+
- Do not move rendering logic onto `Report`. `ReportPrinter` owns formatting; `Report` stays data-only.
30+
- Do not import `pkg/stream` from anywhere except `builder.go`. Engine code (`preflight.go`, `printer.go`, individual check files) stays stream-agnostic so it can be reused.

pkg/stream/preflight/builder.go

Lines changed: 53 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,53 @@
1+
// SPDX-License-Identifier: Apache-2.0
2+
3+
package preflight
4+
5+
import "github.com/xataio/pgstream/pkg/stream"
6+
7+
// Builder turns a stream.Config into the concrete checks for a category. Each
8+
// new category adds an entry to Builders and a matching CLI flag in
9+
// cmd/root_cmd.go.
10+
type Builder struct {
11+
Category Category
12+
Flag string
13+
Build func(*stream.Config) []Check
14+
}
15+
16+
// Builders is the registry of category builders. Adding a new category = one
17+
// Builder entry here + one flag declaration on checkCmd.
18+
var Builders = []Builder{
19+
{CategoryConnectivity, "connectivity", BuildConnectivityChecks},
20+
}
21+
22+
// BuildConnectivityChecks returns the connectivity checks applicable to cfg.
23+
// A source check is added when a source postgres URL is configured; a target
24+
// check is added when a postgres target is configured.
25+
func BuildConnectivityChecks(cfg *stream.Config) []Check {
26+
checks := []Check{}
27+
if url := cfg.SourcePostgresURL(); url != "" {
28+
checks = append(checks, &ConnectivityCheck{Label: "source", URL: url})
29+
}
30+
if cfg.Processor.Postgres != nil {
31+
if url := cfg.Processor.Postgres.BatchWriter.URL; url != "" {
32+
checks = append(checks, &ConnectivityCheck{Label: "target", URL: url})
33+
}
34+
}
35+
return checks
36+
}
37+
38+
// BuildChecks returns the concrete checks for the selected categories,
39+
// preserving the registration order in Builders. An empty selection runs every
40+
// registered category.
41+
func BuildChecks(cfg *stream.Config, selected []Category) []Check {
42+
want := make(map[Category]bool, len(selected))
43+
for _, c := range selected {
44+
want[c] = true
45+
}
46+
checks := []Check{}
47+
for _, b := range Builders {
48+
if len(want) == 0 || want[b.Category] {
49+
checks = append(checks, b.Build(cfg)...)
50+
}
51+
}
52+
return checks
53+
}
Lines changed: 36 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,36 @@
1+
// SPDX-License-Identifier: Apache-2.0
2+
3+
package preflight
4+
5+
import (
6+
"context"
7+
"fmt"
8+
9+
"github.com/xataio/pgstream/internal/postgres"
10+
)
11+
12+
// ConnectivityCheck verifies a Postgres URL accepts a connection and answers a
13+
// ping. A connection or ping failure is reported as a finding (not a check
14+
// error), since establishing connectivity is the purpose of the check.
15+
type ConnectivityCheck struct {
16+
Label string
17+
URL string
18+
}
19+
20+
func (c *ConnectivityCheck) Name() string {
21+
return c.Label + " connectivity"
22+
}
23+
24+
func (c *ConnectivityCheck) Run(ctx context.Context) ([]Finding, error) {
25+
conn, err := postgres.NewConn(ctx, c.URL)
26+
if err != nil {
27+
return []Finding{{Message: fmt.Sprintf("unable to connect: %v", err)}}, nil
28+
}
29+
defer conn.Close(ctx)
30+
31+
if err := conn.Ping(ctx); err != nil {
32+
return []Finding{{Message: fmt.Sprintf("ping failed: %v", err)}}, nil
33+
}
34+
35+
return nil, nil
36+
}

pkg/stream/preflight/preflight.go

Lines changed: 111 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,111 @@
1+
// SPDX-License-Identifier: Apache-2.0
2+
3+
package preflight
4+
5+
import (
6+
"context"
7+
"encoding/json"
8+
)
9+
10+
// Category groups checks of the same concern so callers can opt in by
11+
// category via CLI flags. New categories are added as new check sets land —
12+
// see docs/migration_preflight_issue.md for the planned ones.
13+
type Category string
14+
15+
const (
16+
CategoryConnectivity Category = "connectivity"
17+
)
18+
19+
// Finding describes a single issue detected by a Check. Every finding is an
20+
// error — a check that finds nothing wrong returns no findings at all.
21+
type Finding struct {
22+
Message string `json:"message"`
23+
}
24+
25+
// Check is the minimal contract every preflight check must satisfy. Run returns
26+
// the findings the check produced; a non-nil error means the check itself could
27+
// not complete (distinct from finding a problem with the system under test).
28+
type Check interface {
29+
Name() string
30+
Run(ctx context.Context) ([]Finding, error)
31+
}
32+
33+
// CheckResult bundles a check's name with whatever it produced.
34+
type CheckResult struct {
35+
Name string `json:"name"`
36+
Findings []Finding `json:"findings"`
37+
Err error `json:"-"`
38+
}
39+
40+
// MarshalJSON renders Err as a string so the report is consumable from a
41+
// non-Go process. The default error marshaling drops the message.
42+
func (r CheckResult) MarshalJSON() ([]byte, error) {
43+
out := struct {
44+
Name string `json:"name"`
45+
Findings []Finding `json:"findings"`
46+
Error string `json:"error,omitempty"`
47+
}{
48+
Name: r.Name,
49+
Findings: r.Findings,
50+
}
51+
if r.Err != nil {
52+
out.Error = r.Err.Error()
53+
}
54+
return json.Marshal(out)
55+
}
56+
57+
// Report is the outcome of running a set of checks.
58+
type Report struct {
59+
Results []CheckResult `json:"results"`
60+
}
61+
62+
// ProgressFunc is invoked just before each check runs. idx is 1-based.
63+
type ProgressFunc func(idx, total int, name string)
64+
65+
// RunOption configures Run.
66+
type RunOption func(*runOptions)
67+
68+
type runOptions struct {
69+
progress ProgressFunc
70+
}
71+
72+
// WithProgress installs a callback invoked before each check runs. Useful for
73+
// updating a spinner or log line with "running X of N: <name>".
74+
func WithProgress(fn ProgressFunc) RunOption {
75+
return func(o *runOptions) { o.progress = fn }
76+
}
77+
78+
// Run executes every check in order. A check returning an error does not stop
79+
// the run; subsequent checks still execute and the error is captured in the
80+
// report alongside the findings.
81+
func Run(ctx context.Context, checks []Check, opts ...RunOption) Report {
82+
var ro runOptions
83+
for _, opt := range opts {
84+
opt(&ro)
85+
}
86+
87+
results := make([]CheckResult, 0, len(checks))
88+
total := len(checks)
89+
for i, c := range checks {
90+
if ro.progress != nil {
91+
ro.progress(i+1, total, c.Name())
92+
}
93+
findings, err := c.Run(ctx)
94+
results = append(results, CheckResult{
95+
Name: c.Name(),
96+
Findings: findings,
97+
Err: err,
98+
})
99+
}
100+
return Report{Results: results}
101+
}
102+
103+
// HasErrors reports whether any check produced findings or failed to complete.
104+
func (r Report) HasErrors() bool {
105+
for _, res := range r.Results {
106+
if res.Err != nil || len(res.Findings) > 0 {
107+
return true
108+
}
109+
}
110+
return false
111+
}

0 commit comments

Comments
 (0)