159 Commits

Author SHA1 Message Date
  lewis cc8d00ad12 debug 5 days ago
  lewis d84ae60439 Merge branch 'V20251215' of openi.pcl.ac.cn:openioctopus/Grampus into fix-1192 1 week ago
  lewis 096d22dadf Merge pull request '镜像管理及统一镜像功能' (#1377) from lws-image-0731 into V20251215 1 week ago
  liaowsh b936cfaa21 Merge remote-tracking branch 'origin/V20251215' into lws-image-0731 1 week ago
  liaowsh e018b129f3 update 1 week ago
  liaowsh b5c96a6b31 update 1 week ago
  liaowsh 1abf34f329 update 2 weeks ago
  liaowsh d8b3548f45 update 2 weeks ago
  liaowsh a07a8f6c13 update 2 weeks ago
  liaowsh 0719da26f4 Merge branch 'lws-image-0731' of https://openi.pcl.ac.cn/openioctopus/Grampus into lws-image-0731 2 weeks ago
  liaowsh 7ae8e3698e update 2 weeks ago
  xiongkai 33669cf08c Merge pull request 'code解耦issue' (#1365) from V20251113_fixOtjob into lws-image-0731 2 weeks ago
  xiongk 12d15aaee2 code解耦issue 2 weeks ago
  xiongkai 88553f1b21 Merge pull request 'code解耦issue' (#1364) from V20251113_fixOtjob into lws-image-0731 2 weeks ago
  xiongk a0a432d3ca code解耦issue 2 weeks ago
  liaowsh 772e333388 listUserImage add canter_image_id info 2 weeks ago
  liaowsh 6c482df25c Merge branch 'lws-image-0731' of https://openi.pcl.ac.cn/openioctopus/Grampus into lws-image-0731 2 weeks ago
  liaowsh 7df202456f listUserImage add canter_image_id info 2 weeks ago
  xiongkai f937f6a44a Merge pull request 'code解耦issue' (#1363) from V20251113_fixOtjob into lws-image-0731 2 weeks ago
  xiongk d3876d9e06 code解耦issue 2 weeks ago
  xiongkai 733b25a516 Merge pull request 'code解耦issue' (#1362) from V20251113_fixOtjob into lws-image-0731 2 weeks ago
  xiongk 6dced6e943 code解耦issue 2 weeks ago
  xiongk c1725dc016 Revert "code解耦issue" 2 weeks ago
  xiongk 741ccb3cbe code解耦issue 2 weeks ago
  xiongk a059f89b41 用户自定义镜像字段不为空 2 weeks ago
  lewis 31401c673f Merge branch 'V20251113' of openi.pcl.ac.cn:openioctopus/Grampus into fix-1192 2 weeks ago
  lewis dd593e6ae8 Merge pull request 'V20251113' (#1361) from V20251113 into master 2 weeks ago
  liaowsh 3d9045fbac Merge branch 'lws-image-0731' of https://openi.pcl.ac.cn/openioctopus/Grampus into lws-image-0731 2 weeks ago
  liaowsh 871a867f6c user image add devive info 2 weeks ago
  xiongkai a2b78bad37 Merge pull request 'V20251113_fixOtjob' (#1359) from V20251113_fixOtjob into lws-image-0731 2 weeks ago
  xiongk b60072778e Merge branch 'V20251113' into lws-image-0731 2 weeks ago
  xiongkai 5bea20dc17 Merge pull request 'V20251113' (#1357) from V20251113 into V20251113_fixOtjob 2 weeks ago
  lewis d169e06f16 Merge pull request 'V20251113_fixRestartOtJob' (#1356) from V20251113_fixRestartOtJob into V20251113 2 weeks ago
  liaowsh bf3f0027b8 Merge remote-tracking branch 'origin/V20251113' into lws-image-0731 2 weeks ago
  liaowsh 5e0094de49 update 2 weeks ago
  xiongk db2d0b9805 修复重启任务 2 weeks ago
  xiongk b4327b8d54 修复重启任务 2 weeks ago
  xiongkai 6d32447693 Merge pull request 'V20251113' (#1355) from V20251113 into V20251113_fixRestartOtJob 2 weeks ago
  xiongkai 259fbd2a96 Merge pull request '彻底解决分中心id更新错误的问题' (#1354) from fix-1287 into V20251113 2 weeks ago
  lewis ea5e029ced #1287 2 weeks ago
  liaowsh 2646ccc551 merge conflict 3 weeks ago
  liaowsh f100a61799 update 3 weeks ago
  xiongkai c1470c4b25 Merge pull request 'V20251113' (#1353) from V20251113 into V20251113_fixOtjob 3 weeks ago
  xiongk 3f13e38259 云脑任务与项目解耦 3 weeks ago
  xiongk 11a5ea251c 镜像地址去除前缀 3 weeks ago
  xiongk 1f797f473e 自定义注册镜像一些参数 3 weeks ago
  xiongk 5a6836cbc2 自定义注册镜像一些参数 3 weeks ago
  xiongk d8030eda7b 镜像同步的时候 只有调试的任务需要去注册镜像 3 weeks ago
  xiongk 2f72a3f37f 镜像同步的时候 只有调试的任务需要去注册镜像 3 weeks ago
  xiongk 97b2247f8b 镜像名称逻辑修改镜像名称请输入字母、数字、_-.和:,最长100个字符,且以字母开头 3 weeks ago
  xiongk 7e37d744fe 同步之后调试任务镜像地址修改 4 weeks ago
  xiongk f8de822504 进行类型模糊搜索 4 weeks ago
  xiongk 7cb266f94a 限制镜像名称不能含有中文 4 weeks ago
  xiongk 598499074d 限制镜像名称不能含有中文 4 weeks ago
  liaowsh 24e29ab8d0 add modelarts req info 1 month ago
  xiongkai b2c2dc2c69 Merge pull request 'V20251113_fixRestartOtJob' (#1325) from V20251113_fixRestartOtJob into lws-image-0731 1 month ago
  xiongkai 083b0ed553 Merge pull request 'V20251113' (#1323) from V20251113 into lws-image-0731 1 month ago
  xiongk c1f1e289ce 重启任务类型调整一下submitjob 顺序 1 month ago
  xiongk 92ead17c54 Merge remote-tracking branch 'origin/V20251113' into V20251113 1 month ago
  xiongk b38821b41a service 类型的任务返回详情中增加poolId等字段 和trainjob保持一致 1 month ago
  xiongk 3017961ec6 service 类型的任务返回详情中增加poolId等字段 和trainjob保持一致 1 month ago
  xiongk 355ace76ec service 类型的任务返回详情中增加poolId等字段 和trainjob保持一致 1 month ago
  xiongk 5e95185d53 修复1247issue "cloudbrain2+2","cloudbrain2+0"],其中cloudbrain2+2下架了 1 month ago
  xiongk 00bc946825 修复1247issue "cloudbrain2+2","cloudbrain2+0"],其中cloudbrain2+2下架了 1 month ago
  xiongk b330b03521 修复1247issue "cloudbrain2+2","cloudbrain2+0"],其中cloudbrain2+2下架了 1 month ago
  xiongk 1d7c7bc8b1 修改 在线服务任务返回详情增加字段 1 month ago
  xiongk a79966b0b9 修改 1247issue 1 month ago
  xiongk 385883dc97 修改 1247issue 1 month ago
  xiongk 54ef6c26df 重启任务类型调整一下submitjob 顺序 1 month ago
  xiongkai 2e8f8ea5db Merge pull request 'V20251030_fix1247issues' (#1317) from V20251030_fix1247issues into lws-image-0731 1 month ago
  xiongk 29e7230060 修改 用户密码 校验逻辑数字+字母+长度+特殊字符 1 month ago
  xiongk 697b2ae83c Merge remote-tracking branch 'origin/lws-image-0731' into lws-image-0731 1 month ago
  xiongk 4e9a02d196 修改 用户密码 校验逻辑数字+字母+长度+特殊字符 1 month ago
  xiongkai 01b21ddd8e Merge pull request '修改 1247issue' (#1312) from V20251030_fix1247issues into lws-image-0731 1 month ago
  xiongk e57751ab61 Merge branch 'V20251030_fix1247issues' into lws-image-0731 1 month ago
  xiongk 4e0f6b52bf 修改镜像地址既支持调试任务又支持训练任务的格式 1 month ago
  xiongk 63e46bde18 修改镜像地址既支持调试任务又支持训练任务的格式 1 month ago
  xiongk 5b124f7444 修改镜像地址既支持调试任务又支持训练任务的格式 1 month ago
  xiongk 645191bd66 修改镜像地址既支持调试任务又支持训练任务的格式 1 month ago
  xiongk 4dceaaad6c 增加一个目的分中心地址 1 month ago
  xiongk 038790f37e 超时时间改成36000s 1 month ago
  xiongk 7e7cdb1b2e 修复issue 1 month ago
  xiongkai d8b3a3268d Merge pull request 'V20251030' (#1294) from V20251030 into lws-image-0731 1 month ago
  xiongk 85eabb4585 Merge branch 'V20251030' into lws-image-0731 1 month ago
  xiongk 2e46373f1c 修改ng 的配置到values文件中 1 month ago
  xiongk ac5f422a3b 把ip 替换成域名 1 month ago
  xiongk e0648c4abb Merge remote-tracking branch 'origin/lws-image-0731' into lws-image-0731 1 month ago
  xiongk 3c33235c9e 管理后台增加一个海胆的ng转发 1 month ago
  Liaoweisheng b54ac15a9a Merge branch 'lws-image-0731' of http://192.168.242.88:3000/openioctopus/Grampus into lws-image-0731 1 month ago
  Liaoweisheng ad138ee859 update scow get token 1 month ago
  xiongk 7a9a04b9b9 管理后台增加一个海胆的ng转发 1 month ago
  xiongk 3223209e4f 管理后台增加一个海胆的ng转发 只支持Endpoints 1 month ago
  xiongk 6df0dfb532 管理后台增加一个海胆的ng转发 只支持Endpoints 1 month ago
  xiongk 40f1a6cc90 管理后台增加一个海胆的ng转发 只支持Endpoints 1 month ago
  xiongk 100a18e540 Merge remote-tracking branch 'origin/lws-image-0731' into lws-image-0731 1 month ago
  xiongk f326a4d43a 管理后台增加一个海胆的ng转发 1 month ago
  Liaoweisheng 80c2e17133 update 1 month ago
  Liaoweisheng c66ebe4724 scow token fresh 1 month ago
  Liaoweisheng 2499c46bb8 Merge branch 'lws-image-0731' of http://192.168.242.88:3000/openioctopus/Grampus into lws-image-0731 1 month ago
  Liaoweisheng 10e8fc0a2c Merge branch 'lws-image-0731' of http://192.168.242.88:3000/openioctopus/Grampus into lws-image-0731 1 month ago
  xiongk dbdf2c52ed 管理后台增加一个海胆的ng转发 1 month ago
  xiongk a75df25e9a 管理后台增加一个海胆的ng转发 1 month ago
  xiongk d7f0d55c34 管理后台增加一个海胆的ng转发 1 month ago
  xiongk a6862d83e5 管理后台增加一个海胆的ng转发 1 month ago
  xiongk ad884cf963 管理后台增加一个海胆的ng转发 1 month ago
  xiongkai 46d39c5ec1 Merge pull request 'V20251030' (#1273) from V20251030 into lws-image-0731 1 month ago
  xiongk 8d8fd0fcda 管理后台增加一个海胆的ng转发 1 month ago
  xiongk b2b4951c83 Merge remote-tracking branch 'origin/lws-image-0731' into lws-image-0731 1 month ago
  xiongk 85b66d56af 管理后台增加一个海胆的ng转发 1 month ago
  xiongkai 9660d4b378 Merge pull request 'V20251030' (#1270) from V20251030 into lws-image-0731 1 month ago
  xiongk 521cdfe58a 管理后台增加一个海胆的ng转发 1 month ago
  Liaoweisheng 6a72b620e5 merge conflict 1 month ago
  xiongk 6744d7725d 管理后台增加一个海胆的ng转发 1 month ago
  xiongk 87a7cc1b97 Merge remote-tracking branch 'origin/lws-image-0731' into lws-image-0731 1 month ago
  xiongk a4d8090bf8 管理后台增加一个海胆的ng转发 1 month ago
  xiongk 374d86b845 管理后台增加一个海胆的ng转发 1 month ago
  xiongkai 0681748cf5 Merge pull request 'V20251030' (#1268) from V20251030 into lws-image-0731 1 month ago
  xiongkai 4e44d0c60a Merge pull request 'V20251030' (#1266) from V20251030 into lws-image-0731 1 month ago
  xiongk cf64c2ff3d 管理后台增加一个海胆的ng转发 1 month ago
  xiongk 3763a4f2d0 Merge remote-tracking branch 'origin/lws-image-0731' into lws-image-0731 1 month ago
  xiongk 9638fcc309 管理后台增加一个海胆的ng转发 1 month ago
  xiongkai 2d6bae237c Merge pull request 'V20251030' (#1264) from V20251030 into lws-image-0731 1 month ago
  xiongkai 8ca5f460b9 Merge pull request 'V20251030' (#1262) from V20251030 into lws-image-0731 1 month ago
  Liaoweisheng ac2d81ac7f resolve conflict 1 month ago
  Liaoweisheng 98ebc89df8 update image 1 month ago
  xiongk 702dd270c5 曙光挂载清空为空 1 month ago
  Liaoweisheng 2359582829 Merge remote-tracking branch 'origin/V20251030' into lws-image-0731 1 month ago
  xiongk d9223c0858 曙光挂载清空为空 1 month ago
  xiongk faaec4b428 曙光挂载清空为空 1 month ago
  xiongk 8543a53f7e 曙光挂载清空为空 1 month ago
  xiongk ea55a12cad 曙光挂载清空为空 1 month ago
  xiongk 01dbbe8fe7 曙光挂载清空为空 1 month ago
  xiongk 890efc13ef 曙光挂载清空为空 1 month ago
  xiongk 4fb3085f60 曙光挂载清空为空 1 month ago
  xiongk c234317900 曙光挂载清空为空 1 month ago
  Liaoweisheng a03e1ab47e update image 1 month ago
  xiongk f5ee9cda75 处理cloudbrain2+2 已下线调用不成功的任务 1 month ago
  Liaoweisheng 2d1666e1a8 Merge branch 'lws-image-0731' of http://192.168.242.88:3000/openioctopus/Grampus into lws-image-0731 2 months ago
  Liaoweisheng 0483ad2a73 update image 2 months ago
  xiongk 7ae5155258 打印日志 2 months ago
  Liaoweisheng b8d550de62 update image manage 2 months ago
  Liaoweisheng 71dd30d538 Merge remote-tracking branch 'origin/V20251030' into lws-image-0731 2 months ago
  xiongk 9b8a83739c #1245 曙光ai任务再次调试后code目录被清空 修改output mount的挂载目录路径 2 months ago
  Liaoweisheng d1063437d3 Merge remote-tracking branch 'origin/V20250731' into lws-image-0731 2 months ago
  xiongk 8c754814f8 镜像同步功能 3 months ago
  xiongkai f2eebe6ac7 Merge pull request 'V20250731' (#1244) from V20250731 into V20250605_adminIssue 3 months ago
  xiongk 95f3bfce7a 镜像管理第四版 3 months ago
  xiongkai 8d18b6e08e Merge pull request 'V20250731' (#1232) from V20250731 into V20250605_adminIssue 4 months ago
  xiongkai ddbac7f955 Merge pull request 'V20250731' (#1227) from V20250731 into V20250605_adminIssue 4 months ago
  xiongk 3e4534eba9 Merge remote-tracking branch 'origin/V20250731' into V20250605_adminIssue 4 months ago
  xiongkai 87da108794 Merge pull request 'V20250605_1162Alarm' (#1217) from V20250605_1162Alarm into V20250605_adminIssue 4 months ago
  xiongk 9c515a3614 同步列表增加错误信息 4 months ago
  xiongk bbfc79511c 镜像管理 4 months ago
  xiongk fb2ef5d12f 镜像管理 4 months ago
  xiongk 5fbf280d26 镜像管理 4 months ago
  xiongk 70558d3895 镜像管理第三版 4 months ago
  xiongkai ba7fbad6be Merge pull request 'V20250731' (#1212) from V20250731 into V20250605_adminIssue 4 months ago
  xiongk a82493afa5 镜像管理第三版 4 months ago
  xiongkai 32efb62c11 Merge pull request 'V20250731' (#1203) from V20250731 into V20250605_adminIssue 5 months ago
37 changed files with 1308 additions and 485 deletions
Split View
  1. +66
    -0
      deploy/charts/grampus/templates/_helpers.tpl
  2. +0
    -1
      deploy/charts/grampus/templates/adminapiserver.yaml
  3. +61
    -0
      deploy/charts/grampus/templates/urchin-admin.yaml
  4. +4
    -0
      deploy/charts/grampus/values.yaml
  5. +2
    -0
      server/adapter/api/v1/adapter.proto
  6. +1
    -0
      server/adapter/internal/data/modelarts/model_arts.go
  7. +21
    -17
      server/adapter/internal/service/blsc/blsc.go
  8. +39
    -22
      server/adapter/internal/service/modelarts/model_arts.go
  9. +30
    -26
      server/adapter/internal/service/octopus/octopus.go
  10. +45
    -33
      server/adapter/internal/service/scow/scow.go
  11. +14
    -10
      server/adapter/internal/service/skyForm/skyForm.go
  12. +16
    -13
      server/adapter/internal/service/star_light/star_light.go
  13. +26
    -21
      server/adapter/internal/service/sugon/sugon.go
  14. +10
    -8
      server/adapter/internal/service/sugon_ai/sugon_ai.go
  15. +72
    -17
      server/adminapiserver/api/v1/image/image.proto
  16. +2
    -2
      server/adminapiserver/api/v1/user/user.proto
  17. +32
    -0
      server/adminapiserver/internal/service/image/image.go
  18. +66
    -16
      server/bizserver/api/v1/image/image.proto
  19. +2
    -2
      server/bizserver/api/v1/user/user.proto
  20. +20
    -12
      server/bizserver/internal/data/dao/image/image.go
  21. +18
    -22
      server/bizserver/internal/data/dao/image/model.go
  22. +101
    -0
      server/bizserver/internal/data/dao/image_synchronization/image_synchronization.go
  23. +39
    -0
      server/bizserver/internal/data/dao/image_synchronization/model.go
  24. +1
    -0
      server/bizserver/internal/data/dao/user_image/model.go
  25. +25
    -17
      server/bizserver/internal/data/data.go
  26. +338
    -176
      server/bizserver/internal/service/image/image.go
  27. +24
    -16
      server/bizserver/internal/service/otjob/ot_job.go
  28. +1
    -1
      server/bizserver/internal/service/resource/resource.go
  29. +56
    -1
      server/bizserver/internal/service/user/user.go
  30. +6
    -5
      server/common/consts/const.go
  31. +8
    -2
      server/common/errors/errorcode.go
  32. +7
    -2
      server/openapiserver/api/v1/image/image.proto
  33. +77
    -14
      server/openapiserver/internal/service/image/image.go
  34. +19
    -1
      server/openapiserver/internal/service/notebook/notebook.go
  35. +0
    -8
      server/openapiserver/internal/service/otjob/otjob.go
  36. +23
    -18
      server/ot-provider/pkg/provider/pod.go
  37. +36
    -2
      server/scheduler/pkg/plugins/image/image.go

+ 66
- 0
deploy/charts/grampus/templates/_helpers.tpl View File

@@ -354,6 +354,72 @@ app.kubernetes.io/part-of: {{ include "urchinserver.name" . }}
{{ include "urchinserver.core-labels" . }}
{{- end -}}



{{/******************urchin-admin-server******************/}}

{{/*
Base name for the urchin-admin resources: the chart name, or
.Values.nameOverride when it is set, truncated to 63 characters with any
trailing "-" removed.
*/}}
{{- define "urchinadminserver.name" -}}
{{- default .Chart.Name .Values.nameOverride | trunc 63 | trimSuffix "-" -}}
{{- end -}}


{{/*
Full resource name for the urchin-admin objects.
- .Values.fullnameOverride wins when it is set.
- Otherwise, when the release name already contains the chart name (or
  nameOverride), the result is RELEASE-urchinadmin.
- Otherwise the result is RELEASE-NAME.
Every variant is truncated to 63 characters and has a trailing "-" removed.
*/}}
{{- define "urchinadminserver.fullname" -}}
{{- if .Values.fullnameOverride -}}
{{- .Values.fullnameOverride | trunc 63 | trimSuffix "-" -}}
{{- else -}}
{{- $name := default .Chart.Name .Values.nameOverride -}}
{{- if contains $name .Release.Name -}}
{{- printf "%s-urchinadmin" .Release.Name | trunc 63 | trimSuffix "-" -}}
{{- else -}}
{{- printf "%s-%s" .Release.Name $name | trunc 63 | trimSuffix "-" -}}
{{- end -}}
{{- end -}}
{{- end -}}


{{/*
Fixed port number (9001). templates/urchin-admin.yaml uses it as the Service
port and as the Ingress backend port number.
*/}}
{{- define "urchinadminserver.port" -}}
{{- printf "9001" -}}
{{- end -}}


{{/*
Fixed port name. templates/urchin-admin.yaml uses it for both the Service port
and the Endpoints port, so the two entries carry the same name.
*/}}
{{- define "urchinadminserver.portName" -}}
{{- printf "urchinadminport" -}}
{{- end -}}


{{/*
Emits the literal ExternalName.
NOTE(review): the Service in templates/urchin-admin.yaml does not set a type
and no visible template references this helper; confirm it is still needed.
*/}}
{{- define "urchinadminserver.type" -}}
{{- printf "ExternalName" -}}
{{- end -}}

{{/*
pathType used for the Ingress path in templates/urchin-admin.yaml.
*/}}
{{- define "urchinadminserver.pathType" -}}
{{- printf "ImplementationSpecific" -}}
{{- end -}}


{{/*
Chart identifier in the form NAME-VERSION. Any "+" is replaced by "_", the
result is truncated to 63 characters, and a trailing "-" is removed.
*/}}
{{- define "urchinadminserver.chart" -}}
{{- printf "%s-%s" .Chart.Name .Chart.Version | replace "+" "_" | trunc 63 | trimSuffix "-" -}}
{{- end -}}


{{/*
Core labels: the chart identifier, the app version (only when
.Chart.AppVersion is set) and the managing service taken from .Release.Service.
*/}}
{{- define "urchinadminserver.core-labels" -}}
helm.sh/chart: {{ include "urchinadminserver.chart" . }}
{{- if .Chart.AppVersion }}
app.kubernetes.io/version: {{ .Chart.AppVersion | quote }}
{{- end }}
app.kubernetes.io/managed-by: {{ .Release.Service }}
{{- end -}}

{{/*
Name / instance / part-of labels. "name" and "part-of" both come from the
name helper; "instance" comes from the fullname helper.
NOTE(review): no template visible here includes this helper; confirm where it
is consumed.
*/}}
{{- define "urchinadminserver.select-labels" -}}
app.kubernetes.io/name: {{ include "urchinadminserver.name" . }}
app.kubernetes.io/instance: {{ include "urchinadminserver.fullname" . }}
app.kubernetes.io/part-of: {{ include "urchinadminserver.name" . }}
{{- end -}}

{{/*
Labels placed on the urchin-admin Service and Endpoints metadata. Only the
core labels are included; the select-labels are not part of this set.
*/}}
{{- define "urchinadminserver.labels" -}}
{{ include "urchinadminserver.core-labels" . }}
{{- end -}}


{{/******************shaoguan-server******************/}}

{{- define "shaoguanserver.name" -}}


+ 0
- 1
deploy/charts/grampus/templates/adminapiserver.yaml View File

@@ -44,7 +44,6 @@ spec:
serviceName: {{ template "adminapiserver.fullname" . }}
servicePort: {{ template "adminapiserver.port" . }}
path: {{ $ingressPath }}/(.*)

---

kind: ClusterRoleBinding


+ 61
- 0
deploy/charts/grampus/templates/urchin-admin.yaml View File

@@ -0,0 +1,61 @@
{{/* URL prefix under which the Ingress at the end of this file exposes the backend. */}}
{{- $ingressPath := print "/adminurchin" -}}

{{/*
ServiceAccount named by the urchinadminserver.fullname helper.
NOTE(review): no other resource in this template references this
ServiceAccount; confirm it is required.
*/}}
apiVersion: v1
kind: ServiceAccount
metadata:
name: {{ template "urchinadminserver.fullname" . }}

---

{{/*
Service for the urchin admin backend. It declares no selector; the Endpoints
object with the same name further down supplies the backing address.
It listens on the urchinadminserver.port helper value and targets
.Values.adminapiserver.server.http.urchinServerPort (cast to int).
*/}}
apiVersion: v1
kind: Service
metadata:
name: {{ template "urchinadminserver.fullname" . }}
labels:
{{ include "urchinadminserver.labels" . | indent 4 }}
spec:
ports:
- name: {{ template "urchinadminserver.portName" . }}
protocol: TCP
port: {{ template "urchinadminserver.port" . }}
targetPort: {{ .Values.adminapiserver.server.http.urchinServerPort | int }}

---

{{/*
Hand-written Endpoints object carrying the same name as the Service above.
Address and port both come from .Values.adminapiserver.server.http, and the
port name matches the Service port name (urchinadminserver.portName helper).
NOTE(review): the address is written to the ip field although the value key is
named urchinServerUrl. The Endpoints API expects an IP address there, so
confirm that a hostname or URL is never configured.
*/}}
apiVersion: v1
kind: Endpoints
metadata:
name: {{ template "urchinadminserver.fullname" . }}
labels:
{{ include "urchinadminserver.labels" . | indent 4 }}
subsets:
- addresses:
- ip: {{ .Values.adminapiserver.server.http.urchinServerUrl }}
ports:
- port: {{ .Values.adminapiserver.server.http.urchinServerPort | int }}
protocol: TCP
name: {{ template "urchinadminserver.portName" . }}

---
{{/*
Ingress that sends HTTP requests matching $ingressPath/(.*) to the urchin-admin
Service, on the port given by the urchinadminserver.port helper.
The rewrite-target annotation is /$1, i.e. the part captured by (.*);
presumably this strips the $ingressPath prefix before the request reaches the
backend. Verify against the ingress-nginx rewrite documentation.
The ssl-redirect annotation is set to "false" and allow-http to "true".
*/}}
apiVersion: networking.k8s.io/v1
kind: Ingress
metadata:
name: {{ template "urchinadminserver.fullname" . }}
labels:
app.kubernetes.io/name: {{ template "urchinadminserver.fullname" . }}
annotations:
kubernetes.io/ingress.class: "nginx"
kubernetes.io/ingress.allow-http: "true"
ingress.kubernetes.io/ssl-redirect: "false"
nginx.ingress.kubernetes.io/rewrite-target: /$1
spec:
rules:
- http:
paths:
- backend:
service:
name: {{ template "urchinadminserver.fullname" . }}
port:
number: {{ template "urchinadminserver.port" . }}
path: {{ $ingressPath }}/(.*)
pathType: {{ template "urchinadminserver.pathType" . }}

+ 4
- 0
deploy/charts/grampus/values.yaml View File

@@ -112,6 +112,10 @@ openapiserver:
# adminapiserver
adminapiserver:
replicaCount: 1
server:
http:
# Address written into the Endpoints `ip` field by templates/urchin-admin.yaml.
urchinServerUrl: "192.168.242.56"
# Port used as the Service targetPort and the Endpoints port in templates/urchin-admin.yaml.
urchinServerPort: 30002
image:
name: "adminapiserver"
# scheduler


+ 2
- 0
server/adapter/api/v1/adapter.proto View File

@@ -776,6 +776,8 @@ message CustomRegistrationImageReply{
int64 update_at = 18;
string visibility = 19;
string workspace_id = 20;
string error_code = 21;
string error_msg = 22;
}

message CreateEcsRequest{


+ 1
- 0
server/adapter/internal/data/modelarts/model_arts.go View File

@@ -139,6 +139,7 @@ func (m *ModelArts) CreateTrainJob(c *conf.Bootstrap, req *v1.CreateTrainJobRequ
}
}`)
// post request test test
log.Infof(context.TODO(), "job %s Request Body is:\n%s\n", req.Name, payload)
r, _ := http.NewRequest(method, url, payload)
r.Header.Add("content-type", "application/json")
r.Header.Add("X-Project-Id", c.Service.AiCenter.ModelArts.ProjectId)


+ 21
- 17
server/adapter/internal/service/blsc/blsc.go View File

@@ -378,27 +378,31 @@ func (o *blscAdapter) CreateNotebook(ctx context.Context, req *v1.CreateNotebook
}

//code mount
var codeInfo DataUrl
if err := json.Unmarshal([]byte(req.CodeInfo), &codeInfo); err != nil {
log.Errorf(ctx, "Unmarshal(%s) failed:%v", req.CodeInfo, err)
return nil, errors.Error(nil, v1.ErrorCode_JsonMarshalFailed)
}
codeInfo.ContainerPath = notebookLocalCodePath
if req.CodeInfo != "" {
var codeInfo DataUrl
if err := json.Unmarshal([]byte(req.CodeInfo), &codeInfo); err != nil {
log.Errorf(ctx, "Unmarshal(%s) failed:%v", req.CodeInfo, err)
return nil, errors.Error(nil, v1.ErrorCode_JsonMarshalFailed)
}
codeInfo.ContainerPath = notebookLocalCodePath

var envIsCodeNeedUnzip string
if codeInfo.IsNeedUnzip {
envIsCodeNeedUnzip = utils.SetEnv(constant.EnvCodeNeedUnzip, constant.NeedUnzip)
} else {
envIsCodeNeedUnzip = utils.SetEnv(constant.EnvCodeNeedUnzip, constant.UnNeedUnzip)
}
req.Command = envIsCodeNeedUnzip + req.Command
var envIsCodeNeedUnzip string
if codeInfo.IsNeedUnzip {
envIsCodeNeedUnzip = utils.SetEnv(constant.EnvCodeNeedUnzip, constant.NeedUnzip)
} else {
envIsCodeNeedUnzip = utils.SetEnv(constant.EnvCodeNeedUnzip, constant.UnNeedUnzip)
}
req.Command = envIsCodeNeedUnzip + req.Command

bucket, objectKey := utils.GetS3BucketAndObjectKey(req.CodeUrl)
mounts[o.volumeID+":/"+bucket+"/"+objectKey] = codeInfo.ContainerPath
if req.CodeUrl != "" {
bucket, objectKey := utils.GetS3BucketAndObjectKey(req.CodeUrl)
mounts[o.volumeID+":/"+bucket+"/"+objectKey] = codeInfo.ContainerPath
}
}

// output mount
objectKey = consts.RemoteOutputPrefix + req.GrampusJobName + consts.RemoteOutputSuffix
mounts[o.volumeID+":"+consts.RemoteOutputBucket+"/"+objectKey+":dir"] = notebookLocalOutputPath
outputObjectKey := consts.RemoteOutputPrefix + req.GrampusJobName + consts.RemoteOutputSuffix
mounts[o.volumeID+":"+consts.RemoteOutputBucket+"/"+outputObjectKey+":dir"] = notebookLocalOutputPath

//c2net sdk mount
mounts[o.volumeID+":/"+consts.RemoteOutputBucket+"/"+constant.C2NetSdkPathPrefix] = localSdkPath


+ 39
- 22
server/adapter/internal/service/modelarts/model_arts.go View File

@@ -357,6 +357,13 @@ type SaveNotebookResult struct {
Size int64 `json:"size"` //单位kB
}

// CustomRegistrationImageReq is the JSON request body that
// (*modelArts).CustomRegistrationImage marshals and POSTs to the ModelArts
// ".../images" URL.
type CustomRegistrationImageReq struct {
// SwrPath is sent as "swr_path"; the caller copies it from the incoming request's SwrPath.
SwrPath string `json:"swr_path"`
// Arch is sent as "arch"; CustomRegistrationImage currently sets it to "AARCH64".
Arch string `json:"arch"`
// ResourceCategory is sent as "resource_category"; CustomRegistrationImage currently sets it to ["ASCEND"].
ResourceCategory []string `json:"resource_category"`
// FlavorType is sent as "flavor_type"; CustomRegistrationImage currently sets it to "ASCEND_SNT9".
FlavorType string `json:"flavor_type"`
}

func NewModelArts(c *conf.Bootstrap, data *data.Data) (v1.AdapterService, error) {
return &modelArts{
conf: c,
@@ -611,16 +618,18 @@ func (s *modelArts) CreateNotebook(ctx context.Context, req *v1.CreateNotebookRe
}

// code
var codeInfo DataUrl
if err := json.Unmarshal([]byte(req.CodeInfo), &codeInfo); err != nil {
log.Errorf(ctx, "Unmarshal(%s) failed:%v", req.CodeInfo, err)
return nil, errors.Error(nil, v1.ErrorCode_JsonMarshalFailed)
}
if req.CodeInfo != "" {
var codeInfo DataUrl
if err := json.Unmarshal([]byte(req.CodeInfo), &codeInfo); err != nil {
log.Errorf(ctx, "Unmarshal(%s) failed:%v", req.CodeInfo, err)
return nil, errors.Error(nil, v1.ErrorCode_JsonMarshalFailed)
}

if codeInfo.IsNeedUnzip {
envVariables[constant.EnvCodeNeedUnzip] = constant.NeedUnzip
} else {
envVariables[constant.EnvCodeNeedUnzip] = constant.UnNeedUnzip
if codeInfo.IsNeedUnzip {
envVariables[constant.EnvCodeNeedUnzip] = constant.NeedUnzip
} else {
envVariables[constant.EnvCodeNeedUnzip] = constant.UnNeedUnzip
}
}

client := getHttpClient()
@@ -1732,6 +1741,7 @@ func (s *modelArts) Ping(ctx context.Context, req *v1.PingRequest) (*v1.PingRepl
}

func (s *modelArts) CustomRegistrationImage(ctx context.Context, req *v1.CustomRegistrationImageRequest) (*v1.CustomRegistrationImageReply, error) {
var reply v1.CustomRegistrationImageReply
accessKey := s.conf.Service.AiCenter.ModelArts.AccessKey
secretKey := s.conf.Service.AiCenter.ModelArts.SecretKey
signer := utils.Signer{
@@ -1739,13 +1749,22 @@ func (s *modelArts) CustomRegistrationImage(ctx context.Context, req *v1.CustomR
Secret: secretKey,
}
url := s.conf.Service.AiCenter.ModelArts.ModelartsUrl + s.conf.Service.AiCenter.ModelArts.ProjectId + "/images"
payload := strings.NewReader(`{
"swr_path": "` + req.SwrPath + `"
}`)
imageReq := CustomRegistrationImageReq{
SwrPath: req.SwrPath,
Arch: "AARCH64",
ResourceCategory: []string{"ASCEND"},
FlavorType: "ASCEND_SNT9",
}
reqBytes, err := json.Marshal(imageReq)
if err != nil {
return nil, fmt.Errorf("JSON 序列化失败: %v", err)
}

payload := strings.NewReader(string(reqBytes))
r, _ := http.NewRequest(http.MethodPost, url, payload)
r.Header.Add("content-type", "application/json")
r.Header.Add("X-Project-Id", s.conf.Service.AiCenter.ModelArts.ProjectId)
err := signer.Sign(r)
err = signer.Sign(r)
if err != nil {
return nil, errors.Error(err, v1.ErrorCode_InternalError)
}
@@ -1755,8 +1774,8 @@ func (s *modelArts) CustomRegistrationImage(ctx context.Context, req *v1.CustomR
client := &http.Client{Transport: tr}
resp, err := client.Do(r)
if err != nil {
log.Errorf(ctx, "/v1/%s/images error", s.conf.Service.AiCenter.ModelArts.ProjectId)
return nil, errors.Error(err, v1.ErrorCode_InternalError)
log.Errorf(ctx, "client.Do failed: %s", err.Error())
return &reply, fmt.Errorf("client.Do failed: %s", err.Error())
}

defer resp.Body.Close()
@@ -1764,16 +1783,14 @@ func (s *modelArts) CustomRegistrationImage(ctx context.Context, req *v1.CustomR
if err != nil {
return nil, err
}
if resp.StatusCode != http.StatusOK {
log.Info(ctx, "response:{}", string(body))
return nil, errors.Error(err, v1.ErrorCode_InternalError)
}

var reply v1.CustomRegistrationImageReply
err = json.Unmarshal(body, &reply)
if err != nil {
return nil, err
}

return &reply, err
// if resp.StatusCode != http.StatusOK {
// log.Errorf(ctx, "response: %s", string(body))
// return &reply, fmt.Errorf("custom registration image failed:%s", string(body))
// }
return &reply, nil
}

+ 30
- 26
server/adapter/internal/service/octopus/octopus.go View File

@@ -1056,35 +1056,39 @@ func (o *octopusAdapter) CreateNotebook(ctx context.Context, req *v1.CreateNoteb
}

// code mount
var codeInfo DataUrl
if err := json.Unmarshal([]byte(req.CodeInfo), &codeInfo); err != nil {
log.Errorf(ctx, "Unmarshal(%s) failed:%v", req.CodeInfo, err)
return nil, errors.Error(nil, v1.ErrorCode_JsonMarshalFailed)
}
if req.CodeInfo != "" {
var codeInfo DataUrl
if err := json.Unmarshal([]byte(req.CodeInfo), &codeInfo); err != nil {
log.Errorf(ctx, "Unmarshal(%s) failed:%v", req.CodeInfo, err)
return nil, errors.Error(nil, v1.ErrorCode_JsonMarshalFailed)
}

if !strings.HasPrefix(codeInfo.ContainerPath, notebookLocalCodePath) {
log.Errorf(ctx, "ContainerPath(%s) is not matched with local(%s)", codeInfo.ContainerPath, notebookLocalCodePath)
tmp := strings.Split(codeInfo.ContainerPath, "/")
codeInfo.ContainerPath = notebookLocalCodePath + "/" + tmp[len(tmp)-1]
}
if !strings.HasPrefix(codeInfo.ContainerPath, notebookLocalCodePath) {
log.Errorf(ctx, "ContainerPath(%s) is not matched with local(%s)", codeInfo.ContainerPath, notebookLocalCodePath)
tmp := strings.Split(codeInfo.ContainerPath, "/")
codeInfo.ContainerPath = notebookLocalCodePath + "/" + tmp[len(tmp)-1]
}

var envIsCodeNeedUnzip string
if codeInfo.IsNeedUnzip {
envIsCodeNeedUnzip = utils.SetEnv(constant.EnvCodeNeedUnzip, constant.NeedUnzip)
} else {
envIsCodeNeedUnzip = utils.SetEnv(constant.EnvCodeNeedUnzip, constant.UnNeedUnzip)
}
req.Command = envIsCodeNeedUnzip + req.Command
var envIsCodeNeedUnzip string
if codeInfo.IsNeedUnzip {
envIsCodeNeedUnzip = utils.SetEnv(constant.EnvCodeNeedUnzip, constant.NeedUnzip)
} else {
envIsCodeNeedUnzip = utils.SetEnv(constant.EnvCodeNeedUnzip, constant.UnNeedUnzip)
}
req.Command = envIsCodeNeedUnzip + req.Command

bucket, objectKey := utils.GetS3BucketAndObjectKey(req.CodeUrl)
mounts = append(mounts, Mounts{
ContainerPath: codeInfo.ContainerPath,
ReadOnly: codeInfo.ReadOnly,
Octopus: StorageInfo{
Bucket: bucket,
Object: objectKey,
},
})
if req.CodeUrl != "" {
bucket, objectKey := utils.GetS3BucketAndObjectKey(req.CodeUrl)
mounts = append(mounts, Mounts{
ContainerPath: codeInfo.ContainerPath,
ReadOnly: codeInfo.ReadOnly,
Octopus: StorageInfo{
Bucket: bucket,
Object: objectKey,
},
})
}
}

// output mount
mounts = append(mounts, Mounts{


+ 45
- 33
server/adapter/internal/service/scow/scow.go View File

@@ -6,6 +6,14 @@ import (
"crypto/tls"
"encoding/json"
"fmt"
"io"
"net"
nethttp "net/http"
"path/filepath"
"strconv"
"strings"
"time"

v1 "git.openi.org.cn/OpenI/Grampus/server/adapter/api/v1"
"git.openi.org.cn/OpenI/Grampus/server/adapter/internal/common/constant"
"git.openi.org.cn/OpenI/Grampus/server/adapter/internal/common/errors"
@@ -16,13 +24,6 @@ import (
"git.openi.org.cn/OpenI/Grampus/server/common/log"
"github.com/gorilla/mux"
"gopkg.in/resty.v1"
"io"
"net"
nethttp "net/http"
"path/filepath"
"strconv"
"strings"
"time"
)

const (
@@ -77,6 +78,8 @@ type adapter struct {
httpClient *resty.Client
token string
tokenExpireTimestamp int64
lastAuthFailTime int64 // 最后一次 getToken 失败的时间
authFailCooldown int64 // 冷却时间,单位:秒,比如 60
username string
password string
serverUrl string
@@ -565,17 +568,22 @@ func (o *adapter) getToken(ctx context.Context, username, password, authUrl stri

func (o *adapter) refreshToken(ctx context.Context) error {
nowTime := time.Now().Unix()
if o.token == "" || o.tokenExpireTimestamp < nowTime {
token, err := o.getToken(ctx, o.username, o.password, o.tokenUrl)
if err != nil {
log.Errorf(ctx, "refreshToken get token failed, error:%v", err)
return err
}

o.token = token
o.tokenExpireTimestamp = nowTime + 3600*4
// 如果 token 仍有效,跳过
if o.token != "" && o.tokenExpireTimestamp > nowTime {
return nil
}

// 直接尝试刷新token(移除冷却期检查)
token, err := o.getToken(ctx, o.username, o.password, o.tokenUrl)
if err != nil {
log.Warnf(ctx, "getToken failed. error: %v", err)
return err
}

// 成功获取新token
o.token = token
o.tokenExpireTimestamp = nowTime + 3600*4
return nil
}

@@ -974,27 +982,31 @@ func (o *adapter) CreateNotebook(ctx context.Context, req *v1.CreateNotebookRequ
}

//code mount
var codeInfo CodeUrl
if err := json.Unmarshal([]byte(req.CodeInfo), &codeInfo); err != nil {
log.Errorf(ctx, "Unmarshal(%s) failed:%v", req.CodeInfo, err)
return nil, errors.Error(nil, v1.ErrorCode_JsonMarshalFailed)
}
codeInfo.ContainerPath = notebookLocalCodePath
if req.CodeInfo != "" {
var codeInfo CodeUrl
if err := json.Unmarshal([]byte(req.CodeInfo), &codeInfo); err != nil {
log.Errorf(ctx, "Unmarshal(%s) failed:%v", req.CodeInfo, err)
return nil, errors.Error(nil, v1.ErrorCode_JsonMarshalFailed)
}
codeInfo.ContainerPath = notebookLocalCodePath

var envIsCodeNeedUnzip string
if codeInfo.IsNeedUnzip {
envIsCodeNeedUnzip = utils.SetEnv(constant.EnvCodeNeedUnzip, constant.NeedUnzip)
} else {
envIsCodeNeedUnzip = utils.SetEnv(constant.EnvCodeNeedUnzip, constant.UnNeedUnzip)
}
req.Command = envIsCodeNeedUnzip + req.Command
var envIsCodeNeedUnzip string
if codeInfo.IsNeedUnzip {
envIsCodeNeedUnzip = utils.SetEnv(constant.EnvCodeNeedUnzip, constant.NeedUnzip)
} else {
envIsCodeNeedUnzip = utils.SetEnv(constant.EnvCodeNeedUnzip, constant.UnNeedUnzip)
}
req.Command = envIsCodeNeedUnzip + req.Command

bucket, objectKey := utils.GetS3BucketAndObjectKey(req.CodeUrl)
objectDir := filepath.ToSlash(filepath.Dir(objectKey))
codePath := scowUserHome + "/" + bucket + "/" + objectDir
if req.CodeUrl != "" {
bucket, objectKey := utils.GetS3BucketAndObjectKey(req.CodeUrl)
objectDir := filepath.ToSlash(filepath.Dir(objectKey))
codePath := scowUserHome + "/" + bucket + "/" + objectDir
mountPoints = append(mountPoints, codePath)
}

mountPoints = append(mountPoints, codePath)
containerMountPaths = append(containerMountPaths, codeInfo.ContainerPath)
containerMountPaths = append(containerMountPaths, codeInfo.ContainerPath)
}

// todo: output mount 需要等scow上线后调整
//outputMountPoint := scowUserHome + "/PCL/" + req.Name + consts.RemoteOutputSuffix


+ 14
- 10
server/adapter/internal/service/skyForm/skyForm.go View File

@@ -5,6 +5,10 @@ import (
"crypto/tls"
"encoding/json"
"fmt"
nethttp "net/http"
"strconv"
"strings"

v1 "git.openi.org.cn/OpenI/Grampus/server/adapter/api/v1"
"git.openi.org.cn/OpenI/Grampus/server/adapter/internal/common/constant"
"git.openi.org.cn/OpenI/Grampus/server/adapter/internal/common/errors"
@@ -14,9 +18,6 @@ import (
"git.openi.org.cn/OpenI/Grampus/server/common/consts"
"git.openi.org.cn/OpenI/Grampus/server/common/log"
"gopkg.in/resty.v1"
nethttp "net/http"
"strconv"
"strings"
)

const (
@@ -270,7 +271,7 @@ func setEnv(key, value string) string {
return fmt.Sprintf("%s=%s\n", key, value)
}

//调用应用管理接口提交jupyter类型的作业
// 调用应用管理接口提交jupyter类型的作业
func (o *adapter) CreateNotebook(ctx context.Context, req *v1.CreateNotebookRequest) (*v1.CreateNotebookReply, error) {
if err := o.refreshToken(ctx); err != nil {
return nil, err
@@ -379,12 +380,15 @@ func (o *adapter) CreateNotebook(ctx context.Context, req *v1.CreateNotebookRequ
env += setEnv(constant.EnvCodeNeedUnzip, constant.UnNeedUnzip)
}

_, objectKey := utils.GetS3BucketAndObjectKey(req.CodeUrl)
mounts = append(mounts, MountInfo{
MountType: mountTypePath,
TargetPath: notebookTempLocalCodePath,
SourcePath: "/" + strings.TrimSuffix(objectKey, "master.zip"), // todo: 后续考虑优化
})
if req.CodeUrl != "" {
_, objectKey := utils.GetS3BucketAndObjectKey(req.CodeUrl)
mounts = append(mounts, MountInfo{
MountType: mountTypePath,
TargetPath: notebookTempLocalCodePath,
SourcePath: "/" + strings.TrimSuffix(objectKey, "master.zip"), // todo: 后续考虑优化
})
}

req.Command += "cp -rf " + notebookTempLocalCodePath + "/* " + notebookLocalCodePath + ";"
}



+ 16
- 13
server/adapter/internal/service/star_light/star_light.go View File

@@ -499,19 +499,22 @@ func (o *adapter) CreateNotebook(ctx context.Context, req *v1.CreateNotebookRequ
}

// code mount
var codeInfo DataUrl
if err := json.Unmarshal([]byte(req.CodeInfo), &codeInfo); err != nil {
log.Errorf(ctx, "Unmarshal(%s) failed:%v", req.CodeInfo, err)
return nil, errors.Error(nil, v1.ErrorCode_JsonMarshalFailed)
}

bucket, objectKey := utils.GetS3BucketAndObjectKey(req.CodeUrl)
mounts = append(mounts, Mounts{
MountPath: codeInfo.ContainerPath,
ReadOnly: codeInfo.ReadOnly,
HostPath: bucket + "/" + objectKey[:strings.LastIndex(objectKey, "/")],
Name: "code-vol",
})
if req.CodeInfo != "" {
var codeInfo DataUrl
if err := json.Unmarshal([]byte(req.CodeInfo), &codeInfo); err != nil {
log.Errorf(ctx, "Unmarshal(%s) failed:%v", req.CodeInfo, err)
return nil, errors.Error(nil, v1.ErrorCode_JsonMarshalFailed)
}
if req.CodeUrl != "" {
bucket, objectKey := utils.GetS3BucketAndObjectKey(req.CodeUrl)
mounts = append(mounts, Mounts{
MountPath: codeInfo.ContainerPath,
ReadOnly: codeInfo.ReadOnly,
HostPath: bucket + "/" + objectKey[:strings.LastIndex(objectKey, "/")],
Name: "code-vol",
})
}
}

cpus, _ := strconv.Atoi(req.ResourceSpec["cpuNumber"])
memory, _ := strconv.Atoi(req.ResourceSpec["memory"])


+ 26
- 21
server/adapter/internal/service/sugon/sugon.go View File

@@ -526,30 +526,35 @@ func (o *sugonAdapter) CreateNotebook(ctx context.Context, req *v1.CreateNoteboo
}

//code mount
var codeInfo DataUrl
if err := json.Unmarshal([]byte(req.CodeInfo), &codeInfo); err != nil {
log.Errorf(ctx, "Unmarshal(%s) failed:%v", req.CodeInfo, err)
return nil, errors.Error(nil, v1.ErrorCode_JsonMarshalFailed)
}
if req.CodeInfo != "" {
var codeInfo DataUrl
if err := json.Unmarshal([]byte(req.CodeInfo), &codeInfo); err != nil {
log.Errorf(ctx, "Unmarshal(%s) failed:%v", req.CodeInfo, err)
return nil, errors.Error(nil, v1.ErrorCode_JsonMarshalFailed)
}

if !strings.HasPrefix(codeInfo.ContainerPath, notebookLocalCodePath) {
log.Errorf(ctx, "ContainerPath(%s) is not matched with local(%s)", codeInfo.ContainerPath, notebookLocalCodePath)
//return nil, errors.Error(nil, v1.ErrorCode_InternalError)
}
if !strings.HasPrefix(codeInfo.ContainerPath, notebookLocalCodePath) {
log.Errorf(ctx, "ContainerPath(%s) is not matched with local(%s)", codeInfo.ContainerPath, notebookLocalCodePath)
//return nil, errors.Error(nil, v1.ErrorCode_InternalError)
}

if codeInfo.IsNeedUnzip {
env += setEnv(constant.EnvCodeNeedUnzip, constant.NeedUnzip)
} else {
env += setEnv(constant.EnvCodeNeedUnzip, constant.UnNeedUnzip)
}
if codeInfo.IsNeedUnzip {
env += setEnv(constant.EnvCodeNeedUnzip, constant.NeedUnzip)
} else {
env += setEnv(constant.EnvCodeNeedUnzip, constant.UnNeedUnzip)
}

_, objectKey := utils.GetS3BucketAndObjectKey(req.CodeUrl)
mounts = append(mounts, MountInfo{
MountType: mountTypePath,
TargetPath: notebookTempLocalCodePath,
SourcePath: "/" + strings.TrimSuffix(objectKey, "master.zip"), // todo: 后续考虑优化
})
req.Command += "cp -rf " + notebookTempLocalCodePath + "/* " + notebookLocalCodePath + ";"
if req.CodeUrl != "" {
_, objectKey := utils.GetS3BucketAndObjectKey(req.CodeUrl)
mounts = append(mounts, MountInfo{
MountType: mountTypePath,
TargetPath: notebookTempLocalCodePath,
SourcePath: "/" + strings.TrimSuffix(objectKey, "master.zip"), // todo: 后续考虑优化
})
}

req.Command += "cp -rf " + notebookTempLocalCodePath + "/* " + notebookLocalCodePath + ";"
}

//c2net sdk mount
mounts = append(mounts, MountInfo{


+ 10
- 8
server/adapter/internal/service/sugon_ai/sugon_ai.go View File

@@ -469,6 +469,7 @@ func (o *sugonAdapter) CreateNotebook(ctx context.Context, req *v1.CreateNoteboo

//remoteOutputPrefix := consts.RemoteOutputPrefix
//code mount
basePath := consts.SugonAIBucketPrefix
if req.CodeInfo != "" {
var codeInfo DataUrl
if err = json.Unmarshal([]byte(req.CodeInfo), &codeInfo); err != nil {
@@ -487,12 +488,15 @@ func (o *sugonAdapter) CreateNotebook(ctx context.Context, req *v1.CreateNoteboo
req.Command = utils.SetEnvProfile(constant.EnvCodeNeedUnzip, constant.UnNeedUnzip) + req.Command
}

_, objectKey := utils.GetS3BucketAndObjectKey(req.CodeUrl)
mounts = append(mounts, MountInfo{
Permission: permissionReadWrite,
TargetPath: notebookTempLocalCodePath,
SourcePath: "/" + strings.TrimSuffix(objectKey, "master.zip"), // todo: 后续考虑优化
})
if req.CodeUrl != "" {
_, objectKey := utils.GetS3BucketAndObjectKey(req.CodeUrl)
mounts = append(mounts, MountInfo{
Permission: permissionReadWrite,
TargetPath: notebookTempLocalCodePath,
SourcePath: "/" + strings.TrimSuffix(objectKey, "master.zip"), // todo: 后续考虑优化
})
basePath = getBasePath(req.CodeUrl)
}
req.Command += "cp -rf " + notebookTempLocalCodePath + "/* " + notebookLocalCodePath + ";"

//if strings.Contains(objectKey, "jobs/") {
@@ -500,8 +504,6 @@ func (o *sugonAdapter) CreateNotebook(ctx context.Context, req *v1.CreateNoteboo
//}
}

basePath := getBasePath(req.CodeUrl)

//c2net sdk mount
mounts = append(mounts, MountInfo{
Permission: permissionReadOnly,


+ 72
- 17
server/adminapiserver/api/v1/image/image.proto View File

@@ -40,6 +40,64 @@ service ImageService {
delete: "/adminapi/v1/image/delete"
};
};

rpc ListImagesLocalInfo(ImagesLocalInfoRequest) returns(ImagesLocalInfoReply){
option (google.api.http) = {
get: "/adminapi/v1/image/listLocalInfo"
};
};

rpc ListImagesSynchronize(ImagesSynchronizeRequest) returns(ImagesSynchronizeReply){
option (google.api.http) = {
get: "/adminapi/v1/image/listImageSynchronize"
};
};
}

// ImagesSynchronizeRequest is the request message of the ListImagesSynchronize RPC.
message ImagesSynchronizeRequest{
int64 pageIndex = 1; // Page number
int64 pageSize = 2; // Page size
string id = 3; // Primary-key id of the image synchronization record
int32 srcType = 4; // Source image type
string srcCenterId = 5; // Source image center id
string imageName = 6; // Image name
string imageUrl = 7; // Image address
string processorType = 8;// Processor type
string trainType = 9; // Training type
string destDcCenterId = 10; // Destination data center id
int32 status = 11; // 1: synchronizing, 2: synchronization succeeded, 3: synchronization failed
}

// ImagesSynchronizeReply is the response message of the ListImagesSynchronize RPC.
message ImagesSynchronizeReply{
int64 totalSize = 1; // Total count
repeated ImagesSynchronize imagesSynchronize = 2; // Image synchronization records
}

// ImagesSynchronize describes one image synchronization record as returned
// in ImagesSynchronizeReply.
message ImagesSynchronize{
string id = 1; // Primary-key id of the image synchronization record
int32 srcType = 2; // Source image type
string srcCenterId = 3; // Source image center id
string imageName = 4; // Image name
string imageUrl = 5; // Image address
string processorType = 6;// Processor type
string trainType = 7; // Training type
string destDcCenterId = 8; // Destination data center id
int32 status = 9; // 1: synchronizing, 2: synchronization succeeded, 3: synchronization failed
int64 createdAt = 10; // presumably the creation timestamp; unit not visible here — TODO confirm
int64 updatedAt = 11; // presumably the last-update timestamp; unit not visible here — TODO confirm
string failedReason = 12; // Failure reason
string destImageTag =13; // Destination image address
}


// ImagesLocalInfoRequest is the request message of the ListImagesLocalInfo RPC.
// It carries no fields.
message ImagesLocalInfoRequest{

}

// ImagesLocalInfoReply is the response message of the ListImagesLocalInfo RPC.
message ImagesLocalInfoReply {
repeated string images =1; // presumably the local image names/tags — TODO confirm
int64 num=2; // presumably the number of entries in images — TODO confirm
string duration =3; // NOTE(review): meaning and unit are not visible here — confirm
}

message DeleteImageRequest {
@@ -78,8 +136,8 @@ message InsertImageReply {

message AiCenterImage {
string aiCenterId = 1;
string imageUrl = 2;
string imageId = 3;
string trainJobUrl = 2;
string notebookUrl = 3;
string accDeviceModel=4;
repeated string poolIds = 5;
string imageName = 6;
@@ -93,8 +151,6 @@ message Image {
string processorType = 5;
repeated AiCenterImage aiCenterImages = 6;
string trainType = 7;
int32 imageSyncStatus = 8;
string imageSyncType = 9;
}

message GetImageRequest {
@@ -112,9 +168,7 @@ message ListImageRequest {
// nvidia.com/gpu、npu.huawei.com/NPU
string processorType = 4;
string trainType = 5;
int32 imageSyncStatus = 6;
string imageSyncType = 7;
string aiCenterImage = 8;
string aiCenterImage = 6;
}

message ListImageReply {
@@ -123,17 +177,18 @@ message ListImageReply {
}

message createImagesSynstaskRequest{
string destDcName = 1; // 目标数据源名称
string srcName = 2; // 镜像源名称
int32 srcType = 3; // 镜像源类型
string imageTag = 4; // 镜像标签
string name =5; // 镜像名称
string processorType = 6; // 处理器类型
bool notebookType = 7; // 是否为调试任务 true为调试任务 false为训练任务
string notebookName = 8; // 调试任务名称
int32 srcType = 1; // 镜像源类型
string srcCenterId = 2; // 源中心
string imageName = 3; // 镜像名称
string imageUrl = 4; // 镜像标签
string processorType = 5; // 处理器类型
string trainType = 6; // 任务类型
string destDcCenterId =7; // 目的分中心
string id = 8;// 源镜像主键Id
string destNameSpace = 9; // 目标命名空间
}

message createImagesSynstaskReply{
string taskId = 1;
string destImageTag = 2;
string errCode = 1;
string errMsg = 2;
}

+ 2
- 2
server/adminapiserver/api/v1/user/user.proto View File

@@ -68,7 +68,7 @@ message ListUserReply {
message CreateUserRequest {
string fullName = 1 [(validate.rules).string = {min_len: 1, max_len: 30}];
string email = 2 [(validate.rules).string = {email: true}];
string password = 3 [(validate.rules).string = {min_len: 8, max_len: 30}];
string password = 3 ;
}

message CreateUserReply {
@@ -79,7 +79,7 @@ message UpdateUserRequest {
string id = 1 [(validate.rules).string = {min_len: 1, max_len: 36}];
string fullName = 2 [(validate.rules).string = {max_len: 30}];
string email = 3 [(validate.rules).string = {email: true, ignore_empty:true}];
string password = 4 [(validate.rules).string = {min_len: 8, max_len: 30, ignore_empty:true}];
string password = 4 ;
UserStatus status = 5 [(validate.rules).enum = {defined_only: true}];
}



+ 32
- 0
server/adminapiserver/internal/service/image/image.go View File

@@ -45,6 +45,24 @@ func (s *Service) ListImage(ctx context.Context, req *pb.ListImageRequest) (*pb.
return reply, nil
}

// ListImagesSynchronize copies the admin API request into the inner request
// type, calls ImageClient.ListImagesSynchronize with it, and copies the inner
// reply back into the admin API reply type. Any error from a copy step or from
// the client call is returned unchanged.
func (s *Service) ListImagesSynchronize(ctx context.Context, req *pb.ImagesSynchronizeRequest) (*pb.ImagesSynchronizeReply, error) {
	inner := new(innerpb.ImagesSynchronizeRequest)
	if copyErr := copier.Copy(inner, req); copyErr != nil {
		return nil, copyErr
	}

	innerReply, callErr := s.data.ImageClient.ListImagesSynchronize(ctx, inner)
	if callErr != nil {
		return nil, callErr
	}

	out := new(pb.ImagesSynchronizeReply)
	if copyErr := copier.Copy(out, innerReply); copyErr != nil {
		return nil, copyErr
	}
	return out, nil
}

func (s *Service) InsertImage(ctx context.Context, req *pb.InsertImageRequest) (*pb.InsertImageReply, error) {
innerReq := &innerpb.CreateImageRequest{}
err := copier.Copy(innerReq, req)
@@ -118,3 +136,17 @@ func (s *Service) CreateImagesSynstask(ctx context.Context, req *pb.CreateImages
}
return reply, nil
}

// ListImagesLocalInfo calls ImageClient.ListImagesLocalInfo with an empty inner
// request and copies the inner reply into the admin API reply type.
//
// A client error is returned unchanged; a failure while copying the reply is
// wrapped with errors.ErrorStructCopy. The incoming req is not read.
func (s *Service) ListImagesLocalInfo(ctx context.Context, req *pb.ImagesLocalInfoRequest) (*pb.ImagesLocalInfoReply, error) {
	innerReply, callErr := s.data.ImageClient.ListImagesLocalInfo(ctx, new(innerpb.ImagesLocalInfoRequest))
	if callErr != nil {
		return nil, callErr
	}

	out := new(pb.ImagesLocalInfoReply)
	if copyErr := copier.Copy(out, innerReply); copyErr != nil {
		return nil, errors.Error(copyErr, errors.ErrorStructCopy)
	}
	return out, nil
}

+ 66
- 16
server/bizserver/api/v1/image/image.proto View File

@@ -30,6 +30,55 @@ service ImageService {
};
rpc createImagesSynstask(CreateImagesSynstaskRequest) returns(CreateImagesSynstaskReply);
rpc ListImagesLocalInfo(ImagesLocalInfoRequest) returns(ImagesLocalInfoReply);
rpc ListImagesSynchronize(ImagesSynchronizeRequest) returns(ImagesSynchronizeReply){
};

}

// ImagesSynchronizeRequest is the request message of the ListImagesSynchronize RPC.
message ImagesSynchronizeRequest{
int64 pageIndex = 1; // Page number
int64 pageSize = 2; // Page size
string id = 3; // Primary-key id of the image synchronization record
int32 srcType = 4; // Source image type
string srcCenterId = 5; // Source image center id
string imageName = 6; // Image name
string imageUrl = 7; // Image address
string processorType = 8;// Processor type
string trainType = 9; // Training type
string destDcCenterId = 10; // Destination data center id
int32 status = 11; // 1: synchronizing, 2: synchronization succeeded, 3: synchronization failed
}

// ImagesSynchronizeReply is the response message of the ListImagesSynchronize RPC.
message ImagesSynchronizeReply{
int64 totalSize = 1; // Total count
repeated ImagesSynchronize imagesSynchronize = 2; // Image synchronization records
}

// ImagesSynchronize describes one image synchronization record as returned
// in ImagesSynchronizeReply.
message ImagesSynchronize{
string id = 1; // Primary-key id of the image synchronization record
int32 srcType = 2; // Source image type
string srcCenterId = 3; // Source image center id
string imageName = 4; // Image name
string imageUrl = 5; // Image address
string processorType = 6;// Processor type
string trainType = 7; // Training type
string destDcCenterId = 8; // Destination data center id
int32 status = 9; // 1: synchronizing, 2: synchronization succeeded, 3: synchronization failed
int64 createdAt = 10; // presumably the creation timestamp; unit not visible here — TODO confirm
int64 updatedAt = 11; // presumably the last-update timestamp; unit not visible here — TODO confirm
string failedReason = 12; // Failure reason
string destImageTag =13; // Destination image address
}

// ImagesLocalInfoRequest is the request message of the ListImagesLocalInfo RPC.
// It carries no fields.
message ImagesLocalInfoRequest{

}

// ImagesLocalInfoReply is the response message of the ListImagesLocalInfo RPC.
message ImagesLocalInfoReply {
repeated string images =1; // presumably the local image names/tags — TODO confirm
int64 num=2; // presumably the number of entries in images — TODO confirm
string duration =3; // NOTE(review): meaning and unit are not visible here — confirm
}

message DeleteImageRequest{
@@ -43,8 +92,8 @@ message DeleteImageReply{

message AiCenterImage {
string aiCenterId = 1;
string imageUrl = 2;
string imageId = 3;
string trainJobUrl = 2;
string notebookUrl = 3;
string accDeviceModel=4;
repeated string poolIds = 5;
string imageName = 6;
@@ -58,8 +107,6 @@ message Image {
repeated AiCenterImage aiCenterImages = 5;
string processorType = 6;
string trainType = 7;
int32 imageSyncStatus = 8;
string imageSyncType = 9;
}

message CreateImageRequest {
@@ -88,8 +135,6 @@ message ListImageRequest {
string searchKey = 3; //name等模糊查找
string processorType = 4;
string trainType = 5;
int32 imageSyncStatus = 6;
string imageSyncType = 7;
string aiCenterImage = 8;
}

@@ -127,6 +172,9 @@ message UserImage {
int64 updatedAt = 13;
string accDeviceModel = 14;
repeated string poolIds = 15;
string trainJobUrl = 16;
string notebookUrl = 17;
string trainType = 18;
}

message CreateUserImageRequest {
@@ -137,6 +185,7 @@ message CreateUserImageRequest {
string imageVersion =5;
string description = 6;
string imageId = 7;
string trainType = 8;
}

message CreateUserImageReply {
@@ -186,17 +235,18 @@ message DeleteUserImageReply {
}

message CreateImagesSynstaskRequest{
string destDcName = 1;
string srcName = 2;
int32 srcType = 3;
string imageTag = 4;
string name = 5;
string processorType = 6;
bool notebookType = 7; // 是否为调试任务 true为调试任务 false为训练任务
string notebookName = 8; // 调试任务名称
int32 srcType = 1; // 镜像源类型
string srcCenterId = 2; // 源中心
string imageName = 3; // 镜像名称
string imageUrl = 4; // 镜像标签
string processorType = 5; // 处理器类型
string trainType = 6; // 任务类型
string destDcCenterId =7; // 目的分中心
string id = 8;// 源镜像主键Id
string destNameSpace = 9; // 目标命名空间
}

message CreateImagesSynstaskReply{
string taskId = 1;
string destImageTag = 2;
string errCode = 1;
string errMsg = 2;
}

+ 2
- 2
server/bizserver/api/v1/user/user.proto View File

@@ -43,7 +43,7 @@ message UserItem {
message CreateUserRequest {
string fullName = 1 [(validate.rules).string = {min_len: 1, max_len: 30}];
string email = 2 [(validate.rules).string = {email: true}];
string password = 3 [(validate.rules).string = {min_len: 8, max_len: 30}];
string password = 3 ;
}

message CreateUserReply {
@@ -79,7 +79,7 @@ message UpdateUserRequest {
string id = 1 [(validate.rules).string = {min_len: 1, max_len: 36}];
string fullName = 2 [(validate.rules).string = {max_len: 30}];
string email = 3 [(validate.rules).string = {email: true, ignore_empty:true}];
string password = 4 [(validate.rules).string = {min_len: 8, max_len: 30, ignore_empty:true}];
string password = 4 ;
UserStatus status = 5 [(validate.rules).enum = {defined_only: true}];
}



+ 20
- 12
server/bizserver/internal/data/dao/image/image.go View File

@@ -6,9 +6,8 @@ import (
"fmt"
"time"

"git.openi.org.cn/OpenI/Grampus/server/common/utils"

"git.openi.org.cn/OpenI/Grampus/server/common/errors"
"git.openi.org.cn/OpenI/Grampus/server/common/utils"

"gorm.io/gorm"
)
@@ -22,6 +21,7 @@ type Dao interface {
GetImage(ctx context.Context, id string) (*Image, error)
ListImage(ctx context.Context, query *ImageQuery) ([]*Image, int64, error)
DeleteImage(ctx context.Context, id string) error
CheckNameExists(ctx context.Context, name string, excludeId string) (bool, error)
}

func New(db *gorm.DB) Dao {
@@ -114,16 +114,8 @@ func (d *dao) ListImage(ctx context.Context, query *ImageQuery) ([]*Image, int64
}

if query.TrainType != "" {
querySql += " and train_type = ? "
params = append(params, query.TrainType)
}
if query.ImageSyncStatus != 0 {
querySql += " and image_sync_status = ? "
params = append(params, query.ImageSyncStatus)
}
if query.ImageSyncType != "" {
querySql += " and image_sync_type = ? "
params = append(params, query.ImageSyncType)
querySql += " and train_type like ? "
params = append(params, "%"+query.TrainType+"%")
}
if query.AiCenterImage != "" {
querySql += " and ai_center_images like ?"
@@ -173,3 +165,19 @@ func (d *dao) DeleteImage(ctx context.Context, id string) error {
}
return nil
}

// CheckNameExists reports whether at least one Image row has the given name.
//
// When excludeId is non-empty, the row with that id is left out of the count,
// so an image being updated does not conflict with its own current name.
// A database error is returned as a formatted error and the result is false.
// ctx is currently not used.
func (d *dao) CheckNameExists(ctx context.Context, name string, excludeId string) (bool, error) {
	tx := d.db.Model(&Image{})
	if excludeId == "" {
		tx = tx.Where("name=?", name)
	} else {
		tx = tx.Where("name=? and id !=?", name, excludeId)
	}

	var matches int64
	if err := tx.Count(&matches).Error; err != nil {
		return false, fmt.Errorf("查询失败: %v", err)
	}
	return matches > 0, nil
}

+ 18
- 22
server/bizserver/internal/data/dao/image/model.go View File

@@ -10,13 +10,11 @@ import (

type Image struct {
commdao.Model
Id string `gorm:"primaryKey;type:varchar(100);not null;comment:Id"`
Name string `gorm:"type:varchar(100);not null;default:'';comment:名称"`
AiCenterImages `gorm:"type:json;comment:智算中心镜像"`
ProcessorType string `gorm:"type:varchar(100);comment:处理器类型"`
TrainType string `gorm:"type:varchar(100);comment:训练类型:TrainJob, Notebook;default:'TrainJob'"`
ImageSyncStatus int8 `gorm:"type:tinyint(1);default:;comment:镜像同步状态:1:同步中,2:失败,3:成功"`
ImageSyncType string `gorm:"type:varchar(100);default:'';comment:镜像同步类型"`
Id string `gorm:"primaryKey;type:varchar(100);not null;comment:Id"`
Name string `gorm:"type:varchar(100);not null;default:'';comment:名称"`
AiCenterImages `gorm:"type:json;comment:智算中心镜像"`
ProcessorType string `gorm:"type:varchar(100);comment:处理器类型"`
TrainType string `gorm:"type:varchar(100);comment:训练类型:TrainJob, Notebook;default:'TrainJob'"`
}

func (Image) TableName() string {
@@ -27,9 +25,9 @@ type AiCenterImages []AiCenterImage

type AiCenterImage struct {
AiCenterId string `json:"aiCenterId,omitempty"`
ImageUrl string `json:"imageUrl,omitempty"`
TrainJobUrl string `json:"trainJobUrl,omitempty"`
ImageName string `json:"imageName,omitempty"`
ImageId string `json:"imageId,omitempty"`
NotebookUrl string `json:"notebookUrl,omitempty"`
PoolIds []string `json:"poolIds,omitempty"`
AccDeviceModel string `json:"accDeviceModel,omitempty" `
}
@@ -48,17 +46,15 @@ func (r *AiCenterImages) Scan(input interface{}) error {
}

type ImageQuery struct {
PageIndex int
PageSize int
SortBy string
OrderBy string
CreatedAtGte int64
CreatedAtLt int64
SearchKey string
Name string
ProcessorType string
TrainType string
ImageSyncType string
ImageSyncStatus int8
AiCenterImage string
PageIndex int
PageSize int
SortBy string
OrderBy string
CreatedAtGte int64
CreatedAtLt int64
SearchKey string
Name string
ProcessorType string
TrainType string
AiCenterImage string
}

+ 101
- 0
server/bizserver/internal/data/dao/image_synchronization/image_synchronization.go View File

@@ -0,0 +1,101 @@
package image_synchronization

import (
"context"
"fmt"

"git.openi.org.cn/OpenI/Grampus/server/common/errors"
"gorm.io/gorm"
)

// Dao is the persistence interface for ImageSynchronization records.
type Dao interface {
// CreateImageSynchronization inserts the given record.
CreateImageSynchronization(ctx context.Context, imageSynchronization *ImageSynchronization) error
// ListImageSynchronization returns the records matching query and the total
// number of matching records.
ListImageSynchronization(ctx context.Context, query *ImageSynchronizationQuery) ([]*ImageSynchronization, int64, error)
// UpdateImageSynchronization updates the record whose id equals
// imageSynchronization.Id.
UpdateImageSynchronization(ctx context.Context, imageSynchronization *ImageSynchronization) error
}

// New returns a Dao implementation that runs its queries on db.
func New(db *gorm.DB) Dao {
	impl := new(dao)
	impl.db = db
	return impl
}

// dao is the gorm-backed implementation of Dao.
type dao struct {
// db is the database handle every method of dao queries through.
db *gorm.DB
}

// CreateImageSynchronization inserts imageSynchronization as a new row.
// A database failure is wrapped with errors.ErrorDBCreateFailed.
// ctx is currently not used.
func (d *dao) CreateImageSynchronization(ctx context.Context, imageSynchronization *ImageSynchronization) error {
	if err := d.db.Create(imageSynchronization).Error; err != nil {
		return errors.Error(err, errors.ErrorDBCreateFailed)
	}
	return nil
}

// UpdateImageSynchronization updates the row whose id equals
// imageSynchronization.Id with the values held by imageSynchronization.
//
// The update goes through gorm's Updates with a struct argument; per gorm's
// documented struct-update behavior, zero-valued fields are not written.
// A database failure is wrapped with errors.ErrorDBUpdateFailed.
// ctx is currently not used.
func (d *dao) UpdateImageSynchronization(ctx context.Context, imageSynchronization *ImageSynchronization) error {
	byID := d.db.Where("id = ?", imageSynchronization.Id)
	if err := byID.Updates(imageSynchronization).Error; err != nil {
		return errors.Error(err, errors.ErrorDBUpdateFailed)
	}
	return nil
}

// ListImageSynchronization returns the synchronization records matching query,
// ordered by created_at descending, together with the total number of matches
// counted before paging is applied.
//
// Each non-zero field of query adds one AND-ed condition: ImageName and
// ImageUrl are matched with LIKE '%value%', every other field with equality.
// Paging (limit PageSize, offset (PageIndex-1)*PageSize) is applied only when
// query.PageIndex is non-zero. Count failures are wrapped with
// errors.ErrorDBCountFailed and find failures with errors.ErrorDBFindFailed.
// ctx is currently not used.
func (d *dao) ListImageSynchronization(ctx context.Context, query *ImageSynchronizationQuery) ([]*ImageSynchronization, int64, error) {
	where := "1 = 1"
	args := make([]interface{}, 0)
	// addCond appends one SQL fragment together with its bind value.
	addCond := func(fragment string, value interface{}) {
		where += fragment
		args = append(args, value)
	}

	if query.SrcType != 0 {
		addCond(" and src_type = ?", query.SrcType)
	}
	if query.SrcCenterId != "" {
		addCond(" and src_center_id = ?", query.SrcCenterId)
	}
	if query.ImageName != "" {
		addCond(" and image_name like ?", "%"+query.ImageName+"%")
	}
	if query.ImageUrl != "" {
		addCond(" and image_url like ?", "%"+query.ImageUrl+"%")
	}
	if query.ProcessorType != "" {
		addCond(" and processor_type = ?", query.ProcessorType)
	}
	if query.TrainType != "" {
		addCond(" and train_type = ?", query.TrainType)
	}
	if query.DestDcCenterId != "" {
		addCond(" and dest_dc_center_id = ? ", query.DestDcCenterId)
	}
	if query.Status != 0 {
		addCond(" and status = ? ", query.Status)
	}

	tx := d.db.Where(where, args...).Order(fmt.Sprintf("%s desc", "created_at"))

	// Count first so the total reflects all matches, not just one page.
	var total int64
	if err := tx.Model(&ImageSynchronization{}).Count(&total).Error; err != nil {
		return nil, 0, errors.Error(err, errors.ErrorDBCountFailed)
	}

	if query.PageIndex != 0 {
		tx = tx.Limit(query.PageSize).Offset((query.PageIndex - 1) * query.PageSize)
	}

	records := make([]*ImageSynchronization, 0)
	if err := tx.Find(&records).Error; err != nil {
		return nil, 0, errors.Error(err, errors.ErrorDBFindFailed)
	}
	return records, total, nil
}

+ 39
- 0
server/bizserver/internal/data/dao/image_synchronization/model.go View File

@@ -0,0 +1,39 @@
package image_synchronization

import (
commdao "git.openi.org.cn/OpenI/Grampus/server/common/dao"
)

// ImageSynchronization is the gorm model of one image synchronization task,
// stored in the table named by TableName. The English comments below
// translate the column comments carried in the gorm tags.
type ImageSynchronization struct {
commdao.Model
// Id is the synchronization task ID (primary key).
Id string `gorm:"type:varchar(100);not null;primaryKey;comment:'同步任务ID'"`
// SrcType is the image type: 1 internal sub-center image, 2 external public image, 3 local-machine image.
SrcType int32 `gorm:"type:tinyint;comment:镜像类型:1 内部分中心镜像, 2 外部公开镜像 3 本机镜像"`
// SrcCenterId is the source center.
SrcCenterId string `gorm:"type:varchar(100);comment:源中心"`
// ImageName is the image name.
ImageName string `gorm:"type:varchar(100);comment:镜像名称"`
// ImageUrl is the image address.
ImageUrl string `gorm:"type:varchar(1000);comment:镜像地址"`
// ImageId is the primary-key ID of the image.
ImageId string `gorm:"type:varchar(100);comment:镜像主键ID"`
// ProcessorType is the processor type.
ProcessorType string `gorm:"type:varchar(100);comment:处理器类型"`
// TrainType is the training type: TrainJob or Notebook (column default 'TrainJob').
TrainType string `gorm:"type:varchar(100);comment:训练类型:TrainJob, Notebook;default:'TrainJob'"`
// DestDcCenterId is the destination sub-center.
DestDcCenterId string `gorm:"type:varchar(100);comment:目的分中心"`
// Status is the task state: 1 synchronizing, 2 failed, 3 succeeded.
// NOTE(review): the ImagesSynchronize proto comments say 2 = succeeded and
// 3 = failed, the opposite of this column comment — confirm which is right.
Status int8 `gorm:"type:tinyint;comment:状态 1 同步中 2 失败 3 成功"`
// DestImageTag is the destination image address.
DestImageTag string `gorm:"type:varchar(1000);comment:目的镜像地址"`
// TaskId is the task ID.
TaskId string `gorm:"type:varchar(100);comment:任务ID"`
// FailedReason is the failure reason.
FailedReason string `gorm:"type:varchar(1000);comment:失败原因"`
}

// TableName returns the name of the database table backing ImageSynchronization.
func (ImageSynchronization) TableName() string {
	const tableName = "image_synchronization"
	return tableName
}

// ImageSynchronizationQuery holds the paging parameters and filter values
// for listing ImageSynchronization records.
// NOTE(review): the ImagesSynchronizeRequest proto message carries an `id`
// field, but there is no matching field here — confirm whether filtering by
// id is intended.
type ImageSynchronizationQuery struct {
// PageIndex and PageSize are the paging parameters.
PageIndex int
PageSize int
// The remaining fields are filter values, one per ImageSynchronization column.
SrcType int32
SrcCenterId string
ImageName string
ImageUrl string
ProcessorType string
TrainType string
DestDcCenterId string
Status int8
}

+ 1
- 0
server/bizserver/internal/data/dao/user_image/model.go View File

@@ -16,6 +16,7 @@ type UserImage struct {
ImageVersion string `gorm:"type:varchar(100);default:'';comment:镜像版本"`
AiCenterId string `gorm:"type:varchar(100);not null;default:'';comment:智算中心ID"`
ProcessorType string `gorm:"type:varchar(100);comment:处理器类型"`
TrainType string `gorm:"type:varchar(100);comment:镜像使用类型"`
DebugJobId string `gorm:"type:varchar(100);comment:调试任务ID"`
Description string `gorm:"type:varchar(100);comment:镜像描述"`
Status int32 `gorm:"comment:镜像状态"`


+ 25
- 17
server/bizserver/internal/data/data.go View File

@@ -10,6 +10,7 @@ import (
"git.openi.org.cn/OpenI/Grampus/server/bizserver/internal/data/dao/config"
"git.openi.org.cn/OpenI/Grampus/server/bizserver/internal/data/dao/data_schedule_record"
"git.openi.org.cn/OpenI/Grampus/server/bizserver/internal/data/dao/image"
"git.openi.org.cn/OpenI/Grampus/server/bizserver/internal/data/dao/image_synchronization"
"git.openi.org.cn/OpenI/Grampus/server/bizserver/internal/data/dao/model"
"git.openi.org.cn/OpenI/Grampus/server/bizserver/internal/data/dao/otjob"
"git.openi.org.cn/OpenI/Grampus/server/bizserver/internal/data/dao/otjob_flow"
@@ -27,23 +28,24 @@ import (
)

type Data struct {
AiCenterDao aicenter.Dao
UserDao user.Dao
OtJobDao otjob.Dao
ResourceDao resource.Dao
ImageDao image.Dao
UserImageDao user_image.Dao
Cluster cluster.Cluster
Adapter adapter.Adapter
AdminUserDao dao.AdminUserDao
ConfigDao config.Dao
CardDetailUsedDao card_detail_used.Dao
DataScheduleRecordDao data_schedule_record.Dao
OtJobScheduleRecordDao otjob_schedule_record.Dao
ResourcePoolDao resource_pool.Dao
ServiceModelDao service_model.Dao
CardConfigDao card_config.Dao
OtJobFlowDao otjob_flow.Dao
AiCenterDao aicenter.Dao
UserDao user.Dao
OtJobDao otjob.Dao
ResourceDao resource.Dao
ImageDao image.Dao
UserImageDao user_image.Dao
Cluster cluster.Cluster
Adapter adapter.Adapter
AdminUserDao dao.AdminUserDao
ConfigDao config.Dao
CardDetailUsedDao card_detail_used.Dao
DataScheduleRecordDao data_schedule_record.Dao
OtJobScheduleRecordDao otjob_schedule_record.Dao
ResourcePoolDao resource_pool.Dao
ServiceModelDao service_model.Dao
CardConfigDao card_config.Dao
OtJobFlowDao otjob_flow.Dao
ImageSynchronizationDao image_synchronization.Dao
}

func NewData(c *conf.Bootstrap) (*Data, func(), error) {
@@ -71,6 +73,7 @@ func NewData(c *conf.Bootstrap) (*Data, func(), error) {
d.ServiceModelDao = service_model.New(db)
d.CardConfigDao = card_config.New(db)
d.OtJobFlowDao = otjob_flow.New(db)
d.ImageSynchronizationDao = image_synchronization.New(db)

return d, func() {
clusterCancel()
@@ -161,5 +164,10 @@ func dbInit(confData *conf.Data) (*gorm.DB, error) {
return nil, err
}

err = db.AutoMigrate(&image_synchronization.ImageSynchronization{})
if err != nil {
return nil, err
}

return db, err
}

+ 338
- 176
server/bizserver/internal/service/image/image.go View File

@@ -5,14 +5,15 @@ import (
"encoding/json"
"fmt"
"net/http"
"regexp"
"strings"
"time"

pbAdapter "git.openi.org.cn/OpenI/Grampus/server/adapter/api/v1"
pb "git.openi.org.cn/OpenI/Grampus/server/bizserver/api/v1/image"
"git.openi.org.cn/OpenI/Grampus/server/bizserver/internal/conf"
"git.openi.org.cn/OpenI/Grampus/server/bizserver/internal/data"
dao "git.openi.org.cn/OpenI/Grampus/server/bizserver/internal/data/dao/image"
imageSynDao "git.openi.org.cn/OpenI/Grampus/server/bizserver/internal/data/dao/image_synchronization"
userDao "git.openi.org.cn/OpenI/Grampus/server/bizserver/internal/data/dao/user_image"
consts "git.openi.org.cn/OpenI/Grampus/server/common/consts"
"git.openi.org.cn/OpenI/Grampus/server/common/errors"
@@ -21,14 +22,15 @@ import (
"gopkg.in/resty.v1"

"github.com/jinzhu/copier"
"github.com/robfig/cron"
)

const (
StatusfailOrTimeout = iota //0:已执行失败或超时
StatusSucceeded = iota //1:已成功执行完成
StatusWaiting = iota //2:进行中
StatusWaitingLine = iota //3:排队等待中
StatusStopped = iota //4:被停止(服务重启或用户手动)
StatusfailOrTimeout = 0 //0:已执行失败或超时
StatusSucceeded = 1 //1:已成功执行完成
StatusWaiting = 2 //2:进行中
StatusWaitingLine = 3 //3:排队等待中
StatusStopped = 4 //4:被停止(服务重启或用户手动)
)

const (
@@ -40,6 +42,7 @@ const (
const (
ImagesSynctasksCreateURL = "/images/synctasks/create" //创建异步镜像同步任务
ImagesSynctasksStatusURL = "/images/synctasks/status" //查询镜像同步任务状态
ImagesLocalInfoURL = "/images/local/info" //查询本地镜像信息
)

type service struct {
@@ -52,16 +55,18 @@ type service struct {
// ImageSysctaskResp is a JSON payload carrying the result of an image
// synchronization task request.
// NOTE(review): presumably the response body of ImagesSynctasksCreateURL —
// confirm against the caller.
type ImageSysctaskResp struct {
// DestImageTag is decoded from the "destImageTag" JSON key.
DestImageTag string `json:"destImageTag"`
// TaskId is decoded from the "taskId" JSON key.
TaskId string `json:"taskId"`
// Existed is decoded from the "existed" JSON key.
// presumably true when the target image already exists — TODO confirm
Existed bool `json:"existed"`
}

type ImageSysctaskReq struct {
DestDcName string `json:"destDcName"` //目的中心地址
SrcName string `json:"srcName"` //源镜像名称
ImageTag string `json:"imageTag"` //镜像标签
SrcType int8 `json:"srcType"` //镜像源类型 1:镜像源为某内部分中心的镜像仓库 2:镜像源为某外部公开的镜像仓库 3:镜像源为服务部署的本机Docker存储
Timeout int64 `json:"timeout"` //任务超时时间,单位为秒,默认值为0表示不超时
RetryTimes int8 `json:"retryTimes"` //任务超时重试次数,默认值为0表示不重试
Inspect bool `json:"inspect"` //是否在创建任务前检测目标镜像标签是否已存在于目标分中心的镜像仓库
DestDcName string `json:"destDcName"` //目的中心地址
SrcName string `json:"srcName"` //源镜像名称
ImageTag string `json:"imageTag"` //镜像标签
SrcType int32 `json:"srcType"` //镜像源类型 1:镜像源为某内部分中心的镜像仓库 2:镜像源为某外部公开的镜像仓库 3:镜像源为服务部署的本机Docker存储
Timeout int64 `json:"timeout"` //任务超时时间,单位为秒,默认值为0表示不超时
RetryTimes int8 `json:"retryTimes"` //任务超时重试次数,默认值为0表示不重试
Inspect bool `json:"inspect"` //是否在创建任务前检测目标镜像标签是否已存在于目标分中心的镜像仓库
DestNameSpace string `json:"destNameSpace"` //目标分中心镜像仓库命名空间(组织名、用户名等)
}

type ImageSynctasksStatusResp struct {
@@ -73,13 +78,24 @@ type ImageSynctasksStatusResp struct {
type ImageStatusReq struct {
TaskId string `json:"taskId"`
}
// ImagesLocalInfoResp is a JSON payload describing local images.
// NOTE(review): presumably the response body of ImagesLocalInfoURL —
// confirm against the caller.
type ImagesLocalInfoResp struct {
// Images is decoded from the "images" JSON key.
Images []*string `json:"images"`
// Num is decoded from the "num" JSON key; presumably the image count — TODO confirm.
Num int64 `json:"num"`
// Duration is decoded from the "duration" JSON key; meaning and unit are not visible here.
Duration string `json:"duration"`
}

func New(conf *conf.Bootstrap, data *data.Data) pb.ImageServiceServer {
return &service{
s := &service{
conf: conf,
data: data,
httpClient: resty.New(),
}
cr := cron.New()
cr.AddFunc("@every 1m", func() {
go s.trackImageSyncstatusAndRegisterImage()
})
cr.Start()
return s
}

type ErrorReply struct {
@@ -87,28 +103,6 @@ type ErrorReply struct {
ErrorMsg string `json:"errorMsg"`
}

func parseBody(ctx context.Context, reply *json.RawMessage, body interface{}) error {
errorReply := &ErrorReply{}
err := json.Unmarshal(*reply, errorReply)
if err == nil {
if errorReply.ErrorCode != 0 {
log.Info(ctx, fmt.Sprintf("error when parsebody, code: %v, msg: %v", errorReply.ErrorCode, errorReply.ErrorMsg))
return errors.Error(fmt.Errorf(errorReply.ErrorMsg), errors.ErrorProvierRequestFailed)
}
} else {
log.Errorf(ctx, "json.Unmarshal(%s) failed:%v", string(*reply), err)
return errors.Error(nil, errors.ErrorJsonUnmarshal)
}
if body != nil {
err := json.Unmarshal(*reply, body)
if err != nil {
log.Errorf(ctx, "reply(%s), error:%v", string(*reply), err)
return errors.Error(nil, errors.ErrorJsonUnmarshal)
}
}
return nil
}

func (s *service) ListUserImage(ctx context.Context, req *pb.ListUserImageRequest) (*pb.ListUserImageReply, error) {
query := &userDao.UserImageQuery{}
err := copier.Copy(query, req)
@@ -210,7 +204,6 @@ func (s *service) GetImage(ctx context.Context, req *pb.GetImageRequest) (*pb.Ge
if err != nil {
return nil, errors.Error(err, errors.ErrorInternalError)
}

return &pb.GetImageReply{Image: rs}, nil
}

@@ -226,7 +219,9 @@ func (s *service) GetUserImage(ctx context.Context, req *pb.GetUserImageRequest)
if err != nil {
return nil, errors.Error(err, errors.ErrorInternalError)
}

if rsTbl.Limit != nil {
rs.AccDeviceModel = rsTbl.Limit.AccDeviceModel
}
return &pb.GetUserImageReply{UserImage: rs}, nil
}

@@ -240,9 +235,20 @@ func (s *service) DeleteUserImage(ctx context.Context, req *pb.DeleteUserImageRe
}

func (s *service) CreateImage(ctx context.Context, req *pb.CreateImageRequest) (*pb.CreateImageReply, error) {
if validateImageName(req.Name) {
return nil, errors.Error(nil, errors.ErrorImageNameNotValidate)
}
exist, err := s.data.ImageDao.CheckNameExists(ctx, req.Name, "")
if err != nil {
return nil, errors.Error(err, errors.ErrorInternalError)
}
if exist {
return nil, errors.Error(nil, errors.ErrorImageNameExist)
}

id := utils.GetUUIDWithoutSeparator()
rs := &dao.Image{}
err := copier.Copy(rs, req)
err = copier.Copy(rs, req)
if err != nil {
return nil, errors.Error(err, errors.ErrorInternalError)
}
@@ -255,6 +261,23 @@ func (s *service) CreateImage(ctx context.Context, req *pb.CreateImageRequest) (
return &pb.CreateImageReply{}, nil
}

// imageNameRegex matches a valid image name: an ASCII letter followed by up
// to 99 characters drawn from letters, digits, '_', '.', ':' and '-'.
// It is compiled once at package initialization instead of on every call.
var imageNameRegex = regexp.MustCompile(`^[a-zA-Z][a-zA-Z0-9_.:-]{0,99}$`)

// validateImageName reports whether imageName is INVALID.
//
// Note the polarity: it returns true when the name must be rejected and false
// when the name is acceptable. A name is acceptable when it starts with a
// letter, contains only letters, digits, '_', '-', '.' and ':', and is at most
// 100 characters long. Blank names are rejected.
func validateImageName(imageName string) bool {
	// Cheap guards first; the pattern would reject these inputs as well.
	if strings.TrimSpace(imageName) == "" || len(imageName) > 100 {
		return true
	}
	return !imageNameRegex.MatchString(imageName)
}

func (s *service) CreateUserImage(ctx context.Context, req *pb.CreateUserImageRequest) (*pb.CreateUserImageReply, error) {
id := utils.GetUUIDWithoutSeparator()
rs := &userDao.UserImage{}
@@ -278,9 +301,10 @@ func (s *service) CreateUserImage(ctx context.Context, req *pb.CreateUserImageRe
if err != nil {
return nil, errors.Error(err, errors.ErrorDBFindEmpty)
}

limit.AccDeviceModel = userImage.Limit.AccDeviceModel
limit.PoolIds = userImage.Limit.PoolIds
if userImage.Limit != nil {
limit.AccDeviceModel = userImage.Limit.AccDeviceModel
limit.PoolIds = userImage.Limit.PoolIds
}
} else {
for _, imageInfo := range image.AiCenterImages {
if imageInfo.AiCenterId == rs.AiCenterId {
@@ -301,8 +325,18 @@ func (s *service) CreateUserImage(ctx context.Context, req *pb.CreateUserImageRe
}

func (s *service) UpdateImage(ctx context.Context, req *pb.UpdateImageRequest) (*pb.UpdateImageReply, error) {
if validateImageName(req.Name) {
return nil, errors.Error(nil, errors.ErrorImageNameNotValidate)
}
exist, err := s.data.ImageDao.CheckNameExists(ctx, req.Name, req.Id)
if err != nil {
return nil, errors.Error(err, errors.ErrorInternalError)
}
if exist {
return nil, errors.Error(nil, errors.ErrorImageNameExist)
}
rs := &dao.Image{}
err := copier.Copy(rs, req)
err = copier.Copy(rs, req)
if err != nil {
return nil, errors.Error(err, errors.ErrorInternalError)
}
@@ -345,51 +379,103 @@ func (s *service) UpdateUserImage(ctx context.Context, req *pb.UpdateUserImageRe
}
}

func (s *service) CreateImagesSynstask(ctx context.Context, req *pb.CreateImagesSynstaskRequest) (*pb.CreateImagesSynstaskReply, error) {
//暂时就支持NPU格式的处理器
if !strings.HasSuffix(req.ProcessorType, consts.ProcessTypeNPU) {
return nil, errors.Error(nil, errors.ErrorNPUFormat)
}

//名称和任务类型不能重复
_, err := checkNameExist(ctx, s, req.Name, consts.JobTypeTrainJob)
func (s *service) ListImagesSynchronize(ctx context.Context, req *pb.ImagesSynchronizeRequest) (*pb.ImagesSynchronizeReply, error) {
innerReq := imageSynDao.ImageSynchronizationQuery{}
err := copier.Copy(&innerReq, req)
if err != nil {
return nil, err
}
var trainJobId, notebookJobId string
trainJobId, err = insertImage(ctx, s, req.ProcessorType, req.Name, consts.JobTypeTrainJob)
innerReply, totalSize, err := s.data.ImageSynchronizationDao.ListImageSynchronization(ctx, &innerReq)
if err != nil {
return nil, err
}
if req.NotebookType {
_, err := checkNameExist(ctx, s, req.Name, consts.JobTypeNotebook)
if err != nil {
return nil, err
ImagesSynchronizeReply := make([]*pb.ImagesSynchronize, len(innerReply))
for idx, item := range innerReply {
imem := &pb.ImagesSynchronize{
Id: item.Id,
DestDcCenterId: item.DestDcCenterId,
ImageName: item.ImageName,
ImageUrl: item.ImageUrl,
ProcessorType: item.ProcessorType,
SrcCenterId: item.SrcCenterId,
SrcType: item.SrcType,
Status: int32(item.Status),
TrainType: item.TrainType,
CreatedAt: item.CreatedAt.Unix(),
UpdatedAt: item.UpdatedAt.Unix(),
FailedReason: item.FailedReason,
DestImageTag: item.DestImageTag,
}
ImagesSynchronizeReply[idx] = imem
}

return &pb.ImagesSynchronizeReply{
ImagesSynchronize: ImagesSynchronizeReply,
TotalSize: totalSize,
}, nil
}

func (s *service) CreateImagesSynstask(ctx context.Context, req *pb.CreateImagesSynstaskRequest) (*pb.CreateImagesSynstaskReply, error) {
//暂时就支持NPU格式的处理器
if !strings.HasSuffix(req.ProcessorType, consts.ProcessTypeNPU) {
return nil, errors.Error(nil, errors.ErrorNPUFormat)
}
if req.SrcType != 1 {
if validateImageName(req.ImageName) {
return nil, errors.Error(nil, errors.ErrorImageNameNotValidate)
}
notebookJobId, err = insertImage(ctx, s, req.ProcessorType, req.NotebookName, consts.JobTypeNotebook)
//名称不能重复
exist, err := s.data.ImageDao.CheckNameExists(ctx, req.ImageName, "")
if err != nil {
return nil, err
return nil, errors.Error(err, errors.ErrorInternalError)
}
if exist {
return nil, errors.Error(nil, errors.ErrorImageNameExist)
}
}
//调用dolphin服务的接口创建镜像同步任务
createImagesSynstaskReply, err := callImagesSynctaskCreate(ctx, s, req)
if err != nil {
log.Errorf(ctx, "callImagesSynctaskCreate failed:%v", err)
return nil, err
}

//调用镜像同步接口
imageSysctaskReq := ImageSysctaskReq{}
err = copier.Copy(&imageSysctaskReq, req)
//创建一条同步记录
err = createImageSynchronization(ctx, s, req, createImagesSynstaskReply)
if err != nil {
return nil, errors.Error(err, errors.ErrorInternalError)
return nil, errors.Error(err, errors.ErrorDBCreateFailed)
}
return &pb.CreateImagesSynstaskReply{}, err
}

func callImagesSynctaskCreate(ctx context.Context, s *service, req *pb.CreateImagesSynstaskRequest) (*ImageSysctaskResp, error) {
imageSysctaskReq := ImageSysctaskReq{
DestDcName: req.DestDcCenterId,
SrcType: req.SrcType,
DestNameSpace: req.DestNameSpace,
RetryTimes: 1,
Timeout: 36000,
}
if req.SrcType == 1 {
imageSysctaskReq.SrcName = req.SrcCenterId
imageSysctaskReq.ImageTag = req.ImageUrl
}
if req.SrcType == 2 {
var part1, part2 = splitByLastSlash(req.ImageUrl)
imageSysctaskReq.SrcName = part1
imageSysctaskReq.ImageTag = part2
}
if imageSysctaskReq.SrcType == 1 || imageSysctaskReq.SrcType == 3 {
imageSysctaskReq.Inspect = true
if req.SrcType == 3 {
imageSysctaskReq.ImageTag = req.ImageUrl
}
imageSysctaskReq.RetryTimes = 1
imageSysctaskReq.Timeout = 3600

createUrl := s.conf.Dolphin.ImagesSynctasksAddr + ImagesSynctasksCreateURL
temp := &json.RawMessage{}
reply := &pb.CreateImagesSynstaskReply{}
reply := &ImageSysctaskResp{}
resp, err := s.httpClient.R().
SetHeader("Content-Type", "application/json").
SetHeader("Accept", "application/json").
SetResult(temp).
SetResult(reply).
SetBody(imageSysctaskReq).
Post(createUrl)
if err != nil {
@@ -397,159 +483,235 @@ func (s *service) CreateImagesSynstask(ctx context.Context, req *pb.CreateImages
return nil, err
}
if resp.StatusCode() != http.StatusOK {
updateImageSyncStatus(ctx, s, trainJobId, ImageSyncStatusfailed)
if req.NotebookType {
updateImageSyncStatus(ctx, s, notebookJobId, ImageSyncStatusfailed)
}
log.Errorf(ctx, "images/synctasks/create failed(%d):%s", resp.StatusCode(), string(resp.Body()))
return nil, errors.Error(err, errors.ErrorCallImagesSynctasksFailed)
}
parseBody(ctx, temp, reply)

//注册自定义镜像
go func(ctx context.Context, taskId string) {
GetImagesSynstaskStatus(ctx, s, reply, req, trainJobId, notebookJobId)
}(ctx, reply.TaskId)
return reply, err
}

func GetImagesSynstaskStatus(ctx context.Context, s *service, replyCreateImagesSynstask *pb.CreateImagesSynstaskReply, req *pb.CreateImagesSynstaskRequest, trainJobId string, notebookJobId string) error {
statusUrl := s.conf.Dolphin.ImagesSynctasksAddr + ImagesSynctasksStatusURL
imageStatusReq := ImageStatusReq{
TaskId: replyCreateImagesSynstask.TaskId,
func (s *service) ListImagesLocalInfo(ctx context.Context, req *pb.ImagesLocalInfoRequest) (*pb.ImagesLocalInfoReply, error) {
reply := &pb.ImagesLocalInfoReply{}
url := s.conf.Dolphin.ImagesSynctasksAddr + ImagesLocalInfoURL
resp, err := s.httpClient.R().
SetHeader("Content-Type", "application/json").
SetResult(reply).
SetBody(strings.NewReader("{}")).
Get(url)
if err != nil {
log.Errorf(ctx, "/images/local/info http request failed, error:%v", err)
return nil, fmt.Errorf("/images/local/info http request failed, error:%v", err)
}
tempReq, err := json.Marshal(imageStatusReq)
if resp.StatusCode() != http.StatusOK {
respBody := resp.String()
log.Errorf(ctx, "/images/local/info http request failed:(%d): %s", resp.StatusCode(), respBody)
return nil, fmt.Errorf("/images/local/info http request failed:(%d): %s", resp.StatusCode(), respBody)
}
return reply, nil
}

func (s *service) trackImageSyncstatusAndRegisterImage() {
ctx := context.Background()
req := imageSynDao.ImageSynchronizationQuery{
Status: ImageSyncStatusing,
}
imageSynchronization, total, err := s.data.ImageSynchronizationDao.ListImageSynchronization(ctx, &req)
if err != nil {
return err
log.Errorf(ctx, "ListImageSynchronization failed:%v", err)
return
}
temp := &json.RawMessage{}
reply := &ImageSynctasksStatusResp{}
for {
resp, err := s.httpClient.R().
SetHeader("Content-Type", "application/json").
SetHeader("Accept", "application/json").
SetResult(temp).
SetBody(strings.NewReader(string(tempReq))).
Get(statusUrl)
if total == 0 {
return
}
for _, item := range imageSynchronization {
status, err := callImagesSynstaskStatus(ctx, s, item.TaskId)
if err != nil {
log.Errorf(ctx, "/images/synctasks/status http request failed, error:%v", err)
return err
}
if resp.StatusCode() != http.StatusOK {
log.Errorf(ctx, "/images/synctasks/status failed(%d):%s", resp.StatusCode(), resp.Error())
return fmt.Errorf("/images/synctasks/status failed(%d):%s", resp.StatusCode(), resp.Error())
log.Errorf(ctx, "callImagesSynstaskStatus failed:%v", err)
updateImageSynchronization(ctx, s, item.Id, ImageSyncStatusfailed, err.Error())
continue
}
parseBody(ctx, temp, reply)
status := reply.Status
switch status {
switch status.Status {
case StatusfailOrTimeout:
log.Errorf(ctx, "已执行失败或超时:%s", reply.ErrorMessage)
updateImageSyncStatus(ctx, s, trainJobId, ImageSyncStatusfailed)
if req.NotebookType {
updateImageSyncStatus(ctx, s, notebookJobId, ImageSyncStatusfailed)
}
return err
case StatusSucceeded:
CustomRegistrationImage(ctx, s, replyCreateImagesSynstask, req, trainJobId, notebookJobId)
return err
log.Errorf(ctx, "Execution failed or timed out %d ", status.Status)
updateImageSynchronization(ctx, s, item.Id, ImageSyncStatusfailed, "Execution failed or timed out")
case StatusWaiting:
time.Sleep(30 * time.Second)
log.Infof(ctx, "Waiting for execution %d ", status.Status)
case StatusWaitingLine:
time.Sleep(30 * time.Second)
log.Infof(ctx, "Waiting queue%d ", status.Status)
case StatusSucceeded:
log.Infof(ctx, "Execution succeeded %d ", status.Status)
customRegistrationImage(ctx, s, item)
case StatusStopped:
log.Errorf(ctx, "被停止(服务重启或用户手动):%s", reply.ErrorMessage)
updateImageSyncStatus(ctx, s, trainJobId, ImageSyncStatusfailed)
if req.NotebookType {
updateImageSyncStatus(ctx, s, notebookJobId, ImageSyncStatusfailed)
}
return err
log.Errorf(ctx, "Stopped (service restart or manual operation by user)")
updateImageSynchronization(ctx, s, item.Id, ImageSyncStatusfailed, "Stopped (service restart or manual operation by user)")
default:
log.Errorf(ctx, "未知任务状态: %d", status)
updateImageSyncStatus(ctx, s, trainJobId, ImageSyncStatusfailed)
if req.NotebookType {
updateImageSyncStatus(ctx, s, notebookJobId, ImageSyncStatusfailed)
}
return err
log.Errorf(ctx, "Unknown task status: %d", status.Status)
updateImageSynchronization(ctx, s, item.Id, ImageSyncStatusfailed, "Unknown task status")
}
}
}

func CustomRegistrationImage(ctx context.Context, s *service, replyCreateImagesSynstask *pb.CreateImagesSynstaskReply, req *pb.CreateImagesSynstaskRequest, trainJobId string, notebookJobId string) error {
//自定义镜像注册
apiBaseUrl, apiKey, err := s.data.AiCenterDao.GetAiCenterConfig(ctx, req.DestDcName)
func callImagesSynstaskStatus(ctx context.Context, s *service, taskId string) (*ImageSynctasksStatusResp, error) {
statusUrl := s.conf.Dolphin.ImagesSynctasksAddr + ImagesSynctasksStatusURL
imageStatusReq := ImageStatusReq{
TaskId: taskId,
}
tempReq, err := json.Marshal(imageStatusReq)
if err != nil {
return err
return nil, err
}
var customRegistrationImageRequest pbAdapter.CustomRegistrationImageRequest
customRegistrationImageRequest.SwrPath = replyCreateImagesSynstask.DestImageTag
reply, err := s.data.Adapter.CustomRegistrationImage(ctx, apiBaseUrl, apiKey, &customRegistrationImageRequest)
reply := &ImageSynctasksStatusResp{}
resp, err := s.httpClient.R().
SetHeader("Content-Type", "application/json").
SetHeader("Accept", "application/json").
SetResult(reply).
SetBody(strings.NewReader(string(tempReq))).
Get(statusUrl)
if err != nil {
updateImageSyncStatus(ctx, s, trainJobId, ImageSyncStatusfailed)
if req.NotebookType {
updateImageSyncStatus(ctx, s, notebookJobId, ImageSyncStatusfailed)
}
log.Error(ctx, "调用自定义镜像注册失败", err)
return err
log.Errorf(ctx, "/images/synctasks/status http request failed, error:%v", err)
return nil, err
}
if resp.StatusCode() != http.StatusOK {
log.Errorf(ctx, "/images/synctasks/status failed(%d):%s", resp.StatusCode(), string(resp.Body()))
return nil, fmt.Errorf("/images/synctasks/status failed(%d):%s", resp.StatusCode(), string(resp.Body()))
}
return reply, nil
}

updateImageAiCenterImages(ctx, s, trainJobId, req.DestDcName, reply.SwrPath)
if req.NotebookType {
updateImageAiCenterImages(ctx, s, notebookJobId, req.DestDcName, reply.SwrPath)
func customRegistrationImage(ctx context.Context, s *service, imageSync *imageSynDao.ImageSynchronization) {
//调试任务需要去model arts注册镜像
var customRegistrationReplyId = ""
if consts.JobTypeTrainJobAndNotebook == imageSync.TrainType || consts.JobTypeNotebook == imageSync.TrainType {
apiBaseUrl, apiKey, err := s.data.AiCenterDao.GetAiCenterConfig(ctx, imageSync.DestDcCenterId)
if err != nil {
log.Error(ctx, fmt.Sprintf("custom registration image failed get aicenter failed : %s", err.Error()))
return
}
customRegistrationImageRequest := pbAdapter.CustomRegistrationImageRequest{
SwrPath: imageSync.DestImageTag,
}
var reply *pbAdapter.CustomRegistrationImageReply
reply, err = s.data.Adapter.CustomRegistrationImage(ctx, apiBaseUrl, apiKey, &customRegistrationImageRequest)
if err != nil {
log.Error(ctx, fmt.Sprintf("custom registration image failed: %s", err.Error()))
updateImageSynchronization(ctx, s, imageSync.Id, ImageSyncStatusfailed, err.Error())
return
}
if reply.ErrorCode != "" {
log.Error(ctx, fmt.Sprintf("custom registration image failed: %s", reply.ErrorMsg))
updateImageSynchronization(ctx, s, imageSync.Id, ImageSyncStatusfailed, reply.ErrorMsg)
return
}
customRegistrationReplyId = reply.Id
}
return nil
if imageSync.SrcType == 1 {
err := updateImageAiCenterImages(ctx, s, imageSync, customRegistrationReplyId)
if err != nil {
updateImageSynchronization(ctx, s, imageSync.Id, ImageSyncStatusfailed, err.Error())
log.Error(ctx, fmt.Sprintf("update image ai center images failed: %s", err.Error()))
return
}
} else {
err := insertImage(ctx, s, imageSync, customRegistrationReplyId)
if err != nil {
updateImageSynchronization(ctx, s, imageSync.Id, ImageSyncStatusfailed, err.Error())
log.Error(ctx, fmt.Sprintf("insert image failed: %s", err.Error()))
return
}
}
updateImageSynchronization(ctx, s, imageSync.Id, ImageSyncStatusSucceeded, "succeeded")
}
func insertImage(ctx context.Context, s *service, processorType string, name string, trainType string) (string, error) {
rs := &dao.Image{}
rs.Name = name
rs.TrainType = trainType
rs.ProcessorType = processorType
rs.ImageSyncType = "imageSync"
rs.ImageSyncStatus = ImageSyncStatusing
rs.Id = utils.GetUUIDWithoutSeparator()
err := s.data.ImageDao.CreateImage(ctx, rs)

func createImageSynchronization(ctx context.Context, s *service, req *pb.CreateImagesSynstaskRequest, reply *ImageSysctaskResp) error {
rs := &imageSynDao.ImageSynchronization{}
err := copier.Copy(rs, req)
if err != nil {
return "", err
return errors.Error(err, errors.ErrorInternalError)
}
return rs.Id, nil
}
func updateImageSyncStatus(ctx context.Context, s *service, id string, imageSyncStatus int8) error {
rs := &dao.Image{}
rs.Id = id
rs.ImageSyncStatus = imageSyncStatus
err := s.data.ImageDao.UpdateImage(ctx, rs)
rs.Id = utils.GetUUIDWithoutSeparator()
rs.Status = ImageSyncStatusing
rs.TaskId = reply.TaskId
rs.DestImageTag = reply.DestImageTag
rs.ImageId = req.Id
err = s.data.ImageSynchronizationDao.CreateImageSynchronization(ctx, rs)
if err != nil {
return err
}
return nil
}

func updateImageAiCenterImages(ctx context.Context, s *service, id string, aiCenterId string, aiImage string) error {
rs := &dao.Image{}
func updateImageSynchronization(ctx context.Context, s *service, id string, status int8, failedReason string) {
rs := &imageSynDao.ImageSynchronization{}
rs.Id = id
rs.ImageSyncStatus = ImageSyncStatusSucceeded
rs.Status = status
rs.FailedReason = failedReason
err := s.data.ImageSynchronizationDao.UpdateImageSynchronization(ctx, rs)
if err != nil {
return
}
}

func insertImage(ctx context.Context, s *service, imageSync *imageSynDao.ImageSynchronization, customRegistrationImageId string) error {
rs := &dao.Image{
Name: imageSync.ImageName,
TrainType: imageSync.TrainType,
ProcessorType: imageSync.ProcessorType,
Id: utils.GetUUIDWithoutSeparator(),
}
var trainJobUrl = imageSync.DestImageTag
if idx := strings.Index(imageSync.DestImageTag, "/"); idx != -1 {
trainJobUrl = imageSync.DestImageTag[idx+1:]
}
var aiCenterImages dao.AiCenterImage
aiCenterImages.AiCenterId = aiCenterId
aiCenterImages.ImageUrl = aiImage
aiCenterImages.AiCenterId = imageSync.DestDcCenterId
switch imageSync.TrainType {
case consts.JobTypeTrainJob:
aiCenterImages.TrainJobUrl = trainJobUrl
case consts.JobTypeNotebook:
aiCenterImages.NotebookUrl = customRegistrationImageId
case consts.JobTypeTrainJobAndNotebook:
aiCenterImages.TrainJobUrl = trainJobUrl
aiCenterImages.NotebookUrl = customRegistrationImageId
}
rs.AiCenterImages = append(rs.AiCenterImages, aiCenterImages)
err := s.data.ImageDao.UpdateImage(ctx, rs)
err := s.data.ImageDao.CreateImage(ctx, rs)
if err != nil {
return err
}
return nil
}

func checkNameExist(ctx context.Context, s *service, name string, trainType string) (error, error) {
query := &dao.ImageQuery{}
query.Name = name
query.TrainType = trainType
_, totalSize, err := s.data.ImageDao.ListImage(ctx, query)
func updateImageAiCenterImages(ctx context.Context, s *service, imageSync *imageSynDao.ImageSynchronization, customRegistrationImageId string) error {
rsDB, err := s.data.ImageDao.GetImage(ctx, imageSync.ImageId)
if err != nil {
return nil, errors.Error(nil, errors.ErrorDBFindFailed)
return err
}
if totalSize > 0 {
return nil, errors.Error(nil, errors.ErrorImageNameExist)
var aiCenterImages dao.AiCenterImage
aiCenterImages.AiCenterId = imageSync.DestDcCenterId
switch imageSync.TrainType {
case consts.JobTypeTrainJob:
aiCenterImages.TrainJobUrl = imageSync.ImageUrl
case consts.JobTypeNotebook:
aiCenterImages.NotebookUrl = customRegistrationImageId
case consts.JobTypeTrainJobAndNotebook:
aiCenterImages.TrainJobUrl = imageSync.ImageUrl
aiCenterImages.NotebookUrl = customRegistrationImageId
}
rsDB.AiCenterImages = append(rsDB.AiCenterImages, aiCenterImages)
err = s.data.ImageDao.UpdateImage(ctx, rsDB)
if err != nil {
return err
}
return nil
}

func splitByLastSlash(s string) (string, string) {
lastSlashIdx := strings.LastIndex(s, "/")
if lastSlashIdx == -1 {
return s, ""
}
return nil, nil
part1 := s[:lastSlashIdx]
part2 := s[lastSlashIdx+1:]
return part1, part2
}

// func (s *service) getImageList(ctx context.Context, centerId string, r *adapterapi.GetImageListReply) error {


+ 24
- 16
server/bizserver/internal/service/otjob/ot_job.go View File

@@ -334,15 +334,20 @@ func (s *service) submitJob(ctx context.Context, job *otjobDao.OtJob, action str
return err
}

code := &otjobDao.Dataset{}
copier.Copy(code, i.Code)
temp, err := s.createDataMigrateRecord(ctx, job.Id, code)
if err != nil {
log.Errorf(ctx, "createDataMigrateRecord(%s) failed:%v", job.Id, err)
return err
var emptyDataCode otjobDao.Dataset
if i.Code != emptyDataCode {
log.Errorf(ctx, "code is not empty")
code := &otjobDao.Dataset{}
copier.Copy(code, i.Code)
temp, err := s.createDataMigrateRecord(ctx, job.Id, code)
if err != nil {
log.Errorf(ctx, "createDataMigrateRecord(%s) failed:%v", job.Id, err)
return err
}
i.Code.InternalMigrateId = temp.ID
} else {
log.Errorf(ctx, "code is empty")
}

i.Code.InternalMigrateId = temp.ID
codeInfo, err = json.Marshal(i.Code)
if err != nil {
return err
@@ -642,12 +647,6 @@ func (s *service) RestartOtJob(ctx context.Context, req *pb.ReStartOtJobRequest)
newJob.Tasks[0].Token = ""
newJob.Tasks[0].HasBeenCreated = ""

//submit job
err = s.submitJob(ctx, newJob, consts.ActionRestart)
if err != nil {
return nil, err
}

err = s.data.OtJobDao.RestartOtJob(ctx, oldJob, newJob)
if err != nil {
log.Errorf(ctx, "OtJobDao.RestartOtJob failed:%v", err)
@@ -668,6 +667,12 @@ func (s *service) RestartOtJob(ctx context.Context, req *pb.ReStartOtJobRequest)
return nil, err
}

//submit job
err = s.submitJob(ctx, newJob, consts.ActionRestart)
if err != nil {
return nil, err
}

otJobDetail, err := s.convertJobFromDb(newJob, make(map[string]string, 0), make(map[string]string, 0))
if err != nil {
return nil, err
@@ -3070,6 +3075,7 @@ func (s *service) createOtJobDataMigrateRecord(ctx context.Context, job *otjobDa
*/

func (s *service) UpdateOtJob(ctx context.Context, req *pb.UpdateOtJobRequest) (*pb.UpdateOtJobReply, error) {
log.Infof(ctx, "UpdateOtJob(%v)", req)
if req.Id == "" {
log.Errorf(ctx, "UpdateOtJob(%s) failed: id is null", req.Id)
return nil, fmt.Errorf("id is null")
@@ -3081,8 +3087,6 @@ func (s *service) UpdateOtJob(ctx context.Context, req *pb.UpdateOtJobRequest) (
return nil, err
}

log.Infof(ctx, "UpdateOtJob(%v)", req)

if consts.IsCompletedState(job.Status) {
log.Errorf(ctx, "job(%s) is completed:%s, no need to update", req.Id, job.Status)
return nil, nil
@@ -3120,6 +3124,10 @@ func (s *service) UpdateOtJob(ctx context.Context, req *pb.UpdateOtJobRequest) (

if len(job.Tasks[0].CenterId) == 0 {
job.Tasks[0].CenterId = append(job.Tasks[0].CenterId, req.CenterId)
} else {
if strings.Contains(job.Tasks[0].CenterId[0], "+") {
job.Tasks[0].CenterId[0] = req.CenterId
}
}

// last schedule step


+ 1
- 1
server/bizserver/internal/service/resource/resource.go View File

@@ -116,7 +116,7 @@ func (s *service) GetResourceSpec(ctx context.Context, req *pb.GetResourceSpecRe
return nil, err
}

for _, info := range rsTbl.CenterResourceSpecs {
for _, info := range rsTbl.NewCenterResourceSpecs {
for _, specInfo := range info.ResourceSpec {
poolInfo, err := s.data.ResourcePoolDao.GetResourceSpecPool(ctx, specInfo.Id, info.AiCenterId, rsTbl.SpecInfo.AccDeviceModel)
if err != nil {


+ 56
- 1
server/bizserver/internal/service/user/user.go View File

@@ -2,6 +2,8 @@ package user

import (
"context"
"regexp"
"strings"

dao "git.openi.org.cn/OpenI/Grampus/server/bizserver/internal/data/dao/user"

@@ -113,7 +115,10 @@ func (s *service) CreateUser(ctx context.Context, req *pb.CreateUserRequest) (*p
if existed != nil {
return nil, errors.Error(nil, errors.ErrorUserExisted)
}

err = ValidatePassword(req.Password)
if err != nil {
return nil, err
}
password, err := utils.EncryptPassword(req.Password)
if err != nil {
return nil, err
@@ -146,6 +151,52 @@ func (s *service) CreateUser(ctx context.Context, req *pb.CreateUserRequest) (*p
return reply, nil
}

// passwordCharsetPattern accepts 8 to 30 characters, each of which is an ASCII
// digit, an ASCII letter, or one of the permitted special characters
// !@#$%^&*()_+-=[]{};':"\|,.<>/? . It is compiled once at package
// initialisation rather than on every call.
var passwordCharsetPattern = regexp.MustCompile(`^[0-9A-Za-z!@#$%^&*()_+\-=\[\]{};':"\\|,.<>\/?]{8,30}$`)

// ValidatePassword checks that password is 8-30 characters long, uses only the
// permitted character set, and contains at least one digit, one letter and one
// special character.
//
// It returns nil when the password is acceptable and an ErrorUserPassword
// error otherwise.
//
// The original note on this function said the approach was dictated by the old
// toolchain version; the "at least one of each class" rule is therefore checked
// by scanning the string rather than inside the pattern.
func ValidatePassword(password string) error {
	if !passwordCharsetPattern.MatchString(password) {
		return errors.Error(nil, errors.ErrorUserPassword)
	}

	// Single pass: the pattern above already guarantees every rune is a digit,
	// a letter or an allowed special character, so anything that is neither a
	// digit nor a letter must be an allowed special character.
	var hasDigit, hasLetter, hasSpecial bool
	for _, c := range password {
		switch {
		case '0' <= c && c <= '9':
			hasDigit = true
		case ('A' <= c && c <= 'Z') || ('a' <= c && c <= 'z'):
			hasLetter = true
		default:
			hasSpecial = true
		}
	}
	if !hasDigit || !hasLetter || !hasSpecial {
		return errors.Error(nil, errors.ErrorUserPassword)
	}

	return nil
}

func (s *service) UpdateUser(ctx context.Context, req *pb.UpdateUserRequest) (*pb.UpdateUserReply, error) {
userId := req.Id
user1 := dao.UserUpdate{
@@ -154,6 +205,10 @@ func (s *service) UpdateUser(ctx context.Context, req *pb.UpdateUserRequest) (*p
Status: int32(req.Status),
}
if req.Password != "" {
err := ValidatePassword(req.Password)
if err != nil {
return nil, err
}
password, err := bcrypt.GenerateFromPassword([]byte(req.Password), bcrypt.DefaultCost)

if err != nil {


+ 6
- 5
server/common/consts/const.go View File

@@ -152,11 +152,12 @@ const (
ProcessTypeMetaX = "metax-tech.com/metax"
ProcessTypeIluvatar = "iluvatar.com/iluvatar"

JobTypeTrainJob = "TrainJob"
JobTypeNotebook = "Notebook"
JobTypeInference = "Inference"
JobTypeService = "Service"
JobTypeEcs = "Ecs"
JobTypeTrainJob = "TrainJob"
JobTypeNotebook = "Notebook"
JobTypeInference = "Inference"
JobTypeService = "Service"
JobTypeEcs = "Ecs"
JobTypeTrainJobAndNotebook = "TrainJob&Notebook"

AlignStateInit = 0
AlignStateSucceed = 1


+ 8
- 2
server/common/errors/errorcode.go View File

@@ -11,7 +11,8 @@ const (
ErrorAdapterRequestFailed Code = 1007 // adapter请求失败

/* 2001~3000 用户错误*/
ErrorUserExisted = 2001 // 已存在
ErrorUserExisted = 2001 // 已存在
ErrorUserPassword = 2002 // 密码格式不符合

//3000~4000 db操作相关错误
ErrorDBInitFailed = 3000 // db初始化失败
@@ -51,6 +52,8 @@ const (
ErrorImageNameExist = 20001 // 镜像名称已存在
ErrorNPUFormat = 20002 //只支持NPU格式
ErrorCallImagesSynctasksFailed = 20003 //调用imagesynctasks服务失败
ErrorImageTageExist = 20004 //镜像tag已存在
ErrorImageNameNotValidate = 20005 //镜像名称请输入字母、数字、_-.和:,最长100个字符,且以字母开头
)

var DefaultErrorMsg = map[Code]string{
@@ -61,6 +64,7 @@ var DefaultErrorMsg = map[Code]string{
ErrorTokenExpire: "token expire",
ErrorLimitLogin: "login failed times exceed 3",
ErrorAdapterRequestFailed: "adapter request failed",
ErrorUserPassword: "密码格式不符合,请输入密码需为8-30位,包含数字、字母和特殊字符",

// db操作相关错误
ErrorDBInitFailed: "db init failed",
@@ -96,7 +100,9 @@ var DefaultErrorMsg = map[Code]string{

//adminapiserver错误
ErrorNameExist: "name exist",
ErrorImageNameExist: "image name exist",
ErrorImageNameExist: "镜像名称已存在",
ErrorNPUFormat: "only support npu format",
ErrorCallImagesSynctasksFailed: "call image synctasks create service failed",
ErrorImageTageExist: "image tag exist",
ErrorImageNameNotValidate: "镜像名称请输入字母、数字、_-.和:,最长100个字符,且以字母开头",
}

+ 7
- 2
server/openapiserver/api/v1/image/image.proto View File

@@ -38,8 +38,8 @@ service ImageService {

message AiCenterImage {
string aiCenterId = 1;
string imageUrl = 2;
string imageId =3;
string trainJobUrl = 2;
string notebookUrl =3;
string accDeviceModel=4;
repeated string poolIds = 5;
}
@@ -52,6 +52,7 @@ message Image {
string processorType = 5;
string trainType = 6;
repeated AiCenterImage aiCenterImages = 7;
string centerImageId = 8;
}

message GetImageRequest {
@@ -138,4 +139,8 @@ message OctopusImage {
string imageFullAddr = 13;
int64 imageSize = 14;
string errorMsg = 15;
string trainJobUrl = 16;
string notebookUrl = 17;
string trainType = 18;
string accDeviceModel = 19;
}

+ 77
- 14
server/openapiserver/internal/service/image/image.go View File

@@ -4,7 +4,7 @@ import (
"context"
"fmt"
nethttp "net/http"
"time"
"strings"

adapterapi "git.openi.org.cn/OpenI/Grampus/server/adapter/api/v1"
inneraicenterpb "git.openi.org.cn/OpenI/Grampus/server/bizserver/api/v1/aicenter"
@@ -67,27 +67,61 @@ func (s *Service) ListUserImage(ctx context.Context, req *pb.ListUserImageReques
reply.Images = make([]*pb.Image, 0)
reply.TotalSize = innerReply.TotalSize
if innerReply.Images != nil {
aicenterImages := make([]*pb.AiCenterImage, 0)
for _, image := range innerReply.Images {
aiCenterImage := &pb.AiCenterImage{
AiCenterId: image.AiCenterId,
AccDeviceModel: image.AccDeviceModel,
PoolIds: image.PoolIds,
}

//对于用户镜像类型为空的历史数据,TrainType为空,根据所属的分中心id返回类型。
if image.TrainType == "" {
image.TrainType = s.getSupportJobType(image.AiCenterId)
//对于新用户镜像trainType有值,可直接用保存notebook时的类型赋值
switch image.TrainType {
case "TrainJob":
aiCenterImage.TrainJobUrl = image.ImageFullAddr
case "Notebook":
aiCenterImage.NotebookUrl = image.ImageFullAddr
case "TrainJob&Notebook":
aiCenterImage.NotebookUrl = image.ImageFullAddr
aiCenterImage.TrainJobUrl = image.ImageFullAddr
default:
log.Infof(ctx, "user image ID: %s, parent image ID: %s ,image type is null.", image.Id, image.ImageId)
}
}

reply.Images = append(reply.Images, &pb.Image{
Id: image.Id,
Name: image.ImageVersion,
ProcessorType: image.ProcessorType,
AiCenterImages: append(aicenterImages, &pb.AiCenterImage{
AiCenterId: image.AiCenterId,
ImageUrl: image.ImageAddr,
AccDeviceModel: image.AccDeviceModel,
PoolIds: image.PoolIds,
}),
Id: image.Id,
Name: image.ImageVersion,
ProcessorType: image.ProcessorType,
TrainType: image.TrainType,
AiCenterImages: []*pb.AiCenterImage{aiCenterImage},
CenterImageId: image.ImageId,
})
}
}
return reply, nil
}

// getSupportJobType maps an AI-center id to the job type string used for user
// images that carry no explicit train type.
//
// The id is matched by substring: ids containing any of the notebook-only
// markers yield "Notebook", ids containing "inspur" yield "TrainJob", and
// everything else yields "TrainJob&Notebook". Notebook markers are checked
// first, so they win when an id matches both groups.
func (s *Service) getSupportJobType(centerId string) string {
	notebookOnlyMarkers := []string{
		"blsc", "hnaicc", "sugon", "beijing-N76H8", "biren", "ningxia", "zhengjiang-M6",
	}
	for _, marker := range notebookOnlyMarkers {
		if strings.Contains(centerId, marker) {
			return "Notebook"
		}
	}
	if strings.Contains(centerId, "inspur") {
		return "TrainJob"
	}
	return "TrainJob&Notebook"
}

func (s *Service) GetImage(ctx context.Context, req *pb.GetImageRequest) (*pb.GetImageReply, error) {
innerReq := &innerpb.GetUserImageRequest{Id: req.Id}
log.Errorf(ctx, "Grampus User Image req id(%s) :%v", req.Id)
log.Infof(ctx, "Get grampus User Image req id is (%s).", req.Id)
var err error
innerReply, err := s.data.ImageClient.GetUserImage(ctx, innerReq)
if err != nil {
@@ -100,6 +134,20 @@ func (s *Service) GetImage(ctx context.Context, req *pb.GetImageRequest) (*pb.Ge
if err == nil {
if reply.Image != nil {
re = reply
userImageTrainType := s.getImageType(innerReply.UserImage.AiCenterId)
re.Image.TrainType = userImageTrainType
log.Infof(ctx, "user image ID: %s, image type: %s ,image full addr is: %s.", reply.Image.Id, userImageTrainType, reply.Image.ImageFullAddr)
switch userImageTrainType {
case "TrainJob":
re.Image.TrainJobUrl = reply.Image.ImageFullAddr
case "Notebook":
re.Image.NotebookUrl = reply.Image.ImageFullAddr
case "TrainJob&Notebook":
re.Image.NotebookUrl = reply.Image.ImageFullAddr
re.Image.TrainJobUrl = reply.Image.ImageFullAddr
default:
log.Infof(ctx, "user image ID: %s, image type is null.", reply.Image.Id)
}
}
}

@@ -119,12 +167,28 @@ func (s *Service) GetImage(ctx context.Context, req *pb.GetImageRequest) (*pb.Ge
if err != nil {
log.Errorf(ctx, "UpdateUserImage failed:%v", err)
}
re.Image.AccDeviceModel = innerReply.UserImage.AccDeviceModel
return re, nil
}

return nil, fmt.Errorf("Not found this image,id=" + req.Id)
}

// getImageType returns the job type ("Notebook", "TrainJob" or
// "TrainJob&Notebook") associated with the given AI-center id.
//
// The mapping is identical to getSupportJobType on the same receiver, so this
// method delegates to it instead of keeping a second copy of the center list
// that could drift out of sync.
func (s *Service) getImageType(centerId string) string {
	return s.getSupportJobType(centerId)
}

func oldFlow(ctx context.Context, centerImageId string, s *Service) (*pb.GetImageReply, error) {
centerId := "openi-mlu"
replyPclcci, err := s.getImageById(ctx, centerImageId, centerId)
@@ -161,8 +225,7 @@ func (s *Service) getImageById(ctx context.Context, centerImageId, centerId stri
}
// sugon-ai 需要notebookid用于获取对应保存的镜像状态及详情
url := fmt.Sprintf("%s/v1/image/%s", centerUrl, centerImageId)
log.Infof(ctx, "Try to GetImage detail info, center id is:(%s), imageId is:(%s), centerjobId is:(%s)",
time.Now().Format("2006-01-02 15:04:05"), centerId, centerImageId)
log.Infof(ctx, "Try to GetImage detail info, center id is:(%s), imageId is:(%s).", centerId, centerImageId)
request := &adapterapi.GetImageRequest{Id: centerImageId}
r := &adapterapi.GetImageReply{}
res, err := s.httpClient.R().


+ 19
- 1
server/openapiserver/internal/service/notebook/notebook.go View File

@@ -4,6 +4,7 @@ import (
"context"
"fmt"
nethttp "net/http"
"strings"

adapterapi "git.openi.org.cn/OpenI/Grampus/server/adapter/api/v1"
inneraicenterpb "git.openi.org.cn/OpenI/Grampus/server/bizserver/api/v1/aicenter"
@@ -285,6 +286,8 @@ func (s *service) SaveNotebook(ctx context.Context, req *notebook.SaveNotebookRe
userImagerReq.Description = req.Description
userImagerReq.DebugJobId = req.Id
userImagerReq.ImageId = r.ImageId
//根据分中心类型,保存用户的镜像类型
userImagerReq.TrainType = s.getSupportJobType(centerId)

resp, err := s.data.ImageClient.CreateUserImage(ctx, userImagerReq)
if err != nil {
@@ -295,7 +298,7 @@ func (s *service) SaveNotebook(ctx context.Context, req *notebook.SaveNotebookRe
reply := &notebook.SaveNotebookReply{}
err = copier.Copy(reply, r)
reply.Id = resp.Id
log.Infof(ctx, "errCode:%d, errMsg", reply.ErrorCode, reply.ErrorMsg)
log.Infof(ctx, "errCode:%d, errMsg:%v", reply.ErrorCode, reply.ErrorMsg)
if err != nil {
return nil, err
}
@@ -303,6 +306,21 @@ func (s *service) SaveNotebook(ctx context.Context, req *notebook.SaveNotebookRe
return reply, nil
}

// getSupportJobType returns the job-type label recorded for a user image,
// chosen from the center ID by case-sensitive substring matching.
//
// Result:
//   - "Notebook" if the ID contains "blsc", "hnaicc", "sugon",
//     "beijing-N76H8", "biren", "ningxia" or "zhengjiang-M6";
//   - "TrainJob" if it contains "inspur" and none of the above;
//   - "TrainJob&Notebook" in every other case.
//
// The receiver is not used.
func (s *service) getSupportJobType(centerId string) string {
	idHas := func(fragment string) bool {
		return strings.Contains(centerId, fragment)
	}

	// The notebook-only check runs first, so it takes priority over "inspur".
	if idHas("blsc") || idHas("hnaicc") || idHas("sugon") ||
		idHas("beijing-N76H8") || idHas("biren") ||
		idHas("ningxia") || idHas("zhengjiang-M6") {
		return "Notebook"
	}
	if idHas("inspur") {
		return "TrainJob"
	}
	return "TrainJob&Notebook"
}

// 删除notebook任务
func (s *service) DeleteNotebook(ctx context.Context, req *notebook.DeleteNotebookRequest) (*notebook.DeleteNotebookReply, error) {
deleteReq := &innerapi.DeleteOtJobRequest{Id: req.Id}


+ 0
- 8
server/openapiserver/internal/service/otjob/otjob.go View File

@@ -14,7 +14,6 @@ import (
adapterapi "git.openi.org.cn/OpenI/Grampus/server/adapter/api/v1"
inneraicenterpb "git.openi.org.cn/OpenI/Grampus/server/bizserver/api/v1/aicenter"
innerapi "git.openi.org.cn/OpenI/Grampus/server/bizserver/api/v1/otjob"
innerResourcePb "git.openi.org.cn/OpenI/Grampus/server/bizserver/api/v1/resource"
"git.openi.org.cn/OpenI/Grampus/server/common/consts"
commctx "git.openi.org.cn/OpenI/Grampus/server/common/context"
errors "git.openi.org.cn/OpenI/Grampus/server/common/errors"
@@ -557,13 +556,6 @@ func (s *service) GetOtJobContainerMetrics(w nethttp.ResponseWriter, r *nethttp.
var poolId, centerJobName string
if innerTrainJobInfo.OtJob.Tasks[0].CenterId[0] == consts.CenterECloud {
centerJobName = innerTrainJobInfo.OtJob.Tasks[0].CenterJobName
resourceInfo, _ := s.data.ResourceClient.GetResourceSpec(context.TODO(), &innerResourcePb.GetResourceSpecRequest{Id: innerTrainJobInfo.OtJob.Tasks[0].ResourceSpecId})
for _, info := range resourceInfo.ResourceSpec.CenterResourceSpecs {
if consts.CenterECloud == info.AiCenterId {
//todo: 确定第几条记录
poolId = info.ResourceSpec[0].PoolId
}
}
}

var reqUrl string


+ 23
- 18
server/ot-provider/pkg/provider/pod.go View File

@@ -4,12 +4,13 @@ import (
"context"
"encoding/json"
"fmt"
"git.openi.org.cn/OpenI/Grampus/server/common/urchin_new"
"io"
"strconv"
"strings"
"time"

"git.openi.org.cn/OpenI/Grampus/server/common/urchin_new"

pb "git.openi.org.cn/OpenI/Grampus/server/adapter/api/v1"
"git.openi.org.cn/OpenI/Grampus/server/bizserver/api/v1/alarm"
"git.openi.org.cn/OpenI/Grampus/server/bizserver/api/v1/otjob"
@@ -303,7 +304,7 @@ func (c *ClusterClient) migrateData(ctx context.Context, data *Dataset, jobId, s
func (c *ClusterClient) handleDatasetModelStorageSchedule(jobId, scheduleType, datasetsInfo, modelsInfo, codeInfo, userId, userToken, newUrchinServer string) (*scheduleResponse, error) {
var res scheduleResponse
res.IsAllExist = true
var codeUrl, destProxy string
var destProxy string
ctx := context.TODO()

dataList := make([]*urchin_new.UpdateDataReferenceRequest, 0)
@@ -390,23 +391,26 @@ func (c *ClusterClient) handleDatasetModelStorageSchedule(jobId, scheduleType, d

res.CodeName = code.Name

isExist, codeUrl, destProxy, err := c.migrateData(ctx, &code, jobId, scheduleType, userId, userToken)
if err != nil {
klog.Errorf("jobId(%s) migrateData(%s) failed:%v", jobId, code.ObjectKey, err)
return &res, err
}
var emptyDataset Dataset
if code != emptyDataset {
isExist, codeUrl, temp, err := c.migrateData(ctx, &code, jobId, scheduleType, userId, userToken)
if err != nil {
klog.Errorf("jobId(%s) migrateData(%s) failed:%v", jobId, code.ObjectKey, err)
return &res, err
}
destProxy = temp

if !isExist {
res.IsAllExist = false
} else {
res.CodeUrl = codeUrl
res.DestProxy = destProxy
if code.Id != "" {
dataList = append(dataList, &urchin_new.UpdateDataReferenceRequest{
DataId: code.Id,
CenterId: c.conf.Cluster.Id,
Type: urchin_new.LockData,
})
if !isExist {
res.IsAllExist = false
} else {
res.CodeUrl = codeUrl
if code.Id != "" {
dataList = append(dataList, &urchin_new.UpdateDataReferenceRequest{
DataId: code.Id,
CenterId: c.conf.Cluster.Id,
Type: urchin_new.LockData,
})
}
}
}

@@ -424,6 +428,7 @@ func (c *ClusterClient) handleDatasetModelStorageSchedule(jobId, scheduleType, d
return &res, err
}
res.ModelUrl = string(tempModels)
res.DestProxy = destProxy

if len(dataList) != 0 {
go urchin_new.UpdateDataReference(ctx, &urchin_new.UpdateDataReferenceReq{


+ 36
- 2
server/scheduler/pkg/plugins/image/image.go View File

@@ -217,14 +217,28 @@ func (i *Image) PreBind(ctx context.Context, state *framework.CycleState, pod *v
imgUrl = strings.TrimPrefix(userInfo.UserImage.ImageFullAddr, temp[0]+"/")
} else {
imgId = userInfo.UserImage.ImageId
imgUrl = userInfo.UserImage.ImageFullAddr
}
}
klog.Infof("user imgUrl is %v or imgId is %v.", imgUrl, imgId)
}
} else {
for _, i := range info.Image.AiCenterImages {
if i.AiCenterId == nodeName {
imgUrl = i.ImageUrl
imgId = i.ImageId
if i.TrainJobUrl != "" {
// 当前镜像属于 TrainJob 类型或混合类型
// TrainJobUrl 一定是 URL,直接作为镜像地址
imgUrl = i.TrainJobUrl
imgId = i.NotebookUrl // 可能是空、ID、URL,作为元数据保留
} else if isImageReference(i.NotebookUrl) {
// 当前镜像属于 Notebook 类型,且 NotebookUrl 是 URL
imgUrl = i.NotebookUrl // 作为镜像地址
imgId = "" // 清空,表示没有额外 ID
} else {
// Notebook 类型,但 NotebookUrl 不是 URL(是 ID/UUID)
imgUrl = ""
imgId = i.NotebookUrl
}
temp, _ := json.Marshal(i)
imgInfo = string(temp)
imgName = i.ImageName
@@ -252,6 +266,8 @@ func (i *Image) PreBind(ctx context.Context, state *framework.CycleState, pod *v
p.Annotations[consts.AnnotationsImageInfo] = imgInfo
p.Annotations[consts.AnnotationsCenterImageName] = imgName

klog.Infof("AnnotationsCenterImageUrl is %v or AnnotationsCenterImageId is %v.", imgUrl, imgId)

err = i.cluster.UpdatePod(ctx, p)
if err != nil {
return framework.NewStatus(framework.Error, "UpdatePod failed")
@@ -270,3 +286,21 @@ func checkIsModelarts(ctx context.Context, i *Image, nodeName string) bool {
}
return false
}

// isImageReference reports whether s looks like an image address rather
// than a bare image ID. It is a heuristic:
//
//   - any string containing '/' is accepted (a namespace is present,
//     e.g. "grampus/mindspore:v3");
//   - a string without '/' but containing ':' is accepted only when it is
//     shorter than 64 bytes;
//   - everything else, including the empty string, is rejected, so a plain
//     ID such as "imageid123" is not treated as an address.
//
// NOTE(review): the original comment said the length limit is there to
// exclude UUID-like values (typically 36 characters, limit loosened to 64).
// UUIDs contain no ':', so what the limit is meant to exclude is unclear;
// confirm the intent.
func isImageReference(s string) bool {
	switch {
	case strings.Contains(s, "/"):
		return true
	case strings.Contains(s, ":"):
		return len(s) < 64
	default:
		return false
	}
}

Loading…
Cancel
Save
Baidu
map