Author: [log in to unmask]
Date: Mon Mar 23 16:21:59 2015
New Revision: 2508
Log:
Updates to station thread class.
Modified:
java/trunk/record-util/src/main/java/org/hps/record/et/EtStationThread.java
Modified: java/trunk/record-util/src/main/java/org/hps/record/et/EtStationThread.java
=============================================================================
--- java/trunk/record-util/src/main/java/org/hps/record/et/EtStationThread.java (original)
+++ java/trunk/record-util/src/main/java/org/hps/record/et/EtStationThread.java Mon Mar 23 16:21:59 2015
@@ -20,21 +20,36 @@
import org.jlab.coda.et.exception.EtTooManyException;
import org.jlab.coda.et.exception.EtWakeUpException;
-
/**
+ * <p>
+ * This is a class which runs ET event processing on a separate thread
+ * using an ET station that is assigned to its own unique <code>EtSystem</code>.
+ * <p>
+ * Specific processing of ET events is provided with an {@link EtEventProcessor}.
+ *
* @author Jeremy McCormick <[log in to unmask]>
*/
-public class EtStationThread extends Thread {
+public final class EtStationThread extends Thread {
+
+ EtEventProcessor processor;
+ int stationPosition;
+ String name;
EtSystem system;
- EtEventProcessor processor;
- int stationPosition;
- String name;
-
EtStation station;
EtAttachment attachment;
- public EtStationThread(EtEventProcessor processor, EtSystem system, String name, int stationPosition) {
+ int[] select;
+
+ /**
+ * This creates an ET station that will run an ET processor on a separate thread.
+ * @param processor The ET processor.
+ * @param system The ET system.
+ * @param name The name of the station.
+ * @param stationPosition The station's position.
+ * @param select The station's select array (can be null).
+ */
+ public EtStationThread(EtEventProcessor processor, EtSystem system, String name, int stationPosition, int[] select) {
if (processor == null) {
throw new IllegalArgumentException("processor is null");
}
@@ -46,17 +61,27 @@
}
if (stationPosition < 1) {
throw new IllegalArgumentException("stationPosition must be > 0");
- }
+ }
+ if (select.length != EtConstants.stationSelectInts) {
+ throw new IllegalArgumentException("control array must have length " + EtConstants.stationSelectInts);
+ }
+
this.processor = processor;
+ this.name = name;
+ this.stationPosition = stationPosition;
+ this.select = select;
+
+ // Copy parameters from the provided EtSystem.
try {
this.system = new EtSystem(new EtSystemOpenConfig(system.getConfig()));
} catch (EtException e) {
throw new RuntimeException("Error setting up station.", e);
}
- this.stationPosition = stationPosition;
- this.name = name;
}
+ /**
+ * Setup this station for receiving events.
+ */
protected void setup() {
if (!system.alive()) {
@@ -72,7 +97,13 @@
EtStationConfig stationConfig = new EtStationConfig();
stationConfig.setFlowMode(EtConstants.stationSerial);
stationConfig.setBlockMode(EtConstants.stationNonBlocking);
-
+
+ // Setup event selection.
+ if (select != null) {
+ stationConfig.setSelect(select);
+ stationConfig.setSelectMode(EtConstants.stationSelectMatch); // ?????
+ }
+
// Create station and attach to the ET system.
station = system.createStation(stationConfig, name, stationPosition);
attachment = system.attach(station);
@@ -82,10 +113,12 @@
throw new RuntimeException(e);
}
}
-
- public void run() {
+
+ /**
+ * Run event processing on the station until woken up or interrupted.
+ */
+ public final void run() {
- // FIXME: Should be called independently?
setup();
try {
@@ -94,7 +127,7 @@
EtEvent[] events;
try {
- events = system.getEvents(attachment, Mode.SLEEP, Modify.NOTHING, 0, 1 /* read 1 event */);
+ events = system.getEvents(attachment, Mode.SLEEP, Modify.NOTHING, 0, 1 /* hard-coded to read 1 event for now */);
system.putEvents(attachment, events);
} catch (EtWakeUpException e) {
e.printStackTrace();
@@ -113,6 +146,7 @@
processor.process(event);
}
} catch (Exception e) {
+ // If there is an event processing error then print stack trace and continue.
e.printStackTrace();
continue;
}
@@ -127,7 +161,11 @@
}
}
- synchronized void disconnect() {
+ /**
+ * Disconnect the station.
+ * This will happen automatically at the end of the {@link #run()} method.
+ */
+ synchronized final void disconnect() {
if (system.alive()) {
if (attachment.isUsable()) {
try {
@@ -139,4 +177,19 @@
system.close();
}
}
+
+ /**
+ * Wake up the station if it is blocked which will cause it to disconnect.
+ */
+ public final synchronized void wakeUp() {
+ if (system.alive()) {
+ if (attachment.isUsable()) {
+ try {
+ system.wakeUpAll(station);
+ } catch (IOException | EtException | EtClosedException e) {
+ e.printStackTrace();
+ }
+ }
+ }
+ }
}
|