13 Commits

Author SHA1 Message Date
  liaowsh 2646ccc551 merge conflict 3 weeks ago
  xiongkai 5f0f3cb2d7 Merge pull request 'urchin2.0: record dataUrl' (#1350) from urchin2.0 into V20251113 3 weeks ago
  lewis cbf7cdf8f5 dataUrl 3 weeks ago
  lewis b97e91e860 debug 3 weeks ago
  lewis 8063175818 debug 3 weeks ago
  xiongkai 6c70fbbc2a Merge pull request 'iota usage bug inside a const block' (#1344) from urchin2.0 into V20251113 3 weeks ago
  lewis 591f3f7a39 debug 3 weeks ago
  lewis 05a3cda0a9 iota 3 weeks ago
  lewis 3645d02d90 debug 3 weeks ago
  xiongkai 142e651968 Merge pull request 'fix dirty reads/writes of internalMigrateId' (#1328) from fix-1326 into V20251113 4 weeks ago
  xiongkai f9cb4a8176 Merge pull request 'fix empty code directory after re-debugging on sugon-ai' (#1329) from fix-sugon-ai-restart into V20251113 4 weeks ago
  lewis dfd0c8d40c #1326 4 weeks ago
  lewis e38dd2a563 #1245 1 month ago
9 changed files with 94 additions and 73 deletions
  1. +8  -9   server/adapter/internal/service/sugon_ai/sugon_ai.go
  2. +4  -1   server/bizserver/api/v1/otjob/otJob.proto
  3. +1  -0   server/bizserver/internal/data/dao/data_schedule_record/model.go
  4. +2  -0   server/bizserver/internal/data/dao/otjob/model.go
  5. +13 -5   server/bizserver/internal/service/otjob/ot_job.go
  6. +1  -0   server/common/consts/const.go
  7. +17 -11  server/common/urchin_new/urchin_new.go
  8. +2  -2   server/common/utils/urchin.go
  9. +46 -45  server/ot-provider/pkg/provider/pod.go

+8 -9  server/adapter/internal/service/sugon_ai/sugon_ai.go

@@ -467,7 +467,7 @@ func (o *sugonAdapter) CreateNotebook(ctx context.Context, req *v1.CreateNoteboo
req.Command += "cp -rf " + notebookTempLocalPreTrainModelPath + "/* " + notebookLocalPreTrainModelPath + ";"
}

remoteOutputPrefix := consts.RemoteOutputPrefix
//remoteOutputPrefix := consts.RemoteOutputPrefix
//code mount
if req.CodeInfo != "" {
var codeInfo DataUrl
@@ -487,19 +487,17 @@ func (o *sugonAdapter) CreateNotebook(ctx context.Context, req *v1.CreateNoteboo
req.Command = utils.SetEnvProfile(constant.EnvCodeNeedUnzip, constant.UnNeedUnzip) + req.Command
}

log.Infof(ctx, "req SourcePath: %s", req.CodeUrl)
_, objectKey := utils.GetS3BucketAndObjectKey(req.CodeUrl)
mounts = append(mounts, MountInfo{
Permission: permissionReadWrite,
TargetPath: notebookTempLocalCodePath,
SourcePath: "/" + strings.TrimSuffix(objectKey, "master.zip"), // todo: consider optimizing this later
})
log.Infof(ctx, "code SourcePath: %s.", "/"+strings.TrimSuffix(objectKey, "master.zip"))
req.Command += "cp -rf " + notebookTempLocalCodePath + "/* " + notebookLocalCodePath + ";"

if strings.Contains(objectKey, "jobs/") {
remoteOutputPrefix = "jobs/"
}
//if strings.Contains(objectKey, "jobs/") {
// remoteOutputPrefix = "jobs/"
//}
}

basePath := getBasePath(req.CodeUrl)
@@ -517,10 +515,11 @@ func (o *sugonAdapter) CreateNotebook(ctx context.Context, req *v1.CreateNoteboo
mounts = append(mounts, MountInfo{
Permission: permissionReadWrite,
TargetPath: notebookLocalOutputPath,
SourcePath: basePath + consts.RemoteCacheBucket + "/" + remoteOutputPrefix + req.GrampusJobName,
SourcePath: req.OutputDataUrl,
//SourcePath: basePath + consts.RemoteCacheBucket + "/" + remoteOutputPrefix + req.GrampusJobName,
})
log.Infof(ctx, "output SourcePath: %s.", basePath+consts.RemoteCacheBucket+"/"+remoteOutputPrefix+req.GrampusJobName)
req.Command += "rm -rf " + notebookLocalOutputPath + "/tmp"
//req.Command += "rm -rf " + notebookLocalOutputPath + "/tmp"

imageUrls := strings.Split(req.ImageUrl, "/")
if len(imageUrls) < 2 {
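
Note: the code mount still derives its SourcePath by trimming a literal trailing "master.zip" from the object key, so keys without that suffix pass through unchanged and the result keeps its trailing slash. A sketch with a hypothetical key (not from this change):

    objectKey := "jobs/demo-job/code/master.zip"             // hypothetical value
    src := "/" + strings.TrimSuffix(objectKey, "master.zip") // "/jobs/demo-job/code/"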


+4 -1  server/bizserver/api/v1/otjob/otJob.proto

@@ -292,7 +292,8 @@ message UpdateOtJobRequest {
string ptoken =20;
repeated string selfDomains = 21;
string outputDataId =22;
string outputDataUrl =23;
string outputDataBucket =23;
string outputDataKey =24;
}

message UpdateOtJobReply {
@@ -385,6 +386,7 @@ message GetOtJobDataMigrateInfoReply{
string failedReason = 9;
int32 migrateTaskId = 10;
string dataId = 11;
string destDataUrl = 12;
}

message UpdateOtJobDataMigrateInfoRequest {
@@ -401,6 +403,7 @@ message UpdateOtJobDataMigrateInfoRequest {
int32 migrateTaskId = 11;
uint64 id = 12;
string destCenterId = 13;
string destDataUrl= 14;
}

message UpdateOtJobDataMigrateInfoReply{
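
Note: field number 23 is reused with a new name here (outputDataUrl becomes outputDataBucket). Since both are strings this stays wire-compatible, but a value an older client still sends in field 23 will now be decoded as a bucket rather than a full URL; if that can happen, the usual guard is to retire the old number with a reserved statement and allocate fresh tags.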


+1 -0  server/bizserver/internal/data/dao/data_schedule_record/model.go

@@ -18,6 +18,7 @@ type DataScheduleRecord struct {
DestProxy string `gorm:"type:varchar(100)"`
DestBucket string `gorm:"type:varchar(100)"`
DestObjectKey string `gorm:"type:varchar(100)"`
DestDataUrl string `gorm:"type:varchar(100)"`
FailedReason string `gorm:"type:varchar(1024)"`
Type int `gorm:"type:tinyint(1);not null;default:0;comment:'0: send back, 1: migrate'"`
DataID string `gorm:"type:varchar(100);index"`


+2 -0  server/bizserver/internal/data/dao/otjob/model.go

@@ -105,6 +105,8 @@ type Output struct {
ContainerPath string `json:"containerPath"`
GetBackEndpoint string `json:"getBackEndpoint"`
IsNeedTensorboard bool `json:"isNeedTensorboard"`
SourceBucket string `json:"sourceBucket"`
SourceKey string `json:"sourceKey"`
}

type Dataset struct {


+13 -5  server/bizserver/internal/service/otjob/ot_job.go

@@ -345,6 +345,11 @@ func (s *service) submitJob(ctx context.Context, job *otjobDao.OtJob, action str
if err != nil {
return err
}

if err = s.data.OtJobDao.UpdateOtJob(ctx, job); err != nil {
log.Errorf(ctx, "UpdateOtJob(%s) failed:%v", job.Id, err)
return err
}
}

var centerJobID string
@@ -409,6 +414,7 @@ func (s *service) submitJob(ctx context.Context, job *otjobDao.OtJob, action str
consts.AnnotationsEndPoint: i.SelfEndPoint,
consts.AnnotationsTaskID: i.TaskId,
consts.AnnotationsModelID: i.ModelId,
consts.AnnotationsNewUrchinServerUrl: s.conf.Service.NewUrchinServer,
},
},
Spec: v1.PodSpec{
@@ -1953,11 +1959,8 @@ func (s *service) getBackModel(ctx context.Context, job *otjobDao.OtJob) {
record.SrcObjectKey = consts.RemoteOutputPrefix + job.Name + "/"
record.SrcBucket = strings.ReplaceAll(strings.TrimPrefix(consts.SugonBucketPrefix, "/"), "/", "-") + consts.RemoteCacheBucket
} else if job.AICenter == consts.CenterSugonAI {
record.SrcObjectKey = consts.RemoteOutputPrefix + job.Name + "/"
if strings.Contains(job.Tasks[0].Code.ObjectKey, "jobs/") {
record.SrcObjectKey = "jobs/" + job.Name + "/"
}
record.SrcBucket = strings.ReplaceAll(strings.TrimPrefix(consts.SugonAIBucketPrefix, "/"), "/", "-") + consts.RemoteCacheBucket
record.SrcObjectKey = job.Tasks[0].Output.SourceKey
record.SrcBucket = strings.ReplaceAll(strings.TrimSuffix(strings.TrimPrefix(job.Tasks[0].Output.SourceBucket, "/"), "/"), "/", "-")
}

// schedule
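
Note: the new SrcBucket expression normalizes the stored bucket path in three steps: strip one leading "/", strip one trailing "/", then turn the remaining separators into "-". A sketch with a hypothetical SourceBucket value (not from this change):

    sourceBucket := "/sugon-ai/cache/"         // hypothetical value
    b := strings.TrimPrefix(sourceBucket, "/") // "sugon-ai/cache/"
    b = strings.TrimSuffix(b, "/")             // "sugon-ai/cache"
    b = strings.ReplaceAll(b, "/", "-")        // "sugon-ai-cache"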
@@ -3182,6 +3185,11 @@ func (s *service) UpdateOtJob(ctx context.Context, req *pb.UpdateOtJobRequest) (
job.Tasks[0].SelfDomains = req.SelfDomains
}

if req.OutputDataBucket != "" && req.OutputDataKey != "" && job.Tasks[0].Output != nil {
job.Tasks[0].Output.SourceBucket = req.OutputDataBucket
job.Tasks[0].Output.SourceKey = req.OutputDataKey
}

//set status_detail
var statusDetail string
if req.ExitDiagnostics != "" && consts.IsCompletedState(req.Status) {


+1 -0  server/common/consts/const.go

@@ -59,6 +59,7 @@ const (
AnnotationsTaskID = "TaskID" // currently only used by sugon_ai
AnnotationsModelID = "ModelID"
AnnotationsServiceModel = "ServiceModel"
AnnotationsNewUrchinServerUrl = "NewUrchinServerUrl"

TolerationKey = "virtual-kubelet.io/provider"
TolerationValue = "grampus"
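
Note: this new annotation is the thread that carries the urchin 2.0 server address across the change: bizserver writes s.conf.Service.NewUrchinServer into the pod annotations (ot_job.go above), and ot-provider reads it back in handleStorageSchedule and PodStatusTicker (pod.go below) so that UpdateDataReference and CreateData talk to the new urchin server instead of destProxy.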


+17 -11  server/common/urchin_new/urchin_new.go

@@ -18,26 +18,32 @@ const (
codeAlreadyExist = 10001
codeRepeat = 10100

DataStatusInit = iota
DataStatusUploading
DataStatusCopying
DataStatusSucceed
DataStatusOperating
DataStatusDeleted
DataStatusUploadFailed
DataStatusCopyFailed

DataTypeFile = 1
DataTypeDir = 2

LockData = "lock"
UnLockData = "unlock"
)

const (
TaskStatusInit = iota
TaskStatusProcessing
TaskStatusCompleted
TaskStatusCanceled
)

LockData = "lock"
UnLockData = "unlock"
const (
DataStatusInit = iota
DataStatusUploading
DataStatusCopying
DataStatusSucceed
DataStatusOperating
DataStatusDeleted
DataStatusUploadFailed
DataStatusCopyFailed
)

const (
TaskResultInit = iota
TaskResultSucceed
TaskResultFailed
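
Note on the iota fix (#1344): in Go, iota resets to 0 at the start of each const block and advances once per constant specification. In the old layout above, DataStatusInit = iota was the third specification in its block (after codeAlreadyExist and codeRepeat), so it evaluated to 2 rather than 0; splitting the statuses into their own const blocks restarts iota. A self-contained reproduction, with hypothetical names not taken from this repo:

    package main

    import "fmt"

    const (
        codeA = 10001 // first spec: iota == 0 on this line
        codeB = 10100 // second spec: iota == 1

        brokenInit = iota // third spec: brokenInit == 2, not 0
        brokenNext        // implicit repeat of iota: 3
    )

    const (
        fixedInit = iota // fresh block: iota restarts, so 0
        fixedNext        // 1
    )

    func main() {
        fmt.Println(brokenInit, brokenNext) // 2 3
        fmt.Println(fixedInit, fixedNext)   // 0 1
    }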


+2 -2  server/common/utils/urchin.go

@@ -53,8 +53,8 @@ func ScheduleStorage(endPoint, bucket, objectKey, dstPeer string, isOverwrite bo
select {
case result := <-ch:
if result.Err != nil {
log.Errorf(ctx, "CheckScheduleTaskStatusByKey(%s) failed:%v", objectKey, result.Err)
return isExist, dataUrl, destBucket, destKey, taskId, fmt.Errorf("(%s)CheckScheduleTaskStatusByKey failed:dstPeer(%s), objectKey(%s), endPoint(%s), bucket(%s), ScheduleDataToPeer failed:%v", consts.AlarmModuleUrchin, dstPeer, objectKey, endPoint, bucket, result.Err)
log.Errorf(ctx, "ScheduleDataToPeer(%s) failed:%v", objectKey, result.Err)
return isExist, dataUrl, destBucket, destKey, taskId, fmt.Errorf("(%s)ScheduleDataToPeer failed:dstPeer(%s), objectKey(%s), endPoint(%s), bucket(%s), ScheduleDataToPeer failed:%v", consts.AlarmModuleUrchin, dstPeer, objectKey, endPoint, bucket, result.Err)
}

res := result.Res


+46 -45  server/ot-provider/pkg/provider/pod.go

@@ -79,14 +79,15 @@ type scheduleRequest struct {
}

type scheduleResponse struct {
IsAllExist bool
DatasetUrl string
ModelUrl string
CodeUrl string
CodeName string
DestProxy string
OutputDataId string //urchin2.0
OutputDataUrl string //urchin2.0
IsAllExist bool
DatasetUrl string
ModelUrl string
CodeUrl string
CodeName string
DestProxy string
OutputDataId string //urchin2.0
OutputDataBucket string //urchin2.0
OutputDataKey string //urchin2.0
}

const (
@@ -250,6 +251,9 @@ func (c *ClusterClient) migrateData(ctx context.Context, data *Dataset, jobId, s
ServerUrl: res.DestProxy,
UserToken: userToken,
})
} else {
isExist = true
dataUrl = res.DestDataUrl
}
}
}
@@ -288,6 +292,7 @@ func (c *ClusterClient) migrateData(ctx context.Context, data *Dataset, jobId, s
Status: int32(status),
DestObjectKey: destObject,
DestBucket: destBucket,
DestDataUrl: dataUrl,
MigrateTaskId: taskId,
DestCenterId: c.conf.Cluster.Id,
})
@@ -295,7 +300,7 @@ func (c *ClusterClient) migrateData(ctx context.Context, data *Dataset, jobId, s
return isExist, dataUrl, destProxy, nil
}

func (c *ClusterClient) handleDatasetModelStorageSchedule(jobId, scheduleType, datasetsInfo, modelsInfo, codeInfo, userId, userToken string) (*scheduleResponse, error) {
func (c *ClusterClient) handleDatasetModelStorageSchedule(jobId, scheduleType, datasetsInfo, modelsInfo, codeInfo, userId, userToken, newUrchinServer string) (*scheduleResponse, error) {
var res scheduleResponse
res.IsAllExist = true
var codeUrl, destProxy string
@@ -424,31 +429,29 @@ func (c *ClusterClient) handleDatasetModelStorageSchedule(jobId, scheduleType, d
go urchin_new.UpdateDataReference(ctx, &urchin_new.UpdateDataReferenceReq{
JobId: jobId,
UserId: userId,
ServerUrl: destProxy,
ServerUrl: newUrchinServer,
List: dataList,
UserToken: userToken,
})
}

//todo: create the data directory for the model output
// go create output dataset by jobName
//don't implement for now? affects model output
/*
temp, err := urchin_new.CreateData(ctx, &urchin_new.CreateDataReq{
UserId: userId,
ServerUrl: destProxy,
TargetNodeId: c.conf.Cluster.Id,
Name: jobId,
UserToken: userToken,
})
if err != nil {
klog.Errorf("job(%s) Create output Data failed:%v", jobId, err)
return &res, err
}
temp, err := urchin_new.CreateData(ctx, &urchin_new.CreateDataReq{
UserId: userId,
ServerUrl: newUrchinServer,
TargetNodeId: c.conf.Cluster.Id,
Name: jobId,
UserToken: userToken,
})
if err != nil {
klog.Errorf("job(%s) Create output Data failed:%v", jobId, err)
//return &res, err
}

res.OutputDataId = temp.DataId
res.OutputDataUrl = temp.Info.Bucket + temp.Info.Location
*/
res.OutputDataId = temp.DataId
res.OutputDataBucket = temp.Info.Bucket
res.OutputDataKey = temp.Info.Location
}

return &res, nil
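
Note: with the early return now commented out (//return &res, err), execution falls through to temp.DataId even when CreateData failed. If CreateData returns a nil response alongside the error, that dereference panics. A defensive sketch, assuming (not verified here) that the response pointer can be nil on failure:

    temp, err := urchin_new.CreateData(ctx, &urchin_new.CreateDataReq{ /* as above */ })
    if err != nil {
        klog.Errorf("job(%s) Create output Data failed:%v", jobId, err)
    } else if temp != nil {
        res.OutputDataId = temp.DataId
        res.OutputDataBucket = temp.Info.Bucket
        res.OutputDataKey = temp.Info.Location
    }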
@@ -460,8 +463,9 @@ func (c *ClusterClient) handleStorageSchedule(ctx context.Context, pod *corev1.P
codeInfo := pod.Annotations[consts.AnnotationsCodeInfo]
jobName := pod.Annotations[consts.AnnotationsGrampusJobName]
jobId := pod.Annotations[consts.AnnotationsId]
newUrchinServer := pod.Annotations[consts.AnnotationsNewUrchinServerUrl]

res, err := c.handleDatasetModelStorageSchedule(jobId, scheduleTypeExecute, datasetsInfo, modelsInfo, codeInfo, userId, userToken)
res, err := c.handleDatasetModelStorageSchedule(jobId, scheduleTypeExecute, datasetsInfo, modelsInfo, codeInfo, userId, userToken, newUrchinServer)
if err != nil {
klog.Errorf("jobId(%s), handleDatasetModelStorageSchedule failed:%v", jobId, err)
c.updatePodTerminatedInfo(pod, consts.ExitCodeFailed, "handleDatasetModelStorageSchedule failed", err.Error())
@@ -473,7 +477,7 @@ func (c *ClusterClient) handleStorageSchedule(ctx context.Context, pod *corev1.P

if res.IsAllExist {
klog.Infof("(%s)isAllExist true", jobId)
pod.Annotations[consts.AnnotationsOutputUrl] = res.OutputDataUrl
pod.Annotations[consts.AnnotationsOutputUrl] = res.OutputDataBucket + res.OutputDataKey
if pod.Annotations[consts.AnnotationsProcessType] == consts.ProcessTypeNPU {
pod.Annotations[consts.AnnotationsRunParams], pod.Annotations[consts.AnnotationsOutputUrl] =
makeRunParams(pod.Annotations[consts.AnnotationsRunParams], res.DatasetUrl, res.ModelUrl, res.CodeUrl, res.CodeName, pod.Annotations[consts.AnnotationsBootFile], jobName)
@@ -486,7 +490,8 @@ func (c *ClusterClient) handleStorageSchedule(ctx context.Context, pod *corev1.P
Id: jobId,
IsAllMigrateSuccess: migrateSuccess,
OutputDataId: res.OutputDataId,
OutputDataUrl: res.OutputDataUrl,
OutputDataBucket: res.OutputDataBucket,
OutputDataKey: res.OutputDataKey,
})

return c.createPod(ctx, pod)
@@ -1338,6 +1343,12 @@ func (c *ClusterClient) PodStatusTicker() {
for _, job := range jobs.OtJobs {
//check failed && success
if job.IsAllMigrateSuccess == migrateProcessing {
pod, err := c.podLister.Pods(job.UserId).Get(job.Id + consts.PodNameSuffix)
if err != nil {
klog.Errorf("could not get pod(%s)", job.Id)
continue
}

datasetsInfo := nullJson
modelsInfo := nullJson
codeInfo := nullJson
@@ -1354,15 +1365,9 @@ func (c *ClusterClient) PodStatusTicker() {
codeInfo = string(tempCode)
}

res, errNew := c.handleDatasetModelStorageSchedule(job.Id, scheduleTypeCheck, datasetsInfo, modelsInfo, codeInfo, job.UserId, job.UserToken)
res, errNew := c.handleDatasetModelStorageSchedule(job.Id, scheduleTypeCheck, datasetsInfo, modelsInfo, codeInfo, job.UserId, job.UserToken, pod.Annotations[consts.AnnotationsNewUrchinServerUrl])
if errNew != nil {
klog.Errorf("jobId(%s), handleDatasetModelStorageSchedule failed:%v", job.Id, errNew)
pod, err := c.podLister.Pods(job.UserId).Get(job.Id + consts.PodNameSuffix)
if err != nil {
klog.Errorf("could not get pod(%s)", job.Id)
continue
}

c.updatePodTerminatedInfo(pod, consts.ExitCodeFailed, "handleDatasetModelStorageSchedule failed", errNew.Error())
go c.bizClient.AlarmClient.SendAlarm(context.TODO(), &alarm.SendAlarmRequest{
Message: "jobName:" + job.Name + " " + errNew.Error(),
@@ -1372,13 +1377,8 @@ func (c *ClusterClient) PodStatusTicker() {

if res.IsAllExist {
klog.Infof("job(%s): isAllExist true", job.Id)
pod, err := c.podLister.Pods(job.UserId).Get(job.Id + consts.PodNameSuffix)
if err != nil {
klog.Errorf("could not get pod(%s)", job.Id)
continue
}

pod.Annotations[consts.AnnotationsOutputUrl] = res.OutputDataUrl
pod.Annotations[consts.AnnotationsOutputUrl] = res.OutputDataBucket + res.OutputDataKey
if job.Tasks[0].ProcessorType == consts.ProcessTypeNPU {
pod.Annotations[consts.AnnotationsRunParams], pod.Annotations[consts.AnnotationsOutputUrl] =
makeRunParams(pod.Annotations[consts.AnnotationsRunParams], res.DatasetUrl, res.ModelUrl, res.CodeUrl, res.CodeName, job.Tasks[0].BootFile, job.Name)
@@ -1392,14 +1392,15 @@ func (c *ClusterClient) PodStatusTicker() {
Id: pod.Annotations[consts.AnnotationsId],
IsAllMigrateSuccess: migrateSuccess,
OutputDataId: res.OutputDataId,
OutputDataUrl: res.OutputDataUrl,
OutputDataBucket: res.OutputDataBucket,
OutputDataKey: res.OutputDataKey,
})

c.createPod(context.TODO(), pod)
c.createPod(ctx, pod)
}
} else {
if len(job.Tasks[0].CenterJobId) != 0 && job.Tasks[0].HasBeenCreated == consts.ProviderHasSentRequest {
if err = c.ProcessJobStatus(context.TODO(), job); err != nil {
if err = c.ProcessJobStatus(ctx, job); err != nil {
klog.Errorf("ProcessJobStatus(%s) failed:%v", job.Id, err)
continue
}
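
Note: the output URL is reconstructed in two places above as res.OutputDataBucket + res.OutputDataKey, a bare concatenation that only matches the old single OutputDataUrl field if the key always carries a leading separator. A separator-safe join, as a sketch in case that assumption does not hold:

    outputUrl := strings.TrimSuffix(res.OutputDataBucket, "/") + "/" + strings.TrimPrefix(res.OutputDataKey, "/")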

