-
Notifications
You must be signed in to change notification settings - Fork 1
/
item.js
70 lines (59 loc) · 1.34 KB
/
item.js
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
'use strict'
const defer = require('any-deferred')
/**
 * Settles every item of a chain, starting at `head` and following `next`
 * links, and marks each visited item as finalized by pointing its `next`
 * at itself.
 *
 * The walk is iterative on purpose: recursing once per item overflows the
 * call stack when many items are queued.
 *
 * @param {ChannelItem} head - First item to settle.
 * @param {function(ChannelItem): void} settle - Settles one item's sender.
 */
function settleChain (head, settle) {
  let item = head
  while (item) {
    settle(item)
    const following = item.next
    // A self-referential `next` marks the item as finalized
    item.next = item
    // Stop at the end of the chain or at an item that was already finalized
    item = following && following !== item ? following : null
  }
}

/**
 * One node of a singly linked queue of pending channel exchanges.
 *
 * Each item owns two deferreds (objects exposing `promise`, `resolve` and
 * `reject`):
 * - `sender` is settled by `send`, `close` or `error`, and its promise is
 *   what `consume` returns.
 * - `receiver` is resolved when the item that follows (see `ensureNext`) or
 *   the item that wraps this one (see `wrap`) is consumed, and its promise
 *   is what `send`, `close` and `error` return.
 */
class ChannelItem {
  /**
   * @param {Function} [request] - Callback scheduled with `setImmediate`
   *   each time this item is consumed.
   */
  constructor (request) {
    this.sender = defer()
    this.receiver = defer()
    this.request = request
    this.next = null
  }

  /**
   * Schedules the stored request callback, if any, and returns the promise
   * that is settled by `send`, `close` or `error`.
   *
   * @returns {Promise<{done: boolean, value: *}>} The sender promise.
   */
  consume () {
    if (this.request) setImmediate(this.request)
    return this.sender.promise
  }

  /**
   * Returns the following item, creating it on first use.
   *
   * Non-destructive traversal--there may already be an input or output in
   * the queue. A newly created item uses this item's `receiver.resolve` as
   * its request, so consuming it resolves this item's receiver.
   *
   * @returns {ChannelItem} The next item in the chain.
   */
  ensureNext () {
    if (!this.next) {
      this.next = new ChannelItem(this.receiver.resolve)
    }
    return this.next
  }

  /**
   * Creates an item placed in front of this one, which allows inserting new
   * items at the start of the queue.
   *
   * The new item takes over this item's request, and this item's request
   * becomes the new item's `receiver.resolve`.
   *
   * @returns {ChannelItem} The new item, whose `next` is this item.
   */
  wrap () {
    const { request } = this
    const item = new ChannelItem(request)
    this.request = item.receiver.resolve
    item.next = this
    return item
  }

  /**
   * Resolves the sender with a non-final result carrying `value`.
   *
   * @param {*} value - Value delivered to the consumer.
   * @returns {Promise} The receiver promise.
   */
  send (value) {
    this.sender.resolve({
      done: false,
      value
    })
    return this.receiver.promise
  }

  /**
   * Rejects the sender of this item and of every item after it, and marks
   * all of them as finalized.
   *
   * @param {*} error - Rejection reason.
   * @returns {Promise} The receiver promise of this item.
   */
  error (error) {
    // Ensure the tail is finalized
    settleChain(this, item => {
      item.sender.reject(error)
    })
    return this.receiver.promise
  }

  /**
   * Resolves the sender of this item and of every item after it with a
   * final result carrying `value`, and marks all of them as finalized.
   *
   * @param {*} value - Value delivered with the final result.
   * @returns {Promise} The receiver promise of this item.
   */
  close (value) {
    // Ensure the tail is finalized
    settleChain(this, item => {
      item.sender.resolve({
        done: true,
        value
      })
    })
    return this.receiver.promise
  }
}
// The class itself is the module's only export
module.exports = ChannelItem