perfsonar-dev - perfsonar: r2783 - in branches/new-structure/trunk/surfnet_java-flowsa-ma: . conf conf/axis-1.4/WEB-INF doc src/main/java/org/perfsonar/service src/main/java/org/perfsonar/service/measurementArchive/flowsa src/main/java/org/perfsonar/service/measurementArchive/flowsa/nfdump src/main/java/org/perfsonar/service/measurementArchive/flowsa/nfdump/outputreaders
Subject: perfsonar development work
List archive
perfsonar: r2783 - in branches/new-structure/trunk/surfnet_java-flowsa-ma: . conf conf/axis-1.4/WEB-INF doc src/main/java/org/perfsonar/service src/main/java/org/perfsonar/service/measurementArchive/flowsa src/main/java/org/perfsonar/service/measurementArchive/flowsa/nfdump src/main/java/org/perfsonar/service/measurementArchive/flowsa/nfdump/outputreaders
Chronological Thread
- From:
- To:
- Subject: perfsonar: r2783 - in branches/new-structure/trunk/surfnet_java-flowsa-ma: . conf conf/axis-1.4/WEB-INF doc src/main/java/org/perfsonar/service src/main/java/org/perfsonar/service/measurementArchive/flowsa src/main/java/org/perfsonar/service/measurementArchive/flowsa/nfdump src/main/java/org/perfsonar/service/measurementArchive/flowsa/nfdump/outputreaders
- Date: Fri, 7 Sep 2007 03:27:40 -0400
Author: michael.bischoff
Date: 2007-09-07 03:27:40 -0400 (Fri, 07 Sep 2007)
New Revision: 2783
Removed:
branches/new-structure/trunk/surfnet_java-flowsa-ma/src/main/java/org/perfsonar/service/mesurementArchive/
Modified:
branches/new-structure/trunk/surfnet_java-flowsa-ma/.classpath
branches/new-structure/trunk/surfnet_java-flowsa-ma/.project
branches/new-structure/trunk/surfnet_java-flowsa-ma/conf/axis-1.4/WEB-INF/server-config.wsdd
branches/new-structure/trunk/surfnet_java-flowsa-ma/conf/service.properties
branches/new-structure/trunk/surfnet_java-flowsa-ma/doc/README.txt
branches/new-structure/trunk/surfnet_java-flowsa-ma/pom.xml
branches/new-structure/trunk/surfnet_java-flowsa-ma/src/main/java/org/perfsonar/service/measurementArchive/flowsa/EventType.java
branches/new-structure/trunk/surfnet_java-flowsa-ma/src/main/java/org/perfsonar/service/measurementArchive/flowsa/FlowStatisticsResponse.java
branches/new-structure/trunk/surfnet_java-flowsa-ma/src/main/java/org/perfsonar/service/measurementArchive/flowsa/FlowsaMAServiceEngine.java
branches/new-structure/trunk/surfnet_java-flowsa-ma/src/main/java/org/perfsonar/service/measurementArchive/flowsa/RawFlowsResponse.java
branches/new-structure/trunk/surfnet_java-flowsa-ma/src/main/java/org/perfsonar/service/measurementArchive/flowsa/nfdump/NfdumpControll.java
branches/new-structure/trunk/surfnet_java-flowsa-ma/src/main/java/org/perfsonar/service/measurementArchive/flowsa/nfdump/NfdumpOutputReader.java
branches/new-structure/trunk/surfnet_java-flowsa-ma/src/main/java/org/perfsonar/service/measurementArchive/flowsa/nfdump/NfdumpQuery.java
branches/new-structure/trunk/surfnet_java-flowsa-ma/src/main/java/org/perfsonar/service/measurementArchive/flowsa/nfdump/RawFlowsQuery.java
branches/new-structure/trunk/surfnet_java-flowsa-ma/src/main/java/org/perfsonar/service/measurementArchive/flowsa/nfdump/Statistics.java
branches/new-structure/trunk/surfnet_java-flowsa-ma/src/main/java/org/perfsonar/service/measurementArchive/flowsa/nfdump/StatisticsQuery.java
branches/new-structure/trunk/surfnet_java-flowsa-ma/src/main/java/org/perfsonar/service/measurementArchive/flowsa/nfdump/outputreaders/Nfdump152OutputReaderImpl.java
Log:
big fix.
Modified: branches/new-structure/trunk/surfnet_java-flowsa-ma/.classpath
===================================================================
--- branches/new-structure/trunk/surfnet_java-flowsa-ma/.classpath
2007-09-07 07:23:44 UTC (rev 2782)
+++ branches/new-structure/trunk/surfnet_java-flowsa-ma/.classpath
2007-09-07 07:27:40 UTC (rev 2783)
@@ -1,8 +1,8 @@
-<?xml version="1.0" encoding="UTF-8"?>
-<classpath>
- <classpathentry kind="src" path="src/main/java"/>
- <classpathentry kind="src" path="src/test/java"/>
- <classpathentry kind="con"
path="org.eclipse.jdt.launching.JRE_CONTAINER"/>
- <classpathentry kind="con"
path="org.maven.ide.eclipse.MAVEN2_CLASSPATH_CONTAINER"/>
- <classpathentry kind="output" path="bin"/>
-</classpath>
+<?xml version="1.0" encoding="UTF-8"?>
+<classpath>
+ <classpathentry kind="src" path="src/main/java"/>
+ <classpathentry kind="src" path="src/test/java"/>
+ <classpathentry kind="con"
path="org.eclipse.jdt.launching.JRE_CONTAINER"/>
+ <classpathentry kind="con"
path="org.maven.ide.eclipse.MAVEN2_CLASSPATH_CONTAINER"/>
+ <classpathentry kind="output" path="bin"/>
+</classpath>
Modified: branches/new-structure/trunk/surfnet_java-flowsa-ma/.project
===================================================================
--- branches/new-structure/trunk/surfnet_java-flowsa-ma/.project
2007-09-07 07:23:44 UTC (rev 2782)
+++ branches/new-structure/trunk/surfnet_java-flowsa-ma/.project
2007-09-07 07:27:40 UTC (rev 2783)
@@ -1,23 +1,23 @@
-<?xml version="1.0" encoding="UTF-8"?>
-<projectDescription>
- <name>flowsa-ma</name>
- <comment></comment>
- <projects>
- </projects>
- <buildSpec>
- <buildCommand>
- <name>org.eclipse.jdt.core.javabuilder</name>
- <arguments>
- </arguments>
- </buildCommand>
- <buildCommand>
- <name>org.maven.ide.eclipse.maven2Builder</name>
- <arguments>
- </arguments>
- </buildCommand>
- </buildSpec>
- <natures>
- <nature>org.eclipse.jdt.core.javanature</nature>
- <nature>org.maven.ide.eclipse.maven2Nature</nature>
- </natures>
-</projectDescription>
+<?xml version="1.0" encoding="UTF-8"?>
+<projectDescription>
+ <name>flowsa-ma</name>
+ <comment></comment>
+ <projects>
+ </projects>
+ <buildSpec>
+ <buildCommand>
+ <name>org.eclipse.jdt.core.javabuilder</name>
+ <arguments>
+ </arguments>
+ </buildCommand>
+ <buildCommand>
+ <name>org.maven.ide.eclipse.maven2Builder</name>
+ <arguments>
+ </arguments>
+ </buildCommand>
+ </buildSpec>
+ <natures>
+ <nature>org.eclipse.jdt.core.javanature</nature>
+ <nature>org.maven.ide.eclipse.maven2Nature</nature>
+ </natures>
+</projectDescription>
Modified:
branches/new-structure/trunk/surfnet_java-flowsa-ma/conf/axis-1.4/WEB-INF/server-config.wsdd
===================================================================
---
branches/new-structure/trunk/surfnet_java-flowsa-ma/conf/axis-1.4/WEB-INF/server-config.wsdd
2007-09-07 07:23:44 UTC (rev 2782)
+++
branches/new-structure/trunk/surfnet_java-flowsa-ma/conf/axis-1.4/WEB-INF/server-config.wsdd
2007-09-07 07:27:40 UTC (rev 2783)
@@ -1,4 +1,4 @@
-<?xml version="1.0" encoding="UTF-8"?>
+<?xml version="1.0" encoding="UTF-8"?>
<deployment xmlns="http://xml.apache.org/axis/wsdd/"
xmlns:java="http://xml.apache.org/axis/wsdd/providers/java">
<globalConfiguration>
<parameter name="adminPassword" value="admin"/>
@@ -60,4 +60,4 @@
<handler type="LocalResponder"/>
</responseFlow>
</transport>
-</deployment>
+</deployment>
Modified:
branches/new-structure/trunk/surfnet_java-flowsa-ma/conf/service.properties
===================================================================
---
branches/new-structure/trunk/surfnet_java-flowsa-ma/conf/service.properties
2007-09-07 07:23:44 UTC (rev 2782)
+++
branches/new-structure/trunk/surfnet_java-flowsa-ma/conf/service.properties
2007-09-07 07:27:40 UTC (rev 2783)
@@ -11,17 +11,20 @@
# === Flowsa specific properties ===
# exporters - list of source devices that generated our data.
-# service.ma.flowsa.exporters - a number indicating the amount of diferent
sources
-# service.ma.flowsa.source1.hostname -
-# service.ma.flowsa.source1.port -
-# service.ma.flowsa.source1.description -
-service.ma.flowsa.exporters=
+# service.ma.flowsa.exporters - a number indicating the amount of different
sources
+# service.ma.flowsa.exporter1.hostname -
+# service.ma.flowsa.exporter1.port -
+# service.ma.flowsa.exporter1.description -
+service.ma.flowsa.exporters=1
+service.ma.flowsa.exporter1.hostname=test
+service.ma.flowsa.exporter1.port=0
+service.ma.flowsa.exporter1.description=test
# --- query properties ---
-# query.timeout - timeout/max execution time of a query (value is in secunds)
+# query.timeout - timeout/max execution time of a query (value is in seconds)
# query.max-results - if more then <value> results are encountered stop
processing
service.ma.flowsa.query.timeout=10
-service.ma.flowsa.query.max-results=1000
+service.ma.flowsa.query.max-results=10000
# --- nfdump properties ---
# nfdump.source - location of nfcapd files
@@ -31,10 +34,14 @@
service.ma.flowsa.nfdump.location=/usr/local/bin/nfdump
service.ma.flowsa.nfdump.ouputreader=org.perfsonar.service.measurementArchive.flowsa.nfdump.outputreaders.Nfdump152OutputReaderFactory
-# === Logger properties ===
+# --- nfcapd properties ---
+# nfcapd.start-processes - if nfcapd should start processes for exporters,
specify false if this is done externally
+service.ma.flowsa.nfcapd.start-processes=false
+
+# === Logger properties ===
service.log.log4j.config=/perfsonar/conf/log4j.properties
-# === nmwg ===
+# === nmwg ===
service.sax_parser.config=/perfsonar/conf/objects.config
Modified: branches/new-structure/trunk/surfnet_java-flowsa-ma/doc/README.txt
===================================================================
--- branches/new-structure/trunk/surfnet_java-flowsa-ma/doc/README.txt
2007-09-07 07:23:44 UTC (rev 2782)
+++ branches/new-structure/trunk/surfnet_java-flowsa-ma/doc/README.txt
2007-09-07 07:27:40 UTC (rev 2783)
@@ -2,4 +2,5 @@
http://wiki.perfsonar.net/jra1-wiki/index.php/Flow_Selection_and_Aggregation_MA
for up to date information about the service.
-mvn cargo:deploy -DtomcatHostname=sonar1.amsterdam.surfnet.nl
-DtomcatUsername=username -DtomcatPassword=password
\ No newline at end of file
+mvn cargo:deploy -DtomcatHostname=sonar1.amsterdam.surfnet.nl
-DtomcatUsername=username -DtomcatPassword=password
+port = 80 and proto tcp and src ip 225.188.105.116
\ No newline at end of file
Modified: branches/new-structure/trunk/surfnet_java-flowsa-ma/pom.xml
===================================================================
--- branches/new-structure/trunk/surfnet_java-flowsa-ma/pom.xml 2007-09-07
07:23:44 UTC (rev 2782)
+++ branches/new-structure/trunk/surfnet_java-flowsa-ma/pom.xml 2007-09-07
07:27:40 UTC (rev 2783)
@@ -8,10 +8,7 @@
<packaging>war</packaging>
<name>Flowsa Measurement Archive</name>
<version>0.0.1</version>
- <description>
- Flow selection and aggregation Measurement Archive for
- perfSONAR. For more information see the wiki-page.
- </description>
+ <description>Flow selection and aggregation Measurement Archive for
perfSONAR. For more information see the wiki-page.</description>
<url>
http://wiki.perfsonar.net/jra1-wiki/index.php/Flow_Selection_and_Aggregation_MA
</url>
@@ -172,12 +169,12 @@
<dependency>
<groupId>perfsonar</groupId>
<artifactId>perfsonar-base</artifactId>
- <version>1.0.20070825</version>
+ <version>1.0.20070814</version>
</dependency>
<dependency>
<groupId>nmwg</groupId>
<artifactId>nmwg</artifactId>
- <version>1.0.20070815</version>
+ <version>1.0.20070906</version>
</dependency>
<dependency>
<groupId>xerces</groupId>
Modified:
branches/new-structure/trunk/surfnet_java-flowsa-ma/src/main/java/org/perfsonar/service/measurementArchive/flowsa/EventType.java
===================================================================
---
branches/new-structure/trunk/surfnet_java-flowsa-ma/src/main/java/org/perfsonar/service/measurementArchive/flowsa/EventType.java
2007-09-07 07:23:44 UTC (rev 2782)
+++
branches/new-structure/trunk/surfnet_java-flowsa-ma/src/main/java/org/perfsonar/service/measurementArchive/flowsa/EventType.java
2007-09-07 07:27:40 UTC (rev 2783)
@@ -13,7 +13,7 @@
/** ActionType request for a raw flow query * */
RAW_FLOW_REQUEST("http://ggf.org/ns/nmwg/tools/flow/raw/2.0/"),
/** ActionType request for a flow statistics query * */
-
STATISTICS_FLOW_REQUEST("http://ggf.org/ns/nmwg/tools/flow/stats/2.0/"),
+
STATISTICS_FLOW_REQUEST("http://ggf.org/ns/nmwg/tools/flow/stat/2.0/"),
/** ActionType request for a top flow query * */
TOP_FLOW_REQUEST("http://ggf.org/ns/nmwg/tools/flow/top/2.0/");
Modified:
branches/new-structure/trunk/surfnet_java-flowsa-ma/src/main/java/org/perfsonar/service/measurementArchive/flowsa/FlowStatisticsResponse.java
===================================================================
---
branches/new-structure/trunk/surfnet_java-flowsa-ma/src/main/java/org/perfsonar/service/measurementArchive/flowsa/FlowStatisticsResponse.java
2007-09-07 07:23:44 UTC (rev 2782)
+++
branches/new-structure/trunk/surfnet_java-flowsa-ma/src/main/java/org/perfsonar/service/measurementArchive/flowsa/FlowStatisticsResponse.java
2007-09-07 07:27:40 UTC (rev 2783)
@@ -1,14 +1,44 @@
package org.perfsonar.service.measurementArchive.flowsa;
-import org.ggf.ns.nmwg.base.v2_0.Message;
+import org.ggf.ns.nmwg.base.v2_0.Data;
+import org.ggf.ns.nmwg.base.v2_0.Metadata;
+import org.ggf.ns.nmwg.tools.flow.v2_0.Datum;
import org.perfsonar.service.measurementArchive.flowsa.nfdump.Statistics;
-public class FlowStatisticsResponse extends Message {
+public class FlowStatisticsResponse extends FlowsaGetResponse {
- public FlowStatisticsResponse(Statistics statistics) {
- // TODO Auto-generated constructor stub
+ public FlowStatisticsResponse(Metadata[] metadatas, Statistics
statistics) {
+ super(metadatas);
+ setData(convertToData(statistics));
}
-
-
+ private Data convertToData(Statistics statistics) {
+ Data data = new Data();
+ data.setId("data1");
+ Datum datum = new Datum();
+ datum.setFlows(statistics.getFlows());
+ datum.setFlowsTcp(statistics.getFlowsTcp());
+ datum.setFlowsUdp(statistics.getFlowsUdp());
+ datum.setFlowsIcmp(statistics.getFlowsIcmp());
+ datum.setFlowsOther(statistics.getFlowsOther());
+ datum.setPackets(statistics.getPackets());
+ datum.setPacketsTcp(statistics.getPacketsTcp());
+ datum.setPacketsUdp(statistics.getPacketsUdp());
+ datum.setPacketsIcmp(statistics.getPacketsIcmp());
+ datum.setPacketsOther(statistics.getPacketsOther());
+ datum.setBytes(statistics.getBytes());
+ datum.setBytesTcp(statistics.getBytesTcp());
+ datum.setBytesUdp(statistics.getBytesUdp());
+ datum.setBytesIcmp(statistics.getBytesIcmp());
+ datum.setBytesOther(statistics.getBytesOther());
+ datum.setFirst(statistics.getFirst());
+ datum.setLast(statistics.getLast());
+ datum.setMsecFirst(statistics.getMsecFirst());
+ datum.setMsecLast(statistics.getMsecLast());
+ datum.setSequenceFailures(statistics.getSequenceFailures());
+ data.addChild(datum);
+ return data;
+ }
+
+
}
Modified:
branches/new-structure/trunk/surfnet_java-flowsa-ma/src/main/java/org/perfsonar/service/measurementArchive/flowsa/FlowsaMAServiceEngine.java
===================================================================
---
branches/new-structure/trunk/surfnet_java-flowsa-ma/src/main/java/org/perfsonar/service/measurementArchive/flowsa/FlowsaMAServiceEngine.java
2007-09-07 07:23:44 UTC (rev 2782)
+++
branches/new-structure/trunk/surfnet_java-flowsa-ma/src/main/java/org/perfsonar/service/measurementArchive/flowsa/FlowsaMAServiceEngine.java
2007-09-07 07:27:40 UTC (rev 2783)
@@ -1,12 +1,16 @@
package org.perfsonar.service.measurementArchive.flowsa;
import java.io.File;
+import java.io.IOException;
+import java.util.Arrays;
import java.util.List;
+import java.util.Set;
+import java.util.SortedSet;
+import java.util.TreeSet;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.TimeoutException;
import org.ggf.ns.nmwg.base.v2_0.Message;
-import org.ggf.ns.nmwg.base.v2_0.Metadata;
import org.perfsonar.base.auxiliary.AuxiliaryComponentManager;
import org.perfsonar.base.auxiliary.ComponentNames;
import
org.perfsonar.base.auxiliary.components.configuration.ConfigurationComponent;
@@ -14,6 +18,8 @@
import org.perfsonar.base.exceptions.PerfSONARException;
import org.perfsonar.service.base.engine.ActionType;
import org.perfsonar.service.base.engine.ServiceEngine;
+import org.perfsonar.service.measurementArchive.flowsa.nfcapd.Exporter;
+import org.perfsonar.service.measurementArchive.flowsa.nfcapd.NfcapdControll;
import org.perfsonar.service.measurementArchive.flowsa.nfdump.Flow;
import org.perfsonar.service.measurementArchive.flowsa.nfdump.NfdumpControll;
import org.perfsonar.service.measurementArchive.flowsa.nfdump.RawFlowsQuery;
@@ -21,13 +27,12 @@
import
org.perfsonar.service.measurementArchive.flowsa.nfdump.StatisticsQuery;
import org.perfsonar.service.measurementArchive.flowsa.nfdump.TopData;
import org.perfsonar.service.measurementArchive.flowsa.nfdump.TopQuery;
-import
org.perfsonar.service.measurementArchive.flowsa.nfdump.filters.SimpleFilter;
/**
* Flow Selection and Aggregation Measurement Archive Service Engine
* implementation.
*
- * (shortend to FlowsaMAServiceEngine for sanity reasons.)
+ * (shortened to FlowsaMAServiceEngine for sanity reasons.)
*
* @see ServiceEngine
* @author Michael Bischoff
@@ -35,21 +40,31 @@
*/
public class FlowsaMAServiceEngine implements ServiceEngine {
- protected final static String SERVICE_TYPE =
"service.MeasurementArchive.flowType";
+ protected static final String SERVICE_TYPE =
"service.MeasurementArchive.flowType";
protected static final int DEFAULT_MAX_RESULTS = 1000;
- protected static final int DEFAULT_TIMEOUT_VALUE = 10;
+ protected static final int DEFAULT_TIMEOUT = 10;
protected static final File DEFAULT_EXECUTABLE_FILE = new
File("/usr/local/bin/nfdump");
- protected static final String DEFAULT_READERFACTORYCLASSNAME_VALUE =
"org.perfsonar.service.measurementArchive.flowsa.FlowsaMAServiceEngine";
+ protected static final String DEFAULT_READERFACTORYCLASSNAME =
"org.perfsonar.service.measurementArchive.flowsa.FlowsaMAServiceEngine";
+ protected static final File DEFAULT_SOURCEDIRECTORY = new
File("/var/nfdump/flows/");
+ protected static final boolean DEFAULT_SHOULDSTARTNFCAPDPROCESSES =
false;
protected static final String TIMEOUT_PROPERTY_KEY =
"service.ma.flowsa.query.timeout";
protected static final String NFDUMPLOCATION_PROPERTY_KEY =
"service.ma.flowsa.nfdump.location";
protected static final String MAXRESULTS_PROPERTY_KEY =
"service.ma.flowsa.query.max-results";
protected static final String READERFACTORYCLASSNAME_PROPERTY_KEY =
"service.ma.flowsa.nfdump.ouputreader";
+ protected static final String SOURCEDIRECTORY_PROPERTY_KEY =
"service.ma.flowsa.source";
+ protected static final String SHOULDSTARTNFCAPDPROCESSES_PROPERTY_KEY
= "service.ma.flowsa.nfcapd.start-processes";
+ protected static final String EXPORTERS_PROPERTY_KEY =
"service.ma.flowsa.exporters";
+
+ protected static final String EXPORTER_PROPERTY_PART =
"service.ma.flowsa.exporter";
+ protected static final String EXPORTERHOST_PROPERTY_PART =
".hostname";
+ protected static final String EXPORTERPORT_PROPERTY_PART = ".port";
protected final LoggerComponent logger;
protected final ConfigurationComponent configuration;
protected final NfdumpControll nfdump;
+ protected final NfcapdControll nfcapd;
/**
* Default constructor for the Service engine
@@ -57,23 +72,12 @@
*/
public FlowsaMAServiceEngine() throws PerfSONARException {
this.logger = getLoggerComponent();
- this.logger.debug("Starting... FlowTypeMAServiceEngine");
+ logger.debug("FlowsaMAServiceEngine.<Constructor> Creating...
FlowTypeMAServiceEngine.");
this.configuration = getConfigurationComponent();
-
- File nfdumpExecutable = getExecutableFrom(configuration);
- if (!nfdumpExecutable.exists()) {
- throw new PerfSONARException(
- "error.common.no_configuration",
- "Nfdump executable not found."
- );
- }
-
- int maxResults = getMaxResultsFrom(configuration);
- int timeoutValue = getTimeoutValueFrom(configuration);
- String readerFactoryClassName =
getReaderFactoryClassNameFrom(configuration);
-
- this.nfdump = new
NfdumpControll(nfdumpExecutable,readerFactoryClassName,maxResults,timeoutValue);
- logger.debug("Successfully created NFdumpControll.");
+ this.nfcapd = createNfcapdControll();
+ logger.debug("FlowsaMAServiceEngine.<Constructor> Successfully
created NfcapdControll.");
+ this.nfdump = createNfdumpControll();
+ logger.debug("FlowsaMAServiceEngine.<Constructor> Successfully
created NfdumpControll.");
}
/* (non-Javadoc)
* @see org.perfsonar.service.base.engine.ServiceEngine#getType()
@@ -94,12 +98,17 @@
if(ActionType.GET_DATA.equalsIgnoreCase(actionType)) {
try {
- return processGetDataRequest(request);
+ Message response = processGetDataRequest(new
FlowsaGetRequest(request));
+ logger.debug("FlowsaMAServiceEngine.takeAction:
Processing complete, returning response.");
+ return response;
} catch (InterruptedException e) {
+ logger.error("Fetching data was interrupted (" +
e.getLocalizedMessage() +")");
throw new
PerfSONARException("error.ma.fetching.interrupted","Fetching was
interrupted",e);
} catch (ExecutionException e) {
+ logger.error("Exception while fetching(" +
e.getLocalizedMessage() +")");
throw new
PerfSONARException("error.ma.fetching","Exception while fetching data",e);
} catch (TimeoutException e) {
+ logger.error("Fetching timed out.");
throw new
PerfSONARException("error.ma.fetching.timeout","Fetching took too long");
}
}
@@ -111,115 +120,253 @@
}
/**
* Processes a
{@link
ActionType}.GET_DATA request
- * @param request
+ * @param flowsaGetRequest
* @return a response message to be send back to the client.
* @throws PerfSONARException
* @throws InterruptedException
* @throws ExecutionException
* @throws TimeoutException
*/
- protected Message processGetDataRequest(final Message request) throws
PerfSONARException, InterruptedException, ExecutionException,
TimeoutException {
- String eventTypeValue = extractEventTypeFrom(request);
- EventType type = EventType.get(eventTypeValue);
- logger.debug("FlowsaMAServiceEngine.processGetDataRequest: eventype
indentified as: " + type);
- switch (type) {
+ protected Message processGetDataRequest(final FlowsaGetRequest
flowsaGetRequest) throws PerfSONARException, InterruptedException,
ExecutionException, TimeoutException {
+ EventType eventType = flowsaGetRequest.getEventType();
+ if(eventType==null) {
+ throw new PerfSONARException(
+ "error.common.action_not_supported",
+ "EventType " + flowsaGetRequest.getEventTypeValue() + "
not understood."
+ );
+ }
+
+ switch (eventType) {
case RAW_FLOW_REQUEST:
- return handleRawFlowRequest(request);
+ return handleRawFlowRequest(flowsaGetRequest);
case STATISTICS_FLOW_REQUEST:
- return handleStatisticsFlowRequest(request);
+ return handleStatisticsFlowRequest(flowsaGetRequest);
case TOP_FLOW_REQUEST:
- return handleTopFlowRequest(request);
-
+ return handleTopFlowRequest(flowsaGetRequest);
default:
- if(type!=null) {
- throw new PerfSONARException(
- "error.common.action_not_implemented",
- "EventType is not yet implemented"
- );
- }
throw new PerfSONARException(
- "error.common.action_not_supported",
- "EventType " + eventTypeValue + " not understood."
- );
+ "error.common.action_not_implemented",
+ "EventType("+flowsaGetRequest.getEventType()+") is
not yet implemented"
+ );
}
}
/**
- * @param request creates a query from the request and executes it
+ * @param flowsaGetRequest creates a query from the request and executes
it
* in nfdump, the result is put into a response or a exception is
* thrown
* @return
* @throws InterruptedException
* @throws ExecutionException
* @throws TimeoutException
+ * @throws PerfSONARException
*/
- protected RawFlowsResponse handleRawFlowRequest(Message request) throws
InterruptedException, ExecutionException, TimeoutException {
- RawFlowsQuery query = createFlowQueryFrom(request);
+ protected RawFlowsResponse handleRawFlowRequest(FlowsaGetRequest
request) throws InterruptedException, ExecutionException, TimeoutException,
PerfSONARException {
+ SortedSet<String> files = new TreeSet<String>();
+ Set<String> directories = new TreeSet<String>();
+ for(String hostName : request.getHostNames()) {
+ Exporter exporter = nfcapd.getExporterByHostName(hostName);
+ if(exporter!=null) {
+ files.addAll(Arrays.asList(exporter.getFileNames()));
+ directories.add(exporter.getDirectory());
+ } else {
+ logger.error("No exporter with hostname: "+hostName);
+ }
+ }
+
+ if(directories.size() == 0) {
+ throw new PerfSONARException("error.ma.query","Query didn't
contain a subject known by this MA");
+ }
+
+ RawFlowsQuery query = new RawFlowsQuery(
+ request.getStartTime(),
+ request.getEndTime(),
+ request.getFilter(),
+ request.getAggegrationRule(),
+ directories,
+ files
+ );
+ logger.debug("FlowsaMAServiceEngine.handleRawFlowRequest: Executing
query.");
List<Flow> result = nfdump.execute(query);
- RawFlowsResponse response = new
RawFlowsResponse(request.getMetadataArray(), result);
+ logger.debug("FlowsaMAServiceEngine.handleRawFlowRequest: Creating
response.");
+ RawFlowsResponse response = new
RawFlowsResponse(request.getMetadata(), result);
return response;
}
/**
* handles a Statistics request. creates a query from
* the request and executes it in nfdump, the result
* is put into a response or a exception is thrown
- * @param request
+ * @param flowsaGetRequest
* @return StatisticsResponse
* @throws InterruptedException
* @throws ExecutionException
* @throws TimeoutException
+ * @throws PerfSONARException
*/
- protected Message handleStatisticsFlowRequest(Message request) throws
InterruptedException, ExecutionException, TimeoutException {
- StatisticsQuery query = createStatisticsQueryFrom(request);
- Statistics statistics = nfdump.execute(query);
- return new FlowStatisticsResponse(statistics);
+ protected Message handleStatisticsFlowRequest(FlowsaGetRequest request)
throws InterruptedException, ExecutionException, TimeoutException,
PerfSONARException {
+ SortedSet<String> files = new TreeSet<String>();
+ Set<String> directories = new TreeSet<String>();
+ for(String hostName : request.getHostNames()) {
+ Exporter exporter = nfcapd.getExporterByHostName(hostName);
+ if(exporter!=null) {
+ files.addAll(Arrays.asList(exporter.getFileNames()));
+ directories.add(exporter.getDirectory());
+ } else {
+ logger.error("No exporter with hostname: "+hostName);
+ }
+ }
+
+ if(directories.size() == 0) {
+ throw new PerfSONARException("error.ma.query","Query didn't
contain a subject known by this MA");
+ }
+
+ StatisticsQuery query = new StatisticsQuery(
+ request.getStartTime(),
+ request.getEndTime(),
+ directories,
+ files
+ );
+ logger.debug("FlowsaMAServiceEngine.handleRawFlowRequest: Executing
query.");
+ Statistics result = nfdump.execute(query);
+ logger.debug("FlowsaMAServiceEngine.handleRawFlowRequest: Creating
response.");
+ FlowStatisticsResponse response = new
FlowStatisticsResponse(request.getMetadata(), result);
+ return response;
}
/**
* handles a TopFlowRequest. creates a query from
* the request and executes it in nfdump, the result
* is put into a response or a exception is thrown
- * @param request
+ * @param flowsaGetRequest
* @return a TopFlowResponse
* @throws InterruptedException
* @throws ExecutionException
* @throws TimeoutException
*/
- protected Message handleTopFlowRequest(Message request) throws
InterruptedException, ExecutionException, TimeoutException {
- TopQuery query = createTopQueryFrom(request);
+ protected Message handleTopFlowRequest(FlowsaGetRequest
flowsaGetRequest) throws InterruptedException, ExecutionException,
TimeoutException {
+ TopQuery query = new TopQuery();
TopData result = nfdump.execute(query);
return new TopFlowsResponse(result);
}
- /**
- * converts a message to a RawFlowsQuery
- * @param request
- * @return rawflowsquery
- */
- protected RawFlowsQuery createFlowQueryFrom(Message request) {
- RawFlowsQuery query = new RawFlowsQuery();
- for(Object data : request.getMetadataMap().keySet()) {
- logger.debug("Metadatakey: "+data);
+
+ /**
+ * @return
+ * @throws PerfSONARException
+ */
+ protected NfdumpControll createNfdumpControll() throws
PerfSONARException {
+ File nfdumpExecutable = getExecutableFrom(configuration);
+ if (!nfdumpExecutable.exists()) {
+ throw new PerfSONARException(
+ "error.common.no_configuration",
+ "Nfdump executable not found."
+ );
+ }
+
+ int maxResults = getMaxResultsFrom(configuration);
+ int timeoutValue = getTimeoutValueFrom(configuration);
+ String readerFactoryClassName =
getReaderFactoryClassNameFrom(configuration);
+ try {
+ return new
NfdumpControll(nfdumpExecutable,readerFactoryClassName,maxResults,timeoutValue);
+ } catch (Exception e) {
+ throw new PerfSONARException(
+ "error.common.no_configuration",
+ "Nfdump executable not found.",
+ e
+ );
+ }
+ }
+ /**
+ * @return NFCapdControll
+ * @throws PerfSONARException on failure
+ */
+ protected NfcapdControll createNfcapdControll() throws
PerfSONARException {
+ File sourceDirectory = getSourceDirectory(configuration);
+ if(!sourceDirectory.exists()) {
+ logger.warn("FlowsaMAServiceEngine.checkSource Source
directory doesn't exist trying to create.");
+ try {
+ sourceDirectory.createNewFile();
+
logger.debug("FlowsaMAServiceEngine.checkSource Source directory created.");
+ } catch (IOException e) {
+
logger.error("FlowsaMAServiceEngine.checkSource Source directory couldn't be
created.");
+ throw new PerfSONARException(
+ "", // TODO error code
+ "Couldn't create NfcapdControll
source directory was missing and couldn't be created."
+ );
+ }
+ }
+
+ boolean startNfcapdProcesses =
getShouldStartNfcapdProcesses(configuration);
+
+ if(startNfcapdProcesses) {
+ if(!sourceDirectory.canWrite()) {
+ throw new PerfSONARException(
+ "", //TODO error code
+ "Can't write to source
directory."
+ );
+ }
+ }
+
+ Set<Exporter> exporters = getExporters(sourceDirectory,
configuration);
+
+ return new NfcapdControll(exporters, startNfcapdProcesses);
+ }
+ /**
+ * looks up the property from the specified source. returns the
default value
+ * if the source didn't supply one.
+ * @param source
+ * @return boolean if nfcapdControll should start its own processes
+ */
+ private boolean getShouldStartNfcapdProcesses(ConfigurationComponent
source) {
+ boolean shouldStart = DEFAULT_SHOULDSTARTNFCAPDPROCESSES;
+ try {
+ shouldStart =
Boolean.parseBoolean(source.getProperty(SHOULDSTARTNFCAPDPROCESSES_PROPERTY_KEY));
+ } catch (PerfSONARException e) {
+ logger.debug("No configuration found: using standard value for
should start nfcapd processes");
+ return DEFAULT_SHOULDSTARTNFCAPDPROCESSES;
}
- query.setFilter(new SimpleFilter(""));
- throw new UnsupportedOperationException("not implemented");
- }
- /**
- * converts a message to a StatisticsQuery
- * @param request
- * @return statisticsquery containing the parameter from the request.
- */
- protected StatisticsQuery createStatisticsQueryFrom(Message request) {
- // TODO create query
- throw new UnsupportedOperationException("not implemented");
- }
- /**
- * converts a Message to a Topquery
- * @param request
- * @return topquery
- */
- protected TopQuery createTopQueryFrom(Message request) {
- // TODO create query
- throw new UnsupportedOperationException("not implemented");
- }
+ return shouldStart;
+ }
/**
+ * Gets the exporters from the configuration source. Checks access
to the specified source dir.
+ * @param sourceDir
+ * @param source
+ * @return
+ */
+ private Set<Exporter> getExporters(File sourceDir,
ConfigurationComponent source) {
+ int numberOfExporters;
+ try {
+ numberOfExporters =
Integer.parseInt(source.getProperty(EXPORTERS_PROPERTY_KEY));
+ } catch (PerfSONARException e) {
+ logger.warn("No configuration found: cannot proceed without a
value for exporters");
+ return null;
+ } catch (NumberFormatException e) {
+ logger.warn("Bad configuration: value for exporters could not
be parsed.");
+ return null;
+ }
+
+ Set<Exporter> exporters = new TreeSet<Exporter>();
+ for(int i=1; i<=numberOfExporters; i++) {
+ Exporter exporter = getExporter(i, sourceDir, source);
+ if(exporter!=null) {
+ exporters.add(exporter);
+ } else {
+ logger.warn("FlowsaMAServiceEngine.getExporters:
could not create exporter "+i+".");
+ }
+ }
+ return exporters;
+ }
+ private Exporter getExporter(int i, File sourceDir,
ConfigurationComponent source) {
+ // get required
+ Exporter exporter;
+ try {
+ String host =
source.getProperty(EXPORTER_PROPERTY_PART + i + EXPORTERHOST_PROPERTY_PART);
+ int port =
Integer.parseInt(source.getProperty(EXPORTER_PROPERTY_PART + i +
EXPORTERPORT_PROPERTY_PART));
+ exporter = new Exporter(sourceDir, host, port);
+ } catch (Exception e) {
+ return null;
+ }
+ //TODO set optional properties.
+ return exporter;
+ }
+ /**
* looks up the property from the specified source. returns the
default value
* if the source didn't supply one.
* @param source
@@ -247,10 +394,10 @@
timeoutValue =
Integer.parseInt(source.getProperty(TIMEOUT_PROPERTY_KEY));
} catch (PerfSONARException e) {
logger.debug("No configuration found: using standard MaxResult
for nfDump");
- return DEFAULT_TIMEOUT_VALUE;
+ return DEFAULT_TIMEOUT;
} catch (NumberFormatException e) {
logger.debug("Couldn't parse from configuration: using standard
MaxResult for nfDump");
- return DEFAULT_TIMEOUT_VALUE;
+ return DEFAULT_TIMEOUT;
}
return timeoutValue;
}
@@ -285,24 +432,25 @@
name =
source.getProperty(READERFACTORYCLASSNAME_PROPERTY_KEY);
} catch (PerfSONARException e) {
logger.debug("No configuration found: using standard
ReaderFactory class-name for nfDump");
- return DEFAULT_READERFACTORYCLASSNAME_VALUE;
+ return DEFAULT_READERFACTORYCLASSNAME;
}
return name;
}
/**
- * extracts the event type from the request.
- * @param request
- * @return eventtype
+ * looks up the property from the specified source. returns the
default value
+ * if the source didn't supply one.
+ * @param source
+ * @return property for SourceDirectory
*/
- private String extractEventTypeFrom(Message request) {
- String eventType;
- for(Metadata metadata : request.getMetadataArray()) {
- eventType = metadata.getEventType().getEventType();
- if(eventType!=null) {
- return eventType;
- }
- }
- return null;
+ private File getSourceDirectory(ConfigurationComponent source) {
+ String name;
+ try {
+ name = source.getProperty(SOURCEDIRECTORY_PROPERTY_KEY);
+ } catch (PerfSONARException e) {
+ logger.debug("No configuration found: using standard
ReaderFactory class-name for nfDump");
+ return DEFAULT_SOURCEDIRECTORY;
+ }
+ return new File(name);
}
/**
* gets the configuration component
@@ -335,5 +483,6 @@
underlayingException
);
}
- }
+ }
+
}
Modified:
branches/new-structure/trunk/surfnet_java-flowsa-ma/src/main/java/org/perfsonar/service/measurementArchive/flowsa/RawFlowsResponse.java
===================================================================
---
branches/new-structure/trunk/surfnet_java-flowsa-ma/src/main/java/org/perfsonar/service/measurementArchive/flowsa/RawFlowsResponse.java
2007-09-07 07:23:44 UTC (rev 2782)
+++
branches/new-structure/trunk/surfnet_java-flowsa-ma/src/main/java/org/perfsonar/service/measurementArchive/flowsa/RawFlowsResponse.java
2007-09-07 07:27:40 UTC (rev 2783)
@@ -1,35 +1,35 @@
package org.perfsonar.service.measurementArchive.flowsa;
+import java.util.Iterator;
import java.util.List;
import org.ggf.ns.nmwg.base.v2_0.Data;
-import org.ggf.ns.nmwg.base.v2_0.Message;
import org.ggf.ns.nmwg.base.v2_0.Metadata;
import org.ggf.ns.nmwg.tools.flow.v2_0.Datum;
import org.perfsonar.service.measurementArchive.flowsa.nfdump.Flow;
-public class RawFlowsResponse extends Message {
+public class RawFlowsResponse extends FlowsaGetResponse {
private final List<Flow> flows;
public RawFlowsResponse(Metadata[] metadatas, List<Flow> flows) {
+ super(metadatas);
this.flows = flows;
- for(Metadata metadata : metadatas) {
- setMetadata(metadata);
- }
setData(convertToData(flows));
}
private Data convertToData(List<Flow> flows) {
Data data = new Data();
- for(Flow flow : flows) {
- data.addChild(convertToDatum(flow));
- }
+ data.setId("data1");
+ Iterator<Flow> iterator = flows.iterator();
+ while(iterator.hasNext()) {
+ data.addChild(convertToDatum(iterator.next()));
+ iterator.remove(); // free up resources.
+ }
return data;
}
private Datum convertToDatum(Flow flow) {
- Datum datum = new Datum();
-
+ Datum datum = new Datum();
datum.setAdressFamily(flow.getAdressFamily());
datum.setBytes(flow.getBytes());
datum.setDestinationAdress(flow.getDestinationAdress());
@@ -47,7 +47,6 @@
datum.setTimeFirstSeen(flow.getTimeFirstSeen());
datum.setTimeLastSeen(flow.getTimeLastSeen());
datum.setTypeOfService(flow.getTypeOfService());
-
return datum;
}
@@ -55,5 +54,4 @@
return flows;
}
-
}
Modified:
branches/new-structure/trunk/surfnet_java-flowsa-ma/src/main/java/org/perfsonar/service/measurementArchive/flowsa/nfdump/NfdumpControll.java
===================================================================
---
branches/new-structure/trunk/surfnet_java-flowsa-ma/src/main/java/org/perfsonar/service/measurementArchive/flowsa/nfdump/NfdumpControll.java
2007-09-07 07:23:44 UTC (rev 2782)
+++
branches/new-structure/trunk/surfnet_java-flowsa-ma/src/main/java/org/perfsonar/service/measurementArchive/flowsa/nfdump/NfdumpControll.java
2007-09-07 07:27:40 UTC (rev 2783)
@@ -5,6 +5,7 @@
import java.io.File;
import java.util.ArrayList;
+import java.util.Arrays;
import java.util.Collections;
import java.util.List;
import java.util.concurrent.Callable;
@@ -36,11 +37,16 @@
public Type call() throws Exception {
Process process = null;
+ NfdumpOutputReader reader = null;
try {
+ System.out.println(Arrays.toString(commands.toArray()));
process = new ProcessBuilder(commands).start();
- NfdumpOutputReader reader =
readerFactory.create(process.getInputStream());
+ reader = readerFactory.create(process.getInputStream());
return call(reader);
} finally {
+ if(reader!=null) {
+ reader.destroy();
+ }
if(process!=null) {
process.destroy();
}
@@ -51,8 +57,7 @@
}
private final NfdumpOutputReaderFactory readerFactory;
private final File nfdumpExecutable;
- private final ExecutorService executorService = Executors
- .newCachedThreadPool(new MyThreadFactory());
+ private final ExecutorService executorService =
Executors.newCachedThreadPool(new MyThreadFactory());
private final static String FORMAT_FLAG = "-o";
private final static String FORMAT_FLAG_VALUE = "pipe6";
private final static String TIME_FLAG = "-t";
@@ -63,8 +68,8 @@
final long timeoutValue;
/**
- * constructs an NfdumpManager wich uses the specified nfdumpExecutable and
- * choses an {@link NfdumpOutputReader} based on the nfdumpVersion String
+ * constructs an NfdumpManager which uses the specified nfdumpExecutable and
+ * chooses an {@link NfdumpOutputReader} based on the nfdumpVersion String
* argument
*
* @param nfdumpExecutable file containing the Executable
@@ -72,7 +77,7 @@
* @param maxResults the max amount of results a query can return
* @param timeoutValue the max time in milliseconds a query can take
* @throws IllegalArgumentException
- * wenn Nfdump executable is not found this should be an
+ * when Nfdump executable is not found this should be an
* configuration issue.
*/
public NfdumpControll(final File nfdumpExecutable, final String
outputReaderFactoryClassName, final int maxResults, final int timeoutValue)
throws IllegalArgumentException {
@@ -92,28 +97,27 @@
* This method takes a RecordQuery, executes it and returns the result.
* @param query to be executed
* @return List of Records
- * @throws InterruptedException wenn waiting for timeout was interrupted
- * @throws ExecutionException wenn something went wrong executing the
query-task
- * @throws TimeoutException wenn the query-task took too long
+ * @throws InterruptedException when waiting for timeout was interrupted
+ * @throws ExecutionException when something went wrong executing the
query-task
+ * @throws TimeoutException when the query-task took too long
*/
public List<Flow> execute(RawFlowsQuery query) throws
InterruptedException, ExecutionException, TimeoutException {
- List<String> commands = createCommandsFrom(query);
+ List<String> commands;
+ try {
+ commands = createCommandsFrom(query);
+ } catch (QueryException e) {
+ throw new ExecutionException(e);
+ }
Callable<List<Flow>> task = new
NfdumpTask<List<Flow>>(commands,readerFactory) {
@Override
List<Flow> call(NfdumpOutputReader reader) throws Exception {
List<Flow> result = new ArrayList<Flow>();
Flow record = null;
- int i = 0;
while(( record = reader.readRecord()) != null) {
if(result.size() > maxResults) {
throw new QueryException("Query returned to many
results");
}
- i++;
- if(i>=100) {
- System.out.println("+");
- i = 0;
- }
result.add(record);
}
return result;
@@ -131,7 +135,12 @@
* @throws TimeoutException when the query-task took too long
*/
public Statistics execute(StatisticsQuery query) throws
InterruptedException, ExecutionException, TimeoutException {
- List<String> commands = createCommandsFrom(query);
+ List<String> commands;
+ try {
+ commands = createCommandsFrom(query);
+ } catch (QueryException e) {
+ throw new ExecutionException(e);
+ }
Callable<Statistics> task = new
NfdumpTask<Statistics>(commands,readerFactory) {
@Override
@@ -216,36 +225,28 @@
* to be converted
* @return a List of commands(String) that can be passed onto a
* Processbuilder
+ * @throws QueryException
* @see RawFlowsQuery
*/
- protected List<String> createCommandsFrom(RawFlowsQuery query) {
+ protected List<String> createCommandsFrom(RawFlowsQuery query) throws
QueryException {
List<String> commands = createCommandsForRecordQuery();
- // commands.add(TIME_FLAG);
+ //commands.add(TIME_FLAG);
- // if(task.getTarget().length == 0) {
- // throw new MalformedTaskException("");
- // }
-
- if (query.getTarget().length != 1) {
- for (File target : query.getTarget()) {
- commands.add("-R");
- commands.add(target.getAbsolutePath());
- }
- } else {
- commands.add("-r");
- commands.add(query.getTarget()[0].getAbsolutePath());
- }
- if (query.getAggegrationRule() != null) {
+ commands.add("-M");
+ commands.add(getDirectoryCommandFrom(query));
+ commands.add("-R");
+ commands.add(".");
+ if (query.getAggegrationRule() != null ) {
commands.add("-A");
commands.add(query.getAggegrationRule().getExpression());
}
- if (query.getFilter() != null) {
+ if (query.getFilter() != null) {
commands.add(query.getFilter().getExpression());
}
return commands;
}
- /**
+ /**
* convenience method for creating a command list for processbuilder this
* method creates one based upon a StatisticsQuery takes
* {@link #createCommandsForStatisticQuery()} as a base
@@ -254,26 +255,18 @@
* to be converted
* @return a List of commands(String) that can be passed onto a
* Processbuilder
+ * @throws QueryException
* @see StatisticsQuery
*/
- protected List<String> createCommandsFrom(StatisticsQuery query) {
+ protected List<String> createCommandsFrom(StatisticsQuery query) throws
QueryException {
List<String> commands = createCommandsForStatisticQuery();
- // commands.add(TIME_FLAG);
+ //commands.add(TIME_FLAG);
- // if(task.getTarget().length == 0) {
- // throw new MalformedTaskException("");
- // }
+ commands.add("-M");
+ commands.add(getDirectoryCommandFrom(query));
+ commands.add("-R");
+ commands.add(".");
- if (query.getTarget().length != 1) {
- for (File target : query.getTarget()) {
- commands.add("-R");
- commands.add(target.getAbsolutePath());
- }
- } else {
- commands.add("-r");
- commands.add(query.getTarget()[0].getAbsolutePath());
- }
-
return commands;
}
/**
@@ -300,6 +293,26 @@
Class factoryClass = Class.forName(className);
return (NfdumpOutputReaderFactory) factoryClass.newInstance();
}
+
+ private String getDirectoryCommandFrom(NfdumpQuery query) throws
QueryException {
+ String command = null;
+ for(String directory : query.getDirectories()) {
+ if(command == null) {
+ command = directory;
+ break;
+ }
+ String[] parts = directory.split("/");
+ if(parts.length==1) {
+ parts = directory.split("\\");
+ }
+ command += ":" + parts[parts.length-1];
+ }
+ if(command != null) {
+ return command;
+ }
+ throw new QueryException("Query didn't contain any directories to
target");
+ }
+
/**
* Mostly unimportant class, it simply ensures that the thread priority is not
* higher than the thread that initiated. It also sets names so that when
@@ -317,8 +330,9 @@
}
public synchronized Thread newThread(Runnable target) {
+ i++;
return new Thread(threadGroup, target, "nfdump-worker-thread
"+i);
}
}
-
-}
+
+}
\ No newline at end of file
Modified:
branches/new-structure/trunk/surfnet_java-flowsa-ma/src/main/java/org/perfsonar/service/measurementArchive/flowsa/nfdump/NfdumpOutputReader.java
===================================================================
---
branches/new-structure/trunk/surfnet_java-flowsa-ma/src/main/java/org/perfsonar/service/measurementArchive/flowsa/nfdump/NfdumpOutputReader.java
2007-09-07 07:23:44 UTC (rev 2782)
+++
branches/new-structure/trunk/surfnet_java-flowsa-ma/src/main/java/org/perfsonar/service/measurementArchive/flowsa/nfdump/NfdumpOutputReader.java
2007-09-07 07:27:40 UTC (rev 2783)
@@ -18,9 +18,32 @@
* @version $id$
*/
public interface NfdumpOutputReader {
+ /**
+ * reads a record from the stream.
+ *
+ * @return the record encountered at stream or null if end of stream has
+ * been reached
+ * @see
org.perfsonar.service.measurementArchive.flowsa.nfdump.NfdumpOutputReader#readRecord()
+ * @throws IOException throws an IOException if the reader couldn't
parse the input
+ */
public Flow readRecord() throws IOException;
+ /**
+ * @return
+ * @throws IOException
+ */
public Statistics readStatistics() throws IOException;
+ /**
+ * @return
+ * @throws IOException
+ */
public TopData readTopData() throws IOException;
+
+ /**
+ * Method to allow the reader to clean up resources such as
+ * buffers, and flush the stream read from.
+ * @throws IOException
+ */
+ public void destroy() throws IOException;
}
Modified:
branches/new-structure/trunk/surfnet_java-flowsa-ma/src/main/java/org/perfsonar/service/measurementArchive/flowsa/nfdump/NfdumpQuery.java
===================================================================
---
branches/new-structure/trunk/surfnet_java-flowsa-ma/src/main/java/org/perfsonar/service/measurementArchive/flowsa/nfdump/NfdumpQuery.java
2007-09-07 07:23:44 UTC (rev 2782)
+++
branches/new-structure/trunk/surfnet_java-flowsa-ma/src/main/java/org/perfsonar/service/measurementArchive/flowsa/nfdump/NfdumpQuery.java
2007-09-07 07:27:40 UTC (rev 2783)
@@ -1,7 +1,8 @@
package org.perfsonar.service.measurementArchive.flowsa.nfdump;
-import java.io.File;
import java.util.Date;
+import java.util.Set;
+import java.util.SortedSet;
/**
* An abstract representation of a query for nfdump
@@ -11,21 +12,20 @@
*/
public abstract class NfdumpQuery {
- private File[] target;
+ private Set<String> directories;
+ private Set<String> files;
private Date startTime;
private Date endTime;
public NfdumpQuery() {
/* default no arg constructor */
}
- public NfdumpQuery(Date startTime, Date endTime, File... target) {
+ public NfdumpQuery(Date startTime, Date endTime, Set<String>
directories, SortedSet<String> files) {
this.startTime = startTime;
this.endTime = endTime;
- this.target = target;
+ this.directories = directories;
+ this.files = files;
}
- public File[] getTarget() {
- return target;
- }
public Date getEndTime() {
return endTime;
}
@@ -38,7 +38,16 @@
public void setStartTime(Date startTime) {
this.startTime = startTime;
}
- public void setTarget(File... target) {
- this.target = target;
- }
+ public Set<String> getDirectories() {
+ return directories;
+ }
+ public void setDirectories(Set<String> directories) {
+ this.directories = directories;
+ }
+ public Set<String> getFiles() {
+ return files;
+ }
+ public void setFiles(Set<String> files) {
+ this.files = files;
+ }
}
\ No newline at end of file
Modified:
branches/new-structure/trunk/surfnet_java-flowsa-ma/src/main/java/org/perfsonar/service/measurementArchive/flowsa/nfdump/RawFlowsQuery.java
===================================================================
---
branches/new-structure/trunk/surfnet_java-flowsa-ma/src/main/java/org/perfsonar/service/measurementArchive/flowsa/nfdump/RawFlowsQuery.java
2007-09-07 07:23:44 UTC (rev 2782)
+++
branches/new-structure/trunk/surfnet_java-flowsa-ma/src/main/java/org/perfsonar/service/measurementArchive/flowsa/nfdump/RawFlowsQuery.java
2007-09-07 07:27:40 UTC (rev 2783)
@@ -1,7 +1,8 @@
package org.perfsonar.service.measurementArchive.flowsa.nfdump;
-import java.io.File;
import java.util.Date;
+import java.util.Set;
+import java.util.SortedSet;
/**
* Rawflows implementation of a {@link NfdumpQuery}
@@ -19,8 +20,8 @@
}
public RawFlowsQuery(Date startTime, Date endTime, Filter filter,
- AggegrationRule aggegrationRule, File... target) {
- super(startTime,endTime,target);
+ AggegrationRule aggegrationRule, Set<String> directories,
SortedSet<String> files) {
+ super(startTime,endTime,directories,files);
this.filter = filter;
this.aggegrationRule = aggegrationRule;
}
Modified:
branches/new-structure/trunk/surfnet_java-flowsa-ma/src/main/java/org/perfsonar/service/measurementArchive/flowsa/nfdump/Statistics.java
===================================================================
---
branches/new-structure/trunk/surfnet_java-flowsa-ma/src/main/java/org/perfsonar/service/measurementArchive/flowsa/nfdump/Statistics.java
2007-09-07 07:23:44 UTC (rev 2782)
+++
branches/new-structure/trunk/surfnet_java-flowsa-ma/src/main/java/org/perfsonar/service/measurementArchive/flowsa/nfdump/Statistics.java
2007-09-07 07:27:40 UTC (rev 2783)
@@ -4,8 +4,148 @@
* A data object/ java bean that holds statistics data
*
* @author Michael Bischoff
- * @version $id$
+ * @version $Id$
*/
public class Statistics {
- //TODO implement bean
+ private String flows;
+ private String flowsTcp;
+ private String flowsUdp;
+ private String flowsIcmp;
+ private String flowsOther;
+ private String packets;
+ private String packetsTcp;
+ private String packetsUdp;
+ private String packetsIcmp;
+ private String packetsOther;
+ private String bytes;
+ private String bytesTcp;
+ private String bytesUdp;
+ private String bytesIcmp;
+ private String bytesOther;
+ private String first;
+ private String last;
+ private String msecFirst;
+ private String msecLast;
+ private String sequenceFailures;
+
+ public String getFlows() {
+ return flows;
+ }
+ public void setFlows(String flows) {
+ this.flows = flows;
+ }
+ public String getFlowsTcp() {
+ return flowsTcp;
+ }
+ public void setFlowsTcp(String flowsTcp) {
+ this.flowsTcp = flowsTcp;
+ }
+ public String getFlowsUdp() {
+ return flowsUdp;
+ }
+ public void setFlowsUdp(String flowsUdp) {
+ this.flowsUdp = flowsUdp;
+ }
+ public String getFlowsIcmp() {
+ return flowsIcmp;
+ }
+ public void setFlowsIcmp(String flowsIcmp) {
+ this.flowsIcmp = flowsIcmp;
+ }
+ public String getFlowsOther() {
+ return flowsOther;
+ }
+ public void setFlowsOther(String flowsOther) {
+ this.flowsOther = flowsOther;
+ }
+ public String getPackets() {
+ return packets;
+ }
+ public void setPackets(String packets) {
+ this.packets = packets;
+ }
+ public String getPacketsTcp() {
+ return packetsTcp;
+ }
+ public void setPacketsTcp(String packetsTcp) {
+ this.packetsTcp = packetsTcp;
+ }
+ public String getPacketsUdp() {
+ return packetsUdp;
+ }
+ public void setPacketsUdp(String packetsUdp) {
+ this.packetsUdp = packetsUdp;
+ }
+ public String getPacketsIcmp() {
+ return packetsIcmp;
+ }
+ public void setPacketsIcmp(String packetsIcmp) {
+ this.packetsIcmp = packetsIcmp;
+ }
+ public String getPacketsOther() {
+ return packetsOther;
+ }
+ public void setPacketsOther(String packetsOther) {
+ this.packetsOther = packetsOther;
+ }
+ public String getBytes() {
+ return bytes;
+ }
+ public void setBytes(String bytes) {
+ this.bytes = bytes;
+ }
+ public String getBytesTcp() {
+ return bytesTcp;
+ }
+ public void setBytesTcp(String bytesTcp) {
+ this.bytesTcp = bytesTcp;
+ }
+ public String getBytesUdp() {
+ return bytesUdp;
+ }
+ public void setBytesUdp(String bytesUdp) {
+ this.bytesUdp = bytesUdp;
+ }
+ public String getBytesIcmp() {
+ return bytesIcmp;
+ }
+ public void setBytesIcmp(String bytesIcmp) {
+ this.bytesIcmp = bytesIcmp;
+ }
+ public String getBytesOther() {
+ return bytesOther;
+ }
+ public void setBytesOther(String bytesOther) {
+ this.bytesOther = bytesOther;
+ }
+ public String getFirst() {
+ return first;
+ }
+ public void setFirst(String first) {
+ this.first = first;
+ }
+ public String getLast() {
+ return last;
+ }
+ public void setLast(String last) {
+ this.last = last;
+ }
+ public String getMsecFirst() {
+ return msecFirst;
+ }
+ public void setMsecFirst(String msecFirst) {
+ this.msecFirst = msecFirst;
+ }
+ public String getMsecLast() {
+ return msecLast;
+ }
+ public void setMsecLast(String msecLast) {
+ this.msecLast = msecLast;
+ }
+ public String getSequenceFailures() {
+ return sequenceFailures;
+ }
+ public void setSequenceFailures(String sequenceFailures) {
+ this.sequenceFailures = sequenceFailures;
+ }
}
Modified:
branches/new-structure/trunk/surfnet_java-flowsa-ma/src/main/java/org/perfsonar/service/measurementArchive/flowsa/nfdump/StatisticsQuery.java
===================================================================
---
branches/new-structure/trunk/surfnet_java-flowsa-ma/src/main/java/org/perfsonar/service/measurementArchive/flowsa/nfdump/StatisticsQuery.java
2007-09-07 07:23:44 UTC (rev 2782)
+++
branches/new-structure/trunk/surfnet_java-flowsa-ma/src/main/java/org/perfsonar/service/measurementArchive/flowsa/nfdump/StatisticsQuery.java
2007-09-07 07:27:40 UTC (rev 2783)
@@ -1,5 +1,9 @@
package org.perfsonar.service.measurementArchive.flowsa.nfdump;
+import java.util.Date;
+import java.util.Set;
+import java.util.SortedSet;
+
/**
* An implementation of {@link NfdumpQuery} for Statistics
*
@@ -7,5 +11,7 @@
* @version $id$
*/
public class StatisticsQuery extends NfdumpQuery {
- //TODO implement Query
+ public StatisticsQuery(Date startTime, Date endTime, Set<String>
directories,SortedSet<String> files) {
+ super(startTime, endTime, directories, files);
+ }
}
Modified:
branches/new-structure/trunk/surfnet_java-flowsa-ma/src/main/java/org/perfsonar/service/measurementArchive/flowsa/nfdump/outputreaders/Nfdump152OutputReaderImpl.java
===================================================================
---
branches/new-structure/trunk/surfnet_java-flowsa-ma/src/main/java/org/perfsonar/service/measurementArchive/flowsa/nfdump/outputreaders/Nfdump152OutputReaderImpl.java
2007-09-07 07:23:44 UTC (rev 2782)
+++
branches/new-structure/trunk/surfnet_java-flowsa-ma/src/main/java/org/perfsonar/service/measurementArchive/flowsa/nfdump/outputreaders/Nfdump152OutputReaderImpl.java
2007-09-07 07:27:40 UTC (rev 2783)
@@ -35,11 +35,7 @@
myReader = new BufferedReader(isr);
}
- /**
- * reads a record from the stream.
- *
- * @return the record encountered at stream or null if end of stream has
- * been reached
+ /* (non-Javadoc)
* @see
org.perfsonar.service.measurementArchive.flowsa.nfdump.NfdumpOutputReader#readRecord()
*/
public Flow readRecord() throws IOException {
@@ -50,11 +46,115 @@
return null;
}
+ /* (non-Javadoc)
+ * @see
org.perfsonar.service.measurementArchive.flowsa.nfdump.NfdumpOutputReader#readStatistics()
+ */
public Statistics readStatistics() throws IOException {
- // TODO implement
- throw new IOException("Cannot read nfdump output: Number of Lines
did not match");
+ Statistics statistics = new Statistics();
+ String line;
+ for(int lineNumber = 1;(line = myReader.readLine()) !=
null;lineNumber++) {
+ String[] parts = line.trim().split(":");
+ check(parts.length == 2);
+ switch(lineNumber) {
+ case 1:
+ check(parts[0].equals("Ident"));
+ break;
+ case 2:
+ check(parts[0].equals("Flows"));
+ statistics.setFlows(parts[1].trim());
+ break;
+ case 3:
+ check(parts[0].equals("Flows_tcp"));
+ statistics.setFlowsTcp(parts[1].trim());
+ break;
+ case 4:
+ check(parts[0].equals("Flows_udp"));
+ statistics.setFlowsUdp(parts[1].trim());
+ break;
+ case 5:
+ check(parts[0].equals("Flows_icmp"));
+ statistics.setFlowsIcmp(parts[1].trim());
+ break;
+ case 6:
+ check(parts[0].equals("Flows_other"));
+ statistics.setFlowsOther(parts[1].trim());
+ break;
+ case 7:
+ check(parts[0].equals("Packets"));
+ statistics.setPackets(parts[1].trim());
+ break;
+ case 8:
+ check(parts[0].equals("Packets_tcp"));
+ statistics.setPacketsTcp(parts[1].trim());
+ break;
+ case 9:
+ check(parts[0].equals("Packets_udp"));
+ statistics.setPacketsUdp(parts[1].trim());
+ break;
+ case 10:
+ check(parts[0].equals("Packets_icmp"));
+ statistics.setPacketsIcmp(parts[1].trim());
+ break;
+ case 11:
+ check(parts[0].equals("Packets_other"));
+ statistics.setPacketsOther(parts[1].trim());
+ break;
+ case 12:
+ check(parts[0].equals("Bytes"));
+ statistics.setBytes(parts[1].trim());
+ break;
+ case 13:
+ check(parts[0].equals("Bytes_tcp"));
+ statistics.setBytesTcp(parts[1].trim());
+ break;
+ case 14:
+ check(parts[0].equals("Bytes_udp"));
+ statistics.setBytesUdp(parts[1].trim());
+ break;
+ case 15:
+ check(parts[0].equals("Bytes_icmp"));
+ statistics.setBytesIcmp(parts[1].trim());
+ break;
+ case 16:
+ check(parts[0].equals("Bytes_other"));
+ statistics.setBytesOther(parts[1].trim());
+ break;
+ case 17:
+ check(parts[0].equals("First"));
+ statistics.setFirst(parts[1].trim());
+ break;
+ case 18:
+ check(parts[0].equals("Last"));
+ statistics.setLast(parts[1].trim());
+ break;
+ case 19:
+ check(parts[0].equals("msec_first"));
+ statistics.setMsecFirst(parts[1].trim());
+ break;
+ case 20:
+ check(parts[0].equals("msec_last"));
+ statistics.setMsecLast(parts[1].trim());
+ break;
+ case 21:
+ check(parts[0].equals("Sequence failures"));
+
statistics.setSequenceFailures(parts[1].trim());
+ return statistics;
+ default:
+ throw new IOException("Cannot read nfdump
output: Number of Lines did not match");
+ }
+ }
+ throw new IOException("Cannot read nfdump output: Number of Lines did
not match");
}
+ private void check(boolean testValue) throws IOException {
+ if(testValue == false) {
+ throw new IOException("Cannot read nfdump output:
unexpected output");
+ }
+ }
+
+ /* (non-Javadoc)
+ * @see
org.perfsonar.service.measurementArchive.flowsa.nfdump.NfdumpOutputReader#readTopData()
+ */
public TopData readTopData() throws IOException {
// TODO implement
throw new IOException("Cannot read nfdump output: Number of Lines
did not match");
@@ -89,5 +189,12 @@
record.setBytes(values[23]);
return record;
}
+
+ /* (non-Javadoc)
+ * @see
org.perfsonar.service.measurementArchive.flowsa.nfdump.NfdumpOutputReader#destroy()
+ */
+ public void destroy() throws IOException {
+ myReader.close();
+ }
}
- perfsonar: r2783 - in branches/new-structure/trunk/surfnet_java-flowsa-ma: . conf conf/axis-1.4/WEB-INF doc src/main/java/org/perfsonar/service src/main/java/org/perfsonar/service/measurementArchive/flowsa src/main/java/org/perfsonar/service/measurementArchive/flowsa/nfdump src/main/java/org/perfsonar/service/measurementArchive/flowsa/nfdump/outputreaders, svnlog, 09/07/2007
Archive powered by MHonArc 2.6.16.