Skip to Content.
Sympa Menu

ndt-dev - [ndt-dev] [ndt] r718 committed - modular functions to create log directories, start,stop snap logging ...

Subject: NDT-DEV email list created

List archive

[ndt-dev] [ndt] r718 committed - modular functions to create log directories, start,stop snap logging ...


Chronological Thread 
  • From:
  • To:
  • Subject: [ndt-dev] [ndt] r718 committed - modular functions to create log directories, start,stop snap logging ...
  • Date: Thu, 13 Oct 2011 17:33:27 +0000

Revision: 718
Author:

Date: Thu Oct 13 10:32:49 2011
Log: modular functions to create log directories, start/stop snap logging and tcp trace logs, and better-named variables for s2c, c2s, midbox test routines
http://code.google.com/p/ndt/source/detail?r=718

Modified:
/branches/kkumar_code_organize/src/logging.c
/branches/kkumar_code_organize/src/testoptions.c
/branches/kkumar_code_organize/src/web100srv.h

=======================================
--- /branches/kkumar_code_organize/src/logging.c Mon Oct 10 19:19:48 2011
+++ /branches/kkumar_code_organize/src/logging.c Thu Oct 13 10:32:49 2011
@@ -896,7 +896,8 @@
memset(filename, 0, 256);
sprintf(filename, "%s/%s", tmp2str, meta.c2s_snaplog);
if (zlib_def(filename) != 0)
- log_println(5, "compression failed");
+ //log_println(5, "compression failed ");
+ log_println(0, "compression failed for
file:%s", meta.c2s_snaplog);
else
strncat(meta.c2s_snaplog, ".gz", 3);

@@ -904,7 +905,8 @@
memset(filename, 0, 256);
sprintf(filename, "%s/%s", tmp2str, meta.s2c_snaplog);
if (zlib_def(filename) != 0)
- log_println(5, "compression failed");
+ //log_println(5, "compression failed");
+ log_println(0, "compression failed for file
:%s",meta.s2c_snaplog);
else
strncat(meta.s2c_snaplog, ".gz", 3);
}
@@ -1037,8 +1039,54 @@
}


-/** Method to create directories for log files */
-/*
-createDir(char *dirName) {
-}
+/** Method to create directories for snap log files
+* @param namebufarg string containing ip address/name of client
+* @param socketaddrarg string containing socket address
+* @param direnamedestarg location to store final directory name
+* @param finalsuffix string constant suffix indicating C2S/S2c etc
*/
+//void createDir(char const* namebufarg, char const *socketaddrarg, char *dirnamedestarg,
+// char *finalsuffix) {
+void createDir(struct sockaddr *cliaddrarg, socklen_t clilenarg, char *dirnamedestarg,
+ char *finalsuffix) {
+ char namebuf[256];
+ size_t namebuflen = 255;
+ char dir[128];
+ DIR *dp;
+ char isoTime[64];
+ char *socketaddrport;
+
+ I2Addr sockAddr = I2AddrBySAddr(get_errhandle(), cliaddrarg, clilenarg, 0, 0);
+ memset(namebuf, 0, 256);
+ I2AddrNodeName(sockAddr, namebuf, &namebuflen);
+ socketaddrport = I2AddrPort(sockAddr);
+
+
+ strncpy(dirnamedestarg, DataDirName, strlen(DataDirName));
+ if ((dp = opendir(dirnamedestarg)) == NULL && errno == ENOENT)
+ mkdir(dirnamedestarg, 0755);
+ closedir(dp);
+ get_YYYY(dir);
+ strncat(dirnamedestarg, dir, 4);
+ if ((dp = opendir(dirnamedestarg)) == NULL && errno == ENOENT)
+ mkdir(dirnamedestarg, 0755);
+ closedir(dp);
+ strncat(dirnamedestarg, "/", 1);
+ get_MM(dir);
+ strncat(dirnamedestarg, dir, 2);
+ if ((dp = opendir(dirnamedestarg)) == NULL && errno == ENOENT)
+ mkdir(dirnamedestarg, 0755);
+ closedir(dp);
+ strncat(dirnamedestarg, "/", 1);
+ get_DD(dir);
+ strncat(dirnamedestarg, dir, 2);
+ if ((dp = opendir(dirnamedestarg)) == NULL && errno == ENOENT)
+ mkdir(dirnamedestarg, 0755);
+ closedir(dp);
+ strncat(dirnamedestarg, "/", 1);
+ sprintf(dir, "%s_%s:%d.%s", get_ISOtime(isoTime), namebuf, socketaddrport, finalsuffix);
+ strncpy(finalsuffix, dir, strlen(dir));
+ strncat(dirnamedestarg, dir, strlen(dir));
+
+ I2AddrFree(sockAddr);
+}
=======================================
--- /branches/kkumar_code_organize/src/testoptions.c Mon Oct 10 19:24:44 2011
+++ /branches/kkumar_code_organize/src/testoptions.c Thu Oct 13 10:32:49 2011
@@ -19,9 +19,8 @@
#include "I2util/util.h"
#include "runningtest.h"

-int mon_pipe1[2];
-int mon_pipe2[2]; // used to store file descriptors of pipes created for snap data in S2c test
-//static int currentTest = TEST_NONE;
+int mon_pipe1[2]; // used to store file descriptors of pipes created for ndttrace for C2S tests
+int mon_pipe2[2]; // used to store file descriptors of pipes created for ndttrace data in S2c test

static const int RETRY_COUNT = 5;

@@ -32,15 +31,15 @@
int delay; // periodicity, in ms, of collecting
snap
} SnapArgs;

-// todo comments for these 2 structs
+// Worker thread characteristics used to record snaplog and Cwnd peaks
typedef struct workerArgs {
- SnapArgs* snapArgs;
- web100_agent* agent;
- CwndPeaks* peaks;
- int writeSnap; // enable writing snaplog
+ SnapArgs* snapArgs; // snapArgs struct pointer
+ web100_agent* agent; // web_100 agent pointer
+ CwndPeaks* peaks; // data indicating Cwnd values
+ int writeSnap; // enable writing snaplog
} WorkerArgs;

-static int workerLoop = 0;
+static int workerLoop = 0; // semaphore
static pthread_mutex_t mainmutex = PTHREAD_MUTEX_INITIALIZER;
static pthread_cond_t maincond = PTHREAD_COND_INITIALIZER;
static int slowStart = 1;
@@ -156,8 +155,8 @@
mysleep(0.01);
}

- // Find Congestion window peaks from a web_100 snapshot,
- // and log it in a synchronous manner.
+ // Find Congestion window peaks from a web_100 snapshot, if enabled
+ // Write snap log , if enable, all in a synchronous manner.
while (1) {
pthread_mutex_lock(&mainmutex);
if (!workerLoop) {
@@ -275,9 +274,205 @@
}
return useropt;
}
+
+/* Method to write into snap worker */
+//start_snap_worker (&snapArgs, agent, options->snaplog, &workerLoop);
+
+void start_snap_worker(SnapArgs *snaparg, web100_agent *agentarg, char snaplogenabled, int *workerlooparg,
+ pthread_t *wrkrthreadidarg,
+ char *metafilevariablename, char *metafilename,
+ web100_connection* conn, web100_group* group) {
+ FILE *fplocal;
+
+ WorkerArgs workerArgs;
+ workerArgs.snapArgs = snaparg;
+ workerArgs.agent = agentarg;
+ workerArgs.peaks = NULL;
+ workerArgs.writeSnap = snaplogenabled;
+
+ group = web100_group_find(agentarg, "read");
+ snaparg->snap = web100_snapshot_alloc(group, conn);
+
+ if (snaplogenabled) {
+
+ //memcpy(metafilevariablename, metafilename, strlen(metafilename));
+ // The above could have been here, except for a caveat: metafile stores
+ // just the file name, but full filename is needed to open the log file
+
+ fplocal = fopen(get_logfile(),"a");
+ snaparg->log = web100_log_open_write(metafilename, conn, group);
+ if (fplocal == NULL) {
+ log_println(0, "Unable to open log file '%s', continuing on without logging", get_logfile());
+ }
+ else {
+ log_println(0, "Snaplog file: %s\n", metafilename);
+ fprintf(fplocal, "Snaplog file: %s\n", metafilename);
+ fclose(fplocal);
+ }
+ }
+
+ if (pthread_create(wrkrthreadidarg, NULL, snapWorker, (void*) &workerArgs)) {
+ log_println(0, "Cannot create worker thread for writing snap log!");
+ *wrkrthreadidarg = 0;
+ }
+
+ pthread_mutex_lock(&mainmutex);
+ *workerlooparg = 1;
+ // obtain web100 snap into "snaparg.snap"
+ web100_snap(snaparg->snap);
+ if (snaplogenabled) {
+ web100_log_write(snaparg->log, snaparg->snap);
+ }
+ pthread_cond_wait(&maincond, &mainmutex);
+ pthread_mutex_unlock(&mainmutex);
+}
+

/**
- * Perform the Middlebox test.
+ * Stop snapWorker
+ * @param workerThreadId Worker Thread's ID
+ * @param snaplogenabled boolean indication whether snap logging is enabled
+ * @param snapArgs_ptr pointer to a snapArgs object
+ * */
+void stop_snap_worker (int *workerThreadId, char snaplogenabled, SnapArgs* snapArgs_ptr) {
+ if (*workerThreadId) {
+ pthread_mutex_lock(&mainmutex);
+ workerLoop = 0;
+ pthread_mutex_unlock(&mainmutex);
+ pthread_join(*workerThreadId, NULL);
+ }
+ // close writing snaplog, if snaplog recording is enabled
+ if (snaplogenabled) {
+ web100_log_close_write(snapArgs_ptr->log);
+ }
+
+ web100_snapshot_free(snapArgs_ptr->snap);
+
+}
+
+/**
+ * Start packet tracing for this client
+ * @param socketfd
+ * @param socketfdarg
+ * todo: change testindicatorarg to const instead pf str compare, in web100-pcap too
+ *
+ * */
+
+void start_packet_trace (int socketfdarg, int socketfdarg2, pid_t *childpid, int *imonarg,
+ struct sockaddr *cliaddrarg, socklen_t clilenarg, char* device, PortPair* pairarg, const char* testindicatorarg, int iscompressionenabled, char *copylocationarg) {
+
+ char tmpstr[256];
+ int i, readretval;
+
+ I2Addr src_addr = I2AddrByLocalSockFD(get_errhandle(), socketfdarg,
0);
+
+ if ((*childpid = fork()) == 0) {
+
+ close(socketfdarg2);
+ close(socketfdarg);
+ log_println(0, "%s test Child thinks pipe() returned fd0=%d, fd1=%d", testindicatorarg, imonarg[0], imonarg[1]);
+
+ init_pkttrace(src_addr, cliaddrarg, clilenarg,
+ imonarg, device, &pairarg,
+ testindicatorarg, iscompressionenabled);
+
+ exit(0); // Packet trace finished, terminate gracefully
+ }
+ memset(tmpstr, 0, 256);
+ for (i=0; i< 5; i++) { // read nettrace file name into "tmpstr"
+ readretval = read(imonarg[0], tmpstr, 128);
+ if ((readretval == -1) && (errno == EINTR)) // socket interrupted, try reading again
+ continue;
+ break;
+ }
+
+ // name of nettrace file passed back from pcap child copied into meta structure
+ if (strlen(tmpstr) > 5) {
+ memcpy(copylocationarg, tmpstr, strlen(tmpstr));
+ log_println(0, "**memcopied dir name %s", tmpstr );
+ }
+
+ //free this address now. Lets see if this works alright
+ I2AddrFree(src_addr);
+
+}
+
+/**
+ * Stop packet tracing activity.
+ * @param monpipe_arr pointer to the monitor pipe file-descriptor array
+ *
+ * */
+void stop_packet_trace(int *monpipe_arr) {
+ int retval;
+ int i;
+
+ for (i=0; i<RETRY_COUNT; i++) {
+ retval = write(mon_pipe1[1], "c", 1);
+ if (retval == 1)
+ break;
+ if ((retval == -1) && (errno == EINTR))
+ continue;
+ }
+ close(monpipe_arr[0]);
+ close(monpipe_arr[1]);
+
+}
+
+
+/**
+ * Set Cwnd limit
+ * @param connarg web100_connection pointer
+ * @param group_arg web100 group pointer
+ * @param agentarg web100 agent pointer
+ * */
+void setCwndlimit(web100_connection* connarg, web100_group* grouparg, web100_agent* agentarg, Options* optionsarg) {
+
+ web100_var *LimRwin, *yar;
+ u_int32_t limrwin_val;
+ char yuff[32];
+
+ if (optionsarg->limit > 0) {
+ log_print(1, "Setting Cwnd limit - ");
+
+ if (connarg != NULL) {
+ log_println(1, "Got web100 connection pointer for recvsfd
socket\n");
+ web100_agent_find_var_and_group(agentarg, "CurMSS",
&grouparg, &yar);
+ web100_raw_read(yar, connarg, yuff);
+ log_println(1, "MSS = %s, multiplication factor = %d",
+ web100_value_to_text(web100_get_var_type(yar), yuff), optionsarg->limit);
+ limrwin_val = optionsarg->limit * (atoi(web100_value_to_text(web100_get_var_type(yar), yuff)));
+ web100_agent_find_var_and_group(agentarg, "LimRwin", &grouparg, &LimRwin);
+ log_print(1, "now write %d to limit the Receive
window", limrwin_val);
+ web100_raw_write(LimRwin, connarg, &limrwin_val);
+ log_println(1, " --- Done");
+ }
+
+ }
+}
+
+/**
+ * Check if receiver is clogged and use decision to temporarily
+ * stop sending packets.
+ * @param nextseqtosend integer indicating the Next Sequence Number To Be Sent
+ * @param lastunackedseq integer indicating the oldest un-acked sequence number
+ * @return integer indicating whether buffer is clogged
+ * */
+int is_buffer_clogged(int nextseqtosend, int lastunackedseq) {
+ int recclog = 0;
+ if ( (RECLTH<<2) < (nextseqtosend - lastunackedseq - 1) ) {
+ recclog = 1;
+ }
+ return recclog;
+}
+
+
+
+/**
+ * Perform the Middlebox test. This is a brief throughput test from the Server to the Client
+ * with a limited CWND to check for a duplex mismatch condition.
+ * This test also uses a pre-defined MSS value to check if any intermediate node
+ * is modifying connection settings.
+ *
* @param ctlsockfd Client control socket descriptor
* @param agent Web100 agent used to track the connection
* @param options Test options
@@ -289,7 +484,7 @@
* -1 - Listener socket creation failed
* -3 - web100 connection data not obtained
* -100 - timeout while waiting for client to connect to server's ephemeral port
- * -errono- Other specific socket error numbers
+ * -errno- Other specific socket error numbers
* -101 - Retries exceeded while waiting for
client to connect
* -102 - Retries exceeded while waiting for
data from connected client
* Other used return codes:
@@ -302,24 +497,24 @@
int
test_mid(int ctlsockfd, web100_agent* agent, TestOptions* options, int conn_options, double* s2c_throughput_mid)
{
- //int maxseg=1456;
int maxseg = ETHERNET_MTU_SIZE;
/* int maxseg=1456, largewin=16*1024*1024; */
/* int seg_size, win_size; */
- int midsfd; // socket file-descriptor, used in throughput test from S->C
- int j, msgretvalue;
+ int midsfd; // socket file-descriptor, used in mid-box throughput test from S->C
+ int j; // temporary
integer store
+ int msgretvalue; // return value from
socket read/writes
struct sockaddr_storage cli_addr;
/* socklen_t optlen, clilen; */
socklen_t clilen;
- char buff[BUFFSIZE+1];
- I2Addr midsrv_addr = NULL;
- char listenmidport[10]; // listener socket for middlebox tests
+ char buff[BUFFSIZE+1]; // buf used for message
payload
+ I2Addr midsrv_addr = NULL; // server address
+ char listenmidport[10]; // listener socket for middlebox tests
int msgType;
int msgLen;
web100_connection* conn;
- char tmpstr[256];
- struct timeval sel_tv;
- fd_set rfd;
+ char tmpstr[256]; // temporary string
storage
+ struct timeval sel_tv; // time
+ fd_set rfd; // receiver file
descriptor

// variables used for protocol validation logging
enum TEST_ID thistestId = NONE;
@@ -524,54 +719,33 @@
return 0;
}

-/**
- * Set Cwnd limit
- * @param connarg web100_connection pointer
- * @param group_arg web100 group pointer
- * @param agentarg web100 agent pointer
- * */
-void setCwndlimit(web100_connection* connarg, web100_group* grouparg, web100_agent* agentarg, Options* optionsarg) {
-
- web100_var *LimRwin, *yar;
- u_int32_t limrwin_val;
- char yuff[32];
-
- if (optionsarg->limit > 0) {
- log_print(1, "Setting Cwnd limit - ");
-
- if (connarg != NULL) {
- log_println(1, "Got web100 connection pointer for recvsfd
socket\n");
- web100_agent_find_var_and_group(agentarg, "CurMSS",
&grouparg, &yar);
- web100_raw_read(yar, connarg, yuff);
- log_println(1, "MSS = %s, multiplication factor = %d",
- web100_value_to_text(web100_get_var_type(yar), yuff), optionsarg->limit);
- limrwin_val = optionsarg->limit * (atoi(web100_value_to_text(web100_get_var_type(yar), yuff)));
- web100_agent_find_var_and_group(agentarg, "LimRwin", &grouparg, &LimRwin);
- log_print(1, "now write %d to limit the Receive
window", limrwin_val);
- web100_raw_write(LimRwin, connarg, &limrwin_val);
- log_println(1, " --- Done");
- }
-
- }
-
-
-}
+

/**
- * Perform the C2S Throughput test.
+ * Perform the C2S Throughput test. This test intends to measure throughput
+ * from the client to the server by performing a 10 seconds memory-to-memory data transfer.
+ *
+ * Protocol messages are exchanged between the Client and the Server using the same
+ * connection and message format as the NDTP-Control protocol.Throughput packets are
+ * sent on the new connection and do not follow the NDTP-Control protocol message format.
+ *
+ * When the Client stops streaming the test data (or the server test routine times out),
+ * the Server sends the Client its calculated throughput value.
+ *
* @param ctlsockfd Client control socket descriptor
* @param agent Web100 agent used to track the connection
* @param testOptions Test options
+ * @param conn_options connection options
* @param c2sspd In-out parameter to store C2S throughput value
- * @param set_buff todo
- * @param window todo
- * @param autotune todo
- * @param device string todo
+ * @param set_buff enable setting TCP send/recv buffer size to be used (seems unused in file)
+ * @param window value of TCP send/rcv buffer size intended to be used.
+ * @param autotune autotuning option. Deprecated.
+ * @param device string devine name inout parameter
* @param options Test Option variables
- * @param record_reverse integer *
- * @param count_vars
- * @param spds[] [] todo
- * @param spd_index todo
+ * @param record_reverse integer indicating whether receiver-side statistics have to be logged
+ * @param count_vars count of web100 variables
+ * @param spds[] [] speed check array
+ * @param spd_index index used for speed check array
* @param conn_options Connection options
* @return 0 - success,
* >0 - error code
@@ -590,33 +764,41 @@
int record_reverse, int count_vars, char spds[4][256], int* spd_index)
{
/* int largewin=16*1024*1024; */
- int recvsfd; // receiver socket file descriptor
- pid_t mon_pid1 = 0; // child process pids
- int ret, n, i, j;
+ int recvsfd; // receiver
socket file descriptor
+ pid_t c2s_childpid = 0; // child process pids
+ int msgretvalue, tmpbytecount; // used during the "read"/"write"
process
+ int i, j; // used as
loop iterators
/* int seg_size, win_size; */
struct sockaddr_storage cli_addr;
/* socklen_t optlen, clilen; */
socklen_t clilen;
- char tmpstr[256];
- double t, bytes=0;
- struct timeval sel_tv;
- fd_set rfd;
- char buff[BUFFSIZE+1];
- PortPair pair;
- I2Addr c2ssrv_addr=NULL, src_addr=NULL;
- char listenc2sport[10];
+ char tmpstr[256]; // string array used for all sorts of temp storage purposes
+ double tmptime; // time indicator
+ double bytes_read=0; // number of bytes read during the throughput tests
+ struct timeval sel_tv; // time
+ fd_set rfd; // receive file
descriptor
+ char buff[BUFFSIZE+1]; // message "payload" buffer
+ PortPair pair; // socket ports
+ I2Addr c2ssrv_addr=NULL; // c2s test's server address
+ //I2Addr src_addr=NULL; // c2s test source address
+ char listenc2sport[10]; // listening port
pthread_t workerThreadId;

- // moving to comment out soon-to-be unused variables
- FILE *fp;
- size_t nameBufLen = 255;
- DIR *dp;
- char isoTime[64];
- char namebuf[256], dir[128];
-
+ // comment out unused variables
+ //FILE *fp;
+ //DIR *dp;
+ //char isoTime[64];
+ //char dir[128];
+
+ // server address stores
+ //size_t nameBufLen = 255;
+ //char namebuf[256];
+
+ // web_100 related variables
web100_group* group;
web100_connection* conn;

+ // snap related variables
SnapArgs snapArgs;
snapArgs.snap = NULL;
snapArgs.log = NULL;
@@ -628,6 +810,7 @@
enum TEST_STATUS_INT teststatuses = TEST_NOT_STARTED;
enum PROCESS_STATUS_INT procstatusenum = UNKNOWN;
enum PROCESS_TYPE_INT proctypeenum = CONNECT_TYPE;
+ char namesuffix[256] = "c2s_snaplog";


if (testOptions->c2sopt) {
@@ -685,8 +868,8 @@
sprintf(buff, "%d", testOptions->c2ssockport);

// send TEST_PREPARE message with ephemeral port detail, indicating start of tests
- if ((ret = send_msg(ctlsockfd, TEST_PREPARE, buff, strlen(buff))) < 0)
- return ret;
+ if ((msgretvalue = send_msg(ctlsockfd, TEST_PREPARE, buff, strlen(buff))) < 0)
+ return msgretvalue;

// Wait on listening socket and read data once ready.
// Retry 5 times, waiting for activity on the socket
@@ -696,16 +879,16 @@
FD_SET(testOptions->c2ssockfd, &rfd);
sel_tv.tv_sec = 5;
sel_tv.tv_usec = 0;
- //for (j=0; j<5; j++) { //todo 5 constant decl
+
for (j=0; j<RETRY_COUNT; j++) {
- ret = select((testOptions->c2ssockfd)+1, &rfd, NULL, NULL, &sel_tv);
- if ((ret == -1) && (errno == EINTR)) // socket interrupted. continue waiting for activity on socket
+ msgretvalue = select((testOptions->c2ssockfd)+1, &rfd, NULL, NULL, &sel_tv);
+ if ((msgretvalue == -1) && (errno == EINTR)) // socket interrupted. continue waiting for activity on socket
continue;
- if (ret == 0) // timeout
+ if (msgretvalue == 0) // timeout
return -SOCKET_CONNECT_TIMEOUT;
- if (ret < 0) // other socket errors. exit
+ if (msgretvalue < 0) // other socket errors. exit
return -errno;
- if (j == 4) // retry exceeded. exit
+ if (j == (RETRY_COUNT - 1)) // retry exceeded. exit
return -RETRY_EXCEEDED_WAITING_CONNECT;
recfd:

@@ -714,6 +897,7 @@
recvsfd = accept(testOptions->c2ssockfd, (struct sockaddr *) &cli_addr, &clilen);
if (recvsfd > 0) {
log_println(6, "accept() for %d completed", testOptions->child0);
+
// log protocol validation indicating client accept
procstatusenum = PROCESS_STARTED;
proctypeenum = CONNECT_TYPE;
@@ -727,9 +911,10 @@
}
log_println(6, "------- C2S connection setup for %d returned because (%d)",
testOptions->child0, errno);
- if (recvsfd < 0) // other socket errors, quit
- return -errno;
- if (j == 4) { // retry exceeded, quit
+ if (recvsfd < 0)
{ // other socket errors, quit
+ return -errno;
+ }
+ if (j == (RETRY_COUNT - 1)) { // retry exceeded, quit
log_println(6, "c2s child %d, uable to open connection, return from test", testOptions->child0);
return RETRY_EXCEEDED_WAITING_DATA;
}
@@ -737,135 +922,43 @@

// Get address associated with the throughput test. Used for packet tracing
log_println(6, "child %d - c2s ready for test with fd=%d", testOptions->child0, recvsfd);
- src_addr = I2AddrByLocalSockFD(get_errhandle(), recvsfd, 0);
+
+ // commenting out below to move to init_pkttrace function
+ // src_addr = I2AddrByLocalSockFD(get_errhandle(), recvsfd, 0);

// Get web100 connection. Used to collect web100 variable statistics
conn = web100_connection_from_socket(agent, recvsfd);

// set up packet tracing. Collected data is used for bottleneck link calculations
if (getuid() == 0) {
- pipe(mon_pipe1);
- if ((mon_pid1 = fork()) == 0) {
- /* close(ctlsockfd); */
- close(testOptions->c2ssockfd);
- close(recvsfd);
- log_println(5, "C2S test Child %d thinks pipe() returned fd0=%d, fd1=%d",
- testOptions->child0,mon_pipe1[0], mon_pipe1[1]);
- /* log_println(2, "C2S test calling init_pkttrace() with pd=0x%x", (int) &cli_addr); */
- init_pkttrace(src_addr, (struct sockaddr *) &cli_addr, clilen, mon_pipe1,
- device, &pair, "c2s", options->compress);
- exit(0); /* Packet trace finished, terminate gracefully */
- }
-
- // Get data collected from packet tracing into the C2S "ndttrace" file
- memset(tmpstr, 0, 256);
- for (i=0; i< 5; i++) {
- ret = read(mon_pipe1[0], tmpstr, 128);
- if ((ret == -1) && (errno == EINTR))
- continue;
- break;
- }
- if (strlen(tmpstr) > 5)
- memcpy(meta.c2s_ndttrace, tmpstr, strlen(tmpstr));
- //name of nettrace file passed back from pcap child
- }
+
+ pipe(mon_pipe1);
+ log_println(0, "C2S test calling pkt_trace_start() with pd=%d", clilen);
+ start_packet_trace (recvsfd, testOptions->c2ssockfd, &c2s_childpid, mon_pipe1,
+ (struct sockaddr *) &cli_addr, clilen, device, &pair, "c2s", options->compress, meta.c2s_ndttrace) ;
+ }
+

log_println(5, "C2S test Parent thinks pipe() returned fd0=%d, fd1=%d", mon_pipe1[0], mon_pipe1[1]);

- /* Check, and if needed, set the web100 autotuning function on. This improves
- * performance without requiring the entire system be re-configured. Returned
- * value is true if set is done, so admin knows if system default should be changed
- *
- * 11/22/04, the sBufMode and rBufMode per connection autotuning variables are being
- * depricated, so this is another attempt at making this work. If window scaling is
- * enabled, then scale the buffer size to the correct value. This will also help
- * when reporting the buffer / RTT limit.
- */
-
- /* if (autotune > 0)
- * web100_setbuff(recvsfd, agent, conn, autotune);
- */
-
- /* ok, We are now read to start the throughput tests. First
- * the client will stream data to the server for 10 seconds.
- * Data will be processed by the read loop below.
+ // experimental code, delete when finished
+ setCwndlimit(conn, group, agent,options);
+
+ // Create C->S snaplog directories, and perform some initialization based on options
+
+ /*
+ I2Addr sockAddr = I2AddrBySAddr(get_errhandle(), (struct sockaddr *) &cli_addr, clilen, 0, 0);
+ memset(namebuf, 0, 256);
+ I2AddrNodeName(sockAddr, namebuf, &nameBufLen);
+ //createDir(namebuf, I2AddrPort(sockAddr),options->c2s_logname, namesuffix);
+ *
*/
-
- // experimental code, delete when finished
-
- /*
- {
- web100_var *LimRwin, *yar;
- u_int32_t limrwin_val;
- char yuff[32];
-
- if (options->limit > 0) {
- log_print(1, "Setting Cwnd limit - ");
-
- if (conn != NULL) {
- log_println(1, "Got web100 connection pointer for recvsfd socket\n");
- web100_agent_find_var_and_group(agent, "CurMSS", &group, &yar);
- web100_raw_read(yar, conn, yuff);
- log_println(1, "MSS = %s, multiplication factor = %d",
- web100_value_to_text(web100_get_var_type(yar), yuff), options->limit);
- limrwin_val = options->limit * (atoi(web100_value_to_text(web100_get_var_type(yar), yuff)));
- web100_agent_find_var_and_group(agent, "LimRwin", &group, &LimRwin);
- log_print(1, "now write %d to limit the Receive window", limrwin_val);
- web100_raw_write(LimRwin, conn, &limrwin_val);
- log_println(1, " --- Done");
- }
-
- }
- }
- */
- /* End of test code */
-
- // Create C->S log directories, and open the file for writing, if snaplog option is enabled
- {
- I2Addr sockAddr = I2AddrBySAddr(get_errhandle(), (struct sockaddr *) &cli_addr, clilen, 0, 0);
- memset(namebuf, 0, 256);
- I2AddrNodeName(sockAddr, namebuf, &nameBufLen);
- strncpy(options->c2s_logname, DataDirName, strlen(DataDirName));
- if ((dp = opendir(options->c2s_logname)) == NULL && errno == ENOENT)
- mkdir(options->c2s_logname, 0755);
- closedir(dp);
- get_YYYY(dir);
- strncat(options->c2s_logname, dir, 4);
- if ((dp = opendir(options->c2s_logname)) == NULL && errno == ENOENT)
- mkdir(options->c2s_logname, 0755);
- closedir(dp);
- strncat(options->c2s_logname, "/", 1);
- get_MM(dir);
- strncat(options->c2s_logname, dir, 2);
- if ((dp = opendir(options->c2s_logname)) == NULL && errno == ENOENT)
- mkdir(options->c2s_logname, 0755);
- closedir(dp);
- strncat(options->c2s_logname, "/", 1);
- get_DD(dir);
- strncat(options->c2s_logname, dir, 2);
- if ((dp = opendir(options->c2s_logname)) == NULL && errno == ENOENT)
- mkdir(options->c2s_logname, 0755);
- closedir(dp);
- strncat(options->c2s_logname, "/", 1);
- sprintf(dir, "%s_%s:%d.c2s_snaplog", get_ISOtime(isoTime), namebuf, I2AddrPort(sockAddr));
- strncat(options->c2s_logname, dir, strlen(dir));
- group = web100_group_find(agent, "read");
- snapArgs.snap = web100_snapshot_alloc(group, conn);
- I2AddrFree(sockAddr);
- if (options->snaplog) {
- memcpy(meta.c2s_snaplog, dir, strlen(dir));
- fp = fopen(get_logfile(),"a");
- snapArgs.log = web100_log_open_write(options->c2s_logname, conn, group);
- if (fp == NULL) {
- log_println(0, "Unable to open log file '%s', continuing on without logging", get_logfile());
- }
- else {
- log_println(1, "c2s_snaplog file: %s\n", options->c2s_logname);
- fprintf(fp, "c2s_snaplog file: %s\n", options->c2s_logname);
- fclose(fp);
- }
- }
- } //end creating snaplog dirs and opening logfile
+ createDir((struct sockaddr *) &cli_addr, clilen,options->c2s_logname, namesuffix);
+
+ //group = web100_group_find(agent, "read");
+ //snapArgs.snap = web100_snapshot_alloc(group, conn);
+
+ //I2AddrFree(sockAddr);

sleep(2);

@@ -874,68 +967,49 @@
/* alarm(30); */ /* reset alarm() again, this 10 sec test should finish before this signal
* is generated. */

- // write into snaplog file, based on options. Lock/release web10 log file as necessary
- {
- WorkerArgs workerArgs;
- workerArgs.snapArgs = &snapArgs;
- workerArgs.agent = agent;
- workerArgs.peaks = NULL;
- workerArgs.writeSnap = options->snaplog;
- if (pthread_create(&workerThreadId, NULL, snapWorker, (void*) &workerArgs)) {
- log_println(0, "Cannot create worker thread for writing snap log!");
- workerThreadId = 0;
- }
-
- pthread_mutex_lock(&mainmutex);
- workerLoop = 1;
- web100_snap(snapArgs.snap);
- if (options->snaplog) {
- web100_log_write(snapArgs.log, snapArgs.snap);
- }
- pthread_cond_wait(&maincond, &mainmutex);
- pthread_mutex_unlock(&mainmutex);
+ // If snaplog recording is enabled, update meta file to indicate the same
+ // and proceed to get snapshot and log it. For the C2S test, the
+ // snapshot is not needed any further, and hence it is valid to move the
+ // obtaining of a snapshot based on options->snaplog!
+ if (options->snaplog) {
+ memcpy(meta.c2s_snaplog, namesuffix, strlen(namesuffix));
+ // somewhat a hack - meta file stores names without the full directory
+ // but fopen needs full path
+ start_snap_worker (&snapArgs, agent, options->snaplog, &workerLoop,
+ &workerThreadId,
+ meta.c2s_snaplog, options->c2s_logname, conn, group);
}

// Wait on listening socket and read data once ready.
- t = secs();
+ tmptime = secs();
sel_tv.tv_sec = 11; // time out after 11 seconds
sel_tv.tv_usec = 0;
FD_ZERO(&rfd);
FD_SET(recvsfd, &rfd);
for (;;) {
- ret = select(recvsfd+1, &rfd, NULL, NULL, &sel_tv);
- if ((ret == -1) && (errno == EINTR)) {
+ msgretvalue = select(recvsfd+1, &rfd, NULL, NULL, &sel_tv);
+ if ((msgretvalue == -1) && (errno == EINTR)) {
// socket interrupted. Continue waiting for activity on socket
continue;
}
- if (ret > 0) { // read from socket
- n = read(recvsfd, buff, sizeof(buff));
- if ((n == -1) && (errno == EINTR)) // read interrupted,continue
waiting
+ if (msgretvalue > 0) { // read from socket
+ tmpbytecount = read(recvsfd, buff, sizeof(buff));
+ if ((tmpbytecount == -1) && (errno == EINTR)) // read interrupted,continue waiting
continue;
- if (n == 0) // all data has been read
+ if (tmpbytecount == 0) // all data has been read
break;
- bytes += n; // data byte count has to be increased
+ bytes_read += tmpbytecount; // data byte count has to be increased
continue;
}
break;
}

- t = secs()-t;
- // throughput in kilo bits per sec =
- // (Number of transmitted bytes * 8) / (time
duration)*(1000)
- *c2sspd = (8.e-3 * bytes) / t;
-
- //calculated and assigned the value of the c->s value. hence release resources
- if (workerThreadId) {
- pthread_mutex_lock(&mainmutex);
- workerLoop = 0;
- pthread_mutex_unlock(&mainmutex);
- pthread_join(workerThreadId, NULL);
- }
- // close writing snaplog, if snaplog recording is enabled
- if (options->snaplog) {
- web100_log_close_write(snapArgs.log);
- }
+ tmptime = secs()-tmptime;
+ // throughput in kilo bits per sec = (transmitted_byte_count * 8) / (time_duration)*(1000)
+ *c2sspd = (8.e-3 * bytes_read) / tmptime;
+
+ // c->s throuput value calculated and assigned ! Release resources, conclude snap writing.
+ stop_snap_worker(&workerThreadId,options->snaplog, &snapArgs);

// send the server calculated value of C->S throughput as result to client
sprintf(buff, "%6.0f kbps outbound for child %d", *c2sspd, testOptions->child0);
@@ -943,9 +1017,7 @@
sprintf(buff, "%0.0f", *c2sspd);
send_msg(ctlsockfd, TEST_MSG, buff, strlen(buff));

-
// get receiver side Web100 stats and write them to the log file. close sockets
-
if (record_reverse == 1)
web100_get_data_recv(recvsfd, agent, conn, count_vars);
/* shutdown(recvsfd, SHUT_RD); */
@@ -956,9 +1028,9 @@
// Skip this step if speed-chk isn't running.

if (getuid() == 0) {
- log_println(1, "Signal USR1(%d) sent to child [%d]", SIGUSR1, mon_pid1);
- testOptions->child1 = mon_pid1;
- kill(mon_pid1, SIGUSR1);
+ log_println(1, "Signal USR1(%d) sent to child [%d]", SIGUSR1, c2s_childpid);
+ testOptions->child1 = c2s_childpid;
+ kill(c2s_childpid, SIGUSR1);
FD_ZERO(&rfd);
FD_SET(mon_pipe1[0], &rfd);
sel_tv.tv_sec = 1;
@@ -966,11 +1038,11 @@
i = 0;

for (;;) {
- ret = select(mon_pipe1[0]+1, &rfd, NULL, NULL, &sel_tv);
- if ((ret == -1) && (errno == EINTR))
+ msgretvalue = select(mon_pipe1[0]+1, &rfd, NULL, NULL, &sel_tv);
+ if ((msgretvalue == -1) && (errno == EINTR))
continue;
- if (((ret == -1) && (errno != EINTR)) ||(ret == 0)) {
- log_println(4, "Failed to read pkt-pair data from C2S flow, retcode=%d, reason=%d", ret, errno);
+ if (((msgretvalue == -1) && (errno != EINTR)) ||(msgretvalue == 0)) {
+ log_println(4, "Failed to read pkt-pair data from C2S flow, retcode=%d, reason=%d", msgretvalue, errno);
sprintf(spds[(*spd_index)++], " -1 -1 -1 -1 -1 -1 -1 -1 -1 -1 -1 -1 0.0 0 0 0 0 0 -1");
sprintf(spds[(*spd_index)++], " -1 -1 -1 -1 -1 -1 -1 -1 -1 -1 -1 -1 0.0 0 0 0 0 0 -1");
break;
@@ -979,10 +1051,10 @@
* just skip the read and go on
* RAC 2/8/10
*/
- if (ret > 0) {
- if ((ret = read(mon_pipe1[0], spds[*spd_index], 128)) < 0)
+ if (msgretvalue > 0) {
+ if ((msgretvalue = read(mon_pipe1[0], spds[*spd_index], 128)) < 0)
sprintf(spds[*spd_index], " -1 -1 -1 -1 -1 -1 -1 -1 -1 -1 -1 -1 0.0 0 0 0 0 0 -1");
- log_println(1, "%d bytes read '%s' from C2S monitor pipe", ret, spds[*spd_index]);
+ log_println(1, "%d bytes read '%s' from C2S monitor pipe", msgretvalue, spds[*spd_index]);
(*spd_index)++;
if (i++ == 1)
break;
@@ -1005,15 +1077,7 @@

// Close opened resources for packet capture
if (getuid() == 0) {
- for (i=0; i<5; i++) {
- ret = write(mon_pipe1[1], "c", 1);
- if (ret == 1)
- break;
- if ((ret == -1) && (errno == EINTR))
- continue;
- }
- close(mon_pipe1[0]);
- close(mon_pipe1[1]);
+ stop_packet_trace(mon_pipe1);
}

// log end of C->S test
@@ -1027,24 +1091,45 @@
//set current test status and free address
setCurrentTest(TEST_NONE);
/* I2AddrFree(c2ssrv_addr); */
- I2AddrFree(src_addr);
- /* testOptions->child1 = mon_pid1; */
- }
- /* I2AddrFree(c2ssrv_addr); */
- /* I2AddrFree(src_addr); */
- /* testOptions->child1 = mon_pid1; */
+ //I2AddrFree(src_addr); //commenting out now
+ /* testOptions->child1 = c2s_childpid; */
+ }
+
return 0;
}

/**
- * Perform the S2C Throughput test.
- * Arguments: ctlsockfd - the client control socket descriptor
- * agent - the Web100 agent used to track the connection
- * testOptions - the test options
- * conn_options - the connection options
- * Returns: 0 - success,
- * >0 - error code.
- * Error codes:
+ * Perform the S2C Throughput test. This throughput test tests the achievable
+ * network bandwidth from the Server to the Client by performing a 10 seconds
+ * memory-to-memory data transfer.
+ *
+ * The Server also collects web100 data variables, that are sent to the Client
+ * at the end of the test session.
+ *
+ * Protocol messages exchanged between the Client and Server
+ * are sent using the same connection and message format as the NDTP-Control protocol.
+ * The throughput packets are sent on the new connection, though, and do not
+ * follow the NDTP-Control protocol message format.
+ *
+ * @param ctlsockfd - the client control socket descriptor
+ * @param agent - the Web100 agent used to track the connection
+ * @param testOptions - the test options
+ * @param conn_options - the connection options
+ * @param testOptions Test options
+ * @param s2cspd In-out parameter to store S2C throughput value
+ * @param set_buff enable setting TCP send/recv buffer size to be used (seems unused in file)
+ * @param window value of TCP send/rcv buffer size intended to be used.
+ * @param autotune autotuning option. Deprecated.
+ * @param device string device name inout parameter
+ * @param options Test Option variables
+ * @param spds[] [] speed check array
+ * @param spd_index index used for speed check array
+ * @param count_vars count of web100 variables
+ * @param peaks Cwnd peaks structure pointer
+ *
+ * @return 0 - success,
+ * >0 - error code.
+ * Error codes:
* -1 - Message reception errors/inconsistencies in client's final message, or Listener socket creation failed or cannot write message header information while attempting to send
* TEST_PREPARE message
* -2 - Cannot write message data while
attempting to send
@@ -1062,32 +1147,38 @@
int* spd_index, int count_vars, CwndPeaks* peaks)
{
/* int largewin=16*1024*1024; */
- int ret, j, k, n;
- int xmitsfd;
- pid_t mon_pid2 = 0;
+ int ret; // ctrl
protocol read/write return status
+ int j, k, n;
+ int xmitsfd; // transmit (i.e
server) socket fd
+ pid_t s2c_childpid = 0; // s2c_childpid
/* int seg_size, win_size; */
- char tmpstr[256];
+ char tmpstr[256]; // string array used
for temp storage of many char*
struct sockaddr_storage cli_addr;
/* socklen_t optlen, clilen; */
socklen_t clilen;
- double bytes, s, t;
- double x2cspd;
- struct timeval sel_tv;
- fd_set rfd;
- char buff[BUFFSIZE+1];
- int c3=0, i; //todo what is c3?
- PortPair pair;
- I2Addr s2csrv_addr=NULL, src_addr=NULL;
+ double bytes_written; // bytes written in
the throughput test
+ double tx_duration; // total time for which data
was txed
+ double tmptime; // temporary time
store
+ double x2cspd; // s->c test throughput
+ struct timeval sel_tv; // time
+ fd_set rfd; // receive file
descriptor
+ char buff[BUFFSIZE+1]; // message payload buffer
+ int bufctrlattempts=0; // number of buffer control
attempts
+ int i; // temporary
var used for iterators etc
+ PortPair pair; // socket ports
+ I2Addr s2csrv_addr=NULL;
char listens2cport[10];
int msgType;
int msgLen;
int sndqueue;
struct sigaction new, old;
- char namebuf[256], dir[126];
- char isoTime[64];
- size_t nameBufLen = 255;
- DIR *dp;
- FILE *fp;
+ //char namebuf[256];
+ //dir[126];
+ //char isoTime[64];
+ //size_t nameBufLen = 255;
+ //DIR *dp;
+ //FILE *fp;
+ //I2Addr src_addr=NULL;

/* experimental code to capture and log multiple copies of the
* web100 variables using the web100_snap() & log() functions.
@@ -1100,14 +1191,15 @@
web100_connection* conn;
web100_var* var;
pthread_t workerThreadId;
- int SndMax=0, SndUna=0;
- int c1=0, c2=0; // sent data attempt queue, Draining Queue. TODO name appr
+ int nextseqtosend=0, lastunackedseq=0;
+ int drainingqueuecount=0, bufctlrnewdata=0; // sent data attempt queue, Draining Queue.

// variables used for protocol validation logs
enum TEST_STATUS_INT teststatuses = TEST_NOT_STARTED;
enum TEST_ID testids = S2C;
enum PROCESS_STATUS_INT procstatusenum = UNKNOWN;
enum PROCESS_TYPE_INT proctypeenum = CONNECT_TYPE;
+ char snaplogsuffix[256] = "s2c_snaplog";

SnapArgs snapArgs;
snapArgs.snap = NULL;
@@ -1172,25 +1264,6 @@
pair.port1 = -1;
pair.port2 = testOptions->s2csockport;

-/*
- if (set_buff > 0) {
- setsockopt(testOptions->s2csockfd, SOL_SOCKET, SO_SNDBUF, &window, sizeof(window));
- setsockopt(testOptions->s2csockfd, SOL_SOCKET, SO_RCVBUF, &window, sizeof(window));
- if (get_debuglvl() > 1) {
- optlen = sizeof(seg_size);
- getsockopt(testOptions->s2csockfd, SOL_TCP, TCP_MAXSEG, &seg_size, &optlen);
- getsockopt(testOptions->s2csockfd, SOL_SOCKET, SO_RCVBUF, &win_size, &optlen);
- log_println(2, "Set MSS to %d, Receiving Window size set to %dKB", seg_size, win_size);
- getsockopt(testOptions->s2csockfd, SOL_SOCKET, SO_SNDBUF, &win_size, &optlen);
- log_println(2, "Sending Window size set to %dKB", win_size);
- }
- }
-*/
- /* if (autotune > 0) {
- * setsockopt(testOptions->s2csockfd, SOL_SOCKET, SO_SNDBUF, &largewin, sizeof(largewin));
- * setsockopt(testOptions->s2csockfd, SOL_SOCKET, SO_RCVBUF, &largewin, sizeof(largewin));
- * }
- */

// Data received from speed-chk. Send TEST_PREPARE "GO" signal with port number
sprintf(buff, "%d", testOptions->s2csockport);
@@ -1213,18 +1286,18 @@
clilen = sizeof(cli_addr);
FD_ZERO(&rfd);
FD_SET(testOptions->c2ssockfd, &rfd);
- sel_tv.tv_sec = 5;
+ sel_tv.tv_sec = 5; // wait for 5 secs
sel_tv.tv_usec = 0;
- for (j=0; j<5; j++) {
+ for (j=0; j<RETRY_COUNT; j++) {
ret = select((testOptions->s2csockfd)+1, &rfd, NULL, NULL, &sel_tv);
if ((ret == -1) && (errno == EINTR))
continue;
if (ret == 0)
- return -100; // timeout
+ return SOCKET_CONNECT_TIMEOUT; // timeout
if (ret < 0)
return -errno; // other socket errors. exit
if (j == 4)
***The diff for this file has been truncated for email.***
=======================================
--- /branches/kkumar_code_organize/src/web100srv.h Sun Oct 9 17:00:18
2011
+++ /branches/kkumar_code_organize/src/web100srv.h Thu Oct 13 10:32:49
2011
@@ -68,21 +68,23 @@
#define PORT3 "3003"
#define PORT4 "3003"

+// Congestion window peak information
typedef struct CwndPeaks {
- int min;
- int max;
- int amount;
+ int min; // trough of peak value
+ int max; // maximum peak value
+ int amount; // number of transitions between peaks
} CwndPeaks;

+// Options to run test with
typedef struct options {
- u_int32_t limit;
- int snapDelay;
- char avoidSndBlockUp; // flag set to indicate avoiding send buffer blocking in the S2C test
- char snaplog;
- char cwndDecrease;
- char s2c_logname[128];
- char c2s_logname[128];
- int compress;
+ u_int32_t limit; // used to calculate receive window limit
+ int snapDelay; // Frequency of snap log collection in milliseconds (i.e logged every snapDelay ms)
+ char avoidSndBlockUp; // flag set to indicate avoiding send buffer blocking in the S2C test
+ char snaplog; // enable collecting snap log
+ char cwndDecrease; // enable analysis of the cwnd changes (S2C test)
+ char s2c_logname[128]; // S2C log file name
+ char c2s_logname[128]; // C2S log file name
+ int compress; // enable compressing log files
} Options;

typedef struct portpair {


  • [ndt-dev] [ndt] r718 committed - modular funcstions to create log directories, start,stop snap logging ..., ndt, 10/13/2011

Archive powered by MHonArc 2.6.16.

Top of Page