Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
85 changes: 41 additions & 44 deletions src/helper.cc
Original file line number Diff line number Diff line change
Expand Up @@ -59,16 +59,6 @@ CBDATA_CLASS_INIT(helper_stateful_server);

InstanceIdDefinitions(HelperServerBase, "Hlpr");

void
HelperServerBase::initStats()
{
stats.uses=0;
stats.replies=0;
stats.pending=0;
stats.releases=0;
stats.timedout = 0;
}

void
HelperServerBase::closePipesSafely(const char *id_name)
{
Expand Down Expand Up @@ -136,6 +126,24 @@ HelperServerBase::dropQueued()
}
}

HelperServerBase::HelperServerBase(const char *desc, Ip::Address &ip, int aPid, void *aIpc, int rfd, int wfd) :
pid(aPid),
addr(ip),
readPipe(new Comm::Connection),
writePipe(new Comm::Connection),
hIpc(aIpc)
{
rbuf = static_cast<char *>(memAllocBuf(ReadBufSize, &rbuf_sz));

readPipe->fd = rfd;
readPipe->noteStart();
fd_note(readPipe->fd, desc);

writePipe->fd = wfd;
writePipe->noteStart();
fd_note(writePipe->fd, desc);
}

HelperServerBase::~HelperServerBase()
{
if (rbuf) {
Expand All @@ -144,6 +152,13 @@ HelperServerBase::~HelperServerBase()
}
}

helper_server::helper_server(helper *hlp, int aPid, void *aIpc, int rfd, int wfd) :
HelperServerBase(hlp->cmdline->key, hlp->addr, aPid, aIpc, rfd, wfd),
parent(cbdataReference(hlp))
{
wqueue = new MemBuf;
}

helper_server::~helper_server()
{
wqueue->clean();
Expand Down Expand Up @@ -174,6 +189,12 @@ helper_server::dropQueued()
requestsIndex.clear();
}

helper_stateful_server::helper_stateful_server(statefulhelper *hlp, int aPid, void *aIpc, int rfd, int wfd) :
HelperServerBase(hlp->cmdline->key, hlp->addr, aPid, aIpc, rfd, wfd),
parent(cbdataReference(hlp))
{
}

helper_stateful_server::~helper_stateful_server()
{
/* TODO: walk the local queue of requests and carry them all out */
Expand Down Expand Up @@ -201,7 +222,6 @@ helperOpenServers(helper * hlp)
char *procname;
const char *args[HELPER_MAX_ARGS+1]; // save space for a NULL terminator
char fd_note_buf[FD_DESC_SZ];
helper_server *srv;
int nargs = 0;
int k;
pid_t pid;
Expand Down Expand Up @@ -265,22 +285,7 @@ helperOpenServers(helper * hlp)

++ hlp->childs.n_running;
++ hlp->childs.n_active;
srv = new helper_server;
srv->hIpc = hIpc;
srv->pid = pid;
srv->initStats();
srv->addr = hlp->addr;
srv->readPipe = new Comm::Connection;
srv->readPipe->fd = rfd;
srv->writePipe = new Comm::Connection;
srv->writePipe->fd = wfd;
srv->rbuf = (char *)memAllocBuf(ReadBufSize, &srv->rbuf_sz);
srv->wqueue = new MemBuf;
srv->roffset = 0;
srv->nextRequestId = 0;
srv->replyXaction = NULL;
srv->ignoreToEom = false;
srv->parent = cbdataReference(hlp);
auto *srv = new helper_server(hlp, pid, hIpc, rfd, wfd);
dlinkAddTail(srv, &srv->link, &hlp->servers);

if (rfd == wfd) {
Expand Down Expand Up @@ -392,20 +397,7 @@ helperStatefulOpenServers(statefulhelper * hlp)

++ hlp->childs.n_running;
++ hlp->childs.n_active;
helper_stateful_server *srv = new helper_stateful_server;
srv->hIpc = hIpc;
srv->pid = pid;
srv->initStats();
srv->addr = hlp->addr;
srv->readPipe = new Comm::Connection;
srv->readPipe->fd = rfd;
srv->writePipe = new Comm::Connection;
srv->writePipe->fd = wfd;
srv->rbuf = (char *)memAllocBuf(ReadBufSize, &srv->rbuf_sz);
srv->roffset = 0;
srv->parent = cbdataReference(hlp);
srv->reservationStart = 0;

auto *srv = new helper_stateful_server(hlp, pid, hIpc, rfd, wfd);
dlinkAddTail(srv, &srv->link, &hlp->servers);

if (rfd == wfd) {
Expand Down Expand Up @@ -553,7 +545,7 @@ helper::submit(const char *buf, HLPCB * callback, void *data)
{
Helper::Xaction *r = new Helper::Xaction(callback, data, buf);
submitRequest(r);
debugs(84, DBG_DATA, Raw("buf", buf, strlen(buf)));
debugs(84, DBG_DATA, "eom=" << int(eom) << ", " << Raw("buf", buf, strlen(buf)));
}

/// Submit request or callback the caller with a Helper::Error error.
Expand Down Expand Up @@ -663,7 +655,7 @@ statefulhelper::submit(const char *buf, HLPCB * callback, void *data, const Help
}

debugs(84, DBG_DATA, "placeholder: '" << r->request.placeholder <<
"', " << Raw("buf", buf, (!buf?0:strlen(buf))));
"', eom=" << int(eom) << ", " << Raw("buf", buf, (!buf?0:strlen(buf))));

syncQueueStats();
}
Expand Down Expand Up @@ -980,8 +972,8 @@ helperHandleRead(const Comm::ConnectionPointer &conn, char *, size_t len, Comm::
assert(cbdataReferenceValid(data));

/* Bail out early on Comm::ERR_CLOSING - close handlers will tidy up for us */

if (flag == Comm::ERR_CLOSING) {
debugs(84, 5, hlp->id_name << " #" << srv->index << " is ERR_CLOSING " << conn);
return;
}

Expand Down Expand Up @@ -1024,6 +1016,8 @@ helperHandleRead(const Comm::ConnectionPointer &conn, char *, size_t len, Comm::
--eom;
}
*eom = '\0';
} else {
debugs(84, 3, "no EOM terminator found after " << srv->roffset << " bytes");
}

if (!srv->ignoreToEom && !srv->replyXaction) {
Expand Down Expand Up @@ -1099,6 +1093,7 @@ helperStatefulHandleRead(const Comm::ConnectionPointer &conn, char *, size_t len
/* Bail out early on Comm::ERR_CLOSING - close handlers will tidy up for us */

if (flag == Comm::ERR_CLOSING) {
debugs(84, 5, hlp->id_name << " #" << srv->index << " is ERR_CLOSING " << conn);
return;
}

Expand Down Expand Up @@ -1136,6 +1131,8 @@ helperStatefulHandleRead(const Comm::ConnectionPointer &conn, char *, size_t len
}

*t = '\0';
} else {
debugs(84, 3, "no EOM terminator found after " << srv->roffset << " bytes");
}

if (r && !r->reply.accumulate(srv->rbuf, t ? (t - srv->rbuf) : srv->roffset)) {
Expand Down
Loading