diff --git a/.github/workflows/main.yml b/.github/workflows/main.yml index e1f0cd198a8..2f3e5fee7d4 100644 --- a/.github/workflows/main.yml +++ b/.github/workflows/main.yml @@ -6,7 +6,7 @@ name: CI on: # Triggers the workflow on push or pull request events but only for the master branch pull_request: - branches: [ master, next ] + branches: [ master, next, release.24.10 ] # Allows you to run this workflow manually from the Actions tab workflow_dispatch: @@ -302,7 +302,7 @@ jobs: cd ui-tests npm install xvfb-run --auto-servernum --server-args="-screen 0 1280x1024x24" \ - npm run cy:run:dashboard --headless --no-sandbox --disable-gpu --disable-dev-shm-usage + npm run cy:run:dashboard - name: Upload UI tests artifacts if: ${{ failure() }} @@ -381,7 +381,7 @@ jobs: cd ui-tests npm install xvfb-run --auto-servernum --server-args="-screen 0 1280x1024x24" \ - npm run cy:run:onboarding --headless --no-sandbox --disable-gpu --disable-dev-shm-usage + npm run cy:run:onboarding - name: Upload UI tests artifacts if: ${{ failure() }} diff --git a/CHANGELOG.md b/CHANGELOG.md index 6c881958168..f6aedf66af5 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -1,3 +1,42 @@ +## Version 24.10.2 +Fixes: +- [core] Correct aggregated collection cleanup on event omitting +- [core] Fixed bug where changing passwords results in the loss of the "Global Admin" role +- [core] Fixed bug where exporting incoming data logs could result in "Incorrect parameter \"data\" error +- [core] Removed use of commands which needs admin rights from report manager. +- [crash] Fixed bug in crash ingestion for scenarios where the "app version" is not a string. 
+- [script] Fixing bug with "delete_old_members" script that led to malformed requests + +Enterprise fixes: +- [nps] Fixed bug that showed the wrong nps preview title + +## Version 24.10.1 +Fixes: +- [core] Replaced "Users" with "Sessions" label on technology home widgets +- [push] Improved ability to observe push related errors +- [push] Replaced push plugin with an earlier version of the plugin + +Enterprise fixes: +- [cohorts] Fixed issues with nightly cleanup +- [data-manager] Fixed UI bug where rules were not visible when editing "Merge by regex" transformations +- [drill] Fixed wrong pie chart label tooltip in dashboard widget +- [flows] Fixed bug in case of null data in schema +- [license] Fixed bug with MAU type of licenses that would prevent the server from starting +- [nps] Fixed bug in the editor where the "internal name" field was not mandatory +- [nps] Fixed bug where it was possible to submit empty nps surveys +- [ratings] Fixed bug with user consent +- [ratings] Fixed UI bug where "Internal name" was not a mandatory field + +Security: +- Bumped cookie-parser from 1.4.6 to 1.4.7 +- Bumped express-rate-limit from 7.4.0 to 7.4.1 +- Bumped moment-timezone from 0.5.45 to 0.5.46 +- Bumped sass from 1.79.3 to 1.79.4 +- Fixing minor vulnerability that would allow for unauthorized file upload + +Enterprise Features: +- [block] Added a way to filter crashes by their error (stacktrace) + ## Version 24.10 Fixes: - [core] Interpreting carrier value of "--" as an unknown value @@ -34,6 +73,7 @@ Enterprise Features: ## Version 24.05.15 Enterprise fixes: +- [ab-testing] Fixed JSON.parse issue preventing creation of AB tests - [nps] Fixed UI issues in the widget editor related to the "user consent" section - [ratings] Fixed rendering issue for escaped values diff --git a/api/api.js b/api/api.js index 5f259134ff8..593030fe2ee 100644 --- a/api/api.js +++ b/api/api.js @@ -313,6 +313,7 @@ plugins.connectToAllDatabases().then(function() { 
jobs.job('api:clearAutoTasks').replace().schedule('every 1 day'); jobs.job('api:task').replace().schedule('every 5 minutes'); jobs.job('api:userMerge').replace().schedule('every 10 minutes'); + jobs.job("api:ttlCleanup").replace().schedule("every 1 minute"); //jobs.job('api:appExpire').replace().schedule('every 1 day'); }, 10000); diff --git a/api/jobs/task.js b/api/jobs/task.js index a106a4bcbf2..06b6893a7e7 100644 --- a/api/jobs/task.js +++ b/api/jobs/task.js @@ -56,6 +56,10 @@ class MonitorJob extends job.Job { return true; } + if (task.dirty) { + return true; + } + if ((now + duration - lastStart) / 1000 >= interval) { return true; } @@ -74,7 +78,8 @@ class MonitorJob extends job.Job { taskmanager.rerunTask({ db: common.db, id: task._id, - autoUpdate: true + autoUpdate: true, + dirty: task.dirty }, function(e) { if (e) { log.e(e, e.stack); diff --git a/api/jobs/ttlCleanup.js b/api/jobs/ttlCleanup.js new file mode 100644 index 00000000000..1c168d38b21 --- /dev/null +++ b/api/jobs/ttlCleanup.js @@ -0,0 +1,48 @@ +const plugins = require("../../plugins/pluginManager.js"); +const common = require('../utils/common'); +const job = require("../parts/jobs/job.js"); +const log = require("../utils/log.js")("job:ttlCleanup"); + +/** + * Class for job of cleaning expired records inside ttl collections + */ +class TTLCleanup extends job.Job { + /** + * Run the job + */ + async run() { + log.d("Started running TTL clean up job"); + for (let i = 0; i < plugins.ttlCollections.length; i++) { + const { + db = "countly", + collection, + property, + expireAfterSeconds = 0 + } = plugins.ttlCollections[i]; + let dbInstance; + switch (db) { + case "countly": dbInstance = common.db; break; + case "countly_drill": dbInstance = common.drillDb; break; + case "countly_out": dbInstance = common.outDb; break; + } + if (!dbInstance) { + log.e("Invalid db selection:", db); + continue; + } + + log.d("Started cleaning up", collection); + const result = await 
dbInstance.collection(collection).deleteMany({ + [property]: { + $lte: new Date(Date.now() - expireAfterSeconds * 1000) + } + }); + log.d("Finished cleaning up", result.deletedCount, "records from", collection); + + // Sleep 1 second to prevent sending too many deleteMany queries + await new Promise(res => setTimeout(res, 1000)); + } + log.d("Finished running TTL clean up job"); + } +} + +module.exports = TTLCleanup; \ No newline at end of file diff --git a/api/parts/data/fetch.js b/api/parts/data/fetch.js index c94f20bb422..210c277571a 100644 --- a/api/parts/data/fetch.js +++ b/api/parts/data/fetch.js @@ -1323,7 +1323,7 @@ fetch.fetchEvents = function(params) { */ fetch.fetchTimeObj = function(collection, params, isCustomEvent, options) { fetchTimeObj(collection, params, isCustomEvent, options, function(output) { - if (params?.qstring?.event) { + if (params.qstring?.event) { output.eventName = params.qstring.event; } common.returnOutput(params, output); diff --git a/api/parts/jobs/README.md b/api/parts/jobs/README.md new file mode 100644 index 00000000000..e8329172151 --- /dev/null +++ b/api/parts/jobs/README.md @@ -0,0 +1,650 @@ +# Jobs Feature Documentation + +## Table of Contents +1. [Overview](#overview) +2. [System Architecture](#system-architecture) +3. [Flowcharts](#flowcharts) + - [Job Lifecycle](#job-lifecycle-flowchart) + - [Dependency Chart](#dependency-chart) + - [IPC Communication](#ipc-communication-flow) + - [Retry Logic](#retry-logic-flowchart) + - [Job Creation and Scheduling](#job-creation-and-scheduling-flowchart) +4. [Key Components](#key-components) +5. [Job Lifecycle](#job-lifecycle) +6. [IPC Communication](#ipc-communication) +7. [Retry Mechanism](#retry-mechanism) +8. [Usage Guide](#usage-guide) +9. [Implementation Guide](#implementation-guide) +10. [File Breakdown](#file-breakdown) +11. [Configuration](#configuration) +12. [Technical Implementation of Scheduling](#technical-implementation-of-scheduling) +13. 
[Best Practices](#best-practices) + +## Overview + +The Jobs feature is a robust system designed to handle asynchronous, potentially long-running tasks in a distributed environment. It supports job scheduling, execution, inter-process communication, and automatic retries. The system is built to be scalable and fault-tolerant, capable of running jobs across multiple processes and servers. + +Key features include: +- Flexible job scheduling (immediate, delayed, or recurring) +- Inter-process communication for resource-intensive jobs +- Automatic retry mechanism with customizable policies +- Resource management for efficient job execution +- Scalable architecture supporting distributed environments + +## System Architecture + +The Jobs system consists of several interconnected components: + +- **Manager**: Oversees job execution and resource management +- **Handle**: Provides API for job creation and scheduling +- **Job**: Base class for all job types +- **Resource**: Manages resources for IPC jobs +- **RetryPolicy**: Defines retry behavior for failed jobs +- **IPC**: Handles inter-process communication + +These components work together to create, schedule, execute, and monitor jobs across the system. 
+ +## Flowcharts + +### Job Lifecycle Flowchart + +```mermaid +graph TD + A[Start] --> B{Is it Master Process?} + B -->|Yes| C[Initialize Manager] + B -->|No| D[Initialize Handle] + C --> E[Scan for job types] + E --> F[Monitor jobs collection] + D --> G[Wait for job requests] + F --> H{New job to run?} + H -->|Yes| I[Create job instance] + H -->|No| F + I --> J{Is it an IPC job?} + J -->|Yes| K[Create ResourceFaçade] + J -->|No| L[Run locally] + K --> M[Fork executor.js] + M --> N[Initialize Resource] + N --> O[Run job in Resource] + L --> P[Run job in current process] + O --> Q[Job completes] + P --> Q + Q --> R[Update job status] + R --> S{Retry needed?} + S -->|Yes| T[Delay and retry] + S -->|No| F + T --> I + G --> U{Job request received?} + U -->|Yes| V[Create job] + U -->|No| G + V --> W[Schedule or run job] + W --> X[Send to Manager if needed] + X --> G +``` + +### Dependency Chart + +```mermaid +graph TD + A[index.js] --> B[manager.js] + A --> C[handle.js] + B --> D[job.js] + B --> E[resource.js] + B --> F[ipc.js] + B --> G[retry.js] + C --> D + C --> F + D --> G + E --> F + E --> D + H[executor.js] --> D + H --> E + H --> F +``` + +### IPC Communication Flow + +```mermaid +sequenceDiagram + participant MP as Main Process + participant CP as Child Process + participant R as Resource + + MP->>CP: Fork child process + Note over MP,CP: Child process starts + + MP->>CP: CMD.RUN (Job data) + CP->>R: Initialize Resource + R-->>CP: Resource ready + CP-->>MP: EVT.UPDATE (Resource initialized) + + loop Job Execution + CP->>R: Execute job step + R-->>CP: Step result + CP-->>MP: EVT.UPDATE (Progress) + end + + alt Job Completed Successfully + CP-->>MP: CMD.DONE (Result) + else Job Failed + CP-->>MP: CMD.DONE (Error) + end + + MP->>CP: CMD.ABORT (optional) + CP->>R: Abort job + R-->>CP: Job aborted + CP-->>MP: CMD.DONE (Aborted) + + CP->>R: Close resource + R-->>CP: Resource closed + CP-->>MP: EVT.UPDATE (Resource closed) + + MP->>CP: Terminate child process + Note 
over MP,CP: Child process ends +``` + +### Retry Logic Flowchart + +```mermaid +graph TD + A[Job Fails] --> B{Retry Policy Check} + B -->|Retry Allowed| C[Delay] + B -->|No More Retries| D[Mark Job as Failed] + C --> E[Increment Retry Count] + E --> F[Reschedule Job] + F --> G[Job Runs Again] + G --> H{Job Succeeds?} + H -->|Yes| I[Mark Job as Completed] + H -->|No| B +``` + +### Job Creation and Scheduling Flowchart + +```mermaid +graph TD + A[Start] --> B[Create Job Instance] + B --> C{Schedule Type} + C -->|Now| D[Run Immediately] + C -->|Once| E[Schedule for Specific Time] + C -->|In| F[Schedule After Delay] + C -->|Custom| G[Set Custom Schedule] + D --> H[Save Job to Database] + E --> H + F --> H + G --> H + H --> I[Job Ready for Execution] +``` + +## Key Components + +### Manager (manager.js) +The Manager is responsible for overseeing job execution and resource management. It monitors the jobs collection, creates job instances, and delegates execution to the appropriate process. + +### Handle (handle.js) +The Handle provides an API for job creation and scheduling. It's the entry point for creating new jobs and defining their execution parameters. + +### Job (job.js) +The Job class is the base class for all job types. It defines the structure and basic behavior of a job, including methods for running, aborting, and updating status. + +### Resource (resource.js) +The Resource class manages resources for IPC jobs. It handles the lifecycle of resources needed for job execution in separate processes. + +### RetryPolicy (retry.js) +The RetryPolicy defines the behavior for retrying failed jobs. It determines if and when a job should be retried based on its failure characteristics. + +### IPC (ipc.js) +The IPC module facilitates communication between the main process and child processes running IPC jobs. It provides a robust messaging system for exchanging data, commands, and status updates. 
+ +### Executor (executor.js) +The Executor is responsible for setting up and running IPC jobs in separate processes. It initializes the necessary resources and communication channels, executes the job, and reports results back to the main process. + +## Job Lifecycle + +1. **Creation**: Jobs are created using the Handle API. +2. **Scheduling**: Jobs can be scheduled to run immediately, at a specific time, or on a recurring basis. +3. **Execution**: The Manager picks up scheduled jobs and executes them, either locally or in a separate process for IPC jobs. +4. **Monitoring**: Job progress is monitored and status is updated in the database. +5. **Completion/Failure**: Upon completion or failure, the job status is updated, and the retry policy is consulted if necessary. +6. **Retry**: If required and allowed by the retry policy, failed jobs are rescheduled for another attempt. +7. **Cleanup**: After job completion (successful or failed), any associated resources are cleaned up, and the job's final status is recorded. + +## IPC Communication + +Inter-Process Communication (IPC) is used for jobs that need to run in separate processes. The system uses a custom IPC module (ipc.js) to facilitate communication between the main process and job processes. + +Key aspects of IPC: +- **Channel Creation**: Each IPC job has a unique channel for communication. +- **Message Passing**: The main process and job processes exchange messages for status updates, commands, and results. +- **Resource Management**: IPC is used to manage the lifecycle of resources in separate processes. 
+ +The IPC system uses a message-based protocol with the following key message types: +- `CMD.RUN`: Instructs a child process to start running a job +- `CMD.ABORT`: Signals a job to abort its execution +- `CMD.DONE`: Indicates that a job has completed (successfully or with an error) +- `EVT.UPDATE`: Provides progress updates from the job to the main process + +Example of sending an IPC message: + +```javascript +channel.send(CMD.RUN, jobData); +``` + +Example of receiving an IPC message: + +```javascript +channel.on(CMD.DONE, (result) => { + // Handle job completion +}); +``` + +## Retry Mechanism + +The retry mechanism is implemented through the RetryPolicy class. It defines: + +- **Number of Retries**: How many times a job should be retried. +- **Delay**: The delay between retries, which can be fixed or increasing. +- **Error Handling**: Which types of errors should trigger a retry. + +Different job types can have custom retry policies tailored to their specific needs. + +Example of a custom retry policy: + +```javascript +class CustomRetryPolicy extends RetryPolicy { + constructor(maxRetries, initialDelay) { + super(); + this.maxRetries = maxRetries; + this.initialDelay = initialDelay; + } + + shouldRetry(attempt, error) { + return attempt < this.maxRetries && this.isRetriableError(error); + } + + getDelay(attempt) { + return this.initialDelay * Math.pow(2, attempt); // Exponential backoff + } + + isRetriableError(error) { + // Define which errors should trigger a retry + return error.code === 'NETWORK_ERROR' || error.code === 'RESOURCE_UNAVAILABLE'; + } +} +``` + +## Usage Guide + +This section provides examples and explanations for using the Jobs system, including all supported inputs and scheduling types. 
+ +### Creating a Job + +To create a job, use the `job()` method from the Handle API: + +```javascript +const jobHandle = require('./path/to/handle'); + +const myJob = jobHandle.job('myJobType', { + // job data + param1: 'value1', + param2: 'value2' +}); +``` + +Parameters: +- `jobType` (string): The type of job to create. This should match a registered job type in the system. +- `jobData` (object): An object containing any data needed for the job execution. + +### Scheduling a Job + +After creating a job, you can schedule it using one of the following methods: + +#### Run Immediately + +```javascript +myJob.now(); +``` + +This schedules the job to run as soon as possible. + +#### Run Once at a Specific Time + +```javascript +myJob.once(new Date('2023-12-31T23:59:59')); +``` + +This schedules the job to run once at the specified date and time. + +#### Run After a Delay + +```javascript +myJob.in(3600); // Run after 1 hour +``` + +This schedules the job to run after the specified number of seconds. + +#### Custom Schedule + +The `schedule` method is highly flexible and accepts various types of inputs: +It uses ```later.js``` to support this + +1. Cron Syntax + ```javascript + myJob.schedule('0 0 * * *'); // Run at midnight every day + ``` + +2. Natural Language + ```javascript + myJob.schedule('every 5 minutes'); + myJob.schedule('at 10:15 am every weekday'); + ``` + +3. Object Literal + ```javascript + myJob.schedule({ + h: [10, 14, 18], // Run at 10am, 2pm, and 6pm + dw: [1, 3, 5] // On Monday, Wednesday, and Friday + }); + ``` + +4. Array of Schedules + ```javascript + myJob.schedule([ + '0 0 * * *', // At midnight + 'every weekday at 9am' // Every weekday at 9am + ]); + ``` + +5. Custom Schedule Functions + ```javascript + myJob.schedule(function() { + return Date.now() + 60000; // Run 1 minute from now + }); + ``` + +6. 
Predefined Schedule Constants (if supported by your implementation) + ```javascript + myJob.schedule(SCHEDULE.EVERY_HOUR); + ``` + + +This sets a custom schedule for the job using a cron-like syntax. The example above schedules the job to run every 5 minutes. + +### Job Types + +Different job types can be implemented by extending the base Job class. Here's an example of a custom job type: + +```javascript +const { Job } = require('./path/to/job'); + +class MyCustomJob extends Job { + async run(db, done, progress) { + // Job logic goes here + // Use 'db' for database operations + // Call 'progress()' to report progress + // Call 'done()' when the job is complete + } +} + +module.exports = MyCustomJob; +``` + +### Handling Job Results + +Job results and errors are typically handled in the job implementation. However, for immediate feedback, you can chain promises: + +```javascript +myJob.now().then((result) => { + console.log('Job completed successfully:', result); +}).catch((error) => { + console.error('Job failed:', error); +}); +``` + +Remember that for long-running jobs, it's better to implement result handling within the job itself, possibly by updating a database or triggering a callback. + +## Implementation Guide + +To implement a new job type: + +1. Create a new job class that extends the base Job class. +2. Implement the `run` method to define the job's main logic. +3. Define a retry policy if needed. +4. Register the job type in the system (usually done automatically by the scanner). +5. Use the Handle API to create and schedule instances of your job. 
+ +Example: + +```javascript +const { Job } = require('./job.js'); +const { DefaultRetryPolicy } = require('./retry.js'); + +class MyCustomJob extends Job { + constructor(name, data) { + super(name, data); + } + + async run(db, done, progress) { + // Implement job logic here + // Use 'db' for database operations + // Call 'done()' when finished + // Use 'progress()' to report progress + } + + retryPolicy() { + return new DefaultRetryPolicy(3); // Retry up to 3 times + } +} + +module.exports = MyCustomJob; +``` + +## File Breakdown + +### index.js +Entry point that determines whether to load the manager or handle based on the process type. + +```javascript +const countlyConfig = require('./../../config', 'dont-enclose'); + +if (require('cluster').isMaster && process.argv[1].endsWith('api/api.js') && !(countlyConfig && countlyConfig.preventJobs)) { + module.exports = require('./manager.js'); +} else { + module.exports = require('./handle.js'); +} +``` + +### manager.js +Implements the Manager class, which oversees job execution and resource management. + +Key methods: +- `constructor()`: Initializes the manager and starts monitoring jobs. +- `check()`: Periodically checks for new jobs to run. +- `process(jobs)`: Processes a batch of jobs. +- `run(job)`: Executes a single job. + +### handle.js +Implements the Handle class, which provides the API for job creation and scheduling. + +Key methods: +- `job(name, data)`: Creates a new job instance. +- `schedule(schedule, strict, nextTime)`: Schedules a job. +- `once(date, strict)`: Schedules a job to run once at a specific time. +- `now()`: Schedules a job to run immediately. +- `in(seconds)`: Schedules a job to run after a delay. + +### job.js +Defines the base Job class and its subclasses (IPCJob, TransientJob). + +Key methods: +- `run(db, done, progress)`: Main method to be implemented by subclasses. +- `_save(set)`: Saves job state to the database. +- `_finish(err)`: Marks a job as finished. 
+- `retryPolicy()`: Returns the retry policy for the job. + +### resource.js +Implements the Resource and ResourceFaçade classes for managing job resources. + +Key methods: +- `open()`: Opens a resource. +- `close()`: Closes a resource. +- `run(job)`: Runs a job using the resource. + +### retry.js +Defines retry policies for jobs. + +Classes: +- `DefaultRetryPolicy`: Standard retry policy. +- `IPCRetryPolicy`: Retry policy specific to IPC jobs. +- `NoRetryPolicy`: Policy for jobs that should never be retried. + +### ipc.js +Handles inter-process communication. + +Key classes: +- `IdChannel`: Represents a communication channel between processes. + +Key methods: +- `send(cmd, data)`: Sends a message through the channel. +- `on(cmd, handler)`: Registers a handler for incoming messages. + +### executor.js +Entry point for child processes that run IPC jobs. + +Key functionality: +- Sets up the environment for running a job in a separate process. +- Initializes the necessary resources and communication channels. +- Executes the job and communicates results back to the main process. + +## Configuration + +The Jobs system can be configured through the main application configuration file. Key configuration options include: + +- `jobs.concurrency`: Maximum number of jobs that can run concurrently +- `jobs.retryDelay`: Default delay between retry attempts +- `jobs.maxRetries`: Default maximum number of retry attempts +- `jobs.timeout`: Default timeout for job execution + +Example configuration: + +```javascript +{ + jobs: { + concurrency: 5, + retryDelay: 60000, // 1 minute + maxRetries: 3, + timeout: 300000 // 5 minutes + } +} +``` + +## Technical Implementation of Scheduling + +The Jobs system implements scheduling using a combination of database persistence and Node.js timers. Here's a breakdown of the process: + +### 1. Schedule Persistence (manager.js) + +When a job is scheduled, it's saved to the MongoDB database with a `next` field indicating the next run time. 
This is handled in the `schedule` method of the Job class: + +```javascript +schedule(schedule, strict, nextTime) { + this._json.schedule = schedule; + this._json.status = STATUS.SCHEDULED; + + if (strict !== undefined) { + this._json.strict = strict; + } + + if (nextTime) { + this._json.next = nextTime; + } + else { + schedule = typeof schedule === 'string' ? later.parse.text(schedule) : schedule; + var next = later.schedule(schedule).next(1); + if (!next) { + return null; + } + + this._json.next = next.getTime(); + } + + return this._save(); +} +``` + +### 2. Job Checking (manager.js) + +The Manager class periodically checks for jobs that are due to run. This is done in the `check` method: + +```javascript +check() { + var find = { + status: STATUS.SCHEDULED, + next: {$lt: Date.now()}, + name: {$in: this.types} + }; + + this.collection.find(find).sort({next: 1}).limit(MAXIMUM_IN_LINE_JOBS_PER_NAME).toArray((err, jobs) => { + if (err) { + // Error handling + } + else if (jobs && jobs.length) { + this.process(jobs); + } + else { + this.checkAfterDelay(); + } + }); +} +``` + +### 3. Scheduling Next Check (manager.js) + +After processing jobs, the system schedules the next check using Node.js's `setTimeout`: + +```javascript +checkAfterDelay(delay) { + setTimeout(() => { + this.check(); + }, delay || DELAY_BETWEEN_CHECKS); +} +``` + +### 4. Job Execution (manager.js) + +When it's time to run a job, the `run` method is called: + +```javascript +run(job) { + if (job instanceof JOB.IPCJob) { + return this.runIPC(job); + } + else { + return this.runLocally(job); + } +} +``` + +### 5. Rescheduling Recurring Jobs (job.js) + +After a job completes, if it's a recurring job, it's rescheduled: + +```javascript +_finish(err) { + // ... other completion logic ... 
+ + if (this._json.schedule) { + this.schedule(this._json.schedule, this._json.strict); + } +} +``` + +This implementation allows for efficient scheduling of jobs: +- Jobs are persisted in the database, allowing for system restarts without losing scheduled jobs. +- The periodic checking mechanism allows for handling a large number of jobs without keeping them all in memory. +- Using Node.js timers for the checking mechanism provides a simple and reliable way to trigger job processing. +- The use of `later.js` for parsing schedules allows for flexible and powerful schedule definitions. + +## Best Practices + +1. **Job Atomicity**: Design jobs to be atomic and idempotent where possible. This ensures that jobs can be safely retried without unintended side effects. + +7. **Job Data**: Keep job data lightweight and serializable. Avoid storing large objects or non-serializable data in the job's data field. diff --git a/api/parts/jobs/job_flow.md b/api/parts/jobs/job_flow.md new file mode 100644 index 00000000000..e77b9d18ce4 --- /dev/null +++ b/api/parts/jobs/job_flow.md @@ -0,0 +1,101 @@ +```mermaid + +graph TD + A[Start] --> B{Is it Master Process?
index.js} + B -->|Yes| C[Load manager.js] + B -->|No| D[Load handle.js] + + C --> E[Initialize Manager
Manager.constructor
manager.js] + E --> F[Scan for job types
scan
scanner.js] + F --> G[Monitor jobs collection
check
manager.js] + + D --> H[Initialize Handle
Handle.constructor
handle.js] + H --> I[Wait for job requests] + + G --> J{New job to run?
process
manager.js} + J -->|Yes| K[Create job instance
create
manager.js] + J -->|No| G + + K --> AI{Job requires division?
divide
job.js} + AI -->|Yes| AJ[Create sub-jobs
_divide
job.js] + AI -->|No| L + AJ --> AK[Process sub-jobs
manager.js] + AK --> L + + L{Is it an IPC job?
instanceof IPCJob
job.js} + L -->|Yes| M[Create ResourceFaçade
ResourceFaçade.constructor
resource.js] + L -->|No| N[Run locally
runLocally
manager.js] + + M --> O[Fork executor.js
cp.fork
resource.js] + O --> P[Initialize Resource
Resource.constructor
resource.js] + P --> Q[Run job in Resource
run
resource.js] + + N --> R[Run job in current process
_run
job.js] + + Q --> AL{Resource check needed?
resource.js} + AL -->|Yes| AM[Check resource activity
checkActive
resource.js] + AL -->|No| S + AM -->|Active| Q + AM -->|Inactive| AN[Close resource
close
resource.js] + AN --> S + + R --> AO{Job aborted?
_abort
job.js} + AO -->|Yes| AP[Handle abortion
_finish
job.js] + AO -->|No| S + AP --> T + + S[Job completes
_finish
job.js] + + S --> T[Update job status
_save
job.js] + T --> U{Retry needed?
retryPolicy.run
retry.js} + U -->|Yes| V[Delay and retry
delay
retry.js] + U -->|No| G + V --> K + + I --> W{Job request received?
handle.js} + W -->|Yes| X[Create job
job
handle.js] + W -->|No| I + + X --> Y{Schedule type?
handle.js} + Y -->|Now| Z[Run immediately
now
handle.js] + Y -->|Once| AA[Schedule for later
once
handle.js] + Y -->|In| AB[Schedule after delay
in
handle.js] + Y -->|Custom Schedule| AC[Set custom schedule
schedule
handle.js] + + Z --> AD[Save job
_save
job.js] + AA --> AD + AB --> AD + AC --> AD + + AD --> AE{Is it transient?
handle.js} + AE -->|Yes| AF[Run transient job
runTransient
handle.js] + AE -->|No| AG[Send to Manager if needed
ipc.send
ipc.js] + + AF --> AH[Process in IPC channel
IdChannel.on
ipc.js] + AG --> I + AH --> I + + Q --> AQ{IPC communication needed?
resource.js} + AQ -->|Yes| AR[Send IPC message
channel.send
ipc.js] + AQ -->|No| S + AR --> AS[Receive IPC message
channel.on
ipc.js] + AS --> Q + + R --> AT{Progress update?
job.js} + AT -->|Yes| AU[Send progress
_save
job.js] + AT -->|No| R + AU --> R + + G --> AV{Job cancelled or paused?
manager.js} + AV -->|Yes| AW[Update job status
_save
job.js] + AV -->|No| G + AW --> G + + P --> AX[Open resource
open
resource.js] + AX --> AY{Resource opened successfully?
resource.js} + AY -->|Yes| Q + AY -->|No| AZ[Handle resource error
resource.js] + AZ --> S + + +``` \ No newline at end of file diff --git a/api/parts/jobs/runner.js b/api/parts/jobs/runner.js index 93edee66693..6d305d3c69b 100644 --- a/api/parts/jobs/runner.js +++ b/api/parts/jobs/runner.js @@ -23,7 +23,7 @@ let leader, // leader doc */ function setup() { if (!collection) { - common.db.createCollection(COLLECTION, (err, col) => { + common.db.createCollection(COLLECTION, (err) => { if (err) { log.d('collection exists'); collection = common.db.collection(COLLECTION); @@ -40,7 +40,7 @@ function setup() { }); } else { - collection = col; + collection = common.db.collection(COLLECTION); setImmediate(periodic); } }); @@ -406,4 +406,4 @@ module.exports = { // console.log('resolving'); // return new Promise(res => setTimeout(res, 10000 * Math.random())); // } -// }); \ No newline at end of file +// }); diff --git a/api/utils/requestProcessor.js b/api/utils/requestProcessor.js index d4786939a2e..17099466dad 100755 --- a/api/utils/requestProcessor.js +++ b/api/utils/requestProcessor.js @@ -7,7 +7,7 @@ const Promise = require('bluebird'); const url = require('url'); const common = require('./common.js'); const countlyCommon = require('../lib/countly.common.js'); -const { validateAppAdmin, validateUser, validateRead, validateUserForRead, validateUserForWrite, validateGlobalAdmin, dbUserHasAccessToCollection, validateUpdate, validateDelete, validateCreate } = require('./rights.js'); +const { validateAppAdmin, validateUser, validateRead, validateUserForRead, validateUserForWrite, validateGlobalAdmin, dbUserHasAccessToCollection, validateUpdate, validateDelete, validateCreate, getBaseAppFilter } = require('./rights.js'); const authorize = require('./authorizer.js'); const taskmanager = require('./taskmanager.js'); const plugins = require('../../plugins/pluginManager.js'); @@ -828,17 +828,6 @@ const processRequest = (params) => { }); }); break; - case 'stop': - validateUserForWrite(params, () => { - taskmanager.stopTask({ - db: common.db, - id: 
params.qstring.task_id, - op_id: params.qstring.op_id - }, (err, res) => { - common.returnMessage(params, 200, res); - }); - }); - break; case 'delete': validateUserForWrite(params, () => { taskmanager.deleteResult({ @@ -1011,44 +1000,46 @@ const processRequest = (params) => { catch (SyntaxError) { update_array.overview = []; console.log('Parse ' + params.qstring.event_overview + ' JSON failed', params.req.url, params.req.body); } - if (update_array.overview && Array.isArray(update_array.overview) && update_array.overview.length > 12) { - common.returnMessage(params, 400, "You can't add more than 12 items in overview"); - return; - } - //sanitize overview - var allowedEventKeys = event.list; - var allowedProperties = ['dur', 'sum', 'count']; - var propertyNames = { - 'dur': 'Dur', - 'sum': 'Sum', - 'count': 'Count' - }; - for (let i = 0; i < update_array.overview.length; i++) { - update_array.overview[i].order = i; - update_array.overview[i].eventKey = update_array.overview[i].eventKey || ""; - update_array.overview[i].eventProperty = update_array.overview[i].eventProperty || ""; - if (allowedEventKeys.indexOf(update_array.overview[i].eventKey) === -1 || allowedProperties.indexOf(update_array.overview[i].eventProperty) === -1) { - update_array.overview.splice(i, 1); - i = i - 1; - } - else { - update_array.overview[i].is_event_group = (typeof update_array.overview[i].is_event_group === 'boolean' && update_array.overview[i].is_event_group) || false; - update_array.overview[i].eventName = update_array.overview[i].eventName || update_array.overview[i].eventKey; - update_array.overview[i].propertyName = propertyNames[update_array.overview[i].eventProperty]; - } - } - //check for duplicates - var overview_map = Object.create(null); - for (let p = 0; p < update_array.overview.length; p++) { - if (!overview_map[update_array.overview[p].eventKey]) { - overview_map[update_array.overview[p].eventKey] = {}; + if (update_array.overview && Array.isArray(update_array.overview)) 
{ + if (update_array.overview.length > 12) { + common.returnMessage(params, 400, "You can't add more than 12 items in overview"); + return; } - if (!overview_map[update_array.overview[p].eventKey][update_array.overview[p].eventProperty]) { - overview_map[update_array.overview[p].eventKey][update_array.overview[p].eventProperty] = 1; + //sanitize overview + var allowedEventKeys = event.list; + var allowedProperties = ['dur', 'sum', 'count']; + var propertyNames = { + 'dur': 'Dur', + 'sum': 'Sum', + 'count': 'Count' + }; + for (let i = 0; i < update_array.overview.length; i++) { + update_array.overview[i].order = i; + update_array.overview[i].eventKey = update_array.overview[i].eventKey || ""; + update_array.overview[i].eventProperty = update_array.overview[i].eventProperty || ""; + if (allowedEventKeys.indexOf(update_array.overview[i].eventKey) === -1 || allowedProperties.indexOf(update_array.overview[i].eventProperty) === -1) { + update_array.overview.splice(i, 1); + i = i - 1; + } + else { + update_array.overview[i].is_event_group = (typeof update_array.overview[i].is_event_group === 'boolean' && update_array.overview[i].is_event_group) || false; + update_array.overview[i].eventName = update_array.overview[i].eventName || update_array.overview[i].eventKey; + update_array.overview[i].propertyName = propertyNames[update_array.overview[i].eventProperty]; + } } - else { - update_array.overview.splice(p, 1); - p = p - 1; + //check for duplicates + var overview_map = Object.create(null); + for (let p = 0; p < update_array.overview.length; p++) { + if (!overview_map[update_array.overview[p].eventKey]) { + overview_map[update_array.overview[p].eventKey] = {}; + } + if (!overview_map[update_array.overview[p].eventKey][update_array.overview[p].eventProperty]) { + overview_map[update_array.overview[p].eventKey][update_array.overview[p].eventProperty] = 1; + } + else { + update_array.overview.splice(p, 1); + p = p - 1; + } } } } @@ -1181,7 +1172,7 @@ const processRequest = 
(params) => { return new Promise(function(resolve) { var collectionNameWoPrefix = common.crypto.createHash('sha1').update(obj.key + params.qstring.app_id).digest('hex'); //removes all document for current segment - common.db.collection("events" + collectionNameWoPrefix).remove({"s": {$in: obj.list}}, {multi: true}, function(err3) { + common.db.collection("events_data").remove({"_id": {"$regex": ("^" + params.qstring.app_id + "_" + collectionNameWoPrefix + "_.*")}, "s": {$in: obj.list}}, {multi: true}, function(err3) { if (err3) { console.log(err3); } @@ -1196,7 +1187,7 @@ const processRequest = (params) => { unsetUs["meta_v2." + obj.list[p]] = ""; } //clears out meta data for segments - common.db.collection("events" + collectionNameWoPrefix).update({$or: my_query}, {$unset: unsetUs}, {multi: true}, function(err4) { + common.db.collection("events_data").update({"_id": {"$regex": ("^" + params.qstring.app_id + "_" + collectionNameWoPrefix + "_.*")}, $or: my_query}, {$unset: unsetUs}, {multi: true}, function(err4) { if (err4) { console.log(err4); } @@ -1240,7 +1231,6 @@ const processRequest = (params) => { else { resolve(); } - }); } else { @@ -2129,7 +2119,7 @@ const processRequest = (params) => { } dbUserHasAccessToCollection(params, params.qstring.collection, (hasAccess) => { - if (hasAccess) { + if (hasAccess || (params.qstring.db === "countly_drill" && params.qstring.collection === "drill_events") || (params.qstring.db === "countly" && params.qstring.collection === "events_data")) { var dbs = { countly: common.db, countly_drill: common.drillDb, countly_out: common.outDb, countly_fs: countlyFs.gridfs.getHandler() }; var db = ""; if (params.qstring.db && dbs[params.qstring.db]) { @@ -2138,6 +2128,23 @@ const processRequest = (params) => { else { db = common.db; } + if (!params.member.global_admin && params.qstring.collection === "drill_events" || params.qstring.collection === "events_data") { + var base_filter = getBaseAppFilter(params.member, params.qstring.db, 
params.qstring.collection); + if (base_filter && Object.keys(base_filter).length > 0) { + params.qstring.query = params.qstring.query || {}; + for (var key in base_filter) { + if (params.qstring.query[key]) { + params.qstring.query.$and = params.qstring.query.$and || []; + params.qstring.query.$and.push({[key]: base_filter[key]}); + params.qstring.query.$and.push({[key]: params.qstring.query[key]}); + delete params.qstring.query[key]; + } + else { + params.qstring.query[key] = base_filter[key]; + } + } + } + } countlyApi.data.exports.fromDatabase({ db: db, params: params, @@ -3657,7 +3664,7 @@ const restartRequest = (params, initiator, done, try_times, fail) => { */ function processUser(params, initiator, done, try_times) { return new Promise((resolve) => { - if (!params.app_user.uid) { + if (params && params.app_user && !params.app_user.uid) { //first time we see this user, we need to id him with uid countlyApi.mgmt.appUsers.getUid(params.app_id, function(err, uid) { plugins.dispatch("/i/app_users/create", { @@ -3716,7 +3723,7 @@ function processUser(params, initiator, done, try_times) { }); } //check if device id was changed - else if (params.qstring.old_device_id && params.qstring.old_device_id !== params.qstring.device_id) { + else if (params && params.qstring && params.qstring.old_device_id && params.qstring.old_device_id !== params.qstring.device_id) { const old_id = common.crypto.createHash('sha1') .update(params.qstring.app_key + params.qstring.old_device_id + "") .digest('hex'); diff --git a/api/utils/rights.js b/api/utils/rights.js index 6cb3024ff6e..5d431752865 100644 --- a/api/utils/rights.js +++ b/api/utils/rights.js @@ -1083,7 +1083,32 @@ function validateWrite(params, feature, accessType, callback, callbackParam) { }); }); } - +/** + * Creates filter object to filter by member allowed collections + * @param {object} member - members object from params + * @param {string} dbName - database name as string + * @param {string} collectionName - collection 
Name + * @returns {object} filter object + */ +exports.getBaseAppFilter = function(member, dbName, collectionName) { + var base_filter = {}; + var apps = exports.getUserApps(member); + if (dbName === "countly_drill" && collectionName === "drill_events") { + if (Array.isArray(apps) && apps.length > 0) { + base_filter.a = {"$in": apps}; + } + } + else if (dbName === "countly" && collectionName === "events_data") { + var in_array = []; + if (Array.isArray(apps) && apps.length > 0) { + for (var i = 0; i < apps.length; i++) { + in_array.push(new RegExp("^" + apps[i] + "_.*")); + } + base_filter = {"_id": {"$in": in_array}}; + } + } + return base_filter; +}; /** * Validate user for create access by api_key for provided app_id (both required parameters for the request). * @param {params} params - {@link params} object diff --git a/api/utils/taskmanager.js b/api/utils/taskmanager.js index d5d703499c8..0233bd96081 100644 --- a/api/utils/taskmanager.js +++ b/api/utils/taskmanager.js @@ -58,7 +58,7 @@ const log = require('./log.js')('core:taskmanager'); * }, outputData:function(err, data){ * common.returnOutput(params, data); * } -* })); +* })); */ taskmanager.longtask = function(options) { options.db = options.db || common.db; @@ -66,80 +66,6 @@ taskmanager.longtask = function(options) { var start = new Date().getTime(); var timeout; - var saveOpId = async function(comment_id, retryCount) { - common.db.admin().command({ currentOp: 1 }, async function(error, result) { - if (error) { - log.d(error); - return; - } - else { - if (result && result.inprog) { - for (var i = 0; i < result.inprog.length; i++) { - let op = result.inprog[i]; - if (!('$truncated' in op.command) && (i !== result.inprog.length - 1)) { - continue; - } - if (!('$truncated' in op.command) && (i === result.inprog.length - 1)) { - if (retryCount < 3) { - setTimeout(() => saveOpId(comment_id, (++retryCount)), 500); - return; - } - else { - log.d(`operation not found for task:${options.id} comment: 
${comment_id}`); - break; - } - } - - let comment_position = op.command.$truncated.indexOf('$comment'); - if (comment_position === -1) { - continue; - } - - let substr = op.command.$truncated.substring(comment_position, op.command.$truncated.length) || ""; - var comment_val = ""; - substr = substr.match(/"(.*?)"/); - if (substr && Array.isArray(substr)) { - comment_val = substr[1]; - } - - if (comment_val === comment_id) { - var task_id = options.id; - var op_id = op.opid; - await common.db.collection("long_tasks").findOneAndUpdate({ _id: common.db.ObjectID(task_id) }, { $set: { op_id: op_id } }); - log.d(`Operation found task: ${task_id} op:${op_id} comment: ${comment_id}`); - break; - } - else if ((comment_val !== comment_id) && (i === (result.inprog.length - 1))) { - if (retryCount < 3) { - setTimeout(() => saveOpId(comment_id, (++retryCount)), 500); - break; - } - else { - log.d(`operation not found for task:${options.id} comment: ${comment_id}`); - break; - } - } - } - } - } - }); - }; - - if (options.comment_id) { - var retryCount = 0; - try { - saveOpId(options.comment_id, retryCount); - } - catch (err) { - if (retryCount < 3) { - setTimeout(() =>saveOpId(options.comment_id, ++retryCount), 500); - } - else { - console.log(err); - } - } - } - /** switching to long task */ function switchToLongTask() { timeout = null; @@ -298,9 +224,6 @@ taskmanager.createTask = function(options, callback) { update.subtask_key = options.subtask_key || ""; update.taskgroup = options.taskgroup || false; update.linked_to = options.linked_to; - if (options.comment_id) { - update.comment_id = options.comment_id; - } if (options.subtask && options.subtask !== "") { update.subtask = options.subtask; var updateSub = {$set: {}}; @@ -323,6 +246,89 @@ taskmanager.createTask = function(options, callback) { } }; +var checkIfAllRulesMatch = function(rules, data) { + var match = true; + for (var key in rules) { + if (data[key]) { + if (rules[key] === data[key]) { + continue; + } + else { + 
if (typeof rules[key] === "object") { + if (!checkIfAllRulesMatch(rules[key], data[key])) { + return false; + } + } + else { + if (data[key].$in) { + if (data[key].$in.indexOf(rules[key]) === -1) { + return false; + } + } + else if (data[key].$nin) { + if (data[key].$nin.indexOf(rules[key]) !== -1) { + return false; + } + } + else { + return false; + } + } + } + } + else { + return false; + } + } + return match; +}; + +taskmanager.markReportsDirtyBasedOnRule = function(options, callback) { + common.db.collection("long_tasks").find({ + autoRefresh: true, + }).toArray(function(err, tasks) { + var ids_to_mark_dirty = []; + if (err) { + log.e("Error while fetching tasks", err); + if (callback && typeof callback === "function") { + callback(); + } + return; + + } + tasks = tasks || []; + for (var z = 0; z < tasks.length; z++) { + try { + var req = JSON.parse(tasks[z].request); + if (checkIfAllRulesMatch(options.rules, req.json.queryObject)) { + ids_to_mark_dirty.push(tasks[z]._id); + } + } + catch (e) { + log.e(' got error while process task request parse', e); + } + + } + if (ids_to_mark_dirty.length > 0) { + common.db.collection("long_tasks").updateMany({_id: {$in: ids_to_mark_dirty}}, {$set: {dirty: new Date().getTime()}}, function(err3) { + if (err3) { + log.e("Error while updating reports", err3); + } + if (callback && typeof callback === "function") { + callback(); + } + }); + } + else { + if (callback && typeof callback === "function") { + callback(); + } + } + + + }); + +}; /** * Save result from the task * @param {object} options - options for the task @@ -388,6 +394,9 @@ taskmanager.saveResult = function(options, data, callback) { options.db.collection("long_tasks").update({_id: options.subtask}, updateObj, {'upsert': false}, function() {}); } options.db.collection("long_tasks").findOne({_id: options.id}, function(error, task) { + if (task && task.dirty && task.dirty < task.start) { + update.dirty = false; + } if (options.gridfs || (task && task.gridfs)) { 
//let's store it in gridfs update.data = {}; @@ -886,6 +895,8 @@ taskmanager.errorResults = function(options, callback) { * @param {object} options - options for the task * @param {object} options.db - database connection * @param {string} options.id - id of the task result +* @param {boolean} options.autoUpdate - if auto update is needed or not +* @param {boolean} options.dirty - if dirty is true then it means some part of report is wrong. It should be regenerated fully. * @param {funciton} callback - callback for the result */ taskmanager.rerunTask = function(options, callback) { @@ -990,7 +1001,7 @@ taskmanager.rerunTask = function(options, callback) { reqData.json.period = JSON.stringify(reqData.json.period); } options.subtask = res.subtask; - reqData.json.autoUpdate = options.autoUpdate || false; + reqData.json.autoUpdate = ((!options.dirty) && (options.autoUpdate || false)); //If dirty set autoUpdate to false if (!reqData.json.api_key && res.creator) { options.db.collection("members").findOne({_id: common.db.ObjectID(res.creator)}, function(err1, member) { if (member && member.api_key) { @@ -1029,50 +1040,6 @@ taskmanager.rerunTask = function(options, callback) { }); }; -taskmanager.stopTask = function(options, callback) { - options.db = options.db || common.db; - - /** - * Stop task - * @param {object} op_id - operation id for mongo process - * @param {object} options1.db - database connection - * @param {string} options1.id - id of the task result - * @param {object} reqData - request data - * @param {funciton} callback1 - callback for the result - */ - function stopTask(op_id) { - common.db.admin().command({ killOp: 1, op: Number.parseInt(op_id) }, function(error, result) { - if (result.ok === 1) { - callback(null, "Success"); - } - else { - callback(null, "Operation could not be stopped"); - } - }); - } - - options.db.collection("long_tasks").findOne({ _id: options.id }, function(err, res) { - if (res) { - if (res.creator) { - 
options.db.collection("members").findOne({ _id: common.db.ObjectID(res.creator) }, function(err1, member) { - if (member) { - stopTask(res.op_id); - } - else { - callback(null, "No permission to stop this task"); - } - }); - } - else { - stopTask(res.op_id); - } - } - else { - callback(null, "Task does not exist"); - } - }); -}; - /** * Create a callback for getting result, including checking gridfs * @param {function} callback - callback for the result @@ -1113,4 +1080,4 @@ function getResult(callback, options) { } }; } -module.exports = taskmanager; \ No newline at end of file +module.exports = taskmanager; diff --git a/bin/scripts/data-reports/compare_drill_aggregated.js b/bin/scripts/data-reports/compare_drill_aggregated.js index 6a16af89c9d..c93529488e9 100644 --- a/bin/scripts/data-reports/compare_drill_aggregated.js +++ b/bin/scripts/data-reports/compare_drill_aggregated.js @@ -7,9 +7,12 @@ * node compare_drill_aggregated.js */ var period = "7days"; //Chose any of formats: "Xdays" ("7days","100days") or ["1-1-2024", "1-10-2024"], -var app_list = []; //List with apps +var app_list = []; //List with apps "" //Example var eventMap = {"6075f94b7e5e0d392902520c":["Logout","Login"],"6075f94b7e5e0d392902520d":["Logout","Login","Buy"]}; var eventMap = {}; //If left empty will run for all alls/events. + +var union_with_old_collection = true; //False if all sessions are stored in drill_events collection + var verbose = false; //true to show more output @@ -151,11 +154,40 @@ Promise.all([pluginManager.dbConnection("countly"), pluginManager.dbConnection(" } } if (haveAnything) { - console.log(" " + JSON.stringify(report)); + let aggCount = totals.c || 0; + let drillCount = drillData.totals.c || 0; + let percentageDiff = 0; + if (drillCount !== 0) { + percentageDiff = ((drillCount - aggCount) / drillCount) * 100; + } + else { + if (aggCount !== 0) { + // If drillCount is 0, and aggCount is not 0, show a large difference + percentageDiff = (aggCount > 0 ? 
100 : -100); // 100% or -100% depending on the sign of aggCount + } + else { + percentageDiff = 0; // Both counts are 0, no difference + } + } + + console.log("----------------------------------------------"); + console.log("- Application name:", app.name); + console.log("- Event name:", event); + console.log("- Counts in Aggregated data:", aggCount); + console.log("- Counts in Drill data:", drillCount); + console.log("- Percentage difference between Drill data and Aggregated data:", percentageDiff.toFixed(2) + "%"); + console.log("----------------------------------------------"); endReport[app._id]["bad"]++; endReport[app._id]["events"] = endReport[app._id]["events"] || {}; - endReport[app._id]["events"][event] = {"e": event, report: report}; + endReport[app._id]["events"][event] = { + "e": event, + "aggregated_count": aggCount, + "drill_count": drillCount, + "percentage_difference": percentageDiff.toFixed(2), + "report": report + }; } + resolve2(); }); } @@ -164,6 +196,25 @@ Promise.all([pluginManager.dbConnection("countly"), pluginManager.dbConnection(" }).then(function() { console.log("Finished processing app: ", app.name); resolve(); + + //Complete CSV after processing the apps + console.log("\nSummary Report (CSV-like):"); + console.log("App,Event,Aggregated,Drill,% Difference"); + // var csvRows = ["App,Event,Aggregated,Drill,% Difference"]; + for (var appId in endReport) { + var appData = endReport[appId]; + var appName = appData.name; + if (appData.events) { + for (var event in appData.events) { + var eventData = appData.events[event]; + var row = `${appName},${event},${eventData.aggregated_count},${eventData.drill_count},${eventData.percentage_difference}`; + console.log(row); + //csvRows.push(row); + } + } + } + + }).catch(function(eee) { console.log("Error processing app: ", app.name); console.log(eee); @@ -207,14 +258,17 @@ Promise.all([pluginManager.dbConnection("countly"), pluginManager.dbConnection(" } endDate = endDate.valueOf() - 
endDate.utcOffset() * 60000; - let collection = "drill_events" + crypto.createHash('sha1').update(options.event + options.app_id).digest('hex'); - var query = {"ts": {"$gte": startDate, "$lt": endDate}}; - var pipeline = [ - {"$match": query}, - ]; + var query = {"ts": {"$gte": startDate, "$lt": endDate}, "a": options.app_id, "e": options.event}; + var pipeline = []; + pipeline.push({"$match": query}); + if (union_with_old_collection) { + let collection = "drill_events" + crypto.createHash('sha1').update(options.event + options.app_id).digest('hex'); + var query2 = {"ts": {"$gte": startDate, "$lt": endDate}}; + pipeline.push({"$unionWith": { "coll": collection, "pipeline": [{"$match": query2}] }}); + } pipeline.push({"$group": {"_id": "$d", "c": {"$sum": "$c"}, "s": {"$sum": "$s"}, "dur": {"$sum": "$dur"}}}); - options.drillDb.collection(collection).aggregate(pipeline, {"allowDiskUse": true}).toArray(function(err, data) { + options.drillDb.collection("drill_events").aggregate(pipeline, {"allowDiskUse": true}).toArray(function(err, data) { if (err) { console.log(err); } diff --git a/bin/scripts/expire-data/delete_custom_events_regex.js b/bin/scripts/expire-data/delete_custom_events_regex.js index b5e75b1fe2d..73d2f940a96 100755 --- a/bin/scripts/expire-data/delete_custom_events_regex.js +++ b/bin/scripts/expire-data/delete_custom_events_regex.js @@ -6,7 +6,6 @@ */ -const { ObjectId } = require('mongodb'); const pluginManager = require('../../../plugins/pluginManager.js'); const common = require('../../../api/utils/common.js'); const drillCommon = require('../../../plugins/drill/api/common.js'); @@ -25,7 +24,7 @@ Promise.all([pluginManager.dbConnection("countly"), pluginManager.dbConnection(" //GET APP try { - const app = await countlyDb.collection("apps").findOne({_id: ObjectId(APP_ID)}, {_id: 1, name: 1}); + const app = await countlyDb.collection("apps").findOne({_id: countlyDb.ObjectID(APP_ID)}, {_id: 1, name: 1}); console.log("App:", app.name); //GET EVENTS var 
events = []; @@ -51,6 +50,27 @@ Promise.all([pluginManager.dbConnection("countly"), pluginManager.dbConnection(" } ]).toArray(); events = events.length ? events[0].list : []; + const metaEvents = await drillDb.collection("drill_meta").aggregate([ + { + $match: { + 'app_id': app._id + "", + "type": "e", + "e": { $regex: regex, $options: CASE_INSENSITIVE ? "i" : "", $nin: events } + } + }, + { + $group: { + _id: "$e" + } + }, + { + $project: { + _id: 0, + e: "$_id" + } + } + ]).toArray(); + events = events.concat(metaEvents.map(e => e.e)); } catch (err) { close("Invalid regex"); @@ -86,6 +106,7 @@ Promise.all([pluginManager.dbConnection("countly"), pluginManager.dbConnection(" close(err); } + async function deleteDrillEvents(appId, events) { for (let i = 0; i < events.length; i++) { var collectionName = drillCommon.getCollectionName(events[i], appId); diff --git a/bin/scripts/member-managament/delete_old_members.js b/bin/scripts/member-managament/delete_old_members.js index 6dee1d7624d..a52639e79f9 100644 --- a/bin/scripts/member-managament/delete_old_members.js +++ b/bin/scripts/member-managament/delete_old_members.js @@ -44,7 +44,7 @@ Promise.all([pluginManager.dbConnection("countly")]).spread(function(countlyDb) Url: SERVER_URL + "/i/users/delete", body: { api_key: API_KEY, - args: JSON.stringify({user_ids: [(data._id + "")]}) + args: {user_ids: [data._id + ""]} } }, function(data) { if (data.err) { @@ -99,8 +99,7 @@ function sendRequest(params, callback) { const options = { uri: url.href, method: params.requestType, - json: true, - body: body, + json: body, strictSSL: false }; diff --git a/frontend/express/app.js b/frontend/express/app.js index 06f9b736362..6e846665354 100644 --- a/frontend/express/app.js +++ b/frontend/express/app.js @@ -603,6 +603,10 @@ Promise.all([plugins.dbConnection(countlyConfig), plugins.dbConnection("countly_ app.use(function(req, res, next) { var contentType = req.headers['content-type']; if (req.method.toLowerCase() === 'post' && 
contentType && contentType.indexOf('multipart/form-data') >= 0) { + if (!req.session?.uid || Date.now() > req.session?.expires) { + res.status(401).send('Unauthorized'); + return; + } var form = new formidable.IncomingForm(); form.uploadDir = __dirname + '/uploads'; form.parse(req, function(err, fields, files) { @@ -974,6 +978,7 @@ Promise.all([plugins.dbConnection(countlyConfig), plugins.dbConnection("countly_ timezones: timezones, countlyTypeName: COUNTLY_NAMED_TYPE, countlyTypeTrack: COUNTLY_TRACK_TYPE, + countlyTypeCE: COUNTLY_TYPE_CE, countly_tracking, countly_domain, frontend_app: versionInfo.frontend_app || 'e70ec21cbe19e799472dfaee0adb9223516d238f', diff --git a/frontend/express/public/core/app-management/javascripts/countly.views.js b/frontend/express/public/core/app-management/javascripts/countly.views.js index af100602e21..884926318c6 100755 --- a/frontend/express/public/core/app-management/javascripts/countly.views.js +++ b/frontend/express/public/core/app-management/javascripts/countly.views.js @@ -392,10 +392,10 @@ label: data.name }); self.$store.dispatch("countlyCommon/addToAllApps", data); + self.$store.dispatch("countlyCommon/updateActiveApp", data._id + ""); if (self.firstApp) { countlyCommon.ACTIVE_APP_ID = data._id + ""; app.onAppManagementSwitch(data._id + "", data && data.type || "mobile"); - self.$store.dispatch("countlyCommon/updateActiveApp", data._id + ""); app.initSidebar(); } self.firstApp = self.checkIfFirst(); @@ -849,6 +849,9 @@ return countlyGlobal.apps[key].plugins.consolidate.includes(self.selectedApp); } }) || []; + }, + handleCancelForm: function() { + CountlyHelpers.goTo({url: "/manage/apps"}); } }, mounted: function() { diff --git a/frontend/express/public/core/app-management/templates/app-management.html b/frontend/express/public/core/app-management/templates/app-management.html index 90f82ff8230..81707a3bedc 100644 --- a/frontend/express/public/core/app-management/templates/app-management.html +++ 
b/frontend/express/public/core/app-management/templates/app-management.html @@ -188,13 +188,22 @@

{{apps[selectedApp] -
+
{{i18n( newApp ? 'common.create' : 'common.apply')}} + @click="handleCancelForm" + data-test-id="create-new-app-cancel-button" + type="secondary" + > + {{i18n('common.cancel')}} +
+ {{i18n( newApp ? 'common.create' : 'common.apply')}} + +
diff --git a/frontend/express/public/core/app-version/templates/app-version.html b/frontend/express/public/core/app-version/templates/app-version.html index e6b198adb1b..4ea87e052ef 100644 --- a/frontend/express/public/core/app-version/templates/app-version.html +++ b/frontend/express/public/core/app-version/templates/app-version.html @@ -5,10 +5,7 @@ > diff --git a/frontend/express/public/core/carrier/templates/carrier.html b/frontend/express/public/core/carrier/templates/carrier.html index ce0108ff4ee..5fd82b2867d 100644 --- a/frontend/express/public/core/carrier/templates/carrier.html +++ b/frontend/express/public/core/carrier/templates/carrier.html @@ -6,10 +6,7 @@ > diff --git a/frontend/express/public/core/date-presets/templates/preset-management.html b/frontend/express/public/core/date-presets/templates/preset-management.html index da8f4d2b7af..278422a89a5 100755 --- a/frontend/express/public/core/date-presets/templates/preset-management.html +++ b/frontend/express/public/core/date-presets/templates/preset-management.html @@ -4,13 +4,13 @@ > - - - + + + + + + + + + diff --git a/frontend/express/public/core/device-and-type/javascripts/countly.views.js b/frontend/express/public/core/device-and-type/javascripts/countly.views.js index ab4e7279ace..561c67ced3d 100644 --- a/frontend/express/public/core/device-and-type/javascripts/countly.views.js +++ b/frontend/express/public/core/device-and-type/javascripts/countly.views.js @@ -443,6 +443,20 @@ var GridComponent = countlyVue.views.create({ } return val; }, + onWidgetCommand: function(event) { + if (event === 'add' || event === 'manage' || event === 'show') { + this.graphNotesHandleCommand(event); + return; + } + else if (event === 'zoom') { + this.triggerZoom(); + return; + } + else { + this.$emit('command', event); + return; + } + }, } }); diff --git a/frontend/express/public/core/device-and-type/templates/devices-and-types.html b/frontend/express/public/core/device-and-type/templates/devices-and-types.html 
index 1717de75acc..bfcbcc2229f 100644 --- a/frontend/express/public/core/device-and-type/templates/devices-and-types.html +++ b/frontend/express/public/core/device-and-type/templates/devices-and-types.html @@ -5,10 +5,7 @@ > diff --git a/plugins/dbviewer/api/api.js b/plugins/dbviewer/api/api.js index 5ea7f771fef..46c45a19425 100644 --- a/plugins/dbviewer/api/api.js +++ b/plugins/dbviewer/api/api.js @@ -5,19 +5,88 @@ var common = require('../../../api/utils/common.js'), countlyFs = require('../../../api/utils/countlyFs.js'), _ = require('underscore'), taskManager = require('../../../api/utils/taskmanager.js'), - { getCollectionName, dbUserHasAccessToCollection, dbLoadEventsData, validateUser, getUserApps, validateGlobalAdmin, hasReadRight } = require('../../../api/utils/rights.js'), + { getCollectionName, dbUserHasAccessToCollection, dbLoadEventsData, validateUser, getUserApps, validateGlobalAdmin, hasReadRight, getBaseAppFilter } = require('../../../api/utils/rights.js'), exported = {}; const { MongoInvalidArgumentError } = require('mongodb'); const { EJSON } = require('bson'); const FEATURE_NAME = 'dbviewer'; +const whiteListedAggregationStages = { + "$addFields": true, + "$bucket": true, + "$bucketAuto": true, + //"$changeStream": false, + //"$changeStreamSplitLargeEvents": false, + //"$collStats": false, + "$count": true, + //"$currentOp": false, + "$densify": true, + //"$documents": false + "$facet": true, + "$fill": true, + "$geoNear": true, + "$graphLookup": true, + "$group": true, + //"$indexStats": false, + "$limit": true, + //"$listLocalSessions": false + //"$listSampledQueries": false + //"$listSearchIndexes": false + //"$listSessions": false + //"$lookup": false + "$match": true, + //"$merge": false + //"$mergeCursors": false + //"$out": false + //"$planCacheStats": false, + "$project": true, + "$querySettings": true, + "$redact": true, + "$replaceRoot": true, + "$replaceWith": true, + "$sample": true, + "$search": true, + "$searchMeta": true, + "$set": 
true, + "$setWindowFields": true, + //"$sharedDataDistribution": false, + "$skip": true, + "$sort": true, + "$sortByCount": true, + //"$unionWith": false, + "$unset": true, + "$unwind": true, + "$vectorSearch": true //atlas specific +}; var spawn = require('child_process').spawn, child; + (function() { plugins.register("/permissions/features", function(ob) { ob.features.push(FEATURE_NAME); }); + /** + * Function removes not allowed aggregation stages from the pipeline + * @param {array} aggregation - current aggregation pipeline + * @returns {object} changes - object with information which operations were removed + */ + function escapeNotAllowedAggregationStages(aggregation) { + var changes = {}; + for (var z = 0; z < aggregation.length; z++) { + for (var key in aggregation[z]) { + if (!whiteListedAggregationStages[key]) { + changes[key] = true; + delete aggregation[z][key]; + } + } + if (Object.keys(aggregation[z]).length === 0) { + aggregation.splice(z, 1); + z--; + } + } + return changes; + } /** * @api {get} /o/db Access database @@ -179,6 +248,25 @@ var spawn = require('child_process').spawn, filter = {}; } + var base_filter = {}; + if (!params.member.global_admin) { + base_filter = getBaseAppFilter(params.member, dbNameOnParam, params.qstring.collection); + } + + if (base_filter && Object.keys(base_filter).length > 0) { + for (var key in base_filter) { + if (filter[key]) { + filter.$and = filter.$and || []; + filter.$and.push({[key]: base_filter[key]}); + filter.$and.push({[key]: filter[key]}); + delete filter[key]; + } + else { + filter[key] = base_filter[key]; + } + } + } + if (dbs[dbNameOnParam]) { try { var cursor = dbs[dbNameOnParam].collection(params.qstring.collection).find(filter, { projection }); @@ -191,6 +279,7 @@ var spawn = require('child_process').spawn, common.returnMessage(params, 400, "Invalid collection name: Collection names can not contain '$' or other invalid characters"); } else { + log.e(error); common.returnMessage(params, 500, "An 
unexpected error occurred."); } return false; @@ -291,7 +380,7 @@ var spawn = require('child_process').spawn, async.each(results, function(col, done) { if (col.collectionName.indexOf("system.indexes") === -1 && col.collectionName.indexOf("sessions_") === -1) { userHasAccess(params, col.collectionName, params.qstring.app_id, function(hasAccess) { - if (hasAccess) { + if (hasAccess || col.collectionName === "events_data" || col.collectionName === "drill_events") { ob = parseCollectionName(col.collectionName, lookup); db.collections[ob.pretty] = ob.name; } @@ -318,8 +407,9 @@ var spawn = require('child_process').spawn, * Get aggregated result by the parameter on the url * @param {string} collection - collection will be applied related query * @param {object} aggregation - aggregation object + * @param {object} changes - object referencing removed stages from pipeline * */ - function aggregate(collection, aggregation) { + function aggregate(collection, aggregation, changes) { if (params.qstring.iDisplayLength) { aggregation.push({ "$limit": parseInt(params.qstring.iDisplayLength) }); } @@ -339,6 +429,10 @@ var spawn = require('child_process').spawn, else if (collection === 'auth_tokens') { aggregation.splice(addProjectionAt, 0, {"$addFields": {"_id": "***redacted***"}}); } + else if ((collection === "events_data" || collection === "drill_events") && !params.member.global_admin) { + var base_filter = getBaseAppFilter(params.member, dbNameOnParam, params.qstring.collection); + aggregation.splice(0, 0, {"$match": base_filter}); + } // check task is already running? 
taskManager.checkIfRunning({ db: dbs[dbNameOnParam], @@ -375,7 +469,7 @@ var spawn = require('child_process').spawn, }, outputData: function(aggregationErr, result) { if (!aggregationErr) { - common.returnOutput(params, { sEcho: params.qstring.sEcho, iTotalRecords: 0, iTotalDisplayRecords: 0, "aaData": result }); + common.returnOutput(params, { sEcho: params.qstring.sEcho, iTotalRecords: 0, iTotalDisplayRecords: 0, "aaData": result, "removed": (changes || {}) }); } else { common.returnMessage(params, 500, aggregationErr); @@ -409,7 +503,12 @@ var spawn = require('child_process').spawn, if (appId) { if (hasReadRight(FEATURE_NAME, appId, parameters.member)) { - return dbUserHasAccessToCollection(parameters, collection, appId, callback); + if (collection === "events_data" || collection === "drill_events") { + return callback(true); + } + else { + return dbUserHasAccessToCollection(parameters, collection, appId, callback); + } } } else { @@ -485,10 +584,14 @@ var spawn = require('child_process').spawn, } else { userHasAccess(params, params.qstring.collection, function(hasAccess) { - if (hasAccess) { + if (hasAccess || params.qstring.collection === "events_data" || params.qstring.collection === "drill_events") { try { let aggregation = EJSON.parse(params.qstring.aggregation); - aggregate(params.qstring.collection, aggregation); + var changes = escapeNotAllowedAggregationStages(aggregation); + if (changes && Object.keys(changes).length > 0) { + log.d("Removed stages from pipeline: ", JSON.stringify(changes)); + } + aggregate(params.qstring.collection, aggregation, changes); } catch (e) { common.returnMessage(params, 500, 'Aggregation object is not valid.'); @@ -508,7 +611,7 @@ var spawn = require('child_process').spawn, } else { userHasAccess(params, params.qstring.collection, function(hasAccess) { - if (hasAccess) { + if (hasAccess || params.qstring.collection === "events_data" || params.qstring.collection === "drill_events") { dbGetCollection(); } else { diff --git 
a/plugins/dbviewer/frontend/public/javascripts/countly.views.js b/plugins/dbviewer/frontend/public/javascripts/countly.views.js index 2436521175b..4f2a661466e 100644 --- a/plugins/dbviewer/frontend/public/javascripts/countly.views.js +++ b/plugins/dbviewer/frontend/public/javascripts/countly.views.js @@ -534,6 +534,13 @@ if (res.aaData.length) { self.fields = Object.keys(map); } + if (res.removed && typeof res.removed === 'object' && Object.keys(res.removed).length > 0) { + self.removed = CV.i18n('dbviewer.removed-warning') + Object.keys(res.removed).join(", "); + + } + else { + self.removed = ""; + } } if (err) { var message = CV.i18n('dbviewer.server-error'); @@ -559,7 +566,7 @@ } }, updatePath: function(query) { - window.location.hash = "#/manage/db/aggregate/" + this.db + "/" + this.collection + "/" + query; + app.navigate("#/manage/db/aggregate/" + this.db + "/" + this.collection + "/" + query); }, getCollectionName: function() { var self = this; diff --git a/plugins/dbviewer/frontend/public/localization/dbviewer.properties b/plugins/dbviewer/frontend/public/localization/dbviewer.properties index 018c8d96649..eb102853b63 100644 --- a/plugins/dbviewer/frontend/public/localization/dbviewer.properties +++ b/plugins/dbviewer/frontend/public/localization/dbviewer.properties @@ -36,6 +36,7 @@ dbviewer.generate-aggregate-report= Generate aggregate report dbviewer.back-to-dbviewer = Back to DB Viewer dbviewer.invalid-pipeline = Invalid pipeline object, please check pipeline input. dbviewer.server-error = There was a server error. There might be more information in logs. +dbviewer.removed-warning = Some stages are removed from aggregation pipeline. 
Following stages are allowed only with global admin rights: dbviewer.not-found-data = Couldn't find any results dbviewer.execute-aggregation = Execute Aggregation on {0} dbviewer.prepare-new-aggregation = Prepare New Aggregation diff --git a/plugins/dbviewer/frontend/public/templates/aggregate.html b/plugins/dbviewer/frontend/public/templates/aggregate.html index 63fa391b593..56c711e00ab 100755 --- a/plugins/dbviewer/frontend/public/templates/aggregate.html +++ b/plugins/dbviewer/frontend/public/templates/aggregate.html @@ -25,6 +25,7 @@
+