Skip to content

Commit

Permalink
Adds logging for k8s client
Browse files Browse the repository at this point in the history
Signed-off-by: Alex Creasy <[email protected]>
  • Loading branch information
alexcreasy committed Jan 17, 2025
1 parent 518390a commit 350d705
Show file tree
Hide file tree
Showing 22 changed files with 156 additions and 116 deletions.
5 changes: 5 additions & 0 deletions clients/ui/bff/README.md
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,11 @@ If you want to use a different port, mock kubernetes client or model registry cl
```shell
make run PORT=8000 MOCK_K8S_CLIENT=true MOCK_MR_CLIENT=true
```
If you want to change the log level on deployment, add the LOG_LEVEL argument when running, supported levels are: ERROR, WARN, INFO, DEBUG. The default level is INFO.
```shell
# Run with debug logging
make run LOG_LEVEL=DEBUG
```

# Building and Deploying

Expand Down
3 changes: 2 additions & 1 deletion clients/ui/bff/internal/api/healthcheck__handler_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@ import (
"context"
"encoding/json"
"github.com/kubeflow/model-registry/ui/bff/internal/config"
"github.com/kubeflow/model-registry/ui/bff/internal/constants"
"github.com/kubeflow/model-registry/ui/bff/internal/mocks"
"github.com/kubeflow/model-registry/ui/bff/internal/models"
"github.com/kubeflow/model-registry/ui/bff/internal/repositories"
Expand All @@ -26,7 +27,7 @@ func TestHealthCheckHandler(t *testing.T) {

rr := httptest.NewRecorder()
req, err := http.NewRequest(http.MethodGet, HealthCheckPath, nil)
ctx := context.WithValue(req.Context(), KubeflowUserIdKey, mocks.KubeflowUserIDHeaderValue)
ctx := context.WithValue(req.Context(), constants.KubeflowUserIdKey, mocks.KubeflowUserIDHeaderValue)
req = req.WithContext(ctx)
assert.NoError(t, err)

Expand Down
3 changes: 2 additions & 1 deletion clients/ui/bff/internal/api/healthcheck_handler.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,12 +3,13 @@ package api
import (
"errors"
"github.com/julienschmidt/httprouter"
"github.com/kubeflow/model-registry/ui/bff/internal/constants"
"net/http"
)

func (app *App) HealthcheckHandler(w http.ResponseWriter, r *http.Request, ps httprouter.Params) {

userId, ok := r.Context().Value(KubeflowUserIdKey).(string)
userId, ok := r.Context().Value(constants.KubeflowUserIdKey).(string)
if !ok || userId == "" {
app.serverErrorResponse(w, r, errors.New("failed to retrieve kubeflow-userid from context"))
return
Expand Down
63 changes: 24 additions & 39 deletions clients/ui/bff/internal/api/middleware.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,36 +7,21 @@ import (
"github.com/google/uuid"
"github.com/julienschmidt/httprouter"
"github.com/kubeflow/model-registry/ui/bff/internal/config"
"github.com/kubeflow/model-registry/ui/bff/internal/constants"
"github.com/kubeflow/model-registry/ui/bff/internal/integrations"
"log/slog"
"net/http"
"runtime/debug"
"strings"
)

type contextKey string

const (
ModelRegistryHttpClientKey contextKey = "ModelRegistryHttpClientKey"
NamespaceHeaderParameterKey contextKey = "namespace"

//Kubeflow authorization operates using custom authentication headers:
// Note: The functionality for `kubeflow-groups` is not fully operational at Kubeflow platform at this time
// but it's supported on Model Registry BFF
KubeflowUserIdKey contextKey = "kubeflowUserId" // kubeflow-userid :contains the user's email address
KubeflowUserIDHeader = "kubeflow-userid"
KubeflowUserGroupsKey contextKey = "kubeflowUserGroups" // kubeflow-groups : Holds a comma-separated list of user groups
KubeflowUserGroupsIdHeader = "kubeflow-groups"

TraceIdKey contextKey = "TraceIdKey"
TraceLoggerKey contextKey = "TraceLoggerKey"
)

func (app *App) RecoverPanic(next http.Handler) http.Handler {
return http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
defer func() {
if err := recover(); err != nil {
w.Header().Set("Connection", "close")
app.serverErrorResponse(w, r, fmt.Errorf("%s", err))
app.logger.Error("Recover from panic: " + string(debug.Stack()))
}
}()

Expand All @@ -53,8 +38,8 @@ func (app *App) InjectUserHeaders(next http.Handler) http.Handler {
return
}

userIdHeader := r.Header.Get(KubeflowUserIDHeader)
userGroupsHeader := r.Header.Get(KubeflowUserGroupsIdHeader)
userIdHeader := r.Header.Get(constants.KubeflowUserIDHeader)
userGroupsHeader := r.Header.Get(constants.KubeflowUserGroupsIdHeader)
//`kubeflow-userid`: Contains the user's email address.
if userIdHeader == "" {
app.badRequestResponse(w, r, errors.New("missing required header: kubeflow-userid"))
Expand All @@ -74,8 +59,8 @@ func (app *App) InjectUserHeaders(next http.Handler) http.Handler {
}

ctx := r.Context()
ctx = context.WithValue(ctx, KubeflowUserIdKey, userIdHeader)
ctx = context.WithValue(ctx, KubeflowUserGroupsKey, userGroups)
ctx = context.WithValue(ctx, constants.KubeflowUserIdKey, userIdHeader)
ctx = context.WithValue(ctx, constants.KubeflowUserGroupsKey, userGroups)

next.ServeHTTP(w, r.WithContext(ctx))
})
Expand All @@ -95,12 +80,12 @@ func (app *App) EnableTelemetry(next http.Handler) http.Handler {
return http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
// Adds a unique id to the context to allow tracing of requests
traceId := uuid.NewString()
ctx := context.WithValue(r.Context(), TraceIdKey, traceId)
ctx := context.WithValue(r.Context(), constants.TraceIdKey, traceId)

// logger will only be nil in tests.
if app.logger != nil {
traceLogger := app.logger.With(slog.String("trace_id", traceId))
ctx = context.WithValue(ctx, TraceLoggerKey, traceLogger)
ctx = context.WithValue(ctx, constants.TraceLoggerKey, traceLogger)

if traceLogger.Enabled(ctx, slog.LevelDebug) {
cloneBody, err := integrations.CloneBody(r)
Expand All @@ -121,12 +106,12 @@ func (app *App) AttachRESTClient(next func(http.ResponseWriter, *http.Request, h

modelRegistryID := ps.ByName(ModelRegistryId)

namespace, ok := r.Context().Value(NamespaceHeaderParameterKey).(string)
namespace, ok := r.Context().Value(constants.NamespaceHeaderParameterKey).(string)
if !ok || namespace == "" {
app.badRequestResponse(w, r, fmt.Errorf("missing namespace in the context"))
}

modelRegistryBaseURL, err := resolveModelRegistryURL(namespace, modelRegistryID, app.kubernetesClient, app.config)
modelRegistryBaseURL, err := resolveModelRegistryURL(r.Context(), namespace, modelRegistryID, app.kubernetesClient, app.config)
if err != nil {
app.notFoundResponse(w, r)
return
Expand All @@ -135,7 +120,7 @@ func (app *App) AttachRESTClient(next func(http.ResponseWriter, *http.Request, h
// Set up a child logger for the rest client that automatically adds the request id to all statements for
// tracing.
restClientLogger := app.logger
traceId, ok := r.Context().Value(TraceIdKey).(string)
traceId, ok := r.Context().Value(constants.TraceIdKey).(string)
if app.logger != nil {
if ok {
restClientLogger = app.logger.With(slog.String("trace_id", traceId))
Expand All @@ -149,14 +134,14 @@ func (app *App) AttachRESTClient(next func(http.ResponseWriter, *http.Request, h
app.serverErrorResponse(w, r, fmt.Errorf("failed to create Kubernetes client: %v", err))
return
}
ctx := context.WithValue(r.Context(), ModelRegistryHttpClientKey, client)
ctx := context.WithValue(r.Context(), constants.ModelRegistryHttpClientKey, client)
next(w, r.WithContext(ctx), ps)
}
}

func resolveModelRegistryURL(namespace string, serviceName string, client integrations.KubernetesClientInterface, config config.EnvConfig) (string, error) {
func resolveModelRegistryURL(sessionCtx context.Context, namespace string, serviceName string, client integrations.KubernetesClientInterface, config config.EnvConfig) (string, error) {

serviceDetails, err := client.GetServiceDetailsByName(namespace, serviceName)
serviceDetails, err := client.GetServiceDetailsByName(sessionCtx, namespace, serviceName)
if err != nil {
return "", err
}
Expand All @@ -172,13 +157,13 @@ func resolveModelRegistryURL(namespace string, serviceName string, client integr

func (app *App) AttachNamespace(next func(http.ResponseWriter, *http.Request, httprouter.Params)) httprouter.Handle {
return func(w http.ResponseWriter, r *http.Request, ps httprouter.Params) {
namespace := r.URL.Query().Get(string(NamespaceHeaderParameterKey))
namespace := r.URL.Query().Get(string(constants.NamespaceHeaderParameterKey))
if namespace == "" {
app.badRequestResponse(w, r, fmt.Errorf("missing required query parameter: %s", NamespaceHeaderParameterKey))
app.badRequestResponse(w, r, fmt.Errorf("missing required query parameter: %s", constants.NamespaceHeaderParameterKey))
return
}

ctx := context.WithValue(r.Context(), NamespaceHeaderParameterKey, namespace)
ctx := context.WithValue(r.Context(), constants.NamespaceHeaderParameterKey, namespace)
r = r.WithContext(ctx)

next(w, r, ps)
Expand All @@ -187,19 +172,19 @@ func (app *App) AttachNamespace(next func(http.ResponseWriter, *http.Request, ht

func (app *App) PerformSARonGetListServicesByNamespace(next func(http.ResponseWriter, *http.Request, httprouter.Params)) httprouter.Handle {
return func(w http.ResponseWriter, r *http.Request, ps httprouter.Params) {
user, ok := r.Context().Value(KubeflowUserIdKey).(string)
user, ok := r.Context().Value(constants.KubeflowUserIdKey).(string)
if !ok || user == "" {
app.badRequestResponse(w, r, fmt.Errorf("missing user in context"))
return
}
namespace, ok := r.Context().Value(NamespaceHeaderParameterKey).(string)
namespace, ok := r.Context().Value(constants.NamespaceHeaderParameterKey).(string)
if !ok || namespace == "" {
app.badRequestResponse(w, r, fmt.Errorf("missing namespace in context"))
return
}

var userGroups []string
if groups, ok := r.Context().Value(KubeflowUserGroupsKey).([]string); ok {
if groups, ok := r.Context().Value(constants.KubeflowUserGroupsKey).([]string); ok {
userGroups = groups
} else {
userGroups = []string{}
Expand All @@ -222,13 +207,13 @@ func (app *App) PerformSARonGetListServicesByNamespace(next func(http.ResponseWr
func (app *App) PerformSARonSpecificService(next func(http.ResponseWriter, *http.Request, httprouter.Params)) httprouter.Handle {
return func(w http.ResponseWriter, r *http.Request, ps httprouter.Params) {

user, ok := r.Context().Value(KubeflowUserIdKey).(string)
user, ok := r.Context().Value(constants.KubeflowUserIdKey).(string)
if !ok || user == "" {
app.badRequestResponse(w, r, fmt.Errorf("missing user in context"))
return
}

namespace, ok := r.Context().Value(NamespaceHeaderParameterKey).(string)
namespace, ok := r.Context().Value(constants.NamespaceHeaderParameterKey).(string)
if !ok || namespace == "" {
app.badRequestResponse(w, r, fmt.Errorf("missing namespace in context"))
return
Expand All @@ -241,7 +226,7 @@ func (app *App) PerformSARonSpecificService(next func(http.ResponseWriter, *http
}

var userGroups []string
if groups, ok := r.Context().Value(KubeflowUserGroupsKey).([]string); ok {
if groups, ok := r.Context().Value(constants.KubeflowUserGroupsKey).([]string); ok {
userGroups = groups
} else {
userGroups = []string{}
Expand Down
5 changes: 3 additions & 2 deletions clients/ui/bff/internal/api/model_registry_handler.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@ package api
import (
"fmt"
"github.com/julienschmidt/httprouter"
"github.com/kubeflow/model-registry/ui/bff/internal/constants"
"github.com/kubeflow/model-registry/ui/bff/internal/models"
"net/http"
)
Expand All @@ -11,12 +12,12 @@ type ModelRegistryListEnvelope Envelope[[]models.ModelRegistryModel, None]

func (app *App) ModelRegistryHandler(w http.ResponseWriter, r *http.Request, _ httprouter.Params) {

namespace, ok := r.Context().Value(NamespaceHeaderParameterKey).(string)
namespace, ok := r.Context().Value(constants.NamespaceHeaderParameterKey).(string)
if !ok || namespace == "" {
app.badRequestResponse(w, r, fmt.Errorf("missing namespace in the context"))
}

registries, err := app.repositories.ModelRegistry.GetAllModelRegistries(app.kubernetesClient, namespace)
registries, err := app.repositories.ModelRegistry.GetAllModelRegistries(r.Context(), app.kubernetesClient, namespace)
if err != nil {
app.serverErrorResponse(w, r, err)
return
Expand Down
5 changes: 4 additions & 1 deletion clients/ui/bff/internal/api/model_registry_handler_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,8 @@ import (
"context"
"encoding/json"
"fmt"
"github.com/kubeflow/model-registry/ui/bff/internal/constants"
"github.com/kubeflow/model-registry/ui/bff/internal/mocks"
"github.com/kubeflow/model-registry/ui/bff/internal/models"
"github.com/kubeflow/model-registry/ui/bff/internal/repositories"
. "github.com/onsi/ginkgo/v2"
Expand All @@ -28,7 +30,8 @@ var _ = Describe("TestModelRegistryHandler", func() {
requestPath := fmt.Sprintf(" %s?namespace=kubeflow", ModelRegistryListPath)
req, err := http.NewRequest(http.MethodGet, requestPath, nil)

ctx := context.WithValue(req.Context(), NamespaceHeaderParameterKey, "kubeflow")
ctx := mocks.NewMockSessionContext(req.Context())
ctx = context.WithValue(ctx, constants.NamespaceHeaderParameterKey, "kubeflow")
req = req.WithContext(ctx)

Expect(err).NotTo(HaveOccurred())
Expand Down
13 changes: 7 additions & 6 deletions clients/ui/bff/internal/api/model_versions_handler.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@ import (
"fmt"
"github.com/julienschmidt/httprouter"
"github.com/kubeflow/model-registry/pkg/openapi"
"github.com/kubeflow/model-registry/ui/bff/internal/constants"
"github.com/kubeflow/model-registry/ui/bff/internal/integrations"
"github.com/kubeflow/model-registry/ui/bff/internal/validation"
"net/http"
Expand All @@ -19,7 +20,7 @@ type ModelArtifactListEnvelope Envelope[*openapi.ModelArtifactList, None]
type ModelArtifactEnvelope Envelope[*openapi.ModelArtifact, None]

func (app *App) GetAllModelVersionHandler(w http.ResponseWriter, r *http.Request, ps httprouter.Params) {
client, ok := r.Context().Value(ModelRegistryHttpClientKey).(integrations.HTTPClientInterface)
client, ok := r.Context().Value(constants.ModelRegistryHttpClientKey).(integrations.HTTPClientInterface)
if !ok {
app.serverErrorResponse(w, r, errors.New("REST client not found"))
return
Expand All @@ -43,7 +44,7 @@ func (app *App) GetAllModelVersionHandler(w http.ResponseWriter, r *http.Request
}

func (app *App) GetModelVersionHandler(w http.ResponseWriter, r *http.Request, ps httprouter.Params) {
client, ok := r.Context().Value(ModelRegistryHttpClientKey).(integrations.HTTPClientInterface)
client, ok := r.Context().Value(constants.ModelRegistryHttpClientKey).(integrations.HTTPClientInterface)
if !ok {
app.serverErrorResponse(w, r, errors.New("REST client not found"))
return
Expand Down Expand Up @@ -71,7 +72,7 @@ func (app *App) GetModelVersionHandler(w http.ResponseWriter, r *http.Request, p
}

func (app *App) CreateModelVersionHandler(w http.ResponseWriter, r *http.Request, _ httprouter.Params) {
client, ok := r.Context().Value(ModelRegistryHttpClientKey).(integrations.HTTPClientInterface)
client, ok := r.Context().Value(constants.ModelRegistryHttpClientKey).(integrations.HTTPClientInterface)
if !ok {
app.serverErrorResponse(w, r, errors.New("REST client not found"))
return
Expand Down Expand Up @@ -125,7 +126,7 @@ func (app *App) CreateModelVersionHandler(w http.ResponseWriter, r *http.Request
}

func (app *App) UpdateModelVersionHandler(w http.ResponseWriter, r *http.Request, ps httprouter.Params) {
client, ok := r.Context().Value(ModelRegistryHttpClientKey).(integrations.HTTPClientInterface)
client, ok := r.Context().Value(constants.ModelRegistryHttpClientKey).(integrations.HTTPClientInterface)
if !ok {
app.serverErrorResponse(w, r, errors.New("REST client not found"))
return
Expand Down Expand Up @@ -175,7 +176,7 @@ func (app *App) UpdateModelVersionHandler(w http.ResponseWriter, r *http.Request
}

func (app *App) GetAllModelArtifactsByModelVersionHandler(w http.ResponseWriter, r *http.Request, ps httprouter.Params) {
client, ok := r.Context().Value(ModelRegistryHttpClientKey).(integrations.HTTPClientInterface)
client, ok := r.Context().Value(constants.ModelRegistryHttpClientKey).(integrations.HTTPClientInterface)
if !ok {
app.serverErrorResponse(w, r, errors.New("REST client not found"))
return
Expand All @@ -198,7 +199,7 @@ func (app *App) GetAllModelArtifactsByModelVersionHandler(w http.ResponseWriter,
}

func (app *App) CreateModelArtifactByModelVersionHandler(w http.ResponseWriter, r *http.Request, ps httprouter.Params) {
client, ok := r.Context().Value(ModelRegistryHttpClientKey).(integrations.HTTPClientInterface)
client, ok := r.Context().Value(constants.ModelRegistryHttpClientKey).(integrations.HTTPClientInterface)
if !ok {
app.serverErrorResponse(w, r, errors.New("REST client not found"))
return
Expand Down
5 changes: 3 additions & 2 deletions clients/ui/bff/internal/api/namespaces_handler.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@ package api

import (
"errors"
"github.com/kubeflow/model-registry/ui/bff/internal/constants"
"github.com/kubeflow/model-registry/ui/bff/internal/models"
"net/http"

Expand All @@ -12,14 +13,14 @@ type NamespacesEnvelope Envelope[[]models.NamespaceModel, None]

func (app *App) GetNamespacesHandler(w http.ResponseWriter, r *http.Request, _ httprouter.Params) {

userId, ok := r.Context().Value(KubeflowUserIdKey).(string)
userId, ok := r.Context().Value(constants.KubeflowUserIdKey).(string)
if !ok || userId == "" {
app.serverErrorResponse(w, r, errors.New("failed to retrieve kubeflow-userid from context"))
return
}

var userGroups []string
if groups, ok := r.Context().Value(KubeflowUserGroupsKey).([]string); ok {
if groups, ok := r.Context().Value(constants.KubeflowUserGroupsKey).([]string); ok {
userGroups = groups
} else {
userGroups = []string{}
Expand Down
7 changes: 4 additions & 3 deletions clients/ui/bff/internal/api/namespaces_handler_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@ import (
"context"
"encoding/json"
"github.com/kubeflow/model-registry/ui/bff/internal/config"
"github.com/kubeflow/model-registry/ui/bff/internal/constants"
"github.com/kubeflow/model-registry/ui/bff/internal/mocks"
"github.com/kubeflow/model-registry/ui/bff/internal/models"
"github.com/kubeflow/model-registry/ui/bff/internal/repositories"
Expand Down Expand Up @@ -31,7 +32,7 @@ var _ = Describe("TestNamespacesHandler", func() {
It("should return only dora-namespace for [email protected]", func() {
By("creating the HTTP request with the kubeflow-userid header")
req, err := http.NewRequest(http.MethodGet, NamespaceListPath, nil)
ctx := context.WithValue(req.Context(), KubeflowUserIdKey, mocks.DoraNonAdminUser)
ctx := context.WithValue(req.Context(), constants.KubeflowUserIdKey, mocks.DoraNonAdminUser)
req = req.WithContext(ctx)
Expect(err).NotTo(HaveOccurred())
rr := httptest.NewRecorder()
Expand All @@ -57,7 +58,7 @@ var _ = Describe("TestNamespacesHandler", func() {
It("should return all namespaces for [email protected]", func() {
By("creating the HTTP request with the kubeflow-userid header")
req, err := http.NewRequest(http.MethodGet, NamespaceListPath, nil)
ctx := context.WithValue(req.Context(), KubeflowUserIdKey, mocks.KubeflowUserIDHeaderValue)
ctx := context.WithValue(req.Context(), constants.KubeflowUserIdKey, mocks.KubeflowUserIDHeaderValue)
req = req.WithContext(ctx)
Expect(err).NotTo(HaveOccurred())
req.Header.Set("kubeflow-userid", "[email protected]")
Expand Down Expand Up @@ -87,7 +88,7 @@ var _ = Describe("TestNamespacesHandler", func() {
It("should return no namespaces for non-existent user", func() {
By("creating the HTTP request with a non-existent kubeflow-userid")
req, err := http.NewRequest(http.MethodGet, NamespaceListPath, nil)
ctx := context.WithValue(req.Context(), KubeflowUserIdKey, "[email protected]")
ctx := context.WithValue(req.Context(), constants.KubeflowUserIdKey, "[email protected]")
req = req.WithContext(ctx)
Expect(err).NotTo(HaveOccurred())
rr := httptest.NewRecorder()
Expand Down
Loading

0 comments on commit 350d705

Please sign in to comment.