Print

Print


Commit in hps-java/src/main/java/org/lcsim/hps/evio on MAIN
EvioConsumer.java+277added 1.1
in progress; does not currently work

hps-java/src/main/java/org/lcsim/hps/evio
EvioConsumer.java added at 1.1
diff -N EvioConsumer.java
--- /dev/null	1 Jan 1970 00:00:00 -0000
+++ EvioConsumer.java	9 Mar 2012 23:34:21 -0000	1.1
@@ -0,0 +1,277 @@
+package org.lcsim.hps.evio;
+
+import java.nio.ByteBuffer;
+
+import org.jlab.coda.et.EtAttachment;
+import org.jlab.coda.et.EtConstants;
+import org.jlab.coda.et.EtEvent;
+import org.jlab.coda.et.EtStation;
+import org.jlab.coda.et.EtStationConfig;
+import org.jlab.coda.et.EtSystem;
+import org.jlab.coda.et.EtSystemOpenConfig;
+import org.jlab.coda.et.enums.Mode;
+import org.jlab.coda.jevio.CompositeData;
+import org.jlab.coda.jevio.DataType;
+import org.jlab.coda.jevio.EvioEvent;
+
+/**
+ * This class is an example of an event consumer for an ET system.
+ * 
+ * Based on Carl Timmer's apps.Consumer class from jevio 4.0 release.
+ *
+ * @author Jeremy McCormick
+ */
+public class EvioConsumer {
+
+    /** Creates a consumer; the constructor does no work. */
+    public EvioConsumer() {
+    }
+
+    /** Prints the command-line usage summary to standard output. */
+    private static void usage() {
+        System.out.println("\nUsage: java Consumer -f <et name> -host <ET host> -s <station name> [-h] [-v] [-nb]\n" +
+                "                      [-p <ET server port>] [-c <chunk size>] [-q <queue size>]\n" +
+                "                      [-pos <station position>] [-ppos <parallel station position>]\n\n" +
+                "       -host  ET system's host\n" +
+                "       -f     ET system's (memory-mapped file) name\n" +
+                "       -s     create station of this name\n" +
+                "       -h     help\n" +
+                "       -v     verbose output\n" +
+                "       -nb    make station non-blocking\n" +
+                "       -p     ET server port\n" +
+                "       -c     number of events in one get/put array\n" +
+                "       -q     queue size if creating nonblocking station\n" +
+                "       -pos   position of created station in station list (1,2,...)\n" +
+                "       -ppos  position of created station within a group of parallel stations (-1=end, -2=head)\n\n" +
+                "        This consumer works by making a direct connection\n" +
+                "        to the ET system's tcp server port.\n");
+    }
+
+    /** Returns true if the option consumes the argument that follows it. */
+    private static boolean takesValue(String option) {
+        String[] valued = {"-f", "-host", "-s", "-p", "-c", "-q", "-pos", "-ppos"};
+        for (String name : valued) {
+            if (name.equalsIgnoreCase(option)) {
+                return true;
+            }
+        }
+        return false;
+    }
+
+    /**
+     * Parses the command line, attaches to a new station on the ET system and reads events
+     * from it, building an EvioEvent from each, until the event limit is reached.
+     * Prints the usage text and returns without connecting if the arguments are invalid.
+     *
+     * @param args command-line options as listed by the usage text
+     */
+    public static void main(String[] args) {
+        int position = 1, pposition = 0, qSize = 0, chunk = 1;
+        boolean blocking = true, verbose = false;
+        String etName = null, host = null, statName = null;
+        int port = EtConstants.serverPort;
+        int flowMode = EtConstants.stationSerial;
+
+        for (int i = 0; i < args.length; i++) {
+            // A value-taking option given last would make args[++i] below run off the array.
+            if (takesValue(args[i]) && i + 1 >= args.length) {
+                System.out.println("Missing value for option " + args[i] + ".");
+                usage();
+                return;
+            }
+            if (args[i].equalsIgnoreCase("-f")) {
+                etName = args[++i];
+            }
+            else if (args[i].equalsIgnoreCase("-host")) {
+                host = args[++i];
+            }
+            else if (args[i].equalsIgnoreCase("-nb")) {
+                blocking = false;
+            }
+            else if (args[i].equalsIgnoreCase("-v")) {
+                verbose = true;
+            }
+            else if (args[i].equalsIgnoreCase("-s")) {
+                statName = args[++i];
+            }
+            else if (args[i].equalsIgnoreCase("-p")) {
+                try {
+                    port = Integer.parseInt(args[++i]);
+                    if ((port < 1024) || (port > 65535)) {
+                        System.out.println("Port number must be between 1024 and 65535.");
+                        usage();
+                        return;
+                    }
+                }
+                catch (NumberFormatException ex) {
+                    System.out.println("Did not specify a proper port number.");
+                    usage();
+                    return;
+                }
+            }
+            else if (args[i].equalsIgnoreCase("-c")) {
+                try {
+                    chunk = Integer.parseInt(args[++i]);
+                    if ((chunk < 1) || (chunk > 1000)) {
+                        System.out.println("Chunk size may be 1 - 1000.");
+                        usage();
+                        return;
+                    }
+                }
+                catch (NumberFormatException ex) {
+                    System.out.println("Did not specify a proper chunk size.");
+                    usage();
+                    return;
+                }
+            }
+            else if (args[i].equalsIgnoreCase("-q")) {
+                try {
+                    qSize = Integer.parseInt(args[++i]);
+                    if (qSize < 1) {
+                        System.out.println("Queue size must be > 0.");
+                        usage();
+                        return;
+                    }
+                }
+                catch (NumberFormatException ex) {
+                    System.out.println("Did not specify a proper queue size number.");
+                    usage();
+                    return;
+                }
+            }
+            else if (args[i].equalsIgnoreCase("-pos")) {
+                try {
+                    position = Integer.parseInt(args[++i]);
+                    if (position < 1) {
+                        System.out.println("Position must be > 0.");
+                        usage();
+                        return;
+                    }
+                }
+                catch (NumberFormatException ex) {
+                    System.out.println("Did not specify a proper position number.");
+                    usage();
+                    return;
+                }
+            }
+            else if (args[i].equalsIgnoreCase("-ppos")) {
+                try {
+                    pposition = Integer.parseInt(args[++i]);
+                    if (pposition < -2 || pposition == 0) {
+                        System.out.println("Parallel position must be > -3 and != 0.");
+                        usage();
+                        return;
+                    }
+                    System.out.println("FLOW moDE is ||");
+                    flowMode = EtConstants.stationParallel;
+                    if (pposition == -2) pposition = EtConstants.newHead;
+                    else if (pposition == -1) pposition = EtConstants.end;
+
+                }
+                catch (NumberFormatException ex) {
+                    System.out.println("Did not specify a proper parallel position number.");
+                    usage();
+                    return;
+                }
+            }
+            else {
+                usage();
+                return;
+            }
+        }
+
+        if (host == null || etName == null || statName == null) {
+            usage();
+            return;
+        }
+
+        EtSystem sys = null;
+        try {
+            // make a direct connection to ET system's tcp server
+            EtSystemOpenConfig config = new EtSystemOpenConfig(etName, host, port);
+
+            // create ET system object with verbose debugging output
+            sys = new EtSystem(config, EtConstants.debugInfo);
+            sys.open();
+
+            // configuration of a new station
+            EtStationConfig statConfig = new EtStationConfig();
+            statConfig.setFlowMode(flowMode);
+            if (!blocking) {
+                statConfig.setBlockMode(EtConstants.stationNonBlocking);
+                if (qSize > 0) {
+                    statConfig.setCue(qSize);
+                }
+            }
+
+            // create station
+            EtStation stat = sys.createStation(statConfig, statName, position, pposition);
+
+            // attach to new station
+            EtAttachment att = sys.attach(stat);
+
+            // array of events
+            EtEvent[] mevs;
+
+            // Number of events to read off ET Ring before exiting.
+            // TODO Make this a CL parameter.
+            int ntoread = 8879;
+
+            // Number of events read so far.
+            int nread = 0;
+
+            while (true) {
+                // get events from ET system
+                mevs = sys.getEvents(att, Mode.SLEEP, null, 0, chunk);
+
+                nread += mevs.length;
+
+                System.out.println("got " + mevs.length + " events from ET Ring");
+
+                // example of reading & printing event data
+                for (EtEvent mev : mevs) {
+                    if (mev.needToSwap()) {
+                        System.out.println("data needs swapping");
+                    }
+
+                    // Get event's data buffer
+                    // buf.limit() = length of the actual data (not buffer capacity)
+                    ByteBuffer buf = mev.getDataBuffer();
+
+                    System.out.println("mev.len="+mev.getLength());
+
+                    // Make composite data from raw bytes in event.
+                    // NOTE(review): buf.array() is the whole backing array, not just limit() bytes.
+                    System.out.println("making new EvioEvent");
+                    EvioEvent ev = new EvioEvent(0, DataType.COMPOSITE, 0);
+                    System.out.println("setting raw bytes");
+                    ev.setRawBytes(buf.array());
+                    System.out.println("getting CompositeData");
+                    CompositeData cdata = ev.getCompositeData();
+                    System.out.println("OK");
+
+                    // TODO: build RawTrackerHits from cdata.getItems(), read as consecutive
+                    // (Long id, Integer time, Integer adcValue) triples, and fill test plots.
+                }
+
+                // put events back into ET system
+                sys.putEvents(att, mevs);
+
+                // Break out of read loop if max events equaled or exceeded.
+                if (nread >= ntoread) {
+                    break;
+                }
+            }
+        }
+        catch (Exception ex) {
+            System.out.println("Error using ET system as consumer");
+            ex.printStackTrace();
+        }
+        finally {
+            // Close the ET connection on both the normal and the error path.
+            if (sys != null) {
+                sys.close();
+            }
+        }
+    }
+}
CVSspam 0.2.12


Use REPLY-ALL to reply to list

To unsubscribe from the LCD-CVS list, click the following link:
https://listserv.slac.stanford.edu/cgi-bin/wa?SUBED1=LCD-CVS&A=1