diff --git a/pom.xml b/pom.xml
index d49ef2e5..15731dd7 100644
--- a/pom.xml
+++ b/pom.xml
@@ -21,7 +21,7 @@
HTTP Plugins
io.cdap
http-plugins
- 1.4.0-SNAPSHOT
+ 1.4.1-service-account
@@ -82,7 +82,7 @@
2.8.5
2.3.0
4.5.9
- 2.4.0-SNAPSHOT
+ 2.3.0-SNAPSHOT
2.9.9
4.11
2.7.1
@@ -93,6 +93,16 @@
+
+ com.google.guava
+ guava
+ 13.0.1
+
+
+ com.google.auth
+ google-auth-library-oauth2-http
+ 0.25.5
+
io.cdap.cdap
cdap-api
@@ -354,8 +364,6 @@
jython-standalone
${jython.version}
-
-
io.cdap.cdap
diff --git a/src/main/java/io/cdap/plugin/http/source/common/BaseHttpSourceConfig.java b/src/main/java/io/cdap/plugin/http/source/common/BaseHttpSourceConfig.java
index a554dd6c..942a43fd 100644
--- a/src/main/java/io/cdap/plugin/http/source/common/BaseHttpSourceConfig.java
+++ b/src/main/java/io/cdap/plugin/http/source/common/BaseHttpSourceConfig.java
@@ -87,6 +87,9 @@ public abstract class BaseHttpSourceConfig extends ReferencePluginConfig {
public static final String PROPERTY_CLIENT_SECRET = "clientSecret";
public static final String PROPERTY_SCOPES = "scopes";
public static final String PROPERTY_REFRESH_TOKEN = "refreshToken";
+ public static final String PROPERTY_SERVICE_ACCOUNT_ENABLED = "serviceAccountEnabled";
+ public static final String PROPERTY_SERVICE_ACCOUNT_JSON = "serviceAccountJson";
+ public static final String PROPERTY_SERVICE_ACCOUNT_SCOPE = "serviceAccountScope";
public static final String PROPERTY_VERIFY_HTTPS = "verifyHttps";
public static final String PROPERTY_KEYSTORE_FILE = "keystoreFile";
public static final String PROPERTY_KEYSTORE_TYPE = "keystoreType";
@@ -316,6 +319,23 @@ public abstract class BaseHttpSourceConfig extends ReferencePluginConfig {
@Macro
protected String refreshToken;
+ @Name(PROPERTY_SERVICE_ACCOUNT_ENABLED)
+ @Description("If true, plugin will use service account key to perform oauth2 authentication.")
+ protected String serviceAccountEnabled;
+
+ @Nullable
+ @Name(PROPERTY_SERVICE_ACCOUNT_JSON)
+ @Description("Json key file content for OAuth2 with service account.")
+ @Macro
+ protected String serviceAccountJson;
+
+ @Nullable
+ @Name(PROPERTY_SERVICE_ACCOUNT_SCOPE)
+ @Description("Scope used when using a service account json key file. " +
+ "Defaults to https://www.googleapis.com/auth/cloud-platform")
+ @Macro
+ protected String serviceAccountScope;
+
@Name(PROPERTY_VERIFY_HTTPS)
@Description("If false, untrusted trust certificates (e.g. self signed), will not lead to an" +
"error. Do not disable this in production environment on a network you do not entirely trust. " +
@@ -533,6 +553,10 @@ public Boolean getOauth2Enabled() {
return Boolean.parseBoolean(oauth2Enabled);
}
+ public Boolean getServiceAccountEnabled() {
+ return Boolean.parseBoolean(serviceAccountEnabled);
+ }
+
@Nullable
public String getAuthUrl() {
return authUrl;
@@ -563,6 +587,16 @@ public String getRefreshToken() {
return refreshToken;
}
+ @Nullable
+ public String getServiceAccountJson() {
+ return serviceAccountJson;
+ }
+
+ @Nullable
+ public String getServiceAccountScope() {
+ return serviceAccountScope;
+ }
+
public Boolean getVerifyHttps() {
return Boolean.parseBoolean(verifyHttps);
}
@@ -794,6 +828,11 @@ PAGINATION_INDEX_PLACEHOLDER, getPaginationType()),
assertIsSet(getRefreshToken(), PROPERTY_REFRESH_TOKEN, reasonOauth2);
}
+ if (!containsMacro(PROPERTY_SERVICE_ACCOUNT_ENABLED) && this.getServiceAccountEnabled()) {
+ String reasonOauth2 = "Service Account is enabled";
+ assertIsSet(getServiceAccountJson(), PROPERTY_SERVICE_ACCOUNT_JSON, reasonOauth2);
+ }
+
if (!containsMacro(PROPERTY_VERIFY_HTTPS) && !getVerifyHttps()) {
assertIsNotSet(getTrustStoreFile(), PROPERTY_TRUSTSTORE_FILE,
String.format("trustore settings are ignored due to disabled %s", PROPERTY_VERIFY_HTTPS));
diff --git a/src/main/java/io/cdap/plugin/http/source/common/http/HttpClient.java b/src/main/java/io/cdap/plugin/http/source/common/http/HttpClient.java
index bb41293c..e4c71050 100644
--- a/src/main/java/io/cdap/plugin/http/source/common/http/HttpClient.java
+++ b/src/main/java/io/cdap/plugin/http/source/common/http/HttpClient.java
@@ -126,15 +126,20 @@ private CloseableHttpClient createHttpClient() throws IOException {
ArrayList clientHeaders = new ArrayList<>();
+ String accessToken = null;
// oAuth2
+ if (config.getServiceAccountEnabled()) {
+ accessToken = OAuthUtil.getAccessTokenByServiceAccount(HttpClients.createDefault(),
+ config.getServiceAccountJson(),
+ config.getServiceAccountScope());
+ }
if (config.getOauth2Enabled()) {
- String accessToken = OAuthUtil.getAccessTokenByRefreshToken(HttpClients.createDefault(), config.getTokenUrl(),
- config.getClientId(), config.getClientSecret(),
- config.getRefreshToken());
- clientHeaders.add(new BasicHeader("Authorization", "Bearer " + accessToken));
+ accessToken = OAuthUtil.getAccessTokenByRefreshToken(HttpClients.createDefault(), config.getTokenUrl(),
+ config.getClientId(), config.getClientSecret(),
+ config.getRefreshToken());
}
-
+ clientHeaders.add(new BasicHeader("Authorization", "Bearer " + accessToken));
// set default headers
if (headers != null) {
for (Map.Entry headerEntry : this.headers.entrySet()) {
diff --git a/src/main/java/io/cdap/plugin/http/source/common/http/OAuthUtil.java b/src/main/java/io/cdap/plugin/http/source/common/http/OAuthUtil.java
index 27ac280a..46bc11c3 100644
--- a/src/main/java/io/cdap/plugin/http/source/common/http/OAuthUtil.java
+++ b/src/main/java/io/cdap/plugin/http/source/common/http/OAuthUtil.java
@@ -15,22 +15,176 @@
*/
package io.cdap.plugin.http.source.common.http;
+import com.google.api.client.http.GenericUrl;
+import com.google.api.client.http.HttpRequest;
+import com.google.api.client.http.HttpRequestFactory;
+import com.google.api.client.http.HttpResponse;
+import com.google.api.client.http.UrlEncodedContent;
+import com.google.api.client.http.javanet.NetHttpTransport;
+import com.google.api.client.json.JsonFactory;
+import com.google.api.client.json.JsonObjectParser;
+import com.google.api.client.json.gson.GsonFactory;
+import com.google.api.client.json.webtoken.JsonWebSignature;
+import com.google.api.client.json.webtoken.JsonWebToken;
+import com.google.api.client.util.GenericData;
+import com.google.api.client.util.SecurityUtils;
+
+import com.google.auth.http.HttpTransportFactory;
+import com.google.auth.oauth2.GoogleCredentials;
+import com.google.auth.oauth2.ServiceAccountCredentials;
import com.google.gson.JsonElement;
+import com.google.gson.JsonObject;
+
import io.cdap.plugin.http.source.common.pagination.page.JSONUtil;
+
import org.apache.http.client.methods.CloseableHttpResponse;
import org.apache.http.client.methods.HttpPost;
import org.apache.http.client.utils.URIBuilder;
import org.apache.http.impl.client.CloseableHttpClient;
import org.apache.http.util.EntityUtils;
+
+import java.io.BufferedReader;
+import java.io.ByteArrayInputStream;
import java.io.IOException;
+import java.io.InputStream;
+import java.io.StringReader;
+import java.io.StringWriter;
import java.net.URI;
import java.net.URISyntaxException;
+import java.nio.charset.StandardCharsets;
+
+import java.security.GeneralSecurityException;
+import java.security.KeyFactory;
+import java.security.PrivateKey;
+import java.security.Signature;
+import java.security.spec.PKCS8EncodedKeySpec;
+
+import java.util.Arrays;
+import java.util.Base64;
+import java.util.Date;
+import java.util.regex.Matcher;
+import java.util.regex.Pattern;
/**
* A class which contains utilities to make OAuth2 specific calls.
*/
public class OAuthUtil {
+
+ public static PrivateKey readPKCS8Pem(String key) throws Exception {
+ key = key.replace("-----BEGIN PRIVATE KEY-----", "");
+ key = key.replace("-----END PRIVATE KEY-----", "");
+ key = key.replaceAll("\\s+", "");
+
+ // Base64 decode the result
+ byte [] pkcs8EncodedBytes = decode(key);
+
+ // extract the private key
+ PKCS8EncodedKeySpec keySpec = new PKCS8EncodedKeySpec(pkcs8EncodedBytes);
+ KeyFactory kf = KeyFactory.getInstance("RSA");
+ return kf.generatePrivate(keySpec);
+ }
+
+ public static String encodeBase64URLSafeString(byte[] binaryData) {
+ if (binaryData == null) {
+ return null;
+ }
+ return Base64.getUrlEncoder().withoutPadding().encodeToString(binaryData);
+ }
+
+ public static String signUsingRsaSha256(
+ PrivateKey privateKey,
+ JsonFactory jsonFactory,
+ JsonWebSignature.Header header,
+ JsonWebToken.Payload payload)
+ throws GeneralSecurityException, IOException {
+ String head = encodeBase64URLSafeString(jsonFactory.toByteArray(header));
+ String content = head + "."
+ + encodeBase64URLSafeString(jsonFactory.toByteArray(payload));
+ byte[] contentBytes = content.getBytes();
+ Signature sig = Signature.getInstance("SHA256withRSA");
+ sig.initSign(privateKey);
+ sig.update(contentBytes);
+ byte[] signature = sig.sign();
+ return content + "." + encodeBase64URLSafeString(signature);
+ }
+
+
+
+// copied from https://stackoverflow.com/a/4265472
+ private static char[] theALPHABET =
+ "ABCDEFGHIJKLMNOPQRSTUVWXYZabcdefghijklmnopqrstuvwxyz0123456789+/".toCharArray();
+
+ private static int[] toInt = new int[128];
+
+ static {
+ for (int i = 0; i < theALPHABET.length; i++) {
+ toInt[theALPHABET[i]] = i;
+ }
+ }
+
+ /**
+ * Translates the specified byte array into Base64 string.
+ *
+ * @param buf the byte array (not null)
+ * @return the translated Base64 string (not null)
+ */
+ public static String encode(byte[] buf) {
+ int size = buf.length;
+ char[] ar = new char[((size + 2) / 3) * 4];
+ int a = 0;
+ int i = 0;
+ while (i < size) {
+ byte b0 = buf[i++];
+ byte b1 = (i < size) ? buf[i++] : 0;
+ byte b2 = (i < size) ? buf[i++] : 0;
+
+ int mask = 0x3F;
+ ar[a++] = theALPHABET[(b0 >> 2) & mask];
+ ar[a++] = theALPHABET[((b0 << 4) | ((b1 & 0xFF) >> 4)) & mask];
+ ar[a++] = theALPHABET[((b1 << 2) | ((b2 & 0xFF) >> 6)) & mask];
+ ar[a++] = theALPHABET[b2 & mask];
+ }
+ switch (size % 3) {
+ case 1: ar[--a] = '=';
+ break;
+ case 2: ar[--a] = '=';
+ ar[--a] = '=';
+ break;
+ }
+ return new String(ar);
+ }
+
+ /**
+ * Translates the specified Base64 string into a byte array.
+ *
+ * @param s the Base64 string (not null)
+ * @return the byte array (not null)
+ */
+ public static byte[] decode(String s) {
+ int delta = s.endsWith("==") ? 2 : s.endsWith("=") ? 1 : 0;
+ byte[] buffer = new byte[s.length() * 3 / 4 - delta];
+ int mask = 0xFF;
+ int index = 0;
+ for (int i = 0; i < s.length(); i += 4) {
+ int c0 = toInt[s.charAt(i)];
+ int c1 = toInt[s.charAt(i + 1)];
+ buffer[index++] = (byte) (((c0 << 2) | (c1 >> 4)) & mask);
+ if (index >= buffer.length) {
+ return buffer;
+ }
+ int c2 = toInt[s.charAt(i + 2)];
+ buffer[index++] = (byte) (((c1 << 4) | (c2 >> 2)) & mask);
+ if (index >= buffer.length) {
+ return buffer;
+ }
+ int c3 = toInt[s.charAt(i + 3)];
+ buffer[index++] = (byte) (((c2 << 6) | c3) & mask);
+ }
+ return buffer;
+ }
+
+
public static String getAccessTokenByRefreshToken(CloseableHttpClient httpclient, String tokenUrl, String clientId,
String clientSecret, String refreshToken)
throws IOException {
@@ -54,5 +208,70 @@ public static String getAccessTokenByRefreshToken(CloseableHttpClient httpclient
JsonElement jsonElement = JSONUtil.toJsonObject(responseString).get("access_token");
return jsonElement.getAsString();
}
-}
+ public static String getAccessTokenByServiceAccount(CloseableHttpClient httpclient, String serviceAccountJson,
+ String serviceAccountScope)
+ throws IOException {
+ HttpTransportFactory transportFactory;
+ JsonObject sa = JSONUtil.toJsonObject(serviceAccountJson);
+ String tokenServerUri = sa.get("token_uri").getAsString();
+ String scope = serviceAccountScope;
+
+ if (serviceAccountScope == null) {
+ scope = "https://www.googleapis.com/auth/cloud-platform";
+ }
+
+ PrivateKey key;
+ try {
+ key = readPKCS8Pem(sa.get("private_key").getAsString());
+ } catch (Exception e) {
+ throw new IOException(
+ "Error decoding service account private key.", e);
+ }
+ long currentTime = System.currentTimeMillis();
+ JsonWebSignature.Header header = new JsonWebSignature.Header();
+ header.setAlgorithm("RS256");
+ header.setType("JWT");
+ header.setKeyId(sa.get("private_key_id").getAsString());
+
+
+ JsonWebToken.Payload payload = new JsonWebToken.Payload();
+ payload.setIssuer(sa.get("client_email").getAsString());
+ payload.setIssuedAtTimeSeconds(currentTime / 1000);
+ payload.setExpirationTimeSeconds(currentTime / 1000 + 3600); // one hour
+ payload.setSubject(sa.get("client_email").getAsString());
+ payload.put("scope", scope);
+ payload.setAudience(tokenServerUri);
+
+ String assertion;
+ try {
+ assertion = signUsingRsaSha256(key, GsonFactory.getDefaultInstance(), header, payload);
+ } catch (GeneralSecurityException e) {
+ throw new IOException(
+ "Error signing service account access token request with private key.", e);
+ }
+
+ GenericData tokenRequest = new GenericData();
+ tokenRequest.set("grant_type", "urn:ietf:params:oauth:grant-type:jwt-bearer");
+ tokenRequest.set("assertion", assertion);
+
+ UrlEncodedContent content = new UrlEncodedContent(tokenRequest);
+ HttpRequestFactory requestFactory = new NetHttpTransport().createRequestFactory();
+ HttpRequest request = requestFactory.buildPostRequest(new GenericUrl(tokenServerUri), content);
+ HttpResponse response = request.execute();
+
+ InputStream in = response.getContent();
+ StringWriter out = new StringWriter();
+ byte[] buf = new byte[4096];
+ int r;
+ while (true) {
+ r = in.read(buf);
+ if (r == -1) {
+ break;
+ }
+ out.write(new String(buf).substring(0, r));
+ }
+ JsonObject responseData = JSONUtil.toJsonObject(out.toString());
+ return responseData.get("access_token").toString();
+ }
+}
diff --git a/src/test/java/io/cdap/plugin/http/etl/HttpSourceETLTest.java b/src/test/java/io/cdap/plugin/http/etl/HttpSourceETLTest.java
index 07a594e3..aa0ee7f8 100644
--- a/src/test/java/io/cdap/plugin/http/etl/HttpSourceETLTest.java
+++ b/src/test/java/io/cdap/plugin/http/etl/HttpSourceETLTest.java
@@ -487,6 +487,8 @@ protected Map getProperties(Map sourceProperties
.put("referenceName", testName.getMethodName())
.put(BaseHttpSourceConfig.PROPERTY_HTTP_METHOD, "GET")
.put(BaseHttpSourceConfig.PROPERTY_OAUTH2_ENABLED, "false")
+ .put(BaseHttpSourceConfig.PROPERTY_SERVICE_ACCOUNT_ENABLED, "false")
+ .put(BaseHttpSourceConfig.PROPERTY_SERVICE_ACCOUNT_SCOPE, "")
.put(BaseHttpSourceConfig.PROPERTY_HTTP_ERROR_HANDLING, "2..:Success,.*:Fail")
.put(BaseHttpSourceConfig.PROPERTY_ERROR_HANDLING, "stopOnError")
.put(BaseHttpSourceConfig.PROPERTY_RETRY_POLICY, "linear")
diff --git a/src/test/java/io/cdap/plugin/http/source/common/pagination/PaginationIteratorTest.java b/src/test/java/io/cdap/plugin/http/source/common/pagination/PaginationIteratorTest.java
index 16d22b5e..84f1f301 100644
--- a/src/test/java/io/cdap/plugin/http/source/common/pagination/PaginationIteratorTest.java
+++ b/src/test/java/io/cdap/plugin/http/source/common/pagination/PaginationIteratorTest.java
@@ -333,6 +333,7 @@ static class BaseTestConfig extends HttpBatchSourceConfig {
this.url = "";
this.httpMethod = "GET";
this.oauth2Enabled = "false";
+ this.serviceAccountEnabled = "false";
this.httpErrorsHandling = "2..:Success,.*:Fail";
this.retryPolicy = "linear";
this.maxRetryDuration = 10L;
diff --git a/widgets/HTTP-batchsource.json b/widgets/HTTP-batchsource.json
index 6c053781..a1724169 100644
--- a/widgets/HTTP-batchsource.json
+++ b/widgets/HTTP-batchsource.json
@@ -153,6 +153,37 @@
}
]
},
+ {
+ "label": "OAuth2 Service Account",
+ "properties": [
+ {
+ "widget-type": "toggle",
+ "label": "OAuth2 Service Account Enabled",
+ "name": "serviceAccountEnabled",
+ "widget-attributes": {
+ "default": "false",
+ "on": {
+ "label": "True",
+ "value": "true"
+ },
+ "off": {
+ "label": "False",
+ "value": "false"
+ }
+ }
+ },
+ {
+ "widget-type": "textbox",
+ "label": "Service Account key JSON",
+ "name": "serviceAccountJson"
+ },
+ {
+ "widget-type": "textbox",
+ "label": "Scopes",
+ "name": "serviceAccountScope"
+ }
+ ]
+ },
{
"label": "Basic Authentication",
"properties": [