Skip to content
This repository was archived by the owner on Aug 4, 2022. It is now read-only.

[REEF-2054] Elastic group communication: broadcast #1487

Open
wants to merge 29 commits into
base: master
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
29 commits
Select commit Hold shift + click to select a range
fa3ed84
first commit
interesaaat Dec 21, 2018
6a68fef
update
interesaaat Dec 28, 2018
14896fa
Removed checkpointing stuff
interesaaat Dec 28, 2018
0a57845
some dir refactoring plus added some stuff.
interesaaat Dec 31, 2018
ef5f339
done with communication layer
interesaaat Jan 1, 2019
d79e997
Done with the task side
interesaaat Jan 2, 2019
1ce6854
Added broadcast example plus a few bug fixes
interesaaat Jan 2, 2019
d86b265
fixed bugs
interesaaat Jan 4, 2019
897247a
bug fixes
interesaaat Jan 4, 2019
87f0e7b
Addressing first round of reviews from Sergiy.
interesaaat Jan 15, 2019
bc54416
- Switched log entries from $ to -{0}.
interesaaat Jan 31, 2019
efb630f
Fixed:
interesaaat Jan 31, 2019
835e046
Moved a file into the proper directory
interesaaat Jan 31, 2019
30085b4
Fixed typo.
interesaaat Feb 1, 2019
0cfab73
Added the delegate for task configuration.
interesaaat Feb 2, 2019
c9259ac
Improved the API.
interesaaat Feb 4, 2019
90ff0ed
Few fixes addressing Sergiy's comments
interesaaat Feb 7, 2019
48dcc06
Another round of fixes.
interesaaat Feb 7, 2019
7acd6ec
Addressing Sergiy's new comments.
interesaaat Feb 12, 2019
6a91d96
Fixed stupid error.
interesaaat Feb 12, 2019
55aedfa
added further fixes following Sergiy's suggestions
interesaaat Feb 14, 2019
edb7fd0
Fixed later issues.
interesaaat Feb 19, 2019
301ea51
Done with another pass.
interesaaat Feb 21, 2019
739f9d4
Another round done.
interesaaat Feb 22, 2019
bc80651
Done with another round plus few minor bug fixes.
interesaaat Feb 26, 2019
10634d0
Better node identifier stuff
interesaaat Feb 27, 2019
4739662
Done with another round
interesaaat Mar 4, 2019
08e2f90
Addressed Sergiy's new comments.
interesaaat Mar 25, 2019
eb187cb
Other round of comments.
interesaaat Apr 4, 2019
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
20 changes: 20 additions & 0 deletions lang/cs/Org.Apache.REEF.Common/AssemblyInfo.cs
Original file line number Diff line number Diff line change
@@ -0,0 +1,20 @@
// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.

using System.Runtime.CompilerServices;

[assembly: InternalsVisibleTo("Org.Apache.REEF.Network")]
Original file line number Diff line number Diff line change
@@ -0,0 +1,149 @@
// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.

using System;
using System.Globalization;
using System.IO;
using Org.Apache.REEF.Driver;
using Org.Apache.REEF.Tang.Implementations.Configuration;
using Org.Apache.REEF.Tang.Implementations.Tang;
using Org.Apache.REEF.Tang.Interface;
using Org.Apache.REEF.Tang.Util;
using Org.Apache.REEF.Utilities.Logging;
using Org.Apache.REEF.Client.API;
using Org.Apache.REEF.Client.Local;
using Org.Apache.REEF.Client.Yarn;
using Org.Apache.REEF.Network.Elastic.Config;
using Org.Apache.REEF.Network.Elastic.Driver.Default;

namespace Org.Apache.REEF.Network.Examples.Client
{
/// <summary>
/// Job identifier strings for the elastic broadcast examples.
/// </summary>
/// <remarks>
/// The member names are spelled "Elasti..." (without the 'c') while the string
/// values are spelled "Elastic...". The names are kept as they are because
/// they are referenced from other code.
/// </remarks>
internal class JobIdentifiers
{
/// <summary>Identifier for the plain elastic broadcast job.</summary>
public const string ElastiBroadcast = "ElasticBroadcast";

/// <summary>Identifier for the elastic broadcast job variants that inject failures.</summary>
public const string ElastiBroadcastWithFailure = "ElasticBroadcastWithFailure";
}

/// <summary>
/// Client that configures and submits an elastic broadcast job whose driver
/// is <typeparamref name="T"/>.
/// </summary>
/// <remarks>
/// All of the work happens in the constructor: it builds the driver
/// configuration, merges in the elastic service parameters, picks the runtime
/// (local or YARN) and submits the job through <c>IREEFClient</c>.
/// </remarks>
/// <typeparam name="T">
/// Driver type. It is registered as the handler for every driver event bound
/// in <c>GetDriverConf</c>, its simple name is used as the driver id, and it is
/// the type passed to <c>AddGlobalAssemblyForType</c>.
/// </typeparam>
public sealed class ElasticBroadcastClient<T> where T : DefaultElasticDriver
{
    private const string Local = "local";
    private const string Yarn = "yarn";
    private const string DefaultRuntimeFolder = "REEF_LOCAL_RUNTIME";
    private const string StageName = "Broadcast";

    /// <summary>
    /// Builds the job configuration and submits the job.
    /// </summary>
    /// <param name="runOnYarn">True to submit to YARN, false to use the local runtime.</param>
    /// <param name="numTasks">
    /// Bound as both the number of evaluators and the number of tasks, and used as the
    /// number of evaluators of the local runtime.
    /// </param>
    /// <param name="startingPortNo">Value bound to the <c>StartingPort</c> named parameter.</param>
    /// <param name="portRange">Value bound to the <c>PortRange</c> named parameter.</param>
    /// <param name="jobIdentifier">Identifier set on the submitted job request.</param>
    public ElasticBroadcastClient(
        bool runOnYarn,
        int numTasks,
        int startingPortNo,
        int portRange,
        string jobIdentifier)
    {
        string driverId = typeof(T).Name;
        JobIdentifier = jobIdentifier;

        IConfiguration driverConfig = TangFactory.GetTang()
            .NewConfigurationBuilder(GetDriverConf())
            .BindNamedParameter<ElasticServiceConfigurationOptions.NumEvaluators, int>(
                GenericType<ElasticServiceConfigurationOptions.NumEvaluators>.Class,
                numTasks.ToString(CultureInfo.InvariantCulture))
            .BindNamedParameter<ElasticServiceConfigurationOptions.StartingPort, int>(
                GenericType<ElasticServiceConfigurationOptions.StartingPort>.Class,
                startingPortNo.ToString(CultureInfo.InvariantCulture))
            .BindNamedParameter<ElasticServiceConfigurationOptions.PortRange, int>(
                GenericType<ElasticServiceConfigurationOptions.PortRange>.Class,
                portRange.ToString(CultureInfo.InvariantCulture))
            .Build();

        IConfiguration elasticServiceConfig = TangFactory.GetTang()
            .NewConfigurationBuilder()
            .BindStringNamedParam<ElasticServiceConfigurationOptions.DriverId>(driverId)
            .BindStringNamedParam<ElasticServiceConfigurationOptions.DefaultStageName>(StageName)
            .BindIntNamedParam<ElasticServiceConfigurationOptions.NumberOfTasks>(
                numTasks.ToString(CultureInfo.InvariantCulture))
            .Build();

        IConfiguration merged = Configurations.Merge(driverConfig, elasticServiceConfig);

        // Use the same constants the switch in GetRuntimeConfiguration matches on,
        // so the two can never drift apart.
        string runtime = runOnYarn ? Yarn : Local;

        TestRun(merged, typeof(T), numTasks, JobIdentifier, runtime);
    }

    /// <summary>Identifier of the job submitted by this client.</summary>
    private string JobIdentifier { get; set; }

    /// <summary>
    /// Creates the REEF client for the requested runtime and submits the job.
    /// </summary>
    /// <param name="driverConfig">Fully merged driver configuration.</param>
    /// <param name="globalAssemblyType">Type whose assembly is added as a global assembly.</param>
    /// <param name="numberOfEvaluator">Number of evaluators for the local runtime.</param>
    /// <param name="jobIdentifier">Identifier set on the job request.</param>
    /// <param name="runtime">Runtime name: <c>"local"</c> or <c>"yarn"</c>.</param>
    /// <param name="runtimeFolder">Folder, relative to the current directory, for the local runtime.</param>
    private static void TestRun(
        IConfiguration driverConfig,
        Type globalAssemblyType,
        int numberOfEvaluator,
        string jobIdentifier = "myDriver",
        string runtime = Local,
        string runtimeFolder = DefaultRuntimeFolder)
    {
        IInjector injector = TangFactory.GetTang()
            .NewInjector(GetRuntimeConfiguration(runtime, numberOfEvaluator, runtimeFolder));
        var reefClient = injector.GetInstance<IREEFClient>();
        var jobRequestBuilder = injector.GetInstance<JobRequestBuilder>();
        var jobSubmission = jobRequestBuilder
            .AddDriverConfiguration(driverConfig)
            .AddGlobalAssemblyForType(globalAssemblyType)
            .SetJobIdentifier(jobIdentifier)
            .Build();

        reefClient.SubmitAndGetJobStatus(jobSubmission);
    }

    /// <summary>
    /// Builds the client-side runtime configuration for the given runtime name.
    /// </summary>
    /// <param name="runtime">Runtime name: <c>"local"</c> or <c>"yarn"</c>.</param>
    /// <param name="numberOfEvaluator">Number of evaluators; only used by the local runtime.</param>
    /// <param name="runtimeFolder">Runtime folder name; only used by the local runtime.</param>
    /// <returns>The runtime configuration.</returns>
    /// <exception cref="ArgumentException">The runtime name is neither local nor yarn.</exception>
    private static IConfiguration GetRuntimeConfiguration(
        string runtime,
        int numberOfEvaluator,
        string runtimeFolder)
    {
        switch (runtime)
        {
            case Local:
                var dir = Path.Combine(".", runtimeFolder);
                return LocalRuntimeClientConfiguration.ConfigurationModule
                    .Set(
                        LocalRuntimeClientConfiguration.NumberOfEvaluators,
                        // Invariant culture, like every other numeric binding in this class.
                        numberOfEvaluator.ToString(CultureInfo.InvariantCulture))
                    .Set(LocalRuntimeClientConfiguration.RuntimeFolder, dir)
                    .Build();

            case Yarn:
                return YARNClientConfiguration.ConfigurationModule.Build();

            default:
                throw new ArgumentException("Unknown runtime: " + runtime);
        }
    }

    /// <summary>
    /// Builds the driver configuration, registering <typeparamref name="T"/> as the
    /// handler for all bound driver events and setting the trace level to Info.
    /// </summary>
    /// <returns>The driver configuration.</returns>
    private static IConfiguration GetDriverConf()
    {
        return DriverConfiguration.ConfigurationModule
            .Set(DriverConfiguration.OnDriverStarted, GenericType<T>.Class)
            .Set(DriverConfiguration.OnEvaluatorAllocated, GenericType<T>.Class)
            .Set(DriverConfiguration.OnEvaluatorFailed, GenericType<T>.Class)
            .Set(DriverConfiguration.OnContextActive, GenericType<T>.Class)
            .Set(DriverConfiguration.OnTaskRunning, GenericType<T>.Class)
            .Set(DriverConfiguration.OnTaskCompleted, GenericType<T>.Class)
            .Set(DriverConfiguration.OnTaskFailed, GenericType<T>.Class)
            .Set(DriverConfiguration.OnTaskMessage, GenericType<T>.Class)
            .Set(DriverConfiguration.CustomTraceLevel, Level.Info.ToString())
            .Build();
    }
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@ under the License.
<AssemblyName>Org.Apache.REEF.Network.Examples.Client</AssemblyName>
<Description>REEF Network Client examples</Description>
<PackageTags>REEF Examples Network Client</PackageTags>
<StartupObject></StartupObject>
</PropertyGroup>
<Import Project="..\build.App.props" />
<ItemGroup>
Expand All @@ -34,5 +35,5 @@ under the License.
<ProjectReference Include="..\Org.Apache.REEF.Utilities\Org.Apache.REEF.Utilities.csproj" />
<ProjectReference Include="..\Org.Apache.REEF.Wake\Org.Apache.REEF.Wake.csproj" />
</ItemGroup>
<Import Project="..\build.targets"/>
<Import Project="..\build.targets" />
</Project>
136 changes: 122 additions & 14 deletions lang/cs/Org.Apache.REEF.Network.Examples.Client/Run.cs
Original file line number Diff line number Diff line change
Expand Up @@ -15,22 +15,35 @@
// specific language governing permissions and limitations
// under the License.

using System;
using Org.Apache.REEF.Network.Examples.Elastic;
using Org.Apache.REEF.Network.Examples.GroupCommunication;
using System;

namespace Org.Apache.REEF.Network.Examples.Client
{
/// <summary>
/// Names of the examples that <c>Run.Main</c> can launch. The member name is
/// compared case-insensitively against the requested test name by
/// <c>TestTypeMatcher.Match</c>.
/// </summary>
internal enum TestType
{
/// <summary>Launches <c>PipelineBroadcastAndReduceClient</c>.</summary>
PipelineBroadcastAndReduce,

/// <summary>Launches <c>BroadcastAndReduceClient</c>.</summary>
BroadcastAndReduce,

/// <summary>Launches the elastic broadcast client with <c>ElasticBroadcastDriver</c>.</summary>
ElasticBroadcast,

/// <summary>Elastic broadcast using the <c>BroadcastSlaveTaskDieInConstructor</c> task.</summary>
ElasticBroadcastWithFailureInConstructor,

/// <summary>Elastic broadcast using the <c>BroadcastSlaveTaskDieBeforeWorkflow</c> task.</summary>
ElasticBroadcastWithFailureBeforeWorkflow,

/// <summary>Elastic broadcast using the <c>BroadcastSlaveTaskDieEvaluatorBeforeWorkflow</c> task.</summary>
ElasticBroadcastWithFailEvaluatorBeforeWorkflow,

/// <summary>Elastic broadcast with a failure before the broadcast.</summary>
ElasticBroadcastWithFailureBeforeBroadcast,

/// <summary>Elastic broadcast using the <c>BroadcastSlaveTaskDieAfterBroadcast</c> task.</summary>
ElasticBroadcastWithFailureAfterBroadcast,

/// <summary>Elastic broadcast using the <c>BroadcastSlaveTaskDieMultiple</c> task.</summary>
ElasticBroadcastWithMultipleFailures
}

public class Run
{
public static void Main(string[] args)
{
Console.WriteLine("start running client: " + DateTime.Now);
bool runOnYarn = false;
int numNodes = 9;
int numNodes = 5;
int startPort = 8900;
int portRange = 1000;
string testToRun = "RunBroadcastAndReduce";
testToRun = testToRun.ToLower();
string testToRun = TestType.ElasticBroadcastWithFailEvaluatorBeforeWorkflow.ToString();

if (args != null)
{
Expand All @@ -56,11 +69,11 @@ public static void Main(string[] args)

if (args.Length > 4)
{
testToRun = args[4].ToLower();
testToRun = args[4];
}
}

if (testToRun.Equals("RunPipelineBroadcastAndReduce".ToLower()) || testToRun.Equals("all"))
if (TestType.PipelineBroadcastAndReduce.Match(testToRun))
{
int arraySize = GroupTestConstants.ArrayLength;
int chunkSize = GroupTestConstants.ChunkSize;
Expand All @@ -71,16 +84,111 @@ public static void Main(string[] args)
chunkSize = int.Parse(args[6]);
}

new PipelineBroadcastAndReduceClient().RunPipelineBroadcastAndReduce(runOnYarn, numNodes, startPort,
portRange, arraySize, chunkSize);
Console.WriteLine("RunPipelineBroadcastAndReduce completed!!!");
new PipelineBroadcastAndReduceClient().RunPipelineBroadcastAndReduce(
runOnYarn,
numNodes,
startPort,
portRange,
arraySize,
chunkSize);
Console.WriteLine("PipelineBroadcastAndReduce completed!!!");
}

if (TestType.BroadcastAndReduce.Match(testToRun))
{
new BroadcastAndReduceClient().RunBroadcastAndReduce(
runOnYarn,
numNodes,
startPort,
portRange);
Console.WriteLine("BroadcastAndReduce completed!!!");
}

if (TestType.ElasticBroadcast.Match(testToRun))
{
new ElasticBroadcastClient<ElasticBroadcastDriver>(
runOnYarn,
numNodes,
startPort,
portRange,
JobIdentifiers.ElastiBroadcast);
Console.WriteLine("ElasticBroadcast completed!!!");
}

if (TestType.ElasticBroadcastWithFailureInConstructor.Match(testToRun))
{
new ElasticBroadcastClient<ElasticBroadcastDriverWithFailures<BroadcastSlaveTaskDieInConstructor>>(
runOnYarn,
numNodes,
startPort,
portRange,
JobIdentifiers.ElastiBroadcastWithFailure);
Console.WriteLine("ElasticBroadcastWithFailureInConstructor completed!!!");
}

if (TestType.ElasticBroadcastWithFailureBeforeWorkflow.Match(testToRun))
{
new ElasticBroadcastClient<ElasticBroadcastDriverWithFailures<BroadcastSlaveTaskDieBeforeWorkflow>>(
runOnYarn,
numNodes,
startPort,
portRange,
JobIdentifiers.ElastiBroadcastWithFailure);
Console.WriteLine("ElasticBroadcastWithFailureBeforeWorkflow completed!!!");
}

if (TestType.ElasticBroadcastWithFailEvaluatorBeforeWorkflow.Match(testToRun))
{
new ElasticBroadcastClient<ElasticBroadcastDriverWithFailures<BroadcastSlaveTaskDieEvaluatorBeforeWorkflow>>(
runOnYarn,
numNodes,
startPort,
portRange,
JobIdentifiers.ElastiBroadcastWithFailure);
Console.WriteLine("ElasticBroadcastWithFailEvaluatorBeforeWorkflow completed!!!");
}

// NOTE(review): this branch instantiates the driver with
// BroadcastSlaveTaskDieBeforeWorkflow, the same task type used by the
// ElasticBroadcastWithFailureBeforeWorkflow branch above, although the test is
// named "...BeforeBroadcast". Possibly a copy-paste slip - confirm which task
// type is intended here.
if (TestType.ElasticBroadcastWithFailureBeforeBroadcast.Match(testToRun))
{
new ElasticBroadcastClient<ElasticBroadcastDriverWithFailures<BroadcastSlaveTaskDieBeforeWorkflow>>(
runOnYarn,
numNodes,
startPort,
portRange,
JobIdentifiers.ElastiBroadcastWithFailure);
Console.WriteLine("ElasticBroadcastWithFailureBeforeBroadcast completed!!!");
}

if (TestType.ElasticBroadcastWithFailureAfterBroadcast.Match(testToRun))
{
new ElasticBroadcastClient<ElasticBroadcastDriverWithFailures<BroadcastSlaveTaskDieAfterBroadcast>>(
runOnYarn,
numNodes,
startPort,
portRange,
JobIdentifiers.ElastiBroadcastWithFailure);
Console.WriteLine("ElasticBroadcastWithFailureAfterBroadcast completed!!!");
}

if (testToRun.Equals("RunBroadcastAndReduce".ToLower()) || testToRun.Equals("all"))
if (TestType.ElasticBroadcastWithMultipleFailures.Match(testToRun))
{
new BroadcastAndReduceClient().RunBroadcastAndReduce(runOnYarn, numNodes, startPort, portRange);
Console.WriteLine("RunBroadcastAndReduce completed!!!");
}
new ElasticBroadcastClient<ElasticBroadcastDriverWithFailures<BroadcastSlaveTaskDieMultiple>>(
runOnYarn,
numNodes,
startPort,
portRange,
JobIdentifiers.ElastiBroadcastWithFailure);
Console.WriteLine("ElasticBroadcastWithMultipleFailures completed!!!");
}
}
}

/// <summary>
/// Extension helpers that decide whether a <see cref="TestType"/> was selected
/// by a test name given as text.
/// </summary>
internal static class TestTypeMatcher
{
    /// <summary>Special name that selects every test.</summary>
    private const string RunAll = "all";

    /// <summary>
    /// Tells whether <paramref name="test"/> should run for the requested name.
    /// </summary>
    /// <param name="test">The test being considered.</param>
    /// <param name="name">
    /// Requested test name: either "all" or the name of a <see cref="TestType"/>
    /// member, in any letter case.
    /// </param>
    /// <returns>
    /// True when <paramref name="name"/> is "all" or equals the member name of
    /// <paramref name="test"/>, ignoring case; false otherwise, including when
    /// <paramref name="name"/> is null.
    /// </returns>
    public static bool Match(this TestType test, string name)
    {
        // Ordinal comparison: test names are identifiers, not linguistic text.
        // A culture-sensitive ToLower() maps 'I' to a dotless 'ı' under Turkish
        // cultures, which breaks matching for names that contain a capital I.
        return string.Equals(name, RunAll, StringComparison.OrdinalIgnoreCase)
            || string.Equals(name, test.ToString(), StringComparison.OrdinalIgnoreCase);
    }
}
}
}
Loading