Skip to content

Commit 53d0026

Browse files
committed
https to reactors
1 parent 4cd3457 commit 53d0026

File tree

9 files changed

+182
-46
lines changed

9 files changed

+182
-46
lines changed

client/build.go

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -131,6 +131,11 @@ type Document[Val any] struct {
131131
Mut Mutations ` + "`json:\"mut\",omitempty`" + `
132132
}
133133
134+
type ValidationRequest[Val any] struct {
135+
Current *Document[Val] ` + "`json:\"current,omitempty\"`" + `
136+
Pending *Document[Val] ` + "`json:\"pending,omitempty\"`" + `
137+
}
138+
134139
{{range $t, $n := .Types}}
135140
type {{$t}} Document[{{$t}}Val]
136141
{{end}}

reactor/builtins.go

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -15,14 +15,14 @@ func (*immutableReactor) Ready(model *openapi.Document, args interface{}) (inter
1515

1616
func (*immutableReactor) Stop() {}
1717

18-
func (*immutableReactor) Validate(ctx context.Context, old *openapi.Document, nuw *openapi.Document, args interface{}) (*openapi.Document, error) {
18+
func (*immutableReactor) Validate(ctx context.Context, ro *Reactor, old *openapi.Document, nuw *openapi.Document, args interface{}) (*openapi.Document, error) {
1919
if old != nil && nuw != nil {
2020
return nil, fmt.Errorf("Document is immutable")
2121
}
2222
return nuw, nil
2323
}
2424

25-
func (*immutableReactor) Reconcile(ctx context.Context, old *openapi.Document, nuw *openapi.Document, args interface{}) error {
25+
func (*immutableReactor) Reconcile(ctx context.Context, ro *Reactor, old *openapi.Document, nuw *openapi.Document, args interface{}) error {
2626
return nil
2727
}
2828

reactor/cue.go

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -84,7 +84,7 @@ func (cr *cueReactor) Ready(model *openapi.Document, args interface{}) (interfac
8484
return &cueReady{cctx: ctx, schema: schema2}, nil
8585
}
8686

87-
func (jsr *cueReactor) Validate(ctx context.Context, old *openapi.Document, nuw *openapi.Document, args interface{}) (*openapi.Document, error) {
87+
func (jsr *cueReactor) Validate(ctx context.Context, ro *Reactor, old *openapi.Document, nuw *openapi.Document, args interface{}) (*openapi.Document, error) {
8888

8989
if nuw == nil {
9090
return nil, nil
@@ -140,6 +140,6 @@ func (jsr *cueReactor) Validate(ctx context.Context, old *openapi.Document, nuw
140140
return nuw, nil
141141
}
142142

143-
func (cr *cueReactor) Reconcile(ctx context.Context, old *openapi.Document, nuw *openapi.Document, args interface{}) error {
143+
func (cr *cueReactor) Reconcile(ctx context.Context, ro *Reactor, old *openapi.Document, nuw *openapi.Document, args interface{}) error {
144144
return nil
145145
}

reactor/http.go

Lines changed: 88 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -8,11 +8,13 @@ import (
88
openapi "github.com/aep/apogy/api/go"
99
"log/slog"
1010
"net/http"
11+
"strings"
1112
"time"
1213
)
1314

1415
type HttpReactor struct {
15-
url string
16+
validator string
17+
reconciler string
1618
}
1719

1820
func StartHttpReactor(doc *openapi.Document) (Runtime, error) {
@@ -22,13 +24,15 @@ func StartHttpReactor(doc *openapi.Document) (Runtime, error) {
2224
return nil, fmt.Errorf("val must not be empty")
2325
}
2426

25-
url, ok := val["url"].(string)
26-
if !ok || url == "" {
27-
return nil, fmt.Errorf("val.url must be a non-empty string")
27+
validator, _ := val["validator"].(string)
28+
reconciler, _ := val["reconciler"].(string)
29+
if validator == "" && reconciler == "" {
30+
return nil, fmt.Errorf("set val.validator or val.reconciler to a url")
2831
}
2932

3033
return &HttpReactor{
31-
url: url,
34+
validator: validator,
35+
reconciler: reconciler,
3236
}, nil
3337
}
3438

@@ -39,11 +43,57 @@ func (*HttpReactor) Ready(model *openapi.Document, args interface{}) (interface{
3943
return nil, nil
4044
}
4145

42-
func (hr *HttpReactor) Validate(ctx context.Context, old *openapi.Document, nuw *openapi.Document, args interface{}) (*openapi.Document, error) {
46+
func (hr *HttpReactor) Validate(ctx context.Context, ro *Reactor, old *openapi.Document, nuw *openapi.Document, args interface{}) (*openapi.Document, error) {
47+
48+
if hr.validator == "" {
49+
return nuw, nil
50+
}
51+
52+
payload := openapi.ValidationRequest{
53+
Current: old,
54+
Pending: nuw,
55+
}
56+
57+
payloadBytes, err := json.Marshal(payload)
58+
if err != nil {
59+
return nil, err
60+
}
61+
62+
resp, err := hr.makeValidationRequest(ctx, hr.validator, payloadBytes, ro)
63+
if err != nil {
64+
return nil, fmt.Errorf("reconciler failed to respond in time: %w", err)
65+
}
66+
defer resp.Body.Close()
67+
68+
if resp.StatusCode != http.StatusOK {
69+
var errorResp openapi.ErrorResponse
70+
decoder := json.NewDecoder(resp.Body)
71+
if err := decoder.Decode(&errorResp); err != nil || errorResp.Message == nil {
72+
return nil, fmt.Errorf("reconciler failed with status %d %s", resp.StatusCode, resp.Status)
73+
}
74+
return nil, fmt.Errorf("reconcilerfailed: %s", *errorResp.Message)
75+
}
76+
77+
// Check for validation error in response
78+
var response openapi.ValidationResponse
79+
decoder := json.NewDecoder(resp.Body)
80+
if err := decoder.Decode(&response); err != nil {
81+
return nil, fmt.Errorf("reconciler responded with invalid body: %v", err)
82+
}
83+
84+
if response.Reject != nil && response.Reject.Message != nil {
85+
return nil, fmt.Errorf("reconciler rejected: %s", *response.Reject.Message)
86+
}
87+
4388
return nuw, nil
4489
}
4590

46-
func (hr *HttpReactor) Reconcile(ctx context.Context, old *openapi.Document, nuw *openapi.Document, args interface{}) error {
91+
func (hr *HttpReactor) Reconcile(ctx context.Context, ro *Reactor, old *openapi.Document, nuw *openapi.Document, args interface{}) error {
92+
93+
if hr.reconciler == "" {
94+
return nil
95+
}
96+
4797
payload := openapi.ValidationRequest{
4898
Current: old,
4999
Pending: nuw,
@@ -54,7 +104,7 @@ func (hr *HttpReactor) Reconcile(ctx context.Context, old *openapi.Document, nuw
54104
return err
55105
}
56106

57-
resp, err := hr.makeValidationRequest(ctx, hr.url, payloadBytes)
107+
resp, err := hr.makeValidationRequest(ctx, hr.reconciler, payloadBytes, ro)
58108
if err != nil {
59109
return fmt.Errorf("reconciler failed to respond in time: %w", err)
60110
}
@@ -73,7 +123,7 @@ func (hr *HttpReactor) Reconcile(ctx context.Context, old *openapi.Document, nuw
73123
var response openapi.ValidationResponse
74124
decoder := json.NewDecoder(resp.Body)
75125
if err := decoder.Decode(&response); err != nil {
76-
return fmt.Errorf("reconciler responded with invalid msgpack: %v", err)
126+
return fmt.Errorf("reconciler responded with invalid body: %v", err)
77127
}
78128

79129
if response.Reject != nil && response.Reject.Message != nil {
@@ -83,16 +133,33 @@ func (hr *HttpReactor) Reconcile(ctx context.Context, old *openapi.Document, nuw
83133
return nil
84134
}
85135

86-
func (hr *HttpReactor) makeValidationRequest(ctx context.Context, url string, payloadBytes []byte) (*http.Response, error) {
136+
func (hr *HttpReactor) makeValidationRequest(ctx context.Context, url string, payloadBytes []byte, ro *Reactor) (*http.Response, error) {
87137
ctx, cancel := context.WithTimeout(ctx, time.Second)
88138
defer cancel()
89139

90140
initialBackoff := 100 * time.Millisecond
91141
maxAttempts := 5
92142

143+
// Create a custom client with mTLS for https URLs
144+
client := &http.Client{}
145+
93146
backoff := initialBackoff
94147
for attempt := 1; attempt <= maxAttempts; attempt++ {
95-
resp, err := http.Post(url, "application/json", bytes.NewBuffer(payloadBytes))
148+
req, err := http.NewRequestWithContext(ctx, "POST", url, bytes.NewBuffer(payloadBytes))
149+
if err != nil {
150+
return nil, err
151+
}
152+
req.Header.Set("Content-Type", "application/json")
153+
154+
var resp *http.Response
155+
156+
// If URL starts with https://, use mTLS
157+
if strings.HasPrefix(url, "https://") {
158+
resp, err = hr.doMTLSRequest(req, ro)
159+
} else {
160+
resp, err = client.Do(req)
161+
}
162+
96163
if err != nil {
97164
slog.Error("validator failed", "validator", url, "error", err)
98165
if attempt == maxAttempts {
@@ -119,3 +186,13 @@ func (hr *HttpReactor) makeValidationRequest(ctx context.Context, url string, pa
119186
}
120187
return nil, fmt.Errorf("request failed after %d attempts", maxAttempts)
121188
}
189+
190+
func (hr *HttpReactor) doMTLSRequest(req *http.Request, ro *Reactor) (*http.Response, error) {
191+
// Check if TLS client is already created
192+
if ro.tlsClient == nil {
193+
return nil, fmt.Errorf("apogy cant make https request to reactor since its not running with tls")
194+
}
195+
196+
// Use the pre-configured HTTP client
197+
return ro.tlsClient.Do(req)
198+
}

reactor/reactor.go

Lines changed: 48 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -2,8 +2,12 @@ package reactor
22

33
import (
44
"context"
5+
"crypto/tls"
6+
"crypto/x509"
57
"fmt"
68
"log/slog"
9+
"net/http"
10+
"os"
711
"sync"
812
"time"
913

@@ -12,8 +16,8 @@ import (
1216

1317
type Runtime interface {
1418
Ready(model *openapi.Document, arg interface{}) (interface{}, error)
15-
Validate(ctx context.Context, old *openapi.Document, nuw *openapi.Document, args interface{}) (*openapi.Document, error)
16-
Reconcile(ctx context.Context, old *openapi.Document, nuw *openapi.Document, args interface{}) error
19+
Validate(ctx context.Context, ro *Reactor, old *openapi.Document, nuw *openapi.Document, args interface{}) (*openapi.Document, error)
20+
Reconcile(ctx context.Context, ro *Reactor, old *openapi.Document, nuw *openapi.Document, args interface{}) error
1721
Stop()
1822
}
1923

@@ -26,13 +30,52 @@ type Reactor struct {
2630
lock sync.RWMutex
2731
running map[string]Runtime
2832
models2reactors map[string][]reactorReadyArgs
33+
34+
tlsConfig *tls.Config
35+
tlsClient *http.Client
2936
}
3037

31-
func NewReactor() *Reactor {
38+
func NewReactor(caCertPath, serverCertPath, serverKeyPath string) *Reactor {
3239
ro := &Reactor{
3340
running: make(map[string]Runtime),
3441
models2reactors: make(map[string][]reactorReadyArgs),
3542
}
43+
44+
// Load TLS configuration if paths are provided
45+
if caCertPath != "" && serverCertPath != "" && serverKeyPath != "" {
46+
// Load the CA cert
47+
caCert, err := os.ReadFile(caCertPath)
48+
if err != nil {
49+
slog.Error("failed to read CA certificate", "error", err)
50+
} else {
51+
// Create a cert pool and add the CA cert
52+
caCertPool := x509.NewCertPool()
53+
if !caCertPool.AppendCertsFromPEM(caCert) {
54+
slog.Error("failed to append CA certificate to pool")
55+
} else {
56+
// Load client certificate
57+
cert, err := tls.LoadX509KeyPair(serverCertPath, serverKeyPath)
58+
if err != nil {
59+
slog.Error("failed to load client certificate/key", "error", err)
60+
} else {
61+
// Create a TLS config with the certificate and key
62+
ro.tlsConfig = &tls.Config{
63+
Certificates: []tls.Certificate{cert},
64+
RootCAs: caCertPool,
65+
}
66+
67+
// Create a reusable HTTP client with TLS configuration
68+
ro.tlsClient = &http.Client{
69+
Transport: &http.Transport{
70+
TLSClientConfig: ro.tlsConfig,
71+
},
72+
}
73+
slog.Info("TLS configuration loaded successfully")
74+
}
75+
}
76+
}
77+
}
78+
3679
ro.startBuiltins()
3780
return ro
3881
}
@@ -129,7 +172,7 @@ func (ro *Reactor) Validate(ctx context.Context, old *openapi.Document, nuw *ope
129172

130173
if rt != nil {
131174
var err error
132-
nuw, err = rt.Validate(ctx, old, nuw, runArgs.args)
175+
nuw, err = rt.Validate(ctx, ro, old, nuw, runArgs.args)
133176
if err != nil {
134177
return nuw, fmt.Errorf("reactor %s rejected change: %w", runArgs.reactorName, err)
135178
}
@@ -181,7 +224,7 @@ func (ro *Reactor) Reconcile(ctx context.Context, old *openapi.Document, nuw *op
181224
var delay = 10 * time.Millisecond
182225
for i := 0; ; i++ {
183226

184-
err := rt.Reconcile(ctx, old, nuw, a.args)
227+
err := rt.Reconcile(ctx, ro, old, nuw, a.args)
185228

186229
if err == nil {
187230
break

reactor/yema.go

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -42,7 +42,7 @@ func (*yemaReactor) Ready(model *openapi.Document, args interface{}) (interface{
4242
func (cr *yemaReactor) Stop() {
4343
}
4444

45-
func (yr *yemaReactor) Validate(ctx context.Context, old *openapi.Document, nuw *openapi.Document, args interface{}) (*openapi.Document, error) {
45+
func (yr *yemaReactor) Validate(ctx context.Context, ro *Reactor, old *openapi.Document, nuw *openapi.Document, args interface{}) (*openapi.Document, error) {
4646

4747
if nuw == nil {
4848
return nil, nil
@@ -72,6 +72,6 @@ func (yr *yemaReactor) Validate(ctx context.Context, old *openapi.Document, nuw
7272
return nuw, nil
7373
}
7474

75-
func (cr *yemaReactor) Reconcile(ctx context.Context, old *openapi.Document, nuw *openapi.Document, args interface{}) error {
75+
func (cr *yemaReactor) Reconcile(ctx context.Context, ro *Reactor, old *openapi.Document, nuw *openapi.Document, args interface{}) error {
7676
return nil
7777
}

server/crud.go

Lines changed: 1 addition & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -233,7 +233,6 @@ func (s *server) putDocument1(c echo.Context, ctx context.Context, doc_ *openapi
233233
err = w2.Commit(ctx)
234234
commitDuration := time.Since(commitStart)
235235

236-
// Always record the commit duration, even if it failed
237236
kvCommitDuration.WithLabelValues("write_transaction").Observe(commitDuration.Seconds())
238237

239238
if err != nil {
@@ -253,7 +252,7 @@ func (s *server) putDocument1(c echo.Context, ctx context.Context, doc_ *openapi
253252

254253
err = s.ro.Reconcile(ctx, old, doc)
255254
if err != nil {
256-
return echo.NewHTTPError(http.StatusBadGateway, err.Error())
255+
return echo.NewHTTPError(http.StatusUnprocessableEntity, err.Error())
257256
}
258257

259258
return nil

server/crud_test.go

Lines changed: 21 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -7,12 +7,15 @@ import (
77
"net/http/httptest"
88
"sync"
99
"testing"
10+
"time"
1011

1112
"encoding/json"
1213
openapi "github.com/aep/apogy/api/go"
1314
"github.com/aep/apogy/bus"
1415
"github.com/aep/apogy/kv"
16+
"github.com/aep/apogy/reactor"
1517
"github.com/labstack/echo/v4"
18+
"github.com/maypok86/otter"
1619
"github.com/stretchr/testify/assert"
1720
)
1821

@@ -27,7 +30,20 @@ func setupTestServer(t *testing.T) (*echo.Echo, *server) {
2730
t.Fatalf("Failed to create test bus: %v", err)
2831
}
2932

30-
s := newServer(kv, bs)
33+
cache, err := otter.MustBuilder[string, *Model](100000).
34+
WithTTL(60 * time.Second).
35+
Build()
36+
37+
if err != nil {
38+
panic(err)
39+
}
40+
41+
s := &server{
42+
kv: kv,
43+
bs: bs,
44+
modelCache: cache,
45+
ro: reactor.NewReactor("", "", ""),
46+
}
3147
e := echo.New()
3248
e.Binder = &Binder{
3349
defaultBinder: &echo.DefaultBinder{},
@@ -140,10 +156,10 @@ func TestPutDocument_Reactor(t *testing.T) {
140156
Model: "Reactor",
141157
Id: "asdasd.example.com",
142158
Val: &map[string]interface{}{
143-
"runtime": "http",
144-
"url": "https://google.com",
145-
"name": "Test Reactor",
146-
"type": "test",
159+
"runtime": "http",
160+
"validator": "https://google.com",
161+
"name": "Test Reactor",
162+
"type": "test",
147163
},
148164
}
149165

0 commit comments

Comments
 (0)