Commit in hps-java/src/main/java/org/lcsim/hps/monitoring on MAIN | |||
DefaultEtEventProcessor.java | +211 | -63 | 1.12 -> 1.13 |
restructure the event processing code as what was there did not make complete sense for handling non-physics events; add better error trapping and handling, including more checks for null object values returned by reader and builder; add missing doc
diff -u -r1.12 -r1.13 --- DefaultEtEventProcessor.java 4 Oct 2013 06:02:19 -0000 1.12 +++ DefaultEtEventProcessor.java 25 Oct 2013 23:13:53 -0000 1.13 @@ -3,6 +3,7 @@
import java.io.IOException; import java.nio.BufferUnderflowException; import java.util.ArrayList;
+import java.util.Date;
import java.util.List; import java.util.logging.Handler; import java.util.logging.Level;
@@ -26,7 +27,7 @@
* having a direct reference to the actual application. * * @author Jeremy McCormick <[log in to unmask]>
- * @version $Id: DefaultEtEventProcessor.java,v 1.12 2013/10/04 06:02:19 jeremy Exp $
+ * @version $Id: DefaultEtEventProcessor.java,v 1.13 2013/10/25 23:13:53 jeremy Exp $
*/ public class DefaultEtEventProcessor implements EtEventProcessor {
@@ -90,33 +91,53 @@
this.pauseMode = p; }
+ /** + * Add an <code>EtEventListener</code> that will receive a callback for each event. + * @param callme The EtEventListener to add. + */
public void addListener(EtEventListener callme) { listeners.add(callme); }
+ /** + * Set the maximum number of events before processing should stop. + * @param maxEvents The maximum number of events before processing should be stopped. + */
public void setMaxEvents(int maxEvents) { this.maxEvents = maxEvents; logger.log(Level.INFO, "Set maxEvents to <{0}>.", maxEvents); }
+ /** + * Notify listeners of start of event processing. + */
private void begin() { for (EtEventListener listener : listeners) { listener.begin(); } }
+ /** + * Notify listeners of start of an event. + */
private void startOfEvent() { for (EtEventListener listener : listeners) { listener.startOfEvent(); } }
+ /** + * Notify listeners of end of an event. + */
private void endOfEvent() { for (EtEventListener listener : listeners) { listener.endOfEvent(); } }
+ /** + * Notify listeners that there was an error processing an event. + */
private void errorOnEvent() { ++errorEvents; for (EtEventListener listener : listeners) {
@@ -124,6 +145,9 @@
} }
+ /** + * Notify listeners of end of job. + */
private void finish() { logger.log(Level.FINER, "Calling finish() methods of listeners."); for (EtEventListener listener : listeners) {
@@ -131,28 +155,51 @@
} }
+ /** + * Set the flag to stop processing events, which will cause the + * event processing loop to stop the next time it checks this value. + */
public void stop() { this.stopProcessing = true; logger.log(Level.FINEST, "Received stop request."); }
+ /** + * Get the total number of events processed thusfar. + * @return The number of events processed. + */
public int getNumberOfEventsProcessed() { return eventsProcessed; }
+ /** + * Get the total number of errors that have occurred. + * @return The number of errors that have happened. + */
public int getNumberOfErrors() { return errorEvents; }
+ /** + * Get the maximum number of events that should be processed before stopping. + * @return The maximum number of events to process. + */
public int getMaxEvents() { return maxEvents; }
+ /** + * Reset the event counter. + */
public void resetNumberOfEventsProcessed() { eventsProcessed = 0; errorEvents = 0; }
+ /** + * Get the status of the processor. + * @return The status of the processor. + */
public int getStatus() { return status; }
@@ -165,7 +212,10 @@
*/ public void processEtEvents() throws EtTimeoutException, MaxEventsException, Exception {
- // Get events from the ET system. This can potentially block forever until it receives events.
+ /* + * Get events from the ET system. + * WARNING: This can potentially block forever until it receives events. + */
blocked = true; EtEvent[] mevs = et.sys.getEvents(et.att, et.param.waitMode, Modify.NOTHING, et.param.waitTime, et.param.chunk); blocked = false;
@@ -196,10 +246,14 @@
// Process a single EtEvent using the default processing chain. try { processEtEvent(mev);
- } // Catch event processing errors, including ET -> EVIO, EVIO -> LCIO, and LCSim Driver execution. - catch (EventProcessingException e) { - e.getCause().printStackTrace(); // This message shows up in detailed job log. - logger.log(Level.WARNING, "Error processing event <" + this.eventsProcessed + ">.");
+ // Catch event processing errors, including ET -> EVIO, EVIO -> LCIO, and LCSim Driver execution. + } catch (EventProcessingException e) { + /* Stack trace will show up log file. */ + e.getCause().printStackTrace(); + /* Print error message to log table. */ + logger.log(Level.SEVERE, e.getMessage()); + /* Print event number to log table. */ + logger.log(Level.SEVERE, "Error processing event <" + this.eventsProcessed + ">.");
errorOnEvent(); if (stopOnError) { this.status = ConnectionStatus.ERROR;
@@ -210,66 +264,106 @@
} }
+ /** + * Process a single <code>EtEvent</code>. + * The exceptions throw by this method will be caught and handled from within {@link #processEtEvents()}. + * @param mev The EtEvent to process. + */
public void processEtEvent(EtEvent mev) throws EventProcessingException, MaxEventsException {
- - // Check if max events was reached or exceeded.
+ + /* Check if max events was reached or exceeded. */
if (maxEvents != -1 && eventsProcessed >= maxEvents) {
+ logger.log(Level.INFO, "Reached max events.");
throw new MaxEventsException(); }
+ /* Check that the supplied EtEvent was not null. An exception is thrown if it is null. */
if (mev == null) {
- throw new EventProcessingException("supplied EtEvent is null");
+ logger.log(Level.SEVERE, "Supplied EtEvent is null."); + throw new EventProcessingException("The supplied EtEvent is null");
}
- // Notify listeners of start of event.
+ /* Notify listeners of start of event. */
startOfEvent();
- - // Increment number of events processed.
+ + /* Increment the number of events processed. */
++eventsProcessed;
- - // Attempt to create an EVIO event from the data buffer in the EtEvent.
+ + /* + * Create the EVIO event. + */
EvioEvent evioEvent = null;
- do { - // Create EvioEvent from EtEvent. - try { - evioEvent = createEvioEvent(mev);
+ boolean isPhysicsEvent = false; + try { + evioEvent = createEvioEvent(mev); + } catch (Exception e) { + throw new EventProcessingException("Failed to create EVIO event.", e); + } + + /* + * Process the EVIO event if the EvioReader created a non-null object. + */ + if (evioEvent != null) { + + /* Check for physics event. */ + isPhysicsEvent = eventBuilder.isPhysicsEvent(evioEvent); + + /* + * This is basically just redundantly printing information about the event to System.out, + * but it also sets some internal state within the event builder so leave it for now. + */ + eventBuilder.readEvioEvent(evioEvent); + + /* Log non-physics event information. */ + logEvioEvent(evioEvent); + + /* + * Notify listeners of non-physics events. + */ + if (isPreStartEvent(evioEvent)) { + preStartEvent(evioEvent); + } else if (isEndEvent(evioEvent)) { + endRun(evioEvent); + } + + /* Check that the event has physics type. */ + if (isPhysicsEvent) {
- if (evioEvent == null) { - throw new EventProcessingException("createEvioEvent returned null EvioEvent");
+ /* + * Use the event builder to create the lcsim event. + */ + EventHeader lcsimEvent = null; + try { + lcsimEvent = eventBuilder.makeLCSimEvent(evioEvent); + } catch (Exception e) { + throw new EventProcessingException("Failed to create LCSim event.", e);
}
- - //let event builder check for run information - eventBuilder.readEvioEvent(evioEvent);
- // Handlers for non-physics events. - if (isPreStartEvent(evioEvent)) { - preStartEvent(evioEvent); - } else if (isEndEvent(evioEvent)) { - endRun(evioEvent);
+ /* + * Process the event using the lcsim JobManager. + */ + if (lcsimEvent != null) { + try { + jobManager.processEvent(lcsimEvent); + } catch (Exception e) { + throw new EventProcessingException("Error processing the LCSim event.", e); + } + } else { + throw new EventProcessingException("The builder returned a null lcsim event.");
}
- - } catch (Exception e) { - throw new EventProcessingException("Failed to create EVIO event.", e);
}
- - } while (!eventBuilder.isPhysicsEvent(evioEvent)); - - // Create the LCSim event. - EventHeader lcsimEvent = null; - try { - lcsimEvent = eventBuilder.makeLCSimEvent(evioEvent); - } catch (Exception e) { - throw new EventProcessingException("Failed to create LCSim event.", e); - } - - // Process the LCSim event. - try { - jobManager.processEvent(lcsimEvent); - } catch (Exception e) { - throw new EventProcessingException("Error processing the LCSim event.", e);
+ } else { + /* + * The EVIO event was null, so print an error message to the log. This happens enough + * that an exception is NOT thrown. Ideally an error should be thrown but this + * means that the "disconnect on error" setting becomes completely useless, as this problem + * basically happens at least a few times in every test run EVIO file. + */ + logger.log(Level.SEVERE, "Error converting event <" + this.eventsProcessed + "> to EVIO."); + throw new EventProcessingException("The EVIO reader returned a null event.");
}
- // Notify listeners of end of event.
+ /* Notify listeners of end event. */
endOfEvent(); }
@@ -286,7 +380,7 @@
} /**
- * Run the event processing from the ET connection.
+ * Run the event processing from the ET connection until the job finishes.
*/ public void process() {
@@ -312,27 +406,32 @@
break; }
- // Try to process the next set of ET events.
+ /* Try to process the next set of ET events. */
try { processEtEvents();
- } // The ET system timed out in the getEvents() method. - catch (EtTimeoutException e) {
+ /* The ET system timed out in the getEvents() method. */ + } catch (EtTimeoutException e) {
logger.log(Level.WARNING, "ET connection timed out."); this.status = ConnectionStatus.TIMED_OUT;
- } // Event processing reached the maximum number of events. - catch (MaxEventsException e) {
+ /* Event processing reached the maximum number of events. */ + } catch (MaxEventsException e) {
logger.log(Level.INFO, "Reached max events <{0}>.", this.maxEvents); this.status = ConnectionStatus.DISCONNECTING;
- } // Some generic error was thrown. These are most likely ET system exceptions. - catch (Exception e) {
+ /* + * Catch other types of errors. These are most likely ET system exceptions + * or event processing errors. + */ + } catch (Exception e) { + logger.log(Level.SEVERE, e.getMessage());
e.printStackTrace(); this.status = ConnectionStatus.ERROR;
- } // Break out of processing loop if not still in connected state. - finally {
+ /* Break out of processing loop if not still in connected state. */ + } finally {
if (this.status != ConnectionStatus.CONNECTED) { break; } }
+ /* Force messages to show in the log table. */
logger.getHandlers()[0].flush(); }
@@ -346,25 +445,25 @@
} /**
- * Check if this event is a Pre Start Event.
+ * Check if this is a Pre Start event.
* @param event
- * @return
+ * @return True if event is of type Pre Start; false if not.
*/ private static boolean isPreStartEvent(EvioEvent event) { return event.getHeader().getTag() == EventConstants.PRESTART_EVENT_TAG; } /**
- * Check if this event is an End Event.
+ * Check if this is an End Event.
* @param event
- * @return True if this event is an End Event; false if not.
+ * @return True if event is of type End Event; false if not.
*/ private static boolean isEndEvent(EvioEvent event) { return event.getHeader().getTag() == EventConstants.END_EVENT_TAG; } /**
- * Notify listeners of Pre Start event being received by ET system.
+ * Notify listeners of Pre Start event being received by the ET system.
* @param event The EvioEvent for the Pre Start. */ private void preStartEvent(EvioEvent event) {
@@ -389,23 +488,72 @@
} }
+ /** + * Set pause mode. + * @param p Set the pause mode; true to pause; false to unpause. + */
public void pauseMode(boolean p) { pauseMode = p; }
+ /** + * This is called externally to get the next event if running in pause mode. + */
public void nextEvents() { nextEvents = true; }
+ /** + * Set the application's log level. + * @param level The log level. + */
public void setLogLevel(Level level) { logger.setLevel(level); }
+ /** + * Get whether the event processing is blocked. + * @return True if blocked; false if not. + */
public boolean blocked() { return blocked; }
+ /** + * Get whether the event processing has been completed. + * @return True if event processing is complete; false if not. + */
public boolean done() { return done; }
+ + /** + * Print information from non-physics event. + * Copied and modified from {@link org.lcsim.hps.evio.LCSimTestRunEventBuilder#readEvioEvent(EvioEvent)} + * in order to route messages to the log table instead of <code>System.out</code>. + * @param evioEvent The EVIO event. + */ + private void logEvioEvent(EvioEvent evioEvent) { + if (EventConstants.isSyncEvent(evioEvent)) { + int[] data = evioEvent.getIntData(); + int seconds = data[0]; + logger.log(Level.INFO, "Sync event: time " + seconds + " - " + new Date(((long) seconds) * 1000) + ", event count since last sync " + data[1] + ", event count so far " + data[2] + ", status " + data[3]); + } else if (EventConstants.isPreStartEvent(evioEvent)) { + int[] data = evioEvent.getIntData(); + int seconds = data[0]; + logger.log(Level.INFO, "Prestart event: time " + seconds + " - " + new Date(((long) seconds) * 1000) + ", run type " + data[2]); + } else if (EventConstants.isGoEvent(evioEvent)) { + int[] data = evioEvent.getIntData(); + int seconds = data[0]; + logger.log(Level.INFO, "Go event: time " + seconds + " - " + new Date(((long) seconds) * 1000) + ", event count so far " + data[2]); + } else if (EventConstants.isPauseEvent(evioEvent)) { + int[] data = evioEvent.getIntData(); + int seconds = data[0]; + logger.log(Level.INFO, "Pause event: time " + seconds + " - " + new Date(((long) seconds) * 1000) + ", event count so far " + data[2]); + } else if (EventConstants.isEndEvent(evioEvent)) { + int[] data = evioEvent.getIntData(); + int seconds = data[0]; + logger.log(Level.INFO, "End event: time " + seconds + " - " + new Date(((long) seconds) * 1000) + ", event count " + data[2]); + } + }
}
\ No newline at end of file
Use REPLY-ALL to reply to list
To unsubscribe from the LCD-CVS list, click the following link:
https://listserv.slac.stanford.edu/cgi-bin/wa?SUBED1=LCD-CVS&A=1