实现健康检查功能并优化代码结构

This commit is contained in:
程广 2025-05-29 16:53:30 +08:00
parent 039643e08e
commit 3bccf2187f
4 changed files with 195 additions and 17 deletions

View File

@ -10,6 +10,7 @@
- 支持rewrite
- 支持TLS
- 支持端口复用
- 支持健康检查
## 配置
```json
@ -97,6 +98,54 @@ paths 配置
- Path [/ws] [/] 代理请求时重写URL路径用第二个参数替换url中的第一个部分
- RemoveCookie [token] 代理请求时删除cookie中的某些字段
### 健康检查配置(新)
健康检查功能允许系统定期检查上游服务器的健康状态,并在请求时自动跳过不健康的服务器。
#### 配置选项
- `health_check` - 在server配置中的健康检查参数
- `interval` - 检查间隔时间(如"10s"
- `timeout` - 单次检查超时时间(如"5s"
- `retries` - 健康检查失败重试次数
#### 示例配置
```json
{
"servers":[{
"port" : 8080,
"name":"test",
"health_check": {
"interval": "10s", // 每10秒检查一次
"timeout": "5s", // 每次检查5秒超时
"retries": 3 // 失败3次才认为是不健康
},
"paths":[{
"path": "/",
"root": "/home/kingecg/code/gohttp/public/",
"default": "index.html"
},{
"path": "/ws",
"upstreams":["http://localhost:3000"],
"directives":[
"HostSchemas $target",
"HeaderOrigin",
"Path /ws /",
"RemoveCookie token"
]
}]
}]
}
```
#### 特性
- 自动检测上游服务器的健康状态
- 请求失败时自动重试其他可用服务器
- 定期后台检查服务器状态
- 详细的日志记录便于监控和调试
### 指令系统
指令系统采用了nginx的指令系统指令的格式为

View File

@ -6,7 +6,9 @@ import (
"net/http/httputil"
"strconv"
"strings"
"time"
"git.pyer.club/kingecg/gohttpd/healthcheck"
"git.pyer.club/kingecg/gohttpd/model"
"git.pyer.club/kingecg/gologger"
)
@ -15,6 +17,7 @@ type ProxyHandler struct {
proxy []*httputil.ReverseProxy
Upstreams []string
count int
checker *healthcheck.HealthChecker // 健康检查器
}
func (p *ProxyHandler) ServeHTTP(w http.ResponseWriter, r *http.Request) {
@ -32,23 +35,28 @@ func (p *ProxyHandler) ServeHTTP(w http.ResponseWriter, r *http.Request) {
proxyIndex, _ = strconv.Atoi(s.Value)
}
l.Info(fmt.Sprintf("proxy %s to %s", originalUrl, p.Upstreams[proxyIndex]))
p.proxy[proxyIndex].ServeHTTP(w, r)
// 如果选中的上游服务器不健康,则进行重试
maxRetries := 3
for i := 0; i < maxRetries; i++ {
if p.checker == nil || p.checker.CheckHealth(p.Upstreams[proxyIndex]) {
l.Info(fmt.Sprintf("proxy %s to %s", originalUrl, p.Upstreams[proxyIndex]))
p.proxy[proxyIndex].ServeHTTP(w, r)
return
}
proxyIndex = (proxyIndex + 1) % len(p.proxy) // 选择下一个上游服务器
}
l.Error(fmt.Sprintf("All upstream servers are unhealthy"))
http.Error(w, "Service Unavailable", http.StatusServiceUnavailable)
}
/*
*
init httputil.ReverseProxy instance and add path rewrite and add session-sticky cookie to response
@param upstream upstream server url
@param path http path config
@param index the proxy index in upstreams
@return httputil.ReverseProxy instance
what is the directive? its stands for update of request, like HostSchemas, Path, RemoveCookie, etc.
eg: HostSchemas $target
it stande for replace req url host and schema according to $target url. $target == upstream
*/
// makeProxy 初始化httputil.ReverseProxy实例并添加路径重写和会话粘滞cookie到响应
// 参数:
// upstream 上游服务器URL
// path HTTP路径配置
// index 上游服务器在列表中的索引
// 返回值:
// httputil.ReverseProxy实例
func makeProxy(upstream string, path *model.HttpPath, index int) *httputil.ReverseProxy {
p := &httputil.ReverseProxy{}
directiveHandlers := []func(r *http.Request){}
@ -87,6 +95,7 @@ func makeProxy(upstream string, path *model.HttpPath, index int) *httputil.Rever
return p
}
// NewProxyHandler 创建一个新的代理处理器
func NewProxyHandler(p *model.HttpPath) *ProxyHandler {
upstreamCount := len(p.Upstreams)
if upstreamCount == 0 {
@ -97,8 +106,48 @@ func NewProxyHandler(p *model.HttpPath) *ProxyHandler {
}
ph.proxy = make([]*httputil.ReverseProxy, upstreamCount)
// 从配置中获取健康检查参数,如果不存在则使用默认值
var interval time.Duration = 10 * time.Second
var timeout time.Duration = 5 * time.Second
var retries int = 3
// 使用服务器配置中的健康检查参数
config := model.GetConfig()
if config != nil && config.Admin != nil && config.Admin.HealthCheck != nil {
if config.Admin.HealthCheck.Interval != "" {
var err error
interval, err = time.ParseDuration(config.Admin.HealthCheck.Interval)
if err != nil {
interval = 10 * time.Second // 默认值
}
}
if config.Admin.HealthCheck.Timeout != "" {
var err error
timeout, err = time.ParseDuration(config.Admin.HealthCheck.Timeout)
if err != nil {
timeout = 5 * time.Second // 默认值
}
}
if config.Admin.HealthCheck.Retries > 0 {
retries = config.Admin.HealthCheck.Retries
}
}
for index, upstream := range p.Upstreams {
ph.proxy[index] = makeProxy(upstream, p, index)
}
// 使用配置参数创建健康检查器
ph.checker = healthcheck.NewHealthChecker(interval, timeout, retries)
ph.checker.StartHealthCheck(ph.Upstreams, func(upstream string, healthy bool) {
// 当上游服务器状态变化时的回调函数
logger := gologger.GetLogger("Proxy")
if !healthy {
logger.Warn(fmt.Sprintf("Upstream %s is now unhealthy", upstream))
} else {
logger.Info(fmt.Sprintf("Upstream %s is now healthy", upstream))
}
})
return ph
}
}

View File

@ -0,0 +1,66 @@
package healthcheck
import (
"fmt"
"net/http"
"time"
"git.pyer.club/kingecg/gologger"
)
// HealthChecker 健康检查器
type HealthChecker struct {
client *http.Client
interval time.Duration
timeout time.Duration
retries int
}
func NewHealthChecker(interval time.Duration, timeout time.Duration, retries int) *HealthChecker {
return &HealthChecker{
client: &http.Client{
Timeout: timeout,
},
interval: interval,
timeout: timeout,
retries: retries,
}
}
// CheckHealth 检查上游服务器健康状态
func (hc *HealthChecker) CheckHealth(url string) bool {
logger := gologger.GetLogger("healthcheck")
for i := 0; i < hc.retries; i++ {
resp, err := hc.client.Get(url + "/health")
if err == nil && resp.StatusCode == http.StatusOK {
return true
}
logger.Warn(fmt.Sprintf("Health check failed for %s: %v", url, err))
time.Sleep(hc.timeout)
}
logger.Error(fmt.Sprintf("All health checks failed for %s", url))
return false
}
// StartHealthCheck 启动健康检查协程
func (hc *HealthChecker) StartHealthCheck(upstreams []string, onStatusChange func(string, bool)) {
logger := gologger.GetLogger("healthcheck")
go func() {
statusMap := make(map[string]bool)
for {
for _, upstream := range upstreams {
healthy := hc.CheckHealth(upstream)
if status, exists := statusMap[upstream]; exists {
if status != healthy {
logger.Info(fmt.Sprintf("Upstream %s status changed to %v", upstream, healthy))
if onStatusChange != nil {
onStatusChange(upstream, healthy)
}
}
}
statusMap[upstream] = healthy
}
time.Sleep(hc.interval)
}
}()
}

View File

@ -8,6 +8,14 @@ import (
"git.pyer.club/kingecg/gologger"
)
// HealthCheckConfig 定义健康检查配置
type HealthCheckConfig struct {
Interval string `json:"interval,omitempty"` // 检查间隔时间
Timeout string `json:"timeout,omitempty"` // 单次检查超时时间
Retries int `json:"retries,omitempty"` // 健康检查失败重试次数
}
// HttpPath 定义HTTP路径配置
type HttpPath struct {
Path string `json:"path"`
Root string `json:"root"`
@ -22,12 +30,13 @@ const (
ProxyHost HeaderValueConst = "$ProxyHost"
)
// HttpServerConfig 定义HTTP服务器配置
type HttpServerConfig struct {
Name string `json:"name"`
ServerName string `json:"server"`
Port int `json:"port"`
Host string `json:"host"`
Paths []HttpPath
Paths []HttpPath `json:"paths"`
Username string `json:"username"`
Password string `json:"password"`
CertFile string `json:"certfile"`
@ -35,6 +44,11 @@ type HttpServerConfig struct {
Directives []string `json:"directives"`
AuthType string `json:"auth_type"`
Jwt *JwtConfig `json:"jwt"`
// 健康检查配置
HealthCheck *HealthCheckConfig `json:"health_check,omitempty"`
// 访问控制配置
AllowIPs []string `json:"allow_ips,omitempty"` // 允许访问的IP地址列表
DenyIPs []string `json:"deny_ips,omitempty"` // 禁止访问的IP地址列表
}
type JwtConfig struct {