Skip to content

Commit 9b6f997

Browse files
committed
Merge branch 'develop'
2 parents f45558d + 99cf096 commit 9b6f997

15 files changed

Lines changed: 320 additions & 254 deletions

File tree

.gitignore

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1,3 +1,4 @@
1+
sonatype.sbt
12
# see also test/files/.gitignore
23
/test/files/.gitignore
34

CHANGES.md

Lines changed: 7 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,12 @@
11
# chill #
22

3+
### 0.7.4 ###
4+
* If the inner Instantiator is cached we should just cache the kayo and…: https://github.com/twitter/chill/pull/243
5+
6+
### 0.7.3 ###
7+
* Update the build: https://github.com/twitter/chill/pull/248
8+
* Use currentThread classloader: https://github.com/twitter/chill/pull/247
9+
310
### 0.7.2 ###
411
* Setting the class loader by default: https://github.com/twitter/chill/pull/242
512

README.md

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -159,7 +159,7 @@ Discussion occurs primarily on the [Chill mailing list](https://groups.google.co
159159

160160
## Maven
161161

162-
Chill modules are available on Maven Central. The current groupid and version for all modules is, respectively, `"com.twitter"` and `0.7.2` and each scala project is published for `2.10` and `2.11`. Search [search.maven.org](http://search.maven.org/#search%7Cga%7C1%7Cchill) when in doubt.
162+
Chill modules are available on Maven Central. The current groupid and version for all modules is, respectively, `"com.twitter"` and `0.7.4` and each scala project is published for `2.10` and `2.11`. Search [search.maven.org](http://search.maven.org/#search%7Cga%7C1%7Cchill) when in doubt.
163163

164164
## Authors
165165

build.sbt

Lines changed: 231 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,231 @@
1+
import ReleaseTransformations._
2+
import com.typesafe.sbt.SbtScalariform._
3+
import com.typesafe.tools.mima.plugin.MimaKeys._
4+
import com.typesafe.tools.mima.plugin.MimaPlugin.mimaDefaultSettings
5+
import scala.collection.JavaConverters._
6+
import scalariform.formatter.preferences._
7+
8+
val kryoVersion = "2.21"
9+
val bijectionVersion = "0.9.0"
10+
val algebirdVersion = "0.12.0"
11+
12+
def isScala210x(scalaVersion: String) = scalaVersion match {
13+
case version if version startsWith "2.10" => true
14+
case _ => false
15+
}
16+
17+
val sharedSettings = Project.defaultSettings ++ mimaDefaultSettings ++ scalariformSettings ++ Seq(
18+
organization := "com.twitter",
19+
scalaVersion := "2.10.5",
20+
crossScalaVersions := Seq("2.10.5", "2.11.7"),
21+
scalacOptions ++= Seq("-unchecked", "-deprecation"),
22+
ScalariformKeys.preferences := formattingPreferences,
23+
24+
// Twitter Hadoop needs this, sorry 1.7 fans
25+
javacOptions ++= Seq("-target", "1.6", "-source", "1.6", "-Xlint:-options"),
26+
javacOptions in doc := Seq("-source", "1.6"),
27+
28+
resolvers ++= Seq(
29+
Opts.resolver.sonatypeSnapshots,
30+
Opts.resolver.sonatypeReleases
31+
),
32+
libraryDependencies ++= Seq(
33+
"org.scalacheck" %% "scalacheck" % "1.11.5" % "test",
34+
"org.scalatest" %% "scalatest" % "2.2.2" % "test",
35+
"com.esotericsoftware.kryo" % "kryo" % kryoVersion
36+
),
37+
38+
parallelExecution in Test := true,
39+
40+
// Publishing options:
41+
releaseCrossBuild := true,
42+
releasePublishArtifactsAction := PgpKeys.publishSigned.value,
43+
publishMavenStyle := true,
44+
publishArtifact in Test := false,
45+
pomIncludeRepository := { x => false },
46+
47+
publishTo <<= version { v =>
48+
Some(
49+
if (v.trim.toUpperCase.endsWith("SNAPSHOT"))
50+
Opts.resolver.sonatypeSnapshots
51+
else
52+
Opts.resolver.sonatypeStaging
53+
//"twttr" at "http://artifactory.local.twitter.com/libs-releases-local"
54+
)
55+
},
56+
pomExtra := (
57+
<url>https://github.com/twitter/chill</url>
58+
<licenses>
59+
<license>
60+
<name>Apache 2</name>
61+
<url>http://www.apache.org/licenses/LICENSE-2.0.txt</url>
62+
<distribution>repo</distribution>
63+
<comments>A business-friendly OSS license</comments>
64+
</license>
65+
</licenses>
66+
<scm>
67+
<url>git@github.com:twitter/chill.git</url>
68+
<connection>scm:git:git@github.com:twitter/chill.git</connection>
69+
</scm>
70+
<developers>
71+
<developer>
72+
<id>oscar</id>
73+
<name>Oscar Boykin</name>
74+
<url>http://twitter.com/posco</url>
75+
</developer>
76+
<developer>
77+
<id>sritchie</id>
78+
<name>Sam Ritchie</name>
79+
<url>http://twitter.com/sritchie</url>
80+
</developer>
81+
</developers>)
82+
)
83+
84+
// Aggregated project
85+
lazy val chillAll = Project(
86+
id = "chill-all",
87+
base = file("."),
88+
settings = sharedSettings
89+
).settings(
90+
test := { },
91+
publish := { },
92+
publishLocal := { }
93+
).aggregate(
94+
chill,
95+
chillBijection,
96+
chillScrooge,
97+
chillStorm,
98+
chillJava,
99+
chillHadoop,
100+
chillThrift,
101+
chillProtobuf,
102+
chillAkka,
103+
chillAvro,
104+
chillAlgebird
105+
)
106+
107+
lazy val formattingPreferences = {
108+
import scalariform.formatter.preferences._
109+
FormattingPreferences().
110+
setPreference(AlignParameters, false).
111+
setPreference(PreserveSpaceBeforeArguments, true)
112+
}
113+
114+
/**
115+
* This returns the youngest jar we released that is compatible
116+
* with the current.
117+
*/
118+
val unreleasedModules = Set[String]("akka")
119+
val javaOnly = Set[String]("storm", "java", "hadoop", "thrift", "protobuf")
120+
121+
def youngestForwardCompatible(subProj: String) =
122+
Some(subProj)
123+
.filterNot(unreleasedModules.contains(_))
124+
.map { s =>
125+
val suffix = if (javaOnly.contains(s)) "" else "_2.10"
126+
"com.twitter" % ("chill-" + s + suffix) % "0.7.2"
127+
}
128+
129+
def module(name: String) = {
130+
val id = "chill-%s".format(name)
131+
Project(id = id, base = file(id), settings = sharedSettings ++ Seq(
132+
Keys.name := id,
133+
previousArtifact := youngestForwardCompatible(name),
134+
// Disable cross publishing for java artifacts
135+
publishArtifact <<= (scalaVersion) { scalaVersion =>
136+
if(javaOnly.contains(name) && scalaVersion.startsWith("2.11")) false else true
137+
}
138+
)
139+
)
140+
}
141+
142+
// We usually do the pattern of having a core module, but we don't want to cause
143+
// pain for legacy deploys. With this, they can stay the same.
144+
lazy val chill = Project(
145+
id = "chill",
146+
base = file("chill-scala"),
147+
settings = sharedSettings
148+
).settings(
149+
name := "chill",
150+
previousArtifact := Some("com.twitter" % "chill_2.10" % "0.7.2")
151+
).dependsOn(chillJava)
152+
153+
lazy val chillAkka = module("akka").settings(
154+
resolvers += "Typesafe Repository" at "http://repo.typesafe.com/typesafe/releases/",
155+
libraryDependencies ++= Seq(
156+
"com.typesafe" % "config" % "1.2.1",
157+
"com.typesafe.akka" %% "akka-actor" % "2.3.6" % "provided"
158+
)
159+
).dependsOn(chill % "test->test;compile->compile")
160+
161+
lazy val chillBijection = module("bijection").settings(
162+
libraryDependencies ++= Seq(
163+
"com.twitter" %% "bijection-core" % bijectionVersion
164+
)
165+
).dependsOn(chill % "test->test;compile->compile")
166+
167+
// This can only have java deps!
168+
lazy val chillJava = module("java").settings(
169+
crossPaths := false,
170+
autoScalaLibrary := false
171+
)
172+
173+
// This can only have java deps!
174+
lazy val chillStorm = module("storm").settings(
175+
crossPaths := false,
176+
autoScalaLibrary := false,
177+
resolvers ++= Seq(
178+
"Clojars Repository" at "http://clojars.org/repo",
179+
"Conjars Repository" at "http://conjars.org/repo"
180+
),
181+
libraryDependencies += "storm" % "storm" % "0.9.0-wip9" % "provided"
182+
).dependsOn(chillJava)
183+
184+
// This can only have java deps!
185+
lazy val chillHadoop = module("hadoop").settings(
186+
crossPaths := false,
187+
autoScalaLibrary := false,
188+
libraryDependencies ++= Seq(
189+
"org.apache.hadoop" % "hadoop-core" % "0.20.2" % "provided",
190+
"org.slf4j" % "slf4j-api" % "1.6.6",
191+
"org.slf4j" % "slf4j-log4j12" % "1.6.6" % "provided"
192+
)
193+
).dependsOn(chillJava)
194+
195+
// This can only have java deps!
196+
lazy val chillThrift = module("thrift").settings(
197+
crossPaths := false,
198+
autoScalaLibrary := false,
199+
libraryDependencies ++= Seq(
200+
"org.apache.thrift" % "libthrift" % "0.6.1" % "provided"
201+
)
202+
)
203+
204+
lazy val chillScrooge = module("scrooge").settings(
205+
libraryDependencies ++= Seq(
206+
"org.apache.thrift" % "libthrift" % "0.6.1" exclude("junit", "junit"),
207+
"com.twitter" %% "scrooge-serializer" % "3.20.0"
208+
)
209+
).dependsOn(chill % "test->test;compile->compile")
210+
211+
// This can only have java deps!
212+
lazy val chillProtobuf = module("protobuf").settings(
213+
crossPaths := false,
214+
autoScalaLibrary := false,
215+
libraryDependencies ++= Seq(
216+
"com.google.protobuf" % "protobuf-java" % "2.3.0" % "provided"
217+
)
218+
).dependsOn(chillJava)
219+
220+
lazy val chillAvro = module("avro").settings(
221+
libraryDependencies ++= Seq(
222+
"com.twitter" %% "bijection-avro" % bijectionVersion,
223+
"junit" % "junit" % "4.5" % "test"
224+
)
225+
).dependsOn(chill,chillJava, chillBijection)
226+
227+
lazy val chillAlgebird = module("algebird").settings(
228+
libraryDependencies ++= Seq(
229+
"com.twitter" %% "algebird-core" % algebirdVersion
230+
)
231+
).dependsOn(chill)

chill-hadoop/src/main/java/com/twitter/chill/hadoop/KryoSerialization.java

Lines changed: 20 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -37,7 +37,24 @@
3737
public class KryoSerialization extends Configured implements Serialization<Object> {
3838
// can't be final because we need to set them in setConf (for Configured)
3939
KryoPool kryoPool;
40-
Kryo testKryo;
40+
41+
private static KryoPool cachedPool = null;
42+
private static KryoInstantiator cachedKryoInst = null;
43+
44+
/**
45+
* Hadoop will re-initialize the KryoSerialization on every spill
46+
* This gets very expensive if you output a lot from a mapper to initialize the chill/kryo stack
47+
* The KryoInstantiator's already do some caching, and figuring out if its safe to cache,
48+
* so here we piggy back on that to avoid generating new kryo's or kryo pools
49+
*/
50+
public static synchronized void resetOrUpdateFromCache(KryoSerialization instance, KryoInstantiator kryoInst){
51+
if(kryoInst != cachedKryoInst) {
52+
cachedPool = KryoPool.withByteArrayOutputStream(MAX_CACHED_KRYO, kryoInst);
53+
cachedKryoInst = kryoInst;
54+
}
55+
instance.kryoPool = cachedPool;
56+
}
57+
4158
/**
4259
* Since each thread only needs 1 Kryo, the pool doesn't need more
4360
* space than the number of threads. We guess that there are 4 hyperthreads /
@@ -69,8 +86,7 @@ public void setConf(Configuration conf) {
6986
if (conf != null) {
7087
try {
7188
KryoInstantiator kryoInst = new ConfiguredInstantiator(new HadoopConfig(conf));
72-
testKryo = kryoInst.newKryo();
73-
kryoPool = KryoPool.withByteArrayOutputStream(MAX_CACHED_KRYO, kryoInst);
89+
resetOrUpdateFromCache(this, kryoInst);
7490
}
7591
catch(ConfigurationException cx) {
7692
// This interface can't throw
@@ -86,7 +102,7 @@ public void setConf(Configuration conf) {
86102
*/
87103
public boolean accept(Class<?> aClass) {
88104
try {
89-
return (testKryo.getRegistration(aClass) != null);
105+
return kryoPool.hasRegistration(aClass);
90106
} catch (IllegalArgumentException e) {
91107
return false;
92108
}

chill-java/src/main/java/com/twitter/chill/KryoInstantiator.java

Lines changed: 22 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -27,6 +27,17 @@
2727
public class KryoInstantiator implements Serializable {
2828
public Kryo newKryo() { return new Kryo(); }
2929

30+
/** Use this to set a specific classloader
31+
*/
32+
public KryoInstantiator setClassLoader(final ClassLoader cl) {
33+
return new KryoInstantiator() {
34+
public Kryo newKryo() {
35+
Kryo k = KryoInstantiator.this.newKryo();
36+
k.setClassLoader(cl);
37+
return k;
38+
}
39+
};
40+
}
3041
/** If true, Kryo will error if it sees a class that has not been registered
3142
*/
3243
public KryoInstantiator setInstantiatorStrategy(final InstantiatorStrategy inst) {
@@ -72,6 +83,17 @@ public Kryo newKryo() {
7283
}
7384
};
7485
}
86+
/** Use Thread.currentThread().getContextClassLoader() as the ClassLoader where ther newKryo is called
87+
*/
88+
public KryoInstantiator setThreadContextClassLoader() {
89+
return new KryoInstantiator() {
90+
public Kryo newKryo() {
91+
Kryo k = KryoInstantiator.this.newKryo();
92+
k.setClassLoader(Thread.currentThread().getContextClassLoader());
93+
return k;
94+
}
95+
};
96+
}
7597

7698
public KryoInstantiator withRegistrar(final IKryoRegistrar r) {
7799
return new KryoInstantiator() {

chill-java/src/main/java/com/twitter/chill/KryoPool.java

Lines changed: 10 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -131,4 +131,14 @@ public byte[] toBytesWithoutClass(Object obj) {
131131
release(serde);
132132
}
133133
}
134+
135+
public boolean hasRegistration(Class obj) {
136+
SerDeState serde = borrow();
137+
try {
138+
return serde.hasRegistration(obj);
139+
}
140+
finally {
141+
release(serde);
142+
}
143+
}
134144
}

chill-java/src/main/java/com/twitter/chill/SerDeState.java

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -68,4 +68,8 @@ public Object readClassAndObject() {
6868
public void writeOutputTo(OutputStream os) throws IOException {
6969
os.write(outputToBytes());
7070
}
71+
72+
public boolean hasRegistration(Class obj) {
73+
return kryo.getRegistration(obj) != null;
74+
}
7175
}

chill-java/src/main/java/com/twitter/chill/config/ConfiguredInstantiator.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -57,7 +57,7 @@ public ConfiguredInstantiator(Config conf) throws ConfigurationException {
5757
throw new ConfigurationException("Invalid Config Key: " + conf.get(KEY));
5858
}
5959
KryoInstantiator reflected = null;
60-
try { reflected = reflect((Class<? extends KryoInstantiator>)Class.forName(parts[0]), conf); }
60+
try { reflected = reflect((Class<? extends KryoInstantiator>)Class.forName(parts[0], true, Thread.currentThread().getContextClassLoader()), conf); }
6161
catch(ClassNotFoundException x) {
6262
throw new ConfigurationException("Could not find class for: " + parts[0], x);
6363
}

0 commit comments

Comments
 (0)