Skip to Content.
Sympa Menu

ndt-dev - [ndt-dev] [ndt] r1161 committed - Added support for multiple ports (S2C)

Subject: NDT-DEV email list created

List archive

[ndt-dev] [ndt] r1161 committed - Added support for multiple ports (S2C)


Chronological Thread 
  • From:
  • To:
  • Subject: [ndt-dev] [ndt] r1161 committed - Added support for multiple ports (S2C)
  • Date: Thu, 13 Nov 2014 12:03:51 +0000

Revision: 1161
Author:

Date: Thu Nov 13 12:03:37 2014 UTC
Log: Added support for multiple ports (S2C)
https://code.google.com/p/ndt/source/detail?r=1161

Modified:
/branches/MultiplePorts/src/test_s2c_clt.c
/branches/MultiplePorts/src/test_s2c_srv.c

=======================================
--- /branches/MultiplePorts/src/test_s2c_clt.c Thu Nov 6 09:42:49 2014 UTC
+++ /branches/MultiplePorts/src/test_s2c_clt.c Thu Nov 13 12:03:37 2014 UTC
@@ -50,7 +50,6 @@
char buff[BUFFSIZE + 1];
int msgLen, msgType;
int s2cport = atoi(PORT3);
- I2Addr sec_addr = NULL;
I2Addr sec_addresses[7]; // server addresses per thread
int inlth, retcode, one = 1, set_size;
int inSocket[7]; // up to 7
@@ -67,9 +66,9 @@
#endif
int threadsnum = 1; // specify the number of threads (parallel TCP connections)
int activeThreads = 1;
- int i;
+ int i, j;
struct timeval sel_tv;
- fd_set rfd, tmpRfd;
+ fd_set rfd[7], tmpRfd[7];
char* ptr, *jsonMsgValue;

// variables used for protocol validation logs
@@ -149,16 +148,15 @@
optlen = sizeof(set_size);
getsockopt(ctlSocket, SOL_SOCKET, SO_SNDBUF, &set_size, &optlen);

- // get "address details" of the server using the host name
- if ((sec_addr = I2AddrByNode(get_errhandle(), host)) == NULL) {
- log_println(0, "Unable to resolve server address: %s", strerror(errno));
- return -3;
- }
- I2AddrSetPort(sec_addr, s2cport); // set port to value obtained from server
-
// Connect to the server; set socket options
for (i = 0; i < threadsnum; ++i) {
- sec_addresses[i] = I2AddrCopy(sec_addr);
+ // get "address details" of the server using the host name
+ if ((sec_addresses[i] = I2AddrByNode(get_errhandle(), host)) == NULL) {
+ log_println(0, "Unable to resolve server address: %s", strerror(errno));
+ return -3;
+ }
+ I2AddrSetPort(sec_addresses[i], s2cport + i); // set port to value obtained from server
+
if ((retcode = CreateConnectSocket(&inSocket[i], NULL, sec_addresses[i], conn_options, buf_size))) {
log_println(0, "Connect() for Server to Client failed (connection %d)", strerror(errno), i+1);
return -15;
@@ -199,15 +197,23 @@
#endif
sel_tv.tv_sec = testDuration + 5;
sel_tv.tv_usec = 5;
- FD_ZERO(&rfd);
+
activeThreads = threadsnum;
for (i = 0; i < threadsnum; i++) {
- FD_SET(inSocket[i], &rfd);
+ FD_ZERO(&rfd[i]);
+ FD_SET(inSocket[i], &rfd[i]);
}
// Read data sent by server as soon as it is available. Stop listening if timeout has been exceeded.
+ j = -1;
for (;;) {
- tmpRfd = rfd;
- retcode = select(inSocket[threadsnum-1]+1, &tmpRfd, NULL, NULL, &sel_tv);
+ j += 1;
+
+ if (j >= threadsnum) {
+ j = 0;
+ }
+
+ tmpRfd[j] = rfd[j];
+ retcode = select(inSocket[threadsnum-1]+1, &tmpRfd[j], NULL, NULL, &sel_tv);
if (secs() > t) {
log_println(5, "Receive test running long, break out of read loop");
break;
@@ -231,11 +237,11 @@
#endif
if (retcode > 0) {
for (i = 0; i < threadsnum; i++) {
- if (FD_ISSET(inSocket[i], &tmpRfd)) {
+ if (FD_ISSET(inSocket[i], &tmpRfd[i])) {
inlth = read(inSocket[i], buff, sizeof(buff));
if (inlth == 0) {
activeThreads--;
- FD_CLR(inSocket[i], &rfd);
+ FD_CLR(inSocket[i], &rfd[i]);
if (activeThreads == 0) {
goto breakOuterLoop;
}
@@ -328,8 +334,6 @@
else
printf("%0.2f Mb/s\n", spdin/1000);

- I2AddrFree(sec_addr);
-
// send TEST_MSG to server with the client-calculated throughput
snprintf(buff, sizeof(buff), "%0.0f", spdin);
#ifdef EXTTESTS_ENABLED
=======================================
--- /branches/MultiplePorts/src/test_s2c_srv.c Thu Nov 6 09:42:49 2014 UTC
+++ /branches/MultiplePorts/src/test_s2c_srv.c Thu Nov 13 12:03:37 2014 UTC
@@ -119,12 +119,12 @@
double testDuration = 10; // default test duration
double x2cspd; // s->c test throughput
struct timeval sel_tv; // time
- fd_set rfd; // receive file descriptor
+ fd_set rfd[7]; // receive file descriptor (up to 7)
char buff[BUFFSIZE + 1]; // message payload buffer
int bufctrlattempts = 0; // number of buffer control attempts
int i; // temporary var used for iterators etc
PortPair pair; // socket ports
- I2Addr s2csrv_addr = NULL;
+ I2Addr s2csrv_addr[7] = {NULL, NULL, NULL, NULL, NULL, NULL, NULL}; // s2c test's server address (up to 7)
I2Addr src_addr = NULL;
char listens2cport[10];
int msgType;
@@ -132,6 +132,8 @@
int sndqueue;
struct sigaction new, old;
char* jsonMsgValue;
+ long port;
+ int s2csockfd[7];

pthread_t workerThreadId;
int nextseqtosend = 0, lastunackedseq = 0;
@@ -179,49 +181,51 @@
strlcpy(listens2cport, "0", sizeof(listens2cport));
}

- // attempt to bind to a new port and obtain address structure with details
- // of listening port
- while (s2csrv_addr == NULL) {
- s2csrv_addr = CreateListenSocket(
- NULL,
- testOptions->multiple ?
- mrange_next(listens2cport, sizeof(listens2cport)) :
- listens2cport,
- conn_options, 0);
- if (s2csrv_addr == NULL) {
- /*
- log_println(1, " Calling KillHung() because s2csrv_address failed to bind");
- if (KillHung() == 0)
- continue;
- */
- }
- if (strcmp(listens2cport, "0") == 0) {
- log_println(0, "WARNING: ephemeral port number was bound");
- break;
- }
- if (testOptions->multiple == 0) {
- break;
- }
+#ifdef EXTTESTS_ENABLED
+ if (testOptions->exttestsopt) {
+ threadsNum = options->dthreadsnum;
+ testDuration = options->dduration / 1000.0;
}
- if (s2csrv_addr == NULL) {
- log_println(
+#endif
+
+ port = strtol(testOptions->multiple ? mrange_next(listens2cport, sizeof(listens2cport)) : listens2cport, NULL, 0);
+
+ for (i = 0; i < threadsNum; ++i) {
+ snprintf(listens2cport, sizeof(listens2cport), "%ld", port + i);
+
+ // attempt to bind to a new port and obtain address structure with details
+ // of listening port
+ while (s2csrv_addr[i] == NULL) {
+ s2csrv_addr[i] = CreateListenSocket(NULL, listens2cport, conn_options, 0);
+ if (strcmp(listens2cport, "0") == 0) {
+ log_println(0, "WARNING: ephemeral port number was bound");
+ break;
+ }
+ if (testOptions->multiple == 0) {
+ break;
+ }
+ }
+ if (s2csrv_addr[i] == NULL) {
+ log_println(
0,
"Server (S2C throughput test): CreateListenSocket failed: %s",
strerror(errno));
- snprintf(
- buff,
+ snprintf(buff,
sizeof(buff),
"Server (S2C throughput test): CreateListenSocket failed: %s",
strerror(errno));
- send_json_message(ctlsockfd, MSG_ERROR, buff, strlen(buff),
- testOptions->json_support, JSON_SINGLE_VALUE);
- return -1;
+ send_json_message(ctlsockfd, MSG_ERROR, buff, strlen(buff),
+ testOptions->json_support, JSON_SINGLE_VALUE);
+ return -1;
+ }
+
+ s2csockfd[i] = I2AddrFD(s2csrv_addr[i]);
}

// get socket FD and the ephemeral port number that client will connect to
// run tests
- testOptions->s2csockfd = I2AddrFD(s2csrv_addr);
- testOptions->s2csockport = I2AddrPort(s2csrv_addr);
+ testOptions->s2csockfd = s2csockfd[0];
+ testOptions->s2csockport = I2AddrPort(s2csrv_addr[0]);
log_println(1, " -- s2c %d port: %d", testOptions->child0, testOptions->s2csockport);
#ifdef EXTTESTS_ENABLED
if (testOptions->exttestsopt) {
@@ -266,21 +270,15 @@
testOptions->child0);

clilen = sizeof(cli_addr[0]);
- FD_ZERO(&rfd);
- FD_SET(testOptions->s2csockfd, &rfd);
sel_tv.tv_sec = 5; // wait for 5 secs
sel_tv.tv_usec = 0;
i = 0;
-#ifdef EXTTESTS_ENABLED
- if (testOptions->exttestsopt) {
- threadsNum = options->dthreadsnum;
- testDuration = options->dduration / 1000.0;
- }
-#endif

for (j = 0; j < RETRY_COUNT*threadsNum; j++) {
- ret = select((testOptions->s2csockfd) + 1, &rfd, NULL, NULL,
- &sel_tv);
+ FD_ZERO(&rfd[i]);
+ FD_SET(s2csockfd[i], &rfd[i]);
+
+ ret = select((s2csockfd[i]) + 1, &rfd[i], NULL, NULL, &sel_tv);
if ((ret == -1) && (errno == EINTR))
continue;
if (ret == 0)
@@ -293,7 +291,7 @@
// If a valid connection request is received, client has connected.
// Proceed.
// Note the new socket fd - xmitfd - used in the throughput test
-ximfd: xmitsfd[i] = accept(testOptions->s2csockfd, (struct sockaddr *) &cli_addr[i], &clilen);
+ximfd: xmitsfd[i] = accept(s2csockfd[i], (struct sockaddr *) &cli_addr[i], &clilen);
if (xmitsfd[i] > 0) {
i++;
log_println(6, "accept(%d/%d) for %d completed", i, threadsNum, testOptions->child0);
@@ -582,14 +580,22 @@
s2c_childpid);
testOptions->child2 = s2c_childpid;
kill(s2c_childpid, SIGUSR2);
- FD_ZERO(&rfd);
- FD_SET(mon_pipe[0], &rfd);
sel_tv.tv_sec = 1;
sel_tv.tv_usec = 100000;
i = 0;
+ j = -1;

for (;;) {
- ret = select(mon_pipe[0] + 1, &rfd, NULL, NULL, &sel_tv);
+ j += 1;
+
+ if (j >= threadsNum) {
+ j = 0;
+ }
+
+ FD_ZERO(&rfd[j]);
+ FD_SET(mon_pipe[0], &rfd[j]);
+
+ ret = select(mon_pipe[0] + 1, &rfd[j], NULL, NULL, &sel_tv);
if ((ret == -1) && (errno == EINTR)) {
log_println(
6,


  • [ndt-dev] [ndt] r1161 committed - Added support for multiple ports (S2C), ndt, 11/13/2014

Archive powered by MHonArc 2.6.16.

Top of Page