//
//  nzbfiles.cpp
//
//  This is a higher-performance xmlpipe2 interface for the 'nzb' Sphinx index.
//  You should see roughly a 5-10x performance increase over the PHP
//  implementation.  If you are serious about indexing your NZBs for full-text
//  searching, it is highly recommended that you use this over the PHP
//  version.

#include <cstdio>
#include <cstdlib>
#include <cstring>
#include <errno.h>
#include <list>
#include <pthread.h>
#include <regex.h>
#include <unistd.h>
#include <mysql.h>
#include <zlib.h>

using namespace std;

// Size in bytes (128KB) of the per-thread buffer that receives decompressed
// NZB data
#define BUFLEN 131072

// Process exit codes returned by main(): bad command line / MySQL failure
#define USAGE_ERR 10
#define MYSQL_ERR 20

// Upper bound on the number of records main() lets accumulate in the queue
// before it stops to wait for the workers.  The disabled value of 20000 was
// meant to keep roughly 1MB of record data in memory; the active value is
// large enough that the cap is effectively switched off.
//#define MAX_RECORDS 20000
#define MAX_RECORDS 20000000 // No limit

// Number of worker threads created by main() - set to the number of cores
#define NUM_THREADS 2

// Number of records the workers have finished; incremented by
// completedRecord() and printed by main() as progress
int record_count = 0;

// Flag polled by the worker loop in threadWorker(); workers keep running
// while it is false
bool threads_die = false;

// Holds the data retrieved from one row of the ``releases`` table.  Instances
// are allocated by main(), queued in ``records`` and deleted by the worker
// that processes them.
struct record {
    // Value of the ID column, converted with atoi()
    int  id;
    // Value of the guid column as a NUL-terminated string (at most 50
    // characters); also used to build the path of the NZB file
    char guid[51];
};

// Usage text printed by main() when too few arguments are supplied
char usage[]        = "nzbxmlpipe2 [host] [user] [pass] [db name]";
// printf-style template for the path of a release's gzipped NZB file: %c
// receives the first character of the guid and %s the complete guid
char baseFilename[] = "/var/newznab/nnplus/nzbfiles/%c/%s.nzb.gz";

// Design: a global queue holds record ids and guids.  A fixed number of
// worker threads retrieve values from the queue through a mutex-protected
// accessor, while the main thread fills the queue (never exceeding
// MAX_RECORDS items) and reports on the progress.

// Queue of records waiting to be processed
list<record*> records;

// Serializes access to ``records``.  Initialized statically: a pthread mutex
// must be initialized before its first use, and nothing in this file calls
// pthread_mutex_init() on it.
pthread_mutex_t recordsmutex = PTHREAD_MUTEX_INITIALIZER;

// Serializes access to ``record_count``; statically initialized for the same
// reason as ``recordsmutex``
pthread_mutex_t recordscountmutex = PTHREAD_MUTEX_INITIALIZER;

/**
 * Removes the record at the head of the shared queue and hands it to the
 * caller, who becomes responsible for deleting it.
 *
 * The queue is only touched while ``recordsmutex`` is held.
 *
 * @return the dequeued record, or NULL when the queue is currently empty
 */
record* getRecord()
{
    record* rec = NULL;

    pthread_mutex_lock(&recordsmutex);
    // empty() is constant time on every standard library, whereas
    // std::list::size() may walk the whole list on pre-C++11 implementations
    // - and this runs with the lock held.
    if (!records.empty()) {
        rec = records.front();
        records.pop_front();
    }
    pthread_mutex_unlock(&recordsmutex);

    return rec;
}

/**
 * Notes that one more record has been processed by adding one to the shared
 * ``record_count`` while ``recordscountmutex`` is held.
 */
void completedRecord()
{
    pthread_mutex_lock(&recordscountmutex);
    record_count += 1;
    pthread_mutex_unlock(&recordscountmutex);
}

/**
 * Runs the compiled regex ``rgT`` against the NUL-terminated ``string``.
 *
 * On a match, ``*begin`` receives the offset of the first matched character
 * and ``*end`` the offset one past the last matched character, both relative
 * to ``string``.  A newly malloc()ed, NUL-terminated copy of the matched text
 * is returned and must be released by the caller with free().  If that copy
 * cannot be allocated the offsets are still written but NULL is returned.
 *
 * When nothing matches, NULL is returned and ``*begin`` / ``*end`` are left
 * untouched, so callers can pre-set them to detect the no-match case.
 *
 * The regex is neither compiled nor freed here; it belongs to the caller.
 */
char* regexp(char* string, regex_t* rgT, unsigned long int* begin, unsigned long int* end) {
    regmatch_t match;
    if (regexec(rgT, string, 1, &match, 0) != 0) {
        return NULL;
    }

    // On a successful match both offsets are non-negative
    *begin = (unsigned long int)match.rm_so;
    *end   = (unsigned long int)match.rm_eo;

    const size_t len = (size_t)(match.rm_eo - match.rm_so);
    char* word = (char*)malloc(len + 1);
    if (word == NULL) {
        return NULL;
    }
    memcpy(word, string + match.rm_so, len);
    word[len] = '\0';
    return word;
}

/**
 * Connects to the MySQL server.
 *
 * @param host  server host name
 * @param user  account name
 * @param pass  account password
 * @param db    database to select after connecting
 * @return the open connection on success (release it with mysql_close());
 *         NULL on failure, in which case the error has been printed to
 *         stderr and no handle is left allocated
 */
MYSQL* connectToMySQL(const char* host, const char* user, const char* pass, const char* db)
{
    MYSQL* conn = mysql_init(NULL);
    if (conn == NULL) {
        // mysql_init() only fails when it cannot allocate the handle, so
        // there is no connection object to ask for an error message
        fprintf(stderr, "mysql_init() failed: out of memory\n");
        return NULL;
    }

    if (!mysql_real_connect(conn, host, user, pass, db, 0, NULL, 0)) {
        fprintf(stderr, "%s\n", mysql_error(conn));
        // The handle was allocated by mysql_init(); free it before bailing
        mysql_close(conn);
        return NULL;
    }

    // Without this return, control ran off the end of the function and the
    // caller received an indeterminate pointer
    return conn;
}

/**
 * The work done by an individual thread worker (consumer).  Reads record data
 * by calling ``getRecord`` and processes the associated NZB file. 
 */
void* threadWorker(void* t)
{
    // Thread id
    long tid = (long)t;
    
    // Buffer to hold decompressed gzip data
    char buf[BUFLEN];
    
    // Filename of the NZB
    char filename[512];
    
    // NZB regex that looks for the subject="..." attribute
    regex_t rgT;
    regcomp(&rgT, "subject[ ]*=[ ]*\"([^\"]+)\"", REG_EXTENDED);
    
    // Record pointer
    record* rec;
    
    // gzip file object
    gzFile in;
    
    // length of data read in by gzread.  If this is zero after a gzread op
    // then is means that there is no more data.
    int len;
    
    // Possible error code returned by gzread
    int err;
    
    // Number of files found in the NZB
    int count = 0;
    
    int count_records = 0;
    
    while (!threads_die) {
        // Get a record
        rec = getRecord();
        while(rec == NULL) {
            printf("Thread %f sleeping\n", tid);
            sleep(0.1);
            rec = getRecord();
        }
        
        // Starting a new NZB
        
        // Reset the data buffer
        *buf = '\0';
        
        // Reset the count of files
        count = 0;
        
        // baseFilename: /path/to/nzbs/%c/%s.nzb.gz
        sprintf(filename, (char*)baseFilename, rec->guid[0], rec->guid);
    
        in = gzopen(filename, "rb");
        if (in == NULL) {
            // Skip this file
            printf("%s: errorno: %d; couldn't open file '%s'\n", rec->guid, errno, filename);
            continue; 
        }
        
        while (true) {
            len = gzread(in, buf, sizeof(buf));
            if (len < 0) printf("%s", gzerror(in, &err));
            if (len == 0) break;

            char* buf2;;
            unsigned long int offset = 0;
            unsigned long int b, e;
            while (true) {
                buf2 = buf + offset;
                b = 0, e = 0;
                char* match = regexp(buf2, &rgT, &b, &e);
                if (e == 0) {
                    break;
                }
                free(match);
                offset += e;
                count++;
            }
        }
        
        completedRecord();
        count_records++;
        //printf("%s: found %d files '%s'\n", row[0], count, filename);
        
        // Close the gzip file handle
        gzclose(in);
        
        // Free the record
        delete rec;
    }
    printf("Thread %d returning\n", tid);
}

int main(int argc, const char * argv[])
{
    pthread_mutex_t mutexsum;
    MYSQL_RES*      res;
    MYSQL_ROW       row;
    
    // Set errorno to zero so that we can check for an error later
    errno = 0;
    
    if (argc < 4) {
        printf("%s\n", usage);
        return USAGE_ERR;
    }
    
    // Initialize MySQL library
    if (mysql_library_init(0, NULL, NULL)) {
        printf("Could not initialize MySQL library\n");
        return MYSQL_ERR;
    }
    
    // Attempt to connect to the server
    MYSQL* conn = connectToMySQL(argv[1], argv[2], argv[3], argv[4]);
    if (!conn) {
        printf("Error connecting to MySQL\n");
        return MYSQL_ERR;
    }
    
    // Get the number of releases.  It's kind of counterintuitive, but a 0
    // return value means success here
    if (mysql_query(conn, "SELECT COUNT(ID) FROM releases")) {
        printf("%s\n", mysql_error(conn));
        return MYSQL_ERR;
    }
    
    res = mysql_use_result(conn);
    row = mysql_fetch_row(res);
    int total_records = atoi(row[0]);
    int count_records = 0;
    printf("Total records: %d\n", total_records);
    mysql_free_result(res);
    
    if (mysql_query(conn, "SELECT ID, guid FROM releases ORDER BY ID ASC")) {
        printf("%s\n", mysql_error(conn));
        return MYSQL_ERR;
    }
    
    // Create the threads
    pthread_t threads[NUM_THREADS];
    int rc;
    for (int t = 0; t < NUM_THREADS; t++) {
        printf("Creating thread %d\n", t);
        rc = pthread_create(&threads[t], NULL, threadWorker, (void*)t);
        if (rc) {
            printf("ERROR; return code from pthread_create() is %d\n", rc);
            exit(-1);
        }
    }
    
    
    // TODO: Possibly break this down into chunks to reduce memory usage.
    //       For 1 million results, this will use up ~50MB RAM.
    res = mysql_use_result(conn);
    while ((row = mysql_fetch_row(res)) != NULL) {
        // Fetch all of the rows and put them into the records list
        
        // Don't allow more than MAX_RECORDS records to be stored
        while (records.size() > MAX_RECORDS) {
            //printf("MAX_RECORDS reached...sleeping...\n");
            sleep(0.1);
        }
        
        record* rec = new record;
        rec->id = atoi(row[0]);
        strcpy(rec->guid, row[1]);
        records.push_back(rec);
        
        if ((record_count % 100) == 0) {
            //fprintf(stdout, "Completed: %0.2f%% (%d/%d)\r", (float(count_records)/total_records *100), count_records, total_records);
            fprintf(stdout, "Completed %d records\r", record_count);
            fflush(stdout);
        }
    }
    
    /* close connection */
    mysql_free_result(res);
    mysql_close(conn);
    
    while (!records.empty()) {
        //record* rec = getRecord();
        //printf("%d: %s\n", rec->id, rec->guid);
        printf("Waiting for threads to finish\n");
        sleep(1);
    }
    
    return 1;
}


