Skip to content

Implement PollerMetrics for EpollSystem #4335

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Draft
wants to merge 2 commits into
base: series/3.6.x
Choose a base branch
from
Draft
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
121 changes: 120 additions & 1 deletion core/native/src/main/scala/cats/effect/unsafe/EpollSystem.scala
Original file line number Diff line number Diff line change
Expand Up @@ -69,7 +69,7 @@ object EpollSystem extends PollingSystem {

def interrupt(targetThread: Thread, targetPoller: Poller): Unit = ()

def metrics(poller: Poller): PollerMetrics = PollerMetrics.noop
def metrics(poller: Poller): PollerMetrics = poller.metrics()

private final class FileDescriptorPollerImpl private[EpollSystem] (
ctx: PollingContext[Poller])
Expand Down Expand Up @@ -181,6 +181,115 @@ object EpollSystem extends PollingSystem {

final class Poller private[EpollSystem] (epfd: Int) {

private var totalReadSubmitted = 0L
private var totalReadSucceeded = 0L
private var totalReadErrored = 0L
private var totalReadCanceled = 0L
private var readOutstanding = 0

private var totalWriteSubmitted = 0L
private var totalWriteSucceeded = 0L
private var totalWriteErrored = 0L
private var totalWriteCanceled = 0L
private var writeOutstanding = 0

private[this] val pollerMetrics: PollerMetrics = new PollerMetrics {

override def operationsOutstandingCount(): Int = readOutstanding + writeOutstanding

override def totalOperationsSubmittedCount(): Long =
totalReadSubmitted + totalWriteSubmitted

override def totalOperationsSucceededCount(): Long =
totalReadSucceeded + totalWriteSucceeded

override def totalOperationsErroredCount(): Long = totalReadErrored + totalWriteErrored

override def totalOperationsCanceledCount(): Long = totalReadCanceled + totalWriteCanceled

override def acceptOperationsOutstandingCount(): Int = 0

override def totalAcceptOperationsSubmittedCount(): Long = 0L

override def totalAcceptOperationsSucceededCount(): Long = 0L

override def totalAcceptOperationsErroredCount(): Long = 0L

override def totalAcceptOperationsCanceledCount(): Long = 0L

override def connectOperationsOutstandingCount(): Int = 0

override def totalConnectOperationsSubmittedCount(): Long = 0L

override def totalConnectOperationsSucceededCount(): Long = 0L

override def totalConnectOperationsErroredCount(): Long = 0L

override def totalConnectOperationsCanceledCount(): Long = 0L

override def readOperationsOutstandingCount(): Int = readOutstanding

override def totalReadOperationsSubmittedCount(): Long = totalReadSubmitted

override def totalReadOperationsSucceededCount(): Long = totalReadSucceeded

override def totalReadOperationsErroredCount(): Long = totalReadErrored

override def totalReadOperationsCanceledCount(): Long = totalReadCanceled

override def writeOperationsOutstandingCount(): Int = writeOutstanding

override def totalWriteOperationsSubmittedCount(): Long = totalWriteSubmitted

override def totalWriteOperationsSucceededCount(): Long = totalWriteSucceeded

override def totalWriteOperationsErroredCount(): Long = totalWriteErrored

override def totalWriteOperationsCanceledCount(): Long = totalWriteCanceled

override def toString: String = "Epoll"
}

private[EpollSystem] def metrics(): PollerMetrics = pollerMetrics

private[this] def incrementOperationCount(reads: Boolean, writes: Boolean): Unit = {
if (reads) {
totalReadSubmitted += 1
readOutstanding += 1
}
if (writes) {
totalWriteSubmitted += 1
writeOutstanding += 1
}
}

private[this] def handleOperationCompletion(
reads: Boolean,
writes: Boolean,
succeeded: Boolean): Unit = {
if (reads) {
readOutstanding -= 1
if (succeeded) totalReadSucceeded += 1 else totalReadErrored += 1
}

if (writes) {
writeOutstanding -= 1
if (succeeded) totalWriteSucceeded += 1 else totalWriteErrored += 1
}
}

private[this] def handleOperationCanceled(reads: Boolean, writes: Boolean): Unit = {
if (reads) {
readOutstanding -= 1
totalReadCanceled += 1
}

if (writes) {
writeOutstanding -= 1
totalWriteCanceled += 1
}
}

private[this] val handles: Set[PollHandle] =
Collections.newSetFromMap(new IdentityHashMap)

Expand Down Expand Up @@ -213,6 +322,14 @@ object EpollSystem extends PollingSystem {
while (i < readyEventCount) {
val event = events + i.toLong
val handle = fromPtr(event.data)
val eventFlags = event.events.toInt

val succeded = eventFlags != 0
handleOperationCompletion(
reads = (eventFlags & EPOLLIN) != 0,
writes = (eventFlags & EPOLLOUT) != 0,
succeded)

handle.notify(event.events.toInt)
i += 1
}
Expand All @@ -239,8 +356,10 @@ object EpollSystem extends PollingSystem {
Left(new IOException(fromCString(strerror(errno))))
else {
handles.add(handle)
incrementOperationCount(reads, writes)
val remove = IO {
handles.remove(handle)
handleOperationCanceled(reads, writes)
if (epoll_ctl(epfd, EPOLL_CTL_DEL, fd, null) != 0)
throw new IOException(fromCString(strerror(errno)))
}
Expand Down
Loading