Print

Print


Author: [log in to unmask]
Date: Wed Apr 29 15:05:20 2015
New Revision: 2860

Log:
Overhaul the EvioFileProducer class.  Command line parsing is now done with Apache Commons CLI.  Default options are supplied so that only the EVIO files are required.

Modified:
    java/trunk/record-util/src/main/java/org/hps/record/evio/EvioFileProducer.java

Modified: java/trunk/record-util/src/main/java/org/hps/record/evio/EvioFileProducer.java
 =============================================================================
--- java/trunk/record-util/src/main/java/org/hps/record/evio/EvioFileProducer.java	(original)
+++ java/trunk/record-util/src/main/java/org/hps/record/evio/EvioFileProducer.java	Wed Apr 29 15:05:20 2015
@@ -1,13 +1,22 @@
 package org.hps.record.evio;
 
+import java.io.BufferedReader;
 import java.io.File;
-import java.net.InetAddress;
+import java.io.FileInputStream;
+import java.io.IOException;
+import java.io.InputStreamReader;
 import java.nio.ByteBuffer;
 import java.nio.ByteOrder;
 import java.util.ArrayList;
 import java.util.Arrays;
 import java.util.List;
-
+import java.util.logging.Level;
+import java.util.logging.Logger;
+
+import org.apache.commons.cli.CommandLine;
+import org.apache.commons.cli.HelpFormatter;
+import org.apache.commons.cli.Options;
+import org.apache.commons.cli.PosixParser;
 import org.jlab.coda.et.EtAttachment;
 import org.jlab.coda.et.EtConstants;
 import org.jlab.coda.et.EtEvent;
@@ -18,91 +27,143 @@
 import org.jlab.coda.jevio.EventWriter;
 import org.jlab.coda.jevio.EvioEvent;
 import org.jlab.coda.jevio.EvioReader;
+import org.lcsim.util.log.DefaultLogFormatter;
+import org.lcsim.util.log.LogUtil;
 
 /**
- * A utility class for streaming an EVIO file to an ET server.
+ * A command line utility for streaming EVIO files to an ET server.
  * <p>
- * Original version was copied from the CODA group's ET java module.
+ * The original version was copied from the CODA ET module and modified.
  *
  * @author <a href="mailto:[log in to unmask]">Jeremy McCormick</a>
  */
-// TODO: Add option to set number of events in the put array.
 public final class EvioFileProducer {
 
     /**
-     * Flag to turn on/off debug print.
-     */
-    private static final boolean debug = false;
+     * Default event count for printing event sequence number.
+     */
+    private static final int DEFAULT_COUNT = 1000;
+
+    /**
+     * Default delay time in milliseconds between events.
+     */
+    private static final int DEFAULT_DELAY = 0;
+
+    /**
+     * The default ET group.
+     */
+    private static final int DEFAULT_ET_GROUP = 1;
+
+    /**
+     * Default event buffer size (200 KB).
+     */
+    private static final int DEFAULT_EVENT_SIZE = 200000;
+
+    /**
+     * Default host name.
+     */
+    private static final String DEFAULT_HOST = "localhost";
+
+    /**
+     * Default ET system name.
+     */
+    private static final String DEFAULT_NAME = "ETBuffer";
+
+    /**
+     * Maximum port number of ET server (maximum value of a TCP/IP port).
+     */
+    private static final int ET_PORT_MAX = 65535;
+
+    /**
+     * Minimum port number of ET server (lower port numbers not allowed).
+     */
+    private static final int ET_PORT_MIN = 1024;
+
+    /**
+     * Setup the logger.
+     */
+    private static final Logger LOGGER = LogUtil
+            .create(EvioFileProducer.class, new DefaultLogFormatter(), Level.CONFIG);
+
+    /**
+     * The command line options.
+     */
+    private static final Options OPTIONS = new Options();
+
+    /**
+     * Define the command line options.
+     */
+    static {
+        OPTIONS.addOption("h", "help", false, "print help");
+        OPTIONS.addOption("c", "count", true, "interval for printing event numbers");
+        OPTIONS.addOption("d", "delay", true, "delay in milliseconds between events");
+        OPTIONS.addOption("e", "file", true, "add an EVIO file");
+        OPTIONS.addOption("f", "name", true, "ET system name which should be a buffer file");
+        OPTIONS.addOption("g", "group", true, "group number of the events");
+        OPTIONS.addOption("H", "host", true, "server host name");
+        OPTIONS.addOption("l", "list", true, "text file with list of EVIO files");
+        OPTIONS.addOption("L", "log", true, "log level (INFO, FINE, etc.)");
+        OPTIONS.addOption("p", "port", true, "server port");
+        OPTIONS.addOption("s", "size", true, "event buffer size in bytes");
+    }
 
     /**
      * The externally accessible main method.
      *
-     * @param args The command line arguments.
+     * @param args the command line arguments
      */
     public static void main(final String[] args) {
-        new EvioFileProducer().doMain(args); // call wrapper method
-    }
-
-    /**
-     * Print usage statement.
+        new EvioFileProducer().run(args);
+    }
+
+    /**
+     * Print usage statement and exit.
      */
     private static void usage() {
-        System.out.println("\nUsage: java Producer -f <et name> -e <evio file> [-p <server port>] [-host <host>]"
-                + " [-d <delay in millisec>] [-g <group #>]\n\n" + "       -f     ET system's name\n"
-                + "       -s     size in bytes for requested events\n"
-                + "       -p     port number for a udp broadcast\n"
-                + "       -g     group number of new events to get\n"
-                + "       -host  host the ET system resides on (defaults to anywhere)\n\n"
-                + "        This consumer works by making a connection to the\n"
-                + "        ET system's tcp server port.\n");
+        final HelpFormatter help = new HelpFormatter();
+        help.printHelp("EvioFileProducer", OPTIONS);
         System.exit(1);
     }
 
     /**
-     * The byte buffer used to transfer data from EVIO to ET.
-     */
-    private ByteBuffer byteBuffer;
+     * Event count for printing message.
+     */
+    private int count = DEFAULT_COUNT;
 
     /**
      * A delay in milliseconds between put operations.
      */
-    private int delay = 0;
-
-    /**
-     * The ET system name which generally maps to a buffer file.
-     */
-    private String etName;
-
-    /**
-     * The list of input EVIO files.
+    private int delay = DEFAULT_DELAY;
+
+    /**
+     * The ET system name which maps to a buffer file.
+     */
+    private String etName = DEFAULT_NAME;
+
+    /**
+     * The master list of input EVIO files to stream.
      */
     private final List<File> evioFiles = new ArrayList<File>();
 
     /**
-     * This is used for a "group" value when doing put but not sure what it actually does.
-     */
-    private int group = 1;
-
-    /**
-     * The server host name.
-     */
-    private String host;
-
-    /**
-     * The server's network port.
+     * The ET group (default is 1).
+     */
+    private int group = DEFAULT_ET_GROUP;
+
+    /**
+     * The server host name (default is localhost).
+     */
+    private String host = DEFAULT_HOST;
+
+    /**
+     * The server's network port (default is standard ET server port).
      */
     private int port = EtConstants.serverPort;
 
     /**
-     * The EVIO reader used to read the input EVIO events.
-     */
-    private EvioReader reader;
-
-    /**
-     * The default ET event size.
-     */
-    // FIXME: Should be a lot bigger?
-    private int size = 10000; // Default event size.
+     * The ET event size in bytes (default is 200 KB).
+     */
+    private int size = DEFAULT_EVENT_SIZE;
 
     /**
      * Class constructor.
@@ -111,174 +172,211 @@
     }
 
     /**
-     * Copy byte buffer to an <code>EtEvent</code>.
+     * Print the command line job configuration to the log.
+     */
+    private void logConfig() {
+        final StringBuffer sb = new StringBuffer();
+        sb.append("count = " + this.count + '\n');
+        sb.append("delay = " + this.delay + '\n');
+        sb.append("etName = " + this.etName + '\n');
+        sb.append("group = " + this.group + '\n');
+        sb.append("host = " + this.host + '\n');
+        sb.append("port = " + this.port + '\n');
+        sb.append("size = " + this.size + '\n');
+        sb.append("EVIO files ..." + '\n');
+        for (final File evioFile : this.evioFiles) {
+            sb.append(evioFile.getPath() + '\n');
+        }
+        LOGGER.config(sb.toString());
+    }
+
+    /**
+     * Run the job by streaming all the EVIO files to the ET server using the command line arguments.
      *
-     * @param event The target EtEvent.
-     */
-    public void copyToEtEvent(final EtEvent event) {
-        event.getDataBuffer().put(this.byteBuffer);
-    }
-
-    /**
-     * Wrapper method called in main.
-     *
-     * @param args The command line arguments.
-     */
-    public void doMain(final String[] args) {
+     * @param args the command line arguments
+     */
+    public void run(final String[] args) {
+
+        // Command line parser.
+        final PosixParser parser = new PosixParser();
+
         try {
-            for (int i = 0; i < args.length; i++) {
-                if (args[i].equalsIgnoreCase("-e")) {
-                    // evioFileName = new String(args[++i]);
-                    this.evioFiles.add(new File(args[++i]));
-                } else if (args[i].equalsIgnoreCase("-f")) {
-                    this.etName = args[++i];
-                } else if (args[i].equalsIgnoreCase("-host")) {
-                    this.host = args[++i];
-                } else if (args[i].equalsIgnoreCase("-p")) {
-                    try {
-                        this.port = Integer.parseInt(args[++i]);
-                        if (this.port < 1024 || this.port > 65535) {
-                            System.out.println("Port number must be between 1024 and 65535.");
-                            usage();
-                            return;
-                        }
-                    } catch (final NumberFormatException ex) {
-                        System.out.println("Did not specify a proper port number.");
-                        usage();
-                        return;
-                    }
-                } else if (args[i].equalsIgnoreCase("-s")) {
-                    try {
-                        this.size = Integer.parseInt(args[++i]);
-                        if (this.size < 1) {
-                            System.out.println("Size needs to be positive int.");
-                            usage();
-                            return;
-                        }
-                    } catch (final NumberFormatException ex) {
-                        System.out.println("Did not specify a proper size.");
-                        usage();
-                        return;
-                    }
-                } else if (args[i].equalsIgnoreCase("-g")) {
-                    try {
-                        this.group = Integer.parseInt(args[++i]);
-                        if (this.group < 1 || this.group > 10) {
-                            System.out.println("Group number must be between 0 and 10.");
-                            usage();
-                            return;
-                        }
-                    } catch (final NumberFormatException ex) {
-                        System.out.println("Did not specify a proper group number.");
-                        usage();
-                        return;
-                    }
-                } else if (args[i].equalsIgnoreCase("-d")) {
-                    try {
-                        this.delay = Integer.parseInt(args[++i]);
-                        if (this.delay < 1) {
-                            System.out.println("delay must be > 0.");
-                            usage();
-                            return;
-                        }
-                    } catch (final NumberFormatException ex) {
-                        System.out.println("Did not specify a proper delay.");
-                        usage();
-                        return;
-                    }
-                } else {
-                    usage();
-                    return;
-                }
-            }
-
-            if (this.host == null) {
-                // host = EtConstants.hostAnywhere;
-                this.host = InetAddress.getLocalHost().getHostName();
-            }
-
-            // ET name is required.
-            if (this.etName == null) {
-                System.out.println("EVIO file name argument is required");
-                usage();
-                return;
-            }
-
+
+            // Parse the command line arguments.
+            final CommandLine cl = parser.parse(OPTIONS, args);
+
+            // Set the log level of this class before doing anything else.
+            if (cl.hasOption("L")) {
+                final Level level = Level.parse(cl.getOptionValue("L"));
+                // Default level is CONFIG so this message will always show.
+                LOGGER.config("Log level will be set to " + level + ".");
+
+                // Set the new log level. This may suppress subsequent configuration print outs!
+                LOGGER.setLevel(level);
+            }
+
+            // Add EVIO files to the job.
+            if (cl.hasOption("e")) {
+                for (final String fileName : cl.getOptionValues("e")) {
+                    final File evioFile = new File(fileName);
+                    LOGGER.config("adding EVIO file " + evioFile.getPath());
+                    this.evioFiles.add(evioFile);
+                }
+            }
+
+            // Set ET name which is the buffer file.
+            if (cl.hasOption("f")) {
+                this.etName = cl.getOptionValue("f");
+            }
+
+            // Add EVIO files from a text file list, assuming one file path per line.
+            if (cl.hasOption("l")) {
+                final String filePath = cl.getOptionValue("l");
+                final File listFile = new File(filePath);
+                if (!listFile.exists()) {
+                    throw new IllegalArgumentException("The file " + listFile.getPath() + " does not exist.");
+                }
+                BufferedReader br = null;
+                try {
+                    br = new BufferedReader(new InputStreamReader(new FileInputStream(listFile)));
+                    String line;
+                    while ((line = br.readLine()) != null) {
+                        this.evioFiles.add(new File(line.trim()));
+                    }
+                } finally {
+                    if (br != null) {
+                        br.close();
+                    }
+                }
+            }
+
+            // Set host name.
+            if (cl.hasOption("H")) {
+                this.host = cl.getOptionValue("H");
+            }
+            // if (this.host == null) {
+            // host = EtConstants.hostAnywhere;
+            // this.host = InetAddress.getLocalHost().getHostName();
+            // }
+
+            // Set the port number.
+            if (cl.hasOption("p")) {
+                this.port = Integer.parseInt(cl.getOptionValue("p"));
+                if (this.port < ET_PORT_MIN || this.port > ET_PORT_MAX) {
+                    throw new IllegalArgumentException("Port number must be between 1024 and 65535.");
+                }
+            }
+            // Set the size of the event buffer in bytes.
+            if (cl.hasOption("s")) {
+                this.size = Integer.parseInt(cl.getOptionValue("s"));
+                if (this.size < 1) {
+                    throw new IllegalArgumentException("Size needs to be positive int.");
+                }
+            }
+
+            // Set the group number.
+            if (cl.hasOption("g")) {
+                this.group = Integer.parseInt(cl.getOptionValue("g"));
+                if (this.group < 1 || this.group > 10) {
+                    throw new IllegalArgumentException("Group number must be between 0 and 10.");
+                }
+            }
+
+            // Set the delay in milliseconds between putting events.
+            if (cl.hasOption("d")) {
+                this.delay = Integer.parseInt(cl.getOptionValue("d"));
+                if (this.delay < 1) {
+                    throw new IllegalArgumentException("The delay must be > 0.");
+                }
+            }
+
+            if (cl.hasOption("c")) {
+                this.count = Integer.parseInt(cl.getOptionValue("c"));
+                if (this.count < 1) {
+                    throw new IllegalArgumentException("The count must be > 0.");
+                }
+            }
+
+            // At least one EVIO file must be present.
             if (this.evioFiles.size() == 0) {
-                System.out.println("At least one input EVIO file is required.");
-                usage();
-                return;
+                throw new IllegalArgumentException("At least one input EVIO file is required.");
             }
 
             // Check existence of EVIO files.
-            System.out.println("EVIO input files ...");
+            LOGGER.info("Checking EVIO file list ... ");
             for (final File evioFile : this.evioFiles) {
-                System.out.println(evioFile.getPath());
                 if (!evioFile.exists()) {
-                    System.err.println("EVIO file does not exist: " + evioFile.getPath());
-                    throw new RuntimeException("EVIO input file does not exist.");
-                }
-            }
-
-            // Setup ET system with the command line config.
+                    throw new IllegalArgumentException("EVIO input file does not exist: " + evioFile.getPath());
+                }
+            }
+            LOGGER.info("EVIO file list was checked!");
+
+            // Print out the configuration for the job to the log.
+            logConfig();
+
+        } catch (final Exception e) { /* Catches errors in command line arguments. */
+            // If there are errors parsing or validating the command line arguments then print usage and exit.
+            LOGGER.log(Level.SEVERE, "Error while processing command line options.", e);
+            usage();
+        }
+
+        EtSystem sys = null;
+        EvioReader reader = null;
+
+        try {
+
+            // Setup ET system from the command line options.
             final EtSystemOpenConfig config = new EtSystemOpenConfig(this.etName, this.host, this.port);
-            final EtSystem sys = new EtSystem(config, EtConstants.debugInfo);
+            sys = new EtSystem(config, EtConstants.debugInfo);
             sys.open();
             final EtStation gc = sys.stationNameToObject("GRAND_CENTRAL");
             final EtAttachment att = sys.attach(gc);
 
-            // array of events
+            // Array of ET events.
             EtEvent[] mevs;
 
             // Loop over input EVIO file list.
             for (final File evioFile : this.evioFiles) {
 
-                // Open EVIO reader.
-                System.out.println("Opening next EVIO file: " + evioFile.getPath());
-                this.reader = new EvioReader(evioFile.getPath(), false);
-
-                // Print number of events.
-                if (debug) {
-                    System.out.println("EVIO file opened with " + this.reader.getEventCount() + " events.");
-                }
-
-                // Ref to current EVIO event.
+                // Open a new EVIO reader.
+                LOGGER.info("Opening next EVIO file " + evioFile.getPath() + " ...");
+                reader = new EvioReader(evioFile.getPath(), false);
+                LOGGER.info("Done opening file!");
+
+                // Print the number of events.
+                LOGGER.info("EVIO file opened with " + reader.getEventCount() + " events.");
+
+                // Reference to the current EVIO event.
                 EvioEvent event;
 
-                // Event sequence number; starts with 1.
+                // Event sequence number.
                 int eventCount = 0;
 
                 // Loop until event source is exhausted.
-                while (true) {
-
-                    // Get next event.
-                    event = this.reader.nextEvent();
+                while ((event = reader.nextEvent()) != null) {
+
+                    // Increment event count.
                     ++eventCount;
-                    if (eventCount % 1000 == 0) {
-                        System.out.println("EvioFileProducer - event <" + eventCount + ">");
-                    }
-                    if (event == null) {
-                        break;
-                    }
-
-                    // Try to parse the next event.
+
+                    // Print event sequence.
+                    if (eventCount % this.count == 0) {
+                        LOGGER.info("EVIO event " + eventCount);
+                    }
+
                     try {
-                        this.reader.parseEvent(event);
-                        if (debug) {
-                            System.out.println("event #" + event.getEventNumber() + " is " + event.getTotalBytes()
-                                    + " bytes");
-                        }
-                    } catch (final Exception e) {
+                        // Parse the next EVIO event.
+                        reader.parseEvent(event);
+                        LOGGER.finest("EVIO event " + event.getEventNumber() + " is " + event.getTotalBytes()
+                                + " bytes.");
+                    } catch (final Exception e) { /* Catches parse errors reading the EVIO events. */
                         e.printStackTrace();
-                        System.out.println("Error making EVIO event with sequence number <" + eventCount
-                                + "> in file <" + evioFile.getPath() + ">.");
+                        LOGGER.warning("Error making EVIO event with seq number " + eventCount + " in file "
+                                + evioFile.getPath());
                         // Attempt to recover from errors by skipping to next event if there are exceptions.
                         continue;
                     }
 
-                    if (debug) {
-                        System.out.println("new events - size=" + this.size + "; group=" + this.group);
-                    }
+                    LOGGER.finest("new events - size=" + this.size + "; group=" + this.group);
 
                     final int eventTag = EvioEventUtilities.getEventTag(event);
 
@@ -296,7 +394,7 @@
                     Arrays.fill(control, eventTag);
                     mevs[0].setControl(control);
 
-                    // Delay for X millis if applicable.
+                    // Apply delay in milliseconds.
                     if (this.delay > 0) {
                         Thread.sleep(this.delay);
                     }
@@ -309,36 +407,43 @@
                     try {
                         writer.close();
                     } catch (final Exception e) {
-                        System.out.println("Caught exception while closing writer.");
-                        e.printStackTrace();
+                        LOGGER.log(Level.WARNING, "Error while closing writer.", e);
                     }
                     mevs[0].setLength(buf.position());
                     mevs[0].setByteOrder(ByteOrder.nativeOrder());
-                    if (debug) {
-                        for (final EtEvent mev : mevs) {
-                            System.out.println("event length = " + mev.getLength() + ", remaining bytes: "
-                                    + mev.getDataBuffer().remaining());
-                        }
+
+                    for (final EtEvent mev : mevs) {
+                        LOGGER.finest("event length = " + mev.getLength() + ", remaining bytes: "
+                                + mev.getDataBuffer().remaining());
                     }
 
                     // Put events onto the ET ring.
                     sys.putEvents(att, mevs);
 
-                    if (debug) {
-                        System.out.println("Wrote event #" + eventCount + " to ET");
-                        System.out.println("-------------------------------");
-                        ++eventCount;
-                    }
-                }
-
-                this.reader.close();
-            }
-
-            // Cleanup.
-            sys.close();
-
-        } catch (final Exception e) {
-            throw new RuntimeException(e);
+                    LOGGER.finest("Sucessfully wrote " + eventCount + " event to ET which was EVIO event number "
+                            + event.getEventNumber() + " from file " + evioFile.getPath() + ".");
+                }
+                reader.close();
+            }
+
+        } catch (final Exception e) { /* Catches all event processing errors. */
+            // This catches and re-throws all errors from processing the EVIO events and configuring the ET system.
+            throw new RuntimeException("Error streaming EVIO events to ET system.", e);
+        } finally {
+            // Cleanup the EVIO reader if needed.
+            if (reader != null && !reader.isClosed()) {
+                try {
+                    reader.close();
+                } catch (final IOException e) {
+                    LOGGER.log(Level.WARNING, e.getMessage(), e);
+                }
+            }
+            // Cleanup the ET system.
+            if (sys != null && sys.alive()) {
+                sys.close();
+            }
         }
+
+        LOGGER.info("Done!");
     }
 }