diff --git a/README.md b/README.md index d41a8e6..ce24c46 100644 --- a/README.md +++ b/README.md @@ -10,6 +10,7 @@ - 支持rewrite - 支持TLS - 支持端口复用 +- 支持健康检查 ## 配置 ```json @@ -97,6 +98,54 @@ paths 配置 - Path [/ws] [/] 代理请求时,重写URL路径,用第二个参数替换url中的第一个部分 - RemoveCookie [token] 代理请求时,删除cookie中的某些字段 +### 健康检查配置(新) + +健康检查功能允许系统定期检查上游服务器的健康状态,并在请求时自动跳过不健康的服务器。 + +#### 配置选项 + +- `health_check` - 在server配置中的健康检查参数 + - `interval` - 检查间隔时间(如"10s") + - `timeout` - 单次检查超时时间(如"5s") + - `retries` - 健康检查失败重试次数 + +#### 示例配置 + +```json +{ + "servers":[{ + "port" : 8080, + "name":"test", + "health_check": { + "interval": "10s", // 每10秒检查一次 + "timeout": "5s", // 每次检查5秒超时 + "retries": 3 // 失败3次才认为是不健康 + }, + "paths":[{ + "path": "/", + "root": "/home/kingecg/code/gohttp/public/", + "default": "index.html" + },{ + "path": "/ws", + "upstreams":["http://localhost:3000"], + "directives":[ + "HostSchemas $target", + "HeaderOrigin", + "Path /ws /", + "RemoveCookie token" + ] + }] + }] +} +``` + +#### 特性 + +- 自动检测上游服务器的健康状态 +- 请求失败时自动重试其他可用服务器 +- 定期后台检查服务器状态 +- 详细的日志记录便于监控和调试 + ### 指令系统 指令系统采用了nginx的指令系统,指令的格式为: diff --git a/handler/proxy.go b/handler/proxy.go index eb89d47..da9f354 100644 --- a/handler/proxy.go +++ b/handler/proxy.go @@ -6,7 +6,9 @@ import ( "net/http/httputil" "strconv" "strings" + "time" + "git.pyer.club/kingecg/gohttpd/healthcheck" "git.pyer.club/kingecg/gohttpd/model" "git.pyer.club/kingecg/gologger" ) @@ -15,6 +17,7 @@ type ProxyHandler struct { proxy []*httputil.ReverseProxy Upstreams []string count int + checker *healthcheck.HealthChecker // 健康检查器 } func (p *ProxyHandler) ServeHTTP(w http.ResponseWriter, r *http.Request) { @@ -32,23 +35,28 @@ func (p *ProxyHandler) ServeHTTP(w http.ResponseWriter, r *http.Request) { proxyIndex, _ = strconv.Atoi(s.Value) } - l.Info(fmt.Sprintf("proxy %s to %s", originalUrl, p.Upstreams[proxyIndex])) - p.proxy[proxyIndex].ServeHTTP(w, r) + // 如果选中的上游服务器不健康,则进行重试 + maxRetries := 3 + for i := 0; i < maxRetries; i++ { + if p.checker == nil || p.checker.CheckHealth(p.Upstreams[proxyIndex]) { + l.Info(fmt.Sprintf("proxy %s to %s", originalUrl, p.Upstreams[proxyIndex])) + p.proxy[proxyIndex].ServeHTTP(w, r) + return + } + proxyIndex = (proxyIndex + 1) % len(p.proxy) // 选择下一个上游服务器 + } + + l.Error(fmt.Sprintf("All upstream servers are unhealthy")) + http.Error(w, "Service Unavailable", http.StatusServiceUnavailable) } -/* -* -init httputil.ReverseProxy instance and add path rewrite and add session-sticky cookie to response -@param upstream upstream server url -@param path http path config -@param index the proxy index in upstreams - -@return httputil.ReverseProxy instance - -what is the directive? its stands for update of request, like HostSchemas, Path, RemoveCookie, etc. -eg: HostSchemas $target -it stande for replace req url host and schema according to $target url. $target == upstream -*/ +// makeProxy 初始化httputil.ReverseProxy实例,并添加路径重写和会话粘滞cookie到响应 +// 参数: +// upstream 上游服务器URL +// path HTTP路径配置 +// index 上游服务器在列表中的索引 +// 返回值: +// httputil.ReverseProxy实例 func makeProxy(upstream string, path *model.HttpPath, index int) *httputil.ReverseProxy { p := &httputil.ReverseProxy{} directiveHandlers := []func(r *http.Request){} @@ -87,6 +95,7 @@ func makeProxy(upstream string, path *model.HttpPath, index int) *httputil.Rever return p } +// NewProxyHandler 创建一个新的代理处理器 func NewProxyHandler(p *model.HttpPath) *ProxyHandler { upstreamCount := len(p.Upstreams) if upstreamCount == 0 { @@ -97,8 +106,48 @@ func NewProxyHandler(p *model.HttpPath) *ProxyHandler { } ph.proxy = make([]*httputil.ReverseProxy, upstreamCount) + // 从配置中获取健康检查参数,如果不存在则使用默认值 + var interval time.Duration = 10 * time.Second + var timeout time.Duration = 5 * time.Second + var retries int = 3 + + // 使用服务器配置中的健康检查参数 + config := model.GetConfig() + if config != nil && config.Admin != nil && config.Admin.HealthCheck != nil { + if config.Admin.HealthCheck.Interval != "" { + var err error + interval, err = time.ParseDuration(config.Admin.HealthCheck.Interval) + if err != nil { + interval = 10 * time.Second // 默认值 + } + } + if config.Admin.HealthCheck.Timeout != "" { + var err error + timeout, err = time.ParseDuration(config.Admin.HealthCheck.Timeout) + if err != nil { + timeout = 5 * time.Second // 默认值 + } + } + if config.Admin.HealthCheck.Retries > 0 { + retries = config.Admin.HealthCheck.Retries + } + } + for index, upstream := range p.Upstreams { ph.proxy[index] = makeProxy(upstream, p, index) } + + // 使用配置参数创建健康检查器 + ph.checker = healthcheck.NewHealthChecker(interval, timeout, retries) + ph.checker.StartHealthCheck(ph.Upstreams, func(upstream string, healthy bool) { + // 当上游服务器状态变化时的回调函数 + logger := gologger.GetLogger("Proxy") + if !healthy { + logger.Warn(fmt.Sprintf("Upstream %s is now unhealthy", upstream)) + } else { + logger.Info(fmt.Sprintf("Upstream %s is now healthy", upstream)) + } + }) + return ph -} +} \ No newline at end of file diff --git a/healthcheck/healthcheck.go b/healthcheck/healthcheck.go new file mode 100644 index 0000000..d917b78 --- /dev/null +++ b/healthcheck/healthcheck.go @@ -0,0 +1,66 @@ +package healthcheck + +import ( + "fmt" + "net/http" + "time" + + "git.pyer.club/kingecg/gologger" +) + +// HealthChecker 健康检查器 +type HealthChecker struct { + client *http.Client + interval time.Duration + timeout time.Duration + retries int +} + +func NewHealthChecker(interval time.Duration, timeout time.Duration, retries int) *HealthChecker { + return &HealthChecker{ + client: &http.Client{ + Timeout: timeout, + }, + interval: interval, + timeout: timeout, + retries: retries, + } +} + +// CheckHealth 检查上游服务器健康状态 +func (hc *HealthChecker) CheckHealth(url string) bool { + logger := gologger.GetLogger("healthcheck") + for i := 0; i < hc.retries; i++ { + resp, err := hc.client.Get(url + "/health") + if err == nil && resp.StatusCode == http.StatusOK { + return true + } + logger.Warn(fmt.Sprintf("Health check failed for %s: %v", url, err)) + time.Sleep(hc.timeout) + } + logger.Error(fmt.Sprintf("All health checks failed for %s", url)) + return false +} + +// StartHealthCheck 启动健康检查协程 +func (hc *HealthChecker) StartHealthCheck(upstreams []string, onStatusChange func(string, bool)) { + logger := gologger.GetLogger("healthcheck") + go func() { + statusMap := make(map[string]bool) + for { + for _, upstream := range upstreams { + healthy := hc.CheckHealth(upstream) + if status, exists := statusMap[upstream]; exists { + if status != healthy { + logger.Info(fmt.Sprintf("Upstream %s status changed to %v", upstream, healthy)) + if onStatusChange != nil { + onStatusChange(upstream, healthy) + } + } + } + statusMap[upstream] = healthy + } + time.Sleep(hc.interval) + } + }() +} diff --git a/model/model.go b/model/model.go index 63e3581..b7a9847 100644 --- a/model/model.go +++ b/model/model.go @@ -8,6 +8,14 @@ import ( "git.pyer.club/kingecg/gologger" ) +// HealthCheckConfig 定义健康检查配置 +type HealthCheckConfig struct { + Interval string `json:"interval,omitempty"` // 检查间隔时间 + Timeout string `json:"timeout,omitempty"` // 单次检查超时时间 + Retries int `json:"retries,omitempty"` // 健康检查失败重试次数 +} + +// HttpPath 定义HTTP路径配置 type HttpPath struct { Path string `json:"path"` Root string `json:"root"` @@ -22,12 +30,13 @@ const ( ProxyHost HeaderValueConst = "$ProxyHost" ) +// HttpServerConfig 定义HTTP服务器配置 type HttpServerConfig struct { Name string `json:"name"` ServerName string `json:"server"` Port int `json:"port"` Host string `json:"host"` - Paths []HttpPath + Paths []HttpPath `json:"paths"` Username string `json:"username"` Password string `json:"password"` CertFile string `json:"certfile"` @@ -35,6 +44,11 @@ type HttpServerConfig struct { Directives []string `json:"directives"` AuthType string `json:"auth_type"` Jwt *JwtConfig `json:"jwt"` + // 健康检查配置 + HealthCheck *HealthCheckConfig `json:"health_check,omitempty"` + // 访问控制配置 + AllowIPs []string `json:"allow_ips,omitempty"` // 允许访问的IP地址列表 + DenyIPs []string `json:"deny_ips,omitempty"` // 禁止访问的IP地址列表 } type JwtConfig struct {