24 added + 1 modified, total 25 files
java/trunk/monitoring-app
--- java/trunk/monitoring-app/pom.xml 2014-06-06 00:09:30 UTC (rev 684)
+++ java/trunk/monitoring-app/pom.xml 2014-06-06 02:16:36 UTC (rev 685)
@@ -1,23 +1,19 @@
<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/maven-v4_0_0.xsd">
-
<modelVersion>4.0.0</modelVersion>
<artifactId>hps-monitoring-app</artifactId>
<name>monitoring-app</name>
<description>HPS online monitoring application</description>
-
<parent>
<groupId>org.hps</groupId>
<artifactId>hps-parent</artifactId>
<relativePath>../parent/pom.xml</relativePath>
<version>3.0.2-SNAPSHOT</version>
</parent>
-
<scm>
<url>http://java.freehep.org/svn/repos/hps/list/java/trunk/monitoring-app/</url>
<connection>scm:svn:svn://svn.freehep.org/hps/java/trunk/monitoring-app/</connection>
<developerConnection>scm:svn:svn://svn.freehep.org/hps/java/trunk/monitoring-app/</developerConnection>
</scm>
-
<build>
<plugins>
<plugin>
@@ -96,10 +92,19 @@
</goals>
</execution>
</executions>
- </plugin>
+ </plugin>
+ <plugin>
+ <groupId>org.apache.maven.plugins</groupId>
+ <artifactId>maven-surefire-plugin</artifactId>
+ <configuration>
+ <excludes>
+ <exclude>org/hps/monitoring/record/**/**.java</exclude>
+ <exclude>org/hps/monitoring/subsys/et/**.java</exclude>
+ </excludes>
+ </configuration>
+ </plugin>
</plugins>
</build>
-
<dependencies>
<dependency>
<groupId>org.hps</groupId>
@@ -138,5 +143,4 @@
</exclusions>
</dependency>
</dependencies>
-
</project>
java/trunk/monitoring-app/src/main/java/org/hps/monitoring/record
--- java/trunk/monitoring-app/src/main/java/org/hps/monitoring/record/AbstractRecordQueue.java (rev 0)
+++ java/trunk/monitoring-app/src/main/java/org/hps/monitoring/record/AbstractRecordQueue.java 2014-06-06 02:16:36 UTC (rev 685)
@@ -0,0 +1,127 @@
+package org.hps.monitoring.record;
+
+import java.io.IOException;
+import java.util.concurrent.BlockingQueue;
+import java.util.concurrent.LinkedBlockingQueue;
+import java.util.concurrent.TimeUnit;
+
+import org.freehep.record.source.AbstractRecordSource;
+import org.freehep.record.source.NoSuchRecordException;
+
+/**
+ * Implementation of <tt>AbstractRecordSource</tt> using a dynamic queue that
+ * can receive events "on the fly" e.g. from an ET ring.
+ *
+ * @author Jeremy McCormick
+ */
+public abstract class AbstractRecordQueue<RecordType> extends AbstractRecordSource {
+
+ // The queue, which is a linked list with blocking behavior.
+ BlockingQueue<RecordType> records = new LinkedBlockingQueue<RecordType>();
+
+ // The current LCIO events.
+ RecordType currentRecord;
+
+ // The amount of time to wait for an LCIO event from the queue before dying.
+ long timeOutMillis = 1000;
+
+ /**
+ * Constructor that takes the timeout time in seconds.
+ * @param timeoutSeconds the timeout time in seconds
+ */
+ public AbstractRecordQueue(long timeoutMillis) {
+ this.timeOutMillis = timeoutMillis;
+ }
+
+ public AbstractRecordQueue() {
+ }
+
+ /**
+ * Set the time wait time before the poll call times out.
+ * @param timeoutMillis
+ */
+ public void setTimeOutMillis(long timeoutMillis) {
+ this.timeOutMillis = timeoutMillis;
+ }
+
+ /**
+ * Add a record to the queue.
+ * @param event the LCIO event to add
+ */
+ public void addRecord(RecordType record) {
+ records.add(record);
+ }
+
+ @Override
+ public Object getCurrentRecord() throws IOException {
+ return currentRecord;
+ }
+
+ @Override
+ public boolean supportsCurrent() {
+ return true;
+ }
+
+ @Override
+ public boolean supportsNext() {
+ return true;
+ }
+
+ @Override
+ public boolean supportsPrevious() {
+ return false;
+ }
+
+ @Override
+ public boolean supportsIndex() {
+ return false;
+ }
+
+ @Override
+ public boolean supportsShift() {
+ return false;
+ }
+
+ @Override
+ public boolean supportsRewind() {
+ return false;
+ }
+
+ @Override
+ public boolean hasCurrent() {
+ return currentRecord != null;
+ }
+
+ @Override
+ public boolean hasNext() {
+ return records.size() != 0;
+ }
+
+ @Override
+ public void next() throws IOException, NoSuchRecordException {
+ try {
+ if (timeOutMillis > 0L)
+ // Poll the queue for the next record or until timeout is exceeded.
+ currentRecord = records.poll(timeOutMillis, TimeUnit.MILLISECONDS);
+ else
+ // Poll without an explicit wait time which will immediately return
+ // null if queue is empty.
+ currentRecord = records.poll();
+ } catch (InterruptedException e) {
+ }
+ if (currentRecord == null) {
+ throw new NoSuchRecordException("No records in queue.");
+ }
+ }
+
+ public long size() {
+ return records.size();
+ }
+
+ /*
+ void drain() {
+ do {
+ } while (records.peek() != null);
+ }
+ */
+}
\ No newline at end of file
java/trunk/monitoring-app/src/main/java/org/hps/monitoring/record
--- java/trunk/monitoring-app/src/main/java/org/hps/monitoring/record/EventProcessor.java (rev 0)
+++ java/trunk/monitoring-app/src/main/java/org/hps/monitoring/record/EventProcessor.java 2014-06-06 02:16:36 UTC (rev 685)
@@ -0,0 +1,11 @@
+package org.hps.monitoring.record;
+
+/**
+ * This is a very basic interface for event processing.
+ * @author Jeremy McCormick <[log in to unmask]>
+ *
+ * @param <EventType> The concrete type of the event record.
+ */
+public interface EventProcessor<EventType> {
+ void processEvent(EventType event);
+}
java/trunk/monitoring-app/src/main/java/org/hps/monitoring/record/etevent
--- java/trunk/monitoring-app/src/main/java/org/hps/monitoring/record/etevent/EtEventAdapter.java (rev 0)
+++ java/trunk/monitoring-app/src/main/java/org/hps/monitoring/record/etevent/EtEventAdapter.java 2014-06-06 02:16:36 UTC (rev 685)
@@ -0,0 +1,52 @@
+package org.hps.monitoring.record.etevent;
+
+import java.util.ArrayList;
+import java.util.List;
+
+import org.freehep.record.loop.AbstractLoopListener;
+import org.freehep.record.loop.LoopEvent;
+import org.freehep.record.loop.RecordEvent;
+import org.freehep.record.loop.RecordListener;
+import org.jlab.coda.et.EtEvent;
+
+/**
+ * Adapter for processing <tt>EtEvent</tt> objects using a loop.
+ * @author Jeremy McCormick <[log in to unmask]>
+ */
+public class EtEventAdapter extends AbstractLoopListener implements RecordListener {
+
+ List<EtEventProcessor> processors = new ArrayList<EtEventProcessor>();
+
+ @Override
+ public void recordSupplied(RecordEvent recordEvent) {
+ Object object = recordEvent.getRecord();
+ if (object instanceof EtEvent) {
+ EtEvent event = (EtEvent)object;
+ processEvent(event);
+ }
+ }
+
+ @Override
+ public void start(LoopEvent event) {
+ for (EtEventProcessor processor : processors) {
+ processor.start();
+ }
+ }
+
+ @Override
+ public void finish(LoopEvent event) {
+ for (EtEventProcessor processor : processors) {
+ processor.stop();
+ }
+ }
+
+ void addEtEventProcessor(EtEventProcessor processor) {
+ processors.add(processor);
+ }
+
+ private void processEvent(EtEvent event) {
+ for (EtEventProcessor processor : processors) {
+ processor.processEvent(event);
+ }
+ }
+}
java/trunk/monitoring-app/src/main/java/org/hps/monitoring/record/etevent
--- java/trunk/monitoring-app/src/main/java/org/hps/monitoring/record/etevent/EtEventLoop.java (rev 0)
+++ java/trunk/monitoring-app/src/main/java/org/hps/monitoring/record/etevent/EtEventLoop.java 2014-06-06 02:16:36 UTC (rev 685)
@@ -0,0 +1,46 @@
+package org.hps.monitoring.record.etevent;
+
+import java.io.IOException;
+
+import org.freehep.record.loop.DefaultRecordLoop;
+import org.freehep.record.source.RecordSource;
+import org.jlab.coda.et.EtEvent;
+
+/**
+ * Record loop implementation for processing <tt>EtEvent</tt> objects.
+ *
+ * @author Jeremy McCormick <[log in to unmask]>
+ */
+public class EtEventLoop extends DefaultRecordLoop {
+
+ EtEventAdapter adapter = new EtEventAdapter();
+
+ public EtEventLoop() {
+ addLoopListener(adapter);
+ addRecordListener(adapter);
+ }
+
+ public void addEtEventProcessor(EtEventProcessor processor) {
+ adapter.addEtEventProcessor(processor);
+ }
+
+ public void setRecordSource(RecordSource source) {
+ if (!source.getRecordClass().isAssignableFrom(EtEvent.class)) {
+ throw new IllegalArgumentException("The RecordSource has the wrong class.");
+ }
+ super.setRecordSource(source);
+ }
+
+ public long loop(long number) throws IOException {
+ if (number < 0L) {
+ execute(Command.GO, true);
+ } else {
+ execute(Command.GO_N, number, true);
+ execute(Command.STOP);
+ }
+ Throwable t = getProgress().getException();
+ if (t != null && t instanceof IOException)
+ throw (IOException) t;
+ return getSupplied();
+ }
+}
java/trunk/monitoring-app/src/main/java/org/hps/monitoring/record/etevent
--- java/trunk/monitoring-app/src/main/java/org/hps/monitoring/record/etevent/EtEventProcessor.java (rev 0)
+++ java/trunk/monitoring-app/src/main/java/org/hps/monitoring/record/etevent/EtEventProcessor.java 2014-06-06 02:16:36 UTC (rev 685)
@@ -0,0 +1,32 @@
+package org.hps.monitoring.record.etevent;
+
+import org.hps.monitoring.record.EventProcessor;
+import org.jlab.coda.et.EtEvent;
+
+/**
+ * This is the basic abstract class that processors of
+ * <tt>EtEvent</tt> objects should implement.
+ *
+ * @author Jeremy McCormick <[log in to unmask]>
+ */
+public abstract class EtEventProcessor implements EventProcessor<EtEvent> {
+
+ /**
+ * Start of ET session.
+ */
+ public void start() {
+ }
+
+ /**
+ * Process one <tt>EtEvent</tt>.
+ */
+ @Override
+ public void processEvent(EtEvent event) {
+ }
+
+ /**
+ * End of ET session.
+ */
+ public void stop() {
+ }
+}
\ No newline at end of file
java/trunk/monitoring-app/src/main/java/org/hps/monitoring/record/etevent
--- java/trunk/monitoring-app/src/main/java/org/hps/monitoring/record/etevent/EtEventQueue.java (rev 0)
+++ java/trunk/monitoring-app/src/main/java/org/hps/monitoring/record/etevent/EtEventQueue.java 2014-06-06 02:16:36 UTC (rev 685)
@@ -0,0 +1,19 @@
+package org.hps.monitoring.record.etevent;
+
+import org.hps.monitoring.record.AbstractRecordQueue;
+import org.jlab.coda.et.EtEvent;
+
+/**
+ * A dynamic queue for supplying <tt>EtEvent</tt> objects to a loop.
+ * This would most likely be run on a separate thread than the
+ * loop to avoid undesired blocking behavior.
+ *
+ * @author Jeremy McCormick <[log in to unmask]>
+ */
+public class EtEventQueue extends AbstractRecordQueue<EtEvent> {
+
+ @Override
+ public Class<EtEvent> getRecordClass() {
+ return EtEvent.class;
+ }
+}
\ No newline at end of file
java/trunk/monitoring-app/src/main/java/org/hps/monitoring/record/evio
--- java/trunk/monitoring-app/src/main/java/org/hps/monitoring/record/evio/EvioAdapter.java (rev 0)
+++ java/trunk/monitoring-app/src/main/java/org/hps/monitoring/record/evio/EvioAdapter.java 2014-06-06 02:16:36 UTC (rev 685)
@@ -0,0 +1,56 @@
+package org.hps.monitoring.record.evio;
+
+import java.util.ArrayList;
+import java.util.List;
+
+import org.freehep.record.loop.AbstractLoopListener;
+import org.freehep.record.loop.RecordEvent;
+import org.freehep.record.loop.RecordListener;
+import org.hps.evio.EventConstants;
+import org.jlab.coda.jevio.EvioEvent;
+
+/**
+ * Adapter to process <tt>EvioEvent</tt> objects using a loop.
+ * @author Jeremy McCormick <[log in to unmask]>
+ */
+public class EvioAdapter extends AbstractLoopListener implements RecordListener {
+
+ List<EvioEventProcessor> processors = new ArrayList<EvioEventProcessor>();
+
+ @Override
+ public void recordSupplied(RecordEvent recordEvent) {
+ Object object = recordEvent.getRecord();
+ if (object instanceof EvioEvent) {
+ EvioEvent event = (EvioEvent)object;
+ if (EventConstants.isPreStartEvent(event)) {
+ startRun(event);
+ } else if (EventConstants.isEndEvent(event)) {
+ endRun(event);
+ } else if (EventConstants.isPhysicsEvent(event)) {
+ processEvent(event);
+ }
+ }
+ }
+
+ void addEvioEventProcessor(EvioEventProcessor processor) {
+ processors.add(processor);
+ }
+
+ private void processEvent(EvioEvent event) {
+ for (EvioEventProcessor processor : processors) {
+ processor.processEvent(event);
+ }
+ }
+
+ private void startRun(EvioEvent event) {
+ for (EvioEventProcessor processor : processors) {
+ processor.startRun(event);
+ }
+ }
+
+ private void endRun(EvioEvent event) {
+ for (EvioEventProcessor processor : processors) {
+ processor.endRun(event);
+ }
+ }
+}
java/trunk/monitoring-app/src/main/java/org/hps/monitoring/record/evio
--- java/trunk/monitoring-app/src/main/java/org/hps/monitoring/record/evio/EvioEventLoop.java (rev 0)
+++ java/trunk/monitoring-app/src/main/java/org/hps/monitoring/record/evio/EvioEventLoop.java 2014-06-06 02:16:36 UTC (rev 685)
@@ -0,0 +1,47 @@
+package org.hps.monitoring.record.evio;
+
+import java.io.IOException;
+
+import org.freehep.record.loop.DefaultRecordLoop;
+import org.freehep.record.source.RecordSource;
+import org.jlab.coda.jevio.EvioEvent;
+
+/**
+ * Implementation of record loop for processing <tt>EvioEvent</tt> objects.
+ *
+ * @author Jeremy McCormick <[log in to unmask]>
+ */
+public class EvioEventLoop extends DefaultRecordLoop {
+
+ EvioAdapter adapter = new EvioAdapter();
+
+ public EvioEventLoop() {
+ addLoopListener(adapter);
+ addRecordListener(adapter);
+ }
+
+ public void addEvioEventProcessor(EvioEventProcessor processor) {
+ adapter.addEvioEventProcessor(processor);
+ }
+
+ public void setRecordSource(RecordSource source) {
+ if (!source.getRecordClass().isAssignableFrom(EvioEvent.class)) {
+ System.err.println("The class " + source.getRecordClass().getCanonicalName() + " is invalid.");
+ throw new IllegalArgumentException("The record class is invalid.");
+ }
+ super.setRecordSource(source);
+ }
+
+ public long loop(long number) throws IOException {
+ if (number < 0L) {
+ execute(Command.GO, true);
+ } else {
+ execute(Command.GO_N, number, true);
+ execute(Command.STOP);
+ }
+ Throwable t = getProgress().getException();
+ if (t != null && t instanceof IOException)
+ throw (IOException) t;
+ return getSupplied();
+ }
+}
java/trunk/monitoring-app/src/main/java/org/hps/monitoring/record/evio
--- java/trunk/monitoring-app/src/main/java/org/hps/monitoring/record/evio/EvioEventProcessor.java (rev 0)
+++ java/trunk/monitoring-app/src/main/java/org/hps/monitoring/record/evio/EvioEventProcessor.java 2014-06-06 02:16:36 UTC (rev 685)
@@ -0,0 +1,22 @@
+package org.hps.monitoring.record.evio;
+
+import org.hps.monitoring.record.EventProcessor;
+import org.jlab.coda.jevio.EvioEvent;
+
+/**
+ * This is the basic abstract class that processors of
+ * <tt>EvioEvent</tt> objects should implement.
+ * @author Jeremy McCormick <[log in to unmask]>
+ */
+public abstract class EvioEventProcessor implements EventProcessor<EvioEvent> {
+
+ @Override
+ public void processEvent(EvioEvent event) {
+ }
+
+ public void startRun(EvioEvent event) {
+ }
+
+ public void endRun(EvioEvent event) {
+ }
+}
\ No newline at end of file
java/trunk/monitoring-app/src/main/java/org/hps/monitoring/record/evio
--- java/trunk/monitoring-app/src/main/java/org/hps/monitoring/record/evio/EvioEventQueue.java (rev 0)
+++ java/trunk/monitoring-app/src/main/java/org/hps/monitoring/record/evio/EvioEventQueue.java 2014-06-06 02:16:36 UTC (rev 685)
@@ -0,0 +1,16 @@
+package org.hps.monitoring.record.evio;
+
+import org.hps.monitoring.record.AbstractRecordQueue;
+import org.jlab.coda.jevio.EvioEvent;
+
+/**
+ * A dynamic queue providing <tt>EvioEvent</tt> objects to a loop.
+ * @author Jeremy McCormick <[log in to unmask]>
+ */
+public class EvioEventQueue extends AbstractRecordQueue<EvioEvent> {
+
+ @Override
+ public Class<EvioEvent> getRecordClass() {
+ return EvioEvent.class;
+ }
+}
java/trunk/monitoring-app/src/main/java/org/hps/monitoring/record/evio
--- java/trunk/monitoring-app/src/main/java/org/hps/monitoring/record/evio/EvioFileSource.java (rev 0)
+++ java/trunk/monitoring-app/src/main/java/org/hps/monitoring/record/evio/EvioFileSource.java 2014-06-06 02:16:36 UTC (rev 685)
@@ -0,0 +1,126 @@
+package org.hps.monitoring.record.evio;
+
+import java.io.File;
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.List;
+
+import org.freehep.record.source.AbstractRecordSource;
+import org.freehep.record.source.NoSuchRecordException;
+import org.jlab.coda.jevio.EvioEvent;
+import org.jlab.coda.jevio.EvioException;
+import org.jlab.coda.jevio.EvioReader;
+
+/**
+ * A very basic implementation of <tt>AbstractRecordSource</tt> for supplying <tt>EvioEvent</tt>
+ * objects to a loop from EVIO files. Unlike the LCIO record source, it has no rewind or
+ * indexing capabilities (for now at least).
+ *
+ * @author Jeremy McCormick <[log in to unmask]>
+ */
+public class EvioFileSource extends AbstractRecordSource {
+
+ EvioEvent currentEvent;
+ EvioReader reader;
+ List<File> files = new ArrayList<File>();
+ int fileIndex = 0;
+ boolean atEnd;
+
+ public EvioFileSource(List<File> files) {
+ this.files.addAll(files);
+ openReader();
+ }
+
+ public EvioFileSource(File file) {
+ this.files.add(file);
+ openReader();
+ }
+
+ private void openReader() {
+ try {
+ reader = new EvioReader(files.get(fileIndex), false);
+ } catch (EvioException | IOException e) {
+ throw new RuntimeException(e);
+ }
+ }
+
+ private void closeReader() {
+ try {
+ reader.close();
+ } catch (IOException e) {
+ throw new RuntimeException(e);
+ }
+ }
+
+ @Override
+ public Object getCurrentRecord() throws IOException {
+ return currentEvent;
+ }
+
+ boolean endOfFiles() {
+ return fileIndex > (files.size() - 1);
+ }
+
+ @Override
+ public void next() throws IOException, NoSuchRecordException {
+ for (;;) {
+ try {
+ currentEvent = reader.parseNextEvent();
+ } catch (EvioException e) {
+ throw new IOException(e);
+ }
+ if (currentEvent == null) {
+ closeReader();
+ fileIndex++;
+ if (!endOfFiles()) {
+ openReader();
+ continue;
+ } else {
+ atEnd = true;
+ throw new NoSuchRecordException();
+ }
+ }
+ return;
+ }
+ }
+
+ @Override
+ public boolean supportsCurrent() {
+ return true;
+ }
+
+ @Override
+ public boolean supportsNext() {
+ return true;
+ }
+
+ @Override
+ public boolean supportsPrevious() {
+ return false;
+ }
+
+ @Override
+ public boolean supportsIndex() {
+ return false;
+ }
+
+ @Override
+ public boolean supportsShift() {
+ return false;
+ }
+
+ @Override
+ public boolean supportsRewind() {
+ return false;
+ }
+
+ @Override
+ public boolean hasCurrent() {
+ return currentEvent != null;
+ }
+
+ @Override
+ public boolean hasNext() {
+ return !atEnd;
+ }
+}
\ No newline at end of file
java/trunk/monitoring-app/src/main/java/org/hps/monitoring/subsys
--- java/trunk/monitoring-app/src/main/java/org/hps/monitoring/subsys/HasSystemInfo.java (rev 0)
+++ java/trunk/monitoring-app/src/main/java/org/hps/monitoring/subsys/HasSystemInfo.java 2014-06-06 02:16:36 UTC (rev 685)
@@ -0,0 +1,12 @@
+package org.hps.monitoring.subsys;
+
+/**
+ * A simple mix-in interface for objects that carry {@link SystemInfo}
+ * about some detector system.
+ * @author Jeremy McCormick <[log in to unmask]>
+ */
+public interface HasSystemInfo {
+
+ SystemInfo getSystemInfo();
+
+}
java/trunk/monitoring-app/src/main/java/org/hps/monitoring/subsys
--- java/trunk/monitoring-app/src/main/java/org/hps/monitoring/subsys/SystemInfo.java (rev 0)
+++ java/trunk/monitoring-app/src/main/java/org/hps/monitoring/subsys/SystemInfo.java 2014-06-06 02:16:36 UTC (rev 685)
@@ -0,0 +1,26 @@
+package org.hps.monitoring.subsys;
+
+/**
+ * Basic interface for information about a detector sub-system.
+ * @author Jeremy McCormick <[log in to unmask]>
+ */
+public interface SystemInfo {
+
+ /**
+ * The name of the sub-system e.g. "SVT".
+ * @return The name of the sub-system.
+ */
+ String getName();
+
+ /**
+ * The current status of the sub-system.
+ * @return The sub-system status.
+ */
+ SystemStatus getStatus();
+
+ /**
+ * The set of statistics attached to the sub-system.
+ * @return The sub-system's statistics.
+ */
+ SystemStatistics getStatistics();
+}
java/trunk/monitoring-app/src/main/java/org/hps/monitoring/subsys
--- java/trunk/monitoring-app/src/main/java/org/hps/monitoring/subsys/SystemInfoImpl.java (rev 0)
+++ java/trunk/monitoring-app/src/main/java/org/hps/monitoring/subsys/SystemInfoImpl.java 2014-06-06 02:16:36 UTC (rev 685)
@@ -0,0 +1,33 @@
+package org.hps.monitoring.subsys;
+
+/**
+ * Implementation of {@link SystemInfo}.
+ *
+ * @author Jeremy McCormick <[log in to unmask]>
+ */
+public class SystemInfoImpl implements SystemInfo {
+
+ String systemName = "";
+ SystemStatus status = new SystemStatusImpl();
+ SystemStatistics stats = new SystemStatisticsImpl();
+
+ public SystemInfoImpl(String systemName) {
+ this.systemName = systemName;
+ }
+
+ @Override
+ public String getName() {
+ return systemName;
+ }
+
+ @Override
+ public SystemStatus getStatus() {
+ return status;
+ }
+
+ @Override
+ public SystemStatistics getStatistics() {
+ return stats;
+ }
+
+}
java/trunk/monitoring-app/src/main/java/org/hps/monitoring/subsys
--- java/trunk/monitoring-app/src/main/java/org/hps/monitoring/subsys/SystemStatistics.java (rev 0)
+++ java/trunk/monitoring-app/src/main/java/org/hps/monitoring/subsys/SystemStatistics.java 2014-06-06 02:16:36 UTC (rev 685)
@@ -0,0 +1,118 @@
+package org.hps.monitoring.subsys;
+
+import java.io.PrintStream;
+
+/**
+ * This is an interface for a set of basic statistics
+ * about an online event processing system.
+ *
+ * @author Jeremy McCormick <[log in to unmask]>
+ */
+public interface SystemStatistics {
+
+ /**
+ * Set the desired timer tick length in millis.
+ * @param tickLengthMillis The desired tick length in millis.
+ */
+ void setTickLengthMillis(long tickLengthMillis);
+
+ /**
+ * Get the nominal length of one tick in millis.
+ * Actual ticks lengths may vary slightly.
+ * @return The nominal tick length in millis.
+ */
+ long getTickLengthMillis();
+
+ /**
+ * Start the timer thread for accumulating statistics.
+ */
+ void start();
+
+ /**
+ * Stop the timer thread for accumulating statistics.
+ */
+ void stop();
+
+ /**
+ * Update the statistics by incrementing the event count
+ * by one and then adding <tt>size</tt> to the number of bytes
+ * received.
+ * @param size The number of bytes received.
+ */
+ void update(int size);
+
+ /**
+ * Get the number of millis since the session started.
+ * @return The number of millis since session start.
+ */
+ long getTimeElapsedMillis();
+
+ /**
+ * Get the Unix start time of the session.
+ * @return The start time in millis.
+ */
+ long getStartTimeMillis();
+
+ /**
+ * Get the Unix stop time of the session.
+ * @return The stop time in millis.
+ */
+ long getStopTimeMillis();
+
+ /**
+ * Get the number of events in the current tick.
+ * @return The number of events in the current tick.
+ */
+ long getEventsSinceTick();
+
+ /**
+ * Get the total number of events processed thusfar.
+ * @return The total number of events processed so far.
+ */
+ long getCumulativeEvents();
+
+ /**
+ * Get the average number of events per second in the session.
+ * It simply divides the number of events by the time.
+ * @return The average events per second.
+ */
+ double getAverageEventsPerSecond();
+
+ /**
+ * Get the number of bytes received in the current tick.
+ * @return The number of bytes received in the tick.
+ */
+ long getBytesSinceTick();
+
+ /**
+ * Get the total number of megabytes of data received thusfar.
+ * @return The amount of data in megabytes received in the session.
+ */
+ double getCumulativeMegaBytes();
+
+ /**
+ * Get the average Mb per second of the session, which is the
+ * total amount of data divided by the total time.
+ * @return The average megabytes per second.
+ */
+ double getAverageMegaBytesPerSecond();
+
+ /**
+ * Get the number of milliseconds since the last tick.
+ * @return The number of millis elapsed in the current tick.
+ */
+ long getTickElapsedMillis();
+
+ /**
+ * Print session statistics.
+ * @param ps The PrintStream for display.
+ */
+ void printSession(PrintStream ps);
+
+ /**
+ * Print tick statistics.
+ * @param ps The PrintStream for display.
+ */
+ void printTick(PrintStream ps);
+
+}
java/trunk/monitoring-app/src/main/java/org/hps/monitoring/subsys
--- java/trunk/monitoring-app/src/main/java/org/hps/monitoring/subsys/SystemStatisticsImpl.java (rev 0)
+++ java/trunk/monitoring-app/src/main/java/org/hps/monitoring/subsys/SystemStatisticsImpl.java 2014-06-06 02:16:36 UTC (rev 685)
@@ -0,0 +1,182 @@
+package org.hps.monitoring.subsys;
+
+import java.io.PrintStream;
+import java.text.DecimalFormat;
+import java.util.Timer;
+import java.util.TimerTask;
+
+/**
+ * Implementation of {@link SystemStatistics}.
+ *
+ * @author Jeremy McCormick <[log in to unmask]>
+ */
+public class SystemStatisticsImpl implements SystemStatistics {
+
+ long tickLengthMillis = 1000; // default is one second tick
+ long sessionElapsedMillis;
+ long startTimeMillis;
+ long stopTimeMillis;
+ long eventsSinceTick;
+ long bytesSinceTick;
+ long totalEvents;
+ long totalBytes;
+ long tickStartMillis;
+ long tickElapsedMillis;
+ static final long Kb = 1 * 1024;
+ static final long Mb = Kb * 1024;
+ static final double milliToSecond = 0.001;
+ static final DecimalFormat decimalFormat = new DecimalFormat("#.##");
+ Timer timer;
+
+ @Override
+ public void update(int size) {
+ addEvent();
+ addData(size);
+ updateElapsedTime();
+ }
+
+ @Override
+ public void setTickLengthMillis(long tickLengthMillis) {
+ this.tickLengthMillis = tickLengthMillis;
+ }
+
+ @Override
+ public long getTickLengthMillis() {
+ return tickLengthMillis;
+ }
+
+ @Override
+ public long getTimeElapsedMillis() {
+ return sessionElapsedMillis;
+ }
+
+ @Override
+ public long getStartTimeMillis() {
+ return this.startTimeMillis;
+ }
+
+ @Override
+ public long getStopTimeMillis() {
+ return this.stopTimeMillis;
+ }
+
+ @Override
+ public long getCumulativeEvents() {
+ return totalEvents;
+ }
+
+ @Override
+ public double getAverageEventsPerSecond() {
+ try {
+ return Double.parseDouble(decimalFormat.format(totalEvents / millisToSeconds(getTimeElapsedMillis())));
+ } catch (NumberFormatException e) {
+ return Double.NaN;
+ }
+ }
+
+ @Override
+ public double getCumulativeMegaBytes() {
+ return bytesToMb(totalBytes);
+ }
+
+ @Override
+ public double getAverageMegaBytesPerSecond() {
+ try {
+ return Double.parseDouble(decimalFormat.format(bytesToMb(totalBytes) / millisToSeconds(getTimeElapsedMillis())));
+ } catch (NumberFormatException e) {
+ return Double.NaN;
+ }
+ }
+
+ @Override
+ public long getEventsSinceTick() {
+ return eventsSinceTick;
+ }
+
+ @Override
+ public long getBytesSinceTick() {
+ return bytesSinceTick;
+ }
+
+ @Override
+ public long getTickElapsedMillis() {
+ return tickElapsedMillis;
+ }
+
+ @Override
+ public void start() {
+
+ // Set time variables.
+ long currentTimeMillis = System.currentTimeMillis();
+ startTimeMillis = currentTimeMillis;
+ tickStartMillis = currentTimeMillis;
+
+ // Start Timer task which executes at tick length.
+ TimerTask task = new TimerTask() {
+ public void run() {
+ nextTick();
+ }
+ };
+ timer = new Timer();
+ timer.schedule(task, 0, tickLengthMillis);
+ }
+
+ @Override
+ public void stop() {
+ // Kill the Timer.
+ if (timer != null)
+ timer.cancel();
+
+ // Set stop time.
+ stopTimeMillis = System.currentTimeMillis();
+ }
+
+ @Override
+ public void printSession(PrintStream ps) {
+ ps.println("session statistics ...");
+ ps.println(" getTimeElapsedMillis = " + this.getTimeElapsedMillis());
+ ps.println(" getCumulativeEvents = " + this.getCumulativeEvents());
+ ps.println(" getAverageEventsPerSecond = " + this.getAverageEventsPerSecond());
+ ps.println(" getAverageMegaBytesPerSecond = " + this.getAverageMegaBytesPerSecond());
+
+ }
+
+ @Override
+ public void printTick(PrintStream ps) {
+ ps.println("tick statistics ...");
+ ps.println(" getTickElapsedMillis = " + this.getTickElapsedMillis());
+ ps.println(" getEventsSinceTick = " + this.getEventsSinceTick());
+ ps.println(" getBytesSinceTick = " + this.getBytesSinceTick());
+ }
+
+ void addEvent() {
+ eventsSinceTick += 1;
+ totalEvents += 1;
+ }
+
+ void addData(int size) {
+ bytesSinceTick += size;
+ totalBytes += size;
+ }
+
+ void updateElapsedTime() {
+ tickElapsedMillis = System.currentTimeMillis() - tickStartMillis;
+ sessionElapsedMillis = System.currentTimeMillis() - startTimeMillis;
+ }
+
+ // Bytes to megabytes to 2 decimal places.
+ static final double bytesToMb(long size) {
+ return Double.parseDouble(decimalFormat.format((double)size / Mb));
+ }
+
+ static final double millisToSeconds(long millis) {
+ return ((double)millis) / 1000.;
+ }
+
+ synchronized void nextTick() {
+ eventsSinceTick = 0;
+ bytesSinceTick = 0;
+ tickElapsedMillis = 0;
+ tickStartMillis = System.currentTimeMillis();
+ }
+}
java/trunk/monitoring-app/src/main/java/org/hps/monitoring/subsys
--- java/trunk/monitoring-app/src/main/java/org/hps/monitoring/subsys/SystemStatus.java (rev 0)
+++ java/trunk/monitoring-app/src/main/java/org/hps/monitoring/subsys/SystemStatus.java 2014-06-06 02:16:36 UTC (rev 685)
@@ -0,0 +1,68 @@
+package org.hps.monitoring.subsys;
+
+/**
+ * The system status describes the state of a system, e.g. whether
+ * it is okay or some level of error has occurred. Listeners can
+ * be registered which will be notified whenever the status changes,
+ * in order to update a GUI, trip an alarm, etc.
+ *
+ * @author Jeremy McCormick <[log in to unmask]>
+ */
+public interface SystemStatus {
+
+ /**
+ * Code that represents the system's overall status.
+ */
+ enum StatusCode {
+
+ OKAY (0, "okay", "The system appears to be working."),
+ UNKNOWN(1, "unknown", "The status is not known."),
+ OFFLINE(2, "offline", "The system is currently offline."),
+ WARNING(3, "warning", "There is a non-fatal warning."),
+ ERROR (4, "error", "A non-fatal but serious error has occurred."),
+ ALARM (5, "alarm", "An error has occurred and an alarm should trip."),
+ HALT (6, "halt", "The system should be immediately halted.");
+
+ int code;
+ String name;
+
+ StatusCode(int code, String name, String description) {
+ this.code = code;
+ this.name = name;
+ }
+
+ int getRawCode() {
+ return code;
+ }
+
+ String getName() {
+ return name;
+ }
+ }
+
+ /**
+ * Get the current status code.
+ * @return The current status code.
+ */
+ StatusCode getStatusCode();
+
+ /**
+ * Set the current status code, which will cause the last changed
+ * time to be set and the listeners to be notified.
+ * @param code The new status code.
+ */
+ void setStatusCode(StatusCode code);
+
+ /**
+ * Get the time when the system status last changed.
+ * @return The time when the system status changed.
+ */
+ long getLastChangedMillis();
+
+ /**
+ * Add a listener to receive notification when the system
+ * status changes.
+ * @param listener The listener object.
+ */
+ void addListener(SystemStatusListener listener);
+}
java/trunk/monitoring-app/src/main/java/org/hps/monitoring/subsys
--- java/trunk/monitoring-app/src/main/java/org/hps/monitoring/subsys/SystemStatusImpl.java (rev 0)
+++ java/trunk/monitoring-app/src/main/java/org/hps/monitoring/subsys/SystemStatusImpl.java 2014-06-06 02:16:36 UTC (rev 685)
@@ -0,0 +1,51 @@
+package org.hps.monitoring.subsys;
+
+import java.util.ArrayList;
+import java.util.List;
+
+/**
+ * The implementation of {@link SystemStatus}.
+ * @author Jeremy McCormick <[log in to unmask]>
+ */
+public class SystemStatusImpl implements SystemStatus {
+
+ StatusCode code = SystemStatus.StatusCode.UNKNOWN;
+ long lastChangedMillis;
+ List<SystemStatusListener> listeners = new ArrayList<SystemStatusListener>();
+
+ SystemStatusImpl() {
+ setCurrentTime();
+ }
+
+ @Override
+ public StatusCode getStatusCode() {
+ return code;
+ }
+
+ @Override
+ public void setStatusCode(StatusCode code) {
+ this.code = code;
+ setCurrentTime();
+ notifyListeners();
+ }
+
+ @Override
+ public void addListener(SystemStatusListener listener) {
+ this.listeners.add(listener);
+ }
+
+ @Override
+ public long getLastChangedMillis() {
+ return lastChangedMillis;
+ }
+
+ void notifyListeners() {
+ for (SystemStatusListener listener : listeners) {
+ listener.statusChanged(this);
+ }
+ }
+
+ private void setCurrentTime() {
+ this.lastChangedMillis = System.currentTimeMillis();
+ }
+}
java/trunk/monitoring-app/src/main/java/org/hps/monitoring/subsys
--- java/trunk/monitoring-app/src/main/java/org/hps/monitoring/subsys/SystemStatusListener.java (rev 0)
+++ java/trunk/monitoring-app/src/main/java/org/hps/monitoring/subsys/SystemStatusListener.java 2014-06-06 02:16:36 UTC (rev 685)
@@ -0,0 +1,18 @@
+package org.hps.monitoring.subsys;
+
+/**
+ * Interface for receiving changes to {@link SystemStatus} objects,
+ * e.g. when a new code is set.
+ *
+ * @author Jeremy McCormick <[log in to unmask]>
+ */
+public interface SystemStatusListener {
+
+ /**
+ * Receive a change to the system status.
+ * The implementation of this method should absolutely not
+ * attempt to change the status!
+ * @param status The system status.
+ */
+ void statusChanged(SystemStatus status);
+}
java/trunk/monitoring-app/src/main/java/org/hps/monitoring/subsys/et
--- java/trunk/monitoring-app/src/main/java/org/hps/monitoring/subsys/et/EtSystemMonitor.java (rev 0)
+++ java/trunk/monitoring-app/src/main/java/org/hps/monitoring/subsys/et/EtSystemMonitor.java 2014-06-06 02:16:36 UTC (rev 685)
@@ -0,0 +1,41 @@
+package org.hps.monitoring.subsys.et;
+
+import org.hps.monitoring.record.etevent.EtEventProcessor;
+import org.hps.monitoring.subsys.HasSystemInfo;
+import org.hps.monitoring.subsys.SystemInfo;
+import org.hps.monitoring.subsys.SystemInfoImpl;
+import org.hps.monitoring.subsys.SystemStatus.StatusCode;
+import org.jlab.coda.et.EtEvent;
+
+/**
+ * This is a barebones implementation of an ET system monitor.
+ * It does not do much right now but accumulate statistics
+ * and set basic system statuses.
+ * @author Jeremy McCormick <[log in to unmask]>
+ */
+public class EtSystemMonitor extends EtEventProcessor implements HasSystemInfo {
+
+ SystemInfo info = new SystemInfoImpl("EtSystem");
+
+ @Override
+ public void start() {
+ info.getStatistics().start();
+ info.getStatus().setStatusCode(StatusCode.OKAY);
+ }
+
+ @Override
+ public void processEvent(EtEvent event) {
+ info.getStatistics().update(event.getLength());
+ }
+
+ @Override
+ public void stop() {
+ info.getStatistics().stop();
+ info.getStatus().setStatusCode(StatusCode.OFFLINE);
+ }
+
+ @Override
+ public SystemInfo getSystemInfo() {
+ return info;
+ }
+}
java/trunk/monitoring-app/src/test/java/org/hps/monitoring/record/etevent
--- java/trunk/monitoring-app/src/test/java/org/hps/monitoring/record/etevent/EtEventLoopTest.java (rev 0)
+++ java/trunk/monitoring-app/src/test/java/org/hps/monitoring/record/etevent/EtEventLoopTest.java 2014-06-06 02:16:36 UTC (rev 685)
@@ -0,0 +1,44 @@
+package org.hps.monitoring.record.etevent;
+
+import java.io.IOException;
+
+import org.jlab.coda.et.EtEvent;
+import org.jlab.coda.et.EtEventImpl;
+
+/**
+ * Test that the {@link EtEventLoop} works.
+ * @author Jeremy McCormick <[log in to unmask]>
+ */
+public class EtEventLoopTest {
+
+ public void testEtEventLoop() {
+
+ EtEventLoop loop = new EtEventLoop();
+ loop.addEtEventProcessor(new DummyEtEventProcessor());
+ EtEventQueue queue = new EtEventQueue();
+ queue.setTimeOutMillis(10000);
+ loop.setRecordSource(queue);
+
+ for (int i=0; i<100000; i++) {
+ EtEvent event = new EtEventImpl(1000);
+ queue.addRecord(event);
+ }
+
+ try {
+ loop.loop(-1);
+ } catch (IOException e) {
+ e.printStackTrace();
+ }
+
+ System.out.println("loop processed " + loop.getTotalSupplied() + " records");
+ }
+
+ static class DummyEtEventProcessor extends EtEventProcessor {
+
+ public void processEvent(EtEvent event) {
+ System.out.println(this.getClass().getSimpleName() + " got EtEvent of length " + event.getLength());
+ }
+
+ }
+
+}
java/trunk/monitoring-app/src/test/java/org/hps/monitoring/record/etevent
--- java/trunk/monitoring-app/src/test/java/org/hps/monitoring/record/etevent/MultiThreadedEtEventLoopTest.java (rev 0)
+++ java/trunk/monitoring-app/src/test/java/org/hps/monitoring/record/etevent/MultiThreadedEtEventLoopTest.java 2014-06-06 02:16:36 UTC (rev 685)
@@ -0,0 +1,111 @@
+package org.hps.monitoring.record.etevent;
+
+import java.io.IOException;
+
+import org.jlab.coda.et.EtEvent;
+import org.jlab.coda.et.EtEventImpl;
+
+/**
+ * Test that the {@link EtEventLoop} works when the loop and source
+ * are run on seperate threads.
+ *
+ * @author Jeremy McCormick <[log in to unmask]>
+ */
+public class MultiThreadedEtEventLoopTest {
+
+ // Time in milliseconds to wait before queuing a new dummy event.
+ static int EVENT_INTERVAL = 10;
+
+ public void testThreadedQueue() {
+
+ // Setup the loop.
+ EtEventLoop loop = new EtEventLoop();
+ loop.addEtEventProcessor(new DummyEtEventProcessor());
+
+ // Create the event queue.
+ EtEventQueue queue = new EtEventQueue();
+ queue.setTimeOutMillis(10000);
+ loop.setRecordSource(queue);
+
+ // Create runnable objects.
+ LoopRunnable loopRunnable = new LoopRunnable(loop);
+ QueueRunnable queueRunnable = new QueueRunnable(queue, EVENT_INTERVAL);
+
+ // Start loop thread.
+ Thread loopThread = new Thread(loopRunnable);
+ loopThread.start();
+
+ // Start queue thread.
+ Thread queueThread = new Thread(queueRunnable);
+ queueThread.start();
+
+ // Wait for queue thread to end.
+ try {
+ queueThread.join();
+ } catch (InterruptedException e) {
+ e.printStackTrace();
+ }
+
+ System.out.println("loop got " + loop.getConsumed() + " records");
+
+ System.out.println("disposing loop ...");
+ loop.dispose();
+ loop = null;
+ System.gc();
+ }
+
+ static class LoopRunnable implements Runnable {
+
+ EtEventLoop loop;
+
+ LoopRunnable(EtEventLoop loop) {
+ this.loop = loop;
+ }
+
+ public void run() {
+ try {
+ loop.loop(-1);
+ } catch (IOException e) {
+ e.printStackTrace();
+ }
+ }
+ }
+
+ static class QueueRunnable implements Runnable {
+
+ EtEventQueue queue = null;
+ int waitTimeMillis = 0;
+
+ QueueRunnable(EtEventQueue queue, int waitTimeMillis) {
+ this.queue = queue;
+ this.waitTimeMillis = waitTimeMillis;
+ }
+
+ public void run() {
+ for (int i = 1; i <= 1000; i++) {
+ byte[] data = new byte[256];
+ EtEventImpl event = new EtEventImpl(256);
+ event.setData(data);
+ queue.addRecord(event);
+ delay();
+ }
+ System.out.println(this.getClass().getSimpleName() + " is done adding events.");
+ }
+
+ synchronized private void delay() {
+ try {
+ wait(waitTimeMillis);
+ } catch (InterruptedException e) {
+ e.printStackTrace();
+ }
+ }
+ }
+
+ static class DummyEtEventProcessor extends EtEventProcessor {
+
+ public void processEvent(EtEvent event) {
+ System.out.println(this.getClass().getSimpleName() + " got EtEvent of length " + event.getLength());
+ }
+
+ }
+}
java/trunk/monitoring-app/src/test/java/org/hps/monitoring/record/evio
--- java/trunk/monitoring-app/src/test/java/org/hps/monitoring/record/evio/EvioEventLoopTest.java (rev 0)
+++ java/trunk/monitoring-app/src/test/java/org/hps/monitoring/record/evio/EvioEventLoopTest.java 2014-06-06 02:16:36 UTC (rev 685)
@@ -0,0 +1,56 @@
+package org.hps.monitoring.record.evio;
+
+import java.io.File;
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.List;
+
+import junit.framework.TestCase;
+
+import org.jlab.coda.jevio.EvioEvent;
+
+/**
+ * Test that the {@link EvioEventLoop} works.
+ * @author Jeremy McCormick <[log in to unmask]>
+ */
+public class EvioEventLoopTest extends TestCase {
+
+ File file1 = new File("/nfs/slac/g/hps3/data/testrun/runs/evio/hps_001351.evio.0");
+ File file2 = new File("/nfs/slac/g/hps3/data/testrun/runs/evio/hps_001353.evio.0");
+
+ public void testEvioRecordLoop() {
+
+ List<File> files = new ArrayList<File>();
+ files.add(file1);
+ files.add(file2);
+
+ EvioEventLoop loop = new EvioEventLoop();
+ loop.addEvioEventProcessor(new DummyEvioEventProcessor());
+ loop.setRecordSource(new EvioFileSource(files));
+ try {
+ loop.loop(-1);
+ } catch (IOException e) {
+ throw new RuntimeException(e);
+ }
+ System.out.println("loop processed " + loop.getTotalSupplied() + " events");
+ }
+
+ static class DummyEvioEventProcessor extends EvioEventProcessor {
+
+ public void processEvent(EvioEvent event) {
+ System.out.println(this.getClass().getSimpleName() + " got EVIO event " + event.getEventNumber());
+ }
+
+ public void startRun(EvioEvent event) {
+ int[] data = event.getIntData();
+ int runNumber = data[1];
+ System.out.println(this.getClass().getSimpleName() + " starting run " + runNumber);
+ }
+
+ public void endRun(EvioEvent event) {
+ int[] data = event.getIntData();
+ int runNumber = data[1];
+ System.out.println(this.getClass().getSimpleName() + " ending run " + runNumber);
+ }
+ }
+}
java/trunk/monitoring-app/src/test/java/org/hps/monitoring/subsys/et
--- java/trunk/monitoring-app/src/test/java/org/hps/monitoring/subsys/et/EtSystemMonitorTest.java (rev 0)
+++ java/trunk/monitoring-app/src/test/java/org/hps/monitoring/subsys/et/EtSystemMonitorTest.java 2014-06-06 02:16:36 UTC (rev 685)
@@ -0,0 +1,76 @@
+package org.hps.monitoring.subsys.et;
+
+import java.io.IOException;
+
+import junit.framework.TestCase;
+
+import org.hps.monitoring.record.etevent.EtEventLoop;
+import org.hps.monitoring.record.etevent.EtEventQueue;
+import org.hps.monitoring.subsys.SystemStatus;
+import org.hps.monitoring.subsys.SystemStatusListener;
+import org.jlab.coda.et.EtEvent;
+import org.jlab.coda.et.EtEventImpl;
+
+/**
+ * Test that the {@link EtSystemMonitor} works.
+ * @author Jeremy McCormick <[log in to unmask]>
+ */
+public class EtSystemMonitorTest extends TestCase {
+
+ public void testEtEventMonitoring() {
+
+ EtEventLoop loop = new EtEventLoop();
+ EtSystemMonitor monitor = new EtSystemPrinter();
+ monitor.getSystemInfo().getStatus().addListener(new DummyListener());
+ loop.addEtEventProcessor(monitor);
+ EtEventQueue queue = new EtEventQueue();
+ queue.setTimeOutMillis(1000);
+ loop.setRecordSource(queue);
+
+ for (int i=0; i<100000; i++) {
+ byte[] data = new byte[256];
+ EtEventImpl event = new EtEventImpl(256);
+ event.setData(data);
+ queue.addRecord(event);
+ }
+
+ try {
+ loop.loop(-1);
+ } catch (IOException e) {
+ e.printStackTrace();
+ }
+
+ System.out.println("loop supplied " + loop.getTotalSupplied() + " records");
+ System.out.println("loop consumed " + loop.getCountableConsumed() + " records");
+ }
+
+ static class DummyListener implements SystemStatusListener {
+
+ public void statusChanged(SystemStatus status) {
+ System.out.println(this.getClass().getSimpleName() + " saw status changed to " + status.getStatusCode().toString() + " at " + status.getLastChangedMillis() + " millis");
+ }
+ }
+
+ static class EtSystemPrinter extends EtSystemMonitor{
+
+ int eventsProcessed = 0;
+
+ public void start() {
+ super.start();
+ info.getStatistics().printSession(System.out);
+ }
+
+ public void processEvent(EtEvent event) {
+ super.processEvent(event);
+ ++eventsProcessed;
+ if (eventsProcessed % 1000 == 0)
+ info.getStatistics().printTick(System.out);
+ }
+
+ public void stop() {
+ super.stop();
+ info.getStatistics().printSession(System.out);
+ }
+ }
+
+}
SVNspam 0.1