From 0b9b1cea5d819d76448163d1c5cc7331e720730d Mon Sep 17 00:00:00 2001 From: Kapil Arya Date: Wed, 20 Aug 2014 13:02:10 -0700 Subject: [PATCH 1/4] cpp/{README -> README.md} --- cpp/{README => README.md} | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) rename cpp/{README => README.md} (95%) diff --git a/cpp/README b/cpp/README.md similarity index 95% rename from cpp/README rename to cpp/README.md index ce0f496..53104fc 100644 --- a/cpp/README +++ b/cpp/README.md @@ -3,7 +3,7 @@ RENDLER implementation in C++ Dependencies: ============ -- libboost_regex.so +- libboost\_regex.so - libcurl.so - Makefile assumes all 3rdparty libraries/headers to be available in the default include path (/usr/include?). @@ -18,7 +18,7 @@ Limitations/Features: ==================== - Doesn't store the images to S3, just locally. - Image files are kept in rendler-work-dir in the same folder as the - render_executor executable. + render\_executor executable. - Images files are named R where N is a monotonouly increasing integer. - It wouldn't crawl outside of the given base URL (it will still render those webpages) to avoid pulling in too much data. From 2c0b7351b3955eb16fc191350ae0a579a4494171 Mon Sep 17 00:00:00 2001 From: Kapil Arya Date: Wed, 20 Aug 2014 13:04:47 -0700 Subject: [PATCH 2/4] Minor updates to cpp/README.md --- cpp/README.md | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/cpp/README.md b/cpp/README.md index 53104fc..48fe7a3 100644 --- a/cpp/README.md +++ b/cpp/README.md @@ -28,5 +28,5 @@ Limitations/Features: Communication between Scheduler and Executors: ============================================= - Each framework message consists of a vector of strings: - RenderExecuter->Scheduler: { taskId, taskUrl, filepath } - CrawlExecuter->Scheduler: { taskId, taskUrl, + } + - RenderExecuter->Scheduler: { taskId, taskUrl, filepath } + - CrawlExecuter->Scheduler: { taskId, taskUrl, \+ } From 30852abbe073ef6cd7a3b67eebff14e35027b7d1 Mon Sep 17 00:00:00 2001 From: Kapil Arya Date: Thu, 21 Aug 2014 21:37:12 +0000 Subject: [PATCH 3/4] Fix compilation flags for C++ implementation. --- cpp/Makefile | 17 ++++++++++++----- 1 file changed, 12 insertions(+), 5 deletions(-) diff --git a/cpp/Makefile b/cpp/Makefile index fd47f4f..8382958 100644 --- a/cpp/Makefile +++ b/cpp/Makefile @@ -1,8 +1,15 @@ +# Prefix where Mesos is installed +MESOS_PREFIX = /usr +#MESOS_PREFIX = $(HOME)/usr + CXX = g++ -CXXFLAGS = -g -O2 -LDFLAGS += -lmesos +CXXFLAGS = -g -g2 -O2 -std=c++11 +LDFLAGS = -L$(MESOS_PREFIX)/lib +INCLUDES = -I$(MESOS_PREFIX)/include + +LDFLAGS += -lmesos -lprotobuf -lpthread CXXCOMPILE = $(CXX) $(INCLUDES) $(CXXFLAGS) -c -o $@ -CXXLINK = $(CXX) $(INCLUDES) $(CXXFLAGS) $(LDFLAGS) -o $@ +CXXLINK = $(CXX) $(INCLUDES) $(CXXFLAGS) -o $@ $< $(LDFLAGS) default: all all: rendler crawl_executor render_executor @@ -10,10 +17,10 @@ all: rendler crawl_executor render_executor HEADERS = rendler_helper.hpp crawl_executor: crawl_executor.cpp $(HEADERS) - $(CXXLINK) $< -lboost_regex -lcurl + $(CXXLINK) -lboost_regex -lcurl %: %.cpp $(HEADERS) - $(CXXLINK) $< + $(CXXLINK) check: crawl ./crawl http://mesosphere.io/team/ From 8fac3594f850da8f4adb4b4d5e9f222048713aae Mon Sep 17 00:00:00 2001 From: Kapil Arya Date: Thu, 21 Aug 2014 23:51:20 -0700 Subject: [PATCH 4/4] Added C++ skeleton rendler. --- cpp/Makefile | 4 +- cpp/rendler_skeleton.cpp | 377 +++++++++++++++++++++++++++++++++++++++ 2 files changed, 379 insertions(+), 2 deletions(-) create mode 100644 cpp/rendler_skeleton.cpp diff --git a/cpp/Makefile b/cpp/Makefile index 8382958..56eb2cc 100644 --- a/cpp/Makefile +++ b/cpp/Makefile @@ -12,7 +12,7 @@ CXXCOMPILE = $(CXX) $(INCLUDES) $(CXXFLAGS) -c -o $@ CXXLINK = $(CXX) $(INCLUDES) $(CXXFLAGS) -o $@ $< $(LDFLAGS) default: all -all: rendler crawl_executor render_executor +all: rendler rendler_skeleton crawl_executor render_executor HEADERS = rendler_helper.hpp @@ -26,4 +26,4 @@ check: crawl ./crawl http://mesosphere.io/team/ clean: - (rm -f core crawl_executor render_executor rendler) + (rm -f core crawl_executor render_executor rendler rendler_skeleton) diff --git a/cpp/rendler_skeleton.cpp b/cpp/rendler_skeleton.cpp new file mode 100644 index 0000000..b5a4c91 --- /dev/null +++ b/cpp/rendler_skeleton.cpp @@ -0,0 +1,377 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +// See the Mesos Framework Development Guide: +// http://mesos.apache.org/documentation/latest/app-framework-development-guide + +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include "rendler_helper.hpp" + +using namespace mesos; + +using std::cout; +using std::endl; +using std::string; +using std::vector; +using std::queue; +using std::map; + +using mesos::Resources; + +const float CPUS_PER_TASK = 0.2; +const int32_t MEM_PER_TASK = 32; + +static queue crawlQueue; +static queue renderQueue; +static map > crawlResults; +static map renderResults; +static map processed; +static size_t nextUrlId = 0; + +MesosSchedulerDriver* schedulerDriver; + +static void shutdown(); +static void SIGINTHandler(); + +class Rendler : public Scheduler +{ +public: + Rendler(const ExecutorInfo& _crawler, + const ExecutorInfo& _renderer, + const string& _seedUrl) + : crawler(_crawler), + renderer(_renderer), + seedUrl(_seedUrl), + tasksLaunched(0), + tasksFinished(0), + frameworkMessagesReceived(0) + { + crawlQueue.push(seedUrl); + renderQueue.push(seedUrl); + processed[seedUrl] = nextUrlId++; + size_t lsp = seedUrl.find_last_of('/'); + baseUrl = seedUrl.substr(0, lsp); // No trailing slash + TASK_RESOURCES = Resources::parse( + "cpus:" + stringify(CPUS_PER_TASK) + + ";mem:" + stringify(MEM_PER_TASK)).get(); + + } + + virtual ~Rendler() {} + + + // Invoked when the scheduler successfully registers with a Mesos master. + // It is called with the frameworkId, a unique ID generated by the + // master, and the masterInfo which is information about the master + // itself. + virtual void registered(SchedulerDriver*, + const FrameworkID&, + const MasterInfo&) + { + cout << "Registered!" << endl; + } + + // Invoked when the scheduler re-registers with a newly elected Mesos + // master. This is only called when the scheduler has previously been + // registered. masterInfo contains information about the newly elected + // master. + virtual void reregistered(SchedulerDriver*, const MasterInfo& masterInfo) {} + + virtual void disconnected(SchedulerDriver* driver) {} + + size_t maxTasksForOffer(Offer offer) + { + size_t count = 0; + Resources remaining = offer.resources(); + // + // TODO + // + return count; + } + + TaskInfo makeTaskPrototype(Offer& offer, const ExecutorInfo& executor) + { + TaskInfo task; + task.mutable_slave_id()->MergeFrom(offer.slave_id()); + task.mutable_executor()->MergeFrom(executor); + task.mutable_resources()->MergeFrom(TASK_RESOURCES); + return task; + } + + TaskInfo makeCrawlTask(Offer& offer) + { + string url = crawlQueue.front(); + crawlQueue.pop(); + + TaskInfo task = makeTaskPrototype(offer, crawler); + // + // TODO: Fill in task name, id, and data + // + return task; + } + + TaskInfo makeRenderTask(Offer& offer) + { + string url = renderQueue.front(); + renderQueue.pop(); + + TaskInfo task = makeTaskPrototype(offer, renderer); + // + // TODO: Fill in task name, id, and data + // + return task; + } + + // Invoked when resources have been offered to this framework. A single + // offer will only contain resources from a single slave. Resources + // associated with an offer will not be re-offered to _this_ framework + // until either (a) this framework has rejected those resources (see + // SchedulerDriver.launchTasks) or (b) those resources have been + // rescinded (see Scheduler.offerRescinded). Note that resources may be + // concurrently offered to more than one framework at a time (depending + // on the allocator being used). In that case, the first framework to + // launch tasks using those resources will be able to use them while the + // other frameworks will have those resources rescinded (or if a + // framework has already launched tasks with those resources then those + // tasks will fail with a TASK_LOST status and a message saying as much). + virtual void resourceOffers(SchedulerDriver* driver, + const vector& offers) + { + cout << "Received resource offer(s)" << endl; + // + // TODO: Launch tasks. + // + } + + virtual void offerRescinded(SchedulerDriver* driver, + const OfferID& offerId) {} + + // Invoked when the status of a task has changed (e.g., a slave is lost + // and so the task is lost, a task finishes and an executor sends a + // status update saying so, etc.) Note that returning from this callback + // acknowledges receipt of this status update. If for whatever reason + // the scheduler aborts during this callback (or the process exits) + // another status update will be delivered. Note, however, that this is + // currently not true if the slave sending the status update is lost or + // fails during that time. + virtual void statusUpdate(SchedulerDriver* driver, const TaskStatus& status) + { + if (status.state() == TASK_FINISHED) { + cout << "Task " << status.task_id().value() << " finished" << endl; + tasksFinished++; + } + + // + // TODO: Terminate by calling shutdown() and driver->stop() if all running + // tasks have finished and there are no more queued tasks. + // + } + + // Invoked when an executor sends a message. These messages are best + // effort; do not expect a framework message to be retransmitted in any + // reliable fashion. + virtual void frameworkMessage(SchedulerDriver* driver, + const ExecutorID& executorId, + const SlaveID& slaveId, + const string& data) + { + vector strVector = stringToVector(data); + string taskId = strVector[0]; + string url = strVector[1]; + + if (executorId.value() == crawler.executor_id().value()) { + cout << "Crawler msg received: " << taskId << endl; + vector newURLs(strVector.begin() + 2, strVector.end()); + // + // TODO + // + } else { + cout << "Renderer msg received: " << taskId << endl; + string path = strVector[2]; + // + // TODO + // + } + } + + // Invoked when a slave has been determined unreachable (e.g., machine + // failure, network partition.) Most frameworks will need to reschedule + // any tasks launched on this slave on a new slave. + virtual void slaveLost(SchedulerDriver* driver, const SlaveID& sid) {} + + // Invoked when an executor has exited/terminated. Note that any tasks + // running will have TASK_LOST status updates automatically generated. + virtual void executorLost(SchedulerDriver* driver, + const ExecutorID& executorID, + const SlaveID& slaveID, + int status) {} + + // Invoked when there is an unrecoverable error in the scheduler or + // scheduler driver. The driver will be aborted BEFORE invoking this + // callback. + virtual void error(SchedulerDriver* driver, const string& message) + { + cout << message << endl; + } + +private: + const ExecutorInfo crawler; + const ExecutorInfo renderer; + string seedUrl; + string baseUrl; + size_t tasksLaunched; + size_t tasksFinished; + size_t frameworkMessagesReceived; + Resources TASK_RESOURCES; +}; + +static void shutdown() +{ + printf("Rendler is shutting down\n"); + printf("Writing results to result.dot\n"); + + FILE *f = fopen("result.dot", "w"); + fprintf(f, "digraph G {\n"); + fprintf(f, " node [shape=box];\n"); + + // Add vertices. + map::iterator rit; + for (rit = renderResults.begin(); rit != renderResults.end(); rit++) { + // Prepend character as dot vertices cannot starting with a digit. + string url_hash = "R" + stringify(processed[rit->first]); + string& filename = rit->second; + fprintf(f, + " %s[label=\"\" image=\"%s\"];\n", + url_hash.c_str(), + filename.c_str()); + } + + // Add edges. + map >::iterator cit; + for (cit = crawlResults.begin(); cit != crawlResults.end(); cit++) { + if (renderResults.find(cit->first) == renderResults.end()) { + continue; + } + string from_hash = "R" + stringify(processed[cit->first]); + vector& adjList = cit->second; + + for (size_t i = 0; i < adjList.size(); i++) { + string to_hash = "R" + stringify(processed[adjList[i]]); + if (renderResults.find(adjList[i]) != renderResults.end()) { + // DOT format is: + // A -> B; + fprintf(f, " %s -> %s;\n", from_hash.c_str(), to_hash.c_str()); + } + } + } + + fprintf(f, "}\n"); + fclose(f); +} + +static void SIGINTHandler(int signum) +{ + if (schedulerDriver != NULL) { + shutdown(); + schedulerDriver->stop(); + } + delete schedulerDriver; + exit(0); +} + +#define shift argc--,argv++ +int main(int argc, char** argv) +{ + string seedUrl, master; + shift; + while (true) { + string s = argc>0 ? argv[0] : "--help"; + if (argc > 1 && s == "--seedUrl") { + seedUrl = argv[1]; + shift; shift; + } else if (argc > 1 && s == "--master") { + master = argv[1]; + shift; shift; + } else { + break; + } + } + + if (master.length() == 0 || seedUrl.length() == 0) { + printf("Usage: rendler --seedUrl --master :\n"); + exit(1); + } + + // Find this executable's directory to locate executor. + string path = realpath(dirname(argv[0]), NULL); + string crawlerURI = path + "/crawl_executor"; + string rendererURI = path + "/render_executor"; + cout << crawlerURI << endl; + cout << rendererURI << endl; + + ExecutorInfo crawler; + crawler.mutable_executor_id()->set_value("Crawler"); + crawler.mutable_command()->set_value(crawlerURI); + crawler.set_name("Crawl Executor (C++)"); + crawler.set_source("cpp"); + + ExecutorInfo renderer; + renderer.mutable_executor_id()->set_value("Renderer"); + renderer.mutable_command()->set_value(rendererURI); + renderer.set_name("Render Executor (C++)"); + renderer.set_source("cpp"); + + Rendler scheduler(crawler, renderer, seedUrl); + + FrameworkInfo framework; + framework.set_user(""); // Have Mesos fill in the current user. + framework.set_name("Rendler Framework (C++)"); + //framework.set_role(role); + framework.set_principal("rendler-cpp"); + + // Set up the signal handler for SIGINT for clean shutdown. + struct sigaction action; + action.sa_handler = SIGINTHandler; + sigemptyset(&action.sa_mask); + action.sa_flags = 0; + sigaction(SIGINT, &action, NULL); + + schedulerDriver = new MesosSchedulerDriver(&scheduler, framework, master); + + int status = schedulerDriver->run() == DRIVER_STOPPED ? 0 : 1; + + // Ensure that the driver process terminates. + schedulerDriver->stop(); + + shutdown(); + + delete schedulerDriver; + return status; +}