Skip to Content.
Sympa Menu

ndt-dev - [ndt-dev] [ndt] r1004 committed - Cleanup how snapshot taking is done. Instead of writing to a log file ...

Subject: NDT-DEV email list created

List archive

[ndt-dev] [ndt] r1004 committed - Cleanup how snapshot taking is done. Instead of writing to a log file ...


Chronological Thread 
  • From:
  • To:
  • Subject: [ndt-dev] [ndt] r1004 committed - Cleanup how snapshot taking is done. Instead of writing to a log file ...
  • Date: Mon, 10 Mar 2014 19:53:48 +0000

Revision: 1004
Author:

Date: Mon Mar 10 19:53:32 2014 UTC
Log: Cleanup how snapshot taking is done. Instead of writing to a log file during the test, collect the results and then write them at the end.


http://code.google.com/p/ndt/source/detail?r=1004

Added:
/branches/aaron-tcp_stats_cleanup/src/snap_worker.c
/branches/aaron-tcp_stats_cleanup/src/snap_worker.h
Modified:
/branches/aaron-tcp_stats_cleanup/src/Makefile.am
/branches/aaron-tcp_stats_cleanup/src/test_c2s_srv.c
/branches/aaron-tcp_stats_cleanup/src/test_s2c_srv.c
/branches/aaron-tcp_stats_cleanup/src/testoptions.c
/branches/aaron-tcp_stats_cleanup/src/testoptions.h
/branches/aaron-tcp_stats_cleanup/src/tests_srv.h
/branches/aaron-tcp_stats_cleanup/src/web100-util.c
/branches/aaron-tcp_stats_cleanup/src/web100srv.c
/branches/aaron-tcp_stats_cleanup/src/web100srv.h

=======================================
--- /dev/null
+++ /branches/aaron-tcp_stats_cleanup/src/snap_worker.c Mon Mar 10 19:53:32 2014 UTC
@@ -0,0 +1,205 @@
+/**
+ * This file contains the functions used for the thread that takes periodic
+ * snapshots of the TCP statistics.
+ *
+ * Aaron Brown 2014-03-10
+ *

+ */
+
+#include <tgmath.h>
+#include <pthread.h>
+
+#include "logging.h"
+#include "snap_worker.h"
+#include "utils.h"
+
+static void *snap_worker(void* arg);
+
+SnapResults *alloc_snap_results(int max_snapshots, tcp_stat_agent *agent, tcp_stat_connection conn, tcp_stat_group *group) {
+ SnapResults *snapResults;
+ int i;
+
+ snapResults = calloc(1, sizeof(SnapResults));
+ if (!snapResults) {
+ goto error_out;
+ }
+
+ snapResults->agent = agent;
+ snapResults->conn = conn;
+ snapResults->group = group;
+
+ snapResults->allocated = 0;
+ snapResults->collected = 0;
+
+ snapResults->snapshots = calloc(max_snapshots, sizeof(tcp_stat_snap *));
+ if (!snapResults->snapshots) {
+ goto error_out;
+ }
+
+ for(i = 0; i < max_snapshots; i++) {
+ snapResults->snapshots[i] = tcp_stats_init_snapshot(agent, conn, group);
+ if (!snapResults->snapshots[i]) {
+ goto error_out;
+ }
+
+ snapResults->allocated++;
+ }
+
+ return snapResults;
+
+error_out:
+ free_snap_results(snapResults);
+ return NULL;
+}
+
+void free_snap_results(SnapResults *results) {
+ int i;
+
+ for(i = 0; i < results->allocated; i++) {
+ tcp_stats_free_snapshot(results->snapshots[i]);
+ }
+
+ free(results);
+}
+
+SnapWorker *alloc_snap_worker(tcp_stat_agent* agent, tcp_stat_group *group,
+ tcp_stat_connection conn, double delay, int max_snapshots) {
+ SnapWorker *snapWorker = NULL;
+
+ snapWorker = calloc(1, sizeof(SnapWorker));
+ if (!snapWorker) {
+ goto error_out;
+ }
+
+ snapWorker->delay = delay;
+
+ if (pthread_mutex_init(&snapWorker->mutex, NULL) != 0) {
+ goto error_out;
+ }
+
+ if (pthread_cond_init(&snapWorker->cond, NULL) != 0) {
+ goto error_out;
+ }
+
+ snapWorker->results = alloc_snap_results(max_snapshots, agent, conn, group);
+ if (!snapWorker->results) {
+ goto error_out;
+ }
+
+ return snapWorker;
+
+error_out:
+ free_snap_worker(snapWorker);
+
+ return NULL;
+}
+
+void free_snap_worker(SnapWorker *worker) {
+ if (worker->results)
+ free_snap_results(worker->results);
+
+ pthread_mutex_destroy(&worker->mutex);
+ pthread_cond_destroy(&worker->cond);
+
+ free(worker);
+}
+
+/** Method to start snap worker thread that collects snap logs
+ * @param snaparg object
+ * @param tcp_stat_agent Agent
+ * @param snaplogenabled Is snap logging enabled?
+ * @param workerlooparg integer used to syncronize writing/reading from snaplog/tcp_stat snapshot
+ * @param wrkrthreadidarg Thread Id of workera
+ * @param metafilevariablename Which variable of the meta file gets assigned the snaplog name (unused now)
+ * @param metafilename value of metafile name
+ * @param tcp_stat_connection connection pointer
+ * @param tcp_stat_group group web100_group pointer
+ */
+SnapWorker *start_snap_worker(tcp_stat_agent* agent, tcp_stat_group *group,
+ tcp_stat_connection conn, int delay, int test_length) {
+ SnapWorker *snapWorker = NULL;
+ double converted_delay = ((double) delay) / 1000.0;
+
+ // pad the maximum test length to be sure we have enough snapshots
+ int max_number = ceil((test_length + 1) / converted_delay);
+
+ snapWorker = alloc_snap_worker(agent, group, conn, converted_delay, max_number);
+ if (!snapWorker) {
+ goto error_out;
+ }
+
+ pthread_mutex_lock(&snapWorker->mutex);
+
+ if (pthread_create(&snapWorker->pthread, NULL, snap_worker,
+ (void*) snapWorker)) {
+ log_println(0, "Cannot create worker thread for writing snap log!");
+ goto error_out;
+ }
+
+ // obtain web100 snap into "snaparg.snap"
+ pthread_cond_wait(&snapWorker->cond, &snapWorker->mutex);
+ pthread_mutex_unlock(&snapWorker->mutex);
+
+ return snapWorker;
+
+error_out:
+ if (snapWorker)
+ free_snap_worker(snapWorker);
+
+ return NULL;
+}
+
+/**
+ * Stop the snap worker process
+ * @param workerThreadId Worker Thread's ID
+ * @param snaplogenabled boolean indication whether snap logging is enabled
+ * @param snapArgs_ptr pointer to a snapArgs object
+ * */
+SnapResults *stop_snap_worker(SnapWorker *worker) {
+ SnapResults *results;
+
+ pthread_mutex_lock(&worker->mutex);
+ worker->do_exit = 1;
+ pthread_mutex_unlock(&worker->mutex);
+
+ pthread_join(worker->pthread, NULL);
+
+ results = worker->results;
+
+ worker->results = NULL;
+
+ free_snap_worker(worker);
+
+ return results;
+}
+
+static void *snap_worker(void* arg) {
+ SnapWorker *workerArgs = (SnapWorker *) arg;
+ SnapResults *results = workerArgs->results;
+
+ int i;
+
+ pthread_mutex_lock(&workerArgs->mutex);
+ pthread_cond_signal(&workerArgs->cond);
+
+ i = 0;
+ while (1) {
+ if (workerArgs->do_exit) {
+ pthread_mutex_unlock(&workerArgs->mutex);
+ break;
+ }
+
+ if (i < results->allocated) {
+ tcp_stats_take_snapshot(results->agent, results->conn, results->snapshots[i]);
+
+ results->collected++;
+ i++;
+ }
+
+ pthread_mutex_unlock(&workerArgs->mutex);
+ mysleep(workerArgs->delay);
+ pthread_mutex_lock(&workerArgs->mutex);
+ }
+
+ return NULL;
+}
=======================================
--- /dev/null
+++ /branches/aaron-tcp_stats_cleanup/src/snap_worker.h Mon Mar 10 19:53:32 2014 UTC
@@ -0,0 +1,36 @@
+/**
+ * This file contains the functions used for the thread that takes periodic
+ * snapshots of the TCP statistics.
+ *
+ * Aaron Brown 2014-03-10
+ *

+ */
+
+#ifndef SNAP_WORKER_H
+#define SNAP_WORKER_H
+
+#include <pthread.h>
+
+#include "web100srv.h"
+
+typedef struct SnapWorker {
+ double delay;
+
+ SnapResults *results;
+
+ int do_exit;
+ pthread_t pthread;
+ pthread_mutex_t mutex;
+ pthread_cond_t cond;
+} SnapWorker;
+
+SnapResults *alloc_snap_results(int max_snapshots, tcp_stat_agent *agent, tcp_stat_connection conn, tcp_stat_group *group);
+void free_snap_results(SnapResults *results);
+SnapWorker *alloc_snap_worker(tcp_stat_agent* agent, tcp_stat_group *group,
+ tcp_stat_connection conn, double delay, int max_snapshots);
+void free_snap_worker(SnapWorker *worker);
+SnapWorker *start_snap_worker(tcp_stat_agent* agent, tcp_stat_group *group,
+ tcp_stat_connection conn, int delay, int test_length);
+SnapResults *stop_snap_worker(SnapWorker *worker);
+
+#endif
=======================================
--- /branches/aaron-tcp_stats_cleanup/src/Makefile.am Mon Oct 14 13:20:21 2013 UTC
+++ /branches/aaron-tcp_stats_cleanup/src/Makefile.am Mon Mar 10 19:53:32 2014 UTC
@@ -70,7 +70,7 @@
web100srv_SOURCES = web100srv.c web100-util.c web100-pcap.c web100-admin.c runningtest.c \
network.c usage.c utils.c mrange.c logging.c testoptions.c ndtptestconstants.c \
protocol.c test_sfw_srv.c test_meta_srv.c ndt_odbc.c strlutils.c heuristics.c \
- test_c2s_srv.c test_s2c_srv.c test_mid_srv.c
+ test_c2s_srv.c test_s2c_srv.c test_mid_srv.c snap_worker.c
web100srv_LDFLAGS = $(NDTLDFLAGS) $(I2UTILLDFLAGS)
web100srv_LDADD = $(NDTLIBS) $(I2UTILLIBS) $(I2UTILLIBDEPS) -lpthread $(ZLIB)
web100srv_CPPFLAGS ='-DBASEDIR="$(ndtdir)"' -DFORCE_WEB100
=======================================
--- /branches/aaron-tcp_stats_cleanup/src/test_c2s_srv.c Mon Mar 10 13:48:11 2014 UTC
+++ /branches/aaron-tcp_stats_cleanup/src/test_c2s_srv.c Mon Mar 10 19:53:32 2014 UTC
@@ -20,6 +20,7 @@
#include "protocol.h"
#include "network.h"
#include "mrange.h"
+#include "snap_worker.h"

/**
* Perform the C2S Throughput test. This test intends to measure throughput
@@ -59,9 +60,10 @@
int test_c2s(int ctlsockfd, tcp_stat_agent* agent, TestOptions* testOptions,
int conn_options, double* c2sspd, int set_buff, int window,
int autotune, char* device, Options* options, int record_reverse,
- char spds[4][256], int* spd_index) {
+ char spds[4][256], int* spd_index, SnapResults **snap_results) {
tcp_stat_connection conn;
tcp_stat_group* group = NULL;
+ SnapWorker *snap_worker;
/* The pipe that will return packet pair results */
int mon_pipe[2];
int recvsfd; // receiver socket file descriptor
@@ -82,13 +84,7 @@
I2Addr c2ssrv_addr = NULL; // c2s test's server address
// I2Addr src_addr=NULL; // c2s test source address
char listenc2sport[10]; // listening port
- pthread_t workerThreadId;

- // snap related variables
- SnapArgs snapArgs;
- snapArgs.snap = NULL;
- snapArgs.log = NULL;
- snapArgs.delay = options->snapDelay;
wait_sig = 0;

// Test ID and status descriptors
@@ -96,7 +92,6 @@
enum TEST_STATUS_INT teststatuses = TEST_NOT_STARTED;
enum PROCESS_STATUS_INT procstatusenum = UNKNOWN;
enum PROCESS_TYPE_INT proctypeenum = CONNECT_TYPE;
- char namesuffix[256] = "c2s_snaplog";

if (testOptions->c2sopt) {
setCurrentTest(TEST_C2S);
@@ -300,17 +295,13 @@
log_println(5, "C2S test Parent thinks pipe() returned fd0=%d, fd1=%d",
mon_pipe[0], mon_pipe[1]);

+ group = tcp_stats_get_group(agent, "read");
+
// experimental code, delete when finished
if (options->limit > 0) {
tcp_stats_set_cwnd_limit(agent, conn, group, options->limit);
}

- // Create C->S snaplog directories, and perform some initialization based on
- // options
- create_client_logdir((struct sockaddr *) &cli_addr, clilen,
- options->c2s_logname, sizeof(options->c2s_logname),
- namesuffix,
- sizeof(namesuffix));
sleep(2);

// send empty TEST_START indicating start of the test
@@ -318,19 +309,8 @@
/* alarm(30); */ // reset alarm() again, this 10 sec test should finish
// before this signal is generated.

- // If snaplog recording is enabled, update meta file to indicate the same
- // and proceed to get snapshot and log it.
- // This block is needed here since the meta file stores names without the
- // full directory but fopen needs full path. Else, it could have gone into
- // the "start_snap_worker" method
- if (options->snaplog) {
- memcpy(meta.c2s_snaplog, namesuffix, strlen(namesuffix));
- /*start_snap_worker(&snapArgs, agent, options->snaplog, &workerLoop,
- &workerThreadId, meta.c2s_snaplog, options->c2s_logname,
- conn, group); */
- }
- start_snap_worker(&snapArgs, agent, NULL, options->snaplog, &workerThreadId,
- meta.c2s_snaplog, options->c2s_logname, conn, group);
+ snap_worker = start_snap_worker(agent, group, conn, options->snapDelay, 10);
+
// Wait on listening socket and read data once ready.
tmptime = secs();
sel_tv.tv_sec = 11; // time out after 11 seconds
@@ -363,7 +343,7 @@

// c->s throuput value calculated and assigned ! Release resources, conclude
// snap writing.
- stop_snap_worker(&workerThreadId, options->snaplog, &snapArgs);
+ *snap_results = stop_snap_worker(snap_worker);

// send the server calculated value of C->S throughput as result to client
snprintf(buff, sizeof(buff), "%6.0f kbps outbound for child %d", *c2sspd,
@@ -449,6 +429,30 @@
// protocol logs
teststatuses = TEST_ENDED;
protolog_status(testOptions->child0, testids, teststatuses, ctlsockfd);
+
+ // save the snapshots
+ if (options->snaplog) {
+ int i;
+ char namesuffix[256] = "c2s_snaplog";
+ tcp_stat_log *log;
+
+ // Create C->S snaplog directories, and perform some initialization based on
+ // options
+ create_client_logdir((struct sockaddr *) &cli_addr, clilen,
+ options->c2s_logname, sizeof(options->c2s_logname),
+ namesuffix,
+ sizeof(namesuffix));
+
+ memcpy(meta.c2s_snaplog, namesuffix, strlen(namesuffix));
+
+ log = tcp_stats_open_log(options->c2s_logname, (*snap_results)->conn, (*snap_results)->group, "w");
+
+ for(i = 0; i < (*snap_results)->collected; i++) {
+ tcp_stats_write_snapshot(log, (*snap_results)->snapshots[i]);
+ }
+
+ tcp_stats_close_log(log);
+ }

// set current test status and free address
setCurrentTest(TEST_NONE);
=======================================
--- /branches/aaron-tcp_stats_cleanup/src/test_s2c_srv.c Mon Mar 10 13:48:44 2014 UTC
+++ /branches/aaron-tcp_stats_cleanup/src/test_s2c_srv.c Mon Mar 10 19:53:32 2014 UTC
@@ -22,10 +22,6 @@
#include "network.h"
#include "mrange.h"

-extern pthread_mutex_t mainmutex;
-extern pthread_cond_t maincond;
-
-
/**
* Perform the S2C Throughput test. This throughput test tests the achievable
* network bandwidth from the Server to the Client by performing a 10 seconds
@@ -52,7 +48,6 @@
* @param options Test Option variables
* @param spds[][] speed check array
* @param spd_index index used for speed check array
- * @param peaks Cwnd peaks structure pointer
*
* @return 0 - success,
* >0 - error code.
@@ -70,7 +65,7 @@
int test_s2c(int ctlsockfd, tcp_stat_agent* agent, TestOptions* testOptions,
int conn_options, double* s2cspd, int set_buff, int window,
int autotune, char* device, Options* options, char spds[4][256],
- int* spd_index, CwndPeaks* peaks) {
+ int* spd_index, SnapResults **snap_results) {
#if USE_WEB100
/* experimental code to capture and log multiple copies of the
* web100 variables using the web100_snap() & log() functions.
@@ -84,7 +79,6 @@
#endif
tcp_stat_connection conn;
/* Just a holder for web10g */
- tcp_stat_group* group = NULL;
/* Pipe that handles returning packet pair timing */
int mon_pipe[2];
int ret; // ctrl protocol read/write return status
@@ -114,7 +108,10 @@
int sndqueue;
struct sigaction new, old;

- pthread_t workerThreadId;
+ SnapWorker *snap_worker;
+
+ tcp_stat_group *group;
+ tcp_stat_snap *snap;
int nextseqtosend = 0, lastunackedseq = 0;
int drainingqueuecount = 0, bufctlrnewdata = 0;

@@ -123,12 +120,7 @@
enum TEST_ID testids = S2C;
enum PROCESS_STATUS_INT procstatusenum = UNKNOWN;
enum PROCESS_TYPE_INT proctypeenum = CONNECT_TYPE;
- char snaplogsuffix[256] = "s2c_snaplog";

- SnapArgs snapArgs;
- snapArgs.snap = NULL;
- snapArgs.log = NULL;
- snapArgs.delay = options->snapDelay;
wait_sig = 0;

// Determine port to be used. Compute based on options set earlier
@@ -279,7 +271,10 @@
}
}
src_addr = I2AddrByLocalSockFD(get_errhandle(), xmitsfd, 0);
+
conn = tcp_stats_connection_from_socket(agent, xmitsfd);
+ group = tcp_stats_get_group(agent, "read");
+ snap = tcp_stats_init_snapshot(agent, conn, group);

// set up packet capture. The data collected is used for bottleneck link
// calculations
@@ -337,11 +332,6 @@

/* End of test code */

- // create directory to write web100 snaplog trace
- create_client_logdir((struct sockaddr *) &cli_addr, clilen,
- options->s2c_logname, sizeof(options->s2c_logname),
- snaplogsuffix, sizeof(snaplogsuffix));
-
/* Kludge way of nuking Linux route cache. This should be done
* using the sysctl interface.
*/
@@ -381,17 +371,7 @@
// Write snap logs if option is enabled. update meta log to point to
// this snaplog

- // If snaplog option is enabled, save snaplog details in meta file
- if (options->snaplog) {
- memcpy(meta.s2c_snaplog, snaplogsuffix, strlen(snaplogsuffix));
- }
- // get web100 snapshot and also log it based on options
- /*start_snap_worker(&snapArgs, agent, options->snaplog, &workerLoop,
- &workerThreadId, meta.s2c_snaplog, options->s2c_logname,
- conn, group);*///new file changes
- start_snap_worker(&snapArgs, agent, peaks, options->snaplog,
- &workerThreadId, meta.s2c_snaplog, options->s2c_logname,
- conn, group);
+ snap_worker = start_snap_worker(agent, group, conn, options->snapDelay, 10);

/* alarm(20); */
tmptime = secs(); // current time
@@ -403,21 +383,19 @@
// Increment total attempts at sending-> buffer control
bufctrlattempts++;
if (options->avoidSndBlockUp) { // Do not block send buffers
- pthread_mutex_lock(&mainmutex);
+ tcp_stats_take_snapshot(agent, conn, snap);

// get details of next sequence # to be sent and fetch value from
// snap file

// get next sequence # to be sent
- tcp_stats_snap_read_var(agent, snapArgs.snap, "SndNxt", tmpstr, sizeof(tmpstr));
+ tcp_stats_snap_read_var(agent, snap, "SndNxt", tmpstr, sizeof(tmpstr));
nextseqtosend = atoi(tmpstr);

// get oldest un-acked sequence number
- tcp_stats_snap_read_var(agent, snapArgs.snap, "SndUna", tmpstr, sizeof(tmpstr));
+ tcp_stats_snap_read_var(agent, snap, "SndUna", tmpstr, sizeof(tmpstr));
lastunackedseq = atoi(tmpstr);

- pthread_mutex_unlock(&mainmutex);
-
// Temporarily stop sending data if you sense that the buffer is
// overwhelmed
// This is calculated by checking if (8192 * 4) <
@@ -459,7 +437,7 @@
x2cspd = (8.e-3 * bytes_written) / tx_duration;

// Release semaphore, and close snaplog file. finalize other data
- stop_snap_worker(&workerThreadId, options->snaplog, &snapArgs);
+ *snap_results = stop_snap_worker(snap_worker);

// send the x2cspd to the client
memset(buff, 0, sizeof(buff));
@@ -631,6 +609,30 @@
// log protocol validation logs
teststatuses = TEST_ENDED;
protolog_status(testOptions->child0, testids, teststatuses, ctlsockfd);
+
+ // save the snapshots
+ if (options->snaplog) {
+ int i;
+ char namesuffix[256] = "s2c_snaplog";
+ tcp_stat_log *log;
+
+ // Create C->S snaplog directories, and perform some initialization based on
+ // options
+ create_client_logdir((struct sockaddr *) &cli_addr, clilen,
+ options->s2c_logname, sizeof(options->s2c_logname),
+ namesuffix,
+ sizeof(namesuffix));
+
+ memcpy(meta.s2c_snaplog, namesuffix, strlen(namesuffix));
+
+ log = tcp_stats_open_log(options->s2c_logname, (*snap_results)->conn, (*snap_results)->group, "w");
+
+ for(i = 0; i < (*snap_results)->collected; i++) {
+ tcp_stats_write_snapshot(log, (*snap_results)->snapshots[i]);
+ }
+
+ tcp_stats_close_log(log);
+ }

setCurrentTest(TEST_NONE);
}
=======================================
--- /branches/aaron-tcp_stats_cleanup/src/testoptions.c Mon Mar 10 13:48:44 2014 UTC
+++ /branches/aaron-tcp_stats_cleanup/src/testoptions.c Mon Mar 10 19:53:32 2014 UTC
@@ -20,22 +20,7 @@
#include "runningtest.h"
#include "strlutils.h"
#include "web100srv.h"
-
-
-// Worker thread characteristics used to record snaplog and Cwnd peaks
-typedef struct workerArgs {
- SnapArgs* snapArgs; // snapArgs struct pointer
- tcp_stat_agent* agent; // tcp_stat agent pointer
- CwndPeaks* peaks; // data indicating Cwnd values
- int writeSnap; // enable writing snaplog
-} WorkerArgs;
-
-int workerLoop = 0;
-pthread_mutex_t mainmutex = PTHREAD_MUTEX_INITIALIZER;
-pthread_cond_t maincond = PTHREAD_COND_INITIALIZER;
-static int slowStart = 1;
-static int prevCWNDval = -1;
-static int decreasing = 0;
+#include "snap_worker.h"

/**
* Count the CWND peaks from a snapshot and record the minimal and maximum one.
@@ -45,43 +30,50 @@
* @param peaks Structure containing CWND peaks information
* @param snap Web100 snapshot structure
*/
-void findCwndPeaks(tcp_stat_agent* agent, CwndPeaks* peaks,
- tcp_stat_snap* snap) {
- int CurCwnd;
+void findCwndPeaks(SnapResults *results, CwndPeaks *peaks) {
char buf[128];
+ int i;
+ int slowStart = 1;
+ int prevCWNDval = -1;
+ int decreasing = 0;

- tcp_stats_snap_read_var(agent, snap, "CurCwnd", buf, sizeof(buf));
+ for(i = 0; i < results->collected; i++) {
+ tcp_stat_snap *snap = results->snapshots[i];
+ int CurCwnd;

- CurCwnd = atoi(buf);
+ tcp_stats_snap_read_var(results->agent, snap, "CurCwnd", buf, sizeof(buf));

- if (slowStart) {
- if (CurCwnd < prevCWNDval) {
- slowStart = 0;
- peaks->max = prevCWNDval;
- peaks->amount = 1;
- decreasing = 1;
- }
- } else {
- // current congestion window < previous value, so, decreasing
- if (CurCwnd < prevCWNDval) {
- // update values based on actual values
- if (prevCWNDval > peaks->max) {
+ CurCwnd = atoi(buf);
+
+ if (slowStart) {
+ if (CurCwnd < prevCWNDval || prevCWNDval == -1) {
+ slowStart = 0;
peaks->max = prevCWNDval;
+ peaks->amount = 1;
+ decreasing = 1;
}
- if (!decreasing) {
- peaks->amount += 1;
+ } else {
+ // current congestion window < previous value, so, decreasing
+ if (CurCwnd < prevCWNDval || prevCWNDval == -1) {
+ // update values based on actual values
+ if (prevCWNDval > peaks->max) {
+ peaks->max = prevCWNDval;
+ }
+ if (!decreasing) {
+ peaks->amount += 1;
+ }
+ decreasing = 1;
+ // current congestion window size > previous value,
+ } else if (CurCwnd > prevCWNDval) {
+ // not decreasing.
+ if ((peaks->min == -1) || (prevCWNDval < peaks->min)) {
+ peaks->min = prevCWNDval;
+ }
+ decreasing = 0;
}
- decreasing = 1;
- // current congestion window size > previous value,
- } else if (CurCwnd > prevCWNDval) {
- // not decreasing.
- if ((peaks->min == -1) || (prevCWNDval < peaks->min)) {
- peaks->min = prevCWNDval;
- }
- decreasing = 0;
}
+ prevCWNDval = CurCwnd;
}
- prevCWNDval = CurCwnd;
}

/**
@@ -96,59 +88,6 @@
}
log_println(0, "Unknown (%d) signal was caught", signo);
}
-
-/**
- * Write the snap logs with fixed time intervals in a separate
- * thread, locking and releasing resources as necessary.
- * @param arg pointer to the snapshot structure
- * @return void pointer null
- */
-
-void*
-snapWorker(void* arg) {
- /* WARNING void* arg (workerArgs) is on the stack of the function below and
- * doesn't exist forever. */
- WorkerArgs *workerArgs = (WorkerArgs*) arg;
- SnapArgs *snapArgs = workerArgs->snapArgs;
- tcp_stat_agent* agent = workerArgs->agent;
- CwndPeaks* peaks = workerArgs->peaks;
- int writeSnap = workerArgs->writeSnap;
-
- // snap log written into every "delay" milliseconds
- double delay = ((double) snapArgs->delay) / 1000.0;
-
- while (1) {
- pthread_mutex_lock(&mainmutex);
- if (workerLoop) {
- pthread_mutex_unlock(&mainmutex);
- pthread_cond_broadcast(&maincond);
- break;
- }
- pthread_mutex_unlock(&mainmutex);
- mysleep(0.01);
- }
-
- // Find Congestion window peaks from a web_100 snapshot, if enabled
- // Write snap log , if enabled, all in a synchronous manner.
- while (1) {
- pthread_mutex_lock(&mainmutex);
- if (!workerLoop) {
- pthread_mutex_unlock(&mainmutex);
- break;
- }
- tcp_stats_take_snapshot(agent, snapArgs->conn, snapArgs->snap);
- if (peaks) {
- findCwndPeaks(agent, peaks, snapArgs->snap);
- }
- if (writeSnap) {
- tcp_stats_write_snapshot(snapArgs->log, snapArgs->snap);
- }
- pthread_mutex_unlock(&mainmutex);
- mysleep(delay);
- }
-
- return NULL;
-}

/**
* Adds test id to the test suite
@@ -249,94 +188,6 @@
}
return useropt;
}
-
-/** Method to start snap worker thread that collects snap logs
- * @param snaparg object
- * @param tcp_stat_agent Agent
- * @param snaplogenabled Is snap logging enabled?
- * @param workerlooparg integer used to syncronize writing/reading from snaplog/tcp_stat snapshot
- * @param wrkrthreadidarg Thread Id of workera
- * @param metafilevariablename Which variable of the meta file gets assigned the snaplog name (unused now)
- * @param metafilename value of metafile name
- * @param tcp_stat_connection connection pointer
- * @param tcp_stat_group group web100_group pointer
- */
-void start_snap_worker(SnapArgs *snaparg, tcp_stat_agent* agentarg,
- CwndPeaks* peaks, char snaplogenabled,
- pthread_t *wrkrthreadidarg, char *metafilevariablename,
- char *metafilename, tcp_stat_connection conn,
- tcp_stat_group* group) {
- FILE *fplocal;
-
- WorkerArgs workerArgs;
- workerArgs.snapArgs = snaparg;
- workerArgs.agent = agentarg;
- workerArgs.peaks = peaks;
- workerArgs.writeSnap = snaplogenabled;
-
- group = tcp_stats_get_group(agentarg, "read");
-
- snaparg->snap = tcp_stats_init_snapshot(agentarg, conn, group);
-
- if (snaplogenabled) {
- // memcpy(metafilevariablename, metafilename, strlen(metafilename));
- // The above could have been here, except for a caveat: metafile stores
- // just the file name, but full filename is needed to open the log file
-
- fplocal = fopen(get_logfile(), "a");
-
- snaparg->log = tcp_stats_open_log(metafilename, conn, group, "w");
- log_println( 0, "snaparg->log: %X", snaparg->log);
-
- if (fplocal == NULL) {
- log_println(
- 0,
- "Unable to open log file '%s', continuing on without logging",
- get_logfile());
- } else {
- log_println(0, "Snaplog file: %s\n", metafilename);
- fprintf(fplocal, "Snaplog file: %s\n", metafilename);
- fclose(fplocal);
- }
- }
-
- if (pthread_create(wrkrthreadidarg, NULL, snapWorker,
- (void*) &workerArgs)) {
- log_println(0, "Cannot create worker thread for writing snap log!");
- *wrkrthreadidarg = 0;
- }
-
- pthread_mutex_lock(&mainmutex);
- workerLoop= 1;
- // obtain web100 snap into "snaparg.snap"
- tcp_stats_take_snapshot(agentarg, conn, snaparg->snap);
- if (snaplogenabled) {
- tcp_stats_write_snapshot(snaparg->log, snaparg->snap);
- }
- pthread_cond_wait(&maincond, &mainmutex);
- pthread_mutex_unlock(&mainmutex);
-}
-
-/**
- * Stop snapWorker
- * @param workerThreadId Worker Thread's ID
- * @param snaplogenabled boolean indication whether snap logging is enabled
- * @param snapArgs_ptr pointer to a snapArgs object
- * */
-void stop_snap_worker(pthread_t *workerThreadId, char snaplogenabled,
- SnapArgs* snapArgs_ptr) {
- if (*workerThreadId) {
- pthread_mutex_lock(&mainmutex);
- workerLoop = 0;
- pthread_mutex_unlock(&mainmutex);
- pthread_join(*workerThreadId, NULL);
- }
- // close writing snaplog, if snaplog recording is enabled
- if (snaplogenabled) {
- tcp_stats_close_log(snapArgs_ptr->log);
- }
- tcp_stats_free_snapshot(snapArgs_ptr->snap);
-}

/**
* Start packet tracing for this client
=======================================
--- /branches/aaron-tcp_stats_cleanup/src/testoptions.h Mon Mar 10 13:40:31 2014 UTC
+++ /branches/aaron-tcp_stats_cleanup/src/testoptions.h Mon Mar 10 19:53:32 2014 UTC
@@ -10,6 +10,7 @@
#define SRC_TESTOPTIONS_H_

#include "web100srv.h"
+#include "snap_worker.h"

#define LISTENER_SOCKET_CREATE_FAILED -1
#define SOCKET_CONNECT_TIMEOUT -100
@@ -66,15 +67,7 @@
int getCurrentTest();
void setCurrentTest(int testId);

-// void start_snap_worker(SnapArgs *snaparg, tcp_stat_agent *agentarg,
-void start_snap_worker(SnapArgs *snaparg, tcp_stat_agent *agentarg,
- CwndPeaks* peaks, char snaplogenabled,
- pthread_t *wrkrthreadidarg, char *metafilevariablename,
- char *metafilename, tcp_stat_connection conn,
- tcp_stat_group* group);
-
-void stop_snap_worker(pthread_t *workerThreadId, char snaplogenabled,
- SnapArgs* snapArgs_ptr);
+void findCwndPeaks(SnapResults *results, CwndPeaks *peaks);

void setCwndlimit(tcp_stat_connection connarg, tcp_stat_group* grouparg,
tcp_stat_agent* agentarg, Options* optionsarg);
=======================================
--- /branches/aaron-tcp_stats_cleanup/src/tests_srv.h Mon Mar 10 13:48:11 2014 UTC
+++ /branches/aaron-tcp_stats_cleanup/src/tests_srv.h Mon Mar 10 19:53:32 2014 UTC
@@ -9,18 +9,19 @@
#ifndef SRC_TESTS_SRV_H_
#define SRC_TESTS_SRV_H_

+#include "snap_worker.h"
#include "testoptions.h"

int test_c2s(int ctlsockfd, tcp_stat_agent* agent, TestOptions* testOptions,
int conn_options, double* c2sspd, int set_buff, int window,
int autotune, char* device, Options* options, int record_reverse,
- char spds[4][256], int* spd_index);
+ char spds[4][256], int* spd_index, SnapResults **snap_results);

// S2C test
int test_s2c(int ctlsockfd, tcp_stat_agent* agent, TestOptions* testOptions,
int conn_options, double* s2cspd, int set_buff, int window,
int autotune, char* device, Options* options, char spds[4][256],
- int* spd_index, CwndPeaks* peaks);
+ int* spd_index, SnapResults **snap_results);

// the middlebox test
int test_mid(int ctlsockfd, tcp_stat_agent* agent, TestOptions* testOptions,
=======================================
--- /branches/aaron-tcp_stats_cleanup/src/web100-util.c Mon Mar 10 13:49:35 2014 UTC
+++ /branches/aaron-tcp_stats_cleanup/src/web100-util.c Mon Mar 10 19:53:32 2014 UTC
@@ -839,30 +839,26 @@
* @return Integer, 0 on success, -1 on failure
*/

-int CwndDecrease(tcp_stat_agent *agent, char* logname, u_int32_t *dec_cnt,
+int CwndDecrease(SnapResults *results, u_int32_t *dec_cnt,
u_int32_t *same_cnt, u_int32_t *inc_cnt) {
- tcp_stat_log *log;
- tcp_stat_snap *snap = NULL;
+ int i;
int s1, s2, cnt;
char buf[1024];

- log = tcp_stats_open_log(logname, NULL, NULL, "r");
-
s2 = 0;
cnt = 0;

- // get values and update counts
- while (tcp_stats_read_snapshot(&snap, log) == 0) {
+ for(i = 0; i < results->collected; i++) {
+ tcp_stat_snap *snap = results->snapshots[i];
+
if (cnt++ == 0) {
- tcp_stats_free_snapshot(snap);
continue;
}

s1 = s2;
// Parse snapshot, returning variable values

- if (tcp_stats_snap_read_var(agent, snap, "CurCwnd", buf, sizeof(buf)) != 0) {
- tcp_stats_free_snapshot(snap);
+ if (tcp_stats_snap_read_var(results->agent, snap, "CurCwnd", buf, sizeof(buf)) != 0) {
continue;
}

@@ -882,11 +878,7 @@
(*same_cnt)++;
if (s2 > s1)
(*inc_cnt)++;
-
- tcp_stats_free_snapshot(snap);
}
-
- tcp_stats_close_log(log);

log_println(
2,
=======================================
--- /branches/aaron-tcp_stats_cleanup/src/web100srv.c Mon Mar 10 13:48:44 2014 UTC
+++ /branches/aaron-tcp_stats_cleanup/src/web100srv.c Mon Mar 10 19:53:32 2014 UTC
@@ -85,6 +85,7 @@
#include "runningtest.h"
#include "strlutils.h"
#include "heuristics.h"
+#include "snap_worker.h"
#include "tests_srv.h"

static char lgfn[FILENAME_SIZE]; // log file name
@@ -915,6 +916,8 @@
int timeout, dupack;
// int ifspeed;

+ SnapResults *c2s_snap_results, *s2c_snap_results;
+
time_t stime;

double rttsec; // average round trip time
@@ -1010,7 +1013,7 @@
log_println(6, "Starting c2s throughput test");
if ((ret = test_c2s(ctlsockfd, agent, &*testopt, conn_options, &c2sspd,
set_buff, window, autotune, device, &options,
- record_reverse, spds, &spd_index)) != 0) {
+ record_reverse, spds, &spd_index, &c2s_snap_results)) != 0) {
if (ret < 0)
log_println(6, "C2S test failed with rc=%d", ret);
log_println(0, "C2S throughput test FAILED!, rc=%d", ret);
@@ -1022,7 +1025,7 @@
log_println(6, "Starting s2c throughput test");
if ((ret = test_s2c(ctlsockfd, agent, &*testopt, conn_options, &s2cspd,
set_buff, window, autotune, device, &options, spds,
- &spd_index, &peaks)) != 0) {
+ &spd_index, &s2c_snap_results)) != 0) {
if (ret < 0)
log_println(6, "S2C test failed with rc=%d", ret);
log_println(0, "S2C throughput test FAILED!, rc=%d", ret);
@@ -1041,6 +1044,9 @@
log_println(4, "Finished testing C2S = %0.2f Mbps, S2C = %0.2f Mbps",
c2sspd / 1000, s2cspd / 1000);

+ // Calculate the cwnd peaks from the server-side sending data
+ findCwndPeaks(s2c_snap_results, &peaks);
+
// Determine link speed
calc_linkspeed(spds, spd_index, &c2s_linkspeed_data, &c2s_linkspeed_ack,
&s2c_linkspeed_data, &s2c_linkspeed_ack, runave, &dec_cnt,
@@ -1050,7 +1056,7 @@
// ...determine number of times congestion window has been changed
if (options.cwndDecrease) {
dec_cnt = inc_cnt = same_cnt = 0;
- CwndDecrease(agent, options.s2c_logname, &dec_cnt, &same_cnt, &inc_cnt);
+ CwndDecrease(s2c_snap_results, &dec_cnt, &same_cnt, &inc_cnt);
log_println(2, "####### decreases = %d, increases = %d, no change = %d",
dec_cnt, inc_cnt, same_cnt);
}
=======================================
--- /branches/aaron-tcp_stats_cleanup/src/web100srv.h Mon Mar 10 13:48:44 2014 UTC
+++ /branches/aaron-tcp_stats_cleanup/src/web100srv.h Mon Mar 10 19:53:32 2014 UTC
@@ -93,13 +93,35 @@
/* Location of default config file */
#define CONFIGFILE "/etc/ndt.conf"

-/* hard-coded port values */
-/*
-#define PORT "3001"
-#define PORT2 "3002"
-#define PORT3 "3003"
-#define PORT4 "3003"
-*/
+#if USE_WEB10G
+#define TCP_STAT_NAME "Web10G"
+typedef struct estats_nl_client tcp_stat_agent;
+typedef int tcp_stat_connection;
+typedef struct estats_val_data tcp_stat_snap;
+/* Not relevent to web10g */
+typedef void tcp_stat_group;
+/* Log currently unimplemented in web10g */
+typedef estats_record tcp_stat_log;
+
+/* Extra Web10G functions web10g-util.c */
+int web10g_find_val(const tcp_stat_snap* data, const char* name,
+ struct estats_val* value);
+int web10g_get_val(tcp_stat_agent* client, tcp_stat_connection conn,
+ const char* name, struct estats_val* value);
+int web10g_connection_from_socket(tcp_stat_agent* client, int sockfd);
+int web10g_get_remote_addr(tcp_stat_agent* client,
+ tcp_stat_connection conn, char* out, int size);
+
+#elif USE_WEB100
+#define TCP_STAT_NAME "Web100"
+typedef web100_agent tcp_stat_agent;
+typedef web100_connection* tcp_stat_connection;
+typedef web100_snapshot tcp_stat_snap;
+/* Group only relevent to web100 */
+typedef web100_group tcp_stat_group;
+typedef web100_log tcp_stat_log;
+
+#endif

// Congestion window peak information
typedef struct CwndPeaks {
@@ -254,6 +276,17 @@
tcp_stat_var ThruBytesAcked;
};

+typedef struct SnapResults {
+ tcp_stat_agent *agent;
+ tcp_stat_group *group;
+ tcp_stat_connection conn;
+
+ tcp_stat_snap **snapshots;
+
+ int allocated;
+ int collected;
+} SnapResults;
+
/* web100-pcap */
#ifdef HAVE_LIBPCAP
void init_vars(struct spdpair *cur);
@@ -270,36 +303,6 @@

void get_iflist(void);

-#if USE_WEB10G
-#define TCP_STAT_NAME "Web10G"
-typedef struct estats_nl_client tcp_stat_agent;
-typedef int tcp_stat_connection;
-typedef struct estats_val_data tcp_stat_snap;
-/* Not relevent to web10g */
-typedef void tcp_stat_group;
-/* Log currently unimplemented in web10g */
-typedef estats_record tcp_stat_log;
-
-/* Extra Web10G functions web10g-util.c */
-int web10g_find_val(const tcp_stat_snap* data, const char* name,
- struct estats_val* value);
-int web10g_get_val(tcp_stat_agent* client, tcp_stat_connection conn,
- const char* name, struct estats_val* value);
-int web10g_connection_from_socket(tcp_stat_agent* client, int sockfd);
-int web10g_get_remote_addr(tcp_stat_agent* client,
- tcp_stat_connection conn, char* out, int size);
-
-#elif USE_WEB100
-#define TCP_STAT_NAME "Web100"
-typedef web100_agent tcp_stat_agent;
-typedef web100_connection* tcp_stat_connection;
-typedef web100_snapshot tcp_stat_snap;
-/* Group only relevent to web100 */
-typedef web100_group tcp_stat_group;
-typedef web100_log tcp_stat_log;
-
-#endif
-
// Generic functions that, at compile-time, reads either from web10g or web100
int tcp_stats_init(char *VarFileName);
tcp_stat_agent *tcp_stats_init_agent();
@@ -331,8 +334,9 @@
int tcp_stat_get_data(tcp_stat_snap* snap, int testsock, int ctlsock,
tcp_stat_agent* agent);

-int CwndDecrease(tcp_stat_agent *agent, char* logname,
- u_int32_t *dec_cnt, u_int32_t *same_cnt, u_int32_t *inc_cnt);
+int CwndDecrease(SnapResults *results, u_int32_t *dec_cnt,
+ u_int32_t *same_cnt, u_int32_t *inc_cnt);
+
int tcp_stat_logvars(struct tcp_vars* vars);

int KillHung(void);


  • [ndt-dev] [ndt] r1004 committed - Cleanup how snapshot taking is done. Instead of writing to a log file ..., ndt, 03/10/2014

Archive powered by MHonArc 2.6.16.

Top of Page