/** * Eugene Marinelli * 4/9/08 * Node communications daemon. */ #include #include #include #include #include #include #include #include #include #include #include #include #define PEDOMETER_PACKET 's' #define EKG_PACKET 'e' #define PULSEOX_PACKET 'p' #define MAX_PACKET_LEN 256 #define MAX_STACK_SIZE 256 typedef struct { short length; unsigned char data[MAX_PACKET_LEN]; unsigned char checksum; } XbeePacket; typedef struct { enum {START, LENGTH, DATA, CHECKSUM} next_item; int bytes_remaining; unsigned char stack[MAX_STACK_SIZE]; int stacktop; XbeePacket pkt; } ParsingState; static void sigint_handler(int a); static void exit_handler(int servSock); static void* monitor_xbee(void* arg); static void handle_request(char* result_buf, char* req); static void write_ekg_db_entry(EkgStat stat); static sqlite3* db_conn; static int serial_fd; static int servSock; static pthread_mutex_t cur_data_lock; static int time_available; static Time base_real_time; static double last_time_update_time; static int pulseox_available; static short pulseox; static int pedometer_count; static int exercise_id; ParsingState parsing_state; int main(int argc, char** argv) { signal(SIGINT, sigint_handler); if (argc != 2) { fprintf(stderr, "Usage: %s \n", argv[0]); return 1; } char* db_filename = argv[1]; printf("Saving EKG records to db: %s\n", db_filename); // Initialize database. int ret; if ((ret = sqlite3_open(db_filename, &db_conn)) != SQLITE_OK) { fprintf(stderr, "sqlite3_open failed: error code %d\n", ret); } sqlite3_busy_timeout(db_conn, 500); pthread_mutex_init(&cur_data_lock, NULL); time_available = 0; pulseox_available = 0; pedometer_count = 0; exercise_id = -1; parsing_state.next_item = START; parsing_state.bytes_remaining = 1; memset(parsing_state.stack, 0, MAX_STACK_SIZE); parsing_state.stacktop = 0; // Run xbee communication thread. pthread_t xbee_thread; pthread_attr_t pattr; pthread_attr_init(&pattr); pthread_create(&xbee_thread, &pattr, monitor_xbee, NULL); // Run server. server(handle_request, &servSock, NODED_ADDRESS); exit_handler(servSock); return 0; } static float clock_time() { return ((float)clock()) / ((float)CLOCKS_PER_SEC); } static void handle_request(char* result_buf, char* req) { // Aggregator sends (exercise id,current time in seconds,current date) in its requests. // TODO create a case of time/date not yet available and handle it // Ghetto tokenization int len = strlen(req); char eid_buf[20], time_buf[20], date_buf[20]; memset(eid_buf, 0, 20); memset(time_buf, 0, 20); memset(date_buf, 0, 20); // Exercise id int i; for (i = 0; i < len; i++) { if (req[i] == ',') { i++; break; } else { eid_buf[i] = req[i]; } } // Seconds int j = i; for (; i < len; i++) { if (req[i] == ',') { i++; break; } else { time_buf[i - j] = req[i]; } } // Date j = i; for (; i < len; i++) { date_buf[i - j] = req[i]; } pthread_mutex_lock(&cur_data_lock); exercise_id = atoi(eid_buf); Time new_time; new_time.seconds = atof(time_buf); strcpy(new_time.date, date_buf); time_available = 1; base_real_time = new_time; // last_time_update_time = clock_time(); // printf("last_time_update_time = %f\n", last_time_update_time); sprintf(result_buf, "%d,%d", pulseox, pedometer_count); pthread_mutex_unlock(&cur_data_lock); } // TODO separate thread for this? static void handle_packet(unsigned char* pkt, int nbytes) { int i; for (i = 0; i < nbytes; i++) { unsigned char c = pkt[i]; switch (parsing_state.next_item) { case START: if (c == 0x7e) { parsing_state.next_item = LENGTH; parsing_state.bytes_remaining = 2; } break; case LENGTH: assert(parsing_state.bytes_remaining > 0); parsing_state.stack[parsing_state.stacktop++] = c; parsing_state.bytes_remaining--; if (parsing_state.bytes_remaining == 0) { parsing_state.pkt.length = (parsing_state.stack[0] << 8) + parsing_state.stack[1]; parsing_state.stacktop = 0; memset(parsing_state.stack, 0, 3); parsing_state.next_item = DATA; parsing_state.bytes_remaining = parsing_state.pkt.length; } break; case DATA: assert(parsing_state.bytes_remaining > 0); parsing_state.stack[parsing_state.stacktop++] = c; parsing_state.bytes_remaining--; if (parsing_state.bytes_remaining == 0) { memcpy(parsing_state.pkt.data, parsing_state.stack, parsing_state.pkt.length); parsing_state.stacktop = 0; memset(parsing_state.stack, 0, parsing_state.pkt.length); parsing_state.next_item = CHECKSUM; parsing_state.bytes_remaining = 1; } break; case CHECKSUM: parsing_state.pkt.checksum = c; parsing_state.next_item = START; parsing_state.bytes_remaining = 1; memset(parsing_state.stack, 0, MAX_STACK_SIZE); parsing_state.stacktop = 0; // Check the checksum -- TODO // Packet complete. unsigned char* data = parsing_state.pkt.data; if (data[0] != 0x81) { printf("data[0] is not 0x81."); return; } //short src = (data[1] << 8) + data[2]; unsigned char type = data[5]; //printf("src:0x%x; type:0x%x\n", src, type); switch (type) { case PEDOMETER_PACKET: printf("step.\n"); pthread_mutex_lock(&cur_data_lock); pedometer_count++; pthread_mutex_unlock(&cur_data_lock); break; case EKG_PACKET: printf("ekg.\n"); if (parsing_state.pkt.length >= 6 + 2 * EKG_VALUES_PER_READING) { if (time_available) { // Only log if we know what time it is. EkgStat ekg_stat; memset(&ekg_stat, 0, sizeof(EkgStat)); ekg_stat.exercise_id = exercise_id; // float elapsed = clock_time() - last_time_update_time; // printf("elapsed: %f last_time: %f\n", elapsed, last_time_update_time); ekg_stat.time.seconds = base_real_time.seconds; // + elapsed; // Doesn't seem to work properly on gumstix // TODO midnight case? strcpy(ekg_stat.time.date, base_real_time.date); for (i = 0; i < EKG_VALUES_PER_READING; i++) { char h = parsing_state.pkt.data[2*i]; char l = parsing_state.pkt.data[2*i + 1]; ekg_stat.values[i] = (h << 8) + l; } write_ekg_db_entry(ekg_stat); } else { printf("Time not available yet.\n"); } } else { fprintf(stderr, "EKG packet is too short! (%d bytes)\n", parsing_state.pkt.length); } break; case PULSEOX_PACKET: if (parsing_state.pkt.length >= 8) { short pulseox_value = (data[6] << 8) + data[7]; printf("pulseox is %d.\n", pulseox_value); pthread_mutex_lock(&cur_data_lock); pulseox = pulseox_value; pulseox_available = 1; pthread_mutex_unlock(&cur_data_lock); } else { fprintf(stderr, "pulseox packet is too short! (%d bytes)\n", parsing_state.pkt.length); } break; default: printf("invalid type %d\n", type); break; } break; } } } static void* monitor_xbee(void* arg) { printf("Connecting to xbee...\n"); serial_fd = init_serial(XBEE_SERIAL, B115200, O_RDONLY); if (serial_fd == -1) { exit_handler(servSock); } while (1) { unsigned char buf[16]; int nrecv = read(serial_fd, buf, 12); if (nrecv > 0) { buf[nrecv] = 0; // Read does not append a 0!!! // printf("%d bytes read.\n", nrecv); handle_packet(buf, nrecv); } } close(serial_fd); return NULL; } static void write_ekg_db_entry(EkgStat stat) { char nums[512] = ""; int i; for (i = 0; i < EKG_VALUES_PER_READING; i++) { // TODO this is pretty inefficient... char next[80]; if (i != EKG_VALUES_PER_READING - 1) { sprintf(next, "%d,", stat.values[i]); } else { sprintf(next, "%d", stat.values[i]); // (no comma) } strcat(nums, next); } char query_buf[1024]; sprintf(query_buf, "insert into ekg_data values (%d,\"%s\",%f,%s)", stat.exercise_id, stat.time.date, stat.time.seconds, nums); char* err_buf; int ret = sqlite3_exec(db_conn, query_buf, NULL, NULL, &err_buf); if (ret != SQLITE_OK) { fprintf(stderr, "sqlite3_exec failed: %s\n", err_buf); } } static void sigint_handler(int a) { exit_handler(servSock); } static void exit_handler(int servSock) { if (servSock) { if (close(servSock) == -1) { perror("close"); } } unlink(NODED_ADDRESS); if (serial_fd > 0) { printf("Closing xbee serial connection...\n"); close(serial_fd); } exit(1); }