diff --git a/components/camera/camera.go b/components/camera/camera.go index 71f9b4e25fc..7c97ad42849 100644 --- a/components/camera/camera.go +++ b/components/camera/camera.go @@ -9,6 +9,7 @@ import ( "fmt" "image" + "github.com/pion/mediadevices/pkg/prop" "github.com/pkg/errors" pb "go.viam.com/api/component/camera/v1" @@ -78,12 +79,6 @@ type ImageMetadata struct { } // A Camera is a resource that can capture frames. -type Camera interface { - resource.Resource - VideoSource -} - -// VideoSource represents anything that can capture frames. // For more information, see the [camera component docs]. // // Image example: @@ -102,17 +97,6 @@ type Camera interface { // // images, metadata, err := myCamera.Images(context.Background()) // -// Stream example: -// -// myCamera, err := camera.FromRobot(machine, "my_camera") -// -// // gets the stream from a camera -// stream, err := myCamera.Stream(context.Background()) -// -// // gets an image from the camera stream -// img, release, err := stream.Next(context.Background()) -// defer release() -// // NextPointCloud example: // // myCamera, err := camera.FromRobot(machine, "my_camera") @@ -127,7 +111,8 @@ type Camera interface { // err = myCamera.Close(context.Background()) // // [camera component docs]: https://docs.viam.com/components/camera/ -type VideoSource interface { +type Camera interface { + resource.Resource // Image returns a byte slice representing an image that tries to adhere to the MIME type hint. // Image also may return metadata about the frame. Image(ctx context.Context, mimeType string, extra map[string]interface{}) ([]byte, ImageMetadata, error) @@ -136,10 +121,6 @@ type VideoSource interface { // along with associated metadata (just timestamp for now). It's not for getting a time series of images from the same imager. 
Images(ctx context.Context) ([]NamedImage, resource.ResponseMetadata, error) - // Stream returns a stream that makes a best effort to return consecutive images - // that may have a MIME type hint dictated in the context via gostream.WithMIMETypeHint. - Stream(ctx context.Context, errHandlers ...gostream.ErrorHandler) (gostream.VideoStream, error) - // NextPointCloud returns the next immediately available point cloud, not necessarily one // a part of a sequence. In the future, there could be streaming of point clouds. NextPointCloud(ctx context.Context) (pointcloud.PointCloud, error) @@ -147,9 +128,13 @@ type VideoSource interface { // Properties returns properties that are intrinsic to the particular // implementation of a camera. Properties(ctx context.Context) (Properties, error) +} - // Close shuts down the resource and prevents further use. - Close(ctx context.Context) error +// StreamCamera is a camera that has `Stream` embedded to directly integrate with gostream. +// Note that generally, when writing camera components from scratch, embedding `Stream` is an anti-pattern. +type StreamCamera interface { + Camera + Stream(ctx context.Context, errHandlers ...gostream.ErrorHandler) (gostream.VideoStream, error) } // ReadImage reads an image from the given source that is immediately available. @@ -173,6 +158,25 @@ func DecodeImageFromCamera(ctx context.Context, mimeType string, extra map[strin return img, nil } +// VideoSourceFromCamera converts a camera resource into a gostream VideoSource. 
+func VideoSourceFromCamera(ctx context.Context, cam Camera) gostream.VideoSource { + reader := gostream.VideoReaderFunc(func(ctx context.Context) (image.Image, func(), error) { + img, err := DecodeImageFromCamera(ctx, "", nil, cam) + if err != nil { + return nil, func() {}, err + } + return img, func() {}, nil + }) + camProps, err := cam.Properties(ctx) + if err != nil { + camProps = Properties{} + } + if camProps.IntrinsicParams == nil { + return gostream.NewVideoSource(reader, prop.Video{Width: 0, Height: 0}) + } + return gostream.NewVideoSource(reader, prop.Video{Width: camProps.IntrinsicParams.Width, Height: camProps.IntrinsicParams.Height}) +} + // A PointCloudSource is a source that can generate pointclouds. type PointCloudSource interface { NextPointCloud(ctx context.Context) (pointcloud.PointCloud, error) diff --git a/components/camera/camera_test.go b/components/camera/camera_test.go index 43c5951548f..922cce0a154 100644 --- a/components/camera/camera_test.go +++ b/components/camera/camera_test.go @@ -10,7 +10,6 @@ import ( "go.viam.com/utils/artifact" "go.viam.com/rdk/components/camera" - "go.viam.com/rdk/gostream" "go.viam.com/rdk/logging" "go.viam.com/rdk/pointcloud" "go.viam.com/rdk/resource" @@ -186,18 +185,6 @@ func TestCameraWithNoProjector(t *testing.T) { _, got := pc.At(0, 0, 0) test.That(t, got, test.ShouldBeTrue) - // TODO(hexbabe): remove below test when Stream is refactored - t.Run("ReadImage depth map without projector", func(t *testing.T) { - img, _, err := camera.ReadImage( - gostream.WithMIMETypeHint(context.Background(), rutils.WithLazyMIMEType(rutils.MimeTypePNG)), - noProj2) - test.That(t, err, test.ShouldBeNil) - depthImg := img.(*rimage.DepthMap) - test.That(t, err, test.ShouldBeNil) - test.That(t, depthImg.Bounds().Dx(), test.ShouldEqual, 1280) - test.That(t, depthImg.Bounds().Dy(), test.ShouldEqual, 720) - }) - img, err := camera.DecodeImageFromCamera(context.Background(), rutils.WithLazyMIMEType(rutils.MimeTypePNG), nil, noProj2) 
test.That(t, err, test.ShouldBeNil) @@ -247,26 +234,6 @@ func TestCameraWithProjector(t *testing.T) { _, got := pc.At(0, 0, 0) test.That(t, got, test.ShouldBeTrue) - // TODO(hexbabe): remove below test when Stream/ReadImage pattern is refactored - t.Run("ReadImage depth map with projector", func(t *testing.T) { - img, _, err := camera.ReadImage( - gostream.WithMIMETypeHint(context.Background(), rutils.MimeTypePNG), - cam2) - test.That(t, err, test.ShouldBeNil) - - depthImg := img.(*rimage.DepthMap) - test.That(t, err, test.ShouldBeNil) - test.That(t, depthImg.Bounds().Dx(), test.ShouldEqual, 1280) - test.That(t, depthImg.Bounds().Dy(), test.ShouldEqual, 720) - // cam2 should implement a default GetImages, that just returns the one image - images, _, err := cam2.Images(context.Background()) - test.That(t, err, test.ShouldBeNil) - test.That(t, len(images), test.ShouldEqual, 1) - test.That(t, images[0].Image, test.ShouldHaveSameTypeAs, &rimage.DepthMap{}) - test.That(t, images[0].Image.Bounds().Dx(), test.ShouldEqual, 1280) - test.That(t, images[0].Image.Bounds().Dy(), test.ShouldEqual, 720) - }) - img, err := camera.DecodeImageFromCamera(context.Background(), rutils.MimeTypePNG, nil, cam2) test.That(t, err, test.ShouldBeNil) diff --git a/components/camera/client_test.go b/components/camera/client_test.go index 45a23e36155..fde61c2622c 100644 --- a/components/camera/client_test.go +++ b/components/camera/client_test.go @@ -178,25 +178,6 @@ func TestClient(t *testing.T) { test.That(t, err, test.ShouldBeNil) camera1Client, err := camera.NewClientFromConn(context.Background(), conn, "", camera.Named(testCameraName), logger) test.That(t, err, test.ShouldBeNil) - // TODO(hexbabe): remove below test when Stream/ReadImage pattern is refactored - t.Run("ReadImage from camera client 1", func(t *testing.T) { - // Test Stream and Next - ctx := gostream.WithMIMETypeHint(context.Background(), rutils.MimeTypeRawRGBA) - stream, err := camera1Client.Stream(ctx) - test.That(t, err, 
test.ShouldBeNil) - frame, _, err := stream.Next(ctx) - test.That(t, err, test.ShouldBeNil) - compVal, _, err := rimage.CompareImages(img, frame) - test.That(t, err, test.ShouldBeNil) - test.That(t, compVal, test.ShouldEqual, 0) // exact copy, no color conversion - - // Test ReadImage - frame, _, err = camera.ReadImage(ctx, camera1Client) - test.That(t, err, test.ShouldBeNil) - compVal, _, err = rimage.CompareImages(img, frame) - test.That(t, err, test.ShouldBeNil) - test.That(t, compVal, test.ShouldEqual, 0) - }) frame, err := camera.DecodeImageFromCamera(context.Background(), rutils.MimeTypeRawRGBA, nil, camera1Client) test.That(t, err, test.ShouldBeNil) compVal, _, err := rimage.CompareImages(img, frame) @@ -246,17 +227,6 @@ func TestClient(t *testing.T) { client, err := resourceAPI.RPCClient(context.Background(), conn, "", camera.Named(depthCameraName), logger) test.That(t, err, test.ShouldBeNil) - // TODO(hexbabe): remove below test when Stream/ReadImage pattern is refactored - t.Run("ReadImage from camera depth", func(t *testing.T) { - ctx := gostream.WithMIMETypeHint( - context.Background(), rutils.WithLazyMIMEType(rutils.MimeTypePNG)) - frame, _, err := camera.ReadImage(ctx, client) - test.That(t, err, test.ShouldBeNil) - dm, err := rimage.ConvertImageToDepthMap(context.Background(), frame) - test.That(t, err, test.ShouldBeNil) - test.That(t, dm, test.ShouldResemble, depthImg) - }) - ctx := context.Background() frame, err := camera.DecodeImageFromCamera(ctx, rutils.WithLazyMIMEType(rutils.MimeTypePNG), nil, client) test.That(t, err, test.ShouldBeNil) @@ -274,10 +244,6 @@ func TestClient(t *testing.T) { client2, err := resourceAPI.RPCClient(context.Background(), conn, "", camera.Named(failCameraName), logger) test.That(t, err, test.ShouldBeNil) - _, _, err = camera.ReadImage(context.Background(), client2) - test.That(t, err, test.ShouldNotBeNil) - test.That(t, err.Error(), test.ShouldContainSubstring, errGetImageFailed.Error()) - _, _, err = 
client2.Image(context.Background(), "", nil) test.That(t, err, test.ShouldNotBeNil) test.That(t, err.Error(), test.ShouldContainSubstring, errGetImageFailed.Error()) @@ -305,12 +271,6 @@ func TestClient(t *testing.T) { } ctx := context.Background() - // TODO(hexbabe): remove below test when Stream/ReadImage pattern is refactored - t.Run("ReadImage test failure", func(t *testing.T) { - _, _, err = camera.ReadImage(ctx, camClient) - test.That(t, err, test.ShouldNotBeNil) - test.That(t, err.Error(), test.ShouldContainSubstring, errGetImageFailed.Error()) - }) _, _, err = camClient.Image(ctx, "", nil) test.That(t, err, test.ShouldNotBeNil) test.That(t, err.Error(), test.ShouldContainSubstring, errGetImageFailed.Error()) @@ -509,17 +469,6 @@ func TestClientLazyImage(t *testing.T) { camera1Client, err := camera.NewClientFromConn(context.Background(), conn, "", camera.Named(testCameraName), logger) test.That(t, err, test.ShouldBeNil) - // TODO(hexbabe): remove below test when Stream/ReadImage pattern is refactored - t.Run("lazily decode from ReadImage without lazy suffix", func(t *testing.T) { - ctx := gostream.WithMIMETypeHint(context.Background(), rutils.MimeTypePNG) - frame, _, err := camera.ReadImage(ctx, camera1Client) - test.That(t, err, test.ShouldBeNil) - // Should always lazily decode - test.That(t, frame, test.ShouldHaveSameTypeAs, &rimage.LazyEncodedImage{}) - frameLazy := frame.(*rimage.LazyEncodedImage) - test.That(t, frameLazy.RawData(), test.ShouldResemble, imgBuf.Bytes()) - }) - ctx := context.Background() frame, err := camera.DecodeImageFromCamera(ctx, rutils.MimeTypePNG, nil, camera1Client) test.That(t, err, test.ShouldBeNil) @@ -528,20 +477,6 @@ func TestClientLazyImage(t *testing.T) { frameLazy := frame.(*rimage.LazyEncodedImage) test.That(t, frameLazy.RawData(), test.ShouldResemble, imgBuf.Bytes()) - // TODO(hexbabe): remove below test when Stream/ReadImage pattern is refactored - t.Run("lazily decode from ReadImage", func(t *testing.T) { - ctx = 
gostream.WithMIMETypeHint(context.Background(), rutils.WithLazyMIMEType(rutils.MimeTypePNG)) - frame, _, err = camera.ReadImage(ctx, camera1Client) - test.That(t, err, test.ShouldBeNil) - test.That(t, frame, test.ShouldHaveSameTypeAs, &rimage.LazyEncodedImage{}) - frameLazy = frame.(*rimage.LazyEncodedImage) - test.That(t, frameLazy.RawData(), test.ShouldResemble, imgBuf.Bytes()) - test.That(t, frameLazy.MIMEType(), test.ShouldEqual, rutils.MimeTypePNG) - compVal, _, err := rimage.CompareImages(img, frame) - test.That(t, err, test.ShouldBeNil) - test.That(t, compVal, test.ShouldEqual, 0) // exact copy, no color conversion - }) - ctx = context.Background() frame, err = camera.DecodeImageFromCamera(ctx, rutils.WithLazyMIMEType(rutils.MimeTypePNG), nil, camera1Client) test.That(t, err, test.ShouldBeNil) @@ -619,78 +554,6 @@ func TestClientWithInterceptor(t *testing.T) { test.That(t, conn.Close(), test.ShouldBeNil) } -func TestClientStreamAfterClose(t *testing.T) { - // Set up gRPC server - logger := logging.NewTestLogger(t) - listener, err := net.Listen("tcp", "localhost:0") - test.That(t, err, test.ShouldBeNil) - rpcServer, err := rpc.NewServer(logger, rpc.WithUnauthenticated()) - test.That(t, err, test.ShouldBeNil) - - // Set up camera that can stream images - img := image.NewNRGBA(image.Rect(0, 0, 4, 4)) - injectCamera := &inject.Camera{} - injectCamera.PropertiesFunc = func(ctx context.Context) (camera.Properties, error) { - return camera.Properties{}, nil - } - injectCamera.ImageFunc = func(ctx context.Context, mimeType string, extra map[string]interface{}) ([]byte, camera.ImageMetadata, error) { - imgBytes, err := rimage.EncodeImage(ctx, img, mimeType) - test.That(t, err, test.ShouldBeNil) - return imgBytes, camera.ImageMetadata{MimeType: mimeType}, nil - } - - // Register CameraService API in our gRPC server. 
- resources := map[resource.Name]camera.Camera{ - camera.Named(testCameraName): injectCamera, - } - cameraSvc, err := resource.NewAPIResourceCollection(camera.API, resources) - test.That(t, err, test.ShouldBeNil) - resourceAPI, ok, err := resource.LookupAPIRegistration[camera.Camera](camera.API) - test.That(t, err, test.ShouldBeNil) - test.That(t, ok, test.ShouldBeTrue) - test.That(t, resourceAPI.RegisterRPCService(context.Background(), rpcServer, cameraSvc), test.ShouldBeNil) - - // Start serving requests. - go rpcServer.Serve(listener) - defer rpcServer.Stop() - - // Make client connection - conn, err := viamgrpc.Dial(context.Background(), listener.Addr().String(), logger) - test.That(t, err, test.ShouldBeNil) - client, err := camera.NewClientFromConn(context.Background(), conn, "", camera.Named(testCameraName), logger) - test.That(t, err, test.ShouldBeNil) - - // Get a stream - stream, err := client.Stream(context.Background()) - test.That(t, stream, test.ShouldNotBeNil) - test.That(t, err, test.ShouldBeNil) - - // Read from stream - media, _, err := stream.Next(context.Background()) - test.That(t, media, test.ShouldNotBeNil) - test.That(t, err, test.ShouldBeNil) - - // Close client and read from stream - test.That(t, client.Close(context.Background()), test.ShouldBeNil) - media, _, err = stream.Next(context.Background()) - test.That(t, media, test.ShouldBeNil) - test.That(t, err.Error(), test.ShouldContainSubstring, "context canceled") - - // Get a new stream - stream, err = client.Stream(context.Background()) - test.That(t, stream, test.ShouldNotBeNil) - test.That(t, err, test.ShouldBeNil) - - // Read from the new stream - media, _, err = stream.Next(context.Background()) - test.That(t, media, test.ShouldNotBeNil) - test.That(t, err, test.ShouldBeNil) - - // Close client and connection - test.That(t, client.Close(context.Background()), test.ShouldBeNil) - test.That(t, conn.Close(), test.ShouldBeNil) -} - // See modmanager_test.go for the happy path (aka, when 
the // client has a webrtc connection). func TestRTPPassthroughWithoutWebRTC(t *testing.T) { diff --git a/components/camera/fake/camera_test.go b/components/camera/fake/camera_test.go index 06e766baef4..4d14c1ca7ba 100644 --- a/components/camera/fake/camera_test.go +++ b/components/camera/fake/camera_test.go @@ -94,20 +94,6 @@ func TestRTPPassthrough(t *testing.T) { cam, err := NewCamera(context.Background(), nil, cfg, logger) test.That(t, err, test.ShouldBeNil) - // TODO(hexbabe): remove below test when Stream/ReadImage pattern is refactored - t.Run("test Stream and Next", func(t *testing.T) { - camera, err := NewCamera(context.Background(), nil, cfg, logger) - test.That(t, err, test.ShouldBeNil) - - stream, err := camera.Stream(context.Background()) - test.That(t, err, test.ShouldBeNil) - img, _, err := stream.Next(context.Background()) - test.That(t, err, test.ShouldBeNil) - // GetImage returns the world jpeg - test.That(t, img.Bounds(), test.ShouldResemble, image.Rectangle{Max: image.Point{X: 480, Y: 270}}) - test.That(t, camera, test.ShouldNotBeNil) - }) - img, err := camera.DecodeImageFromCamera(context.Background(), utils.MimeTypeRawRGBA, nil, cam) test.That(t, err, test.ShouldBeNil) // GetImage returns the world jpeg diff --git a/components/camera/fake/image_file_test.go b/components/camera/fake/image_file_test.go index c1415b5265a..86fbe1ff325 100644 --- a/components/camera/fake/image_file_test.go +++ b/components/camera/fake/image_file_test.go @@ -27,10 +27,6 @@ func TestPCD(t *testing.T) { cam, err := newCamera(ctx, resource.Name{API: camera.API}, cfg, logger) test.That(t, err, test.ShouldBeNil) - // TODO(hexbabe): remove below test when Stream/ReadImage pattern is refactored - _, err = cam.Stream(ctx) - test.That(t, err, test.ShouldBeNil) - pc, err := cam.NextPointCloud(ctx) test.That(t, err, test.ShouldBeNil) test.That(t, pc.Size(), test.ShouldEqual, 628) @@ -50,16 +46,6 @@ func TestPCD(t *testing.T) { readInImage, err := 
rimage.NewImageFromFile(artifact.MustPath("vision/objectdetection/detection_test.jpg")) test.That(t, err, test.ShouldBeNil) - // TODO(hexbabe): remove below test when Stream/ReadImage pattern is refactored - t.Run("test Stream and Next", func(t *testing.T) { - stream, err := cam.Stream(ctx) - test.That(t, err, test.ShouldBeNil) - strmImg, _, err := stream.Next(ctx) - test.That(t, err, test.ShouldBeNil) - test.That(t, strmImg, test.ShouldResemble, readInImage) - test.That(t, strmImg.Bounds(), test.ShouldResemble, readInImage.Bounds()) - }) - imgBytes, _, err := cam.Image(ctx, utils.MimeTypeJPEG, nil) test.That(t, err, test.ShouldBeNil) expectedBytes, err := rimage.EncodeImage(ctx, readInImage, utils.MimeTypeJPEG) @@ -84,16 +70,6 @@ func TestColor(t *testing.T) { readInImage, err := rimage.NewImageFromFile(artifact.MustPath("vision/objectdetection/detection_test.jpg")) test.That(t, err, test.ShouldBeNil) - // TODO(hexbabe): remove below test when Stream/ReadImage pattern is refactored - t.Run("test Stream and Next", func(t *testing.T) { - stream, err := cam.Stream(ctx) - test.That(t, err, test.ShouldBeNil) - strmImg, _, err := stream.Next(ctx) - test.That(t, err, test.ShouldBeNil) - test.That(t, strmImg, test.ShouldResemble, readInImage) - test.That(t, strmImg.Bounds(), test.ShouldResemble, readInImage.Bounds()) - }) - imgBytes, _, err := cam.Image(ctx, utils.MimeTypeJPEG, nil) test.That(t, err, test.ShouldBeNil) expectedBytes, err := rimage.EncodeImage(ctx, readInImage, utils.MimeTypeJPEG) @@ -126,21 +102,6 @@ func TestColorOddResolution(t *testing.T) { cam, err := newCamera(ctx, resource.Name{API: camera.API}, cfg, logger) test.That(t, err, test.ShouldBeNil) - // TODO(hexbabe): remove below test when Stream/ReadImage pattern is refactored - t.Run("test Stream and Next", func(t *testing.T) { - stream, err := cam.Stream(ctx) - test.That(t, err, test.ShouldBeNil) - - readInImage, err := rimage.NewImageFromFile(imgFilePath) - test.That(t, err, test.ShouldBeNil) - - 
strmImg, _, err := stream.Next(ctx) - test.That(t, err, test.ShouldBeNil) - - expectedBounds := image.Rect(0, 0, readInImage.Bounds().Dx()-1, readInImage.Bounds().Dy()-1) - test.That(t, strmImg, test.ShouldResemble, readInImage.SubImage(expectedBounds)) - }) - strmImg, err := camera.DecodeImageFromCamera(ctx, utils.MimeTypeRawRGBA, nil, cam) test.That(t, err, test.ShouldBeNil) diff --git a/components/camera/ffmpeg/ffmpeg.go b/components/camera/ffmpeg/ffmpeg.go index 8291e404354..90988ab9091 100644 --- a/components/camera/ffmpeg/ffmpeg.go +++ b/components/camera/ffmpeg/ffmpeg.go @@ -93,7 +93,7 @@ func (writer stderrWriter) Write(p []byte) (n int, err error) { } // NewFFMPEGCamera instantiates a new camera which leverages ffmpeg to handle a variety of potential video types. -func NewFFMPEGCamera(ctx context.Context, conf *Config, logger logging.Logger) (camera.VideoSource, error) { +func NewFFMPEGCamera(ctx context.Context, conf *Config, logger logging.Logger) (camera.StreamCamera, error) { // make sure ffmpeg is in the path before doing anything else if _, err := exec.LookPath("ffmpeg"); err != nil { return nil, err diff --git a/components/camera/ffmpeg/ffmpeg_test.go b/components/camera/ffmpeg/ffmpeg_test.go index f8f9fb5f626..87a8c66b3ab 100644 --- a/components/camera/ffmpeg/ffmpeg_test.go +++ b/components/camera/ffmpeg/ffmpeg_test.go @@ -8,6 +8,7 @@ import ( "go.viam.com/test" "go.viam.com/utils/artifact" + "go.viam.com/rdk/components/camera" "go.viam.com/rdk/logging" "go.viam.com/rdk/utils" ) @@ -18,16 +19,15 @@ func TestFFMPEGCamera(t *testing.T) { path := artifact.MustPath("components/camera/ffmpeg/testsrc.mpg") cam, err := NewFFMPEGCamera(ctx, &Config{VideoPath: path}, logger) test.That(t, err, test.ShouldBeNil) - // TODO(hexbabe): remove below test when Stream/ReadImage pattern is refactored - stream, err := cam.Stream(ctx) test.That(t, err, test.ShouldBeNil) for i := 0; i < 5; i++ { - _, _, err := stream.Next(ctx) + _, err := 
camera.DecodeImageFromCamera(ctx, utils.MimeTypeJPEG, nil, cam) + test.That(t, err, test.ShouldBeNil) + _, _, err = cam.Image(ctx, utils.MimeTypeJPEG, nil) test.That(t, err, test.ShouldBeNil) _, _, err = cam.Image(ctx, utils.MimeTypeJPEG, nil) test.That(t, err, test.ShouldBeNil) } - test.That(t, stream.Close(context.Background()), test.ShouldBeNil) test.That(t, cam.Close(context.Background()), test.ShouldBeNil) } diff --git a/components/camera/replaypcd/replaypcd_test.go b/components/camera/replaypcd/replaypcd_test.go index 90b397edc21..39a60203010 100644 --- a/components/camera/replaypcd/replaypcd_test.go +++ b/components/camera/replaypcd/replaypcd_test.go @@ -660,11 +660,6 @@ func TestReplayPCDUnimplementedFunctions(t *testing.T) { replayCamera, _, serverClose, err := createNewReplayPCDCamera(ctx, t, replayCamCfg, true) test.That(t, err, test.ShouldBeNil) - t.Run("Stream", func(t *testing.T) { - _, err := replayCamera.Stream(ctx, nil) - test.That(t, err.Error(), test.ShouldEqual, "Stream is unimplemented") - }) - err = replayCamera.Close(ctx) test.That(t, err, test.ShouldBeNil) diff --git a/components/camera/server.go b/components/camera/server.go index a38e68e9f5a..d14d8127d4e 100644 --- a/components/camera/server.go +++ b/components/camera/server.go @@ -33,7 +33,11 @@ type serviceServer struct { func NewRPCServiceServer(coll resource.APIResourceCollection[Camera]) interface{} { logger := logging.NewLogger("camserver") imgTypes := make(map[string]ImageType) - return &serviceServer{coll: coll, logger: logger, imgTypes: imgTypes} + return &serviceServer{ + coll: coll, + logger: logger, + imgTypes: imgTypes, + } } // GetImage returns an image from a camera of the underlying robot. 
If a specific MIME type diff --git a/components/camera/transformpipeline/classifier.go b/components/camera/transformpipeline/classifier.go index 1a13cec9e59..d0c60442481 100644 --- a/components/camera/transformpipeline/classifier.go +++ b/components/camera/transformpipeline/classifier.go @@ -27,7 +27,7 @@ type classifierConfig struct { // classifierSource takes an image from the camera, and overlays labels from the classifier. type classifierSource struct { - src camera.VideoSource + src camera.StreamCamera classifierName string maxClassifications uint32 labelFilter classification.Postprocessor @@ -37,8 +37,8 @@ type classifierSource struct { func newClassificationsTransform( ctx context.Context, - source camera.VideoSource, r robot.Robot, am utils.AttributeMap, -) (camera.VideoSource, camera.ImageType, error) { + source camera.StreamCamera, r robot.Robot, am utils.AttributeMap, +) (camera.StreamCamera, camera.ImageType, error) { conf, err := resource.TransformAttributeMap[*classifierConfig](am) if err != nil { return nil, camera.UnspecifiedStream, err diff --git a/components/camera/transformpipeline/classifier_test.go b/components/camera/transformpipeline/classifier_test.go index 7cb212f139a..fd8666e9451 100644 --- a/components/camera/transformpipeline/classifier_test.go +++ b/components/camera/transformpipeline/classifier_test.go @@ -82,7 +82,6 @@ func buildRobotWithClassifier(logger logging.Logger) (robot.Robot, error) { return r, nil } -//nolint:dupl func TestClassifierSource(t *testing.T) { logger := logging.NewTestLogger(t) ctx, cancel := context.WithCancel(context.Background()) @@ -99,7 +98,9 @@ func TestClassifierSource(t *testing.T) { test.That(t, err, test.ShouldBeNil) defer classifier.Close(ctx) - resImg, _, err := camera.ReadImage(ctx, classifier) + streamClassifier, ok := classifier.(camera.StreamCamera) + test.That(t, ok, test.ShouldBeTrue) + resImg, _, err := camera.ReadImage(ctx, streamClassifier) test.That(t, err, test.ShouldBeNil) ovImg := 
rimage.ConvertImage(resImg) diff --git a/components/camera/transformpipeline/composed.go b/components/camera/transformpipeline/composed.go index 9b9811132f0..4d1a2bc3b25 100644 --- a/components/camera/transformpipeline/composed.go +++ b/components/camera/transformpipeline/composed.go @@ -18,28 +18,25 @@ import ( // depthToPretty takes a depth image and turns into a colorful image, with blue being // farther away, and red being closest. Actual depth information is lost in the transform. type depthToPretty struct { - src camera.VideoSource + src camera.StreamCamera cameraModel *transform.PinholeCameraModel } -func propsFromVideoSource(ctx context.Context, source camera.VideoSource) (camera.Properties, error) { +func propsFromVideoSource(ctx context.Context, source camera.Camera) (camera.Properties, error) { var camProps camera.Properties - - if cameraSrc, ok := source.(camera.Camera); ok { - props, err := cameraSrc.Properties(ctx) - if err != nil { - return camProps, err - } - camProps = props + props, err := source.Properties(ctx) + if err != nil { + return camProps, err } + camProps = props return camProps, nil } func newDepthToPrettyTransform( ctx context.Context, - source camera.VideoSource, + source camera.StreamCamera, stream camera.ImageType, -) (camera.VideoSource, camera.ImageType, error) { +) (camera.StreamCamera, camera.ImageType, error) { if stream != camera.DepthStream { return nil, camera.UnspecifiedStream, errors.Errorf("source has stream type %s, depth_to_pretty only supports depth stream inputs", stream) @@ -109,16 +106,16 @@ type overlayConfig struct { // overlaySource overlays the depth and color 2D images in order to debug the alignment of the two images. 
type overlaySource struct { - src camera.VideoSource + src camera.StreamCamera cameraModel *transform.PinholeCameraModel } func newOverlayTransform( ctx context.Context, - src camera.VideoSource, + src camera.StreamCamera, stream camera.ImageType, am utils.AttributeMap, -) (camera.VideoSource, camera.ImageType, error) { +) (camera.StreamCamera, camera.ImageType, error) { conf, err := resource.TransformAttributeMap[*overlayConfig](am) if err != nil { return nil, camera.UnspecifiedStream, err diff --git a/components/camera/transformpipeline/composed_test.go b/components/camera/transformpipeline/composed_test.go index bf288cf938e..ff24b83fb4b 100644 --- a/components/camera/transformpipeline/composed_test.go +++ b/components/camera/transformpipeline/composed_test.go @@ -63,22 +63,23 @@ func TestComposed(t *testing.T) { test.That(t, err, test.ShouldBeNil) test.That(t, pc.Size(), test.ShouldEqual, 1) - myOverlay, stream, err := newOverlayTransform(context.Background(), cloudSource, camera.ColorStream, am) + streamedCloudSource := streamCameraFromCamera(context.Background(), cloudSource) + myOverlay, stream, err := newOverlayTransform(context.Background(), streamedCloudSource, camera.ColorStream, am) test.That(t, err, test.ShouldBeNil) test.That(t, stream, test.ShouldEqual, camera.ColorStream) pic, _, err := camera.ReadImage(context.Background(), myOverlay) test.That(t, err, test.ShouldBeNil) test.That(t, pic.Bounds(), test.ShouldResemble, image.Rect(0, 0, 1280, 720)) - myPipeline, err := newTransformPipeline(context.Background(), cloudSource, conf, robot, logger) + myPipeline, err := newTransformPipeline(context.Background(), streamedCloudSource, nil, conf, robot, logger) test.That(t, err, test.ShouldBeNil) defer myPipeline.Close(context.Background()) - pic, _, err = camera.ReadImage(context.Background(), myPipeline) + pic, err = camera.DecodeImageFromCamera(context.Background(), utils.MimeTypeJPEG, nil, myPipeline) test.That(t, err, test.ShouldBeNil) test.That(t, 
pic.Bounds(), test.ShouldResemble, image.Rect(0, 0, 1280, 720)) // wrong result with bad config - _, _, err = newOverlayTransform(context.Background(), cloudSource, camera.ColorStream, utils.AttributeMap{}) + _, _, err = newOverlayTransform(context.Background(), streamedCloudSource, camera.ColorStream, utils.AttributeMap{}) test.That(t, err, test.ShouldNotBeNil) test.That(t, err, test.ShouldWrap, transform.ErrNoIntrinsics) // wrong config, still no intrinsics @@ -88,7 +89,7 @@ func TestComposed(t *testing.T) { Height: 720, }, } - _, _, err = newOverlayTransform(context.Background(), cloudSource, camera.ColorStream, am) + _, _, err = newOverlayTransform(context.Background(), streamedCloudSource, camera.ColorStream, am) test.That(t, err, test.ShouldNotBeNil) test.That(t, err, test.ShouldWrap, transform.ErrNoIntrinsics) // wrong config, no attributes @@ -99,7 +100,7 @@ func TestComposed(t *testing.T) { }, }, } - _, err = newTransformPipeline(context.Background(), cloudSource, conf, robot, logger) + _, err = newTransformPipeline(context.Background(), streamedCloudSource, nil, conf, robot, logger) test.That(t, err, test.ShouldNotBeNil) test.That(t, err, test.ShouldWrap, transform.ErrNoIntrinsics) } diff --git a/components/camera/transformpipeline/depth_edges.go b/components/camera/transformpipeline/depth_edges.go index 841c2809cb3..61b4e8dc24b 100644 --- a/components/camera/transformpipeline/depth_edges.go +++ b/components/camera/transformpipeline/depth_edges.go @@ -22,13 +22,13 @@ type depthEdgesConfig struct { // depthEdgesSource applies a Canny Edge Detector to the depth map. 
type depthEdgesSource struct { - src camera.VideoSource + src camera.StreamCamera detector *rimage.CannyEdgeDetector blurRadius float64 } -func newDepthEdgesTransform(ctx context.Context, source camera.VideoSource, am utils.AttributeMap, -) (camera.VideoSource, camera.ImageType, error) { +func newDepthEdgesTransform(ctx context.Context, source camera.StreamCamera, am utils.AttributeMap, +) (camera.StreamCamera, camera.ImageType, error) { conf, err := resource.TransformAttributeMap[*depthEdgesConfig](am) if err != nil { return nil, camera.UnspecifiedStream, err diff --git a/components/camera/transformpipeline/detector.go b/components/camera/transformpipeline/detector.go index 0728da400ea..23d50a35366 100644 --- a/components/camera/transformpipeline/detector.go +++ b/components/camera/transformpipeline/detector.go @@ -26,7 +26,7 @@ type detectorConfig struct { // detectorSource takes an image from the camera, and overlays the detections from the detector. type detectorSource struct { - src camera.VideoSource + src camera.StreamCamera detectorName string labelFilter objectdetection.Postprocessor // must build from ValidLabels confFilter objectdetection.Postprocessor @@ -35,10 +35,10 @@ type detectorSource struct { func newDetectionsTransform( ctx context.Context, - source camera.VideoSource, + source camera.StreamCamera, r robot.Robot, am utils.AttributeMap, -) (camera.VideoSource, camera.ImageType, error) { +) (camera.StreamCamera, camera.ImageType, error) { conf, err := resource.TransformAttributeMap[*detectorConfig](am) if err != nil { return nil, camera.UnspecifiedStream, err diff --git a/components/camera/transformpipeline/detector_test.go b/components/camera/transformpipeline/detector_test.go index 1b24e217aca..334e9a57469 100644 --- a/components/camera/transformpipeline/detector_test.go +++ b/components/camera/transformpipeline/detector_test.go @@ -104,7 +104,6 @@ func buildRobotWithFakeCamera(logger logging.Logger) (robot.Robot, error) { return 
robotimpl.RobotFromConfigPath(context.Background(), newConfFile, logger) } -//nolint:dupl func TestColorDetectionSource(t *testing.T) { logger := logging.NewTestLogger(t) ctx, cancel := context.WithCancel(context.Background()) @@ -121,7 +120,7 @@ func TestColorDetectionSource(t *testing.T) { test.That(t, err, test.ShouldBeNil) defer detector.Close(ctx) - resImg, _, err := camera.ReadImage(ctx, detector) + resImg, err := camera.DecodeImageFromCamera(ctx, rutils.MimeTypePNG, nil, detector) test.That(t, err, test.ShouldBeNil) ovImg := rimage.ConvertImage(resImg) test.That(t, ovImg.GetXY(852, 431), test.ShouldResemble, rimage.Red) @@ -146,7 +145,7 @@ func BenchmarkColorDetectionSource(b *testing.B) { b.ResetTimer() // begin benchmarking for i := 0; i < b.N; i++ { - _, _, _ = camera.ReadImage(ctx, detector) + _, _ = camera.DecodeImageFromCamera(ctx, rutils.MimeTypeJPEG, nil, detector) } test.That(b, detector.Close(context.Background()), test.ShouldBeNil) } diff --git a/components/camera/transformpipeline/mods.go b/components/camera/transformpipeline/mods.go index b115fde64a3..e91e4deb74b 100644 --- a/components/camera/transformpipeline/mods.go +++ b/components/camera/transformpipeline/mods.go @@ -26,14 +26,14 @@ type rotateConfig struct { // rotateSource is the source to be rotated and the kind of image type. type rotateSource struct { - src camera.VideoSource + src camera.StreamCamera stream camera.ImageType angle float64 } // newRotateTransform creates a new rotation transform. 
-func newRotateTransform(ctx context.Context, source camera.VideoSource, stream camera.ImageType, am utils.AttributeMap, -) (camera.VideoSource, camera.ImageType, error) { +func newRotateTransform(ctx context.Context, source camera.StreamCamera, stream camera.ImageType, am utils.AttributeMap, +) (camera.StreamCamera, camera.ImageType, error) { conf, err := resource.TransformAttributeMap[*rotateConfig](am) if err != nil { return nil, camera.UnspecifiedStream, errors.Wrap(err, "cannot parse rotate attribute map") @@ -97,7 +97,7 @@ type resizeConfig struct { } type resizeSource struct { - src camera.VideoSource + src camera.StreamCamera stream camera.ImageType height int width int @@ -105,8 +105,8 @@ type resizeSource struct { // newResizeTransform creates a new resize transform. func newResizeTransform( - ctx context.Context, source camera.VideoSource, stream camera.ImageType, am utils.AttributeMap, -) (camera.VideoSource, camera.ImageType, error) { + ctx context.Context, source camera.StreamCamera, stream camera.ImageType, am utils.AttributeMap, +) (camera.StreamCamera, camera.ImageType, error) { conf, err := resource.TransformAttributeMap[*resizeConfig](am) if err != nil { return nil, camera.UnspecifiedStream, err @@ -167,7 +167,7 @@ type cropConfig struct { } type cropSource struct { - src camera.VideoSource + src camera.StreamCamera imgType camera.ImageType cropWindow image.Rectangle cropRel []float64 @@ -177,8 +177,8 @@ type cropSource struct { // newCropTransform creates a new crop transform. 
func newCropTransform( - ctx context.Context, source camera.VideoSource, stream camera.ImageType, am utils.AttributeMap, -) (camera.VideoSource, camera.ImageType, error) { + ctx context.Context, source camera.StreamCamera, stream camera.ImageType, am utils.AttributeMap, +) (camera.StreamCamera, camera.ImageType, error) { conf, err := resource.TransformAttributeMap[*cropConfig](am) if err != nil { return nil, camera.UnspecifiedStream, err diff --git a/components/camera/transformpipeline/mods_test.go b/components/camera/transformpipeline/mods_test.go index f4a347de15a..61b701572a2 100644 --- a/components/camera/transformpipeline/mods_test.go +++ b/components/camera/transformpipeline/mods_test.go @@ -608,7 +608,8 @@ func BenchmarkColorRotate(b *testing.B) { am := utils.AttributeMap{ "angle_degs": 180, } - rs, stream, err := newRotateTransform(context.Background(), src, camera.ColorStream, am) + streamSrc := streamCameraFromCamera(context.Background(), src) + rs, stream, err := newRotateTransform(context.Background(), streamSrc, camera.ColorStream, am) test.That(b, err, test.ShouldBeNil) test.That(b, stream, test.ShouldEqual, camera.ColorStream) @@ -633,7 +634,8 @@ func BenchmarkDepthRotate(b *testing.B) { am := utils.AttributeMap{ "angle_degs": 180, } - rs, stream, err := newRotateTransform(context.Background(), src, camera.DepthStream, am) + streamSrc := streamCameraFromCamera(context.Background(), src) + rs, stream, err := newRotateTransform(context.Background(), streamSrc, camera.DepthStream, am) test.That(b, err, test.ShouldBeNil) test.That(b, stream, test.ShouldEqual, camera.DepthStream) diff --git a/components/camera/transformpipeline/pipeline.go b/components/camera/transformpipeline/pipeline.go index 2932c61682d..ca76263a01b 100644 --- a/components/camera/transformpipeline/pipeline.go +++ b/components/camera/transformpipeline/pipeline.go @@ -13,6 +13,7 @@ import ( "go.uber.org/multierr" "go.viam.com/rdk/components/camera" + "go.viam.com/rdk/gostream" 
	"go.viam.com/rdk/logging"
	"go.viam.com/rdk/pointcloud"
	"go.viam.com/rdk/resource"
@@ -48,11 +49,12 @@ func init() {
 			if err != nil {
 				return nil, fmt.Errorf("no source camera for transform pipeline (%s): %w", sourceName, err)
 			}
-			src, err := newTransformPipeline(ctx, source, newConf, actualR, logger)
+			streamCamera := streamCameraFromCamera(ctx, source)
+			src, err := newTransformPipeline(ctx, streamCamera, conf.ResourceName().AsNamed(), newConf, actualR, logger)
 			if err != nil {
 				return nil, err
 			}
-			return camera.FromVideoSource(conf.ResourceName(), src, logger), nil
+			return src, nil
 		},
 	})
 }
@@ -86,13 +88,36 @@ func (cfg *transformConfig) Validate(path string) ([]string, error) {
 	return deps, nil
 }
+
+type streamCamera struct {
+	camera.Camera
+	vs gostream.VideoSource
+}
+
+func (sc *streamCamera) Stream(ctx context.Context, errHandlers ...gostream.ErrorHandler) (gostream.VideoStream, error) {
+	if sc.vs != nil {
+		return sc.vs.Stream(ctx, errHandlers...)
+	}
+	return nil, errors.New("stream unavailable: streamCamera has no underlying gostream.VideoSource")
+} + +func streamCameraFromCamera(ctx context.Context, cam camera.Camera) camera.StreamCamera { + if streamCam, ok := cam.(camera.StreamCamera); ok { + return streamCam + } + return &streamCamera{ + Camera: cam, + vs: camera.VideoSourceFromCamera(ctx, cam), + } +} + func newTransformPipeline( ctx context.Context, - source camera.VideoSource, + source camera.StreamCamera, + named resource.Named, cfg *transformConfig, r robot.Robot, logger logging.Logger, -) (camera.VideoSource, error) { +) (camera.StreamCamera, error) { if source == nil { return nil, errors.New("no source camera for transform pipeline") } @@ -100,7 +125,7 @@ func newTransformPipeline( return nil, errors.New("pipeline has no transforms in it") } // check if the source produces a depth image or color image - img, release, err := camera.ReadImage(ctx, source) + img, err := camera.DecodeImageFromCamera(ctx, "", nil, source) var streamType camera.ImageType if err != nil { @@ -112,33 +137,32 @@ func newTransformPipeline( } else { streamType = camera.ColorStream } - if release != nil { - release() - } // loop through the pipeline and create the image flow - pipeline := make([]camera.VideoSource, 0, len(cfg.Pipeline)) - lastSource := source + pipeline := make([]camera.StreamCamera, 0, len(cfg.Pipeline)) + lastSource := streamCameraFromCamera(ctx, source) for _, tr := range cfg.Pipeline { src, newStreamType, err := buildTransform(ctx, r, lastSource, streamType, tr, cfg.Source) if err != nil { return nil, err } - pipeline = append(pipeline, src) - lastSource = src + streamSrc := streamCameraFromCamera(ctx, src) + pipeline = append(pipeline, streamSrc) + lastSource = streamSrc streamType = newStreamType } cameraModel := camera.NewPinholeModelWithBrownConradyDistortion(cfg.CameraParameters, cfg.DistortionParameters) return camera.NewVideoSourceFromReader( ctx, - transformPipeline{pipeline, lastSource, cfg.CameraParameters, logger}, + transformPipeline{named, pipeline, lastSource, cfg.CameraParameters, logger}, 
&cameraModel, streamType, ) } type transformPipeline struct { - pipeline []camera.VideoSource - src camera.VideoSource + resource.Named + pipeline []camera.StreamCamera + src camera.Camera intrinsicParameters *transform.PinholeCameraIntrinsics logger logging.Logger } @@ -146,7 +170,11 @@ type transformPipeline struct { func (tp transformPipeline) Read(ctx context.Context) (image.Image, func(), error) { ctx, span := trace.StartSpan(ctx, "camera::transformpipeline::Read") defer span.End() - return camera.ReadImage(ctx, tp.src) + img, err := camera.DecodeImageFromCamera(ctx, "", nil, tp.src) + if err != nil { + return nil, func() {}, err + } + return img, func() {}, nil } func (tp transformPipeline) NextPointCloud(ctx context.Context) (pointcloud.PointCloud, error) { diff --git a/components/camera/transformpipeline/pipeline_test.go b/components/camera/transformpipeline/pipeline_test.go index eb31cd06e52..7b1b8ae8bb9 100644 --- a/components/camera/transformpipeline/pipeline_test.go +++ b/components/camera/transformpipeline/pipeline_test.go @@ -34,13 +34,14 @@ func TestTransformPipelineColor(t *testing.T) { test.That(t, err, test.ShouldBeNil) source := gostream.NewVideoSource(&fake.StaticSource{ColorImg: img}, prop.Video{}) src, err := camera.WrapVideoSourceWithProjector(context.Background(), source, nil, camera.ColorStream) + streamSrc := streamCameraFromCamera(context.Background(), src) test.That(t, err, test.ShouldBeNil) - inImg, _, err := camera.ReadImage(context.Background(), src) + inImg, err := camera.DecodeImageFromCamera(context.Background(), "", nil, src) test.That(t, err, test.ShouldBeNil) test.That(t, inImg.Bounds().Dx(), test.ShouldEqual, 128) test.That(t, inImg.Bounds().Dy(), test.ShouldEqual, 72) - color, err := newTransformPipeline(context.Background(), src, transformConf, r, logger) + color, err := newTransformPipeline(context.Background(), streamSrc, nil, transformConf, r, logger) test.That(t, err, test.ShouldBeNil) outImg, _, err := 
camera.ReadImage(context.Background(), color) @@ -81,12 +82,13 @@ func TestTransformPipelineDepth(t *testing.T) { source := gostream.NewVideoSource(&fake.StaticSource{DepthImg: dm}, prop.Video{}) src, err := camera.WrapVideoSourceWithProjector(context.Background(), source, nil, camera.DepthStream) test.That(t, err, test.ShouldBeNil) - inImg, _, err := camera.ReadImage(context.Background(), src) + inImg, err := camera.DecodeImageFromCamera(context.Background(), "", nil, src) test.That(t, err, test.ShouldBeNil) test.That(t, inImg.Bounds().Dx(), test.ShouldEqual, 128) test.That(t, inImg.Bounds().Dy(), test.ShouldEqual, 72) - depth, err := newTransformPipeline(context.Background(), src, transformConf, r, logger) + streamSrc := streamCameraFromCamera(context.Background(), src) + depth, err := newTransformPipeline(context.Background(), streamSrc, nil, transformConf, r, logger) test.That(t, err, test.ShouldBeNil) outImg, _, err := camera.ReadImage(context.Background(), depth) @@ -134,7 +136,7 @@ func TestTransformPipelineDepth2(t *testing.T) { test.That(t, err, test.ShouldBeNil) // first depth transform - depth1, err := newTransformPipeline(context.Background(), source, transform1, r, logger) + depth1, err := newTransformPipeline(context.Background(), source, nil, transform1, r, logger) test.That(t, err, test.ShouldBeNil) outImg, _, err := camera.ReadImage(context.Background(), depth1) test.That(t, err, test.ShouldBeNil) @@ -142,7 +144,7 @@ func TestTransformPipelineDepth2(t *testing.T) { test.That(t, outImg.Bounds().Dy(), test.ShouldEqual, 20) test.That(t, depth1.Close(context.Background()), test.ShouldBeNil) // second depth image - depth2, err := newTransformPipeline(context.Background(), source, transform2, r, logger) + depth2, err := newTransformPipeline(context.Background(), source, nil, transform2, r, logger) test.That(t, err, test.ShouldBeNil) outImg, _, err = camera.ReadImage(context.Background(), depth2) test.That(t, err, test.ShouldBeNil) @@ -164,7 +166,7 @@ 
func TestNullPipeline(t *testing.T) { test.That(t, err, test.ShouldBeNil) source, err := camera.NewVideoSourceFromReader(context.Background(), &fake.StaticSource{ColorImg: img}, nil, camera.UnspecifiedStream) test.That(t, err, test.ShouldBeNil) - _, err = newTransformPipeline(context.Background(), source, transform1, r, logger) + _, err = newTransformPipeline(context.Background(), source, nil, transform1, r, logger) test.That(t, err, test.ShouldNotBeNil) test.That(t, err.Error(), test.ShouldContainSubstring, "pipeline has no transforms") @@ -172,7 +174,7 @@ func TestNullPipeline(t *testing.T) { Source: "source", Pipeline: []Transformation{{Type: "identity", Attributes: nil}}, } - pipe, err := newTransformPipeline(context.Background(), source, transform2, r, logger) + pipe, err := newTransformPipeline(context.Background(), source, nil, transform2, r, logger) test.That(t, err, test.ShouldBeNil) outImg, _, err := camera.ReadImage(context.Background(), pipe) // should not transform anything test.That(t, err, test.ShouldBeNil) @@ -206,7 +208,7 @@ func TestPipeIntoPipe(t *testing.T) { }, } - pipe1, err := newTransformPipeline(context.Background(), source, transform1, r, logger) + pipe1, err := newTransformPipeline(context.Background(), source, nil, transform1, r, logger) test.That(t, err, test.ShouldBeNil) outImg, _, err := camera.ReadImage(context.Background(), pipe1) test.That(t, err, test.ShouldBeNil) @@ -217,7 +219,7 @@ func TestPipeIntoPipe(t *testing.T) { test.That(t, prop.IntrinsicParams.Width, test.ShouldEqual, 128) test.That(t, prop.IntrinsicParams.Height, test.ShouldEqual, 72) // transform pipeline into pipeline - pipe2, err := newTransformPipeline(context.Background(), pipe1, transform2, r, logger) + pipe2, err := newTransformPipeline(context.Background(), pipe1, nil, transform2, r, logger) test.That(t, err, test.ShouldBeNil) outImg, _, err = camera.ReadImage(context.Background(), pipe2) test.That(t, err, test.ShouldBeNil) diff --git 
a/components/camera/transformpipeline/preprocessing.go b/components/camera/transformpipeline/preprocessing.go index 73624c1bcbe..508b85936f7 100644 --- a/components/camera/transformpipeline/preprocessing.go +++ b/components/camera/transformpipeline/preprocessing.go @@ -14,11 +14,11 @@ import ( // preprocessDepthTransform applies pre-processing functions to depth maps in order to smooth edges and fill holes. type preprocessDepthTransform struct { - src camera.VideoSource + src camera.StreamCamera } -func newDepthPreprocessTransform(ctx context.Context, source camera.VideoSource, -) (camera.VideoSource, camera.ImageType, error) { +func newDepthPreprocessTransform(ctx context.Context, source camera.StreamCamera, +) (camera.StreamCamera, camera.ImageType, error) { reader := &preprocessDepthTransform{source} props, err := propsFromVideoSource(ctx, source) diff --git a/components/camera/transformpipeline/segmenter.go b/components/camera/transformpipeline/segmenter.go index 0ba705ce544..98fe74be804 100644 --- a/components/camera/transformpipeline/segmenter.go +++ b/components/camera/transformpipeline/segmenter.go @@ -23,7 +23,7 @@ type segmenterConfig struct { // segmenterSource takes a pointcloud from the camera and applies a segmenter to it. 
type segmenterSource struct { - src camera.VideoSource + src camera.StreamCamera cameraName string segmenterName string r robot.Robot @@ -31,11 +31,11 @@ type segmenterSource struct { func newSegmentationsTransform( ctx context.Context, - source camera.VideoSource, + source camera.StreamCamera, r robot.Robot, am utils.AttributeMap, sourceString string, -) (camera.VideoSource, camera.ImageType, error) { +) (camera.StreamCamera, camera.ImageType, error) { conf, err := resource.TransformAttributeMap[*segmenterConfig](am) if err != nil { return nil, camera.UnspecifiedStream, err diff --git a/components/camera/transformpipeline/segmenter_test.go b/components/camera/transformpipeline/segmenter_test.go index a00242a3999..f77d0bdfff5 100644 --- a/components/camera/transformpipeline/segmenter_test.go +++ b/components/camera/transformpipeline/segmenter_test.go @@ -63,7 +63,8 @@ func TestTransformSegmenterProps(t *testing.T) { _, err = conf.Validate("path") test.That(t, err, test.ShouldBeNil) - _, err = newTransformPipeline(context.Background(), cam, transformConf, r, logger) + streamCamera := streamCameraFromCamera(context.Background(), cam) + _, err = newTransformPipeline(context.Background(), streamCamera, nil, transformConf, r, logger) test.That(t, err, test.ShouldBeNil) transformConf = &transformConfig{ @@ -151,7 +152,8 @@ func TestTransformSegmenterFunctionality(t *testing.T) { }, } - pipeline, err := newTransformPipeline(context.Background(), cam, transformConf, r, logger) + streamCamera := streamCameraFromCamera(context.Background(), cam) + pipeline, err := newTransformPipeline(context.Background(), streamCamera, nil, transformConf, r, logger) test.That(t, err, test.ShouldBeNil) pc, err := pipeline.NextPointCloud(context.Background()) diff --git a/components/camera/transformpipeline/transform.go b/components/camera/transformpipeline/transform.go index 2b23743bb5a..5ff3c8efb8e 100644 --- a/components/camera/transformpipeline/transform.go +++ 
b/components/camera/transformpipeline/transform.go @@ -136,11 +136,11 @@ func (Transformation) JSONSchema() *jsonschema.Schema { func buildTransform( ctx context.Context, r robot.Robot, - source camera.VideoSource, + source camera.StreamCamera, stream camera.ImageType, tr Transformation, sourceString string, -) (camera.VideoSource, camera.ImageType, error) { +) (camera.StreamCamera, camera.ImageType, error) { switch transformType(tr.Type) { case transformTypeUnspecified, transformTypeIdentity: return source, stream, nil diff --git a/components/camera/transformpipeline/undistort.go b/components/camera/transformpipeline/undistort.go index a24dc18cceb..3565f7a23be 100644 --- a/components/camera/transformpipeline/undistort.go +++ b/components/camera/transformpipeline/undistort.go @@ -22,14 +22,14 @@ type undistortConfig struct { // undistortSource will undistort the original image according to the Distortion parameters // within the intrinsic parameters. type undistortSource struct { - originalSource camera.VideoSource + originalSource camera.StreamCamera stream camera.ImageType cameraParams *transform.PinholeCameraModel } func newUndistortTransform( - ctx context.Context, source camera.VideoSource, stream camera.ImageType, am utils.AttributeMap, -) (camera.VideoSource, camera.ImageType, error) { + ctx context.Context, source camera.StreamCamera, stream camera.ImageType, am utils.AttributeMap, +) (camera.StreamCamera, camera.ImageType, error) { conf, err := resource.TransformAttributeMap[*undistortConfig](am) if err != nil { return nil, camera.UnspecifiedStream, err diff --git a/components/camera/videosource/query.go b/components/camera/videosource/query.go new file mode 100644 index 00000000000..4fd9361af85 --- /dev/null +++ b/components/camera/videosource/query.go @@ -0,0 +1,225 @@ +package videosource + +import ( + "math" + "strings" + "time" + + "github.com/pion/mediadevices" + "github.com/pion/mediadevices/pkg/driver" + 
"github.com/pion/mediadevices/pkg/driver/availability" + "github.com/pion/mediadevices/pkg/driver/camera" + "github.com/pion/mediadevices/pkg/io/video" + "github.com/pion/mediadevices/pkg/prop" + "github.com/pkg/errors" + + "go.viam.com/rdk/logging" +) + +// Below is adapted from github.com/pion/mediadevices. +// It is further adapted from gostream's query.go +// However, this is the minimum code needed for webcam to work, placed in this directory. +// This vastly improves the debugging and feature development experience, by not over-DRY-ing. + +// GetNamedVideoSource attempts to find a device (not a screen) by the given name. +// If name is empty, it finds any device. +func getReaderAndDriver( + name string, + constraints mediadevices.MediaStreamConstraints, + logger logging.Logger, +) (video.Reader, driver.Driver, error) { + var ptr *string + if name == "" { + ptr = nil + } else { + ptr = &name + } + d, selectedMedia, err := getUserVideoDriver(constraints, ptr, logger) + if err != nil { + return nil, nil, err + } + reader, err := newReaderFromDriver(d, selectedMedia) + if err != nil { + return nil, nil, err + } + return reader, d, nil +} + +func getUserVideoDriver( + constraints mediadevices.MediaStreamConstraints, + label *string, + logger logging.Logger, +) (driver.Driver, prop.Media, error) { + var videoConstraints mediadevices.MediaTrackConstraints + if constraints.Video != nil { + constraints.Video(&videoConstraints) + } + return selectVideo(videoConstraints, label, logger) +} + +func newReaderFromDriver( + videoDriver driver.Driver, + mediaProp prop.Media, +) (video.Reader, error) { + recorder, ok := videoDriver.(driver.VideoRecorder) + if !ok { + return nil, errors.New("driver not a driver.VideoRecorder") + } + + if ok, err := driver.IsAvailable(videoDriver); !errors.Is(err, availability.ErrUnimplemented) && !ok { + return nil, errors.Wrap(err, "video driver not available") + } else if driverStatus := videoDriver.Status(); driverStatus != 
driver.StateClosed { + return nil, errors.New("video driver in use") + } else if err := videoDriver.Open(); err != nil { + return nil, errors.Wrap(err, "cannot open video driver") + } + + mediaProp.DiscardFramesOlderThan = time.Second + reader, err := recorder.VideoRecord(mediaProp) + if err != nil { + return nil, err + } + return reader, nil +} + +func labelFilter(target string, useSep bool) driver.FilterFn { + return driver.FilterFn(func(d driver.Driver) bool { + if !useSep { + return d.Info().Label == target + } + labels := strings.Split(d.Info().Label, camera.LabelSeparator) + for _, label := range labels { + if label == target { + return true + } + } + return false + }) +} + +func selectVideo( + constraints mediadevices.MediaTrackConstraints, + label *string, + logger logging.Logger, +) (driver.Driver, prop.Media, error) { + return selectBestDriver(getVideoFilterBase(), getVideoFilter(label), constraints, logger) +} + +func getVideoFilterBase() driver.FilterFn { + typeFilter := driver.FilterVideoRecorder() + notScreenFilter := driver.FilterNot(driver.FilterDeviceType(driver.Screen)) + return driver.FilterAnd(typeFilter, notScreenFilter) +} + +func getVideoFilter(label *string) driver.FilterFn { + filter := getVideoFilterBase() + if label != nil { + filter = driver.FilterAnd(filter, labelFilter(*label, true)) + } + return filter +} + +// select implements SelectSettings algorithm. 
+// Reference: https://w3c.github.io/mediacapture-main/#dfn-selectsettings +func selectBestDriver( + baseFilter driver.FilterFn, + filter driver.FilterFn, + constraints mediadevices.MediaTrackConstraints, + logger logging.Logger, +) (driver.Driver, prop.Media, error) { + var bestDriver driver.Driver + var bestProp prop.Media + minFitnessDist := math.Inf(1) + + baseDrivers := driver.GetManager().Query(baseFilter) + logger.Debugw("before specific filter, we found the following drivers", "count", len(baseDrivers)) + for _, d := range baseDrivers { + logger.Debugw(d.Info().Label, "priority", float32(d.Info().Priority), "type", d.Info().DeviceType) + } + + driverProperties := queryDriverProperties(filter, logger) + if len(driverProperties) == 0 { + logger.Debugw("found no drivers matching filter") + } else { + logger.Debugw("found drivers matching specific filter", "count", len(driverProperties)) + } + for d, props := range driverProperties { + priority := float64(d.Info().Priority) + logger.Debugw( + "considering driver", + "label", d.Info().Label, + "priority", priority) + for _, p := range props { + fitnessDist, ok := constraints.MediaConstraints.FitnessDistance(p) + if !ok { + logger.Debugw("driver does not satisfy any constraints", "label", d.Info().Label) + continue + } + fitnessDistWithPriority := fitnessDist - priority + logger.Debugw( + "driver properties satisfy some constraints", + "label", d.Info().Label, + "props", p, + "distance", fitnessDist, + "distance_with_priority", fitnessDistWithPriority) + if fitnessDistWithPriority < minFitnessDist { + minFitnessDist = fitnessDistWithPriority + bestDriver = d + bestProp = p + } + } + } + + if bestDriver == nil { + return nil, prop.Media{}, errors.New("failed to find the best driver that fits the constraints") + } + + logger.Debugw("winning driver", "label", bestDriver.Info().Label, "props", bestProp) + selectedMedia := prop.Media{} + selectedMedia.MergeConstraints(constraints.MediaConstraints) + 
selectedMedia.Merge(bestProp) + return bestDriver, selectedMedia, nil +} + +func queryDriverProperties( + filter driver.FilterFn, + logger logging.Logger, +) map[driver.Driver][]prop.Media { + var needToClose []driver.Driver + drivers := driver.GetManager().Query(filter) + m := make(map[driver.Driver][]prop.Media) + + for _, d := range drivers { + var status string + isAvailable, err := driver.IsAvailable(d) + if errors.Is(err, availability.ErrUnimplemented) { + s := d.Status() + status = string(s) + isAvailable = s == driver.StateClosed + } else if err != nil { + status = err.Error() + } + + if isAvailable { + err := d.Open() + if err != nil { + logger.Debugw("error opening driver for querying", "error", err) + // Skip this driver if we failed to open because we can't get the properties + continue + } + needToClose = append(needToClose, d) + m[d] = d.Properties() + } else { + logger.Debugw("driver not available", "name", d.Info().Name, "label", d.Info().Label, "status", status) + } + } + + for _, d := range needToClose { + // Since it was closed, we should close it to avoid a leak + if err := d.Close(); err != nil { + logger.Errorw("error closing driver", "error", err) + } + } + + return m +} diff --git a/components/camera/videosource/webcam.go b/components/camera/videosource/webcam.go index f061fe3cc24..0b5dfad57f3 100644 --- a/components/camera/videosource/webcam.go +++ b/components/camera/videosource/webcam.go @@ -1,4 +1,4 @@ -// Package videosource implements various camera models including webcam +// Package videosource implements webcam. It should be renamed webcam. 
package videosource import ( @@ -6,29 +6,29 @@ import ( _ "embed" "encoding/json" "fmt" - "image" "path/filepath" "strings" "sync" "time" "github.com/pion/mediadevices" - "github.com/pion/mediadevices/pkg/driver" + driverutils "github.com/pion/mediadevices/pkg/driver" "github.com/pion/mediadevices/pkg/driver/availability" mediadevicescamera "github.com/pion/mediadevices/pkg/driver/camera" "github.com/pion/mediadevices/pkg/frame" + "github.com/pion/mediadevices/pkg/io/video" "github.com/pion/mediadevices/pkg/prop" "github.com/pkg/errors" - "go.uber.org/multierr" pb "go.viam.com/api/component/camera/v1" goutils "go.viam.com/utils" "go.viam.com/rdk/components/camera" - "go.viam.com/rdk/gostream" "go.viam.com/rdk/logging" "go.viam.com/rdk/pointcloud" "go.viam.com/rdk/resource" + "go.viam.com/rdk/rimage" "go.viam.com/rdk/rimage/transform" + "go.viam.com/rdk/utils" ) // ModelWebcam is the name of the webcam component. @@ -37,12 +37,11 @@ var ModelWebcam = resource.DefaultModelFamily.WithModel("webcam") //go:embed data/intrinsics.json var intrinsics []byte -var data map[string]transform.PinholeCameraIntrinsics - -// getVideoDrivers is a helper callback passed to the registered Discover func to get all video drivers. -func getVideoDrivers() []driver.Driver { - return driver.GetManager().Query(driver.FilterVideoRecorder()) -} +var ( + errClosed = errors.New("camera has been closed") + errDisconnected = errors.New("camera is disconnected; please try again in a few moments") + data map[string]transform.PinholeCameraIntrinsics +) func init() { resource.RegisterComponent( @@ -51,6 +50,10 @@ func init() { resource.Registration[camera.Camera, *WebcamConfig]{ Constructor: NewWebcam, Discover: func(ctx context.Context, logger logging.Logger, extra map[string]interface{}) (interface{}, error) { + // getVideoDrivers is a callback passed to the registered Discover func to get all video drivers. 
+ getVideoDrivers := func() []driverutils.Driver { + return driverutils.GetManager().Query(driverutils.FilterVideoRecorder()) + } return Discover(ctx, getVideoDrivers, logger) }, }) @@ -59,11 +62,10 @@ func init() { } } -// getProperties is a helper func for webcam discovery that returns the Media properties of a specific driver. -// It is NOT related to the GetProperties camera proto API. -func getProperties(d driver.Driver) (_ []prop.Media, err error) { +// getDriverProperties is a helper func for webcam discovery that returns the Media properties of a specific driver. +func getDriverProperties(d driverutils.Driver) (_ []prop.Media, err error) { // Need to open driver to get properties - if d.Status() == driver.StateClosed { + if d.Status() == driverutils.StateClosed { errOpen := d.Open() if errOpen != nil { return nil, errOpen @@ -78,14 +80,13 @@ func getProperties(d driver.Driver) (_ []prop.Media, err error) { } // Discover webcam attributes. -func Discover(ctx context.Context, getDrivers func() []driver.Driver, logger logging.Logger) (*pb.Webcams, error) { +func Discover(ctx context.Context, getDrivers func() []driverutils.Driver, logger logging.Logger) (*pb.Webcams, error) { mediadevicescamera.Initialize() var webcams []*pb.Webcam drivers := getDrivers() for _, d := range drivers { driverInfo := d.Info() - - props, err := getProperties(d) + props, err := getDriverProperties(d) if len(props) == 0 { logger.CDebugw(ctx, "no properties detected for driver, skipping discovery...", "driver", driverInfo.Label) continue @@ -94,7 +95,7 @@ func Discover(ctx context.Context, getDrivers func() []driver.Driver, logger log continue } - if d.Status() == driver.StateRunning { + if d.Status() == driverutils.StateRunning { logger.CDebugw(ctx, "driver is in use, skipping discovery...", "driver", driverInfo.Label) continue } @@ -162,14 +163,6 @@ func (c WebcamConfig) Validate(path string) ([]string, error) { return []string{}, nil } -// needsDriverReinit is a helper to check for 
config diffs and returns true iff the driver needs to be reinitialized. -func (c WebcamConfig) needsDriverReinit(other WebcamConfig) bool { - return !(c.Format == other.Format && - c.Path == other.Path && - c.Width == other.Width && - c.Height == other.Height) -} - // makeConstraints is a helper that returns constraints to mediadevices in order to find and make a video source. // Constraints are specifications for the video stream such as frame format, resolution etc. func makeConstraints(conf *WebcamConfig, debug bool, logger logging.Logger) mediadevices.MediaStreamConstraints { @@ -216,109 +209,70 @@ func makeConstraints(conf *WebcamConfig, debug bool, logger logging.Logger) medi } } -// getNamedVideoSource is a helper function for trying to open a webcam that attempts to -// find a video device (not a screen) by the given name. -// -// First it will try to use the path name after evaluating any symbolic links. If -// evaluation fails, it will try to use the path name as provided. -func getNamedVideoSource( +// findReaderAndDriver finds a video device and returns an image reader and the driver instance, +// as well as the path to the driver. +func findReaderAndDriver( + conf *WebcamConfig, path string, - fromLabel bool, - constraints mediadevices.MediaStreamConstraints, logger logging.Logger, -) (gostream.MediaSource[image.Image], error) { - if !fromLabel { +) (video.Reader, driverutils.Driver, string, error) { + mediadevicescamera.Initialize() + debug := conf.Debug + constraints := makeConstraints(conf, debug, logger) + + // Handle specific path + if path != "" { resolvedPath, err := filepath.EvalSymlinks(path) if err == nil { path = resolvedPath } - } - return gostream.GetNamedVideoSource(filepath.Base(path), constraints, logger) -} - -// tryWebcamOpen is a helper for finding and making the video source that uses getNamedVideoSource to try and find -// a video device (gostream.MediaSource). If successful, it will wrap that MediaSource in a VideoSource. 
-func tryWebcamOpen( - ctx context.Context, - conf *WebcamConfig, - path string, - fromLabel bool, - constraints mediadevices.MediaStreamConstraints, - logger logging.Logger, -) (gostream.VideoSource, error) { - source, err := getNamedVideoSource(path, fromLabel, constraints, logger) - if err != nil { - return nil, err - } - - if conf.Width != 0 && conf.Height != 0 { - img, release, err := gostream.ReadMedia(ctx, source) - if release != nil { - defer release() - } + reader, driver, err := getReaderAndDriver(filepath.Base(path), constraints, logger) if err != nil { - return nil, err - } - if img.Bounds().Dx() != conf.Width || img.Bounds().Dy() != conf.Height { - return nil, errors.Errorf("requested width and height (%dx%d) are not available for this webcam"+ - " (closest driver found by gostream supports resolution %dx%d)", - conf.Width, conf.Height, img.Bounds().Dx(), img.Bounds().Dy()) + return nil, nil, "", err } - } - return source, nil -} - -// getPathFromVideoSource is a helper function for finding and making the video source that -// returns the path derived from the underlying driver via MediaSource or an empty string if a path is not found. -func getPathFromVideoSource(src gostream.VideoSource, logger logging.Logger) string { - labels, err := gostream.LabelsFromMediaSource[image.Image, prop.Video](src) - if err != nil || len(labels) == 0 { - logger.Errorw("could not get labels from media source", "error", err) - return "" - } - return labels[0] // path is always the first element -} - -// findAndMakeVideoSource finds a video device and returns a video source with that video device as the source. 
-func findAndMakeVideoSource( - ctx context.Context, - conf *WebcamConfig, - path string, - logger logging.Logger, -) (gostream.VideoSource, string, error) { - mediadevicescamera.Initialize() - debug := conf.Debug - constraints := makeConstraints(conf, debug, logger) - if path != "" { - cam, err := tryWebcamOpen(ctx, conf, path, false, constraints, logger) - if err != nil { - return nil, "", errors.Wrap(err, "cannot open webcam") + if conf.Width != 0 && conf.Height != 0 { + img, release, err := reader.Read() + if release != nil { + defer release() + } + if err != nil { + return nil, nil, "", err + } + if img.Bounds().Dx() != conf.Width || img.Bounds().Dy() != conf.Height { + return nil, nil, "", errors.Errorf("requested width and height (%dx%d) are not available for this webcam"+ + " (closest driver found supports resolution %dx%d)", + conf.Width, conf.Height, img.Bounds().Dx(), img.Bounds().Dy()) + } } - return cam, path, nil + return reader, driver, path, nil } - source, err := gostream.GetAnyVideoSource(constraints, logger) + // Handle "any" path + reader, driver, err := getReaderAndDriver("", constraints, logger) if err != nil { - return nil, "", errors.Wrap(err, "found no webcams") + return nil, nil, "", errors.Wrap(err, "found no webcams") } - - if path == "" { - path = getPathFromVideoSource(source, logger) + labels := strings.Split(driver.Info().Label, mediadevicescamera.LabelSeparator) + if len(labels) == 0 { + logger.Error("no labels parsed from driver") + return nil, nil, "", nil } + path = labels[0] // path is always the first element - return source, path, nil + return reader, driver, path, nil } -// webcam is a video driver wrapper camera that ensures its underlying source stays connected. +// webcam is a video driver wrapper camera that ensures its underlying driver stays connected. 
type webcam struct { resource.Named mu sync.RWMutex hasLoggedIntrinsicsInfo bool - underlyingSource gostream.VideoSource - exposedSwapper gostream.HotSwappableVideoSource - exposedProjector camera.VideoSource + cameraModel transform.PinholeCameraModel + + reader video.Reader + driver driverutils.Driver // this is returned to us as a label in mediadevices but our config // treats it as a video path. @@ -356,16 +310,6 @@ func NewWebcam( return cam, nil } -// noopCloser overwrites the actual close method so that the real close method isn't called on Reconfigure. -// TODO(hexbabe): https://viam.atlassian.net/browse/RSDK-9264 -type noopCloser struct { - gostream.VideoSource -} - -func (n *noopCloser) Close(ctx context.Context) error { - return nil -} - func (c *webcam) Reconfigure( ctx context.Context, _ resource.Dependencies, @@ -379,24 +323,14 @@ func (c *webcam) Reconfigure( c.mu.Lock() defer c.mu.Unlock() - cameraModel := camera.NewPinholeModelWithBrownConradyDistortion(newConf.CameraParameters, newConf.DistortionParameters) - projector, err := camera.WrapVideoSourceWithProjector( - ctx, - &noopCloser{c}, - &cameraModel, - camera.ColorStream, - ) - if err != nil { - return err - } + c.cameraModel = camera.NewPinholeModelWithBrownConradyDistortion(newConf.CameraParameters, newConf.DistortionParameters) - needDriverReinit := c.conf.needsDriverReinit(*newConf) - if c.exposedProjector != nil { - goutils.UncheckedError(c.exposedProjector.Close(ctx)) - } - c.exposedProjector = projector + driverReinitNotNeeded := c.conf.Format == newConf.Format && + c.conf.Path == newConf.Path && + c.conf.Width == newConf.Width && + c.conf.Height == newConf.Height - if c.underlyingSource != nil && !needDriverReinit { + if c.driver != nil && c.reader != nil && driverReinitNotNeeded { c.conf = *newConf return nil } @@ -414,59 +348,38 @@ func (c *webcam) Reconfigure( return nil } -// MediaProperties returns the mediadevices Video properties of the underlying camera. 
-// It fulfills the MediaPropertyProvider interface. -func (c *webcam) MediaProperties(ctx context.Context) (prop.Video, error) { - c.mu.RLock() - defer c.mu.RUnlock() - if c.underlyingSource == nil { - return prop.Video{}, errors.New("no configured camera") - } - if provider, ok := c.underlyingSource.(gostream.VideoPropertyProvider); ok { - return provider.MediaProperties(ctx) - } - return prop.Video{}, nil -} - -// isCameraConnected is a helper for the alive-ness monitoring. +// isCameraConnected is a helper for monitoring connectivity to the driver. func (c *webcam) isCameraConnected() (bool, error) { c.mu.RLock() defer c.mu.RUnlock() - if c.underlyingSource == nil { + if c.driver == nil { return true, errors.New("no configured camera") } - d, err := gostream.DriverFromMediaSource[image.Image, prop.Video](c.underlyingSource) - if err != nil { - return true, errors.Wrap(err, "cannot get driver from media source") - } // TODO(RSDK-1959): this only works for linux - _, err = driver.IsAvailable(d) + _, err := driverutils.IsAvailable(c.driver) return !errors.Is(err, availability.ErrNoDevice), nil } // reconnectCamera tries to reconnect the camera to a driver that matches the config. // Assumes a write lock is held. 
func (c *webcam) reconnectCamera(conf *WebcamConfig) error { - if c.underlyingSource != nil { + if c.driver != nil { c.logger.Debug("closing current camera") - if err := c.underlyingSource.Close(c.cancelCtx); err != nil { - c.logger.Errorw("failed to close currents camera", "error", err) + if err := c.driver.Close(); err != nil { + c.logger.Errorw("failed to close current camera", "error", err) } - c.underlyingSource = nil + c.driver = nil + c.reader = nil } - newSrc, foundLabel, err := findAndMakeVideoSource(c.cancelCtx, conf, c.targetPath, c.logger) + reader, driver, foundLabel, err := findReaderAndDriver(conf, c.targetPath, c.logger) if err != nil { return errors.Wrap(err, "failed to find camera") } - if c.exposedSwapper == nil { - c.exposedSwapper = gostream.NewHotSwappableVideoSource(newSrc) - } else { - c.exposedSwapper.Swap(newSrc) - } - c.underlyingSource = newSrc + c.reader = reader + c.driver = driver c.disconnected = false c.closed = false if c.targetPath == "" { @@ -537,10 +450,7 @@ func (c *webcam) Monitor() { } func (c *webcam) Images(ctx context.Context) ([]camera.NamedImage, resource.ResponseMetadata, error) { - if c, ok := c.underlyingSource.(camera.ImagesSource); ok { - return c.Images(ctx) - } - img, release, err := camera.ReadImage(ctx, c.underlyingSource) + img, release, err := c.reader.Read() if err != nil { return nil, resource.ResponseMetadata{}, errors.Wrap(err, "monitoredWebcam: call to get Images failed") } @@ -563,113 +473,99 @@ func (c *webcam) ensureActive() error { return nil } -func (c *webcam) Stream(ctx context.Context, errHandlers ...gostream.ErrorHandler) (gostream.VideoStream, error) { - c.mu.RLock() - defer c.mu.RUnlock() - if err := c.ensureActive(); err != nil { - return nil, err - } - return c.exposedSwapper.Stream(ctx, errHandlers...) 
-} - func (c *webcam) Image(ctx context.Context, mimeType string, extra map[string]interface{}) ([]byte, camera.ImageMetadata, error) { - return camera.ReadImageBytes(ctx, c.underlyingSource, mimeType) -} - -func (c *webcam) NextPointCloud(ctx context.Context) (pointcloud.PointCloud, error) { - c.mu.RLock() - defer c.mu.RUnlock() - if err := c.ensureActive(); err != nil { - return nil, err + img, release, err := c.reader.Read() + if err != nil { + return nil, camera.ImageMetadata{}, err } - return c.exposedProjector.NextPointCloud(ctx) -} + defer release() -// driverInfo gets the mediadevices Info struct containing info such as name and device type of the given driver. -// It is a helper func for serving Properties. -func (c *webcam) driverInfo() (driver.Info, error) { - c.mu.RLock() - defer c.mu.RUnlock() - if c.underlyingSource == nil { - return driver.Info{}, errors.New("no underlying source found in camera") + if mimeType == "" { + mimeType = utils.MimeTypeJPEG } - d, err := gostream.DriverFromMediaSource[image.Image, prop.Video](c.underlyingSource) + imgBytes, err := rimage.EncodeImage(ctx, img, mimeType) if err != nil { - return driver.Info{}, errors.Wrap(err, "cannot get driver from media source") + return nil, camera.ImageMetadata{}, err } - return d.Info(), nil + return imgBytes, camera.ImageMetadata{MimeType: mimeType}, nil +} + +func (c *webcam) NextPointCloud(ctx context.Context) (pointcloud.PointCloud, error) { + return nil, errors.New("NextPointCloud unimplemented") } func (c *webcam) Properties(ctx context.Context) (camera.Properties, error) { c.mu.RLock() defer c.mu.RUnlock() + if err := c.ensureActive(); err != nil { return camera.Properties{}, err } - props, err := c.exposedProjector.Properties(ctx) - if err != nil { - return camera.Properties{}, err - } - // Looking for intrinsics in map built using viam camera - // calibration here https://github.com/viam-labs/camera-calibration/tree/main - if props.IntrinsicParams == nil { - dInfo, err := 
c.driverInfo() - if err != nil { - if !c.hasLoggedIntrinsicsInfo { - c.logger.CErrorw(ctx, "can't find driver info for camera") - c.hasLoggedIntrinsicsInfo = true - } - } - - cameraIntrinsics, exists := data[dInfo.Name] - if !exists { - if !c.hasLoggedIntrinsicsInfo { - c.logger.CInfo(ctx, "camera model not found in known camera models for: ", dInfo.Name, ". returning "+ - "properties without intrinsics") - c.hasLoggedIntrinsicsInfo = true - } - return props, nil - } - if c.conf.Width != 0 { - if c.conf.Width != cameraIntrinsics.Width { - if !c.hasLoggedIntrinsicsInfo { - c.logger.CInfo(ctx, "camera model found in known camera models for: ", dInfo.Name, " but "+ - "intrinsics width doesn't match configured image width") - c.hasLoggedIntrinsicsInfo = true - } - return props, nil - } - } - if c.conf.Height != 0 { - if c.conf.Height != cameraIntrinsics.Height { - if !c.hasLoggedIntrinsicsInfo { - c.logger.CInfo(ctx, "camera model found in known camera models for: ", dInfo.Name, " but "+ - "intrinsics height doesn't match configured image height") - c.hasLoggedIntrinsicsInfo = true - } - return props, nil - } - } - if !c.hasLoggedIntrinsicsInfo { - c.logger.CInfo(ctx, "Intrinsics are known for camera model: ", dInfo.Name, ". 
adding intrinsics "+ - "to camera properties") - c.hasLoggedIntrinsicsInfo = true - } - props.IntrinsicParams = &cameraIntrinsics - - if c.conf.FrameRate > 0 { - props.FrameRate = c.conf.FrameRate - } - } - return props, nil + var frameRate float32 + if c.conf.FrameRate > 0 { + frameRate = c.conf.FrameRate + } + return camera.Properties{ + SupportsPCD: false, + ImageType: camera.ColorStream, + IntrinsicParams: c.cameraModel.PinholeCameraIntrinsics, + DistortionParams: c.cameraModel.Distortion, + MimeTypes: []string{utils.MimeTypeJPEG, utils.MimeTypePNG, utils.MimeTypeRawRGBA}, + FrameRate: frameRate, + }, nil + + // props, err := c.exposedProjector.Properties(ctx) + // if err != nil { + // return camera.Properties{}, err + // } + // // Looking for intrinsics in map built using viam camera + // // calibration here https://github.com/viam-labs/camera-calibration/tree/main + // if props.IntrinsicParams == nil { + // dInfo := c.getDriverInfo() + // cameraIntrinsics, exists := data[dInfo.Name] + // if !exists { + // if !c.hasLoggedIntrinsicsInfo { + // c.logger.CInfo(ctx, "camera model not found in known camera models for: ", dInfo.Name, ". 
returning "+ + // "properties without intrinsics") + // c.hasLoggedIntrinsicsInfo = true + // } + // return props, nil + // } + // if c.conf.Width != 0 { + // if c.conf.Width != cameraIntrinsics.Width { + // if !c.hasLoggedIntrinsicsInfo { + // c.logger.CInfo(ctx, "camera model found in known camera models for: ", dInfo.Name, " but "+ + // "intrinsics width doesn't match configured image width") + // c.hasLoggedIntrinsicsInfo = true + // } + // return props, nil + // } + // } + // if c.conf.Height != 0 { + // if c.conf.Height != cameraIntrinsics.Height { + // if !c.hasLoggedIntrinsicsInfo { + // c.logger.CInfo(ctx, "camera model found in known camera models for: ", dInfo.Name, " but "+ + // "intrinsics height doesn't match configured image height") + // c.hasLoggedIntrinsicsInfo = true + // } + // return props, nil + // } + // } + // if !c.hasLoggedIntrinsicsInfo { + // c.logger.CInfo(ctx, "Intrinsics are known for camera model: ", dInfo.Name, ". adding intrinsics "+ + // "to camera properties") + // c.hasLoggedIntrinsicsInfo = true + // } + // props.IntrinsicParams = &cameraIntrinsics + + // if c.conf.FrameRate > 0 { + // props.FrameRate = c.conf.FrameRate + // } + // } + // return props, nil } -var ( - errClosed = errors.New("camera has been closed") - errDisconnected = errors.New("camera is disconnected; please try again in a few moments") -) - func (c *webcam) Close(ctx context.Context) error { c.mu.Lock() if c.closed { @@ -681,15 +577,5 @@ func (c *webcam) Close(ctx context.Context) error { c.cancel() c.activeBackgroundWorkers.Wait() - var err error - if c.exposedSwapper != nil { - err = multierr.Combine(err, c.exposedSwapper.Close(ctx)) - } - if c.exposedProjector != nil { - err = multierr.Combine(err, c.exposedProjector.Close(ctx)) - } - if c.underlyingSource != nil { - err = multierr.Combine(err, c.underlyingSource.Close(ctx)) - } - return err + return c.driver.Close() } diff --git a/components/camera/videosourcewrappers.go 
b/components/camera/videosourcewrappers.go index 1d98a9c197a..691fd1f6847 100644 --- a/components/camera/videosourcewrappers.go +++ b/components/camera/videosourcewrappers.go @@ -17,33 +17,40 @@ import ( "go.viam.com/rdk/rimage" "go.viam.com/rdk/rimage/depthadapter" "go.viam.com/rdk/rimage/transform" + "go.viam.com/rdk/utils" ) +// FromVideoSource is DEPRECATED!!! Please implement cameras according to the camera.Camera interface. // FromVideoSource creates a Camera resource from a VideoSource. // Note: this strips away Reconfiguration and DoCommand abilities. // If needed, implement the Camera another way. For example, a webcam // implements a Camera manually so that it can atomically reconfigure itself. -func FromVideoSource(name resource.Name, src VideoSource, logger logging.Logger) Camera { +func FromVideoSource(name resource.Name, src StreamCamera, logger logging.Logger) StreamCamera { var rtpPassthroughSource rtppassthrough.Source if ps, ok := src.(rtppassthrough.Source); ok { rtpPassthroughSource = ps } return &sourceBasedCamera{ rtpPassthroughSource: rtpPassthroughSource, - Named: name.AsNamed(), - VideoSource: src, + StreamCamera: src, + name: name, Logger: logger, } } type sourceBasedCamera struct { - resource.Named + StreamCamera resource.AlwaysRebuild - VideoSource + name resource.Name rtpPassthroughSource rtppassthrough.Source logging.Logger } +// Explicitly define Reconfigure to resolve ambiguity. +func (vs *sourceBasedCamera) Reconfigure(ctx context.Context, deps resource.Dependencies, conf resource.Config) error { + return vs.AlwaysRebuild.Reconfigure(ctx, deps, conf) +} + func (vs *sourceBasedCamera) SubscribeRTP( ctx context.Context, bufferSize int, @@ -80,6 +87,7 @@ func (vs *videoSource) Unsubscribe(ctx context.Context, id rtppassthrough.Subscr return errors.New("Unsubscribe unimplemented") } +// NewPinholeModelWithBrownConradyDistortion is DEPRECATED!!! Please implement cameras according to the camera.Camera interface. 
// NewPinholeModelWithBrownConradyDistortion creates a transform.PinholeCameraModel from // a *transform.PinholeCameraIntrinsics and a *transform.BrownConrady. // If *transform.BrownConrady is `nil`, transform.PinholeCameraModel.Distortion @@ -96,6 +104,7 @@ func NewPinholeModelWithBrownConradyDistortion(pinholeCameraIntrinsics *transfor return cameraModel } +// NewVideoSourceFromReader is DEPRECATED!!! Please implement cameras according to the camera.Camera interface. // NewVideoSourceFromReader creates a VideoSource either with or without a projector. The stream type // argument is for detecting whether or not the resulting camera supports return // of pointcloud data in the absence of an implemented NextPointCloud function. @@ -104,7 +113,7 @@ func NewVideoSourceFromReader( ctx context.Context, reader gostream.VideoReader, syst *transform.PinholeCameraModel, imageType ImageType, -) (VideoSource, error) { +) (StreamCamera, error) { if reader == nil { return nil, errors.New("cannot have a nil reader") } @@ -116,7 +125,7 @@ func NewVideoSourceFromReader( vs := gostream.NewVideoSource(reader, prop.Video{}) actualSystem := syst if actualSystem == nil { - srcCam, ok := reader.(VideoSource) + srcCam, ok := reader.(Camera) if ok { props, err := srcCam.Properties(ctx) if err != nil { @@ -142,6 +151,7 @@ func NewVideoSourceFromReader( }, nil } +// WrapVideoSourceWithProjector is DEPRECATED!!! Please implement cameras according to the camera.Camera interface. // WrapVideoSourceWithProjector creates a Camera either with or without a projector. The stream type // argument is for detecting whether or not the resulting camera supports return // of pointcloud data in the absence of an implemented NextPointCloud function. 
@@ -150,14 +160,13 @@ func WrapVideoSourceWithProjector( ctx context.Context, source gostream.VideoSource, syst *transform.PinholeCameraModel, imageType ImageType, -) (VideoSource, error) { +) (StreamCamera, error) { if source == nil { return nil, errors.New("cannot have a nil source") } actualSystem := syst if actualSystem == nil { - //nolint:staticcheck srcCam, ok := source.(Camera) if ok { props, err := srcCam.Properties(ctx) @@ -185,6 +194,7 @@ func WrapVideoSourceWithProjector( // videoSource implements a Camera with a gostream.VideoSource. type videoSource struct { + resource.AlwaysRebuild rtpPassthroughSource rtppassthrough.Source videoSource gostream.VideoSource videoStream gostream.VideoStream @@ -197,15 +207,18 @@ func (vs *videoSource) Stream(ctx context.Context, errHandlers ...gostream.Error return vs.videoSource.Stream(ctx, errHandlers...) } -// ReadImageBytes wraps ReadImage given a mimetype to encode the image as bytes data, returning -// supplementary metadata for downstream processing. -// TODO(hexbabe): make function private or remove altogether once the usages are limited to this file. 
-func ReadImageBytes(ctx context.Context, src gostream.VideoSource, mimeType string) ([]byte, ImageMetadata, error) { - img, release, err := ReadImage(ctx, src) +func (vs *videoSource) Image(ctx context.Context, mimeType string, extra map[string]interface{}) ([]byte, ImageMetadata, error) { + if sourceCam, ok := vs.actualSource.(Camera); ok { + return sourceCam.Image(ctx, mimeType, extra) + } + img, release, err := ReadImage(ctx, vs.videoSource) if err != nil { return nil, ImageMetadata{}, err } defer release() + if mimeType == "" { + mimeType = utils.MimeTypePNG // default to lossless mimetype such as PNG + } imgBytes, err := rimage.EncodeImage(ctx, img, mimeType) if err != nil { return nil, ImageMetadata{}, err @@ -213,10 +226,6 @@ func ReadImageBytes(ctx context.Context, src gostream.VideoSource, mimeType stri return imgBytes, ImageMetadata{MimeType: mimeType}, nil } -func (vs *videoSource) Image(ctx context.Context, mimeType string, extra map[string]interface{}) ([]byte, ImageMetadata, error) { - return ReadImageBytes(ctx, vs.videoSource, mimeType) -} - // Images is for getting simultaneous images from different sensors // If the underlying source did not specify an Images function, a default is applied. // The default returns a list of 1 image from ReadImage, and the current time. 
@@ -268,6 +277,13 @@ func (vs *videoSource) DoCommand(ctx context.Context, cmd map[string]interface{} return nil, resource.ErrDoUnimplemented } +func (vs *videoSource) Name() resource.Name { + if namedResource, ok := vs.actualSource.(resource.Named); ok { + return namedResource.Name() + } + return resource.Name{} +} + func (vs *videoSource) Properties(ctx context.Context) (Properties, error) { _, supportsPCD := vs.actualSource.(PointCloudSource) result := Properties{ @@ -290,5 +306,8 @@ func (vs *videoSource) Properties(ctx context.Context) (Properties, error) { } func (vs *videoSource) Close(ctx context.Context) error { + if res, ok := vs.actualSource.(resource.Resource); ok { + return multierr.Combine(vs.videoStream.Close(ctx), vs.videoSource.Close(ctx), res.Close(ctx)) + } return multierr.Combine(vs.videoStream.Close(ctx), vs.videoSource.Close(ctx)) } diff --git a/robot/client/client_test.go b/robot/client/client_test.go index d5802efd7e6..c18ddfd4173 100644 --- a/robot/client/client_test.go +++ b/robot/client/client_test.go @@ -55,7 +55,6 @@ import ( "go.viam.com/rdk/components/sensor" "go.viam.com/rdk/components/servo" "go.viam.com/rdk/config" - "go.viam.com/rdk/gostream" rgrpc "go.viam.com/rdk/grpc" "go.viam.com/rdk/logging" "go.viam.com/rdk/operation" @@ -505,12 +504,6 @@ func TestStatusClient(t *testing.T) { camera1, err := camera.FromRobot(client, "camera1") test.That(t, err, test.ShouldBeNil) - // TODO(hexbabe): remove below test when Stream/ReadImage pattern is refactored - t.Run("ReadImage on missing camera", func(t *testing.T) { - _, _, err = camera.ReadImage(context.Background(), camera1) - test.That(t, err, test.ShouldNotBeNil) - test.That(t, err.Error(), test.ShouldContainSubstring, "not found") - }) imgBytes, metadata, err := camera1.Image(context.Background(), rutils.MimeTypeJPEG, nil) test.That(t, err, test.ShouldNotBeNil) test.That(t, err.Error(), test.ShouldContainSubstring, "not found") @@ -592,16 +585,6 @@ func TestStatusClient(t *testing.T) 
{ camera1, err = camera.FromRobot(client, "camera1") test.That(t, err, test.ShouldBeNil) - // TODO(hexbabe): remove below test when Stream/ReadImage pattern is refactored - t.Run("ReadImage on camera with valid response", func(t *testing.T) { - ctx := gostream.WithMIMETypeHint(context.Background(), rutils.MimeTypeRawRGBA) - frame, _, err := camera.ReadImage(ctx, camera1) - test.That(t, err, test.ShouldBeNil) - compVal, _, err := rimage.CompareImages(img, frame) - test.That(t, err, test.ShouldBeNil) - test.That(t, compVal, test.ShouldEqual, 0) // exact copy, no color conversion - }) - frame, err := camera.DecodeImageFromCamera(context.Background(), rutils.MimeTypeRawRGBA, nil, camera1) test.That(t, err, test.ShouldBeNil) compVal, _, err := rimage.CompareImages(img, frame) diff --git a/robot/impl/local_robot_test.go b/robot/impl/local_robot_test.go index ac273051f31..1cc837e617f 100644 --- a/robot/impl/local_robot_test.go +++ b/robot/impl/local_robot_test.go @@ -94,14 +94,6 @@ func TestConfig1(t *testing.T) { test.That(t, err, test.ShouldBeNil) test.That(t, c1.Name(), test.ShouldResemble, camera.Named("c1")) - // TODO(hexbabe): remove below test when Stream/ReadImage pattern is refactored - t.Run("ReadImage from camera", func(t *testing.T) { - pic, _, err := camera.ReadImage(context.Background(), c1) - test.That(t, err, test.ShouldBeNil) - bounds := pic.Bounds() - test.That(t, bounds.Max.X, test.ShouldBeGreaterThanOrEqualTo, 32) - }) - pic, err := camera.DecodeImageFromCamera(context.Background(), rutils.MimeTypeJPEG, nil, c1) test.That(t, err, test.ShouldBeNil) diff --git a/robot/web/stream/server.go b/robot/web/stream/server.go index a54928dcfa3..d858dc766ac 100644 --- a/robot/web/stream/server.go +++ b/robot/web/stream/server.go @@ -22,7 +22,7 @@ import ( "go.viam.com/rdk/logging" "go.viam.com/rdk/resource" "go.viam.com/rdk/robot" - streamCamera "go.viam.com/rdk/robot/web/stream/camera" + cameraStreamUtils "go.viam.com/rdk/robot/web/stream/camera" 
 	"go.viam.com/rdk/robot/web/stream/state"
 	rutils "go.viam.com/rdk/utils"
 )
@@ -170,7 +170,7 @@ func (server *Server) AddStream(ctx context.Context, req *streampb.AddStreamRequ
 	}
 	// return error if resource is neither a camera nor audioinput
-	_, isCamErr := streamCamera.Camera(server.robot, streamStateToAdd.Stream)
+	_, isCamErr := cameraStreamUtils.Camera(server.robot, streamStateToAdd.Stream)
 	_, isAudioErr := audioinput.FromRobot(server.robot, resource.SDPTrackNameToShortName(streamStateToAdd.Stream.Name()))
 	if isCamErr != nil && isAudioErr != nil {
 		return nil, errors.Errorf("stream is neither a camera nor audioinput. streamName: %v", streamStateToAdd.Stream)
 	}
@@ -307,7 +307,7 @@ func (server *Server) RemoveStream(ctx context.Context, req *streampb.RemoveStre
 	shortName := resource.SDPTrackNameToShortName(streamToRemove.Stream.Name())
 	_, isAudioResourceErr := audioinput.FromRobot(server.robot, shortName)
-	_, isCameraResourceErr := streamCamera.Camera(server.robot, streamToRemove.Stream)
+	_, isCameraResourceErr := cameraStreamUtils.Camera(server.robot, streamToRemove.Stream)
 	if isAudioResourceErr != nil && isCameraResourceErr != nil {
 		return &streampb.RemoveStreamResponse{}, nil
@@ -392,12 +392,12 @@ func (server *Server) SetStreamOptions(
 	defer server.mu.Unlock()
 	switch cmd {
 	case optionsCommandResize:
-		err = server.resizeVideoSource(req.Name, int(req.Resolution.Width), int(req.Resolution.Height))
+		err = server.resizeVideoSource(ctx, req.Name, int(req.Resolution.Width), int(req.Resolution.Height))
 		if err != nil {
 			return nil, fmt.Errorf("failed to resize video source for stream %q: %w", req.Name, err)
 		}
 	case optionsCommandReset:
-		err = server.resetVideoSource(req.Name)
+		err = server.resetVideoSource(ctx, req.Name)
 		if err != nil {
 			return nil, fmt.Errorf("failed to reset video source for stream %q: %w", req.Name, err)
 		}
@@ -426,7 +426,7 @@ func validateSetStreamOptionsRequest(req *streampb.SetStreamOptionsRequest) (int
 }
 // resizeVideoSource resizes the video source with the given name.
-func (server *Server) resizeVideoSource(name string, width, height int) error {
+func (server *Server) resizeVideoSource(ctx context.Context, name string, width, height int) error {
 	existing, ok := server.videoSources[name]
 	if !ok {
 		return fmt.Errorf("video source %q not found", name)
 	}
@@ -440,7 +440,7 @@ func (server *Server) resizeVideoSource(name string, width, height int) error {
 	if !ok {
 		return fmt.Errorf("stream state not found with name %q", name)
 	}
-	resizer := gostream.NewResizeVideoSource(cam, width, height)
+	resizer := gostream.NewResizeVideoSource(camera.VideoSourceFromCamera(ctx, cam), width, height)
 	server.logger.Debugf(
 		"resizing video source to width %d and height %d",
 		width, height,
 	)
@@ -454,7 +454,7 @@
 }
 // resetVideoSource resets the video source with the given name to the source resolution.
-func (server *Server) resetVideoSource(name string) error {
+func (server *Server) resetVideoSource(ctx context.Context, name string) error {
 	existing, ok := server.videoSources[name]
 	if !ok {
 		return fmt.Errorf("video source %q not found", name)
 	}
@@ -468,7 +468,7 @@
 		return fmt.Errorf("stream state not found with name %q", name)
 	}
 	server.logger.Debug("resetting video source")
-	existing.Swap(cam)
+	existing.Swap(camera.VideoSourceFromCamera(ctx, cam))
 	err = streamState.Reset()
 	if err != nil {
 		return fmt.Errorf("failed to reset stream %q: %w", name, err)
 	}
@@ -482,7 +482,7 @@ func (server *Server) AddNewStreams(ctx context.Context) error {
 	// Refreshing sources will walk the robot resources for anything implementing the camera and
 	// audioinput APIs and mutate the `svc.videoSources` and `svc.audioSources` maps.
-	server.refreshVideoSources()
+	server.refreshVideoSources(ctx)
 	server.refreshAudioSources()
 	if server.streamConfig == (gostream.StreamConfig{}) {
@@ -643,18 +643,19 @@
 }
 // refreshVideoSources checks and initializes every possible video source that could be viewed from the robot.
-func (server *Server) refreshVideoSources() {
+func (server *Server) refreshVideoSources(ctx context.Context) {
 	for _, name := range camera.NamesFromRobot(server.robot) {
 		cam, err := camera.FromRobot(server.robot, name)
 		if err != nil {
 			continue
 		}
 		existing, ok := server.videoSources[cam.Name().SDPTrackName()]
+		src := camera.VideoSourceFromCamera(ctx, cam)
 		if ok {
-			existing.Swap(cam)
+			existing.Swap(src)
 			continue
 		}
-		newSwapper := gostream.NewHotSwappableVideoSource(cam)
+		newSwapper := gostream.NewHotSwappableVideoSource(src)
 		server.videoSources[cam.Name().SDPTrackName()] = newSwapper
 	}
 }
@@ -743,29 +744,20 @@ func GenerateResolutions(width, height int32, logger logging.Logger) []Resolutio
 }
 // sampleFrameSize takes in a camera.Camera, starts a stream, attempts to
-// pull a frame using Stream.Next, and returns the width and height.
+// pull a frame, and returns the width and height.
 func sampleFrameSize(ctx context.Context, cam camera.Camera, logger logging.Logger) (int, int, error) {
 	logger.Debug("sampling frame size")
-	stream, err := cam.Stream(ctx)
-	if err != nil {
-		return 0, 0, err
-	}
-	defer func() {
-		if cerr := stream.Close(ctx); cerr != nil {
-			logger.Error("failed to close stream:", cerr)
-		}
-	}()
 	// Attempt to get a frame from the stream with a maximum of 5 retries.
 	// This is useful if cameras have a warm-up period before they can start streaming.
 	var frame image.Image
-	var release func()
+	var err error
 retryLoop:
 	for i := 0; i < 5; i++ {
 		select {
 		case <-ctx.Done():
 			return 0, 0, ctx.Err()
 		default:
-			frame, release, err = stream.Next(ctx)
+			frame, err = camera.DecodeImageFromCamera(ctx, "", nil, cam)
 			if err == nil {
 				break retryLoop // Break out of the for loop, not just the select.
 			}
@@ -773,9 +765,6 @@ retryLoop:
 			time.Sleep(retryDelay)
 		}
 	}
-	if release != nil {
-		defer release()
-	}
 	if err != nil {
 		return 0, 0, fmt.Errorf("failed to get frame after 5 attempts: %w", err)
 	}
diff --git a/robot/web/stream/state/state.go b/robot/web/stream/state/state.go
index de4c0c03cc8..50ea3fcac94 100644
--- a/robot/web/stream/state/state.go
+++ b/robot/web/stream/state/state.go
@@ -19,7 +19,7 @@ import (
 	"go.viam.com/rdk/gostream"
 	"go.viam.com/rdk/logging"
 	"go.viam.com/rdk/robot"
-	streamCamera "go.viam.com/rdk/robot/web/stream/camera"
+	cameraStreamUtils "go.viam.com/rdk/robot/web/stream/camera"
 )
 
 // ErrClosed indicates that the StreamState is already closed.
@@ -332,7 +332,7 @@ func (state *StreamState) tick() {
 }
 
 func (state *StreamState) streamH264Passthrough() error {
-	cam, err := streamCamera.Camera(state.robot, state.Stream)
+	cam, err := cameraStreamUtils.Camera(state.robot, state.Stream)
 	if err != nil {
 		return err
 	}
@@ -393,7 +393,7 @@
 }
 
 func (state *StreamState) unsubscribeH264Passthrough(ctx context.Context, id rtppassthrough.SubscriptionID) error {
-	cam, err := streamCamera.Camera(state.robot, state.Stream)
+	cam, err := cameraStreamUtils.Camera(state.robot, state.Stream)
 	if err != nil {
 		return err
 	}
diff --git a/services/vision/obstaclesdepth/obstacles_depth.go b/services/vision/obstaclesdepth/obstacles_depth.go
index ed450b98d9b..eb76f7f2ace 100644
--- a/services/vision/obstaclesdepth/obstacles_depth.go
+++ b/services/vision/obstaclesdepth/obstacles_depth.go
@@ -99,8 +99,8 @@ func registerObstaclesDepth(
 // BuildObsDepth will check for intrinsics and determine how to build based on that.
 func (o *obsDepth) buildObsDepth(logger logging.Logger) func(
-	ctx context.Context, src camera.VideoSource) ([]*vision.Object, error) {
-	return func(ctx context.Context, src camera.VideoSource) ([]*vision.Object, error) {
+	ctx context.Context, src camera.Camera) ([]*vision.Object, error) {
+	return func(ctx context.Context, src camera.Camera) ([]*vision.Object, error) {
 		props, err := src.Properties(ctx)
 		if err != nil {
 			logger.CWarnw(ctx, "could not find camera properties. obstacles depth started without camera's intrinsic parameters", "error", err)
@@ -116,14 +116,13 @@ func (o *obsDepth) buildObsDepth(logger logging.Logger) func(
 }
 
 // buildObsDepthNoIntrinsics will return the median depth in the depth map as a Geometry point.
-func (o *obsDepth) obsDepthNoIntrinsics(ctx context.Context, src camera.VideoSource) ([]*vision.Object, error) {
-	pic, release, err := camera.ReadImage(ctx, src)
+func (o *obsDepth) obsDepthNoIntrinsics(ctx context.Context, src camera.Camera) ([]*vision.Object, error) {
+	img, err := camera.DecodeImageFromCamera(ctx, "", nil, src)
 	if err != nil {
 		return nil, errors.Errorf("could not get image from %s", src)
 	}
-	defer release()
-	dm, err := rimage.ConvertImageToDepthMap(ctx, pic)
+	dm, err := rimage.ConvertImageToDepthMap(ctx, img)
 	if err != nil {
 		return nil, errors.New("could not convert image to depth map")
 	}
@@ -144,17 +143,16 @@
 // buildObsDepthWithIntrinsics will use the methodology in Manduchi et al. to find obstacle points
 // before clustering and projecting those points into 3D obstacles.
-func (o *obsDepth) obsDepthWithIntrinsics(ctx context.Context, src camera.VideoSource) ([]*vision.Object, error) {
+func (o *obsDepth) obsDepthWithIntrinsics(ctx context.Context, src camera.Camera) ([]*vision.Object, error) {
 	// Check if we have intrinsics here. If not, don't even try
 	if o.intrinsics == nil {
 		return nil, errors.New("tried to build obstacles depth with intrinsics but no instrinsics found")
 	}
-	pic, release, err := camera.ReadImage(ctx, src)
+	img, err := camera.DecodeImageFromCamera(ctx, "", nil, src)
 	if err != nil {
 		return nil, errors.Errorf("could not get image from %s", src)
 	}
-	defer release()
-	dm, err := rimage.ConvertImageToDepthMap(ctx, pic)
+	dm, err := rimage.ConvertImageToDepthMap(ctx, img)
 	if err != nil {
 		return nil, errors.New("could not convert image to depth map")
 	}
diff --git a/services/vision/obstaclesdistance/obstacles_distance.go b/services/vision/obstaclesdistance/obstacles_distance.go
index ef887e48275..2aec6b39fcf 100644
--- a/services/vision/obstaclesdistance/obstacles_distance.go
+++ b/services/vision/obstaclesdistance/obstacles_distance.go
@@ -75,7 +75,7 @@ func registerObstacleDistanceDetector(
 		return nil, errors.New("config for obstacles_distance cannot be nil")
 	}
-	segmenter := func(ctx context.Context, src camera.VideoSource) ([]*vision.Object, error) {
+	segmenter := func(ctx context.Context, src camera.Camera) ([]*vision.Object, error) {
 		clouds := make([]pointcloud.PointCloud, 0, conf.NumQueries)
 		for i := 0; i < conf.NumQueries; i++ {
diff --git a/services/vision/vision.go b/services/vision/vision.go
index e763c31db23..c4dd3add012 100644
--- a/services/vision/vision.go
+++ b/services/vision/vision.go
@@ -308,11 +308,10 @@ func (vm *vizModel) DetectionsFromCamera(
 	if err != nil {
 		return nil, errors.Wrapf(err, "could not find camera named %s", cameraName)
 	}
-	img, release, err := camera.ReadImage(ctx, cam)
+	img, err := camera.DecodeImageFromCamera(ctx, "", extra, cam)
 	if err != nil {
 		return nil, errors.Wrapf(err, "could not get image from %s", cameraName)
 	}
-	defer release()
 	return vm.detectorFunc(ctx, img)
 }
@@ -351,11 +350,10 @@ func (vm *vizModel) ClassificationsFromCamera(
 	if err != nil {
 		return nil, errors.Wrapf(err, "could not find camera named %s", cameraName)
 	}
-	img, release, err := camera.ReadImage(ctx, cam)
+	img, err := camera.DecodeImageFromCamera(ctx, "", extra, cam)
 	if err != nil {
 		return nil, errors.Wrapf(err, "could not get image from %s", cameraName)
 	}
-	defer release()
 	fullClassifications, err := vm.classifierFunc(ctx, img)
 	if err != nil {
 		return nil, errors.Wrap(err, "could not get classifications from image")
 	}
@@ -401,11 +399,10 @@ func (vm *vizModel) CaptureAllFromCamera(
 	if err != nil {
 		return viscapture.VisCapture{}, errors.Wrapf(err, "could not find camera named %s", cameraName)
 	}
-	img, release, err := camera.ReadImage(ctx, cam)
+	img, err := camera.DecodeImageFromCamera(ctx, "", extra, cam)
 	if err != nil {
 		return viscapture.VisCapture{}, errors.Wrapf(err, "could not get image from %s", cameraName)
 	}
-	defer release()
 	logger := vm.r.Logger()
 	var detections []objectdetection.Detection
 	if opt.ReturnDetections {
diff --git a/testutils/inject/camera.go b/testutils/inject/camera.go
index 975bb9b58d0..1671cf07005 100644
--- a/testutils/inject/camera.go
+++ b/testutils/inject/camera.go
@@ -52,20 +52,6 @@ func (c *Camera) NextPointCloud(ctx context.Context) (pointcloud.PointCloud, err
 	return nil, errors.New("NextPointCloud unimplemented")
 }
-// Stream calls the injected Stream or the real version.
-func (c *Camera) Stream(
-	ctx context.Context,
-	errHandlers ...gostream.ErrorHandler,
-) (gostream.VideoStream, error) {
-	if c.StreamFunc != nil {
-		return c.StreamFunc(ctx, errHandlers...)
-	}
-	if c.Camera != nil {
-		return c.Camera.Stream(ctx, errHandlers...)
-	}
-	return nil, errors.Wrap(ctx.Err(), "no stream function available")
-}
-
 // Image calls the injected Image or the real version.
 func (c *Camera) Image(ctx context.Context, mimeType string, extra map[string]interface{}) ([]byte, camera.ImageMetadata, error) {
 	if c.ImageFunc != nil {
diff --git a/vision/segmentation/detections_to_objects.go b/vision/segmentation/detections_to_objects.go
index 8bce8e4dbbf..10f79ac1cac 100644
--- a/vision/segmentation/detections_to_objects.go
+++ b/vision/segmentation/detections_to_objects.go
@@ -37,7 +37,7 @@ func (dsc *DetectionSegmenterConfig) ConvertAttributes(am utils.AttributeMap) er
 func cameraToProjector(
 	ctx context.Context,
-	source camera.VideoSource,
+	source camera.Camera,
 ) (transform.Projector, error) {
 	if source == nil {
 		return nil, errors.New("cannot have a nil source")
@@ -76,7 +76,7 @@ func DetectionSegmenter(detector objectdetection.Detector, meanK int, sigma, con
 		}
 	}
 	// return the segmenter
-	seg := func(ctx context.Context, src camera.VideoSource) ([]*vision.Object, error) {
+	seg := func(ctx context.Context, src camera.Camera) ([]*vision.Object, error) {
 		proj, err := cameraToProjector(ctx, src)
 		if err != nil {
 			return nil, err
diff --git a/vision/segmentation/er_ccl_clustering.go b/vision/segmentation/er_ccl_clustering.go
index 987c7ceb792..f5e684524af 100644
--- a/vision/segmentation/er_ccl_clustering.go
+++ b/vision/segmentation/er_ccl_clustering.go
@@ -129,7 +129,7 @@ func NewERCCLClustering(params utils.AttributeMap) (Segmenter, error) {
 }
 // ErCCLAlgorithm applies the connected components clustering algorithm to a VideoSource.
-func (erCCL *ErCCLConfig) ErCCLAlgorithm(ctx context.Context, src camera.VideoSource) ([]*vision.Object, error) {
+func (erCCL *ErCCLConfig) ErCCLAlgorithm(ctx context.Context, src camera.Camera) ([]*vision.Object, error) {
 	// get next point cloud
 	cloud, err := src.NextPointCloud(ctx)
 	if err != nil {
diff --git a/vision/segmentation/radius_clustering.go b/vision/segmentation/radius_clustering.go
index 588ab8f6596..1d2628a6ddb 100644
--- a/vision/segmentation/radius_clustering.go
+++ b/vision/segmentation/radius_clustering.go
@@ -87,7 +87,7 @@ func NewRadiusClustering(params utils.AttributeMap) (Segmenter, error) {
 }
 // RadiusClustering applies the radius clustering algorithm directly on a given point cloud.
-func (rcc *RadiusClusteringConfig) RadiusClustering(ctx context.Context, src camera.VideoSource) ([]*vision.Object, error) {
+func (rcc *RadiusClusteringConfig) RadiusClustering(ctx context.Context, src camera.Camera) ([]*vision.Object, error) {
 	// get next point cloud
 	cloud, err := src.NextPointCloud(ctx)
 	if err != nil {
diff --git a/vision/segmentation/radius_clustering_voxel.go b/vision/segmentation/radius_clustering_voxel.go
index 06966ba1001..939c7437ba5 100644
--- a/vision/segmentation/radius_clustering_voxel.go
+++ b/vision/segmentation/radius_clustering_voxel.go
@@ -85,7 +85,7 @@ func NewRadiusClusteringFromVoxels(params utils.AttributeMap) (Segmenter, error)
 }
 // RadiusClusteringVoxels turns the cloud into a voxel grid and then does radius clustering to segment it.
-func (rcc *RadiusClusteringVoxelConfig) RadiusClusteringVoxels(ctx context.Context, src camera.VideoSource) ([]*vision.Object, error) {
+func (rcc *RadiusClusteringVoxelConfig) RadiusClusteringVoxels(ctx context.Context, src camera.Camera) ([]*vision.Object, error) {
 	// get next point cloud and convert it to a VoxelGrid
 	// NOTE(bh): Maybe one day cameras will return voxel grids directly.
 	cloud, err := src.NextPointCloud(ctx)
diff --git a/vision/segmentation/segmenter.go b/vision/segmentation/segmenter.go
index b856ea0a693..e418daeaf2f 100644
--- a/vision/segmentation/segmenter.go
+++ b/vision/segmentation/segmenter.go
@@ -8,4 +8,4 @@ import (
 )
 
 // A Segmenter is a function that takes images/pointclouds from an input source and segments them into objects.
-type Segmenter func(ctx context.Context, src camera.VideoSource) ([]*vision.Object, error)
+type Segmenter func(ctx context.Context, src camera.Camera) ([]*vision.Object, error)