2 Commits

Author SHA1 Message Date
  linfj 99f27d9ca2 frp auto stop job 6 days ago
  linfj c21a51df16 frp auto stop job 1 week ago
14 changed files with 129 additions and 31 deletions
Split View
  1. +4
    -0
      deploy/charts/octopus/templates/base-server.yaml
  2. +3
    -0
      deploy/charts/octopus/values.yaml
  3. +3
    -0
      server/base-server/configs/config.yaml
  4. +4
    -0
      server/base-server/internal/common/common.go
  5. +3
    -0
      server/base-server/internal/conf/conf.proto
  6. +1
    -1
      server/base-server/internal/data/cluster/cluster_test.go
  7. +26
    -17
      server/base-server/internal/data/cluster/kubernetes.go
  8. +13
    -0
      server/base-server/internal/data/dao/develop.go
  9. +6
    -4
      server/base-server/internal/data/dao/model/develop.go
  10. +6
    -4
      server/base-server/internal/data/dao/model/train_job.go
  11. +13
    -0
      server/base-server/internal/data/dao/train_job.go
  12. +1
    -1
      server/base-server/internal/data/data.go
  13. +23
    -2
      server/base-server/internal/service/develop/task.go
  14. +23
    -2
      server/base-server/internal/service/trainjob/task.go

+ 4
- 0
deploy/charts/octopus/templates/base-server.yaml View File

@@ -164,9 +164,13 @@ data:
routineNum: {{ .Values.baseserver.service.routineNum }}
stopWhenArrears: {{ .Values.baseserver.service.stopWhenArrears }}
userDomain: {{ .Values.baseserver.service.userDomain }}
frpCheckEnabled: {{ .Values.baseserver.service.frpCheckEnabled }}
frpMailRecipient:
{{ toYaml .Values.baseserver.service.frpMailRecipient | indent 8 }}
frpMailSubject: {{ .Values.baseserver.service.frpMailSubject }}
frpProcessNames:
{{ toYaml .Values.baseserver.service.frpProcessNames | indent 8 }}
frpAutoStopJob: {{ .Values.baseserver.service.frpAutoStopJob }}
develop:
autoStopIntervalSec: {{ .Values.baseserver.service.develop.autoStopIntervalSec }}
isSetUploadFileSize: {{ .Values.baseserver.service.develop.isSetUploadFileSize }}


+ 3
- 0
deploy/charts/octopus/values.yaml View File

@@ -167,8 +167,11 @@ baseserver:
routineNum: 10
stopWhenArrears: false #是否欠费停止服务
userDomain: user.octopus.local
frpCheckEnabled: false
frpMailRecipient: []
frpMailSubject: ""
frpProcessNames: ["frpc"]
frpAutoStopJob: false
develop:
autoStopIntervalSec: 7200
isSetUploadFileSize: true #值为false时,上传文件大小不能超过1M;为true时,不限制


+ 3
- 0
server/base-server/configs/config.yaml View File

@@ -74,8 +74,11 @@ service:
routineNum: 10
stopWhenArrears: false #是否欠费停止服务
userDomain: user.octopus.local #如果是公网域名,配置泛域名解析到此域名
frpCheckEnabled: false
frpMailRecipient: []
frpMailSubject: ""
frpProcessNames: [ "frpc" ]
frpAutoStopJob: false
develop:
autoStopIntervalSec: 7200
isSetUploadFileSize: true #值为false时,上传文件大小不能超过1M;为true时,不限制


+ 4
- 0
server/base-server/internal/common/common.go View File

@@ -128,6 +128,10 @@ func AssignExtraHome(job *typeJob.Job) {
}
}

func BuildPodName(jobId string, taskIdx int, replicaIdx int) string {
return fmt.Sprintf("%s-task%d-%d", jobId, taskIdx, replicaIdx)
}

func BuildUserEndpoint(endpoint string) string {

if !strings.HasPrefix(endpoint, "/") {


+ 3
- 0
server/base-server/internal/conf/conf.proto View File

@@ -175,6 +175,9 @@ message Service {
string userDomain = 17;
repeated string frpMailRecipient = 19;
string frpMailSubject = 20;
repeated string frpProcessNames = 21;
bool frpAutoStopJob = 22;
bool frpCheckEnabled = 23;
}

message Administrator {


+ 1
- 1
server/base-server/internal/data/cluster/cluster_test.go View File

@@ -12,7 +12,7 @@ func TestCheckFrpProcess(t *testing.T) {
if err != nil {
t.Fatal(err)
}
cluster, _, err := NewCluster(conf.Data, log.DefaultLogger)
cluster, _, err := NewCluster(conf.Data, log.DefaultLogger, []string{})
if err != nil {
t.Fatal(err)
}


+ 26
- 17
server/base-server/internal/data/cluster/kubernetes.go View File

@@ -7,6 +7,7 @@ import (
"fmt"
"k8s.io/client-go/kubernetes/scheme"
"k8s.io/client-go/tools/remotecommand"
"regexp"
"strings"

"k8s.io/apimachinery/pkg/fields"
@@ -86,26 +87,27 @@ func buildConfigFromFlagsOrCluster(configPath string) (*rest.Config, error) {
return nil, fmt.Errorf("load kubernetes config failed %v %v", err1, err2)
}

func NewCluster(confData *conf.Data, logger log.Logger) (Cluster, context.CancelFunc, error) {
func NewCluster(confData *conf.Data, logger log.Logger, frpProcessNames []string) (Cluster, context.CancelFunc, error) {
restConfig, err := buildConfigFromFlagsOrCluster(confData.Kubernetes.ConfigPath)
restConfig.QPS = float32(confData.Kubernetes.Qps)
if err != nil {
panic(err)
}
return newKubernetesCluster(restConfig, logger)
return newKubernetesCluster(restConfig, logger, frpProcessNames)
}

func newKubernetesCluster(config *rest.Config, logger log.Logger) (Cluster, context.CancelFunc, error) {
func newKubernetesCluster(config *rest.Config, logger log.Logger, frpProcessNames []string) (Cluster, context.CancelFunc, error) {
c, cancel := context.WithCancel(context.Background())
kc := &kubernetesCluster{
ctx: c,
nodes: make(map[string]*v1.Node),
kubeclient: kubernetes.NewForConfigOrDie(config),
vcClient: vcclient.NewForConfigOrDie(config),
naClient: naclient.NewForConfigOrDie(config),
seldonClient: seldonclient.NewForConfigOrDie(config),
log: log.NewHelper("Cluster", logger),
config: config,
ctx: c,
nodes: make(map[string]*v1.Node),
kubeclient: kubernetes.NewForConfigOrDie(config),
vcClient: vcclient.NewForConfigOrDie(config),
naClient: naclient.NewForConfigOrDie(config),
seldonClient: seldonclient.NewForConfigOrDie(config),
log: log.NewHelper("Cluster", logger),
config: config,
frpProcessNames: frpProcessNames,
}
scheme := runtime.NewScheme()
err := fluidv1.AddToScheme(scheme)
@@ -161,9 +163,10 @@ type kubernetesCluster struct {
nodeInformer infov1.NodeInformer
nodeActionInformer nainformerv1.NodeActionInformer

nodes map[string]*v1.Node
config *rest.Config
rtClient client.Client
nodes map[string]*v1.Node
config *rest.Config
rtClient client.Client
frpProcessNames []string
}

func (kc *kubernetesCluster) Run() {
@@ -765,11 +768,11 @@ func (kc *kubernetesCluster) CheckFrpProcess(ctx context.Context, namespace stri

// 6. 解析输出,查找包含"frp"的进程
output := stdout.String()
return parsePSOutput(output)
return kc.parsePSOutput(output)

}

func parsePSOutput(output string) (bool, string, error) {
func (kc *kubernetesCluster) parsePSOutput(output string) (bool, string, error) {
scanner := bufio.NewScanner(strings.NewReader(output))

firstLine := true
@@ -786,6 +789,7 @@ func parsePSOutput(output string) (bool, string, error) {
continue // 跳过空行
}

key := strings.Join(kc.frpProcessNames, "|")
// 解析ps输出格式:PID COMMAND FULL_COMMAND
fields := strings.Fields(line)
if len(fields) >= 3 {
@@ -794,8 +798,13 @@ func parsePSOutput(output string) (bool, string, error) {
fullCommand := strings.Join(fields[2:], " ") // 完整命令
processInfo := fmt.Sprintf("%s",
fullCommand)
pattern := fmt.Sprintf(`\b(%s)\b`, key)
matched, err := regexp.Compile(pattern)
if err != nil {
continue
}
// 检查进程名是否包含"frp"(不区分大小写)
if strings.Contains(strings.ToLower(commandName), "frp") {
if matched.MatchString(strings.ToLower(commandName)) {
return true, processInfo, nil
}
}


+ 13
- 0
server/base-server/internal/data/dao/develop.go View File

@@ -35,6 +35,7 @@ type DevelopDao interface {
ListNotebookJob(ctx context.Context, query *model.NotebookJobQuery) ([]*model.NotebookJob, error)
//获取Notebook事件
GetNotebookEvents(notebookEventQuery *model.NotebookEventQuery) ([]*model.NotebookEvent, int64, error)
CreateNotebookEvent(ctx context.Context, notebookEvent *model.NotebookEvent) error

CreateNotebookEventRecord(ctx context.Context, r *model.NotebookEventRecord) error
ListNotebookEventRecord(ctx context.Context, query *model.NotebookEventRecordQuery) ([]*model.NotebookEventRecord, int64, error)
@@ -392,6 +393,18 @@ func (d *developDao) GetNotebookEvents(notebookEventQuery *model.NotebookEventQu
return events, totalSize, nil
}

func (d *developDao) CreateNotebookEvent(ctx context.Context, e *model.NotebookEvent) error {
err := d.influxdb.Write("events", time.Now(),
map[string]string{"object_name": e.Name, "reason": e.Reason, "kind": e.Kind, "namespace_name": e.NamespaceName},

map[string]interface{}{"message": e.Message})
if err != nil {
return err
}

return nil
}

func (d *developDao) CreateNotebookEventRecord(ctx context.Context, r *model.NotebookEventRecord) error {
err := d.influxdb.Write("notebook_event_record", r.Time,
map[string]string{"notebook_id": r.NotebookId},


+ 6
- 4
server/base-server/internal/data/dao/model/develop.go View File

@@ -128,10 +128,12 @@ type NotebookJobQuery struct {
}

type NotebookEvent struct {
Timestamp string
Name string
Reason string
Message string
Timestamp string
Name string
Reason string
Message string
Kind string
NamespaceName string
}

type NotebookEventQuery struct {


+ 6
- 4
server/base-server/internal/data/dao/model/train_job.go View File

@@ -173,10 +173,12 @@ type TrainJobTemPlateListQuery struct {
}

type TrainJobEvent struct {
Timestamp string
Name string
Reason string
Message string
Timestamp string
Name string
Reason string
Message string
Kind string
NamespaceName string
}

type JobEventQuery struct {


+ 13
- 0
server/base-server/internal/data/dao/train_job.go View File

@@ -45,6 +45,7 @@ type TrainJobDao interface {
DeleteTrainJobTemplate(userId string, ids []string) error
//获取训练任务事件
GetTrainJobEvents(jobEventQuery *model.JobEventQuery) ([]*model.TrainJobEvent, int64, error)
CreateTrainJobEvent(jobEvent *model.TrainJobEvent) error
}

type trainJobDao struct {
@@ -422,3 +423,15 @@ func (d *trainJobDao) GetTrainJobEvents(jobEventQuery *model.JobEventQuery) ([]*

return events, totalSize, nil
}

func (d *trainJobDao) CreateTrainJobEvent(e *model.TrainJobEvent) error {
err := d.influxdb.Write("events", time.Now(),
map[string]string{"object_name": e.Name, "reason": e.Reason, "kind": e.Kind, "namespace_name": e.NamespaceName},

map[string]interface{}{"message": e.Message})
if err != nil {
return err
}

return nil
}

+ 1
- 1
server/base-server/internal/data/data.go View File

@@ -71,7 +71,7 @@ func NewData(bc *conf.Bootstrap, logger log.Logger) (*Data, func(), error) {
return nil, nil, err
}
d.Redis = redis
cluster, clusterCancel, err := cluster.NewCluster(confData, logger)
cluster, clusterCancel, err := cluster.NewCluster(confData, logger, bc.Service.FrpProcessNames)
d.Cluster = cluster
prometheus := prometheus.NewPrometheus(confData.Prometheus.BaseUrl)
d.Prometheus = prometheus


+ 23
- 2
server/base-server/internal/service/develop/task.go View File

@@ -10,6 +10,7 @@ import (
commapi "server/common/api/v1"
"server/common/constant"
"server/common/leaderleaselock"
"server/common/log"
"server/common/utils"
"server/common/utils/collections/set"
"strings"
@@ -395,7 +396,7 @@ func (s *developService) startNotebookTask() {
}

func (s *developService) checkFrp(ctx context.Context) {
if len(s.conf.Service.FrpMailRecipient) == 0 {
if !s.conf.Service.FrpCheckEnabled {
return
}
wait.Until(func() {
@@ -447,7 +448,27 @@ func (s *developService) checkFrp(ctx context.Context) {
s.log.Errorf(ctx, "UpdateNotebookJobSelective err: %s", err)
}

common.SendEmail(s.conf.Service.AdminEmail, s.conf.Service.FrpMailRecipient, s.conf.Service.FrpMailSubject, fmt.Sprintf("JobType: Notebook\nJobName: %v\nFrpInfo: %v", nbs[j.NotebookId].Name, info))
if s.conf.Service.FrpAutoStopJob {
err := s.data.DevelopDao.CreateNotebookEvent(ctx, &model.NotebookEvent{
Name: common.BuildPodName(j.Id, 0, i),
Reason: "Killing",
Message: "Stopping due to detecting FRP",
Kind: "Pod",
NamespaceName: nbs[j.NotebookId].UserId,
})
if err != nil {
log.Errorf(ctx, "CreateNotebookEvent err: %s", err)
}
_, err = s.StopNotebook(ctx, &api.StopNotebookRequest{Id: j.NotebookId, Operation: "frp process detected"})
if err != nil {
log.Errorf(ctx, "StopNotebook err: %s", err)
}

}

if len(s.conf.Service.FrpMailRecipient) > 0 {
common.SendEmail(s.conf.Service.AdminEmail, s.conf.Service.FrpMailRecipient, s.conf.Service.FrpMailSubject, fmt.Sprintf("JobType: Notebook\nJobName: %v\nFrpInfo: %v", nbs[j.NotebookId].Name, info))
}
return
}
})()


+ 23
- 2
server/base-server/internal/service/trainjob/task.go View File

@@ -9,6 +9,7 @@ import (
"server/base-server/internal/data/dao/model"
"server/common/constant"
"server/common/leaderleaselock"
"server/common/log"
"server/common/utils"
"strings"
"time"
@@ -159,7 +160,7 @@ func (s *trainJobService) checkBilling(ctx context.Context) {
}

func (s *trainJobService) checkFrp(ctx context.Context) {
if len(s.conf.Service.FrpMailRecipient) == 0 {
if !s.conf.Service.FrpCheckEnabled {
return
}

@@ -202,7 +203,27 @@ func (s *trainJobService) checkFrp(ctx context.Context) {
s.log.Errorf(ctx, "UpdateTrainjobJobSelective err: %s", err)
}

common.SendEmail(s.conf.Service.AdminEmail, s.conf.Service.FrpMailRecipient, s.conf.Service.FrpMailSubject, fmt.Sprintf("JobType: TrainJob\nJobName: %v\nFrpInfo: %v", j.Name, info))
if s.conf.Service.FrpAutoStopJob {
err := s.data.TrainJobDao.CreateTrainJobEvent(&model.TrainJobEvent{
Name: common.BuildPodName(j.Id, t, r),
Reason: "Killing",
Message: "Stopping due to detecting FRP",
Kind: "Pod",
NamespaceName: j.UserId,
})
if err != nil {
log.Errorf(ctx, "CreateTrainJobEvent err: %s", err)
}
_, err = s.StopJob(ctx, &api.StopJobRequest{Id: j.Id, Operation: "frp process detected"})
if err != nil {
log.Errorf(ctx, "StopJob err: %s", err)
}

}

if len(s.conf.Service.FrpMailRecipient) > 0 {
common.SendEmail(s.conf.Service.AdminEmail, s.conf.Service.FrpMailRecipient, s.conf.Service.FrpMailSubject, fmt.Sprintf("JobType: TrainJob\nJobName: %v\nFrpInfo: %v", j.Name, info))
}
return
}
})()


Loading…
Cancel
Save
Baidu
map