@@ -79,14 +79,15 @@ type scheduleRequest struct {
}
// scheduleResponse carries the result of a dataset/model/code storage
// scheduling pass. The diff markers in this patch were stripped, leaving the
// pre- and post-image fields merged (every field declared twice — a compile
// error); this is the post-patch form, which replaces the single
// OutputDataUrl with a separate bucket + object key (used by the callers in
// the later hunks as res.OutputDataBucket + res.OutputDataKey).
type scheduleResponse struct {
	IsAllExist bool   // true when every requested dataset/model/code already exists at the destination
	DatasetUrl string // resolved dataset location
	ModelUrl   string // resolved model location
	CodeUrl    string // resolved code location
	CodeName   string // code package name
	DestProxy  string // destination proxy server URL
	// urchin2.0 output-data fields: id plus bucket/key pair (replaces the
	// former single OutputDataUrl).
	OutputDataId     string //urchin2.0
	OutputDataBucket string //urchin2.0
	OutputDataKey    string //urchin2.0
}
const (
@@ -250,6 +251,9 @@ func (c *ClusterClient) migrateData(ctx context.Context, data *Dataset, jobId, s
ServerUrl: res.DestProxy,
UserToken: userToken,
})
} else {
isExist = true
dataUrl = res.DestDataUrl
}
}
}
@@ -288,6 +292,7 @@ func (c *ClusterClient) migrateData(ctx context.Context, data *Dataset, jobId, s
Status: int32(status),
DestObjectKey: destObject,
DestBucket: destBucket,
DestDataUrl: dataUrl,
MigrateTaskId: taskId,
DestCenterId: c.conf.Cluster.Id,
})
@@ -295,7 +300,7 @@ func (c *ClusterClient) migrateData(ctx context.Context, data *Dataset, jobId, s
return isExist, dataUrl, destProxy, nil
}
func (c *ClusterClient) handleDatasetModelStorageSchedule(jobId, scheduleType, datasetsInfo, modelsInfo, codeInfo, userId, userToken string) (*scheduleResponse, error) {
func (c *ClusterClient) handleDatasetModelStorageSchedule(jobId, scheduleType, datasetsInfo, modelsInfo, codeInfo, userId, userToken, newUrchinServer string) (*scheduleResponse, error) {
var res scheduleResponse
res.IsAllExist = true
var codeUrl, destProxy string
@@ -424,31 +429,29 @@ func (c *ClusterClient) handleDatasetModelStorageSchedule(jobId, scheduleType, d
go urchin_new.UpdateDataReference(ctx, &urchin_new.UpdateDataReferenceReq{
JobId: jobId,
UserId: userId,
ServerUrl: destProxy ,
ServerUrl: newUrchinServer ,
List: dataList,
UserToken: userToken,
})
}
//todo:创建模型输出对应的数据目录
// go create output dataset by jobName
//暂时先不实现?影响模型输出
/*
temp, err := urchin_new.CreateData(ctx, &urchin_new.CreateDataReq{
UserId: userId,
ServerUrl: destProxy,
TargetNodeId: c.conf.Cluster.Id,
Name: jobId,
UserToken: userToken,
})
if err != nil {
klog.Errorf("job(%s) Create output Data failed:%v", jobId, err)
return &res, err
}
temp, err := urchin_new.CreateData(ctx, &urchin_new.CreateDataReq{
UserId: userId,
ServerUrl: newUrchinServer,
TargetNodeId: c.conf.Cluster.Id,
Name: jobId,
UserToken: userToken,
})
if err != nil {
klog.Errorf("job(%s) Create output Data failed:%v", jobId, err)
//return &res, err
}
res.OutputDataId = temp.DataId
res.OutputDataUrl = temp.Info.Bucket + temp.Info.Location
*/
res.OutputDataId = temp.DataId
res.OutputDataBucket = temp.Info.Bucket
res.OutputDataKey = temp.Info.Location
}
return &res, nil
@@ -460,8 +463,9 @@ func (c *ClusterClient) handleStorageSchedule(ctx context.Context, pod *corev1.P
codeInfo := pod.Annotations[consts.AnnotationsCodeInfo]
jobName := pod.Annotations[consts.AnnotationsGrampusJobName]
jobId := pod.Annotations[consts.AnnotationsId]
newUrchinServer := pod.Annotations[consts.AnnotationsNewUrchinServerUrl]
res, err := c.handleDatasetModelStorageSchedule(jobId, scheduleTypeExecute, datasetsInfo, modelsInfo, codeInfo, userId, userToken)
res, err := c.handleDatasetModelStorageSchedule(jobId, scheduleTypeExecute, datasetsInfo, modelsInfo, codeInfo, userId, userToken, newUrchinServer )
if err != nil {
klog.Errorf("jobId(%s), handleDatasetModelStorageSchedule failed:%v", jobId, err)
c.updatePodTerminatedInfo(pod, consts.ExitCodeFailed, "handleDatasetModelStorageSchedule failed", err.Error())
@@ -473,7 +477,7 @@ func (c *ClusterClient) handleStorageSchedule(ctx context.Context, pod *corev1.P
if res.IsAllExist {
klog.Infof("(%s)isAllExist true", jobId)
pod.Annotations[consts.AnnotationsOutputUrl] = res.OutputDataUrl
pod.Annotations[consts.AnnotationsOutputUrl] = res.OutputDataBucket + res.OutputDataKey
if pod.Annotations[consts.AnnotationsProcessType] == consts.ProcessTypeNPU {
pod.Annotations[consts.AnnotationsRunParams], pod.Annotations[consts.AnnotationsOutputUrl] =
makeRunParams(pod.Annotations[consts.AnnotationsRunParams], res.DatasetUrl, res.ModelUrl, res.CodeUrl, res.CodeName, pod.Annotations[consts.AnnotationsBootFile], jobName)
@@ -486,7 +490,8 @@ func (c *ClusterClient) handleStorageSchedule(ctx context.Context, pod *corev1.P
Id: jobId,
IsAllMigrateSuccess: migrateSuccess,
OutputDataId: res.OutputDataId,
OutputDataUrl: res.OutputDataUrl,
OutputDataBucket: res.OutputDataBucket,
OutputDataKey: res.OutputDataKey,
})
return c.createPod(ctx, pod)
@@ -1338,6 +1343,12 @@ func (c *ClusterClient) PodStatusTicker() {
for _, job := range jobs.OtJobs {
//check failed && success
if job.IsAllMigrateSuccess == migrateProcessing {
pod, err := c.podLister.Pods(job.UserId).Get(job.Id + consts.PodNameSuffix)
if err != nil {
klog.Errorf("could not get pod(%s)", job.Id)
continue
}
datasetsInfo := nullJson
modelsInfo := nullJson
codeInfo := nullJson
@@ -1354,15 +1365,9 @@ func (c *ClusterClient) PodStatusTicker() {
codeInfo = string(tempCode)
}
res, errNew := c.handleDatasetModelStorageSchedule(job.Id, scheduleTypeCheck, datasetsInfo, modelsInfo, codeInfo, job.UserId, job.UserToken)
res, errNew := c.handleDatasetModelStorageSchedule(job.Id, scheduleTypeCheck, datasetsInfo, modelsInfo, codeInfo, job.UserId, job.UserToken, pod.Annotations[consts.AnnotationsNewUrchinServerUrl] )
if errNew != nil {
klog.Errorf("jobId(%s), handleDatasetModelStorageSchedule failed:%v", job.Id, errNew)
pod, err := c.podLister.Pods(job.UserId).Get(job.Id + consts.PodNameSuffix)
if err != nil {
klog.Errorf("could not get pod(%s)", job.Id)
continue
}
c.updatePodTerminatedInfo(pod, consts.ExitCodeFailed, "handleDatasetModelStorageSchedule failed", errNew.Error())
go c.bizClient.AlarmClient.SendAlarm(context.TODO(), &alarm.SendAlarmRequest{
Message: "jobName:" + job.Name + " " + errNew.Error(),
@@ -1372,13 +1377,8 @@ func (c *ClusterClient) PodStatusTicker() {
if res.IsAllExist {
klog.Infof("job(%s): isAllExist true", job.Id)
pod, err := c.podLister.Pods(job.UserId).Get(job.Id + consts.PodNameSuffix)
if err != nil {
klog.Errorf("could not get pod(%s)", job.Id)
continue
}
pod.Annotations[consts.AnnotationsOutputUrl] = res.OutputDataUrl
pod.Annotations[consts.AnnotationsOutputUrl] = res.OutputDataBucket + res.OutputDataKey
if job.Tasks[0].ProcessorType == consts.ProcessTypeNPU {
pod.Annotations[consts.AnnotationsRunParams], pod.Annotations[consts.AnnotationsOutputUrl] =
makeRunParams(pod.Annotations[consts.AnnotationsRunParams], res.DatasetUrl, res.ModelUrl, res.CodeUrl, res.CodeName, job.Tasks[0].BootFile, job.Name)
@@ -1392,14 +1392,15 @@ func (c *ClusterClient) PodStatusTicker() {
Id: pod.Annotations[consts.AnnotationsId],
IsAllMigrateSuccess: migrateSuccess,
OutputDataId: res.OutputDataId,
OutputDataUrl: res.OutputDataUrl,
OutputDataBucket: res.OutputDataBucket,
OutputDataKey: res.OutputDataKey,
})
c.createPod(con te xt.TODO() , pod)
c.createPod(ctx, pod)
}
} else {
if len(job.Tasks[0].CenterJobId) != 0 && job.Tasks[0].HasBeenCreated == consts.ProviderHasSentRequest {
if err = c.ProcessJobStatus(con te xt.TODO() , job); err != nil {
if err = c.ProcessJobStatus(ctx, job); err != nil {
klog.Errorf("ProcessJobStatus(%s) failed:%v", job.Id, err)
continue
}