Skip to Content.
Sympa Menu

ndt-dev - [ndt-dev] [ndt] r1079 committed - Applied aaron-tcp_stats_cleanup changes to Issue139 branch

Subject: NDT-DEV email list created

List archive

[ndt-dev] [ndt] r1079 committed - Applied aaron-tcp_stats_cleanup changes to Issue139 branch


Chronological Thread 
  • From:
  • To:
  • Subject: [ndt-dev] [ndt] r1079 committed - Applied aaron-tcp_stats_cleanup changes to Issue139 branch
  • Date: Thu, 12 Jun 2014 09:50:30 +0000

Revision: 1079
Author:

Date: Thu Jun 12 09:50:00 2014 UTC
Log: Applied aaron-tcp_stats_cleanup changes to Issue139 branch

http://code.google.com/p/ndt/source/detail?r=1079

Added:
/branches/Issue139/src/snap_worker.c
/branches/Issue139/src/snap_worker.h
Modified:
/branches/Issue139/src/Makefile.am
/branches/Issue139/src/test_c2s_srv.c
/branches/Issue139/src/test_mid_srv.c
/branches/Issue139/src/test_s2c_srv.c
/branches/Issue139/src/test_sfw_srv.c
/branches/Issue139/src/testoptions.c
/branches/Issue139/src/testoptions.h
/branches/Issue139/src/tests_srv.h
/branches/Issue139/src/web100-util.c
/branches/Issue139/src/web100srv.c
/branches/Issue139/src/web100srv.h

=======================================
--- /dev/null
+++ /branches/Issue139/src/snap_worker.c Thu Jun 12 09:50:00 2014 UTC
@@ -0,0 +1,205 @@
+/**
+ * This file contains the functions used for the thread that takes periodic
+ * snapshots of the TCP statistics.
+ *
+ * Aaron Brown 2014-03-10
+ *

+ */
+
+#include <tgmath.h>
+#include <pthread.h>
+
+#include "logging.h"
+#include "snap_worker.h"
+#include "utils.h"
+
+static void *snap_worker(void* arg);
+
+/**
+ * Allocate a SnapResults holder with max_snapshots pre-initialised snapshot
+ * slots.
+ *
+ * The agent, connection and group are stored in the returned structure and
+ * are also handed to tcp_stats_init_snapshot() for every slot. "allocated"
+ * counts the slots that were successfully initialised; "collected" starts
+ * at zero.
+ *
+ * @param max_snapshots number of snapshot slots to create
+ * @param agent         tcp_stat agent stored in the result
+ * @param conn          tcp_stat connection stored in the result
+ * @param group         tcp_stat group stored in the result
+ * @return the new SnapResults, or NULL when any allocation fails; on failure
+ *         everything allocated by this function is released again
+ */
+SnapResults *alloc_snap_results(int max_snapshots, tcp_stat_agent *agent, tcp_stat_connection conn, tcp_stat_group *group) {
+  SnapResults *snapResults;
+  int i;
+
+  snapResults = calloc(1, sizeof(*snapResults));
+  if (!snapResults) {
+    // Nothing has been allocated yet, so there is nothing to clean up.
+    return NULL;
+  }
+
+  snapResults->agent = agent;
+  snapResults->conn = conn;
+  snapResults->group = group;
+
+  snapResults->allocated = 0;
+  snapResults->collected = 0;
+
+  snapResults->snapshots = calloc(max_snapshots, sizeof(tcp_stat_snap *));
+  if (!snapResults->snapshots) {
+    goto error_out;
+  }
+
+  for (i = 0; i < max_snapshots; i++) {
+    snapResults->snapshots[i] = tcp_stats_init_snapshot(agent, conn, group);
+    if (!snapResults->snapshots[i]) {
+      goto error_out;
+    }
+
+    snapResults->allocated++;
+  }
+
+  return snapResults;
+
+error_out:
+  // Undo the partial construction locally: release the snapshots that were
+  // initialised, then the pointer array, then the holder itself.
+  for (i = 0; i < snapResults->allocated; i++) {
+    tcp_stats_free_snapshot(snapResults->snapshots[i]);
+  }
+  free(snapResults->snapshots);
+  free(snapResults);
+  return NULL;
+}
+
+/**
+ * Release a SnapResults structure: every initialised snapshot, the array
+ * that holds the snapshot pointers, and the structure itself.
+ *
+ * @param results structure to release; NULL is accepted and ignored
+ */
+void free_snap_results(SnapResults *results) {
+  int i;
+
+  if (!results) {
+    return;
+  }
+
+  // Only the first "allocated" slots hold initialised snapshots.
+  for (i = 0; i < results->allocated; i++) {
+    tcp_stats_free_snapshot(results->snapshots[i]);
+  }
+
+  // The pointer array is a separate allocation and must be freed as well.
+  free(results->snapshots);
+  free(results);
+}
+
+/**
+ * Allocate and initialise a SnapWorker without starting its thread.
+ *
+ * Initialises the worker's mutex and condition variable and pre-allocates
+ * the snapshot storage through alloc_snap_results().
+ *
+ * @param agent         tcp_stat agent passed on to alloc_snap_results()
+ * @param group         tcp_stat group passed on to alloc_snap_results()
+ * @param conn          tcp_stat connection passed on to alloc_snap_results()
+ * @param delay         stored in the worker; the worker thread hands it to
+ *                      mysleep() between snapshots
+ * @param max_snapshots number of snapshot slots to pre-allocate
+ * @return the new worker, or NULL on failure; on failure only the pieces
+ *         that were actually initialised are torn down
+ */
+SnapWorker *alloc_snap_worker(tcp_stat_agent* agent, tcp_stat_group *group,
+                              tcp_stat_connection conn, double delay, int max_snapshots) {
+  SnapWorker *snapWorker;
+
+  snapWorker = calloc(1, sizeof(*snapWorker));
+  if (!snapWorker) {
+    return NULL;
+  }
+
+  snapWorker->delay = delay;
+
+  if (pthread_mutex_init(&snapWorker->mutex, NULL) != 0) {
+    goto free_worker;
+  }
+
+  if (pthread_cond_init(&snapWorker->cond, NULL) != 0) {
+    goto destroy_mutex;
+  }
+
+  snapWorker->results = alloc_snap_results(max_snapshots, agent, conn, group);
+  if (!snapWorker->results) {
+    goto destroy_cond;
+  }
+
+  return snapWorker;
+
+  // Staged cleanup: each label undoes exactly what succeeded before the
+  // failure, so an uninitialised mutex or condition variable is never
+  // destroyed.
+destroy_cond:
+  pthread_cond_destroy(&snapWorker->cond);
+destroy_mutex:
+  pthread_mutex_destroy(&snapWorker->mutex);
+free_worker:
+  free(snapWorker);
+
+  return NULL;
+}
+
+/**
+ * Release a SnapWorker: its results (if still attached), its mutex and
+ * condition variable, and the structure itself.
+ *
+ * The mutex must not be locked when this is called. This function does not
+ * join the worker thread.
+ *
+ * @param worker worker to release; NULL is accepted and ignored
+ */
+void free_snap_worker(SnapWorker *worker) {
+  if (!worker) {
+    return;
+  }
+
+  // stop_snap_worker() detaches the results before calling in here, so the
+  // pointer may legitimately be NULL.
+  if (worker->results) {
+    free_snap_results(worker->results);
+  }
+
+  pthread_mutex_destroy(&worker->mutex);
+  pthread_cond_destroy(&worker->cond);
+
+  free(worker);
+}
+
+/**
+ * Start a worker thread that periodically takes TCP statistics snapshots.
+ *
+ * Storage for ceil((test_length + 1) / (delay / 1000.0)) snapshots is
+ * allocated up front. After creating the thread, this function waits on the
+ * worker's condition variable for the thread's start-up signal.
+ *
+ * @param agent       tcp_stat agent used for the snapshots
+ * @param group       tcp_stat group used for the snapshots
+ * @param conn        tcp_stat connection to snapshot
+ * @param delay       interval between snapshots in milliseconds; must be > 0
+ * @param test_length expected test length; padded by one when sizing the
+ *                    snapshot storage
+ * @return the running worker, to be passed to stop_snap_worker(), or NULL if
+ *         the delay is invalid, allocation fails, or the thread cannot be
+ *         created
+ */
+SnapWorker *start_snap_worker(tcp_stat_agent* agent, tcp_stat_group *group,
+                              tcp_stat_connection conn, int delay, int test_length) {
+  SnapWorker *snapWorker;
+  double converted_delay;
+  int max_number;
+
+  // A zero delay would divide by zero below and turn an infinite snapshot
+  // count into an int; a negative one yields a negative count.
+  if (delay <= 0) {
+    log_println(0, "Invalid snapshot delay %d; snap worker not started", delay);
+    return NULL;
+  }
+
+  converted_delay = ((double) delay) / 1000.0;
+
+  // pad the maximum test length to be sure we have enough snapshots
+  max_number = ceil((test_length + 1) / converted_delay);
+
+  snapWorker = alloc_snap_worker(agent, group, conn, converted_delay, max_number);
+  if (!snapWorker) {
+    return NULL;
+  }
+
+  // Hold the mutex across thread creation so the worker, which locks the
+  // same mutex before signalling, cannot signal before we are waiting.
+  pthread_mutex_lock(&snapWorker->mutex);
+
+  if (pthread_create(&snapWorker->pthread, NULL, snap_worker,
+                     (void*) snapWorker)) {
+    log_println(0, "Cannot create worker thread for writing snap log!");
+    // The mutex must be released before free_snap_worker() destroys it.
+    pthread_mutex_unlock(&snapWorker->mutex);
+    free_snap_worker(snapWorker);
+    return NULL;
+  }
+
+  // Wait for the worker's start-up signal.
+  // NOTE(review): there is no predicate guarding this wait, so a spurious
+  // wakeup would simply return before the worker has signalled.
+  pthread_cond_wait(&snapWorker->cond, &snapWorker->mutex);
+  pthread_mutex_unlock(&snapWorker->mutex);
+
+  return snapWorker;
+}
+
+/**
+ * Stop a snap worker thread and hand its collected snapshots to the caller.
+ *
+ * Sets the worker's exit flag under its mutex, joins the thread, detaches
+ * the results from the worker and then releases the worker itself.
+ *
+ * @param worker worker returned by start_snap_worker(); NULL is accepted so
+ *               that a failed start can be passed straight through
+ * @return the worker's SnapResults, which the caller now owns and should
+ *         release with free_snap_results(); NULL if worker was NULL
+ */
+SnapResults *stop_snap_worker(SnapWorker *worker) {
+  SnapResults *results;
+
+  if (!worker) {
+    return NULL;
+  }
+
+  pthread_mutex_lock(&worker->mutex);
+  worker->do_exit = 1;
+  pthread_mutex_unlock(&worker->mutex);
+
+  pthread_join(worker->pthread, NULL);
+
+  results = worker->results;
+
+  // Detach the results so free_snap_worker() does not release them.
+  worker->results = NULL;
+
+  free_snap_worker(worker);
+
+  return results;
+}
+
+/**
+ * Thread body: take one snapshot per delay interval until told to exit.
+ *
+ * Signals the worker's condition variable once at start-up, then loops.
+ * While holding the mutex it checks the exit flag and, if a pre-allocated
+ * slot is still free, fills the next slot and bumps the collected count.
+ * The mutex is released around the sleep.
+ *
+ * @param arg the SnapWorker this thread belongs to
+ * @return always NULL
+ */
+static void *snap_worker(void* arg) {
+  SnapWorker *worker = (SnapWorker *) arg;
+  SnapResults *snaps = worker->results;
+  int next = 0;
+
+  pthread_mutex_lock(&worker->mutex);
+  pthread_cond_signal(&worker->cond);
+
+  // The mutex is held whenever the loop condition is evaluated.
+  while (!worker->do_exit) {
+    // Once every slot is used the thread just keeps sleeping until stopped.
+    if (next < snaps->allocated) {
+      tcp_stats_take_snapshot(snaps->agent, snaps->conn, snaps->snapshots[next]);
+
+      snaps->collected++;
+      next++;
+    }
+
+    pthread_mutex_unlock(&worker->mutex);
+    mysleep(worker->delay);
+    pthread_mutex_lock(&worker->mutex);
+  }
+
+  pthread_mutex_unlock(&worker->mutex);
+
+  return NULL;
+}
=======================================
--- /dev/null
+++ /branches/Issue139/src/snap_worker.h Thu Jun 12 09:50:00 2014 UTC
@@ -0,0 +1,36 @@
+/**
+ * This file contains the functions used for the thread that takes periodic
+ * snapshots of the TCP statistics.
+ *
+ * Aaron Brown 2014-03-10
+ *

+ */
+
+#ifndef SNAP_WORKER_H
+#define SNAP_WORKER_H
+
+#include <pthread.h>
+
+#include "web100srv.h"
+
+/**
+ * State shared between the snapshot worker thread and the code that starts
+ * and stops it.
+ */
+typedef struct SnapWorker {
+ // Value the worker thread passes to mysleep() between snapshots;
+ // start_snap_worker() stores its millisecond delay divided by 1000.0 here.
+ double delay;
+
+ // Snapshot storage created by alloc_snap_results(); stop_snap_worker()
+ // detaches it and returns it to the caller.
+ SnapResults *results;
+
+ // Set to 1 by stop_snap_worker() (under mutex); checked by the worker
+ // thread on every loop iteration.
+ int do_exit;
+ // Handle of the worker thread, filled in by pthread_create() and joined in
+ // stop_snap_worker().
+ pthread_t pthread;
+ // Held by the worker thread while it checks do_exit and takes a snapshot.
+ pthread_mutex_t mutex;
+ // Signalled once by the worker thread at start-up; start_snap_worker()
+ // waits on it.
+ pthread_cond_t cond;
+} SnapWorker;
+
+SnapResults *alloc_snap_results(int max_snapshots, tcp_stat_agent *agent, tcp_stat_connection conn, tcp_stat_group *group);
+void free_snap_results(SnapResults *results);
+SnapWorker *alloc_snap_worker(tcp_stat_agent* agent, tcp_stat_group *group,
+ tcp_stat_connection conn, double delay, int max_snapshots);
+void free_snap_worker(SnapWorker *worker);
+SnapWorker *start_snap_worker(tcp_stat_agent* agent, tcp_stat_group *group,
+ tcp_stat_connection conn, int delay, int test_length);
+SnapResults *stop_snap_worker(SnapWorker *worker);
+
+#endif
=======================================
--- /branches/Issue139/src/Makefile.am Fri May 30 12:29:18 2014 UTC
+++ /branches/Issue139/src/Makefile.am Thu Jun 12 09:50:00 2014 UTC
@@ -76,7 +76,7 @@
web100srv_SOURCES = web100srv.c web100-util.c web100-pcap.c web100-admin.c runningtest.c \
network.c usage.c utils.c mrange.c logging.c testoptions.c ndtptestconstants.c \
protocol.c test_sfw_srv.c test_meta_srv.c ndt_odbc.c strlutils.c heuristics.c \
- test_c2s_srv.c test_s2c_srv.c test_mid_srv.c jsonutils.c
+ test_c2s_srv.c test_s2c_srv.c test_mid_srv.c snap_worker.c jsonutils.c
web100srv_LDFLAGS = $(NDTLDFLAGS) $(I2UTILLDFLAGS)
web100srv_LDADD = $(NDTLIBS) $(I2UTILLIBS) $(I2UTILLIBDEPS) -lpthread $(ZLIB) $(JSONLIB)
web100srv_CPPFLAGS ='-DBASEDIR="$(ndtdir)"' -DFORCE_WEB100
@@ -85,7 +85,7 @@
web10gsrv_SOURCES = web100srv.c web100-util.c web100-pcap.c web100-admin.c runningtest.c \
network.c usage.c utils.c mrange.c logging.c testoptions.c ndtptestconstants.c \
protocol.c test_sfw_srv.c test_meta_srv.c ndt_odbc.c strlutils.c heuristics.c \
- test_c2s_srv.c test_s2c_srv.c test_mid_srv.c web10g-util.c jsonutils.c
+ test_c2s_srv.c test_s2c_srv.c test_mid_srv.c web10g-util.c snap_worker.c jsonutils.c
web10gsrv_LDFLAGS = $(NDTLDFLAGS) $(I2UTILLDFLAGS)
web10gsrv_LDADD = $(NDTLIBS) $(I2UTILLIBS) $(I2UTILLIBDEPS) -lpthread $(ZLIB) $(JSONLIB)
web10gsrv_CPPFLAGS = '-DBASEDIR="$(ndtdir)"'
=======================================
--- /branches/Issue139/src/test_c2s_srv.c Wed May 28 11:17:18 2014 UTC
+++ /branches/Issue139/src/test_c2s_srv.c Thu Jun 12 09:50:00 2014 UTC
@@ -20,6 +20,7 @@
#include "protocol.h"
#include "network.h"
#include "mrange.h"
+#include "snap_worker.h"
#include "jsonutils.h"

/**
@@ -44,7 +45,6 @@
* @param device string device name inout parameter
* @param options Test Option variables
* @param record_reverse integer indicating whether receiver-side statistics have to be logged
- * @param count_vars count of web100 variables
* @param spds[] [] speed check array
* @param spd_index index used for speed check array
* @param conn_options Connection options
@@ -61,9 +61,10 @@
int test_c2s(int ctlsockfd, tcp_stat_agent* agent, TestOptions* testOptions,
int conn_options, double* c2sspd, int set_buff, int window,
int autotune, char* device, Options* options, int record_reverse,
- int count_vars, char spds[4][256], int* spd_index) {
+ char spds[4][256], int* spd_index, SnapResults **snap_results) {
tcp_stat_connection conn;
tcp_stat_group* group = NULL;
+ SnapWorker *snap_worker;
/* The pipe that will return packet pair results */
int mon_pipe[2];
int recvsfd; // receiver socket file descriptor
@@ -84,15 +85,6 @@
I2Addr c2ssrv_addr = NULL; // c2s test's server address
// I2Addr src_addr=NULL; // c2s test source address
char listenc2sport[10]; // listening port
- pthread_t workerThreadId;
-
- // snap related variables
- SnapArgs snapArgs;
- snapArgs.snap = NULL;
-#if USE_WEB100
- snapArgs.log = NULL;
-#endif
- snapArgs.delay = options->snapDelay;
wait_sig = 0;

// Test ID and status descriptors
@@ -100,7 +92,6 @@
enum TEST_STATUS_INT teststatuses = TEST_NOT_STARTED;
enum PROCESS_STATUS_INT procstatusenum = UNKNOWN;
enum PROCESS_TYPE_INT proctypeenum = CONNECT_TYPE;
- char namesuffix[256] = "c2s_snaplog";

if (testOptions->c2sopt) {
setCurrentTest(TEST_C2S);
@@ -252,7 +243,7 @@
I2Addr src_addr = I2AddrByLocalSockFD(get_errhandle(), recvsfd, 0);

// Get tcp_stat connection. Used to collect tcp_stat variable statistics
- conn = tcp_stat_connection_from_socket(agent, recvsfd);
+ conn = tcp_stats_connection_from_socket(agent, recvsfd);

// set up packet tracing. Collected data is used for bottleneck link
// calculations
@@ -306,15 +297,13 @@
log_println(5, "C2S test Parent thinks pipe() returned fd0=%d, fd1=%d",
mon_pipe[0], mon_pipe[1]);

+ group = tcp_stats_get_group(agent, "read");
+
// experimental code, delete when finished
- setCwndlimit(conn, group, agent, options);
+ if (options->limit > 0) {
+ tcp_stats_set_cwnd_limit(agent, conn, group, options->limit);
+ }

- // Create C->S snaplog directories, and perform some initialization based on
- // options
- create_client_logdir((struct sockaddr *) &cli_addr, clilen,
- options->c2s_logname, sizeof(options->c2s_logname),
- namesuffix,
- sizeof(namesuffix));
sleep(2);

// send empty TEST_START indicating start of the test
@@ -322,19 +311,8 @@
/* alarm(30); */ // reset alarm() again, this 10 sec test should finish
// before this signal is generated.

- // If snaplog recording is enabled, update meta file to indicate the same
- // and proceed to get snapshot and log it.
- // This block is needed here since the meta file stores names without the
- // full directory but fopen needs full path. Else, it could have gone into
- // the "start_snap_worker" method
- if (options->snaplog) {
- memcpy(meta.c2s_snaplog, namesuffix, strlen(namesuffix));
- /*start_snap_worker(&snapArgs, agent, options->snaplog, &workerLoop,
- &workerThreadId, meta.c2s_snaplog, options->c2s_logname,
- conn, group); */
- }
- start_snap_worker(&snapArgs, agent, NULL, options->snaplog, &workerThreadId,
- meta.c2s_snaplog, options->c2s_logname, conn, group);
+ snap_worker = start_snap_worker(agent, group, conn, options->snapDelay, 10);
+
// Wait on listening socket and read data once ready.
tmptime = secs();
sel_tv.tv_sec = 11; // time out after 11 seconds
@@ -367,7 +345,7 @@

// c->s throuput value calculated and assigned ! Release resources, conclude
// snap writing.
- stop_snap_worker(&workerThreadId, options->snaplog, &snapArgs);
+ *snap_results = stop_snap_worker(snap_worker);

// send the server calculated value of C->S throughput as result to client
snprintf(buff, sizeof(buff), "%6.0f kbps outbound for child %d", *c2sspd,
@@ -379,7 +357,7 @@
// get receiver side Web100 stats and write them to the log file. close
// sockets
if (record_reverse == 1)
- tcp_stat_get_data_recv(recvsfd, agent, conn, count_vars);
+ tcp_stat_get_data_recv(recvsfd, agent, conn);


close(recvsfd);
@@ -453,6 +431,30 @@
// protocol logs
teststatuses = TEST_ENDED;
protolog_status(testOptions->child0, testids, teststatuses, ctlsockfd);
+
+ // save the snapshots
+ if (options->snaplog) {
+ int i;
+ char namesuffix[256] = "c2s_snaplog";
+ tcp_stat_log *log;
+
+ // Create C->S snaplog directories, and perform some initialization based on
+ // options
+ create_client_logdir((struct sockaddr *) &cli_addr, clilen,
+ options->c2s_logname, sizeof(options->c2s_logname),
+ namesuffix,
+ sizeof(namesuffix));
+
+ memcpy(meta.c2s_snaplog, namesuffix, strlen(namesuffix));
+
+ log = tcp_stats_open_log(options->c2s_logname, (*snap_results)->conn, (*snap_results)->group, "w");
+
+ for(i = 0; i < (*snap_results)->collected; i++) {
+ tcp_stats_write_snapshot(log, (*snap_results)->snapshots[i]);
+ }
+
+ tcp_stats_close_log(log);
+ }

// set current test status and free address
setCurrentTest(TEST_NONE);
=======================================
--- /branches/Issue139/src/test_mid_srv.c Wed May 28 11:17:18 2014 UTC
+++ /branches/Issue139/src/test_mid_srv.c Thu Jun 12 09:50:00 2014 UTC
@@ -112,14 +112,6 @@
// strcpy(listenmidport, "0");
strlcpy(listenmidport, "0", sizeof(listenmidport));
}
-
- /* RAC debug */
- /*
- if (KillHung() == 0)
- log_println(5, "KillHung() returned 0, should have tried to kill off some LastAck process");
- else
- log_println(5, "KillHung(): returned non-0 response, nothing to kill or kill failed");
- */

while (midsrv_addr == NULL) {
// attempt to bind to a new port and obtain address structure with
@@ -132,13 +124,6 @@
mrange_next(listenmidport, sizeof(listenmidport)) :
listenmidport,
conn_options, 0);
- if (midsrv_addr == NULL) {
- /*
- log_println(5, " Calling KillHung() because midsrv_address failed to bind");
- if (KillHung() == 0)
- continue;
- */
- }
if (strcmp(listenmidport, "0") == 0) {
log_println(0, "WARNING: ephemeral port number was bound");
break;
@@ -236,11 +221,7 @@

buff[0] = '\0';
// get tcp_stat connection data
-#if USE_WEB100
- if ((conn = tcp_stat_connection_from_socket(agent, midsfd)) == NULL) {
-#elif USE_WEB10G
- if ((conn = tcp_stat_connection_from_socket(agent, midsfd)) == -1) {
-#endif
+ if ((conn = tcp_stats_connection_from_socket(agent, midsfd)) == NULL) {
log_println(
0,
"!!!!!!!!!!! test_mid() failed to get "TCP_STAT_NAME
=======================================
--- /branches/Issue139/src/test_s2c_srv.c Wed May 28 11:17:18 2014 UTC
+++ /branches/Issue139/src/test_s2c_srv.c Thu Jun 12 09:50:00 2014 UTC
@@ -23,9 +23,6 @@
#include "mrange.h"
#include "jsonutils.h"

-extern pthread_mutex_t mainmutex;
-extern pthread_cond_t maincond;
-
const char RESULTS_KEYS[] = "ThroughputValue UnsentDataAmount TotalSentByte";

/**
@@ -54,8 +51,6 @@
* @param options Test Option variables
* @param spds[][] speed check array
* @param spd_index index used for speed check array
- * @param count_vars count of web100 variables
- * @param peaks Cwnd peaks structure pointer
*
* @return 0 - success,
* >0 - error code.
@@ -73,7 +68,7 @@
int test_s2c(int ctlsockfd, tcp_stat_agent* agent, TestOptions* testOptions,
int conn_options, double* s2cspd, int set_buff, int window,
int autotune, char* device, Options* options, char spds[4][256],
- int* spd_index, int count_vars, CwndPeaks* peaks) {
+ int* spd_index, SnapResults **snap_results) {
#if USE_WEB100
/* experimental code to capture and log multiple copies of the
* web100 variables using the web100_snap() & log() functions.
@@ -82,13 +77,9 @@
web100_snapshot* rsnap = NULL;
web100_group* tgroup;
web100_group* rgroup;
- web100_var* var;
-#elif USE_WEB10G
- estats_val_data* snap;
#endif
tcp_stat_connection conn;
/* Just a holder for web10g */
- tcp_stat_group* group = NULL;
/* Pipe that handles returning packet pair timing */
int mon_pipe[2];
int ret; // ctrl protocol read/write return status
@@ -119,7 +110,10 @@
struct sigaction new, old;
char* jsonMsgValue;

- pthread_t workerThreadId;
+ SnapWorker *snap_worker;
+
+ tcp_stat_group *group;
+ tcp_stat_snap *snap;
int nextseqtosend = 0, lastunackedseq = 0;
int drainingqueuecount = 0, bufctlrnewdata = 0;

@@ -128,14 +122,6 @@
enum TEST_ID testids = S2C;
enum PROCESS_STATUS_INT procstatusenum = UNKNOWN;
enum PROCESS_TYPE_INT proctypeenum = CONNECT_TYPE;
- char snaplogsuffix[256] = "s2c_snaplog";
-
- SnapArgs snapArgs;
- snapArgs.snap = NULL;
-#if USE_WEB100
- snapArgs.log = NULL;
-#endif
- snapArgs.delay = options->snapDelay;
wait_sig = 0;

log_println(0, "test client version: %s", testOptions->client_version);
@@ -290,7 +276,10 @@
}
}
src_addr = I2AddrByLocalSockFD(get_errhandle(), xmitsfd, 0);
- conn = tcp_stat_connection_from_socket(agent, xmitsfd);
+
+ conn = tcp_stats_connection_from_socket(agent, xmitsfd);
+ group = tcp_stats_get_group(agent, "read");
+ snap = tcp_stats_init_snapshot(agent, conn, group);

// set up packet capture. The data collected is used for bottleneck link
// calculations
@@ -342,14 +331,11 @@
}

/* experimental code, delete when finished */
- setCwndlimit(conn, group, agent, options);
+ if (options->limit > 0) {
+ tcp_stats_set_cwnd_limit(agent, conn, group, options->limit);
+ }
/* End of test code */

- // create directory to write web100 snaplog trace
- create_client_logdir((struct sockaddr *) &cli_addr, clilen,
- options->s2c_logname, sizeof(options->s2c_logname),
- snaplogsuffix, sizeof(snaplogsuffix));
-
/* Kludge way of nuking Linux route cache. This should be done
* using the sysctl interface.
*/
@@ -390,17 +376,7 @@
// Write snap logs if option is enabled. update meta log to point to
// this snaplog

- // If snaplog option is enabled, save snaplog details in meta file
- if (options->snaplog) {
- memcpy(meta.s2c_snaplog, snaplogsuffix, strlen(snaplogsuffix));
- }
- // get web100 snapshot and also log it based on options
- /*start_snap_worker(&snapArgs, agent, options->snaplog, &workerLoop,
- &workerThreadId, meta.s2c_snaplog, options->s2c_logname,
- conn, group);*///new file changes
- start_snap_worker(&snapArgs, agent, peaks, options->snaplog,
- &workerThreadId, meta.s2c_snaplog, options->s2c_logname,
- conn, group);
+ snap_worker = start_snap_worker(agent, group, conn, options->snapDelay, 10);

/* alarm(20); */
tmptime = secs(); // current time
@@ -412,32 +388,18 @@
// Increment total attempts at sending-> buffer control
bufctrlattempts++;
if (options->avoidSndBlockUp) { // Do not block send buffers
- pthread_mutex_lock(&mainmutex);
+ tcp_stats_take_snapshot(agent, conn, snap);

// get details of next sequence # to be sent and fetch value from
// snap file
-#if USE_WEB100
- web100_agent_find_var_and_group(agent, "SndNxt", &group,
- &var);
- web100_snap_read(var, snapArgs.snap, tmpstr);
- nextseqtosend = atoi(
- web100_value_to_text(web100_get_var_type(var),
- tmpstr));
+
+ // get next sequence # to be sent
+ tcp_stats_snap_read_var(agent, snap, "SndNxt", tmpstr, sizeof(tmpstr));
+ nextseqtosend = atoi(tmpstr);
+
// get oldest un-acked sequence number
- web100_agent_find_var_and_group(agent, "SndUna", &group,
- &var);
- web100_snap_read(var, snapArgs.snap, tmpstr);
- lastunackedseq = atoi(
- web100_value_to_text(web100_get_var_type(var),
- tmpstr));
-#elif USE_WEB10G
- struct estats_val value;
- web10g_find_val(snapArgs.snap, "SndNxt", &value);
- nextseqtosend = value.uv32;
- web10g_find_val(snapArgs.snap, "SndUna", &value);
- lastunackedseq = value.uv32;
-#endif
- pthread_mutex_unlock(&mainmutex);
+ tcp_stats_snap_read_var(agent, snap, "SndUna", tmpstr, sizeof(tmpstr));
+ lastunackedseq = atoi(tmpstr);

// Temporarily stop sending data if you sense that the buffer is
// overwhelmed
@@ -480,7 +442,7 @@
x2cspd = (8.e-3 * bytes_written) / tx_duration;

// Release semaphore, and close snaplog file. finalize other data
- stop_snap_worker(&workerThreadId, options->snaplog, &snapArgs);
+ *snap_results = stop_snap_worker(snap_worker);

// send the x2cspd to the client
memset(buff, 0, sizeof(buff));
@@ -594,13 +556,13 @@

#if USE_WEB100
// send web100 data to client
- ret = tcp_stat_get_data(tsnap, xmitsfd, ctlsockfd, agent, count_vars);
+ ret = tcp_stat_get_data(tsnap, xmitsfd, ctlsockfd, agent);
web100_snapshot_free(tsnap);
// send tuning-related web100 data collected to client
- ret = tcp_stat_get_data(rsnap, xmitsfd, ctlsockfd, agent, count_vars);
+ ret = tcp_stat_get_data(rsnap, xmitsfd, ctlsockfd, agent);
web100_snapshot_free(rsnap);
#elif USE_WEB10G
- ret = tcp_stat_get_data(snap, xmitsfd, ctlsockfd, agent, count_vars, testOptions->json_support);
+ ret = tcp_stat_get_data(snap, xmitsfd, ctlsockfd, agent, testOptions->json_support);
estats_val_data_free(&snap);
#endif

@@ -674,6 +636,30 @@
// log protocol validation logs
teststatuses = TEST_ENDED;
protolog_status(testOptions->child0, testids, teststatuses, ctlsockfd);
+
+ // save the snapshots
+ if (options->snaplog) {
+ int i;
+ char namesuffix[256] = "s2c_snaplog";
+ tcp_stat_log *log;
+
+ // Create C->S snaplog directories, and perform some initialization based on
+ // options
+ create_client_logdir((struct sockaddr *) &cli_addr, clilen,
+ options->s2c_logname, sizeof(options->s2c_logname),
+ namesuffix,
+ sizeof(namesuffix));
+
+ memcpy(meta.s2c_snaplog, namesuffix, strlen(namesuffix));
+
+ log = tcp_stats_open_log(options->s2c_logname, (*snap_results)->conn, (*snap_results)->group, "w");
+
+ for(i = 0; i < (*snap_results)->collected; i++) {
+ tcp_stats_write_snapshot(log, (*snap_results)->snapshots[i]);
+ }
+
+ tcp_stats_close_log(log);
+ }

setCurrentTest(TEST_NONE);
}
=======================================
--- /branches/Issue139/src/test_sfw_srv.c Wed May 28 11:17:18 2014 UTC
+++ /branches/Issue139/src/test_sfw_srv.c Thu Jun 12 09:50:00 2014 UTC
@@ -112,7 +112,6 @@
/**
* Performs the server part of the Simple firewall test.
* @param ctlsockfd Client control socket descriptor
- * @param agent web100_agent
* @param options The test options
* @param conn_options The connection options
* @returns Integer with values:
@@ -124,8 +123,7 @@
* 5 - Unable to resolve client address
*/

-int test_sfw_srv(int ctlsockfd, tcp_stat_agent* agent, TestOptions* options,
- int conn_options) {
+int test_sfw_srv(int ctlsockfd, TestOptions* options, int conn_options) {
char buff[BUFFSIZE + 1];
I2Addr sfwsrv_addr = NULL;
int sfwsockfd, sfwsockport, sockfd, sfwport;
@@ -135,16 +133,6 @@
struct timeval sel_tv;
int msgLen, msgType;
char* jsonMsgValue;
-
-#if USE_WEB100
- web100_var* var;
- web100_connection* cn;
- web100_group* group;
-#elif USE_WEB10G
- struct estats_val value;
- int cn;
-#endif
- int maxRTT, maxRTO;
char hostname[256];
int rc;

@@ -180,56 +168,6 @@
sfwsockfd = I2AddrFD(sfwsrv_addr);
sfwsockport = I2AddrPort(sfwsrv_addr);
log_println(1, " -- port: %d", sfwsockport);
-
- cn = tcp_stat_connection_from_socket(agent, ctlsockfd);
- if (cn) {
- // Get remote end's address
- memset(hostname, 0, sizeof(hostname));
-
-#if USE_WEB100
- web100_agent_find_var_and_group(agent, "RemAddress", &group, &var);
- web100_raw_read(var, cn, buff);
- // strncpy(hostname, web100_value_to_text(web100_get_var_type(var), buff),
- // 255);
- strlcpy(hostname, web100_value_to_text(web100_get_var_type(var), buff),
- sizeof(hostname));
-#elif USE_WEB10G
- web10g_get_remote_addr(agent, cn, hostname, sizeof(hostname));
-#endif
-
- // Determine test time in seconds.
- // test-time = max(round trip time, timeout) > 3 ? 3 : 1
-
-#if USE_WEB100
- web100_agent_find_var_and_group(agent, "MaxRTT", &group, &var);
- web100_raw_read(var, cn, buff);
- maxRTT = atoi(web100_value_to_text(web100_get_var_type(var), buff));
- web100_agent_find_var_and_group(agent, "MaxRTO", &group, &var);
- web100_raw_read(var, cn, buff);
- maxRTO = atoi(web100_value_to_text(web100_get_var_type(var), buff));
-#elif USE_WEB10G
- web10g_get_val(agent, cn, "MaxRTT", &value);
- maxRTT = value.uv32;
- web10g_get_val(agent, cn, "MaxRTO", &value);
- maxRTO = value.uv32;
-#endif
-
- if (maxRTT > maxRTO)
- maxRTO = maxRTT;
- if ((((double) maxRTO) / 1000.0) > 3.0)
- /* `testTime = (((double) maxRTO) / 1000.0) * 4 ; */
- testTime = 3;
- else
- testTime = 1;
- } else {
- log_println(0, "Simple firewall test: Cannot find connection");
- snprintf(buff, sizeof(buff), "Server (Simple firewall test): "
- "Cannot find connection");
- send_json_message(ctlsockfd, MSG_ERROR, buff, strlen(buff),
- options->json_support, JSON_SINGLE_VALUE);
- I2AddrFree(sfwsrv_addr);
- return -1;
- }
log_println(1, " -- SFW time: %d", testTime);

// try sending TEST_PREPARE msg with ephemeral port number to client.
@@ -298,6 +236,11 @@
I2AddrFree(sfwsrv_addr);
return 4;
}
+
+ clilen = sizeof(cli_addr);
+ getpeername(ctlsockfd, (struct sockaddr*)&cli_addr, &clilen);
+
+ addr2a(&cli_addr, hostname, sizeof(hostname));

// Get node, port(if present) and other details of client end.
// If not able to resolve it, the test cannot proceed to the "throughput"
=======================================
--- /branches/Issue139/src/testoptions.c Wed May 28 11:17:18 2014 UTC
+++ /branches/Issue139/src/testoptions.c Thu Jun 12 09:50:00 2014 UTC
@@ -19,24 +19,10 @@
#include "I2util/util.h"
#include "runningtest.h"
#include "strlutils.h"
+#include "web100srv.h"
+#include "snap_worker.h"
#include "jsonutils.h"

-
-// Worker thread characteristics used to record snaplog and Cwnd peaks
-typedef struct workerArgs {
- SnapArgs* snapArgs; // snapArgs struct pointer
- tcp_stat_agent* agent; // tcp_stat agent pointer
- CwndPeaks* peaks; // data indicating Cwnd values
- int writeSnap; // enable writing snaplog
-} WorkerArgs;
-
-int workerLoop = 0;
-pthread_mutex_t mainmutex = PTHREAD_MUTEX_INITIALIZER;
-pthread_cond_t maincond = PTHREAD_COND_INITIALIZER;
-static int slowStart = 1;
-static int prevCWNDval = -1;
-static int decreasing = 0;
-
/**
* Count the CWND peaks from a snapshot and record the minimal and maximum one.
* Also record the number of transitions between increasing or decreasing
@@ -45,54 +31,50 @@
* @param peaks Structure containing CWND peaks information
* @param snap Web100 snapshot structure
*/
-void findCwndPeaks(tcp_stat_agent* agent, CwndPeaks* peaks,
- tcp_stat_snap* snap) {
- int CurCwnd;
-#if USE_WEB100
- web100_group* group;
- web100_var* var;
- char tmpstr[256];
-#elif USE_WEB10G
- struct estats_val value;
-#endif
+void findCwndPeaks(SnapResults *results, CwndPeaks *peaks) {
+ char buf[128];
+ int i;
+ int slowStart = 1;
+ int prevCWNDval = -1;
+ int decreasing = 0;

-#if USE_WEB100
- web100_agent_find_var_and_group(agent, "CurCwnd", &group, &var);
- web100_snap_read(var, snap, tmpstr);
- CurCwnd = atoi(web100_value_to_text(web100_get_var_type(var), tmpstr));
-#elif USE_WEB10G
- web10g_find_val(snap, "CurCwnd", &value);
- CurCwnd = value.uv32;
-#endif
+ for(i = 0; i < results->collected; i++) {
+ tcp_stat_snap *snap = results->snapshots[i];
+ int CurCwnd;

- if (slowStart) {
- if (CurCwnd < prevCWNDval) {
- slowStart = 0;
- peaks->max = prevCWNDval;
- peaks->amount = 1;
- decreasing = 1;
- }
- } else {
- // current congestion window < previous value, so, decreasing
- if (CurCwnd < prevCWNDval) {
- // update values based on actual values
- if (prevCWNDval > peaks->max) {
+ tcp_stats_snap_read_var(results->agent, snap, "CurCwnd", buf, sizeof(buf));
+
+ CurCwnd = atoi(buf);
+
+ if (slowStart) {
+ if (CurCwnd < prevCWNDval || prevCWNDval == -1) {
+ slowStart = 0;
peaks->max = prevCWNDval;
+ peaks->amount = 1;
+ decreasing = 1;
}
- if (!decreasing) {
- peaks->amount += 1;
+ } else {
+ // current congestion window < previous value, so, decreasing
+ if (CurCwnd < prevCWNDval || prevCWNDval == -1) {
+ // update values based on actual values
+ if (prevCWNDval > peaks->max) {
+ peaks->max = prevCWNDval;
+ }
+ if (!decreasing) {
+ peaks->amount += 1;
+ }
+ decreasing = 1;
+ // current congestion window size > previous value,
+ } else if (CurCwnd > prevCWNDval) {
+ // not decreasing.
+ if ((peaks->min == -1) || (prevCWNDval < peaks->min)) {
+ peaks->min = prevCWNDval;
+ }
+ decreasing = 0;
}
- decreasing = 1;
- // current congestion window size > previous value,
- } else if (CurCwnd > prevCWNDval) {
- // not decreasing.
- if ((peaks->min == -1) || (prevCWNDval < peaks->min)) {
- peaks->min = prevCWNDval;
- }
- decreasing = 0;
+ prevCWNDval = CurCwnd;
}
}
- prevCWNDval = CurCwnd;
}

/**
@@ -114,62 +96,6 @@
* @param arg pointer to the snapshot structure
* @return void pointer null
*/
-
-void*
-snapWorker(void* arg) {
- /* WARNING void* arg (workerArgs) is on the stack of the function below and
- * doesn't exist forever. */
- WorkerArgs *workerArgs = (WorkerArgs*) arg;
- SnapArgs *snapArgs = workerArgs->snapArgs;
- tcp_stat_agent* agent = workerArgs->agent;
- CwndPeaks* peaks = workerArgs->peaks;
- int writeSnap = workerArgs->writeSnap;
-
- // snap log written into every "delay" milliseconds
- double delay = ((double) snapArgs->delay) / 1000.0;
-
- while (1) {
- pthread_mutex_lock(&mainmutex);
- if (workerLoop) {
- pthread_mutex_unlock(&mainmutex);
- pthread_cond_broadcast(&maincond);
- break;
- }
- pthread_mutex_unlock(&mainmutex);
- mysleep(0.01);
- }
-
- // Find Congestion window peaks from a web_100 snapshot, if enabled
- // Write snap log , if enabled, all in a synchronous manner.
- while (1) {
- pthread_mutex_lock(&mainmutex);
- if (!workerLoop) {
- pthread_mutex_unlock(&mainmutex);
- break;
- }
-#if USE_WEB100
- web100_snap(snapArgs->snap);
- if (peaks) {
- findCwndPeaks(agent, peaks, snapArgs->snap);
- }
- if (writeSnap) {
- web100_log_write(snapArgs->log, snapArgs->snap);
- }
-#elif USE_WEB10G
- estats_read_vars(snapArgs->snap, snapArgs->conn, agent);
- if (peaks) {
- findCwndPeaks(agent, peaks, snapArgs->snap);
- }
- if (writeSnap) {
- estats_record_write_data(snapArgs->log, snapArgs->snap);
- }
-#endif
- pthread_mutex_unlock(&mainmutex);
- mysleep(delay);
- }
-
- return NULL;
-}

/**
* Adds test id to the test suite
@@ -314,114 +240,6 @@
}
return useropt;
}
-
-/** Method to start snap worker thread that collects snap logs
- * @param snaparg object
- * @param tcp_stat_agent Agent
- * @param snaplogenabled Is snap logging enabled?
- * @param workerlooparg integer used to syncronize writing/reading from snaplog/tcp_stat snapshot
- * @param wrkrthreadidarg Thread Id of workera
- * @param metafilevariablename Which variable of the meta file gets assigned the snaplog name (unused now)
- * @param metafilename value of metafile name
- * @param tcp_stat_connection connection pointer
- * @param tcp_stat_group group web100_group pointer
- */
-void start_snap_worker(SnapArgs *snaparg, tcp_stat_agent* agentarg,
- CwndPeaks* peaks, char snaplogenabled,
- pthread_t *wrkrthreadidarg, char *metafilevariablename,
- char *metafilename, tcp_stat_connection conn,
- tcp_stat_group* group) {
- FILE *fplocal;
-
- WorkerArgs workerArgs;
- workerArgs.snapArgs = snaparg;
- workerArgs.agent = agentarg;
- workerArgs.peaks = peaks;
- workerArgs.writeSnap = snaplogenabled;
-
-#if USE_WEB100
- group = web100_group_find(agentarg, "read");
- snaparg->snap = web100_snapshot_alloc(group, conn);
-#elif USE_WEB10G
- snaparg->conn = conn;
- estats_val_data_new(&snaparg->snap);
-#endif
-
- if (snaplogenabled) {
- // memcpy(metafilevariablename, metafilename, strlen(metafilename));
- // The above could have been here, except for a caveat: metafile stores
- // just the file name, but full filename is needed to open the log file
-
- fplocal = fopen(get_logfile(), "a");
-
-#if USE_WEB100
- snaparg->log = web100_log_open_write(metafilename, conn, group);
-#elif USE_WEB10G
- estats_record_open(&snaparg->log, metafilename, "w");
-#endif
- if (fplocal == NULL) {
- log_println(
- 0,
- "Unable to open log file '%s', continuing on without logging",
- get_logfile());
- } else {
- log_println(0, "Snaplog file: %s\n", metafilename);
- fprintf(fplocal, "Snaplog file: %s\n", metafilename);
- fclose(fplocal);
- }
- }
-
- if (pthread_create(wrkrthreadidarg, NULL, snapWorker,
- (void*) &workerArgs)) {
- log_println(0, "Cannot create worker thread for writing snap log!");
- *wrkrthreadidarg = 0;
- }
-
- pthread_mutex_lock(&mainmutex);
- workerLoop= 1;
- // obtain web100 snap into "snaparg.snap"
-#if USE_WEB100
- web100_snap(snaparg->snap);
- if (snaplogenabled) {
- web100_log_write(snaparg->log, snaparg->snap);
- }
-#elif USE_WEB10G
- estats_read_vars(snaparg->snap, conn, agentarg);
- if (snaplogenabled) {
- estats_record_write_data(snaparg->log, snaparg->snap);
- }
-#endif
- pthread_cond_wait(&maincond, &mainmutex);
- pthread_mutex_unlock(&mainmutex);
-}
-
-/**
- * Stop snapWorker
- * @param workerThreadId Worker Thread's ID
- * @param snaplogenabled boolean indication whether snap logging is enabled
- * @param snapArgs_ptr pointer to a snapArgs object
- * */
-void stop_snap_worker(pthread_t *workerThreadId, char snaplogenabled,
- SnapArgs* snapArgs_ptr) {
- if (*workerThreadId) {
- pthread_mutex_lock(&mainmutex);
- workerLoop = 0;
- pthread_mutex_unlock(&mainmutex);
- pthread_join(*workerThreadId, NULL);
- }
- // close writing snaplog, if snaplog recording is enabled
-#if USE_WEB100
- if (snaplogenabled) {
- web100_log_close_write(snapArgs_ptr->log);
- }
- web100_snapshot_free(snapArgs_ptr->snap);
-#elif USE_WEB10G
- if (snaplogenabled) {
- estats_record_close(&snapArgs_ptr->log);
- }
- estats_val_data_free(&snapArgs_ptr->snap);
-#endif
-}

/**
* Start packet tracing for this client
@@ -499,61 +317,6 @@
close(monpipe_arr[0]);
close(monpipe_arr[1]);
}
-
-/**
- * Set Cwnd limit
- * @param connarg tcp_stat_connection pointer
- * @param group_arg tcp_stat group pointer
- * @param agentarg tcp_stat agent pointer
- * */
-void setCwndlimit(tcp_stat_connection connarg, tcp_stat_group* grouparg,
- tcp_stat_agent* agentarg, Options* optionsarg) {
-#if USE_WEB100
- web100_var *LimRwin, *yar;
-#elif USE_WEB10G
- struct estats_val yar;
-#endif
-
- u_int32_t limrwin_val;
-
- if (optionsarg->limit > 0) {
- log_print(1, "Setting Cwnd limit - ");
-
-#if USE_WEB100
- if (connarg != NULL) {
- log_println(1,
- "Got web100 connection pointer for recvsfd socket\n");
- char yuff[32];
- web100_agent_find_var_and_group(agentarg, "CurMSS", &grouparg,
- &yar);
- web100_raw_read(yar, connarg, yuff);
- log_println(1, "MSS = %s, multiplication factor = %d",
- web100_value_to_text(web100_get_var_type(yar), yuff),
- optionsarg->limit);
- limrwin_val = optionsarg->limit
- * (atoi(
- web100_value_to_text(web100_get_var_type(yar),
- yuff)));
- web100_agent_find_var_and_group(agentarg, "LimRwin", &grouparg,
- &LimRwin);
- log_print(1, "now write %d to limit the Receive window",
- limrwin_val);
- web100_raw_write(LimRwin, connarg, &limrwin_val);
-#elif USE_WEB10G
- if (connarg != -1) {
- log_println(1,
- "Got web10g connection for recvsfd socket\n");
- web10g_get_val(agentarg, connarg, "CurMSS", &yar);
- log_println(1, "MSS = %s, multiplication factor = %d",
- yar.uv32, optionsarg->limit);
- limrwin_val = optionsarg->limit * yar.uv32;
- log_print(1, "now write %d to limit the Receive window", limrwin_val);
- estats_write_var("LimRwin", limrwin_val, connarg, agentarg);
-#endif
- log_println(1, " --- Done");
- }
- }
-}

/**
* Check if receiver is clogged and use decision to temporarily
=======================================
--- /branches/Issue139/src/testoptions.h Wed May 28 11:17:18 2014 UTC
+++ /branches/Issue139/src/testoptions.h Thu Jun 12 09:50:00 2014 UTC
@@ -11,6 +11,7 @@

#include "web100srv.h"
#include "protocol.h"
+#include "snap_worker.h"

#define LISTENER_SOCKET_CREATE_FAILED -1
#define SOCKET_CONNECT_TIMEOUT -100
@@ -64,23 +65,14 @@

void catch_s2c_alrm(int signo);

-int test_sfw_srv(int ctlsockfd, tcp_stat_agent* agent, TestOptions* options,
- int conn_options);
+int test_sfw_srv(int ctlsockfd, TestOptions* options, int conn_options);
int test_meta_srv(int ctlsockfd, tcp_stat_agent* agent, TestOptions* options,
int conn_options);

int getCurrentTest();
void setCurrentTest(int testId);

-// void start_snap_worker(SnapArgs *snaparg, tcp_stat_agent *agentarg,
-void start_snap_worker(SnapArgs *snaparg, tcp_stat_agent *agentarg,
- CwndPeaks* peaks, char snaplogenabled,
- pthread_t *wrkrthreadidarg, char *metafilevariablename,
- char *metafilename, tcp_stat_connection conn,
- tcp_stat_group* group);
-
-void stop_snap_worker(pthread_t *workerThreadId, char snaplogenabled,
- SnapArgs* snapArgs_ptr);
+void findCwndPeaks(SnapResults *results, CwndPeaks *peaks);

void setCwndlimit(tcp_stat_connection connarg, tcp_stat_group* grouparg,
tcp_stat_agent* agentarg, Options* optionsarg);
=======================================
--- /branches/Issue139/src/tests_srv.h Mon Oct 14 13:20:21 2013 UTC
+++ /branches/Issue139/src/tests_srv.h Thu Jun 12 09:50:00 2014 UTC
@@ -9,18 +9,19 @@
#ifndef SRC_TESTS_SRV_H_
#define SRC_TESTS_SRV_H_

+#include "snap_worker.h"
#include "testoptions.h"

int test_c2s(int ctlsockfd, tcp_stat_agent* agent, TestOptions* testOptions,
int conn_options, double* c2sspd, int set_buff, int window,
int autotune, char* device, Options* options, int record_reverse,
- int count_vars, char spds[4][256], int* spd_index);
+ char spds[4][256], int* spd_index, SnapResults **snap_results);

// S2C test
int test_s2c(int ctlsockfd, tcp_stat_agent* agent, TestOptions* testOptions,
int conn_options, double* s2cspd, int set_buff, int window,
int autotune, char* device, Options* options, char spds[4][256],
- int* spd_index, int count_vars, CwndPeaks* peaks);
+ int* spd_index, SnapResults **snap_results);

// the middlebox test
int test_mid(int ctlsockfd, tcp_stat_agent* agent, TestOptions* testOptions,
=======================================
--- /branches/Issue139/src/web100-util.c Wed Jun 4 10:51:11 2014 UTC
+++ /branches/Issue139/src/web100-util.c Thu Jun 12 09:50:00 2014 UTC
@@ -18,6 +18,19 @@
#include "web100srv.h"
#include "jsonutils.h"

+#if USE_WEB100
+#include <web100.h>
+#endif
+#if USE_WEB10G
+#include <estats.h>
+#endif
+
+static struct web100_variables {
+ int defined;
+ char name[256]; // key
+ char value[256]; // value
+} web_vars[WEB100_VARS];
+
struct tcp_name {
char* web100_name;
char* web10g_name;
@@ -102,7 +115,7 @@
* @return integer indicating number of web100 variables read
* or indicating failure of initialization
*/
-int tcp_stat_init(char *VarFileName) {
+int tcp_stats_init(char *VarFileName) {
#if USE_WEB100
FILE * fp;
char line[256], trimmedline[256];
@@ -127,8 +140,10 @@
trim(line, strlen(line), trimmedline, sizeof(trimmedline));
strlcpy(web_vars[count_vars].name, trimmedline,
sizeof(web_vars[count_vars].name));
+ web_vars[count_vars].defined = 1;
count_vars++;
}
+ web_vars[count_vars].defined = 0;
fclose(fp);
log_println(1, "web100_init() read %d variables from file", count_vars);

@@ -159,16 +174,6 @@
*/
void tcp_stat_middlebox(int sock, tcp_stat_agent* agent, tcp_stat_connection cn, char *results_keys,
size_t results_keys_strlen, char *results_values, size_t results_strlen) {
-#if USE_WEB100
- web100_var* var;
- web100_group* group;
- web100_snapshot* snap;
- web100_var* LimCwnd;
-#elif USE_WEB10G
- struct estats_val value;
- estats_val_data* data = NULL;
-#endif
-
char buff[8192], line[256];
char* sndbuff;
int i, j, k, currentMSSval = 0;
@@ -181,6 +186,8 @@
u_int32_t limcwnd_val;
struct sockaddr_storage saddr;
socklen_t saddr_size;
+ tcp_stat_group *group;
+ tcp_stat_snap *snap;

// middlebox test results
static char vars[][255] = {"WinScaleSent", "WinScaleRcvd", "SumRTT", "CountRTT", "MaxRwinRcvd" };
@@ -243,33 +250,11 @@
strlcat(results_values, line, results_strlen);

// get current MSS value
-#if USE_WEB100
- // read web100_group and web100_var of CurMSS into group and var
- web100_agent_find_var_and_group(agent, "CurMSS", &group, &var);
- // read variable value from web100 connection
- web100_raw_read(var, cn, buff);
-
- // get current MSS in textual format and append to results
-
- // get current MSS value and append to "results"
- currentMSSval = atoi(
- web100_value_to_text(web100_get_var_type(var), buff));
- snprintf(line, sizeof(line), "%s;",
- web100_value_to_text(web100_get_var_type(var), buff));
-#elif USE_WEB10G
- int type;
- type = web10g_get_val(agent, cn, "CurMSS", &value);
- if (type == -1) {
- log_println(0, "Middlebox: Failed to read the value of CurMSS");
- return;
- } else {
- currentMSSval = value.uv32;
- snprintf(line, sizeof(line), "%u;", value.uv32);
- }
-#endif
+ tcp_stats_read_var(agent, cn, "CurMSS", buff, sizeof(buff));
+ currentMSSval = atoi(buff);

strlcat(results_keys, "CurMSS;", results_keys_strlen);
- strlcat(results_values, line, results_strlen);
+ strlcat(results_values, buff, results_strlen);

log_println(3, "");
log_println(0, "Sending %d Byte packets over the network, and data=%s",
@@ -283,16 +268,7 @@
*/

limcwnd_val = 2 * currentMSSval;
-
-#if USE_WEB100
- // get web100_var and web100_group
- web100_agent_find_var_and_group(agent, "LimCwnd", &group, &LimCwnd);
-
- // set TCP CWND web100 variable to twice the current MSS Value
- web100_raw_write(LimCwnd, cn, &limcwnd_val);
-#elif USE_WEB10G
- estats_write_var("LimCwnd", (uint32_t)limcwnd_val, cn, agent);
-#endif
+ tcp_stats_set_cwnd(agent, cn, limcwnd_val);

log_println(5, "Setting Cwnd Limit to %d octets", limcwnd_val);

@@ -317,13 +293,8 @@
sndbuff[j] = (k++ & 0x7f);
}

-#if USE_WEB100
- // get web100 group with name "read"
- group = web100_group_find(agent, "read");
- snap = web100_snapshot_alloc(group, cn);
-#elif USE_WEB10G
- estats_val_data_new(&data);
-#endif
+ group = tcp_stats_get_group(agent, "read");
+ snap = tcp_stats_init_snapshot(agent, cn, group);

FD_ZERO(&wfd);
FD_SET(sock, &wfd);
@@ -333,24 +304,15 @@
if ((ret == -1) && (errno == EINTR)) /* a signal arrived, ignore it */
continue;

-#if USE_WEB100
- web100_snap(snap);
+ tcp_stats_take_snapshot(agent, cn, snap);

// get next sequence # to be sent
- web100_agent_find_var_and_group(agent, "SndNxt", &group, &var);
- web100_snap_read(var, snap, line);
- SndMax = atoi(web100_value_to_text(web100_get_var_type(var), line));
+ tcp_stats_snap_read_var(agent, snap, "SndNxt", tmpstr, sizeof(tmpstr));
+ SndMax = atoi(tmpstr);
+
// get oldest un-acked sequence number
- web100_agent_find_var_and_group(agent, "SndUna", &group, &var);
- web100_snap_read(var, snap, line);
- SndUna = atoi(web100_value_to_text(web100_get_var_type(var), line));
-#elif USE_WEB10G
- estats_read_vars(data, cn, agent);
- web10g_find_val(data, "SndNxt", &value);
- SndMax = value.uv32;
- web10g_find_val(data, "SndUna", &value);
- SndUna = value.uv32;
-#endif
+ tcp_stats_snap_read_var(agent, snap, "SndUna", tmpstr, sizeof(tmpstr));
+ SndUna = atoi(tmpstr);

// stop sending data if (buf size * 16) <
// [ (Next Sequence # To Be Sent) - (Oldest Unacknowledged Sequence #) - 1 ]
@@ -369,24 +331,12 @@

// get web100 values for the middlebox test result group
for (i = 0; i < sizeof(vars) / sizeof(vars[0]); i++) {
-#if USE_WEB100
- // read web100_group and web100_var of vars[i] into group and var
- web100_agent_find_var_and_group(agent, vars[i], &group, &var);
- // read variable value from web100 connection
- web100_raw_read(var, cn, buff);
-
- snprintf(line, sizeof(line), "%s;",
- web100_value_to_text(web100_get_var_type(var), buff));
-#elif USE_WEB10G
- int type;
- type = web10g_get_val(agent, cn, vars[i], &value);
- if (type == -1) {
+ if (tcp_stats_read_var(agent, cn, vars[i], buff, sizeof(buff)) != 0) {
log_println(0, "Middlebox: Failed to read the value of %s", vars[i]);
return;
- } else {
- snprintf(line, sizeof(line), "%u;", value.uv32);
}
-#endif
+
+ snprintf(line, sizeof(line), "%s;", buff);

if (strcmp(line, "4294967295;") == 0)
snprintf(line, sizeof(line), "%d;", -1);
@@ -399,16 +349,9 @@
}


-#if USE_WEB100
- log_println(5, "Finished with web100_middlebox() routine snap-0x%x, "
- "sndbuff=%x0x", snap, sndbuff);
- web100_snapshot_free(snap);
-#elif USE_WEB10G
-
- estats_val_data_free(&data);
- log_println(5, "Finished with web10g_middlebox() routine, "
- "sndbuff=%x0x", sndbuff);
-#endif
+ log_println(5, "Finished with middlebox() routine, "
+ "sndbuff=%x0x", sndbuff);
+ tcp_stats_free_snapshot(snap);
/* free(sndbuff); */
}

@@ -418,18 +361,12 @@
* @param sock integer socket file descriptor
* @param agent pointer to a tcp_stat_agent
* @param cn A tcp_stat_connection
- * @param count_vars integer number of tcp_stat_vars to get value of
*
*/
void tcp_stat_get_data_recv(int sock, tcp_stat_agent* agent,
- tcp_stat_connection cn, int count_vars) {
-#if USE_WEB100
- web100_var* var = NULL;
- web100_group* group = NULL;
-#elif USE_WEB10G
- estats_val_data* data = NULL;
- estats_error* err = NULL;
-#endif
+ tcp_stat_connection cn) {
+ socklen_t len;
+ struct sockaddr_storage addr;
int i;
char buf[32], line[256], *ctime();
FILE * fp;
@@ -444,75 +381,29 @@
// get values for group, var of IP Address of the Remote host's side of
// connection

-#if USE_WEB100
- web100_agent_find_var_and_group(agent, "RemAddress", &group, &var);
- web100_raw_read(var, cn, buf);
- snprintf(line, sizeof(line), "%s;",
- web100_value_to_text(web100_get_var_type(var), buf));
-#elif USE_WEB10G
- web10g_get_remote_addr(agent, cn, buf, sizeof(buf));
+ len = sizeof(addr);
+ getpeername(sock, (struct sockaddr*)&addr, &len);
+ addr2a(&addr, buf, sizeof(buf));
snprintf(line, sizeof(line), "%s;", buf);
-#endif
// write remote address to log file
if (fp)
fprintf(fp, "%s", line);

// get values for other web100 variables and write to the log file

-#if USE_WEB100
- int ok = 1;
- for (i = 0; i < count_vars; i++) {
- if ((web100_agent_find_var_and_group(agent, web_vars[i].name, &group,
- &var)) != WEB100_ERR_SUCCESS) {
+ for (i = 0; web_vars[i].defined; i++) {
+ char buf[1024];
+ if (tcp_stats_read_var(agent, cn, web_vars[i].name, buf, sizeof(buf)) != 0) {
log_println(1, "Variable %d (%s) not found in KIS", i,
web_vars[i].name);
- ok = 0;
continue;
}

- if (cn == NULL) {
- fprintf(
- stderr,
- "Web100_get_data_recv() failed, return to testing routine\n");
- return;
- }
-
- if ((web100_raw_read(var, cn, buf)) != WEB100_ERR_SUCCESS) {
- if (get_debuglvl() > 9)
- web100_perror("web100_raw_read()");
- continue;
- }
- if (ok == 1) {
- snprintf(web_vars[i].value, sizeof(web_vars[i].value), "%s",
- web100_value_to_text(web100_get_var_type(var), buf));
- if (fp)
- fprintf(fp, "%d;", (int32_t) atoi(web_vars[i].value));
- log_println(9, "%s: %d", web_vars[i].name, atoi(web_vars[i].value));
- }
- ok = 1;
- }
-#elif USE_WEB10G
- estats_val_data_new(&data);
- estats_read_vars(data, cn, agent);
-
- // Loop through all the web10g variables and write to file/log_print them
- for (i = 0; i < data->length; i++) {
- if (data->val[i].masked) continue;
- char * str = NULL;
-
- if ((err = estats_val_as_string(&str, &data->val[i], estats_var_array[i].valtype)) != NULL) {
- log_println(0, "Error: tcp_stat_get_data_recv - estats_val_as_string");
- estats_error_print(stderr, err);
- estats_error_free(&err);
- continue;
- }
+ snprintf(web_vars[i].value, sizeof(web_vars[i].value), "%s", buf);
if (fp)
- fprintf(fp, "%s;", str);
- log_println(9, "%s: %s", estats_var_array[i].name, estats_read_vars);
+ fprintf(fp, "%d;", (int32_t) atoi(web_vars[i].value));
+ log_println(9, "%s: %d", web_vars[i].name, atoi(web_vars[i].value));
}
-
- estats_val_data_free(&data);
-#endif

// close file pointers after web100 variables have been fetched
if (fp) {
@@ -595,7 +486,7 @@
*
*/
int tcp_stat_get_data(tcp_stat_snap* snap, int testsock, int ctlsock,
- tcp_stat_agent* agent, int count_vars, int jsonSupport) {
+ tcp_stat_agent* agent, int jsonSupport) {
char line[256];
#if USE_WEB100
int i;
@@ -606,7 +497,7 @@
assert(snap);
assert(agent);

- for (i = 0; i < count_vars; i++) {
+ for (i = 0; i < web_vars[i].defined; i++) {
if ((web100_agent_find_var_and_group(agent, web_vars[i].name, &group,
&var)) != WEB100_ERR_SUCCESS) {
log_println(9, "Variable %d (%s) not found in KIS: ", i,
@@ -812,60 +703,12 @@
#endif
}

-#if USE_WEB100
-/**
- * Calculate Web100 based Round-Trip Time (RTT) value.
- *
- * "SumRTT" = sum of all sampled round trip times.
- * "CountRTT" = count of samples in SumRTT.
- * Thus, sumRTT/CountRTT = RTT
- * @param ctlsock integer socket file descriptor indicating data recipient
- * @param agent pointer to a web100_agent
- * @param cn pointer to web100_connection
- * @return positive integral round trip time in milliseconds on success,
- * negative integer error code if failure.
- * Error codes :
- * -10 if web100-connection is null.
- * -24 Cannot find CountRTT or SumRTT web100_variable's var/group.
- * -25 cannot read the value of the countRTT or SumRTT web100_variable.
- *
- *
- */
-int web100_rtt(int sock, web100_agent* agent, web100_connection* cn) {
- web100_var* var;
- char buf[32];
- web100_group* group;
- double count, sum;
-
- if (cn == NULL)
- return (-10);
-
- if ((web100_agent_find_var_and_group(agent, "CountRTT", &group, &var))
- != WEB100_ERR_SUCCESS)
- return (-24);
- if ((web100_raw_read(var, cn, buf)) != WEB100_ERR_SUCCESS) {
- return (-25);
- }
- count = atoi(web100_value_to_text(web100_get_var_type(var), buf));
-
- if ((web100_agent_find_var_and_group(agent, "SumRTT", &group, &var))
- != WEB100_ERR_SUCCESS)
- return (-24);
- if ((web100_raw_read(var, cn, buf)) != WEB100_ERR_SUCCESS) {
- return (-25);
- }
- sum = atoi(web100_value_to_text(web100_get_var_type(var), buf));
- return (sum / count);
-}
-#endif
-
/**
* Check if the "Auto Tune Send Buffer" and "Auto Tune Receive Buffer" options
* are enabled and return status on each
*
* @param sock integer socket file descriptor indicating data recipient
* @param agent pointer to a web100_agent
- * @param cn pointer to web100_connection
* @return On successful fetch of required web100_variables, integers:
* 0x01 if "Autotune send buffer" is not enabled
* 0x02 if "Autotune receive buffer" is not enabled
@@ -876,13 +719,15 @@
* 23 cannot read the value of the X_SBufMode or X_RBufMode web100_variable.
*/

-int tcp_stat_autotune(int sock, tcp_stat_agent* agent, tcp_stat_connection cn) {
+int tcp_stats_autotune_enabled(tcp_stat_agent *agent, int sock) {
#if USE_WEB100
web100_var* var;
char buf[32];
web100_group* group;
+ web100_connection *cn;
int i, j = 0;

+ cn = tcp_stats_connection_from_socket(agent, sock);
if (cn == NULL)
return (10);

@@ -924,114 +769,19 @@
#endif
}

-#if USE_WEB100
/**
- * Check if the "Auto Tune Send Buffer" and "Auto Tune Receive Buffer" options
- * are enabled. If not, scale the Send window or receive window sizes based on the
- * scaling factors negotiated. Scaling factor sndWinScale is used to set used to set "LimCwnd"
- * (maximum size, in bytes, of the congestion window that may be used)
- * and RcvWindowScale is used to set "LimRwin"( maximum receive window size, in bytes, that may be advertised).
- *
- * This function seems unused currently.
- *
- * @param sock integer socket file descriptor indicating data recipient
- * @param agent pointer to a web100_agent
- * @param cn pointer to web100_connection
- * @return Integer, 0 on success
- * If failure, error codes :
- * 10 - web100-connection is null.
- * 22 - Cannot find LimCwnd web100 variable's var/group data.
- * 23 - Cannot find LimRwin web100 variable's var/group data
- * 24 - cannot find SndWinScale web100 variable's var/group data.
- * 25 - cannot read the value of the SndWinScale web100 variable.
- * 34 - cannot find RcvWinScale web100 variable's var/group data.
- * 35 - cannot read value of RcvWinScale web100 variable.
- *
- */
-int tcp_stat_setbuff(int sock, tcp_stat_agent* agent, tcp_stat_connection cn,
- int autotune) {
- web100_var* var;
- char buf[32];
- web100_group* group;
- int buff;
- int sScale, rScale;
-
- if (cn == NULL)
- return (10);
-
- // get Window scale used by the sender to decode snd.wnd
- if ((web100_agent_find_var_and_group(agent, "SndWinScale", &group, &var))
- != WEB100_ERR_SUCCESS)
- return (24);
- if ((web100_raw_read(var, cn, buf)) != WEB100_ERR_SUCCESS) {
- return (25);
- }
- sScale = atoi(web100_value_to_text(web100_get_var_type(var), buf));
- if (sScale > 15) // define constant for 15
- sScale = 0;
-
- // get Window scale used by the sender to decode seg.wnd
- if ((web100_agent_find_var_and_group(agent, "RcvWinScale", &group, &var))
- != WEB100_ERR_SUCCESS)
- return (34);
- if ((web100_raw_read(var, cn, buf)) != WEB100_ERR_SUCCESS) {
- return (35);
- }
- rScale = atoi(web100_value_to_text(web100_get_var_type(var), buf));
- if (rScale > 15)
- rScale = 0;
-
- if ((sScale > 0) && (rScale > 0)) {
- // 64k( max TCP rcv window)
- buff = (MAX_TCP_WIN_BYTES * KILO_BITS) << sScale;
- if (autotune & 0x01) { // autotune send buffer is not enabled
- if ((web100_agent_find_var_and_group(agent, "LimCwnd", &group, &var))
- != WEB100_ERR_SUCCESS)
- return (22);
- // attempt writing variable value, log failure
- if ((web100_raw_write(var, cn, &buff)) != WEB100_ERR_SUCCESS) {
- log_println(4,
- "Web100_raw_write(LimCwnd) failed with errorno=%d",
- errno);
- return (23);
- }
- }
- buff = (MAX_TCP_WIN_BYTES * KILO_BITS) << rScale;
-
- if (autotune & 0x02) { // autotune receive buffer is not enabled
- if ((web100_agent_find_var_and_group(agent, "LimRwin", &group, &var))
- != WEB100_ERR_SUCCESS)
- return (22);
- if ((web100_raw_write(var, cn, &buff)) != WEB100_ERR_SUCCESS) {
- log_println(4,
- "Web100_raw_write(LimRwin) failed with errorno=%d",
- errno);
- return (23);
- }
- }
- }
-
- return (0);
-}
-#endif
-
-/**
- * @param sock integer socket file descriptor indicating data recipient
* @param tcp_vars to local copies of tcp_stat variables
* @return integer 0
*/
-int tcp_stat_logvars(struct tcp_vars* vars, int count_vars) {
+int tcp_stat_logvars(struct tcp_vars* vars) {
#if USE_WEB100
int a, b;
for (a = 0; a < sizeof(struct tcp_vars) / sizeof(tcp_stat_var); ++a) {
const char* web100_name = tcp_names[a].web100_name;
if (web100_name == NULL)
continue;
- int PktsIn = -1;
- int DataPktsIn = -1;
- int has_AckPktsIn = 0;

- for (b = 0; b < count_vars; b++) {
+ for (b = 0; web_vars[b].defined; b++) {
if (strcmp(web_vars[b].name, web100_name) == 0) {
tcp_stat_var* var = &((tcp_stat_var *)vars)[a];
*var = atoi(web_vars[b].value);
@@ -1039,7 +789,7 @@
break;
}
}
- if (b == count_vars) {
+ if (web_vars[b].defined == 0) {
log_println(1, "WARNING: Failed to find Web100 var %s", web100_name);
}
}
@@ -1101,83 +851,34 @@
* @return Integer, 0 on success, -1 on failure
*/

-int CwndDecrease(char* logname, u_int32_t *dec_cnt,
+int CwndDecrease(SnapResults *results, u_int32_t *dec_cnt,
u_int32_t *same_cnt, u_int32_t *inc_cnt) {
-#if USE_WEB100
- web100_var* var;
- char buff[256];
- web100_snapshot* snap;
- int s1, s2, cnt, rt;
- web100_log* log;
- web100_group* group;
- web100_agent* agnt;
-#elif USE_WEB10G
- estats_val var;
- char buff[256];
- estats_val_data* snap;
- int s1, s2, cnt, rt;
- estats_record* log;
-#endif
-
-#if USE_WEB100
- // open snaplog file to read values
- if ((log = web100_log_open_read(logname)) == NULL)
- return (0);
- if ((snap = web100_snapshot_alloc_from_log(log)) == NULL)
- return (-1);
- if ((agnt = web100_get_log_agent(log)) == NULL)
- return (-1);
- if ((group = web100_get_log_group(log)) == NULL)
- return (-1);
-
- // Find current values of the congestion window
- if (web100_agent_find_var_and_group(agnt, "CurCwnd", &group, &var)
- != WEB100_ERR_SUCCESS)
- return (-1);
-#elif USE_WEB10G
- estats_record_open(&log, logname, "r");
-#endif
+ int i;
+ int s1, s2, cnt;
+ char buf[1024];

s2 = 0;
cnt = 0;

- // get values and update counts
-#if USE_WEB100
- while (web100_snap_from_log(snap, log) == 0) {
-#elif USE_WEB10G
- while (estats_record_read_data(&snap, log) == NULL) {
-#endif
- if (cnt++ == 0)
+ for(i = 0; i < results->collected; i++) {
+ tcp_stat_snap *snap = results->snapshots[i];
+ if (cnt++ == 0) {
continue;
+ }
s1 = s2;
// Parse snapshot, returning variable values
-#if USE_WEB100
- rt = web100_snap_read(var, snap, buff);
- s2 = atoi(web100_value_to_text(web100_get_var_type(var), buff));
-#elif USE_WEB10G
- rt = web10g_find_val(snap, "CurCwnd", &var);
- s2 = var.uv32;
-#endif
+ if (tcp_stats_snap_read_var(results->agent, snap, "CurCwnd", buf, sizeof(buf)) != 0) {
+ continue;
+ }
+
+ s2 = atoi(buf);

if (cnt < 20) {
-#if USE_WEB100
- log_println(7, "Reading snaplog %p (%d), var = %s", snap, cnt,
- (char*) var);
+ log_println(7, "Reading snaplog %p (%d), var = %s", snap, cnt, buf);
log_println(
7,
- "Checking for Cwnd decreases. rt=%d, s1=%d, s2=%d (%s), dec-cnt=%d",
- rt, s1, s2,
- web100_value_to_text(web100_get_var_type(var), buff),
- *dec_cnt);
-#elif USE_WEB10G
- log_println(7, "Reading snaplog %p (%d), var = %"PRIu64, snap, cnt,
- var.uv64);
- log_println(
- 7,
- "Checking for Cwnd decreases. rt=%d, s1=%d, s2=%d, dec-cnt=%d",
- rt, s1, s2,
- *dec_cnt);
-#endif
+ "Checking for Cwnd decreases. s1=%d, s2=%d, dec-cnt=%d",
+ s1, s2, *dec_cnt);
}
if (s2 < s1)
(*dec_cnt)++;
@@ -1185,22 +886,8 @@
(*same_cnt)++;
if (s2 > s1)
(*inc_cnt)++;
-
-#if USE_WEB100
- if (rt != WEB100_ERR_SUCCESS)
- break;
-#elif USE_WEB10G
- estats_val_data_free(&snap);
- if (rt != -1)
- break;
-#endif
}
-#if USE_WEB100
- web100_snapshot_free(snap);
- web100_log_close_read(log);
-#elif USE_WEB10G
- estats_record_close(&log);
-#endif
+
log_println(
2,
"-=-=-=- CWND window report: increases = %d, decreases = %d, "
@@ -1209,192 +896,224 @@
return (0);
}

+int tcp_stats_read_var(tcp_stat_agent *agent, tcp_stat_connection conn, const char *var_name, char *buf, int bufsize) {
#if USE_WEB100
-/**
- * Generate TCP/IP checksum for our packet
- *
- * @param buff pointer to buffer
- * @param nwords integer length (in bytes) of the header
- * @return unsigned short checksum
- */
-unsigned short csum(unsigned short *buff, int nwords) {
- /* generate a TCP/IP checksum for our packet */
+ web100_group *group;
+ web100_var* var;
+ char tmp[1024];
+ char *tmpstr;

- register int sum = 0;
- u_short answer = 0;
- register u_short *w = buff;
- register int nleft = nwords;
+ // read web100_group and web100_var of vars[i] into group and var
+ web100_agent_find_var_and_group(agent, var_name, &group, &var);
+ // read variable value from web100 connection
+ web100_raw_read(var, conn, tmp);
+ tmpstr = web100_value_to_text(web100_get_var_type(var), tmp);
+ strlcpy(buf, tmpstr, bufsize);
+#elif USE_WEB10G
+ struct estats_val value;
+ int type;

- // make 16 bit words out of every couple of 8 bit words and
- // add up
- while (nleft > 1) {
- sum += *w++;
- nleft -= 2;
+ type = web10g_get_val(agent, conn, var_name, &value);
+ if (type == -1) {
+ return -1;
}

- if (nleft == 1) {
- *(u_char *) (&answer) = *(u_char *) w;
- sum += answer;
- }
+ snprintf(buf, bufsize, "%u", value.uv32);
+#endif

- // form 16 bit words, add and store
- sum = (sum >> 16) + (sum & 0xffff);
- // then add carries to the sume
- sum += (sum >> 16);
+ return 0;
+}

- // 1s complement of the above yields checksum
- answer = ~sum;
- return (answer);
+ int tcp_stats_snap_read_var(tcp_stat_agent *agent, tcp_stat_snap *snap, const char *var_name, char *buf, int bufsize) {
+ #if USE_WEB100
+ web100_group* group;
+ web100_var* var;
+ char tmpstr[256];
+
+ web100_agent_find_var_and_group(agent, var_name, &group, &var);
+ web100_snap_read(var, snap, tmpstr);
+ snprintf(buf, bufsize, "%s", web100_value_to_text(web100_get_var_type(var), tmpstr));
+ #elif USE_WEB10G
+ struct estats_val value;
+
+ web10g_find_val(snap, var_name, &value);
+ snprintf(buf, bufsize, "%u", value.uv32);
+ #endif
+ }
+
+tcp_stat_connection tcp_stats_connection_from_socket(tcp_stat_agent *agent, int sock) {
+ tcp_stat_connection retval;
+
+#if USE_WEB100
+ retval = web100_connection_from_socket(agent, sock);
+#elif USE_WEB10G
+ retval = web10g_connection_from_socket(agent, sock);
+ if (retval == -1)
+ retval = NULL;
+#endif
+ return retval;
}

-/**
- * Try to close out connections in an unexpected or erroneous state.
- *
- * This function seems unused currently.
- *
- * @return integer, 0 if kill succeeded, and
- * -1 if kill failed or there was nothing to kill
- */
-int KillHung(void) {
- web100_agent *agent;
- web100_group *group;
- web100_var *state, *var;
- web100_connection *conn;
- int cid, one = 1, hung = 0;
- int sd;
- char *pkt, buff[64];
- struct in_addr src, dst;
- struct iphdr *ip = NULL;
- struct tcphdr *tcp;
- struct sockaddr_in sin;
- struct pseudo_hdr *phdr;
+ void tcp_stats_set_cwnd(tcp_stat_agent *agent, tcp_stat_connection cn, uint32_t cwnd) {
+ #if USE_WEB100
+ web100_var* LimCwnd;
+ web100_group* group;

- if ((agent = web100_attach(WEB100_AGENT_TYPE_LOCAL, NULL)) == NULL) {
+ // get web100_var and web100_group
+ web100_agent_find_var_and_group(agent, "LimCwnd", &group, &LimCwnd);
+
+ // set TCP CWND web100 variable to twice the current MSS Value
+ web100_raw_write(LimCwnd, cn, &cwnd);
+ #elif USE_WEB10G
+ estats_write_var("LimCwnd", (uint32_t)cwnd, cn, agent);
+ #endif
+ }
+
+tcp_stat_agent *tcp_stats_init_agent() {
+ tcp_stat_agent *agent;
+
+#if USE_WEB100
+ if ((agent = web100_attach(WEB100_AGENT_TYPE_LOCAL,
+ NULL)) == NULL) {
web100_perror("web100_attach");
- return (-1);
+ return NULL;
}
+#elif USE_WEB10G
+ if (estats_nl_client_init(&agent) != NULL) {
+ log_println(0,
+ "Error: estats_client_init failed."
+ "Unable to use web10g.");
+ return NULL;
+ }
+#endif
+ return agent;
+}

- group = web100_group_head(agent);
- conn = web100_connection_head(agent);
+void tcp_stats_free_agent(tcp_stat_agent *agent) {
+#if USE_WEB100
+ web100_detach(agent);
+#elif USE_WEB10G
+ estats_nl_client_destroy(&agent);
+#endif
+ return;
+}

- while (conn) {
- cid = web100_get_connection_cid(conn);
- web100_agent_find_var_and_group(agent, "State", &group, &state);
- web100_raw_read(state, conn, buff);
- if (atoi(web100_value_to_text(web100_get_var_type(state), buff)) == 9) {
- /* Connection is in Last_Ack state, and probably stuck, so generate and
- send a FIN packet to the remote client. This should force the connection
- to close
- */
+tcp_stat_group *tcp_stats_get_group(tcp_stat_agent *agent, char *group_name) {
+ tcp_stat_group *retval;

- log_println(3, "Connection %d was found in LastAck state", cid);
- sin.sin_family = AF_INET;
+#if USE_WEB100
+ retval = web100_group_find(agent, group_name);
+#elif USE_WEB10G
+ retval = NULL;
+#endif

- pkt = malloc(sizeof(struct iphdr) + sizeof(struct tcphdr) + 24);
- memset(pkt, 0, (24 + sizeof(struct iphdr) + sizeof(struct tcphdr)));
- sd = socket(PF_INET, SOCK_RAW, IPPROTO_TCP);
- ip = (struct iphdr *) pkt;
- tcp = (struct tcphdr *) (pkt + sizeof(struct iphdr));
- phdr = (struct pseudo_hdr *) (pkt + sizeof(struct iphdr)
- - sizeof(struct pseudo_hdr));
+ return retval;
+}

- /* Build the TCP packet first, along with the pseudo header, the later
- * the IP header build process will overwrite the pseudo header fields
- */
+tcp_stat_snap *tcp_stats_init_snapshot(tcp_stat_agent *agent, tcp_stat_connection conn, tcp_stat_group *group) {
+ tcp_stat_snap *retval;

- web100_agent_find_var_and_group(agent, "LocalAddress", &group,
- &var);
- web100_raw_read(var, conn, buff);
- log_println(3, "LocalAddress: '%s'",
- web100_value_to_text(web100_get_var_type(var), buff));
- dst.s_addr = inet_addr(
- web100_value_to_text(web100_get_var_type(var), buff));
- web100_agent_find_var_and_group(agent, "RemAddress", &group, &var);
- web100_raw_read(var, conn, buff);
- src.s_addr = inet_addr(
- web100_value_to_text(web100_get_var_type(var), buff));
- log_println(3, "RemAddress: '%s'",
- web100_value_to_text(web100_get_var_type(var), buff));
+#if USE_WEB100
+ retval = web100_snapshot_alloc(group, conn);
+#elif USE_WEB10G
+ estats_val_data_new(&retval);
+#endif

- phdr->protocol = IPPROTO_TCP;
- phdr->len = htons(sizeof(struct tcphdr) / 4);
- phdr->s_addr = src.s_addr;
- phdr->d_addr = dst.s_addr;
+ return retval;
+}

- web100_agent_find_var_and_group(agent, "LocalPort", &group, &var);
- web100_raw_read(var, conn, buff);
- tcp->dest = htons(
- atoi(web100_value_to_text(web100_get_var_type(var), buff)));
- log_println(3, "LocalPort: '%s'",
- web100_value_to_text(web100_get_var_type(var), buff));
- web100_agent_find_var_and_group(agent, "RemPort", &group, &var);
- web100_raw_read(var, conn, buff);
- tcp->source = htons(
- atoi(web100_value_to_text(web100_get_var_type(var), buff)));
- log_println(3, "RemPort: '%s'",
- web100_value_to_text(web100_get_var_type(var), buff));
- sin.sin_port = tcp->dest;
+void tcp_stats_take_snapshot(tcp_stat_agent *agent, tcp_stat_connection conn, tcp_stat_snap *snap) {
+#if USE_WEB100
+ web100_snap(snap);
+#elif USE_WEB10G
+ estats_read_vars(snap, conn, agent);
+#endif
+ return;
+}

- web100_agent_find_var_and_group(agent, "RcvNxt", &group, &var);
- web100_raw_read(var, conn, buff);
- tcp->seq = htonl(
- atoll(
- web100_value_to_text(web100_get_var_type(var),
- buff)));
- log_println(3, "Seq No. (RcvNxt): '%s'",
- web100_value_to_text(web100_get_var_type(var), buff));
- web100_agent_find_var_and_group(agent, "SndUna", &group, &var);
- web100_raw_read(var, conn, buff);
- tcp->ack_seq = htonl(
- atoll(
- web100_value_to_text(web100_get_var_type(var),
- buff)));
- log_println(3, "Ack No. (SndNxt): '%s'",
- web100_value_to_text(web100_get_var_type(var), buff));
+void tcp_stats_free_snapshot(tcp_stat_snap *snap) {
+#if USE_WEB100
+ web100_snapshot_free(snap);
+#elif USE_WEB10G
+ estats_val_data_free(&snap);
+#endif
+}

- tcp->window = 0x7fff;
- tcp->res1 = 0;
- tcp->doff = sizeof(struct tcphdr) / 4;
- /* tcp->syn = 1; */
- tcp->rst = 1;
- tcp->ack = 1;
- /* tcp->fin = 1; */
- tcp->urg_ptr = 0;
- tcp->check = csum((unsigned short *) phdr,
- sizeof(struct tcphdr) + sizeof(struct pseudo_hdr));
+tcp_stat_log *tcp_stats_open_log(char *filename, tcp_stat_connection conn, tcp_stat_group *group, char *mode) {
+ tcp_stat_log *retval;

- bzero(pkt, sizeof(struct iphdr));
- ip->ihl = 5;
- ip->version = 4;
- ip->tos = 0;
- ip->tot_len = sizeof(struct iphdr) + sizeof(struct tcphdr);
- ip->id = htons(31890);
- ip->frag_off = htons(IP_DF);
- ip->ttl = IPDEFTTL;
- ip->protocol = IPPROTO_TCP; /* TCP packet */
- ip->saddr = src.s_addr;
- ip->daddr = dst.s_addr;
- sin.sin_addr.s_addr = dst.s_addr;
+#if USE_WEB100
+ if (strcmp(mode, "w") == 0) {
+ retval = web100_log_open_write(filename, conn, group);
+ }
+ else if (strcmp(mode, "r") == 0) {
+ retval = web100_log_open_read(filename);
+ }
+ else {
+ retval = NULL;
+ }
+#elif USE_WEB10G
+ estats_record_open(&retval, filename, mode);
+#endif

- ip->check = csum((unsigned short *) pkt, sizeof(struct iphdr));
+ return retval;
+}

- if (setsockopt(sd, IPPROTO_IP, IP_HDRINCL, &one, sizeof(one)) < 0)
- return (-1);
+int tcp_stats_read_snapshot(tcp_stat_snap **snap, tcp_stat_log *log) {
+ int retval;

- sendto(sd, (char *) pkt, 40, 0, (struct sockaddr *) &sin,
- sizeof(sin));
+#if USE_WEB100
+ if (*snap == NULL)
+ if ((*snap = web100_snapshot_alloc_from_log(log)) == NULL)
+ return (-1);

- hung = 1;
- }
- // returns the next connection in the sequence
- conn = web100_connection_next(conn);
- }
+ retval = web100_snap_from_log(*snap, log);
+#elif USE_WEB10G
***The diff for this file has been truncated for email.***
=======================================
--- /branches/Issue139/src/web100srv.c Wed May 28 11:17:18 2014 UTC
+++ /branches/Issue139/src/web100srv.c Thu Jun 12 09:50:00 2014 UTC
@@ -85,6 +85,7 @@
#include "runningtest.h"
#include "strlutils.h"
#include "heuristics.h"
+#include "snap_worker.h"
#include "tests_srv.h"
#include "jsonutils.h"

@@ -101,7 +102,6 @@

// list of global variables used throughout this program.
static int window = 64000; // TCP buffer size
-static int count_vars = 0;
int dumptrace = 0;
static int usesyslog = 0;
static int multiple = 0;
@@ -536,7 +536,7 @@

case SIGHUP:
/* Initialize Web100 structures */
- count_vars = tcp_stat_init(VarFileName);
+ tcp_stats_init(VarFileName);

/* The administrator view automatically generates a usage page for the
* NDT server. This page is then accessible to the general public.
@@ -884,11 +884,6 @@

int run_test(tcp_stat_agent* agent, int ctlsockfd, TestOptions* testopt,
char *test_suite) {
-#if USE_WEB100
- tcp_stat_connection conn = NULL;
-#elif USE_WEB10G
- tcp_stat_connection conn = -1;
-#endif
char date[32]; // date indicator
char spds[4][256]; // speed "bin" array containing counters for speeds
char logstr1[4096], logstr2[1024]; // log
@@ -923,6 +918,8 @@
int timeout, dupack;
// int ifspeed;

+ SnapResults *c2s_snap_results, *s2c_snap_results;
+
time_t stime;

double rttsec; // average round trip time
@@ -967,8 +964,7 @@
spd_index = 0;

// obtain web100 connection and check auto-tune status
- conn = tcp_stat_connection_from_socket(agent, ctlsockfd);
- autotune = tcp_stat_autotune(ctlsockfd, agent, conn);
+ autotune = tcp_stats_autotune_enabled(agent, ctlsockfd);

// client needs to be version compatible. Send current version
snprintf(buff, sizeof(buff), "v%s", VERSION "-" TCP_STAT_NAME);
@@ -979,11 +975,6 @@
test_suite);
send_json_message(ctlsockfd, MSG_LOGIN, test_suite, strlen(test_suite),
testopt->json_support, JSON_SINGLE_VALUE);
- /* if ((n = initialize_tests(ctlsockfd, &testopt, conn_options))) {
- log_println(0, "ERROR: Tests initialization failed (%d)", n);
- return;
- }
- */

log_println(1, "Starting test suite:");
if (testopt->midopt) {
@@ -1016,7 +1007,7 @@

/* alarm(20); */
log_println(6, "Starting simple firewall test");
- if ((ret = test_sfw_srv(ctlsockfd, agent, &*testopt, conn_options)) != 0) {
+ if ((ret = test_sfw_srv(ctlsockfd, &*testopt, conn_options)) != 0) {
if (ret < 0)
log_println(6, "SFW test failed with rc=%d", ret);
}
@@ -1025,7 +1016,7 @@
log_println(6, "Starting c2s throughput test");
if ((ret = test_c2s(ctlsockfd, agent, &*testopt, conn_options, &c2sspd,
set_buff, window, autotune, device, &options,
- record_reverse, count_vars, spds, &spd_index)) != 0) {
+ record_reverse, spds, &spd_index, &c2s_snap_results)) != 0) {
if (ret < 0)
log_println(6, "C2S test failed with rc=%d", ret);
log_println(0, "C2S throughput test FAILED!, rc=%d", ret);
@@ -1037,7 +1028,7 @@
log_println(6, "Starting s2c throughput test");
if ((ret = test_s2c(ctlsockfd, agent, &*testopt, conn_options, &s2cspd,
set_buff, window, autotune, device, &options, spds,
- &spd_index, count_vars, &peaks)) != 0) {
+ &spd_index, &s2c_snap_results)) != 0) {
if (ret < 0)
log_println(6, "S2C test failed with rc=%d", ret);
log_println(0, "S2C throughput test FAILED!, rc=%d", ret);
@@ -1056,6 +1047,9 @@
log_println(4, "Finished testing C2S = %0.2f Mbps, S2C = %0.2f Mbps",
c2sspd / 1000, s2cspd / 1000);

+ // Calculate the cwnd peaks from the server-side sending data
+ findCwndPeaks(s2c_snap_results, &peaks);
+
// Determine link speed
calc_linkspeed(spds, spd_index, &c2s_linkspeed_data, &c2s_linkspeed_ack,
&s2c_linkspeed_data, &s2c_linkspeed_ack, runave, &dec_cnt,
@@ -1065,14 +1059,14 @@
// ...determine number of times congestion window has been changed
if (options.cwndDecrease) {
dec_cnt = inc_cnt = same_cnt = 0;
- CwndDecrease(options.s2c_logname, &dec_cnt, &same_cnt, &inc_cnt);
+ CwndDecrease(s2c_snap_results, &dec_cnt, &same_cnt, &inc_cnt);
log_println(2, "####### decreases = %d, increases = %d, no change = %d",
dec_cnt, inc_cnt, same_cnt);
}

// ...other variables
memset(&vars, 0xFF, sizeof(vars));
- tcp_stat_logvars(&vars, count_vars);
+ tcp_stat_logvars(&vars);

// end getting web100 variable values
/* if (rc == 0) { */
@@ -1839,8 +1833,8 @@
log_println(1, "server ready on port %s (family %d)", port, meta.family);

// Initialize tcp_stat structures
- count_vars = tcp_stat_init(VarFileName);
- if (count_vars == -1) {
+ tcp_stats_init(VarFileName);
+ if (tcp_stats_init(VarFileName) == 0) {
log_println(0, "No Web100 variables file found, terminating program");
exit(-5);
}
@@ -2595,20 +2589,11 @@
"pid=%d", chld_pipe[0], chld_pipe[1], chld_pid);
close(listenfd);
close(chld_pipe[1]);
-#if USE_WEB100
- if ((agent = web100_attach(WEB100_AGENT_TYPE_LOCAL,
- NULL)) == NULL) {
- web100_perror("web100_attach");
+
+ if ((agent = tcp_stats_init_agent()) == NULL) {
+ log_println(0, "Unable to initialize TCP stats collection");
return 1;
}
-#elif USE_WEB10G
- if (estats_nl_client_init(&agent) != NULL) {
- log_println(0,
- "Error: estats_client_init failed."
- "Unable to use web10g.");
- return 1;
- }
-#endif

// This is the child process from the above fork(). The parent
// is in control, and will send this child a signal when it gets
@@ -2732,11 +2717,9 @@
child_sig(0);
}
close(ctlsockfd);
-#if USE_WEB100
- web100_detach(agent);
-#elif USE_WEB10G
- estats_nl_client_destroy(&agent);
-#endif
+
+ tcp_stats_free_agent(agent);
+
// log_free(); // Don't free the log we use it all the time
// log_println()
// Also makes valgrind angry
=======================================
--- /branches/Issue139/src/web100srv.h Wed May 28 11:17:18 2014 UTC
+++ /branches/Issue139/src/web100srv.h Thu Jun 12 09:50:00 2014 UTC
@@ -93,13 +93,35 @@
/* Location of default config file */
#define CONFIGFILE "/etc/ndt.conf"

-/* hard-coded port values */
-/*
-#define PORT "3001"
-#define PORT2 "3002"
-#define PORT3 "3003"
-#define PORT4 "3003"
-*/
+#if USE_WEB10G
+#define TCP_STAT_NAME "Web10G"
+typedef struct estats_nl_client tcp_stat_agent;
+typedef int tcp_stat_connection;
+typedef struct estats_val_data tcp_stat_snap;
+/* Not relevant to web10g */
+typedef void tcp_stat_group;
+/* Log currently unimplemented in web10g */
+typedef estats_record tcp_stat_log;
+
+/* Extra Web10G functions web10g-util.c */
+int web10g_find_val(const tcp_stat_snap* data, const char* name,
+ struct estats_val* value);
+int web10g_get_val(tcp_stat_agent* client, tcp_stat_connection conn,
+ const char* name, struct estats_val* value);
+int web10g_connection_from_socket(tcp_stat_agent* client, int sockfd);
+int web10g_get_remote_addr(tcp_stat_agent* client,
+ tcp_stat_connection conn, char* out, int size);
+
+#elif USE_WEB100
+#define TCP_STAT_NAME "Web100"
+typedef web100_agent tcp_stat_agent;
+typedef web100_connection* tcp_stat_connection;
+typedef web100_snapshot tcp_stat_snap;
+/* Group only relevant to web100 */
+typedef web100_group tcp_stat_group;
+typedef web100_log tcp_stat_log;
+
+#endif

// Congestion window peak information
typedef struct CwndPeaks {
@@ -170,11 +192,6 @@
u_int32_t totalcount; // total number of valid speed data bins
};

-struct web100_variables {
- char name[256]; // key
- char value[256]; // value
-} web_vars[WEB100_VARS];
-
struct pseudo_hdr { /* used to compute TCP checksum */
uint64_t s_addr; // source addr
uint64_t d_addr; // destination address
@@ -259,6 +276,17 @@
tcp_stat_var ThruBytesAcked;
};

+typedef struct SnapResults {
+ tcp_stat_agent *agent;
+ tcp_stat_group *group;
+ tcp_stat_connection conn;
+
+ tcp_stat_snap **snapshots;
+
+ int allocated;
+ int collected;
+} SnapResults;
+
/* web100-pcap */
#ifdef HAVE_LIBPCAP
void init_vars(struct spdpair *cur);
@@ -275,52 +303,39 @@

void get_iflist(void);

-#if USE_WEB10G
-#define TCP_STAT_NAME "Web10G"
-typedef struct estats_nl_client tcp_stat_agent;
-typedef int tcp_stat_connection;
-typedef struct estats_val_data tcp_stat_snap;
-/* Not relevent to web10g */
-typedef void tcp_stat_group;
-/* Log currently unimplemented in web10g */
-typedef estats_record tcp_stat_log;
-#define tcp_stat_connection_from_socket web10g_connection_from_socket
+// Generic functions that, at compile time, read from either web10g or web100
+int tcp_stats_init(char *VarFileName);
+tcp_stat_agent *tcp_stats_init_agent();
+void tcp_stats_free_agent(tcp_stat_agent *agent);
+int tcp_stats_snap_read_var(tcp_stat_agent *agent, tcp_stat_snap *snap, const char *var_name, char *buf, int bufsize);
+tcp_stat_connection tcp_stats_connection_from_socket(tcp_stat_agent *agent, int sock);
+void tcp_stats_set_cwnd(tcp_stat_agent *agent, tcp_stat_connection cn, uint32_t cwnd);
+
+tcp_stat_group *tcp_stats_get_group(tcp_stat_agent *agent, char *group_name);
+
+tcp_stat_snap *tcp_stats_init_snapshot(tcp_stat_agent *agent, tcp_stat_connection conn, tcp_stat_group *group);
+void tcp_stats_take_snapshot(tcp_stat_agent *agent, tcp_stat_connection conn, tcp_stat_snap *snap);
+int tcp_stats_read_snapshot(tcp_stat_snap **snap, tcp_stat_log *log);
+void tcp_stats_write_snapshot(tcp_stat_log *log, tcp_stat_snap *snap);
+void tcp_stats_free_snapshot(tcp_stat_snap *snap);

-/* Extra Web10G functions web10g-util.c */
-int web10g_find_val(const tcp_stat_snap* data, const char* name,
- struct estats_val* value);
-int web10g_get_val(tcp_stat_agent* client, tcp_stat_connection conn,
- const char* name, struct estats_val* value);
-int web10g_connection_from_socket(tcp_stat_agent* client, int sockfd);
-int web10g_get_remote_addr(tcp_stat_agent* client,
- tcp_stat_connection conn, char* out, int size);
+tcp_stat_log *tcp_stats_open_log(char *filename, tcp_stat_connection conn, tcp_stat_group *group, char *mode);
+void tcp_stats_close_log(tcp_stat_log *log);

-#elif USE_WEB100
-#define TCP_STAT_NAME "Web100"
-typedef web100_agent tcp_stat_agent;
-typedef web100_connection* tcp_stat_connection;
-typedef web100_snapshot tcp_stat_snap;
-/* Group only relevent to web100 */
-typedef web100_group tcp_stat_group;
-typedef web100_log tcp_stat_log;
-#define tcp_stat_connection_from_socket web100_connection_from_socket
+int tcp_stats_autotune_enabled(tcp_stat_agent *agent, int sock);
+void tcp_stats_set_cwnd_limit(tcp_stat_agent *agent, tcp_stat_connection conn, tcp_stat_group* group, uint32_t limit);

-#endif
+int tcp_stats_read_var(tcp_stat_agent *agent, tcp_stat_connection conn, const char *var_name, char *buf, int bufsize);

-int tcp_stat_autotune(int sock, tcp_stat_agent* agent, tcp_stat_connection cn);
-int tcp_stat_init(char *VarFileName);
void tcp_stat_middlebox(int sock, tcp_stat_agent* agent, tcp_stat_connection cn, char *results_keys,
size_t results_keys_strlen, char *results_values, size_t results_strlen);
-int tcp_stat_setbuff(int sock, tcp_stat_agent* agent, tcp_stat_connection cn,
- int autotune);/* Not used so no web10g version */
void tcp_stat_get_data_recv(int sock, tcp_stat_agent* agent,
- tcp_stat_connection cn, int count_vars);
+ tcp_stat_connection cn);
int tcp_stat_get_data(tcp_stat_snap* snap, int testsock, int ctlsock,
- tcp_stat_agent* agent, int count_vars, int jsonSupport);
+ tcp_stat_agent* agent, int jsonSupport);

-int CwndDecrease(char* logname,
- u_int32_t *dec_cnt, u_int32_t *same_cnt, u_int32_t *inc_cnt);
-int tcp_stat_logvars(struct tcp_vars* vars, int count_vars);
+int CwndDecrease(SnapResults *results, u_int32_t *dec_cnt, u_int32_t *same_cnt, u_int32_t *inc_cnt);
+int tcp_stat_logvars(struct tcp_vars* vars);

int KillHung(void);
void writeMeta(int compress, int cputime, int snaplog, int tcpdump);


  • [ndt-dev] [ndt] r1079 committed - Applied aaron-tcp_stats_cleanup changes to Issue139 branch, ndt, 06/12/2014

Archive powered by MHonArc 2.6.16.

Top of Page