Skip to Content.
Sympa Menu

ndt-dev - [ndt-dev] [ndt] r1139 committed - Added support for multiple threads

Subject: NDT-DEV email list created

List archive

[ndt-dev] [ndt] r1139 committed - Added support for multiple threads


Chronological Thread 
  • From:
  • To:
  • Subject: [ndt-dev] [ndt] r1139 committed - Added support for multiple threads
  • Date: Wed, 05 Nov 2014 10:25:59 +0000

Revision: 1139
Author:

Date: Wed Nov 5 10:25:27 2014 UTC
Log: Added support for multiple threads
https://code.google.com/p/ndt/source/detail?r=1139

Modified:
/branches/MultiplePorts/Applet/src/edu/internet2/ndt/NDTConstants.java
/branches/MultiplePorts/Applet/src/edu/internet2/ndt/NDTUtils.java
/branches/MultiplePorts/Applet/src/edu/internet2/ndt/Tcpbw100.java
/branches/MultiplePorts/Applet/src/edu/internet2/ndt/locale/Tcpbw100_msgs_en_US.properties
/branches/MultiplePorts/configure.ac
/branches/MultiplePorts/src/logging.c
/branches/MultiplePorts/src/logging.h
/branches/MultiplePorts/src/ndtptestconstants.h
/branches/MultiplePorts/src/test_c2s_clt.c
/branches/MultiplePorts/src/test_c2s_srv.c
/branches/MultiplePorts/src/test_meta_srv.c
/branches/MultiplePorts/src/test_results_clt.c
/branches/MultiplePorts/src/test_results_clt.h
/branches/MultiplePorts/src/test_s2c_clt.c
/branches/MultiplePorts/src/test_s2c_srv.c
/branches/MultiplePorts/src/testoptions.c
/branches/MultiplePorts/src/testoptions.h
/branches/MultiplePorts/src/usage.c
/branches/MultiplePorts/src/web100-pcap.c
/branches/MultiplePorts/src/web100-util.c
/branches/MultiplePorts/src/web100clt.c
/branches/MultiplePorts/src/web100srv.c
/branches/MultiplePorts/src/web100srv.h

=======================================
--- /branches/MultiplePorts/Applet/src/edu/internet2/ndt/NDTConstants.java Fri Jun 20 08:57:38 2014 UTC
+++ /branches/MultiplePorts/Applet/src/edu/internet2/ndt/NDTConstants.java Wed Nov 5 10:25:27 2014 UTC
@@ -20,26 +20,26 @@
public static final String META_BROWSER_OS = "client.browser.name";
public static final String META_CLIENT_KERNEL_VERSION = "client.kernel.version";
public static final String META_CLIENT_VERSION = "client.version";
- public static final String META_CLIENT_APPLICATION = "client.application";
+ public static final String META_CLIENT_APPLICATION = "client.application";

- // Section: NDT Variables sent by server
- public static final String AVGRTT = "avgrtt";
- public static final String CURRWINRCVD = "CurRwinRcvd";
- public static final String MAXRWINRCVD = "MaxRwinRcvd";
- public static final String LOSS = "loss";
- public static final String MINRTT = "MinRTT";
- public static final String MAXRTT = "MaxRTT";
- public static final String WAITSEC = "waitsec";
- public static final String CURRTO = "CurRTO";
- public static final String SACKSRCVD = "SACKsRcvd";
- public static final String MISMATCH = "mismatch";
- public static final String BAD_CABLE = "bad_cable";
- public static final String CONGESTION = "congestion";
- public static final String CWNDTIME = "cwndtime";
- public static final String RWINTIME = "rwintime";
- public static final String OPTRCVRBUFF = "optimalRcvrBuffer";
- public static final String ACCESS_TECH = "accessTech";
- public static final String DUPACKSIN = "DupAcksIn";
+ // Section: NDT Variables sent by server
+ public static final String AVGRTT = "avgrtt";
+ public static final String CURRWINRCVD = "CurRwinRcvd";
+ public static final String MAXRWINRCVD = "MaxRwinRcvd";
+ public static final String LOSS = "loss";
+ public static final String MINRTT = "MinRTT";
+ public static final String MAXRTT = "MaxRTT";
+ public static final String WAITSEC = "waitsec";
+ public static final String CURRTO = "CurRTO";
+ public static final String SACKSRCVD = "SACKsRcvd";
+ public static final String MISMATCH = "mismatch";
+ public static final String BAD_CABLE = "bad_cable";
+ public static final String CONGESTION = "congestion";
+ public static final String CWNDTIME = "cwndtime";
+ public static final String RWINTIME = "rwintime";
+ public static final String OPTRCVRBUFF = "optimalRcvrBuffer";
+ public static final String ACCESS_TECH = "accessTech";
+ public static final String DUPACKSIN = "DupAcksIn";

/*
* TODO for a later release: Version could be moved to some "configurable"
@@ -56,6 +56,7 @@
public static final byte TEST_SFW = (1 << 3);
public static final byte TEST_STATUS = (1 << 4);
public static final byte TEST_META = (1 << 5);
+ public static final byte TEST_EXT = (1 << 6);

// Section: Firewall test status
public static final int SFW_NOTTESTED = 0;
@@ -149,7 +150,7 @@
public static final int KILO = 1000; // Used in conversions from seconds->mS,
public static final int KILO_BITS = 1024;// Used in kilobits->bits conversions
public static final double EIGHT = 8.0; // Used in octal number, conversions from Bytes-> bits etc
- // EIGHT is a double to minimize overflow when converting.
+ // EIGHT is a double to minimize overflow when converting.

// Section: Duplex mismatch conditions
public static final int DUPLEX_OK_INDICATOR = 0;
@@ -183,7 +184,7 @@
* Initializes a few constants
*
* @param paramLocale
- * local Locale object
+ * local Locale object
* */
public static void initConstants(Locale paramLocale) {
try {
@@ -202,9 +203,9 @@
* Initializes a few constants
*
* @param paramStrLang
- * local Language String
+ * local Language String
* @param paramStrCountry
- * local country String
+ * local country String
* */
public static void initConstants(String paramStrLang, String paramStrCountry) {
try {
@@ -222,7 +223,7 @@
* Getter method for to fetch from resourceBundle
*
* @param paramStrName
- * name of parameter to be fetched
+ * name of parameter to be fetched
* @return Value of parameter input
*/
public static String getMessageString(String paramStrName) {
=======================================
--- /branches/MultiplePorts/Applet/src/edu/internet2/ndt/NDTUtils.java Tue Feb 4 10:07:07 2014 UTC
+++ /branches/MultiplePorts/Applet/src/edu/internet2/ndt/NDTUtils.java Wed Nov 5 10:25:27 2014 UTC
@@ -2,38 +2,24 @@

import java.io.UnsupportedEncodingException;
import java.net.URLEncoder;
+import java.text.DecimalFormat;
import java.util.ResourceBundle;

/**
* Class that defines utility methods used by the NDT code
*/
public class NDTUtils {
+ static DecimalFormat decimalFormat = new DecimalFormat("#0.00");

/**
* Utility method to print double value up to the hundredth place.
*
- * @param paramDblToFormat
- * Double numbers to format
+ * @param paramDblToFormat Double numbers to format
* @return String value of double number
*/
public static String prtdbl(double paramDblToFormat) {
- String str = null;
- int i;
-
- if (paramDblToFormat == 0) {
- return ("0");
- }
- str = Double.toString(paramDblToFormat);
- i = str.indexOf(".");
- i = i + 3;
- if (i > str.length()) {
- i = i - 1;
- }
- if (i > str.length()) {
- i = i - 1;
- }
- return (str.substring(0, i));
- } // prtdbl() method ends
+ return decimalFormat.format(paramDblToFormat);
+ }


/**
@@ -104,7 +90,7 @@
* @return true is the given string is not empty; otherwise false
*/
public static boolean isNotEmpty(String str) {
- return !isEmpty(str);
+ return !isEmpty(str);
} // isNotEmpty() method ends


=======================================
--- /branches/MultiplePorts/Applet/src/edu/internet2/ndt/Tcpbw100.java Wed Jun 18 06:12:24 2014 UTC
+++ /branches/MultiplePorts/Applet/src/edu/internet2/ndt/Tcpbw100.java Wed Nov 5 10:25:27 2014 UTC
@@ -88,7 +88,10 @@
import java.net.UnknownHostException;
import java.security.AccessController;
import java.security.PrivilegedAction;
+import java.util.ArrayList;
+import java.util.Arrays;
import java.util.Date;
+import java.util.List;
import java.util.Locale;
import java.util.ResourceBundle;
import java.util.StringTokenizer;
@@ -216,15 +219,19 @@

boolean _bIsApplication = false;
boolean _bTestInProgress = false;
+ boolean _bSupportExtTests = false;
String sHostName = null;
InetAddress hostAddress = null;
String _sTestResults, _sMidBoxTestResult;
byte _yTests = NDTConstants.TEST_MID | NDTConstants.TEST_C2S
| NDTConstants.TEST_S2C | NDTConstants.TEST_SFW
- | NDTConstants.TEST_STATUS | NDTConstants.TEST_META;
+ | NDTConstants.TEST_STATUS | NDTConstants.TEST_META
+ | NDTConstants.TEST_EXT;
int _iC2sSFWResult = NDTConstants.SFW_NOTTESTED;
int _iS2cSFWResult = NDTConstants.SFW_NOTTESTED;

+ ThroughputSnapshot _uThroughputSnapshots, _dThroughputSnapshots;
+

/*************************************************************************
* JavaScript access API extension Added by Seth Peery and Gregory Wilson,
* Virginia Tech October 28, 2009 This section adds classwide variables,
@@ -450,7 +457,7 @@

// get PC buffer imposed throughput limit
public String get_PcBuffSpdLimit() {
- return Double.toString(rwin / rttsec);
+ return Double.toString(rwin / rttsec);
}

// commenting out unused method, but not removing in case of future use
@@ -1748,17 +1755,11 @@
// Initialise for 64 Kb
_yabuff2Write = new byte[64 * NDTConstants.KILO_BITS];
Message msg = new Message();
+ boolean bThroughputsnaps;
+ final int iTestDuration;
+ int iSnapsdelay, iSnapsoffset, iThreadsnum;
// start C2S throughput tests
if ((_yTests & NDTConstants.TEST_C2S) == NDTConstants.TEST_C2S) {
- showStatus(_resBundDisplayMsgs.getString("outboundTest"));
- _resultsTxtPane.append(_resBundDisplayMsgs
- .getString("runningOutboundTest") + " ");
- _txtStatistics.append(_resBundDisplayMsgs
- .getString("runningOutboundTest") + " ");
- _sEmailText += _resBundDisplayMsgs.getString("runningOutboundTest")
- + " ";
- pub_status = "runningOutboundTest";
-
if (paramProtoObj.recv_msg(msg) != NDTConstants.PROTOCOL_MSG_READ_SUCCESS) { // msg


// receive/read


// error
@@ -1781,30 +1782,58 @@
return true;
}
// Server sends port number to bind to in the TEST_PREPARE
- int iC2sport = parseMsgBodyToInt(new String(msg.getBody()));
+ String[] sMsgBody = new String(msg.getBody()).split(" ");
+ int iC2sport = Integer.parseInt(sMsgBody[0]);
+
+ if (_bSupportExtTests) {
+ iTestDuration = Integer.parseInt(sMsgBody[1]);
+ bThroughputsnaps = Integer.parseInt(sMsgBody[2]) == 1;
+ iSnapsdelay = Integer.parseInt(sMsgBody[3]);
+ iSnapsoffset = Integer.parseInt(sMsgBody[4]);
+ iThreadsnum = Integer.parseInt(sMsgBody[5]);
+ } else {
+ iTestDuration = 10000;
+ bThroughputsnaps = false;
+ iSnapsdelay = 5000;
+ iSnapsoffset = 1000;
+ iThreadsnum = 1;
+ }
+
+ showStatus(_resBundDisplayMsgs.getString("outboundTest"));
+ String sMessage = _resBundDisplayMsgs.getString("running")
+ + " "
+ + iTestDuration / 1000
+ + " "
+ + _resBundDisplayMsgs.getString("runningOutboundTest")
+ + " ";
+
+ _resultsTxtPane.append(sMessage);
+ _txtStatistics.append(sMessage);
+ _sEmailText += sMessage;
+ pub_status = "runningOutboundTest";

// client connects to this port
- final Socket outSocket;
- try {
- outSocket = new Socket(hostAddress, iC2sport);
- } catch (UnknownHostException e) {
- System.err.println("Don't know about host: " + sHostName);
- _sErrMsg = _resBundDisplayMsgs.getString("unknownServer")
- + "\n";
- return true;
- } catch (IOException e) {
- System.err.println("Couldn't get 2nd connection to: "
- + sHostName);
- _sErrMsg = _resBundDisplayMsgs.getString("serverBusy15s")
- + "\n";
- return true;
+ List outSockets = new ArrayList(iThreadsnum);
+
+ for (int i = 0; i < iThreadsnum; ++i) {
+ try {
+ outSockets.add(new Socket(hostAddress, iC2sport));
+ } catch (UnknownHostException e) {
+ System.err.println("Don't know about host: " + sHostName);
+ _sErrMsg = _resBundDisplayMsgs.getString("unknownServer")
+ + "\n";
+ return true;
+ } catch (IOException e) {
+ System.err.println("Couldn't get 2nd connection to: "
+ + sHostName);
+ _sErrMsg = _resBundDisplayMsgs.getString("serverBusy15s")
+ + "\n";
+ return true;
+ }
}

// Get server IP address from the outSocket.
- pub_host = outSocket.getInetAddress().getHostAddress().toString();
-
- // Get output Stream from socket to write data into
- final OutputStream outStream = outSocket.getOutputStream();
+ pub_host = hostAddress.getHostAddress();

// wait here for signal from server application
// This signal tells the client to start pumping out data
@@ -1831,41 +1860,34 @@

// Fill buffer upto NDTConstants.PREDEFNED_BUFFER_SIZE packets
byte c = '0';
- int i;
- for (i = 0; i < _iLength; i++) {
+ for (int i = 0; i < _iLength; i++) {
if (c == 'z') {
c = '0';
}
_yabuff2Write[i] = c++;
}
- System.err.println("******Send buffer size =" + i);
+
+ System.err.println("******Send buffer size = " + _iLength);

_iPkts = 0;
_dTime = System.currentTimeMillis();
pub_time = _dTime;

- // sleep for 10 s
- new Thread() {
+ Thread[] threads = new Thread[iThreadsnum];

- public void run() {
- try {
- Thread.sleep(10000);
- } catch (InterruptedException e) {
- System.err.println("Thread interrupted : " + e);
- // Thread was interrupted while timing 10 seconds
- // of the C->S test. So, streaming 10 seconds of data may not be complete.
- // But, the throughput is correctly calculated based on the number of packets
- // that were actually sent
+ for (int i = 0; i < iThreadsnum; ++i) {
+ final Socket outSocket = (Socket) outSockets.get(i);

- }
- try {
- outStream.close();
- outSocket.close();
- } catch (IOException e) {
- System.err.println("Caught IOException while closing stream after thread interrupted : " + e);
- }
- }
- }.start();
+ C2SWriterWorker worker = new C2SWriterWorker(
+ i, outSocket, yabuff2Write, iTestDuration
+ );
+
+ threads[i] = new Thread(worker);
+ }
+
+ for (int i = 0; i < iThreadsnum; ++i) {
+ threads[i].start();
+ }

Timer c2sspdUpdateTimer = new Timer();
c2sspdUpdateTimer.scheduleAtFixedRate(new TimerTask() {
@@ -1876,30 +1898,16 @@
}
}, 100, _c2sspdUpdateTime);

- // While the 10 s timer ticks, write buffer data into server socket
- while (true) {
- // System.err.println("Send pkt = " + pkts + "; at " +
- // System.currentTimeMillis());
+ for (int i = 0; i < iThreadsnum; ++i) {
try {
- outStream.write(_yabuff2Write, 0, _yabuff2Write.length);
- } catch (SocketException e) {
- System.out.println("SocketException while writing to server" + e);
- break;
- }
- // catch (InterruptedIOException iioe) {
- catch (IOException ioe) {
- System.out.println("Client socket timed out");
- break;
+ threads[i].join();
+ } catch (InterruptedException e) {
+ System.err.println("InterruptedException while waiting for threads: " + e);
}
- // In both cases above, thread was interrupted while timing 10 seconds
- // of the C->S test. So, streaming 10 seconds of data may not be complete.
- // But, the throughput is correctly calculated based on the number of packets
- // that were actually sent
+ }

- _iPkts++;
- // number of bytes sent = (num of iterations) X (buffer size)
- pub_bytes = (_iPkts * _iLength);
- }
+ // number of bytes sent = (num of iterations) X (buffer size)
+ pub_bytes = (_iPkts * _iLength);

c2sspdUpdateTimer.cancel();
_dTime = System.currentTimeMillis() - _dTime;
@@ -1950,7 +1958,24 @@
tmpstr3 = new String(msg.getBody());
}

- _dSc2sspd = Double.parseDouble(tmpstr3) / NDTConstants.KILO;
+ sMsgBody = new String(msg.getBody()).split(" ");
+ _dSc2sspd = Double.parseDouble(sMsgBody[0]) / NDTConstants.KILO;
+ ThroughputSnapshot lastThroughputSnapshot = null;
+
+ if (bThroughputsnaps) {
+ for (int i = 1; i < sMsgBody.length; i += 2) {
+ if (lastThroughputSnapshot != null) {
+ lastThroughputSnapshot.next = new ThroughputSnapshot();
+ lastThroughputSnapshot = lastThroughputSnapshot.next;
+ } else {
+ _uThroughputSnapshots = lastThroughputSnapshot = new ThroughputSnapshot();
+ }
+
+ lastThroughputSnapshot.next = null;
+ lastThroughputSnapshot.time = Double.parseDouble(sMsgBody[i]);
+ lastThroughputSnapshot.throughput = Double.parseDouble(sMsgBody[i+1]);
+ }
+ }

// Print results in the most convenient units (kbps or Mbps)
if (_dSc2sspd < 1.0) {
@@ -2014,17 +2039,12 @@
// byte buff[] = new byte[8192];
byte buff[] = new byte[NDTConstants.PREDEFINED_BUFFER_SIZE];
Message msg = new Message();
+ boolean bThroughputsnaps;
+ final int iTestDuration;
+ int iSnapsdelay, iSnapsoffset, iThreadsnum;
+
// start S2C tests
if ((_yTests & NDTConstants.TEST_S2C) == NDTConstants.TEST_S2C) {
- showStatus(_resBundDisplayMsgs.getString("inboundTest"));
- _resultsTxtPane.append(_resBundDisplayMsgs
- .getString("runningInboundTest") + " ");
- _txtStatistics.append(_resBundDisplayMsgs
- .getString("runningInboundTest") + " ");
- _sEmailText += _resBundDisplayMsgs.getString("runningInboundTest")
- + " ";
- pub_status = "runningInboundTest";
-
// Server sends TEST_PREPARE with port to bind to as message body
if (paramProtoObj.recv_msg(msg) != NDTConstants.PROTOCOL_MSG_READ_SUCCESS) { // read/receive


// error
@@ -2045,27 +2065,56 @@
return true;
}
// get port to bind to for S2C tests
- int iS2cport = parseMsgBodyToInt(new String(msg.getBody()));
+ String[] sMsgBody = new String(msg.getBody()).split(" ");
+ int iS2cport = Integer.parseInt(sMsgBody[0]);
+
+ if (_bSupportExtTests) {
+ iTestDuration = Integer.parseInt(sMsgBody[1]);
+ bThroughputsnaps = Integer.parseInt(sMsgBody[2]) == 1;
+ iSnapsdelay = Integer.parseInt(sMsgBody[3]);
+ iSnapsoffset = Integer.parseInt(sMsgBody[4]);
+ iThreadsnum = Integer.parseInt(sMsgBody[5]);
+ } else {
+ iTestDuration = 10000;
+ bThroughputsnaps = false;
+ iSnapsdelay = 5000;
+ iSnapsoffset = 1000;
+ iThreadsnum = 1;
+ }
+
+ showStatus(_resBundDisplayMsgs.getString("inboundTest"));
+ String sMessage = _resBundDisplayMsgs.getString("running")
+ + " "
+ + iTestDuration / 1000
+ + " "
+ + _resBundDisplayMsgs.getString("runningInboundTest")
+ + " ";
+
+ _resultsTxtPane.append(sMessage);
+ _txtStatistics.append(sMessage);
+ _sEmailText += sMessage;
+ pub_status = "runningInboundTest";

// Create socket and bind to port as instructed by server
- Socket inSocket;
- try {
- inSocket = new Socket(hostAddress, iS2cport);
- } catch (UnknownHostException e) {
- System.err.println("Don't know about host: " + sHostName);
- _sErrMsg = "unknown server\n";
- return true;
- } catch (IOException e) {
- System.err.println("Couldn't get 3rd connection to: "
- + sHostName);
- _sErrMsg = "Server Failed while receiving data\n";
- return true;
+ List inSockets = new ArrayList(iThreadsnum);
+
+ for (int i = 0; i < iThreadsnum; ++i) {
+ try {
+ Socket socket = new Socket(hostAddress, iS2cport);
+ socket.setSoTimeout(iTestDuration + 5000);
+
+ inSockets.add(socket);
+ } catch (UnknownHostException e) {
+ System.err.println("Don't know about host: " + sHostName);
+ _sErrMsg = "unknown server\n";
+ return true;
+ } catch (IOException e) {
+ System.err.println("Couldn't get 3rd connection to: "
+ + sHostName);
+ _sErrMsg = "Server Failed while receiving data\n";
+ return true;
+ }
}
-
- // Get input stream to read bytes from socket
- InputStream srvin = inSocket.getInputStream();
- long iBitCount = 0;
- int inlth;

// wait here for signal from server application

@@ -2089,8 +2138,9 @@
return true;
}

- // Set socket timeout to 15 seconds
- inSocket.setSoTimeout(15000);
+ long iBitCount = 0;
+ int inlth;
+
_dTime = System.currentTimeMillis();
pub_time = _dTime;

@@ -2103,17 +2153,54 @@
}
}, 100, _s2cspdUpdateTime);

- // read data sent by server
+ ThroughputSnapshot lastThroughputSnapshot = null;
+ double throughputSnapshotTime = _dTime + iSnapsoffset;
+ boolean[] activeThreads = new boolean[iThreadsnum];
+ int numberOfActiveThreads = iThreadsnum;
+ int idx = 0;
+
+ Arrays.fill(activeThreads, true);
+
try {
- while ((inlth = srvin.read(buff, 0, buff.length)) > 0) {
- iBitCount += inlth; // increment bit count
- pub_bytes = iBitCount;
- if ((System.currentTimeMillis() - _dTime) > 14500) {
- break;
- break;
+ while (numberOfActiveThreads > 0) {
+ if (bThroughputsnaps && System.currentTimeMillis() > throughputSnapshotTime) {
+ if (lastThroughputSnapshot != null) {
+ lastThroughputSnapshot.next = new ThroughputSnapshot();
+ lastThroughputSnapshot = lastThroughputSnapshot.next;
+ } else {
+ _dThroughputSnapshots = lastThroughputSnapshot = new ThroughputSnapshot();
+ }
+
+ lastThroughputSnapshot.next = null;
+ lastThroughputSnapshot.time = (System.currentTimeMillis() - _dTime) / 1000;
+ lastThroughputSnapshot.throughput = ((NDTConstants.EIGHT * iBitCount) / NDTConstants.KILO) / lastThroughputSnapshot.time;
+ throughputSnapshotTime += iSnapsdelay;
+ }
+
+ if (activeThreads[idx]) {
+ Socket socket = (Socket) inSockets.get(idx);
+ InputStream stream = socket.getInputStream();
+
+ inlth = stream.read(buff, 0, buff.length);
+
+ if (inlth <= 0) {
+ --numberOfActiveThreads;
+ activeThreads[idx] = false;
+
+ stream.close();
+ socket.close();
+ } else {
+ iBitCount += inlth;
+ }
+ }
+
+ ++idx;
+
+ if (idx >= iThreadsnum) {
+ idx = 0;
}
}
} catch (IOException ioExcep) {
- // new addition to handle Exception
System.err.println("Couldn't perform s2c testing to: "
+ sHostName + ":" + ioExcep);
_sErrMsg = "Server Failed while reading socket data\n";
@@ -2121,6 +2208,8 @@
} finally {
s2cspdUpdateTimer.cancel();
}
+
+ pub_bytes = iBitCount;

// get time duration during which bytes were received
_dTime = System.currentTimeMillis() - _dTime;
@@ -2211,14 +2300,25 @@
pub_s2cspd = _dS2cspd;
pub_status = "done";

- // Perform wrap-up activities for test
- srvin.close();
- inSocket.close();
-
// Client has to send its throughput to server inside a TEST_MSG
// message
- buff = Double.toString(_dS2cspd * NDTConstants.KILO).getBytes();
- String tmpstr4 = new String(buff, 0, buff.length);
+ StringBuilder sb = new StringBuilder(Double.toString(_dS2cspd * NDTConstants.KILO));
+
+ if (_dThroughputSnapshots != null) {
+ ThroughputSnapshot snapshotsPtr = _dThroughputSnapshots;
+
+ while (snapshotsPtr != null) {
+ sb.append(" ")
+ .append(NDTUtils.prtdbl(snapshotsPtr.time))
+ .append(" ")
+ .append(NDTUtils.prtdbl(snapshotsPtr.throughput));
+ snapshotsPtr = snapshotsPtr.next;
+ }
+ }
+
+ String tmpstr4 = sb.toString();
+ buff = tmpstr4.getBytes();
+
System.out.println("Sending '" + tmpstr4 + "' back to server");
paramProtoObj.send_json_msg(MessageType.TEST_MSG, buff);

@@ -2733,6 +2833,11 @@

// If we have connected to a Web10G server rebrand ourselves as such
_sServerType = vVersion.endsWith("Web10G") ? "web10g" : "web100";
+ _bSupportExtTests = vVersion.contains("-et");
+
+ if (_bSupportExtTests) {
+ _resultsTxtPane.append(_resBundDisplayMsgs.getString("extTestSupported") + "\n\n");
+ }

// Only create the windows once we have connected to the server so this works
createDiagnoseWindow();
@@ -2769,6 +2874,12 @@
tmpstr = new String(msg.getBody());
}
StringTokenizer tokenizer = new StringTokenizer(tmpstr, " ");
+
+ // If the server does not support TEST_EXT it can send an invalid test sequence
+ // (redundant number at the beginning)
+ if (!_bSupportExtTests && tokenizer.hasMoreTokens()) {
+ tokenizer.nextToken();
+ }

// Run all tests requested, based on the ID. In each case, if tests
// cannot be successfully run,
@@ -3493,6 +3604,7 @@

// Add Packet queuing details found during C2S throughput test to
// the statistics pane. Data is displayed as a percentage
+ ThroughputSnapshot snapshot;

if ((_yTests & NDTConstants.TEST_C2S) == NDTConstants.TEST_C2S) {
if (_dC2sspd > _dSc2sspd) {
@@ -3514,6 +3626,33 @@
/ _dC2sspd) + "%\n");
}
}
+
+ if (_uThroughputSnapshots != null) {
+ snapshot = _uThroughputSnapshots;
+
+ _txtStatistics.append("---"
+ + _resBundDisplayMsgs.getString("c2sThroughputSnapshots")
+ + ":\n"
+ );
+
+ while (snapshot != null) {
+ _txtStatistics.append(" * "
+ +_resBundDisplayMsgs.getString("testDuration")
+ + ": "
+ + NDTUtils.prtdbl(snapshot.time)
+ + " "
+ + _resBundDisplayMsgs.getString("secs")
+ + ", "
+ + _resBundDisplayMsgs.getString("throughput")
+ + ": "
+ + NDTUtils.prtdbl(snapshot.throughput)
+ + " "
+ + _resBundDisplayMsgs.getString("kbps")
+ + "\n");
+
+ snapshot = snapshot.next;
+ }
+ }
}

// Add Packet queuing details found during S2C throughput test to
@@ -3539,6 +3678,33 @@
/ _dSs2cspd) + "%\n");
}
}
+
+ if (_dThroughputSnapshots != null) {
+ snapshot = _dThroughputSnapshots;
+
+ _txtStatistics.append("---"
+ + _resBundDisplayMsgs.getString("s2cThroughputSnapshots")
+ + ":\n"
+ );
+
+ while (snapshot != null) {
+ _txtStatistics.append(" * "
+ +_resBundDisplayMsgs.getString("testDuration")
+ + ": "
+ + NDTUtils.prtdbl(snapshot.time)
+ + " "
+ + _resBundDisplayMsgs.getString("secs")
+ + ", "
+ + _resBundDisplayMsgs.getString("throughput")
+ + ": "
+ + NDTUtils.prtdbl(snapshot.throughput)
+ + " "
+ + _resBundDisplayMsgs.getString("kbps")
+ + "\n");
+
+ snapshot = snapshot.next;
+ }
+ }
}

// Add connection details to statistics pane and email text
@@ -3816,7 +3982,7 @@
*
* server data is ordered as: Server IP; Client IP; CurrentMSS;
* WinScaleSent; WinScaleRcvd; SumRTT; CountRTT; MaxRwinRcvd; Client then adds Server IP; Client IP.
- *
+ *
* @param sMidBoxTestResParam
* String Middlebox results
*/
@@ -3839,14 +4005,14 @@
iWinsRecv = Integer.parseInt(JSONUtils.getValueFromJsonObj(sMidBoxTestResParam, "WinScaleRcvd"));
sClientSideServerIp = JSONUtils.getValueFromJsonObj(sMidBoxTestResParam, "ClientSideServerIp");
sClientSideClientIp = JSONUtils.getValueFromJsonObj(sMidBoxTestResParam, "ClientSideClientIp");
- _iSumRTT = Integer.parseInt(JSONUtils.getValueFromJsonObj(sMidBoxTestResParam, "SumRTT"));
- _iCountRTT = Integer.parseInt(JSONUtils.getValueFromJsonObj(sMidBoxTestResParam, "CountRTT"));
- _iMaxRwinRcvd = Integer.parseInt(JSONUtils.getValueFromJsonObj(sMidBoxTestResParam, "MaxRwinRcvd"));
+ _iSumRTT = Integer.parseInt(JSONUtils.getValueFromJsonObj(sMidBoxTestResParam, "SumRTT"));
+ _iCountRTT = Integer.parseInt(JSONUtils.getValueFromJsonObj(sMidBoxTestResParam, "CountRTT"));
+ _iMaxRwinRcvd = Integer.parseInt(JSONUtils.getValueFromJsonObj(sMidBoxTestResParam, "MaxRwinRcvd"));

- // calculate avgrtt and PC buffer imposed throughput limit
- pub_avgrtt = (double) _iSumRTT / _iCountRTT;
- rwin = _iMaxRwinRcvd * NDTConstants.EIGHT / NDTConstants.KILO_BITS / NDTConstants.KILO_BITS;
- rttsec = pub_avgrtt / NDTConstants.KILO;
+ // calculate avgrtt and PC buffer imposed throughput limit
+ pub_avgrtt = (double) _iSumRTT / _iCountRTT;
+ rwin = _iMaxRwinRcvd * NDTConstants.EIGHT / NDTConstants.KILO_BITS / NDTConstants.KILO_BITS;
+ rttsec = pub_avgrtt / NDTConstants.KILO;
} else {
StringTokenizer tokens;
int k;
@@ -4413,5 +4579,64 @@
return Integer.parseInt(msg, radix);
}
}
+
+ private class C2SWriterWorker implements Runnable {
+ private int id;
+ private Socket socket;
+ private OutputStream stream;
+ private byte[] buff;
+ private long duration;
+
+ private C2SWriterWorker(int id, Socket socket, byte[] buff, long duration)
+ throws IOException {
+ this.id = id;
+ this.socket = socket;
+ this.buff = buff;
+ this.duration = duration;
+
+ this.stream = socket.getOutputStream();
+
+ System.err.println("C2SWriterWorker: " + id);
+ }
+
+ public void run() {
+ long current = System.currentTimeMillis();
+ long stopTime = current + duration;
+ int threadPackets = 0;
+
+ while (current < stopTime) {
+ try {
+ stream.write(buff, 0, _iLength);
+ } catch (SocketException e) {
+ System.out.println("SocketException while writing to server" + e);
+ break;
+ }
+ catch (IOException ioe) {
+ System.out.println("Client socket timed out");
+ break;
+ }
+
+ threadPackets++;
+ current = System.currentTimeMillis();
+ }
+
+ if (id == 0) {
+ _iPkts = threadPackets;
+ }
+
+ try {
+ stream.close();
+ socket.close();
+ } catch (IOException e) {
+ System.err.println("Caught IOException while closing stream: " + e);
+ }
+ }
+ }
+
+ private class ThroughputSnapshot {
+ private double time;
+ private double throughput;
+ private ThroughputSnapshot next;
+ }

} // class: Tcpbw100
=======================================
--- /branches/MultiplePorts/Applet/src/edu/internet2/ndt/locale/Tcpbw100_msgs_en_US.properties Wed May 28 11:17:18 2014 UTC
+++ /branches/MultiplePorts/Applet/src/edu/internet2/ndt/locale/Tcpbw100_msgs_en_US.properties Wed Nov 5 10:25:27 2014 UTC
@@ -154,8 +154,9 @@
resultsWrongMessage = Tests results: Received wrong type of the message
rtt = RTT
rttFail = Link detection algorithm failed due to excessive Round Trip Times.
-runningInboundTest = running 10s inbound test (server-to-client [S2C]) . . . . . .
-runningOutboundTest = running 10s outbound test (client-to-server [C2S]) . . . . .
+running=running
+runningInboundTest = s inbound test (server-to-client [S2C]) . . . . . .
+runningOutboundTest = s outbound test (client-to-server [C2S]) . . . . .
s2c = S2C
s2cPacketQueuingDetected = [S2C]: Packet queueing detected
s2cThroughput = S2C throughput
@@ -234,3 +235,10 @@
toRunTest = to run test
unexpectedException=Unexpected exception
withoutMessage=without message
+extTestSupported=Extended tests supported
+c2sThroughputSnapshots=C->S (upload): Throughput snapshots
+s2cThroughputSnapshots=S->C (download): Throughput snapshots
+testDuration=Test duration
+secs=secs
+throughput=Throughput
+kbps=kbps
=======================================
--- /branches/MultiplePorts/configure.ac Wed Jun 25 17:56:39 2014 UTC
+++ /branches/MultiplePorts/configure.ac Wed Nov 5 10:25:27 2014 UTC
@@ -67,6 +67,17 @@
AC_MSG_RESULT(yes)
fi

+# Enabling/disabling extended tests code
+AC_MSG_CHECKING(whether to enable extended tests code)
+AC_ARG_ENABLE(exttests, [ --enable-exttests=[yes/no] turn on extended tests code
+ [default=yes]],, enable_exttests=yes)
+if test "x$enable_exttests" = "xno"; then
+ AC_MSG_RESULT(no)
+else
+ CFLAGS="$CFLAGS -DEXTTESTS_ENABLED"
+ AC_MSG_RESULT(yes)
+fi
+
# Enabling/disabling database support
AC_MSG_CHECKING(whether to enable database support)
AC_ARG_ENABLE(database, [ --enable-database=[yes/no] turn on database support
=======================================
--- /branches/MultiplePorts/src/logging.c Mon Oct 14 13:20:21 2013 UTC
+++ /branches/MultiplePorts/src/logging.c Wed Nov 5 10:25:27 2014 UTC
@@ -1062,6 +1062,7 @@
fprintf(fp, "s2c_snaplog file: %s\n", meta.s2c_snaplog);
fprintf(fp, "s2c_ndttrace file: %s\n", meta.s2c_ndttrace);
fprintf(fp, "cputime file: %s\n", meta.CPU_time);
+ fprintf(fp, "web values file: %s\n", meta.web_variables_log);
fprintf(fp, "server IP address: %s\n", meta.server_ip);
fprintf(fp, "server hostname: %s\n", meta.server_name);
fprintf(fp, "server kernel version: %s\n", meta.server_os);
@@ -1079,6 +1080,22 @@
entry = entry->next;
}
}
+#ifdef EXTTESTS_ENABLED
+ if (uThroughputSnapshots != NULL) {
+ struct throughputSnapshot *snapshotsPtr = uThroughputSnapshots;
+ while (snapshotsPtr != NULL) {
+ fprintf(fp, "c2s.throughput.snapshot.%0.2f: %f\n", snapshotsPtr->time, snapshotsPtr->throughput);
+ snapshotsPtr = snapshotsPtr->next;
+ }
+ }
+ if (dThroughputSnapshots != NULL) {
+ struct throughputSnapshot *snapshotsPtr = dThroughputSnapshots;
+ while (snapshotsPtr != NULL) {
+ fprintf(fp, "s2c.throughput.snapshot.%0.2f: %f\n", snapshotsPtr->time, snapshotsPtr->throughput);
+ snapshotsPtr = snapshotsPtr->next;
+ }
+ }
+#endif
fclose(fp);
}
}
=======================================
--- /branches/MultiplePorts/src/logging.h Mon Oct 14 13:20:21 2013 UTC
+++ /branches/MultiplePorts/src/logging.h Wed Nov 5 10:25:27 2014 UTC
@@ -54,6 +54,16 @@
struct metaentry* next; // pointer to next link
};

+#ifdef EXTTESTS_ENABLED
+struct throughputSnapshot {
+ double time;
+ double throughput;
+ struct throughputSnapshot* next;
+};
+
+struct throughputSnapshot *dThroughputSnapshots, *uThroughputSnapshots, *lastThroughputSnapshot;
+#endif
+
/**
* Used to save results of meta tests.
* These values (most) are thes logged in the
@@ -65,6 +75,7 @@
char s2c_snaplog[FILENAME_SIZE]; // S->C test Snaplog file name
char s2c_ndttrace[FILENAME_SIZE]; // S->C NDT trace file name
char CPU_time[FILENAME_SIZE]; // CPU time file
+ char web_variables_log[FILENAME_SIZE]; // web100/web10g variables log
char summary[256]; // Summary data
char date[32]; // Date and,
char time[16]; // time
=======================================
--- /branches/MultiplePorts/src/ndtptestconstants.h Wed May 28 11:17:18 2014 UTC
+++ /branches/MultiplePorts/src/ndtptestconstants.h Wed Nov 5 10:25:27 2014 UTC
@@ -14,6 +14,9 @@
#define TEST_SFW (1L << 3)
#define TEST_STATUS (1L << 4)
#define TEST_META (1L << 5)
+#ifdef EXTTESTS_ENABLED
+ #define TEST_EXT (1L << 6)
+#endif

// will hold "string "middlebox", which is the longest name
#define TEST_NAME_DESC_SIZE 10
=======================================
--- /branches/MultiplePorts/src/test_c2s_clt.c Wed May 28 11:17:18 2014 UTC
+++ /branches/MultiplePorts/src/test_c2s_clt.c Wed Nov 5 10:25:27 2014 UTC
@@ -2,7 +2,7 @@
* This file contains the functions needed to handle C2S throughput
* test (client part).
*
- * Jakub S�awi�ski 2006-08-02
+ * Jakub Sławiński 2006-08-02
*

*/

@@ -23,8 +23,17 @@
int sndqueue;
double spdout, c2sspd;

+typedef struct c2sWriteWorkerArgs {  // per-connection arguments handed to c2sWriteWorker
+ int connectionId;  // 1-based index of the connection this worker serves
+ int socketDescriptor;  // connected socket the worker writes to
+ double stopTime;  // secs() timestamp after which the worker stops writing
+ char* buff;  // payload buffer; the same buffer is shared by all workers
+} C2SWriteWorkerArgs;  // alias spelled the way test_c2s_clt and c2sWriteWorker use it
+
+void* c2sWriteWorker(void* arg);
+
/**
- * Client to server throughput test. This test performs 10 seconds
+ * Client to server throughput test. This test performs testDuration seconds
* memory-to-memory data transfer to test achievable network bandwidth.
*
* @param ctlSocket Socket on which messages are received from the server
@@ -39,6 +48,7 @@
* 2: Unexpected protocol message (type) received
* 3: Improper message payload
* 4: Incorrect message data received
+ * -1: Unable to create worker threads
* -3: Unable to resolve server address
* -11: Cannot connect to server
*
@@ -55,11 +65,22 @@
int msgLen, msgType; // message related data
int c2sport = atoi(PORT2); // default C2S port
I2Addr sec_addr = NULL; // server address
+ I2Addr sec_addresses[7]; // server addresses per thread
int retcode; // return code
int one = 1; // socket option store
int i, k; // temporary iterator
+ C2SWriteWorkerArgs writeWorkerArgs[7]; // write workers parameters
+ pthread_t writeWorkerIds[7]; // write workers ids
int outSocket; // socket descriptor for the outgoing connection
double t, stop_time; // test-time indicators
+ double testDuration = 10; // default test duration
+ char* strtokptr; // pointer used by the strtok method
+#ifdef EXTTESTS_ENABLED
+ int throughputsnaps = 0; // enable the throughput snapshots writing
+ int snapsdelay = 5000; // specify the delay in the throughput snapshots thread
+ int snapsoffset = 1000; // specify the initial offset in the throughput snapshots thread
+#endif
+ int threadsnum = 1; // specify the number of threads (parallel TCP connections)
// variables used for protocol validation logs
enum TEST_STATUS_INT teststatuses = TEST_NOT_STARTED;
enum TEST_ID testids = C2S;
@@ -100,10 +121,40 @@

// Server sends port number to bind to in the TEST_PREPARE. Check if this
// message body (string) is a valid integral port number.
+#ifdef EXTTESTS_ENABLED
+ if (tests & TEST_EXT) {
+ strtokptr = strtok(buff, " ");
+ c2sport = atoi(strtokptr);
+ strtokptr = strtok(NULL, " ");
+ testDuration = atoi(strtokptr) / 1000.0;
+ strtokptr = strtok(NULL, " ");
+ throughputsnaps = atoi(strtokptr);
+ strtokptr = strtok(NULL, " ");
+ snapsdelay = atoi(strtokptr);
+ strtokptr = strtok(NULL, " ");
+ snapsoffset = atoi(strtokptr);
+ strtokptr = strtok(NULL, " ");
+ threadsnum = atoi(strtokptr);
+
+ log_println(1, " -- test duration: %.1fs", testDuration);
+ log_println(1, " -- throughput snapshots: enabled = %s, delay = %d, offset = %d",
+ throughputsnaps ? "true" : "false", snapsdelay, snapsoffset);
+ log_println(1, " -- threads: %d", threadsnum);
+ lastThroughputSnapshot = NULL;
+ }
+ else {
+ if (check_int(buff, &c2sport)) {
+ log_println(0, "Invalid port number");
+ return 4;
+ }
+ }
+#else
if (check_int(buff, &c2sport)) {
log_println(0, "Invalid port number");
return 4;
}
+#endif
+
log_println(1, " -- port: %d", c2sport);

// make struct of "address details" of the server using the host name
@@ -113,14 +164,15 @@
}
I2AddrSetPort(sec_addr, c2sport); // set port value

- // connect to server and set socket options
- if ((retcode = CreateConnectSocket(&outSocket, NULL, sec_addr, conn_options,
- buf_size))) {
- log_println(0, "Connect() for client to server failed", strerror(errno));
- return -11;
+ for (i = 0; i < threadsnum; ++i) {
+ sec_addresses[i] = I2AddrCopy(sec_addr);
+
+ if ((retcode = CreateConnectSocket(&(writeWorkerArgs[i].socketDescriptor), NULL, sec_addresses[i], conn_options, buf_size))) {
+ log_println(0, "Connect() for client to server failed (connection %d)", strerror(errno), i+1);
+ return -11;
+ }
+ setsockopt(writeWorkerArgs[i].socketDescriptor, SOL_SOCKET, SO_REUSEADDR, &one, sizeof(one));
}
-
- setsockopt(outSocket, SOL_SOCKET, SO_REUSEADDR, &one, sizeof(one));

// Expect a TEST_START message from server now. Any other message
// type is an error
@@ -134,8 +186,7 @@
return 2;
}

- // Prepare for running a C->S throughput test
- printf("running 10s outbound test (client to server) . . . . . ");
+ printf("running %.1fs outbound test (client to server) . . . . . ", testDuration);
fflush(stdout);

// ....Fill buffer upto NDTConstants.PREDEFNED_BUFFER_SIZE packets
@@ -149,30 +200,50 @@
buff[i] = (k++ % 0x7f);
}

- // set the test time to 10 seconds
+ // set the test time to testDuration seconds
t = secs();
- stop_time = t + 10;
+ stop_time = t + testDuration;
/* ignore the pipe signal */
memset(&new, 0, sizeof(new));
new.sa_handler = SIG_IGN;
sigaction(SIGPIPE, &new, &old);

- // While the 10 s timer ticks, stream data to server. Record the byte count
- do {
- write(outSocket, buff, lth);
- pkts++;
- }while (secs() < stop_time);
+ for (i = 0; i < threadsnum; ++i) {
+ writeWorkerArgs[i].connectionId = i + 1;
+ writeWorkerArgs[i].stopTime = stop_time;
+ writeWorkerArgs[i].buff = buff;
+ }
+
+ if (threadsnum == 1) {
+ c2sWriteWorker((void*) &writeWorkerArgs[0]);
+ }
+ else {
+ for (i = 0; i < threadsnum; ++i) {
+ if (pthread_create(&writeWorkerIds[i], NULL, c2sWriteWorker, (void*) &writeWorkerArgs[i])) {
+ log_println(0, "Cannot create write worker thread for throughput upload test!");
+ writeWorkerIds[i] = 0;
+ return -1;
+ }
+ }
+
+ for (i = 0; i < threadsnum; ++i) {
+ pthread_join(writeWorkerIds[i], NULL);
+ }
+ }
+
sigaction(SIGPIPE, &old, NULL);
- sndqueue = sndq_len(outSocket); // get send-queue length
+ sndqueue = sndq_len(writeWorkerArgs[0].socketDescriptor); // get send-queue length

// get actual duration for which data was sent to the server
t = secs() - t;
+ for (i = 0; i < threadsnum; ++i) {
+ I2AddrFree(sec_addresses[i]);
+ }
I2AddrFree(sec_addr);

// Calculate C2S throughput in kbps
spdout = ((BITS_8_FLOAT * pkts * lth) / KILO) / t;
- // log_println(6," ---C->S CLT speed=%0.0f, pkts= %d, lth=%d, time=%d",
- // spdout, pkts, lth, t);
+ log_println(6, " ---C->S CLT speed=%0.0f, pkts= %d, lth=%d, time=%0.0f", spdout, pkts, lth, t);


// The client has stopped streaming data, and the server is now
@@ -204,6 +275,24 @@
else {
c2sspd = atoi(buff);
}
+#ifdef EXTTESTS_ENABLED
+ if (throughputsnaps != NULL) {
+ char* strtokptr = strtok(buff, " ");
+ while ((strtokptr = strtok(NULL, " ")) != NULL) {
+ if (lastThroughputSnapshot != NULL) {
+ lastThroughputSnapshot->next = (struct throughputSnapshot*) malloc(sizeof(struct throughputSnapshot));
+ lastThroughputSnapshot = lastThroughputSnapshot->next;
+ }
+ else {
+ uThroughputSnapshots = lastThroughputSnapshot = (struct throughputSnapshot*) malloc(sizeof(struct throughputSnapshot));
+ }
+ lastThroughputSnapshot->next = NULL;
+ lastThroughputSnapshot->time = atof(strtokptr);
+ strtokptr = strtok(NULL, " ");
+ lastThroughputSnapshot->throughput = atof(strtokptr);
+ }
+ }
+#endif

// Print results in the most convenient units (kbps or Mbps)
if (c2sspd < KILO)
@@ -230,3 +319,34 @@
}
return 0;
}
+
+/**
+ * Write worker for the C2S throughput test. Repeatedly writes the payload
+ * buffer to one connection until the stop time has passed or a write fails
+ * with an error other than EINTR. It is called directly when there is a
+ * single connection and used as a pthread start routine otherwise.
+ *
+ * @param arg pointer to this worker's C2SWriteWorkerArgs
+ * @return always NULL
+ */
+void* c2sWriteWorker(void* arg) {
+  C2SWriteWorkerArgs *workerArgs = (C2SWriteWorkerArgs*) arg;
+  int connectionId = workerArgs->connectionId;
+  int socketDescriptor = workerArgs->socketDescriptor;
+  double stopTime = workerArgs->stopTime;
+  char* threadBuff = workerArgs->buff;
+  double threadBytes = 0;
+  int threadPackets = 0, n;
+  double threadTime = secs();
+  double elapsed;
+
+  do {
+    n = write(socketDescriptor, threadBuff, lth);  // TODO avoid snd block
+    // write interrupted by a signal: try again while test time remains
+    if ((n == -1) && (errno == EINTR))
+      continue;
+    if (n < 0)
+      break;  // write failed for another reason: stop this worker
+    threadPackets++;
+    threadBytes += n;
+  } while (secs() < stopTime);
+
+  // only the first connection publishes its write count through pkts
+  if (connectionId == 1) {
+    pkts = threadPackets;
+  }
+
+  // sample the clock once so the logged speed and the logged time agree
+  elapsed = secs() - threadTime;
+  log_println(6, " ---C->S CLT thread %d (sc %d): speed=%0.0f, bytes=%0.0f, pkts= %d, lth=%d, time=%0.0f", connectionId, socketDescriptor,
+      ((BITS_8_FLOAT * threadBytes) / KILO) / elapsed, threadBytes, threadPackets, lth, elapsed);
+
+  return NULL;
+}
=======================================
--- /branches/MultiplePorts/src/test_c2s_srv.c Wed May 28 11:17:18 2014 UTC
+++ /branches/MultiplePorts/src/test_c2s_srv.c Wed Nov 5 10:25:27 2014 UTC
@@ -66,19 +66,25 @@
tcp_stat_group* group = NULL;
/* The pipe that will return packet pair results */
int mon_pipe[2];
- int recvsfd; // receiver socket file descriptor
+ int recvsfd[7]; // receiver socket file descriptors (up to 7)
pid_t c2s_childpid = 0; // child process pids
int msgretvalue, tmpbytecount; // used during the "read"/"write" process
int i, j; // used as loop iterators
+ int threadsNum = 1;
+ int activeThreads = 1;

struct sockaddr_storage cli_addr;

socklen_t clilen;
char tmpstr[256]; // string array used for all sorts of temp storage purposes
double tmptime; // time indicator
+#ifdef EXTTESTS_ENABLED
+ double throughputSnapshotTime; // specify the next snapshot time
+#endif
+ double testDuration = 10; // default test duration
double bytes_read = 0; // number of bytes read during the throughput tests
struct timeval sel_tv; // time
- fd_set rfd; // receive file descriptor
+ fd_set rfd, tmpRfd; // receive file descriptors
char buff[BUFFSIZE + 1]; // message "payload" buffer
PortPair pair; // socket ports
I2Addr c2ssrv_addr = NULL; // c2s test's server address
@@ -159,7 +165,18 @@
// run tests
testOptions->c2ssockfd = I2AddrFD(c2ssrv_addr);
testOptions->c2ssockport = I2AddrPort(c2ssrv_addr);
- log_println(1, " -- port: %d", testOptions->c2ssockport);
+ log_println(1, " -- c2s %d port: %d", testOptions->child0, testOptions->c2ssockport);
+#ifdef EXTTESTS_ENABLED
+ if (testOptions->exttestsopt) {
+ log_println(1, " -- c2s ext -- duration = %d", options->uduration);
+ log_println(1, " -- c2s ext -- throughput snapshots: enabled = %s, delay = %d, offset = %d",
+ options->uthroughputsnaps ? "true" : "false", options->usnapsdelay, options->usnapsoffset);
+ log_println(1, " -- c2s ext -- number of threads: %d", options->uthreadsnum);
+ }
+
+
+169
+#endif
pair.port1 = testOptions->c2ssockport;
pair.port2 = -1;

@@ -173,6 +190,14 @@
"Sending 'GO' signal, to tell client %d to head for the next test",
testOptions->child0);
snprintf(buff, sizeof(buff), "%d", testOptions->c2ssockport);
+#ifdef EXTTESTS_ENABLED
+ if (testOptions->exttestsopt) {
+ snprintf(buff, sizeof(buff), "%d %d %d %d %d %d", testOptions->c2ssockport,
+ options->uduration, options->uthroughputsnaps,
+ options->usnapsdelay, options->usnapsoffset, options->uthreadsnum);
+ lastThroughputSnapshot = NULL;
+ }
+#endif

// send TEST_PREPARE message with ephemeral port detail, indicating start
// of tests
@@ -190,10 +215,16 @@
FD_SET(testOptions->c2ssockfd, &rfd);
sel_tv.tv_sec = 5;
sel_tv.tv_usec = 0;
+ i = 0;
+#ifdef EXTTESTS_ENABLED
+ if (testOptions->exttestsopt) {
+ threadsNum = options->uthreadsnum;
+ testDuration = options->uduration / 1000.0;
+ }
+#endif

- for (j = 0; j < RETRY_COUNT; j++) {
- msgretvalue = select((testOptions->c2ssockfd) + 1, &rfd, NULL, NULL,
- &sel_tv);
+ for (j = 0; j < RETRY_COUNT * threadsNum; j++) {
+ msgretvalue = select((testOptions->c2ssockfd) + 1, &rfd, NULL, NULL, &sel_tv); // TODO
// socket interrupted. continue waiting for activity on socket
if ((msgretvalue == -1) && (errno == EINTR))
continue;
@@ -201,27 +232,29 @@
return -SOCKET_CONNECT_TIMEOUT;
if (msgretvalue < 0) // other socket errors. exit
return -errno;
- if (j == (RETRY_COUNT - 1)) // retry exceeded. exit
+ if (j == (RETRY_COUNT*threadsNum - 1)) // retry exceeded. exit
return -RETRY_EXCEEDED_WAITING_CONNECT;
recfd:

// If a valid connection request is received, client has connected.
// Proceed. Note the new socket fd - recvsfd- used in the throughput test
- recvsfd = accept(testOptions->c2ssockfd,
- (struct sockaddr *) &cli_addr, &clilen);
- if (recvsfd > 0) {
- log_println(6, "accept() for %d completed",
- testOptions->child0);
+ recvsfd[i] = accept(testOptions->c2ssockfd, (struct sockaddr *) &cli_addr[i], &clilen);
+ if (recvsfd[i] > 0) {
+ i++;
+ log_println(6, "accept(%d/%d) for %d completed", i, threadsNum, testOptions->child0);
+
+ if (i < threadsNum) {
+ continue;
+ }

// log protocol validation indicating client accept
procstatusenum = PROCESS_STARTED;
proctypeenum = CONNECT_TYPE;
- protolog_procstatus(testOptions->child0, testids, proctypeenum,
- procstatusenum, recvsfd);
+ protolog_procstatus(testOptions->child0, testids, proctypeenum, procstatusenum, recvsfd[i]);
break;
}
// socket interrupted, wait some more
- if ((recvsfd == -1) && (errno == EINTR)) {
+ if ((recvsfd[i] == -1) && (errno == EINTR)) {
log_println(
6,
"Child %d interrupted while waiting for accept() to complete",
@@ -232,10 +265,10 @@
6,
"------- C2S connection setup for %d returned because (%d)",
testOptions->child0, errno);
- if (recvsfd < 0) { // other socket errors, quit
+ if (recvsfd[i] < 0) { // other socket errors, quit
return -errno;
}
- if (j == (RETRY_COUNT - 1)) { // retry exceeded, quit
+ if (j == (RETRY_COUNT*threadsNum - 1)) { // retry exceeded, quit
log_println(
6,
"c2s child %d, uable to open connection, return from test",
@@ -245,40 +278,31 @@
}

// Get address associated with the throughput test. Used for packet tracing
- log_println(6, "child %d - c2s ready for test with fd=%d",
- testOptions->child0, recvsfd);
+ log_println(6, "child %d - c2s ready for test with fd=%d", testOptions->child0, recvsfd[i]);

// commenting out below to move to init_pkttrace function
- I2Addr src_addr = I2AddrByLocalSockFD(get_errhandle(), recvsfd, 0);
+ I2Addr src_addr = I2AddrByLocalSockFD(get_errhandle(), recvsfd[i], 0);

// Get tcp_stat connection. Used to collect tcp_stat variable statistics
- conn = tcp_stat_connection_from_socket(agent, recvsfd);
+ conn = tcp_stat_connection_from_socket(agent, recvsfd[i]);

// set up packet tracing. Collected data is used for bottleneck link
// calculations
if (getuid() == 0) {
- /*
- pipe(mon_pipe1);
- log_println(0, "%s test calling pkt_trace_start() with pd=%d for device %s, addr %s",
- currenttestdesc, clilen, device , src_addr);
-
- start_packet_trace(recvsfd, testOptions->c2ssockfd, &c2s_childpid,
- mon_pipe1, (struct sockaddr *) &cli_addr, clilen, device,
- &pair, currenttestdesc, options->compress, meta.c2s_ndttrace);
- */
-
pipe(mon_pipe);
if ((c2s_childpid = fork()) == 0) {
/* close(ctlsockfd); */
close(testOptions->c2ssockfd);
- close(recvsfd);
+ for (i = 0; i < threadsNum; i++) {
+ close(recvsfd[i]);
+ }
log_println(
5,
"C2S test Child %d thinks pipe() returned fd0=%d, fd1=%d",
testOptions->child0, mon_pipe[0], mon_pipe[1]);
log_println(2, "C2S test calling init_pkttrace() with pd=%p",
- &cli_addr);
- init_pkttrace(src_addr, (struct sockaddr *) &cli_addr, clilen,
+ &cli_addr[0]);
+ init_pkttrace(src_addr, (struct sockaddr *) &cli_addr[0], clilen,
mon_pipe, device, &pair, "c2s", options->compress);
log_println(1, "c2s is exiting gracefully");
/* Close the pipe */
@@ -311,7 +335,7 @@

// Create C->S snaplog directories, and perform some initialization based on
// options
- create_client_logdir((struct sockaddr *) &cli_addr, clilen,
+ create_client_logdir((struct sockaddr *) &cli_addr[0], clilen,
options->c2s_logname, sizeof(options->c2s_logname),
namesuffix,
sizeof(namesuffix));
@@ -337,26 +361,61 @@
meta.c2s_snaplog, options->c2s_logname, conn, group);
// Wait on listening socket and read data once ready.
tmptime = secs();
- sel_tv.tv_sec = 11; // time out after 11 seconds
+#ifdef EXTTESTS_ENABLED
+ throughputSnapshotTime = tmptime + (options->usnapsoffset / 1000.0);
+#endif
+ sel_tv.tv_sec = testDuration + 1; // time out after test duration + 1sec
sel_tv.tv_usec = 0;
FD_ZERO(&rfd);
- FD_SET(recvsfd, &rfd);
+ activeThreads = threadsNum;
+ for (i = 0; i < threadsNum; i++) {
+ FD_SET(recvsfd[i], &rfd);
+ }
for (;;) {
- msgretvalue = select(recvsfd + 1, &rfd, NULL, NULL, &sel_tv);
+readMainLoop:
+ tmpRfd = rfd;
+ msgretvalue = select(recvsfd[threadsNum-1] + 1, &tmpRfd, NULL, NULL, &sel_tv);
+#ifdef EXTTESTS_ENABLED
+ if (testOptions->exttestsopt && options->uthroughputsnaps && secs() > throughputSnapshotTime) {
+ if (lastThroughputSnapshot != NULL) {
+ lastThroughputSnapshot->next = (struct throughputSnapshot*) malloc(sizeof(struct throughputSnapshot));
+ lastThroughputSnapshot = lastThroughputSnapshot->next;
+ }
+ else {
+ uThroughputSnapshots = lastThroughputSnapshot = (struct throughputSnapshot*) malloc(sizeof(struct throughputSnapshot));
+ }
+ lastThroughputSnapshot->next = NULL;
+ lastThroughputSnapshot->time = secs() - tmptime;
+ lastThroughputSnapshot->throughput = (8.e-3 * bytes_read) / (lastThroughputSnapshot->time); // kbps
+ log_println(6, " ---C->S: Throughput at %0.2f secs: Received %0.0f bytes, Spdin= %f",
+ lastThroughputSnapshot->time, bytes_read, lastThroughputSnapshot->throughput);
+ throughputSnapshotTime += options->usnapsdelay / 1000.0;
+ }
+#endif
if ((msgretvalue == -1) && (errno == EINTR)) {
// socket interrupted. Continue waiting for activity on socket
continue;
}
if (msgretvalue > 0) { // read from socket
- tmpbytecount = read(recvsfd, buff, sizeof(buff));
- // read interrupted, continue waiting
- if ((tmpbytecount == -1) && (errno == EINTR))
- continue;
- if (tmpbytecount == 0) // all data has been read
- break;
- bytes_read += tmpbytecount; // data byte count has to be increased
+ for (i = 0; i < threadsNum; i++) {
+ if (FD_ISSET(recvsfd[i], &tmpRfd)) {
+ tmpbytecount = read(recvsfd[i], buff, sizeof(buff));
+ // read interrupted, continue waiting
+ if ((tmpbytecount == -1) && (errno == EINTR))
+ goto readMainLoop;
+ if (tmpbytecount == 0) { // all data has been read
+ activeThreads--;
+ FD_CLR(recvsfd[i], &rfd);
+ if (activeThreads == 0) {
+ goto breakMainLoop;
+ }
+ }
+ bytes_read += tmpbytecount; // data byte count has to be increased
+ }
+ }
continue;
}
+breakMainLoop:
break;
}

@@ -374,15 +433,27 @@
testOptions->child0);
log_println(1, "%s", buff);
snprintf(buff, sizeof(buff), "%0.0f", *c2sspd);
+#ifdef EXTTESTS_ENABLED
+ if (uThroughputSnapshots != NULL) {
+ struct throughputSnapshot *snapshotsPtr = uThroughputSnapshots;
+ while (snapshotsPtr != NULL) {
+ int currBuffLength = strlen(buff);
+ snprintf(&buff[currBuffLength], sizeof(buff)-currBuffLength, " %0.2f %0.2f", snapshotsPtr->time, snapshotsPtr->throughput);
+ snapshotsPtr = snapshotsPtr->next;
+ }
+ }
+#endif
send_json_message(ctlsockfd, TEST_MSG, buff, strlen(buff), testOptions->json_support, JSON_SINGLE_VALUE);

// get receiver side Web100 stats and write them to the log file. close
// sockets
if (record_reverse == 1)
- tcp_stat_get_data_recv(recvsfd, agent, conn, count_vars);
+ tcp_stat_get_data_recv(recvsfd[i], agent, conn, count_vars);


- close(recvsfd);
+ for (i = 0; i < threadsNum; i++) {
+ close(recvsfd[i]);
+ }
close(testOptions->c2ssockfd);

// Next, send speed-chk a flag to retrieve the data it collected.
=======================================
--- /branches/MultiplePorts/src/test_meta_srv.c Wed May 28 11:17:18 2014 UTC
+++ /branches/MultiplePorts/src/test_meta_srv.c Wed Nov 5 10:25:27 2014 UTC
@@ -20,12 +20,16 @@
#include "testoptions.h"
#include "jsonutils.h"

+ void addAdditionalMetaEntry(struct metaentry **ptr, char* key, char* value);
+ void addAdditionalMetaIntEntry(struct metaentry **ptr, char* key, int value);
+
/**
* Performs the META test.
* @param ctlsockfd Client control socket descriptor
* @param agent UNUSED Web100 agent used to track the connection
* @param testOptions The test options
* @param conn_options The connection options
+ * @param options Test Option variables
* @return 0 - success,
* >0 - error code.
* Error codes:
@@ -40,12 +44,12 @@
*/

int test_meta_srv(int ctlsockfd, tcp_stat_agent* agent,
- TestOptions* testOptions, int conn_options) {
+ TestOptions* testOptions, int conn_options, Options* options) {
int j;
int msgLen, msgType;
char buff[BUFFSIZE + 1];
struct metaentry *new_entry = NULL;
- char* value, *jsonMsgValue;
+ char* value;

// protocol validation logs
enum TEST_ID testids = META;
@@ -56,7 +60,7 @@
log_println(1, " <-- META test -->");

// log protocol validation details
- teststatuses = TEST_STARTED;
+ teststatuses = TEST_STARTED;
protolog_status(testOptions->child0, testids, teststatuses, ctlsockfd);

// first message exchanged is am empty TEST_PREPARE message
@@ -154,20 +158,22 @@
/*continue;*/
}

- // now get all the key-value tuples
- if (new_entry) {
- new_entry->next = (struct metaentry *) malloc(
- sizeof(struct metaentry));
- new_entry = new_entry->next;
- } else {
- new_entry = (struct metaentry *) malloc(
- sizeof(struct metaentry));
- meta.additional = new_entry;
- }
- snprintf(new_entry->key, sizeof(new_entry->key), "%s", buff);
- snprintf(new_entry->value, sizeof(new_entry->value), "%s", value);
+ addAdditionalMetaEntry(&new_entry, buff, value);
+ }
+#ifdef EXTTESTS_ENABLED
+ if (testOptions->exttestsopt) {
+ addAdditionalMetaIntEntry(&new_entry, "ext.c2s.duration", options->uduration);
+ addAdditionalMetaEntry(&new_entry, "ext.c2s.throughputsnaps", options->uthroughputsnaps ? "true" : "false");
+ addAdditionalMetaIntEntry(&new_entry, "ext.c2s.snapsdelay", options->usnapsdelay);
+ addAdditionalMetaIntEntry(&new_entry, "ext.c2s.snapsoffset", options->usnapsoffset);
+ addAdditionalMetaIntEntry(&new_entry, "ext.c2s.threadsnum", options->uthreadsnum);
+ addAdditionalMetaIntEntry(&new_entry, "ext.s2c.duration", options->dduration);
+ addAdditionalMetaEntry(&new_entry, "ext.s2c.throughputsnaps", options->dthroughputsnaps ? "true" : "false");
+ addAdditionalMetaIntEntry(&new_entry, "ext.s2c.snapsdelay", options->dsnapsdelay);
+ addAdditionalMetaIntEntry(&new_entry, "ext.s2c.snapsoffset", options->dsnapsoffset);
+ addAdditionalMetaIntEntry(&new_entry, "ext.s2c.threadsnum", options->dthreadsnum);
}
- new_entry->next = NULL; // ensure meta list ends here
+#endif

// Finalize test by sending appropriate message, and setting status
if (send_json_message(ctlsockfd, TEST_FINALIZE, "", 0,
@@ -185,3 +191,22 @@
}
return 0;
}
+
+/**
+ * Appends one key/value pair to the list of additional META entries.
+ * If memory cannot be allocated the entry is dropped and both the list
+ * and the caller's tail pointer are left untouched.
+ *
+ * @param ptr   in/out: address of the caller's tail pointer; NULL tail means
+ *              the new entry becomes meta.additional. Updated to the new entry.
+ * @param key   entry key, copied (truncated to the size of the key field)
+ * @param value entry value, copied (truncated to the size of the value field)
+ */
+void addAdditionalMetaEntry(struct metaentry **ptr, char* key, char* value) {
+  struct metaentry *node = (struct metaentry *) malloc(sizeof(struct metaentry));
+
+  if (node == NULL) {
+    // out of memory: skip this entry instead of dereferencing NULL
+    log_println(0, "Unable to allocate memory for additional META entry");
+    return;
+  }
+
+  // fill the node completely before it becomes reachable from the list
+  node->next = NULL;  // ensure meta list ends here
+  snprintf(node->key, sizeof(node->key), "%s", key);
+  snprintf(node->value, sizeof(node->value), "%s", value);
+
+  if (*ptr) {
+    (*ptr)->next = node;
+  } else {
+    meta.additional = node;
+  }
+  *ptr = node;
+}
+
+/**
+ * Integer variant of addAdditionalMetaEntry: renders the value as decimal
+ * text and appends it under the given key.
+ *
+ * @param ptr   in/out tail pointer, forwarded to addAdditionalMetaEntry
+ * @param key   entry key
+ * @param value integer to store
+ */
+void addAdditionalMetaIntEntry(struct metaentry **ptr, char* key, int value) {
+  char text[256];
+
+  snprintf(text, sizeof(text), "%d", value);
+  addAdditionalMetaEntry(ptr, key, text);
+}
=======================================
--- /branches/MultiplePorts/src/test_results_clt.c Mon Oct 14 13:20:21 2013 UTC
+++ /branches/MultiplePorts/src/test_results_clt.c Wed Nov 5 10:25:27 2014 UTC
@@ -467,6 +467,35 @@
printf("Server Data reports link is '%3d', Server Acks report link is "
"'%3d'\n", s2c_linkspeed_data, s2c_linkspeed_ack);
}
+
+#ifdef EXTTESTS_ENABLED
+/**
+ * Writes the recorded throughput snapshots to stdout, the upload (C->S)
+ * list first and the download (S->C) list second. A list whose head is
+ * NULL produces no output at all, not even its heading.
+ *
+ * @param dThroughputSnapshots head of the snapshot list from the s2c test
+ * @param uThroughputSnapshots head of the snapshot list from the c2s test
+ */
+void print_throughput_snapshots(struct throughputSnapshot *dThroughputSnapshots,
+                                struct throughputSnapshot *uThroughputSnapshots) {
+  struct throughputSnapshot *node;
+
+  if (uThroughputSnapshots) {
+    printf(" ---C->S (upload): Throughput snapshots:\n");
+    for (node = uThroughputSnapshots; node; node = node->next) {
+      printf(" * Test duration: %0.2f secs, Throughput: %f kbps\n", node->time, node->throughput);
+    }
+  }
+
+  if (dThroughputSnapshots) {
+    printf(" ---S->C (download): Throughput snapshots:\n");
+    for (node = dThroughputSnapshots; node; node = node->next) {
+      printf(" * Test duration: %0.2f secs, Throughput: %f kbps\n", node->time, node->throughput);
+    }
+  }
+}
+#endif

/**
* A Network Address Translation (NAT) box is detected by
=======================================
--- /branches/MultiplePorts/src/test_results_clt.h Thu Nov 15 21:33:44 2012 UTC
+++ /branches/MultiplePorts/src/test_results_clt.h Wed Nov 5 10:25:27 2014 UTC
@@ -78,6 +78,11 @@
int c2s_linkspeed_ack, int s2c_linkspeed_data,
int s2c_linkspeed_ack);

+#ifdef EXTTESTS_ENABLED
+void print_throughput_snapshots(struct throughputSnapshot *dThroughputSnapshots,
+ struct throughputSnapshot *uThroughputSnapshots);
+#endif
+
// Check if a Network Address translation box is modifying IP addresses
// of server or client
void check_NAT(char *ssip, char *csip, char *scip, char *ccip);
=======================================
--- /branches/MultiplePorts/src/test_s2c_clt.c Wed May 28 11:17:18 2014 UTC
+++ /branches/MultiplePorts/src/test_s2c_clt.c Wed Nov 5 10:25:27 2014 UTC
@@ -2,7 +2,7 @@
* This file contains the functions needed to handle S2C throughput
* test (client part).
*
- * Jakub S�awi�ski 2006-08-02
+ * Jakub Sławiński 2006-08-02
*

*/

@@ -51,13 +51,25 @@
int msgLen, msgType;
int s2cport = atoi(PORT3);
I2Addr sec_addr = NULL;
+ I2Addr sec_addresses[7]; // server addresses per thread
int inlth, retcode, one = 1, set_size;
- int inSocket;
+ int inSocket[7]; // up to 7
socklen_t optlen;
uint64_t bytes;
- double t;
+ double testStartTime, t;
+ double testDuration = 10; // default test duration
+ char* strtokptr; // pointer used by the strtok method
+#ifdef EXTTESTS_ENABLED
+ int throughputsnaps = 0; // enable the throughput snapshots writing
+ int snapsdelay = 5000; // specify the delay in the throughput snapshots thread
+ int snapsoffset = 1000; // specify the initial offset in the throughput snapshots thread
+ double throughputSnapshotTime; // specify the next snapshot time
+#endif
+ int threadsnum = 1; // specify the number of threads (parallel TCP connections)
+ int activeThreads = 1;
+ int i;
struct timeval sel_tv;
- fd_set rfd;
+ fd_set rfd, tmpRfd;
char* ptr, *jsonMsgValue;

// variables used for protocol validation logs
@@ -94,10 +106,40 @@
log_println(0, "Improper message");
return 3;
}
+#ifdef EXTTESTS_ENABLED
+ if (tests & TEST_EXT) {
+ strtokptr = strtok(buff, " ");
+ s2cport = atoi(strtokptr);
+ strtokptr = strtok(NULL, " ");
+ testDuration = atoi(strtokptr) / 1000.0;
+ strtokptr = strtok(NULL, " ");
+ throughputsnaps = atoi(strtokptr);
+ strtokptr = strtok(NULL, " ");
+ snapsdelay = atoi(strtokptr);
+ strtokptr = strtok(NULL, " ");
+ snapsoffset = atoi(strtokptr);
+ strtokptr = strtok(NULL, " ");
+ threadsnum = atoi(strtokptr);
+
+ log_println(1, " -- test duration: %.1fs", testDuration);
+ log_println(1, " -- throughput snapshots: enabled = %s, delay = %d, offset = %d",
+ throughputsnaps ? "true" : "false", snapsdelay, snapsoffset);
+ log_println(1, " -- threads: %d", threadsnum);
+ lastThroughputSnapshot = NULL;
+ }
+ else {
+ if (check_int(buff, &s2cport)) {
+ log_println(0, "Invalid port number");
+ return 4;
+ }
+ }
+#else
if (check_int(buff, &s2cport)) {
log_println(0, "Invalid port number");
return 4;
}
+#endif
+
log_println(1, " -- port: %d", s2cport);

/* Cygwin seems to want/need this extra getsockopt() function
@@ -115,13 +157,14 @@
I2AddrSetPort(sec_addr, s2cport); // set port to value obtained from server

// Connect to the server; set socket options
- if ((retcode = CreateConnectSocket(&inSocket, NULL, sec_addr, conn_options,
- buf_size))) {
- log_println(0, "Connect() for Server to Client failed", strerror(errno));
- return -15;
+ for (i = 0; i < threadsnum; ++i) {
+ sec_addresses[i] = I2AddrCopy(sec_addr);
+ if ((retcode = CreateConnectSocket(&inSocket[i], NULL, sec_addresses[i], conn_options, buf_size))) {
+ log_println(0, "Connect() for Server to Client failed (connection %d)", strerror(errno), i+1);
+ return -15;
+ }
+ setsockopt(inSocket[i], SOL_SOCKET, SO_REUSEADDR, &one, sizeof(one));
}
-
- setsockopt(inSocket, SOL_SOCKET, SO_REUSEADDR, &one, sizeof(one));

/* Linux updates the sel_tv time values everytime select returns. This
* means that eventually the timer will reach 0 seconds and select will
@@ -144,33 +187,64 @@
return 2;
}

- // server performs the S->C throughput test now. Get data sent by server.
-
- printf("running 10s inbound test (server to client) . . . . . . ");
+ printf("running %.1fs inbound test (server to client) . . . . . ", testDuration);
fflush(stdout);

- // Set socket timeout to 15 seconds
+ // Set socket timeout to testDuration seconds
bytes = 0;
- t = secs() + 15.0;
- sel_tv.tv_sec = 15;
+ t = testStartTime + testDuration + 5;
+#ifdef EXTTESTS_ENABLED
+ throughputSnapshotTime = testStartTime + (snapsoffset / 1000.0);
+#endif
+ sel_tv.tv_sec = testDuration + 5;
sel_tv.tv_usec = 5;
FD_ZERO(&rfd);
- FD_SET(inSocket, &rfd);
- // Read data sent by server as soon as it is available. Stop listening if
- // ...timeout has been exceeded.
+ activeThreads = threadsnum;
+ for (i = 0; i < threadsnum; i++) {
+ FD_SET(inSocket[i], &rfd);
+ }
+ // Read data sent by server as soon as it is available. Stop listening if timeout has been exceeded.
for (;;) {
- retcode = select(inSocket+1, &rfd, NULL, NULL, &sel_tv);
+ tmpRfd = rfd;
+ retcode = select(inSocket[threadsnum-1]+1, &tmpRfd, NULL, NULL, &sel_tv);
if (secs() > t) {
log_println(5, "Receive test running long, break out of read loop");
break;
}
+#ifdef EXTTESTS_ENABLED
+ if (throughputsnaps && secs() > throughputSnapshotTime) {
+ if (lastThroughputSnapshot != NULL) {
+ lastThroughputSnapshot->next = (struct throughputSnapshot*) malloc(sizeof(struct throughputSnapshot));
+ lastThroughputSnapshot = lastThroughputSnapshot->next;
+ }
+ else {
+ dThroughputSnapshots = lastThroughputSnapshot = (struct throughputSnapshot*) malloc(sizeof(struct throughputSnapshot));
+ }
+ lastThroughputSnapshot->next = NULL;
+ lastThroughputSnapshot->time = secs() - testStartTime;
+ lastThroughputSnapshot->throughput = ((BITS_8_FLOAT * bytes) / KILO) / (lastThroughputSnapshot->time); // kbps
+ log_println(6, " ---S->C: Throughput at %0.2f secs: Received %lld bytes, Spdin= %f kbps",
+ lastThroughputSnapshot->time, bytes, lastThroughputSnapshot->throughput);
+ throughputSnapshotTime += snapsdelay / 1000.0;
+ }
+#endif
if (retcode > 0) {
- inlth = read(inSocket, buff, sizeof(buff));
- if (inlth == 0)
- break;
- bytes += inlth;
+ for (i = 0; i < threadsnum; i++) {
+ if (FD_ISSET(inSocket[i], &tmpRfd)) {
+ inlth = read(inSocket[i], buff, sizeof(buff));
+ if (inlth == 0) {
+ activeThreads--;
+ FD_CLR(inSocket[i], &rfd);
+ if (activeThreads == 0) {
+ goto breakOuterLoop;
+ }
+ }
+ bytes += inlth;
+ }
+ }
continue;
}
+breakOuterLoop:
if (get_debuglvl() > 5) {
log_println(0, "s2c read loop exiting:", strerror(errno));
}
@@ -179,11 +253,10 @@

// get actual time for which data was received, and calculate throughput
// based on it.
- t = secs() - t + 15.0;
+ t = secs() - testStartTime;
spdin = ((BITS_8_FLOAT * bytes) / KILO) / t; // kbps

- // log_println(0,"S->C: Received %d bytes in %0.2f secs: Spdin= %f", bytes,
- // t, spdin);
+ log_println(6, " ---S->C: Received %lld bytes in %0.2f secs: Spdin= %f", bytes, t, spdin);

// Server sends calculated throughput value, unsent data amount in the
// socket queue and overall number of sent bytes in a TEST_MSG
@@ -258,6 +331,16 @@

// send TEST_MSG to server with the client-calculated throughput
snprintf(buff, sizeof(buff), "%0.0f", spdin);
+#ifdef EXTTESTS_ENABLED
+ if (dThroughputSnapshots != NULL) {
+ struct throughputSnapshot *snapshotsPtr = dThroughputSnapshots;
+ while (snapshotsPtr != NULL) {
+ int currBuffLength = strlen(buff);
+ snprintf(&buff[currBuffLength], sizeof(buff)-currBuffLength, " %0.2f %0.2f", snapshotsPtr->time, snapshotsPtr->throughput);
+ snapshotsPtr = snapshotsPtr->next;
+ }
+ }
+#endif
send_json_message(ctlSocket, TEST_MSG, buff, strlen(buff),
jsonSupport, JSON_SINGLE_VALUE);

=======================================
--- /branches/MultiplePorts/src/test_s2c_srv.c Wed Jun 18 05:44:40 2014 UTC
+++ /branches/MultiplePorts/src/test_s2c_srv.c Wed Nov 5 10:25:27 2014 UTC
@@ -26,6 +26,15 @@
extern pthread_mutex_t mainmutex;
extern pthread_cond_t maincond;

+typedef struct s2cWriteWorkerArgs {
+ int connectionId;
+ int socketDescriptor;
+ double stopTime;
+ char* buff;
+} S2CWriteWorkerArgs;
+
+void* s2cWriteWorker(void* arg);
+
const char RESULTS_KEYS[] = "ThroughputValue UnsentDataAmount TotalSentByte";

/**
@@ -65,6 +74,7 @@
* -2 - Cannot write message data while
attempting to send
* TEST_PREPARE message, or Unexpected message type
received
* -3 - Received message is invalid
+ * -4 - Unable to create worker threads
* -100 - timeout while waiting for client to connect to server's ephemeral port
* -101 - Retries exceeded while waiting for
client to connect
* -102 - Retries exceeded while waiting for data from connected client
@@ -78,31 +88,35 @@
/* experimental code to capture and log multiple copies of the
* web100 variables using the web100_snap() & log() functions.
*/
- web100_snapshot* tsnap = NULL;
- web100_snapshot* rsnap = NULL;
- web100_group* tgroup;
- web100_group* rgroup;
+ web100_snapshot* tsnap[7] = {NULL, NULL, NULL, NULL, NULL, NULL, NULL};
+ web100_snapshot* rsnap[7] = {NULL, NULL, NULL, NULL, NULL, NULL, NULL};
+ web100_group* tgroup[7];
+ web100_group* rgroup[7];
web100_var* var;
#elif USE_WEB10G
- estats_val_data* snap;
+ estats_val_data* snap[7]; // up to 7 connections
#endif
- tcp_stat_connection conn;
+ tcp_stat_connection conn[7]; // up to 7 connections
/* Just a holder for web10g */
tcp_stat_group* group = NULL;
/* Pipe that handles returning packet pair timing */
int mon_pipe[2];
int ret; // ctrl protocol read/write return status
int j, k, n;
- int xmitsfd; // transmit (i.e server) socket fd
+ int threadsNum = 1;
+ S2CWriteWorkerArgs writeWorkerArgs[7]; // write workers parameters
+ pthread_t writeWorkerIds[7]; // write workers ids
+ int xmitsfd[7]; // transmit (i.e server) socket fds (up to 7)
pid_t s2c_childpid = 0; // s2c_childpid

char tmpstr[256]; // string array used for temp storage of many char*
- struct sockaddr_storage cli_addr;
+ struct sockaddr_storage cli_addr[7];

socklen_t clilen;
double bytes_written; // bytes written in the throughput test
double tx_duration; // total time for which data was txed
double tmptime; // temporary time store
+ double testDuration = 10; // default test duration
double x2cspd; // s->c test throughput
struct timeval sel_tv; // time
fd_set rfd; // receive file descriptor
@@ -208,14 +222,29 @@
// run tests
testOptions->s2csockfd = I2AddrFD(s2csrv_addr);
testOptions->s2csockport = I2AddrPort(s2csrv_addr);
- log_println(1, " -- s2c %d port: %d", testOptions->child0,
- testOptions->s2csockport);
+ log_println(1, " -- s2c %d port: %d", testOptions->child0, testOptions->s2csockport);
+#ifdef EXTTESTS_ENABLED
+ if (testOptions->exttestsopt) {
+ log_println(1, " -- s2c ext -- duration = %d", options->dduration);
+ log_println(1, " -- s2c ext -- throughput snapshots: enabled = %s, delay = %d, offset = %d",
+ options->dthroughputsnaps ? "true" : "false", options->dsnapsdelay, options->dsnapsoffset);
+ log_println(1, " -- s2c ext -- number of threads: %d", options->dthreadsnum);
+ }
+#endif
pair.port1 = -1;
pair.port2 = testOptions->s2csockport;

// Data received from speed-chk. Send TEST_PREPARE "GO" signal with port
// number
snprintf(buff, sizeof(buff), "%d", testOptions->s2csockport);
+#ifdef EXTTESTS_ENABLED
+ if (testOptions->exttestsopt) {
+ snprintf(buff, sizeof(buff), "%d %d %d %d %d %d", testOptions->s2csockport,
+ options->dduration, options->dthroughputsnaps,
+ options->dsnapsdelay, options->dsnapsoffset, options->dthreadsnum);
+ lastThroughputSnapshot = NULL;
+ }
+#endif
j = send_json_message(ctlsockfd, TEST_PREPARE, buff, strlen(buff),
testOptions->json_support, JSON_SINGLE_VALUE);
if (j == -1) {
@@ -236,12 +265,20 @@
log_println(1, "%d waiting for data on testOptions->s2csockfd",
testOptions->child0);

- clilen = sizeof(cli_addr);
+ clilen = sizeof(cli_addr[0]);
FD_ZERO(&rfd);
FD_SET(testOptions->s2csockfd, &rfd);
sel_tv.tv_sec = 5; // wait for 5 secs
sel_tv.tv_usec = 0;
- for (j = 0; j < RETRY_COUNT; j++) {
+ i = 0;
+#ifdef EXTTESTS_ENABLED
+ if (testOptions->exttestsopt) {
+ threadsNum = options->dthreadsnum;
+ testDuration = options->dduration / 1000.0;
+ }
+#endif
+
+ for (j = 0; j < RETRY_COUNT*threadsNum; j++) {
ret = select((testOptions->s2csockfd) + 1, &rfd, NULL, NULL,
&sel_tv);
if ((ret == -1) && (errno == EINTR))
@@ -250,25 +287,26 @@
return SOCKET_CONNECT_TIMEOUT; // timeout
if (ret < 0)
return -errno; // other socket errors. exit
- if (j == 4)
+ if (j == (RETRY_COUNT*threadsNum - 1))
return RETRY_EXCEEDED_WAITING_CONNECT; // retry exceeded. exit

// If a valid connection request is received, client has connected.
// Proceed.
// Note the new socket fd - xmitfd - used in the throughput test
-ximfd: xmitsfd = accept(testOptions->s2csockfd,
- (struct sockaddr *) &cli_addr, &clilen);
- if (xmitsfd > 0) {
- log_println(6, "accept() for %d completed",
- testOptions->child0);
+ximfd: xmitsfd[i] = accept(testOptions->s2csockfd, (struct sockaddr *) &cli_addr[i], &clilen);
+ if (xmitsfd[i] > 0) {
+ i++;
+ log_println(6, "accept(%d/%d) for %d completed", i, threadsNum, testOptions->child0);
+ if (i < threadsNum) {
+ continue;
+ }
procstatusenum = PROCESS_STARTED;
proctypeenum = CONNECT_TYPE;
- protolog_procstatus(testOptions->child0, testids, proctypeenum,
- procstatusenum, xmitsfd);
+ protolog_procstatus(testOptions->child0, testids, proctypeenum, procstatusenum, xmitsfd[0]);
break;
}
// socket interrupted, wait some more
- if ((xmitsfd == -1) && (errno == EINTR)) {
+ if ((xmitsfd[i] == -1) && (errno == EINTR)) {
log_println(
6,
"Child %d interrupted while waiting for accept() to complete",
@@ -279,9 +317,9 @@
6,
"------- S2C connection setup for %d returned because (%d)",
testOptions->child0, errno);
- if (xmitsfd < 0) // other socket errors, quit
+ if (xmitsfd[i] < 0) // other socket errors, quit
return -errno;
- if (++j == 4) { // retry exceeded, quit
+ if (j == (RETRY_COUNT*threadsNum - 1)) { // retry exceeded, quit
log_println(
6,
"s2c child %d, unable to open connection, return from test",
@@ -289,36 +327,33 @@
return -102;
}
}
- src_addr = I2AddrByLocalSockFD(get_errhandle(), xmitsfd, 0);
- conn = tcp_stat_connection_from_socket(agent, xmitsfd);
+ src_addr = I2AddrByLocalSockFD(get_errhandle(), xmitsfd[0], 0);
+ for (i = 0; i < threadsNum; ++i) {
+ conn[i] = tcp_stat_connection_from_socket(agent, xmitsfd[i]);
+ }

// set up packet capture. The data collected is used for bottleneck link
// calculations
- if (xmitsfd > 0) {
+ if (xmitsfd[0] > 0) {
log_println(6, "S2C child %d, ready to fork()",
testOptions->child0);
if (getuid() == 0) {
- /*
- pipe(mon_pipe2);
- start_packet_trace(xmitsfd, testOptions->s2csockfd,
- &s2c_childpid, mon_pipe2, (struct sockaddr *) &cli_addr,
- clilen, device, &pair, "s2c", options->compress,
- meta.s2c_ndttrace);
- */
pipe(mon_pipe);
if ((s2c_childpid = fork()) == 0) {
/* close(ctlsockfd); */
close(testOptions->s2csockfd);
- close(xmitsfd);
+ for (i = 0; i < threadsNum; i++) {
+ close(xmitsfd[i]);
+ }
log_println(
5,
"S2C test Child thinks pipe() returned fd0=%d, fd1=%d",
mon_pipe[0], mon_pipe[1]);
- // log_println(2, "S2C test calling init_pkttrace() with pd=0x%x",
- // (int) &cli_addr);
- init_pkttrace(src_addr, (struct sockaddr *) &cli_addr,
+ log_println(2, "S2C test calling init_pkttrace() with pd=%p",
+ &cli_addr[0]);
+ init_pkttrace(src_addr, (struct sockaddr *) &cli_addr[0],
clilen, mon_pipe, device, &pair, "s2c",
- options->compress);
+ options->dduration / 1000.0);
log_println(6,
"S2C test ended, why is timer still running?");
/* Close the pipe */
@@ -342,7 +377,7 @@
}

/* experimental code, delete when finished */
- setCwndlimit(conn, group, agent, options);
+ setCwndlimit(conn[0], group, agent, options);
/* End of test code */

// create directory to write web100 snaplog trace
@@ -357,14 +392,16 @@
// system("/sbin/sysctl -w net.ipv4.route.flush=1");
system("echo 1 > /proc/sys/net/ipv4/route/flush");
}
+ for (i = 0; i < threadsNum; ++i) {
#if USE_WEB100
- rgroup = web100_group_find(agent, "read");
- rsnap = web100_snapshot_alloc(rgroup, conn);
- tgroup = web100_group_find(agent, "tune");
- tsnap = web100_snapshot_alloc(tgroup, conn);
+ rgroup[i] = web100_group_find(agent, "read");
+ rsnap[i] = web100_snapshot_alloc(rgroup[i], conn[i]);
+ tgroup[i] = web100_group_find(agent, "tune");
+ tsnap[i] = web100_snapshot_alloc(tgroup[i], conn[i]);
#elif USE_WEB10G
- estats_val_data_new(&snap);
+ estats_val_data_new(&snap[i]);
#endif
+ }

// fill send buffer with random printable data for throughput test
bytes_written = 0;
@@ -400,77 +437,93 @@
conn, group);*///new file changes
start_snap_worker(&snapArgs, agent, peaks, options->snaplog,
&workerThreadId, meta.s2c_snaplog, options->s2c_logname,
- conn, group);
+ conn[0], group);

/* alarm(20); */
tmptime = secs(); // current time
- tx_duration = tmptime + 10.0; // set timeout to 10 s in future
+ tx_duration = tmptime + testDuration; // set timeout to test duration s in future
+
+ for (i = 0; i < threadsNum; ++i) {
+ writeWorkerArgs[i].connectionId = i + 1;
+ writeWorkerArgs[i].socketDescriptor = xmitsfd[i];
+ writeWorkerArgs[i].stopTime = tx_duration;
+ writeWorkerArgs[i].buff = buff;
+ }
+

log_println(6, "S2C child %d beginning test", testOptions->child0);

- while (secs() < tx_duration) {
+ if (threadsNum == 1) {
+ while (secs() < tx_duration) {
// Increment total attempts at sending-> buffer control
bufctrlattempts++;
- if (options->avoidSndBlockUp) { // Do not block send buffers
- pthread_mutex_lock(&mainmutex);
+ if (options->avoidSndBlockUp) { // Do not block send buffers
+ pthread_mutex_lock(&mainmutex);

- // get details of next sequence # to be sent and fetch value from
- // snap file
+ // get details of next sequence # to be sent and fetch value from snap file
#if USE_WEB100
- web100_agent_find_var_and_group(agent, "SndNxt", &group,
- &var);
- web100_snap_read(var, snapArgs.snap, tmpstr);
- nextseqtosend = atoi(
- web100_value_to_text(web100_get_var_type(var),
- tmpstr));
- // get oldest un-acked sequence number
- web100_agent_find_var_and_group(agent, "SndUna", &group,
- &var);
- web100_snap_read(var, snapArgs.snap, tmpstr);
- lastunackedseq = atoi(
- web100_value_to_text(web100_get_var_type(var),
- tmpstr));
+ web100_agent_find_var_and_group(agent, "SndNxt", &group, &var);
+ web100_snap_read(var, snapArgs.snap, tmpstr);
+ nextseqtosend = atoi(web100_value_to_text(web100_get_var_type(var), tmpstr));
+ // get oldest un-acked sequence number
+ web100_agent_find_var_and_group(agent, "SndUna", &group, &var);
+ web100_snap_read(var, snapArgs.snap, tmpstr);
+ lastunackedseq = atoi(web100_value_to_text(web100_get_var_type(var), tmpstr));
#elif USE_WEB10G
- struct estats_val value;
- web10g_find_val(snapArgs.snap, "SndNxt", &value);
- nextseqtosend = value.uv32;
- web10g_find_val(snapArgs.snap, "SndUna", &value);
- lastunackedseq = value.uv32;
+ struct estats_val value;
+ web10g_find_val(snapArgs.snap, "SndNxt", &value);
+ nextseqtosend = value.uv32;
+ web10g_find_val(snapArgs.snap, "SndUna", &value);
+ lastunackedseq = value.uv32;
#endif
- pthread_mutex_unlock(&mainmutex);
+ pthread_mutex_unlock(&mainmutex);

- // Temporarily stop sending data if you sense that the buffer is
- // overwhelmed
- // This is calculated by checking if (8192 * 4) <
- // ((Next Sequence Number To Be Sent) -
- // (Oldest Unacknowledged Sequence Number) - 1)
- if (is_buffer_clogged(nextseqtosend, lastunackedseq)) {
- // Increment draining queue value
- drainingqueuecount++;
+ // Temporarily stop sending data if you sense that the buffer is overwhelmed
+ // This is calculated by checking if (8192 * 4) < ((Next Sequence Number To Be Sent) - (Oldest Unacknowledged Sequence Number) - 1)
+ if (is_buffer_clogged(nextseqtosend, lastunackedseq)) {
+ // Increment draining queue value
+ drainingqueuecount++;
+ continue;
+ }
+ }
+
+ // attempt to write random data into the client socket
+ n = write(xmitsfd[0], buff, RECLTH);
+ // socket interrupted, continue attempting to write
+ if ((n == -1) && (errno == EINTR))
continue;
+ if (n < 0)
+ break; // all data written. Exit
+ bytes_written += n;
+
+ if (options->avoidSndBlockUp) {
+ bufctlrnewdata++; // increment "sent data" queue
+ }
+ } // Completed end of trying to transmit data for the goodput test
+ }
+ else {
+ for (i = 0; i < threadsNum; ++i) {
+ if (pthread_create(&writeWorkerIds[i], NULL, s2cWriteWorker, (void*) &writeWorkerArgs[i])) {
+ log_println(0, "Cannot create write worker thread for throughput download test!");
+ writeWorkerIds[i] = 0;
+ return -4;
}
}

- // attempt to write random data into the client socket
- n = write(xmitsfd, buff, RECLTH);
- // socket interrupted, continue attempting to write
- if ((n == -1) && (errno == EINTR))
- continue;
- if (n < 0)
- break; // all data written. Exit
- bytes_written += n;
-
- if (options->avoidSndBlockUp) {
- bufctlrnewdata++; // increment "sent data" queue
+ for (i = 0; i < threadsNum; ++i) {
+ pthread_join(writeWorkerIds[i], NULL);
}
- } // Completed end of trying to transmit data for the goodput test
+ }
+
/* alarm(10); */
sigaction(SIGALRM, &old, NULL);
- sndqueue = sndq_len(xmitsfd);
+ sndqueue = sndq_len(xmitsfd[0]);

// finalize the midbox test ; disabling socket used for throughput test
log_println(6, "S2C child %d finished test", testOptions->child0);
- shutdown(xmitsfd, SHUT_WR); /* end of write's */
+ for (i = 0; i < threadsNum; i++) {
+ shutdown(xmitsfd[i], SHUT_WR); /* end of write's */
+ }

// get actual time duration during which data was transmitted
tx_duration = secs() - tmptime;
@@ -503,13 +556,14 @@
s2c_childpid);
}

-
+ for (i = 0; i < threadsNum; ++i) {
#if USE_WEB100
- web100_snap(rsnap);
- web100_snap(tsnap);
+ web100_snap(rsnap[i]);
+ web100_snap(tsnap[i]);
#elif USE_WEB10G
- estats_read_vars(snap, conn, agent);
+ estats_read_vars(snap[i], conn[i], agent);
#endif
+ }

log_println(1, "sent %d bytes to client in %0.2f seconds",
(int) bytes_written, tx_duration);
@@ -594,14 +648,20 @@

#if USE_WEB100
// send web100 data to client
- ret = tcp_stat_get_data(tsnap, xmitsfd, ctlsockfd, agent, count_vars, testOptions->json_support);
- web100_snapshot_free(tsnap);
+ ret = tcp_stat_get_data(tsnap, xmitsfd, threadsNum, ctlsockfd, agent, count_vars, testOptions->json_support);
+ for (i = 0; i < threadsNum; ++i) {
+ web100_snapshot_free(tsnap[i]);
+ }
// send tuning-related web100 data collected to client
- ret = tcp_stat_get_data(rsnap, xmitsfd, ctlsockfd, agent, count_vars, testOptions->json_support);
- web100_snapshot_free(rsnap);
+ ret = tcp_stat_get_data(rsnap, xmitsfd, threadsNum, ctlsockfd, agent, count_vars, testOptions->json_support);
+ for (i = 0; i < threadsNum; ++i) {
+ web100_snapshot_free(rsnap[i]);
+ }
#elif USE_WEB10G
- ret = tcp_stat_get_data(snap, xmitsfd, ctlsockfd, agent, count_vars, testOptions->json_support);
- estats_val_data_free(&snap);
+ ret = tcp_stat_get_data(snap, xmitsfd, threadsNum, ctlsockfd, agent, count_vars, testOptions->json_support);
+ for (i = 0; i < threadsNum; ++i) {
+ estats_val_data_free(&snap[i]);
+ }
#endif

// If sending web100 variables above failed, indicate to client
@@ -655,10 +715,30 @@
return -3;
}
*s2cspd = atoi(buff); // save Throughput value as seen by client
+#ifdef EXTTESTS_ENABLED
+ if (testOptions->exttestsopt && options->dthroughputsnaps) {
+ char* strtokptr = strtok(buff, " ");
+ while ((strtokptr = strtok(NULL, " ")) != NULL) {
+ if (lastThroughputSnapshot != NULL) {
+ lastThroughputSnapshot->next = (struct throughputSnapshot*) malloc(sizeof(struct throughputSnapshot));
+ lastThroughputSnapshot = lastThroughputSnapshot->next;
+ }
+ else {
+ dThroughputSnapshots = lastThroughputSnapshot = (struct throughputSnapshot*) malloc(sizeof(struct throughputSnapshot));
+ }
+ lastThroughputSnapshot->next = NULL;
+ lastThroughputSnapshot->time = atof(strtokptr);
+ strtokptr = strtok(NULL, " ");
+ lastThroughputSnapshot->throughput = atof(strtokptr);
+ }
+ }
+#endif
log_println(6, "S2CSPD from client %f", *s2cspd);
// Final activities of ending tests. Close sockets, file descriptors,
// send finalise message to client
- close(xmitsfd);
+ for (i = 0; i < threadsNum; i++) {
+ close(xmitsfd[i]);
+ }
if (send_json_message(ctlsockfd, TEST_FINALIZE, "", 0,
testOptions->json_support, JSON_SINGLE_VALUE) < 0)
log_println(6,
@@ -679,3 +759,32 @@
}
return 0;
}
+
+void* s2cWriteWorker(void* arg) {
+ S2CWriteWorkerArgs *workerArgs = (S2CWriteWorkerArgs*) arg;
+ int connectionId = workerArgs->connectionId;
+ int socketDescriptor = workerArgs->socketDescriptor;
+ double stopTime = workerArgs->stopTime;
+ char* threadBuff = workerArgs->buff;
+ double threadBytes = 0;
+ int threadPackets = 0, n;
+ double threadTime = secs();
+
+
+ while (secs() < stopTime) {
+ // attempt to write random data into the client socket
+ n = write(socketDescriptor, threadBuff, RECLTH); // TODO avoid snd block
+ // socket interrupted, continue attempting to write
+ if ((n == -1) && (errno == EINTR))
+ continue;
+ if (n < 0)
+ break; // all data written. Exit
+ threadPackets++;
+ threadBytes += n;
+ }
+
+ log_println(6, " ---S->C thread %d (sc %d): speed=%0.0f, bytes=%0.0f, pkts=%d, lth=%d, time=%0.0f", connectionId, socketDescriptor,
+ ((BITS_8_FLOAT * threadBytes) / KILO) / (secs() - threadTime), threadBytes, threadPackets, RECLTH, secs() - threadTime);
+
+ return NULL;
+}
=======================================
--- /branches/MultiplePorts/src/testoptions.c Wed May 28 11:17:18 2014 UTC
+++ /branches/MultiplePorts/src/testoptions.c Wed Nov 5 10:25:27 2014 UTC
@@ -289,7 +289,10 @@
// set_protologfile(get_remotehost(), protologlocalarr);

if (!(useropt
- & (TEST_MID | TEST_C2S | TEST_S2C | TEST_SFW | TEST_STATUS
+ & (TEST_MID | TEST_C2S | TEST_S2C | TEST_SFW | TEST_STATUS
+#ifdef EXTTESTS_ENABLED
+ | TEST_EXT
+#endif
| TEST_META))) {
// message received does not indicate a valid test!
send_json_message(ctlsockfd, MSG_ERROR, invalid_test_suite,
@@ -422,62 +425,6 @@
estats_val_data_free(&snapArgs_ptr->snap);
#endif
}
-
-/**
- * Start packet tracing for this client
- * @param socketfdarg socket file descriptor to initialize packet trace from
- * @param socketfdarg socket file descriptor to close
- * @param childpid pid resulting from fork()
- * @param imonarg int array of socket fd from pipe
- * @param cliaddrarg sock_addr used to determine client IP
- * @param clilenarg socket length
- * @param device device type
- * @param pairarg portpair struct instance
- * @param testindicator string indicating C2S/S2c test
- * @param iscompressionenabled is compression enabled (for log files)?
- * @param copylocationarg memory location to copy resulting string (from packet trace) into
- * */
-
-void start_packet_trace(int socketfdarg, int socketfdarg2, pid_t *childpid,
- int *imonarg, struct sockaddr *cliaddrarg,
- socklen_t clilenarg, char* device, PortPair* pairarg,
- const char* testindicatorarg, int iscompressionenabled,
- char *copylocationarg) {
- char tmpstr[256];
- int i, readretval;
-
- I2Addr src_addr = I2AddrByLocalSockFD(get_errhandle(), socketfdarg, 0);
-
- if ((*childpid = fork()) == 0) {
- close(socketfdarg2);
- close(socketfdarg);
- log_println(0, "%s test Child thinks pipe() returned fd0=%d, fd1=%d",
- testindicatorarg, imonarg[0], imonarg[1]);
-
- init_pkttrace(src_addr, cliaddrarg, clilenarg, imonarg, device,
- pairarg, testindicatorarg, iscompressionenabled);
-
- exit(0); // Packet trace finished, terminate gracefully
- }
- memset(tmpstr, 0, 256);
- for (i = 0; i < 5; i++) { // read nettrace file name into "tmpstr"
- readretval = read(imonarg[0], tmpstr, 128);
- if ((readretval == -1) && (errno == EINTR))
- // socket interrupted, try reading again
- continue;
- break;
- }
-
- // name of nettrace file passed back from pcap child copied into meta
- // structure
- if (strlen(tmpstr) > 5) {
- memcpy(copylocationarg, tmpstr, strlen(tmpstr));
- log_println(8, "**start pkt trace, memcopied dir name %s", tmpstr);
- }
-
- // free this address now.
- I2AddrFree(src_addr);
-}

/**
* Stop packet tracing activity.
=======================================
--- /branches/MultiplePorts/src/testoptions.h Wed May 28 11:17:18 2014 UTC
+++ /branches/MultiplePorts/src/testoptions.h Wed Nov 5 10:25:27 2014 UTC
@@ -47,6 +47,7 @@
int State; // seems unused currently

int metaopt; // meta test to be perfomed?
+ int exttestsopt; // extended tests supported?
} TestOptions;

// Snap log characteristics
@@ -66,8 +67,8 @@

int test_sfw_srv(int ctlsockfd, tcp_stat_agent* agent, TestOptions* options,
int conn_options);
-int test_meta_srv(int ctlsockfd, tcp_stat_agent* agent, TestOptions* options,
- int conn_options);
+int test_meta_srv(int ctlsockfd, tcp_stat_agent* agent, TestOptions* testOptions,
+ int conn_options, Options* options);

int getCurrentTest();
void setCurrentTest(int testId);
=======================================
--- /branches/MultiplePorts/src/usage.c Tue Mar 25 09:36:53 2014 UTC
+++ /branches/MultiplePorts/src/usage.c Wed Nov 5 10:25:27 2014 UTC
@@ -70,6 +70,7 @@
printf(" (default %s/serverdata)\n", BASEDIR);
printf(" -S, --logfacility #F - specify syslog facility name\n");
printf(" Note: this doesn't enable 'syslog'\n\n");
+ printf(" --savewebvalues - enable web values writing to a separate file\n\n");
#ifdef EXPERIMENTAL_ENABLED
printf(" Experimental code:\n\n");
printf(" --avoidsndblockup - enable code to avoid send buffers blocking in the S2C test\n");
@@ -81,6 +82,23 @@
printf(" --cputime - enable the cputime writing\n");
printf(" -y, --limit #limit - enable the throughput limiting code\n\n");
#endif
+#if EXTTESTS_ENABLED
+ printf(" Extended tests code:\n\n");
+ printf(" --uduration #msec - specify upload test duration (default 10000 msec)\n");
+ printf(" --uthroughputsnaps - enable the throughput snapshots for upload test writing\n");
+ printf(" --usnapsdelay #msec - specify the delay in the throughput snapshots thread for upload test (default 5000 msec)\n");
+ printf(" Note: this doesn't enable 'uthroughputsnaps'\n");
+ printf(" --usnapsoffset #msec - specify the initial offset in the throughput snapshots thread for upload test (default 1000 msec)\n");
+ printf(" Note: this doesn't enable 'uthroughputsnaps'\n");
+ printf(" --uthreadsnum #num - specify the number of threads (parallel TCP connections) for upload test (default 1 thread, maximum 7)\n");
+ printf(" --dduration #msec - specify download test duration (default 10000 msec)\n");
+ printf(" --dthroughputsnaps - enable the throughput snapshots for download test writing\n");
+ printf(" --dsnapsdelay #msec - specify the delay in the throughput snapshots thread for download test (default 5000 msec)\n");
+ printf(" Note: this doesn't enable 'dthroughputsnaps'\n");
+ printf(" --dsnapsoffset #msec - specify the initial offset in the throughput snapshots thread for download test (default 1000 msec)\n");
+ printf(" Note: this doesn't enable 'dthroughputsnaps'\n");
+ printf(" --dthreadsnum #num - specify the number of threads (parallel TCP connections) for download test (default 1 thread, maximum 7)\n\n");
+#endif
#if defined(HAVE_ODBC) && defined(DATABASE_ENABLED) && defined(HAVE_SQL_H)
printf(" Database support:\n\n");
printf(" --enableDBlogging - enable the test results logging to the database\n");
=======================================
--- /branches/MultiplePorts/src/web100-pcap.c Wed Mar 12 06:27:30 2014 UTC
+++ /branches/MultiplePorts/src/web100-pcap.c Wed Nov 5 10:25:27 2014 UTC
@@ -696,12 +696,12 @@
* @param device device detail string
* @param pair PortPair structure
* @param direction string indicating C2S/S2c test
- * @param compress Option indicating whether log files (here, ndttrace) needs to be compressed. Unused here.
+ * @param expectedTestTime expected test duration in seconds (the kill alarm is set to this value plus 50 seconds)
*/

void init_pkttrace(I2Addr srcAddr, struct sockaddr *sock_addr,
socklen_t saddrlen, int monitor_pipe[2], char *device,
- PortPair* pair, const char *direction, int compress) {
+ PortPair* pair, const char *direction, int expectedTestTime) {
char cmdbuf[256], dir[256];
pcap_handler printer;
u_char * pcap_userdata = (u_char*) pair;
@@ -964,7 +964,7 @@
}

/* kill process off if parent doesn't send a signal. */
- alarm(60);
+ alarm(50 + expectedTestTime);

if (pcap_loop(pd, cnt, printer, pcap_userdata) < 0) {
log_println(5, "pcap_loop exited %s", pcap_geterr(pd));
=======================================
--- /branches/MultiplePorts/src/web100-util.c Wed Jun 4 10:51:11 2014 UTC
+++ /branches/MultiplePorts/src/web100-util.c Wed Nov 5 10:25:27 2014 UTC
@@ -125,8 +125,9 @@
// remove unwanted chars (right now, trailing/preceding chars from wb100
// var names)
trim(line, strlen(line), trimmedline, sizeof(trimmedline));
- strlcpy(web_vars[count_vars].name, trimmedline,
- sizeof(web_vars[count_vars].name));
+ for (i = 0; i < 7; ++i) {
+ strlcpy(web_vars[i][count_vars].name, trimmedline, sizeof(web_vars[i][count_vars].name));
+ }
count_vars++;
}
fclose(fp);
@@ -462,10 +463,8 @@
#if USE_WEB100
int ok = 1;
for (i = 0; i < count_vars; i++) {
- if ((web100_agent_find_var_and_group(agent, web_vars[i].name, &group,
- &var)) != WEB100_ERR_SUCCESS) {
- log_println(1, "Variable %d (%s) not found in KIS", i,
- web_vars[i].name);
+ if ((web100_agent_find_var_and_group(agent, web_vars[0][i].name, &group, &var)) != WEB100_ERR_SUCCESS) {
+ log_println(1, "Variable %d (%s) not found in KIS", i, web_vars[0][i].name);
ok = 0;
continue;
}
@@ -483,11 +482,10 @@
continue;
}
if (ok == 1) {
- snprintf(web_vars[i].value, sizeof(web_vars[i].value), "%s",
- web100_value_to_text(web100_get_var_type(var), buf));
+ snprintf(web_vars[0][i].value, sizeof(web_vars[0][i].value), "%s", web100_value_to_text(web100_get_var_type(var), buf));
if (fp)
- fprintf(fp, "%d;", (int32_t) atoi(web_vars[i].value));
- log_println(9, "%s: %d", web_vars[i].name, atoi(web_vars[i].value));
+ fprintf(fp, "%d;", (int32_t) atoi(web_vars[0][i].value));
+ log_println(9, "%s: %d", web_vars[0][i].name, atoi(web_vars[0][i].value));
}
ok = 1;
}
@@ -524,9 +522,9 @@
#if USE_WEB10G
/* Persistent storage needed. These are filled by tcp_stat_get_data
* and later read by tcp_stat_logvars and free()'d */
-static estats_val_data* dataDumpSave = NULL;
-static int X_SndBuf = -1;
-static int X_RcvBuf = -1;
+static estats_val_data* dataDumpSave[7] = {NULL, NULL, NULL, NULL, NULL, NULL, NULL};
+static int X_SndBuf[7] = {-1, -1, -1, -1, -1, -1, -1};
+static int X_RcvBuf[7] = {-1, -1, -1, -1, -1, -1, -1};
#endif


@@ -594,11 +592,11 @@
* @param jsonSupport indicates if messages should be sent using JSON format
*
*/
-int tcp_stat_get_data(tcp_stat_snap* snap, int testsock, int ctlsock,
+int tcp_stat_get_data(tcp_stat_snap** snap, int* testsock, int threadsNum, int ctlsock,
tcp_stat_agent* agent, int count_vars, int jsonSupport) {
char line[256];
#if USE_WEB100
- int i;
+ int i, t;
web100_var* var;
web100_group* group;
char buf[32];
@@ -606,208 +604,194 @@
assert(snap);
assert(agent);

- for (i = 0; i < count_vars; i++) {
- if ((web100_agent_find_var_and_group(agent, web_vars[i].name, &group,
- &var)) != WEB100_ERR_SUCCESS) {
- log_println(9, "Variable %d (%s) not found in KIS: ", i,
- web_vars[i].name);
- continue;
- }
+ for (t = 0; t < threadsNum; ++t) {
+ assert(snap[t]);

- // if no snapshot provided, no way to get values
- if (snap == NULL) {
- fprintf(stderr,
- "Web100_get_data() failed, return to testing routine\n");
- log_println(6,
- "Web100_get_data() failed, return to testing routine\n");
- return (-1);
- }
+ for (i = 0; i < count_vars; i++) {
+ if ((web100_agent_find_var_and_group(agent, web_vars[t][i].name, &group, &var)) != WEB100_ERR_SUCCESS) {
+ log_println(9, "Variable %d (%s) not found in KIS: ", i, web_vars[t][i].name);
+ continue;
+ }

- // handle an unsuccessful data retrieval
- if ((web100_snap_read(var, snap, buf)) != WEB100_ERR_SUCCESS) {
- if (get_debuglvl() > 9) {
- log_print(9, "Variable %d (%s): ", i, web_vars[i].name);
- web100_perror("web100_snap_read()");
+ // if no snapshot provided, no way to get values
+ if (snap[t] == NULL) {
+ fprintf(stderr, "Web100_get_data() failed, return to testing routine\n");
+ log_println(6, "Web100_get_data() failed, return to testing routine\n");
+ return (-1);
}
- continue;
+
+ // handle an unsuccessful data retrieval
+ if ((web100_snap_read(var, snap[t], buf)) != WEB100_ERR_SUCCESS) {
+ if (get_debuglvl() > 9) {
+ log_print(9, "Variable %d (%s): ", i, web_vars[t][i].name);
+ web100_perror("web100_snap_read()");
+ }
+ continue;
+ }
+
+ // assign values and transmit message with all web100 variables to socket receiver end
+ snprintf(web_vars[t][i].value, sizeof(web_vars[t][i].value), "%s", web100_value_to_text(web100_get_var_type(var), buf));
+ /* Why do we atoi after getting as text anyway ?? */
+ if (t == 0) {
+ snprintf(line, sizeof(line), "%s: %d\n", web_vars[t][i].name, atoi(web_vars[t][i].value));
+ send_json_message(ctlsock, TEST_MSG, line, strlen(line), jsonSupport, JSON_SINGLE_VALUE);
+ log_print(9, "%s", line);
+ }
}
-
- // assign values and transmit message with all web100 variables to socket
- // receiver end
- snprintf(web_vars[i].value, sizeof(web_vars[i].value), "%s",
- web100_value_to_text(web100_get_var_type(var), buf));
- /* Why do we atoi after getting as text anyway ?? */
- snprintf(line, sizeof(line), "%s: %d\n", web_vars[i].name,
- atoi(web_vars[i].value));
- send_json_message(ctlsock, TEST_MSG, line, strlen(line), jsonSupport, JSON_SINGLE_VALUE);
- log_print(9, "%s", line);
}
log_println(6, "S2C test - Send web100 data to client pid=%d", getpid());
return (0);
#elif USE_WEB10G
- int j;
+ int j, t;
unsigned int xbuf_size;
struct estats_val val;
estats_error* err;
-
- xbuf_size = sizeof(X_RcvBuf);
- if (getsockopt(testsock,
- SOL_SOCKET, SO_RCVBUF, (void *)&X_RcvBuf, &xbuf_size) != 0) {
- log_println(0, "Error: failed to getsockopt() SO_RCVBUF");
- }
- xbuf_size = sizeof(X_SndBuf);
- if (getsockopt(testsock,
- SOL_SOCKET, SO_SNDBUF, (void *)&X_SndBuf, &xbuf_size) != 0) {
- log_println(0, "Error: failed to getsockopt() SO_RCVBUF");
- }

assert(snap);

- /* Need to save this for later */
- estats_val_data_new(&dataDumpSave);
- memcpy(dataDumpSave, snap, sizeof(struct estats_val_data)
- + (sizeof(struct estats_val) * snap->length));
+ for (t = 0; t < threadsNum; ++t) {
+ xbuf_size = sizeof(X_RcvBuf[t]);
+ if (getsockopt(testsock[t], SOL_SOCKET, SO_RCVBUF, (void *)&X_RcvBuf[t], &xbuf_size) != 0) {
+ log_println(0, "Error: failed to getsockopt() SO_RCVBUF");
+ }
+ xbuf_size = sizeof(X_SndBuf[t]);
+ if (getsockopt(testsock[t], SOL_SOCKET, SO_SNDBUF, (void *)&X_SndBuf[t], &xbuf_size) != 0) {
+ log_println(0, "Error: failed to getsockopt() SO_RCVBUF");
+ }

- for (j = 0; j < snap->length; j++) {
- char *str;
- if (snap->val[j].masked) continue;
+ assert(snap[t]);

- if ((err = estats_val_as_string(&str, &snap->val[j],
- estats_var_array[j].valtype)) != NULL) {
- log_println(0, "In tcp_stat_get_data() estats_val_as_string()"
- " failed for %s", estats_var_array[j].name);
- estats_error_print(stderr, err);
- estats_error_free(&err);
- continue;
- }
- snprintf(line, sizeof(line), "%s: %s\n",
- estats_var_array[j].name, str);
- send_json_message(ctlsock, TEST_MSG, line, strlen(line), jsonSupport, JSON_SINGLE_VALUE);
- log_print(9, "%s", line);
- free(str);
- str = NULL;
- }
+ /* Need to save this for later */
+ estats_val_data_new(&dataDumpSave[t]);
+ memcpy(dataDumpSave[t], snap[t], sizeof(struct estats_val_data) + (sizeof(struct estats_val) * snap[t]->length));

- /* This is the list of changed variable names that the client tries to read.
- * Web100 -> Web10g
- * ECNEnabled -> ECN
- * NagleEnabled -> Nagle
- * SACKEnabled -> WillSendSACK & WillUseSACK
- * TimestampsEnabled -> TimeStamps
- * PktsRetrans -> SegsRetrans
- * X_Rcvbuf -> Not in web10g not used by client but send anyway
- * X_Sndbuf -> Not in web10g but could be interesting for the client
- * DataPktsOut -> DataSegsOut
- * AckPktsOut -> Depreciated
- * MaxCwnd -> MaxSsCwnd MaxCaCwnd
- * SndLimTimeSender -> SndLimTimeSnd
- * DataBytesOut -> DataOctetsOut
- * AckPktsIn -> Depreciated
- * SndLimTransSender -> SndLimTransSnd
- * PktsOut -> SegsOut
- * CongestionSignals -> CongSignals
- * RcvWinScale -> Same as WinScaleSent if WinScaleSent != -1
- */
- static const char* frame_web100 = "-~~~Web100_old_var_names~~~-: 1\n";
- int type;
- char *str = NULL;
- send_json_message(ctlsock, TEST_MSG, frame_web100, strlen(frame_web100), jsonSupport, JSON_SINGLE_VALUE);
+ if (t == 0) {
+ for (j = 0; j < snap[t]->length; j++) {
+ char *str;
+ if (snap[t]->val[j].masked) continue;

- /* ECNEnabled -> ECN */
- type = web10g_find_val(snap, "ECN", &val);
- if (type != ESTATS_SIGNED32) {
- log_println(0, "In tcp_stat_get_data(), web10g_find_val()"
- " failed to find ECN bad type=%d", type);
- } else {
- snprintf(line, sizeof(line), "ECNEnabled: %"PRId32"\n", (val.sv32 == 1) ? 1 : 0);
- send_json_message(ctlsock, TEST_MSG, line, strlen(line), jsonSupport, JSON_SINGLE_VALUE);
- }
+ if ((err = estats_val_as_string(&str, &snap[t]->val[j], estats_var_array[j].valtype)) != NULL) {
+ log_println(0, "In tcp_stat_get_data() estats_val_as_string() failed for %s", estats_var_array[j].name);
+ estats_error_print(stderr, err);
+ estats_error_free(&err);
+ continue;
+ }
+ snprintf(line, sizeof(line), "%s: %s\n", estats_var_array[j].name, str);
+ send_msg(ctlsock, TEST_MSG, (const void *) line, strlen(line));
+ log_print(9, "%s", line);
+ free(str);
+ str = NULL;
+ }

- /* NagleEnabled -> Nagle */
- type = web10g_find_val(snap, "Nagle", &val);
- if (type != ESTATS_SIGNED32) {
- log_println(0, "In tcp_stat_get_data(), web10g_find_val()"
- " failed to find Nagle bad type=%d", type);
- } else {
- snprintf(line, sizeof(line), "NagleEnabled: %"PRId32"\n", (val.sv32 == 2) ? 1 : 0);
- send_json_message(ctlsock, TEST_MSG, line, strlen(line), jsonSupport, JSON_SINGLE_VALUE);
- }
+ /* This is the list of changed variable names that the client tries to read.
+ * Web100 -> Web10g
+ * ECNEnabled -> ECN
+ * NagleEnabled -> Nagle
+ * SACKEnabled -> WillSendSACK & WillUseSACK
+ * TimestampsEnabled -> TimeStamps
+ * PktsRetrans -> SegsRetrans
+ * X_Rcvbuf -> Not in web10g not used by client but send anyway
+ * X_Sndbuf -> Not in web10g but could be interesting for the client
+ * DataPktsOut -> DataSegsOut
+ * AckPktsOut -> Deprecated
+ * MaxCwnd -> MaxSsCwnd MaxCaCwnd
+ * SndLimTimeSender -> SndLimTimeSnd
+ * DataBytesOut -> DataOctetsOut
+ * AckPktsIn -> Deprecated
+ * SndLimTransSender -> SndLimTransSnd
+ * PktsOut -> SegsOut
+ * CongestionSignals -> CongSignals
+ * RcvWinScale -> Same as WinScaleSent if WinScaleSent != -1
+ */
+ static const char* frame_web100 = "-~~~Web100_old_var_names~~~-: 1\n";
+ int type;
+ char *str = NULL;
+ send_json_message(ctlsock, TEST_MSG, (const void *)frame_web100, strlen(frame_web100), jsonSupport, JSON_SINGLE_VALUE);

- /* SACKEnabled -> WillUseSACK & WillSendSACK */
- type = web10g_find_val(snap, "WillUseSACK", &val);
- if (type == -1) {
- log_println(0, "In tcp_stat_get_data(), web10g_find_val()"
- " failed to find WillUseSACK bad type=%d", type);
- } else {
- /* Yes this comes through as 3 from web100 */
- snprintf(line, sizeof(line), "SACKEnabled: %d\n", (val.sv32 == 1) ? 3 : 0);
- send_json_message(ctlsock, TEST_MSG, line, strlen(line), jsonSupport, JSON_SINGLE_VALUE);
- }
+ /* ECNEnabled -> ECN */
+ type = web10g_find_val(snap[t], "ECN", &val);
+ if (type != ESTATS_SIGNED32) {
+ log_println(0, "In tcp_stat_get_data(), web10g_find_val() failed to find ECN bad type=%d", type);
+ } else {
+ snprintf(line, sizeof(line), "ECNEnabled: %"PRId32"\n", (val.sv32 == 1) ? 1 : 0);
+ send_json_message(ctlsock, TEST_MSG, line, strlen(line), jsonSupport, JSON_SINGLE_VALUE);
+ }

- /* TimestampsEnabled -> TimeStamps */
- type = web10g_find_val(snap, "TimeStamps", &val);
- if (type != ESTATS_SIGNED32) {
- log_println(0, "In tcp_stat_get_data(), web10g_find_val()"
- " failed to find TimeStamps bad type=%d", type);
- } else {
- snprintf(line, sizeof(line), "TimestampsEnabled: %"PRId32"\n", (val.sv32 == 1) ? 1 : 0);
- send_json_message(ctlsock, TEST_MSG, line, strlen(line), jsonSupport, JSON_SINGLE_VALUE);
- }
+ /* NagleEnabled -> Nagle */
+ type = web10g_find_val(snap[t], "Nagle", &val);
+ if (type != ESTATS_SIGNED32) {
+ log_println(0, "In tcp_stat_get_data(), web10g_find_val() failed to find Nagle bad type=%d", type);
+ } else {
+ snprintf(line, sizeof(line), "NagleEnabled: %"PRId32"\n", (val.sv32 == 2) ? 1 : 0);
+ send_json_message(ctlsock, TEST_MSG, line, strlen(line), jsonSupport, JSON_SINGLE_VALUE);
+ }

- /* PktsRetrans -> SegsRetrans */
- print_10gvar_renamed("SegsRetrans", "PktsRetrans", snap, line,
- sizeof(line), ctlsock, jsonSupport);
+ /* SACKEnabled -> WillUseSACK & WillSendSACK */
+ type = web10g_find_val(snap[t], "WillUseSACK", &val);
+ if (type == -1) {
+ log_println(0, "In tcp_stat_get_data(), web10g_find_val() failed to find WillUseSACK bad type=%d", type);
+ } else {
+ /* Yes this comes through as 3 from web100 */
+ snprintf(line, sizeof(line), "SACKEnabled: %d\n", (val.sv32 == 1) ? 3 : 0);
+ send_json_message(ctlsock, TEST_MSG, line, strlen(line), jsonSupport, JSON_SINGLE_VALUE);
+ }

- /* DataPktsOut -> DataSegsOut */
- print_10gvar_renamed("DataSegsOut", "DataPktsOut", snap, line,
- sizeof(line), ctlsock, jsonSupport);
+ /* TimestampsEnabled -> TimeStamps */
+ type = web10g_find_val(snap[t], "TimeStamps", &val);
+ if (type != ESTATS_SIGNED32) {
+ log_println(0, "In tcp_stat_get_data(), web10g_find_val() failed to find TimeStamps bad type=%d", type);
+ } else {
+ snprintf(line, sizeof(line), "TimestampsEnabled: %"PRId32"\n", (val.sv32 == 1) ? 1 : 0);
+ send_json_message(ctlsock, TEST_MSG, line, strlen(line), jsonSupport, JSON_SINGLE_VALUE);
+ }

- /* MaxCwnd -> MAX(MaxSsCwnd, MaxCaCwnd) */
- print_10gvar_renamed("MaxCwnd", "MaxCwnd", snap, line,
- sizeof(line), ctlsock, jsonSupport);
+ /* PktsRetrans -> SegsRetrans */
+ print_10gvar_renamed("SegsRetrans", "PktsRetrans", snap[t], line, sizeof(line), ctlsock);

- /* SndLimTimeSender -> SndLimTimeSnd */
- print_10gvar_renamed("SndLimTimeSnd", "SndLimTimeSender", snap, line,
- sizeof(line), ctlsock, jsonSupport);
+ /* DataPktsOut -> DataSegsOut */
+ print_10gvar_renamed("DataSegsOut", "DataPktsOut", snap[t], line, sizeof(line), ctlsock);

- /* DataBytesOut -> DataOctetsOut */
- print_10gvar_renamed("HCDataOctetsOut", "DataBytesOut", snap, line,
- sizeof(line), ctlsock, jsonSupport);
+ /* MaxCwnd -> MAX(MaxSsCwnd, MaxCaCwnd) */
+ print_10gvar_renamed("MaxCwnd", "MaxCwnd", snap[t], line, sizeof(line), ctlsock);

- /* SndLimTransSender -> SndLimTransSnd */
- print_10gvar_renamed("SndLimTransSnd", "SndLimTransSender", snap, line,
- sizeof(line), ctlsock, jsonSupport);
+ /* SndLimTimeSender -> SndLimTimeSnd */
+ print_10gvar_renamed("SndLimTimeSnd", "SndLimTimeSender", snap[t], line, sizeof(line), ctlsock);

- /* PktsOut -> SegsOut */
- print_10gvar_renamed("SegsOut", "PktsOut", snap, line,
- sizeof(line), ctlsock, jsonSupport);
+ /* DataBytesOut -> DataOctetsOut */
+ print_10gvar_renamed("HCDataOctetsOut", "DataBytesOut", snap[t], line, sizeof(line), ctlsock);

- /* CongestionSignals -> CongSignals */
- print_10gvar_renamed("CongSignals", "CongestionSignals", snap, line,
- sizeof(line), ctlsock, jsonSupport);
+ /* SndLimTransSender -> SndLimTransSnd */
+ print_10gvar_renamed("SndLimTransSnd", "SndLimTransSender", snap[t], line, sizeof(line), ctlsock);

- /* RcvWinScale -> Same as WinScaleSent if WinScaleSent != -1 */
- type = web10g_find_val(snap, "WinScaleSent", &val);
- if (type == -1) {
- log_println(0, "In tcp_stat_get_data(), web10g_find_val()"
- " failed to find WinScaleSent");
- } else {
- if (val.sv32 == -1)
- snprintf(line, sizeof(line), "RcvWinScale: %u\n", 0);
- else
- snprintf(line, sizeof(line), "RcvWinScale: %d\n", val.sv32);
- send_json_message(ctlsock, TEST_MSG, line, strlen(line), jsonSupport, JSON_SINGLE_VALUE);
- }
+ /* PktsOut -> SegsOut */
+ print_10gvar_renamed("SegsOut", "PktsOut", snap[t], line, sizeof(line), ctlsock);

- /* X_Rcvbuf & X_Sndbuf */
- snprintf(line, sizeof(line), "X_Rcvbuf: %d\n", X_RcvBuf);
- send_json_message(ctlsock, TEST_MSG, line, strlen(line), jsonSupport, JSON_SINGLE_VALUE);
- snprintf(line, sizeof(line), "X_Sndbuf: %d\n", X_SndBuf);
- send_json_message(ctlsock, TEST_MSG, line, strlen(line), jsonSupport, JSON_SINGLE_VALUE);
+ /* CongestionSignals -> CongSignals */
+ print_10gvar_renamed("CongSignals", "CongestionSignals", snap[t], line, sizeof(line), ctlsock);

- send_json_message(ctlsock, TEST_MSG, frame_web100, strlen(frame_web100), jsonSupport, JSON_SINGLE_VALUE);
+ /* RcvWinScale -> Same as WinScaleSent if WinScaleSent != -1 */
+ type = web10g_find_val(snap[t], "WinScaleSent", &val);
+ if (type == -1) {
+ log_println(0, "In tcp_stat_get_data(), web10g_find_val() failed to find WinScaleSent");
+ } else {
+ if (val.sv32 == -1)
+ snprintf(line, sizeof(line), "RcvWinScale: %u\n", 0);
+ else
+ snprintf(line, sizeof(line), "RcvWinScale: %d\n", val.sv32);
+ send_json_message(ctlsock, TEST_MSG, line, strlen(line), jsonSupport, JSON_SINGLE_VALUE);
+ }

- log_println(6, "S2C test - Send web100 data to client pid=%d", getpid());
+ /* X_Rcvbuf & X_Sndbuf */
+ snprintf(line, sizeof(line), "X_Rcvbuf: %d\n", X_RcvBuf[t]);
+ send_json_message(ctlsock, TEST_MSG, line, strlen(line), jsonSupport, JSON_SINGLE_VALUE);
+ snprintf(line, sizeof(line), "X_Sndbuf: %d\n", X_SndBuf[t]);
+ send_json_message(ctlsock, TEST_MSG, line, strlen(line), jsonSupport, JSON_SINGLE_VALUE);
+
+ send_json_message(ctlsock, TEST_MSG, frame_web100, strlen(frame_web100), jsonSupport, JSON_SINGLE_VALUE);
+
+ log_println(6, "S2C test - Send web100 data to client pid=%d", getpid());
+ }
+ }
return 0;
#endif
}
@@ -1020,7 +1004,7 @@
* @param tcp_vars to local copies of tcp_stat variables
* @return integer 0
*/
-int tcp_stat_logvars(struct tcp_vars* vars, int count_vars) {
+int tcp_stat_logvars(struct tcp_vars* vars, int connId, int count_vars) {
#if USE_WEB100
int a, b;
for (a = 0; a < sizeof(struct tcp_vars) / sizeof(tcp_stat_var); ++a) {
@@ -1032,10 +1016,10 @@
int has_AckPktsIn = 0;

for (b = 0; b < count_vars; b++) {
- if (strcmp(web_vars[b].name, web100_name) == 0) {
+ if (strcmp(web_vars[connId][b].name, web100_name) == 0) {
tcp_stat_var* var = &((tcp_stat_var *)vars)[a];
- *var = atoi(web_vars[b].value);
- log_println(5, "Found %s : %i", web100_name, *var);
+ *var = atoi(web_vars[connId][b].value);
+ log_println(5, "Found %s : %"VARtype, web100_name, *var);
break;
}
}
@@ -1046,7 +1030,7 @@
#elif USE_WEB10G
int a;
estats_val val;
- assert(dataDumpSave);
+ assert(dataDumpSave[connId]);

for (a = 0; a < sizeof(struct tcp_vars) / sizeof(tcp_stat_var); ++a) {
const char* web10g_name = tcp_names[a].web10g_name;
@@ -1055,7 +1039,7 @@
continue;

/* Find each item in the list */
- if ((vartype = web10g_find_val(dataDumpSave, web10g_name, &val)) == -1) {
+ if ((vartype = web10g_find_val(dataDumpSave[connId], web10g_name, &val)) == -1) {
log_println(1, "WARNING: Failed to find Web10g var %s", web10g_name);
} else {
tcp_stat_var* var = &((tcp_stat_var *)vars)[a];
@@ -1076,17 +1060,135 @@
*var = (tcp_stat_var) val.uv8;
break;
}
- log_println(5, "Found %s : %i", web10g_name, *var);
+ log_println(5, "Found %s : %"VARtype, web10g_name, *var);
}
}
/* Our special case */
- vars->Sndbuf = X_SndBuf;
+ vars->Sndbuf = X_SndBuf[connId];

- estats_val_data_free(&dataDumpSave);
+ estats_val_data_free(&dataDumpSave[connId]);
#endif
return 0;
}

+void tcp_stat_logvars_to_file(char* webVarsValuesLog, int connNum, struct tcp_vars* vars) {
+ /* Dumps every tcp_stat_var field of vars[0..connNum-1] as "name: value" lines, one file per connection. */
+ int a, i;
+ for (i = 0; i < connNum; ++i) {
+ char webVarsFileName[256];
+ size_t baseLen = strlen(webVarsValuesLog);
+ baseLen = (baseLen > 11) ? baseLen - 11 : 0;  /* keep all but the last 11 chars; no size_t underflow on short names */
+ snprintf(webVarsFileName, sizeof(webVarsFileName), "%.*s_%d_%s.log", (int) baseLen, webVarsValuesLog, i+1, TCP_STAT_NAME);  /* bounded by the destination buffer, not the source length */
+ FILE* file = fopen(webVarsFileName, "w");
+ if (!file) {
+ log_println(1, "WARNING: Failed to open %s for writing", webVarsFileName);
+ return;
+ }
+ for (a = 0; a < sizeof(struct tcp_vars) / sizeof(tcp_stat_var); ++a) {
+#if USE_WEB100
+ char* var_name = tcp_names[a].web100_name;
+#elif USE_WEB10G
+ char* var_name = tcp_names[a].web10g_name;
+ if ((var_name == NULL) && (strcmp("X_Sndbuf", tcp_names[a].web100_name) == 0)) {
+ var_name = "Sndbuf";
+ }
+#endif
+ tcp_stat_var* var = &((tcp_stat_var *)&vars[i])[a];  /* field a of connection i, viewing tcp_vars as an array */
+ fprintf(file, "%s: %"VARtype"\n", var_name ? var_name : "(null)", *var);  /* never hand a NULL pointer to %s */
+ }
+
+ fclose(file);
+ }
+}
+
+tcp_stat_var agg_vars_sum(int connNum, int varId, struct tcp_vars* vars) {
+ int i;
+ tcp_stat_var varValue = 0;  /* must start at zero: the accumulator was previously uninitialized */
+ for (i = 0; i < connNum; ++i) {
+ varValue += ((tcp_stat_var *)&vars[i])[varId];  /* field varId of connection i, viewing tcp_vars as an array */
+ }
+ return varValue;  /* total of field varId over the first connNum entries of vars */
+}
+
+tcp_stat_var agg_vars_max(int connNum, int varId, struct tcp_vars* vars) {
+ int conn;  /* index of the connection being examined */
+ tcp_stat_var largest = ((tcp_stat_var *)&vars[0])[varId];  /* seed with the first connection's value */
+ for (conn = 1; conn < connNum; ++conn) {
+ const tcp_stat_var candidate = ((tcp_stat_var *)&vars[conn])[varId];
+ if (candidate > largest)
+ largest = candidate;
+ }
+ return largest;  /* greatest value of field varId among the first connNum entries */
+}
+
+tcp_stat_var agg_vars_min(int connNum, int varId, struct tcp_vars* vars) {
+ int conn;  /* index of the connection being examined */
+ tcp_stat_var smallest = ((tcp_stat_var *)&vars[0])[varId];  /* seed with the first connection's value */
+ for (conn = 1; conn < connNum; ++conn) {
+ const tcp_stat_var candidate = ((tcp_stat_var *)&vars[conn])[varId];
+ if (candidate < smallest)
+ smallest = candidate;
+ }
+ return smallest;  /* least value of field varId among the first connNum entries */
+}
+
+tcp_stat_var agg_vars_avg(int connNum, int varId, struct tcp_vars* vars) {
+ int i;
+ tcp_stat_var varValue = 0;  /* must start at zero: the accumulator was previously uninitialized */
+ for (i = 0; i < connNum; ++i) {
+ varValue += ((tcp_stat_var *)&vars[i])[varId];  /* field varId of connection i, viewing tcp_vars as an array */
+ }
+ return (connNum > 0) ? varValue / connNum : 0;  /* mean over connNum entries; 0 instead of dividing by zero */
+}
+
+void tcp_stat_log_agg_vars_to_file(char* webVarsValuesLog, int connNum, struct tcp_vars* vars) {
+ /* Writes one aggregated line per tcp_vars field to webVarsValuesLog: MIN, AVG, MAX or SUM over connNum connections. */
+ int a;
+ tcp_stat_var varValue;
+ FILE* file = fopen(webVarsValuesLog, "w");
+ if (!file) {
+ log_println(1, "WARNING: Failed to open %s for writing", webVarsValuesLog);
+ return;
+ }
+
+ for (a = 0; a < sizeof(struct tcp_vars) / sizeof(tcp_stat_var); ++a) {
+#if USE_WEB100
+ char* var_name = tcp_names[a].web100_name;
+#elif USE_WEB10G
+ char* var_name = tcp_names[a].web10g_name;
+ if ((var_name == NULL) && (strcmp("X_Sndbuf", tcp_names[a].web100_name) == 0)) {
+ var_name = "Sndbuf";
+ }
+#endif
+ if (var_name == NULL) var_name = "(null)";  /* never hand a NULL pointer to %s below */
+ if ((strcmp("MinRTT", tcp_names[a].web100_name) == 0)) {
+ varValue = agg_vars_min(connNum, a, vars);
+ fprintf(file, "MIN(%s): %"VARtype"\n", var_name, varValue);
+ }
+ else if ((strcmp("CurRTO", tcp_names[a].web100_name) == 0)) {
+ varValue = agg_vars_avg(connNum, a, vars);
+ fprintf(file, "AVG(%s): %"VARtype"\n", var_name, varValue);
+ }
+ else if ((strcmp("CurMSS", tcp_names[a].web100_name) == 0) ||
+ (strcmp("MaxRwinRcvd", tcp_names[a].web100_name) == 0) ||
+ (strcmp("X_Sndbuf", tcp_names[a].web100_name) == 0) ||
+ (strcmp("CurCwnd", tcp_names[a].web100_name) == 0) ||
+ (strcmp("MaxSsthresh", tcp_names[a].web100_name) == 0) ||
+ (strcmp("CurRwinRcvd", tcp_names[a].web100_name) == 0) ||
+ (strcmp("MaxCwnd", tcp_names[a].web100_name) == 0) ||
+ (strcmp("RcvWinScale", tcp_names[a].web100_name) == 0) ||
+ (strcmp("SndWinScale", tcp_names[a].web100_name) == 0) ||
+ (strcmp("MaxRTT", tcp_names[a].web100_name) == 0)) {
+ varValue = agg_vars_max(connNum, a, vars);
+ fprintf(file, "MAX(%s): %"VARtype"\n", var_name, varValue);
+ }
+ else {
+ varValue = agg_vars_sum(connNum, a, vars);  /* every field not listed above is summed */
+ fprintf(file, "SUM(%s): %"VARtype"\n", var_name, varValue);
+ }
+ }
+ fclose(file);
+}

/**
* Routine to read snaplog file and determine the number of times the
=======================================
--- /branches/MultiplePorts/src/web100clt.c Wed Jun 4 10:51:11 2014 UTC
+++ /branches/MultiplePorts/src/web100clt.c Wed Nov 5 10:25:27 2014 UTC
@@ -237,6 +237,10 @@
// client and server's views of link speed
print_linkspeed_dataacks((tests & TEST_C2S), c2sData,
c2sAck, s2cData, s2cAck);
+
+#ifdef EXTTESTS_ENABLED
+ print_throughput_snapshots(dThroughputSnapshots, uThroughputSnapshots);
+#endif
}
} else {
printf("No %s data collected! Possible Duplex Mismatch condition "
@@ -519,6 +523,9 @@
// 16384 = 2 * BUFFSIZE
// which tests have been selectedto be performed?
unsigned char tests = TEST_MID | TEST_C2S | TEST_S2C | TEST_SFW |
+#ifdef EXTTESTS_ENABLED
+ TEST_EXT |
+#endif
TEST_STATUS | TEST_META;
int ctlSocket; // socket fd
int ctlport = atoi(PORT); // default port number
@@ -532,6 +539,10 @@
int conn_options = 0; // connection options received from user
int debug = 0; // debug flag
int testId; // test ID received from server
+#ifdef EXTTESTS_ENABLED
+ int serverWithExtendedTests = 0;
+ char testsBuff[32];
+#endif
int jsonSupport = 1; // indicates if client should sent messages in JSON format
int retry = 0; // flag set after invalid login message is being received
char *invalid_login_msg = "Invalid login message.";
@@ -661,6 +672,12 @@
if (tests & TEST_STATUS) {
log_println(1, "* New Client, implements queuing feedback");
}
+
+#ifdef EXTTESTS_ENABLED
+ if (tests & TEST_EXT) {
+ log_println(1, "* Extended tests supported");
+ }
+#endif

log_println(1, "Requesting test suite:");
if (tests & TEST_MID) {
@@ -812,7 +829,7 @@
/* add alarm() signal to kill off client if the server never finishes the tests
* RAC 7/13/09
*/
- alarm(90);
+ alarm(300);

// Tests can be started. Read server response again.
// The server must send MSG_LOGIN message to verify version.
@@ -845,7 +862,6 @@
log_println(0, "Incompatible version number");
exit(4);
}
- log_println(5, "Server version: %s", &buff[1]);

ServerType = "Web100";
if (strlen(buff) > 8) {
@@ -857,6 +873,18 @@
buff[strlen(buff) - 7] = '\0';
}
}
+
+ log_println(5, "Server version: %s, type: %s", &buff[1], ServerType);
+#ifdef EXTTESTS_ENABLED
+ if (strlen(buff) > 3) {
+ if (strcmp(&buff[strlen(buff) - 3], "-et") == 0) {
+ serverWithExtendedTests = 1;
+ buff[strlen(buff) - 3] = 0;
+ }
+ }
+
+ log_println(5, "Server supports extended tests: %s", serverWithExtendedTests ? "true" : "false");
+#endif

log_println(5, "Compare versions. Server:%s Client:%s Compare result: %i", &buff[1], VERSION, strcmp(&buff[1], VERSION));
if (strcmp(&buff[1], VERSION)) { //older server did not send type server at the end
@@ -895,6 +923,16 @@
}
ptr = strtok_r(buff, " ", &strtokbuf);

+#ifdef EXTTESTS_ENABLED
+ if (!serverWithExtendedTests) {
+ snprintf(testsBuff, sizeof(testsBuff), "%d", tests);
+ tests &= (~TEST_EXT);
+ if (strlen(testsBuff) > 2) {
+ ptr = strtok_r(NULL, " ", &strtokbuf);
+ }
+ }
+#endif
+
// Run all tests requested, based on the ID.
while (ptr) {
if (check_int(ptr, &testId)) {
=======================================
--- /branches/MultiplePorts/src/web100srv.c Wed May 28 11:17:18 2014 UTC
+++ /branches/MultiplePorts/src/web100srv.c Wed Nov 5 10:25:27 2014 UTC
@@ -120,6 +120,8 @@

static Options options;
static CwndPeaks peaks;
+static int webVarsValues = 0;
+static char webVarsValuesLog[256];
static int cputime = 0;
static char cputimelog[256];
static pthread_t workerThreadId, zombieThreadId;
@@ -167,6 +169,18 @@
{ "cputime", 0, 0, 309},
{ "limit", 1, 0, 'y'},
#endif
+#ifdef EXTTESTS_ENABLED
+ { "uduration", 1, 0, 314},
+ { "uthroughputsnaps", 0, 0, 315},
+ { "usnapsdelay", 1, 0, 316},
+ { "usnapsoffset", 1, 0, 317},
+ { "uthreadsnum", 1, 0, 322},
+ { "dduration", 1, 0, 318},
+ { "dthroughputsnaps", 0, 0, 319},
+ { "dsnapsdelay", 1, 0, 320},
+ { "dsnapsoffset", 1, 0, 321},
+ { "dthreadsnum", 1, 0, 323},
+#endif
{ "buffer", 1, 0, 'b' },
{ "file", 1, 0, 'f' },
{ "interface", 1, 0, 'i' },
@@ -181,6 +195,7 @@
{ "adminfile", 1, 0, 'A' },
{ "log_dir", 1, 0, 'L' },
{ "logfacility", 1, 0, 'S' },
+ { "savewebvalues", 0, 0, 324},
#if defined(HAVE_ODBC) && defined(DATABASE_ENABLED) && defined(HAVE_SQL_H)
{ "enableDBlogging", 0, 0, 310},
{ "dbDSN", 1, 0, 311},
@@ -690,6 +705,41 @@
} else if (strncasecmp(key, "limit", 5) == 0) {
options.limit = atoi(val);
continue;
+ } else if (strncasecmp(key, "savewebvalues", 13) == 0) {
+ webVarsValues = 1;
+ continue;
+#ifdef EXTTESTS_ENABLED
+ } else if (strncasecmp(key, "uduration", 9) == 0) {
+ options.uduration = atoi(val);
+ continue;
+ } else if (strncasecmp(key, "uthroughputsnaps", 16) == 0) {
+ options.uthroughputsnaps = 1;
+ continue;
+ } else if (strncasecmp(key, "usnapsdelay", 11) == 0) {
+ options.usnapsdelay = atoi(val);
+ continue;
+ } else if (strncasecmp(key, "usnapsoffset", 12) == 0) {
+ options.usnapsoffset = atoi(val);
+ continue;
+ } else if (strncasecmp(key, "uthreadsnum", 11) == 0) {
+ options.uthreadsnum = atoi(val);
+ continue;
+ } else if (strncasecmp(key, "dduration", 9) == 0) {
+ options.dduration = atoi(val);
+ continue;
+ } else if (strncasecmp(key, "dthroughputsnaps", 16) == 0) {
+ options.dthroughputsnaps = 1;
+ continue;
+ } else if (strncasecmp(key, "dsnapsdelay", 11) == 0) {
+ options.dsnapsdelay = atoi(val);
+ continue;
+ } else if (strncasecmp(key, "dsnapsoffset", 12) == 0) {
+ options.dsnapsoffset = atoi(val);
+ continue;
+ } else if (strncasecmp(key, "dthreadsnum", 11) == 0) {
+ options.dthreadsnum = atoi(val);
+ continue;
+#endif
} else if (strncasecmp(key, "refresh", 5) == 0) {
refresh = atoi(val);
continue;
@@ -896,7 +946,7 @@
char isoTime[64];

// int n; // temporary iterator variable --// commented out -> calc_linkspeed
- struct tcp_vars vars;
+ struct tcp_vars vars[7]; // up to 7 connections

int link = CANNOT_DETERMINE_LINK; // local temporary variable indicative of
// link speed. Transmitted but unused at client end , which has a similar
@@ -916,6 +966,7 @@
int s2c_linkspeed_data = 0;
int s2c_linkspeed_ack = 0;
// int j; // commented out -> calc_linkspeed
+ int i;
int totalcnt;
int autotune;
// values collected from the speed tests
@@ -970,8 +1021,14 @@
conn = tcp_stat_connection_from_socket(agent, ctlsockfd);
autotune = tcp_stat_autotune(ctlsockfd, agent, conn);

+#ifdef EXTTESTS_ENABLED
+#define ADD_CAPABILITIES(x) x"-et"
+#else
+#define ADD_CAPABILITIES(x) x
+#endif
+
// client needs to be version compatible. Send current version
- snprintf(buff, sizeof(buff), "v%s", VERSION "-" TCP_STAT_NAME);
+ snprintf(buff, sizeof(buff), "v%s", ADD_CAPABILITIES(VERSION) "-" TCP_STAT_NAME);
send_json_message(ctlsockfd, MSG_LOGIN, buff, strlen(buff), testopt->json_support, JSON_SINGLE_VALUE);

// initiate test with MSG_LOGIN message.
@@ -994,9 +1051,15 @@
}
if (testopt->c2sopt) {
log_println(1, " > C2S throughput test");
+ if (testopt->exttestsopt) {
+ log_println(1, " * Extended tests supported");
+ }
}
if (testopt->s2copt) {
log_println(1, " > S2C throughput test");
+ if (testopt->exttestsopt) {
+ log_println(1, " * Extended tests supported");
+ }
}
if (testopt->metaopt) {
log_println(1, " > META test");
@@ -1046,7 +1109,7 @@
}

log_println(6, "Starting META test");
- if ((ret = test_meta_srv(ctlsockfd, agent, &*testopt, conn_options)) != 0) {
+ if ((ret = test_meta_srv(ctlsockfd, agent, &*testopt, conn_options, &options)) != 0) {
if (ret < 0) {
log_println(6, "META test failed with rc=%d", ret);
}
@@ -1072,58 +1135,65 @@

// ...other variables
memset(&vars, 0xFF, sizeof(vars));
- tcp_stat_logvars(&vars, count_vars);
+ for (i = 0; i < (testopt->exttestsopt ? options.dthreadsnum : 1); ++i) {
+ tcp_stat_logvars(&vars[i], i, count_vars);
+ }
+
+ if (webVarsValues) {
+ tcp_stat_logvars_to_file(webVarsValuesLog, testopt->exttestsopt ? options.dthreadsnum : 1, vars);
+ tcp_stat_log_agg_vars_to_file(webVarsValuesLog, testopt->exttestsopt ? options.dthreadsnum : 1, vars);
+ }

// end getting web100 variable values
/* if (rc == 0) { */

// section to calculate duplex mismatch
// Calculate average round trip time and convert to seconds
- rttsec = calc_avg_rtt(vars.SumRTT, vars.CountRTT, &avgrtt);
+ rttsec = calc_avg_rtt(vars[0].SumRTT, vars[0].CountRTT, &avgrtt);
// Calculate packet loss
- packetloss_s2c = calc_packetloss(vars.CongestionSignals, vars.PktsOut,
+ packetloss_s2c = calc_packetloss(vars[0].CongestionSignals, vars[0].PktsOut,
c2s_linkspeed_data);

// Calculate ratio of packets arriving out of order
- oo_order = calc_packets_outoforder(vars.DupAcksIn, vars.AckPktsIn);
+ oo_order = calc_packets_outoforder(vars[0].DupAcksIn, vars[0].AckPktsIn);

// calculate theoretical maximum goodput in bits
- bw_theortcl = calc_max_theoretical_throughput(vars.CurrentMSS, rttsec,
+ bw_theortcl = calc_max_theoretical_throughput(vars[0].CurrentMSS, rttsec,
packetloss_s2c);

// get window sizes
- calc_window_sizes(&vars.SndWinScale, &vars.RcvWinScale, vars.Sndbuf,
- vars.MaxRwinRcvd, vars.MaxCwnd, &rwin, &swin, &cwin);
+ calc_window_sizes(&vars[0].SndWinScale, &vars[0].RcvWinScale, vars[0].Sndbuf,
+ vars[0].MaxRwinRcvd, vars[0].MaxCwnd, &rwin, &swin, &cwin);

// Total test time
- totaltime = calc_totaltesttime(vars.SndLimTimeRwin, vars.SndLimTimeCwnd,
- vars.SndLimTimeSender);
+ totaltime = calc_totaltesttime(vars[0].SndLimTimeRwin, vars[0].SndLimTimeCwnd,
+ vars[0].SndLimTimeSender);

// time spent being send-limited due to client's recv window
- rwintime = calc_sendlimited_rcvrfault(vars.SndLimTimeRwin, totaltime);
+ rwintime = calc_sendlimited_rcvrfault(vars[0].SndLimTimeRwin, totaltime);

// time spent in being send-limited due to congestion window
- cwndtime = calc_sendlimited_cong(vars.SndLimTimeCwnd, totaltime);
+ cwndtime = calc_sendlimited_cong(vars[0].SndLimTimeCwnd, totaltime);

// time spent in being send-limited due to own fault
- sendtime = calc_sendlimited_sndrfault(vars.SndLimTimeSender, totaltime);
+ sendtime = calc_sendlimited_sndrfault(vars[0].SndLimTimeSender, totaltime);

timesec = totaltime / MEGA; // total time in microsecs

// get fraction of total test time waiting for packets to arrive
- RTOidle = calc_RTOIdle(vars.Timeouts, vars.CurrentRTO, timesec);
+ RTOidle = calc_RTOIdle(vars[0].Timeouts, vars[0].CurrentRTO, timesec);

// get timeout, retransmission, acks and dup acks ratios.
- tmoutsratio = (double) vars.Timeouts / vars.PktsOut;
- rtranratio = (double) vars.PktsRetrans / vars.PktsOut;
- acksratio = (double) vars.AckPktsIn / vars.PktsOut;
- dackratio = (double) vars.DupAcksIn / (double) vars.AckPktsIn;
+ tmoutsratio = (double) vars[0].Timeouts / vars[0].PktsOut;
+ rtranratio = (double) vars[0].PktsRetrans / vars[0].PktsOut;
+ acksratio = (double) vars[0].AckPktsIn / vars[0].PktsOut;
+ dackratio = (double) vars[0].DupAcksIn / (double) vars[0].AckPktsIn;

// get actual throughput in Mbps (totaltime is in microseconds)
- realthruput = calc_real_throughput(vars.DataBytesOut, totaltime);
+ realthruput = calc_real_throughput(vars[0].DataBytesOut, totaltime);

// total time spent waiting
- waitsec = cal_totalwaittime(vars.CurrentRTO, vars.Timeouts);
+ waitsec = cal_totalwaittime(vars[0].CurrentRTO, vars[0].Timeouts);

log_println(2, "CWND limited test = %0.2f while unlimited = %0.2f", s2c2spd,
s2cspd);
@@ -1145,8 +1215,8 @@
old_mismatch = 1;

if (old_mismatch == 1) {
- if (detect_duplexmismatch(cwndtime, bw_theortcl, vars.PktsRetrans, timesec,
- vars.MaxSsthresh, RTOidle, link, s2cspd, s2c2spd,
+ if (detect_duplexmismatch(cwndtime, bw_theortcl, vars[0].PktsRetrans, timesec,
+ vars[0].MaxSsthresh, RTOidle, link, s2cspd, s2c2spd,
multiple)) {
if (is_c2s_throughputbetter(c2sspd, s2cspd)) {
// also, S->C throughput is lesser than C->S throughput
@@ -1183,7 +1253,7 @@

// Faulty hardware link heuristic.
if (detect_faultyhardwarelink(packetloss_s2c, cwndtime, timesec,
- vars.MaxSsthresh))
+ vars[0].MaxSsthresh))
bad_cable = POSSIBLE_BAD_CABLE;

// test for Ethernet link (assume Fast E.)
@@ -1193,13 +1263,13 @@

// test for wireless link
if (detect_wirelesslink(sendtime, realthruput, bw_theortcl,
- vars.SndLimTransRwin, vars.SndLimTransCwnd, rwintime,
+ vars[0].SndLimTransRwin, vars[0].SndLimTransCwnd, rwintime,
link)) {
link = LINK_WIRELESS;
}

// test for DSL/Cable modem link
- if (detect_DSLCablelink(vars.SndLimTimeSender, vars.SndLimTransSender,
+ if (detect_DSLCablelink(vars[0].SndLimTimeSender, vars[0].SndLimTransSender,
realthruput, bw_theortcl, link)) {
link = LINK_DSLORCABLE;
}
@@ -1211,7 +1281,7 @@
// ...and the number of transitions into the 'Sender Limited' state is
// greater than 30 per second

- if (detect_halfduplex(rwintime, vars.SndLimTransRwin, vars.SndLimTransSender,
+ if (detect_halfduplex(rwintime, vars[0].SndLimTransRwin, vars[0].SndLimTransSender,
timesec))
half_duplex = POSSIBLE_HALF_DUPLEX;

@@ -1245,7 +1315,7 @@

snprintf(buff, sizeof(buff),
"cwin: %0.4f\nrttsec: %0.6f\nSndbuf: %"VARtype"\naspd: %0.5f\n"
- "CWND-Limited: %0.2f\n", cwin, rttsec, vars.Sndbuf, aspd, s2c2spd);
+ "CWND-Limited: %0.2f\n", cwin, rttsec, vars[0].Sndbuf, aspd, s2c2spd);
send_json_message(ctlsockfd, MSG_RESULTS, buff, strlen(buff), testopt->json_support, JSON_SINGLE_VALUE);

snprintf(buff, sizeof(buff),
@@ -1271,18 +1341,18 @@
VARtype",%"VARtype",%"VARtype",%"VARtype",%"VARtype",%"
VARtype",%"VARtype",%"VARtype",",
(int) s2c2spd, (int) s2cspd, (int) c2sspd, vars.Timeouts,
- vars.SumRTT, vars.CountRTT, vars.PktsRetrans, vars.FastRetran,
- vars.DataPktsOut, vars.AckPktsOut, vars.CurrentMSS, vars.DupAcksIn,
- vars.AckPktsIn);
+ vars[0].SumRTT, vars[0].CountRTT, vars[0].PktsRetrans, vars[0].FastRetran,
+ vars[0].DataPktsOut, vars[0].AckPktsOut, vars[0].CurrentMSS, vars[0].DupAcksIn,
+ vars[0].AckPktsIn);
memcpy(meta.summary, tmpstr, strlen(tmpstr));
memset(tmpstr, 0, sizeof(tmpstr));
snprintf(tmpstr, sizeof(tmpstr), "%"VARtype",%"VARtype",%"VARtype",%"
VARtype",%"VARtype",%"VARtype",%"VARtype",%"VARtype
",%"VARtype",%"VARtype",%"VARtype",%"VARtype",%"VARtype",",
- vars.MaxRwinRcvd, vars.Sndbuf, vars.MaxCwnd, vars.SndLimTimeRwin,
- vars.SndLimTimeCwnd, vars.SndLimTimeSender, vars.DataBytesOut,
- vars.SndLimTransRwin, vars.SndLimTransCwnd, vars.SndLimTransSender,
- vars.MaxSsthresh, vars.CurrentRTO, vars.CurrentRwinRcvd);
+ vars[0].MaxRwinRcvd, vars[0].Sndbuf, vars[0].MaxCwnd, vars[0].SndLimTimeRwin,
+ vars[0].SndLimTimeCwnd, vars[0].SndLimTimeSender, vars[0].DataBytesOut,
+ vars[0].SndLimTransRwin, vars[0].SndLimTransCwnd, vars[0].SndLimTransSender,
+ vars[0].MaxSsthresh, vars[0].CurrentRTO, vars[0].CurrentRwinRcvd);

strlcat(meta.summary, tmpstr, sizeof(meta.summary));
memset(tmpstr, 0, sizeof(tmpstr));
@@ -1294,18 +1364,18 @@
snprintf(tmpstr, sizeof(tmpstr), ",%d,%d,%d,%d,%"VARtype",%"VARtype
",%"VARtype",%"VARtype",%d",
c2s_linkspeed_data, c2s_linkspeed_ack, s2c_linkspeed_data,
- s2c_linkspeed_ack, vars.CongestionSignals, vars.PktsOut, vars.MinRTT,
- vars.RcvWinScale, autotune);
+ s2c_linkspeed_ack, vars[0].CongestionSignals, vars[0].PktsOut, vars[0].MinRTT,
+ vars[0].RcvWinScale, autotune);

strlcat(meta.summary, tmpstr, sizeof(meta.summary));
memset(tmpstr, 0, sizeof(tmpstr));
snprintf(tmpstr, sizeof(tmpstr), ",%"VARtype",%"VARtype",%"VARtype",%"
VARtype",%"VARtype",%"VARtype",%"VARtype",%"VARtype",%"
VARtype",%"VARtype,
- vars.CongAvoid, vars.CongestionOverCount, vars.MaxRTT,
- vars.OtherReductions, vars.CurTimeoutCount, vars.AbruptTimeouts,
- vars.SendStall, vars.SlowStart, vars.SubsequentTimeouts,
- vars.ThruBytesAcked);
+ vars[0].CongAvoid, vars[0].CongestionOverCount, vars[0].MaxRTT,
+ vars[0].OtherReductions, vars[0].CurTimeoutCount, vars[0].AbruptTimeouts,
+ vars[0].SendStall, vars[0].SlowStart, vars[0].SubsequentTimeouts,
+ vars[0].ThruBytesAcked);

strlcat(meta.summary, tmpstr, sizeof(meta.summary));
memset(tmpstr, 0, sizeof(tmpstr));
@@ -1327,49 +1397,49 @@
fprintf(fp, "%s,%d,%d,%d,%"VARtype",%"VARtype",%"VARtype",%"
VARtype",%"VARtype",%"VARtype",%"VARtype",%"VARtype",%"
VARtype",%"VARtype",", rmt_host,
- (int) s2c2spd, (int) s2cspd, (int) c2sspd, vars.Timeouts,
- vars.SumRTT, vars.CountRTT, vars.PktsRetrans, vars.FastRetran,
- vars.DataPktsOut, vars.AckPktsOut, vars.CurrentMSS, vars.DupAcksIn,
- vars.AckPktsIn);
+ (int) s2c2spd, (int) s2cspd, (int) c2sspd, vars[0].Timeouts,
+ vars[0].SumRTT, vars[0].CountRTT, vars[0].PktsRetrans, vars[0].FastRetran,
+ vars[0].DataPktsOut, vars[0].AckPktsOut, vars[0].CurrentMSS, vars[0].DupAcksIn,
+ vars[0].AckPktsIn);
fprintf(fp, "%"VARtype",%"VARtype",%"VARtype",%"VARtype",%"VARtype","
"%"VARtype",%"VARtype",%"VARtype",%"VARtype",%"VARtype",%"
- VARtype",%"VARtype",%"VARtype",", vars.MaxRwinRcvd,
- vars.Sndbuf, vars.MaxCwnd, vars.SndLimTimeRwin, vars.SndLimTimeCwnd,
- vars.SndLimTimeSender, vars.DataBytesOut, vars.SndLimTransRwin,
- vars.SndLimTransCwnd, vars.SndLimTransSender, vars.MaxSsthresh,
- vars.CurrentRTO, vars.CurrentRwinRcvd);
+ VARtype",%"VARtype",%"VARtype",", vars[0].MaxRwinRcvd,
+ vars[0].Sndbuf, vars[0].MaxCwnd, vars[0].SndLimTimeRwin, vars[0].SndLimTimeCwnd,
+ vars[0].SndLimTimeSender, vars[0].DataBytesOut, vars[0].SndLimTransRwin,
+ vars[0].SndLimTransCwnd, vars[0].SndLimTransSender, vars[0].MaxSsthresh,
+ vars[0].CurrentRTO, vars[0].CurrentRwinRcvd);
fprintf(fp, "%d,%d,%d,%d,%d", link, mismatch, bad_cable, half_duplex,
congestion);
fprintf(fp, ",%d,%d,%d,%d,%"VARtype",%"VARtype",%"VARtype",%"VARtype",%d",
c2s_linkspeed_data,
c2s_linkspeed_ack, s2c_linkspeed_data, s2c_linkspeed_ack,
- vars.CongestionSignals, vars.PktsOut, vars.MinRTT, vars.RcvWinScale,
+ vars[0].CongestionSignals, vars[0].PktsOut, vars[0].MinRTT, vars[0].RcvWinScale,
autotune);
fprintf(fp, ",%"VARtype",%"VARtype",%"VARtype",%"VARtype",%"VARtype
",%"VARtype",%"VARtype",%"VARtype",%"VARtype",%"VARtype,
- vars.CongAvoid,
- vars.CongestionOverCount, vars.MaxRTT, vars.OtherReductions,
- vars.CurTimeoutCount, vars.AbruptTimeouts, vars.SendStall,
- vars.SlowStart, vars.SubsequentTimeouts, vars.ThruBytesAcked);
+ vars[0].CongAvoid,
+ vars[0].CongestionOverCount, vars[0].MaxRTT, vars[0].OtherReductions,
+ vars[0].CurTimeoutCount, vars[0].AbruptTimeouts, vars[0].SendStall,
+ vars[0].SlowStart, vars[0].SubsequentTimeouts, vars[0].ThruBytesAcked);
fprintf(fp, ",%d,%d,%d\n", peaks.min, peaks.max, peaks.amount);
fclose(fp);
}
db_insert(spds, runave, cputimelog, options.s2c_logname,
options.c2s_logname, testName, testPort, date, rmt_host, s2c2spd,
- s2cspd, c2sspd, vars.Timeouts, vars.SumRTT, vars.CountRTT,
- vars.PktsRetrans, vars.FastRetran, vars.DataPktsOut,
- vars.AckPktsOut, vars.CurrentMSS, vars.DupAcksIn, vars.AckPktsIn,
- vars.MaxRwinRcvd, vars.Sndbuf, vars.MaxCwnd, vars.SndLimTimeRwin,
- vars.SndLimTimeCwnd, vars.SndLimTimeSender, vars.DataBytesOut,
- vars.SndLimTransRwin, vars.SndLimTransCwnd, vars.SndLimTransSender,
- vars.MaxSsthresh, vars.CurrentRTO, vars.CurrentRwinRcvd, link,
+ s2cspd, c2sspd, vars[0].Timeouts, vars[0].SumRTT, vars[0].CountRTT,
+ vars[0].PktsRetrans, vars[0].FastRetran, vars[0].DataPktsOut,
+ vars[0].AckPktsOut, vars[0].CurrentMSS, vars[0].DupAcksIn, vars[0].AckPktsIn,
+ vars[0].MaxRwinRcvd, vars[0].Sndbuf, vars[0].MaxCwnd, vars[0].SndLimTimeRwin,
+ vars[0].SndLimTimeCwnd, vars[0].SndLimTimeSender, vars[0].DataBytesOut,
+ vars[0].SndLimTransRwin, vars[0].SndLimTransCwnd, vars[0].SndLimTransSender,
+ vars[0].MaxSsthresh, vars[0].CurrentRTO, vars[0].CurrentRwinRcvd, link,
mismatch, bad_cable, half_duplex, congestion, c2s_linkspeed_data,
c2s_linkspeed_ack, s2c_linkspeed_data, s2c_linkspeed_ack,
- vars.CongestionSignals, vars.PktsOut, vars.MinRTT, vars.RcvWinScale,
- autotune, vars.CongAvoid, vars.CongestionOverCount, vars.MaxRTT,
- vars.OtherReductions, vars.CurTimeoutCount, vars.AbruptTimeouts,
- vars.SendStall, vars.SlowStart, vars.SubsequentTimeouts,
- vars.ThruBytesAcked, peaks.min, peaks.max, peaks.amount);
+ vars[0].CongestionSignals, vars[0].PktsOut, vars[0].MinRTT, vars[0].RcvWinScale,
+ autotune, vars[0].CongAvoid, vars[0].CongestionOverCount, vars[0].MaxRTT,
+ vars[0].OtherReductions, vars[0].CurTimeoutCount, vars[0].AbruptTimeouts,
+ vars[0].SendStall, vars[0].SlowStart, vars[0].SubsequentTimeouts,
+ vars[0].ThruBytesAcked, peaks.min, peaks.max, peaks.amount);
if (usesyslog == 1) {
snprintf(
logstr1, sizeof(logstr1),
@@ -1380,9 +1450,9 @@
"AckPktsOut=%"VARtype","
"CurrentMSS=%"VARtype",DupAcksIn=%"VARtype","
"AckPktsIn=%"VARtype",",
- rmt_host, c2sspd, s2cspd, vars.Timeouts, vars.SumRTT, vars.CountRTT,
- vars.PktsRetrans, vars.FastRetran, vars.DataPktsOut, vars.AckPktsOut,
- vars.CurrentMSS, vars.DupAcksIn, vars.AckPktsIn);
+ rmt_host, c2sspd, s2cspd, vars[0].Timeouts, vars[0].SumRTT, vars[0].CountRTT,
+ vars[0].PktsRetrans, vars[0].FastRetran, vars[0].DataPktsOut, vars[0].AckPktsOut,
+ vars[0].CurrentMSS, vars[0].DupAcksIn, vars[0].AckPktsIn);
snprintf(
logstr2, sizeof(logstr2),
"MaxRwinRcvd=%"VARtype",Sndbuf=%"VARtype","
@@ -1393,10 +1463,10 @@
"SndLimTransSender=%"VARtype","
"MaxSsthresh=%"VARtype",CurrentRTO=%"VARtype","
"CurrentRwinRcvd=%"VARtype",",
- vars.MaxRwinRcvd, vars.Sndbuf, vars.MaxCwnd, vars.SndLimTimeRwin,
- vars.SndLimTimeCwnd, vars.SndLimTimeSender, vars.DataBytesOut,
- vars.SndLimTransRwin, vars.SndLimTransCwnd, vars.SndLimTransSender,
- vars.MaxSsthresh, vars.CurrentRTO, vars.CurrentRwinRcvd);
+ vars[0].MaxRwinRcvd, vars[0].Sndbuf, vars[0].MaxCwnd, vars[0].SndLimTimeRwin,
+ vars[0].SndLimTimeCwnd, vars[0].SndLimTimeSender, vars[0].DataBytesOut,
+ vars[0].SndLimTransRwin, vars[0].SndLimTransCwnd, vars[0].SndLimTransSender,
+ vars[0].MaxSsthresh, vars[0].CurrentRTO, vars[0].CurrentRwinRcvd);
strlcat(logstr1, logstr2, sizeof(logstr1));
snprintf(
logstr2, sizeof(logstr2),
@@ -1406,7 +1476,7 @@
VARtype",RcvWinScale=%"VARtype"\n",
link, mismatch, bad_cable, half_duplex, congestion, c2s_linkspeed_data,
c2s_linkspeed_ack, s2c_linkspeed_data, s2c_linkspeed_ack,
- vars.CongestionSignals, vars.PktsOut, vars.MinRTT, vars.RcvWinScale);
+ vars[0].CongestionSignals, vars[0].PktsOut, vars[0].MinRTT, vars[0].RcvWinScale);
strlcat(logstr1, logstr2, sizeof(logstr1));
syslog(LOG_FACILITY | LOG_INFO, "%s", logstr1);
closelog();
@@ -1424,16 +1494,16 @@
* updated. Otherwise the changes are lost when the client terminates.
*/
if (admin_view == 1) {
- totalcnt = calculate(date, vars.SumRTT, vars.CountRTT,
- vars.CongestionSignals, vars.PktsOut, vars.DupAcksIn,
- vars.AckPktsIn, vars.CurrentMSS, vars.SndLimTimeRwin,
- vars.SndLimTimeCwnd, vars.SndLimTimeSender,
- vars.MaxRwinRcvd, vars.CurrentCwnd, vars.Sndbuf,
- vars.DataBytesOut, mismatch, bad_cable, (int) c2sspd,
+ totalcnt = calculate(date, vars[0].SumRTT, vars[0].CountRTT,
+ vars[0].CongestionSignals, vars[0].PktsOut, vars[0].DupAcksIn,
+ vars[0].AckPktsIn, vars[0].CurrentMSS, vars[0].SndLimTimeRwin,
+ vars[0].SndLimTimeCwnd, vars[0].SndLimTimeSender,
+ vars[0].MaxRwinRcvd, vars[0].CurrentCwnd, vars[0].Sndbuf,
+ vars[0].DataBytesOut, mismatch, bad_cable, (int) c2sspd,
(int) s2cspd, c2s_linkspeed_data, s2c_linkspeed_ack,
1);
- gen_html((int) c2sspd, (int) s2cspd, vars.MinRTT, vars.PktsRetrans,
- vars.Timeouts, vars.Sndbuf, vars.MaxRwinRcvd, vars.CurrentCwnd,
+ gen_html((int) c2sspd, (int) s2cspd, vars[0].MinRTT, vars[0].PktsRetrans,
+ vars[0].Timeouts, vars[0].Sndbuf, vars[0].MaxRwinRcvd, vars[0].CurrentCwnd,
mismatch, bad_cable, totalcnt, refresh);
}
shutdown(ctlsockfd, SHUT_WR);
@@ -1461,6 +1531,7 @@
tcp_stat_agent* agent;
char *lbuf = NULL, *ctime();
char buff[32], tmpstr[256];
+ char testsBuff[32];
char test_suite[16];
FILE * fp;
size_t lbuf_max = 0;
@@ -1495,6 +1566,16 @@
options.cwndDecrease = 0;
memset(options.s2c_logname, 0, 256);
memset(options.c2s_logname, 0, 256);
+ options.uduration = 10000;
+ options.uthroughputsnaps = 0;
+ options.usnapsdelay = 5000;
+ options.usnapsoffset = 1000;
+ options.uthreadsnum = 1;
+ options.dduration = 10000;
+ options.dthroughputsnaps = 0;
+ options.dsnapsdelay = 5000;
+ options.dsnapsoffset = 1000;
+ options.dthreadsnum = 1;
peaks.min = -1;
peaks.max = -1;
peaks.amount = -1;
@@ -1713,6 +1794,47 @@
case 313:
dbPWD = optarg;
break;
+ case 314:
+ options.uduration = atoi(optarg);
+ break;
+ case 315:
+ options.uthroughputsnaps = 1;
+ break;
+ case 316:
+ options.usnapsdelay = atoi(optarg);
+ break;
+ case 317:
+ options.usnapsoffset = atoi(optarg);
+ break;
+ case 322:
+ if (check_rint(optarg, &options.uthreadsnum, 1, 7)) {
+ char tmpText[200];
+ snprintf(tmpText, sizeof(tmpText), "Invalid number of threads for upload test: %s", optarg);
+ short_usage(argv[0], tmpText);
+ }
+ break;
+ case 318:
+ options.dduration = atoi(optarg);
+ break;
+ case 319:
+ options.dthroughputsnaps = 1;
+ break;
+ case 320:
+ options.dsnapsdelay = atoi(optarg);
+ break;
+ case 321:
+ options.dsnapsoffset = atoi(optarg);
+ break;
+ case 323:
+ if (check_rint(optarg, &options.dthreadsnum, 1, 7)) {
+ char tmpText[200];
+ snprintf(tmpText, sizeof(tmpText), "Invalid number of threads for download test: %s", optarg);
+ short_usage(argv[0], tmpText);
+ }
+ break;
+ case 324:
+ webVarsValues = 1;
+ break;
case 'T':
refresh = atoi(optarg);
break;
@@ -1804,6 +1926,16 @@
}
log_println(1, "\tDebug level set to %d", debug);

+#ifdef EXTTESTS_ENABLED
+ log_println(3, "\tExtended tests options:");
+ log_println(3, "\t\t * upload: duration = %d, threads = %d, throughput snapshots: enabled = %s, delay = %d, offset = %d",
+ options.uduration, options.uthreadsnum, options.uthroughputsnaps ? "true" : "false",
+ options.usnapsdelay, options.usnapsoffset);
+ log_println(3, "\t\t * download: duration = %d, threads = %d, throughput snapshots: enabled = %s, delay = %d, offset = %d",
+ options.dduration, options.dthreadsnum, options.dthroughputsnaps ? "true" : "false",
+ options.dsnapsdelay, options.dsnapsoffset);
+#endif
+
initialize_db(useDB, dbDSN, dbUID, dbPWD);

memset(&new, 0, sizeof(new));
@@ -2652,9 +2784,9 @@

memset(test_suite, 0, sizeof(test_suite));
t_opts = atoi(buff+3);
- memcpy(test_suite, buff+5, (strlen(buff)-5));
- /* memcpy(test_suite, buff+6, 7); */
- log_println(5, "extracting test_suite '%s' and t_opts '%x' from "
+ snprintf(testsBuff, sizeof(testsBuff), "%d", t_opts);
+ memcpy(test_suite, buff+3+strlen(testsBuff), (strlen(buff+3+strlen(testsBuff))));
+ log_println(5, "extracting test_suite '%s' and t_opts '%x' from "
"buff '%s'", test_suite, t_opts, buff);

// construct cputime log folder
@@ -2667,9 +2799,7 @@
I2AddrFree(tmp_addr);
memset(cputimelog, 0, 256);
if (cputime) {
- snprintf(dir, sizeof(dir), "%s_%s:%d.cputime",
- get_ISOtime(isoTime, sizeof(isoTime)), name,
- testPort);
+ snprintf(dir, sizeof(dir), "%s_%s:%d.cputime", get_ISOtime(isoTime, sizeof(isoTime)), name, testPort);
log_println(8, "CPUTIME:suffix=%s", dir);
create_named_logdir(cputimelog, sizeof(cputimelog), dir, 0);
memcpy(meta.CPU_time, dir, strlen(dir));
@@ -2681,6 +2811,12 @@
memset(cputimelog, 0, 256);
}
}
+ memset(webVarsValuesLog, 0, 256);
+ if (webVarsValues) {
+ snprintf(dir, sizeof(dir), "%s_%s:%d_%s.log", get_ISOtime(isoTime, sizeof(isoTime)), name, testPort, TCP_STAT_NAME);
+ create_named_logdir(webVarsValuesLog, sizeof(webVarsValuesLog), dir, 0);
+ memcpy(meta.web_variables_log, dir, strlen(dir));
+ }
}
// write the incoming connection data into the log file
fp = fopen(get_logfile(), "a");
@@ -2709,12 +2845,19 @@
testopt.c2sopt = TOPT_ENABLED;
if (t_opts & TEST_S2C)
testopt.s2copt = TOPT_ENABLED;
- // die in 120 seconds, but only if a test doesn't get started
+
alarm(120);
- // reset alarm() before every test
- log_println(6, "setting master alarm() to 120 seconds, tests "
- "must start (complete?) before this timer expires");
+ log_println(6, "setting master alarm() to 120 seconds, tests must complete before this timer expires");

+#ifdef EXTTESTS_ENABLED
+ if (t_opts & TEST_EXT) {
+ testopt.exttestsopt = TOPT_ENABLED;
+ alarm(100 + (options.uduration / 1000.0) + (options.dduration / 1000.0));
+ log_println(6, " * changed master alarm() to %d due to enabled extended tests",
+ (int) (100 + (options.uduration / 1000.0) + (options.dduration / 1000.0)));
+ }
+#endif
+
// run tests based on options
if (strncmp(test_suite, "Invalid", 7) != 0) {
log_println(3, "Valid test sequence requested, run test for "
=======================================
--- /branches/MultiplePorts/src/web100srv.h Wed May 28 11:17:18 2014 UTC
+++ /branches/MultiplePorts/src/web100srv.h Wed Nov 5 10:25:27 2014 UTC
@@ -110,16 +110,24 @@

// Options to run test with
typedef struct options {
- u_int32_t limit; // used to calculate receive window limit
- int snapDelay; // Frequency of snap log collection in milliseconds
- // (i.e logged every snapDelay ms)
- char avoidSndBlockUp; // flag set to indicate avoiding send buffer
- // blocking in the S2C test
- char snaplog; // enable collecting snap log
- char cwndDecrease; // enable analysis of the cwnd changes (S2C test)
- char s2c_logname[256]; // S2C log file name - size changed to 256
- char c2s_logname[256]; // C2S log file name - size changed to 256
- int compress; // enable compressing log files
+ u_int32_t limit; // used to calculate receive window limit
+ int snapDelay; // frequency of snap log collection in milliseconds (i.e logged every snapDelay ms)
+ char avoidSndBlockUp; // flag set to indicate avoiding send buffer blocking in the S2C test
+ char snaplog; // enable collecting snap log
+ char cwndDecrease; // enable analysis of the cwnd changes (S2C test)
+ char s2c_logname[256]; // S2C log file name - size changed to 256
+ char c2s_logname[256]; // C2S log file name - size changed to 256
+ int compress; // enable compressing log files
+ int uduration; // upload test duration
+ char uthroughputsnaps; // enable writing of throughput snapshots for the upload test
+ int usnapsdelay; // specify the delay in the throughput snapshots thread for upload test
+ int usnapsoffset; // specify the initial offset in the throughput snapshots thread for upload test
+ int uthreadsnum; // specify the number of threads (parallel TCP connections) for upload test
+ int dduration; // download test duration
+ char dthroughputsnaps; // enable writing of throughput snapshots for the download test
+ int dsnapsdelay; // specify the delay in the throughput snapshots thread for download test
+ int dsnapsoffset; // specify the initial offset in the throughput snapshots thread for download test
+ int dthreadsnum; // specify the number of threads (parallel TCP connections) for download test
} Options;

typedef struct portpair {
@@ -173,7 +181,7 @@
struct web100_variables {
char name[256]; // key
char value[256]; // value
-} web_vars[WEB100_VARS];
+} web_vars[7][WEB100_VARS];

struct pseudo_hdr { /* used to compute TCP checksum */
uint64_t s_addr; // source addr
@@ -267,7 +275,7 @@
int port3);
void init_pkttrace(I2Addr srcAddr, struct sockaddr *sock_addr,
socklen_t saddrlen, int monitor_pipe[2], char *device,
- PortPair* pair, const char* direction, int compress);
+ PortPair* pair, const char* direction, int expectedTestTime);
void force_breakloop();
#endif

@@ -315,12 +323,16 @@
int autotune);/* Not used so no web10g version */
void tcp_stat_get_data_recv(int sock, tcp_stat_agent* agent,
tcp_stat_connection cn, int count_vars);
-int tcp_stat_get_data(tcp_stat_snap* snap, int testsock, int ctlsock,
+int tcp_stat_get_data(tcp_stat_snap* snap, int* testsock, int threadsNum, int ctlsock,
tcp_stat_agent* agent, int count_vars, int jsonSupport);

int CwndDecrease(char* logname,
u_int32_t *dec_cnt, u_int32_t *same_cnt, u_int32_t *inc_cnt);
-int tcp_stat_logvars(struct tcp_vars* vars, int count_vars);
+int tcp_stat_logvars(struct tcp_vars* vars, int connId, int count_vars);
+
+void tcp_stat_logvars_to_file(char* webVarsValuesLog, int connNum, struct tcp_vars* vars);
+
+void tcp_stat_log_agg_vars_to_file(char* webVarsValuesLog, int connNum, struct tcp_vars* vars);

int KillHung(void);
void writeMeta(int compress, int cputime, int snaplog, int tcpdump);


  • [ndt-dev] [ndt] r1139 committed - Added support for multiple threads, ndt, 11/05/2014

Archive powered by MHonArc 2.6.16.

Top of Page