Simple pipeline combining Twitter search results and Knative Events to store, classify and display events
To configure the Twitter event source you will need Twitter API access keys. Good instructions on how to get them
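The secret created in the next step reads the four keys from the shell variables P_CONSUMER_KEY, P_CONSUMER_SECRET, P_ACCESS_TOKEN and P_ACCESS_SECRET, so it can help to confirm that none of them is empty first. A minimal sketch of such a check follows; require_vars is a hypothetical helper, not something shipped with this demo:

```shell
#!/bin/sh
# require_vars NAME...: report every named variable that is unset or empty.
# Returns 0 when all are set, 1 otherwise.
# (Hypothetical helper for illustration; not part of the demo repository.)
require_vars() {
  missing=0
  for name in "$@"; do
    # POSIX-compatible indirect lookup of the variable called $name.
    eval "value=\${$name:-}"
    if [ -z "$value" ]; then
      echo "missing: $name" >&2
      missing=1
    fi
  done
  return "$missing"
}
```

For example, require_vars P_CONSUMER_KEY P_CONSUMER_SECRET P_ACCESS_TOKEN P_ACCESS_SECRET can be run before creating the Twitter secret. The same helper works for the Slack variables used later in this walkthrough.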
Once you have the four keys, create a secret that holds them:
# kubectl delete secret ktweet-secrets -n demo
kubectl create secret generic ktweet-secrets -n demo \
--from-literal=T_CONSUMER_KEY=$P_CONSUMER_KEY \
--from-literal=T_CONSUMER_SECRET=$P_CONSUMER_SECRET \
--from-literal=T_ACCESS_TOKEN=$P_ACCESS_TOKEN \
--from-literal=T_ACCESS_SECRET=$P_ACCESS_SECRET
Additionally, you will need to define the search term for which the source should search Twitter (--query=YourSearchTermHere) in config/source.yaml. Once you are done editing, save the file and apply it to your Knative cluster:
kubectl apply -f config/source.yaml -n demo
Should return
containersource.sources.eventing.knative.dev/twitter-source created
Verify that the twitter-source source was created
kubectl get sources -n demo
Should return
NAME AGE
containersource.sources.eventing.knative.dev/twitter-source 33s
You can also see right away whether any tweets match your search
kubectl logs -l eventing.knative.dev/source=twitter-source -n demo -c source
Should return
2019/07/11 13:23:14 Got tweet: 1149308143958134784
2019/07/11 13:23:14 Posting tweet: 1149308143958134784
2019/07/11 13:23:14 Got tweet: 1149308145359118336
2019/07/11 13:23:14 Posting tweet: 1149308145359118336
If you haven't done so already, you will need to enable Firestore in your GCP project by creating a Cloud Firestore project, which also enables the API in the Cloud API Manager.
The store service will persist tweets into the collection defined in config/store.yaml (knative-tweets by default). To deploy this service and the corresponding trigger, apply:
kubectl apply -f config/store.yaml -n demo
The response should be
service.serving.knative.dev/eventstore created
trigger.eventing.knative.dev/twitter-events-trigger created
To check whether the service was deployed successfully, check the pod status with the kubectl get pods -n demo command. The response should look something like this (e.g. Ready 2/2 and Status Running).
NAME READY STATUS RESTARTS AGE
eventstore-qp2vz-deployment-7f458744dc-bltqz 2/2 Running 0 33s
The above command has also created a trigger. Two things to point out here: we are using the type: com.twitter filter so that only events of the twitter type are sent to our eventstore service, and we define the target service by its reference in the subscriber portion of the trigger. You can verify that the twitter-events-trigger trigger was created
kubectl get triggers -n demo
Should return
NAME READY BROKER SUBSCRIBER_URI AGE
twitter-events-trigger True default http://eventstore.demo.svc.cluster.local 1m
You should now be able to see the Cloud Events being saved in your Firestore console under the knative-tweets collection (unless you changed the name during the store service deployment)
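The trigger created by config/store.yaml is described above only in words: a type filter plus a subscriber reference. As a rough sketch, assuming the v1alpha1 eventing API that the sourceAndType filter belongs to, such a trigger could be rendered as shown here. render_trigger is a hypothetical helper, the apiVersion values are assumptions, and the real config/store.yaml may differ:

```shell
#!/bin/sh
# render_trigger NAME TYPE SERVICE: print a Trigger manifest on stdout.
# NAME    - trigger name, e.g. twitter-events-trigger
# TYPE    - CloudEvent type to match, e.g. com.twitter
# SERVICE - Knative service that receives matching events, e.g. eventstore
# (Illustrative only; apiVersions are assumed, not taken from the repo.)
render_trigger() {
  cat <<EOF
apiVersion: eventing.knative.dev/v1alpha1
kind: Trigger
metadata:
  name: $1
spec:
  broker: default
  filter:
    sourceAndType:
      type: $2
  subscriber:
    ref:
      apiVersion: serving.knative.dev/v1alpha1
      kind: Service
      name: $3
EOF
}
```

The output can be inspected directly, or piped to the cluster with render_trigger twitter-events-trigger com.twitter eventstore | kubectl apply -n demo -f -. Keeping the filter type and the subscriber name as parameters shows that the triggers in this pipeline differ only in those two values.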
You can also monitor the logs for store-service
kubectl logs -l serving.knative.dev/service=eventstore -n demo -c user-container
Now that the tweets are being published by our source to the default broker, we can create the trigger and service that will classify these tweets. To deploy this service apply:
kubectl apply -f config/classifier.yaml -n demo
The response should be
service.serving.knative.dev/sentclass created
trigger.eventing.knative.dev/sentiment-classifier-trigger created
To check whether the service was deployed successfully, check the pod status with the kubectl get pods -n demo command. The response should look something like this (e.g. Ready 2/2 and Status Running).
NAME READY STATUS RESTARTS AGE
sentclass-9ndr7-deployment-55cbd7cdcc-lt5gz 2/2 Running 0 18s
The above command has also created the classification service trigger. Just like in the event store service, we are using the type: com.twitter filter to send only events of the twitter type to our sentclass service. We also define the target service here by its reference in the subscriber portion of the trigger. To verify that the sentiment-classifier-trigger trigger was created
kubectl get triggers -n demo
Should return
NAME READY BROKER SUBSCRIBER_URI AGE
sentiment-classifier-trigger True default http://sentclass.demo.svc.cluster.local 43s
When tweets are in a language not supported by the backing API, the classification service will fail and publish the offending tweet to the broker with the com.twitter.noneng type. We will configure the translation trigger filter with that type so the offending tweets can be translated and re-published to the broker with the original type com.twitter. To deploy this service apply:
kubectl apply -f config/translation.yaml -n demo
The response should be
service.serving.knative.dev/tranlator created
trigger.eventing.knative.dev/translator-trigger created
To check whether the service was deployed successfully, check the pod status with the kubectl get pods -n demo command. The response should look something like this (e.g. Ready 2/2 and Status Running).
NAME READY STATUS RESTARTS AGE
tranlator-qd9px-deployment-6dffb6b986-znz94 2/2 Running 0 21s
The above command has also created the translation service trigger, just like in the event store service. To verify that the translator-trigger trigger was created
kubectl get triggers -n demo
Should return
NAME READY BROKER SUBSCRIBER_URI AGE
translator-trigger True default http://tranlator.demo.svc.cluster.local 1m
The classification service defined in step #2 will publish results to the default broker: negative tweets with type com.twitter.negative and positive tweets with type com.twitter.positive. Let's create the viewing service now so we can see the positive tweets, and come back later to the negative tweets that will be published to Slack.
To do that, let's deploy the viewer app and its trigger by applying:
kubectl apply -f config/view.yaml -n demo
The response should be
service.serving.knative.dev/tweetviewer created
trigger.eventing.knative.dev/twitter-events-viewer created
To check whether the service was deployed successfully, check the pod status with the kubectl get pods -n demo command. The response should look something like this (e.g. Ready 2/2 and Status Running).
NAME READY STATUS RESTARTS AGE
twitter-events-viewer True default http://tweetviewer.demo.svc.cluster.local 24s
Again, the above command created a service trigger. The only thing to point out here is that we are now filtering only the events that have been classified as positive (type: com.twitter.positive).
filter:
  sourceAndType:
    type: com.twitter.positive
You can verify that the twitter-events-viewer trigger was created
kubectl get triggers -n demo
Should return
NAME READY BROKER SUBSCRIBER_URI AGE
twitter-events-viewer True default http://tweetviewer.demo.svc.cluster.local 17h
Navigate to https://tweetviewer.demo.knative.tech/ now and leave the viewer open
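By this point several triggers exist in the demo namespace, and each was verified one at a time with kubectl get triggers. A small filter can instead list any trigger that is not ready in one pass. The sketch below assumes the table layout shown in this walkthrough, where READY is the second column. not_ready_triggers is a hypothetical helper name, and other eventing versions may print different columns:

```shell
#!/bin/sh
# not_ready_triggers: read `kubectl get triggers` table output on stdin and
# print the name of every trigger whose READY column is not "True".
# Assumes the column order NAME READY BROKER SUBSCRIBER_URI AGE.
not_ready_triggers() {
  # NR > 1 skips the header row; $1 is NAME and $2 is READY.
  awk 'NR > 1 && $2 != "True" { print $1 }'
}
```

A typical invocation would be kubectl get triggers -n demo | not_ready_triggers, which prints nothing when every trigger is ready.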
The classification service defined in step #2 also identifies tweets that appear to be negative and posts them back to the default broker with the com.twitter.negative type. Let's now create the Slack publish service that will post these tweets to a Slack channel.
First, create a secret with the Slack token and channel details:
Note: the Slack channel value is not the name of the channel but rather its ID
kubectl create secret generic slack-notif-secrets -n demo \
--from-literal=SLACK_CHANNEL=$SLACK_KN_TWEETS_CHANNEL \
--from-literal=SLACK_TOKEN=$SLACK_KNTWEETS_API_TOKEN
Now, let's install the Slack publishing service and the corresponding trigger:
kubectl apply -f config/slack.yaml -n demo
The response should be
service.serving.knative.dev/slack-publisher created
trigger.eventing.knative.dev/slack-tweet-notifier created
To check whether the service was deployed successfully, check the pod status with the kubectl get pods -n demo command. The response should look something like this (e.g. Ready 2/2 and Status Running).
NAME READY STATUS RESTARTS AGE
slack-publisher-5rx2l-deployment-6d4fc5b5bd-cfwcr 2/2 Running 0 22s
Again, the only thing to point out here is that we are now filtering only the events that have been classified as negative (type: com.twitter.negative).
filter:
  sourceAndType:
    type: com.twitter.negative
You can verify that the slack-tweet-notifier trigger was created
kubectl get triggers -n demo
Should return
NAME READY BROKER SUBSCRIBER_URI AGE
slack-tweet-notifier True default http://slack-publisher.demo.svc.cluster.local 1m
Ask the audience to tweet about knative (it doesn't have to be a #knative tag). Also ask for non-English tweets to show the translation service.
Tracing of events requires the latest eventing (right now that's a nightly build after Oct 10th)
To enable event tracing, first edit the tracing config
kubectl edit cm config-tracing -n knative-eventing
In this case I'm using Stackdriver, so just add the following two lines. You can also use Zipkin, in which case you will also have to define the back-end. See the config map's comments for details.
backend: stackdriver
sample-rate: "1.0"
Now, assuming you are using --query=knative in the source, there won't be too many events. If, however, you use something more frequently tweeted about, like --query=google, you could easily end up with hundreds of events per second. In that case you may want to adjust the sample-rate to something smaller, like 0.1 (10%).
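Lowering the sample rate does not have to go through an interactive edit. One possible approach, sketched here under the assumption that the config map keeps the two keys shown above (backend and sample-rate) under its data section, is to build a JSON merge patch and hand it to kubectl patch. tracing_patch is a hypothetical helper name:

```shell
#!/bin/sh
# tracing_patch RATE [BACKEND]: print a JSON merge patch for the
# config-tracing ConfigMap. RATE must be a number between 0 and 1;
# BACKEND defaults to stackdriver, the backend used in this walkthrough.
tracing_patch() {
  rate="$1"
  backend="${2:-stackdriver}"
  # Reject anything that is not a plain decimal number in the range [0, 1].
  if ! awk -v r="$rate" 'BEGIN { exit !(r ~ /^[0-9]*\.?[0-9]+$/ && r + 0 <= 1) }'; then
    echo "sample rate must be between 0 and 1: $rate" >&2
    return 1
  fi
  printf '{"data":{"backend":"%s","sample-rate":"%s"}}\n' "$backend" "$rate"
}
```

It could then be applied with kubectl patch configmap config-tracing -n knative-eventing --type merge -p "$(tracing_patch 0.1)". The rate is written as a quoted string because config map values are strings, matching the "1.0" form shown above.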
To view traces, navigate to the Trace UI in the GCP console
Select an individual "bar" in the trace timeline to drill down into the details
Run this before each demo to set known state:
kubectl delete -f config/ -n demo --ignore-not-found=true
kubectl delete secret ktweet-secrets -n demo --ignore-not-found=true
kubectl delete secret slack-notif-secrets -n demo --ignore-not-found=true

