This repository has been archived by the owner on Oct 30, 2020. It is now read-only.
-
Notifications
You must be signed in to change notification settings - Fork 1
/
Copy pathworker.coffee
193 lines (165 loc) · 5.78 KB
/
worker.coffee
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
Web3 = require "web3"
Pudding = require "ether-pudding"
PuddingLoader = require "ether-pudding/loader"
Promise = require 'bluebird'
Ipfs = require 'ipfs-api'
url = require 'url'
debug = require('debug')('jobjs:worker')
jsjob = require 'jsjob'
ipfs = require './ipfs'
isIpfs = require 'is-ipfs'
# Worker that watches a JobAgency contract for posted jobs, runs each job
# through a jsjob Runner and reports the IPFS hash of the result back to the
# contract.
class Worker
  # options (every key is optional; falsy values are replaced by the defaults
  # assigned below, and the passed-in object itself is mutated):
  #   ethereum.rpc          - HTTP RPC endpoint given to the web3 provider
  #   ethereum.contractsDir - directory handed to the Pudding loader
  #   ipfs.apiAddr          - address given to the ipfs-api client
  #   ipfs.gateway          - base URL used to build the job code URL
  #   agency                - contract address; if unset the deployed JobAgency is used
  # Note: options.runner is always overwritten with the hard-coded script.
  constructor: (@options = {}) ->
    @options.ethereum = {} unless @options.ethereum
    @options.ethereum.rpc = 'http://localhost:8545' unless @options.ethereum.rpc
    @options.ethereum.contractsDir = './environments/development/contracts' unless @options.ethereum.contractsDir
    @agency = null
    @options.ipfs = {} unless @options.ipfs
    @options.ipfs.apiAddr = '/ip4/127.0.0.1/tcp/5001' unless @options.ipfs.apiAddr
    # FIXME: Default should be 8080?
    @options.ipfs.gateway = 'http://localhost:8090' unless @options.ipfs.gateway
    # FIXME: unhardcode the script
    @options.runner =
      scripts: [
        "window.jsJobRun = function(d, o, cb) { window.polySolvePage(d, o, cb) };"
      ]
    @agencyWatcher = null
    @preparePudding()
    @prepareIpfs()
    # Job ids already handed to a subscriber; used to drop duplicate events.
    @seenJobIds = []
    @runner = new jsjob.Runner @options.runner

  # Creates the web3 instance, registers it with Pudding and points it at the
  # configured RPC endpoint.
  preparePudding: ->
    @web3 = new Web3()
    Pudding.setWeb3 @web3
    provider = new Web3.providers.HttpProvider @options.ethereum.rpc
    @web3.setProvider provider

  # Creates the IPFS API client for the configured API address.
  prepareIpfs: ->
    @ipfs = Ipfs @options.ipfs.apiAddr

  # Loads the contracts found in options.ethereum.contractsDir.
  # callback(err) on failure, callback(null, contracts) on success.
  loadContracts: (callback) ->
    contracts = {}
    PuddingLoader.load @options.ethereum.contractsDir, Pudding, contracts, (err, names) ->
      return callback err if err
      return callback null, contracts

  # Watches the agency's JobPosted event.
  # callback(err) is invoked for watcher errors, callback(null, jobId) once for
  # every job id that has not been seen before. The callback can therefore
  # fire many times.
  subscribeAgency: (agency, callback) ->
    @agencyWatcher = agency.JobPosted()
    @agencyWatcher.watch (err, event) =>
      return callback err if err
      # The original read the internal coefficient array (`.c[0]`) of this
      # value; toNumber() is the public accessor and yields the same number
      # for ordinary ids. Assumes a bignumber.js value - confirm against the
      # web3 version in use.
      jobId = event.args.jobId.toNumber()
      # Avoid duplicates. Due to multiple confirmations?
      # FIXME: Probably we should wait for a certain number before considering it legit
      if jobId in @seenJobIds
        debug "Duplicate job #{jobId} received"
        return
      @seenJobIds.push jobId
      return callback null, jobId

  # Starts the jsjob runner. callback(err) on failure, callback() on success.
  startRunner: (callback) ->
    @runner.start (err) ->
      return callback err if err
      callback()

  # Fetches the code and input references of a job from the agency contract.
  # Both values are passed through ipfs.toStr (helper from ./ipfs; presumably
  # it turns the stored value into an IPFS hash string - confirm there).
  # Resolves / calls back with { input, code }.
  getJobData: (jobId, callback) ->
    getCode = @agency.getJobCode.call(jobId).then (d) -> ipfs.toStr d
    getInput = @agency.getJobInput.call(jobId).then (d) -> ipfs.toStr d
    return Promise.props({
      input: getInput,
      code: getCode,
    }).nodeify(callback)

  # Reads the full contents stored under an IPFS multihash as a string.
  # callback(err) for an invalid hash, a failed cat or a stream error;
  # callback(null, contents) once the stream ends.
  getIpfsContents: (hash, callback) ->
    unless isIpfs.multihash hash
      return callback new Error "#{hash} is not a valid IPFS multihash"
    @ipfs.cat hash, (err, data) ->
      return callback err if err
      contents = ''
      data.on 'data', (chunk) ->
        contents += chunk
      # An 'error' event without a listener is thrown by the emitter; route it
      # to the callback instead of crashing the process.
      data.on 'error', (streamErr) ->
        callback streamErr
      data.on 'end', ->
        callback null, contents

  # Adds data to IPFS. Objects are JSON-serialised and strings are wrapped in
  # a Buffer before being added.
  # callback(err) on failure or empty result, callback(null, hash) on success.
  setIpfsContents: (data, callback) ->
    if typeof data is 'object'
      data = JSON.stringify data
    if typeof data is 'string'
      data = new Buffer data
    @ipfs.add data, (err, res) ->
      return callback err if err
      unless res?.length
        return callback new Error "No results for IPFS add"
      callback null, res[0].Hash

  # Reports the result hash of a job to the agency contract.
  completeJob: (jobId, resultHash, callback) ->
    @agency.completeJob(jobId, ipfs.toHex(resultHash)).nodeify callback

  # Runs one job end to end: look up its data, fetch the input from IPFS, run
  # the code through the runner, store the result in IPFS and complete the job
  # on the contract.
  # callback(err) on any failure, callback(null, resultHash) on success.
  runJob: (jobId, callback) ->
    console.time "Job #{jobId} total"
    @getJobData jobId, (err, job) =>
      # Without this guard `job` is undefined on failure and `job.code` throws.
      return callback err if err
      # FIXME: get from IPFS
      gatewayUrl = url.parse @options.ipfs.gateway
      codeUrl = url.format
        protocol: gatewayUrl.protocol
        host: gatewayUrl.host
        pathname: "/ipfs/#{job.code}"
      unless isIpfs.url codeUrl
        return callback new Error "#{codeUrl} is not a valid IPFS URL"
      console.time "Job #{jobId} IPFS cat"
      @getIpfsContents job.input, (err, contents) =>
        console.timeEnd "Job #{jobId} IPFS cat"
        return callback err if err
        try
          inputData = JSON.parse contents
        catch e
          return callback e
        jobOptions = {}
        console.time "Job #{jobId} run"
        @runner.runJob codeUrl, inputData, jobOptions, (err, j) =>
          console.timeEnd "Job #{jobId} run"
          return callback err if err
          # Prefer the `html` property of the runner result when present.
          resultData = j?.html or j
          console.time "Job #{jobId} IPFS add"
          @setIpfsContents resultData, (err, hash) =>
            console.timeEnd "Job #{jobId} IPFS add"
            console.timeEnd "Job #{jobId} total"
            return callback err if err
            @completeJob jobId, hash, (err) =>
              # Was `return callback if err`, which returned the function
              # without calling it and so swallowed the error.
              return callback err if err
              return callback null, hash

  # Starts the runner, configures Pudding defaults from the first account,
  # loads the contracts, resolves the agency and runs every posted job.
  # callback(err) is invoked when any setup step or the event watcher fails.
  # NOTE(review): callback is never invoked on success - confirm whether
  # callers need a "started" signal.
  # A failing job is logged and terminates the process with exit code 1.
  start: (callback) ->
    @startRunner (err) =>
      return callback err if err
      @web3.eth.getAccounts (err, accs) =>
        # Without this guard `accs` is undefined on failure and `accs[0]` throws.
        return callback err if err
        Pudding.defaults({
          from: accs[0],
          gas: 3141592 # XXX: if this is not sent then we run out of gas??
        })
        @loadContracts (err, contracts) =>
          return callback err if err
          if @options.agency
            @agency = contracts.JobAgency.at @options.agency
          else
            @agency = contracts.JobAgency.deployed()
          console.log 'JobAgency address:', @agency.address
          @subscribeAgency @agency, (err, jobId) =>
            # Watcher errors arrive without a job id; do not try to run one.
            return callback err if err
            @runJob jobId, (err, result) =>
              if err
                console.log "ERROR", err
                process.exit 1
              console.log result

  # Stops watching the agency (if a watcher exists) and stops the runner,
  # passing callback through to the runner's stop.
  stop: (callback) ->
    @agencyWatcher.stopWatching() if @agencyWatcher
    @agencyWatcher = null
    @runner.stop callback
# Builds the worker options from the command line: the last argument is used
# as the agency contract address when it starts with '0x', otherwise the
# agency is left as null.
parse = ->
  lastArg = process.argv.slice(-1)[0]
  agency = if lastArg.substr(0, 2) is '0x' then lastArg else null
  return { agency: agency }
# Command-line entry point: creates a Worker from the parsed arguments and
# starts it. A start error is printed to stderr and ends the process with
# exit code 1.
exports.main = main = ->
  worker = new Worker parse()
  worker.start (startError) ->
    return unless startError
    console.error startError
    process.exit 1

# Run immediately only when this file is executed directly, not when required.
main() unless module.parent