Skip to content
This repository was archived by the owner on Jan 31, 2025. It is now read-only.

Add attachFileContent to JobConfig and ServiceFactory #1921

Closed
wants to merge 3 commits into from
Closed
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -142,8 +142,13 @@ default Job newJob(@Nonnull Pipeline pipeline) {
@Nonnull
default Job newJob(@Nonnull Pipeline pipeline, @Nonnull JobConfig config) {
PipelineImpl impl = (PipelineImpl) pipeline;
for (Entry<String, File> e : impl.attachedFiles().entrySet()) {
File file = e.getValue();
for (Entry<String, Object> e : impl.attachedFiles().entrySet()) {
Object object = e.getValue();
if (object instanceof byte[]) {
config.attachFileContent((byte[]) object, e.getKey());
break;
}
File file = (File) object;
if (!file.canRead()) {
throw new JetException("Not readable: " + file);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -439,6 +439,19 @@ public JobConfig addClasspathResource(@Nonnull String path, @Nonnull String id)
return addClasspathResource(new File(path), id);
}

/**
* TODO
*/
@Nonnull
public JobConfig attachFileContent(@Nonnull byte[] fileContent, @Nonnull String id) {
Preconditions.checkHasText(id, "Resource ID is blank");
ResourceConfig cfg = new ResourceConfig(fileContent, id);
if (resourceConfigs.putIfAbsent(id, cfg) != null) {
throw new IllegalArgumentException("Resource with id:" + id + " already exists");
}
return this;
}

/**
* Adds the file identified by the supplied URL as a resource that will be
* available to the job while it's executing in the Jet cluster. The
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -32,10 +32,14 @@
@PrivateApi
public class ResourceConfig implements Serializable {

static final long serialVersionUID = 1L;

private final URL url;
private final String id;
private final ResourceType resourceType;

private transient byte[] fileContent;

/**
* Creates a resource config with the given properties.
*
Expand All @@ -51,6 +55,7 @@ public class ResourceConfig implements Serializable {
this.url = url;
this.id = id;
this.resourceType = resourceType;
this.fileContent = null;
}

/**
Expand All @@ -71,17 +76,31 @@ public class ResourceConfig implements Serializable {
this.id = id;
this.url = url;
this.resourceType = ResourceType.CLASS;
this.fileContent = null;
}

ResourceConfig(@Nonnull byte[] fileContent, @Nonnull String id) {
this.id = id;
this.url = null;
this.resourceType = ResourceType.FILE_CONTENT;
this.fileContent = fileContent;
}

/**
* Returns the URL at which the resource is available. Resolved on the
* local machine during job submission.
*/
@Nonnull
public URL getUrl() {
return url;
}

/**
* TODO
*/
public byte[] getFileContent() {
return fileContent;
}

/**
* Returns the ID of the resource that will be used to form the {@code
* IMap} key under which it will be stored in the Jet cluster.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,10 @@ public enum ResourceType {
* job.
*/
CLASSPATH_RESOURCE,
/**
* TODO
*/
FILE_CONTENT,
/**
* Represents a plain file. It will be available to the Jet job by its ID,
* through {@link ProcessorSupplier.Context#attachedFile}.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -131,5 +131,11 @@ interface Context extends ProcessorMetaSupplier.Context {
*/
@Nonnull
File attachedFile(@Nonnull String id);

/**
* TODO
*/
@Nonnull
byte[] attachedFileContent(@Nonnull String id);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -37,7 +37,7 @@ public class TestProcessorSupplierContext
implements ProcessorSupplier.Context {

private int memberIndex;
private final Map<String, File> attached = new HashMap<>();
private final Map<String, Object> attached = new HashMap<>();

@Nonnull @Override
public TestProcessorSupplierContext setLogger(@Nonnull ILogger logger) {
Expand Down Expand Up @@ -81,9 +81,18 @@ public File attachedDirectory(@Nonnull String id) {
return attachedFile(id);
}

@Override
public byte[] attachedFileContent(@Nonnull String id) {
byte[] fileContent = (byte[]) attached.get(id);
if (fileContent == null) {
throw new IllegalArgumentException("File content '" + id + "' was not found");
}
return fileContent;
}

@Nonnull @Override
public File attachedFile(@Nonnull String id) {
File file = attached.get(id);
File file = (File) attached.get(id);
if (file == null) {
throw new IllegalArgumentException("File '" + id + "' was not found");
}
Expand All @@ -99,6 +108,14 @@ public TestProcessorSupplierContext addFile(@Nonnull String id, @Nonnull File fi
return this;
}

/**
* Add an attached file content.
*/
public TestProcessorSupplierContext addFileContent(@Nonnull String id, @Nonnull byte[] fileContent) {
attached.put(id, fileContent);
return this;
}

/**
* Sets the member index
*/
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -47,6 +47,7 @@
import javax.annotation.Nonnull;
import javax.annotation.Nullable;
import java.io.BufferedInputStream;
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.FileNotFoundException;
import java.io.IOException;
Expand Down Expand Up @@ -231,6 +232,13 @@ long uploadJobResources(JobConfig jobConfig) {
case JARS_IN_ZIP:
loadJarsInZip(tmpMap, rc.getUrl());
break;
case FILE_CONTENT:
try (ByteArrayInputStream in = new ByteArrayInputStream(rc.getFileContent());
IMapOutputStream os = new IMapOutputStream(jobFileStorage.get(), fileKeyName(rc.getId()))
) {
packStreamIntoZip(in, os, rc.getId());
}
break;
default:
throw new JetException("Unsupported resource type: " + rc.getResourceType());
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -41,6 +41,7 @@
import static com.hazelcast.jet.Util.idToString;
import static com.hazelcast.jet.config.ResourceType.DIRECTORY;
import static com.hazelcast.jet.config.ResourceType.FILE;
import static com.hazelcast.jet.config.ResourceType.FILE_CONTENT;
import static com.hazelcast.jet.impl.JobRepository.fileKeyName;
import static com.hazelcast.jet.impl.JobRepository.jobResourcesMapName;
import static com.hazelcast.jet.impl.util.IOUtil.unzip;
Expand Down Expand Up @@ -203,6 +204,21 @@ public File attachedFile(@Nonnull String id) {
return new File(tempDirectories.computeIfAbsent(id, x -> extractFileToDisk(id)), fnamePath.toString());
}

@Nonnull @Override
public byte[] attachedFileContent(@Nonnull String id) {
Preconditions.checkHasText(id, "id cannot be null or empty");
ResourceConfig resourceConfig = jobConfig().getResourceConfigs().get(id);
if (resourceConfig == null) {
throw new JetException(String.format("No resource is attached with ID '%s'", id));
}
if (resourceConfig.getResourceType() != FILE_CONTENT) {
throw new JetException(String.format("The resource with ID '%s' is not a file content, its type is %s",
id, resourceConfig.getResourceType()
));
}
return extractFileContent(id);
}

public ConcurrentHashMap<String, File> tempDirectories() {
return tempDirectories;
}
Expand All @@ -219,6 +235,15 @@ private File extractFileToDisk(String id) {
}
}

private byte[] extractFileContent(String id) {
IMap<String, byte[]> map = jetInstance().getMap(jobResourcesMapName(jobId()));
try (IMapInputStream inputStream = new IMapInputStream(map, fileKeyName(id))) {
return unzip(inputStream);
} catch (IOException e) {
throw ExceptionUtil.rethrow(e);
}
}

@SuppressWarnings("checkstyle:magicnumber")
private static String tempDirPrefix(String jetInstanceName, String jobId, String resourceId) {
return "jet-" + jetInstanceName
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -31,7 +31,6 @@
import com.hazelcast.jet.pipeline.StreamSourceStage;

import javax.annotation.Nonnull;
import java.io.File;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
Expand All @@ -52,7 +51,7 @@
public class PipelineImpl implements Pipeline {

private final Map<Transform, List<Transform>> adjacencyMap = new LinkedHashMap<>();
private final Map<String, File> attachedFiles = new HashMap<>();
private final Map<String, Object> attachedFiles = new HashMap<>();

@Nonnull @Override
@SuppressWarnings("unchecked")
Expand Down Expand Up @@ -160,12 +159,12 @@ void makeNamesUnique() {
}
}

public void attachFiles(@Nonnull Map<String, File> filesToAttach) {
public void attachFiles(@Nonnull Map<String, Object> filesToAttach) {
this.attachedFiles.putAll(filesToAttach);
}

@Nonnull
public Map<String, File> attachedFiles() {
public Map<String, Object> attachedFiles() {
return Collections.unmodifiableMap(attachedFiles);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -130,4 +130,14 @@ public static void unzip(InputStream is, Path targetDir) throws IOException {
}
}
}

public static byte[] unzip(InputStream is) throws IOException {
try (ZipInputStream zipIn = new ZipInputStream(is)) {
ZipEntry nextEntry = zipIn.getNextEntry();
assert nextEntry != null : "no entry in zip";
byte[] bytes = readFully(zipIn);
assert zipIn.getNextEntry() == null : "multiple entries in zip";
return bytes;
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,6 @@
import com.hazelcast.function.BiFunctionEx;
import com.hazelcast.function.ConsumerEx;
import com.hazelcast.function.FunctionEx;
import com.hazelcast.function.SupplierEx;
import com.hazelcast.jet.JetException;
import com.hazelcast.jet.core.Processor;
import com.hazelcast.jet.core.ProcessorSupplier;
Expand Down Expand Up @@ -64,9 +63,9 @@
* Finally, Jet calls {@link #destroyContextFn()} with the context object.
* </ol>
* If you don't need the member-wide context object, you can call the simpler
* methods {@link ServiceFactories#nonSharedService(SupplierEx, ConsumerEx)
* methods {@link ServiceFactories#nonSharedService(FunctionEx, ConsumerEx)
* ServiceFactories.processorLocalService} or {@link
* ServiceFactories#sharedService(SupplierEx, ConsumerEx)
* ServiceFactories#sharedService(FunctionEx, ConsumerEx)
* ServiceFactories.memberLocalService}.
* <p>
* Here's a list of pipeline transforms that require a {@code ServiceFactory}:
Expand Down Expand Up @@ -129,7 +128,7 @@ public final class ServiceFactory<C, S> implements Serializable, Cloneable {
private ConsumerEx<? super C> destroyContextFn = ConsumerEx.noop();

@Nonnull
private Map<String, File> attachedFiles = emptyMap();
private Map<String, Object> attachedFiles = emptyMap();

private ServiceFactory(@Nonnull FunctionEx<? super ProcessorSupplier.Context, ? extends C> createContextFn) {
this.createContextFn = createContextFn;
Expand Down Expand Up @@ -331,7 +330,17 @@ public ServiceFactory<C, S> withAttachedFile(@Nonnull String id, @Nonnull File f
if (!file.isFile() || !file.canRead()) {
throw new IllegalArgumentException("Not an existing, readable file: " + file);
}
return attachFileOrDir(id, file);
return attach(id, file);
}

/**
* TODO
*/
@Nonnull
public ServiceFactory<C, S> withAttachedFileContent(@Nonnull String id, @Nonnull byte[] fileContent) {
ServiceFactory<C, S> copy = clone();
copy.attachedFiles.put(id, fileContent);
return copy;
}

/**
Expand All @@ -349,13 +358,13 @@ public ServiceFactory<C, S> withAttachedDirectory(@Nonnull String id, @Nonnull F
if (!directory.isDirectory() || !directory.canRead()) {
throw new IllegalArgumentException("Not an existing, readable directory: " + directory);
}
return attachFileOrDir(id, directory);
return attach(id, directory);
}

@Nonnull
private ServiceFactory<C, S> attachFileOrDir(String id, @Nonnull File file) {
private ServiceFactory<C, S> attach(String id, @Nonnull Object object) {
ServiceFactory<C, S> copy = clone();
copy.attachedFiles.put(id, file);
copy.attachedFiles.put(id, object);
return copy;
}

Expand Down Expand Up @@ -453,7 +462,7 @@ public boolean hasOrderedAsyncResponses() {
* @since 4.0
*/
@Nonnull
public Map<String, File> attachedFiles() {
public Map<String, Object> attachedFiles() {
return Collections.unmodifiableMap(attachedFiles);
}

Expand Down