ndt-dev - [ndt-dev] [ndt] r1154 committed - Added support for multiple ports (C2S)
Subject: NDT-DEV email list created
List archive
- From:
- To:
- Subject: [ndt-dev] [ndt] r1154 committed - Added support for multiple ports (C2S)
- Date: Wed, 12 Nov 2014 13:42:03 +0000
Revision: 1154
Author:
Date: Wed Nov 12 13:41:40 2014 UTC
Log: Added support for multiple ports (C2S)
https://code.google.com/p/ndt/source/detail?r=1154
Modified:
/branches/MultiplePorts/src/test_c2s_clt.c
/branches/MultiplePorts/src/test_c2s_srv.c
=======================================
--- /branches/MultiplePorts/src/test_c2s_clt.c Fri Nov 7 09:00:32 2014 UTC
+++ /branches/MultiplePorts/src/test_c2s_clt.c Wed Nov 12 13:41:40 2014 UTC
@@ -64,7 +64,6 @@
int msgLen, msgType; // message related data
int c2sport = atoi(PORT2); // default C2S port
- I2Addr sec_addr = NULL; // server address
I2Addr sec_addresses[7]; // server addresses per thread
int retcode; // return code
int one = 1; // socket option store
@@ -156,15 +155,14 @@
log_println(1, " -- port: %d", c2sport);
- // make struct of "address details" of the server using the host name
- if ((sec_addr = I2AddrByNode(get_errhandle(), host)) == NULL) {
- log_println(0, "Unable to resolve server address: %s", strerror(errno));
- return -3;
- }
- I2AddrSetPort(sec_addr, c2sport); // set port value
-
for (i = 0; i < threadsnum; ++i) {
- sec_addresses[i] = I2AddrCopy(sec_addr);
+ // make struct of "address details" of the server using the host name
+ if ((sec_addresses[i] = I2AddrByNode(get_errhandle(), host)) == NULL) {
+ log_println(0, "Unable to resolve server address: %s", strerror(errno));
+ return -3;
+ }
+
+ I2AddrSetPort(sec_addresses[i], c2sport + i); // set port value
if ((retcode = CreateConnectSocket(&(writeWorkerArgs[i].socketDescriptor), NULL, sec_addresses[i], conn_options, buf_size))) {
log_println(0, "Connect() for client to server failed (connection %d)", strerror(errno), i+1);
@@ -238,7 +236,6 @@
for (i = 0; i < threadsnum; ++i) {
I2AddrFree(sec_addresses[i]);
}
- I2AddrFree(sec_addr);
// Calculate C2S throughput in kbps
spdout = ((BITS_8_FLOAT * pkts * lth) / KILO) / t;
=======================================
--- /branches/MultiplePorts/src/test_c2s_srv.c Thu Nov 6 09:42:49 2014 UTC
+++ /branches/MultiplePorts/src/test_c2s_srv.c Wed Nov 12 13:41:40 2014 UTC
@@ -72,6 +72,8 @@
int i, j; // used as loop iterators
int threadsNum = 1;
int activeThreads = 1;
+ long port;
+ int c2ssockfd[7];
struct sockaddr_storage cli_addr[7];
@@ -84,10 +86,10 @@
double testDuration = 10; // default test duration
double bytes_read = 0; // number of bytes read during the throughput tests
struct timeval sel_tv; // time
- fd_set rfd, tmpRfd; // receive file descriptors
+ fd_set rfd[7], tmpRfd[7]; // receive file descriptors (up to 7)
char buff[BUFFSIZE + 1]; // message "payload" buffer
PortPair pair; // socket ports
- I2Addr c2ssrv_addr = NULL; // c2s test's server address
+ I2Addr c2ssrv_addr[7] = {NULL, NULL, NULL, NULL, NULL, NULL, NULL}; // c2s test's server address (up to 7)
// I2Addr src_addr=NULL; // c2s test source address
char listenc2sport[10]; // listening port
pthread_t workerThreadId;
@@ -132,39 +134,62 @@
strlcpy(listenc2sport, "0", sizeof(listenc2sport));
}
- // attempt to bind to a new port and obtain address structure with details
- // of listening port
- while (c2ssrv_addr == NULL) {
- c2ssrv_addr = CreateListenSocket(
- NULL,
- (testOptions->multiple ?
- mrange_next(listenc2sport, sizeof(listenc2sport)) : listenc2sport),
- conn_options, 0);
- if (strcmp(listenc2sport, "0") == 0) {
- log_println(0, "WARNING: ephemeral port number was bound");
- break;
+#ifdef EXTTESTS_ENABLED
+ if (testOptions->exttestsopt) {
+ threadsNum = options->uthreadsnum;
+ testDuration = options->uduration / 1000.0;
+ }
+#endif
+
+ port = strtol(testOptions->multiple ? mrange_next(listenc2sport, sizeof(listenc2sport)) : listenc2sport, NULL, 0);
+
+ for (i = 0; i < options->uthreadsnum; ++i) {
+ snprintf(listenc2sport, sizeof(listenc2sport), "%ld", port + i);
+
+ // attempt to bind to a new port and obtain address structure with details
+ // of listening port
+ while (c2ssrv_addr[i] == NULL) {
+ c2ssrv_addr[i] = CreateListenSocket(NULL, listenc2sport, conn_options, 0);
+ if (strcmp(listenc2sport, "0") == 0) {
+ log_println(0, "WARNING: ephemeral port number was bound");
+ break;
+ }
+ if (testOptions->multiple == 0) {
+ break;
+ }
}
- if (testOptions->multiple == 0) {
- break;
- }
+#ifdef EXTTESTS_ENABLED
+ if (testOptions->exttestsopt) {
+ threadsNum = options->uthreadsnum;
+ testDuration = options->uduration / 1000.0;
}
- if (c2ssrv_addr == NULL) {
- log_println(
- 0,
- "Server (C2S throughput test): CreateListenSocket failed: %s",
- strerror(errno));
- snprintf(buff,
- sizeof(buff),
- "Server (C2S throughput test): CreateListenSocket failed: %s",
- strerror(errno));
- send_json_message(ctlsockfd, MSG_ERROR, buff, strlen(buff), testOptions->json_support, JSON_SINGLE_VALUE);
- return -1;
+#endif
+
+ port = strtol(testOptions->multiple ? mrange_next(listenc2sport, sizeof(listenc2sport)) : listenc2sport, NULL, 0);
+
+ for (i = 0; i < options->uthreadsnum; ++i) {
+ snprintf(listenc2sport, sizeof(listenc2sport), "%ld", port + i);
+
+ // attempt to bind to a new port and obtain address structure with details
+ // of listening port
+ while (c2ssrv_addr[i] == NULL) {
+ c2ssrv_addr[i] = CreateListenSocket(NULL, listenc2sport, conn_options, 0);
+ if (strcmp(listenc2sport, "0") == 0) {
+ log_println(0, "WARNING: ephemeral port number was bound");
+ break;
+ }
+ if (testOptions->multiple == 0) {
+ break;
+ }
+ }
+
+ c2ssockfd[i] = I2AddrFD(c2ssrv_addr[i]);
}
// get socket FD and the ephemeral port number that client will connect to
// run tests
- testOptions->c2ssockfd = I2AddrFD(c2ssrv_addr);
- testOptions->c2ssockport = I2AddrPort(c2ssrv_addr);
+ testOptions->c2ssockfd = c2ssockfd[0];
+ testOptions->c2ssockport = I2AddrPort(c2ssrv_addr[0]);
log_println(1, " -- c2s %d port: %d", testOptions->child0, testOptions->c2ssockport);
#ifdef EXTTESTS_ENABLED
if (testOptions->exttestsopt) {
@@ -206,22 +231,16 @@
// Wait on listening socket and read data once ready.
// Retry 5 times, waiting for activity on the socket
clilen = sizeof(cli_addr);
- log_println(6, "child %d - sent c2s prepare to client",
- testOptions->child0);
- FD_ZERO(&rfd);
- FD_SET(testOptions->c2ssockfd, &rfd);
+ log_println(6, "child %d - sent c2s prepare to client", testOptions->child0);
sel_tv.tv_sec = 5;
sel_tv.tv_usec = 0;
i = 0;
-#ifdef EXTTESTS_ENABLED
- if (testOptions->exttestsopt) {
- threadsNum = options->uthreadsnum;
- testDuration = options->uduration / 1000.0;
- }
-#endif
for (j = 0; j < RETRY_COUNT * threadsNum; j++) {
- msgretvalue = select((testOptions->c2ssockfd) + 1, &rfd, NULL, NULL, &sel_tv); // TODO
+ FD_ZERO(&rfd[i]);
+ FD_SET(c2ssockfd[i], &rfd[i]);
+
+ msgretvalue = select((c2ssockfd[i]) + 1, &rfd[i], NULL, NULL, &sel_tv); // TODO
// socket interrupted. continue waiting for activity on socket
if ((msgretvalue == -1) && (errno == EINTR))
continue;
@@ -235,7 +254,7 @@
// If a valid connection request is received, client has connected.
// Proceed. Note the new socket fd - recvsfd- used in the throughput test
- recvsfd[i] = accept(testOptions->c2ssockfd, (struct sockaddr *) &cli_addr[i], &clilen);
+ recvsfd[i] = accept(c2ssockfd[i], (struct sockaddr *) &cli_addr[i], &clilen);
if (recvsfd[i] > 0) {
i++;
log_println(6, "accept(%d/%d) for %d completed", i, threadsNum, testOptions->child0);
@@ -292,6 +311,7 @@
close(testOptions->c2ssockfd);
for (i = 0; i < threadsNum; i++) {
close(recvsfd[i]);
+ close(c2ssockfd[i]);
}
log_println(
5,
@@ -363,15 +383,23 @@
#endif
sel_tv.tv_sec = testDuration + 1; // time out after test duration + 1sec
sel_tv.tv_usec = 0;
- FD_ZERO(&rfd);
activeThreads = threadsNum;
for (i = 0; i < threadsNum; i++) {
- FD_SET(recvsfd[i], &rfd);
+ FD_ZERO(&rfd[i]);
+ FD_SET(recvsfd[i], &rfd[i]);
}
+
+ j = -1;
for (;;) {
readMainLoop:
- tmpRfd = rfd;
- msgretvalue = select(recvsfd[threadsNum-1] + 1, &tmpRfd, NULL, NULL, &sel_tv);
+ j += 1;
+
+ if (j >= threadsNum) {
+ j = 0;
+ }
+
+ tmpRfd[j] = rfd[j];
+ msgretvalue = select(recvsfd[threadsNum-1] + 1, &tmpRfd[j], NULL, NULL, &sel_tv);
#ifdef EXTTESTS_ENABLED
if (testOptions->exttestsopt && options->uthroughputsnaps && secs() > throughputSnapshotTime) {
if (lastThroughputSnapshot != NULL) {
@@ -395,14 +423,14 @@
}
if (msgretvalue > 0) { // read from socket
for (i = 0; i < threadsNum; i++) {
- if (FD_ISSET(recvsfd[i], &tmpRfd)) {
+ if (FD_ISSET(recvsfd[i], &tmpRfd[i])) {
tmpbytecount = read(recvsfd[i], buff, sizeof(buff));
// read interrupted, continue waiting
if ((tmpbytecount == -1) && (errno == EINTR))
goto readMainLoop;
if (tmpbytecount == 0) { // all data has been read
activeThreads--;
- FD_CLR(recvsfd[i], &rfd);
+ FD_CLR(recvsfd[i], &rfd[i]);
if (activeThreads == 0) {
goto breakMainLoop;
}
@@ -450,6 +478,7 @@
for (i = 0; i < threadsNum; i++) {
close(recvsfd[i]);
+ close(c2ssockfd[i]);
}
close(testOptions->c2ssockfd);
@@ -461,14 +490,21 @@
c2s_childpid);
testOptions->child1 = c2s_childpid;
kill(c2s_childpid, SIGUSR1);
- FD_ZERO(&rfd);
- FD_SET(mon_pipe[0], &rfd);
sel_tv.tv_sec = 1;
sel_tv.tv_usec = 100000;
i = 0;
+ j = -1;
for (;;) {
- msgretvalue = select(mon_pipe[0] + 1, &rfd, NULL, NULL,
+ j += 1;
+
+ if (j >= threadsNum) {
+ j = 0;
+ }
+
+ FD_ZERO(&rfd[j]);
+ FD_SET(mon_pipe[0], &rfd[j]);
+ msgretvalue = select(mon_pipe[0] + 1, &rfd[j], NULL, NULL,
&sel_tv);
if ((msgretvalue == -1) && (errno == EINTR))
continue;
- [ndt-dev] [ndt] r1154 committed - Added support for multiple ports (C2S), ndt, 11/12/2014
Archive powered by MHonArc 2.6.16.