Skip to content

Commit

Permalink
wip
Browse files Browse the repository at this point in the history
  • Loading branch information
wader committed Nov 16, 2023
1 parent 35f8379 commit 0331698
Show file tree
Hide file tree
Showing 3 changed files with 159 additions and 118 deletions.
10 changes: 8 additions & 2 deletions format/format.go
Original file line number Diff line number Diff line change
Expand Up @@ -403,6 +403,10 @@ type Pg_BTree_In struct {
Page int `doc:"First page number in file, default is 0"`
}

type MpegTsIn struct {
MaxSyncSeek int `doc:"Max byte distance to next sync"`
}

type MpegTsStream struct {
ProgramPid int
Type int
Expand All @@ -415,12 +419,14 @@ type MpegTsProgram struct {
}

type MpegTsPacketIn struct {
ProgramMap map[int]MpegTsProgram
StreamMap map[int]MpegTsStream
ProgramMap map[int]MpegTsProgram
StreamMap map[int]MpegTsStream
ContinuityMap map[int]int
}

type MpegTsPacketOut struct {
Pid int
TransportErrorIndicator bool
ContinuityCounter int
TransportScramblingControl int
PayloadUnitStart bool
Expand Down
54 changes: 38 additions & 16 deletions format/mpeg/mpeg_ts.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@ package mpeg
// TODO: mpeg_pes, share code?
// TODO: mpeg_pes_packet, length 0 for video?
// TODO: dup start?
// TODO: transport error indicator, count somehow? now mpeg_ts_packet fails

// ffmpeg $(for i in $(seq 0 50); do echo "-f lavfi -i sine"; done) -t 100ms $(for i in $(seq 0 50); do echo "-map $i:0"; done) test2.ts

Expand Down Expand Up @@ -45,6 +46,9 @@ func init() {
{Groups: []*decode.Group{format.MPEG_TS_PMT}, Out: &mpegTsMpegTsPmtGroup},
{Groups: []*decode.Group{format.MPEG_PES_Packet}, Out: &mpegTsMpegPesPacketGroup},
},
DefaultInArg: format.MpegTsIn{
MaxSyncSeek: 100 * 1024,
},
})
}

Expand All @@ -65,18 +69,19 @@ func (tb *tsBuffer) Reset() {
// new bytes buffer to not share byte slice
tb.buf = bytes.Buffer{}
tb.packetIndexes = nil

}

type tsContinuityMap map[int]int

func (tcm tsContinuityMap) Update(pid int, n int) bool {
current, currentOk := tcm[pid]
tcm[pid] = n
if currentOk {
return (current+1)&0xf == n
func tsContinuityUpdate(tcm map[int]int, pid int, current int) bool {
prev, prevFound := tcm[pid]
valid := (prevFound && ((prev+1)&0xf == current)) || current == 0
if valid {
tcm[pid] = current
return true
}
return n == 0
if prevFound {
delete(tcm, pid)
}
return valid
}

func tsPesDecode(d *decode.D, pid int, programPid int, streamType int, pesBuf *tsBuffer) {
Expand All @@ -93,37 +98,54 @@ func tsPesDecode(d *decode.D, pid int, programPid int, streamType int, pesBuf *t
}

func tsDecode(d *decode.D) any {
var ti format.MpegTsIn

d.ArgAs(&ti)

var tableReassemble = map[int]*tsBuffer{}
var pesReassemble = map[int]*tsBuffer{}
pidProgramMap := map[int]format.MpegTsProgram{}
pidStreamMap := map[int]format.MpegTsStream{}
continuityMap := tsContinuityMap(map[int]int{})
continuityMap := map[int]int{}
packetIndex := 0
decodeFailures := 0

tablesD := d.FieldArrayValue("tables")
pesD := d.FieldArrayValue("pes")

d.FieldArray("packets", func(d *decode.D) {
for !d.End() {
_, v, err := d.TryFieldFormat(
syncLen, _, err := d.TryPeekFind(8, 8, int64(ti.MaxSyncSeek), func(v uint64) bool {
return v == 0x47
})
if err != nil || syncLen < 0 {
break
}
if syncLen > 0 {
d.SeekRel(syncLen)
}

_, v, err := d.TryFieldFormatLen(
"packet",
tsPacketLength,
&mpegTsMpegTsPacketGroup,
format.MpegTsPacketIn{
ProgramMap: pidProgramMap,
StreamMap: pidStreamMap,
ProgramMap: pidProgramMap,
StreamMap: pidStreamMap,
ContinuityMap: continuityMap,
},
)
if err != nil {
// TODO: malformted packet, how?
d.FieldRawLen("packet", tsPacketLength)
decodeFailures++
d.SeekRel(8)
continue
}
mtpo, mtpoOk := v.(format.MpegTsPacketOut)
if !mtpoOk {
panic("packet is not a MpegTsPacketOut")
}

isContinous := continuityMap.Update(mtpo.Pid, mtpo.ContinuityCounter)
isContinous := tsContinuityUpdate(continuityMap, mtpo.Pid, mtpo.ContinuityCounter)
isTable := tsPidIsTable(mtpo.Pid, pidProgramMap)
stream, isStream := pidStreamMap[mtpo.Pid]

Expand Down
213 changes: 113 additions & 100 deletions format/mpeg/mpeg_ts_packet.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,119 +22,132 @@ func tsPacketDecode(d *decode.D) any {
if d.ArgAs(&mtpi) {
mtpi.ProgramMap = map[int]format.MpegTsProgram{}
mtpi.StreamMap = map[int]format.MpegTsStream{}
mtpi.ContinuityMap = map[int]int{}
}

var mtpo format.MpegTsPacketOut

d.FramedFn(tsPacketLength, func(d *decode.D) {
d.FieldU8("sync", scalar.UintHex) // TODO: sometimes not 0x47? d.UintAssert(0x47)
d.FieldBool("transport_error_indicator")
mtpo.PayloadUnitStart = d.FieldBool("payload_unit_start")
d.FieldBool("transport_priority")
pid := d.FieldU13("pid", tsPidMap, scalar.UintHex)
if p, ok := mtpi.ProgramMap[int(pid)]; ok {
d.FieldU8("sync", scalar.UintHex, d.UintAssert(0x47))
mtpo.TransportErrorIndicator = d.FieldBool("transport_error_indicator", d.BoolAssert(false))
mtpo.PayloadUnitStart = d.FieldBool("payload_unit_start")
d.FieldBool("transport_priority")
pid := d.FieldU13("pid", tsPidMap, scalar.UintHex)
if p, ok := mtpi.ProgramMap[int(pid)]; ok {
d.FieldValueUint("program", uint64(p.Number), scalar.UintHex)
} else if s, ok := mtpi.StreamMap[int(pid)]; ok {
if p, ok := mtpi.ProgramMap[s.ProgramPid]; ok {
d.FieldValueUint("program", uint64(p.Number), scalar.UintHex)
} else if s, ok := mtpi.StreamMap[int(pid)]; ok {
if p, ok := mtpi.ProgramMap[s.ProgramPid]; ok {
d.FieldValueUint("program", uint64(p.Number), scalar.UintHex)
}
d.FieldValueUint("stream_type", uint64(s.Type), tsStreamTypeMap)
}
mtpo.Pid = int(pid)
mtpo.TransportScramblingControl = int(d.FieldU2("transport_scrambling_control", scalar.UintMapSymStr{
0b00: "not_scrambled",
0b01: "reserved",
0b10: "even_key",
0b11: "odd_key",
}))
adaptationFieldControl := d.FieldU2("adaptation_field_control", scalar.UintMapSymStr{
0b00: "reserved",
adaptationFieldControlPayloadOnly: "payload_only",
adaptationFieldControlAdaptationFieldOnly: "adaptation_field_only",
adaptationFieldControlAdaptationFieldAndPayload: "adaptation_and_payload",
})
mtpo.ContinuityCounter = int(d.FieldU4("continuity_counter"))
d.FieldValueUint("stream_type", uint64(s.Type), tsStreamTypeMap)
}
mtpo.Pid = int(pid)
mtpo.TransportScramblingControl = int(d.FieldU2("transport_scrambling_control", scalar.UintMapSymStr{
0b00: "not_scrambled",
0b01: "reserved",
0b10: "even_key",
0b11: "odd_key",
}))
adaptationFieldControl := d.FieldU2("adaptation_field_control", scalar.UintMapSymStr{
0b00: "reserved",
adaptationFieldControlPayloadOnly: "payload_only",
adaptationFieldControlAdaptationFieldOnly: "adaptation_field_only",
adaptationFieldControlAdaptationFieldAndPayload: "adaptation_and_payload",
})
mtpo.ContinuityCounter = int(d.FieldU4("continuity_counter", scalar.UintFn(func(s scalar.Uint) (scalar.Uint, error) {
prev, prevFound := mtpi.ContinuityMap[int(pid)]
current := int(s.Actual)

switch adaptationFieldControl {
case adaptationFieldControlAdaptationFieldOnly,
adaptationFieldControlAdaptationFieldAndPayload:
d.FieldStruct("adaptation_field", func(d *decode.D) {
length := d.FieldU8("length") // Number of bytes in the adaptation field immediately following this byte
d.FramedFn(int64(length)*8, func(d *decode.D) {
d.FieldBool("discontinuity_indicator") // Set if current TS packet is in a discontinuity state with respect to either the continuity counter or the program clock reference
d.FieldBool("random_access_indicator") // Set when the stream may be decoded without errors from this point
d.FieldBool("elementary_stream_priority_indicator") // Set when this stream should be considered "high priority"
pcrPresent := d.FieldBool("pcr_present") // Set when PCR field is present
opcrPresent := d.FieldBool("opcr_present") // Set when OPCR field is present
splicingPointPresent := d.FieldBool("splicing_point_present") // Set when splice countdown field is present
transportPrivatePresent := d.FieldBool("transport_private_present") // Set when transport private data is present
adaptationFieldExtensionPresent := d.FieldBool("adaptation_field_extension_present") // Set when adaptation extension data is present
if pcrPresent {
d.FieldU("pcr", 48)
}
if opcrPresent {
d.FieldU("opcr", 48)
}
if splicingPointPresent {
d.FieldU8("splicing_point")
}
if transportPrivatePresent {
d.FieldStruct("transport_private", func(d *decode.D) {
length := d.FieldU8("length")
d.FieldRawLen("data", int64(length)*8)
})
}
if adaptationFieldExtensionPresent {
d.FieldStruct("adaptation_extension", func(d *decode.D) {
length := d.FieldU8("length")
d.FramedFn(int64(length)*8, func(d *decode.D) {
d.FieldBool("legal_time_window")
d.FieldBool("piecewise_rate")
d.FieldBool("seamless_splice")
d.FieldU5("reserved", scalar.UintHex)
d.FieldRawLen("data", d.BitsLeft())
})
switch {
case prevFound && (prev+1)&0xf == current:
s.Description = "valid"
case prevFound:
s.Description = "invalid"
default:
s.Description = "unknown"
}

return s, nil
})))

switch adaptationFieldControl {
case adaptationFieldControlAdaptationFieldOnly,
adaptationFieldControlAdaptationFieldAndPayload:
d.FieldStruct("adaptation_field", func(d *decode.D) {
length := d.FieldU8("length") // Number of bytes in the adaptation field immediately following this byte
d.FramedFn(int64(length)*8, func(d *decode.D) {
d.FieldBool("discontinuity_indicator") // Set if current TS packet is in a discontinuity state with respect to either the continuity counter or the program clock reference
d.FieldBool("random_access_indicator") // Set when the stream may be decoded without errors from this point
d.FieldBool("elementary_stream_priority_indicator") // Set when this stream should be considered "high priority"
pcrPresent := d.FieldBool("pcr_present") // Set when PCR field is present
opcrPresent := d.FieldBool("opcr_present") // Set when OPCR field is present
splicingPointPresent := d.FieldBool("splicing_point_present") // Set when splice countdown field is present
transportPrivatePresent := d.FieldBool("transport_private_present") // Set when transport private data is present
adaptationFieldExtensionPresent := d.FieldBool("adaptation_field_extension_present") // Set when adaptation extension data is present
if pcrPresent {
d.FieldU("pcr", 48)
}
if opcrPresent {
d.FieldU("opcr", 48)
}
if splicingPointPresent {
d.FieldU8("splicing_point")
}
if transportPrivatePresent {
d.FieldStruct("transport_private", func(d *decode.D) {
length := d.FieldU8("length")
d.FieldRawLen("data", int64(length)*8)
})
}
if adaptationFieldExtensionPresent {
d.FieldStruct("adaptation_extension", func(d *decode.D) {
length := d.FieldU8("length")
d.FramedFn(int64(length)*8, func(d *decode.D) {
d.FieldBool("legal_time_window")
d.FieldBool("piecewise_rate")
d.FieldBool("seamless_splice")
d.FieldU5("reserved", scalar.UintHex)
d.FieldRawLen("data", d.BitsLeft())
})
})

// Optional fields
// LTW flag set (2 bytes)
// LTW valid flag 1 0x8000
// LTW offset 15 0x7fff Extra information for rebroadcasters to determine the state of buffers when packets may be missing.
// Piecewise flag set (3 bytes)
// Reserved 2 0xc00000
// Piecewise rate 22 0x3fffff The rate of the stream, measured in 188-byte packets, to define the end-time of the LTW.
// Seamless splice flag set (5 bytes)
// Splice type 4 0xf000000000 Indicates the parameters of the H.262 splice.
// DTS next access unit 36 0x0efffefffe The PES DTS of the splice point. Split up as multiple fields, 1 marker bit (0x1), 15 bits, 1 marker bit, 15 bits, and 1 marker bit, for 33 data bits total.
}
if d.BitsLeft() > 0 {
d.FieldRawLen("stuffing", d.BitsLeft())
}
})
// Optional fields
// LTW flag set (2 bytes)
// LTW valid flag 1 0x8000
// LTW offset 15 0x7fff Extra information for rebroadcasters to determine the state of buffers when packets may be missing.
// Piecewise flag set (3 bytes)
// Reserved 2 0xc00000
// Piecewise rate 22 0x3fffff The rate of the stream, measured in 188-byte packets, to define the end-time of the LTW.
// Seamless splice flag set (5 bytes)
// Splice type 4 0xf000000000 Indicates the parameters of the H.262 splice.
// DTS next access unit 36 0x0efffefffe The PES DTS of the splice point. Split up as multiple fields, 1 marker bit (0x1), 15 bits, 1 marker bit, 15 bits, and 1 marker bit, for 33 data bits total.
}
if d.BitsLeft() > 0 {
d.FieldRawLen("stuffing", d.BitsLeft())
}
})
}
})
}

isTable := tsPidIsTable(mtpo.Pid, mtpi.ProgramMap)
if isTable {
var payloadPointer uint64
if mtpo.PayloadUnitStart {
payloadPointer = d.FieldU8("payload_pointer")
}
if payloadPointer > 0 {
d.FieldRawLen("stuffing", int64(payloadPointer)*8)
}
isTable := tsPidIsTable(mtpo.Pid, mtpi.ProgramMap)
if isTable {
var payloadPointer uint64
if mtpo.PayloadUnitStart {
payloadPointer = d.FieldU8("payload_pointer")
}

switch adaptationFieldControl {
case adaptationFieldControlPayloadOnly,
adaptationFieldControlAdaptationFieldAndPayload:
payload := d.FieldRawLen("payload", d.BitsLeft())
mtpo.Payload = d.ReadAllBits(payload)
default:
// TODO: unknown adaption control flags
d.FieldRawLen("unknown", d.BitsLeft())
if payloadPointer > 0 {
d.FieldRawLen("stuffing", int64(payloadPointer)*8)
}
})
}

switch adaptationFieldControl {
case adaptationFieldControlPayloadOnly,
adaptationFieldControlAdaptationFieldAndPayload:
payload := d.FieldRawLen("payload", d.BitsLeft())
mtpo.Payload = d.ReadAllBits(payload)
default:
// TODO: unknown adaption control flags
d.FieldRawLen("unknown", d.BitsLeft())
}

return mtpo
}

0 comments on commit 0331698

Please sign in to comment.