This repository was archived by the owner on Nov 20, 2020. It is now read-only.
-
Notifications
You must be signed in to change notification settings - Fork 45
Expand file tree
/
Copy pathConsumer.fsx
More file actions
118 lines (99 loc) · 3.96 KB
/
Consumer.fsx
File metadata and controls
118 lines (99 loc) · 3.96 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
#load "Refs.fsx"
#time "on"
open FSharp.Control
open Kafunk
open System
open System.Collections.Concurrent
//Log.MinLevel <- LogLevel.Trace
let Log = Log.create __SOURCE_FILE__
let argiDefault i def = fsi.CommandLineArgs |> Seq.tryItem i |> Option.getOr def
let host = argiDefault 1 "localhost"
let topic = argiDefault 2 "absurd-topic"
let group = argiDefault 3 "existential-group"
let count = argiDefault 4 "1" |> Int32.Parse
let go = async {
let! conn =
let connConfig =
let chanConfig =
ChanConfig.create (
requestTimeout = TimeSpan.FromSeconds 60.0,
receiveBufferSize = 8192 * 50,
sendBufferSize = 8192 * 50,
connectRetryPolicy = ChanConfig.DefaultConnectRetryPolicy,
requestRetryPolicy = ChanConfig.DefaultRequestRetryPolicy)
KafkaConfig.create (
[KafkaUri.parse host],
//[KafkaUri.parse "localhost:9092" ; KafkaUri.parse "localhost:9093" ; KafkaUri.parse "localhost:9094"],
tcpConfig = chanConfig,
requestRetryPolicy = KafkaConfig.DefaultRequestRetryPolicy,
version = Versions.V_0_10_1,
//autoApiVersions = true,
//version = Versions.V_0_9_0,
//autoApiVersions = false,
clientId = "leo")
Kafka.connAsync connConfig
let consumerConfig =
ConsumerConfig.create (
groupId = group,
topic = topic,
autoOffsetReset = AutoOffsetReset.StartFromTime Time.EarliestOffset,
fetchMaxBytes = 1000000,
fetchMaxBytesTotal = 50000000,
fetchMaxBytesOverride = 1000000,
//fetchMinBytes = 0,
//fetchMaxWaitMs = 1000,
fetchBufferSize = 1,
sessionTimeout = 30000,
heartbeatFrequency = 3,
checkCrc = false,
endOfTopicPollPolicy = RetryPolicy.constantMs 1000
)
let! consumer =
Consumer.createAsync conn consumerConfig
let showProgress =
AsyncSeq.intervalMs 10000
|> AsyncSeq.iterAsync (fun _ -> async {
let! info = ConsumerInfo.consumerProgress consumer
let str =
info.partitions
|> Seq.map (fun p -> sprintf "[p=%i o=%i hwo=%i lag=%i lead=%i eo=%i mc=%i]" p.partition p.consumerOffset p.highWatermarkOffset p.lag p.lead p.earliestOffset p.messageCount)
|> String.concat " ; "
Log.info "consumer_progress|conn_id=%s topic=%s total_lag=%i min_lead=%i partitions=%s" conn.Config.connId info.topic info.totalLag info.minLead str
return () })
let! _ = Async.StartChild showProgress
let partitionOffsets = new ConcurrentDictionary<Partition, Offset> ()
let handle (s:ConsumerState) (ms:ConsumerMessageSet) = async {
do! Async.Sleep 5000
if ms.partition = 0 then
Log.info "consuming_message_set|topic=%s partition=%i count=%i size=%i os=[%i-%i] ts=[%O] hwo=%i lag=%i"
ms.topic
ms.partition
(ms.messageSet.messages.Length)
(ConsumerMessageSet.size ms)
(ConsumerMessageSet.firstOffset ms)
(ConsumerMessageSet.lastOffset ms)
(ConsumerMessageSet.firstTimestamp ms)
(ms.highWatermarkOffset)
(ConsumerMessageSet.lag ms)
for msi in ms.messageSet.messages do
match partitionOffsets.TryGetValue (ms.partition) with
| true, lastOffset ->
if (lastOffset + 1L < msi.offset) then
let gap = msi.offset - (lastOffset + 1L)
failwithf "non_contig_offsets_detected|partition=%i last_offset=%i current_offset=%i gap=%i" ms.partition lastOffset msi.offset gap
| _ -> ()
partitionOffsets.[ms.partition] <- msi.offset
return () }
use counter = Metrics.counter Log 5000
let handle =
handle
|> Metrics.throughputAsync2To counter (fun (_,ms,_) -> ms.messageSet.messages.Length)
do! Consumer.consumePeriodicCommit consumer (TimeSpan.FromSeconds 10.0) handle
//do! Consumer.consume consumer handle
//do! Consumer.stream consumer |> AsyncSeq.iterAsync (fun (s,ms) -> handle s ms)
Log.info "done_consuming"
}
Seq.init count (fun _ -> go)
|> Async.Parallel
|> Async.RunSynchronously
|> ignore