-
Notifications
You must be signed in to change notification settings - Fork 6
/
Copy pathhealth.go
295 lines (260 loc) · 9.07 KB
/
health.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
package cslb
/*
The health structs track the health of the target systems in terms of whether we have been able to
establish successful connections to them or not and what the results of the health check are - if
one is running.
The health check URL is used by a background check to pre-determine the state of the target
rather than waiting for a failed connection. Defining a health check URL is recommended as a
successful connect() does not necessarily imply a successful service - it merely implies a
successful TCP setup. Furthermore a health check URL can be used to administratively turn a target
on and off. Or take it "out of rotation" in devops parlance.
Most of these functions are actually cslb functions rather than healthCache functions because they
need access to cslb variables such as config and resolver. This could be restructured to bring all
those values within a healthStore, but there's not a lot of value in that apart from slightly better
encapsulation.
*/
import (
"bytes"
"context"
"fmt"
"io/ioutil"
"net/http"
"net/url"
"strconv"
"strings"
"sync"
"time"
)
// healthCache is the lock-protected store of per-target health entries. The embedded RWMutex must
// be held while reading or writing the map or any ceHealth stored in it.
type healthCache struct {
sync.RWMutex // Protects everything within this struct
done chan bool // Closed by stop() to shut down the cache cleaner goroutine
// NOTE(review): makeHealthStoreKey() does not lowercase the host itself; presumably callers pass
// an already-lowercased target - confirm.
cache map[string]*ceHealth // The key is ToLower(target:port) - use makeHealthStoreKey()
}
// ceHealth is one cache entry describing the health of a single target:port. It records the
// outcome of dial attempts and of the optional background health check. All fields are protected
// by the lock of the owning healthCache.
type ceHealth struct {
expires time.Time // When this entry expires out of the cache
// Count of dial attempts recorded with a nil error.
goodDials int
// Count of dial attempts recorded with a non-nil error.
failedDials int
nextDialAttempt time.Time // When we can next consider this target - IsZero() means now
// Time of the most recently recorded dial attempt, successful or not.
lastDialAttempt time.Time
// Error text of the most recent dial attempt; empty if it succeeded.
lastDialStatus string
// Time of the most recent health check whose result was recorded.
lastHealthCheck time.Time
lastHealthCheckStatus string // From http.Get(): the error text or the response status line
url string // URL to probe to confirm target is healthy
unHealthy bool // True if last health check failed
}
// isGood reports whether the target may be used at time now. A target is usable when its last
// health check did not fail and any dial veto has run out. The caller must already hold the
// healthCache lock.
func (t *ceHealth) isGood(now time.Time) bool {
	if t.unHealthy {
		return false
	}

	// "Not after now" rather than "before now" so that a veto ending at exactly now counts as
	// over, and a zero nextDialAttempt always passes.
	return !t.nextDialAttempt.After(now)
}
// makeHealthStoreKey builds the healthStore lookup key "host:port". The host is used verbatim (no
// case folding is applied here) and the port is rendered in decimal after conversion to uint64.
func makeHealthStoreKey(host string, port int) string {
	key := make([]byte, 0, len(host)+1+20) // 20 digits is the widest a uint64 can print
	key = append(key, host...)
	key = append(key, ':')
	key = strconv.AppendUint(key, uint64(port), 10)

	return string(key)
}
// unpackHealthStoreKey splits a healthStore key at its first colon into host and port strings.
// Both results are empty when the key has no colon or starts with one (an empty host).
func unpackHealthStoreKey(targetKey string) (host, port string) {
	sep := strings.IndexByte(targetKey, ':')
	if sep <= 0 {
		return "", ""
	}

	return targetKey[:sep], targetKey[sep+1:]
}
// newHealthCache returns an empty healthCache with its map and done channel allocated. The cleaner
// goroutine is not started here; call start() for that.
func newHealthCache() *healthCache {
	hc := &healthCache{}
	hc.done = make(chan bool)
	hc.cache = make(map[string]*ceHealth)

	return hc
}
// start launches the cleaner goroutine, which purges expired entries every cacheInterval until
// stop() is called.
func (t *healthCache) start(cacheInterval time.Duration) {
go t.cleaner(cacheInterval)
}
// stop shuts down the cleaner goroutine by closing the done channel. It must be called at most
// once, as closing an already-closed channel panics.
func (t *healthCache) stop() {
close(t.done)
}
// populateHealthStore makes sure every supplied key has an entry in the healthStore. Keys must be
// fully formed cache keys, that is, target:port. Keys already present are left untouched,
// including their expiry. Each newly created entry expires HealthTTL after now and, unless health
// checks are disabled, gets its own health check goroutine.
func (t *cslb) populateHealthStore(now time.Time, healthStoreKeys []string) {
	t.healthStore.Lock()
	defer t.healthStore.Unlock()

	for _, key := range healthStoreKeys {
		if t.healthStore.cache[key] != nil {
			continue // Already tracked
		}

		entry := &ceHealth{expires: now.Add(t.HealthTTL)}
		t.healthStore.cache[key] = entry
		if t.DisableHealthChecks {
			continue
		}
		go t.fetchAndRunHealthCheck(key, entry)
	}
}
// zeroTime is the zero time.Time. setDialResult stores it in nextDialAttempt after a successful
// dial to clear any veto.
var zeroTime time.Time
// setDialResult records the outcome of a dial attempt to host:port made at time now. A nil err
// counts as a good dial and clears any veto. A non-nil err counts as a failed dial and vetoes the
// target until DialVetoDuration after now.
//
// If the target is not yet in the healthStore an entry is created for it and, if health checks
// are enabled, a health check is started. That should be rare, but it can happen if the HealthTTL
// is shorter than the SRV TTL or if a connection runs across a target expiration.
func (t *cslb) setDialResult(now time.Time, host string, port int, err error) {
	key := makeHealthStoreKey(host, port)

	t.healthStore.Lock()
	defer t.healthStore.Unlock()

	entry := t.healthStore.cache[key]
	if entry == nil { // Unexpected, but cope by creating the entry on the fly
		entry = &ceHealth{expires: now.Add(t.HealthTTL)}
		t.healthStore.cache[key] = entry
		if !t.DisableHealthChecks {
			go t.fetchAndRunHealthCheck(key, entry)
		}
	}

	entry.lastDialAttempt = now
	if err != nil {
		entry.failedDials++
		entry.nextDialAttempt = now.Add(t.DialVetoDuration)
		entry.lastDialStatus = err.Error()
		return
	}

	entry.goodDials++
	entry.nextDialAttempt = zeroTime
	entry.lastDialStatus = ""
}
// fetchAndRunHealthCheck is intended to run as its own goroutine, started when a target is added
// to the healthStore. It looks up the health check URL for the target and, if one is found, polls
// it with GET until the ceHealth entry's expiry time has passed.
//
// The URL is published in a TXT RR whose qName is "_" + port + HealthCheckTXTPrefix + host, thus
// something like _80._cslb.example.net where the port comes from the SRV RR. (A TypeURI RR from
// RFC7553 could be used instead, but support for those is rare.)
//
// The function returns without polling if the TXT lookup fails, the TXT data is empty or the URL
// does not parse. A failed GET marks the target unhealthy and ends the goroutine. A target counts
// as healthy only when the response is 200 OK and its body contains HealthCheckContentOk.
func (t *cslb) fetchAndRunHealthCheck(healthStoreKey string, ceh *ceHealth) {
	host, port := unpackHealthStoreKey(healthStoreKey)
	txts, err := t.netResolver.LookupTXT(context.Background(), "_"+port+t.HealthCheckTXTPrefix+host)
	if err != nil {
		return // No TXT RR means no health check for this target
	}

	hcURL := strings.Join(txts, "") // TXT data arrives as sub-strings which form one value
	if hcURL == "" {
		return // Nothing to fetch
	}

	t.healthStore.Lock()
	ceh.url = hcURL        // Only used for reporting
	expires := ceh.expires // Copied while the lock is held
	t.healthStore.Unlock()

	if _, err := url.Parse(hcURL); err != nil {
		return // Not a usable URL
	}

	// record stores the outcome of one health check under the healthStore lock.
	record := func(at time.Time, unHealthy bool, status string) {
		t.healthStore.Lock()
		ceh.unHealthy = unHealthy
		ceh.lastHealthCheck = at
		ceh.lastHealthCheckStatus = status
		t.healthStore.Unlock()
	}

	// Poll until the entry's expiry time passes. The first check follows after just one second,
	// all later ones after HealthCheckFrequency.
	wait := time.Second
	for {
		time.Sleep(wait)
		wait = t.HealthCheckFrequency

		now := time.Now()
		if expires.Before(now) {
			return
		}

		resp, err := t.hcClient.Get(hcURL)
		if err != nil {
			if t.PrintHCResults {
				fmt.Println("Health Check:", healthStoreKey, err)
			}
			record(now, true, err.Error())
			return // Treated as fatal: the entry stays unhealthy as nothing updates it again
		}

		body, err := ioutil.ReadAll(resp.Body)
		resp.Body.Close()
		if err != nil {
			if t.PrintHCResults {
				fmt.Println("Health Check:", healthStoreKey, err)
			}
			continue // Keep the previous state and try again next time round
		}

		ok := false
		if resp.StatusCode == http.StatusOK {
			ok = bytes.Contains(body, []byte(t.HealthCheckContentOk))
		}
		if t.PrintHCResults {
			fmt.Println("Health Check Set:", healthStoreKey, ok)
		}
		record(now, !ok, resp.Status)
	}
}
// cleaner purges expired cache entries once every cleanInterval until the done channel is closed.
// It blocks, so it is normally run as a go-routine. cleanInterval must be greater than zero as
// time.NewTicker panics otherwise.
func (t *healthCache) cleaner(cleanInterval time.Duration) {
	tick := time.NewTicker(cleanInterval)
	defer tick.Stop()

	for {
		select {
		case at := <-tick.C:
			t.clean(at)
		case <-t.done:
			return
		}
	}
}
// clean deletes every cache entry whose expiry time is strictly earlier than now. It takes the
// write lock for the duration of the scan.
func (t *healthCache) clean(now time.Time) {
	t.Lock()
	defer t.Unlock()

	for key, entry := range t.cache {
		if !now.After(entry.expires) {
			continue // Still current
		}
		delete(t.cache, key)
	}
}
// ceHealthAsStats is a clone of ceHealth with exported variables for html.Template. Points in time
// are converted by getStats into durations relative to the moment the stats were taken, truncated
// to whole seconds. A duration is left at zero when the corresponding ceHealth time is unset.
type ceHealthAsStats struct {
// The healthStore key of the target.
Key string
GoodDials int
FailedDials int
Expires time.Duration // In the future
NextDialAttempt time.Duration // In the future
LastDialAttempt time.Duration // In the past
// Trimmed by getStats to at most 60 bytes.
LastDialStatus string
LastHealthCheck time.Duration // In the past
// Trimmed by getStats to at most 90 bytes; only set when a health check has been recorded.
LastHealthCheckStatus string
Url string
// Result of ceHealth.isGood() at the moment the stats were taken.
IsGood bool
}
// healthStats is the snapshot returned by getStats: one ceHealthAsStats per cache entry, in map
// iteration order (that is, unordered).
type healthStats struct {
Targets []ceHealthAsStats
}
// getStats takes a snapshot of every ceHealth entry in a form suitable for the status service.
// Times are reported as whole-second durations relative to the moment of the call and a zero
// time is reported as a zero duration. The cost is proportional to the number of targets, which
// is expected to be small.
func (t *healthCache) getStats() *healthStats {
	now := time.Now()

	// until and since convert a point in time to a distance from now, in whole seconds.
	until := func(when time.Time) time.Duration { return when.Sub(now).Truncate(time.Second) }
	since := func(when time.Time) time.Duration { return now.Sub(when).Truncate(time.Second) }

	stats := new(healthStats)

	t.RLock()
	defer t.RUnlock()

	stats.Targets = make([]ceHealthAsStats, 0, len(t.cache))
	for key, ceh := range t.cache {
		var row ceHealthAsStats
		row.Key = key
		row.GoodDials = ceh.goodDials
		row.FailedDials = ceh.failedDials
		row.LastDialStatus = trimTo(ceh.lastDialStatus, 60)
		row.Url = ceh.url
		row.IsGood = ceh.isGood(now)

		if !ceh.expires.IsZero() {
			row.Expires = until(ceh.expires)
		}
		if !ceh.nextDialAttempt.IsZero() {
			row.NextDialAttempt = until(ceh.nextDialAttempt)
		}
		if !ceh.lastDialAttempt.IsZero() {
			row.LastDialAttempt = since(ceh.lastDialAttempt)
		}
		if !ceh.lastHealthCheck.IsZero() {
			row.LastHealthCheck = since(ceh.lastHealthCheck)
			row.LastHealthCheckStatus = trimTo(ceh.lastHealthCheckStatus, 90)
		}

		stats.Targets = append(stats.Targets, row)
	}

	return stats
}
// trimTo shortens s for display so that it is no longer than max bytes, replacing the removed
// tail with "...". Strings that already fit are returned unchanged. When max is 3 or less there
// is no room for any of the original text, so just "..." is returned.
//
// The cut is always made on a rune boundary so that a multi-byte UTF-8 character is never split
// in half. As a consequence the result may be a byte or so shorter than max.
func trimTo(s string, max int) string {
	const ellipsis = "..."

	if len(s) <= max {
		return s
	}
	if max <= len(ellipsis) {
		return ellipsis
	}

	// Ranging over a string yields the byte index at which each rune starts, so the last index
	// seen that is within budget is the latest safe place to cut.
	budget := max - len(ellipsis)
	cut := 0
	for i := range s {
		if i > budget {
			break
		}
		cut = i
	}

	return s[:cut] + ellipsis
}