Skip to content

Commit 8c259d4

Browse files
committed
https to reactors
1 parent 4cd3457 commit 8c259d4

File tree

7 files changed

+117
-34
lines changed

7 files changed

+117
-34
lines changed

reactor/builtins.go

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -15,14 +15,14 @@ func (*immutableReactor) Ready(model *openapi.Document, args interface{}) (inter
1515

1616
func (*immutableReactor) Stop() {}
1717

18-
func (*immutableReactor) Validate(ctx context.Context, old *openapi.Document, nuw *openapi.Document, args interface{}) (*openapi.Document, error) {
18+
func (*immutableReactor) Validate(ctx context.Context, ro *Reactor, old *openapi.Document, nuw *openapi.Document, args interface{}) (*openapi.Document, error) {
1919
if old != nil && nuw != nil {
2020
return nil, fmt.Errorf("Document is immutable")
2121
}
2222
return nuw, nil
2323
}
2424

25-
func (*immutableReactor) Reconcile(ctx context.Context, old *openapi.Document, nuw *openapi.Document, args interface{}) error {
25+
func (*immutableReactor) Reconcile(ctx context.Context, ro *Reactor, old *openapi.Document, nuw *openapi.Document, args interface{}) error {
2626
return nil
2727
}
2828

reactor/cue.go

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -84,7 +84,7 @@ func (cr *cueReactor) Ready(model *openapi.Document, args interface{}) (interfac
8484
return &cueReady{cctx: ctx, schema: schema2}, nil
8585
}
8686

87-
func (jsr *cueReactor) Validate(ctx context.Context, old *openapi.Document, nuw *openapi.Document, args interface{}) (*openapi.Document, error) {
87+
func (jsr *cueReactor) Validate(ctx context.Context, ro *Reactor, old *openapi.Document, nuw *openapi.Document, args interface{}) (*openapi.Document, error) {
8888

8989
if nuw == nil {
9090
return nil, nil
@@ -140,6 +140,6 @@ func (jsr *cueReactor) Validate(ctx context.Context, old *openapi.Document, nuw
140140
return nuw, nil
141141
}
142142

143-
func (cr *cueReactor) Reconcile(ctx context.Context, old *openapi.Document, nuw *openapi.Document, args interface{}) error {
143+
func (cr *cueReactor) Reconcile(ctx context.Context, ro *Reactor, old *openapi.Document, nuw *openapi.Document, args interface{}) error {
144144
return nil
145145
}

reactor/http.go

Lines changed: 33 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -8,6 +8,7 @@ import (
88
openapi "github.com/aep/apogy/api/go"
99
"log/slog"
1010
"net/http"
11+
"strings"
1112
"time"
1213
)
1314

@@ -39,11 +40,11 @@ func (*HttpReactor) Ready(model *openapi.Document, args interface{}) (interface{
3940
return nil, nil
4041
}
4142

42-
func (hr *HttpReactor) Validate(ctx context.Context, old *openapi.Document, nuw *openapi.Document, args interface{}) (*openapi.Document, error) {
43+
func (hr *HttpReactor) Validate(ctx context.Context, ro *Reactor, old *openapi.Document, nuw *openapi.Document, args interface{}) (*openapi.Document, error) {
4344
return nuw, nil
4445
}
4546

46-
func (hr *HttpReactor) Reconcile(ctx context.Context, old *openapi.Document, nuw *openapi.Document, args interface{}) error {
47+
func (hr *HttpReactor) Reconcile(ctx context.Context, ro *Reactor, old *openapi.Document, nuw *openapi.Document, args interface{}) error {
4748
payload := openapi.ValidationRequest{
4849
Current: old,
4950
Pending: nuw,
@@ -54,7 +55,7 @@ func (hr *HttpReactor) Reconcile(ctx context.Context, old *openapi.Document, nuw
5455
return err
5556
}
5657

57-
resp, err := hr.makeValidationRequest(ctx, hr.url, payloadBytes)
58+
resp, err := hr.makeValidationRequest(ctx, hr.url, payloadBytes, ro)
5859
if err != nil {
5960
return fmt.Errorf("reconciler failed to respond in time: %w", err)
6061
}
@@ -83,16 +84,33 @@ func (hr *HttpReactor) Reconcile(ctx context.Context, old *openapi.Document, nuw
8384
return nil
8485
}
8586

86-
func (hr *HttpReactor) makeValidationRequest(ctx context.Context, url string, payloadBytes []byte) (*http.Response, error) {
87+
func (hr *HttpReactor) makeValidationRequest(ctx context.Context, url string, payloadBytes []byte, ro *Reactor) (*http.Response, error) {
8788
ctx, cancel := context.WithTimeout(ctx, time.Second)
8889
defer cancel()
8990

9091
initialBackoff := 100 * time.Millisecond
9192
maxAttempts := 5
9293

94+
// Create a custom client with mTLS for https URLs
95+
client := &http.Client{}
96+
9397
backoff := initialBackoff
9498
for attempt := 1; attempt <= maxAttempts; attempt++ {
95-
resp, err := http.Post(url, "application/json", bytes.NewBuffer(payloadBytes))
99+
req, err := http.NewRequestWithContext(ctx, "POST", url, bytes.NewBuffer(payloadBytes))
100+
if err != nil {
101+
return nil, err
102+
}
103+
req.Header.Set("Content-Type", "application/json")
104+
105+
var resp *http.Response
106+
107+
// If URL starts with https://, use mTLS
108+
if strings.HasPrefix(url, "https://") {
109+
resp, err = hr.doMTLSRequest(req, ro)
110+
} else {
111+
resp, err = client.Do(req)
112+
}
113+
96114
if err != nil {
97115
slog.Error("validator failed", "validator", url, "error", err)
98116
if attempt == maxAttempts {
@@ -119,3 +137,13 @@ func (hr *HttpReactor) makeValidationRequest(ctx context.Context, url string, pa
119137
}
120138
return nil, fmt.Errorf("request failed after %d attempts", maxAttempts)
121139
}
140+
141+
func (hr *HttpReactor) doMTLSRequest(req *http.Request, ro *Reactor) (*http.Response, error) {
142+
// Check if TLS client is already created
143+
if ro.tlsClient == nil {
144+
return nil, fmt.Errorf("apogy cant make https request to reactor since its not running with tls")
145+
}
146+
147+
// Use the pre-configured HTTP client
148+
return ro.tlsClient.Do(req)
149+
}

reactor/reactor.go

Lines changed: 48 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -2,8 +2,12 @@ package reactor
22

33
import (
44
"context"
5+
"crypto/tls"
6+
"crypto/x509"
57
"fmt"
68
"log/slog"
9+
"net/http"
10+
"os"
711
"sync"
812
"time"
913

@@ -12,8 +16,8 @@ import (
1216

1317
type Runtime interface {
1418
Ready(model *openapi.Document, arg interface{}) (interface{}, error)
15-
Validate(ctx context.Context, old *openapi.Document, nuw *openapi.Document, args interface{}) (*openapi.Document, error)
16-
Reconcile(ctx context.Context, old *openapi.Document, nuw *openapi.Document, args interface{}) error
19+
Validate(ctx context.Context, ro *Reactor, old *openapi.Document, nuw *openapi.Document, args interface{}) (*openapi.Document, error)
20+
Reconcile(ctx context.Context, ro *Reactor, old *openapi.Document, nuw *openapi.Document, args interface{}) error
1721
Stop()
1822
}
1923

@@ -26,13 +30,52 @@ type Reactor struct {
2630
lock sync.RWMutex
2731
running map[string]Runtime
2832
models2reactors map[string][]reactorReadyArgs
33+
34+
tlsConfig *tls.Config
35+
tlsClient *http.Client
2936
}
3037

31-
func NewReactor() *Reactor {
38+
func NewReactor(caCertPath, serverCertPath, serverKeyPath string) *Reactor {
3239
ro := &Reactor{
3340
running: make(map[string]Runtime),
3441
models2reactors: make(map[string][]reactorReadyArgs),
3542
}
43+
44+
// Load TLS configuration if paths are provided
45+
if caCertPath != "" && serverCertPath != "" && serverKeyPath != "" {
46+
// Load the CA cert
47+
caCert, err := os.ReadFile(caCertPath)
48+
if err != nil {
49+
slog.Error("failed to read CA certificate", "error", err)
50+
} else {
51+
// Create a cert pool and add the CA cert
52+
caCertPool := x509.NewCertPool()
53+
if !caCertPool.AppendCertsFromPEM(caCert) {
54+
slog.Error("failed to append CA certificate to pool")
55+
} else {
56+
// Load client certificate
57+
cert, err := tls.LoadX509KeyPair(serverCertPath, serverKeyPath)
58+
if err != nil {
59+
slog.Error("failed to load client certificate/key", "error", err)
60+
} else {
61+
// Create a TLS config with the certificate and key
62+
ro.tlsConfig = &tls.Config{
63+
Certificates: []tls.Certificate{cert},
64+
RootCAs: caCertPool,
65+
}
66+
67+
// Create a reusable HTTP client with TLS configuration
68+
ro.tlsClient = &http.Client{
69+
Transport: &http.Transport{
70+
TLSClientConfig: ro.tlsConfig,
71+
},
72+
}
73+
slog.Info("TLS configuration loaded successfully")
74+
}
75+
}
76+
}
77+
}
78+
3679
ro.startBuiltins()
3780
return ro
3881
}
@@ -129,7 +172,7 @@ func (ro *Reactor) Validate(ctx context.Context, old *openapi.Document, nuw *ope
129172

130173
if rt != nil {
131174
var err error
132-
nuw, err = rt.Validate(ctx, old, nuw, runArgs.args)
175+
nuw, err = rt.Validate(ctx, ro, old, nuw, runArgs.args)
133176
if err != nil {
134177
return nuw, fmt.Errorf("reactor %s rejected change: %w", runArgs.reactorName, err)
135178
}
@@ -181,7 +224,7 @@ func (ro *Reactor) Reconcile(ctx context.Context, old *openapi.Document, nuw *op
181224
var delay = 10 * time.Millisecond
182225
for i := 0; ; i++ {
183226

184-
err := rt.Reconcile(ctx, old, nuw, a.args)
227+
err := rt.Reconcile(ctx, ro, old, nuw, a.args)
185228

186229
if err == nil {
187230
break

reactor/yema.go

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -42,7 +42,7 @@ func (*yemaReactor) Ready(model *openapi.Document, args interface{}) (interface{
4242
func (cr *yemaReactor) Stop() {
4343
}
4444

45-
func (yr *yemaReactor) Validate(ctx context.Context, old *openapi.Document, nuw *openapi.Document, args interface{}) (*openapi.Document, error) {
45+
func (yr *yemaReactor) Validate(ctx context.Context, ro *Reactor, old *openapi.Document, nuw *openapi.Document, args interface{}) (*openapi.Document, error) {
4646

4747
if nuw == nil {
4848
return nil, nil
@@ -72,6 +72,6 @@ func (yr *yemaReactor) Validate(ctx context.Context, old *openapi.Document, nuw
7272
return nuw, nil
7373
}
7474

75-
func (cr *yemaReactor) Reconcile(ctx context.Context, old *openapi.Document, nuw *openapi.Document, args interface{}) error {
75+
func (cr *yemaReactor) Reconcile(ctx context.Context, ro *Reactor, old *openapi.Document, nuw *openapi.Document, args interface{}) error {
7676
return nil
7777
}

server/crud_test.go

Lines changed: 17 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -7,12 +7,15 @@ import (
77
"net/http/httptest"
88
"sync"
99
"testing"
10+
"time"
1011

1112
"encoding/json"
1213
openapi "github.com/aep/apogy/api/go"
1314
"github.com/aep/apogy/bus"
1415
"github.com/aep/apogy/kv"
16+
"github.com/aep/apogy/reactor"
1517
"github.com/labstack/echo/v4"
18+
"github.com/maypok86/otter"
1619
"github.com/stretchr/testify/assert"
1720
)
1821

@@ -27,7 +30,20 @@ func setupTestServer(t *testing.T) (*echo.Echo, *server) {
2730
t.Fatalf("Failed to create test bus: %v", err)
2831
}
2932

30-
s := newServer(kv, bs)
33+
cache, err := otter.MustBuilder[string, *Model](100000).
34+
WithTTL(60 * time.Second).
35+
Build()
36+
37+
if err != nil {
38+
panic(err)
39+
}
40+
41+
s := &server{
42+
kv: kv,
43+
bs: bs,
44+
modelCache: cache,
45+
ro: reactor.NewReactor("", "", ""),
46+
}
3147
e := echo.New()
3248
e.Binder = &Binder{
3349
defaultBinder: &echo.DefaultBinder{},

server/server.go

Lines changed: 13 additions & 17 deletions
Original file line numberDiff line numberDiff line change
@@ -26,7 +26,17 @@ type server struct {
2626
modelCache otter.Cache[string, *Model]
2727
}
2828

29-
func newServer(kv kv.KV, bs bus.Bus) *server {
29+
func Main(caCertPath, serverCertPath, serverKeyPath string) {
30+
31+
kv, err := kv.NewTikv()
32+
if err != nil {
33+
panic(err)
34+
}
35+
36+
bs, err := bus.NewSolo()
37+
if err != nil {
38+
panic(err)
39+
}
3040

3141
cache, err := otter.MustBuilder[string, *Model](100000).
3242
WithTTL(60 * time.Second).
@@ -36,28 +46,14 @@ func newServer(kv kv.KV, bs bus.Bus) *server {
3646
panic(err)
3747
}
3848

39-
nu := &server{
49+
s := &server{
4050
kv: kv,
4151
bs: bs,
4252
modelCache: cache,
4353
}
44-
nu.ro = reactor.NewReactor()
45-
return nu
46-
}
4754

48-
func Main(caCertPath, serverCertPath, serverKeyPath string) {
49-
50-
kv, err := kv.NewTikv()
51-
if err != nil {
52-
panic(err)
53-
}
54-
55-
st, err := bus.NewSolo()
56-
if err != nil {
57-
panic(err)
58-
}
55+
s.ro = reactor.NewReactor(caCertPath, serverCertPath, serverKeyPath)
5956

60-
s := newServer(kv, st)
6157
e := echo.New()
6258
e.HideBanner = true
6359

0 commit comments

Comments
 (0)