#include "backend.h" #include "bus.h" #ifdef __linux__ #include #include #include #include #include "config.h" #include "timer.h" #include "mount.h" int data[3]={0}; #include "io.h" typedef struct { uint16_t year; uint8_t month; uint8_t day; } db_date_t; typedef struct { db_date_t date; off_t index; } db_metadata_t; typedef struct { db_date_t date; uint32_t micros; } db_data_t; static int need_flush=0; static pthread_t thread; int init_data_files(config_t* cfg) { char file[256]; FILE* f = NULL; sprintf(file, "%s/%s", config_get_string(cfg, "storage.location"), config_get_string(cfg, "storage.years")); f=fopen(file, "rb"); if(!f) { f=fopen(file, "w+"); if(!f) { fprintf(stderr, "Unable to init file '%s' : %s\n", file, strerror(errno)); return -1; } } fclose(f); sprintf(file, "%s/%s", config_get_string(cfg, "storage.location"), config_get_string(cfg, "storage.monthes")); f=fopen(file, "rb"); if(!f) { f=fopen(file, "w+"); if(!f) { fprintf(stderr, "Unable to init file '%s' : %s\n", file, strerror(errno)); return -1; } } fclose(f); sprintf(file, "%s/%s", config_get_string(cfg, "storage.location"), config_get_string(cfg, "storage.days")); f=fopen(file, "rb"); if(!f) { f=fopen(file, "w+"); if(!f) { fprintf(stderr, "Unable to init file '%s' : %s\n", file, strerror(errno)); return -1; } } fclose(f); sprintf(file, "%s/%s", config_get_string(cfg, "storage.location"), config_get_string(cfg, "storage.data")); f=fopen(file, "rb"); if(!f) { f=fopen(file, "w+"); if(!f) { fprintf(stderr, "Unable to init file '%s' : %s\n", file, strerror(errno)); return -1; } } fclose(f); sprintf(file, "%s", config_get_string(cfg, "cache.file")); f=fopen(file, "w+"); if(!f) { fprintf(stderr, "Unable to init file '%s' : %s\n", file, strerror(errno)); return -1; } fclose(f); return 0; } int do_pid_file(config_t* cfg) { const char* file = config_get_string(cfg, "misc.pidfile"); FILE* f; int x; f=fopen(file, "w+"); if(!f) { fprintf(stderr, "Erreur unable to open pid file '%s'\n", file); perror(" "); return -1; } x=getpid(); fprintf(f, "%d\n", x); fclose(f); return 0; } void usr1_handler(int signo) { if (signo == SIGUSR1) { need_flush=1; } } int backend_init(config_t* cfg, bus_t* bus, const char* config) { time(NULL); setup_io(); config_init(cfg, "config.cfg"); config_print(cfg); if(do_pid_file(cfg)) return -1; if(init_data_files(cfg)) return -1; if (signal(SIGUSR1, usr1_handler) == SIG_ERR) fprintf(stderr, "Error unable to catch signal : %s\n", strerror(errno)); if(config_get_int(cfg, "mount") && !is_mounted(config_get_string(cfg, "mount.dst"))) { if(mount_samba(cfg)) return -1; } bus_init_recv(bus, config_get_int(cfg, "pin.data"), config_get_int(cfg, "pin.clk"), config_get_int(cfg, "pin.ack")); init_read_pin(config_get_int(cfg, "pin.data"), config_get_int(cfg, "pin.clk"), config_get_int(cfg, "pin.ack")); #ifdef ARM pthread_create(&thread, NULL, bus_recv_thread, &bus); #endif return 0; } void get_date(uint64_t time, date_t* date) { struct tm t; time_t ti = time; t=*localtime(&ti); date->year = t.tm_year+1900; date->month = t.tm_mon+1; date->day = t.tm_mday; date->sec = t.tm_sec + t.tm_min*60 + t.tm_hour*3600; date->usec = time%1000000; } int insert_index_day(config_t *cfg, db_metadata_t* db) { char file[256]; sprintf(file, "%s/%s", config_get_string(cfg, "storage.location"), config_get_string(cfg, "storage.days")); FILE* f = fopen(file, "r+"); if(!f) { fprintf(stderr, "Erreur impossible d'ouvrir le fichier '%s' en ecriture : %s\n" , file, strerror(errno)); return -1; } fseek(f, 0, SEEK_END); fwrite(db, sizeof(db_metadata_t), 1, f); fclose(f); return 0; } int insert_index_month(config_t *cfg, db_metadata_t* _db) { char file[256]; FILE* f; db_metadata_t db = {0}; db.date.year=_db->date.year; db.date.month=_db->date.month; db.index=_db->index; sprintf(file, "%s/%s", config_get_string(cfg, "storage.location"), config_get_string(cfg, "storage.monthes")); f = fopen(file, "r+"); if(!f) { fprintf(stderr, "Erreur impossible d'ouvrir le fichier '%s' en ecriture : %s\n" , file, strerror(errno)); return -1; } fseek(f, 0, SEEK_END); fwrite(&db, sizeof(db_metadata_t), 1, f); fclose(f); return 0; } int insert_index_year(config_t *cfg, db_metadata_t* _db) { char file[256]; FILE* f; db_metadata_t db = {0}; db.date.year=_db->date.year; db.index=_db->index; sprintf(file, "%s/%s", config_get_string(cfg, "storage.location"), config_get_string(cfg, "storage.years")); f = fopen(file, "r+"); if(!f) { fprintf(stderr, "Erreur impossible d'ouvrir le fichier '%s' en ecriture : %s\n" , file, strerror(errno)); return -1; } fseek(f, 0, SEEK_END); fwrite(&db, sizeof(db_metadata_t), 1, f); fclose(f); return 0; } int get_last_index_date(config_t* cfg, db_data_t* md, off_t* iy, off_t* im, off_t* id) { FILE* f; char file[256]; md->micros=0; md->date.year=md->date.month=md->date.day=0; //data sprintf(file, "%s/%s", config_get_string(cfg, "storage.location"), config_get_string(cfg, "storage.data")); f=fopen(file, "rb"); if(!f) { fprintf(stderr, "Erreur impossible d'ouvrir le fichier '%s' en lecture : %s\n" , file, strerror(errno)); return -1; } fseek(f, -sizeof(db_data_t), SEEK_END); fread(md, sizeof(db_data_t), 1, f); fclose(f); //index Year sprintf(file, "%s/%s", config_get_string(cfg, "storage.location"), config_get_string(cfg, "storage.years")); f=fopen(file, "rb"); if(!f) { fprintf(stderr, "Erreur impossible d'ouvrir le fichier '%s' en lecture : %s\n" , file, strerror(errno)); return -1; } fseek(f, 0, SEEK_END); *im = ftell(f) / sizeof(db_metadata_t); fclose(f); //index month sprintf(file, "%s/%s", config_get_string(cfg, "storage.location"), config_get_string(cfg, "storage.monthes")); f=fopen(file, "rb"); if(!f) { fprintf(stderr, "Erreur impossible d'ouvrir le fichier '%s' en lecture : %s\n" , file, strerror(errno)); return -1; } fseek(f,0, SEEK_END); *im = ftell(f) / sizeof(db_metadata_t); fclose(f); //index day sprintf(file, "%s/%s", config_get_string(cfg, "storage.location"), config_get_string(cfg, "storage.days")); f=fopen(file, "rb"); if(!f) { fprintf(stderr, "Erreur impossible d'ouvrir le fichier '%s' en lecture : %s\n" , file, strerror(errno)); return -1; } fseek(f, 0, SEEK_END); *id = ftell(f) / sizeof(db_metadata_t); fclose(f); return 0; } int backend_flush(config_t* cfg) { db_data_t curr, last; off_t indexY, indexM, indexDay, indexData; char file[256]; sprintf(file, "%s/%s", config_get_string(cfg, "storage.location"), config_get_string(cfg, "storage.data")); get_last_index_date(cfg, &last, &indexY, &indexM, &indexDay); FILE* f = fopen(file, "wb"); if(!f) { fprintf(stderr, "Erreur impossible d'ouvrir le fichier '%s' en lecture : %s\n" , file, strerror(errno)); return -1; } fseek(f, 0, SEEK_END); indexData=ftell(f)/sizeof(db_data_t); FILE* in = fopen(config_get_string(cfg, "cache.file"), "rb"); if(!in) { fprintf(stderr, "Erreur impossible d'ouvrir le fichier '%s' en lecture : %s\n" , config_get_string(cfg, "cache.file"), strerror(errno)); return -1; } while( (fread(&curr, sizeof(db_data_t), 1, in))>0 ) { if(curr.date.day != last.date.day) { db_metadata_t t={0}; t.date=curr.date; t.index=indexData; indexDay++; insert_index_day(cfg, &t); last.date.day=curr.date.day; } if(curr.date.month != last.date.month) { db_metadata_t t={0}; t.date.year=curr.date.year; t.date.month=curr.date.month; t.index=indexDay-1; indexM++; insert_index_month(cfg, &t); last.date.month=curr.date.month; } if(curr.date.year != last.date.year) { db_metadata_t t={0}; t.date.year=curr.date.year; t.index=indexM-1; indexY++; insert_index_year(cfg, &t); last.date.year=curr.date.year; } fwrite(&curr, sizeof(db_data_t), 1, f); indexData++; } fprintf(stderr, "Sortie: %s\n", strerror(errno) ); fclose(f); fclose(in); f = fopen(config_get_string(cfg, "cache.file"), "w+"); fclose(f); return 0; } void backend_insert(config_t* cfg, uint64_t c) { db_data_t d; date_t dd; FILE* f = fopen(config_get_string(cfg, "cache.file"), "r+"); if(!f) { fprintf(stderr, "Cant open cache file '%s' : %s\n", config_get_string(cfg, "cache.file"), strerror(errno)); return; } get_date(c, &dd); d.date.year=dd.year; d.date.month=dd.month; d.date.day=dd.day; d.micros=dd.sec*1000 + dd.usec/1000; //printf("Insert: %d-%d-%d : %d:%d:%d.%d \n", dd.year, dd.month, dd.day, d.micros/3600000, (d.micros%3600000)/60000, d.micros%60000/1000, d.micros%1000); fseek(f, 0, SEEK_END); fwrite(&d, sizeof(db_data_t), 1, f); fclose(f); } void backend_loop(config_t* cfg, bus_t* bus) { uint64_t interval = config_get_int(cfg, "cache.flush_interval"); uint64_t last_flush = get_time_us()/1000000, current; int i=0, n=(28*12*2)*100; while(1) { uint32_t c; current=get_time_us()/1000000; if(current>= interval +last_flush || need_flush || i==n) { printf("Flushing\n"); last_flush=current; backend_flush(cfg); need_flush=0; if(i==n) return; } bus_read(bus, &c, 4); backend_insert(cfg, c); i++; } return; } #endif