@@ -5,14 +5,15 @@ import (
"encoding/json"
"fmt"
"net/http"
"regexp"
"strings"
"time"
pbAdapter "git.openi.org.cn/OpenI/Grampus/server/adapter/api/v1"
pb "git.openi.org.cn/OpenI/Grampus/server/bizserver/api/v1/image"
"git.openi.org.cn/OpenI/Grampus/server/bizserver/internal/conf"
"git.openi.org.cn/OpenI/Grampus/server/bizserver/internal/data"
dao "git.openi.org.cn/OpenI/Grampus/server/bizserver/internal/data/dao/image"
imageSynDao "git.openi.org.cn/OpenI/Grampus/server/bizserver/internal/data/dao/image_synchronization"
userDao "git.openi.org.cn/OpenI/Grampus/server/bizserver/internal/data/dao/user_image"
consts "git.openi.org.cn/OpenI/Grampus/server/common/consts"
"git.openi.org.cn/OpenI/Grampus/server/common/errors"
@@ -21,14 +22,15 @@ import (
"gopkg.in/resty.v1"
"github.com/jinzhu/copier"
"github.com/robfig/cron"
)
const (
StatusfailOrTimeout = iota //0:已执行失败或超时
StatusSucceeded = iota //1:已成功执行完成
StatusWaiting = iota //2:进行中
StatusWaitingLine = iota //3:排队等待中
StatusStopped = iota //4:被停止(服务重启或用户手动)
StatusfailOrTimeout = 0 //0:已执行失败或超时
StatusSucceeded = 1 //1:已成功执行完成
StatusWaiting = 2 //2:进行中
StatusWaitingLine = 3 //3:排队等待中
StatusStopped = 4 //4:被停止(服务重启或用户手动)
)
const (
@@ -40,6 +42,7 @@ const (
const (
ImagesSynctasksCreateURL = "/images/synctasks/create" //创建异步镜像同步任务
ImagesSynctasksStatusURL = "/images/synctasks/status" //查询镜像同步任务状态
ImagesLocalInfoURL = "/images/local/info" //查询本地镜像信息
)
type service struct {
@@ -52,16 +55,18 @@ type service struct {
type ImageSysctaskResp struct {
DestImageTag string `json:"destImageTag"`
TaskId string `json:"taskId"`
Existed bool `json:"existed"`
}
type ImageSysctaskReq struct {
DestDcName string `json:"destDcName"` //目的中心地址
SrcName string `json:"srcName"` //源镜像名称
ImageTag string `json:"imageTag"` //镜像标签
SrcType int8 `json:"srcType"` //镜像源类型 1:镜像源为某内部分中心的镜像仓库 2:镜像源为某外部公开的镜像仓库 3:镜像源为服务部署的本机Docker存储
Timeout int64 `json:"timeout"` //任务超时时间,单位为秒,默认值为0表示不超时
RetryTimes int8 `json:"retryTimes"` //任务超时重试次数,默认值为0表示不重试
Inspect bool `json:"inspect"` //是否在创建任务前检测目标镜像标签是否已存在于目标分中心的镜像仓库
DestDcName string `json:"destDcName"` //目的中心地址
SrcName string `json:"srcName"` //源镜像名称
ImageTag string `json:"imageTag"` //镜像标签
SrcType int32 `json:"srcType"` //镜像源类型 1:镜像源为某内部分中心的镜像仓库 2:镜像源为某外部公开的镜像仓库 3:镜像源为服务部署的本机Docker存储
Timeout int64 `json:"timeout"` //任务超时时间,单位为秒,默认值为0表示不超时
RetryTimes int8 `json:"retryTimes"` //任务超时重试次数,默认值为0表示不重试
Inspect bool `json:"inspect"` //是否在创建任务前检测目标镜像标签是否已存在于目标分中心的镜像仓库
DestNameSpace string `json:"destNameSpace"` //目标分中心镜像仓库命名空间(组织名、用户名等)
}
type ImageSynctasksStatusResp struct {
@@ -73,13 +78,24 @@ type ImageSynctasksStatusResp struct {
type ImageStatusReq struct {
TaskId string `json:"taskId"`
}
type ImagesLocalInfoResp struct {
Images []*string `json:"images"`
Num int64 `json:"num"`
Duration string `json:"duration"`
}
func New(conf *conf.Bootstrap, data *data.Data) pb.ImageServiceServer {
return &service{
s := &service{
conf: conf,
data: data,
httpClient: resty.New(),
}
cr := cron.New()
cr.AddFunc("@every 1m", func() {
go s.trackImageSyncstatusAndRegisterImage()
})
cr.Start()
return s
}
type ErrorReply struct {
@@ -87,28 +103,6 @@ type ErrorReply struct {
ErrorMsg string `json:"errorMsg"`
}
func parseBody(ctx context.Context, reply *json.RawMessage, body interface{}) error {
errorReply := &ErrorReply{}
err := json.Unmarshal(*reply, errorReply)
if err == nil {
if errorReply.ErrorCode != 0 {
log.Info(ctx, fmt.Sprintf("error when parsebody, code: %v, msg: %v", errorReply.ErrorCode, errorReply.ErrorMsg))
return errors.Error(fmt.Errorf(errorReply.ErrorMsg), errors.ErrorProvierRequestFailed)
}
} else {
log.Errorf(ctx, "json.Unmarshal(%s) failed:%v", string(*reply), err)
return errors.Error(nil, errors.ErrorJsonUnmarshal)
}
if body != nil {
err := json.Unmarshal(*reply, body)
if err != nil {
log.Errorf(ctx, "reply(%s), error:%v", string(*reply), err)
return errors.Error(nil, errors.ErrorJsonUnmarshal)
}
}
return nil
}
func (s *service) ListUserImage(ctx context.Context, req *pb.ListUserImageRequest) (*pb.ListUserImageReply, error) {
query := &userDao.UserImageQuery{}
err := copier.Copy(query, req)
@@ -210,7 +204,6 @@ func (s *service) GetImage(ctx context.Context, req *pb.GetImageRequest) (*pb.Ge
if err != nil {
return nil, errors.Error(err, errors.ErrorInternalError)
}
return &pb.GetImageReply{Image: rs}, nil
}
@@ -226,7 +219,9 @@ func (s *service) GetUserImage(ctx context.Context, req *pb.GetUserImageRequest)
if err != nil {
return nil, errors.Error(err, errors.ErrorInternalError)
}
if rsTbl.Limit != nil {
rs.AccDeviceModel = rsTbl.Limit.AccDeviceModel
}
return &pb.GetUserImageReply{UserImage: rs}, nil
}
@@ -240,9 +235,20 @@ func (s *service) DeleteUserImage(ctx context.Context, req *pb.DeleteUserImageRe
}
func (s *service) CreateImage(ctx context.Context, req *pb.CreateImageRequest) (*pb.CreateImageReply, error) {
if validateImageName(req.Name) {
return nil, errors.Error(nil, errors.ErrorImageNameNotValidate)
}
exist, err := s.data.ImageDao.CheckNameExists(ctx, req.Name, "")
if err != nil {
return nil, errors.Error(err, errors.ErrorInternalError)
}
if exist {
return nil, errors.Error(nil, errors.ErrorImageNameExist)
}
id := utils.GetUUIDWithoutSeparator()
rs := &dao.Image{}
err := copier.Copy(rs, req)
err = copier.Copy(rs, req)
if err != nil {
return nil, errors.Error(err, errors.ErrorInternalError)
}
@@ -255,6 +261,23 @@ func (s *service) CreateImage(ctx context.Context, req *pb.CreateImageRequest) (
return &pb.CreateImageReply{}, nil
}
// 输入字母、数字、_-.和:,最长100个字符,且以字母开头
func validateImageName(imageName string) bool {
var imageNameRegex = regexp.MustCompile(`^[a-zA-Z][a-zA-Z0-9_.:-]{0,99}$`)
if strings.TrimSpace(imageName) == "" {
return true
}
if len(imageName) > 100 {
return true
}
if !imageNameRegex.MatchString(imageName) {
return true
}
return false
}
func (s *service) CreateUserImage(ctx context.Context, req *pb.CreateUserImageRequest) (*pb.CreateUserImageReply, error) {
id := utils.GetUUIDWithoutSeparator()
rs := &userDao.UserImage{}
@@ -278,9 +301,10 @@ func (s *service) CreateUserImage(ctx context.Context, req *pb.CreateUserImageRe
if err != nil {
return nil, errors.Error(err, errors.ErrorDBFindEmpty)
}
limit.AccDeviceModel = userImage.Limit.AccDeviceModel
limit.PoolIds = userImage.Limit.PoolIds
if userImage.Limit != nil {
limit.AccDeviceModel = userImage.Limit.AccDeviceModel
limit.PoolIds = userImage.Limit.PoolIds
}
} else {
for _, imageInfo := range image.AiCenterImages {
if imageInfo.AiCenterId == rs.AiCenterId {
@@ -301,8 +325,18 @@ func (s *service) CreateUserImage(ctx context.Context, req *pb.CreateUserImageRe
}
func (s *service) UpdateImage(ctx context.Context, req *pb.UpdateImageRequest) (*pb.UpdateImageReply, error) {
if validateImageName(req.Name) {
return nil, errors.Error(nil, errors.ErrorImageNameNotValidate)
}
exist, err := s.data.ImageDao.CheckNameExists(ctx, req.Name, req.Id)
if err != nil {
return nil, errors.Error(err, errors.ErrorInternalError)
}
if exist {
return nil, errors.Error(nil, errors.ErrorImageNameExist)
}
rs := &dao.Image{}
err := copier.Copy(rs, req)
err = copier.Copy(rs, req)
if err != nil {
return nil, errors.Error(err, errors.ErrorInternalError)
}
@@ -345,51 +379,103 @@ func (s *service) UpdateUserImage(ctx context.Context, req *pb.UpdateUserImageRe
}
}
func (s *service) CreateImagesSynstask(ctx context.Context, req *pb.CreateImagesSynstaskRequest) (*pb.CreateImagesSynstaskReply, error) {
//暂时就支持NPU格式的处理器
if !strings.HasSuffix(req.ProcessorType, consts.ProcessTypeNPU) {
return nil, errors.Error(nil, errors.ErrorNPUFormat)
}
//名称和任务类型不能重复
_, err := checkNameExist(ctx, s, req.Name, consts.JobTypeTrainJob)
func (s *service) ListImagesSynchronize(ctx context.Context, req *pb.ImagesSynchronizeRequest) (*pb.ImagesSynchronizeReply, error) {
innerReq := imageSynDao.ImageSynchronizationQuery{}
err := copier.Copy(&innerReq, req)
if err != nil {
return nil, err
}
var trainJobId, notebookJobId string
trainJobId, err = insertImage(ctx, s, req.ProcessorType, req.Name, consts.JobTypeTrainJob)
innerReply, totalSize, err := s.data.ImageSynchronizationDao.ListImageSynchronization(ctx, &innerReq)
if err != nil {
return nil, err
}
if req.NotebookType {
_, err := checkNameExist(ctx, s, req.Name, consts.JobTypeNotebook)
if err != nil {
return nil, err
ImagesSynchronizeReply := make([]*pb.ImagesSynchronize, len(innerReply))
for idx, item := range innerReply {
imem := &pb.ImagesSynchronize{
Id: item.Id,
DestDcCenterId: item.DestDcCenterId,
ImageName: item.ImageName,
ImageUrl: item.ImageUrl,
ProcessorType: item.ProcessorType,
SrcCenterId: item.SrcCenterId,
SrcType: item.SrcType,
Status: int32(item.Status),
TrainType: item.TrainType,
CreatedAt: item.CreatedAt.Unix(),
UpdatedAt: item.UpdatedAt.Unix(),
FailedReason: item.FailedReason,
DestImageTag: item.DestImageTag,
}
ImagesSynchronizeReply[idx] = imem
}
return &pb.ImagesSynchronizeReply{
ImagesSynchronize: ImagesSynchronizeReply,
TotalSize: totalSize,
}, nil
}
func (s *service) CreateImagesSynstask(ctx context.Context, req *pb.CreateImagesSynstaskRequest) (*pb.CreateImagesSynstaskReply, error) {
//暂时就支持NPU格式的处理器
if !strings.HasSuffix(req.ProcessorType, consts.ProcessTypeNPU) {
return nil, errors.Error(nil, errors.ErrorNPUFormat)
}
if req.SrcType != 1 {
if validateImageName(req.ImageName) {
return nil, errors.Error(nil, errors.ErrorImageNameNotValidate)
}
notebookJobId, err = insertImage(ctx, s, req.ProcessorType, req.NotebookName, consts.JobTypeNotebook)
//名称不能重复
exist, err := s.data.ImageDao.CheckNameExists(ctx, req.ImageName, "")
if err != nil {
return nil, err
return nil, errors.Error(err, errors.ErrorInternalError)
}
if exist {
return nil, errors.Error(nil, errors.ErrorImageNameExist)
}
}
//调用dolphin服务的接口创建镜像同步任务
createImagesSynstaskReply, err := callImagesSynctaskCreate(ctx, s, req)
if err != nil {
log.Errorf(ctx, "callImagesSynctaskCreate failed:%v", err)
return nil, err
}
//调用镜像同步接口
imageSysctaskReq := ImageSysctaskReq{}
err = copier.Copy(&imageSysctaskReq, req)
//创建一条同步记录
err = createImageSynchronization(ctx, s, req, createImagesSynstaskReply)
if err != nil {
return nil, errors.Error(err, errors.ErrorInternalError)
return nil, errors.Error(err, errors.ErrorDBCreateFailed)
}
return &pb.CreateImagesSynstaskReply{}, err
}
func callImagesSynctaskCreate(ctx context.Context, s *service, req *pb.CreateImagesSynstaskRequest) (*ImageSysctaskResp, error) {
imageSysctaskReq := ImageSysctaskReq{
DestDcName: req.DestDcCenterId,
SrcType: req.SrcType,
DestNameSpace: req.DestNameSpace,
RetryTimes: 1,
Timeout: 36000,
}
if req.SrcType == 1 {
imageSysctaskReq.SrcName = req.SrcCenterId
imageSysctaskReq.ImageTag = req.ImageUrl
}
if req.SrcType == 2 {
var part1, part2 = splitByLastSlash(req.ImageUrl)
imageSysctaskReq.SrcName = part1
imageSysctaskReq.ImageTag = part2
}
if imageSysctaskReq.SrcType == 1 || imageSysctaskReq.SrcType == 3 {
imageSysctaskReq.Inspect = true
if req.SrcType == 3 {
imageSysctaskReq.ImageTag = req.ImageUrl
}
imageSysctaskReq.RetryTimes = 1
imageSysctaskReq.Timeout = 3600
createUrl := s.conf.Dolphin.ImagesSynctasksAddr + ImagesSynctasksCreateURL
temp := &json.RawMessage{}
reply := &pb.CreateImagesSynstaskReply{}
reply := &ImageSysctaskResp {}
resp, err := s.httpClient.R().
SetHeader("Content-Type", "application/json").
SetHeader("Accept", "application/json").
SetResult(temp).
SetResult(reply ).
SetBody(imageSysctaskReq).
Post(createUrl)
if err != nil {
@@ -397,159 +483,235 @@ func (s *service) CreateImagesSynstask(ctx context.Context, req *pb.CreateImages
return nil, err
}
if resp.StatusCode() != http.StatusOK {
updateImageSyncStatus(ctx, s, trainJobId, ImageSyncStatusfailed)
if req.NotebookType {
updateImageSyncStatus(ctx, s, notebookJobId, ImageSyncStatusfailed)
}
log.Errorf(ctx, "images/synctasks/create failed(%d):%s", resp.StatusCode(), string(resp.Body()))
return nil, errors.Error(err, errors.ErrorCallImagesSynctasksFailed)
}
parseBody(ctx, temp, reply)
//注册自定义镜像
go func(ctx context.Context, taskId string) {
GetImagesSynstaskStatus(ctx, s, reply, req, trainJobId, notebookJobId)
}(ctx, reply.TaskId)
return reply, err
}
func GetImagesSynstaskStatus(ctx context.Context, s *service, replyCreateImagesSynstask *pb.CreateImagesSynstaskReply, req *pb.CreateImagesSynstaskRequest, trainJobId string, notebookJobId string) error {
statusUrl := s.conf.Dolphin.ImagesSynctasksAddr + ImagesSynctasksStatusURL
imageStatusReq := ImageStatusReq{
TaskId: replyCreateImagesSynstask.TaskId,
func (s *service) ListImagesLocalInfo(ctx context.Context, req *pb.ImagesLocalInfoRequest) (*pb.ImagesLocalInfoReply, error) {
reply := &pb.ImagesLocalInfoReply{}
url := s.conf.Dolphin.ImagesSynctasksAddr + ImagesLocalInfoURL
resp, err := s.httpClient.R().
SetHeader("Content-Type", "application/json").
SetResult(reply).
SetBody(strings.NewReader("{}")).
Get(url)
if err != nil {
log.Errorf(ctx, "/images/local/info http request failed, error:%v", err)
return nil, fmt.Errorf("/images/local/info http request failed, error:%v", err)
}
tempReq, err := json.Marshal(imageStatusReq)
if resp.StatusCode() != http.StatusOK {
respBody := resp.String()
log.Errorf(ctx, "/images/local/info http request failed:(%d): %s", resp.StatusCode(), respBody)
return nil, fmt.Errorf("/images/local/info http request failed:(%d): %s", resp.StatusCode(), respBody)
}
return reply, nil
}
func (s *service) trackImageSyncstatusAndRegisterImage() {
ctx := context.Background()
req := imageSynDao.ImageSynchronizationQuery{
Status: ImageSyncStatusing,
}
imageSynchronization, total, err := s.data.ImageSynchronizationDao.ListImageSynchronization(ctx, &req)
if err != nil {
return err
log.Errorf(ctx, "ListImageSynchronization failed:%v", err)
return
}
temp := &json.RawMessage{}
reply := &ImageSynctasksStatusResp{}
for {
resp, err := s.httpClient.R().
SetHeader("Content-Type", "application/json").
SetHeader("Accept", "application/json").
SetResult(temp).
SetBody(strings.NewReader(string(tempReq))).
Get(statusUrl)
if total == 0 {
return
}
for _, item := range imageSynchronization {
status, err := callImagesSynstaskStatus(ctx, s, item.TaskId)
if err != nil {
log.Errorf(ctx, "/images/synctasks/status http request failed, error:%v", err)
return err
}
if resp.StatusCode() != http.StatusOK {
log.Errorf(ctx, "/images/synctasks/status failed(%d):%s", resp.StatusCode(), resp.Error())
return fmt.Errorf("/images/synctasks/status failed(%d):%s", resp.StatusCode(), resp.Error())
log.Errorf(ctx, "callImagesSynstaskStatus failed:%v", err)
updateImageSynchronization(ctx, s, item.Id, ImageSyncStatusfailed, err.Error())
continue
}
parseBody(ctx, temp, reply)
status := reply.Status
switch status {
switch status.Status {
case StatusfailOrTimeout:
log.Errorf(ctx, "已执行失败或超时:%s", reply.ErrorMessage)
updateImageSyncStatus(ctx, s, trainJobId, ImageSyncStatusfailed)
if req.NotebookType {
updateImageSyncStatus(ctx, s, notebookJobId, ImageSyncStatusfailed)
}
return err
case StatusSucceeded:
CustomRegistrationImage(ctx, s, replyCreateImagesSynstask, req, trainJobId, notebookJobId)
return err
log.Errorf(ctx, "Execution failed or timed out %d ", status.Status)
updateImageSynchronization(ctx, s, item.Id, ImageSyncStatusfailed, "Execution failed or timed out")
case StatusWaiting:
time.Sleep(30 * time.Second )
log.Infof(ctx, "Waiting for execution %d ", status.Status)
case StatusWaitingLine:
time.Sleep(30 * time.Second)
log.Infof(ctx, "Waiting queue%d ", status.Status)
case StatusSucceeded:
log.Infof(ctx, "Execution succeeded %d ", status.Status)
customRegistrationImage(ctx, s, item)
case StatusStopped:
log.Errorf(ctx, "被停止(服务重启或用户手动):%s", reply.ErrorMessage)
updateImageSyncStatus(ctx, s, trainJobId, ImageSyncStatusfailed)
if req.NotebookType {
updateImageSyncStatus(ctx, s, notebookJobId, ImageSyncStatusfailed)
}
return err
log.Errorf(ctx, "Stopped (service restart or manual operation by user)")
updateImageSynchronization(ctx, s, item.Id, ImageSyncStatusfailed, "Stopped (service restart or manual operation by user)")
default:
log.Errorf(ctx, "未知任务状态: %d", status)
updateImageSyncStatus(ctx, s, trainJobId, ImageSyncStatusfailed)
if req.NotebookType {
updateImageSyncStatus(ctx, s, notebookJobId, ImageSyncStatusfailed)
}
return err
log.Errorf(ctx, "Unknown task status: %d", status.Status)
updateImageSynchronization(ctx, s, item.Id, ImageSyncStatusfailed, "Unknown task status")
}
}
}
func CustomRegistrationImage(ctx context.Context, s *service, replyCreateImagesSynstask *pb.CreateImagesSynstaskReply, req *pb.CreateImagesSynstaskRequest, trainJobId string, notebookJobId string) error {
//自定义镜像注册
apiBaseUrl, apiKey, err := s.data.AiCenterDao.GetAiCenterConfig(ctx, req.DestDcName)
func callImagesSynstaskStatus(ctx context.Context, s *service, taskId string) (*ImageSynctasksStatusResp, error) {
statusUrl := s.conf.Dolphin.ImagesSynctasksAddr + ImagesSynctasksStatusURL
imageStatusReq := ImageStatusReq{
TaskId: taskId,
}
tempReq, err := json.Marshal(imageStatusReq)
if err != nil {
return err
return nil, err
}
var customRegistrationImageRequest pbAdapter.CustomRegistrationImageRequest
customRegistrationImageRequest.SwrPath = replyCreateImagesSynstask.DestImageTag
reply, err := s.data.Adapter.CustomRegistrationImage(ctx, apiBaseUrl, apiKey, &customRegistrationImageRequest)
reply := &ImageSynctasksStatusResp{}
resp, err := s.httpClient.R().
SetHeader("Content-Type", "application/json").
SetHeader("Accept", "application/json").
SetResult(reply).
SetBody(strings.NewReader(string(tempReq))).
Get(statusUrl)
if err != nil {
updateImageSyncStatus(ctx, s, trainJobId, ImageSyncStatusfailed)
if req.NotebookType {
updateImageSyncStatus(ctx, s, notebookJobId, ImageSyncStatusfailed)
}
log.Error(ctx, "调用自定义镜像注册失败", err)
return err
log.Errorf(ctx, "/images/synctasks/status http request failed, error:%v", err)
return nil, err
}
if resp.StatusCode() != http.StatusOK {
log.Errorf(ctx, "/images/synctasks/status failed(%d):%s", resp.StatusCode(), string(resp.Body()))
return nil, fmt.Errorf("/images/synctasks/status failed(%d):%s", resp.StatusCode(), string(resp.Body()))
}
return reply, nil
}
updateImageAiCenterImages(ctx, s, trainJobId, req.DestDcName, reply.SwrPath)
if req.NotebookType {
updateImageAiCenterImages(ctx, s, notebookJobId, req.DestDcName, reply.SwrPath)
func customRegistrationImage(ctx context.Context, s *service, imageSync *imageSynDao.ImageSynchronization) {
//调试任务需要去model arts注册镜像
var customRegistrationReplyId = ""
if consts.JobTypeTrainJobAndNotebook == imageSync.TrainType || consts.JobTypeNotebook == imageSync.TrainType {
apiBaseUrl, apiKey, err := s.data.AiCenterDao.GetAiCenterConfig(ctx, imageSync.DestDcCenterId)
if err != nil {
log.Error(ctx, fmt.Sprintf("custom registration image failed get aicenter failed : %s", err.Error()))
return
}
customRegistrationImageRequest := pbAdapter.CustomRegistrationImageRequest{
SwrPath: imageSync.DestImageTag,
}
var reply *pbAdapter.CustomRegistrationImageReply
reply, err = s.data.Adapter.CustomRegistrationImage(ctx, apiBaseUrl, apiKey, &customRegistrationImageRequest)
if err != nil {
log.Error(ctx, fmt.Sprintf("custom registration image failed: %s", err.Error()))
updateImageSynchronization(ctx, s, imageSync.Id, ImageSyncStatusfailed, err.Error())
return
}
if reply.ErrorCode != "" {
log.Error(ctx, fmt.Sprintf("custom registration image failed: %s", reply.ErrorMsg))
updateImageSynchronization(ctx, s, imageSync.Id, ImageSyncStatusfailed, reply.ErrorMsg)
return
}
customRegistrationReplyId = reply.Id
}
return nil
if imageSync.SrcType == 1 {
err := updateImageAiCenterImages(ctx, s, imageSync, customRegistrationReplyId)
if err != nil {
updateImageSynchronization(ctx, s, imageSync.Id, ImageSyncStatusfailed, err.Error())
log.Error(ctx, fmt.Sprintf("update image ai center images failed: %s", err.Error()))
return
}
} else {
err := insertImage(ctx, s, imageSync, customRegistrationReplyId)
if err != nil {
updateImageSynchronization(ctx, s, imageSync.Id, ImageSyncStatusfailed, err.Error())
log.Error(ctx, fmt.Sprintf("insert image failed: %s", err.Error()))
return
}
}
updateImageSynchronization(ctx, s, imageSync.Id, ImageSyncStatusSucceeded, "succeeded")
}
func insertImage(ctx context.Context, s *service, processorType string, name string, trainType string) (string, error) {
rs := &dao.Image{}
rs.Name = name
rs.TrainType = trainType
rs.ProcessorType = processorType
rs.ImageSyncType = "imageSync"
rs.ImageSyncStatus = ImageSyncStatusing
rs.Id = utils.GetUUIDWithoutSeparator()
err := s.data.ImageDao.CreateImage(ctx, rs)
func createImageSynchronization(ctx context.Context, s *service, req *pb.CreateImagesSynstaskRequest, reply *ImageSysctaskResp) error {
rs := &imageSynDao.ImageSynchronization{}
err := copier.Copy(rs, req)
if err != nil {
return "", err
return errors.Error(err, errors.ErrorInternalError)
}
return rs.Id, nil
}
func updateImageSyncStatus(ctx context.Context, s *service, id string, imageSyncStatus int8) error {
rs := &dao.Image{}
rs.Id = id
rs.ImageSyncStatus = imageSyncStatus
err := s.data.ImageDao.UpdateImage(ctx, rs)
rs.Id = utils.GetUUIDWithoutSeparator()
rs.Status = ImageSyncStatusing
rs.TaskId = reply.TaskId
rs.DestImageTag = reply.DestImageTag
rs.ImageId = req.Id
err = s.data.ImageSynchronizationDao.CreateImageSynchronization(ctx, rs)
if err != nil {
return err
}
return nil
}
func updateImageAiCenterImages(ctx context.Context, s *service, id string, aiCenterId string, aiImage string) error {
rs := &dao.Image {}
func updateImageSynchronization(ctx context.Context, s *service, id string, status int8, failedReason string) {
rs := &imageSynDao.ImageSynchronization {}
rs.Id = id
rs.ImageSyncStatus = ImageSyncStatusSucceeded
rs.Status = status
rs.FailedReason = failedReason
err := s.data.ImageSynchronizationDao.UpdateImageSynchronization(ctx, rs)
if err != nil {
return
}
}
func insertImage(ctx context.Context, s *service, imageSync *imageSynDao.ImageSynchronization, customRegistrationImageId string) error {
rs := &dao.Image{
Name: imageSync.ImageName,
TrainType: imageSync.TrainType,
ProcessorType: imageSync.ProcessorType,
Id: utils.GetUUIDWithoutSeparator(),
}
var trainJobUrl = imageSync.DestImageTag
if idx := strings.Index(imageSync.DestImageTag, "/"); idx != -1 {
trainJobUrl = imageSync.DestImageTag[idx+1:]
}
var aiCenterImages dao.AiCenterImage
aiCenterImages.AiCenterId = aiCenterId
aiCenterImages.ImageUrl = aiImage
aiCenterImages.AiCenterId = imageSync.DestDcCenterId
switch imageSync.TrainType {
case consts.JobTypeTrainJob:
aiCenterImages.TrainJobUrl = trainJobUrl
case consts.JobTypeNotebook:
aiCenterImages.NotebookUrl = customRegistrationImageId
case consts.JobTypeTrainJobAndNotebook:
aiCenterImages.TrainJobUrl = trainJobUrl
aiCenterImages.NotebookUrl = customRegistrationImageId
}
rs.AiCenterImages = append(rs.AiCenterImages, aiCenterImages)
err := s.data.ImageDao.UpdateImage(ctx, rs)
err := s.data.ImageDao.Cre ateImage(ctx, rs)
if err != nil {
return err
}
return nil
}
func checkNameExist(ctx context.Context, s *service, name string, trainType string) (error, error) {
query := &dao.ImageQuery{}
query.Name = name
query.TrainType = trainType
_, totalSize, err := s.data.ImageDao.ListImage(ctx, query)
func updateImageAiCenterImages(ctx context.Context, s *service, imageSync *imageSynDao.ImageSynchronization, customRegistrationImageId string) error {
rsDB, err := s.data.ImageDao.GetImage(ctx, imageSync.ImageId)
if err != nil {
return nil, errors.Error(nil, errors.ErrorDBFindFailed)
return err
}
if totalSize > 0 {
return nil, errors.Error(nil, errors.ErrorImageNameExist)
var aiCenterImages dao.AiCenterImage
aiCenterImages.AiCenterId = imageSync.DestDcCenterId
switch imageSync.TrainType {
case consts.JobTypeTrainJob:
aiCenterImages.TrainJobUrl = imageSync.ImageUrl
case consts.JobTypeNotebook:
aiCenterImages.NotebookUrl = customRegistrationImageId
case consts.JobTypeTrainJobAndNotebook:
aiCenterImages.TrainJobUrl = imageSync.ImageUrl
aiCenterImages.NotebookUrl = customRegistrationImageId
}
rsDB.AiCenterImages = append(rsDB.AiCenterImages, aiCenterImages)
err = s.data.ImageDao.UpdateImage(ctx, rsDB)
if err != nil {
return err
}
return nil
}
func splitByLastSlash(s string) (string, string) {
lastSlashIdx := strings.LastIndex(s, "/")
if lastSlashIdx == -1 {
return s, ""
}
return nil, nil
part1 := s[:lastSlashIdx]
part2 := s[lastSlashIdx+1:]
return part1, part2
}
// func (s *service) getImageList(ctx context.Context, centerId string, r *adapterapi.GetImageListReply) error {