Skip to Content.
Sympa Menu

perfsonar-dev - perfsonar: r3737 - in trunk/surfnet_java-flowsa-ma/src/main/java/org/perfsonar/service/measurementArchive/flowsa: . nfcapd nfdump

Subject: perfsonar development work

List archive

perfsonar: r3737 - in trunk/surfnet_java-flowsa-ma/src/main/java/org/perfsonar/service/measurementArchive/flowsa: . nfcapd nfdump


Chronological Thread 
  • From:
  • To:
  • Subject: perfsonar: r3737 - in trunk/surfnet_java-flowsa-ma/src/main/java/org/perfsonar/service/measurementArchive/flowsa: . nfcapd nfdump
  • Date: Sat, 19 Apr 2008 08:37:24 -0400

Author: michael.bischoff
Date: 2008-04-19 08:37:24 -0400 (Sat, 19 Apr 2008)
New Revision: 3737

Added:

trunk/surfnet_java-flowsa-ma/src/main/java/org/perfsonar/service/measurementArchive/flowsa/LSRegistrationControll.java
Removed:

trunk/surfnet_java-flowsa-ma/src/main/java/org/perfsonar/service/measurementArchive/flowsa/ConfFileServlet.java

trunk/surfnet_java-flowsa-ma/src/main/java/org/perfsonar/service/measurementArchive/flowsa/NMWGInterfaceBaseChaining.java
Modified:

trunk/surfnet_java-flowsa-ma/src/main/java/org/perfsonar/service/measurementArchive/flowsa/FlowStatisticsResponse.java

trunk/surfnet_java-flowsa-ma/src/main/java/org/perfsonar/service/measurementArchive/flowsa/FlowsaGetRequest.java

trunk/surfnet_java-flowsa-ma/src/main/java/org/perfsonar/service/measurementArchive/flowsa/FlowsaMAServiceEngine.java

trunk/surfnet_java-flowsa-ma/src/main/java/org/perfsonar/service/measurementArchive/flowsa/nfcapd/Exporter.java

trunk/surfnet_java-flowsa-ma/src/main/java/org/perfsonar/service/measurementArchive/flowsa/nfcapd/NfcapdControll.java

trunk/surfnet_java-flowsa-ma/src/main/java/org/perfsonar/service/measurementArchive/flowsa/nfcapd/package-info.java

trunk/surfnet_java-flowsa-ma/src/main/java/org/perfsonar/service/measurementArchive/flowsa/nfdump/IllegalAggregationRuleException.java

trunk/surfnet_java-flowsa-ma/src/main/java/org/perfsonar/service/measurementArchive/flowsa/nfdump/IllegalNfdumpSyntaxException.java

trunk/surfnet_java-flowsa-ma/src/main/java/org/perfsonar/service/measurementArchive/flowsa/nfdump/NfdumpControll.java

trunk/surfnet_java-flowsa-ma/src/main/java/org/perfsonar/service/measurementArchive/flowsa/nfdump/NfdumpQuery.java

trunk/surfnet_java-flowsa-ma/src/main/java/org/perfsonar/service/measurementArchive/flowsa/nfdump/RawFlowsQuery.java

trunk/surfnet_java-flowsa-ma/src/main/java/org/perfsonar/service/measurementArchive/flowsa/nfdump/StatisticsQuery.java

trunk/surfnet_java-flowsa-ma/src/main/java/org/perfsonar/service/measurementArchive/flowsa/nfdump/TimeRange.java

trunk/surfnet_java-flowsa-ma/src/main/java/org/perfsonar/service/measurementArchive/flowsa/nfdump/TopQuery.java

trunk/surfnet_java-flowsa-ma/src/main/java/org/perfsonar/service/measurementArchive/flowsa/nfdump/WorkaroundProcess.java

trunk/surfnet_java-flowsa-ma/src/main/java/org/perfsonar/service/measurementArchive/flowsa/nfdump/package-info.java

trunk/surfnet_java-flowsa-ma/src/main/java/org/perfsonar/service/measurementArchive/flowsa/package-info.java
Log:
ASSIGNED - bug 333: Lookupservice hasn't been implemented yet.
https://bugzilla.perfsonar.net/show_bug.cgi?id=333
RESOLVED - bug 262 nfdump process not killed when execution time exceeded.
https://bugzilla.perfsonar.net/show_bug.cgi?id=262

Deleted:
trunk/surfnet_java-flowsa-ma/src/main/java/org/perfsonar/service/measurementArchive/flowsa/ConfFileServlet.java

Modified:
trunk/surfnet_java-flowsa-ma/src/main/java/org/perfsonar/service/measurementArchive/flowsa/FlowStatisticsResponse.java
===================================================================
---
trunk/surfnet_java-flowsa-ma/src/main/java/org/perfsonar/service/measurementArchive/flowsa/FlowStatisticsResponse.java
2008-04-18 17:52:17 UTC (rev 3736)
+++
trunk/surfnet_java-flowsa-ma/src/main/java/org/perfsonar/service/measurementArchive/flowsa/FlowStatisticsResponse.java
2008-04-19 12:37:24 UTC (rev 3737)
@@ -53,7 +53,7 @@

private Time convertToTime(TimeRange range) {
if(range==null) {
- new IllegalArgumentException("statistics.getRange()
cannot be null for a valid statistic");
+ throw new
IllegalArgumentException("statistics.getRange() cannot be null for a valid
statistic");
}

DateFormat iso = new SimpleDateFormat(ISO8601_PATTERN);

Modified:
trunk/surfnet_java-flowsa-ma/src/main/java/org/perfsonar/service/measurementArchive/flowsa/FlowsaGetRequest.java
===================================================================
---
trunk/surfnet_java-flowsa-ma/src/main/java/org/perfsonar/service/measurementArchive/flowsa/FlowsaGetRequest.java
2008-04-18 17:52:17 UTC (rev 3736)
+++
trunk/surfnet_java-flowsa-ma/src/main/java/org/perfsonar/service/measurementArchive/flowsa/FlowsaGetRequest.java
2008-04-19 12:37:24 UTC (rev 3737)
@@ -154,7 +154,7 @@
String value = parameters.get("starttime");
if(value==null) {
logger.error("no startTime specified.");
- return startTime;
+ return null;
}
DateFormat dateFormat = new
SimpleDateFormat(ISO8601_PATTERN);
try {
@@ -164,7 +164,7 @@
startTime = null;
}
}
- return startTime;
+ return startTime == null ? null : new
Date(startTime.getTime());
}

public Date getEndTime() {
@@ -172,7 +172,7 @@
String value = parameters.get("endtime");
if(value==null) {
logger.error("no endTime specified.");
- return endTime;
+ return null;
}
DateFormat dateFormat = new
SimpleDateFormat(ISO8601_PATTERN);
try {
@@ -182,7 +182,7 @@
endTime = null;
}
}
- return endTime;
+ return endTime == null ? null : new Date(endTime.getTime());
}

public Filter getFilter() {

Modified:
trunk/surfnet_java-flowsa-ma/src/main/java/org/perfsonar/service/measurementArchive/flowsa/FlowsaMAServiceEngine.java
===================================================================
---
trunk/surfnet_java-flowsa-ma/src/main/java/org/perfsonar/service/measurementArchive/flowsa/FlowsaMAServiceEngine.java
2008-04-18 17:52:17 UTC (rev 3736)
+++
trunk/surfnet_java-flowsa-ma/src/main/java/org/perfsonar/service/measurementArchive/flowsa/FlowsaMAServiceEngine.java
2008-04-19 12:37:24 UTC (rev 3737)
@@ -43,6 +43,8 @@
*
* (shorten to FlowsaMAServiceEngine for sanity reasons.)
*
+ * (should be) Stateless, thread safe.
+ *
* @see ServiceEngine
* @author Michael Bischoff
* @version $Id$
@@ -51,14 +53,14 @@

protected static final String SERVICE_TYPE =
"service.MeasurementArchive.flowType";

- protected static final int DEFAULT_MAX_RESULTS = 1000;
- protected static final int DEFAULT_TIMEOUT = 10;
- protected static final int DEFAULT_NFCAPDINTERVAL = 5;
- protected static final File DEFAULT_NFDUMP_EXECUTABLE = new
File("/usr/local/bin/nfdump");
- protected static final File DEFAULT_NFCAPD_EXECUTABLE = new
File("/usr/local/bin/nfcapd");
- protected static final String DEFAULT_READERFACTORYCLASSNAME =
"org.perfsonar.service.measurementArchive.flowsa.FlowsaMAServiceEngine";
- protected static final File DEFAULT_SOURCEDIRECTORY = new
File("/var/nfdump/flows/");
- protected static final boolean DEFAULT_SHOULDSTARTNFCAPDPROCESSES =
false;
+ protected static final int DEFAULT_MAX_RESULTS = 1000;
+ protected static final int DEFAULT_TIMEOUT = 10;
+ protected static final int DEFAULT_NFCAPDINTERVAL = 5;
+ protected static final File DEFAULT_NFDUMP_EXECUTABLE = new
File("/usr/local/bin/nfdump");
+ protected static final File DEFAULT_NFCAPD_EXECUTABLE = new
File("/usr/local/bin/nfcapd");
+ protected static final String DEFAULT_READERFACTORYCLASSNAME =
"org.perfsonar.service.measurementArchive.flowsa.FlowsaMAServiceEngine";
+ protected static final File DEFAULT_SOURCEDIRECTORY = new
File("/var/nfdump/flows/");
+ protected static final boolean DEFAULT_SHOULDSTARTNFCAPDPROCESSES =
false;

protected static final String TIMEOUT_PROPERTY_KEY =
"service.ma.flowsa.query.timeout";
protected static final String NFDUMPLOCATION_PROPERTY_KEY =
"service.ma.flowsa.nfdump.location";
@@ -86,6 +88,7 @@
protected final ConfigurationComponent configuration;
protected final NfdumpControll nfdump;
protected final NfcapdControll nfcapd;
+ protected final LSRegistrationControll ls;

/**
* Default constructor for the Service engine
@@ -99,7 +102,16 @@
logger.debug("FlowsaMAServiceEngine.<Constructor> Successfully
created NfcapdControll.");
this.nfdump = createNfdumpControll();
logger.debug("FlowsaMAServiceEngine.<Constructor> Successfully
created NfdumpControll.");
+
+ this.ls = new LSRegistrationControll();
+ logger.debug("FlowsaMAServiceEngine.<Constructor> Successfully
created LSRegistrationControll.");
+
+ if(ls.IsLsConfigured(configuration) &&
!ls.hasRegistrationBeenStarted()) {
+ ls.start(nfcapd.getExporters());
+ logger.debug("FlowsaMAServiceEngine.<Constructor> Started
LSRegistrationControll.");
+ }
}
+
/* (non-Javadoc)
* @see org.perfsonar.service.base.engine.ServiceEngine#getType()
*/
@@ -200,7 +212,8 @@
request.getAggegrationRule(),
fileData.getDirectories(),
fileData.getStartFile(),
- fileData.getEndFile()
+ fileData.getEndFile(),
+ true
);
logger.debug("FlowsaMAServiceEngine.handleRawFlowRequest:
Executing query.");
List<Flow> result = nfdump.execute(query);
@@ -244,7 +257,7 @@
cal.add(Calendar.MINUTE, nfcapd.getInterval());
range = new TimeRange(start,cal.getTime());
} catch (ParseException e) {
- new RuntimeException("couldn't parse date, this
should never happen here and this exception indicates a bug");
+ throw new RuntimeException("couldn't parse date, this
should never happen here and this exception indicates a bug");
}

return new FlowStatisticsResponse(request.getMetadata(), range,
result);
@@ -274,7 +287,8 @@
request.getTopStatistic(),
request.getOrderBy(),
request.isSplittingProtocol(),
- request.getTopN()
+ request.getTopN(),
+ true
);

logger.debug("FlowsaMAServiceEngine.handleTopFlowRequest: Executing
query.");
@@ -354,7 +368,6 @@

return new NfcapdControll(nfcapdExecutable, exporters,
interval, startNfcapdProcesses);
}
-
/**
* @param request
* @return FileData based on the request
@@ -566,11 +579,24 @@
}
}
/**
+ * tests if the ls registration is configured
+ * @param configuration
+ * @return true if the ls registration is configured
+ */
+ private boolean IsLsConfigured(ConfigurationComponent configuration) {
+ try {
+ String lsUrl =
configuration.getProperty("service.r.ls_url");
+ return !lsUrl.trim().equals("");
+ } catch (PerfSONARException e) {
+ return false;
+ }
+ }
+ /**
* gets the configuration component
* @return
* @throws PerfSONARException
*/
- private ConfigurationComponent getConfigurationComponent() throws
PerfSONARException {
+ protected ConfigurationComponent getConfigurationComponent() throws
PerfSONARException {
try {
return (ConfigurationComponent)
AuxiliaryComponentManager.getInstance().getComponent(ComponentNames.CONFIG);
} catch (PerfSONARException underlayingException) {
@@ -586,7 +612,7 @@
* @return
* @throws PerfSONARException
*/
- private LoggerComponent getLoggerComponent() throws
PerfSONARException {
+ protected LoggerComponent getLoggerComponent() throws
PerfSONARException {
try {
return (LoggerComponent)
AuxiliaryComponentManager.getInstance().getComponent(ComponentNames.LOGGER);
} catch (PerfSONARException underlayingException) {

Added:
trunk/surfnet_java-flowsa-ma/src/main/java/org/perfsonar/service/measurementArchive/flowsa/LSRegistrationControll.java

Deleted:
trunk/surfnet_java-flowsa-ma/src/main/java/org/perfsonar/service/measurementArchive/flowsa/NMWGInterfaceBaseChaining.java

Modified:
trunk/surfnet_java-flowsa-ma/src/main/java/org/perfsonar/service/measurementArchive/flowsa/nfcapd/Exporter.java
===================================================================
---
trunk/surfnet_java-flowsa-ma/src/main/java/org/perfsonar/service/measurementArchive/flowsa/nfcapd/Exporter.java
2008-04-18 17:52:17 UTC (rev 3736)
+++
trunk/surfnet_java-flowsa-ma/src/main/java/org/perfsonar/service/measurementArchive/flowsa/nfcapd/Exporter.java
2008-04-19 12:37:24 UTC (rev 3737)
@@ -77,7 +77,35 @@
public int compareTo(Exporter other) {
return hostName.compareTo(other.hostName);
}
+

+
+ @Override
+ public int hashCode() {
+ final int prime = 31;
+ int result = 1;
+ result = prime * result
+ + ((hostName == null) ? 0 :
hostName.hashCode());
+ return result;
+ }
+
+ @Override
+ public boolean equals(Object obj) {
+ if (this == obj)
+ return true;
+ if (obj == null)
+ return false;
+ if (getClass() != obj.getClass())
+ return false;
+ final Exporter other = (Exporter) obj;
+ if (hostName == null) {
+ if (other.hostName != null)
+ return false;
+ } else if (!hostName.equals(other.hostName))
+ return false;
+ return true;
+ }
+
/**
* returns a filename that contains data from before time. the file
name
* returned is the filename of the file thats holds the newest data
before

Modified:
trunk/surfnet_java-flowsa-ma/src/main/java/org/perfsonar/service/measurementArchive/flowsa/nfcapd/NfcapdControll.java
===================================================================
---
trunk/surfnet_java-flowsa-ma/src/main/java/org/perfsonar/service/measurementArchive/flowsa/nfcapd/NfcapdControll.java
2008-04-18 17:52:17 UTC (rev 3736)
+++
trunk/surfnet_java-flowsa-ma/src/main/java/org/perfsonar/service/measurementArchive/flowsa/nfcapd/NfcapdControll.java
2008-04-19 12:37:24 UTC (rev 3737)
@@ -4,14 +4,14 @@
import java.io.FileNotFoundException;
import java.io.IOException;
import java.util.Collection;
+import java.util.Collections;
import java.util.Date;
import java.util.Set;
import java.util.TreeSet;

/**
- * Central class for nfcapd support.
+ * Central class for nfcapd support. Stateless (controller)
* @author Michael.Bischoff
- *
*/
public class NfcapdControll {
public static class FileData {
@@ -59,7 +59,7 @@
private final File nfcapdExecutable;
private final Set<Exporter> exporters;
private final boolean startingNfcapdProcesses;
- private int interval;
+ private final int interval;

/**
* @param nfcapdExecutable the nfcapdexecutable file
@@ -167,4 +167,13 @@
public int getInterval() {
return interval;
}
+
+
+ /**
+ * gets all known exporters
+ * @return a unmodifiable set of exporters
+ */
+ public Set<Exporter> getExporters() {
+ return Collections.unmodifiableSet(exporters);
+ }
}

Modified:
trunk/surfnet_java-flowsa-ma/src/main/java/org/perfsonar/service/measurementArchive/flowsa/nfcapd/package-info.java
===================================================================
---
trunk/surfnet_java-flowsa-ma/src/main/java/org/perfsonar/service/measurementArchive/flowsa/nfcapd/package-info.java
2008-04-18 17:52:17 UTC (rev 3736)
+++
trunk/surfnet_java-flowsa-ma/src/main/java/org/perfsonar/service/measurementArchive/flowsa/nfcapd/package-info.java
2008-04-19 12:37:24 UTC (rev 3737)
@@ -1,4 +1,4 @@
/**
- * This package holds Nfcapd related classes.
+ * This package holds Nfcapd related and wrapper classes.
*/
package org.perfsonar.service.measurementArchive.flowsa.nfcapd;
\ No newline at end of file

Modified:
trunk/surfnet_java-flowsa-ma/src/main/java/org/perfsonar/service/measurementArchive/flowsa/nfdump/IllegalAggregationRuleException.java
===================================================================
---
trunk/surfnet_java-flowsa-ma/src/main/java/org/perfsonar/service/measurementArchive/flowsa/nfdump/IllegalAggregationRuleException.java
2008-04-18 17:52:17 UTC (rev 3736)
+++
trunk/surfnet_java-flowsa-ma/src/main/java/org/perfsonar/service/measurementArchive/flowsa/nfdump/IllegalAggregationRuleException.java
2008-04-19 12:37:24 UTC (rev 3737)
@@ -1,6 +1,10 @@
package org.perfsonar.service.measurementArchive.flowsa.nfdump;


+/**
+ * thrown when nfdump doesn't understand the supplied aggregation rule.
+ * @author michael.bischoff
+ */
public class IllegalAggregationRuleException extends
IllegalNfdumpSyntaxException {

private static final long serialVersionUID = 1L;

Modified:
trunk/surfnet_java-flowsa-ma/src/main/java/org/perfsonar/service/measurementArchive/flowsa/nfdump/IllegalNfdumpSyntaxException.java
===================================================================
---
trunk/surfnet_java-flowsa-ma/src/main/java/org/perfsonar/service/measurementArchive/flowsa/nfdump/IllegalNfdumpSyntaxException.java
2008-04-18 17:52:17 UTC (rev 3736)
+++
trunk/surfnet_java-flowsa-ma/src/main/java/org/perfsonar/service/measurementArchive/flowsa/nfdump/IllegalNfdumpSyntaxException.java
2008-04-19 12:37:24 UTC (rev 3737)
@@ -1,5 +1,9 @@
package org.perfsonar.service.measurementArchive.flowsa.nfdump;

+/**
+ * this exception is thrown when illegal nfdump syntax was supplied.
+ * @author michael.bischoff
+ */
public class IllegalNfdumpSyntaxException extends Exception {

private static final long serialVersionUID = 1L;

Modified:
trunk/surfnet_java-flowsa-ma/src/main/java/org/perfsonar/service/measurementArchive/flowsa/nfdump/NfdumpControll.java
===================================================================
---
trunk/surfnet_java-flowsa-ma/src/main/java/org/perfsonar/service/measurementArchive/flowsa/nfdump/NfdumpControll.java
2008-04-18 17:52:17 UTC (rev 3736)
+++
trunk/surfnet_java-flowsa-ma/src/main/java/org/perfsonar/service/measurementArchive/flowsa/nfdump/NfdumpControll.java
2008-04-19 12:37:24 UTC (rev 3737)
@@ -4,11 +4,13 @@
import static java.util.concurrent.TimeUnit.SECONDS;

import java.io.File;
+import java.io.IOException;
import java.text.DateFormat;
import java.text.SimpleDateFormat;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
+import java.util.Random;
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
@@ -16,16 +18,19 @@
import java.util.concurrent.Future;
import java.util.concurrent.ThreadFactory;
import java.util.concurrent.TimeoutException;
+import java.util.logging.Logger;

import
org.perfsonar.service.measurementArchive.flowsa.nfdump.outputreaders.Nfdump1_5_5OutputReaderFactory;

/**
- * Core of the nfdump wrapper.
+ * Core of the nfdump wrapper. Stateless(Controller).
*
* @author Michael Bischoff
* @version $id$
*/
public class NfdumpControll {
+
+ Logger logger = Logger.getLogger(NfdumpControll.class.getName());

/**
* Task to be submitted to the executor of this NfDumpControll. This
abstract task
@@ -37,6 +42,8 @@
protected static abstract class NfdumpTask<Type> implements
Callable<Type> {
protected final List<String> commands = new ArrayList<String>();
protected final NfdumpOutputReaderFactory readerFactory;
+ protected volatile Process process = null;
+ protected volatile NfdumpOutputReader reader = null;

/**
* Standard constructor
@@ -52,26 +59,23 @@
* @see java.util.concurrent.Callable#call()
*/
public Type call() throws Exception {
- Process process = null;
- NfdumpOutputReader reader = null;
- try {
- ProcessBuilder builder= new ProcessBuilder(commands);
- builder.redirectErrorStream(true);
- // TODO remove?
- System.out.println(Arrays.toString(commands.toArray()));
- process = new WorkaroundProcess(builder.start());
- reader = readerFactory.create(process.getInputStream());
- return call(reader);
- } finally {
- if(reader!=null) {
- reader.destroy();
- }
- if(process!=null) {
- process.destroy();
- }
- }
+ ProcessBuilder builder= new ProcessBuilder(commands);
+ builder.redirectErrorStream(true);
+ // TODO remove?
+ System.out.println(Arrays.toString(commands.toArray()));
+ process = new WorkaroundProcess(builder.start());
+ reader = readerFactory.create(process.getInputStream());
+ return call(reader);
}

+ public void destroy() throws IOException {
+ if(process!=null) {
+ process.destroy();
+ }
+ if(reader!=null) {
+ reader.destroy();
+ }
+ }
/**
* method that should call the reader and return the result of this
task
* @param reader created by the supplied readerFactory
@@ -84,18 +88,18 @@
private final NfdumpOutputReaderFactory readerFactory;
private final File nfdumpExecutable;
private final ExecutorService executorService =
Executors.newCachedThreadPool(new MyThreadFactory());
+ private final int maxResults;
+ private final long timeoutValue;
+ private final String anonymiseKey;
+
private final static String FORMAT_FLAG = "-o";
private final static String FORMAT_FLAG_VALUE = "pipe6";
private final static String TIME_FLAG = "-t";
private final static String QUIET_FLAG = "-q";
private final static String STATISTICS_FLAG = "-I";
-
- final int maxResults;
- final long timeoutValue;

/**
- * constructs an NfdumpManager which uses the specified nfdumpExecutable
and
- * chooses an
{@link
NfdumpOutputReader} based on the nfdumpVersion String
+ * constructs an NfdumpManager which uses the specified nfdumpExecutable
* argument
*
* @param nfdumpExecutable file containing the Executable
@@ -106,7 +110,7 @@
* when Nfdump executable is not found this should be an
* configuration issue.
*/
- public NfdumpControll(final File nfdumpExecutable, final String
outputReaderFactoryClassName, final int maxResults, final int timeoutValue)
throws IllegalArgumentException {
+ public NfdumpControll(final File nfdumpExecutable, final String
outputReaderFactoryClassName, final int maxResults, final int timeoutValue,
final String key) throws IllegalArgumentException {
if (!nfdumpExecutable.exists()) {
throw new IllegalArgumentException("Nfdump executable not
found.");
}
@@ -118,7 +122,32 @@
this.nfdumpExecutable = nfdumpExecutable;
this.maxResults = maxResults;
this.timeoutValue = timeoutValue;
+ if(key!=null) {
+ this.anonymiseKey = key;
+ } else {
+ char[] chars = new char[32];
+ Random random = new Random();
+ for(int i = 0; i<32; i++) {
+ chars[i] = (char) (random.nextInt(89)+33);
+ }
+ this.anonymiseKey = String.copyValueOf(chars);
+ }
}
+ /**
+ * constructs an NfdumpManager which uses the specified nfdumpExecutable
and
+ * uses a random key for Anonymising
+ *
+ * @param nfdumpExecutable file containing the Executable
+ * @param outputReaderFactoryClassName className of a factory, passing
null will load the
default(org.perfsonar.service.mesurementArchive.flowType.nfdump.outputreaders.Nfdump152OutputReaderFactory).
+ * @param maxResults the max amount of results a query can return
+ * @param timeoutValue the max time in milliseconds a query can take
+ * @throws IllegalArgumentException
+ * when Nfdump executable is not found this should be an
+ * configuration issue.
+ */
+ public NfdumpControll(final File nfdumpExecutable, final String
outputReaderFactoryClassName, final int maxResults, final int timeoutValue)
throws IllegalArgumentException {
+ this(nfdumpExecutable, outputReaderFactoryClassName, maxResults,
timeoutValue, null);
+ }
/**
* This method takes a RecordQuery, executes it and returns the result.
* @param query to be executed
@@ -135,7 +164,7 @@
throw new ExecutionException(e);
}

- Callable<List<Flow>> task = new
NfdumpTask<List<Flow>>(commands,readerFactory) {
+ NfdumpTask<List<Flow>> task = new
NfdumpTask<List<Flow>>(commands,readerFactory) {
@Override
List<Flow> call(NfdumpOutputReader reader) throws Exception {
List<Flow> result = new ArrayList<Flow>();
@@ -168,7 +197,7 @@
throw new ExecutionException(e);
}

- Callable<Statistics> task = new
NfdumpTask<Statistics>(commands,readerFactory) {
+ NfdumpTask<Statistics> task = new
NfdumpTask<Statistics>(commands,readerFactory) {
@Override
public Statistics call(NfdumpOutputReader reader) throws
Exception {
return reader.readStatistics();
@@ -194,7 +223,7 @@
throw new ExecutionException(e);
}

- Callable<List<TopDatum>> task = new
NfdumpTask<List<TopDatum>>(commands,readerFactory) {
+ NfdumpTask<List<TopDatum>> task = new
NfdumpTask<List<TopDatum>>(commands,readerFactory) {
@Override
public List<TopDatum> call(NfdumpOutputReader reader) throws
Exception {
List<TopDatum> result = new ArrayList<TopDatum>();
@@ -218,7 +247,7 @@
* @throws ExecutionException if it failed to process
* @throws TimeoutException if processing took too long
*/
- protected <Type> Type submit(final Callable<Type> task) throws
InterruptedException, ExecutionException, TimeoutException {
+ protected <Type> Type submit(final NfdumpTask<Type> task) throws
InterruptedException, ExecutionException, TimeoutException {
Future<Type> future = executorService.submit(task);
try {
return future.get(timeoutValue, SECONDS);
@@ -226,6 +255,11 @@
if(!future.isDone()) {
future.cancel(true);
}
+ try {
+ task.destroy();
+ } catch (IOException e) {
+ logger.finest("destroying task failed
("+e.getLocalizedMessage()+")");
+ }
}
}
/**
@@ -287,6 +321,10 @@
List<String> commands = createCommandsForRawQuery();
commands.add(TIME_FLAG);
commands.add(getTimeCommandFrom(query));
+ if(query.isResultAnnonymised()) {
+ commands.add("-K");
+ commands.add(anonymiseKey);
+ }
commands.add("-M");
commands.add(getDirectoryCommandFrom(query));
commands.add("-R");
@@ -339,6 +377,10 @@
List<String> commands = createCommandsForTopQuery();
commands.add(TIME_FLAG);
commands.add(getTimeCommandFrom(query));
+ if(query.isResultAnnonymised()) {
+ commands.add("-K");
+ commands.add(anonymiseKey);
+ }
commands.add("-s");
if(query.isSplitProtocol()) {

commands.add(query.getTopStatistic()+":p/"+query.getOrderBy());
@@ -369,8 +411,7 @@
}
Class<?> factoryClass = Class.forName(className);
return (NfdumpOutputReaderFactory) factoryClass.newInstance();
- }
-
+ }
private String getTimeCommandFrom(NfdumpQuery query) {
DateFormat nfdumpStyle = new SimpleDateFormat("yyyy/MM/dd.HH:mm:ss");
return
nfdumpStyle.format(query.getStartTime())+"-"+nfdumpStyle.format(query.getEndTime());
@@ -385,7 +426,7 @@
}
String[] parts = directory.split("/");
if(parts.length==1) {
- parts = directory.split("\\");
+ parts = directory.split("\\\\");
}
command = command + ":" + parts[parts.length-1];
}
@@ -394,7 +435,6 @@
}
return command;
}
-
/**
* Mostly unimportant class, it simply ensures that the thread priority
is not
* higher than the thread that initiated. It also sets names so that
when

Modified:
trunk/surfnet_java-flowsa-ma/src/main/java/org/perfsonar/service/measurementArchive/flowsa/nfdump/NfdumpQuery.java
===================================================================
---
trunk/surfnet_java-flowsa-ma/src/main/java/org/perfsonar/service/measurementArchive/flowsa/nfdump/NfdumpQuery.java
2008-04-18 17:52:17 UTC (rev 3736)
+++
trunk/surfnet_java-flowsa-ma/src/main/java/org/perfsonar/service/measurementArchive/flowsa/nfdump/NfdumpQuery.java
2008-04-19 12:37:24 UTC (rev 3737)
@@ -16,28 +16,30 @@
private Date endTime;
private String startFile;
private String endFile;
+ private boolean annonymised;

- public NfdumpQuery() {
+ public NfdumpQuery() {
/* default no arg constructor */
}
- public NfdumpQuery(Date startTime, Date endTime, Set<String>
directories, String startFile, String endFile) {
- this.startTime = startTime;
- this.endTime = endTime;
+ public NfdumpQuery(Date startTime, Date endTime, Set<String>
directories, String startFile, String endFile, boolean annonymised) {
+ this.startTime = startTime==null ? null : new
Date(startTime.getTime());
+ this.endTime = endTime==null ? null : new Date(endTime.getTime());
this.directories = directories;
this.startFile = startFile;
this.endFile = endFile;
+ this.annonymised = annonymised;
}
public Date getEndTime() {
- return endTime;
+ return endTime==null ? null : new Date(endTime.getTime());
}
public Date getStartTime() {
- return startTime;
+ return startTime==null ? null : new Date(startTime.getTime());
}
public void setEndTime(Date endTime) {
- this.endTime = endTime;
+ this.endTime = endTime == null ? null : new Date(endTime.getTime());
}
public void setStartTime(Date startTime) {
- this.startTime = startTime;
+ this.startTime = startTime==null ? null : new
Date(startTime.getTime());
}
public Set<String> getDirectories() {
return directories;
@@ -54,4 +56,10 @@
public void setEndFile(String endFile) {
this.endFile = endFile;
}
+ public boolean isResultAnnonymised() {
+ return annonymised;
+ }
+ public void setResultAnnonymised(boolean annonymised) {
+ this.annonymised = annonymised;
+ }
}
\ No newline at end of file

Modified:
trunk/surfnet_java-flowsa-ma/src/main/java/org/perfsonar/service/measurementArchive/flowsa/nfdump/RawFlowsQuery.java
===================================================================
---
trunk/surfnet_java-flowsa-ma/src/main/java/org/perfsonar/service/measurementArchive/flowsa/nfdump/RawFlowsQuery.java
2008-04-18 17:52:17 UTC (rev 3736)
+++
trunk/surfnet_java-flowsa-ma/src/main/java/org/perfsonar/service/measurementArchive/flowsa/nfdump/RawFlowsQuery.java
2008-04-19 12:37:24 UTC (rev 3737)
@@ -19,11 +19,18 @@
}

public RawFlowsQuery(Date startTime, Date endTime, Filter filter,
- AggegrationRule aggegrationRule, Set<String> directories, String
startFile, String endFile) {
- super(startTime,endTime,directories,startFile,endFile);
+ AggegrationRule aggegrationRule, Set<String> directories, String
startFile, String endFile, boolean annonymised) {
+ super(startTime,endTime,directories,startFile,endFile, annonymised);
this.filter = filter;
this.aggegrationRule = aggegrationRule;
}
+
+ public RawFlowsQuery(Date startTime, Date endTime, Filter filter,
+ AggegrationRule aggegrationRule, Set<String> directories, String
startFile, String endFile) {
+ this(startTime, endTime, filter, aggegrationRule, directories,
startFile, endFile, false);
+ }
+
+

public Filter getFilter() {
return filter;

Modified:
trunk/surfnet_java-flowsa-ma/src/main/java/org/perfsonar/service/measurementArchive/flowsa/nfdump/StatisticsQuery.java
===================================================================
---
trunk/surfnet_java-flowsa-ma/src/main/java/org/perfsonar/service/measurementArchive/flowsa/nfdump/StatisticsQuery.java
2008-04-18 17:52:17 UTC (rev 3736)
+++
trunk/surfnet_java-flowsa-ma/src/main/java/org/perfsonar/service/measurementArchive/flowsa/nfdump/StatisticsQuery.java
2008-04-19 12:37:24 UTC (rev 3737)
@@ -10,7 +10,11 @@
* @version $id$
*/
public class StatisticsQuery extends NfdumpQuery {
+ public StatisticsQuery(Date startTime, Date endTime, Set<String>
directories, String startFile, String endFile, boolean annonymised) {
+ super(startTime, endTime, directories, startFile, endFile,
annonymised);
+ }
+
public StatisticsQuery(Date startTime, Date endTime, Set<String>
directories, String startFile, String endFile) {
- super(startTime, endTime, directories, startFile, endFile);
+ this(startTime, endTime, directories, startFile, endFile,
false);
}
}

Modified:
trunk/surfnet_java-flowsa-ma/src/main/java/org/perfsonar/service/measurementArchive/flowsa/nfdump/TimeRange.java
===================================================================
---
trunk/surfnet_java-flowsa-ma/src/main/java/org/perfsonar/service/measurementArchive/flowsa/nfdump/TimeRange.java
2008-04-18 17:52:17 UTC (rev 3736)
+++
trunk/surfnet_java-flowsa-ma/src/main/java/org/perfsonar/service/measurementArchive/flowsa/nfdump/TimeRange.java
2008-04-19 12:37:24 UTC (rev 3737)
@@ -12,16 +12,16 @@
private final Date end;

public TimeRange(Date start, Date end) {
- this.start = start;
- this.end = end;
+ this.start = start==null ? null : new Date(start.getTime());
+ this.end = end==null ? null : new Date(end.getTime());
}

public Date getStart() {
- return start;
+ return start==null ? null : new Date(start.getTime());
}

public Date getEnd() {
- return end;
+ return end==null ? null : new Date(end.getTime());
}

}

Modified:
trunk/surfnet_java-flowsa-ma/src/main/java/org/perfsonar/service/measurementArchive/flowsa/nfdump/TopQuery.java
===================================================================
---
trunk/surfnet_java-flowsa-ma/src/main/java/org/perfsonar/service/measurementArchive/flowsa/nfdump/TopQuery.java
2008-04-18 17:52:17 UTC (rev 3736)
+++
trunk/surfnet_java-flowsa-ma/src/main/java/org/perfsonar/service/measurementArchive/flowsa/nfdump/TopQuery.java
2008-04-19 12:37:24 UTC (rev 3737)
@@ -20,8 +20,8 @@

public TopQuery(Date startTime, Date endTime, Filter filter,
AggegrationRule aggegrationRule, Set<String> directories, String
startFile, String endFile,
- String topStatistic, String orderBy, boolean splitProtocol, int
topN) {
- super(startTime, endTime, directories, startFile, endFile);
+ String topStatistic, String orderBy, boolean splitProtocol, int
topN, boolean annonymised) {
+ super(startTime, endTime, directories, startFile, endFile,
annonymised);
this.filter = filter;
this.aggegrationRule = aggegrationRule;
this.topStatistic = topStatistic;
@@ -29,6 +29,12 @@
this.splitProtocol = splitProtocol;
this.topN = topN;
}
+
+ public TopQuery(Date startTime, Date endTime, Filter filter,
+ AggegrationRule aggegrationRule, Set<String> directories, String
startFile, String endFile,
+ String topStatistic, String orderBy, boolean splitProtocol, int
topN) {
+ this(startTime, endTime, filter, aggegrationRule,
directories, startFile, endFile,topStatistic,orderBy,splitProtocol,topN,
false);
+ }

public Filter getFilter() {
return filter;
@@ -77,4 +83,6 @@
public void setTopN(int topN) {
this.topN = topN;
}
+
+
}

Modified:
trunk/surfnet_java-flowsa-ma/src/main/java/org/perfsonar/service/measurementArchive/flowsa/nfdump/WorkaroundProcess.java
===================================================================
---
trunk/surfnet_java-flowsa-ma/src/main/java/org/perfsonar/service/measurementArchive/flowsa/nfdump/WorkaroundProcess.java
2008-04-18 17:52:17 UTC (rev 3736)
+++
trunk/surfnet_java-flowsa-ma/src/main/java/org/perfsonar/service/measurementArchive/flowsa/nfdump/WorkaroundProcess.java
2008-04-19 12:37:24 UTC (rev 3737)
@@ -3,6 +3,7 @@
import java.io.InputStream;
import java.io.OutputStream;
import java.lang.reflect.Field;
+import java.util.logging.Logger;

/**
* wraps around a process.
@@ -14,6 +15,7 @@
public class WorkaroundProcess extends Process {

private final Process process;
+ private final Logger logger =
Logger.getLogger(WorkaroundProcess.class.getName());

public WorkaroundProcess(Process process) {
this.process = process;
@@ -22,7 +24,8 @@
public void destroy() {
Class<? extends Process> processClass = process.getClass();
if(processClass.getName().equals("java.lang.UNIXProcess")) {
- // make sure though a ugly way that our process still
runs, or adleast that the pid hasn't been recycled.
+ String pid = null;
+ // make sure though a ugly way that our process still
runs, or at least that the pid hasn't been recycled.
try{
process.exitValue();
} catch (IllegalThreadStateException e) {
@@ -30,12 +33,16 @@
Field pidField =
processClass.getDeclaredField("pid");
//Breaking java security
pidField.setAccessible(true);

- String pid =
String.valueOf(pidField.get(process));
- new ProcessBuilder("/bin/kill", "-9",
pid).start();
+ pid =
String.valueOf(pidField.get(process));
+ Process sigKillProcess = new
ProcessBuilder("/bin/kill", "-9", pid).start();
+ sigKillProcess.waitFor();
+ logger.fine("Killed process with PID
= " + pid );
} catch (Exception ex) {
- System.out.println("Sigkill
workaround failed. (" + ex.getLocalizedMessage() +")");
+ logger.severe("Sigkill workaround
failed. (pid="+pid+") (cause=" + ex.getLocalizedMessage() +")");
}
}
+ } else {
+ logger.warning("Not a UNIX process.");
}
process.destroy();
}

Modified:
trunk/surfnet_java-flowsa-ma/src/main/java/org/perfsonar/service/measurementArchive/flowsa/nfdump/package-info.java
===================================================================
---
trunk/surfnet_java-flowsa-ma/src/main/java/org/perfsonar/service/measurementArchive/flowsa/nfdump/package-info.java
2008-04-18 17:52:17 UTC (rev 3736)
+++
trunk/surfnet_java-flowsa-ma/src/main/java/org/perfsonar/service/measurementArchive/flowsa/nfdump/package-info.java
2008-04-19 12:37:24 UTC (rev 3737)
@@ -1,4 +1,4 @@
/**
- * This package holds Nfdump related classes.
+ * This package holds Nfdump related and wrapper classes.
*/
package org.perfsonar.service.measurementArchive.flowsa.nfdump;
\ No newline at end of file

Modified:
trunk/surfnet_java-flowsa-ma/src/main/java/org/perfsonar/service/measurementArchive/flowsa/package-info.java
===================================================================
---
trunk/surfnet_java-flowsa-ma/src/main/java/org/perfsonar/service/measurementArchive/flowsa/package-info.java
2008-04-18 17:52:17 UTC (rev 3736)
+++
trunk/surfnet_java-flowsa-ma/src/main/java/org/perfsonar/service/measurementArchive/flowsa/package-info.java
2008-04-19 12:37:24 UTC (rev 3737)
@@ -1,17 +1,6 @@
/**
* Base Package for Flow selection and aggregation Measurement Archive
*
- * @todo RNC
- * @todo RNC -> java
- * @todo nmwg in service
- * @todo ant files
- * @todo test scripts
- * @todo nfcapd manager
- * @todo request in perfsonarui plugin
- * @todo response parser in perfsonarui plugin
- * @todo documentation
- * @todo polishing
- * @todo release
* @version $Id$
*/
package org.perfsonar.service.measurementArchive.flowsa;
\ No newline at end of file



  • perfsonar: r3737 - in trunk/surfnet_java-flowsa-ma/src/main/java/org/perfsonar/service/measurementArchive/flowsa: . nfcapd nfdump, svnlog, 04/19/2008

Archive powered by MHonArc 2.6.16.

Top of Page