Skip to content

Commit 277e90a

Browse files
authored
Client load report for grpclb. (#1200)
1 parent a7fee9f commit 277e90a

File tree

8 files changed

+437
-66
lines changed

8 files changed

+437
-66
lines changed

call.go

Lines changed: 36 additions & 20 deletions
Original file line numberDiff line numberDiff line change
@@ -93,11 +93,7 @@ func recvResponse(ctx context.Context, dopts dialOptions, t transport.ClientTran
9393
}
9494

9595
// sendRequest writes out various information of an RPC such as Context and Message.
96-
func sendRequest(ctx context.Context, dopts dialOptions, compressor Compressor, callHdr *transport.CallHdr, t transport.ClientTransport, args interface{}, opts *transport.Options) (_ *transport.Stream, err error) {
97-
stream, err := t.NewStream(ctx, callHdr)
98-
if err != nil {
99-
return nil, err
100-
}
96+
func sendRequest(ctx context.Context, dopts dialOptions, compressor Compressor, callHdr *transport.CallHdr, stream *transport.Stream, t transport.ClientTransport, args interface{}, opts *transport.Options) (err error) {
10197
defer func() {
10298
if err != nil {
10399
// If err is connection error, t will be closed, no need to close stream here.
@@ -120,7 +116,7 @@ func sendRequest(ctx context.Context, dopts dialOptions, compressor Compressor,
120116
}
121117
outBuf, err := encode(dopts.codec, args, compressor, cbuf, outPayload)
122118
if err != nil {
123-
return nil, Errorf(codes.Internal, "grpc: %v", err)
119+
return Errorf(codes.Internal, "grpc: %v", err)
124120
}
125121
err = t.Write(stream, outBuf, opts)
126122
if err == nil && outPayload != nil {
@@ -131,10 +127,10 @@ func sendRequest(ctx context.Context, dopts dialOptions, compressor Compressor,
131127
// does not exist.) so that t.Write could get io.EOF from wait(...). Leave the following
132128
// recvResponse to get the final status.
133129
if err != nil && err != io.EOF {
134-
return nil, err
130+
return err
135131
}
136132
// Sent successfully.
137-
return stream, nil
133+
return nil
138134
}
139135

140136
// Invoke sends the RPC request on the wire and returns after response is received.
@@ -183,6 +179,7 @@ func invoke(ctx context.Context, method string, args, reply interface{}, cc *Cli
183179
}
184180
}()
185181
}
182+
ctx = newContextWithRPCInfo(ctx)
186183
sh := cc.dopts.copts.StatsHandler
187184
if sh != nil {
188185
ctx = sh.TagRPC(ctx, &stats.RPCTagInfo{FullMethodName: method})
@@ -246,33 +243,49 @@ func invoke(ctx context.Context, method string, args, reply interface{}, cc *Cli
246243
if c.traceInfo.tr != nil {
247244
c.traceInfo.tr.LazyLog(&payload{sent: true, msg: args}, true)
248245
}
249-
stream, err = sendRequest(ctx, cc.dopts, cc.dopts.cp, callHdr, t, args, topts)
246+
stream, err = t.NewStream(ctx, callHdr)
247+
if err != nil {
248+
if put != nil {
249+
if _, ok := err.(transport.ConnectionError); ok {
250+
// If error is connection error, transport was sending data on wire,
251+
// and we are not sure if anything has been sent on wire.
252+
// If error is not connection error, we are sure nothing has been sent.
253+
updateRPCInfoInContext(ctx, rpcInfo{bytesSent: true, bytesReceived: false})
254+
}
255+
put()
256+
}
257+
if _, ok := err.(transport.ConnectionError); (ok || err == transport.ErrStreamDrain) && !c.failFast {
258+
continue
259+
}
260+
return toRPCErr(err)
261+
}
262+
err = sendRequest(ctx, cc.dopts, cc.dopts.cp, callHdr, stream, t, args, topts)
250263
if err != nil {
251264
if put != nil {
265+
updateRPCInfoInContext(ctx, rpcInfo{
266+
bytesSent: stream.BytesSent(),
267+
bytesReceived: stream.BytesReceived(),
268+
})
252269
put()
253-
put = nil
254270
}
255271
// Retry a non-failfast RPC when
256272
// i) there is a connection error; or
257273
// ii) the server started to drain before this RPC was initiated.
258-
if _, ok := err.(transport.ConnectionError); ok || err == transport.ErrStreamDrain {
259-
if c.failFast {
260-
return toRPCErr(err)
261-
}
274+
if _, ok := err.(transport.ConnectionError); (ok || err == transport.ErrStreamDrain) && !c.failFast {
262275
continue
263276
}
264277
return toRPCErr(err)
265278
}
266279
err = recvResponse(ctx, cc.dopts, t, &c, stream, reply)
267280
if err != nil {
268281
if put != nil {
282+
updateRPCInfoInContext(ctx, rpcInfo{
283+
bytesSent: stream.BytesSent(),
284+
bytesReceived: stream.BytesReceived(),
285+
})
269286
put()
270-
put = nil
271287
}
272-
if _, ok := err.(transport.ConnectionError); ok || err == transport.ErrStreamDrain {
273-
if c.failFast {
274-
return toRPCErr(err)
275-
}
288+
if _, ok := err.(transport.ConnectionError); (ok || err == transport.ErrStreamDrain) && !c.failFast {
276289
continue
277290
}
278291
return toRPCErr(err)
@@ -282,8 +295,11 @@ func invoke(ctx context.Context, method string, args, reply interface{}, cc *Cli
282295
}
283296
t.CloseStream(stream, nil)
284297
if put != nil {
298+
updateRPCInfoInContext(ctx, rpcInfo{
299+
bytesSent: stream.BytesSent(),
300+
bytesReceived: stream.BytesReceived(),
301+
})
285302
put()
286-
put = nil
287303
}
288304
return stream.Status().Err()
289305
}

clientconn.go

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -669,13 +669,15 @@ func (cc *ClientConn) getTransport(ctx context.Context, opts BalancerGetOptions)
669669
}
670670
if !ok {
671671
if put != nil {
672+
updateRPCInfoInContext(ctx, rpcInfo{bytesSent: false, bytesReceived: false})
672673
put()
673674
}
674675
return nil, nil, errConnClosing
675676
}
676677
t, err := ac.wait(ctx, cc.dopts.balancer != nil, !opts.BlockingWait)
677678
if err != nil {
678679
if put != nil {
680+
updateRPCInfoInContext(ctx, rpcInfo{bytesSent: false, bytesReceived: false})
679681
put()
680682
}
681683
return nil, nil, err

grpclb.go

Lines changed: 85 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -145,6 +145,8 @@ type balancer struct {
145145
done bool
146146
expTimer *time.Timer
147147
rand *rand.Rand
148+
149+
clientStats lbpb.ClientStats
148150
}
149151

150152
func (b *balancer) watchAddrUpdates(w naming.Watcher, ch chan []remoteBalancerInfo) error {
@@ -281,6 +283,34 @@ func (b *balancer) processServerList(l *lbpb.ServerList, seq int) {
281283
return
282284
}
283285

286+
func (b *balancer) sendLoadReport(s *balanceLoadClientStream, interval time.Duration, done <-chan struct{}) {
287+
ticker := time.NewTicker(interval)
288+
defer ticker.Stop()
289+
for {
290+
select {
291+
case <-ticker.C:
292+
case <-done:
293+
return
294+
}
295+
b.mu.Lock()
296+
stats := b.clientStats
297+
b.clientStats = lbpb.ClientStats{} // Clear the stats.
298+
b.mu.Unlock()
299+
t := time.Now()
300+
stats.Timestamp = &lbpb.Timestamp{
301+
Seconds: t.Unix(),
302+
Nanos: int32(t.Nanosecond()),
303+
}
304+
if err := s.Send(&lbpb.LoadBalanceRequest{
305+
LoadBalanceRequestType: &lbpb.LoadBalanceRequest_ClientStats{
306+
ClientStats: &stats,
307+
},
308+
}); err != nil {
309+
return
310+
}
311+
}
312+
}
313+
284314
func (b *balancer) callRemoteBalancer(lbc *loadBalancerClient, seq int) (retry bool) {
285315
ctx, cancel := context.WithCancel(context.Background())
286316
defer cancel()
@@ -322,6 +352,14 @@ func (b *balancer) callRemoteBalancer(lbc *loadBalancerClient, seq int) (retry b
322352
grpclog.Println("TODO: Delegation is not supported yet.")
323353
return
324354
}
355+
streamDone := make(chan struct{})
356+
defer close(streamDone)
357+
b.mu.Lock()
358+
b.clientStats = lbpb.ClientStats{} // Clear client stats.
359+
b.mu.Unlock()
360+
if d := convertDuration(initResp.ClientStatsReportInterval); d > 0 {
361+
go b.sendLoadReport(stream, d, streamDone)
362+
}
325363
// Retrieve the server list.
326364
for {
327365
reply, err := stream.Recv()
@@ -538,7 +576,32 @@ func (b *balancer) Get(ctx context.Context, opts BalancerGetOptions) (addr Addre
538576
err = ErrClientConnClosing
539577
return
540578
}
579+
seq := b.seq
541580

581+
defer func() {
582+
if err != nil {
583+
return
584+
}
585+
put = func() {
586+
s, ok := rpcInfoFromContext(ctx)
587+
if !ok {
588+
return
589+
}
590+
b.mu.Lock()
591+
defer b.mu.Unlock()
592+
if b.done || seq < b.seq {
593+
return
594+
}
595+
b.clientStats.NumCallsFinished++
596+
if !s.bytesSent {
597+
b.clientStats.NumCallsFinishedWithClientFailedToSend++
598+
} else if s.bytesReceived {
599+
b.clientStats.NumCallsFinishedKnownReceived++
600+
}
601+
}
602+
}()
603+
604+
b.clientStats.NumCallsStarted++
542605
if len(b.addrs) > 0 {
543606
if b.next >= len(b.addrs) {
544607
b.next = 0
@@ -556,6 +619,13 @@ func (b *balancer) Get(ctx context.Context, opts BalancerGetOptions) (addr Addre
556619
}
557620
if !opts.BlockingWait {
558621
b.next = next
622+
if a.dropForLoadBalancing {
623+
b.clientStats.NumCallsFinished++
624+
b.clientStats.NumCallsFinishedWithDropForLoadBalancing++
625+
} else if a.dropForRateLimiting {
626+
b.clientStats.NumCallsFinished++
627+
b.clientStats.NumCallsFinishedWithDropForRateLimiting++
628+
}
559629
b.mu.Unlock()
560630
err = Errorf(codes.Unavailable, "%s drops requests", a.addr.Addr)
561631
return
@@ -569,6 +639,8 @@ func (b *balancer) Get(ctx context.Context, opts BalancerGetOptions) (addr Addre
569639
}
570640
if !opts.BlockingWait {
571641
if len(b.addrs) == 0 {
642+
b.clientStats.NumCallsFinished++
643+
b.clientStats.NumCallsFinishedWithClientFailedToSend++
572644
b.mu.Unlock()
573645
err = Errorf(codes.Unavailable, "there is no address available")
574646
return
@@ -590,11 +662,17 @@ func (b *balancer) Get(ctx context.Context, opts BalancerGetOptions) (addr Addre
590662
for {
591663
select {
592664
case <-ctx.Done():
665+
b.mu.Lock()
666+
b.clientStats.NumCallsFinished++
667+
b.clientStats.NumCallsFinishedWithClientFailedToSend++
668+
b.mu.Unlock()
593669
err = ctx.Err()
594670
return
595671
case <-ch:
596672
b.mu.Lock()
597673
if b.done {
674+
b.clientStats.NumCallsFinished++
675+
b.clientStats.NumCallsFinishedWithClientFailedToSend++
598676
b.mu.Unlock()
599677
err = ErrClientConnClosing
600678
return
@@ -617,6 +695,13 @@ func (b *balancer) Get(ctx context.Context, opts BalancerGetOptions) (addr Addre
617695
}
618696
if !opts.BlockingWait {
619697
b.next = next
698+
if a.dropForLoadBalancing {
699+
b.clientStats.NumCallsFinished++
700+
b.clientStats.NumCallsFinishedWithDropForLoadBalancing++
701+
} else if a.dropForRateLimiting {
702+
b.clientStats.NumCallsFinished++
703+
b.clientStats.NumCallsFinishedWithDropForRateLimiting++
704+
}
620705
b.mu.Unlock()
621706
err = Errorf(codes.Unavailable, "drop requests for the addreess %s", a.addr.Addr)
622707
return

0 commit comments

Comments
 (0)