Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
8 changes: 4 additions & 4 deletions gunicorn/arbiter.py
Original file line number Diff line number Diff line change
Expand Up @@ -158,7 +158,7 @@ def start(self):
if not (self.cfg.reuse_port and hasattr(socket, 'SO_REUSEPORT')):
self.LISTENERS = sock.create_sockets(self.cfg, self.log, fds)

listeners_str = ",".join([str(lnr) for lnr in self.LISTENERS])
listeners_str = ",".join([sock.get_uri(lnr, self.cfg.is_ssl) for lnr in self.LISTENERS])
self.log.debug("Arbiter booted")
self.log.info("Listening at: %s (%s)", listeners_str, self.pid)
self.log.info("Using worker: %s", self.cfg.worker_class_str)
Expand Down Expand Up @@ -459,7 +459,7 @@ def reload(self):
lnr.close()
# init new listeners
self.LISTENERS = sock.create_sockets(self.cfg, self.log)
listeners_str = ",".join([str(lnr) for lnr in self.LISTENERS])
listeners_str = ",".join([sock.get_uri(lnr, self.cfg.is_ssl) for lnr in self.LISTENERS])
self.log.info("Listening at: %s", listeners_str)

# do some actions on reload
Expand Down Expand Up @@ -601,8 +601,8 @@ def manage_workers(self):
"mtype": "gauge"})

if self.cfg.enable_backlog_metric:
backlog = sum(sock.get_backlog() or 0
for sock in self.LISTENERS)
backlog = sum(sock.get_backlog(lnr) or 0
for lnr in self.LISTENERS)

if backlog >= 0:
self.log.debug("socket backlog: {0}".format(backlog),
Expand Down
281 changes: 115 additions & 166 deletions gunicorn/sock.py
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,6 @@
import socket
import ssl
import stat
import struct
import sys
import time

Expand All @@ -16,150 +15,79 @@
PLATFORM = sys.platform


class BaseSocket:
if PLATFORM != "linux":
def get_backlog(sock):
return -1
else:
import struct
# tcp_info struct from include/uapi/linux/tcp.h
_TCPI_FMT = '=' + 'B' * 8 + 'I' * 24
# getsockopt silently truncates to requested length
_TCPI_LEN = struct.calcsize(_TCPI_FMT) # 104
_TCPI_INDEX_UNACKED = 12

def get_backlog(sock):
if sock.family not in (socket.AF_INET, socket.AF_INET6):
return -1
try:
tcp_info_struct = sock.getsockopt(socket.IPPROTO_TCP,
socket.TCP_INFO, _TCPI_LEN)
return struct.unpack(_TCPI_FMT, tcp_info_struct)[_TCPI_INDEX_UNACKED]
except (AttributeError, OSError):
pass
return 0

def __init__(self, address, conf, log, fd=None):
self.log = log
self.conf = conf

self.cfg_addr = address
if fd is None:
sock = socket.socket(self.FAMILY, socket.SOCK_STREAM)
bound = False
def _get_socket_family(addr):
if isinstance(addr, tuple):
if util.is_ipv6(addr[0]):
return socket.AF_INET6
else:
sock = socket.fromfd(fd, self.FAMILY, socket.SOCK_STREAM)
os.close(fd)
bound = True
return socket.AF_INET

self.sock = self.set_options(sock, bound=bound)
if isinstance(addr, (str, bytes)):
return socket.AF_UNIX

def __str__(self):
return "<socket %d>" % self.sock.fileno()
raise TypeError("Unable to determine socket family for: %r" % addr)

def __getattr__(self, name):
return getattr(self.sock, name)

def set_options(self, sock, bound=False):
sock.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
if (self.conf.reuse_port
and hasattr(socket, 'SO_REUSEPORT')): # pragma: no cover
try:
sock.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEPORT, 1)
except OSError as err:
if err.errno not in (errno.ENOPROTOOPT, errno.EINVAL):
raise
if not bound:
self.bind(sock)
sock.setblocking(0)

# make sure that the socket can be inherited
if hasattr(sock, "set_inheritable"):
sock.set_inheritable(True)

sock.listen(self.conf.backlog)
return sock

def bind(self, sock):
sock.bind(self.cfg_addr)

def close(self):
if self.sock is None:
return
def create_socket(conf, log, addr):
family = _get_socket_family(addr)

if family is socket.AF_UNIX:
# remove any existing socket at the given path
try:
self.sock.close()
except OSError as e:
self.log.info("Error while closing socket %s", str(e))

self.sock = None

def get_backlog(self):
return -1


class TCPSocket(BaseSocket):

FAMILY = socket.AF_INET

def __str__(self):
if self.conf.is_ssl:
scheme = "https"
st = os.stat(addr)
except OSError as err:
if err.args[0] != errno.ENOENT:
raise
else:
scheme = "http"

addr = self.sock.getsockname()
return "%s://%s:%d" % (scheme, addr[0], addr[1])

def set_options(self, sock, bound=False):
sock.setsockopt(socket.IPPROTO_TCP, socket.TCP_NODELAY, 1)
return super().set_options(sock, bound=bound)

if PLATFORM == "linux":
def get_backlog(self):
if self.sock:
# tcp_info struct from include/uapi/linux/tcp.h
fmt = 'B' * 8 + 'I' * 24
try:
tcp_info_struct = self.sock.getsockopt(socket.IPPROTO_TCP,
socket.TCP_INFO, 104)
# 12 is tcpi_unacked
return struct.unpack(fmt, tcp_info_struct)[12]
except (AttributeError, OSError):
pass
return 0
else:
def get_backlog(self):
return -1


class TCP6Socket(TCPSocket):

FAMILY = socket.AF_INET6

def __str__(self):
(host, port, _, _) = self.sock.getsockname()
return "http://[%s]:%d" % (host, port)


class UnixSocket(BaseSocket):

FAMILY = socket.AF_UNIX

def __init__(self, addr, conf, log, fd=None):
if fd is None:
try:
st = os.stat(addr)
except OSError as e:
if e.args[0] != errno.ENOENT:
raise
if stat.S_ISSOCK(st.st_mode):
os.remove(addr)
else:
if stat.S_ISSOCK(st.st_mode):
os.remove(addr)
else:
raise ValueError("%r is not a socket" % addr)
super().__init__(addr, conf, log, fd=fd)

def __str__(self):
return "unix:%s" % self.cfg_addr

def bind(self, sock):
old_umask = os.umask(self.conf.umask)
sock.bind(self.cfg_addr)
util.chown(self.cfg_addr, self.conf.uid, self.conf.gid)
os.umask(old_umask)
raise ValueError("%r already exists but is not a UNIX socket" % addr)

for i in range(5):
try:
sock = socket.socket(family)
sock.bind(addr)
sock.listen(conf.backlog)
if family is socket.AF_UNIX:
util.chown(addr, conf.uid, conf.gid)
return sock
except OSError as e:
if e.errno == errno.EADDRINUSE:
log.error("Connection in use: %s", str(addr))
if e.errno == errno.EADDRNOTAVAIL:
log.error("Invalid address: %s", str(addr))
msg = "connection to {addr} failed: {error}"
log.error(msg.format(addr=str(addr), error=str(e)))
if i < 5:
log.debug("Retrying in 1 second.")
time.sleep(1)

def _sock_type(addr):
if isinstance(addr, tuple):
if util.is_ipv6(addr[0]):
sock_type = TCP6Socket
else:
sock_type = TCPSocket
elif isinstance(addr, (str, bytes)):
sock_type = UnixSocket
else:
raise TypeError("Unable to create socket from: %r" % addr)
return sock_type
log.error("Can't connect to %s", str(addr))
sys.exit(1)


def create_sockets(conf, log, fds=None):
Expand Down Expand Up @@ -190,49 +118,70 @@ def create_sockets(conf, log, fds=None):
# sockets are already bound
if fdaddr:
for fd in fdaddr:
sock = socket.fromfd(fd, socket.AF_UNIX, socket.SOCK_STREAM)
sock_name = sock.getsockname()
sock_type = _sock_type(sock_name)
listener = sock_type(sock_name, conf, log, fd=fd)
listeners.append(listener)

# no file descriptor duplication
sock = socket.socket(fileno=fd)
sock.listen(conf.backlog)
set_socket_options(conf, sock)
listeners.append(sock)
return listeners

# no sockets is bound, first initialization of gunicorn in this env.
for addr in laddr:
sock_type = _sock_type(addr)
sock = None
for i in range(5):
try:
sock = sock_type(addr, conf, log)
except OSError as e:
if e.args[0] == errno.EADDRINUSE:
log.error("Connection in use: %s", str(addr))
if e.args[0] == errno.EADDRNOTAVAIL:
log.error("Invalid address: %s", str(addr))
msg = "connection to {addr} failed: {error}"
log.error(msg.format(addr=str(addr), error=str(e)))
if i < 5:
log.debug("Retrying in 1 second.")
time.sleep(1)
else:
break

if sock is None:
log.error("Can't connect to %s", str(addr))
sys.exit(1)

listeners.append(sock)
old_umask = os.umask(conf.umask)
try:
for addr in laddr:
sock = create_socket(conf, log, addr)
set_socket_options(conf, sock)
listeners.append(sock)
finally:
os.umask(old_umask)

return listeners


def close_sockets(listeners, unlink=True):
for sock in listeners:
sock_name = sock.getsockname()
sock.close()
if unlink and _sock_type(sock_name) is UnixSocket:
os.unlink(sock_name)
try:
if unlink and sock.family is socket.AF_UNIX:
sock_name = sock.getsockname()
os.unlink(sock_name)
finally:
sock.close()


def get_uri(listener, is_ssl):
addr = listener.getsockname()
family = _get_socket_family(addr)
scheme = "https" if is_ssl else "http"

if family is socket.AF_INET:
(host, port) = listener.getsockname()
return f"{scheme}://{host}:{port}"

if family is socket.AF_INET6:
(host, port, _, _) = listener.getsockname()
return f"{scheme}://[{host}]:{port}"

if family is socket.AF_UNIX:
path = listener.getsockname()
return f"unix://{path}"


def set_socket_options(conf, sock):
sock.setblocking(False)

# make sure that the socket can be inherited
if hasattr(sock, "set_inheritable"):
sock.set_inheritable(True)

if sock.family in (socket.AF_INET, socket.AF_INET6):
sock.setsockopt(socket.IPPROTO_TCP, socket.TCP_NODELAY, 1)
sock.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
if (conf.reuse_port and hasattr(socket, 'SO_REUSEPORT')): # pragma: no cover
try:
sock.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEPORT, 1)
except OSError as err:
if err.errno not in (errno.ENOPROTOOPT, errno.EINVAL):
raise


def ssl_context(conf):
Expand Down
7 changes: 4 additions & 3 deletions gunicorn/workers/gasgi.py
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@
import signal
import sys

from gunicorn.sock import get_uri
from gunicorn.workers import base
from gunicorn.asgi.protocol import ASGIProtocol

Expand Down Expand Up @@ -178,15 +179,15 @@ async def _serve(self):
try:
server = await self.loop.create_server(
lambda: ASGIProtocol(self),
sock=sock.sock,
sock=sock,
ssl=ssl_context,
reuse_address=True,
start_serving=True,
)
self.servers.append(server)
self.log.info("ASGI server listening on %s", sock)
self.log.info("ASGI server listening on %s", get_uri(sock, self.cfg.is_ssl))
except Exception as e:
self.log.error("Failed to create server on %s: %s", sock, e)
self.log.error("Failed to create server on %s: %s", get_uri(sock, self.cfg.is_ssl), e)

if not self.servers:
self.log.error("No servers could be started")
Expand Down
4 changes: 2 additions & 2 deletions gunicorn/workers/ggevent.py
Original file line number Diff line number Diff line change
Expand Up @@ -40,8 +40,8 @@ def patch(self):
# patch sockets
sockets = []
for s in self.sockets:
sockets.append(socket.socket(s.FAMILY, socket.SOCK_STREAM,
fileno=s.sock.detach()))
sockets.append(socket.socket(s.family, socket.SOCK_STREAM,
fileno=s.detach()))
self.sockets = sockets

def notify(self):
Expand Down
Loading