diff --git a/examples/stream-reader/main.go b/examples/stream-reader/main.go
new file mode 100644
index 0000000..3498aa3
--- /dev/null
+++ b/examples/stream-reader/main.go
@@ -0,0 +1,175 @@
+package main
+
+import (
+	"fmt"
+	"io"
+	"log"
+	"os"
+
+	"git.kingecg.top/kingecg/goffmpeg/pkg/ffmpeg"
+	"git.kingecg.top/kingecg/goffmpeg/pkg/ffmpeg/stream"
+)
+
+// Example 1: Read media info from a URL (local file or network stream)
+func exampleReadMediaInfo(inputURL string) {
+	fmt.Printf("\n=== Example 1: Reading Media Info ===\n")
+
+	sr, err := stream.NewStreamReader(stream.TranscodeOptions{
+		InputURL: inputURL,
+	})
+	if err != nil {
+		log.Printf("Failed to create stream reader: %v", err)
+		return
+	}
+	defer sr.Close()
+
+	info, err := sr.MediaInfo()
+	if err != nil {
+		log.Printf("Failed to get media info: %v", err)
+		return
+	}
+
+	fmt.Printf("Media Info:\n")
+	fmt.Printf(" Duration: %v\n", info.Duration)
+	fmt.Printf(" Start Time: %v\n", info.StartTime)
+	fmt.Printf(" Bit Rate: %d bps\n", info.BitRate)
+	fmt.Printf(" Streams: %d\n", len(info.Streams))
+
+	for i, s := range info.Streams {
+		fmt.Printf(" Stream %d:\n", i)
+		fmt.Printf(" Type: %v\n", s.Type)
+		fmt.Printf(" Codec: %s (%s)\n", s.CodecName, s.CodecLongName)
+		if s.Type == ffmpeg.CodecTypeVideo {
+			fmt.Printf(" Resolution: %dx%d\n", s.Width, s.Height)
+			fmt.Printf(" Pixel Format: %s\n", s.PixelFormat)
+		} else if s.Type == ffmpeg.CodecTypeAudio {
+			fmt.Printf(" Sample Rate: %d\n", s.SampleRate)
+			fmt.Printf(" Channels: %d\n", s.Channels)
+			fmt.Printf(" Channel Layout: %s\n", s.ChannelLayout)
+		}
+	}
+}
+
+// Example 2: Stream copy (remux) from input to output
+func exampleStreamCopy(inputURL, outputURL string) {
+	fmt.Printf("\n=== Example 2: Stream Copy (Remux) ===\n")
+
+	sr, err := stream.NewStreamReader(stream.TranscodeOptions{
+		InputURL: inputURL,
+	})
+	if err != nil {
+		log.Printf("Failed to create stream reader: %v", err)
+		return
+	}
+	defer sr.Close()
+
+	// Create output file
+	outFile, err := os.Create(outputURL)
+	if err != nil {
+		log.Printf("Failed to create output file: %v", err)
+		return
+	}
+	defer outFile.Close()
+
+	// Read data from stream reader and write to output
+	fmt.Printf("Copying stream from %s to %s...\n", inputURL, outputURL)
+
+	buf := make([]byte, 32768)
+	written := 0
+	for {
+		n, err := sr.Read(buf)
+		if n > 0 {
+			// A failed write must abort the copy; otherwise the output is silently truncated.
+			if _, werr := outFile.Write(buf[:n]); werr != nil {
+				log.Printf("\nError writing output: %v", werr)
+				return
+			}
+			written += n
+			fmt.Printf("\rWritten: %d bytes", written)
+		}
+		if err != nil {
+			if err == io.EOF {
+				fmt.Println("\nStream copy completed!")
+				break
+			}
+			log.Printf("\nError reading stream: %v", err)
+			break
+		}
+	}
+}
+
+// Example 3: Read from io.Reader
+func exampleReadFromReader(inputURL string) {
+	fmt.Printf("\n=== Example 3: Read from io.Reader ===\n")
+
+	// Open input file as io.Reader
+	inFile, err := os.Open(inputURL)
+	if err != nil {
+		log.Printf("Failed to open input file: %v", err)
+		return
+	}
+	defer inFile.Close()
+
+	sr, err := stream.NewStreamReader(stream.TranscodeOptions{
+		InputIO: inFile,
+	})
+	if err != nil {
+		log.Printf("Failed to create stream reader: %v", err)
+		return
+	}
+	defer sr.Close()
+
+	info, err := sr.MediaInfo()
+	if err != nil {
+		log.Printf("Failed to get media info: %v", err)
+		return
+	}
+
+	fmt.Printf("Media info from io.Reader:\n")
+	fmt.Printf(" Duration: %v\n", info.Duration)
+	fmt.Printf(" Streams: %d\n", len(info.Streams))
+}
+
+func main() {
+	if len(os.Args) < 2 {
+		fmt.Println("Usage: stream-reader COMMAND [args]")
+		fmt.Println("\nCommands:")
+		fmt.Println(" info INPUT - Read and display media info")
+		fmt.Println(" copy INPUT OUTPUT - Copy/remux stream")
+		fmt.Println(" reader INPUT - Read from io.Reader")
+		fmt.Println("\nExamples:")
+		fmt.Println(" stream-reader info /path/to/video.mp4")
+		fmt.Println(" stream-reader copy input.mp4 output.flv")
+		fmt.Println(" stream-reader reader /path/to/video.mp4")
+		os.Exit(1)
+	}
+
+	command := os.Args[1]
+
+	switch command {
+	case "info":
+		if len(os.Args) < 3 {
+			fmt.Println("Usage: stream-reader info INPUT")
+			os.Exit(1)
+		}
+		exampleReadMediaInfo(os.Args[2])
+
+	case "copy":
+		if len(os.Args) < 4 {
+			fmt.Println("Usage: stream-reader copy INPUT OUTPUT")
+			os.Exit(1)
+		}
+		exampleStreamCopy(os.Args[2], os.Args[3])
+
+	case "reader":
+		if len(os.Args) < 3 {
+			fmt.Println("Usage: stream-reader reader INPUT")
+			os.Exit(1)
+		}
+		exampleReadFromReader(os.Args[2])
+
+	default:
+		fmt.Printf("Unknown command: %s\n", command)
+		os.Exit(1)
+	}
+}
diff --git a/pkg/ffmpeg/cgo.go b/pkg/ffmpeg/cgo.go
index 20f6bc5..f0ec8e8 100644
--- a/pkg/ffmpeg/cgo.go
+++ b/pkg/ffmpeg/cgo.go
@@ -9,4 +9,36 @@ package ffmpeg
 #include
 #include
 */
-import "C"
\ No newline at end of file
+import "C"
+
+// Duration returns the duration in microseconds
+func (fc *FormatContext) Duration() int64 {
+	if fc == nil || fc.ptr == nil {
+		return 0
+	}
+	return int64(fc.ptr.duration)
+}
+
+// StartTime returns the start time in microseconds
+func (fc *FormatContext) StartTime() int64 {
+	if fc == nil || fc.ptr == nil {
+		return 0
+	}
+	return int64(fc.ptr.start_time)
+}
+
+// BitRate returns the bit rate
+func (fc *FormatContext) BitRate() int64 {
+	if fc == nil || fc.ptr == nil {
+		return 0
+	}
+	return int64(fc.ptr.bit_rate)
+}
+
+// ChannelLayout returns the channel layout
+func (cp *CodecParameters) ChannelLayout() uint64 {
+	if cp == nil || cp.ptr == nil {
+		return 0
+	}
+	return uint64(cp.ptr.channel_layout)
+}
diff --git a/pkg/ffmpeg/stream/errors.go b/pkg/ffmpeg/stream/errors.go
new file mode 100644
index 0000000..e965913
--- /dev/null
+++ b/pkg/ffmpeg/stream/errors.go
@@ -0,0 +1,13 @@
+package stream
+
+import "errors"
+
+var (
+	ErrNoInput            = errors.New("no input specified")
+	ErrInvalidInput       = errors.New("invalid input")
+	ErrInvalidOutput      = errors.New("invalid output")
+	ErrStreamClosed       = errors.New("stream closed")
+	ErrNotImplemented     = errors.New("not implemented")
+	ErrCodecNotFound      = errors.New("codec not found")
+	ErrFormatNotSupported = errors.New("format not supported")
+)
diff --git a/pkg/ffmpeg/stream/helpers.go b/pkg/ffmpeg/stream/helpers.go
new file mode 100644
index 0000000..cc66485
--- /dev/null
+++ b/pkg/ffmpeg/stream/helpers.go
@@ -0,0 +1,174 @@
+package stream
+
+// The tables below hard-code numeric enum values from the FFmpeg headers
+// (libavcodec/codec_id.h, libavutil/pixfmt.h, libavutil/channel_layout.h).
+// NOTE(review): confirm them against the FFmpeg version this package links
+// with; resolving names through libavcodec/libavutil would avoid the
+// hard-coding entirely.
+
+// codecIDToName converts an AVCodecID value to FFmpeg's short codec name.
+// IDs that are not listed yield "unknown".
+func codecIDToName(codecID int) string {
+	switch codecID {
+	case 1: // AV_CODEC_ID_MPEG1VIDEO
+		return "mpeg1video"
+	case 2: // AV_CODEC_ID_MPEG2VIDEO
+		return "mpeg2video"
+	case 7: // AV_CODEC_ID_MJPEG
+		return "mjpeg"
+	case 12: // AV_CODEC_ID_MPEG4
+		return "mpeg4"
+	case 27: // AV_CODEC_ID_H264
+		return "h264"
+	case 30: // AV_CODEC_ID_THEORA
+		return "theora"
+	case 88: // AV_CODEC_ID_JPEG2000
+		return "jpeg2000"
+	case 139: // AV_CODEC_ID_VP8
+		return "vp8"
+	case 167: // AV_CODEC_ID_VP9
+		return "vp9"
+	case 173: // AV_CODEC_ID_HEVC (H.265)
+		return "hevc"
+	case 0x15001: // AV_CODEC_ID_MP3
+		return "mp3"
+	case 0x15002: // AV_CODEC_ID_AAC
+		return "aac"
+	case 0x15003: // AV_CODEC_ID_AC3
+		return "ac3"
+	case 0x15004: // AV_CODEC_ID_DTS
+		return "dca"
+	case 0x15005: // AV_CODEC_ID_VORBIS
+		return "vorbis"
+	case 0x1500c: // AV_CODEC_ID_FLAC
+		return "flac"
+	case 0x1503c: // AV_CODEC_ID_OPUS
+		return "opus"
+	default:
+		return "unknown"
+	}
+}
+
+// codecIDToLongName converts an AVCodecID value to a descriptive codec name.
+// IDs that are not listed yield "Unknown codec".
+func codecIDToLongName(codecID int) string {
+	switch codecID {
+	case 1: // AV_CODEC_ID_MPEG1VIDEO
+		return "MPEG-1 video"
+	case 2: // AV_CODEC_ID_MPEG2VIDEO
+		return "MPEG-2 video"
+	case 7: // AV_CODEC_ID_MJPEG
+		return "Motion JPEG"
+	case 12: // AV_CODEC_ID_MPEG4
+		return "MPEG-4 part 2"
+	case 27: // AV_CODEC_ID_H264
+		return "H.264 / AVC / MPEG-4 AVC / MPEG-4 part 10"
+	case 30: // AV_CODEC_ID_THEORA
+		return "Theora"
+	case 88: // AV_CODEC_ID_JPEG2000
+		return "JPEG 2000"
+	case 139: // AV_CODEC_ID_VP8
+		return "On2 VP8"
+	case 167: // AV_CODEC_ID_VP9
+		return "Google VP9"
+	case 173: // AV_CODEC_ID_HEVC (H.265)
+		return "H.265 / HEVC"
+	case 0x15001: // AV_CODEC_ID_MP3
+		return "MP3 (MPEG audio layer 3)"
+	case 0x15002: // AV_CODEC_ID_AAC
+		return "AAC (Advanced Audio Coding)"
+	case 0x15003: // AV_CODEC_ID_AC3
+		return "ATSC A/52A (AC-3)"
+	case 0x15004: // AV_CODEC_ID_DTS
+		return "DCA (DTS Coherent Acoustics)"
+	case 0x15005: // AV_CODEC_ID_VORBIS
+		return "Vorbis"
+	case 0x1500c: // AV_CODEC_ID_FLAC
+		return "FLAC (Free Lossless Audio Codec)"
+	case 0x1503c: // AV_CODEC_ID_OPUS
+		return "Opus"
+	default:
+		return "Unknown codec"
+	}
+}
+
+// pixelFormatToString converts an AVPixelFormat value to its FFmpeg name.
+// Only values 0-14 are listed; NOTE(review): later enum values appear to
+// differ between FFmpeg releases, so they are reported as "unknown".
+func pixelFormatToString(format int) string {
+	switch format {
+	case 0: // AV_PIX_FMT_YUV420P
+		return "yuv420p"
+	case 1: // AV_PIX_FMT_YUYV422
+		return "yuyv422"
+	case 2: // AV_PIX_FMT_RGB24
+		return "rgb24"
+	case 3: // AV_PIX_FMT_BGR24
+		return "bgr24"
+	case 4: // AV_PIX_FMT_YUV422P
+		return "yuv422p"
+	case 5: // AV_PIX_FMT_YUV444P
+		return "yuv444p"
+	case 6: // AV_PIX_FMT_YUV410P
+		return "yuv410p"
+	case 7: // AV_PIX_FMT_YUV411P
+		return "yuv411p"
+	case 8: // AV_PIX_FMT_GRAY8
+		return "gray"
+	case 9: // AV_PIX_FMT_MONOWHITE
+		return "monow"
+	case 10: // AV_PIX_FMT_MONOBLACK
+		return "monob"
+	case 11: // AV_PIX_FMT_PAL8
+		return "pal8"
+	case 12: // AV_PIX_FMT_YUVJ420P
+		return "yuvj420p"
+	case 13: // AV_PIX_FMT_YUVJ422P
+		return "yuvj422p"
+	case 14: // AV_PIX_FMT_YUVJ444P
+		return "yuvj444p"
+	default:
+		return "unknown"
+	}
+}
+
+// channelLayoutToString converts an FFmpeg channel-layout bitmask to the
+// layout name FFmpeg uses for it. Unlisted masks yield "unknown".
+func channelLayoutToString(layout uint64) string {
+	switch layout {
+	case 0x4: // FC
+		return "mono"
+	case 0x3: // FL+FR
+		return "stereo"
+	case 0xb: // FL+FR+LFE
+		return "2.1"
+	case 0x7: // FL+FR+FC
+		return "3.0"
+	case 0x103: // FL+FR+BC
+		return "3.0(back)"
+	case 0xf: // FL+FR+FC+LFE
+		return "3.1"
+	case 0x107: // FL+FR+FC+BC
+		return "4.0"
+	case 0x10f: // FL+FR+FC+LFE+BC
+		return "4.1"
+	case 0x33: // FL+FR+BL+BR
+		return "quad"
+	case 0x603: // FL+FR+SL+SR
+		return "quad(side)"
+	case 0x37: // FL+FR+FC+BL+BR
+		return "5.0"
+	case 0x607: // FL+FR+FC+SL+SR
+		return "5.0(side)"
+	case 0x3f: // FL+FR+FC+LFE+BL+BR
+		return "5.1"
+	case 0x60f: // FL+FR+FC+LFE+SL+SR
+		return "5.1(side)"
+	case 0x70f: // FL+FR+FC+LFE+BC+SL+SR
+		return "6.1"
+	case 0x63f: // FL+FR+FC+LFE+BL+BR+SL+SR
+		return "7.1"
+	default:
+		return "unknown"
+	}
+}
diff --git a/pkg/ffmpeg/stream/mediaInfo.go b/pkg/ffmpeg/stream/mediaInfo.go
new file mode 100644
index 0000000..5021cc4
--- /dev/null
+++ b/pkg/ffmpeg/stream/mediaInfo.go
@@ -0,0 +1,96 @@
+package stream
+
+import (
+	"math"
+	"time"
+
+	"git.kingecg.top/kingecg/goffmpeg/pkg/ffmpeg"
+)
+
+// avNoPTSValue is FFmpeg's AV_NOPTS_VALUE sentinel for an unknown timestamp.
+const avNoPTSValue int64 = math.MinInt64
+
+// MediaInfo represents media stream information
+type MediaInfo struct {
+	Duration   time.Duration
+	StartTime  time.Duration
+	BitRate    int64
+	Format     string // not populated by NewMediaInfo
+	FormatLong string // not populated by NewMediaInfo
+	Streams    []*StreamInfo
+}
+
+// StreamInfo represents a single stream's information
+type StreamInfo struct {
+	Index         int
+	Type          ffmpeg.CodecType
+	CodecName     string
+	CodecLongName string
+	Width         int    // video only
+	Height        int    // video only
+	PixelFormat   string // video only
+	SampleRate    int    // audio only
+	Channels      int    // audio only
+	ChannelLayout string // audio only
+	FrameSize     int
+	TimeBase      ffmpeg.Rational
+	BitRate       int64
+}
+
+// microsToDuration converts a microsecond count, as reported by FormatContext,
+// into a time.Duration. An unknown value (AV_NOPTS_VALUE) maps to zero.
+func microsToDuration(us int64) time.Duration {
+	if us == avNoPTSValue {
+		return 0
+	}
+	return time.Duration(us) * time.Microsecond
+}
+
+// NewMediaInfo builds a MediaInfo snapshot from an opened FormatContext.
+// It returns ErrInvalidInput when fc is nil.
+func NewMediaInfo(fc *ffmpeg.FormatContext) (*MediaInfo, error) {
+	if fc == nil {
+		return nil, ErrInvalidInput
+	}
+
+	streams := fc.Streams()
+	info := &MediaInfo{
+		// Duration and StartTime arrive in microseconds, so they are scaled by
+		// time.Microsecond only; no further division is needed.
+		Duration:  microsToDuration(fc.Duration()),
+		StartTime: microsToDuration(fc.StartTime()),
+		BitRate:   fc.BitRate(),
+		Streams:   make([]*StreamInfo, 0, len(streams)),
+	}
+
+	for _, s := range streams {
+		si := &StreamInfo{
+			Index:    s.Index(),
+			Type:     s.Type(),
+			TimeBase: s.TimeBase(),
+		}
+
+		// Codec-level details are only available when parameters are attached.
+		if cp := s.CodecParameters(); cp != nil {
+			si.CodecName = codecIDToName(cp.CodecID())
+			si.CodecLongName = codecIDToLongName(cp.CodecID())
+			si.BitRate = cp.BitRate()
+			si.FrameSize = cp.FrameSize()
+
+			switch si.Type {
+			case ffmpeg.CodecTypeVideo:
+				si.Width = cp.Width()
+				si.Height = cp.Height()
+				si.PixelFormat = pixelFormatToString(cp.Format())
+			case ffmpeg.CodecTypeAudio:
+				si.SampleRate = cp.SampleRate()
+				si.Channels = cp.Channels()
+				si.ChannelLayout = channelLayoutToString(cp.ChannelLayout())
+			}
+		}
+
+		info.Streams = append(info.Streams, si)
+	}
+
+	return info, nil
+}
diff --git a/pkg/ffmpeg/stream/options.go b/pkg/ffmpeg/stream/options.go
new file mode 100644
index 0000000..b35ad2f
--- /dev/null
+++ b/pkg/ffmpeg/stream/options.go
@@ -0,0 +1,51 @@
+package stream
+
+import (
+	"io"
+
+	"git.kingecg.top/kingecg/goffmpeg/pkg/ffmpeg"
+)
+
+// TranscodeOptions defines options for stream transcode/transform
+type TranscodeOptions struct {
+	// Input options
+	InputURL string    // rtsp://, http://, https://, ws://, or file path
+	InputIO  io.Reader // If set, InputURL is ignored (data piped to FFmpeg)
+
+	// Output options
+	OutputFormat string // Output container format: "mp4", "flv", "mkv", "raw"
+	VideoCodec   string // Video codec: "libx264", "libvpx", "copy", empty = no video
+	AudioCodec   string // Audio codec: "aac", "mp3", "copy", empty = no audio
+
+	// Video options
+	VideoWidth   int
+	VideoHeight  int
+	VideoBitRate int64
+	FrameRate    ffmpeg.Rational
+	PixelFormat  string
+	VideoFilter  string // FFmpeg video filter string
+
+	// Audio options
+	AudioBitRate    int64
+	AudioSampleRate int
+	AudioChannels   int
+	AudioFilter     string // FFmpeg audio filter string
+
+	// General options
+	BufferSize int // Internal buffer size (default 4096)
+}
+
+// Validate validates the options
+func (o *TranscodeOptions) Validate() error {
+	// At least one input required
+	if o.InputURL == "" && o.InputIO == nil {
+		return ErrNoInput
+	}
+
+	// Buffer size default
+	if o.BufferSize <= 0 {
+		o.BufferSize = 4096
+	}
+
+	return nil
+}
diff --git a/pkg/ffmpeg/stream/pipe.go b/pkg/ffmpeg/stream/pipe.go
new file mode 100644
index 0000000..95105b8
--- /dev/null
+++ b/pkg/ffmpeg/stream/pipe.go
@@ -0,0 +1,82 @@
+package stream
+
+import (
+	"io"
+	"os"
+	"sync"
+)
+
+// pipeReader bridges io.Reader to a file descriptor that can be used as FFmpeg input
+type pipeReader struct {
+	r   io.Reader
+	pr  *io.PipeReader
+	pw  *io.PipeWriter
+	f   *os.File
+	err error
+	mu  sync.Mutex
+}
+
+// newPipeReader creates a new pipe reader that bridges io.Reader to a pipe
+func newPipeReader(r io.Reader) (*pipeReader, error) {
+	pr, pw := io.Pipe()
+	return &pipeReader{
+		r:  r,
+		pr: pr,
+		pw: pw,
+	}, nil
+}
+
+// setErr records the first failure of the background copy for Error.
+func (p *pipeReader) setErr(err error) {
+	p.mu.Lock()
+	defer p.mu.Unlock()
+	if p.err == nil {
+		p.err = err
+	}
+}
+
+// start begins copying data from the underlying reader to the pipe in background.
+// The goroutine exits once the source is exhausted, the source fails, or the
+// pipe is closed; a Read blocked on the source is not interrupted by Close.
+func (p *pipeReader) start() {
+	go func() {
+		buf := make([]byte, 32768)
+		for {
+			n, err := p.r.Read(buf)
+			if n > 0 {
+				if _, writeErr := p.pw.Write(buf[:n]); writeErr != nil {
+					p.setErr(writeErr)
+					p.pw.CloseWithError(writeErr)
+					return
+				}
+			}
+			if err != nil {
+				// io.EOF is the normal end of input, not a failure worth reporting.
+				if err != io.EOF {
+					p.setErr(err)
+				}
+				p.pw.CloseWithError(err)
+				return
+			}
+		}
+	}()
+}
+
+// Read implements io.Reader
+func (p *pipeReader) Read(data []byte) (n int, err error) {
+	return p.pr.Read(data)
+}
+
+// Close closes the pipe
+func (p *pipeReader) Close() error {
+	err := p.pr.Close()
+	p.pw.Close()
+	return err
+}
+
+// Error returns any error that occurred during reading
+func (p *pipeReader) Error() error {
+	p.mu.Lock()
+	defer p.mu.Unlock()
+	return p.err
+}
diff --git a/pkg/ffmpeg/stream/reader.go b/pkg/ffmpeg/stream/reader.go
new file mode 100644
index 0000000..50ab117
--- /dev/null
+++ b/pkg/ffmpeg/stream/reader.go
@@ -0,0 +1,235 @@
+package stream
+
+import (
+	"bytes"
+	"io"
+	"sync"
+
+	"git.kingecg.top/kingecg/goffmpeg/pkg/ffmpeg"
+)
+
+// StreamReader provides a high-level interface for reading media streams
+type StreamReader interface {
+	// Read implements io.Reader - reads transcoded data
+	Read(p []byte) (n int, err error)
+
+	// MediaInfo returns media stream information
+	MediaInfo() (*MediaInfo, error)
+
+	// Close releases all resources
+	Close() error
+}
+
+// streamReader implements StreamReader
+type streamReader struct {
+	opts      *TranscodeOptions
+	mediaInfo *MediaInfo
+
+	// FFmpeg components
+	inputCtx  *ffmpeg.FormatContext
+	outputCtx *ffmpeg.OutputFormatContext
+
+	// Decoding/Encoding contexts
+	decCtxs map[int]*ffmpeg.Context // stream index -> decoder context
+	encCtxs map[int]*ffmpeg.Context // stream index -> encoder context
+
+	// Internal buffer for output
+	buffer     *bytes.Buffer
+	bufferSize int
+
+	// State
+	mu     sync.Mutex
+	closed bool
+	eof    bool
+
+	// For io.Reader input
+	pipeReader *pipeReader
+}
+
+// NewStreamReader creates a new StreamReader with the given options.
+//
+// NOTE: custom IO for InputIO is not implemented yet. When InputURL is empty
+// the input is opened as "pipe:0", so the supplied reader is not consumed.
+func NewStreamReader(opts TranscodeOptions) (StreamReader, error) {
+	if err := opts.Validate(); err != nil {
+		return nil, err
+	}
+
+	// Validate has already applied the BufferSize default.
+	sr := &streamReader{
+		opts:       &opts,
+		decCtxs:    make(map[int]*ffmpeg.Context),
+		encCtxs:    make(map[int]*ffmpeg.Context),
+		buffer:     new(bytes.Buffer),
+		bufferSize: opts.BufferSize,
+	}
+
+	// Handle io.Reader input via pipe
+	if opts.InputIO != nil {
+		pr, err := newPipeReader(opts.InputIO)
+		if err != nil {
+			return nil, err
+		}
+		sr.pipeReader = pr
+		// TODO: Start background goroutine to feed data to FFmpeg via pipe
+		// This requires implementing a custom IO context in FFmpeg
+	}
+
+	// Open input
+	inputURL := opts.InputURL
+	if inputURL == "" {
+		// For io.Reader input, we need to handle it differently
+		// This is a placeholder - actual implementation would need custom IO
+		inputURL = "pipe:0"
+	}
+
+	sr.inputCtx = ffmpeg.AllocFormatContext()
+	if err := sr.inputCtx.OpenInput(inputURL); err != nil {
+		sr.abort()
+		return nil, err
+	}
+
+	if err := sr.inputCtx.FindStreamInfo(); err != nil {
+		sr.abort()
+		return nil, err
+	}
+
+	// Build media info
+	info, err := NewMediaInfo(sr.inputCtx)
+	if err != nil {
+		sr.abort()
+		return nil, err
+	}
+	sr.mediaInfo = info
+
+	return sr, nil
+}
+
+// abort releases what NewStreamReader has acquired so far when construction fails.
+func (sr *streamReader) abort() {
+	if sr.pipeReader != nil {
+		sr.pipeReader.Close()
+	}
+	if sr.inputCtx != nil {
+		sr.inputCtx.Free()
+	}
+}
+
+// Read implements io.Reader. It blocks until at least one byte is available,
+// the input is exhausted (io.EOF) or reading fails; it never returns (0, nil)
+// for a non-empty p.
+func (sr *streamReader) Read(p []byte) (n int, err error) {
+	sr.mu.Lock()
+	defer sr.mu.Unlock()
+
+	if sr.closed {
+		return 0, ErrStreamClosed
+	}
+	if len(p) == 0 {
+		return 0, nil
+	}
+
+	// Packets may carry no payload, so keep pulling until the buffer has data.
+	for sr.buffer.Len() == 0 {
+		if sr.eof {
+			return 0, io.EOF
+		}
+		if err := sr.transcodeNext(); err != nil {
+			if err == io.EOF {
+				sr.eof = true
+				return 0, io.EOF
+			}
+			return 0, err
+		}
+	}
+
+	return sr.buffer.Read(p)
+}
+
+// transcodeNext reads the next packet and buffers its raw payload (pass-through only)
+func (sr *streamReader) transcodeNext() error {
+	pkt := ffmpeg.AllocPacket()
+	defer pkt.Free()
+
+	err := sr.inputCtx.ReadPacket(pkt)
+	if err != nil {
+		return err
+	}
+
+	streamIdx := pkt.StreamIndex()
+	streams := sr.inputCtx.Streams()
+	if streamIdx < 0 || streamIdx >= len(streams) {
+		pkt.Unref()
+		return nil
+	}
+
+	stream := streams[streamIdx]
+	_ = stream.Type() // stream type for future transcode logic
+
+	// For now, implement pass-through (stream copy) mode
+	// Full transcode would require decoding, filtering, re-encoding
+
+	// Write packet directly to output buffer (simplified)
+	data := pkt.Data()
+	if len(data) > 0 {
+		sr.buffer.Write(data)
+	}
+
+	pkt.Unref()
+	return nil
+}
+
+// MediaInfo returns the media information
+func (sr *streamReader) MediaInfo() (*MediaInfo, error) {
+	sr.mu.Lock()
+	defer sr.mu.Unlock()
+	return sr.mediaInfo, nil
+}
+
+// Close releases all resources
+func (sr *streamReader) Close() error {
+	sr.mu.Lock()
+	defer sr.mu.Unlock()
+
+	if sr.closed {
+		return nil
+	}
+
+	sr.closed = true
+
+	// Close pipe reader if used
+	if sr.pipeReader != nil {
+		sr.pipeReader.Close()
+	}
+
+	// Close decoder contexts
+	for _, ctx := range sr.decCtxs {
+		ctx.Close()
+		ctx.Free()
+	}
+
+	// Close encoder contexts
+	for _, ctx := range sr.encCtxs {
+		ctx.Close()
+		ctx.Free()
+	}
+
+	// Close input context
+	if sr.inputCtx != nil {
+		sr.inputCtx.Free()
+	}
+
+	// Close output context
+	if sr.outputCtx != nil {
+		sr.outputCtx.CloseOutput()
+		sr.outputCtx.Free()
+	}
+
+	return nil
+}
+
+// RawFormatContext returns the underlying input FormatContext for advanced operations.
+// The context is owned by the reader and is freed by Close.
+func (sr *streamReader) RawFormatContext() *ffmpeg.FormatContext {
+	return sr.inputCtx
+}