
Conversation


@gujjariramya commented Dec 22, 2020

In order to persist Scribe 2.0 data to GCP, we are using Brooklin connectors. We will create Brooklin connectors that consume from the event Kafka topics, convert each Avro record to Parquet format within Brooklin, and store the resulting Parquet records in a GCS bucket.
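For illustration, here is a minimal sketch of the Avro-to-Parquet conversion step using parquet-avro's AvroParquetWriter. The class name, method, batch shape, and codec choice are illustrative assumptions, not the connector's actual code:

import org.apache.avro.Schema;
import org.apache.avro.generic.GenericRecord;
import org.apache.hadoop.fs.Path;
import org.apache.parquet.avro.AvroParquetWriter;
import org.apache.parquet.hadoop.ParquetWriter;
import org.apache.parquet.hadoop.metadata.CompressionCodecName;

import java.io.IOException;
import java.util.List;

// Illustrative sketch only: write a batch of consumed Avro records to a local
// Parquet file, which the committer would then upload to the GCS bucket.
public final class AvroToParquetSketch {
  public static void writeBatch(Schema schema, List<GenericRecord> records, String localPath)
      throws IOException {
    try (ParquetWriter<GenericRecord> writer = AvroParquetWriter
        .<GenericRecord>builder(new Path(localPath))
        .withSchema(schema)
        .withCompressionCodec(CompressionCodecName.SNAPPY) // codec choice is an assumption
        .build()) {
      for (GenericRecord record : records) {
        writer.write(record);
      }
    }
  }
}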

RFC Doc
Puppet Change for Brooklin config: https://github.csnzoo.com/secure/puppet-cloud/pull/2352/files

Testing

  1. Tested the code changes by copying the tar file to a Brooklin node, then created datastreams for the Kafka topics and verified that data is populating in the GCS buckets.
  2. On the Scribe side, validated offline data counts against the datastream; the deviation is less than 1%.

Staging
This code is already being used to store pilot data in GCS buckets.

Server properties

##### ScribeAvroParquetEventFile connector config
brooklin.server.transportProvider.GCSTransportProviderScribeAvroParquetEventFile.factoryClassName=com.linkedin.datastream.cloud.storage.CloudStorageTransportProviderAdminFactory
brooklin.server.transportProvider.GCSTransportProviderScribeAvroParquetEventFile.packageQueueSize=1000
brooklin.server.transportProvider.GCSTransportProviderScribeAvroParquetEventFile.objectBuilderThreadCount=3
brooklin.server.transportProvider.GCSTransportProviderScribeAvroParquetEventFile.maxFileSize=120108864
brooklin.server.transportProvider.GCSTransportProviderScribeAvroParquetEventFile.maxFileAge=900000
brooklin.server.transportProvider.GCSTransportProviderScribeAvroParquetEventFile.inflightCommits=2
brooklin.server.transportProvider.GCSTransportProviderScribeAvroParquetEventFile.committer.class=com.linkedin.datastream.cloud.storage.committer.GCSObjectCommitter
brooklin.server.transportProvider.GCSTransportProviderScribeAvroParquetEventFile.committer.threads=3
brooklin.server.transportProvider.GCSTransportProviderScribeAvroParquetEventFile.committer.credentialsPath=/wayfair/app/brooklin/config/brooklingcp.json
brooklin.server.transportProvider.GCSTransportProviderScribeAvroParquetEventFile.committer.scribeParquetFileStructure=true
brooklin.server.transportProvider.GCSTransportProviderScribeAvroParquetEventFile.committer.writeAtOnceMaxFileSize=1048576
brooklin.server.transportProvider.GCSTransportProviderScribeAvroParquetEventFile.io.class=com.linkedin.datastream.cloud.storage.io.ScribeAvroParquetEventFile
brooklin.server.transportProvider.GCSTransportProviderScribeAvroParquetEventFile.io.directory=/wayfair/data/brooklincloudstorage
brooklin.server.transportProvider.GCSTransportProviderScribeAvroParquetEventFile.io.schemaRegistryURL=http://kube-kafka-schema-c1.service.intrabo1.consul.csnzoo.com:80
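For context on the rolling settings above: maxFileSize is in bytes (~120 MB here) and maxFileAge is in milliseconds (15 minutes). A minimal sketch of the rollover semantics these two settings imply, as an illustration of the config rather than the connector's source:

// Illustration only: a file is committed once it is large enough or old enough.
static boolean shouldRollFile(long currentFileBytes, long fileCreatedAtMs, long nowMs) {
  final long maxFileSizeBytes = 120108864L; // from ...maxFileSize (~120 MB)
  final long maxFileAgeMs = 900000L;        // from ...maxFileAge (15 minutes)
  return currentFileBytes >= maxFileSizeBytes || (nowMs - fileCreatedAtMs) >= maxFileAgeMs;
}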


@gujjariramya changed the title from "WIP Scribe data persistence to gcs" to "Scribe data persistence to gcs" on Jan 14, 2021
@ckommini

Changes look good. Please clean up comments and any test code before merge.

import org.slf4j.LoggerFactory;

/**
* Implementation of {@link File} to support Parquet file format


Let's expand this description and add details about what this connector does at a high level.


Also, change all references to Scribe in the comments to call out Scribe 2.0.

Comment on lines +332 to +333
DateFormat format = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss.SSS ZZ");
String date = format.format(new Date(value));


You should set the time zone to EST explicitly, since GCP defaults to UTC.
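For reference, a minimal sketch of pinning the formatter's zone as suggested; using the America/New_York zone ID (which covers both EST and EDT) is an assumption here:

// Pin the formatter's time zone explicitly instead of relying on the host default.
DateFormat format = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss.SSS ZZ");
format.setTimeZone(TimeZone.getTimeZone("America/New_York"));
String date = format.format(new Date(value));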

Author


As discussed offline, this needed further investigation. Here is the ticket for it

Comment on lines +143 to +146
} catch (Exception e) {
LOG.error("Unable to write to WriteLog {}", e);
aPackage.getAckCallback().onCompletion(new DatastreamRecordMetadata(
aPackage.getCheckpoint(), aPackage.getTopic(), aPackage.getPartition()), e);
@ckommini Feb 23, 2021


I'd let Santosh confirm if it's safe to do this here.

Collaborator


Please don't catch Exception here. Instead, identify the specific exceptions that can be raised and handle them.

Make sure you have full coverage of exception handling. An unhandled exception will result in a dead object builder thread that could be serving other streams.
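A sketch of what narrowing the catch might look like; the exact exception types are assumptions and depend on what the write path can actually throw:

// Illustrative only: catch the specific failure first, and keep a last-resort guard
// so an unexpected failure acks with an error instead of killing the builder thread.
} catch (IOException e) {
  LOG.error("Unable to write to WriteLog", e);
  aPackage.getAckCallback().onCompletion(new DatastreamRecordMetadata(
      aPackage.getCheckpoint(), aPackage.getTopic(), aPackage.getPartition()), e);
} catch (RuntimeException e) {
  LOG.error("Unexpected failure while writing package", e);
  aPackage.getAckCallback().onCompletion(new DatastreamRecordMetadata(
      aPackage.getCheckpoint(), aPackage.getTopic(), aPackage.getPartition()), e);
}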

@sdomalap
Collaborator

sdomalap commented Mar 5, 2021

Did you run style checks and bug checks by running ./gradlew clean build?
As per https://github.com/linkedin/Brooklin/wiki/Developer-Guide


// scribe parquet file structure: events/event_name/eventdate=2020-12-21/scribeKafkatopic+partition+startOffset+endOffset+suffix.parquet
// Eg: events/healthcheck_evaluated/eventdate=2021-02-22/scribe_internal-healthcheck_evaluated+0+187535+187631+1613970121085.parquet
if (isScribeParquetFileStructure) {
Collaborator


Users may have different object name requirements. We should work on making this a dynamic datastream config parameter.
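For illustration, the naming scheme documented in the code comment above could be assembled along these lines (the variable names are illustrative, not the connector's actual code):

// Illustration of the documented object-name layout (not connector source):
// events/<event_name>/eventdate=<yyyy-MM-dd>/<topic>+<partition>+<startOffset>+<endOffset>+<suffix>.parquet
String objectName = String.format("events/%s/eventdate=%s/%s+%d+%d+%d+%d.parquet",
    eventName, eventDate, topic, partition, startOffset, endOffset, System.currentTimeMillis());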

* @param schema parquet compatible avro schema
* @param avroRecord the incoming record
* @return GenericRecord the record converted to match the new schema
* @throws Exception
Collaborator


Should the method be declared as throws Exception? Also, why a generic Exception object?

}
}
} catch (Exception e) {
LOG.error(String.format("Exception in getting avro field schema types in ScribeParquetAvroConverter: Schema: %s, field: %s, typeName: %s", schema.getName(), fieldName, typeName), e);
Collaborator


You are logging an error and then continuing. Should you?

If continuing here is acceptable, log this message as a warning instead.

}
}
}
} catch (NullPointerException e){
Collaborator


I would never catch NullPointerException. Please identify the case and handle it properly; catching it may mask other issues in your code.

Collaborator


Also, if you are experiencing a NullPointerException, it's most likely an unrecoverable error. You need to understand its implications: is it data loss, or something else you should be concerned about?
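A sketch of handling the null case explicitly instead of catching NullPointerException (the field and variable names are illustrative):

// Illustrative only: check for null up front rather than catching NullPointerException.
if (fieldValue == null) {
  LOG.warn("Null value for field {} in schema {}; skipping conversion", fieldName, schema.getName());
  return null; // or substitute the schema's default value, depending on the intended semantics
}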

@gujjariramya changed the title from "Scribe data persistence to gcs" to "WIP: Scribe data persistence to gcs" on Apr 2, 2021