Skip to content

Commit

Permalink
Fix: Add save on disk
Browse files Browse the repository at this point in the history
  • Loading branch information
hvantoan committed Jun 17, 2024
1 parent d11cf6c commit 5124d08
Show file tree
Hide file tree
Showing 7 changed files with 93 additions and 27 deletions.
10 changes: 8 additions & 2 deletions muxer.go
Original file line number Diff line number Diff line change
Expand Up @@ -76,6 +76,10 @@ type Muxer struct {
// than saving them on RAM, but allows to preserve RAM.
Directory string

// Using Directory to save, and using ram to cache segments.
// This segment has save to disk when close.
IsPlayBack bool

// Deprecated: replaced with SegmentMinDuration
SegmentDuration time.Duration
// Deprecated: replaced with PartMinDuration
Expand Down Expand Up @@ -124,7 +128,7 @@ func (m *Muxer) Start() error {

default:
if m.SegmentCount < 3 {
return fmt.Errorf("The minimum number of HLS segments is 3")
return fmt.Errorf("the minimum number of HLS segments is 3")
}
}

Expand All @@ -151,7 +155,7 @@ func (m *Muxer) Start() error {
}

if m.Directory != "" {
m.storageFactory = storage.NewFactoryDisk(m.Directory)
m.storageFactory = storage.NewFactoryDisk(m.Directory, m.IsPlayBack)
} else {
m.storageFactory = storage.NewFactoryRAM()
}
Expand All @@ -162,6 +166,8 @@ func (m *Muxer) Start() error {
videoTrack: m.VideoTrack,
audioTrack: m.AudioTrack,
prefix: m.prefix,
isPlayBack: m.IsPlayBack,
playList: "",
}
m.server.initialize()

Expand Down
64 changes: 54 additions & 10 deletions muxer_server.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,8 @@ import (
"math"
"net/http"
"net/url"
"os"
"path"
"path/filepath"
"strconv"
"strings"
Expand Down Expand Up @@ -294,6 +296,8 @@ func filterOutHLSParams(rawQuery string) string {
}

func generateMediaPlaylistFMP4(
playList string,
isPlayaBack bool,
isDeltaUpdate bool,
variant MuxerVariant,
segments []muxerSegment,
Expand All @@ -311,6 +315,8 @@ func generateMediaPlaylistFMP4(
Version: 10,
TargetDuration: targetDuration,
MediaSequence: segmentDeleteCount,
Playlist: playList,
IsPlayBack: isPlayaBack,
}

if variant == MuxerVariantLowLatency {
Expand Down Expand Up @@ -432,6 +438,8 @@ func generateMediaPlaylistFMP4(
}

func generateMediaPlaylist(
playList string,
isPlayBack bool,
isDeltaUpdate bool,
variant MuxerVariant,
segments []muxerSegment,
Expand All @@ -450,6 +458,8 @@ func generateMediaPlaylist(
}

return generateMediaPlaylistFMP4(
playList,
isPlayBack,
isDeltaUpdate,
variant,
segments,
Expand Down Expand Up @@ -479,6 +489,8 @@ type muxerServer struct {
nextSegmentParts []*muxerPart
nextPartID uint64
init []byte
playList string
isPlayBack bool
}

func (s *muxerServer) initialize() {
Expand Down Expand Up @@ -560,7 +572,10 @@ func (s *muxerServer) handle(w http.ResponseWriter, r *http.Request) {

case (s.variant != MuxerVariantMPEGTS && strings.HasSuffix(name, ".mp4")) ||
(s.variant == MuxerVariantMPEGTS && strings.HasSuffix(name, ".ts")):
s.handleSegmentOrPart(name, w)

dir := path.Dir(r.RequestURI)
identifier := path.Base(dir)
s.handleSegmentOrPart(name, identifier, w)
}
}

Expand Down Expand Up @@ -642,6 +657,8 @@ func (s *muxerServer) handleMediaPlaylist(
}

byts, err := generateMediaPlaylist(
s.playList,
s.isPlayBack,
isDeltaUpdate,
s.variant,
s.segments,
Expand Down Expand Up @@ -691,6 +708,8 @@ func (s *muxerServer) handleMediaPlaylist(
}

byts, err := generateMediaPlaylist(
s.playList,
s.isPlayBack,
isDeltaUpdate,
s.variant,
s.segments,
Expand Down Expand Up @@ -741,23 +760,39 @@ func (s *muxerServer) handleInitFile(w http.ResponseWriter) {
w.Write(init)
}

func (s *muxerServer) handleSegmentOrPart(fname string, w http.ResponseWriter) {
func localReader(fpath string) (io.ReadCloser, error) {
r, err := os.Open(fpath)
if err != nil {
return nil, err
}

return r, nil
}

func (s *muxerServer) handleSegmentOrPart(fname string, key string, w http.ResponseWriter) {
switch {
case strings.HasPrefix(fname, s.prefix+"_"+"seg"):
s.mutex.Lock()
segment, ok := s.segmentsByName[fname]
s.mutex.Unlock()

var rw io.ReadCloser
if !ok {
return
r, err := localReader("./.videos/" + key + "/" + fname)
rw = r
if err != nil {
return
}
} else {
r, err := segment.reader()
rw = r
if err != nil {
w.WriteHeader(http.StatusInternalServerError)
return
}
}

r, err := segment.reader()
if err != nil {
w.WriteHeader(http.StatusInternalServerError)
return
}
defer r.Close()
defer rw.Close()

w.Header().Set("Cache-Control", "max-age="+segmentMaxAge)
w.Header().Set(
Expand All @@ -770,7 +805,7 @@ func (s *muxerServer) handleSegmentOrPart(fname string, w http.ResponseWriter) {
}(),
)
w.WriteHeader(http.StatusOK)
io.Copy(w, r)
io.Copy(w, rw)

case s.variant == MuxerVariantLowLatency && strings.HasPrefix(fname, s.prefix+"_"+"part"):
s.mutex.Lock()
Expand Down Expand Up @@ -852,6 +887,15 @@ func (s *muxerServer) publishSegmentInner(segment muxerSegment) error {
toDelete := s.segments[0]

if toDeleteSeg, ok := toDelete.(*muxerSegmentFMP4); ok {

Check failure on line 889 in muxer_server.go

View workflow job for this annotation

GitHub Actions / golangci-lint

unnecessary leading newline (whitespace)

// Update Playlist
u := toDeleteSeg.name
plse := &playlist.MediaSegment{
Duration: toDeleteSeg.getDuration(),
URI: u,
DateTime: &toDeleteSeg.startNTP,
}
s.playList += plse.Marshal()
for _, part := range toDeleteSeg.parts {
delete(s.partsByName, part.getName())
}
Expand Down
20 changes: 15 additions & 5 deletions pkg/playlist/media.go
Original file line number Diff line number Diff line change
Expand Up @@ -80,7 +80,9 @@ type Media struct {
Map *MediaMap

// #EXT-X-SKIP
Skip *MediaSkip
Skip *MediaSkip
IsPlayBack bool
Playlist string

// segments
// at least one is required
Expand Down Expand Up @@ -366,9 +368,13 @@ func (m Media) Marshal() ([]byte, error) {
ret += m.PartInf.marshal()
}

ret += "#EXT-X-MEDIA-SEQUENCE:" + strconv.FormatInt(int64(m.MediaSequence), 10) + "\n"
start := strconv.FormatInt(int64(m.MediaSequence), 10)
if m.IsPlayBack {
start = "7"
}
ret += "#EXT-X-MEDIA-SEQUENCE:" + start + "\n"

if m.DiscontinuitySequence != nil {
if m.DiscontinuitySequence != nil && !m.IsPlayBack {
ret += "#EXT-X-DISCONTINUITY-SEQUENCE:" + strconv.FormatInt(int64(m.MediaSequence), 10) + "\n"
}

Expand All @@ -380,12 +386,16 @@ func (m Media) Marshal() ([]byte, error) {
ret += m.Map.marshal()
}

if m.Skip != nil {
if m.Skip != nil && !m.IsPlayBack {
ret += m.Skip.marshal()
}

if m.IsPlayBack {
ret += m.Playlist
}

for _, seg := range m.Segments {
ret += seg.marshal()
ret += seg.Marshal()
}

for _, part := range m.Parts {
Expand Down
2 changes: 1 addition & 1 deletion pkg/playlist/media_segment.go
Original file line number Diff line number Diff line change
Expand Up @@ -47,7 +47,7 @@ func (s MediaSegment) validate() error {
return nil
}

func (s MediaSegment) marshal() string {
func (s MediaSegment) Marshal() string {

Check warning on line 50 in pkg/playlist/media_segment.go

View workflow job for this annotation

GitHub Actions / golangci-lint

exported: exported method MediaSegment.Marshal should have comment or be unexported (revive)
ret := ""

if s.DateTime != nil {
Expand Down
10 changes: 6 additions & 4 deletions pkg/storage/factory_disk.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,17 +5,19 @@ import (
)

type factoryDisk struct {
dirPath string
dirPath string
isPlayBack bool
}

// NewFactoryDisk allocates a disk-backed factory.
func NewFactoryDisk(dirPath string) Factory {
func NewFactoryDisk(dirPath string, isPlayBack bool) Factory {
return &factoryDisk{
dirPath: dirPath,
dirPath: dirPath,
isPlayBack: isPlayBack,
}
}

// NewFile implements Factory.
func (s *factoryDisk) NewFile(fileName string) (File, error) {
return newFileDisk(filepath.Join(s.dirPath, fileName))
return newFileDisk(filepath.Join(s.dirPath, fileName), s.isPlayBack)
}
2 changes: 1 addition & 1 deletion pkg/storage/factory_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,7 @@ func TestStorage(t *testing.T) {
require.NoError(t, err)
defer os.RemoveAll(dir)

s = NewFactoryDisk(dir)
s = NewFactoryDisk(dir, false)
}

seg, err := s.NewFile("myseg.mp4")
Expand Down
12 changes: 8 additions & 4 deletions pkg/storage/file_disk.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,21 +7,23 @@ import (
)

type fileDisk struct {
noRemove bool
fpath string
f *os.File
parts []*partDisk
finalSize uint64
}

func newFileDisk(fpath string) (File, error) {
func newFileDisk(fpath string, noRemove bool) (File, error) {
f, err := os.Create(fpath)
if err != nil {
return nil, err
}

return &fileDisk{
fpath: fpath,
f: f,
noRemove: noRemove,
fpath: fpath,
f: f,
}, nil
}

Expand All @@ -47,7 +49,9 @@ func (s *fileDisk) Finalize() {

// Remove implements File.
func (s *fileDisk) Remove() {
os.Remove(s.fpath)
if !s.noRemove {
os.Remove(s.fpath)
}
}

// NewPart implements File.
Expand Down

0 comments on commit 5124d08

Please sign in to comment.