perfsonar-dev - perfsonar: r3840 - in trunk/surfnet_java-flowsubscription-mp: . conf src/main/java/org/perfsonar/service/measurementPoint/flowsubscription
Subject: perfsonar development work
List archive
perfsonar: r3840 - in trunk/surfnet_java-flowsubscription-mp: . conf src/main/java/org/perfsonar/service/measurementPoint/flowsubscription
Chronological Thread
- From:
- To:
- Subject: perfsonar: r3840 - in trunk/surfnet_java-flowsubscription-mp: . conf src/main/java/org/perfsonar/service/measurementPoint/flowsubscription
- Date: Thu, 15 May 2008 16:29:29 -0400
Author: michael.bischoff
Date: 2008-05-15 16:29:29 -0400 (Thu, 15 May 2008)
New Revision: 3840
Added:
trunk/surfnet_java-flowsubscription-mp/src/main/java/org/perfsonar/service/measurementPoint/flowsubscription/FlowMpRequest.java
trunk/surfnet_java-flowsubscription-mp/src/main/java/org/perfsonar/service/measurementPoint/flowsubscription/FlowMpResponse.java
trunk/surfnet_java-flowsubscription-mp/src/main/java/org/perfsonar/service/measurementPoint/flowsubscription/MessageType.java
trunk/surfnet_java-flowsubscription-mp/src/main/java/org/perfsonar/service/measurementPoint/flowsubscription/NamedthreadFactory.java
trunk/surfnet_java-flowsubscription-mp/src/main/java/org/perfsonar/service/measurementPoint/flowsubscription/NfReplayControll.java
trunk/surfnet_java-flowsubscription-mp/src/main/java/org/perfsonar/service/measurementPoint/flowsubscription/Subscription.java
trunk/surfnet_java-flowsubscription-mp/src/main/java/org/perfsonar/service/measurementPoint/flowsubscription/SubscriptionService.java
trunk/surfnet_java-flowsubscription-mp/src/main/java/org/perfsonar/service/measurementPoint/flowsubscription/ZebedeeTunnel.java
Removed:
trunk/surfnet_java-flowsubscription-mp/src/main/java/org/perfsonar/service/measurementPoint/flowsubscription/ConfigFileWriter.java
trunk/surfnet_java-flowsubscription-mp/src/main/java/org/perfsonar/service/measurementPoint/flowsubscription/FlowTypeConstants.java
Modified:
trunk/surfnet_java-flowsubscription-mp/.classpath
trunk/surfnet_java-flowsubscription-mp/.project
trunk/surfnet_java-flowsubscription-mp/conf/service.properties
trunk/surfnet_java-flowsubscription-mp/src/main/java/org/perfsonar/service/measurementPoint/flowsubscription/FlowTypeMPServiceEngine.java
trunk/surfnet_java-flowsubscription-mp/src/main/java/org/perfsonar/service/measurementPoint/flowsubscription/ZebedeeControl.java
Log:
-Massive overhaul: refactored and removed the server-side Python dependency
(contrib/manager can be removed)
-Cleaned up the code
-Simplified configuration by using the standard servlet temp directory for
nfcapd files
-Other small changes
Modified: trunk/surfnet_java-flowsubscription-mp/.classpath
===================================================================
--- trunk/surfnet_java-flowsubscription-mp/.classpath 2008-05-13 14:51:44
UTC (rev 3839)
+++ trunk/surfnet_java-flowsubscription-mp/.classpath 2008-05-15 20:29:29
UTC (rev 3840)
@@ -1,7 +1,11 @@
<?xml version="1.0" encoding="UTF-8"?>
<classpath>
<classpathentry including="**/*.java" kind="src"
path="src/main/java"/>
- <classpathentry kind="con"
path="org.devzuz.q.maven.jdt.core.mavenClasspathContainer"/>
+ <classpathentry kind="con"
path="org.devzuz.q.maven.jdt.core.mavenClasspathContainer">
+ <attributes>
+ <attribute
name="org.eclipse.jst.component.dependency" value="/WEB-INF/lib"/>
+ </attributes>
+ </classpathentry>
<classpathentry kind="con"
path="org.eclipse.jdt.launching.JRE_CONTAINER"/>
<classpathentry kind="output" path="bin"/>
</classpath>
Modified: trunk/surfnet_java-flowsubscription-mp/.project
===================================================================
--- trunk/surfnet_java-flowsubscription-mp/.project 2008-05-13 14:51:44
UTC (rev 3839)
+++ trunk/surfnet_java-flowsubscription-mp/.project 2008-05-15 20:29:29
UTC (rev 3840)
@@ -16,14 +16,20 @@
</arguments>
</buildCommand>
<buildCommand>
+
<name>edu.umd.cs.findbugs.plugin.eclipse.findbugsBuilder</name>
+ <arguments>
+ </arguments>
+ </buildCommand>
+ <buildCommand>
<name>org.devzuz.q.maven.jdt.core.mavenIncrementalBuilder</name>
<arguments>
</arguments>
</buildCommand>
</buildSpec>
<natures>
+ <nature>org.python.pydev.pythonNature</nature>
<nature>org.devzuz.q.maven.jdt.core.mavenNature</nature>
<nature>org.eclipse.jdt.core.javanature</nature>
- <nature>org.python.pydev.pythonNature</nature>
+
<nature>edu.umd.cs.findbugs.plugin.eclipse.findbugsNature</nature>
</natures>
</projectDescription>
Modified: trunk/surfnet_java-flowsubscription-mp/conf/service.properties
===================================================================
--- trunk/surfnet_java-flowsubscription-mp/conf/service.properties
2008-05-13 14:51:44 UTC (rev 3839)
+++ trunk/surfnet_java-flowsubscription-mp/conf/service.properties
2008-05-15 20:29:29 UTC (rev 3840)
@@ -16,9 +16,10 @@
# === Measurement Archive general properties ===
#Group=MaGeneral
-service.mp.class_name=org.perfsonar.service.measurementPoint.flowsubscription.FlowsubscriptionMPServiceEngine
+service.mp.class_name=org.perfsonar.service.measurementPoint.flowsubscription.FlowTypeMPServiceEngine
#Group=MaGeneral
-service.mp.message_types=SetupDataRequest,SubscriptionRequest
+#service.mp.message_types=SetupDataRequest
+service.mp.message_types=SubscriptionRequest,UnSubscriptionRequest,SubscriptionKeepaliveRequest
#Group=MaGeneral
service.mp.conf_file=config.xml
@@ -38,8 +39,6 @@
service.as.authn_for_msg_types=SetupDataRequest
service.as.point=http://homer.rediris.es:8080/perfSONAR-AS/services/AuthService
-
-
#=== Flow subscription specific config shizzle ===
# port (and up) that the zebedee client should receive data at
@@ -58,17 +57,10 @@
# at what rate is the netflow data sampled
service.mp.flow.sample_rate = 100
-# in what directory does FlowFork life
-service.mp.flow.base_dir=/home/gijs/flowfork
-
-# where are the client xml files with connection data
-service.mp.flow.client_dir=/home/lingo/clients/
-
# maximum client netflow subscriptions
service.mp.flow.max_conn = 20
### network layout
-
service.mp.flow.router.0.name=arthur
service.mp.flow.router.0.enabled=true
service.mp.flow.router.0.address=145.145.127.1
@@ -76,7 +68,7 @@
service.mp.flow.router.0.interfaces=1,2,3,4,5,6
service.mp.flow.router.1.name=ford
-service.mp.flow.router.1.enabled=true,
+service.mp.flow.router.1.enabled=true
service.mp.flow.router.1.address=145.145.127.2
service.mp.flow.router.1.flowport=29002
service.mp.flow.router.1.interfaces=1,2,3,4,5,6
@@ -98,5 +90,3 @@
service.mp.flow.nfdump=/usr/local/bin/nfdump
service.mp.flow.nfcapd=/usr/local/bin/nfcapd
service.mp.flow.nfreplay=/usr/local/bin/nfreplay
-
-
Deleted:
trunk/surfnet_java-flowsubscription-mp/src/main/java/org/perfsonar/service/measurementPoint/flowsubscription/ConfigFileWriter.java
Added:
trunk/surfnet_java-flowsubscription-mp/src/main/java/org/perfsonar/service/measurementPoint/flowsubscription/FlowMpRequest.java
Added:
trunk/surfnet_java-flowsubscription-mp/src/main/java/org/perfsonar/service/measurementPoint/flowsubscription/FlowMpResponse.java
Deleted:
trunk/surfnet_java-flowsubscription-mp/src/main/java/org/perfsonar/service/measurementPoint/flowsubscription/FlowTypeConstants.java
Modified:
trunk/surfnet_java-flowsubscription-mp/src/main/java/org/perfsonar/service/measurementPoint/flowsubscription/FlowTypeMPServiceEngine.java
===================================================================
---
trunk/surfnet_java-flowsubscription-mp/src/main/java/org/perfsonar/service/measurementPoint/flowsubscription/FlowTypeMPServiceEngine.java
2008-05-13 14:51:44 UTC (rev 3839)
+++
trunk/surfnet_java-flowsubscription-mp/src/main/java/org/perfsonar/service/measurementPoint/flowsubscription/FlowTypeMPServiceEngine.java
2008-05-15 20:29:29 UTC (rev 3840)
@@ -1,77 +1,49 @@
-/*
- * Created on 21-Feb-2006
- * Version Number: $Id: SNMPTypeMPServiceEngine.java 1128 2006-05-02
11:26:44Z loukik $
- * Project : perfSONAR
- */
package org.perfsonar.service.measurementPoint.flowsubscription;
-import java.io.IOException;
-import java.util.ArrayList;
-import java.util.Hashtable;
+import static
org.perfsonar.service.measurementPoint.flowsubscription.FlowMpResponse.FLOW_KEEPALIVE_RESPONSE;
+import static
org.perfsonar.service.measurementPoint.flowsubscription.FlowMpResponse.FLOW_SUBSCRIBE_RESPONSE;
+import static
org.perfsonar.service.measurementPoint.flowsubscription.FlowMpResponse.FLOW_UNSUBSCRIBE_RESPONSE;
-import org.ggf.ns.nmwg.base.v2_0.Data;
-import org.ggf.ns.nmwg.base.v2_0.Datum;
-import org.ggf.ns.nmwg.base.v2_0.EventType;
-import org.ggf.ns.nmwg.base.v2_0.Key;
+import java.util.List;
+
import org.ggf.ns.nmwg.base.v2_0.Message;
-import org.ggf.ns.nmwg.base.v2_0.Metadata;
-import org.ggf.ns.nmwg.base.v2_0.Parameter;
-import org.ggf.ns.nmwg.tools.netflow.v2_0.Subject;
import org.perfsonar.base.auxiliary.AuxiliaryComponentManager;
import org.perfsonar.base.auxiliary.ComponentNames;
-import
org.perfsonar.base.auxiliary.components.configuration.ConfigurationComponent;
import org.perfsonar.base.auxiliary.components.logger.LoggerComponent;
-import org.perfsonar.service.base.engine.ServiceEngine;
import org.perfsonar.base.exceptions.PerfSONARException;
+import org.perfsonar.service.base.engine.ServiceEngine;
/**
* Service Engine implementation for flow MP.
*
* @author Joost de Mes
+ * @author michael.bischoff
*
*/
public class FlowTypeMPServiceEngine implements ServiceEngine {
- // ----------------------------Member Variables
/**
- * The type of this ServiceEngine.
- */
- public String serviceEngineType;
-
- /**
* Logger component.
*/
- private LoggerComponent logger = null;
-
+ private final LoggerComponent logger;
/**
- * Configuration component.
+ * the subscription service
*/
- private ConfigurationComponent configuration = null;
-
- // -----------------------------Constructor
+ private final static SubscriptionService subscriptionService = new
SubscriptionService();
+
/**
* Instantiates the FlowTypeMPServiceEngine.
*/
public FlowTypeMPServiceEngine() throws PerfSONARException {
-
- this.serviceEngineType = "FlowTypeMPServiceEngine";
-
- this.logger = (LoggerComponent)
AuxiliaryComponentManager.getInstance()
- .getComponent(ComponentNames.LOGGER);
-
- this.configuration = (ConfigurationComponent)
AuxiliaryComponentManager
-
.getInstance().getComponent(ComponentNames.CONFIG);
+ this.logger = (LoggerComponent)
AuxiliaryComponentManager.getInstance().getComponent(ComponentNames.LOGGER);
}
-
- // ---------------------------- Public Methods
/**
* Returns a string containing the type of the ServiceEngine.
*
* @return The type of ServiceEngine.
*/
public String getType() {
- return this.serviceEngineType;
+ return "FlowTypeMPServiceEngine";
}
-
/**
* Implemented method which takes in the request along with an action
type
* and gives back a response based on the capabilities of the service.
@@ -87,242 +59,49 @@
*
* @return A response with a unique key identifying the tunnel.
*/
- public Message takeAction(String actionType, Message request)
- throws PerfSONARException {
+ public Message takeAction(String actionType, Message request) throws
PerfSONARException {
- logger.debug("FlowTypeMPServiceEngine: request.getType() = "
- + request.getType());
-
- if
(request.getType().equals(FlowTypeConstants.FLOW_KEEPALIVE)) {
- return keepalive(request);
- } else if
(request.getType().equals(FlowTypeConstants.FLOW_SUBSCRIBE)) {
- return subscribe(request);
- } else if
(request.getType().equals(FlowTypeConstants.FLOW_UNSUBSCRIBE)) {
- return unsubscribe(request);
- } else {
- throw new
PerfSONARException("error.common.action_not_supported","Request type " +
request.getType()
- + " not understood.");
+ logger.debug("FlowTypeMPServiceEngine: request.getType() = "+
request.getType());
+ MessageType type = MessageType.get(request.getType());
+ if(type==null) {
+ throw new PerfSONARException(
+ "error.common.action_not_supported",
+ "MessageType " + request.getType() + " not understood."
+ );
+ }
+
+ switch (type) {
+ case FLOW_KEEPALIVE: return keepalive(new
FlowMpRequest(request));
+ case FLOW_SUBSCRIBE: return subscribe(new
FlowMpRequest(request));
+ case FLOW_UNSUBSCRIBE: return unsubscribe(new
FlowMpRequest(request));
+ default:
+ throw new PerfSONARException(
+ "error.common.action_not_implemented",
+ "EventType("+type+") is not yet implemented"
+ );
}
}
- // ---------------------------- Private Methods
-
- // ---------------------------- Methods for filtering data from
requests
/**
- * Takes the subscription request, and filters out the common
parameters.
- *
- * @param request
- * The incomming request (should be subscription request).
- *
- * @return a hashtable with key,value pairs of common parameters.
- * @throws DataFormatException
- * When parameters aren't properly formatted.
- */
- private Hashtable<String, String> filterCommonParams(Message request)
- throws PerfSONARException {
- // TODO validate parameter list
- Hashtable<String, String> params = new Hashtable<String,
String>();
-
- // Meta data items (routers and common options)
- for (Metadata meta : request.getMetadataArray()) {
- // the first MD block contains information common to
the router
- // selection information
- if (meta.getParameters() != null
- && meta.getParameters() instanceof
org.ggf.ns.nmwg.tools.netflow.v2_0.Parameters) {
- logger.debug("Common router selection
information:");
- for (Parameter param :
((org.ggf.ns.nmwg.tools.netflow.v2_0.Parameters) meta
-
.getParameters()).getParameterArray()) {
- logger.debug("" +
param.getParameterName() + " = "
- +
param.getParameterValue());
- params.put(param.getParameterName(),
param
- .getParameterValue());
- }
- }
- }
- return params;
- }
-
- /**
- * Takes the subcription request, and filters out the list of
routernames.
- *
- * @param request
- * The incomming request (should be subscription request).
- * @return an ArrayList with routernames (Strings).
- * @throws DataFormatException
- * When there are no routers in the list, or the
routernames
- * aren't properly formatted.
- */
- private ArrayList<String> filterRouters(Message request)
- throws PerfSONARException {
- // TODO validate routerlist
- ArrayList<String> routers = new ArrayList<String>();
-
- // Meta data items (routers and common options)
- for (Metadata meta : request.getMetadataArray()) {
- if (meta.getSubject() != null
- && meta.getSubject() instanceof
org.ggf.ns.nmwg.tools.netflow.v2_0.Subject) {
- Subject subj =
(org.ggf.ns.nmwg.tools.netflow.v2_0.Subject) meta
- .getSubject();
- logger.debug("" +
subj.getRouter().getName().getName());
-
- // add the routers to the list by name
-
routers.add(subj.getRouter().getName().getName());
- }
- }
- if (routers.size() < 1) {
- throw new PerfSONARException("No routers found in
request.");
- } else {
- // TODO check routernames with configured list
- return routers;
- }
- }
-
- /**
- * Takes the subscription request, and filters out the (client)
endpoint
- * address and portnumber.
- *
- * @param request
- * The incomming request (should be subscription request).
- * @return a Hashtable with key,value pairs containing the client
address
- * and the client portnumber.
- * @throws DataFormatException
- * When the client endpoint parameters aren't properly
- * formatted.
- */
- private Hashtable<String, String> filterEndPointParams(Message
request)
- throws PerfSONARException {
- Hashtable<String, String> clientParams = new
Hashtable<String, String>();
-
- // Data item, client options
- for (Data dat : request.getDataArray()) {
- // Search in data items
- for (Metadata meta : dat.getMetadataArray()) {
- // with a metadata item
- if (meta.getSubject() != null
- && meta.getSubject()
instanceof org.ggf.ns.nmwg.tools.netflow.v2_0.Subject) {
- // that is of type
- //
org.ggf.ns.nmwg.tools.netflow.v2_0.Subject
- Subject subj =
((org.ggf.ns.nmwg.tools.netflow.v2_0.Subject) meta
- .getSubject());
- if (subj.getEndPoint() != null) {
- // and has an endpoint element
-
- String address =
subj.getEndPoint().getAddressElement()
- .getAddress();
- if (address != null) {
- clientParams.put(
-
FlowTypeConstants.KEY_CLIENT_ADDRESS,
-
address);
- } else {
- throw new
PerfSONARException(
-
"endpoint address is not set.");
- }
-
- String port =
subj.getEndPoint().getPort();
- if (port != null) {
-
clientParams.put(FlowTypeConstants.KEY_CLIENT_PORT,
- port);
- } else {
- throw new
PerfSONARException(
-
"endpoint port is not set.");
- }
-
- return clientParams;
- }
- }
- }
- }
- throw new PerfSONARException("No endpoint found in request.");
- }
-
- /**
- * Takes a unsubscription/keepalive request.
- *
- * @param request
- * A unsubscription/keepalive request containing the key.
- * @return the unique key from the request.
- * @throws DataFormatException
- * if the key is not properly formatted, or does not
exist.
- */
- private Integer filterKey(Message request) throws PerfSONARException {
- // search for a metadata element
- for (Metadata meta : request.getMetadataArray()) {
- // with a key element
- if (meta.getKey() != null) {
- // that has a
- if
(meta.getKey().getParameterByName("maSubscription") != null) {
- Parameter param =
meta.getKey().getParameterByName(
-
FlowTypeConstants.KEY_SUBSCRIPTION);
- if (param != null &&
param.getParameterValue() != null) {
- Integer key =
Integer.parseInt(param
-
.getParameterValue());
- if (key != null && key > 0) {
- return key;
- } else {
- throw new
PerfSONARException(
- "Key
is not positive numeric.");
- }
- }
- }
- }
- }
- throw new PerfSONARException("No key found in request.");
- }
-
- // ---------------------------- Helper methods for this service
- /**
* Helper method to subscribe a client to a tunnel.
*
- * @param request
+ * @param flowMpRequest
* a subscription request.
*
* @return a Message object that should be returned to the client.
*/
- private Message subscribe(Message request) throws PerfSONARException{
- ArrayList<String> routers = new ArrayList<String>();
- Hashtable<String, String> params = new Hashtable<String,
String>();
+ protected Message subscribe(FlowMpRequest request) throws
PerfSONARException{
+ List<String> routers = request.getRouters();
+ Subscription subscription = null;
- // get the common parameters
- params.putAll(filterCommonParams(request));
-
- // get the client endpoint params
- params.putAll(filterEndPointParams(request));
-
- // generate a anonymizing key for use with zebedee
- params.put("anonymizingkey", ZebedeeControl.getInstance()
- .generateAnonymizingKey());
-
- // get the list of routernames
- routers = filterRouters(request);
-
- logger.debug("common params: " + params);
- logger.debug("routers: " + routers);
-
- // start new tunnel
- try {
- Integer portNr = Integer.parseInt(params
-
.get(FlowTypeConstants.KEY_CLIENT_PORT));
- if (portNr == null || portNr < 0 || portNr > 65535) {
- throw new PerfSONARException("Invalid
portnumber");
- }
- Integer key =
ZebedeeControl.getInstance().startTunnel(
-
params.get(FlowTypeConstants.KEY_CLIENT_ADDRESS), portNr);
-
- ConfigFileWriter.save(key, params, routers);
-
- return
createResponse(FlowTypeConstants.FLOW_SUBSCRIBE_RESPONSE,
- key, "Tunnel opened with key [" + key
+ "]");
- } catch (IOException e) {
- logger.error("FlowTypeMPServiceEngine: IOException! "
- + e.getMessage());
- for (StackTraceElement ste : e.getStackTrace()) {
- logger.error(ste.toString());
- }
- throw new PerfSONARException(
- "IOException was thrown when opening
tunnel", e);
+ Integer portNr = request.getTargetPort();
+ if (portNr == null || portNr < 0 || portNr > 65535) {
+ throw new PerfSONARException("Invalid portnumber");
}
+ subscription =
subscriptionService.subscribe(request.getTargetAddress(), portNr, routers,
request.getFilter(), request.getAnonymizationLevel());
+ //TODO null check
+ return new FlowMpResponse(FLOW_SUBSCRIBE_RESPONSE,
subscription.getId(), "Tunnel opened with key [" + subscription.getId() +
"]");
}
-
/**
* Helper method to unsubscribe a client from a tunnel.
*
@@ -331,16 +110,14 @@
*
* @return a Message object that should be returned to the client.
*/
- private Message unsubscribe(Message request) throws PerfSONARException
- {
- // stop tunnel
- Integer key = filterKey(request);
-
- ZebedeeControl.getInstance().stopTunnel(key);
- return
createResponse(FlowTypeConstants.FLOW_UNSUBSCRIBE_RESPONSE, key,
- "Tunnel with key [" + key + "] was closed");
+ protected Message unsubscribe(FlowMpRequest request) throws
PerfSONARException {
+ Integer key = request.getKey();
+ if(key==null) {
+ throw new PerfSONARException("No key found in
request.");
+ }
+ subscriptionService.endSubscription(key);
+ return new FlowMpResponse(FLOW_UNSUBSCRIBE_RESPONSE, key,
"Tunnel with key [" + key + "] was closed");
}
-
/**
* Helper method to send a keep alive for a tunnel.
*
@@ -349,64 +126,12 @@
*
* @return a Message object that should be returned to the client.
*/
- private Message keepalive(Message request) throws PerfSONARException
- {
- // Keep alive
- Integer key = filterKey(request);
- ZebedeeControl.getInstance().keepAlive(key);
- return
createResponse(FlowTypeConstants.FLOW_KEEPALIVE_RESPONSE, key,
- "Tunnel received keepalive with key [" + key
+ "]");
+ protected Message keepalive(FlowMpRequest request) throws
PerfSONARException {
+ Integer key = request.getKey();
+ if(key==null) {
+ throw new PerfSONARException("No key found in
request.");
+ }
+ subscriptionService.keepSubscriptionAlive(key);
+ return new FlowMpResponse(FLOW_KEEPALIVE_RESPONSE, key,
"Tunnel received keepalive with key [" + key + "]");
}
-
- // ---------------------------- Response generators
-
- /**
- * Creates a response Message. For this ServiceEngine all responses
look
- * alike, with a key, a responsetype and a message.
- *
- * @param responseType
- * The response type.
- * @param key
- * The unique tunnel key.
- * @param a
- * message to explain the response.
- *
- * @return a ready-to-send response Message.
- */
- private Message createResponse(String responseType, Integer key,
- String message) {
- Message response = new Message();
-
- response.setType(responseType);
-
- Metadata meta = new Metadata();
- meta.setId("resultCodeMetadata");
-
- EventType evt = new EventType();
- evt.setEventType("result.success");
- meta.addChild(evt);
-
- Parameter param = new Parameter();
- param.setParameterName(FlowTypeConstants.KEY_SUBSCRIPTION);
- param.setParameterValue("" + key);
-
- Key keyElement = new Key();
- keyElement.addParameter(param);
-
- meta.addChild(keyElement);
-
- Data data = new Data();
- data.setId("resultCodeData");
- data.setMetadataIdRef(meta.getId());
-
- Datum msgDatum = new Datum();
- msgDatum.setDatum(message);
- data.addChild(msgDatum);
-
- response.setMetadata(meta);
- response.setData(data);
-
- // response.setCompleted(); ???
- return response;
- }
}
\ No newline at end of file
Added:
trunk/surfnet_java-flowsubscription-mp/src/main/java/org/perfsonar/service/measurementPoint/flowsubscription/MessageType.java
Added:
trunk/surfnet_java-flowsubscription-mp/src/main/java/org/perfsonar/service/measurementPoint/flowsubscription/NamedthreadFactory.java
Added:
trunk/surfnet_java-flowsubscription-mp/src/main/java/org/perfsonar/service/measurementPoint/flowsubscription/NfReplayControll.java
Added:
trunk/surfnet_java-flowsubscription-mp/src/main/java/org/perfsonar/service/measurementPoint/flowsubscription/Subscription.java
Added:
trunk/surfnet_java-flowsubscription-mp/src/main/java/org/perfsonar/service/measurementPoint/flowsubscription/SubscriptionService.java
Modified:
trunk/surfnet_java-flowsubscription-mp/src/main/java/org/perfsonar/service/measurementPoint/flowsubscription/ZebedeeControl.java
===================================================================
---
trunk/surfnet_java-flowsubscription-mp/src/main/java/org/perfsonar/service/measurementPoint/flowsubscription/ZebedeeControl.java
2008-05-13 14:51:44 UTC (rev 3839)
+++
trunk/surfnet_java-flowsubscription-mp/src/main/java/org/perfsonar/service/measurementPoint/flowsubscription/ZebedeeControl.java
2008-05-15 20:29:29 UTC (rev 3840)
@@ -1,9 +1,6 @@
package org.perfsonar.service.measurementPoint.flowsubscription;
-import java.util.ArrayList;
-import java.util.HashMap;
import java.io.IOException;
-import java.lang.Process;
import org.perfsonar.base.auxiliary.AuxiliaryComponentManager;
import org.perfsonar.base.auxiliary.ComponentNames;
@@ -17,28 +14,8 @@
* @author Joost de Mes (SURFnet)
*
*/
-public class ZebedeeControl implements Runnable {
+public class ZebedeeControl {
/**
- * Singleton instance.
- */
- private static ZebedeeControl instance = null;
-
- /**
- * ArrayList with Keys.
- */
- private ArrayList<Integer> keyList = new ArrayList<Integer>();
-
- /**
- * HashMap with Key,Process items.
- */
- private HashMap<Integer, Process> processList = new HashMap<Integer,
Process>();
-
- /**
- * HashMap with key,updateTime items.
- */
- private HashMap<Integer, Long> keepAliveList = new HashMap<Integer,
Long>();
-
- /**
* Reference to the configuration component (to get stored
configuration
* parameters, such as routers and port numbers).
*/
@@ -55,85 +32,25 @@
private String zebedeeExecutable;
/**
- * Zebedee inportBase (inport = inportBase + key). Read from
configuration.
- */
- private Integer inPortBase;
-
- /**
- * Zebedee betweenPortBase (betweenPort = betweenPortBase + key).
Read from
- * configuration.
- */
- private Integer betweenPortBase;
-
- /**
- * Maximum number of clients that can subscribe to this service
- * simultaneously. Read from configuration.
- */
- private Integer maxConn;
-
- /**
- * The maximum time between two keepalive calls from the client
- */
- private static final long MAX_KEEPALIVE = 60000;
-
- /**
- * The time the thread sleeps between timeout checks
- */
- private static final long SLEEPTIME = 10000;
-
- /**
* Singleton constructor.
+ * @param inPortBase
+ * @param betweenPortBase
*
- * @throws PerfSONARException
+ * @throws RuntimeException
* if configuration items can not be read.
*/
- private ZebedeeControl() throws PerfSONARException {
- logger = (LoggerComponent)
AuxiliaryComponentManager.getInstance()
- .getComponent(ComponentNames.LOGGER);
-
- this.configuration = (ConfigurationComponent)
AuxiliaryComponentManager
-
.getInstance().getComponent(ComponentNames.CONFIG);
-
- zebedeeExecutable = configuration
- .getProperty("service.mp.flow.zebedee");
- inPortBase = Integer.parseInt(configuration
- .getProperty("service.mp.flow.inPort"));
- betweenPortBase = Integer.parseInt(configuration
- .getProperty("service.mp.flow.betweenPort"));
- maxConn = Integer.parseInt(configuration
- .getProperty("service.mp.flow.max_conn"));
- // Clean up the configuration directory on service startup
- ConfigFileWriter.cleanUp();
- }
+ public ZebedeeControl() {
+ try {
+ logger = (LoggerComponent)
AuxiliaryComponentManager.getInstance().getComponent(ComponentNames.LOGGER);
+ configuration = (ConfigurationComponent)
AuxiliaryComponentManager.getInstance().getComponent(ComponentNames.CONFIG);
- public void finalize() throws Throwable {
- // Clean up the configuration directory on service shutdown
- ConfigFileWriter.cleanUp();
- }
-
-
- /**
- * Singleton static access method. Creates an instance of this class
if
- * there is none, and returns it. Returns a running thread.
- *
- * @return the singleton instance (a running ZebedeeControl thread
- * instance).
- * @throws PerfSONARException
- * if configuration items can not be read.
- */
- public static ZebedeeControl getInstance() throws PerfSONARException {
- // If there's no instance, create it, and start it's thread.
- if (instance == null) {
- // create an instance (singleton).
- instance = new ZebedeeControl();
- // create a thread for this zebedee control instance.
- Thread zebedeeControlThread = new Thread(instance);
- // start the thread (calls the run method).
- zebedeeControlThread.start();
+ logger.debug("Creating zebedee controll");
+
+ zebedeeExecutable =
configuration.getProperty("service.mp.flow.zebedee");
+ } catch (Exception e) {
+ //fail fast with a runtime exception
+ throw new RuntimeException(e);
}
-
- // return the singleton instance.
- return instance;
}
/**
@@ -146,15 +63,13 @@
* @param outPort
* The port on wich the client wants to receive the flows.
This
* port is supplied by the subscriber.
- * @param betweenPort
+ * @param serverTunnelPort
* Port used by zebedee to setup connection
(betweenPortBase +
* key)
* @return The command to be executed for the tunnel to be set up.
*/
- private String getTunnelCommand(Integer inPort, String clientIP,
- Integer outPort, Integer betweenPort) {
- return zebedeeExecutable + " -z 0 -u -d -l " + inPort + ":" +
clientIP
- + ":" + outPort + " -T " + betweenPort + " -x
\"udptimeout 60\"";
+ private String getTunnelCommand(int inPort, String clientIP, int
outPort, int serverTunnelPort) {
+ return zebedeeExecutable + " -z 0 -u -d -l " + inPort + ":" +
clientIP + ":" + outPort + " -T " + serverTunnelPort + " -x \"udptimeout
60\"";
}
/**
@@ -170,164 +85,11 @@
* When the program execution failed.
* @throws PerfSONARException
*/
- public Integer startTunnel(String clientIP, Integer outPort)
- throws IOException, PerfSONARException {
- // Get a new unique key
- Integer key = newKey();
-
- //check for max clients reached
- if(key == -1) {
- throw new
PerfSONARException("error.mp.flow.max_clients_reached","The maximum number of
clients is reached, try again later.");
- }
-
- logger.debug("ZebedeeControl: starting tunnel with key " +
key + ".");
-
+ public ZebedeeTunnel startTunnel(int inPort, String clientIP, int
outPort, int serverTunnelPort) throws IOException {
// get the commandline options formatted.
- String cmd = getTunnelCommand(inPortBase + key, clientIP,
outPort,
- betweenPortBase + key);
-
- // Check if this process doesn't alread exist, if so, kill it.
- if (processList.containsKey(key)) {
- stopTunnel(key);
- }
+ String cmd = getTunnelCommand(inPort, clientIP, outPort,
serverTunnelPort);
// Run the process
- Process newProcess = Runtime.getRuntime().exec(cmd);
-
- // Put this key,process combination in the processList, and
send a
- // keepalive for this key.
- processList.put(key, newProcess);
- keepAlive(key);
-
- return key;
+ return new
ZebedeeTunnel(inPort,Runtime.getRuntime().exec(cmd));
}
- /**
- * Stop a running tunnel (kills the zebedee process).
- *
- * @param key
- * Unique key identifying the client (supplied by the
service).
- * @throws PerfSONARException
- */
- public void stopTunnel(Integer key) throws PerfSONARException {
- logger.debug("ZebedeeControl: stopping tunnel with key " +
key + ".");
- // Get the process that needs to be killed
- Process processToKill = processList.get(key);
- // Destroy (hard-kill) the process
- processToKill.destroy();
- // Remove the key from the processList and the keepAliveList
- processList.remove(key);
- keepAliveList.remove(key);
- ConfigFileWriter.delete(key);
- }
-
- /**
- * Infinate loop wich checks the keepalive times of the active
tunnels.
- *
- * @see Runnable.run
- */
- public void run() {
- // Infinate loop
- while (true) {
- try {
- Thread.sleep(SLEEPTIME);
- } catch (InterruptedException e) {
- logger.error("ZebedeeControl:
InterruptedException! "
- + e.getMessage());
- for (StackTraceElement ste :
e.getStackTrace()) {
- logger.error(ste.toString());
- }
- }
- // run the checkForTimeouts method to check for
timeouts, and to
- // close timed-out tunnels
- try {
- checkForTimeouts();
- } catch (PerfSONARException e) {
- logger.error("ZebedeeControl:
PerfSONARException! "
- + e.getMessage());
- for (StackTraceElement ste :
e.getStackTrace()) {
- logger.error(ste.toString());
- }
- }
- }
- }
-
- /**
- * Updates the last keepalive timestamp for this tunnel (identified
by key).
- *
- * @param key
- * The unique key for the clients tunnel connection.
- */
- public void keepAlive(Integer key) {
- logger.debug("ZebedeeControl: keeping alive tunnel with key "
+ key
- + ".");
- // current time
- Long now = (Long) System.currentTimeMillis();
-
- // add or update this key with the current time
- keepAliveList.put(key, now);
- }
-
- /**
- * Checks the keepAliveList list for timed-out tunnels and closes
them.
- *
- * @throws PerfSONARException
- */
- private void checkForTimeouts() throws PerfSONARException {
- logger.debug("ZebedeeControl: checking for timeouts.");
- // temporary list with timed out tunnel connections
- ArrayList<Integer> tunnelsToClose = new ArrayList<Integer>();
-
- // current time
- Long now = (Long) System.currentTimeMillis();
-
- // Loop through all active tunnels
- // to check their keepalive times
- for (Integer key : keepAliveList.keySet()) {
- // if time between keepalives is getting to large,
- // add it to the list of tunnels to be closed
- if (now - keepAliveList.get(key) > MAX_KEEPALIVE) {
- logger.debug("ZebedeeControl: tunnel with key
" + key
- + " has timed out.");
- tunnelsToClose.add(key);
- }
- }
-
- // Loop through all tunnels to close
- // and execute tunnel closing code
- for (Integer key : tunnelsToClose) {
- stopTunnel(key);
- }
- }
-
- /**
- * Find a new key, that is not already in use.
- *
- * @return a new unique key, or -1 when failed (max clients reached)
- */
- private synchronized Integer newKey() {
- Integer proposal = 1;
- while (proposal < maxConn && keyList.contains(proposal)) {
- proposal++;
- }
- // if maxConn reached && keyList.contains(proposal), we're
full
- if (keyList.contains(proposal)) {
- return -1;
- }
-
- // add the key, return it.
- keyList.add(proposal);
-
- return proposal;
- }
-
- public String generateAnonymizingKey() {
- String characters = "abcdefghijklmnopqrstuvwxyz"
- + "ABCDEFGHIJKLMNOPQRSTUVWXYZ" +
"01234567890";
- String randomKey = "";
- while (randomKey.length() < 32) {
- char nextChar = characters.charAt((int)
(Math.random() * characters.length()));
- randomKey = randomKey + nextChar;
- }
- return randomKey;
- }
}
Added:
trunk/surfnet_java-flowsubscription-mp/src/main/java/org/perfsonar/service/measurementPoint/flowsubscription/ZebedeeTunnel.java
- perfsonar: r3840 - in trunk/surfnet_java-flowsubscription-mp: . conf src/main/java/org/perfsonar/service/measurementPoint/flowsubscription, svnlog, 05/15/2008
Archive powered by MHonArc 2.6.16.