Daily Data Processing Workflow #130
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
| name: Daily Data Processing Workflow | |
| on: | |
| schedule: | |
| - cron: '0 21 * * *' # 9 PM UTC daily | |
| workflow_dispatch: | |
| permissions: | |
| contents: write | |
| jobs: | |
| run_scripts: | |
| runs-on: ubuntu-latest | |
| steps: | |
| - name: Checkout repository | |
| uses: actions/checkout@v4 | |
| - name: Set up Python | |
| uses: actions/setup-python@v4 | |
| with: | |
| python-version: '3.x' | |
| - name: Install dependencies | |
| run: | | |
| python -m pip install --upgrade pip | |
| pip install numpy | |
| pip install slack_sdk reportlab boto3 pymupdf python-pptx openpyxl nltk openai==0.28 | |
| - name: Debug environment | |
| run: | | |
| echo "Current directory:" | |
| pwd | |
| echo "Files in directory:" | |
| ls -la | |
| echo "Python version:" | |
| python --version | |
| echo "Installed packages:" | |
| pip list | |
| - name: Create slackintegration.py | |
| run: | | |
| cat > slackintegration.py << 'EOF' | |
| import os | |
| import time | |
| import datetime | |
| import pickle | |
| from slack_sdk import WebClient | |
| from slack_sdk.errors import SlackApiError | |
| from reportlab.lib.pagesizes import letter | |
| from reportlab.platypus import SimpleDocTemplate, Paragraph, Spacer | |
| from reportlab.lib.styles import getSampleStyleSheet, ParagraphStyle | |
| import boto3 | |
| # Get credentials from environment variables | |
| aws_access_key_id = os.environ.get('AWS_ACCESS_KEY_ID') | |
| aws_secret_access_key = os.environ.get('AWS_SECRET_ACCESS_KEY') | |
| slack_token = os.environ.get('SLACK_BOT_TOKEN') | |
| # Initialize Slack client | |
| client = WebClient(token=slack_token) | |
| # --- SLACK FUNCTIONS --- | |
def get_all_joined_channel_ids():
    """Return the IDs of every public or private channel the bot is a member of.

    Walks ``conversations_list`` page by page through the module-level
    ``client``, following ``next_cursor`` until no further page is reported.
    """
    joined_ids = []
    next_page = None
    has_more = True
    while has_more:
        page = client.conversations_list(
            types="public_channel,private_channel",
            limit=1000,
            cursor=next_page,
        )
        joined_ids.extend(
            entry['id'] for entry in page['channels'] if entry.get('is_member')
        )
        # A missing or empty cursor means this was the last page.
        next_page = page.get('response_metadata', {}).get('next_cursor')
        has_more = bool(next_page)
    return joined_ids
def fetch_hcmsupportbot_messages_all_channels(oldest, latest):
    """Collect every message tagged ``#hcmsupportbot`` from all joined channels.

    Args:
        oldest: Start of the time window, passed to Slack unchanged.
        latest: End of the time window, passed to Slack unchanged.

    Returns:
        A list of dicts with ``channel``, ``user``, ``text`` and ``ts`` keys,
        one per matching message. A channel that raises while being read is
        reported on stdout and skipped; messages gathered before the error
        are kept.
    """
    all_messages = []
    for channel_id in get_all_joined_channel_ids():
        cursor = None
        try:
            # Follow next_cursor so that busy channels are read completely
            # instead of stopping after the first page of history.
            while True:
                response = client.conversations_history(
                    channel=channel_id,
                    oldest=oldest,
                    latest=latest,
                    limit=1000,
                    cursor=cursor,
                )
                for msg in response['messages']:
                    # Some message subtypes may carry no text at all.
                    text = msg.get('text') or ''
                    if '#hcmsupportbot' in text.lower():
                        all_messages.append({
                            'channel': channel_id,
                            'user': msg.get('user', ''),
                            'text': text,
                            'ts': msg.get('ts'),
                        })
                cursor = (response.get('response_metadata') or {}).get('next_cursor')
                if not cursor:
                    break
        except Exception as e:
            # Best effort: one unreadable channel must not abort the whole run.
            print(f"Error fetching from {channel_id}: {e}")
    return all_messages
| # --- PDF FORMATTING --- | |
styles = getSampleStyleSheet()

# Body style for individual Slack messages: fixed-width 10pt text.
style_normal = ParagraphStyle(
    name='Normal',
    parent=styles['Normal'],
    leading=12,
    fontSize=10,
    fontName='Courier',
)

# Title style for the page header; alignment=1 is reportlab's TA_CENTER.
style_header = ParagraphStyle(
    name='Header',
    parent=styles['Heading2'],
    spaceAfter=20,
    alignment=1,
)
| # --- PDF GENERATION --- | |
def save_to_pdf(messages, filename):
    """Render the collected Slack messages into a letter-sized PDF.

    Args:
        messages: Iterable of dicts; the ``user`` and ``text`` keys are read.
        filename: Output path handed to ``SimpleDocTemplate``.
    """
    # Imported locally so this function does not depend on new file-level imports.
    from html import unescape
    from xml.sax.saxutils import escape

    def _markup_safe(value):
        # Paragraph parses its text as XML-like markup, so raw Slack content
        # such as <@U123> or <http://x|label> would break the build. Decode
        # any entities already present first so nothing is double-escaped,
        # then escape, then turn newlines into explicit line breaks.
        return escape(unescape(str(value or ''))).replace('\n', '<br/>')

    doc = SimpleDocTemplate(filename, pagesize=letter)
    today = datetime.datetime.now().strftime('%Y-%m-%d')
    # A bare newline is collapsed to a space by Paragraph; <br/> gives the
    # two-line header the original string was aiming for.
    flowables = [
        Paragraph(f"Daily #hcmsupportbot Messages<br/>{today}", style_header)
    ]
    for msg in messages:
        user = _markup_safe(msg.get('user', 'unknown'))
        text = _markup_safe(msg.get('text', ''))
        flowables.append(Paragraph(f"<b>{user}</b>: {text}", style_normal))
        flowables.append(Spacer(1, 12))
    doc.build(flowables)
def upload_to_s3(filename, key_prefix='slack_pdfs/', bucket='hcmbotknowledgesource'):
    """Upload a local file to S3 under ``key_prefix``.

    Args:
        filename: Path of the local file to upload.
        key_prefix: Prefix prepended to the object key.
        bucket: Destination bucket; defaults to the bucket this workflow
            has always used.

    Raises:
        Whatever ``boto3`` raises when the upload fails; errors are not
        swallowed so the calling script exits non-zero.
    """
    # Create S3 client using the credentials read from the environment.
    s3 = boto3.client(
        's3',
        aws_access_key_id=aws_access_key_id,
        aws_secret_access_key=aws_secret_access_key,
    )
    # Use only the file name for the key so a path argument does not leak
    # local directory components into the bucket layout.
    key = key_prefix + os.path.basename(filename)
    s3.upload_file(Filename=filename, Bucket=bucket, Key=key)
    # The workflow greps for the "Uploaded hcmsupportbot_" prefix of this line.
    print(f"Uploaded {filename} to s3://{bucket}/{key}")
| # --- CREATE SLACK DATA --- | |
def create_slack_data(messages, today_date=None):
    """Build the document-store entry for one day's Slack messages.

    Args:
        messages: Iterable of message dicts; only the ``text`` key is read.
        today_date: Optional ``YYYY-MM-DD`` string used in the entry key and
            file name. Defaults to the current local date, matching the
            previous behavior; pass it explicitly to stay consistent with a
            date computed elsewhere.

    Returns:
        A ``(data, today_date)`` tuple where ``data`` maps
        ``slack_<date>`` to a dict with ``chunks``, ``embeddings``,
        ``source`` and ``original_filename``.
    """
    if today_date is None:
        today_date = datetime.datetime.now().strftime('%Y-%m-%d')
    # Extract just the text for chunks; a missing or None text becomes ''.
    chunks = [msg.get('text') or '' for msg in messages]
    # Create the data structure with a unique key for this date's data.
    data = {
        f"slack_{today_date}": {
            "chunks": chunks,
            "embeddings": [],  # Will be populated by chromarag.py
            "source": "slack_pdf",
            "original_filename": f"hcmsupportbot_{today_date}.pdf",
        }
    }
    return data, today_date
| # --- MAIN WORKFLOW --- | |
def main():
    """Fetch the last 24 hours of tagged messages, publish them, and stage data.

    Writes a PDF, uploads it to S3, and pickles the slack data structure to a
    temporary file for the embedding step.

    Returns:
        ``(pdf_filename, temp_slack_file)`` when messages were found,
        otherwise ``(None, None)``.
    """
    # Calculate time range
    now = datetime.datetime.now()
    yesterday = now - datetime.timedelta(days=1)

    # Fetch messages
    messages = fetch_hcmsupportbot_messages_all_channels(
        oldest=int(yesterday.timestamp()),
        latest=int(now.timestamp()),
    )
    if not messages:
        # The workflow greps for this exact line to set pdf_status=none.
        print("No messages found for today.")
        return None, None

    # Build the data structure first and reuse the date it returns, so the
    # PDF name, the recorded original_filename and the pickle name always
    # agree even if the run straddles midnight.
    slack_data, today_date = create_slack_data(messages)

    # Generate PDF
    pdf_filename = f"hcmsupportbot_{today_date}.pdf"
    save_to_pdf(messages, pdf_filename)

    # Upload PDF to S3
    upload_to_s3(pdf_filename)

    # Save temporary slack data file for processing
    temp_slack_file = f"slack_data_{today_date}.pkl"
    with open(temp_slack_file, 'wb') as f:
        pickle.dump(slack_data, f)

    print(f"Successfully processed {len(messages)} messages")
    print(f"Created PDF: {pdf_filename}")
    # The workflow parses the fifth word of this line to find the file name.
    print(f"Created temporary slack data: {temp_slack_file}")
    return pdf_filename, temp_slack_file


if __name__ == "__main__":
    main()
| EOF | |
- name: Run slackintegration.py and capture output
  id: slack
  run: |
    # Without pipefail the pipeline reports the exit status of tee, which
    # hides a crashed script. Capture the status instead of aborting so the
    # step outputs below are still written.
    set -o pipefail
    script_status=0
    python slackintegration.py | tee slack_output.log || script_status=$?
    # Check if PDF was generated and uploaded
    if [ "$script_status" -eq 0 ] && grep -q "Uploaded hcmsupportbot_" slack_output.log; then
      echo "pdf_status=generated" >> "$GITHUB_OUTPUT"
      # Extract the temporary slack data filename from the output
      TEMP_SLACK_FILE=$(grep "Created temporary slack data:" slack_output.log | awk '{print $5}')
      echo "temp_slack_file=${TEMP_SLACK_FILE}" >> "$GITHUB_OUTPUT"
    elif [ "$script_status" -eq 0 ] && grep -q "No messages found for today." slack_output.log; then
      echo "pdf_status=none" >> "$GITHUB_OUTPUT"
    else
      echo "pdf_status=error" >> "$GITHUB_OUTPUT"
      # Fail the step so a broken run is not reported as a green workflow.
      echo "slackintegration.py failed (exit status ${script_status})" >&2
      exit 1
    fi
  env:
    AWS_ACCESS_KEY_ID: ${{ secrets.AWS_ACCESS_KEY_ID }}
    AWS_SECRET_ACCESS_KEY: ${{ secrets.AWS_SECRET_ACCESS_KEY }}
    SLACK_BOT_TOKEN: ${{ secrets.SLACK_BOT_TOKEN }}
- name: Prepare document store
  run: |
    python -c "
    import os
    import shutil
    import sys

    import boto3
    from botocore.exceptions import ClientError

    BUCKET = 'hcmbotknowledgesource'
    STORE = 'document_store.pkl'
    SEED = 'documen_store_og.pkl'

    # Create S3 client from the credentials in the environment
    s3 = boto3.client(
        's3',
        aws_access_key_id=os.environ.get('AWS_ACCESS_KEY_ID'),
        aws_secret_access_key=os.environ.get('AWS_SECRET_ACCESS_KEY'),
    )

    # First try to download the existing document store from the S3 root
    try:
        s3.download_file(BUCKET, STORE, STORE)
        print('Downloaded existing document store from S3')
    except ClientError as e:
        print(f'Could not download document store from S3: {e}')
        code = e.response.get('Error', {}).get('Code')
        if code not in ('404', 'NoSuchKey', 'NotFound'):
            # Anything other than a missing object (permissions, throttling)
            # must stop the run: falling back to the seed store here would
            # let the later upload overwrite the accumulated store in S3.
            sys.exit(1)
        # The object does not exist yet, so start from the seed store in the repo
        if os.path.exists(SEED):
            # A plain copy gives the same working file without unpickling it
            shutil.copyfile(SEED, STORE)
            print('Using documen_store_og.pkl from repository as initial document store')
        else:
            print('Could not find documen_store_og.pkl in repository')
    "
  env:
    AWS_ACCESS_KEY_ID: ${{ secrets.AWS_ACCESS_KEY_ID }}
    AWS_SECRET_ACCESS_KEY: ${{ secrets.AWS_SECRET_ACCESS_KEY }}
| - name: Create chromarag.py | |
| if: steps.slack.outputs.pdf_status == 'generated' | |
| run: | | |
| cat > chromarag.py << 'EOF' | |
| import os | |
| import pickle | |
| import numpy as np | |
| import openai | |
| # Get credentials from environment variables | |
| aws_access_key_id = os.environ.get('AWS_ACCESS_KEY_ID') | |
| aws_secret_access_key = os.environ.get('AWS_SECRET_ACCESS_KEY') | |
| openai_api_key = os.environ.get('OPENAI_API_KEY') | |
| # Set OpenAI API key | |
| openai.api_key = openai_api_key | |
def generate_embeddings(texts, batch_size=10):
    """Generate embeddings for text chunks using OpenAI API.

    Args:
        texts: List of strings to embed.
        batch_size: Number of texts sent per API request.

    Returns:
        A list with exactly one embedding per input text, in input order.
        An empty ``texts`` returns ``[]`` without calling the API.

    Raises:
        RuntimeError: If any batch fails. Skipping a failed batch would
            return fewer embeddings than texts and silently misalign every
            later embedding with its chunk, so the failure is propagated.
    """
    if not texts:
        return []
    embeddings = []
    for start in range(0, len(texts), batch_size):
        batch = texts[start:start + batch_size]
        try:
            response = openai.Embedding.create(
                model="text-embedding-3-small",
                input=batch,
            )
        except Exception as e:
            print(f"Error generating embeddings for batch {start}-{start + batch_size}: {e}")
            raise RuntimeError(
                f"Embedding generation failed for batch {start}-{start + batch_size}"
            ) from e
        embeddings.extend(item["embedding"] for item in response["data"])
    return embeddings
def process_and_append_slack_data(slack_pickle_file, document_store_file):
    """Embed the day's Slack chunks and merge them into the document store.

    Args:
        slack_pickle_file: Path to the pickle holding the slack data mapping.
        document_store_file: Path to the document store pickle; it is
            rewritten in place on success.

    Returns:
        True when the store was updated and saved, False on any failure
        (unreadable input, embedding failure or mismatch, failed save).
    """
    # NOTE(security): pickle.load executes code embedded in the file. Both
    # inputs must come from trusted locations only.

    # Load the document store
    try:
        with open(document_store_file, 'rb') as f:
            document_store = pickle.load(f)
        print(f"Loaded document store with {len(document_store)} documents")
    except Exception as e:
        print(f"Error loading document store: {e}")
        return False

    # Load the slack pickle
    try:
        with open(slack_pickle_file, 'rb') as f:
            slack_data = pickle.load(f)
        print(f"Loaded slack data with {len(slack_data)} entries")
    except Exception as e:
        print(f"Error loading slack pickle: {e}")
        return False

    # Process each document in the slack data
    for doc_name, doc_data in slack_data.items():
        chunks = doc_data.get("chunks", [])
        if not chunks:
            print(f"No chunks found in {doc_name}")
            continue
        print(f"Generating embeddings for {len(chunks)} chunks in {doc_name}")
        try:
            embeddings = generate_embeddings(chunks)
        except Exception as e:
            print(f"Error generating embeddings for {doc_name}: {e}")
            return False
        # Never persist embeddings that do not line up one-to-one with chunks.
        if len(embeddings) != len(chunks):
            print(
                f"Embedding count mismatch for {doc_name}: "
                f"{len(embeddings)} embeddings for {len(chunks)} chunks"
            )
            return False
        # Update the embeddings in the slack data
        doc_data["embeddings"] = embeddings
        print(f"Generated {len(embeddings)} embeddings for {doc_name}")

    # Append the processed slack data to the document store
    document_store.update(slack_data)
    print(f"Updated document store, now has {len(document_store)} documents")

    # Save the updated document store
    try:
        with open(document_store_file, 'wb') as f:
            pickle.dump(document_store, f)
    except (OSError, pickle.PicklingError) as e:
        print(f"Error saving document store: {e}")
        return False
    print(f"Saved updated document store to {document_store_file}")
    return True
if __name__ == "__main__":
    import sys

    # Two positional arguments are required: the slack pickle and the store.
    if len(sys.argv) < 3:
        print("Usage: python chromarag.py <slack_pickle_file> <document_store_file>")
        sys.exit(1)

    slack_pickle_file, document_store_file = sys.argv[1], sys.argv[2]
    success = process_and_append_slack_data(slack_pickle_file, document_store_file)
    # Translate the boolean result into a process exit status.
    if success:
        sys.exit(0)
    sys.exit(1)
| EOF | |
| - name: Process and append slack data | |
| if: steps.slack.outputs.pdf_status == 'generated' | |
| run: | | |
| # Run the script with the temporary slack file from the previous step | |
| python chromarag.py "${{ steps.slack.outputs.temp_slack_file }}" "document_store.pkl" | |
| env: | |
| AWS_ACCESS_KEY_ID: ${{ secrets.AWS_ACCESS_KEY_ID }} | |
| AWS_SECRET_ACCESS_KEY: ${{ secrets.AWS_SECRET_ACCESS_KEY }} | |
| OPENAI_API_KEY: ${{ secrets.OPENAI_API_KEY }} | |
- name: Upload updated document store to S3
  if: steps.slack.outputs.pdf_status == 'generated'
  run: |
    python -c "
    import os
    import sys

    import boto3

    # Create S3 client from the credentials in the environment
    s3 = boto3.client(
        's3',
        aws_access_key_id=os.environ.get('AWS_ACCESS_KEY_ID'),
        aws_secret_access_key=os.environ.get('AWS_SECRET_ACCESS_KEY'),
    )
    try:
        # Upload to the root of the bucket, not to a subfolder
        s3.upload_file('document_store.pkl', 'hcmbotknowledgesource', 'document_store.pkl')
        print('Uploaded updated document store to S3 root')
    except Exception as e:
        print(f'Could not upload document store: {e}')
        # A failed upload means the new data was not persisted, so the
        # step must fail instead of reporting success.
        sys.exit(1)
    "
  env:
    AWS_ACCESS_KEY_ID: ${{ secrets.AWS_ACCESS_KEY_ID }}
    AWS_SECRET_ACCESS_KEY: ${{ secrets.AWS_SECRET_ACCESS_KEY }}
- name: Clean up old files in S3
  if: steps.slack.outputs.pdf_status == 'generated'
  run: |
    python -c "
    import os
    from datetime import datetime, timedelta, timezone

    import boto3

    BUCKET = 'hcmbotknowledgesource'

    # Create S3 client from the credentials in the environment
    s3 = boto3.client(
        's3',
        aws_access_key_id=os.environ.get('AWS_ACCESS_KEY_ID'),
        aws_secret_access_key=os.environ.get('AWS_SECRET_ACCESS_KEY'),
    )
    # Keep only the last 7 days of files. The cutoff must be timezone-aware:
    # boto3 returns LastModified as an aware datetime, and comparing it with
    # a naive one raises TypeError.
    cutoff_date = datetime.now(timezone.utc) - timedelta(days=7)
    try:
        # Page through the slack_pdfs prefix; one list call returns at most
        # 1000 keys.
        paginator = s3.get_paginator('list_objects_v2')
        for page in paginator.paginate(Bucket=BUCKET, Prefix='slack_pdfs/'):
            for obj in page.get('Contents', []):
                key = obj['Key']
                last_modified = obj['LastModified']
                # Delete old pickle files, and temporary slack data of any age
                is_old_pickle = last_modified < cutoff_date and key.endswith('.pkl')
                is_temp_slack_data = 'slack_data_' in key
                # NOTE(review): PDFs under this prefix never match either
                # rule, so they are kept indefinitely - confirm this is intended.
                if is_old_pickle or is_temp_slack_data:
                    s3.delete_object(Bucket=BUCKET, Key=key)
                    print(f'Deleted old file: {key}')
    except Exception as e:
        # Cleanup is best effort and must not fail the workflow
        print(f'Error cleaning up old files: {e}')
    "
  env:
    AWS_ACCESS_KEY_ID: ${{ secrets.AWS_ACCESS_KEY_ID }}
    AWS_SECRET_ACCESS_KEY: ${{ secrets.AWS_SECRET_ACCESS_KEY }}
| - name: Create .gitignore | |
| run: | | |
| # Create or update .gitignore to exclude temporary files | |
# Make sure the file exists, then append each entry only when that exact
# line is missing. -F matches the entry literally (no regex meaning for
# "*" or ".") and -x requires a whole-line match, so an unrelated line that
# merely contains the entry does not suppress it.
touch .gitignore
for pattern in "pdfs2/" "slack_output.log" "*.log" "*.pkl"; do
  grep -qxF -- "$pattern" .gitignore || echo "$pattern" >> .gitignore
done