-
Notifications
You must be signed in to change notification settings - Fork 1
/
Copy pathsource.js
134 lines (118 loc) · 2.62 KB
/
source.js
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
const { EventEmitter } = require('events')
const { Origin } = require('./origin')
const { source } = require('./storage')
const { Box } = require('./box')
const storage = require('./storage')
const codecs = require('./codecs')
const extend = require('extend')
const debug = require('debug')('little-network-box:source')
const hooks = require('./hooks')
const pump = require('pump')
const get = require('get-uri')
const url = require('url')
const fs = require('fs')
// quick util
const bind = (self, f) => (...args) => f.call(self, ...args)
// exported symbols attached to the `Source` class
const kSourceStream = Symbol('Source.stream')
/**
* The `Source` class represents a streamable source specified
* defaulting to URI streams supported by the `get-uri`.
* @public
* @class
* @extends Origin
*/
class Source extends Origin {
/**
* Default options for a `Source` class instance.
* @public
* @static
* @param {?(Object)} defaults
* @param {...?(Object)} overrides
* @return {Object}
*/
static defaults(defaults, ...overrides) {
return Origin.defaults({
encryptionKey: null,
highWaterMark: 64 * 1024,
indexing: true,
}, defaults, ...overrides)
}
/**
* @private
*/
[Box.options](opts) {
super[Box.options](opts)
const u = url.parse(opts.uri)
try {
if (!u.protocol) {
fs.accessSync(opts.uri)
u.protocol = 'file:'
opts.uri = url.format(u)
}
} catch (err) {
void err
}
}
/**
* @protected
*/
[Box.init](opts) {
super[Box.init](opts)
this.uri = opts.uri
}
/**
* @private
*/
[Box.codec](opts) {
return null
}
/**
* @private
*/
[Box.storage](storage, opts) {
return source(this, this.uri, storage, null, opts)
}
/**
* @private
*/
[Box.ready](opts, done) {
super[Box.ready](opts, (err) => {
if (err) { return done(err) }
this.feed.ready(() => {
this[kSourceStream](opts, (err, stream) => {
if (err || !stream) { return done(err) }
pump(stream, this.createWriteStream(), done)
})
})
})
}
/**
* @private
*/
[kSourceStream](opts, done) {
const { highWaterMark } = opts
get(this.uri, { highWaterMark }, done)
}
}
/**
* The `Source.stream` symbol for overloading the handler
* for getting source stream stream.
* @public
* @static
* @type {Symbol}
*/
Source.stream = kSourceStream
/**
* Factory for creating `Sink` instances.
* @public
*/
function createSource(...args) {
return new Source(...args)
}
/**
* Module exports.
*/
module.exports = Object.assign(createSource, {
Source,
})