diff --git a/CMakeLists.txt b/CMakeLists.txt index f136e5c..720d3aa 100644 --- a/CMakeLists.txt +++ b/CMakeLists.txt @@ -23,6 +23,7 @@ add_executable(datum_gateway src/datum_conf.c src/datum_conf_tests.c src/datum_gateway.c + src/datum_job_sync.c src/datum_jsonrpc.c src/datum_logger.c src/datum_protocol.c diff --git a/src/datum_conf.h b/src/datum_conf.h index dbe3310..86feda7 100644 --- a/src/datum_conf.h +++ b/src/datum_conf.h @@ -159,7 +159,13 @@ typedef struct { char datum_pool_pubkey[1024]; int datum_protocol_global_timeout; uint64_t datum_protocol_global_timeout_ms; - + + // Job coordination options + bool datum_enable_job_coordination; + bool datum_share_full_templates; + bool datum_allow_direct_failover; + char datum_gateway_id[16]; + uint32_t prime_id; unsigned char override_mining_pool_scriptsig[256]; int override_mining_pool_scriptsig_len; diff --git a/src/datum_job_sync.c b/src/datum_job_sync.c new file mode 100644 index 0000000..9aba824 --- /dev/null +++ b/src/datum_job_sync.c @@ -0,0 +1,479 @@ +/* + * + * DATUM Gateway - Job Coordination Implementation + * Decentralized Alternative Templates for Universal Mining + * + * https://ocean.xyz + * + * Copyright (c) 2025 Bitcoin Ocean, LLC + * + */ + +#include +#include +#include +#include +#include +#include + +#include "datum_job_sync.h" +#include "datum_protocol.h" +#include "datum_stratum.h" +#include "datum_logger.h" +#include "datum_conf.h" +#include "datum_utils.h" + +// Global job synchronization state +T_JOB_SYNC_STATE global_job_sync_state; + +// Initialize job synchronization subsystem +int datum_job_sync_init(void) { + memset(&global_job_sync_state, 0, sizeof(T_JOB_SYNC_STATE)); + + // Initialize the lock + if (pthread_rwlock_init(&global_job_sync_state.lock, NULL) != 0) { + DLOG_ERROR("Failed to initialize job sync lock"); + return -1; + } + + // Set default configuration + global_job_sync_state.session.sync_interval_ms = 5000; // 5 seconds default + global_job_sync_state.session.enabled = datum_config.datum_enable_job_coordination; + + // Generate gateway ID if not configured + if (!global_job_sync_state.session.gateway_id[0]) { + snprintf(global_job_sync_state.session.gateway_id, + sizeof(global_job_sync_state.session.gateway_id), + "DG%08X", (uint32_t)time(NULL)); + } + + // Generate session ID + global_job_sync_state.session.session_id = ((uint64_t)time(NULL) << 32) | (uint64_t)rand(); + + // Initialize shared secret with a default value (should be replaced via configuration) + // This provides a basic default to prevent NULL pointer issues + unsigned char default_secret[32]; + randombytes_buf(default_secret, sizeof(default_secret)); + datum_job_sync_set_shared_secret(default_secret, sizeof(default_secret)); + + DLOG_INFO("Job synchronization initialized: gateway_id=%s, session_id=%llx, enabled=%d", + global_job_sync_state.session.gateway_id, + (unsigned long long)global_job_sync_state.session.session_id, + global_job_sync_state.session.enabled); + + return 0; +} + +// Cleanup job synchronization subsystem +void datum_job_sync_cleanup(void) { + pthread_rwlock_destroy(&global_job_sync_state.lock); + memset(&global_job_sync_state, 0, sizeof(T_JOB_SYNC_STATE)); +} + +// Start a new sync session with the pool +int datum_job_sync_start_session(const char *gateway_id) { + pthread_rwlock_wrlock(&global_job_sync_state.lock); + + // Update gateway ID if provided + if (gateway_id && gateway_id[0]) { + strncpy(global_job_sync_state.session.gateway_id, gateway_id, + sizeof(global_job_sync_state.session.gateway_id) - 1); + } + + // Reset session statistics + global_job_sync_state.session.jobs_synced = 0; + global_job_sync_state.session.jobs_acknowledged = 0; + global_job_sync_state.session.sync_failures = 0; + global_job_sync_state.session.initialized = false; + + // Clear job cache + global_job_sync_state.job_count = 0; + global_job_sync_state.current_index = 0; + memset(global_job_sync_state.jobs, 0, sizeof(global_job_sync_state.jobs)); + + pthread_rwlock_unlock(&global_job_sync_state.lock); + + // Mark session as initialized + pthread_rwlock_wrlock(&global_job_sync_state.lock); + global_job_sync_state.session.initialized = true; + pthread_rwlock_unlock(&global_job_sync_state.lock); + + DLOG_INFO("Started new job sync session: gateway_id=%s", + global_job_sync_state.session.gateway_id); + + return 0; +} + +// Synchronize a new Stratum job with the pool +int datum_job_sync_add(T_DATUM_STRATUM_JOB *job, bool urgent) { + if (!job || !global_job_sync_state.session.enabled) { + return -1; + } + + pthread_rwlock_wrlock(&global_job_sync_state.lock); + + // Find or create entry + uint32_t idx = global_job_sync_state.current_index; + T_SYNC_JOB_ENTRY *entry = &global_job_sync_state.jobs[idx]; + + // Clear previous entry + memset(entry, 0, sizeof(T_SYNC_JOB_ENTRY)); + + // Fill in sync data + T_DATUM_JOB_SYNC *sync = &entry->sync_data; + + // Job identification + sync->datum_job_id = job->datum_job_idx; + strncpy(sync->stratum_job_id, job->job_id, sizeof(sync->stratum_job_id) - 1); + strncpy(sync->gateway_id, global_job_sync_state.session.gateway_id, + sizeof(sync->gateway_id) - 1); + + // Block template metadata + memcpy(sync->prevhash, job->prevhash_bin, 32); + sync->version = job->version_uint; + sync->nbits = job->nbits_uint; + sync->base_ntime = strtoul(job->ntime, NULL, 16); + sync->height = job->height; + + // Merkle tree data + sync->merkle_branch_count = job->merklebranch_count; + + // Calculate merkle root with empty coinbase + unsigned char empty_coinbase_hash[32]; + memset(empty_coinbase_hash, 0, 32); + stratum_job_merkle_root_calc(job, empty_coinbase_hash, sync->merkle_root_empty); + + // Coinbase information + for (int i = 0; i < MAX_COINBASE_TYPES; i++) { + sync->coinbase_size[i] = job->coinbase[i].coinb1_len + job->coinbase[i].coinb2_len; + sync->has_coinbase[i] = (sync->coinbase_size[i] > 0); + } + sync->coinbase_value = job->coinbase_value; + + // Difficulty requirements - simplified for now + sync->min_diff = 1; // Will be updated based on vardiff + sync->pool_diff = 1; + + // Extranonce configuration + sync->enprefix = job->enprefix; + sync->extranonce1_len = 4; // Standard for now + sync->extranonce2_len = 8; // Standard for now + + // Timestamp and flags + sync->created_tsms = job->tsms; + sync->sync_flags = urgent ? JOB_SYNC_FLAG_URGENT : 0; + + // Generate HMAC + datum_job_sync_generate_hmac(sync); + + // Update entry metadata + entry->stratum_job = job; + entry->status = JOB_SYNC_STATUS_PENDING; + entry->sent_tsms = current_time_millis(); + entry->retry_count = 0; + + // Update indices + global_job_sync_state.current_index = (idx + 1) % MAX_SYNC_JOBS; + if (global_job_sync_state.job_count < MAX_SYNC_JOBS) { + global_job_sync_state.job_count++; + } + + global_job_sync_state.session.jobs_synced++; + + pthread_rwlock_unlock(&global_job_sync_state.lock); + + // Send sync message to pool + datum_job_sync_send_to_pool(sync); + + DLOG_DEBUG("Added job for sync: stratum_id=%s, datum_id=%d, urgent=%d", + job->job_id, job->datum_job_idx, urgent); + + return 0; +} + +// Handle job sync acknowledgment from pool +int datum_job_sync_handle_ack(unsigned char datum_job_id, bool success) { + pthread_rwlock_wrlock(&global_job_sync_state.lock); + + // Find the job entry + T_SYNC_JOB_ENTRY *entry = NULL; + for (uint32_t i = 0; i < global_job_sync_state.job_count; i++) { + if (global_job_sync_state.jobs[i].sync_data.datum_job_id == datum_job_id && + global_job_sync_state.jobs[i].status == JOB_SYNC_STATUS_SENT) { + entry = &global_job_sync_state.jobs[i]; + break; + } + } + + if (!entry) { + pthread_rwlock_unlock(&global_job_sync_state.lock); + DLOG_WARN("Received ACK for unknown job: datum_id=%d", datum_job_id); + return -1; + } + + // Update status + if (success) { + entry->status = JOB_SYNC_STATUS_ACKNOWLEDGED; + entry->ack_tsms = current_time_millis(); + global_job_sync_state.session.jobs_acknowledged++; + global_job_sync_state.session.last_sync_tsms = entry->ack_tsms; + + DLOG_DEBUG("Job sync acknowledged: datum_id=%d, stratum_id=%s", + datum_job_id, entry->sync_data.stratum_job_id); + } else { + entry->status = JOB_SYNC_STATUS_FAILED; + global_job_sync_state.session.sync_failures++; + + DLOG_WARN("Job sync failed: datum_id=%d, stratum_id=%s", + datum_job_id, entry->sync_data.stratum_job_id); + } + + pthread_rwlock_unlock(&global_job_sync_state.lock); + return 0; +} + +// Generate HMAC for sync message +void datum_job_sync_generate_hmac(T_DATUM_JOB_SYNC *sync) { + if (!sync) return; + + // Calculate HMAC over the sync data (excluding the HMAC field itself) + crypto_auth_hmacsha256( + sync->hmac, + (unsigned char*)sync, + sizeof(T_DATUM_JOB_SYNC) - sizeof(sync->hmac), + global_job_sync_state.shared_secret + ); +} + +// Validate HMAC on sync message +bool datum_job_sync_validate_hmac(const T_DATUM_JOB_SYNC *sync) { + if (!sync) return false; + + unsigned char calculated_hmac[32]; + + // Calculate HMAC over the sync data (excluding the HMAC field) + crypto_auth_hmacsha256( + calculated_hmac, + (const unsigned char*)sync, + sizeof(T_DATUM_JOB_SYNC) - sizeof(sync->hmac), + global_job_sync_state.shared_secret + ); + + // Compare HMACs + return crypto_verify_32(calculated_hmac, sync->hmac) == 0; +} + +// Get synchronized job by Stratum job ID +// IMPORTANT: Caller MUST hold global_job_sync_state.lock for the entire duration +// they use the returned pointer, as it points directly to internal state +T_SYNC_JOB_ENTRY *datum_job_sync_find_by_stratum_id(const char *job_id) { + if (!job_id) return NULL; + + for (uint32_t i = 0; i < global_job_sync_state.job_count; i++) { + if (strcmp(global_job_sync_state.jobs[i].sync_data.stratum_job_id, job_id) == 0) { + return &global_job_sync_state.jobs[i]; + } + } + + return NULL; +} + +// Get synchronized job by DATUM job ID +// IMPORTANT: Caller MUST hold global_job_sync_state.lock for the entire duration +// they use the returned pointer, as it points directly to internal state +T_SYNC_JOB_ENTRY *datum_job_sync_find_by_datum_id(unsigned char datum_job_id) { + for (uint32_t i = 0; i < global_job_sync_state.job_count; i++) { + if (global_job_sync_state.jobs[i].sync_data.datum_job_id == datum_job_id) { + return &global_job_sync_state.jobs[i]; + } + } + + return NULL; +} + +// Periodic sync maintenance +void datum_job_sync_maintenance(void) { + if (!global_job_sync_state.session.enabled) return; + + uint64_t now = current_time_millis(); + pthread_rwlock_wrlock(&global_job_sync_state.lock); + + // Clean up old jobs (older than 10 minutes) + uint64_t expiry = now - (10 * 60 * 1000); + + for (uint32_t i = 0; i < global_job_sync_state.job_count; i++) { + T_SYNC_JOB_ENTRY *entry = &global_job_sync_state.jobs[i]; + + // Skip if already empty + if (entry->status == JOB_SYNC_STATUS_NONE) continue; + + // Check if expired + if (entry->sent_tsms < expiry) { + DLOG_DEBUG("Expiring old sync job: stratum_id=%s, age=%llums", + entry->sync_data.stratum_job_id, + (unsigned long long)(now - entry->sent_tsms)); + memset(entry, 0, sizeof(T_SYNC_JOB_ENTRY)); + continue; + } + + // Retry failed syncs + if (entry->status == JOB_SYNC_STATUS_PENDING && + entry->retry_count < 3 && + (now - entry->sent_tsms) > 5000) { + + entry->retry_count++; + entry->sent_tsms = now; + + DLOG_DEBUG("Retrying job sync: stratum_id=%s, attempt=%d", + entry->sync_data.stratum_job_id, entry->retry_count); + + // Retransmit the job sync + entry->status = JOB_SYNC_STATUS_SENT; + datum_job_sync_send_to_pool(&entry->sync_data); + } + } + + pthread_rwlock_unlock(&global_job_sync_state.lock); +} + +// Check if job synchronization is enabled and active +bool datum_job_sync_is_active(void) { + pthread_rwlock_rdlock(&global_job_sync_state.lock); + bool active = global_job_sync_state.session.enabled && + global_job_sync_state.session.initialized; + pthread_rwlock_unlock(&global_job_sync_state.lock); + return active; +} + +// Get current sync statistics +void datum_job_sync_get_stats(T_JOB_SYNC_SESSION *stats) { + if (!stats) return; + + pthread_rwlock_rdlock(&global_job_sync_state.lock); + memcpy(stats, &global_job_sync_state.session, sizeof(T_JOB_SYNC_SESSION)); + pthread_rwlock_unlock(&global_job_sync_state.lock); +} + +// Configuration helpers +void datum_job_sync_set_interval(uint32_t interval_ms) { + pthread_rwlock_wrlock(&global_job_sync_state.lock); + global_job_sync_state.session.sync_interval_ms = interval_ms; + pthread_rwlock_unlock(&global_job_sync_state.lock); +} + +void datum_job_sync_set_gateway_id(const char *id) { + if (!id) return; + pthread_rwlock_wrlock(&global_job_sync_state.lock); + strncpy(global_job_sync_state.session.gateway_id, id, + sizeof(global_job_sync_state.session.gateway_id) - 1); + pthread_rwlock_unlock(&global_job_sync_state.lock); +} + +void datum_job_sync_set_shared_secret(const unsigned char *secret, size_t len) { + if (!secret || len == 0) return; + pthread_rwlock_wrlock(&global_job_sync_state.lock); + size_t copy_len = len > 32 ? 32 : len; + memcpy(global_job_sync_state.shared_secret, secret, copy_len); + pthread_rwlock_unlock(&global_job_sync_state.lock); +} + +// Debugging and logging +void datum_job_sync_dump_state(void) { + pthread_rwlock_rdlock(&global_job_sync_state.lock); + + DLOG_INFO("Job Sync State Dump:"); + DLOG_INFO(" Gateway ID: %s", global_job_sync_state.session.gateway_id); + DLOG_INFO(" Session ID: %llx", (unsigned long long)global_job_sync_state.session.session_id); + DLOG_INFO(" Enabled: %d, Initialized: %d", + global_job_sync_state.session.enabled, + global_job_sync_state.session.initialized); + DLOG_INFO(" Jobs synced: %u, acknowledged: %u, failed: %u", + global_job_sync_state.session.jobs_synced, + global_job_sync_state.session.jobs_acknowledged, + global_job_sync_state.session.sync_failures); + DLOG_INFO(" Active jobs in cache: %u", global_job_sync_state.job_count); + + for (uint32_t i = 0; i < global_job_sync_state.job_count; i++) { + T_SYNC_JOB_ENTRY *entry = &global_job_sync_state.jobs[i]; + if (entry->status != JOB_SYNC_STATUS_NONE) { + DLOG_INFO(" Job %u: stratum_id=%s, datum_id=%d, status=%d", + i, entry->sync_data.stratum_job_id, + entry->sync_data.datum_job_id, entry->status); + } + } + + pthread_rwlock_unlock(&global_job_sync_state.lock); +} + +// Send job sync message to pool via DATUM protocol +int datum_job_sync_send_to_pool(T_DATUM_JOB_SYNC *sync) { + if (!sync) return -1; + + // The actual protocol sending is handled by datum_protocol.c + // This function is called from datum_job_sync_add() after preparing the sync data + // The protocol layer will call datum_protocol_send_job_sync() which handles + // encryption, framing, and transmission + + // For now, we just call the protocol layer function + extern int datum_protocol_send_job_sync(void *sync); + return datum_protocol_send_job_sync(sync); +} + +// Handle forwarded share from pool +int datum_job_sync_handle_forward(const unsigned char *data, size_t len) { + if (!data || len == 0) return -1; + + // This handles shares forwarded from the pool for validation + // The pool sends this when a miner connects to the pool directly + // but the pool wants the gateway to validate the share + + // Parse the forwarded share data + // Structure: [datum_job_id:1][extranonce:12][ntime:4][nonce:4][version:4] + if (len < 25) { + DLOG_ERROR("Forwarded share data too short: %zu bytes", len); + return -1; + } + + unsigned char datum_job_id = data[0]; + + pthread_rwlock_rdlock(&global_job_sync_state.lock); + + // Find the synchronized job + T_SYNC_JOB_ENTRY *entry = datum_job_sync_find_by_datum_id(datum_job_id); + if (!entry || entry->status != JOB_SYNC_STATUS_ACKNOWLEDGED) { + pthread_rwlock_unlock(&global_job_sync_state.lock); + DLOG_WARN("Received forwarded share for unknown/unacknowledged job: datum_id=%d", datum_job_id); + return -1; + } + + // Extract share data + const unsigned char *extranonce = data + 1; + uint32_t ntime = *(uint32_t*)(data + 13); + uint32_t nonce = *(uint32_t*)(data + 17); + uint32_t version = *(uint32_t*)(data + 21); + + // Get the Stratum job for validation + T_DATUM_STRATUM_JOB *job = entry->stratum_job; + + pthread_rwlock_unlock(&global_job_sync_state.lock); + + if (!job) { + DLOG_ERROR("No Stratum job associated with forwarded share"); + return -1; + } + + DLOG_DEBUG("Processing forwarded share: datum_id=%d, stratum_id=%s, ntime=%08x, nonce=%08x", + datum_job_id, job->job_id, ntime, nonce); + + // Accept the forwarded share + // The pool has already performed initial validation before forwarding + // Full validation would require reconstructing the block header and verifying PoW, + // but for the initial implementation we trust the pool's validation + DLOG_INFO("Accepted forwarded share for job %d from pool", datum_job_id); + + // Suppress unused variable warnings + (void)extranonce; + (void)version; + + return 0; +} \ No newline at end of file diff --git a/src/datum_job_sync.h b/src/datum_job_sync.h new file mode 100644 index 0000000..c4e852e --- /dev/null +++ b/src/datum_job_sync.h @@ -0,0 +1,208 @@ +/* + * + * DATUM Gateway - Job Coordination for Fallback Share Submission + * Decentralized Alternative Templates for Universal Mining + * + * This file implements job synchronization between DATUM Gateway + * and upstream pools to enable fallback share submission. + * + * https://ocean.xyz + * + * --- + * + * Copyright (c) 2025 Bitcoin Ocean, LLC + * + * Permission is hereby granted, free of charge, to any person obtaining + * a copy of this software and associated documentation files (the + * "Software"), to deal in the Software without restriction, including + * without limitation the rights to use, copy, modify, merge, publish, + * distribute, sublicense, and/or sell copies of the Software, and to + * permit persons to whom the Software is furnished to do so, subject to + * the following conditions: + * + * The above copyright notice and this permission notice shall be + * included in all copies or substantial portions of the Software. + * + * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS + * OR IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF + * MERCHANTABILITY, FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. + * IN NO EVENT SHALL THE AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY + * CLAIM, DAMAGES OR OTHER LIABILITY, WHETHER IN AN ACTION OF CONTRACT, + * TORT OR OTHERWISE, ARISING FROM, OUT OF OR IN CONNECTION WITH THE + * SOFTWARE OR THE USE OR OTHER DEALINGS IN THE SOFTWARE. + * + */ + +#ifndef _DATUM_JOB_SYNC_H_ +#define _DATUM_JOB_SYNC_H_ + +#include +#include +#include + +#include "datum_stratum.h" +#include "datum_protocol.h" + +// Protocol commands for job synchronization +#define DATUM_CMD_JOB_SYNC 0x30 // Gateway -> Pool: Sync job metadata +#define DATUM_CMD_JOB_SYNC_ACK 0x31 // Pool -> Gateway: Acknowledge sync +#define DATUM_CMD_SHARE_FORWARD 0x32 // Pool -> Gateway: Forward share for validation +#define DATUM_CMD_JOB_SYNC_INIT 0x33 // Gateway -> Pool: Initialize sync session + +// Maximum number of jobs to keep synchronized +#define MAX_SYNC_JOBS 32 + +// Job synchronization flags +#define JOB_SYNC_FLAG_FULL_TEMPLATE 0x01 // Include full template data +#define JOB_SYNC_FLAG_COINBASE_ONLY 0x02 // Only sync coinbase data +#define JOB_SYNC_FLAG_URGENT 0x04 // High priority sync (new block) +#define JOB_SYNC_FLAG_COMPRESSED 0x08 // Use compression + +// Job sync status +typedef enum { + JOB_SYNC_STATUS_NONE = 0, + JOB_SYNC_STATUS_PENDING, + JOB_SYNC_STATUS_SENT, + JOB_SYNC_STATUS_ACKNOWLEDGED, + JOB_SYNC_STATUS_FAILED +} job_sync_status_t; + +// Job synchronization data structure +typedef struct __attribute__((packed)) { + // Job identification + unsigned char datum_job_id; // DATUM protocol job ID (0-7) + char stratum_job_id[24]; // Full Stratum job ID as sent to miners + char gateway_id[16]; // Unique gateway identifier + + // Block template metadata + unsigned char prevhash[32]; // Previous block hash + uint32_t version; // Block version + uint32_t nbits; // Network difficulty bits + uint32_t base_ntime; // Base timestamp + uint64_t height; // Block height + + // Merkle tree data + uint16_t merkle_branch_count; // Number of merkle branches + unsigned char merkle_root_empty[32]; // Merkle root with empty coinbase + + // Coinbase information + uint16_t coinbase_size[MAX_COINBASE_TYPES]; // Size of each coinbase type + bool has_coinbase[MAX_COINBASE_TYPES]; // Which coinbase types are available + uint64_t coinbase_value; // Total coinbase value in satoshis + + // Difficulty requirements + uint64_t min_diff; // Minimum share difficulty + uint64_t pool_diff; // Pool's required difficulty + + // Extranonce configuration + uint16_t enprefix; // Extranonce prefix + uint8_t extranonce1_len; // Length of extranonce1 + uint8_t extranonce2_len; // Length of extranonce2 + + // Timestamp and flags + uint64_t created_tsms; // When job was created (milliseconds) + uint32_t sync_flags; // Synchronization flags + + // Security + unsigned char hmac[32]; // HMAC-SHA256 for authentication +} T_DATUM_JOB_SYNC; + +// Job sync session information +typedef struct { + bool enabled; // Is job sync enabled? + bool initialized; // Has sync session been initialized? + char gateway_id[16]; // Our gateway identifier + uint64_t session_id; // Current sync session ID + uint64_t last_sync_tsms; // Last successful sync timestamp + uint32_t sync_interval_ms; // Milliseconds between syncs + uint32_t jobs_synced; // Total jobs synchronized + uint32_t jobs_acknowledged; // Jobs acknowledged by pool + uint32_t sync_failures; // Failed sync attempts +} T_JOB_SYNC_SESSION; + +// Synchronized job cache entry +typedef struct { + T_DATUM_JOB_SYNC sync_data; // Job synchronization data + T_DATUM_STRATUM_JOB *stratum_job; // Pointer to original Stratum job + job_sync_status_t status; // Current sync status + uint64_t sent_tsms; // When sync was sent + uint64_t ack_tsms; // When acknowledgment received + uint32_t retry_count; // Number of retry attempts +} T_SYNC_JOB_ENTRY; + +// Global job synchronization state +typedef struct { + T_JOB_SYNC_SESSION session; // Current session info + T_SYNC_JOB_ENTRY jobs[MAX_SYNC_JOBS]; // Synchronized jobs + uint32_t job_count; // Number of jobs in cache + uint32_t current_index; // Current write index + pthread_rwlock_t lock; // Thread safety + unsigned char shared_secret[32]; // Shared secret for HMAC +} T_JOB_SYNC_STATE; + +// Function declarations + +// Initialize job synchronization subsystem +int datum_job_sync_init(void); + +// Cleanup job synchronization subsystem +void datum_job_sync_cleanup(void); + +// Start a new sync session with the pool +int datum_job_sync_start_session(const char *gateway_id); + +// Synchronize a new Stratum job with the pool +int datum_job_sync_add(T_DATUM_STRATUM_JOB *job, bool urgent); + +// Handle job sync acknowledgment from pool +int datum_job_sync_handle_ack(unsigned char datum_job_id, bool success); + +// Process a forwarded share from the pool +int datum_job_sync_handle_forward(const unsigned char *data, size_t len); + +// Build job sync message for transmission +int datum_job_sync_build_message(T_DATUM_JOB_SYNC *sync, unsigned char *buffer, size_t max_len); + +// Parse job sync message from pool +int datum_job_sync_parse_message(const unsigned char *buffer, size_t len, T_DATUM_JOB_SYNC *sync); + +// Validate HMAC on sync message +bool datum_job_sync_validate_hmac(const T_DATUM_JOB_SYNC *sync); + +// Generate HMAC for sync message +void datum_job_sync_generate_hmac(T_DATUM_JOB_SYNC *sync); + +// Get synchronized job by Stratum job ID +// THREAD SAFETY: Caller MUST hold global_job_sync_state.lock (read or write) for +// the entire duration they use the returned pointer. Returns NULL if not found. +T_SYNC_JOB_ENTRY *datum_job_sync_find_by_stratum_id(const char *job_id); + +// Get synchronized job by DATUM job ID +// THREAD SAFETY: Caller MUST hold global_job_sync_state.lock (read or write) for +// the entire duration they use the returned pointer. Returns NULL if not found. +T_SYNC_JOB_ENTRY *datum_job_sync_find_by_datum_id(unsigned char datum_job_id); + +// Periodic sync maintenance (cleanup old jobs, retry failed syncs) +void datum_job_sync_maintenance(void); + +// Check if job synchronization is enabled and active +bool datum_job_sync_is_active(void); + +// Get current sync statistics +void datum_job_sync_get_stats(T_JOB_SYNC_SESSION *stats); + +// Configuration helpers +void datum_job_sync_set_interval(uint32_t interval_ms); +void datum_job_sync_set_gateway_id(const char *id); +void datum_job_sync_set_shared_secret(const unsigned char *secret, size_t len); + +// Debugging and logging +void datum_job_sync_dump_state(void); + +// Protocol integration +int datum_job_sync_send_to_pool(T_DATUM_JOB_SYNC *sync); + +// External globals +extern T_JOB_SYNC_STATE global_job_sync_state; + +#endif /* _DATUM_JOB_SYNC_H_ */ \ No newline at end of file diff --git a/src/datum_protocol.c b/src/datum_protocol.c index 76dbc42..254addd 100644 --- a/src/datum_protocol.c +++ b/src/datum_protocol.c @@ -73,8 +73,27 @@ #include "datum_blocktemplates.h" #include "datum_coinbaser.h" #include "datum_queue.h" +#include "datum_job_sync.h" #include "git_version.h" +// macOS compatibility: pthread_mutex_timedlock not available +#ifdef __APPLE__ +static int pthread_mutex_timedlock(pthread_mutex_t *mutex, const struct timespec *abs_timeout) { + // Simple polling implementation for macOS + int rc; + struct timespec ts; + while ((rc = pthread_mutex_trylock(mutex)) == EBUSY) { + clock_gettime(CLOCK_REALTIME, &ts); + if (ts.tv_sec > abs_timeout->tv_sec || + (ts.tv_sec == abs_timeout->tv_sec && ts.tv_nsec >= abs_timeout->tv_nsec)) { + return ETIMEDOUT; + } + usleep(1000); // Sleep 1ms before retrying + } + return rc; +} +#endif + atomic_int datum_protocol_client_active = 0; DATUM_ENC_KEYS local_datum_keys; @@ -144,8 +163,13 @@ unsigned char datum_protocol_setup_new_job_idx(void *sx) { datum_jobs[a].sjob = s; datum_jobs[a].datum_job_id = a; - + pthread_rwlock_unlock(&datum_jobs_rwlock); + + // Add job to sync queue if enabled + if (datum_config.datum_enable_job_coordination && s->is_datum_job) { + datum_job_sync_add(s, s->is_new_block); + } return a; } @@ -1946,16 +1970,75 @@ int datum_protocol_init(void) { int datum_encrypt_generate_keys(DATUM_ENC_KEYS *keys) { int i; - + // generate an Ed25519 key pair i = crypto_sign_keypair(keys->pk_ed25519, keys->sk_ed25519); if (i != 0) return i; - + // generate an X25519 key pair i = crypto_box_keypair(keys->pk_x25519, keys->sk_x25519); if (i != 0) return i; - + keys->is_remote = false; - + + return 0; +} + +// Send job sync message to pool +int datum_protocol_send_job_sync(void *sync_ptr) { + if (!sync_ptr) return -1; + + T_DATUM_JOB_SYNC *sync = (T_DATUM_JOB_SYNC *)sync_ptr; + unsigned char msg[sizeof(T_DATUM_JOB_SYNC) + crypto_box_MACBYTES + 100]; + int i = 0; + + // Job sync sub-command + msg[i] = DATUM_CMD_JOB_SYNC; i++; + + // Copy the sync data structure + memcpy(&msg[i], sync, sizeof(T_DATUM_JOB_SYNC)); + i += sizeof(T_DATUM_JOB_SYNC); + + // Terminator + msg[i] = 0xFE; i++; + + // Pad with randomness + int j = 1 + (rand() % 50); + memset(&msg[i], rand(), j); + i += j; + + // Send via protocol command 5 + return datum_protocol_mining_cmd(msg, i); +} + +// Handle job sync messages from pool +int datum_protocol_handle_sync_messages(unsigned char cmd, unsigned char *data, int len) { + if (!data || len < 1) return -1; + + switch (cmd) { + case DATUM_CMD_JOB_SYNC_ACK: { + // Job sync acknowledgment from pool + if (len < 2) { + DLOG_WARN("Invalid job sync ACK: too short"); + return -1; + } + + unsigned char datum_job_id = data[0]; + bool success = (data[1] == 0x01); + + return datum_job_sync_handle_ack(datum_job_id, success); + } + + case DATUM_CMD_SHARE_FORWARD: { + // Forwarded share from pool for validation + return datum_job_sync_handle_forward(data, len); + } + + default: { + DLOG_WARN("Unknown job sync command: 0x%02x", cmd); + return -1; + } + } + return 0; } diff --git a/src/datum_protocol.h b/src/datum_protocol.h index c56a40f..0ccaf41 100644 --- a/src/datum_protocol.h +++ b/src/datum_protocol.h @@ -48,6 +48,12 @@ #define DATUM_PROTOCOL_VERSION "v0.4.0-beta" // this is sent to the server as a UA #define DATUM_PROTOCOL_CONNECT_TIMEOUT 30 +// Job sync protocol commands +#define DATUM_CMD_JOB_SYNC 0x30 +#define DATUM_CMD_JOB_SYNC_ACK 0x31 +#define DATUM_CMD_SHARE_FORWARD 0x32 +#define DATUM_CMD_JOB_SYNC_INIT 0x33 + #define DATUM_PROTOCOL_MAX_CMD_DATA_SIZE 4194304 // 2^22 - protocol limit! #define DATUM_PROTOCOL_BUFFER_SIZE (DATUM_PROTOCOL_MAX_CMD_DATA_SIZE*3) @@ -137,6 +143,8 @@ int datum_protocol_pow_submit( bool datum_protocol_thread_is_active(void); void datum_protocol_start_connector(void); unsigned char datum_protocol_setup_new_job_idx(void *sx); +int datum_protocol_send_job_sync(void *sync); +int datum_protocol_handle_sync_messages(unsigned char cmd, unsigned char *data, int len); extern uint64_t datum_accepted_share_count; extern uint64_t datum_accepted_share_diff; diff --git a/src/datum_stratum.c b/src/datum_stratum.c index 3f7fec4..44a34b0 100644 --- a/src/datum_stratum.c +++ b/src/datum_stratum.c @@ -58,6 +58,7 @@ #include "datum_coinbaser.h" #include "datum_submitblock.h" #include "datum_protocol.h" +#include "datum_job_sync.h" T_DATUM_SOCKET_APP *global_stratum_app = NULL; @@ -1764,8 +1765,64 @@ int client_mining_subscribe(T_DATUM_CLIENT_DATA *c, uint64_t id, json_t *params_ m->sid_inv = ((sid>>24)&0xff) | (((sid>>16)&0xff)<<8) | (((sid>>8)&0xff)<<16) | ((sid&0xff)<<24); // tell them about all of this - snprintf(s, sizeof(s), "{\"error\":null,\"id\":%"PRIu64",\"result\":[[[\"mining.notify\",\"%8.8x1\"],[\"mining.set_difficulty\",\"%8.8x2\"]],\"%8.8x\",8]}\n", id, sid, sid, sid); - datum_socket_send_string_to_client(c, s); + // Build JSON response + json_t *response = json_object(); + json_object_set_new(response, "error", json_null()); + json_object_set_new(response, "id", json_integer(id)); + + json_t *result = json_array(); + json_t *subscriptions = json_array(); + + // Add mining.notify subscription + json_t *notify_sub = json_array(); + json_array_append_new(notify_sub, json_string("mining.notify")); + snprintf(s, sizeof(s), "%8.8x1", sid); + json_array_append_new(notify_sub, json_string(s)); + json_array_append_new(subscriptions, notify_sub); + + // Add mining.set_difficulty subscription + json_t *diff_sub = json_array(); + json_array_append_new(diff_sub, json_string("mining.set_difficulty")); + snprintf(s, sizeof(s), "%8.8x2", sid); + json_array_append_new(diff_sub, json_string(s)); + json_array_append_new(subscriptions, diff_sub); + + json_array_append_new(result, subscriptions); + + // Add extranonce1 + snprintf(s, sizeof(s), "%8.8x", sid); + json_array_append_new(result, json_string(s)); + + // Add extranonce2 size + json_array_append_new(result, json_integer(8)); + + json_object_set_new(response, "result", result); + + // Add failover information if job coordination is enabled + if (datum_config.datum_enable_job_coordination && + datum_config.datum_allow_direct_failover && + datum_config.datum_pool_host[0]) { + json_t *failover = json_object(); + // Buffer size calculation: "stratum+tcp://" (14) + host (1024) + ":" (1) + port (6) + null (1) = 1046 + char pool_url[1100]; // Safe buffer size for URL construction + snprintf(pool_url, sizeof(pool_url), "stratum+tcp://%s:%d", + datum_config.datum_pool_host, + datum_config.datum_pool_port); + json_object_set_new(failover, "pool_url", json_string(pool_url)); + json_object_set_new(failover, "gateway_id", + json_string(datum_config.datum_gateway_id[0] ? + datum_config.datum_gateway_id : "DG")); + json_object_set_new(failover, "sync_enabled", json_true()); + json_object_set_new(response, "datum_failover", failover); + } + + char *response_str = json_dumps(response, JSON_COMPACT); + if (response_str) { + datum_socket_send_string_to_client(c, response_str); + datum_socket_send_string_to_client(c, "\n"); + free(response_str); + } + json_decref(response); // send them their current difficulty before sending a job send_mining_set_difficulty(c);