Author: [log in to unmask] Date: Mon Mar 23 16:21:59 2015 New Revision: 2508 Log: Updates to station thread class. Modified: java/trunk/record-util/src/main/java/org/hps/record/et/EtStationThread.java Modified: java/trunk/record-util/src/main/java/org/hps/record/et/EtStationThread.java ============================================================================= --- java/trunk/record-util/src/main/java/org/hps/record/et/EtStationThread.java (original) +++ java/trunk/record-util/src/main/java/org/hps/record/et/EtStationThread.java Mon Mar 23 16:21:59 2015 @@ -20,21 +20,36 @@ import org.jlab.coda.et.exception.EtTooManyException; import org.jlab.coda.et.exception.EtWakeUpException; - /** + * <p> + * This is a class which runs ET event processing on a separate thread + * using an ET station that is assigned to its own unique <code>EtSystem</code>. + * <p> + * Specific processing of ET events is provided with an {@link EtEventProcessor}. + * * @author Jeremy McCormick <[log in to unmask]> */ -public class EtStationThread extends Thread { +public final class EtStationThread extends Thread { + + EtEventProcessor processor; + int stationPosition; + String name; EtSystem system; - EtEventProcessor processor; - int stationPosition; - String name; - EtStation station; EtAttachment attachment; - public EtStationThread(EtEventProcessor processor, EtSystem system, String name, int stationPosition) { + int[] select; + + /** + * This creates an ET station that will run an ET processor on a separate thread. + * @param processor The ET processor. + * @param system The ET system. + * @param name The name of the station. + * @param stationPosition The station's position. + * @param select The station's select array (can be null). + */ + public EtStationThread(EtEventProcessor processor, EtSystem system, String name, int stationPosition, int[] select) { if (processor == null) { throw new IllegalArgumentException("processor is null"); } @@ -46,17 +61,27 @@ } if (stationPosition < 1) { throw new IllegalArgumentException("stationPosition must be > 0"); - } + } + if (select.length != EtConstants.stationSelectInts) { + throw new IllegalArgumentException("control array must have length " + EtConstants.stationSelectInts); + } + this.processor = processor; + this.name = name; + this.stationPosition = stationPosition; + this.select = select; + + // Copy parameters from the provided EtSystem. try { this.system = new EtSystem(new EtSystemOpenConfig(system.getConfig())); } catch (EtException e) { throw new RuntimeException("Error setting up station.", e); } - this.stationPosition = stationPosition; - this.name = name; } + /** + * Setup this station for receiving events. + */ protected void setup() { if (!system.alive()) { @@ -72,7 +97,13 @@ EtStationConfig stationConfig = new EtStationConfig(); stationConfig.setFlowMode(EtConstants.stationSerial); stationConfig.setBlockMode(EtConstants.stationNonBlocking); - + + // Setup event selection. + if (select != null) { + stationConfig.setSelect(select); + stationConfig.setSelectMode(EtConstants.stationSelectMatch); // ????? + } + // Create station and attach to the ET system. station = system.createStation(stationConfig, name, stationPosition); attachment = system.attach(station); @@ -82,10 +113,12 @@ throw new RuntimeException(e); } } - - public void run() { + + /** + * Run event processing on the station until woken up or interrupted. + */ + public final void run() { - // FIXME: Should be called independently? setup(); try { @@ -94,7 +127,7 @@ EtEvent[] events; try { - events = system.getEvents(attachment, Mode.SLEEP, Modify.NOTHING, 0, 1 /* read 1 event */); + events = system.getEvents(attachment, Mode.SLEEP, Modify.NOTHING, 0, 1 /* hard-coded to read 1 event for now */); system.putEvents(attachment, events); } catch (EtWakeUpException e) { e.printStackTrace(); @@ -113,6 +146,7 @@ processor.process(event); } } catch (Exception e) { + // If there is an event processing error then print stack trace and continue. e.printStackTrace(); continue; } @@ -127,7 +161,11 @@ } } - synchronized void disconnect() { + /** + * Disconnect the station. + * This will happen automatically at the end of the {@link #run()} method. + */ + synchronized final void disconnect() { if (system.alive()) { if (attachment.isUsable()) { try { @@ -139,4 +177,19 @@ system.close(); } } + + /** + * Wake up the station if it is blocked which will cause it to disconnect. + */ + public final synchronized void wakeUp() { + if (system.alive()) { + if (attachment.isUsable()) { + try { + system.wakeUpAll(station); + } catch (IOException | EtException | EtClosedException e) { + e.printStackTrace(); + } + } + } + } }