// Based on net/http/internal
package proxy
import (
"bufio"
"bytes"
"errors"
"io"
"io/ioutil"
"strconv"
)
var (
ErrLineTooLong = errors.New("header line too long")
ErrInvalidChunkLength = errors.New("invalid byte in chunk length")
)
// Unlike net/http/internal.chunkedReader, this has an interface where we can
// handle individual chunks. The interface is based on database/sql.Rows.
func NewChunkedReader(r io.Reader) *ChunkedReader {
br, ok := r.(*bufio.Reader)
if !ok {
br = bufio.NewReader(r)
}
return &ChunkedReader{r: br}
}
type ChunkedReader struct {
r *bufio.Reader
chunk *io.LimitedReader
err error
buf [2]byte
}
// Next prepares the next chunk for reading. It returns true on success, or
// false if there is no next chunk or an error happened while preparing
// it. Err should be consulted to distinguish between the two cases.
//
// Every call to Chunk, even the first one, must be preceded by a call to Next.
//
// Calls to Next will discard any unread bytes in the current Chunk.
func (cr *ChunkedReader) Next() bool {
if cr.err != nil {
return false
}
// Check the termination of the previous chunk
if cr.chunk != nil {
// Make sure the remainder is drained, in case the user of this quit
// reading early.
if _, cr.err = io.Copy(ioutil.Discard, cr.chunk); cr.err != nil {
return false
}
// Check the next two bytes after the chunk are \r\n
if _, cr.err = io.ReadFull(cr.r, cr.buf[:2]); cr.err != nil {
return false
}
if cr.buf[0] != '\r' || cr.buf[1] != '\n' {
cr.err = errors.New("malformed chunked encoding")
return false
}
} else {
cr.chunk = &io.LimitedReader{R: cr.r}
}
// Setup the next chunk
if n := cr.beginChunk(); n > 0 {
cr.chunk.N = int64(n)
} else if cr.err == nil {
cr.err = io.EOF
}
return cr.err == nil
}
// Chunk returns the io.Reader of the current chunk. On each call, this returns
// the same io.Reader for a given chunk.
func (cr *ChunkedReader) Chunk() io.Reader {
return cr.chunk
}
// Err returns the error, if any, that was encountered during iteration.
func (cr *ChunkedReader) Err() error {
if cr.err == io.EOF {
return nil
}
return cr.err
}
func (cr *ChunkedReader) beginChunk() uint64 {
var (
line []byte
n uint64
)
// chunk-size CRLF
line, cr.err = readLine(cr.r)
if cr.err != nil {
return 0
}
n, cr.err = strconv.ParseUint(string(line), 16, 64)
if cr.err != nil {
cr.err = ErrInvalidChunkLength
}
return n
}
// Read a line of bytes (up to \n) from b.
// Give up if the line exceeds the buffer size.
// The returned bytes are a pointer into storage in
// the bufio, so they are only valid until the next bufio read.
func readLine(b *bufio.Reader) (p []byte, err error) {
if p, err = b.ReadSlice('\n'); err != nil {
// We always know when EOF is coming.
// If the caller asked for a line, there should be a line.
if err == io.EOF {
err = io.ErrUnexpectedEOF
} else if err == bufio.ErrBufferFull {
err = ErrLineTooLong
}
return nil, err
}
return bytes.TrimRight(p, " \t\n\r"), nil
}