-
Notifications
You must be signed in to change notification settings - Fork 119
/
Copy pathUrlCache.kt
144 lines (132 loc) · 4.63 KB
/
UrlCache.kt
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
package ai.platon.pulsar.common.collect
import ai.platon.pulsar.common.Priority13
import ai.platon.pulsar.common.collect.queue.*
import ai.platon.pulsar.common.urls.UrlAware
import com.google.common.cache.LoadingCache
import java.time.Instant
import java.util.*
import java.util.concurrent.ConcurrentLinkedQueue
/**
* The url cache holds urls.
*
* A url cache contains several queues for different purpose: reentrant, non-reentrant and n-reentrant.
* A reentrant queue accepts the same url multiple times, a non-reentrant queue accepts the same url only once,
* and an n-reentrant queue accepts the same url for n times at most.
*
* The URL cache is expected to be very large and items may be loaded from external sources such as MongoDB.
* */
interface UrlCache {
/**
* The cache name
* */
val name: String
/**
* The priority
* */
val priority: Int
/**
* A non-reentrant queue accepts the same url only once
* */
val nonReentrantQueue: Queue<UrlAware>
/**
* An n-reentrant queue accepts the same url for n times at most
* */
val nReentrantQueue: Queue<UrlAware>
/**
* A reentrant queue accepts the same url multiple times
* */
val reentrantQueue: Queue<UrlAware>
/**
* Create a list of all the queues
* */
val queues: List<Queue<UrlAware>>
get() = listOf(nonReentrantQueue, nReentrantQueue, reentrantQueue)
/**
* The total size of all the queues
* */
val size get() = queues.sumOf { it.size }
/**
* The precise count of urls in the external source, since the external source can be very large,
* retrieving the precise size can be very slow in some external source.
* */
val externalSize: Int get() = 0
/**
* The estimated, imprecise count of urls in the external source, it should be very fast.
* */
val estimatedExternalSize get() = 0
/**
* The estimated, imprecise count of all urls both in local cache and the external source.
* */
val estimatedSize get() = size + estimatedExternalSize
/**
* Remove dead urls.
* */
fun removeDeceased()
/**
* Clear the local cache.
* */
fun clear()
/**
* Clear both the local cache and external source.
* */
fun deepClear() = clear()
}
abstract class AbstractUrlCache(
override val name: String,
override val priority: Int
) : UrlCache {
override fun removeDeceased() {
val now = Instant.now()
queues.forEach { it.removeIf { it.deadline < now } }
}
override fun clear() {
queues.forEach { it.clear() }
}
}
open class ConcurrentUrlCache(
name: String = "",
priority: Int = Priority13.NORMAL.value
) : AbstractUrlCache(name, priority) {
override val nonReentrantQueue = ConcurrentNonReentrantQueue<UrlAware>()
override val nReentrantQueue = ConcurrentNEntrantQueue<UrlAware>(3)
override val reentrantQueue = ConcurrentLinkedQueue<UrlAware>()
}
/**
* Contains a sets of loading queues which can load urls from external source using [urlLoader].
* */
class LoadingUrlCache constructor(
name: String,
priority: Int,
/**
* A loader to load urls from external sources
* */
val urlLoader: ExternalUrlLoader,
/**
* The capacity for each queue
* */
val capacity: Int = LoadingQueue.DEFAULT_CAPACITY,
) : AbstractUrlCache(name, priority), Loadable<UrlAware> {
companion object {
const val G_NON_REENTRANT = 1
const val G_N_ENTRANT = 2
const val G_REENTRANT = 3
}
override val nonReentrantQueue = ConcurrentNonReentrantLoadingQueue(urlLoader, topic(G_NON_REENTRANT))
override val nReentrantQueue = ConcurrentNEntrantLoadingQueue(urlLoader, topic(G_N_ENTRANT), 3)
override val reentrantQueue = ConcurrentLoadingQueue(urlLoader, topic(G_REENTRANT))
override val queues: List<Queue<UrlAware>> get() = listOf(nonReentrantQueue, nReentrantQueue, reentrantQueue)
override val externalSize: Int
get() = queues.filterIsInstance<LoadingQueue<UrlAware>>().sumOf { it.externalSize }
override val estimatedExternalSize: Int
get() = queues.filterIsInstance<LoadingQueue<UrlAware>>().sumOf { it.estimatedExternalSize }
override fun load() {
queues.filterIsInstance<Loadable<UrlAware>>().forEach { it.load() }
}
override fun loadNow(): Collection<UrlAware> {
return queues.filterIsInstance<Loadable<UrlAware>>().flatMap { it.loadNow() }
}
override fun deepClear() {
queues.filterIsInstance<LoadingQueue<UrlAware>>().forEach { it.deepClear() }
}
private fun topic(group: Int) = UrlTopic(name, group, priority, capacity)
}