hps-java/src/main/java/org/lcsim/hps/monitoring
diff -u -r1.12 -r1.13
--- DefaultEtEventProcessor.java 4 Oct 2013 06:02:19 -0000 1.12
+++ DefaultEtEventProcessor.java 25 Oct 2013 23:13:53 -0000 1.13
@@ -3,6 +3,7 @@
import java.io.IOException;
import java.nio.BufferUnderflowException;
import java.util.ArrayList;
+import java.util.Date;
import java.util.List;
import java.util.logging.Handler;
import java.util.logging.Level;
@@ -26,7 +27,7 @@
* having a direct reference to the actual application.
*
* @author Jeremy McCormick <[log in to unmask]>
- * @version $Id: DefaultEtEventProcessor.java,v 1.12 2013/10/04 06:02:19 jeremy Exp $
+ * @version $Id: DefaultEtEventProcessor.java,v 1.13 2013/10/25 23:13:53 jeremy Exp $
*/
public class DefaultEtEventProcessor implements EtEventProcessor {
@@ -90,33 +91,53 @@
this.pauseMode = p;
}
+ /**
+ * Add an <code>EtEventListener</code> that will receive a callback for each event.
+ * @param callme The EtEventListener to add.
+ */
public void addListener(EtEventListener callme) {
listeners.add(callme);
}
+ /**
+ * Set the maximum number of events before processing should stop.
+ * @param maxEvents The maximum number of events before processing should be stopped.
+ */
public void setMaxEvents(int maxEvents) {
this.maxEvents = maxEvents;
logger.log(Level.INFO, "Set maxEvents to <{0}>.", maxEvents);
}
+ /**
+ * Notify listeners of start of event processing.
+ */
private void begin() {
for (EtEventListener listener : listeners) {
listener.begin();
}
}
+ /**
+ * Notify listeners of start of an event.
+ */
private void startOfEvent() {
for (EtEventListener listener : listeners) {
listener.startOfEvent();
}
}
+ /**
+ * Notify listeners of end of an event.
+ */
private void endOfEvent() {
for (EtEventListener listener : listeners) {
listener.endOfEvent();
}
}
+ /**
+ * Notify listeners that there was an error processing an event.
+ */
private void errorOnEvent() {
++errorEvents;
for (EtEventListener listener : listeners) {
@@ -124,6 +145,9 @@
}
}
+ /**
+ * Notify listeners of end of job.
+ */
private void finish() {
logger.log(Level.FINER, "Calling finish() methods of listeners.");
for (EtEventListener listener : listeners) {
@@ -131,28 +155,51 @@
}
}
+ /**
+ * Set the flag to stop processing events, which will cause the
+ * event processing loop to stop the next time it checks this value.
+ */
public void stop() {
this.stopProcessing = true;
logger.log(Level.FINEST, "Received stop request.");
}
+ /**
+ * Get the total number of events processed thus far.
+ * @return The number of events processed.
+ */
public int getNumberOfEventsProcessed() {
return eventsProcessed;
}
+ /**
+ * Get the total number of errors that have occurred.
+ * @return The number of errors that have happened.
+ */
public int getNumberOfErrors() {
return errorEvents;
}
+ /**
+ * Get the maximum number of events that should be processed before stopping.
+ * @return The maximum number of events to process.
+ */
public int getMaxEvents() {
return maxEvents;
}
+ /**
+ * Reset the counters for the number of events processed and the number of errors.
+ */
public void resetNumberOfEventsProcessed() {
eventsProcessed = 0;
errorEvents = 0;
}
+ /**
+ * Get the status of the processor.
+ * @return The status of the processor.
+ */
public int getStatus() {
return status;
}
@@ -165,7 +212,10 @@
*/
public void processEtEvents() throws EtTimeoutException, MaxEventsException, Exception {
- // Get events from the ET system. This can potentially block forever until it receives events.
+ /*
+ * Get events from the ET system.
+ * WARNING: This can potentially block forever until it receives events.
+ */
blocked = true;
EtEvent[] mevs = et.sys.getEvents(et.att, et.param.waitMode, Modify.NOTHING, et.param.waitTime, et.param.chunk);
blocked = false;
@@ -196,10 +246,14 @@
// Process a single EtEvent using the default processing chain.
try {
processEtEvent(mev);
- } // Catch event processing errors, including ET -> EVIO, EVIO -> LCIO, and LCSim Driver execution.
- catch (EventProcessingException e) {
- e.getCause().printStackTrace(); // This message shows up in detailed job log.
- logger.log(Level.WARNING, "Error processing event <" + this.eventsProcessed + ">.");
+ // Catch event processing errors, including ET -> EVIO, EVIO -> LCIO, and LCSim Driver execution.
+ } catch (EventProcessingException e) {
+ /* Stack trace will show up in the log file. */
+ e.getCause().printStackTrace();
+ /* Print error message to log table. */
+ logger.log(Level.SEVERE, e.getMessage());
+ /* Print event number to log table. */
+ logger.log(Level.SEVERE, "Error processing event <" + this.eventsProcessed + ">.");
errorOnEvent();
if (stopOnError) {
this.status = ConnectionStatus.ERROR;
@@ -210,66 +264,106 @@
}
}
+ /**
+ * Process a single <code>EtEvent</code>.
+ * The exceptions thrown by this method will be caught and handled from within {@link #processEtEvents()}.
+ * @param mev The EtEvent to process.
+ */
public void processEtEvent(EtEvent mev) throws EventProcessingException, MaxEventsException {
-
- // Check if max events was reached or exceeded.
+
+ /* Check if max events was reached or exceeded. */
if (maxEvents != -1 && eventsProcessed >= maxEvents) {
+ logger.log(Level.INFO, "Reached max events.");
throw new MaxEventsException();
}
+ /* Check that the supplied EtEvent was not null. An exception is thrown if it is null. */
if (mev == null) {
- throw new EventProcessingException("supplied EtEvent is null");
+ logger.log(Level.SEVERE, "Supplied EtEvent is null.");
+ throw new EventProcessingException("The supplied EtEvent is null");
}
- // Notify listeners of start of event.
+ /* Notify listeners of start of event. */
startOfEvent();
-
- // Increment number of events processed.
+
+ /* Increment the number of events processed. */
++eventsProcessed;
-
- // Attempt to create an EVIO event from the data buffer in the EtEvent.
+
+ /*
+ * Create the EVIO event.
+ */
EvioEvent evioEvent = null;
- do {
- // Create EvioEvent from EtEvent.
- try {
- evioEvent = createEvioEvent(mev);
+ boolean isPhysicsEvent = false;
+ try {
+ evioEvent = createEvioEvent(mev);
+ } catch (Exception e) {
+ throw new EventProcessingException("Failed to create EVIO event.", e);
+ }
+
+ /*
+ * Process the EVIO event if the EvioReader created a non-null object.
+ */
+ if (evioEvent != null) {
+
+ /* Check for physics event. */
+ isPhysicsEvent = eventBuilder.isPhysicsEvent(evioEvent);
+
+ /*
+ * This is basically just redundantly printing information about the event to System.out,
+ * but it also sets some internal state within the event builder so leave it for now.
+ */
+ eventBuilder.readEvioEvent(evioEvent);
+
+ /* Log non-physics event information. */
+ logEvioEvent(evioEvent);
+
+ /*
+ * Notify listeners of non-physics events.
+ */
+ if (isPreStartEvent(evioEvent)) {
+ preStartEvent(evioEvent);
+ } else if (isEndEvent(evioEvent)) {
+ endRun(evioEvent);
+ }
+
+ /* Check that the event has physics type. */
+ if (isPhysicsEvent) {
- if (evioEvent == null) {
- throw new EventProcessingException("createEvioEvent returned null EvioEvent");
+ /*
+ * Use the event builder to create the lcsim event.
+ */
+ EventHeader lcsimEvent = null;
+ try {
+ lcsimEvent = eventBuilder.makeLCSimEvent(evioEvent);
+ } catch (Exception e) {
+ throw new EventProcessingException("Failed to create LCSim event.", e);
}
-
- //let event builder check for run information
- eventBuilder.readEvioEvent(evioEvent);
- // Handlers for non-physics events.
- if (isPreStartEvent(evioEvent)) {
- preStartEvent(evioEvent);
- } else if (isEndEvent(evioEvent)) {
- endRun(evioEvent);
+ /*
+ * Process the event using the lcsim JobManager.
+ */
+ if (lcsimEvent != null) {
+ try {
+ jobManager.processEvent(lcsimEvent);
+ } catch (Exception e) {
+ throw new EventProcessingException("Error processing the LCSim event.", e);
+ }
+ } else {
+ throw new EventProcessingException("The builder returned a null lcsim event.");
}
-
- } catch (Exception e) {
- throw new EventProcessingException("Failed to create EVIO event.", e);
}
-
- } while (!eventBuilder.isPhysicsEvent(evioEvent));
-
- // Create the LCSim event.
- EventHeader lcsimEvent = null;
- try {
- lcsimEvent = eventBuilder.makeLCSimEvent(evioEvent);
- } catch (Exception e) {
- throw new EventProcessingException("Failed to create LCSim event.", e);
- }
-
- // Process the LCSim event.
- try {
- jobManager.processEvent(lcsimEvent);
- } catch (Exception e) {
- throw new EventProcessingException("Error processing the LCSim event.", e);
+ } else {
+ /*
+ * The EVIO event was null, so print an error message to the log and throw an exception.
+ * NOTE(review): this happens at least a few times in every test run EVIO file, so
+ * throwing here can make the "disconnect on error" setting trip on nearly every run;
+ * confirm whether the exception should be thrown or the null event only logged.
+ */
+ logger.log(Level.SEVERE, "Error converting event <" + this.eventsProcessed + "> to EVIO.");
+ throw new EventProcessingException("The EVIO reader returned a null event.");
}
- // Notify listeners of end of event.
+ /* Notify listeners of end of event. */
endOfEvent();
}
@@ -286,7 +380,7 @@
}
/**
- * Run the event processing from the ET connection.
+ * Run the event processing from the ET connection until the job finishes.
*/
public void process() {
@@ -312,27 +406,32 @@
break;
}
- // Try to process the next set of ET events.
+ /* Try to process the next set of ET events. */
try {
processEtEvents();
- } // The ET system timed out in the getEvents() method.
- catch (EtTimeoutException e) {
+ /* The ET system timed out in the getEvents() method. */
+ } catch (EtTimeoutException e) {
logger.log(Level.WARNING, "ET connection timed out.");
this.status = ConnectionStatus.TIMED_OUT;
- } // Event processing reached the maximum number of events.
- catch (MaxEventsException e) {
+ /* Event processing reached the maximum number of events. */
+ } catch (MaxEventsException e) {
logger.log(Level.INFO, "Reached max events <{0}>.", this.maxEvents);
this.status = ConnectionStatus.DISCONNECTING;
- } // Some generic error was thrown. These are most likely ET system exceptions.
- catch (Exception e) {
+ /*
+ * Catch other types of errors. These are most likely ET system exceptions
+ * or event processing errors.
+ */
+ } catch (Exception e) {
+ logger.log(Level.SEVERE, e.getMessage());
e.printStackTrace();
this.status = ConnectionStatus.ERROR;
- } // Break out of processing loop if not still in connected state.
- finally {
+ /* Break out of processing loop if not still in connected state. */
+ } finally {
if (this.status != ConnectionStatus.CONNECTED) {
break;
}
}
+ /* Force messages to show in the log table. */
logger.getHandlers()[0].flush();
}
@@ -346,25 +445,25 @@
}
/**
- * Check if this event is a Pre Start Event.
+ * Check if this is a Pre Start event.
* @param event
- * @return
+ * @return True if event is of type Pre Start; false if not.
*/
private static boolean isPreStartEvent(EvioEvent event) {
return event.getHeader().getTag() == EventConstants.PRESTART_EVENT_TAG;
}
/**
- * Check if this event is an End Event.
+ * Check if this is an End Event.
* @param event
- * @return True if this event is an End Event; false if not.
+ * @return True if event is of type End Event; false if not.
*/
private static boolean isEndEvent(EvioEvent event) {
return event.getHeader().getTag() == EventConstants.END_EVENT_TAG;
}
/**
- * Notify listeners of Pre Start event being received by ET system.
+ * Notify listeners of Pre Start event being received by the ET system.
* @param event The EvioEvent for the Pre Start.
*/
private void preStartEvent(EvioEvent event) {
@@ -389,23 +488,72 @@
}
}
+ /**
+ * Set pause mode.
+ * @param p Set the pause mode; true to pause; false to unpause.
+ */
public void pauseMode(boolean p) {
pauseMode = p;
}
+ /**
+ * This is called externally to get the next event if running in pause mode.
+ */
public void nextEvents() {
nextEvents = true;
}
+ /**
+ * Set the application's log level.
+ * @param level The log level.
+ */
public void setLogLevel(Level level) {
logger.setLevel(level);
}
+ /**
+ * Get whether the event processing is blocked.
+ * @return True if blocked; false if not.
+ */
public boolean blocked() {
return blocked;
}
+ /**
+ * Get whether the event processing has been completed.
+ * @return True if event processing is complete; false if not.
+ */
public boolean done() {
return done;
}
+
+ /**
+ * Print information from non-physics event.
+ * Copied and modified from {@link org.lcsim.hps.evio.LCSimTestRunEventBuilder#readEvioEvent(EvioEvent)}
+ * in order to route messages to the log table instead of <code>System.out</code>.
+ * @param evioEvent The EVIO event.
+ */
+ private void logEvioEvent(EvioEvent evioEvent) {
+ if (EventConstants.isSyncEvent(evioEvent)) {
+ int[] data = evioEvent.getIntData();
+ int seconds = data[0];
+ logger.log(Level.INFO, "Sync event: time " + seconds + " - " + new Date(((long) seconds) * 1000) + ", event count since last sync " + data[1] + ", event count so far " + data[2] + ", status " + data[3]);
+ } else if (EventConstants.isPreStartEvent(evioEvent)) {
+ int[] data = evioEvent.getIntData();
+ int seconds = data[0];
+ logger.log(Level.INFO, "Prestart event: time " + seconds + " - " + new Date(((long) seconds) * 1000) + ", run type " + data[2]);
+ } else if (EventConstants.isGoEvent(evioEvent)) {
+ int[] data = evioEvent.getIntData();
+ int seconds = data[0];
+ logger.log(Level.INFO, "Go event: time " + seconds + " - " + new Date(((long) seconds) * 1000) + ", event count so far " + data[2]);
+ } else if (EventConstants.isPauseEvent(evioEvent)) {
+ int[] data = evioEvent.getIntData();
+ int seconds = data[0];
+ logger.log(Level.INFO, "Pause event: time " + seconds + " - " + new Date(((long) seconds) * 1000) + ", event count so far " + data[2]);
+ } else if (EventConstants.isEndEvent(evioEvent)) {
+ int[] data = evioEvent.getIntData();
+ int seconds = data[0];
+ logger.log(Level.INFO, "End event: time " + seconds + " - " + new Date(((long) seconds) * 1000) + ", event count " + data[2]);
+ }
+ }
}
\ No newline at end of file