FORKING - Apache/Beam Optimization and Clean code. #34621

Closed
wants to merge 7 commits into from
5 changes: 5 additions & 0 deletions CHANGES.md
@@ -60,6 +60,11 @@

# [2.65.0] - Unreleased

## Optimization

* Cleaned up `PipelineResult` and `BitSetCoder` in the Java SDK (Javadoc rewording, reindentation, method reordering).


## Highlights

* New highly anticipated feature X added to Python SDK ([#X](https://github.com/apache/beam/issues/X)).
7 changes: 7 additions & 0 deletions README.md
@@ -15,12 +15,19 @@
KIND, either express or implied. See the License for the
specific language governing permissions and limitations
under the License.

Edited by: NopAngel | Angel Nieto (FORK)
-->

# Apache Beam

[Apache Beam](http://beam.apache.org/) is a unified model for defining both batch and streaming data-parallel processing pipelines, as well as a set of language-specific SDKs for constructing pipelines and Runners for executing them on distributed processing backends, including [Apache Flink](http://flink.apache.org/), [Apache Spark](http://spark.apache.org/), [Google Cloud Dataflow](http://cloud.google.com/dataflow/), and [Hazelcast Jet](https://jet.hazelcast.org/).

## Settings

Some files (mostly Java) have been changed with the aim of improving [Apache Beam](https://beam.apache.org/) performance. The changed files are listed in the commits on this branch.
This fork is made by Angel Nieto ([NopAngel](https://github.com/NopAngel/)).

## Status

[![Maven Version](https://maven-badges.herokuapp.com/maven-central/org.apache.beam/beam-sdks-java-core/badge.svg)](http://search.maven.org/#search|gav|1|g:"org.apache.beam")
196 changes: 99 additions & 97 deletions sdks/java/core/src/main/java/org/apache/beam/sdk/PipelineResult.java
@@ -14,6 +14,9 @@
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*
* EDIT BY: NopAngel | Angel Nieto (FORK)
*
*/
package org.apache.beam.sdk;

@@ -22,105 +25,104 @@
import org.joda.time.Duration;

/**
* Result of {@link Pipeline#run()}.
*
* <p>This is often a job handle to an underlying data processing engine.
* Represents the result of {@link Pipeline#run()}.
* Acts as a handle for managing and monitoring pipeline execution.
*/
public interface PipelineResult {

/**
* Retrieves the current state of the pipeline execution.
*
* @return the {@link State} representing the state of this pipeline.
*/
State getState();

/**
* Cancels the pipeline execution.
*
* @throws IOException if there is a problem executing the cancel request.
* @throws UnsupportedOperationException if the runner does not support cancellation.
*/
State cancel() throws IOException;

/**
* Waits until the pipeline finishes and returns the final status. It times out after the given
* duration.
*
* @param duration The time to wait for the pipeline to finish. Provide a value less than 1 ms for
* an infinite wait.
* @return The final state of the pipeline or null on timeout.
* @throws UnsupportedOperationException if the runner does not support waiting to finish with a
* timeout.
*/
State waitUntilFinish(Duration duration);

/**
* Waits until the pipeline finishes and returns the final status.
*
* @return The final state of the pipeline.
* @throws UnsupportedOperationException if the runner does not support waiting to finish.
*/
State waitUntilFinish();

// TODO: method to retrieve error messages.

/**
* Possible job states, for both completed and ongoing jobs.
*
* <p>When determining if a job is still running, consult the {@link #isTerminal()} method rather
* than inspecting the precise state.
*/
enum State {

/** The job state was not specified or unknown to a runner. */
UNKNOWN(false, false),

/** The job has been paused, or has not yet started. */
STOPPED(false, false),

/** The job is currently running. */
RUNNING(false, false),

/** The job has successfully completed. */
DONE(true, false),

/** The job has failed. */
FAILED(true, false),

/** The job has been explicitly cancelled. */
CANCELLED(true, false),

/** The job has been updated. */
UPDATED(true, true),

/** The job state reported by a runner cannot be interpreted by the SDK. */
UNRECOGNIZED(false, false);

private final boolean terminal;

private final boolean hasReplacement;

State(boolean terminal, boolean hasReplacement) {
this.terminal = terminal;
this.hasReplacement = hasReplacement;
}

/** @return {@code true} if the job state can no longer complete work. */
public final boolean isTerminal() {
return terminal;
}

/** @return {@code true} if this job state indicates that a replacement job exists. */
public final boolean hasReplacementJob() {
return hasReplacement;
/**
* Retrieves the current execution state of the pipeline.
*
* @return the {@link State} representing the pipeline's status.
*/
State getState();

/**
* Cancels the pipeline execution.
*
* @return the final {@link State} after cancellation.
* @throws IOException if an error occurs while cancelling the pipeline.
* @throws UnsupportedOperationException if cancellation is not supported by the runner.
*/
State cancel() throws IOException;

/**
* Waits for the pipeline to finish within a specified timeout duration.
*
* @param duration Duration to wait. Use less than 1 ms for infinite wait.
* @return The final {@link State} of the pipeline or {@code null} on timeout.
* @throws UnsupportedOperationException if the runner does not support waiting with a timeout.
*/
State waitUntilFinish(Duration duration);

/**
* Waits for the pipeline to finish and returns its final state.
*
* @return The final {@link State} of the pipeline.
* @throws UnsupportedOperationException if the runner does not support waiting for completion.
*/
State waitUntilFinish();

/**
* Provides access to metrics for the executed pipeline.
*
* @return {@link MetricResults} for the pipeline.
* @throws UnsupportedOperationException if metric retrieval is not supported by the runner.
*/
MetricResults metrics();

/**
* Enum representing the possible states of a pipeline job.
*/
enum State {
/** State is unknown or unspecified. */
UNKNOWN(false, false),

/** Pipeline is paused or not yet started. */
STOPPED(false, false),

/** Pipeline is currently running. */
RUNNING(false, false),

/** Pipeline completed successfully. */
DONE(true, false),

/** Pipeline execution failed. */
FAILED(true, false),

/** Pipeline was explicitly cancelled. */
CANCELLED(true, false),

/** Pipeline has been updated. */
UPDATED(true, true),

/** State reported by the runner is unrecognized. */
UNRECOGNIZED(false, false);

private final boolean terminal;
private final boolean hasReplacement;

State(boolean terminal, boolean hasReplacement) {
this.terminal = terminal;
this.hasReplacement = hasReplacement;
}

/**
* Indicates if the job state is terminal.
*
* @return {@code true} if the pipeline can no longer complete work.
*/
public boolean isTerminal() {
return terminal;
}

/**
* Indicates if a replacement job exists for the current state.
*
* @return {@code true} if a replacement job exists.
*/
public boolean hasReplacementJob() {
return hasReplacement;
}
}
}

/**
* Returns the object to access metrics from the pipeline.
*
* @throws UnsupportedOperationException if the runner doesn't support retrieving metrics.
*/
MetricResults metrics();
}
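
The Javadoc in this diff advises checking `isTerminal()` rather than comparing against individual states. A minimal sketch of that pattern follows. It assumes a standalone copy of the `State` enum so that it runs without Beam on the classpath; `TerminalStateSketch` and `firstTerminal` are hypothetical names used only for illustration, and the list of states stands in for what a runner might report over time.

```java
import java.util.Arrays;
import java.util.List;

public class TerminalStateSketch {

  // Standalone mirror of the State enum shown in the diff (assumption: copied
  // here only so the example compiles without the Beam SDK).
  enum State {
    UNKNOWN(false, false),
    STOPPED(false, false),
    RUNNING(false, false),
    DONE(true, false),
    FAILED(true, false),
    CANCELLED(true, false),
    UPDATED(true, true),
    UNRECOGNIZED(false, false);

    private final boolean terminal;
    private final boolean hasReplacement;

    State(boolean terminal, boolean hasReplacement) {
      this.terminal = terminal;
      this.hasReplacement = hasReplacement;
    }

    boolean isTerminal() {
      return terminal;
    }

    boolean hasReplacementJob() {
      return hasReplacement;
    }
  }

  // Hypothetical helper: returns the first terminal state in a sequence of
  // reported states, or null if none is terminal (analogous to a timeout).
  static State firstTerminal(List<State> reported) {
    for (State state : reported) {
      if (state.isTerminal()) {
        return state;
      }
    }
    return null;
  }

  public static void main(String[] args) {
    List<State> reported =
        Arrays.asList(State.STOPPED, State.RUNNING, State.RUNNING, State.DONE);
    System.out.println(firstTerminal(reported)); // prints "DONE"
  }
}
```

Because the check goes through `isTerminal()`, the helper treats `DONE`, `FAILED`, `CANCELLED`, and `UPDATED` alike without naming any of them.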
@@ -14,57 +14,62 @@
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*
*
* EDIT BY: NopAngel | Angel Nieto (FORK)
*
*
*/


package org.apache.beam.sdk.coders;

import java.io.IOException;
import java.io.InputStream;
import java.io.OutputStream;
import java.util.BitSet;

/** Coder for {@link BitSet}. */

public class BitSetCoder extends AtomicCoder<BitSet> {
private static final BitSetCoder INSTANCE = new BitSetCoder();
private static final ByteArrayCoder BYTE_ARRAY_CODER = ByteArrayCoder.of();
private static final BitSetCoder INSTANCE = new BitSetCoder();
private static final ByteArrayCoder BYTE_ARRAY_CODER = ByteArrayCoder.of();

private BitSetCoder() {}
private BitSetCoder() {}

public static BitSetCoder of() {
return INSTANCE;
}
public static BitSetCoder of() {
return INSTANCE;
}

@Override
public void encode(BitSet value, OutputStream outStream) throws CoderException, IOException {
encode(value, outStream, Context.NESTED);
}
@Override
public void encode(BitSet value, OutputStream outStream, Context context) throws CoderException, IOException {
if (value == null) {
throw new CoderException("Cannot encode a null BitSet");
}
BYTE_ARRAY_CODER.encode(value.toByteArray(), outStream, context);
}

@Override
public void encode(BitSet value, OutputStream outStream, Context context)
throws CoderException, IOException {
if (value == null) {
throw new CoderException("cannot encode a null BitSet");
@Override
public void encode(BitSet value, OutputStream outStream) throws CoderException, IOException {
encode(value, outStream, Context.NESTED);
}
BYTE_ARRAY_CODER.encodeAndOwn(value.toByteArray(), outStream, context);
}

@Override
public BitSet decode(InputStream inStream) throws CoderException, IOException {
return decode(inStream, Context.NESTED);
}
@Override
public BitSet decode(InputStream inStream, Context context) throws CoderException, IOException {
return BitSet.valueOf(BYTE_ARRAY_CODER.decode(inStream, context));
}

@Override
public BitSet decode(InputStream inStream, Context context) throws CoderException, IOException {
return BitSet.valueOf(BYTE_ARRAY_CODER.decode(inStream, context));
}
@Override
public BitSet decode(InputStream inStream) throws CoderException, IOException {
return decode(inStream, Context.NESTED);
}

@Override
public void verifyDeterministic() throws NonDeterministicException {
verifyDeterministic(
this, "BitSetCoder requires its ByteArrayCoder to be deterministic.", BYTE_ARRAY_CODER);
}
@Override
public void verifyDeterministic() throws NonDeterministicException {
BYTE_ARRAY_CODER.verifyDeterministic(); // Ensure the underlying ByteArrayCoder is deterministic
}

@Override
public boolean consistentWithEquals() {
return true;
}
@Override
public boolean consistentWithEquals() {
return true;
}
}
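
Both versions of `BitSetCoder` in this diff rely on the same round trip: `BitSet.toByteArray()` on encode and `BitSet.valueOf(byte[])` on decode. The sketch below illustrates that round trip with the standard library only. The 4-byte length prefix is an assumption standing in for whatever framing `ByteArrayCoder` applies in a nested context; it is not Beam's actual wire format, and `BitSetRoundTrip` is a hypothetical class name.

```java
import java.nio.ByteBuffer;
import java.util.BitSet;

public class BitSetRoundTrip {

  // Serializes a BitSet as [int length][payload bytes].
  // Assumption: this framing is illustrative, not Beam's encoding.
  static byte[] encode(BitSet value) {
    if (value == null) {
      throw new IllegalArgumentException("cannot encode a null BitSet");
    }
    byte[] payload = value.toByteArray();
    return ByteBuffer.allocate(Integer.BYTES + payload.length)
        .putInt(payload.length)
        .put(payload)
        .array();
  }

  // Reads the length prefix, then rebuilds the BitSet from the payload.
  static BitSet decode(byte[] encoded) {
    ByteBuffer buffer = ByteBuffer.wrap(encoded);
    byte[] payload = new byte[buffer.getInt()];
    buffer.get(payload);
    return BitSet.valueOf(payload);
  }

  public static void main(String[] args) {
    BitSet original = new BitSet();
    original.set(1);
    original.set(9);
    original.set(64);

    BitSet copy = decode(encode(original));
    System.out.println(original.equals(copy)); // prints "true"
  }
}
```

Equal `BitSet` values always produce the same bytes from `toByteArray()`, which is the property the coder's `verifyDeterministic()` and `consistentWithEquals()` methods depend on.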