@@ -7,6 +7,7 @@ import (
"fmt"
"k8s.io/client-go/kubernetes/scheme"
"k8s.io/client-go/tools/remotecommand"
"regexp"
"strings"
"k8s.io/apimachinery/pkg/fields"
@@ -86,26 +87,27 @@ func buildConfigFromFlagsOrCluster(configPath string) (*rest.Config, error) {
return nil, fmt.Errorf("load kubernetes config failed %v %v", err1, err2)
}
func NewCluster(confData *conf.Data, logger log.Logger) (Cluster, context.CancelFunc, error) {
func NewCluster(confData *conf.Data, logger log.Logger, frpProcessNames []string ) (Cluster, context.CancelFunc, error) {
restConfig, err := buildConfigFromFlagsOrCluster(confData.Kubernetes.ConfigPath)
restConfig.QPS = float32(confData.Kubernetes.Qps)
if err != nil {
panic(err)
}
return newKubernetesCluster(restConfig, logger)
return newKubernetesCluster(restConfig, logger, frpProcessNames )
}
func newKubernetesCluster(config *rest.Config, logger log.Logger) (Cluster, context.CancelFunc, error) {
func newKubernetesCluster(config *rest.Config, logger log.Logger, frpProcessNames []string ) (Cluster, context.CancelFunc, error) {
c, cancel := context.WithCancel(context.Background())
kc := &kubernetesCluster{
ctx: c,
nodes: make(map[string]*v1.Node),
kubeclient: kubernetes.NewForConfigOrDie(config),
vcClient: vcclient.NewForConfigOrDie(config),
naClient: naclient.NewForConfigOrDie(config),
seldonClient: seldonclient.NewForConfigOrDie(config),
log: log.NewHelper("Cluster", logger),
config: config,
ctx: c,
nodes: make(map[string]*v1.Node),
kubeclient: kubernetes.NewForConfigOrDie(config),
vcClient: vcclient.NewForConfigOrDie(config),
naClient: naclient.NewForConfigOrDie(config),
seldonClient: seldonclient.NewForConfigOrDie(config),
log: log.NewHelper("Cluster", logger),
config: config,
frpProcessNames: frpProcessNames,
}
scheme := runtime.NewScheme()
err := fluidv1.AddToScheme(scheme)
@@ -161,9 +163,10 @@ type kubernetesCluster struct {
nodeInformer infov1.NodeInformer
nodeActionInformer nainformerv1.NodeActionInformer
nodes map[string]*v1.Node
config *rest.Config
rtClient client.Client
nodes map[string]*v1.Node
config *rest.Config
rtClient client.Client
frpProcessNames []string
}
func (kc *kubernetesCluster) Run() {
@@ -765,11 +768,11 @@ func (kc *kubernetesCluster) CheckFrpProcess(ctx context.Context, namespace stri
// 6. 解析输出,查找包含"frp"的进程
output := stdout.String()
return parsePSOutput(output)
return kc. parsePSOutput(output)
}
func parsePSOutput(output string) (bool, string, error) {
func (kc *kubernetesCluster) parsePSOutput(output string) (bool, string, error) {
scanner := bufio.NewScanner(strings.NewReader(output))
firstLine := true
@@ -786,6 +789,7 @@ func parsePSOutput(output string) (bool, string, error) {
continue // 跳过空行
}
key := strings.Join(kc.frpProcessNames, "|")
// 解析ps输出格式:PID COMMAND FULL_COMMAND
fields := strings.Fields(line)
if len(fields) >= 3 {
@@ -794,8 +798,13 @@ func parsePSOutput(output string) (bool, string, error) {
fullCommand := strings.Join(fields[2:], " ") // 完整命令
processInfo := fmt.Sprintf("%s",
fullCommand)
pattern := fmt.Sprintf(`\b(%s)\b`, key)
matched, err := regexp.Compile(pattern)
if err != nil {
continue
}
// 检查进程名是否包含"frp"(不区分大小写)
if strings.Contains(strings.ToLower(commandName), "frp") {
if matched.MatchString(strings.ToLower(commandName) ) {
return true, processInfo, nil
}
}