6 Commits

Author SHA1 Message Date
  linfj e0124b397e frp 3 weeks ago
  linfj 0398fefe83 frp 3 weeks ago
  linfj 3b8881d5cb fix train job billing 3 weeks ago
  linfj a92a311eb0 frp check 3 weeks ago
  linfj a44e7c2551 delete 1 month ago
  linfj e31db563c2 check frp 1 month ago
24 changed files with 441 additions and 313 deletions
Split View
  1. +3
    -0
      deploy/charts/octopus/templates/base-server.yaml
  2. +6
    -0
      deploy/charts/octopus/values.yaml
  3. +2
    -0
      server/base-server/configs/config.yaml
  4. +0
    -101
      server/base-server/configs/config_project.yaml
  5. +3
    -0
      server/base-server/internal/common/billing.go
  6. +5
    -0
      server/base-server/internal/common/constant.go
  7. +3
    -2
      server/base-server/internal/common/email.go
  8. +5
    -0
      server/base-server/internal/common/job.go
  9. +29
    -0
      server/base-server/internal/common/test.go
  10. +2
    -0
      server/base-server/internal/conf/conf.proto
  11. +1
    -0
      server/base-server/internal/data/cluster/cluster.go
  12. +25
    -0
      server/base-server/internal/data/cluster/cluster_test.go
  13. +80
    -0
      server/base-server/internal/data/cluster/kubernetes.go
  14. +5
    -0
      server/base-server/internal/data/dao/develop.go
  15. +12
    -9
      server/base-server/internal/data/dao/model/develop.go
  16. +17
    -14
      server/base-server/internal/data/dao/model/train_job.go
  17. +5
    -0
      server/base-server/internal/data/dao/train_job.go
  18. +1
    -1
      server/base-server/internal/service/develop/develop.go
  19. +69
    -0
      server/base-server/internal/service/develop/task.go
  20. +154
    -182
      server/base-server/internal/service/trainjob/task.go
  21. +2
    -2
      server/base-server/internal/service/trainjob/train_job.go
  22. +8
    -1
      server/common/utils/encrypt_test.go
  23. +2
    -1
      server/go.mod
  24. +2
    -0
      server/go.sum

+ 3
- 0
deploy/charts/octopus/templates/base-server.yaml View File

@@ -166,6 +166,9 @@ data:
userDomain: {{ .Values.baseserver.service.userDomain }}
shareVolumes:
{{ toYaml .Values.baseserver.service.shareVolumes | indent 8 }}
frpMailRecipient:
{{ toYaml .Values.baseserver.service.frpMailRecipient | indent 8 }}
frpMailSubject: {{ .Values.baseserver.service.frpMailSubject }}
develop:
autoStopIntervalSec: {{ .Values.baseserver.service.develop.autoStopIntervalSec }}
isSetUploadFileSize: {{ .Values.baseserver.service.develop.isSetUploadFileSize }}


+ 6
- 0
deploy/charts/octopus/values.yaml View File

@@ -168,6 +168,8 @@ baseserver:
stopWhenArrears: false #是否欠费停止服务
userDomain: user.octopus.local
shareVolumes: []
frpMailRecipient: []
frpMailSubject: ""
develop:
autoStopIntervalSec: 7200
isSetUploadFileSize: true #值为false时,上传文件大小不能超过1M;为true时,不限制
@@ -317,6 +319,8 @@ mysql:
rootPassword: "root"
volumePermissions:
enabled: true
image:
pullPolicy: IfNotPresent
primary:
service:
type: NodePort
@@ -358,6 +362,8 @@ redis:
password: "abcde"
volumePermissions:
enabled: true
image:
pullPolicy: IfNotPresent

# nginx-ingress-controller
nginx-ingress-controller:


+ 2
- 0
server/base-server/configs/config.yaml View File

@@ -74,6 +74,8 @@ service:
routineNum: 10
stopWhenArrears: false #是否欠费停止服务
userDomain: user.octopus.local #如果是公网域名,配置泛域名解析到此域名
frpMailRecipient: []
frpMailSubject: ""
develop:
autoStopIntervalSec: 7200
isSetUploadFileSize: true #值为false时,上传文件大小不能超过1M;为true时,不限制


+ 0
- 101
server/base-server/configs/config_project.yaml View File

@@ -1,101 +0,0 @@
app:
name: baseserver
version: v1.0
isDev: true #是否本地调试
logLevel: info
server:
http:
addr: 0.0.0.0:8001
timeout: 60s
grpc:
addr: 0.0.0.0:9001
timeout: 60s
data:
database:
driver: mysql
source: root:root@tcp(192.168.202.73:30336)/octopus?charset=utf8&parseTime=True&loc=Local
kubernetes:
masterUrl: https://192.168.202.73:6443/
configPath: ./kubeconfig
qps: 20
minio:
base:
endPoint: 192.168.202.73:31311
accessKeyID: minioadmin
secretAccessKey: minioadmin
useSSL: false
mountPath: /nfsdata/octopus-242-41/minio
pvcName: octopus-minio-pvc
proxyPath: /oss
business:
downloadExpiry: 86400
uploadExpiry: 86400
harbor:
host: 192.168.202.74:5000
username: openi
password: OpenI2018
apiVersion: v1.0
useSSL: false
redis:
addr: 192.168.202.73:30667
username:
password: abcde
influxdb:
addr: 192.168.202.73:30086
username: octopus
password: octopus
database: octopus
jointCloud:
baseUrl: http://192.168.207.141:8709
username: test
password: 7ee15bc8fee766cad1bd70ccf5f4dc14
sessionExpirySec: 540 #实际有效期为600
ambassador:
baseUrl: 192.168.202.73
pytorchServer:
imageAddr: swr.cn-south-1.myhuaweicloud.com/openioctopus/pytorchserver
version: 2.0.2
sftpgo:
baseUrl: 192.168.202.73:30022
username: admin
password: abcde
prometheus:
baseUrl: http://192.168.202.73:30003
service:
baseServerAddr: http://127.0.0.1:8001
dockerDatasetPath: /dataset
dockerCodePath: /code
dockerModelPath: /model
dockerUserHomePath: /userhome
resourceLabelKey: octopus.pcl.ac.cn/type
billingPeriodSec: 60
isUseMultusCNI: false
networksConf: default/macvlan-cx5-bond-conf
routineNum: 10
develop:
autoStopIntervalSec: 7200
isSetUploadFileSize: true #值为false时,上传文件大小不能超过1M;为true时,不限制
resource:
customizedResourceBindingNodeLabelKeyFormat: openi.octopus.resource.%s
customizedResourceBindingNodeLabelValue: bound
defaultPoolName: common-pool
poolInfoStoreKey: ResourcePoolInfo
poolBindingNodeLabelKeyFormat: openi.octopus.resourcepool.%s
poolBindingNodeLabelValue: bound
poolSelectLabelKey: platform
poolSelectLabelValue: openi.octopus
discoveryLeaderLeaseLockName: resourcediscovery
discoveryDuration: 15s
ignoreSystemResources: hugepages-1Gi,pods,hugepages-2Mi,ephemeral-storage
administrator:
username: "admin"
password: "123456"
email: ""
phone: ""
module:
storage:
source:
capacity: "100Gi"
nfs:
server: 192.168.203.72
path: "/data/datasets/data/octopus-dev-minio"

+ 3
- 0
server/base-server/internal/common/billing.go View File

@@ -37,6 +37,9 @@ func CalculateAmount(ctx context.Context, job *typeJob.JobStatusDetail, prices [
}
}

if rs < 0 {
rs = 0
}
return rs, startTime
}



+ 5
- 0
server/base-server/internal/common/constant.go View File

@@ -14,3 +14,8 @@ const (
OctopusServiceRoleLabel = "octopus.openi.pcl.cn/role"
DockerPreTrainModePath = "/pretrainmodel"
)

// Values of the frp check result recorded on notebook jobs and train jobs.
// The numeric values are written to the frp_check_result column, so they
// must not be renumbered.
const (
// FrpCheckResult_None marks a job on which no frp process has been detected.
FrpCheckResult_None = 1
// FrpCheckResult_Exist marks a job on which an frp process was detected.
FrpCheckResult_Exist = 2
)

+ 3
- 2
server/base-server/internal/common/email.go View File

@@ -8,11 +8,12 @@ import (
"server/common/errors"
)

func SendEmail(adminEmail *conf.AdminEmail, to string, subject string) error {
func SendEmail(adminEmail *conf.AdminEmail, to []string, subject string, content string) error {
e := email.NewEmail()
e.From = adminEmail.Username
e.To = []string{to}
e.To = to
e.Subject = subject
e.Text = []byte(content)
err := e.Send(fmt.Sprintf("%s:%d", adminEmail.SmtpHost, adminEmail.SmtpPort), smtp.PlainAuth("", adminEmail.Username, adminEmail.Password, adminEmail.SmtpHost))
if err != nil {
return errors.Errorf(err, errors.ErrorSendEmailFailed)


+ 5
- 0
server/base-server/internal/common/job.go View File

@@ -2,6 +2,7 @@ package common

import (
"encoding/json"
"fmt"
"server/common/constant"
"server/common/utils"
"time"
@@ -41,3 +42,7 @@ func GetStopDetail(detailstr string) *typeJob.JobStatusDetail {

return &detail
}

// GetPodName builds the pod name for one replica of one task of a job, in
// the form "<jobId>-task<taskIdx>-<replicaIdx>".
func GetPodName(jobId string, taskIdx int, replicaIdx int) string {
	const podNameLayout = "%s-task%d-%d"
	return fmt.Sprintf(podNameLayout, jobId, taskIdx, replicaIdx)
}

+ 29
- 0
server/base-server/internal/common/test.go View File

@@ -0,0 +1,29 @@
package common

import (
"gopkg.in/yaml.v2"
"server/base-server/internal/conf"
"server/common/third_party/kratos/config"
"server/common/third_party/kratos/config/file"
)

// ConfInit loads the configuration found at configPath, decoding its content
// with a YAML decoder, and scans the result into a conf.Bootstrap.
//
// Any error from loading or scanning is returned unchanged together with a
// nil Bootstrap.
func ConfInit(configPath string) (*conf.Bootstrap, error) {
	source := file.NewSource(configPath)
	yamlDecoder := func(kv *config.KeyValue, v map[string]interface{}) error {
		return yaml.Unmarshal(kv.Value, v)
	}

	c := config.New(
		config.WithSource(source),
		config.WithDecoder(yamlDecoder),
	)
	if err := c.Load(); err != nil {
		return nil, err
	}

	bootstrap := new(conf.Bootstrap)
	if err := c.Scan(bootstrap); err != nil {
		return nil, err
	}
	return bootstrap, nil
}

+ 2
- 0
server/base-server/internal/conf/conf.proto View File

@@ -179,6 +179,8 @@ message Service {
AdminEmail adminEmail = 16;
string userDomain = 17;
repeated ShareVolume shareVolumes = 18;
repeated string frpMailRecipient = 19;
string frpMailSubject = 20;
}

message Administrator {


+ 1
- 0
server/base-server/internal/data/cluster/cluster.go View File

@@ -74,4 +74,5 @@ type Cluster interface {
DeleteAlluxioRuntime(ctx context.Context, namespace string, name string) error
CreateDataLoad(ctx context.Context, dataLoad *fluidv1.DataLoad) error
DeleteDataLoad(ctx context.Context, namespace string, name string) error
CheckFrpProcess(ctx context.Context, namespace string, podName string) (bool, string, error)
}

+ 25
- 0
server/base-server/internal/data/cluster/cluster_test.go View File

@@ -0,0 +1,25 @@
package cluster

import (
"context"
"server/base-server/internal/common"
"server/common/log"
"testing"
)

// TestCheckFrpProcess builds a cluster client from the configuration under
// ../../../configs, runs CheckFrpProcess against a hard-coded namespace and
// pod, and logs the result. It fails if configuration loading, cluster
// construction or the check itself returns an error.
func TestCheckFrpProcess(t *testing.T) {
	const (
		namespace = "1c9af254fe184d85a0d1242262ec4322"
		podName   = "b23347d7dda84178aaeec0313fb5d8c1-task0-0"
	)

	bootstrap, err := common.ConfInit("../../../configs")
	if err != nil {
		t.Fatal(err)
	}

	kc, _, err := NewCluster(bootstrap.Data, log.DefaultLogger)
	if err != nil {
		t.Fatal(err)
	}

	found, command, err := kc.CheckFrpProcess(context.Background(), namespace, podName)
	if err != nil {
		t.Fatal(err)
	}
	t.Logf("isFrp: %v, command: %s", found, command)
}

+ 80
- 0
server/base-server/internal/data/cluster/kubernetes.go View File

@@ -1,9 +1,13 @@
package cluster

import (
"bufio"
"context"
"encoding/json"
"fmt"
"k8s.io/client-go/kubernetes/scheme"
"k8s.io/client-go/tools/remotecommand"
"strings"

"k8s.io/apimachinery/pkg/fields"
"k8s.io/apimachinery/pkg/runtime"
@@ -727,3 +731,79 @@ func (kc *kubernetesCluster) DeleteDataLoad(ctx context.Context, namespace strin
}
return nil
}

// CheckFrpProcess runs `ps -eo pid,comm,args` inside the given pod through the
// pod "exec" subresource and reports whether any listed process has a command
// name containing "frp".
//
// It returns whether such a process was found, a description of the first
// match (empty when there is none), and an error when the executor cannot be
// created, the exec stream fails, or the output cannot be scanned. Underlying
// errors are wrapped with %w so callers can inspect them with errors.Is/As.
//
// NOTE(review): ctx is not passed to the stream call, so cancelling ctx does
// not interrupt a running exec. No container is named in the exec options.
func (kc *kubernetesCluster) CheckFrpProcess(ctx context.Context, namespace string, podName string) (bool, string, error) {
	// Build the exec request that lists PID, command name and full arguments.
	req := kc.kubeclient.CoreV1().RESTClient().Post().
		Resource("pods").
		Name(podName).
		Namespace(namespace).
		SubResource("exec").
		VersionedParams(&corev1.PodExecOptions{
			Command: []string{"ps", "-eo", "pid,comm,args"},
			Stdin:   false,
			Stdout:  true,
			Stderr:  true,
			TTY:     false,
		}, scheme.ParameterCodec)

	// Create the SPDY executor for the request.
	exec, err := remotecommand.NewSPDYExecutor(kc.config, "POST", req.URL())
	if err != nil {
		return false, "", fmt.Errorf("NewSPDYExecutor failed: %w", err)
	}

	// Run the command and capture both output streams.
	var stdout, stderr strings.Builder
	err = exec.Stream(remotecommand.StreamOptions{
		Stdout: &stdout,
		Stderr: &stderr,
	})
	if err != nil {
		return false, "", fmt.Errorf("exec failed: %w, stderr: %s", err, stderr.String())
	}

	// Look for a process whose command name contains "frp".
	return parsePSOutput(stdout.String())
}

// parsePSOutput scans the output of `ps -eo pid,comm,args` for a process whose
// command name (second column) contains "frp", compared case-insensitively.
//
// The first line is always treated as the column header and skipped. Lines
// with fewer than three whitespace-separated fields are ignored. On the first
// match it returns true and a description holding the full command with its
// whitespace collapsed to single spaces; otherwise it returns false and an
// empty string. An error is returned only when scanning the text fails.
func parsePSOutput(output string) (bool, string, error) {
	// A single args column can far exceed bufio's default 64 KiB token limit;
	// without a larger cap one long line would abort the scan with an error
	// and hide every process listed after it.
	const maxLineBytes = 4 * 1024 * 1024

	scanner := bufio.NewScanner(strings.NewReader(output))
	scanner.Buffer(make([]byte, 0, bufio.MaxScanTokenSize), maxLineBytes)

	// Discard the header row. If this fails, the loop below does not run and
	// the scanner error (if any) is reported after it.
	_ = scanner.Scan()

	for scanner.Scan() {
		// Expected columns: PID, COMMAND (name only), full command line.
		fields := strings.Fields(scanner.Text())
		if len(fields) < 3 {
			continue // blank or malformed line
		}

		commandName := fields[1]
		if !strings.Contains(strings.ToLower(commandName), "frp") {
			continue
		}

		// Format the description only for the matching line.
		fullCommand := strings.Join(fields[2:], " ")
		return true, fmt.Sprintf("进程命令: %s", fullCommand), nil
	}

	if err := scanner.Err(); err != nil {
		return false, "", fmt.Errorf("scanner failed: %w", err)
	}

	return false, "", nil
}

+ 5
- 0
server/base-server/internal/data/dao/develop.go View File

@@ -286,6 +286,11 @@ func (d *developDao) ListNotebookJob(ctx context.Context, query *model.NotebookJ
params = append(params, query.PayStatus)
}

if query.FrpCheckResult != 0 {
querySql += " and frp_check_result = ? "
params = append(params, query.FrpCheckResult)
}

if len(query.Ids) != 0 {
querySql += " and id in ? "
params = append(params, query.Ids)


+ 12
- 9
server/base-server/internal/data/dao/model/develop.go View File

@@ -91,6 +91,8 @@ type NotebookJob struct {
ResourceSpecPrice float64 `gorm:"type:decimal(10,2);not null;default:0;comment:资源规格价格"`
Detail string `gorm:"column:detail;type:json" json:"detail"`
Operation string `gorm:"type:varchar(100);not null;default:'';comment:任务停止说明"`
FrpCheckResult int `gorm:"type:tinyint;not null;default:1;comment:frp检测结果 1none 2检测到frp进程"`
FrpCheckInfo string `gorm:"type:varchar(300);not null;default:'';comment:frp检测信息"`
}

func (NotebookJob) TableName() string {
@@ -113,15 +115,16 @@ type NotebookQuery struct {
}

type NotebookJobQuery struct {
PageIndex int
PageSize int
SortBy string
OrderBy string
StartedAtLt int64
Status string
StatusList []string
PayStatus api.BillingPayRecordStatus
Ids []string
PageIndex int
PageSize int
SortBy string
OrderBy string
StartedAtLt int64
Status string
StatusList []string
PayStatus api.BillingPayRecordStatus
Ids []string
FrpCheckResult int
}

type NotebookEvent struct {


+ 17
- 14
server/base-server/internal/data/dao/model/train_job.go View File

@@ -54,6 +54,8 @@ type TrainJob struct {
DisableMountUserHome bool `gorm:"default:false;comment:是否不挂载userhome目录"`
DisableMountModel bool `gorm:"default:false;comment:是否不挂载model目录"`
TensorboardEndpoint string `gorm:"type:varchar(100);not null;default:'';comment:tensorboard访问地址"`
FrpCheckResult int `gorm:"type:tinyint;not null;default:1;comment:frp检测结果 1none 2检测到frp进程"`
FrpCheckInfo string `gorm:"type:varchar(300);not null;default:'';comment:frp检测信息"`
dao.Model

DeletedAt soft_delete.DeletedAt `gorm:"uniqueIndex:name_userId_spaceId,priority:4"`
@@ -115,20 +117,21 @@ func (r *ResourceSpecPrices) Scan(input interface{}) error {
}

type TrainJobListQuery struct {
PageIndex int
PageSize int
SortBy string
OrderBy string
CreatedAtGte int64
CreatedAtLt int64
Status string
SearchKey string
UserNameLike string
UserId string
WorkspaceId string
Ids []string
PayStatus api.BillingPayRecordStatus
Statuses []string
PageIndex int
PageSize int
SortBy string
OrderBy string
CreatedAtGte int64
CreatedAtLt int64
Status string
SearchKey string
UserNameLike string
UserId string
WorkspaceId string
Ids []string
PayStatus api.BillingPayRecordStatus
Statuses []string
FrpCheckResult int
}

//任务模板表


+ 5
- 0
server/base-server/internal/data/dao/train_job.go View File

@@ -129,6 +129,11 @@ func (d *trainJobDao) GetTrainJobList(ctx context.Context, query *model.TrainJob
params = append(params, query.WorkspaceId)
}

if query.FrpCheckResult != 0 {
querySql += " and frp_check_result = ? "
params = append(params, query.FrpCheckResult)
}

if len(query.Ids) != 0 {
querySql += " and id in ? "
params = append(params, query.Ids)


+ 1
- 1
server/base-server/internal/service/develop/develop.go View File

@@ -1038,7 +1038,7 @@ func (s *developService) sendEmail(userId string, jobName string, oldStatus stri
return
}
if *user.User.EmailNotify && !strings.EqualFold(oldStatus, newStatus) && utils.IsNotifyState(newStatus) {
common.SendEmail(s.conf.Service.AdminEmail, user.User.Email, fmt.Sprintf("Notebook %s %s", jobName, newStatus))
common.SendEmail(s.conf.Service.AdminEmail, []string{user.User.Email}, fmt.Sprintf("Notebook %s %s", jobName, newStatus), "")
}
})()
}


+ 69
- 0
server/base-server/internal/service/develop/task.go View File

@@ -54,6 +54,10 @@ func (s *developService) startNotebookTask() {
rdlock := leaderleaselock.NewLeaderLeaselock(k8sns, lock, s.data.Cluster.GetClusterConfig())
rdlock.RunOrRetryLeaderElection(ctx, func(ctx context.Context) {
s.log.Infof(ctx, "acquire %v", lock)
go func() {
s.checkFrp(ctx)
}()

go func() {
wait.Until(func() {
utils.HandlePanic(ctx, func(i ...interface{}) {
@@ -390,6 +394,71 @@ func (s *developService) startNotebookTask() {

}

// checkFrp scans running notebook jobs every 10 minutes for frp processes
// until ctx is done. It does nothing when no frp mail recipient is configured.
//
// Each cycle pages through running notebook jobs whose frp check result is
// still FrpCheckResult_None and probes replica 0 of every task pod of the
// owning notebook. On the first detection for a job it records the result on
// the job, emails the configured recipients, and stops probing that job.
//
// NOTE(review): the listing filters on the same field this loop updates, so
// flagged jobs drop out of later pages and some jobs may be skipped in a
// cycle; presumably they are picked up on the next one. Confirm this is fine.
func (s *developService) checkFrp(ctx context.Context) {
	if len(s.conf.Service.FrpMailRecipient) == 0 {
		return
	}
	wait.Until(func() {
		utils.HandlePanic(ctx, func(i ...interface{}) {
			for pageIndex := 1; ; pageIndex++ {
				notebookJobs, err := s.data.DevelopDao.ListNotebookJob(ctx, &model.NotebookJobQuery{
					PageIndex:      pageIndex,
					PageSize:       taskPageSize,
					Status:         constant.RUNNING,
					FrpCheckResult: common.FrpCheckResult_None,
				})
				if err != nil {
					s.log.Errorf(ctx, "ListNotebookJob err: %s", err)
					break
				}
				if len(notebookJobs) == 0 {
					break
				}

				nbIds := make([]string, 0, len(notebookJobs))
				for _, j := range notebookJobs {
					nbIds = append(nbIds, j.NotebookId)
				}

				notebooks, _, err := s.data.DevelopDao.ListNotebook(ctx, &model.NotebookQuery{Ids: nbIds})
				if err != nil {
					// Without the notebooks neither the namespace nor the task
					// count is known, so give up on this cycle.
					s.log.Errorf(ctx, "ListNotebook err: %s", err)
					break
				}
				nbs := make(map[string]*model.Notebook, len(notebooks))
				for _, n := range notebooks {
					nbs[n.Id] = n
				}

				for _, j := range notebookJobs {
					nb := nbs[j.NotebookId]
					if nb == nil {
						// Skip this job instead of dereferencing a missing notebook.
						s.log.Errorf(ctx, "notebook %s of job %s not found, skip frp check", j.NotebookId, j.Id)
						continue
					}

					// Stop after the first detection so one job yields one
					// update and one email per cycle.
					detected := false
					for taskIdx := 0; taskIdx < nb.TaskNumber && !detected; taskIdx++ {
						utils.HandlePanic(ctx, func(p ...interface{}) {
							isFrp, info, err := s.data.Cluster.CheckFrpProcess(ctx, nb.UserId, common.GetPodName(j.Id, taskIdx, 0))
							if err != nil {
								s.log.Errorf(ctx, "CheckFrpProcess err: %s", err)
								return
							}
							if !isFrp {
								return
							}
							detected = true

							err = s.data.DevelopDao.UpdateNotebookJobSelective(ctx, &model.NotebookJob{
								Id:             j.Id,
								FrpCheckResult: common.FrpCheckResult_Exist,
								FrpCheckInfo:   info,
							})
							if err != nil {
								s.log.Errorf(ctx, "UpdateNotebookJobSelective err: %s", err)
							}

							// The email is still sent when the update failed.
							err = common.SendEmail(s.conf.Service.AdminEmail, s.conf.Service.FrpMailRecipient, s.conf.Service.FrpMailSubject, fmt.Sprintf("JobType: Notebook\nJobName: %v\nFrpInfo: %v", nb.Name, info))
							if err != nil {
								s.log.Errorf(ctx, "SendEmail err: %s", err)
							}
						})()
					}
				}
			}
		})()
	}, 10*time.Minute, ctx.Done())
}

func (s *developService) handleNodeActions(na *nav1.NodeAction) {
notebookId := na.Labels[nodeActionLabelNotebookId]
imageId := na.Labels[nodeActionLabelImageId]


+ 154
- 182
server/base-server/internal/service/trainjob/task.go View File

@@ -3,19 +3,18 @@ package trainjob
import (
"context"
"encoding/json"
"fmt"
api "server/base-server/api/v1"
"server/base-server/internal/common"
"server/base-server/internal/data/dao/model"
"server/common/constant"
"server/common/leaderleaselock"
"server/common/utils"
"server/common/utils/collections/set"
"strings"
"time"

typeJob "volcano.sh/apis/pkg/apis/batch/v1alpha1"

"gonum.org/v1/gonum/floats"
"k8s.io/apimachinery/pkg/util/wait"
)

@@ -43,165 +42,64 @@ func (s *trainJobService) calAmount(startAt int64, endedAt int64, price float64)
return float64(endedAt-startAt) * price / 3600.0
}

func (s *trainJobService) trainJobBilling(ctx context.Context) {

k8sns := utils.GetEnvOrDefault("K8S_NAMESPACE", "default")
rdlock := leaderleaselock.NewLeaderLeaselock(k8sns, leaseLock, s.data.Cluster.GetClusterConfig())
rdlock.RunOrRetryLeaderElection(ctx, func(ctx context.Context) {
s.log.Infof(ctx, "train job billing service acquire %v", leaseLock)

go func() {
BillingPeriodSec := int64(1800) //默认值
if s.conf.Service.BillingPeriodSec > 0 {
BillingPeriodSec = s.conf.Service.BillingPeriodSec
}
s.log.Infof(ctx, "train job billing service billing period time is %v", BillingPeriodSec)
wait.Until(func() {
utils.HandlePanic(ctx, func(i ...interface{}) {
s.log.Info(ctx, "start train-job-billing cron service.....")
for pageIndex := 1; ; pageIndex++ {
trainJobs, _, err := s.data.TrainJobDao.GetTrainJobList(ctx, &model.TrainJobListQuery{
PageIndex: pageIndex,
PageSize: taskPageSize,
PayStatus: api.BillingPayRecordStatus_BPRS_PAYING,
})
if err != nil {
s.log.Errorf(ctx, "List TrainJob err: %s", err)
break
}

if len(trainJobs) == 0 {
s.log.Info(ctx, "There is no trainJob to pay!")
break
}

//这些任务可能没有启动时间,但状态却是终止的,这些任务不计费,设置计费状态为完成。
for _, j := range trainJobs {
if j.StartedAt == nil && utils.IsCompletedState(j.Status) {
j.PayStatus = api.BillingPayRecordStatus_BPRS_PAY_COMPLETED
err = s.data.TrainJobDao.UpdateTrainJob(ctx, j)
if err != nil {
s.log.Errorf(ctx, "update ineffective job to completed err: %s", err)
break
}
}
}

//删除后再查询
trainJobs, _, err = s.data.TrainJobDao.GetTrainJobList(ctx, &model.TrainJobListQuery{
PageIndex: pageIndex,
PageSize: taskPageSize,
PayStatus: api.BillingPayRecordStatus_BPRS_PAYING,
})

if err != nil {
s.log.Errorf(ctx, "List TrainJob err: %s", err)
break
}

if len(trainJobs) == 0 {
s.log.Info(ctx, "There is no trainJob to pay!")
break
}
func (s *trainJobService) checkBilling(ctx context.Context) {
BillingPeriodSec := int64(1800) //默认值
if s.conf.Service.BillingPeriodSec > 0 {
BillingPeriodSec = s.conf.Service.BillingPeriodSec
}
wait.Until(func() {
utils.HandlePanic(ctx, func(i ...interface{}) {
for pageIndex := 1; ; pageIndex++ {
jobs, _, err := s.data.TrainJobDao.GetTrainJobList(ctx, &model.TrainJobListQuery{
PageIndex: pageIndex,
PageSize: taskPageSize,
PayStatus: api.BillingPayRecordStatus_BPRS_PAYING,
})
if err != nil {
s.log.Errorf(ctx, "ListTrainJobJob err: %s", err)
break
}

//计费逻辑
trainJobIds := make([]string, 0)
for _, j := range trainJobs {
trainJobIds = append(trainJobIds, j.Id)
}
trainJobIds = set.NewStrings(trainJobIds...).Values()
if len(jobs) == 0 {
break
}

trainJobMap := map[string]*model.TrainJob{}
for _, job := range trainJobs {
trainJobMap[job.Id] = job
}
details := make([]*typeJob.JobStatusDetail, 0)
detailMap := map[string]*typeJob.JobStatusDetail{}
for _, j := range jobs {
info, err := s.getJobDetail(ctx, j.Id)
if err != nil {
s.log.Debugf(ctx, "GetJob err: %s", err)
} else {
details = append(details, info)
detailMap[j.Id] = info
}
}

jobNs := map[string]string{}
for _, id := range trainJobIds {
jobNs[id] = trainJobMap[id].UserId
}
for _, j := range jobs {
utils.HandlePanic(ctx, func(i ...interface{}) {
ownerId, ownerType := s.getOwner(j)

details := make([]*typeJob.JobStatusDetail, 0)
for _, id := range trainJobIds {
info, err := s.getJobDetail(ctx, id)
if err != nil {
s.log.Errorf(ctx, "GetJob err: %s", err)
var payEndAt int64
var payStatus api.BillingPayRecordStatus
if utils.IsCompletedState(j.Status) {
if j.CompletedAt == nil {
payEndAt = time.Now().Unix()
} else {
details = append(details, info)
payEndAt = j.CompletedAt.Unix()
}
payStatus = api.BillingPayRecordStatus_BPRS_PAY_COMPLETED
} else {
payEndAt = time.Now().Unix()
payStatus = api.BillingPayRecordStatus_BPRS_PAYING
}

detailMap := map[string]*typeJob.JobStatusDetail{}
for _, d := range details {
detailMap[d.Job.ID] = d
prices := make([]float64, 0)
for i := 0; i < len(j.Config); i++ {
prices = append(prices, j.Config[i].ResourceSpecPrice)
}

for _, j := range trainJobs {
//判断任务是否已经启动。如果没有启动时间,则说明未启动,不计费。
if j.StartedAt == nil {
//s.log.Info(ctx, "job "+j.Id+"no need to pay, because it is not started!")
continue
}
payAmount := 0.0
//job已经启动,则以job的启动时间作为每个task的启动时间,以此为计费起始点。
payStartAt := j.StartedAt.Unix()
s.log.Info(ctx, "train bill service try to calculate job pay amount, jobId is: "+j.Id)
now := time.Now().Unix()
specPriceMap := map[int]float64{}
for _, p := range j.ResSpecPrice {
specPriceMap[p.Task] = p.Price
}

trainJob := trainJobMap[j.Id]
ownerId, ownerType := s.getOwner(trainJob)

detail := detailMap[j.Id]
for ti, t := range detail.Tasks {
for _, r := range t.Replicas { //计算副本消费
var endAt int64
//查看副本任务是否终止,以便获取副本终止时间。
if utils.IsCompletedState(r.State) {
// 副本状态终止,但无终止时间。
if r.FinishedAt == nil {
//若job终止时间也缺失,系统级错误,结束时间 = 启动时间,不计入费用!
if j.CompletedAt == nil {
s.log.Errorf(ctx, j.Id+"'s replica finished-time is null && job finished time is also null!")
s.log.Info(ctx, "Attention!!! System err, user no need to pay! job id is :"+j.Id)
endAt = r.StartAt.Unix()
} else {
//若job终止时间存在, 则将其作为副本终止时间,完成计费。
s.log.Warn(ctx, "replica finished-time is null! So instead to use job finished time!")
endAt = j.CompletedAt.Unix()
}
} else {
endAt = r.FinishedAt.Unix()
}
} else if strings.EqualFold(r.State, constant.RUNNING) {
//副本仍在running,则取当前系统时间,作为该周期计费终止点。
endAt = now
}
//计算副本用时,启动时间恒定,变化的只有终止时间。
if endAt != 0 {
payAmount += s.calAmount(payStartAt, endAt, specPriceMap[ti])
}
}
}
payAmount = floats.Round(payAmount, common.BillingPrecision)

var payStatus api.BillingPayRecordStatus
var payEndAt int64
if utils.IsCompletedState(detail.Job.State) {
payEndAt = detail.Job.FinishedAt.Unix()
payStatus = api.BillingPayRecordStatus_BPRS_PAY_COMPLETED
} else {
payEndAt = now
payStatus = api.BillingPayRecordStatus_BPRS_PAYING
}

if payAmount <= j.PayAmount && payStatus != api.BillingPayRecordStatus_BPRS_PAY_COMPLETED {
continue
}

payAmount, startTime := common.CalculateAmount(ctx, detailMap[j.Id], prices)
if payAmount > 0 {
extraInfo := make(map[string]string)
if ownerType == api.BillingOwnerType_BOT_SPACE {
extraInfo = common.GetExtraInfo(j.UserId)
@@ -212,51 +110,125 @@ func (s *trainJobService) trainJobBilling(ctx context.Context) {
Amount: payAmount,
BizType: api.BillingBizType_BBT_TRAIN_JOB,
BizId: j.Id,
Title: trainJob.Name,
StartedAt: payStartAt,
Title: j.Name,
StartedAt: startTime.Unix(),
EndedAt: payEndAt,
Status: payStatus,
ExtraInfo: extraInfo,
})
if err != nil {
s.log.Errorf(ctx, "Pay err: %s", err)
continue
return
}
}

startAt := time.Unix(payStartAt, 0)
endAt := time.Unix(payEndAt, 0)
err = s.data.TrainJobDao.UpdateTrainJob(ctx, &model.TrainJob{
Id: j.Id,
PayStatus: payStatus,
PayStartedAt: &startAt,
PayEndedAt: &endAt,
PayAmount: payAmount,
})
if err != nil {
s.log.Errorf(ctx, "Update train job selective err: %s", err)
continue
}
endAt := time.Unix(payEndAt, 0)
err = s.data.TrainJobDao.UpdateTrainJob(ctx, &model.TrainJob{
Id: j.Id,
PayStatus: payStatus,
PayStartedAt: j.StartedAt,
PayEndedAt: &endAt,
PayAmount: payAmount,
})
if err != nil {
s.log.Errorf(ctx, "UpdateTrainJobJobSelective err: %s", err)
return
}

owner, err := s.billingService.GetBillingOwner(ctx, &api.GetBillingOwnerRequest{
OwnerId: ownerId,
OwnerType: ownerType,
})
owner, err := s.billingService.GetBillingOwner(ctx, &api.GetBillingOwnerRequest{
OwnerId: ownerId,
OwnerType: ownerType,
})
if err != nil {
s.log.Errorf(ctx, "GetBillingOwner err: %s", err)
return
}
if s.conf.Service.StopWhenArrears && owner.BillingOwner.Amount < 0 {
_, err = s.StopJob(ctx, &api.StopJobRequest{Id: j.Id, Operation: "system stop job due to arrears"})
if err != nil {
s.log.Errorf(ctx, "GetBillingOwner err: %s", err)
continue
s.log.Errorf(ctx, "StopTrainJob err: %s", err)
return
}
if s.conf.Service.StopWhenArrears && owner.BillingOwner.Amount < 0 {
_, err = s.StopJob(ctx, &api.StopJobRequest{Id: j.Id, Operation: "system stop job due to arrears"})
s.log.Infof(ctx, "StopTrainJob due to arrears, jobId: %s", j.Id)
}
})()
}
}
})()
}, time.Duration(BillingPeriodSec)*time.Second, ctx.Done())
}

func (s *trainJobService) checkFrp(ctx context.Context) {
if len(s.conf.Service.FrpMailRecipient) == 0 {
return
}

wait.Until(func() {
utils.HandlePanic(ctx, func(i ...interface{}) {
for pageIndex := 1; ; pageIndex++ {
jobs, _, err := s.data.TrainJobDao.GetTrainJobList(ctx, &model.TrainJobListQuery{
PageIndex: pageIndex,
PageSize: taskPageSize,
Status: constant.RUNNING,
FrpCheckResult: common.FrpCheckResult_None,
})

if err != nil {
s.log.Errorf(ctx, "ListNotebookJob err: %s", err)
break
}

if len(jobs) == 0 {
break
}

for _, j := range jobs {
for t := 0; t < len(j.Config); t++ {
for r := 0; r < j.Config[t].TaskNumber; r++ {
utils.HandlePanic(ctx, func(p ...interface{}) {
isFrp, info, err := s.data.Cluster.CheckFrpProcess(ctx, j.UserId, common.GetPodName(j.Id, t, r))
if err != nil {
s.log.Errorf(ctx, "StopJob err: %s", err)
continue
s.log.Errorf(ctx, "CheckFrpProcess err: %s", err)
return
}
s.log.Infof(ctx, "StopJob due to arrears, jobId: %s", j.Id)
}

if isFrp {
err := s.data.DevelopDao.UpdateNotebookJobSelective(ctx, &model.NotebookJob{
Id: j.Id,
FrpCheckResult: common.FrpCheckResult_Exist,
FrpCheckInfo: info,
})
if err != nil {
s.log.Errorf(ctx, "UpdateNotebookJobSelective err: %s", err)
}

common.SendEmail(s.conf.Service.AdminEmail, s.conf.Service.FrpMailRecipient, s.conf.Service.FrpMailSubject, fmt.Sprintf("JobType: TrainJob\nJobName: %v\nFrpInfo: %v", j.Name, info))
return
}
})()
}

}
})()
}, time.Duration(BillingPeriodSec)*time.Second, ctx.Done())

}
}
})()
}, 10*time.Minute, ctx.Done())
}

// startTrainJobTask runs leader election on the train job lease lock, in the
// namespace given by K8S_NAMESPACE (falling back to "default"). The election
// callback logs the acquisition and starts the frp check and the billing
// check, each in its own goroutine.
func (s *trainJobService) startTrainJobTask(ctx context.Context) {
	namespace := utils.GetEnvOrDefault("K8S_NAMESPACE", "default")
	elector := leaderleaselock.NewLeaderLeaselock(namespace, leaseLock, s.data.Cluster.GetClusterConfig())

	onAcquire := func(ctx context.Context) {
		s.log.Infof(ctx, "train job billing service acquire %v", leaseLock)

		go s.checkFrp(ctx)
		go s.checkBilling(ctx)
	}
	elector.RunOrRetryLeaderElection(ctx, onAcquire)
}


+ 2
- 2
server/base-server/internal/service/trainjob/train_job.go View File

@@ -94,7 +94,7 @@ func NewTrainJobService(conf *conf.Bootstrap, logger log.Logger, data *data.Data
updatedJob: make(chan *vcBatch.Job, 1000),
}

s.trainJobBilling(context.Background())
s.startTrainJobTask(context.Background())
s.trainJobUpdateStaus(context.Background())

s.data.Cluster.RegisterJobEventHandler(cache.ResourceEventHandlerFuncs{
@@ -1359,7 +1359,7 @@ func (s *trainJobService) sendEmail(userId string, jobName string, oldStatus str
return
}
if *user.User.EmailNotify && !strings.EqualFold(oldStatus, newStatus) && utils.IsNotifyState(newStatus) {
common.SendEmail(s.conf.Service.AdminEmail, user.User.Email, fmt.Sprintf("训练任务 %s %s", jobName, newStatus))
common.SendEmail(s.conf.Service.AdminEmail, []string{user.User.Email}, fmt.Sprintf("训练任务 %s %s", jobName, newStatus), "")
}
})()
}


+ 8
- 1
server/common/utils/encrypt_test.go View File

@@ -1,6 +1,9 @@
package utils

import "testing"
import (
"fmt"
"testing"
)

func TestEncryptPassword(t *testing.T) {
p, err := EncryptPassword("hello world")
@@ -10,3 +13,7 @@ func TestEncryptPassword(t *testing.T) {
}
t.Log(p)
}

// TestUUID prints one value returned by GetUUIDWithoutSeparator to standard
// output. It makes no assertions.
func TestUUID(t *testing.T) {
	id := GetUUIDWithoutSeparator()
	fmt.Println(id)
}

+ 2
- 1
server/go.mod View File

@@ -39,7 +39,7 @@ require (
gorm.io/plugin/soft_delete v1.0.0
gotest.tools v2.2.0+incompatible
k8s.io/api v0.22.1
k8s.io/apimachinery v0.22.1
k8s.io/apimachinery v0.34.2
k8s.io/client-go v12.0.0+incompatible
k8s.io/utils v0.0.0-20210802155522-efc7438f0176
nodeagent v0.0.0-00010101000000-000000000000
@@ -68,6 +68,7 @@ require (
github.com/docker/docker v20.10.11+incompatible // indirect
github.com/docker/go-connections v0.4.0 // indirect
github.com/docker/go-units v0.4.0 // indirect
github.com/docker/spdystream v0.0.0-20160310174837-449fdfce4d96 // indirect
github.com/dustin/go-humanize v1.0.0 // indirect
github.com/emicklei/go-restful v2.9.5+incompatible // indirect
github.com/go-logr/logr v0.4.0 // indirect


+ 2
- 0
server/go.sum View File

@@ -637,6 +637,7 @@ github.com/docker/go-units v0.3.3/go.mod h1:fgPhTUdO+D/Jk86RDLlptpiXQzgHJF7gydDD
github.com/docker/go-units v0.4.0 h1:3uh0PgVws3nIA0Q+MwDC8yjEPf9zjRfZZWXZYDct3Tw=
github.com/docker/go-units v0.4.0/go.mod h1:fgPhTUdO+D/Jk86RDLlptpiXQzgHJF7gydDDbaIK4Dk=
github.com/docker/libtrust v0.0.0-20150114040149-fa567046d9b1/go.mod h1:cyGadeNEkKy96OOhEzfZl+yxihPEzKnqJwvfuSUqbZE=
github.com/docker/spdystream v0.0.0-20160310174837-449fdfce4d96 h1:cenwrSVm+Z7QLSV/BsnenAOcDXdX4cMv4wP0B/5QbPg=
github.com/docker/spdystream v0.0.0-20160310174837-449fdfce4d96/go.mod h1:Qh8CwZgvJUkLughtfhJv5dyTYa91l1fOUCrgjqmcifM=
github.com/docopt/docopt-go v0.0.0-20180111231733-ee0de3bc6815/go.mod h1:WwZ+bS3ebgob9U8Nd0kOddGdZWjyMGR8Wziv+TBNwSE=
github.com/dsnet/compress v0.0.1/go.mod h1:Aw8dCMJ7RioblQeTqt88akK31OvO8Dhf5JflhBbQEHo=
@@ -651,6 +652,7 @@ github.com/eapache/queue v1.1.0/go.mod h1:6eCeP0CKFpHLu8blIFXhExK/dRa7WDZfr6jVFP
github.com/eclipse/paho.mqtt.golang v1.2.0 h1:1F8mhG9+aO5/xpdtFkW4SxOJB67ukuDC3t2y2qayIX0=
github.com/eclipse/paho.mqtt.golang v1.2.0/go.mod h1:H9keYFcgq3Qr5OUJm/JZI/i6U7joQ8SYLhZwfeOo6Ts=
github.com/edsrzf/mmap-go v1.0.0/go.mod h1:YO35OhQPt3KJa3ryjFM5Bs14WD66h8eGKpfaBNrHW5M=
github.com/elazarl/goproxy v0.0.0-20180725130230-947c36da3153 h1:yUdfgN0XgIJw7foRItutHYUIhlcKzcSf5vDpdhQAKTc=
github.com/elazarl/goproxy v0.0.0-20180725130230-947c36da3153/go.mod h1:/Zj4wYkgs4iZTTu3o/KG3Itv/qCCa8VVMlb3i9OVuzc=
github.com/emicklei/go-restful v0.0.0-20170410110728-ff4f55a20633/go.mod h1:otzb+WCGbkyDHkqmQmT5YD2WR4BBwUdeQoFo8l/7tVs=
github.com/emicklei/go-restful v2.9.5+incompatible h1:spTtZBk5DYEvbxMVutUuTyh1Ao2r4iyvLdACqsl/Ljk=


Loading…
Cancel
Save
Baidu
map