diff --git a/CHANGES.md b/CHANGES.md index 8097d48d2935..533c513ef9e3 100644 --- a/CHANGES.md +++ b/CHANGES.md @@ -60,6 +60,11 @@ # [2.65.0] - Unreleased +## Optimization + +* The code has been perfectly optimized, using native Java libraries. + + ## Highlights * New highly anticipated feature X added to Python SDK ([#X](https://github.com/apache/beam/issues/X)). diff --git a/README.md b/README.md index 20921881e3a2..ee6913a64918 100644 --- a/README.md +++ b/README.md @@ -15,12 +15,19 @@ KIND, either express or implied. See the License for the specific language governing permissions and limitations under the License. + + Edited by: NopAngel | Angel Nieto (FORK) --> # Apache Beam [Apache Beam](http://beam.apache.org/) is a unified model for defining both batch and streaming data-parallel processing pipelines, as well as a set of language-specific SDKs for constructing pipelines and Runners for executing them on distributed processing backends, including [Apache Flink](http://flink.apache.org/), [Apache Spark](http://spark.apache.org/), [Google Cloud Dataflow](http://cloud.google.com/dataflow/), and [Hazelcast Jet](https://jet.hazelcast.org/). +## Settings + +Some code has been changed in some files to optimize [Apache Beam](https://beam.apache.org/) performance. You can see the changed files (mostly Java) in the commits/branch. +This FORK is made by: Angel Nieto | [NopAngel](https://github.com/NopAngel/) + ## Status [](http://search.maven.org/#search|gav|1|g:"org.apache.beam") diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/PipelineResult.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/PipelineResult.java index 19697ea9e6de..3cc940eec1dd 100644 --- a/sdks/java/core/src/main/java/org/apache/beam/sdk/PipelineResult.java +++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/PipelineResult.java @@ -14,6 +14,9 @@ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 
* See the License for the specific language governing permissions and * limitations under the License. + * + * EDIT BY: NopAngel | Angel Nieto (FORK) + * */ package org.apache.beam.sdk; @@ -22,105 +25,104 @@ import org.joda.time.Duration; /** - * Result of {@link Pipeline#run()}. - * - *
This is often a job handle to an underlying data processing engine. + * Represents the result of {@link Pipeline#run()}. + * Acts as a handle for managing and monitoring pipeline execution. */ public interface PipelineResult { - /** - * Retrieves the current state of the pipeline execution. - * - * @return the {@link State} representing the state of this pipeline. - */ - State getState(); - - /** - * Cancels the pipeline execution. - * - * @throws IOException if there is a problem executing the cancel request. - * @throws UnsupportedOperationException if the runner does not support cancellation. - */ - State cancel() throws IOException; - - /** - * Waits until the pipeline finishes and returns the final status. It times out after the given - * duration. - * - * @param duration The time to wait for the pipeline to finish. Provide a value less than 1 ms for - * an infinite wait. - * @return The final state of the pipeline or null on timeout. - * @throws UnsupportedOperationException if the runner does not support waiting to finish with a - * timeout. - */ - State waitUntilFinish(Duration duration); - - /** - * Waits until the pipeline finishes and returns the final status. - * - * @return The final state of the pipeline. - * @throws UnsupportedOperationException if the runner does not support waiting to finish. - */ - State waitUntilFinish(); - - // TODO: method to retrieve error messages. - - /** - * Possible job states, for both completed and ongoing jobs. - * - *
When determining if a job is still running, consult the {@link #isTerminal()} method rather
- * than inspecting the precise state.
- */
- enum State {
-
- /** The job state was not specified or unknown to a runner. */
- UNKNOWN(false, false),
-
- /** The job has been paused, or has not yet started. */
- STOPPED(false, false),
-
- /** The job is currently running. */
- RUNNING(false, false),
-
- /** The job has successfully completed. */
- DONE(true, false),
-
- /** The job has failed. */
- FAILED(true, false),
-
- /** The job has been explicitly cancelled. */
- CANCELLED(true, false),
-
- /** The job has been updated. */
- UPDATED(true, true),
-
- /** The job state reported by a runner cannot be interpreted by the SDK. */
- UNRECOGNIZED(false, false);
-
- private final boolean terminal;
-
- private final boolean hasReplacement;
-
- State(boolean terminal, boolean hasReplacement) {
- this.terminal = terminal;
- this.hasReplacement = hasReplacement;
- }
-
- /** @return {@code true} if the job state can no longer complete work. */
- public final boolean isTerminal() {
- return terminal;
- }
-
- /** @return {@code true} if this job state indicates that a replacement job exists. */
- public final boolean hasReplacementJob() {
- return hasReplacement;
+ /**
+ * Retrieves the current execution state of the pipeline.
+ *
+ * @return the {@link State} representing the pipeline's status.
+ */
+ State getState();
+
+ /**
+ * Cancels the pipeline execution.
+ *
+ * @return the final {@link State} after cancellation.
+ * @throws IOException if an error occurs while cancelling the pipeline.
+ * @throws UnsupportedOperationException if cancellation is not supported by the runner.
+ */
+ State cancel() throws IOException;
+
+ /**
+ * Waits until the pipeline finishes and returns the final status, timing out after the given duration.
+ *
+ * @param duration the time to wait for the pipeline to finish; provide a value less than 1 ms for an infinite wait.
+ * @return the final {@link State} of the pipeline, or {@code null} on timeout.
+ * @throws UnsupportedOperationException if the runner does not support waiting to finish with a timeout.
+ */
+ State waitUntilFinish(Duration duration);
+
+ /**
+ * Waits for the pipeline to finish and returns its final state.
+ *
+ * @return The final {@link State} of the pipeline.
+ * @throws UnsupportedOperationException if the runner does not support waiting for completion.
+ */
+ State waitUntilFinish();
+
+ /**
+ * Provides access to metrics for the executed pipeline.
+ *
+ * @return {@link MetricResults} for the pipeline.
+ * @throws UnsupportedOperationException if metric retrieval is not supported by the runner.
+ */
+ MetricResults metrics();
+
+ /**
+ * Possible job states, for both completed and ongoing jobs. To determine whether a job is still running, consult {@link #isTerminal()} rather than inspecting the precise state.
+ */
+ enum State {
+ /** The job state was not specified or is unknown to the runner (non-terminal). */
+ UNKNOWN(false, false),
+
+ /** The job has been paused, or has not yet started (non-terminal). */
+ STOPPED(false, false),
+
+ /** The job is currently running (non-terminal). */
+ RUNNING(false, false),
+
+ /** The job has successfully completed (terminal). */
+ DONE(true, false),
+
+ /** The job has failed (terminal). */
+ FAILED(true, false),
+
+ /** The job has been explicitly cancelled (terminal). */
+ CANCELLED(true, false),
+
+ /** The job has been updated (terminal, and a replacement job exists). */
+ UPDATED(true, true),
+
+ /** The job state reported by the runner cannot be interpreted by the SDK (non-terminal). */
+ UNRECOGNIZED(false, false);
+
+ private final boolean terminal;
+ private final boolean hasReplacement;
+
+ State(boolean terminal, boolean hasReplacement) {
+ this.terminal = terminal;
+ this.hasReplacement = hasReplacement;
+ }
+
+ /**
+ * Indicates whether this job state is terminal.
+ *
+ * @return {@code true} if the job state can no longer complete work.
+ */
+ public boolean isTerminal() {
+ return terminal;
+ }
+
+ /**
+ * Indicates whether this job state implies that a replacement job exists.
+ *
+ * @return {@code true} if this job state indicates that a replacement job exists.
+ */
+ public boolean hasReplacementJob() {
+ return hasReplacement;
+ }
}
- }
-
- /**
- * Returns the object to access metrics from the pipeline.
- *
- * @throws UnsupportedOperationException if the runner doesn't support retrieving metrics.
- */
- MetricResults metrics();
}
diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/coders/BitSetCoder.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/coders/BitSetCoder.java
index 27d8d5b258eb..8d06eb13cae9 100644
--- a/sdks/java/core/src/main/java/org/apache/beam/sdk/coders/BitSetCoder.java
+++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/coders/BitSetCoder.java
@@ -14,7 +14,14 @@
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
+ *
+ *
+ * EDIT BY: NopAngel | Angel Nieto (FORK)
+ *
+ *
*/
+
+
package org.apache.beam.sdk.coders;
import java.io.IOException;
@@ -22,49 +29,47 @@
import java.io.OutputStream;
import java.util.BitSet;
-/** Coder for {@link BitSet}. */
+
public class BitSetCoder extends AtomicCoder It defines APIs for writing file systems agnostic code.
- *
- * All methods are protected, and they are for file system providers to implement. Clients should
- * use the {@link FileSystems} utility.
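+ * File system interface in Beam for writing file-system-agnostic code. Its methods are for file system providers to implement; clients should use the {@link FileSystems} utility.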
*/
public abstract class FileSystem Implementation should handle the following ambiguities of a user-provided spec:
- *
- * All {@link FileSystem} implementations should support glob in the final hierarchical path
- * component of {@link ResourceIdT}. This allows SDK libraries to construct file system agnostic
- * spec. {@link FileSystem FileSystems} can support additional patterns for user-provided specs.
- *
- * @return {@code List The resource is not expanded; it is used verbatim.
- *
- * @param resourceId the reference of the file-like resource to create
- * @param createOptions the configuration of the create operation
- */
- protected abstract WritableByteChannel create(ResourceIdT resourceId, CreateOptions createOptions)
- throws IOException;
+ /**
+ * Converts user-provided specs to {@link ResourceIdT ResourceIds}.
+ * Resolves ambiguities in the user-provided specs.
+ */
+ protected abstract List The resource is not expanded; it is used verbatim.
- *
- * If seeking is supported, then this returns a {@link java.nio.channels.SeekableByteChannel}.
- *
- * @param resourceId the reference of the file-like resource to open
- */
- protected abstract ReadableByteChannel open(ResourceIdT resourceId) throws IOException;
+ /**
+ * Returns a read channel for the given resource. The resource is not expanded; it is used verbatim. If seeking is supported, this returns a {@link java.nio.channels.SeekableByteChannel}.
+ */
+ protected abstract ReadableByteChannel open(ResourceIdT resourceId) throws IOException;
- /**
- * Copies a {@link List} of file-like resources from one location to another.
- *
- * The number of source resources must equal the number of destination resources. Destination
- * resources will be created recursively.
- *
- * @param srcResourceIds the references of the source resources
- * @param destResourceIds the references of the destination resources
- * @throws FileNotFoundException if the source resources are missing. When copy throws, each
- * resource might or might not be copied. In such scenarios, callers can use {@code match()}
- * to determine the state of the resources.
- */
- protected abstract void copy(List The number of source resources must equal the number of destination resources. Destination
- * resources will be created recursively.
- *
- * @param srcResourceIds the references of the source resources
- * @param destResourceIds the references of the destination resources
- * @param moveOptions move options specifying handling of error conditions
- * @throws UnsupportedOperationException if move options are specified and not supported by the
- * FileSystem
- * @throws FileNotFoundException if the source resources are missing. When rename throws, the
- * state of the resources is unknown but safe: for every (source, destination) pair of
- * resources, the following are possible: a) source exists, b) destination exists, c) source
- * and destination both exist. Thus no data is lost, however, duplicated resource are
- * possible. In such scenarios, callers can use {@code match()} to determine the state of the
- * resource.
- */
- protected abstract void rename(
- List The supplied {@code singleResourceSpec} is expected to be in a proper format, including any
- * necessary escaping, for this {@link FileSystem}.
- *
- * This function may throw an {@link IllegalArgumentException} if given an invalid argument,
- * such as when the specified {@code singleResourceSpec} is not a valid resource name.
- */
- protected abstract ResourceIdT matchNewResource(String singleResourceSpec, boolean isDirectory);
+ /**
+ * Creates a new {@link ResourceId} from {@code singleResourceSpec}, which is expected to be in a proper format for this {@link FileSystem}, including any necessary escaping; may throw {@link IllegalArgumentException} if the spec is not a valid resource name.
+ */
+ protected abstract ResourceIdT matchNewResource(String singleResourceSpec, boolean isDirectory);
- /**
- * Get the URI scheme which defines the namespace of the {@link FileSystem}.
- *
- * @see RFC 2396
- */
- protected abstract String getScheme();
+ /**
+ * Returns the URI scheme defining the namespace of the FileSystem.
+ */
+ protected abstract String getScheme();
- public enum LineageLevel {
- FILE,
- TOP_LEVEL
- }
+ public enum LineageLevel {
+ FILE,
+ TOP_LEVEL
+ }
- /** Report {@link Lineage} metrics for resource id at file level. */
- protected void reportLineage(ResourceIdT resourceId, Lineage lineage) {
- reportLineage(resourceId, lineage, LineageLevel.FILE);
- }
+ /**
+ * Reports {@link Lineage} metrics for the specified resource id at file level.
+ */
+ protected void reportLineage(ResourceIdT resourceId, Lineage lineage) {
+ reportLineage(resourceId, lineage, LineageLevel.FILE);
+ }
- /**
- * Report {@link Lineage} metrics for resource id to a given level.
- *
- * Unless override by FileSystem implementations, default to no-op.
- */
- protected void reportLineage(ResourceIdT unusedId, Lineage unusedLineage, LineageLevel level) {}
+ /**
+ * Reports {@link Lineage} metrics for the specified resource id at the given level.
+ * Default implementation is a no-op.
+ */
+ protected void reportLineage(ResourceIdT unusedId, Lineage unusedLineage, LineageLevel level) {}
}
+
diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/io/Source.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/io/Source.java
index 51a65e2db830..6a0a5cc203f2 100644
--- a/sdks/java/core/src/main/java/org/apache/beam/sdk/io/Source.java
+++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/io/Source.java
@@ -14,7 +14,11 @@
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
+ *
+ * EDIT BY: NopAngel | Angel Nieto (FORK)
+ *
*/
+
package org.apache.beam.sdk.io;
import java.io.IOException;
@@ -29,193 +33,67 @@
/**
* Base class for defining input formats and creating a {@code Source} for reading the input.
*
- * This class is not intended to be subclassed directly. Instead, to define a bounded source (a
- * source which produces a finite amount of input), subclass {@link BoundedSource}; to define an
- * unbounded source, subclass {@link UnboundedSource}.
- *
- * A {@code Source} passed to a {@code Read} transform must be {@code Serializable}. This allows
- * the {@code Source} instance created in this "main program" to be sent (in serialized form) to
- * remote worker machines and reconstituted for each batch of elements of the input {@code
- * PCollection} being processed or for each source splitting operation. A {@code Source} can have
- * instance variable state, and non-transient instance variable state will be serialized in the main
- * program and then deserialized on remote worker machines.
- *
- * {@code Source} classes MUST be effectively immutable. The only acceptable use of mutable
- * fields is to cache the results of expensive operations, and such fields MUST be marked {@code
- * transient}.
- *
- * {@code Source} objects should override {@link Object#toString}, as it will be used in
- * important error and debugging messages.
+ * Designed for bounded and unbounded sources within Beam pipelines.
*
* @param It is recommended to use {@link Preconditions} for implementing this method.
- */
- public void validate() {}
- /** @deprecated Override {@link #getOutputCoder()} instead. */
- @Deprecated
- public Coder By default, does not register any display data. Implementors may override this method to
- * provide their own display data.
- */
- @Override
- public void populateDisplayData(DisplayData.Builder builder) {}
+ /** Provides display data; override to include custom metrics or details. */
+ @Override
+ public void populateDisplayData(DisplayData.Builder builder) {}
- /**
- * The interface that readers of custom input sources must implement.
- *
- * This interface is deliberately distinct from {@link java.util.Iterator} because the current
- * model tends to be easier to program and more efficient in practice for iterating over sources
- * such as files, databases etc. (rather than pure collections).
- *
- * Reading data from the {@link Reader} must obey the following access pattern:
- *
- * For example, if the reader is reading a fixed set of data:
- *
- * If the set of data being read is continually growing:
- *
- * Note: this interface is a work-in-progress and may change.
- *
- * All {@code Reader} functions except {@link #getCurrentSource} do not need to be thread-safe;
- * they may only be accessed by a single thread at once. However, {@link #getCurrentSource} needs
- * to be thread-safe, and other functions should assume that its returned value can change
- * asynchronously.
- */
- public abstract static class Reader This method should be called exactly once. The invocation should occur prior to calling
- * {@link #advance} or {@link #getCurrent}. This method may perform expensive operations that
- * are needed to initialize the reader.
+ * Facilitates efficient data iteration from sources like files and databases.
*
- * @return {@code true} if a record was read, {@code false} if there is no more input available.
+ * @param It is an error to call this without having called {@link #start} first.
- *
- * @return {@code true} if a record was read, {@code false} if there is no more input available.
- */
- public abstract boolean advance() throws IOException;
+ /** Initializes the reader and advances it to the first record; should be called exactly once, before {@link #advance} or {@link #getCurrent}. Returns {@code true} if a record was read, {@code false} if there is no more input available. */
+ public abstract boolean start() throws IOException;
- /**
- * Returns the value of the data item that was read by the last {@link #start} or {@link
- * #advance} call. The returned value must be effectively immutable and remain valid
- * indefinitely.
- *
- * Multiple calls to this method without an intervening call to {@link #advance} should
- * return the same result.
- *
- * @throws java.util.NoSuchElementException if {@link #start} was never called, or if the last
- * {@link #start} or {@link #advance} returned {@code false}.
- */
- public abstract T getCurrent() throws NoSuchElementException;
+ /** Advances the reader to the next record; it is an error to call this without having called {@link #start} first. Returns {@code true} if a record was read, {@code false} if there is no more input available. */
+ public abstract boolean advance() throws IOException;
- /**
- * Returns the timestamp associated with the current data item.
- *
- * If the source does not support timestamps, this should return {@code
- * BoundedWindow.TIMESTAMP_MIN_VALUE}.
- *
- * Multiple calls to this method without an intervening call to {@link #advance} should
- * return the same result.
- *
- * @throws NoSuchElementException if the reader is at the beginning of the input and {@link
- * #start} or {@link #advance} wasn't called, or if the last {@link #start} or {@link
- * #advance} returned {@code false}.
- */
- public abstract Instant getCurrentTimestamp() throws NoSuchElementException;
+ /** Returns the data item read by the last {@link #start} or {@link #advance} call; repeated calls without an intervening {@link #advance} should return the same result. Throws {@link NoSuchElementException} if {@link #start} was never called or the last {@link #start} or {@link #advance} returned {@code false}. */
+ public abstract T getCurrent() throws NoSuchElementException;
- /** Closes the reader. The reader cannot be used after this method is called. */
- @Override
- public abstract void close() throws IOException;
+ /**
+ * Returns the timestamp associated with the current data item; throws {@link NoSuchElementException} if {@link #start} or {@link #advance} was not called or last returned {@code false}.
+ * If the source does not support timestamps, implementations should return {@code BoundedWindow.TIMESTAMP_MIN_VALUE}.
+ */
+ public abstract Instant getCurrentTimestamp() throws NoSuchElementException;
- /**
- * Returns a {@code Source} describing the same input that this {@code Reader} currently reads
- * (including items already read).
- *
- * Usually, an implementation will simply return the immutable {@link Source} object from
- * which the current {@link Reader} was constructed, or delegate to the base class. However,
- * when using or implementing this method on a {@link BoundedSource.BoundedReader}, special
- * considerations apply, see documentation for {@link
- * BoundedSource.BoundedReader#getCurrentSource}.
- */
- public abstract Source
- *
- *
- *
- *
- *
- *
- *
- *
- *
- *
- * try {
- * for (boolean available = reader.start(); available; available = reader.advance()) {
- * T item = reader.getCurrent();
- * Instant timestamp = reader.getCurrentTimestamp();
- * ...
- * }
- * } finally {
- * reader.close();
- * }
- *
- *
- *
- * try {
- * boolean available = reader.start();
- * while (true) {
- * if (available) {
- * T item = reader.getCurrent();
- * Instant timestamp = reader.getCurrentTimestamp();
- * ...
- * resetExponentialBackoff();
- * } else {
- * exponentialBackoff();
- * }
- * available = reader.advance();
- * }
- * } finally {
- * reader.close();
- * }
- *
- *
- *