Skip to content

Commit 6421850

Browse files
Improve robustness (#4)
* use explicit prefetch count for consumer * added hack for ssl certs * fixed arg * added timeout arg
1 parent 1a0a6e5 commit 6421850

File tree

1 file changed

+21
-0
lines changed

1 file changed

+21
-0
lines changed

main.py

Lines changed: 21 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -194,6 +194,9 @@ def get_ssl_options(args):
194194
context = ssl.create_default_context(cafile=args.rabbitmq_cacertfile)
195195
# HACK this allows the host and baked in host to be configured independently
196196
context.check_hostname = False
197+
if args.ssl_no_verify:
198+
context.verify_mode = ssl.CERT_NONE
199+
return pika.SSLOptions(context, args.rabbitmq_host)
197200
if args.rabbitmq_certfile != "":
198201
context.load_cert_chain(args.rabbitmq_certfile, args.rabbitmq_keyfile)
199202
return pika.SSLOptions(context, args.rabbitmq_host)
@@ -227,6 +230,11 @@ def main():
227230
"--influxdb_bucket", default=getenv("INFLUXDB_BUCKET", "waggle")
228231
)
229232
parser.add_argument("--influxdb_org", default=getenv("INFLUXDB_ORG", "waggle"))
233+
parser.add_argument(
234+
"--influxdb_connection_timeout",
235+
type=int,
236+
default=getenv("INFLUXDB_CONNECTION_TIMEOUT", "10000"),
237+
)
230238
parser.add_argument(
231239
"--max_flush_interval",
232240
default=getenv("MAX_FLUSH_INTERVAL", "1.0"),
@@ -245,6 +253,17 @@ def main():
245253
type=int,
246254
help="port to expose metrics",
247255
)
256+
parser.add_argument(
257+
"--prefetch_count",
258+
default=getenv("PREFETCH_COUNT", "10000"),
259+
type=int,
260+
help="prefetch count to use for consumer",
261+
)
262+
parser.add_argument(
263+
"--ssl_no_verify",
264+
action="store_true",
265+
help="disable ssl host verification. please only use for debugging!",
266+
)
248267
args = parser.parse_args()
249268

250269
logging.basicConfig(
@@ -276,6 +295,7 @@ def main():
276295
url=args.influxdb_url,
277296
token=args.influxdb_token,
278297
org=args.influxdb_org,
298+
timeout=args.influxdb_connection_timeout,
279299
enable_gzip=True,
280300
)
281301
)
@@ -288,6 +308,7 @@ def main():
288308
ch = conn.channel()
289309
ch.queue_declare(args.rabbitmq_queue, durable=True)
290310
ch.queue_bind(args.rabbitmq_queue, args.rabbitmq_exchange, "#")
311+
ch.basic_qos(prefetch_count=args.prefetch_count)
291312

292313
handler = MessageHandler(
293314
rabbitmq_conn=conn,

0 commit comments

Comments
 (0)