hps-java/src/main/java/org/lcsim/hps/evio
diff -N EvioConsumer.java
--- /dev/null 1 Jan 1970 00:00:00 -0000
+++ EvioConsumer.java 9 Mar 2012 23:34:21 -0000 1.1
@@ -0,0 +1,277 @@
+package org.lcsim.hps.evio;
+
+import java.nio.ByteBuffer;
+
+import org.jlab.coda.et.EtAttachment;
+import org.jlab.coda.et.EtConstants;
+import org.jlab.coda.et.EtEvent;
+import org.jlab.coda.et.EtStation;
+import org.jlab.coda.et.EtStationConfig;
+import org.jlab.coda.et.EtSystem;
+import org.jlab.coda.et.EtSystemOpenConfig;
+import org.jlab.coda.et.enums.Mode;
+import org.jlab.coda.jevio.CompositeData;
+import org.jlab.coda.jevio.DataType;
+import org.jlab.coda.jevio.EvioEvent;
+
+/**
+ * This class is an example of an event consumer for an ET system.
+ *
+ * Based on Carl Timmer's apps.Consumer class from jevio 4.0 release.
+ *
+ * @author Jeremy McCormick
+ */
+public class EvioConsumer {
+
+ public EvioConsumer() {
+ }
+
+
+ private static void usage() {
+ System.out.println("\nUsage: java Consumer -f <et name> -host <ET host> -s <station name> [-h] [-v] [-nb]\n" +
+ " [-p <ET server port>] [-c <chunk size>] [-q <queue size>]\n" +
+ " [-pos <station position>] [-ppos <parallel station position>]\n\n" +
+ " -host ET system's host\n" +
+ " -f ET system's (memory-mapped file) name\n" +
+ " -s create station of this name\n" +
+ " -h help\n" +
+ " -v verbose output\n" +
+ " -nb make station non-blocking\n" +
+ " -p ET server port\n" +
+ " -c number of events in one get/put array\n" +
+ " -q queue size if creating nonblocking station\n" +
+ " -pos position of created station in station list (1,2,...)\n" +
+ " -ppos position of created station within a group of parallel stations (-1=end, -2=head)\n\n" +
+ " This consumer works by making a direct connection\n" +
+ " to the ET system's tcp server port.\n");
+ }
+
+
+ public static void main(String[] args) {
+
+ int position = 1, pposition = 0, qSize = 0, chunk = 1;
+ boolean blocking = true, verbose = false;
+ String etName = null, host = null, statName = null;
+ int port = EtConstants.serverPort;
+ int flowMode = EtConstants.stationSerial;
+
+ for (int i = 0; i < args.length; i++) {
+ if (args[i].equalsIgnoreCase("-f")) {
+ etName = args[++i];
+ }
+ else if (args[i].equalsIgnoreCase("-host")) {
+ host = args[++i];
+ }
+ else if (args[i].equalsIgnoreCase("-nb")) {
+ blocking = false;
+ }
+ else if (args[i].equalsIgnoreCase("-v")) {
+ verbose = true;
+ }
+ else if (args[i].equalsIgnoreCase("-s")) {
+ statName = args[++i];
+ }
+ else if (args[i].equalsIgnoreCase("-p")) {
+ try {
+ port = Integer.parseInt(args[++i]);
+ if ((port < 1024) || (port > 65535)) {
+ System.out.println("Port number must be between 1024 and 65535.");
+ usage();
+ return;
+ }
+ }
+ catch (NumberFormatException ex) {
+ System.out.println("Did not specify a proper port number.");
+ usage();
+ return;
+ }
+ }
+ else if (args[i].equalsIgnoreCase("-c")) {
+ try {
+ chunk = Integer.parseInt(args[++i]);
+ if ((chunk < 1) || (chunk > 1000)) {
+ System.out.println("Chunk size may be 1 - 1000.");
+ usage();
+ return;
+ }
+ }
+ catch (NumberFormatException ex) {
+ System.out.println("Did not specify a proper chunk size.");
+ usage();
+ return;
+ }
+ }
+ else if (args[i].equalsIgnoreCase("-q")) {
+ try {
+ qSize = Integer.parseInt(args[++i]);
+ if (qSize < 1) {
+ System.out.println("Queue size must be > 0.");
+ usage();
+ return;
+ }
+ }
+ catch (NumberFormatException ex) {
+ System.out.println("Did not specify a proper queue size number.");
+ usage();
+ return;
+ }
+ }
+ else if (args[i].equalsIgnoreCase("-pos")) {
+ try {
+ position = Integer.parseInt(args[++i]);
+ if (position < 1) {
+ System.out.println("Position must be > 0.");
+ usage();
+ return;
+ }
+ }
+ catch (NumberFormatException ex) {
+ System.out.println("Did not specify a proper position number.");
+ usage();
+ return;
+ }
+ }
+ else if (args[i].equalsIgnoreCase("-ppos")) {
+ try {
+ pposition = Integer.parseInt(args[++i]);
+ if (pposition < -2 || pposition == 0) {
+ System.out.println("Parallel position must be > -3 and != 0.");
+ usage();
+ return;
+ }
+ System.out.println("FLOW moDE is ||");
+ flowMode = EtConstants.stationParallel;
+ if (pposition == -2) pposition = EtConstants.newHead;
+ else if (pposition == -1) pposition = EtConstants.end;
+
+ }
+ catch (NumberFormatException ex) {
+ System.out.println("Did not specify a proper parallel position number.");
+ usage();
+ return;
+ }
+ }
+ else {
+ usage();
+ return;
+ }
+ }
+
+ if (host == null || etName == null || statName == null) {
+ usage();
+ return;
+ }
+
+ try {
+ // make a direct connection to ET system's tcp server
+ EtSystemOpenConfig config = new EtSystemOpenConfig(etName, host, port);
+
+ // create ET system object with verbose debugging output
+ EtSystem sys = new EtSystem(config, EtConstants.debugInfo);
+ sys.open();
+
+ // configuration of a new station
+ EtStationConfig statConfig = new EtStationConfig();
+ statConfig.setFlowMode(flowMode);
+ if (!blocking) {
+ statConfig.setBlockMode(EtConstants.stationNonBlocking);
+ if (qSize > 0) {
+ statConfig.setCue(qSize);
+ }
+ }
+
+ // create station
+ EtStation stat = sys.createStation(statConfig, statName, position, pposition);
+
+ // attach to new station
+ EtAttachment att = sys.attach(stat);
+
+ // array of events
+ EtEvent[] mevs;
+
+ // Test plots.
+ /*
+ AIDA aida = AIDA.defaultInstance();
+ ICloud1D idsPlot = aida.cloud1D("Cell IDs");
+ ICloud1D timePlot = aida.cloud1D("Time");
+ ICloud1D adcValuePlot = aida.cloud1D("ADC Value");
+ */
+
+ // Number of events to read off ET Ring before exiting.
+ // TODO Make this a CL parameter.
+ //int ntoread = 10000;
+ int ntoread = 8879;
+
+ // Number of events read so far.
+ int nread = 0;
+
+ while (true) {
+
+ // get events from ET system
+ mevs = sys.getEvents(att, Mode.SLEEP, null, 0, chunk);
+
+ nread += mevs.length;
+
+ System.out.println("got " + mevs.length + " events from ET Ring");
+
+ // example of reading & printing event data
+ for (EtEvent mev : mevs) {
+
+ if (mev.needToSwap()) {
+ System.out.println("data needs swapping");
+ }
+
+ // Get event's data buffer
+ // buf.limit() = length of the actual data (not buffer capacity)
+ ByteBuffer buf = mev.getDataBuffer();
+
+ System.out.println("mev.len="+mev.getLength());
+
+ // Make composite data from raw bytes in event.
+ System.out.println("making new EvioEvent");
+ EvioEvent ev = new EvioEvent(0, DataType.COMPOSITE, 0);
+ System.out.println("setting raw bytes");
+ ev.setRawBytes(buf.array());
+ System.out.println("getting CompositeData");
+ CompositeData cdata = ev.getCompositeData();
+ System.out.println("OK");
+
+ /*
+ // Make RawTrackerHits from composite data.
+ List<RawTrackerHit> hits = new ArrayList<RawTrackerHit>();
+ List<Object> items = cdata.getItems();
+ int n = items.size();
+ for (int i=0; i<n; i+=3) {
+ Long id = (Long)items.get(i);
+ int time = (Integer)items.get(i+1);
+ int adcValue = (Integer)items.get(i+2);
+ System.out.println("EvioConsumer --> id=0x"+Long.toHexString(id)+"; time="+time+"; adcValue="+adcValue);
+ RawTrackerHit hit = new BaseRawTrackerHit(id, time, new short[] {(short)adcValue});
+ hits.add(hit);
+ }
+ System.out.println("Made " + hits.size() + " RawTrackerHits from EtEvent");
+
+ // Make some test plots from tracker data.
+ for (RawTrackerHit hit : hits) {
+ idsPlot.fill(hit.getCellID());
+ timePlot.fill(hit.getTime());
+ adcValuePlot.fill(hit.getADCValues()[0]);
+ }
+ */
+ }
+
+ // put events back into ET system
+ sys.putEvents(att, mevs);
+
+ // Break out of read loop if max events events equaled or exceeded.
+ if (nread >= ntoread)
+ break;
+ }
+ }
+ catch (Exception ex) {
+ System.out.println("Error using ET system as consumer");
+ ex.printStackTrace();
+ }
+ }
+
+}