-
Notifications
You must be signed in to change notification settings - Fork 0
Expand file tree
/
Copy pathtemporary.go
More file actions
214 lines (190 loc) · 4.49 KB
/
temporary.go
File metadata and controls
214 lines (190 loc) · 4.49 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
// https://github.com/linfangrong/temporary
/*
将io.Reader转换成io.ReadSeeker。
功能
1. 超过一定大小、将临时缓冲转成临时文件
2. 异步读取源信息
3. 异步读取源信息后关闭
Attention
使用异步读取数据时候, 在调用其他方法之前一定要先使用Await。
该操作等待全部数据的加载, 同时可以处理读取数据时的错误。
*/
package temporary
import (
"io"
"io/ioutil"
"sync"
)
type Temporary interface {
Await() error
Size() int64
Type() string
Name() string
Bytes() []byte
io.Reader
io.Seeker
io.Closer
}
const (
TemporaryBuffer string = "Buffer"
TemporaryFile string = "File"
)
type temporaryItemer interface {
Size() int64
Type() string
Name() string
Bytes() []byte
Sync() error
io.Writer
io.Reader
io.Seeker
io.Closer
}
type temporary struct {
item temporaryItemer
itemConvert bool
itemWg *sync.WaitGroup
itemAsyncErr error
maxBufferSize int64
fileDir string
filePattern string
}
func NewTemporary(reader io.Reader, maxBufferSize int64, fileDir string, filePattern string) (_ Temporary, err error) {
var temp *temporary = &temporary{
item: newTemporaryBuffer(),
itemConvert: false,
itemWg: new(sync.WaitGroup),
maxBufferSize: maxBufferSize,
fileDir: fileDir,
filePattern: filePattern,
}
if _, err = io.Copy(temp, reader); err != nil {
return
}
if err = temp.item.Sync(); err != nil {
return
}
return temp, nil
}
func NewAsyncTemporary(reader io.Reader, maxBufferSize int64, fileDir string, filePattern string) (_ Temporary) {
var temp *temporary = &temporary{
item: newTemporaryBuffer(),
itemConvert: false,
itemWg: new(sync.WaitGroup),
maxBufferSize: maxBufferSize,
fileDir: fileDir,
filePattern: filePattern,
}
temp.itemWg.Add(1)
go func(temp *temporary, reader io.Reader) {
if _, temp.itemAsyncErr = io.Copy(temp, reader); temp.itemAsyncErr != nil {
goto end
}
if temp.itemAsyncErr = temp.item.Sync(); temp.itemAsyncErr != nil {
goto end
}
end:
temp.itemWg.Done()
}(temp, reader)
return temp
}
func NewMustCloseReaderAsyncTemporary(readcloser io.ReadCloser, maxBufferSize int64, fileDir string, filePattern string) (_ Temporary) {
var temp *temporary = &temporary{
item: newTemporaryBuffer(),
itemConvert: false,
itemWg: new(sync.WaitGroup),
maxBufferSize: maxBufferSize,
fileDir: fileDir,
filePattern: filePattern,
}
temp.itemWg.Add(1)
go func(temp *temporary, readcloser io.ReadCloser) {
if _, temp.itemAsyncErr = io.Copy(temp, readcloser); temp.itemAsyncErr != nil {
goto end
}
if temp.itemAsyncErr = temp.item.Sync(); temp.itemAsyncErr != nil {
goto end
}
end:
io.Copy(ioutil.Discard, readcloser)
readcloser.Close()
temp.itemWg.Done()
}(temp, readcloser)
return temp
}
func (temp *temporary) toTemporaryFile() (err error) {
if temp.itemConvert {
return
}
var (
seekPosition int64
tf *temporaryFile
)
if seekPosition, err = temp.item.Seek(0, io.SeekCurrent); err != nil {
return
}
if tf, err = newTemporaryFile(temp.fileDir, temp.filePattern); err != nil {
return
}
if _, err = temp.item.Seek(0, io.SeekStart); err != nil {
return
}
if _, err = io.Copy(tf, temp.item); err != nil {
return
}
temp.item.Close()
if _, err = tf.Seek(seekPosition, io.SeekStart); err != nil {
return
}
temp.item = tf
temp.itemConvert = true
return
}
func (temp *temporary) Write(p []byte) (n int, err error) {
if temp.item.Size()+int64(len(p)) > temp.maxBufferSize {
if err = temp.toTemporaryFile(); err != nil {
return
}
}
if n, err = temp.item.Write(p); err != nil {
if err == ErrBufferTooLarge { // 内存分配出错
if err = temp.toTemporaryFile(); err != nil {
return
}
var m int
m, err = temp.item.Write(p[n:])
n += m
if err != nil {
return
}
return
}
return
}
return
}
func (temp *temporary) Await() error {
temp.itemWg.Wait()
return temp.itemAsyncErr
}
func (temp *temporary) Size() int64 {
return temp.item.Size()
}
func (temp *temporary) Type() string {
return temp.item.Type()
}
func (temp *temporary) Name() string {
return temp.item.Name()
}
func (temp *temporary) Bytes() []byte {
return temp.item.Bytes()
}
func (temp *temporary) Read(p []byte) (n int, err error) {
return temp.item.Read(p)
}
func (temp *temporary) Seek(offset int64, whence int) (abs int64, err error) {
return temp.item.Seek(offset, whence)
}
func (temp *temporary) Close() (err error) {
return temp.item.Close()
}