/** * Eugene Marinelli * 18549 Heathnet * 3/19/08 * Aggregator - synthesizes data from various sources and stores in sqlite db. */ #include #include #include #include #include #include #include #include #include #include #include #define POLL_PERIOD 100000 // useconds static void sigint_handler(int a); static void write_body_db_entry(BodyStat stat); static int request_gps_data(GpsData* d, int gps_sock); static int connect_to_gps(void); static int connect_to_noded(void); static int request_node_data(short* pulseox_buf, int* pedometer_buf, int sock); static sqlite3* db_conn; static Time last_time; static int time_available, location_available, pulseox_available, ped_available; static int exercise_id; static int max_exercise_id_callback(void* a, int num_cols, char** results, char** colnames) { if (results[0] == NULL) { exercise_id = 0; } else { exercise_id = atoi(results[0]) + 1; } printf("Exercise ID: %d\n", exercise_id); return 0; } static void sigpipe_handler(int a) { fprintf(stderr, "Broken pipe (handled).\n"); } int main(int argc, char** argv) { if (argc != 2) { fprintf(stderr, "Usage: %s \n", argv[0]); return 1; } signal(SIGINT, sigint_handler); signal(SIGPIPE, sigpipe_handler); time_available = 0; location_available = 0; ped_available = 0; pulseox_available = 0; char* db_filename = argv[1]; printf("Saving records to db: %s\n", db_filename); // Initialize database. int ret = sqlite3_open(db_filename, &db_conn); if (ret != SQLITE_OK) { fprintf(stderr, "sqlite3_open failed: error code %d\n", ret); } sqlite3_busy_timeout(db_conn, 500); // Get exercise ID from db printf("Getting exercise ID...\n"); exercise_id = -1; char* err_buf; ret = sqlite3_exec(db_conn, "select max(Exercise_ID) from data_collection;", max_exercise_id_callback, (void*)NULL, &err_buf); if (ret != SQLITE_OK) { fprintf(stderr, "sqlite3_exec failed: %s\n", err_buf); sigint_handler(0); } int gps_sock = -1; int noded_sock = -1; while (1) { int got_gps_data = 0; GpsData gps_data; if (gps_sock != -1) { // Get GPS data. if (request_gps_data(&gps_data, gps_sock) != -1) { last_time = gps_data.time; got_gps_data = 1; } else { printf("Disconnected from GPS module.\n"); gps_sock = -1; } } else { // Try to connect. gps_sock = connect_to_gps(); if (gps_sock == -1) { printf("Warning: failed to connect to GPS.\n"); } } int got_node_data = 0; short pulseox = 0; int pedometer = 0; if (noded_sock != -1) { // Get node data. if (request_node_data(&pulseox, &pedometer, noded_sock) != -1) { printf("node_data: pulseox:%d ped:%d\n", pulseox, pedometer); got_node_data = 1; } else { printf("Disconnected from noded module.\n"); noded_sock = -1; } } else { noded_sock = connect_to_noded(); if (noded_sock == -1) { printf("Warning: failed to connect to noded.\n"); } } if (time_available) { // Create current stat value if there's anything new. BodyStat stat; memset(&stat, 0, sizeof(BodyStat)); stat.exercise_id = exercise_id; if (got_gps_data) { stat.time = gps_data.time; stat.location = gps_data.location; } if (got_node_data) { stat.pulseox = pulseox; stat.pedometer_steps = pedometer; } write_body_db_entry(stat); } usleep(POLL_PERIOD); } sqlite3_close(db_conn); return 0; } static int connect_to_gps() { struct sockaddr_un remote; remote.sun_family = AF_UNIX; int gps_sock; if ((gps_sock = socket(AF_UNIX, SOCK_STREAM, 0)) == -1) { perror("socket"); exit(1); } strcpy(remote.sun_path, GPS_ADDRESS); int len = strlen(remote.sun_path) + sizeof(remote.sun_family) + 1; if (connect(gps_sock, (struct sockaddr *)&remote, len) == -1) { perror("GPS connect"); return -1; } else { printf("Connected to GPS module.\n"); } return gps_sock; } static int connect_to_noded() { struct sockaddr_un remote; remote.sun_family = AF_UNIX; int noded_sock; if ((noded_sock = socket(AF_UNIX, SOCK_STREAM, 0)) == -1) { perror("socket"); return -1; } strcpy(remote.sun_path, NODED_ADDRESS); int len = strlen(remote.sun_path) + sizeof(remote.sun_family) + 1; if (connect(noded_sock, (struct sockaddr *)&remote, len) == -1) { return -1; } else { printf("Connected to noded module.\n"); } return noded_sock; } static int request_gps_data(GpsData* result_buf, int sock) { char* req = "a"; if (send(sock, req, strlen(req), 0) == -1) { perror("gps send"); return -1; } int t; char buf[128]; if ((t = recv(sock, buf, 120, 0)) < 0) { printf("Lost connection to GPS module.\n"); perror("gps recv"); return -1; } buf[t] = 0; char data_flag = buf[0]; time_available = (data_flag >> 1) & 1; location_available = (data_flag >> 2) & 1; if (location_available && time_available) { sscanf(buf+1, "%6s,%lf,%f,%d,%f,%d", result_buf->time.date, &(result_buf->time.seconds), &(result_buf->location.latitude), (int*)&(result_buf->location.latitude_dir), &(result_buf->location.longitude), (int*)&(result_buf->location.longitude_dir)); printf("date: %s, seconds: %f, lat: %f, latdir: %d, lon: %f, londir: %d\n", result_buf->time.date, result_buf->time.seconds, result_buf->location.latitude, result_buf->location.latitude_dir, result_buf->location.longitude, result_buf->location.longitude_dir); } else if (location_available) { sscanf(buf+1, "%f,%d,%f,%d", &(result_buf->location.latitude), (int*)&(result_buf->location.latitude_dir), &(result_buf->location.longitude), (int*)&(result_buf->location.longitude_dir)); printf("lat: %f, latdir: %d, lon: %f, londir: %d\n", result_buf->location.latitude, result_buf->location.latitude_dir, result_buf->location.longitude, result_buf->location.longitude_dir); } else if (time_available) { sscanf(buf+1, "%6s,%lf", result_buf->time.date, &(result_buf->time.seconds)); printf("date: %s, seconds: %f\n", result_buf->time.date, result_buf->time.seconds); } return 0; } static int request_node_data(short* pulseox_buf, int* pedometer_buf, int sock) { char req[80]; if (time_available) { sprintf(req, "%d,%f,%s", exercise_id, last_time.seconds, last_time.date); } else { sprintf(req, "NO_TIME"); } if (send(sock, req, strlen(req), 0) == -1) { perror("noded send"); return -1; } int t; char buf[80]; if ((t = recv(sock, buf, 100, 0)) < 0) { printf("Lost connection to noded module.\n"); perror("recv"); return -1; } buf[t] = 0; sscanf(buf, "%hd,%d", pulseox_buf, pedometer_buf); pulseox_available = 1; ped_available = 1; // TODO fix return 0; } static void write_body_db_entry(BodyStat stat) { char lat[40], lon[40], latdir[20], londir[20], pulseox[20], pedometer[20]; if (!time_available) { fprintf(stderr, "Cannot write an entry without the time!\n"); return; } if (location_available) { sprintf(lat, "%f", stat.location.latitude); sprintf(lon, "%f", stat.location.longitude); sprintf(latdir, "%d", stat.location.latitude_dir); sprintf(londir, "%d", stat.location.longitude_dir); } else { sprintf(lat, "NULL"); sprintf(lon, "NULL"); sprintf(latdir, "NULL"); sprintf(londir, "NULL"); } if (pulseox_available) { sprintf(pulseox, "%d", stat.pulseox); } else { sprintf(pulseox, "NULL"); } if (ped_available) { sprintf(pedometer, "%d", stat.pedometer_steps); } else { sprintf(pedometer, "NULL"); } char query_buf[256]; sprintf(query_buf, "insert into data_collection values (%d,\"%s\",%f,%s,%s,%s,%s,%s,%s)", stat.exercise_id, stat.time.date, stat.time.seconds, lat, lon, latdir, londir, pedometer, pulseox); char* err_buf; int ret = sqlite3_exec(db_conn, query_buf, NULL, NULL, &err_buf); if (ret != SQLITE_OK) { fprintf(stderr, "sqlite3_exec failed: %s\n", err_buf); } } static void sigint_handler(int a) { if (db_conn != NULL) { printf("Closing db connection...\n"); sqlite3_close(db_conn); } exit(1); }