Fix leaked fiber in notification_stream_handler #5330


Open · syeopite wants to merge 3 commits into master

Conversation

syeopite
Member

Of the (technically) four fibers spawned by `create_notification_stream`, two are wrapped in an `ensure` clause so that they always unsubscribe themselves from the notification job after an error, or when they simply finish.

The first is the heartbeat fiber, which is actually also the main fiber of the route handler. The second is a fiber that awaits notification pushes from the notification job through the `connection` channel. When an error occurs within the main heartbeat fiber, the `ensure` clause is executed and the function unsubscribes itself from receiving any further pushes from the notification job.

The problem, however, is that this almost always occurs while the notification receiver fiber is awaiting a value from the notification job. The job can no longer send anything to the receiver, since the heartbeat fiber unsubscribed it just a moment ago.

The notification receiver fiber will now block indefinitely.

In doing so, it prevents the entire execution stack of the fiber, and of the `create_notification_stream` call, from being garbage collected.

The IO buffers for the request and response contents stay referenced, the underlying TCP/TLS sockets become inaccessible and are leaked, the parsed structures of YouTube's massive JSON objects stay allocated, and so on.
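
A tiny repro of the failure mode, under simplified assumptions: a plain `Channel` stands in for the handler's plumbing, and the buffer stands in for the request/response state. Once no sender remains, the spawned fiber parks in `receive` forever and everything on its stack stays reachable.

channel = Channel(String).new

spawn do
  buffer = Bytes.new(1024 * 1024) # stands in for request/response IO state
  channel.receive                 # blocks forever once nobody can send
  puts buffer.size                # never reached; `buffer` stays referenced
end

# The sending side gives up without ever sending or closing:
# the fiber, its stack, and `buffer` can now never be collected.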

This PR simply merges the two into a single fiber via a `select` statement, ensuring that there are no concurrency problems.
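
For illustration, a minimal sketch of the merged fiber, assuming a `connection` channel that the notification job pushes into and a `connection_channel` used to (un)subscribe; the names mirror the handler, but the details are simplified and not the exact code from this PR.

def stream(env, connection, connection_channel)
  connection_channel.send({true, connection}) # subscribe to the notification job

  loop do
    select
    when notification = connection.receive
      # A push arrived from the notification job; forward it to the client.
      env.response.puts notification
      env.response.flush
    when timeout(30.seconds)
      # Nothing arrived within the interval, so emit the heartbeat instead.
      # A broken connection raises IO::Error here, in the only fiber, so no
      # sibling fiber can be left parked on `connection.receive`.
      env.response.print ":keep-alive\n\n"
      env.response.flush
    end
  end
ensure
  # Single unsubscribe point: no other fiber is waiting on `connection`.
  connection_channel.send({false, connection})
end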


TL;DR: A dangling fiber was blocking indefinitely which prevented a bunch of stuff from being garbage collected.


syeopite commented Jun 1, 2025

For reference, here's a quick benchmark I did with and without this fix (plus the patch below, which tells Invidious to run `GC.collect` and bypasses the auth handler so as not to overwhelm the database).

Before the changes here:

48  MB          After start-up and first job iteration
115 MB          After 500 connections 
198 MB          After 1000 connections
278 MB          After 1500 connections
253 MB          After Invidious finished handling all the `IO::Errors` caused by the cancellations 
227 MB          After calling `GC.collect`

Afterwards:

48  MB          After start-up and first job iteration
115 MB          After 500 connections
190 MB          After 1000 connections
274 MB          After 1500 connections
234 MB          After Invidious finished handling all the `IO::Errors` caused by the cancellations
70  MB          After calling `GC.collect`

Make sure to increase `ulimit -n` prior to running!

Patch
diff --git a/src/invidious/helpers/handlers.cr b/src/invidious/helpers/handlers.cr
index 13ea9fe9..55f4066b 100644
--- a/src/invidious/helpers/handlers.cr
+++ b/src/invidious/helpers/handlers.cr
@@ -95,6 +95,9 @@ class AuthHandler < Kemal::Handler
     return call_next env unless only_match? env
 
     begin
+      # Bypass database call for the sake of testing the leak
+      call_next env
+
       if token = env.request.headers["Authorization"]?
         token = JSON.parse(URI.decode_www_form(token.lchop("Bearer ")))
         session = URI.decode_www_form(token["session"].as_s)
diff --git a/src/invidious/routes/before_all.cr b/src/invidious/routes/before_all.cr
index b5269668..8ea304a9 100644
--- a/src/invidious/routes/before_all.cr
+++ b/src/invidious/routes/before_all.cr
@@ -66,6 +66,8 @@ module Invidious::Routes::BeforeAll
               }.any? { |r| env.request.resource.starts_with? r }
 
     if env.request.cookies.has_key? "SID"
+      # Bypass database call for the sake of testing the leak
+      return
       sid = env.request.cookies["SID"].value
 
       if sid.starts_with? "v1:"
diff --git a/src/invidious/routes/misc.cr b/src/invidious/routes/misc.cr
index 0b868755..45103e2e 100644
--- a/src/invidious/routes/misc.cr
+++ b/src/invidious/routes/misc.cr
@@ -38,6 +38,13 @@ module Invidious::Routes::Misc
     rendered "licenses"
   end
 
+  def self.call_gc(env)
+    20.times do
+      GC.collect
+      sleep 0.20.seconds
+    end
+  end
+
   def self.cross_instance_redirect(env)
     referer = get_referer(env)
 
diff --git a/src/invidious/routing.cr b/src/invidious/routing.cr
index 46b71f1f..8d64b5b5 100644
--- a/src/invidious/routing.cr
+++ b/src/invidious/routing.cr
@@ -21,6 +21,7 @@ module Invidious::Routing
       get "/privacy", Routes::Misc, :privacy
       get "/licenses", Routes::Misc, :licenses
       get "/redirect", Routes::Misc, :cross_instance_redirect
+      get "/call_gc", Routes::Misc, :call_gc
 
       self.register_channel_routes
       self.register_watch_routes
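
With the patch applied, a collection can be forced before sampling memory by hitting the new route; a one-liner for that (host and port assumed to be a default local Invidious instance):

require "http/client"

# Hits the /call_gc test route added by the patch above.
HTTP::Client.get("http://localhost:3000/call_gc")
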
Script
require "http"
require "wait_group"

HEADERS = HTTP::Headers{
  "Connection" => "Keep-Alive",
}

COOKIES = HTTP::Cookies.new
COOKIES.add_request_headers(HEADERS)

CLIENTS_MADE = [] of HTTP::Client
BATCH_SIZE   = 500
BATCH_COUNT  =   3
TARGET       = URI.parse(<<INSERT URI>>)

# Used to modify program state during execution
module Global
  class_property? exit_request = false
  class_property! await_cancellation : WaitGroup
end

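# Opens one streaming connection to the notifications endpoint and keeps
# reading heartbeats until an exit is requested or the stream closes.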
def make_client_and_request
  client = HTTP::Client.new(TARGET)

  client.get("/api/v1/auth/notifications", headers: HEADERS) do |response|
    CLIENTS_MADE << client
    Fiber.yield

    until response.body_io.closed?
      if Global.exit_request?
        return Global.await_cancellation.done
      else
        response.body_io.gets
      end
    end
  end
ensure
  client.try &.close
end

BATCH_COUNT.times do |current_cycle|
  BATCH_SIZE.times { spawn { make_client_and_request } }

  while CLIENTS_MADE.size < (BATCH_SIZE * (current_cycle + 1))
    puts "Batch #{current_cycle + 1}: #{CLIENTS_MADE.size}/#{BATCH_SIZE * (current_cycle + 1)} made"
    sleep 2.seconds
  end

  puts "Batch #{current_cycle + 1}: Finished creating #{CLIENTS_MADE.size}/#{BATCH_SIZE * (current_cycle + 1)} clients"
  puts "#{BATCH_COUNT - (current_cycle + 1)} more batches left to go | #{CLIENTS_MADE.size}/#{BATCH_COUNT * BATCH_SIZE}"

  sleep 5.seconds

  Global.exit_request = true
  await_cancellation = WaitGroup.new(BATCH_SIZE)
  Global.await_cancellation = await_cancellation

  puts "Canceling all connections to prepare for the next batch."

  await_cancellation.wait
  Global.exit_request = false

  puts "Done. Starting next batch\n\n"
end

puts "Finished making #{BATCH_COUNT * BATCH_SIZE} requests"
sleep

exit

syeopite added 2 commits June 1, 2025 12:43
I don't believe that closing Fiber channels is strictly necessary,
but it doesn't hurt to do.
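
For context, a small sketch of what closing buys (illustrative, not the handler code): `Channel#close` wakes any fiber parked in `receive` with `Channel::ClosedError` instead of leaving it blocked forever.

channel = Channel(Int32).new

spawn do
  begin
    channel.receive # would block forever if nobody ever sends or closes
  rescue Channel::ClosedError
    puts "receiver unblocked by close"
  end
end

channel.close
Fiber.yield # let the receiver run and observe the close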
@syeopite syeopite requested review from unixfox and Fijxu June 12, 2025 18:18