Skip to content

Feature/1 add support for apache avro and breakup #239

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Draft
wants to merge 4 commits into
base: main
Choose a base branch
from
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
71 changes: 71 additions & 0 deletions morf-avro/pom.xml
Original file line number Diff line number Diff line change
@@ -0,0 +1,71 @@
<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0"
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
<parent>
<artifactId>morf-parent</artifactId>
<groupId>org.alfasoftware</groupId>
<version>2.0.5-SNAPSHOT</version>
</parent>
<modelVersion>4.0.0</modelVersion>
<name>Morf - Avro</name>

<artifactId>morf-avro</artifactId>
<build>
<plugins>
<plugin>
<groupId>org.apache.maven.plugins</groupId>
<artifactId>maven-compiler-plugin</artifactId>
<configuration>
<source>9</source>
<target>9</target>
</configuration>
</plugin>
</plugins>
</build>

<dependencies>
<dependency>
<groupId>org.alfasoftware</groupId>
<artifactId>morf-directory-spi</artifactId>
</dependency>
<!-- https://mvnrepository.com/artifact/org.apache.avro/avro -->
<dependency>
<groupId>org.apache.avro</groupId>
<artifactId>avro</artifactId>
<version>1.11.0</version>
</dependency>
<dependency>
<groupId>junit</groupId>
<artifactId>junit</artifactId>
<scope>test</scope>
</dependency>
<!-- https://mvnrepository.com/artifact/com.google.jimfs/jimfs -->
<dependency>
<groupId>com.google.jimfs</groupId>
<artifactId>jimfs</artifactId>
<scope>test</scope>
<version>1.2</version>
</dependency>
<dependency>
<groupId>org.mockito</groupId>
<artifactId>mockito-core</artifactId>
<scope>test</scope>
</dependency>
<dependency>
<groupId>org.slf4j</groupId>
<artifactId>slf4j-api</artifactId>
<version>2.0.6</version>
</dependency>
<!-- https://mvnrepository.com/artifact/com.google.code.gson/gson -->
<dependency>
<groupId>com.google.code.gson</groupId>
<artifactId>gson</artifactId>
<version>2.9.0</version>
</dependency>




</dependencies>
</project>
Original file line number Diff line number Diff line change
@@ -0,0 +1,71 @@
package org.alfasoftware.morf.avro;

import com.google.gson.Gson;
import com.google.gson.GsonBuilder;
import org.alfasoftware.morf.dataset.DataSetConsumer;
import org.alfasoftware.morf.dataset.Record;
import org.alfasoftware.morf.directory.DirectoryDataSetConsumer;
import org.alfasoftware.morf.metadata.Column;
import org.alfasoftware.morf.metadata.DataType;
import org.alfasoftware.morf.metadata.DataValueLookup;
import org.alfasoftware.morf.metadata.Table;
import org.apache.avro.Schema;
import org.apache.avro.file.DataFileWriter;
import org.apache.avro.generic.GenericData;
import org.apache.avro.generic.GenericDatumWriter;
import org.apache.avro.generic.GenericRecord;
import org.apache.avro.io.DatumWriter;

import java.io.IOException;
import java.nio.file.Path;
import java.util.EnumMap;
import java.util.Map;
import java.util.function.BiFunction;
import java.util.stream.Collectors;

public class AvroDataSetConsumer extends DirectoryDataSetConsumer implements DataSetConsumer {

private static final Gson GSON_FOR_TABLE_DATA = new GsonBuilder().setPrettyPrinting().registerTypeAdapter(Table.class, new JsonTableSerializer()).create();
private static final Gson GSON_FOR_COLUMN_DATA = new GsonBuilder().setPrettyPrinting().registerTypeAdapter(Table.class, new JsonColumnSetSerializer()).create();
private final Map<DataType, BiFunction<Record, String, Object>> recordMappingFunctions = new EnumMap<>(DataType.class);

public AvroDataSetConsumer(Path path) {
super("avro", path);
recordMappingFunctions.put(DataType.DATE, DataValueLookup::getDate);
recordMappingFunctions.put(DataType.BIG_INTEGER, DataValueLookup::getLong);
recordMappingFunctions.put(DataType.BLOB, DataValueLookup::getByteArray);
recordMappingFunctions.put(DataType.BOOLEAN, DataValueLookup::getBoolean);
recordMappingFunctions.put(DataType.CLOB, DataValueLookup::getByteArray);
recordMappingFunctions.put(DataType.DECIMAL, DataValueLookup::getBigDecimal);
recordMappingFunctions.put(DataType.INTEGER, DataValueLookup::getInteger);
recordMappingFunctions.put(DataType.STRING, DataValueLookup::getString);
}

@Override
public void open() {
super.directoryOutputStreamProvider.open();
}

@Override
public void close(CloseState closeState) {
super.directoryOutputStreamProvider.close();
}

@Override
public void table(Table table, Iterable<Record> records) {
String gsonSchema = GSON_FOR_COLUMN_DATA.toJson(table, Table.class);
Map<String, DataType> columnNamesAndTypes = table.columns().stream().collect(Collectors.toMap(Column::getName, Column::getType));
Schema avroSchema = new Schema.Parser().parse(gsonSchema);
DatumWriter<GenericRecord> datumWriter = new GenericDatumWriter<>(avroSchema);
try (DataFileWriter<GenericRecord> dataFileWriter = new DataFileWriter<>(datumWriter)) {
dataFileWriter.create(avroSchema, super.directoryOutputStreamProvider.openOutputStreamForTable(table.getName()));
for (Record r : records) {
GenericRecord row = new GenericData.Record(avroSchema);
columnNamesAndTypes.entrySet().forEach(entry -> row.put(entry.getKey(), recordMappingFunctions.get(entry.getValue()).apply(r, entry.getKey())));
dataFileWriter.append(row);
}
} catch (IOException e) {
throw new RuntimeException("Exception occurred upon trying to write avro data file for table [" + table.getName() + "]", e);
}
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,32 @@
package org.alfasoftware.morf.avro;

import org.alfasoftware.morf.dataset.DataSetProducer;
import org.alfasoftware.morf.dataset.Record;
import org.alfasoftware.morf.metadata.Schema;

public class AvroDataSetProducer implements DataSetProducer {
@Override
public void open() {

}

@Override
public void close() {

}

@Override
public Schema getSchema() {
return null;
}

@Override
public Iterable<Record> records(String tableName) {
return null;
}

@Override
public boolean isTableEmpty(String tableName) {
return false;
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,54 @@
package org.alfasoftware.morf.avro;

import com.google.gson.JsonArray;
import com.google.gson.JsonElement;
import com.google.gson.JsonObject;
import com.google.gson.JsonSerializationContext;
import com.google.gson.JsonSerializer;
import org.alfasoftware.morf.metadata.Column;
import org.alfasoftware.morf.metadata.DataType;
import org.alfasoftware.morf.metadata.Table;

import java.lang.reflect.Type;
import java.util.EnumMap;
import java.util.List;
import java.util.Map;

public class JsonColumnSetSerializer implements JsonSerializer<Table> {

private final Map<DataType, String> DATA_TYPE_TO_AVRO_TYPE_MAP;

JsonColumnSetSerializer() {
DATA_TYPE_TO_AVRO_TYPE_MAP = new EnumMap<DataType, String>(DataType.class);
DATA_TYPE_TO_AVRO_TYPE_MAP.put(DataType.DATE, "int");
DATA_TYPE_TO_AVRO_TYPE_MAP.put(DataType.BIG_INTEGER, "long");
DATA_TYPE_TO_AVRO_TYPE_MAP.put(DataType.BLOB, "string");
DATA_TYPE_TO_AVRO_TYPE_MAP.put(DataType.BOOLEAN, "boolean");
DATA_TYPE_TO_AVRO_TYPE_MAP.put(DataType.CLOB, "string");
DATA_TYPE_TO_AVRO_TYPE_MAP.put(DataType.DECIMAL, "double");
DATA_TYPE_TO_AVRO_TYPE_MAP.put(DataType.INTEGER, "int");
DATA_TYPE_TO_AVRO_TYPE_MAP.put(DataType.STRING, "string");
}

@Override
public JsonElement serialize(Table table, Type type, JsonSerializationContext jsonSerializationContext) {
JsonElement jsonElement = new JsonObject();
jsonElement.getAsJsonObject().addProperty("namespace", "org.alfasoftware.morf");
jsonElement.getAsJsonObject().addProperty("name", table.getName() + "_columns");
jsonElement.getAsJsonObject().addProperty("type", "record");
JsonElement fields = getColumnsAsFieldsFor(table.columns());
jsonElement.getAsJsonObject().add("fields", fields);
return jsonElement;
}

private JsonArray getColumnsAsFieldsFor(List<Column> columns) {
JsonArray array = new JsonArray();
for (Column c : columns) {
JsonObject object = new JsonObject();
object.getAsJsonObject().addProperty("name", c.getName());
object.getAsJsonObject().addProperty("type", DATA_TYPE_TO_AVRO_TYPE_MAP.get(c.getType()));
array.getAsJsonArray().add(object);
}
return array;
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,66 @@
package org.alfasoftware.morf.avro;

import com.google.gson.JsonArray;
import com.google.gson.JsonElement;
import com.google.gson.JsonObject;
import com.google.gson.JsonSerializationContext;
import com.google.gson.JsonSerializer;
import org.alfasoftware.morf.metadata.Column;
import org.alfasoftware.morf.metadata.DataType;
import org.alfasoftware.morf.metadata.Table;

import java.lang.reflect.Type;
import java.util.EnumMap;
import java.util.List;
import java.util.Map;

public class JsonTableSerializer implements JsonSerializer<Table> {

private final Map<DataType, String> DATA_TYPE_TO_AVRO_TYPE_MAP;

JsonTableSerializer() {
DATA_TYPE_TO_AVRO_TYPE_MAP = new EnumMap<DataType, String>(DataType.class);
DATA_TYPE_TO_AVRO_TYPE_MAP.put(DataType.DATE, "int");
DATA_TYPE_TO_AVRO_TYPE_MAP.put(DataType.BIG_INTEGER, "long");
DATA_TYPE_TO_AVRO_TYPE_MAP.put(DataType.BLOB, "string");
// recordMappingFunctions.put(DataType.BOOLEAN, (record, columnName) -> record.getBoolean(columnName));
// recordMappingFunctions.put(DataType.CLOB, (record, columnName) -> record.getByteArray(columnName));
// recordMappingFunctions.put(DataType.DECIMAL, (record, columnName) -> record.getBigDecimal(columnName));
// recordMappingFunctions.put(DataType.INTEGER, (record, columnName) -> record.getInteger(columnName));
DATA_TYPE_TO_AVRO_TYPE_MAP.put(DataType.STRING, "string");
}

@Override
public JsonElement serialize(Table table, Type type, JsonSerializationContext jsonSerializationContext) {
JsonElement jsonElement = new JsonObject();
jsonElement.getAsJsonObject().addProperty("namespace", "org.alfasoftware.morf");
jsonElement.getAsJsonObject().addProperty("name", "table");
jsonElement.getAsJsonObject().addProperty("type", "record");
JsonElement fields = new JsonArray();
JsonElement isTemporary = new JsonObject();
isTemporary.getAsJsonObject().addProperty("name", "isTemporary");
isTemporary.getAsJsonObject().addProperty("type", "boolean");
fields.getAsJsonArray().add(isTemporary);
JsonElement columnType = new JsonObject();
columnType.getAsJsonObject().addProperty("name", "column");
columnType.getAsJsonObject().addProperty("type", "record");
columnType.getAsJsonObject().add("fields", getColumnsAsFieldsFor(table.columns()));
JsonElement columns = new JsonObject();
columns.getAsJsonObject().addProperty("name", "columns");
columns.getAsJsonObject().add("type", columnType);
fields.getAsJsonArray().add(columns);
jsonElement.getAsJsonObject().add("fields", fields);
return jsonElement;
}

private JsonArray getColumnsAsFieldsFor(List<Column> columns) {
JsonArray array = new JsonArray();
for (Column c : columns) {
JsonObject object = new JsonObject();
object.getAsJsonObject().addProperty("name", c.getName());
object.getAsJsonObject().addProperty("type", DATA_TYPE_TO_AVRO_TYPE_MAP.get(c.getType()));
array.getAsJsonArray().add(object);
}
return array;
}
}
Loading