Skip to Content.
Sympa Menu

perfsonar-dev - perfsonar: r3942 - trunk/surfnet_java-flowsubscription-mp/src/main/java/org/perfsonar/service/measurementPoint/flowsubscription

Subject: perfsonar development work

List archive

perfsonar: r3942 - trunk/surfnet_java-flowsubscription-mp/src/main/java/org/perfsonar/service/measurementPoint/flowsubscription


Chronological Thread 
  • From:
  • To:
  • Subject: perfsonar: r3942 - trunk/surfnet_java-flowsubscription-mp/src/main/java/org/perfsonar/service/measurementPoint/flowsubscription
  • Date: Mon, 2 Jun 2008 19:41:56 -0400

Author: michael.bischoff
Date: 2008-06-02 19:41:56 -0400 (Mon, 02 Jun 2008)
New Revision: 3942

Modified:

trunk/surfnet_java-flowsubscription-mp/src/main/java/org/perfsonar/service/measurementPoint/flowsubscription/FlowMpRequest.java

trunk/surfnet_java-flowsubscription-mp/src/main/java/org/perfsonar/service/measurementPoint/flowsubscription/NfReplayControll.java

trunk/surfnet_java-flowsubscription-mp/src/main/java/org/perfsonar/service/measurementPoint/flowsubscription/Pipe.java

trunk/surfnet_java-flowsubscription-mp/src/main/java/org/perfsonar/service/measurementPoint/flowsubscription/SubscriptionService.java
Log:
Fixed pipe problem
Fixed no data being exported
improved logging.

improved overall robustness

Modified:
trunk/surfnet_java-flowsubscription-mp/src/main/java/org/perfsonar/service/measurementPoint/flowsubscription/FlowMpRequest.java
===================================================================
---
trunk/surfnet_java-flowsubscription-mp/src/main/java/org/perfsonar/service/measurementPoint/flowsubscription/FlowMpRequest.java
2008-06-02 15:34:06 UTC (rev 3941)
+++
trunk/surfnet_java-flowsubscription-mp/src/main/java/org/perfsonar/service/measurementPoint/flowsubscription/FlowMpRequest.java
2008-06-02 23:41:56 UTC (rev 3942)
@@ -46,8 +46,7 @@

key = filterKey(request);

- logger.debug("common params: " + params);
- logger.debug("routers: " + routers);
+ logger.debug("FlowMpRequest#FlowMpRequest(): Created request
common params: " + params + " routers: " + routers);
}

/**
@@ -67,9 +66,7 @@
// the first MD block contains information common to
the router
// selection information
if (meta.getParameters() != null &&
meta.getParameters() instanceof
org.ggf.ns.nmwg.tools.netflow.v2_0.Parameters) {
- logger.debug("Common router selection
information:");
for (Parameter param :
((org.ggf.ns.nmwg.tools.netflow.v2_0.Parameters)
meta.getParameters()).getParameterArray()) {
- logger.debug("" +
param.getParameterName() + " = "+ param.getParameterValue());

parameters.put(param.getParameterName(), param.getParameterValue());
}
}
@@ -92,8 +89,7 @@
for (Metadata meta : request.getMetadataArray()) {
if (meta.getSubject() != null && meta.getSubject()
instanceof org.ggf.ns.nmwg.tools.netflow.v2_0.Subject) {
Subject subj =
(org.ggf.ns.nmwg.tools.netflow.v2_0.Subject) meta.getSubject();
- logger.debug("" +
subj.getRouter().getName().getName());
-
+
// add the routers to the list by name

specifiedRouters.add(subj.getRouter().getName().getName());
}

Modified:
trunk/surfnet_java-flowsubscription-mp/src/main/java/org/perfsonar/service/measurementPoint/flowsubscription/NfReplayControll.java
===================================================================
---
trunk/surfnet_java-flowsubscription-mp/src/main/java/org/perfsonar/service/measurementPoint/flowsubscription/NfReplayControll.java
2008-06-02 15:34:06 UTC (rev 3941)
+++
trunk/surfnet_java-flowsubscription-mp/src/main/java/org/perfsonar/service/measurementPoint/flowsubscription/NfReplayControll.java
2008-06-02 23:41:56 UTC (rev 3942)
@@ -63,9 +63,7 @@
} catch (InterruptedException e) {
logger.warn("ending polling directory");
} finally {
- if(stream!=null) {
- close(stream);
- }
+ if(stream!=null) close(stream);
if(nfcapdProcess!=null)
nfcapdProcess.destroy();
}
}
@@ -117,7 +115,7 @@
interestedSubscribers.add(subscription);
}
while((subscription = removalQueue.poll())!=null) {
- interestedSubscribers.add(subscription);
+ interestedSubscribers.remove(subscription);
}
}

@@ -138,17 +136,21 @@
List<Integer> resultValues = new ArrayList<Integer>();
for(int i=0;i<submitted;i++) {
try {
- resultValues.add(task.take().get());
+ Integer value = task.take().get();
+ resultValues.add(value);
} catch (ExecutionException e) {
+ e.printStackTrace(); //FIXME remove
+
if(e.getCause() != null) {

logger.error("processNewFile() nfreplaytask failed " +
e.getCause().getLocalizedMessage());
} else {

logger.error("processNewFile() nfreplaytask failed " +
e.getLocalizedMessage() );
}
+ resultValues.add(Integer.valueOf(-1));
}
}
- logger.debug("processNewFile() Finished processing
file, nfreplaytask's exit values are:
"+Arrays.toString(resultValues.toArray()));
- if(!target.delete()) logger.error("processNewFile()
Couldn't delete file("+target+") after processing it");
+ logger.debug("processNewFile() Finished processing
file, nfreplaytask's exit values are: "+resultValues);
+ if(!target.delete()) logger.debug("processNewFile()
Couldn't delete file("+target+") after processing it");
else logger.debug("processNewFile() deleted "+
target) ;
}

@@ -182,6 +184,9 @@
if(files==null) {
return new File[0]; // always return a empty
array never null.
}
+ if(files.length == 0) {
+ return files;
+ }
Arrays.sort(files); // TODO ensure right sorting (on
windows, but there is no nfdump for windows)

logger.debug("NfReplayControll.ExportingRouter.getTarget() selected files: "+
Arrays.toString(files));
return files;
@@ -207,7 +212,7 @@
builder2 = new ProcessBuilder();
commands = builder2.command();
}
-
+
commands.add(nfreplayExecutable);
commands.add("-p");
commands.add(String.valueOf(port));
@@ -215,7 +220,7 @@
commands.add("-r");
commands.add(target.getAbsolutePath());
}
- if(filter!=null) {
+ if(filter!=null && !filter.equals("")) {
commands.add(filter);
}
}
@@ -232,26 +237,58 @@
return commands;
}

+ /**
+ * @return exitvalue of the nfreplay process or if it's piped:
+ * nfdumpexitValue * 1000 + nfreplay since exitvalue should
be
+ * below 255 it should be save.
+ *
+ * @throws IOException,
{@link
Exception}
+ */
public Integer call() throws Exception {
Process proces = null;
Process pipeTarget = null;
+ Pipe pipe = null;
+ IOException pipeIOException = null;
try {
String commands =
Arrays.toString(builder1.command().toArray());
+ if(builder2!=null) {
+ commands += " piped into " +
Arrays.toString(builder2.command().toArray());
+ }
+ logger.debug("Executing replay: " + commands);
+
proces = builder1.start();
if(builder2!=null) {
pipeTarget = builder2.start();
- commands += " piped into " +
Arrays.toString(builder2.command().toArray());
- new Pipe(proces,pipeTarget).call();

+ try {
+ pipe = new
Pipe(proces,pipeTarget);
+ int bytesWrittenCount =
pipe.call().intValue();
+
logger.debug("NfReplayTask#call() data piped: " + bytesWrittenCount/1024 +
"kb written");
+ } catch (IOException e) {
+ pipeIOException = e;
+ }
}
- logger.debug("Executing replay: " + commands);
- return Integer.valueOf(proces.waitFor());
+
+ Integer resultValue =
Integer.valueOf(proces.waitFor());
+
+ if(pipeTarget!=null) {
+ resultValue =
Integer.valueOf(pipeTarget.waitFor() + resultValue.intValue() * 1000);
+ }
+
+ if(pipeIOException!=null) {
+ throw new Exception("IOException
exception occoured, exitvalues of the processes are "+ resultValue,
pipeIOException);
+ }
+
+ return resultValue;
} finally {
- if(proces != null) {
- proces.destroy();
+ if(proces != null) proces.destroy();
+ if(pipeTarget != null) pipeTarget.destroy();
+ if(pipe != null) {
+ try {
+ pipe.destroy();
+ } catch (IOException e) {
+ logger.debug("Could not
destroy pipe ("+ e.getLocalizedMessage() + ")");
+ }
}
- if(pipeTarget != null) {
- pipeTarget.destroy();
- }
}
}


Modified:
trunk/surfnet_java-flowsubscription-mp/src/main/java/org/perfsonar/service/measurementPoint/flowsubscription/Pipe.java
===================================================================
---
trunk/surfnet_java-flowsubscription-mp/src/main/java/org/perfsonar/service/measurementPoint/flowsubscription/Pipe.java
2008-06-02 15:34:06 UTC (rev 3941)
+++
trunk/surfnet_java-flowsubscription-mp/src/main/java/org/perfsonar/service/measurementPoint/flowsubscription/Pipe.java
2008-06-02 23:41:56 UTC (rev 3942)
@@ -1,6 +1,8 @@
package org.perfsonar.service.measurementPoint.flowsubscription;
+
import java.io.BufferedInputStream;
import java.io.BufferedOutputStream;
+import java.io.IOException;
import java.io.InputStream;
import java.io.OutputStream;
import java.util.concurrent.Callable;
@@ -11,12 +13,12 @@
*
* @author michael.bischoff
*/
-public class Pipe implements Callable<Boolean> {
+public class Pipe implements Callable<Integer> {

private final InputStream source;
private final OutputStream target;
- private final int flushAfterThisManyBytes;
-
+// private final int flushAfterThisManyBytes;
+
/**
* Default constructor flushes after 128 bytes.
* @param source Process to read stdout from
@@ -33,33 +35,47 @@
*/
public Pipe(Process source, Process target, int
flushAfterThisManyBytes) {
this.source = new
BufferedInputStream(source.getInputStream());
- this.target = new
BufferedOutputStream(target.getOutputStream());
- this.flushAfterThisManyBytes = flushAfterThisManyBytes;
+ this.target = new
BufferedOutputStream(target.getOutputStream(),flushAfterThisManyBytes);
+// this.flushAfterThisManyBytes = flushAfterThisManyBytes;
}

/* (non-Javadoc)
* @see java.util.concurrent.Callable#call()
*/
- public Boolean call() throws Exception {
+ public Integer call() throws Exception {
int i;
- int bytesWrittenSinceLastFlush = 0;
+ int bytesWrittenCount = 0;
+
while ((i = source.read())!=-1) {
// allow this task to get Stopped quickly.
if(Thread.interrupted()) {
target.flush();
throw new InterruptedException();
- }
- // flush to not overload buffers of the receiving
party.
- bytesWrittenSinceLastFlush++;
-
if(bytesWrittenSinceLastFlush>flushAfterThisManyBytes) {
- target.flush();
- bytesWrittenSinceLastFlush = 0;
- }
+ }
+ target.write(i);

- target.write(i);
+ bytesWrittenCount++;
+ //TODO evaluate need
+// if(bytesWrittenCount >= flushAfterThisManyBytes) {
+// bytesWrittenCount = 0;
+// target.flush();
+// }
}
- target.flush();
- return Boolean.TRUE;
+
+ destroy();
+ return Integer.valueOf(bytesWrittenCount);
}

-}
+ public void destroy() throws IOException {
+ try {
+ source.close();
+ } finally {
+ try {
+ target.close();
+ } catch (IOException e) {
+ throw new RuntimeException("Destroying pipe
failed, couldn't close target stream and closing the source stream might have
failed too",e);
+ }
+ }
+
+ }
+}
\ No newline at end of file

Modified:
trunk/surfnet_java-flowsubscription-mp/src/main/java/org/perfsonar/service/measurementPoint/flowsubscription/SubscriptionService.java
===================================================================
---
trunk/surfnet_java-flowsubscription-mp/src/main/java/org/perfsonar/service/measurementPoint/flowsubscription/SubscriptionService.java
2008-06-02 15:34:06 UTC (rev 3941)
+++
trunk/surfnet_java-flowsubscription-mp/src/main/java/org/perfsonar/service/measurementPoint/flowsubscription/SubscriptionService.java
2008-06-02 23:41:56 UTC (rev 3942)
@@ -21,7 +21,7 @@
protected final class TimeoutChecker implements Runnable {

public TimeoutChecker() {
- new Thread(this).start();
+ new Thread(this,"TimeoutChecker").start();
}

public void run() {



  • perfsonar: r3942 - trunk/surfnet_java-flowsubscription-mp/src/main/java/org/perfsonar/service/measurementPoint/flowsubscription, svnlog, 06/02/2008

Archive powered by MHonArc 2.6.16.

Top of Page