diff --git a/presto-main/src/main/java/com/facebook/presto/dynamicCatalog/CatalogVo.java b/presto-main/src/main/java/com/facebook/presto/dynamicCatalog/CatalogVo.java new file mode 100644 index 0000000000000..d0c7702ccdd70 --- /dev/null +++ b/presto-main/src/main/java/com/facebook/presto/dynamicCatalog/CatalogVo.java @@ -0,0 +1,79 @@ +/* + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package com.facebook.presto.dynamicCatalog; + +import com.fasterxml.jackson.annotation.JsonProperty; + +import java.util.Map; + +public class CatalogVo +{ + @JsonProperty("catalogName") + public String catalogName; + @JsonProperty("connectorName") + public String connectorName; + @JsonProperty("properties") + public Map properties; + + public CatalogVo(String catalogName, String connectorName, Map properties) + { + this.catalogName = catalogName; + this.connectorName = connectorName; + this.properties = properties; + } + + public CatalogVo() + { + } + + public String getCatalogName() + { + return catalogName; + } + + public void setCatalogName(String catalogName) + { + this.catalogName = catalogName; + } + + public String getConnectorName() + { + return connectorName; + } + + public void setConnectorName(String connectorName) + { + this.connectorName = connectorName; + } + + public Map getProperties() + { + return properties; + } + + public void setProperties(Map properties) + { + this.properties = properties; + } + + @Override + public String toString() + { + return "CatalogVo{" + + "catalogName='" + catalogName + '\'' + + ", connectorName='" + connectorName + '\'' + + ", properties=" + properties + + '}'; + } +} diff --git a/presto-main/src/main/java/com/facebook/presto/dynamicCatalog/DynamicCatalogController.java b/presto-main/src/main/java/com/facebook/presto/dynamicCatalog/DynamicCatalogController.java new file mode 100644 index 0000000000000..29cce033c928b --- /dev/null +++ b/presto-main/src/main/java/com/facebook/presto/dynamicCatalog/DynamicCatalogController.java @@ -0,0 +1,151 @@ +/* + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package com.facebook.presto.dynamicCatalog; + +import com.facebook.airlift.discovery.client.Announcer; +import com.facebook.airlift.discovery.client.ServiceAnnouncement; +import com.facebook.airlift.log.Logger; +import com.facebook.presto.connector.ConnectorManager; +import com.facebook.presto.metadata.CatalogManager; +import com.facebook.presto.metadata.InternalNodeManager; +import com.facebook.presto.metadata.StaticCatalogStoreConfig; +import com.facebook.presto.spi.ConnectorId; +import com.google.common.base.Joiner; +import com.google.common.base.Splitter; + +import javax.inject.Inject; +import javax.ws.rs.Consumes; +import javax.ws.rs.POST; +import javax.ws.rs.Path; +import javax.ws.rs.Produces; +import javax.ws.rs.core.MediaType; +import javax.ws.rs.core.Response; + +import java.io.File; +import java.io.OutputStream; +import java.nio.file.Files; +import java.util.LinkedHashMap; +import java.util.LinkedHashSet; +import java.util.Map; +import java.util.Properties; +import java.util.Set; + +import static com.facebook.airlift.discovery.client.ServiceAnnouncement.serviceAnnouncement; +import static com.google.common.base.Strings.nullToEmpty; + +@Path("/v1/catalog") +public class DynamicCatalogController +{ + private final CatalogManager catalogManager; + private final ConnectorManager connectorManager; + private final Announcer announcer; + private final InternalNodeManager internalNodeManager; + private final File catalogConfigurationDir; + private static final Logger log = Logger.get(DynamicCatalogController.class); + private final ResponseParser responseParser; + + private DynamicCatalogController(CatalogManager catalogManager, ConnectorManager connectorManager, Announcer announcer, InternalNodeManager internalNodeManager, File catalogConfigurationDir, ResponseParser responseParser) + { + this.catalogManager = catalogManager; + this.connectorManager = connectorManager; + this.announcer = announcer; + this.internalNodeManager = internalNodeManager; + this.catalogConfigurationDir = catalogConfigurationDir; + this.responseParser = responseParser; + } + + @Inject + public DynamicCatalogController(CatalogManager catalogManager, ConnectorManager connectorManager, Announcer announcer, InternalNodeManager internalNodeManager, StaticCatalogStoreConfig config, ResponseParser responseParser) + { + this(catalogManager, connectorManager, announcer, internalNodeManager, config.getCatalogConfigurationDir(), responseParser); + } + + @Path("add") + @POST + @Consumes(MediaType.APPLICATION_JSON) + @Produces(MediaType.APPLICATION_JSON) + public Response addCatalog(CatalogVo catalogVo) + { + try { + log.info("addCatalog : input values are " + catalogVo); + ConnectorId connectorId = connectorManager.createConnection(catalogVo.getCatalogName(), catalogVo.getConnectorName(), catalogVo.getProperties()); + updateConnectorIdAnnouncement(announcer, connectorId, internalNodeManager); + log.info("addCatalog() : catalogConfigurationDir: " + catalogConfigurationDir.getAbsolutePath()); + writeToFile(catalogVo); + log.info("addCatalog() : Successfully added catalog " + catalogVo.getCatalogName()); + return successResponse(responseParser.build("Successfully added catalog: " + catalogVo.getCatalogName(), 200)); + } + catch (Exception ex) { + log.error("addCatalog() : Error adding catalog " + ex.getMessage()); + return failedResponse(responseParser.build("Error adding Catalog: " + ex.getMessage(), 500)); + } + } + + private void writeToFile(CatalogVo catalogVo) + throws Exception + { + final Properties properties = new Properties(); + properties.put("connector.name", catalogVo.getConnectorName()); + properties.putAll(catalogVo.getProperties()); + String filePath = getPropertyFilePath(catalogVo.getCatalogName()); + log.info("filepath: " + filePath); + File propertiesFile = new File(filePath); + try (OutputStream out = Files.newOutputStream(propertiesFile.toPath())) { + properties.store(out, "adding catalog using endpoint"); + } + catch (Exception ex) { + log.error("error while writing to a file :" + ex.getMessage()); + throw new Exception("Error writing to file " + ex.getMessage()); + } + } + + private String getPropertyFilePath(String catalogName) + { + return catalogConfigurationDir.getPath() + File.separator + catalogName + ".properties"; + } + + private static ServiceAnnouncement getPrestoAnnouncement(Set announcements) + { + for (ServiceAnnouncement announcement : announcements) { + if (announcement.getType().equals("presto")) { + return announcement; + } + } + throw new IllegalArgumentException("Presto announcement not found: " + announcements); + } + + private static void updateConnectorIdAnnouncement(Announcer announcer, ConnectorId connectorId, InternalNodeManager nodeManager) + { + ServiceAnnouncement announcement = getPrestoAnnouncement(announcer.getServiceAnnouncements()); + Map properties = new LinkedHashMap<>(announcement.getProperties()); + String property = nullToEmpty(properties.get("connectorIds")); + Set connectorIds = new LinkedHashSet<>(Splitter.on(',').trimResults().omitEmptyStrings().splitToList(property)); + connectorIds.add(connectorId.toString()); + properties.put("connectorIds", Joiner.on(',').join(connectorIds)); + announcer.removeServiceAnnouncement(announcement.getId()); + announcer.addServiceAnnouncement(serviceAnnouncement(announcement.getType()).addProperties(properties).build()); + announcer.forceAnnounce(); + nodeManager.refreshNodes(); + } + + private Response successResponse(ResponseParser responseParser) + { + return Response.status(Response.Status.OK).entity(responseParser).type(MediaType.APPLICATION_JSON).build(); + } + + private Response failedResponse(ResponseParser responseParser) + { + return Response.status(Response.Status.INTERNAL_SERVER_ERROR).entity(responseParser).type(MediaType.APPLICATION_JSON).build(); + } +} diff --git a/presto-main/src/main/java/com/facebook/presto/dynamicCatalog/ResponseParser.java b/presto-main/src/main/java/com/facebook/presto/dynamicCatalog/ResponseParser.java new file mode 100644 index 0000000000000..fd2c59e08df03 --- /dev/null +++ b/presto-main/src/main/java/com/facebook/presto/dynamicCatalog/ResponseParser.java @@ -0,0 +1,35 @@ +/* + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package com.facebook.presto.dynamicCatalog; + +import com.fasterxml.jackson.annotation.JsonProperty; + +public class ResponseParser +{ + @JsonProperty("message") + public String message; + @JsonProperty("statusCode") + public int statusCode; + + public ResponseParser build(String message, int statusCode) + { + this.message = message; + this.statusCode = statusCode; + return this; + } + + public ResponseParser() + { + } +} diff --git a/presto-main/src/main/java/com/facebook/presto/server/CoordinatorModule.java b/presto-main/src/main/java/com/facebook/presto/server/CoordinatorModule.java index 1abf3537acffa..d053d4ef02906 100644 --- a/presto-main/src/main/java/com/facebook/presto/server/CoordinatorModule.java +++ b/presto-main/src/main/java/com/facebook/presto/server/CoordinatorModule.java @@ -30,6 +30,8 @@ import com.facebook.presto.dispatcher.FailedDispatchQueryFactory; import com.facebook.presto.dispatcher.LocalDispatchQueryFactory; import com.facebook.presto.dispatcher.QueuedStatementResource; +import com.facebook.presto.dynamicCatalog.DynamicCatalogController; +import com.facebook.presto.dynamicCatalog.ResponseParser; import com.facebook.presto.event.QueryMonitor; import com.facebook.presto.event.QueryMonitorConfig; import com.facebook.presto.execution.AddColumnTask; @@ -202,6 +204,9 @@ protected void setup(Binder binder) jaxrsBinder(binder).bind(ExecutingStatementResource.class); binder.bind(StatementHttpExecutionMBean.class).in(Scopes.SINGLETON); newExporter(binder).export(StatementHttpExecutionMBean.class).withGeneratedName(); + jaxrsBinder(binder).bind(DynamicCatalogController.class); + newExporter(binder).export(DynamicCatalogController.class).withGeneratedName(); + binder.bind(ResponseParser.class).in(Scopes.SINGLETON); // resource for serving static content jaxrsBinder(binder).bind(WebUiResource.class); diff --git a/presto-main/src/test/java/com/facebook/presto/dynamicCatalog/TestDynamicCatalogController.java b/presto-main/src/test/java/com/facebook/presto/dynamicCatalog/TestDynamicCatalogController.java new file mode 100644 index 0000000000000..b7f6b6d5b624b --- /dev/null +++ b/presto-main/src/test/java/com/facebook/presto/dynamicCatalog/TestDynamicCatalogController.java @@ -0,0 +1,146 @@ +/* + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package com.facebook.presto.dynamicCatalog; + +import com.facebook.airlift.http.client.HttpClient; +import com.facebook.airlift.http.client.Request; +import com.facebook.airlift.http.client.StatusResponseHandler; +import com.facebook.airlift.http.client.jetty.JettyHttpClient; +import com.facebook.airlift.json.JsonCodec; +import com.facebook.airlift.log.Logger; +import com.facebook.airlift.testing.Closeables; +import com.facebook.presto.server.testing.TestingPrestoServer; +import org.testng.annotations.AfterMethod; +import org.testng.annotations.BeforeMethod; +import org.testng.annotations.Test; + +import java.io.File; +import java.net.URI; +import java.util.HashMap; +import java.util.Map; + +import static com.facebook.airlift.http.client.HttpUriBuilder.uriBuilderFrom; +import static com.facebook.airlift.http.client.JsonBodyGenerator.jsonBodyGenerator; +import static com.facebook.airlift.http.client.Request.Builder.preparePost; +import static com.facebook.airlift.http.client.StatusResponseHandler.createStatusResponseHandler; +import static com.facebook.presto.client.PrestoHeaders.PRESTO_CATALOG; +import static com.facebook.presto.client.PrestoHeaders.PRESTO_SCHEMA; +import static com.facebook.presto.client.PrestoHeaders.PRESTO_SOURCE; +import static com.facebook.presto.client.PrestoHeaders.PRESTO_TIME_ZONE; +import static com.facebook.presto.client.PrestoHeaders.PRESTO_USER; +import static org.testng.Assert.assertEquals; + +@Test(singleThreaded = true) +public class TestDynamicCatalogController +{ + private static final JsonCodec catalogVoJsonCodec = JsonCodec.jsonCodec(CatalogVo.class); + private TestingPrestoServer server; + private HttpClient client; + private static final Logger log = Logger.get(TestDynamicCatalogController.class); + + @BeforeMethod + public void setup() + throws Exception + { + server = new TestingPrestoServer(); + client = new JettyHttpClient(); + cleanUpCatalog(); + } + + @SuppressWarnings("deprecation") + @AfterMethod + public void teardown() + { + cleanUpCatalog(); + Closeables.closeQuietly(server); + Closeables.closeQuietly(client); + } + + @Test + public void testAddCatalog() + { + CatalogVo catalogVo = getFakeCatalogObject("system", "testing"); + assertEquals(executeAddCatalogCall(catalogVo).getStatusCode(), 200); + } + + @Test + public void testAddCatalogFailed() + { + CatalogVo catalogVo = getFakeCatalogObject("invalidConnector", "invalidCatalog"); + assertEquals(executeAddCatalogCall(catalogVo).getStatusCode(), 500); + } + + private URI uriFor(String path) + { + return uriBuilderFrom(server.getBaseUrl()).replacePath(path).build(); + } + + private CatalogVo getFakeCatalogObject(String connectorName, String catalogName) + { + CatalogVo catalogVo = new CatalogVo(); + catalogVo.setCatalogName(catalogName); + catalogVo.setConnectorName(connectorName); + Map map = new HashMap<>(); + map.put("connector.name", connectorName); + map.put("connection-url", "jdbc:postgresql://localhost:5432/postgres"); + map.put("connection-user", "postgres"); + map.put("connection-password", "postgres"); + catalogVo.setProperties(map); + return catalogVo; + } + + private void cleanUpCatalog() + { + if (deleteCatalog()) { + log.debug("TestDynamicCatalogController:cleanUpCatalog() Successfully deleted catalog"); + } + else { + log.debug("TestDynamicCatalogController:cleanUpCatalog() Not able to deleted catalog"); + } + } + + private boolean deleteCatalog() + { + String catalogName = getFakeCatalogObject("system", "testing").getCatalogName(); + return deletePropertyFile(catalogName); + } + + private boolean deletePropertyFile(String catalogName) + { + return new File(getPropertyFilePath(catalogName)).delete(); + } + + private String getPropertyFilePath(String catalogName) + { + File catalogConfigurationDir = new File("etc/catalog/"); + return catalogConfigurationDir.getPath() + File.separator + catalogName + ".properties"; + } + + private StatusResponseHandler.StatusResponse executeAddCatalogCall(CatalogVo catalogVo) + { + Request request = preparePost().setHeader(PRESTO_USER, "user") + .setUri(uriFor("/v1/catalog/add")) + .setBodyGenerator(jsonBodyGenerator(catalogVoJsonCodec, catalogVo)) + .setHeader(PRESTO_SOURCE, "source") + .setHeader(PRESTO_CATALOG, "catalog") + .setHeader(PRESTO_SCHEMA, "schema") + .setHeader(PRESTO_TIME_ZONE, "invalid time zone") + .setHeader("Content-Type", "application/json") + .build(); + StatusResponseHandler.StatusResponse response = client.execute( + request, + createStatusResponseHandler()); + return response; + } +}