Skip to content

Commit

Permalink
add passive fec
Browse files Browse the repository at this point in the history
  • Loading branch information
xtaci committed Jan 3, 2024
1 parent 637838a commit 375407b
Show file tree
Hide file tree
Showing 3 changed files with 32 additions and 21 deletions.
47 changes: 29 additions & 18 deletions fec.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@ package kcp
import (
"encoding/binary"
"sync/atomic"
"time"

"github.com/klauspost/reedsolomon"
)
Expand Down Expand Up @@ -280,19 +281,24 @@ type (
// caches
shardCache [][]byte
encodeCache [][]byte
tsCache []int64

// RS encoder
codec reedsolomon.Encoder

// record min rto
minRTO int
}
)

func newFECEncoder(dataShards, parityShards, offset int) *fecEncoder {
func newFECEncoder(dataShards, parityShards, offset, minRTO int) *fecEncoder {
if dataShards <= 0 || parityShards <= 0 {
return nil
}
enc := new(fecEncoder)
enc.dataShards = dataShards
enc.parityShards = parityShards
enc.minRTO = minRTO
enc.shardSize = dataShards + parityShards
enc.paws = 0xffffffff / uint32(enc.shardSize) * uint32(enc.shardSize)
enc.headerOffset = offset
Expand All @@ -307,6 +313,7 @@ func newFECEncoder(dataShards, parityShards, offset int) *fecEncoder {
// caches
enc.encodeCache = make([][]byte, enc.shardSize)
enc.shardCache = make([][]byte, enc.shardSize)
enc.tsCache = make([]int64, enc.shardSize)
for k := range enc.shardCache {
enc.shardCache[k] = make([]byte, mtuLimit)
}
Expand All @@ -326,6 +333,7 @@ func (enc *fecEncoder) encode(b []byte) (ps [][]byte) {
sz := len(b)
enc.shardCache[enc.shardCount] = enc.shardCache[enc.shardCount][:sz]
copy(enc.shardCache[enc.shardCount][enc.payloadOffset:], b[enc.payloadOffset:])
enc.tsCache[enc.shardCount] = time.Now().UnixNano() / int64(time.Millisecond)
enc.shardCount++

// track max datashard length
Expand All @@ -335,25 +343,28 @@ func (enc *fecEncoder) encode(b []byte) (ps [][]byte) {

// Generation of Reed-Solomon Erasure Code
if enc.shardCount == enc.dataShards {
// fill '0' into the tail of each datashard
for i := 0; i < enc.dataShards; i++ {
shard := enc.shardCache[i]
slen := len(shard)
clear(shard[slen:enc.maxSize])
}
// generate the rs-code only if the data is continuous.
if enc.tsCache[enc.shardCount-1]-enc.tsCache[0] < int64(enc.minRTO) {
// fill '0' into the tail of each datashard
for i := 0; i < enc.dataShards; i++ {
shard := enc.shardCache[i]
slen := len(shard)
clear(shard[slen:enc.maxSize])
}

// construct equal-sized slice with stripped header
cache := enc.encodeCache
for k := range cache {
cache[k] = enc.shardCache[k][enc.payloadOffset:enc.maxSize]
}
// construct equal-sized slice with stripped header
cache := enc.encodeCache
for k := range cache {
cache[k] = enc.shardCache[k][enc.payloadOffset:enc.maxSize]
}

// encoding
if err := enc.codec.Encode(cache); err == nil {
ps = enc.shardCache[enc.dataShards:]
for k := range ps {
enc.markParity(ps[k][enc.headerOffset:])
ps[k] = ps[k][:enc.maxSize]
// encoding
if err := enc.codec.Encode(cache); err == nil {
ps = enc.shardCache[enc.dataShards:]
for k := range ps {
enc.markParity(ps[k][enc.headerOffset:])
ps[k] = ps[k][:enc.maxSize]
}
}
}

Expand Down
2 changes: 1 addition & 1 deletion fec_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -35,7 +35,7 @@ func BenchmarkFECEncode(b *testing.B) {

b.ReportAllocs()
b.SetBytes(payLoad)
encoder := newFECEncoder(dataSize, paritySize, 0)
encoder := newFECEncoder(dataSize, paritySize, 0, 200)
for i := 0; i < b.N; i++ {
data := make([]byte, payLoad)
encoder.encode(data)
Expand Down
4 changes: 2 additions & 2 deletions sess.go
Original file line number Diff line number Diff line change
Expand Up @@ -154,9 +154,9 @@ func newUDPSession(conv uint32, dataShards, parityShards int, l *Listener, conn
// FEC codec initialization
sess.fecDecoder = newFECDecoder(dataShards, parityShards)
if sess.block != nil {
sess.fecEncoder = newFECEncoder(dataShards, parityShards, cryptHeaderSize)
sess.fecEncoder = newFECEncoder(dataShards, parityShards, cryptHeaderSize, IKCP_RTO_MIN)
} else {
sess.fecEncoder = newFECEncoder(dataShards, parityShards, 0)
sess.fecEncoder = newFECEncoder(dataShards, parityShards, 0, IKCP_RTO_MIN)
}

// calculate additional header size introduced by FEC and encryption
Expand Down

0 comments on commit 375407b

Please sign in to comment.